In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_unixtime,
    to_timestamp,
    min,
    max,
    sum,
    avg,
    col,
    countDistinct,
    broadcast,
    date_trunc,
    count,
)
from pyspark.sql import Window
import pyspark.sql.functions as F
import plotly.express as px



In [2]:


filepaths = ["/home/ams/Documents/python/vscode/PySpark/data/CTU-IoT-Malware-Capture-1-1conn.log.labeled.csv", "/home/ams/Documents/python/vscode/PySpark/data/CTU-IoT-Malware-Capture-3-1conn.log.labeled.csv"]


spark = (
    SparkSession.builder.appName("iot")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")
spark.sparkContext.version



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/16 03:32:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


'3.5.1'

In [3]:


df = spark.read.option("delimiter", "|").csv(filepaths, inferSchema = True, header = True)
df.show(5)



In [None]:
df.printSchema()

root
 |-- ts: double (nullable = true)
 |-- uid: string (nullable = true)
 |-- id.orig_h: string (nullable = true)
 |-- id.orig_p: double (nullable = true)
 |-- id.resp_h: string (nullable = true)
 |-- id.resp_p: double (nullable = true)
 |-- proto: string (nullable = true)
 |-- service: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- orig_bytes: string (nullable = true)
 |-- resp_bytes: string (nullable = true)
 |-- conn_state: string (nullable = true)
 |-- local_orig: string (nullable = true)
 |-- local_resp: string (nullable = true)
 |-- missed_bytes: double (nullable = true)
 |-- history: string (nullable = true)
 |-- orig_pkts: double (nullable = true)
 |-- orig_ip_bytes: double (nullable = true)
 |-- resp_pkts: double (nullable = true)
 |-- resp_ip_bytes: double (nullable = true)
 |-- tunnel_parents: string (nullable = true)
 |-- label: string (nullable = true)
 |-- detailed-label: string (nullable = true)



## data preprocessing

In [None]:
df=df.withColumn('dt',from_unixtime('ts')).withColumn('dt',to_timestamp('dt'))

In [None]:
df.show(5)

+-------------------+------------------+---------------+---------+---------------+---------+-----+-------+--------+----------+----------+----------+----------+----------+------------+-------+---------+-------------+---------+-------------+--------------+---------+--------------------+-------------------+
|                 ts|               uid|      id.orig_h|id.orig_p|      id.resp_h|id.resp_p|proto|service|duration|orig_bytes|resp_bytes|conn_state|local_orig|local_resp|missed_bytes|history|orig_pkts|orig_ip_bytes|resp_pkts|resp_ip_bytes|tunnel_parents|    label|      detailed-label|                 dt|
+-------------------+------------------+---------------+---------+---------------+---------+-----+-------+--------+----------+----------+----------+----------+----------+------------+-------+---------+-------------+---------+-------------+--------------+---------+--------------------+-------------------+
|1.525879831015811E9|CUmrqr4svHuSXJy5z7|192.168.100.103|  51524.0| 65.127.233.163|

In [None]:
df=df.withColumnsRenamed({

    "id.orig_h": "source_ip",
    "id.orig_p": "source_port",
    "id.resp_h": "dest_ip",
    "id.resp_p": "dest_port"
})

In [None]:
df.show(5)

+-------------------+------------------+---------------+-----------+---------------+---------+-----+-------+--------+----------+----------+----------+----------+----------+------------+-------+---------+-------------+---------+-------------+--------------+---------+--------------------+-------------------+
|                 ts|               uid|      source_ip|source_port|        dest_ip|dest_port|proto|service|duration|orig_bytes|resp_bytes|conn_state|local_orig|local_resp|missed_bytes|history|orig_pkts|orig_ip_bytes|resp_pkts|resp_ip_bytes|tunnel_parents|    label|      detailed-label|                 dt|
+-------------------+------------------+---------------+-----------+---------------+---------+-----+-------+--------+----------+----------+----------+----------+----------+------------+-------+---------+-------------+---------+-------------+--------------+---------+--------------------+-------------------+
|1.525879831015811E9|CUmrqr4svHuSXJy5z7|192.168.100.103|    51524.0| 65.127.

In [None]:
df.select('ts').show()

+-------------------+
|                 ts|
+-------------------+
|1.525879831015811E9|
|1.525879831025055E9|
|1.525879831045045E9|
| 1.52587983201624E9|
|1.525879832024985E9|
|1.525879832044975E9|
|1.525879833016171E9|
|1.525879833044906E9|
|1.525879834024847E9|
|1.525879834045086E9|
|1.525879836044966E9|
|1.525879837005652E9|
|1.525879838006081E9|
|1.525879838024838E9|
|1.525879839006262E9|
|1.525879839025003E9|
|1.525879839044992E9|
|1.525879840024934E9|
|1.525879840044922E9|
|1.525879841005875E9|
+-------------------+
only showing top 20 rows



## data quality check

In [None]:
df.show(5)

+-------------------+------------------+---------------+-----------+---------------+---------+-----+-------+--------+----------+----------+----------+----------+----------+------------+-------+---------+-------------+---------+-------------+--------------+---------+--------------------+-------------------+
|                 ts|               uid|      source_ip|source_port|        dest_ip|dest_port|proto|service|duration|orig_bytes|resp_bytes|conn_state|local_orig|local_resp|missed_bytes|history|orig_pkts|orig_ip_bytes|resp_pkts|resp_ip_bytes|tunnel_parents|    label|      detailed-label|                 dt|
+-------------------+------------------+---------------+-----------+---------------+---------+-----+-------+--------+----------+----------+----------+----------+----------+------------+-------+---------+-------------+---------+-------------+--------------+---------+--------------------+-------------------+
|1.525879831015811E9|CUmrqr4svHuSXJy5z7|192.168.100.103|    51524.0| 65.127.

In [None]:
df.agg(
    max('dt').alias('max_dt'),

    min('dt').alias('min_dt')
).show()



+-------------------+-------------------+
|             max_dt|             min_dt|
+-------------------+-------------------+
|2018-05-21 12:34:46|2018-05-09 21:00:31|
+-------------------+-------------------+



                                                                                

## shape of data

In [None]:
df.count()

1164851

In [25]:
len(df.columns)

24

In [27]:
to_analyse = [
    "source_ip",
    "source_port",
    "dest_ip",
    "dest_port",
    "proto",
    "service",
    "duration",
    "orig_bytes",
    "resp_bytes",
    "conn_state",
    "local_orig",
    "local_resp",
    "missed_bytes",
    "history",
    "orig_pkts",
    "orig_ip_bytes",
    "resp_pkts",
    "resp_ip_bytes",
    "tunnel_parents",
    "label",
    "detailed-label",
]




unique_cnt=df.agg(*(countDistinct(col(c)).alias(c) for c in to_analyse))
print(unique_cnt.show())

[Stage 19:>                                                         (0 + 4) / 4]

+---------+-----------+-------+---------+-----+-------+--------+----------+----------+----------+----------+----------+------------+-------+---------+-------------+---------+-------------+--------------+-----+--------------+
|source_ip|source_port|dest_ip|dest_port|proto|service|duration|orig_bytes|resp_bytes|conn_state|local_orig|local_resp|missed_bytes|history|orig_pkts|orig_ip_bytes|resp_pkts|resp_ip_bytes|tunnel_parents|label|detailed-label|
+---------+-----------+-------+---------+-----+-------+--------+----------+----------+----------+----------+----------+------------+-------+---------+-------------+---------+-------------+--------------+-----+--------------+
|    15898|      28259| 661294|    65427|    3|      6|   34789|       200|       601|        12|         1|         1|           1|    167|       58|         1311|       74|         1480|             1|    2|             4|
+---------+-----------+-------+---------+-----+-------+--------+----------+----------+----------+---

                                                                                

In [None]:
unique_counts = unique_counts.first()
static_cols = [c for c in unique_counts.asDict() if unique_counts[c] == 1]
print("Dataset has", len(static_cols), "static columns: ", static_cols)
df = df.drop(*static_cols)

In [28]:
unique_cnt=unique_cnt.first()
static_cols=[c for c in unique_cnt.asDict() if unique_cnt[c]==1]
print('dataset has ',len(static_cols),'static cols:',static_cols)

df=df.drop(*static_cols)



dataset has  4 static cols: ['local_orig', 'local_resp', 'missed_bytes', 'tunnel_parents']


                                                                                

##  Count Distinct Values


In [29]:
source_ips = df.select(col("source_ip")).distinct()
dest_ips = df.select(col("dest_ip")).distinct()
common_ips = source_ips.join(broadcast(dest_ips), source_ips.source_ip == dest_ips.dest_ip, how='inner')


print("Source IPs count:", source_ips.count())
print("Dest IPs count:", dest_ips.count())
print("IPs as both:", common_ips.count())

                                                                                

Source IPs count: 15898


                                                                                

Dest IPs count: 661294


                                                                                

IPs as both: 9423


In [30]:


source_ports = df.select(col("source_port")).distinct()
dest_ports = df.select(col("dest_port")).distinct()
common_ports = source_ports.join(broadcast(dest_ports), source_ports.source_port == dest_ports.dest_port, how='inner')


print("Source Ports count:", source_ports.count())
print("Dest Ports count:", dest_ports.count())
print("Ports as both:", common_ports.count())



                                                                                

Source Ports count: 28259


                                                                                

Dest Ports count: 65427


                                                                                

Ports as both: 28212


## Count Nulls

In [31]:
df = df.replace("-", None)

In [32]:


remaining_cols = [f for f in to_analyse if f not in static_cols]
df.select(
    [count(F.when(F.isnan(c) | col(c).isNull(), c)).alias(c) for c in remaining_cols]
).show()





+---------+-----------+-------+---------+-----+-------+--------+----------+----------+----------+-------+---------+-------------+---------+-------------+-----+--------------+
|source_ip|source_port|dest_ip|dest_port|proto|service|duration|orig_bytes|resp_bytes|conn_state|history|orig_pkts|orig_ip_bytes|resp_pkts|resp_ip_bytes|label|detailed-label|
+---------+-----------+-------+---------+-----+-------+--------+----------+----------+----------+-------+---------+-------------+---------+-------------+-----+--------------+
|        0|          0|      0|        0|    0|1155702|  870244|    870244|    870244|         0|  18628|        0|            0|        0|            0|    0|        473811|
+---------+-----------+-------+---------+-----+-------+--------+----------+----------+----------+-------+---------+-------------+---------+-------------+-----+--------------+



                                                                                


## Time-Series Plots


In [33]:


df = df.withColumns(
    {
        "day": date_trunc("day", "dt"),
        "hour": date_trunc("hour", "dt"),
        "minute": date_trunc("minute", "dt"),
        "second": date_trunc("second", "dt"),
    }
)



In [39]:


for agg in ['day', 'hour', 'minute']:
    plotting_table = df.groupBy([agg, "label"]).agg(count("uid").alias("counts")).orderBy(agg).toPandas()
    fig = px.line(plotting_table, x=agg, y="counts", color="label", title=f'Event Counts per {agg}')
    fig.show()



ModuleNotFoundError: No module named 'distutils'

In [38]:
pip install distutils

[31mERROR: Could not find a version that satisfies the requirement distutils (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for distutils[0m[31m
[0mNote: you may need to restart the kernel to use updated packages.


In [37]:
pip install setuptools

Collecting setuptools
  Using cached setuptools-70.0.0-py3-none-any.whl.metadata (5.9 kB)
Using cached setuptools-70.0.0-py3-none-any.whl (863 kB)
Installing collected packages: setuptools
Successfully installed setuptools-70.0.0
Note: you may need to restart the kernel to use updated packages.
