In [4]:
import pyspark as spark
import datetime, time
import pandas as pd
from pyspark.sql.types import (StructField, StringType,FloatType, 
                               DoubleType, IntegerType, StructType,
                              DateType)
from pyspark.sql.functions import (format_number, isnan, when, count, col, lit, 
                                   sum, split, udf, current_date, unix_timestamp)

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .appName('Python Spark SQL basic example') \
        .config('spark.some.config.option','some-value') \
        .getOrCreate()

## Import DATA

In [6]:
proc_data_schema = [StructField('time',IntegerType(),True),
                   StructField('user@domain',StringType(),True),
                   StructField('comp',StringType(),True),
                   StructField('proc_name',StringType(),True),
                   StructField('start',StringType(),True)]
proc_final_struc = StructType(fields = proc_data_schema)
proc = spark.read.csv('E:/Data Science/Capstone/Dataset/proc.txt',schema=proc_final_struc)

auth_data_schema = [StructField('time',IntegerType(),True),
                   StructField('src_user@domain',StringType(),True),
                   StructField('dest_user@domain',StringType(),True),
                   StructField('src_comp',StringType(),True),
                   StructField('dest_comp',StringType(),True),
                   StructField('auth_type',StringType(),True),
                   StructField('logon_type',StringType(),True),
                   StructField('auth_orientation',StringType(),True),
                   StructField('success',StringType(),True)]
auth_final_struc = StructType(fields = auth_data_schema)
auth = spark.read.csv('E:/Data Science/Capstone/Dataset/auth.txt',schema=auth_final_struc )

flows_data_schema = [StructField('time',IntegerType(),False),
                   StructField('duration',IntegerType(),False),
                   StructField('src_comp',StringType(),False),
                   StructField('src_prt',StringType(),False),
                   StructField('dest_comp',StringType(),False),
                   StructField('dest_prt',StringType(),False),
                   StructField('protocol',StringType(),False),
                   StructField('pkt_cnt',IntegerType(),False),
                   StructField('byt_cnt',IntegerType(),False)]
flows_final_struc = StructType(fields = flows_data_schema)
flows = spark.read.csv('E:/Data Science/Capstone/Dataset/flows.txt',schema=flows_final_struc )

dns_data_schema = [StructField('time',DateType(),True),
                   StructField('src_comp',StringType(),True),
                   StructField('cmp_resolved',StringType(),True)]
dns_final_struc = StructType(fields = dns_data_schema)
dns = spark.read.csv('E:/Data Science/Capstone/Dataset/dns.txt',schema=dns_final_struc)

redteam_data_schema = [StructField('time',IntegerType(),True),
                   StructField('user@domain',StringType(),True),
                   StructField('src_comp',StringType(),True),
                   StructField('dst_comp',StringType(),True)]
redteam_final_struc = StructType(fields = redteam_data_schema)
redteam = spark.read.csv('E:/Data Science/Capstone/Dataset/redteam.txt',schema=redteam_final_struc)

## Transform Data

In [66]:
proc_split = split(proc['user@domain'],'@')
proc = proc.withColumn('user',proc_split.getItem(0))
proc = proc.withColumn('domain',proc_split.getItem(1))
proc = proc.drop('user@domain')

proc = proc.withColumn('type',lit('Process'))

In [67]:
auth_src_split = split(auth['src_user@domain'],'@')
auth = auth.withColumn('src_user',auth_src_split.getItem(0))
auth = auth.withColumn('src_dmn',auth_src_split.getItem(1))

auth_dest_split = split(auth['dest_user@domain'],'@')
auth = auth.withColumn('dest_user',auth_dest_split.getItem(0))
auth = auth.withColumn('dest_dmn',auth_dest_split.getItem(1))

auth = auth.drop('src_user@domain','dest_user@domain')

auth = auth.withColumn('type',lit('Authorization'))

In [68]:
redteam_split = split(redteam['user@domain'],'@')
redteam = redteam.withColumn('src_user',redteam_split.getItem(0))
redteam = redteam.withColumn('src_dmn',redteam_split.getItem(1))

redteam = redteam.drop('user@domain')

redteam = redteam.withColumn('type',lit('RedTeam'))

