# **KDDCup Data Analytics with PySpark DF: A structured case study**



In [1]:
########## ONLY in Colab ##########
!pip3 install pyspark
########## ONLY in Colab ##########

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=372f17f619a8fd7e921bf5fc5bec5062c1f13c6ad98069cc0b5e76174192d3e8
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
########## ONLY in Ubuntu Machine ##########
# Load Spark engine
!pip3 install -q findspark
import findspark
findspark.init()
########## ONLY in Ubuntu Machine ##########

In [23]:
# import SparkSession

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

spark

# Read and Load Data to Spark
# Data source: http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html

df = spark.read.text("kddcup.data.gz")
df.printSchema()


In [25]:
df = spark.read.text("kddcup.data.gz")
df.printSchema()

root
 |-- value: string (nullable = true)



In [26]:
# Split data (if needed)
split_col = split(df['value'], ',')
df = df.withColumn('Protocol', split_col.getItem(1)) \
       .withColumn('Service', split_col.getItem(2)) \
       .withColumn('flag', split_col.getItem(3)) \
       .withColumn('src_bytes', split_col.getItem(4)) \
       .withColumn('dst_bytes', split_col.getItem(5)) \
       .withColumn('urgent', split_col.getItem(8)) \
       .withColumn('num_failed_logins', split_col.getItem(10)) \
       .withColumn('root_shell', split_col.getItem(13)) \
       .withColumn('guest_login', split_col.getItem(21)) \
       .withColumn('label', split_col.getItem(41)) \
       .drop('value')

df.show()

+--------+-------+----+---------+---------+------+-----------------+----------+-----------+-------+
|Protocol|Service|flag|src_bytes|dst_bytes|urgent|num_failed_logins|root_shell|guest_login|  label|
+--------+-------+----+---------+---------+------+-----------------+----------+-----------+-------+
|     tcp|   http|  SF|      215|    45076|     0|                0|         0|          0|normal.|
|     tcp|   http|  SF|      162|     4528|     0|                0|         0|          0|normal.|
|     tcp|   http|  SF|      236|     1228|     0|                0|         0|          0|normal.|
|     tcp|   http|  SF|      233|     2032|     0|                0|         0|          0|normal.|
|     tcp|   http|  SF|      239|      486|     0|                0|         0|          0|normal.|
|     tcp|   http|  SF|      238|     1282|     0|                0|         0|          0|normal.|
|     tcp|   http|  SF|      235|     1337|     0|                0|         0|          0|normal.|


In [29]:
# Increase the number of partitions (if needed) and Build a Temp table

df = df.repartition(10)

print(df.rdd.getNumPartitions())

df.createOrReplaceTempView("df_KDDCup")

10


## Question 1: Count the number of connections for each label

In [31]:
df.groupBy('label').count().orderBy('count', ascending=False).show()

+----------------+-------+
|           label|  count|
+----------------+-------+
|          smurf.|2807886|
|        neptune.|1072017|
|         normal.| 972781|
|          satan.|  15892|
|        ipsweep.|  12481|
|      portsweep.|  10413|
|           nmap.|   2316|
|           back.|   2203|
|    warezclient.|   1020|
|       teardrop.|    979|
|            pod.|    264|
|   guess_passwd.|     53|
|buffer_overflow.|     30|
|           land.|     21|
|    warezmaster.|     20|
|           imap.|     12|
|        rootkit.|     10|
|     loadmodule.|      9|
|      ftp_write.|      8|
|       multihop.|      7|
+----------------+-------+
only showing top 20 rows



## Question 2:  Get the list of `Protocols`that are `normal` and `vulnerable to attacks`, where there is NOT `guest login` to the destination addresses


In [40]:
sql_query = """ SELECT Protocol,
                CASE label
                  WHEN 'normal.' THEN 'no attack'
                  ELSE 'attack'
                END AS State,
                COUNT(*) as freq
              FROM df_KDDCup
              WHERE guest_login != 1
              GROUP BY Protocol, State
              ORDER BY Protocol DESC
            """

spark.sql(sql_query).show()

+--------+---------+-------+
|Protocol|    State|   freq|
+--------+---------+-------+
|     udp|   attack|   2940|
|     udp|no attack| 191348|
|     tcp|no attack| 764894|
|     tcp|   attack|1101613|
|    icmp|   attack|2820782|
|    icmp|no attack|  12763|
+--------+---------+-------+




## Question 3: Apply Some Descriptive Statistics on Numerical Data

In [41]:
# PySpark provides built-in standard Aggregate functions defines in DataFrame API
from pyspark.sql.functions import *

summary = df.select(mean(df.src_bytes).alias('AVG(src_bytes)'),
                    stddev(df.src_bytes).alias('STD(src_bytes)'),
                    min(df.src_bytes).alias('MIN(src_bytes)'),
                    max(df.src_bytes).alias('MAX(src_bytes)'),
                    last(df.src_bytes).alias('LAST(src_bytes)'),
                    skewness(df.src_bytes).alias('SKEW(src_bytes)')

                    )

