In [49]:
import findspark
findspark.init()

In [50]:
import pyspark

In [51]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "8g") \
    .appName('vb-app') \
    .getOrCreate()

In [110]:
path = "../data/raw/evtx_data.csv"
df = spark.read.options(header='true', inferSchema='true') \
          .csv(path)

In [113]:
df = df.select("Computer","EventID","SystemTime", "EVTX_FileName")
# df.show()

In [92]:
df.groupBy('EventID').count().orderBy(desc('count')).show()

+-------+-----+
|EventID|count|
+-------+-----+
|   null| 5240|
|   5145|  950|
|      1|  843|
|      7|  613|
|     11|  185|
|   1040|  178|
|   1042|  173|
|     13|  162|
|      3|  153|
|     10|  140|
|   4663|  115|
|      8|  101|
|   5156|   92|
|     12|   80|
|   4624|   69|
|   1155|   64|
|    258|   64|
|   1136|   64|
|     18|   46|
|      5|   43|
+-------+-----+
only showing top 20 rows



In [114]:
print((df.count(), len(df.columns)))

(9873, 4)


In [115]:
df = df.na.drop()

In [116]:
print((df.count(), len(df.columns)))

(4633, 4)


In [93]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
def convert_to_unix(event_time):
    time = event_time[10:]
    print(time)
    time_list = time.split(":")
    return int(time_list[0])*3600+int(time_list[1])*60+int(float(time_list[2]))

convert_to_unix_udf = udf(convert_to_unix, IntegerType())

df = df.withColumn("time", convert_to_unix_udf(df.SystemTime))
# df.select("time").show()

### Total events per user in (last) 15 minutes window interval  
Ref: https://stackoverflow.com/questions/45806194/pyspark-rolling-average-using-timeseries-data

In [94]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
new_df = df.select("time", "Computer", "EventID").filter(df.EventID.isin([1, 3, 8, 10, 11, 12, 4624, 4625, 4648, 4658, 4661, 4663, 4672,4698,4768,5140,5145, 5156, 5158]))
w = (Window.partitionBy('Computer').orderBy(F.col("time").cast('long')).rangeBetween(-1800, Window.currentRow))
new_df = new_df.withColumn('total_events', F.count("time").over(w))
# new_df.show()

In [57]:
# from pyspark.sql.functions import desc
# new_df.groupBy("EventID",'total_events').count().sort(desc("EventID")).show(30)

In [58]:
# new_df.filter((new_df.EventID == '4624')).sort("time").show(50)

## Paritition By UserName and Event

In [95]:
w = (Window.partitionBy('Computer', 'EventID').orderBy(F.col("time").cast('long')).rangeBetween(-1800, Window.currentRow))
new_df = new_df.withColumn('total_per_event', F.count("time").over(w))

In [60]:
# new_df.show()

In [96]:
columns = ["time", "host", "event_id", "total_events", "total_per_event"]
new_df = new_df.toDF(*columns)
new_df = new_df.select(*columns)

In [97]:
new_df = new_df.distinct()
new_df = new_df.groupBy("time", "host", "total_events").pivot("event_id").sum("total_per_event")

In [63]:
# new_df.sort("time", "host").show()

In [99]:
from pyspark.sql import Window
from pyspark.sql.functions import last
import sys

def forward_fill(column_name):
    # define the window
    window = Window.partitionBy('host')\
                   .orderBy(F.col("time").cast('long'))\
                   .rangeBetween(-1800, Window.currentRow)

    # what column to fill forward
    return last(column_name, True).over(window)

#     # perform fill
#     return new_df.withColumn(column_name, filled_column)


# # show 
# spark_df_filled.orderBy('time', 'host').show(10)  

In [100]:
columns_to_fill = new_df.schema.names[3:]
print(columns_to_fill)
for column_name in columns_to_fill:
    new_df = new_df.withColumn('event_' + column_name, forward_fill(column_name))

['1', '3', '8', '10', '11', '12', '4624', '4625', '4648', '4658', '4661', '4663', '4672', '4698', '4768', '5140', '5145', '5156', '5158']


In [67]:
new_df.orderBy("time", "host").show()

+----+--------------------+------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-------+-------+-------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
|time|                host|total_events|   1|   3|   8|4624|4625|4648|4658|4661|4672|4698|4768|5140|5145|5156|5158|event_1|event_3|event_8|event_4624|event_4625|event_4648|event_4658|event_4661|event_4672|event_4698|event_4768|event_5140|event_5145|event_5156|event_5158|
+----+--------------------+------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-------+-------+-------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
| 124|WIN-77LTAPHIQ1R.e...|          14|null|null|null|   4|null|null|null|null|   3|   2|null|   2|   3|null|null|   null|   null|   null|         4|      null|      null|      null| 

In [68]:
new_df.select("time", "host","4624","event_4624").orderBy("time", "host").show()

+----+--------------------+----+----------+
|time|                host|4624|event_4624|
+----+--------------------+----+----------+
| 124|WIN-77LTAPHIQ1R.e...|   4|         4|
| 127|WIN-77LTAPHIQ1R.e...|null|         4|
| 137|WIN-77LTAPHIQ1R.e...|null|         4|
| 141|WIN-77LTAPHIQ1R.e...|   6|         6|
| 576|         MSEDGEWIN10|null|      null|
| 578|         MSEDGEWIN10|null|      null|
| 583|         MSEDGEWIN10|null|      null|
| 676|         MSEDGEWIN10|null|      null|
| 680|         MSEDGEWIN10|null|      null|
|1696|         MSEDGEWIN10|null|      null|
|1732|         MSEDGEWIN10|null|      null|
|1792|              IEWIN7|null|      null|
|1942|              IEWIN7|null|      null|
|1944|              IEWIN7|null|      null|
|1950|              IEWIN7|null|      null|
|1955|              IEWIN7|null|      null|
|1956|              IEWIN7|null|      null|
|1960|              IEWIN7|null|      null|
|2107|              IEWIN7|null|      null|
|2108|              IEWIN7|null|

In [101]:
new_df = new_df.na.fill(value=0)

In [102]:
new_df = new_df.drop(*columns_to_fill)
new_df = new_df.distinct()

In [103]:
new_df.coalesce(1).write.option("header",True).csv("../data/processed/malicious_pivoted")

### Sysmon 1 (Process Creation)

### Sysmon 3 (Network Connection)

### Sysmon 8 (Remote Thread Creation)

### Total number of Event 4624 per user in an hour window interval  (Successful Logon)


### Total number of Event 4625 per user in an hour window interval  (Failed Logon)

### Total number of Event 4627 per user in an hour window interval  (Group Membership)


### Total number of Event 4648 per user in an hour window interval  (A logon was attempted using explicit credentials)

### Total number of Event 4658 per user in an hour window interval  (handle to an object was closed)

### Total number of Event 4661 per user in an hour window interval  (handle to an object was requsted)

### Total number of Event 4672 per user in an hour window interval  (Special privileges assigned to new logon)

### Total number of Event 4698 per user in an hour window interval  (A scheduled task was created)

### Total number of Event 4768 per user in an hour window interval  (Kerberos Authentication)

### Total number of Event 5140 per user in an hour window interval  (A network share object was accessed)

### Total number of Event 5145 per user in an hour window interval  (A network share object was checked to see whether client can be granted desired access)

### 5156: The Windows Filtering Platform has allowed a connection

### Total number of Event 5158 per user in an hour window interval  (The Windows Filtering Platform has permitted a bind to a local port)