In [22]:
redteam1 = redteam.rdd.map(lambda x: (x['time'], time.strftime('%m/%d %H:%M:%S', time.gmtime(x['time']) ))).toDF(['time','timestam'])

In [28]:

flows = flows.withColumn('avg_pkt_size', (flows['byt_cnt']/flows['pkt_cnt']).cast(DoubleType()))
flows = flows.na.drop(how='all')

flows = flows.withColumn('type',lit('DataFlow'))

#flows.select(format_number('avg_pkt_size',2).alias('avg_pkt_size'))

In [None]:
                   StructField('auth_type',StringType(),True),
                   StructField('logon_type',StringType(),True),
                   StructField('auth_orientation',StringType(),True),
                   StructField('success',StringType(),True)]

In [69]:
master = auth.union(redteam)

AnalysisException: "Union can only be performed on tables with the same number of columns, but the first table has 12 columns and the second table has 6 columns;;\n'Union\n:- AnalysisBarrier\n:     +- Project [time#1259, src_comp#1262, dest_comp#1263, auth_type#1264, logon_type#1265, auth_orientation#1266, success#1267, src_user#1338, src_dmn#1349, dest_user#1361, dest_dmn#1374, Authorization AS type#1399]\n:        +- Project [time#1259, src_comp#1262, dest_comp#1263, auth_type#1264, logon_type#1265, auth_orientation#1266, success#1267, src_user#1338, src_dmn#1349, dest_user#1361, dest_dmn#1374]\n:           +- Project [time#1259, src_user@domain#1260, dest_user@domain#1261, src_comp#1262, dest_comp#1263, auth_type#1264, logon_type#1265, auth_orientation#1266, success#1267, src_user#1338, src_dmn#1349, dest_user#1361, split(dest_user@domain#1261, @)[1] AS dest_dmn#1374]\n:              +- Project [time#1259, src_user@domain#1260, dest_user@domain#1261, src_comp#1262, dest_comp#1263, auth_type#1264, logon_type#1265, auth_orientation#1266, success#1267, src_user#1338, src_dmn#1349, split(dest_user@domain#1261, @)[0] AS dest_user#1361]\n:                 +- Project [time#1259, src_user@domain#1260, dest_user@domain#1261, src_comp#1262, dest_comp#1263, auth_type#1264, logon_type#1265, auth_orientation#1266, success#1267, src_user#1338, split(src_user@domain#1260, @)[1] AS src_dmn#1349]\n:                    +- Project [time#1259, src_user@domain#1260, dest_user@domain#1261, src_comp#1262, dest_comp#1263, auth_type#1264, logon_type#1265, auth_orientation#1266, success#1267, split(src_user@domain#1260, @)[0] AS src_user#1338]\n:                       +- Relation[time#1259,src_user@domain#1260,dest_user@domain#1261,src_comp#1262,dest_comp#1263,auth_type#1264,logon_type#1265,auth_orientation#1266,success#1267] csv\n+- AnalysisBarrier\n      +- Project [time#1301, src_comp#1303, dst_comp#1304, src_user#1412, src_dmn#1418, RedTeam AS type#1430]\n         +- Project [time#1301, src_comp#1303, dst_comp#1304, src_user#1412, src_dmn#1418]\n            +- Project [time#1301, user@domain#1302, src_comp#1303, dst_comp#1304, src_user#1412, split(user@domain#1302, @)[1] AS src_dmn#1418]\n               +- Project [time#1301, user@domain#1302, src_comp#1303, dst_comp#1304, split(user@domain#1302, @)[0] AS src_user#1412]\n                  +- Relation[time#1301,user@domain#1302,src_comp#1303,dst_comp#1304] csv\n"

## Data Show

In [7]:
auth.show()

