# **KDDCup Data Analytics with PySpark DF: A structured case study**

### Udemy Course: Best Hands-on Big Data Practices and Use Cases using PySpark

### Author: Amin Karami (PhD, FHEA)

##### data source: http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html

In [None]:
########## ONLY in Colab ##########
!pip3 install pyspark
########## ONLY in Colab ##########

In [None]:
########## ONLY in Ubuntu Machine ##########
# Load Spark engine
# !pip3 install -q findspark
# import findspark
# findspark.init()
########## ONLY in Ubuntu Machine ##########

In [1]:
# import SparkSession

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

spark

In [3]:
# Read and Load Data to Spark
# Data source: http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [27]:
path = '/content/drive/MyDrive/Colab Notebooks/Udemy - Best Hands-on Big Data Practices with PySpark & Spark Tuning 2022-8/kddcup.data.gz'
df = spark.read.text(path)  # read as raw text lines (the file is gzip-compressed; Spark decompresses it on read)

In [28]:
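# Print the schema and a sample of the data
# (printSchema() prints directly and returns None, so it is not wrapped in print())
df.printSchema()
df.show()

root
 |-- value: string (nullable = true)

+--------------------+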
|               value|
+--------------------+
|0,tcp,http,SF,215...|
|0,tcp,http,SF,162...|
|0,tcp,http,SF,236...|
|0,tcp,http,SF,233...|
|0,tcp,http,SF,239...|
|0,tcp,http,SF,238...|
|0,tcp,http,SF,235...|
|0,tcp,http,SF,234...|
|0,tcp,http,SF,239...|
|0,tcp,http,SF,181...|
|0,tcp,http,SF,184...|
|0,tcp,http,SF,185...|
|0,tcp,http,SF,239...|
|0,tcp,http,SF,181...|
|0,tcp,http,SF,236...|
|0,tcp,http,SF,233...|
|0,tcp,http,SF,238...|
|0,tcp,http,SF,235...|
|0,tcp,http,SF,234...|
|0,tcp,http,SF,239...|
+--------------------+
only showing top 20 rows



In [30]:
# Split data (if needed)
# Avoid native Python functions here:
# with DataFrames, use Spark's built-in functions, which are optimized
from pyspark.sql.functions import split

split_col = split(df['value'], ',')
df2 = df\
    .withColumn('Protocol', split_col.getItem(1))\
    .withColumn('Service', split_col.getItem(2))\
    .withColumn('flag', split_col.getItem(3))\
    .withColumn('src_bytes', split_col.getItem(4))\
    .withColumn('dst_bytes', split_col.getItem(5))\
    .withColumn('urgent', split_col.getItem(8))\
    .withColumn('num_failed_logins', split_col.getItem(10))\
    .withColumn('root_shell', split_col.getItem(13))\
    .withColumn('guest_login', split_col.getItem(21))\
    .withColumn('label', split_col.getItem(41))\
    .drop('value')
df2.show()

+--------+-------+----+---------+---------+------+-----------------+----------+-----------+-------+
|Protocol|Service|flag|src_bytes|dst_bytes|urgent|num_failed_logins|root_shell|guest_login|  label|
+--------+-------+----+---------+---------+------+-----------------+----------+-----------+-------+
|     tcp|   http|  SF|      215|    45076|     0|                0|         0|          0|normal.|
|     tcp|   http|  SF|      162|     4528|     0|                0|         0|          0|normal.|
|     tcp|   http|  SF|      236|     1228|     0|                0|         0|          0|normal.|
|     tcp|   http|  SF|      233|     2032|     0|                0|         0|          0|normal.|
|     tcp|   http|  SF|      239|      486|     0|                0|         0|          0|normal.|
|     tcp|   http|  SF|      238|     1282|     0|                0|         0|          0|normal.|
|     tcp|   http|  SF|      235|     1337|     0|                0|         0|          0|normal.|


In [44]:
df2.printSchema()

root
 |-- Protocol: string (nullable = true)
 |-- Service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: string (nullable = true)
 |-- dst_bytes: string (nullable = true)
 |-- urgent: string (nullable = true)
 |-- num_failed_logins: string (nullable = true)
 |-- root_shell: string (nullable = true)
 |-- guest_login: string (nullable = true)
 |-- label: string (nullable = true)
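
Every column in `df2` is a string, because `split` produces an array of strings. A minimal sketch of casting the numeric fields follows. `cast_columns`, `NUMERIC_COLUMNS`, and `df2_typed` are hypothetical names that do not appear in the original notebook, and `long` is an assumed target type. The helper relies only on `DataFrame.withColumn` and `Column.cast`.

```python
from functools import reduce

# Fields from the split above that hold byte counts, counters, or 0/1 flags.
NUMERIC_COLUMNS = [
    "src_bytes", "dst_bytes", "urgent",
    "num_failed_logins", "root_shell", "guest_login",
]


def cast_columns(df, columns, dtype="long"):
    """Return a copy of df with each named column cast to dtype.

    Hypothetical helper: df is expected to be a Spark DataFrame.
    """
    return reduce(
        lambda acc, name: acc.withColumn(name, acc[name].cast(dtype)),
        columns,
        df,
    )


# Usage in the notebook (assumes df2 from the cells above):
# df2_typed = cast_columns(df2, NUMERIC_COLUMNS)
# df2_typed.printSchema()
```

Keeping the result under a separate name leaves `df2` unchanged for the cells that follow.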



In [32]:
# Increase the number of partitions (if needed) and Build a Temp table
print(df2.rdd.getNumPartitions())
print(spark.sparkContext.defaultParallelism)
df2 = df2.repartition(10)
print(df2.rdd.getNumPartitions())

1
2
10


In [41]:
df2.createOrReplaceTempView('df_KDDCup')

## Question 1: Count the number of connections for each label

In [47]:
query = """
  select label, count(1) connections
  from df_KDDCup
  group by label
  order by count(1) desc
"""
spark.sql(query).show()

+----------------+-----------+
|           label|connections|
+----------------+-----------+
|          smurf.|    2807886|
|        neptune.|    1072017|
|         normal.|     972781|
|          satan.|      15892|
|        ipsweep.|      12481|
|      portsweep.|      10413|
|           nmap.|       2316|
|           back.|       2203|
|    warezclient.|       1020|
|       teardrop.|        979|
|            pod.|        264|
|   guess_passwd.|         53|
|buffer_overflow.|         30|
|           land.|         21|
|    warezmaster.|         20|
|           imap.|         12|
|        rootkit.|         10|
|     loadmodule.|          9|
|      ftp_write.|          8|
|       multihop.|          7|
+----------------+-----------+
only showing top 20 rows



## Question 2: Get the list of `Protocols` that are `normal` and `vulnerable to attacks`, where there is NOT a `guest login` to the destination addresses


In [56]:
query = """
  select
    Protocol
    , case
        when label = 'normal.' then 'normal'
        else 'attack'
    end state
    , count(1) n
  from df_KDDCup
  where guest_login != '1'
  group by
    Protocol
    , case
        when label = 'normal.' then 'normal'
        else 'attack'
    end
  order by Protocol desc
"""
spark.sql(query).show()

+--------+------+-------+
|Protocol| state|      n|
+--------+------+-------+
|     udp|normal| 191348|
|     udp|attack|   2940|
|     tcp|normal| 764894|
|     tcp|attack|1101613|
|    icmp|normal|  12763|
|    icmp|attack|2820782|
+--------+------+-------+



## Question 3: Apply Some Descriptive Statistics on Numerical Data

In [60]:
# PySpark provides built-in standard aggregate functions defined in the DataFrame API.
# Import the module under an alias so that min and max do not shadow Python's built-ins.
from pyspark.sql import functions as F

# Note: src_bytes is still a string column here, so min and max compare text, not numbers
df2.select(
    F.mean(df2.src_bytes).alias('AVG(src_bytes)')
    , F.stddev(df2.src_bytes).alias('STD(src_bytes)')
    , F.min(df2.src_bytes).alias('MIN(src_bytes)')
    , F.max(df2.src_bytes).alias('MAX(src_bytes)')
    , F.last(df2.src_bytes).alias('LAST(src_bytes)')
    , F.skewness(df2.src_bytes).alias('SKEW(src_bytes)')
).show()

+------------------+-----------------+--------------+--------------+---------------+------------------+
|    AVG(src_bytes)|   STD(src_bytes)|MIN(src_bytes)|MAX(src_bytes)|LAST(src_bytes)|   SKEW(src_bytes)|
+------------------+-----------------+--------------+--------------+---------------+------------------+
|1834.6211752293746|941431.0744911298|             0|           999|           2341|1188.9519100465739|
+------------------+-----------------+--------------+--------------+---------------+------------------+
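
In the output above, `MAX(src_bytes)` is 999 even though the mean is about 1834.6 and `LAST(src_bytes)` is 2341. That pattern is what text comparison would produce, since `src_bytes` is still a string column at this point. The plain-Python sketch below illustrates the effect with made-up values (the list is hypothetical, not taken from the dataset).

```python
# Hypothetical byte counts stored as text, as they are after split().
as_text = ["0", "215", "999", "1032", "2341"]

# Text comparison goes character by character, so "999" beats "2341".
text_min, text_max = min(as_text), max(as_text)

# Converting to integers first gives the numeric extremes.
as_numbers = [int(v) for v in as_text]
num_min, num_max = min(as_numbers), max(as_numbers)

print(text_min, text_max)  # 0 999
print(num_min, num_max)    # 0 2341
```

In Spark, casting the column before aggregating, for example with `df2.src_bytes.cast('long')`, should give numeric minima and maxima.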



In [62]:
groups = df2.groupBy('Protocol')
groups.agg({'src_bytes': 'mean', 'dst_bytes': 'stddev'}).show()

+--------+-----------------+------------------+
|Protocol|   avg(src_bytes)| stddev(dst_bytes)|
+--------+-----------------+------------------+
|     tcp|3388.569965326596|1043771.3100418178|
|     udp|97.22772893848308| 55.43318653434132|
|    icmp|927.8916893855577|               0.0|
+--------+-----------------+------------------+



## Question 4: Descriptive Statistics Based on `Protocols` and `Labels`


In [63]:
groups = df2.groupBy(['Protocol', 'label'])
groups.agg({'src_bytes': 'mean', 'dst_bytes': 'stddev'}).show()

+--------+----------------+--------------------+------------------+
|Protocol|           label|      avg(src_bytes)| stddev(dst_bytes)|
+--------+----------------+--------------------+------------------+
|    icmp|          satan.|   21.37837837837838|               0.0|
|     tcp|           back.|  54156.355878347706| 616.2317945744568|
|    icmp|         normal.|   90.67617331348428|               0.0|
|    icmp|        ipsweep.|  10.915116379683308|               0.0|
|     udp|       teardrop.|                28.0|1.2649097429582898|
|     tcp|        neptune.|0.009994244494257088|0.8499274110783581|
|     tcp|buffer_overflow.|  1400.4333333333334|12440.664773465569|
|     tcp|          satan.|  0.9351099172969534| 153.7260479321028|
|     tcp|       multihop.|  435.14285714285717| 382586.0049996229|
|     tcp|   guess_passwd.|  125.33962264150944|257.50228203647305|
|    icmp|           nmap.|                 8.0|               0.0|
|     tcp|    warezclient.|    300219.562745098|

In [65]:
query = """
  select
    Protocol
    , case label
        when 'normal.' then 'normal'
        else 'attack'
    end state
    , count(1) freq
    , avg(src_bytes) mean_src_bytes
    , sum(urgent) sum_urgent
  from df_KDDCup
  group by
    Protocol
    , case label
        when 'normal.' then 'normal'
        else 'attack'
    end
"""
spark.sql(query).show()

+--------+------+-------+------------------+----------+
|Protocol| state|   freq|    mean_src_bytes|sum_urgent|
+--------+------+-------+------------------+----------+
|     tcp|normal| 768670| 1844.290639676324|      35.0|
|     udp|normal| 191348| 98.31598448899388|       0.0|
|    icmp|normal|  12763| 90.67617331348428|       0.0|
|     udp|attack|   2940|26.399319727891157|       0.0|
|     tcp|attack|1101928| 4465.810210830472|       4.0|
|    icmp|attack|2820782| 931.6797813514124|       0.0|
+--------+------+-------+------------------+----------+



## Question 5: Get the frequency of `services` for the original `UDP and ICMP` based `attacks`
(hint 1: original attacks: `[dos, u2r, r2l, probe]`)

(hint 2: return `services` and `protocols` center-justified)

In [72]:
from pyspark.sql.types import StringType

def Attack_Category(item):
  # Labels end with a dot (e.g. 'smurf.'), so remove it once before matching
  name = item.replace('.', '')
  if name in ['back', 'land', 'neptune', 'pod', 'smurf', 'teardrop']:
    return 'dos'
  if name in ['buffer_overflow', 'loadmodule', 'perl', 'rootkit']:
    return 'u2r'
  if name in ['ftp_write', 'guess_passwd', 'imap', 'multihop', 'phf', 'spy', 'warezclient', 'warezmaster']:
    return 'r2l'
  if name in ['ipsweep', 'nmap', 'portsweep', 'satan']:
    return 'probe'
  return None  # any other label, e.g. 'normal'

def Center_Justify(item):
  return item.center(10)

spark.udf.register("Attack_Category", Attack_Category, StringType())
spark.udf.register("Center_Justify", Center_Justify, StringType())

query = """
  select
    Center_Justify(Service) service
    , Center_Justify(Protocol) protocol
    , Attack_Category(label) cat_label
    , count(1) freq
  from df_KDDCup
  where Protocol in ('udp', 'icmp') and label != 'normal.'
  group by
    Service
    , Protocol
    , Attack_Category(label)
  order by count(1) desc
"""
spark.sql(query).show()

+----------+----------+---------+-------+
|   service|  protocol|cat_label|   freq|
+----------+----------+---------+-------+
|  ecr_i   |   icmp   |      dos|2808145|
|  eco_i   |   icmp   |    probe|  12570|
| private  |   udp    |    probe|   1688|
| private  |   udp    |      dos|    979|
|  other   |   udp    |    probe|    261|
|  ecr_i   |   icmp   |    probe|     59|
| domain_u |   udp    |    probe|      9|
|  tim_i   |   icmp   |      dos|      5|
|  other   |   udp    |      u2r|      3|
|  urp_i   |   icmp   |    probe|      3|
+----------+----------+---------+-------+
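
The two Python UDFs above are evaluated in Python worker processes rather than inside Spark's own engine. As a possible alternative, the category mapping can be written as a SQL `CASE` expression that Spark evaluates natively. The sketch below only builds that expression as a string. `ATTACK_CATEGORIES` and `category_case_sql` are hypothetical names, and the labels are assumed to carry the trailing dot seen in the data.

```python
# Attack labels per category, taken from the Attack_Category function above.
ATTACK_CATEGORIES = {
    "dos": ["back", "land", "neptune", "pod", "smurf", "teardrop"],
    "u2r": ["buffer_overflow", "loadmodule", "perl", "rootkit"],
    "r2l": ["ftp_write", "guess_passwd", "imap", "multihop",
            "phf", "spy", "warezclient", "warezmaster"],
    "probe": ["ipsweep", "nmap", "portsweep", "satan"],
}


def category_case_sql(column="label"):
    """Build a SQL CASE expression mapping raw labels to attack categories.

    Hypothetical helper: labels are matched with their trailing dot,
    and unmatched labels yield NULL because there is no ELSE branch.
    """
    branches = []
    for category, names in ATTACK_CATEGORIES.items():
        quoted = ", ".join(f"'{name}.'" for name in names)
        branches.append(f"WHEN {column} IN ({quoted}) THEN '{category}'")
    return "CASE " + " ".join(branches) + " END"


print(category_case_sql())
```

The resulting string could stand in for `Attack_Category(label)` in both the `select` list and the `group by` clause, for instance by interpolating it into the query with an f-string.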