summary.show()

+------------------+-----------------+--------------+--------------+---------------+------------------+
|    AVG(src_bytes)|   STD(src_bytes)|MIN(src_bytes)|MAX(src_bytes)|LAST(src_bytes)|   SKEW(src_bytes)|
+------------------+-----------------+--------------+--------------+---------------+------------------+
|1834.6211752293746|941431.0744911298|             0|           999|           2341|1188.9519100465739|
+------------------+-----------------+--------------+--------------+---------------+------------------+



In [42]:
groups = df.groupBy("Protocol")
groups.agg({'src_bytes':'mean', 'dst_bytes':'stddev'}).show()

+--------+-----------------+------------------+
|Protocol|   avg(src_bytes)| stddev(dst_bytes)|
+--------+-----------------+------------------+
|     tcp|3388.569965326596|1043771.3100418178|
|     udp|97.22772893848308| 55.43318653434132|
|    icmp|927.8916893855577|               0.0|
+--------+-----------------+------------------+



## Question 4: A Descriptive Stats based on `Protocols` and `Labels`


In [43]:
sql_query = """
                           SELECT protocol,
                             CASE label
                               WHEN 'normal.' THEN 'no attack'
                               ELSE 'attack'
                             END AS state,
                             COUNT(*) as total_freq,
                            ROUND(AVG(src_bytes), 2) as mean_src_bytes,
                            ROUND(AVG(dst_bytes), 2) as mean_dst_bytes,
                            SUM(urgent) as sum_urgent,
                            SUM(num_failed_logins) as sum_failed_logins,
                            SUM(root_shell) as sum_root_shell,
                            SUM(guest_login) as sum_guest_login
                           FROM df_KDDCup
                           GROUP BY protocol, state
                           ORDER BY 3 DESC
                           """
spark.sql(sql_query).show()

+--------+---------+----------+--------------+--------------+----------+-----------------+--------------+---------------+
|protocol|    state|total_freq|mean_src_bytes|mean_dst_bytes|sum_urgent|sum_failed_logins|sum_root_shell|sum_guest_login|
+--------+---------+----------+--------------+--------------+----------+-----------------+--------------+---------------+
|    icmp|   attack|   2820782|        931.68|           0.0|       0.0|              0.0|           0.0|            0.0|
|     tcp|   attack|   1101928|       4465.81|       2005.96|       4.0|             61.0|          32.0|          315.0|
|     tcp|no attack|    768670|       1844.29|       4071.32|      35.0|             96.0|         302.0|         3776.0|
|     udp|no attack|    191348|         98.32|         89.41|       0.0|              0.0|           0.0|            0.0|
|    icmp|no attack|     12763|         90.68|           0.0|       0.0|              0.0|           0.0|            0.0|
|     udp|   attack|    

## Question 5: Get the frquency of `services` for the original `UDP and ICMP` based `attacks`
(hint 1: original attacks: `[dos, u2r, r2l, probe]`)

(hint 2: returns `services` and `protocols` center justified)

In [44]:
from pyspark.sql.types import StringType

def Attack_Category(item):
  if item.replace(".", "") in ['back', 'land', 'neptune', 'pod', 'smurf', 'teardrop']:
    return "DoS"
  elif item.replace(".", "") in ['buffer_overflow', 'loadmodule', 'perl', 'rootkit']:
    return "U2R"
  elif item.replace(".", "") in ['ftp_write', 'guess_passwd', 'multihop', 'phf', 'spy', 'warezclient', 'warezmaster']:
    return "R2L"
  else: return "Probe"

def Center_Justify(item):
  return item.center(10)


spark.udf.register("OrginalAttacks", Attack_Category, StringType())
spark.udf.register("TextJustify", Center_Justify, StringType())

sql_query = """
                SELECT
                  TextJustify(service) as service,
                  TextJustify(protocol) as protocol,
                  OrginalAttacks(label) as new_label,
                  COUNT(*) as freq
                FROM df_KDDCup
                WHERE (protocol = 'udp' OR protocol = 'icmp') AND label != 'normal.'
                GROUP BY service, new_label, protocol
                ORDER BY freq DESC
          """

spark.sql(sql_query).show()

+----------+----------+---------+-------+
|   service|  protocol|new_label|   freq|
+----------+----------+---------+-------+
|  ecr_i   |   icmp   |      DoS|2808145|
|  eco_i   |   icmp   |    Probe|  12570|
| private  |   udp    |    Probe|   1688|
| private  |   udp    |      DoS|    979|
|  other   |   udp    |    Probe|    261|
|  ecr_i   |   icmp   |    Probe|     59|
| domain_u |   udp    |    Probe|      9|
|  tim_i   |   icmp   |      DoS|      5|
|  other   |   udp    |      U2R|      3|
|  urp_i   |   icmp   |    Probe|      3|
+----------+----------+---------+-------+