+----+--------------------+--------------------+--------+---------+---------+----------+----------------+-------+
|time|     src_user@domain|    dest_user@domain|src_comp|dest_comp|auth_type|logon_type|auth_orientation|success|
+----+--------------------+--------------------+--------+---------+---------+----------+----------------+-------+
|   1|ANONYMOUS LOGON@C586|ANONYMOUS LOGON@C586|   C1250|     C586|     NTLM|   Network|           LogOn|Success|
|   1|ANONYMOUS LOGON@C586|ANONYMOUS LOGON@C586|    C586|     C586|        ?|   Network|          LogOff|Success|
|   1|          C101$@DOM1|          C101$@DOM1|    C988|     C988|        ?|   Network|          LogOff|Success|
|   1|         C1020$@DOM1|        SYSTEM@C1020|   C1020|    C1020|Negotiate|   Service|           LogOn|Success|
|   1|         C1021$@DOM1|         C1021$@DOM1|   C1021|     C625| Kerberos|   Network|           LogOn|Success|
|   1|         C1035$@DOM1|         C1035$@DOM1|   C1035|     C586| Kerberos|   Network|

In [8]:
redteam.show()

+------+-----------+--------+--------+
|  time|user@domain|src_comp|dst_comp|
+------+-----------+--------+--------+
|150885|  U620@DOM1|  C17693|   C1003|
|151036|  U748@DOM1|  C17693|    C305|
|151648|  U748@DOM1|  C17693|    C728|
|151993| U6115@DOM1|  C17693|   C1173|
|153792|  U636@DOM1|  C17693|    C294|
|155219|  U748@DOM1|  C17693|   C5693|
|155399|  U748@DOM1|  C17693|    C152|
|155460|  U748@DOM1|  C17693|   C2341|
|155591|  U748@DOM1|  C17693|    C332|
|156658|  U748@DOM1|  C17693|   C4280|
|210086|  U748@DOM1|  C18025|   C1493|
|210294|  U748@DOM1|  C18025|   C1493|
|210312|  U748@DOM1|  C18025|   C1493|
|218418|  U748@DOM1|  C17693|    C504|
|227052|  U748@DOM1|  C17693|    C148|
|227408|  U748@DOM1|  C17693|    C148|
|227520|  U748@DOM1|  C17693|    C148|
|227780|  U748@DOM1|  C17693|    C148|
|228024|  U748@DOM1|  C17693|    C148|
|228150|  U748@DOM1|  C17693|    C148|
+------+-----------+--------+--------+
only showing top 20 rows



In [9]:
dns.show()

+----+--------+------------+
|time|src_comp|cmp_resolved|
+----+--------+------------+
|null|    null|        null|
|null|    null|        null|
|null|    null|        null|
|null|    null|        null|
|null|    null|        null|
|null|    null|        null|
|null|    null|        null|
|null|    null|        null|
|null|    null|        null|
|null|    null|        null|
|null|    null|        null|
|null|    null|        null|
|null|    null|        null|
|null|    null|        null|
|null|    null|        null|
|null|    null|        null|
|null|    null|        null|
|null|    null|        null|
|null|    null|        null|
|null|    null|        null|
+----+--------+------------+
only showing top 20 rows



In [14]:
flows.show()

+----+--------+--------+-------+---------+--------+--------+-------+-------+------------+
|time|duration|src_comp|src_prt|dest_comp|dest_prt|protocol|pkt_cnt|byt_cnt|avg_pkt_size|
+----+--------+--------+-------+---------+--------+--------+-------+-------+------------+
|   1|       0|   C1065|    389|    C3799|  N10451|       6|     10|   5323|      532.30|
|   1|       0|   C1423|  N1136|    C1707|      N1|       6|      5|    847|      169.40|
|   1|       0|   C1423|  N1142|    C1707|      N1|       6|      5|    847|      169.40|
|   1|       0|  C14909|  N8191|    C5720|    2049|       6|      1|     52|       52.00|
|   1|       0|  C14909|  N8192|    C5720|    2049|       6|      1|     52|       52.00|
|   1|       0|  C14909|  N8193|    C5720|    2049|       6|      1|     52|       52.00|
|   1|       0|   C1707|     N1|    C1423|   N1136|       6|      4|    414|      103.50|
|   1|       0|   C1707|     N1|    C1423|   N1142|       6|      4|    413|      103.25|
|   1|    

In [64]:
master.show()

+----+--------+---------+---------+----------+----------------+-------+---------------+-------+---------------+--------+-------------+
|time|src_comp|dest_comp|auth_type|logon_type|auth_orientation|success|       src_user|src_dmn|      dest_user|dest_dmn|         type|
+----+--------+---------+---------+----------+----------------+-------+---------------+-------+---------------+--------+-------------+
|   1|   C1250|     C586|     NTLM|   Network|           LogOn|Success|ANONYMOUS LOGON|   C586|ANONYMOUS LOGON|    C586|Authorization|
|   1|    C586|     C586|        ?|   Network|          LogOff|Success|ANONYMOUS LOGON|   C586|ANONYMOUS LOGON|    C586|Authorization|
|   1|    C988|     C988|        ?|   Network|          LogOff|Success|          C101$|   DOM1|          C101$|    DOM1|Authorization|
|   1|   C1020|    C1020|Negotiate|   Service|           LogOn|Success|         C1020$|   DOM1|         SYSTEM|   C1020|Authorization|
|   1|   C1021|     C625| Kerberos|   Network|         

## Data Analysis

In [5]:
# proc_domains = proc.select('domain').distinct()
# proc_users = proc.select('user').distinct()

In [13]:
# proc_domains.coalesce(1).write.csv('domains.csv')
# proc_users.coalesce(1).write.csv('users.csv')

In [60]:
# proc.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in proc.columns)).show()

+----+-----------+---------+-----+
|time|user@domain|proc_name|start|
+----+-----------+---------+-----+
|   0|          0|        0|    0|
+----+-----------+---------+-----+



In [58]:
# flows.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in flows.columns)).show()

+----+--------+--------+-------+---------+--------+--------+-------+-------+
|time|duration|src_comp|src_prt|dest_comp|dest_prt|protocol|pkt_cnt|byt_cnt|
+----+--------+--------+-------+---------+--------+--------+-------+-------+
| 530|     530|     530|    530|      530|     530|     530|    530|    530|
+----+--------+--------+-------+---------+--------+--------+-------+-------+



In [77]:
auth.select('success').distinct().show()

+-------+
|success|
+-------+
|Success|
|   Fail|
+-------+



In [59]:
auth.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in auth.columns)).show()

+----+---------------+----------------+--------+---------+---------+----------+----------------+-------+
|time|src_user@domain|dest_user@domain|src_comp|dest_comp|auth_type|logon_type|auth_orientation|success|
+----+---------------+----------------+--------+---------+---------+----------+----------------+-------+
|   0|              0|               0|       0|        0|        0|         0|               0|      0|
+----+---------------+----------------+--------+---------+---------+----------+----------------+-------+



In [32]:
flows.orderBy("avg_pkt_size").show()

+-----+--------+--------+-------+---------+--------+--------+-------+-------+------------+
| time|duration|src_comp|src_prt|dest_comp|dest_prt|protocol|pkt_cnt|byt_cnt|avg_pkt_size|
+-----+--------+--------+-------+---------+--------+--------+-------+-------+------------+
|41972|       0|   C1654|     80|   C13742|   N4427|       6|      2|     92|        46.0|
|41974|      38|   C1015|   N221|    C8681|   N2153|       6|      2|     92|        46.0|
|41972|       0|   C7632|   N294|   C20510|  N18962|       6|      1|     46|        46.0|
|41970|       0|   C8974|  N4126|    C5787|  N30556|       6|      1|     46|        46.0|
|41972|      38|   C1015|   N221|    C8964|   N2024|       6|      2|     92|        46.0|
|41971|       0|  C14402|  N9113|     C585|     139|       6|      1|     46|        46.0|
|41972|      60|  C11149|  N2801|    C2588|     N76|       6|      4|    184|        46.0|
|41971|       0|   C3873|     80|    C3959|   N3771|       6|      2|     92|        46.0|

In [49]:
flows.select([count(when(isnan(c)|col(c).isNull(), c)).alias(c) for c in flows.columns]).show()

+----+--------+--------+-------+---------+--------+--------+-------+-------+------------+
|time|duration|src_comp|src_prt|dest_comp|dest_prt|protocol|pkt_cnt|byt_cnt|avg_pkt_size|
+----+--------+--------+-------+---------+--------+--------+-------+-------+------------+
|   0|       0|       0|      0|        0|       0|       0|      0|      0|           0|
+----+--------+--------+-------+---------+--------+--------+-------+-------+------------+



In [54]:
dns.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in dns.columns)).show()

+----+--------+------------+
|time|src_comp|cmp_resolved|
+----+--------+------------+
|   0|       0|           0|
+----+--------+------------+



In [25]:
dns.printSchema()

root
 |-- time: date (nullable = true)
 |-- src_comp: string (nullable = true)
 |-- cmp_resolved: string (nullable = true)



In [10]:
proc.show()

+----+-----------+-----+---------+-----+
|time|user@domain| comp|proc_name|start|
+----+-----------+-----+---------+-----+
|   1|   C1$@DOM1|   C1|      P16|Start|
|   1|C1001$@DOM1|C1001|       P4|Start|
|   1|C1002$@DOM1|C1002|       P4|Start|
|   1|C1004$@DOM1|C1004|       P4|Start|
|   1|C1017$@DOM1|C1017|       P4|Start|
|   1|C1018$@DOM1|C1018|       P4|Start|
|   1|C1020$@DOM1|C1020|       P3|Start|
|   1|C1020$@DOM1|C1020|       P4|Start|
|   1|C1028$@DOM1|C1028|      P16|  End|
|   1|C1029$@DOM1|C1029|       P4|Start|
|   1|C1030$@DOM1|C1030|       P4|Start|
|   1|C1032$@DOM1|C1032|       P4|Start|
|   1|C1035$@DOM1|C1035|      P37|Start|
|   1|C1035$@DOM1|C1035|       P5|Start|
|   1|C1051$@DOM1|C1051|      P16|Start|
|   1|C1069$@DOM1|C1069|       P3|Start|
|   1|C1069$@DOM1|C1069|       P4|Start|
|   1|C1079$@DOM1|C1079|       P4|Start|
|   1|C1084$@DOM1|C1084|       P4|Start|
|   1|C1088$@DOM1|C1088|       P4|Start|
+----+-----------+-----+---------+-----+
only showing top

In [20]:
redteam.printSchema()

root
 |-- time: date (nullable = true)
 |-- user@domain: string (nullable = true)
 |-- src_comp: string (nullable = true)
 |-- dst_comp: string (nullable = true)



In [6]:
from pyspark.sql.functions import lit, unix_timestamp
start = datetime.date(2018,1,1)
#datetime.timestamp(2018,1,1,12,0,0)



In [7]:
datetime.datetime.fromtimestamp(time.mktime(start.timetuple()) + 228150)

datetime.datetime(2018, 1, 3, 15, 22, 30)

In [8]:
func = udf (lambda x: datetime.datetime.fromtimestamp(time.mktime(start.timetuple()) + x).date(),DateType() )

In [9]:
redteam1 = redteam.withColumn("timestam", redteam.select("time")),'yyyy-MM-dd HH:mm:ss').cast("timestamp") )

SyntaxError: invalid syntax (<ipython-input-9-48ac7615f935>, line 1)

In [10]:
import time

time_update = udf(lambda x: time.strftime('%m/%d %H:%M:%S', time.gmtime(x)))

#timestam = time.strftime('%m/%d %H:%M:%S', time.gmtime(redteam.select("time")))

In [23]:
redteam1.show()

+------+--------------+
|  time|      timestam|
+------+--------------+
|150885|01/02 17:54:45|
|151036|01/02 17:57:16|
|151648|01/02 18:07:28|
|151993|01/02 18:13:13|
|153792|01/02 18:43:12|
|155219|01/02 19:06:59|
|155399|01/02 19:09:59|
|155460|01/02 19:11:00|
|155591|01/02 19:13:11|
|156658|01/02 19:30:58|
|210086|01/03 10:21:26|
|210294|01/03 10:24:54|
|210312|01/03 10:25:12|
|218418|01/03 12:40:18|
|227052|01/03 15:04:12|
|227408|01/03 15:10:08|
|227520|01/03 15:12:00|
|227780|01/03 15:16:20|
|228024|01/03 15:20:24|
|228150|01/03 15:22:30|
+------+--------------+
only showing top 20 rows

