# **KDDCup Data Analytics with PySpark RDD: A structured case study**

### YouTube channel: Code with Kristi

### Tutor: Dr Sachin Saxena (PhD, MTech, BTech)

##### Data source: http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html


In [None]:
########## ONLY in Colab ##########
!pip3 install pyspark
########## ONLY in Colab ##########

In [1]:
from pyspark import SparkContext, SparkConf

# Initializing Spark
conf = SparkConf().setAppName("KDDCup_PySpark").setMaster("local[*]")
sc = SparkContext(conf=conf)
print(sc)
print("Ready to go!")

<SparkContext master=local[*] appName=KDDCup_PySpark>
Ready to go!


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [64]:
# Read and Load Data to Spark
# Data source: http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html
rdd = sc.textFile("/content/drive/MyDrive/Big Data Pyspark Project/kddcup.data.gz")

In [None]:
# Repartition and Cache Data:


# Optional: try partitioning on a small custom dataset
# Optional code starts here

In [16]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Alternatively, after sc.stop(), create a new context (master first, then app name):
# sc = SparkContext("local[*]", "RDD Example")

# Your list of data
data = [('Ram', 30),
        ('Shyam', 25),
        ('Sita', 28),
        ('Lakshman', 32),
        ('Radha', 27),
        ('Krishna', 35)]

# Convert the list into an RDD (a new name, so the KDD data in `rdd` is not overwritten)
sample_rdd = sc.parallelize(data)

# To check the contents of the RDD
print(sample_rdd.collect())


[('Ram', 30), ('Shyam', 25), ('Sita', 28), ('Lakshman', 32), ('Radha', 27), ('Krishna', 35)]


In [38]:
# How many partitions do we have?
# For parallelize(), the default is sc.defaultParallelism (locally, usually the number of CPU cores).
# For textFile(), Spark splits the input into blocks, but a gzip file cannot be split.
rdd.getNumPartitions()
# Only one partition: the ~17 MB .gz file is not splittable, so Spark reads it as a single
# partition. We need more partitions to use all the cores.

1

In [39]:
# Create RDD with a specific number of partitions (e.g., 4 partitions)
sample_rdd = sc.parallelize(data, 4)

# Check the number of partitions again
num_partitions = sample_rdd.getNumPartitions()

In [40]:
num_partitions

4

# Optional code ends here

In [43]:
rdd.glom().map(len).collect()

[4898431]

In [44]:
rdd= rdd.repartition(10) # shuffle all data

In [45]:
rdd.glom().map(len).collect()

[489850,
 489850,
 489841,
 489840,
 489840,
 489840,
 489840,
 489840,
 489840,
 489850]

In [46]:
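print(sc.defaultParallelism)
print(rdd.getNumPartitions())

rdd.persist()
# defaultParallelism is 2 (two cores) and there are 10 partitions. Each partition is one task,
# so Spark runs the tasks two at a time, in about 5 rounds.
# persist() is lazy: the data is cached when the next action (the collect() below) runs.
# persist() returns the RDD itself, which is the MapPartitionsRDD shown below.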

2
10


MapPartitionsRDD[66] at coalesce at NativeMethodAccessorImpl.java:0

In [47]:
rdd.glom().map(len).collect()

[489850,
 489850,
 489841,
 489840,
 489840,
 489840,
 489840,
 489840,
 489840,
 489850]

In [69]:
rdd= rdd.repartition(15) # shuffle all data

In [70]:
rdd.glom().map(len).collect()

[326560,
 326560,
 326560,
 326560,
 326560,
 326560,
 326560,
 326560,
 326560,
 326570,
 326570,
 326570,
 326561,
 326560,
 326560]

In [71]:
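rdd = rdd.coalesce(5)
# coalesce merges existing partitions without a shuffle, so the new sizes depend on the parent
# partitions; here they stay close because the 15 parents were already balanced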

coalesce() merges existing partitions without a full shuffle, so it moves far less data than repartition(). Use it when you only need to shrink the number of partitions.

RDD.coalesce() with the default shuffle=False cannot increase the number of partitions: calling coalesce(15) on a 5-partition RDD leaves it at 5. To increase partitions, use repartition(), which is coalesce(n, shuffle=True) under the hood.
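To see why coalesce() can leave partitions uneven while repartition() evens them out, here is a deliberately simplified model in plain Python. It assumes an idealized round-robin repartition and a coalesce that merges neighboring parent partitions; Spark's real placement logic differs (it also considers data locality), and the helpers `repartition_sizes` and `coalesce_sizes` are hypothetical.

```python
def repartition_sizes(total_records, num_partitions):
    """Idealized round-robin: sizes differ by at most one record."""
    base, extra = divmod(total_records, num_partitions)
    return [base + 1 if i < extra else base for i in range(num_partitions)]


def coalesce_sizes(parent_sizes, num_partitions):
    """Merge neighboring parent partitions into num_partitions groups (no shuffle)."""
    if num_partitions >= len(parent_sizes):
        # Without a shuffle, coalesce cannot create more partitions than it already has
        return list(parent_sizes)
    sizes = []
    for i in range(num_partitions):
        start = i * len(parent_sizes) // num_partitions
        end = (i + 1) * len(parent_sizes) // num_partitions
        sizes.append(sum(parent_sizes[start:end]))
    return sizes


skewed = [100, 100, 100, 5, 5]            # hypothetical, unbalanced parent partitions
print(coalesce_sizes(skewed, 2))          # [200, 110]: the skew is carried over
print(repartition_sizes(sum(skewed), 2))  # [155, 155]
print(coalesce_sizes(skewed, 10))         # still 5 partitions: coalesce cannot grow
```

In real Spark, repartition sizes are close but not exactly equal (compare the 489840 vs 489850 sizes above), because each input partition starts its round-robin at a random position.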

In [72]:
rdd.glom().map(len).collect()

[979680, 979700, 979681, 979680, 979690]

In [29]:
from pyspark.sql import SparkSession

# Initialize a SparkSession
spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()

# Your list of data
data = [('Ram', 30),
        ('Shyam', 25),
        ('Sita', 28),
        ('Lakshman', 32),
        ('Radha', 27),
        ('Krishna', 35)]

df = spark.createDataFrame(data=data, schema=['name', 'age'])

df.show()


+--------+---+
|    name|age|
+--------+---+
|     Ram| 30|
|   Shyam| 25|
|    Sita| 28|
|Lakshman| 32|
|   Radha| 27|
| Krishna| 35|
+--------+---+



In [31]:
df.rdd.getNumPartitions()


2

In [33]:
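df1 = df.repartition('age')  # hash-partition the rows by the 'age' column
df1.rdd.getNumPartitions()
# Likely 1 because adaptive query execution (AQE) coalesces the small shuffle
# partitions of this tiny DataFrame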

1

In [34]:
df1.rdd.glom().map(len).collect()

[6]

In [35]:
df2 = df1.coalesce(2)  # coalesce can only reduce partitions; df1 has 1, so df2 keeps 1


In [36]:
df2.rdd.glom().map(len).collect()

[6]

## Understanding Partitions in Spark

Partitions determine how data is distributed across a cluster or a single machine. Each partition can be processed in parallel on a separate node (in a cluster) or core (in a local setup). Good partitioning balances parallelism against resource usage.

Key points about partitions:

1. **Parallel processing:** Spark processes partitions in parallel, one task per partition. More partitions can improve parallelism, but too many add scheduling overhead, while too few leave resources underused.
2. **Default partitioning:** Spark picks the number of partitions from the environment:
   - Local mode: usually the number of available cores (`sc.defaultParallelism`).
   - Cluster mode: depends on the configuration, including the number of cores, workers, or other resources in the cluster.
   - For `textFile()`, the input splits also matter: a gzip file is not splittable and is read as one partition.
3. **Custom partitioning:** When creating an RDD with `parallelize()`, you can specify the number of partitions: `rdd = sc.parallelize(data, num_partitions)`. More partitions help when a large dataset should use more computational resources.
4. **Shuffling and repartitioning:** You can change the number of partitions after an RDD has been created:
   - `rdd.repartition(num_partitions)` performs a full shuffle and can increase or decrease the count.
   - `rdd.coalesce(num_partitions)` decreases the count by merging partitions, without a full shuffle.
   - Repartitioning moves data across the cluster, which is resource-intensive; coalescing is more efficient when you only need fewer partitions.
5. **Choosing the number of partitions:** A common rule of thumb is 2-3 partitions per CPU core. For example, a 4-core machine would use 8-12 partitions, which gives good parallelism without too much overhead. On larger clusters, choose a count that keeps all cores busy while keeping shuffles small.
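The rule of thumb above is simple arithmetic. A small sketch, assuming one task per partition and one task per core at a time (Spark's default `spark.task.cpus=1`); the helper names are hypothetical:

```python
import math


def suggested_partition_range(total_cores, per_core=(2, 3)):
    """Rule of thumb: 2-3 partitions per CPU core."""
    low, high = per_core
    return total_cores * low, total_cores * high


def task_waves(num_partitions, total_cores):
    """One task per partition and one task per core at a time, so tasks
    run in ceil(partitions / cores) rounds ("waves")."""
    return math.ceil(num_partitions / total_cores)


print(suggested_partition_range(4))  # (8, 12)
print(task_waves(10, 2))             # 5, e.g. 10 partitions on 2 cores
print(task_waves(15, 2))             # 8: the last wave uses only one core
```

Waves are only a rough guide: tasks of different sizes finish at different times, and Spark schedules a new task as soon as a core frees up.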

# Window functions

In [79]:
from pyspark.sql import SparkSession

# Initialize a SparkSession
spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()

# Your list of data
data = [('Ram', 30),
        ('Shyam', 25),
        ('Sita', 30),
        ('Lakshman', 25),
        ('Radha', 27),
        ('Krishna', 35),
        ('Lakshman', 25),
        ('Radha', 27),
        ('Krishna', 35)]

df = spark.createDataFrame(data=data, schema=['name', 'age'])

df.show()


+--------+---+
|    name|age|
+--------+---+
|     Ram| 30|
|   Shyam| 25|
|    Sita| 30|
|Lakshman| 25|
|   Radha| 27|
| Krishna| 35|
|Lakshman| 25|
|   Radha| 27|
| Krishna| 35|
+--------+---+



In [76]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, rank, col

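# Define a window specification with no partitionBy (one global window), ordered by 'age'.
# Spark moves all rows into a single partition to evaluate it and logs a performance
# warning; that is harmless for small data but does not scale.
windowSpec = Window.orderBy(col("age"))

# Add a 'row_number' column: 1, 2, 3, ... in 'age' order. Rows with equal ages still get
# distinct numbers, but which of them comes first is not guaranteed.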
df_with_row_number = df.withColumn("row_number", row_number().over(windowSpec))

# Show the result
df_with_row_number.show()


+--------+---+----------+
|    name|age|row_number|
+--------+---+----------+
|   Shyam| 25|         1|
|Lakshman| 25|         2|
|   Radha| 27|         3|
|     Ram| 30|         4|
|    Sita| 30|         5|
| Krishna| 35|         6|
+--------+---+----------+



In [77]:
# Using rank to assign ranks to rows based on age
df_with_rank = df.withColumn("rank", rank().over(windowSpec))
df_with_rank.show()


+--------+---+----+
|    name|age|rank|
+--------+---+----+
|   Shyam| 25|   1|
|Lakshman| 25|   1|
|   Radha| 27|   3|
|     Ram| 30|   4|
|    Sita| 30|   4|
| Krishna| 35|   6|
+--------+---+----+
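The difference between row_number() and rank() comes down to how ties are handled. The plain-Python functions below are an illustrative re-implementation (not Spark code) of row_number(), rank() and dense_rank() (the last is also available in `pyspark.sql.functions`), applied to an already sorted list of ages:

```python
def row_numbers(sorted_values):
    """row_number(): 1, 2, 3, ... over the sorted order; ties get distinct numbers."""
    return list(range(1, len(sorted_values) + 1))


def ranks(sorted_values):
    """rank(): ties share a rank and the next distinct value skips ahead (gaps)."""
    result = []
    for i, v in enumerate(sorted_values):
        if i > 0 and v == sorted_values[i - 1]:
            result.append(result[-1])
        else:
            result.append(i + 1)
    return result


def dense_ranks(sorted_values):
    """dense_rank(): ties share a rank, with no gaps."""
    result = []
    for i, v in enumerate(sorted_values):
        if i > 0 and v == sorted_values[i - 1]:
            result.append(result[-1])
        else:
            result.append(result[-1] + 1 if result else 1)
    return result


ages = sorted([30, 25, 30, 25, 27, 35])  # the ages in the rank output above
print(row_numbers(ages))  # [1, 2, 3, 4, 5, 6]
print(ranks(ages))        # [1, 1, 3, 4, 4, 6]
print(dense_ranks(ages))  # [1, 1, 2, 3, 3, 4]
```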



In [80]:
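from pyspark.sql import functions as F  # avoids shadowing Python's built-in sum()

# Cumulative sum of 'age' over the window. With orderBy and no explicit frame, Spark uses
# RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so rows with equal ages share a total.
df_with_cumsum = df.withColumn("cumulative_sum", F.sum(col("age")).over(windowSpec))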
df_with_cumsum.show()


+--------+---+--------------+
|    name|age|cumulative_sum|
+--------+---+--------------+
|   Shyam| 25|            75|
|Lakshman| 25|            75|
|Lakshman| 25|            75|
|   Radha| 27|           129|
|   Radha| 27|           129|
|     Ram| 30|           189|
|    Sita| 30|           189|
| Krishna| 35|           259|
| Krishna| 35|           259|
+--------+---+--------------+
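The repeated totals (75, 75, 75, ...) come from the default range frame, in which rows with equal ages are peers. A plain-Python sketch of both frame types follows; it is an illustration, not Spark's implementation. In PySpark, a strict running total would instead use a rows frame such as `Window.orderBy("age").rowsBetween(Window.unboundedPreceding, Window.currentRow)`.

```python
from itertools import accumulate


def range_frame_cumsum(sorted_values):
    """RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: every row gets the
    running total up to its last peer (the last row with the same value)."""
    totals = list(accumulate(sorted_values))
    result = []
    for i, v in enumerate(sorted_values):
        j = i
        while j + 1 < len(sorted_values) and sorted_values[j + 1] == v:
            j += 1
        result.append(totals[j])
    return result


def rows_frame_cumsum(sorted_values):
    """ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: a strict running total."""
    return list(accumulate(sorted_values))


ages = [25, 25, 25, 27, 27, 30, 30, 35, 35]
print(range_frame_cumsum(ages))  # [75, 75, 75, 129, 129, 189, 189, 259, 259]
print(rows_frame_cumsum(ages))   # [25, 50, 75, 102, 129, 159, 189, 224, 259]
```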



In [81]:
# Define a window specification partitioned by 'name' and ordered by 'age'
windowSpecPartitioned = Window.partitionBy("name").orderBy(col("age"))

# Add a row number within each 'name' partition (numbering restarts at 1 for every name)
df_with_partitioned_row_number = df.withColumn("row_number", row_number().over(windowSpecPartitioned))
df_with_partitioned_row_number.show()


+--------+---+----------+
|    name|age|row_number|
+--------+---+----------+
| Krishna| 35|         1|
| Krishna| 35|         2|
|Lakshman| 25|         1|
|Lakshman| 25|         2|
|   Radha| 27|         1|
|   Radha| 27|         2|
|     Ram| 30|         1|
|   Shyam| 25|         1|
|    Sita| 30|         1|
+--------+---+----------+
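With partitionBy, numbering restarts at 1 inside each partition. A plain-Python sketch of that behavior (the helper `row_number_by_partition` is hypothetical; Spark does not guarantee the order in which partitions appear in the output, so the sort here is only for readability):

```python
from itertools import groupby


def row_number_by_partition(rows, partition_key, order_key):
    """Mimic row_number() over Window.partitionBy(...).orderBy(...)."""
    ordered = sorted(rows, key=lambda r: (partition_key(r), order_key(r)))
    out = []
    for _, group in groupby(ordered, key=partition_key):
        # enumerate restarts at 1 for every partition (every distinct name)
        for n, row in enumerate(group, start=1):
            out.append((*row, n))
    return out


data = [('Ram', 30), ('Shyam', 25), ('Sita', 30), ('Lakshman', 25), ('Radha', 27),
        ('Krishna', 35), ('Lakshman', 25), ('Radha', 27), ('Krishna', 35)]

for row in row_number_by_partition(data, partition_key=lambda r: r[0], order_key=lambda r: r[1]):
    print(row)
```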



## Question 1: Get ten records randomly


In [None]:
rdd.takeSample(False, 10, 1234)  # withReplacement=False, num=10 records, seed=1234

['0,icmp,ecr_i,SF,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,510,510,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,255,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,smurf.',
 '910,udp,other,SF,146,105,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,2,0.01,0.67,0.96,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,csnet_ns,S0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,198,2,1.00,1.00,0.00,0.00,0.01,0.06,0.00,255,2,0.01,0.06,0.00,0.00,1.00,1.00,0.00,0.00,neptune.',
 '0,icmp,ecr_i,SF,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,511,511,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,255,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,smurf.',
 '0,icmp,eco_i,SF,8,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,35,0.00,0.00,0.00,0.00,1.00,0.00,1.00,2,255,1.00,0.00,1.00,0.50,0.00,0.00,0.00,0.00,ipsweep.',
 '0,tcp,private,S0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,117,10,1.00,1.00,0.00,0.00,0.09,0.07,0.00,255,11,0.04,0.07,0.00,0.00,1.00,1.00,0.00,0.00,neptune.',
 '0,icmp,ecr_i,SF,520,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,511,511,0.00,0.0

## Question 2: Count elements

In [None]:
rdd.count()

4898431

## Question 3: Calculate the ratio of `normal` connections


In [None]:
# The label is the last field of each line; 'normal.' marks connections that are not attacks
normal_rdd = rdd.filter(lambda line: 'normal.' in line)
ratio = normal_rdd.count() / rdd.count() * 100
print("Ratio of Normal Connection is {:.4f} %".format(ratio))

# So roughly 80% of the connections are attack traffic

Ratio of Normal Connection is 19.8590 %
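The cell above launches two jobs (two count() calls). A sketch of a single-pass alternative, assuming the raw line RDD `rdd`: fold each line into a (normal, total) pair and merge the per-partition pairs with `rdd.aggregate((0, 0), seq_op, comb_op)`. The helper names are hypothetical, and matching `line.endswith(",normal.")` checks only the label field rather than any substring. The local check uses shortened toy lines, not real records.

```python
from functools import reduce


def seq_op(acc, line):
    """Fold one raw CSV line into (normal_count, total_count)."""
    normal, total = acc
    return (normal + (1 if line.endswith(",normal.") else 0), total + 1)


def comb_op(a, b):
    """Merge two per-partition (normal_count, total_count) pairs."""
    return (a[0] + b[0], a[1] + b[1])


# Usage sketch (not run here):
#   normal, total = rdd.aggregate((0, 0), seq_op, comb_op)

# Local check on two toy "partitions" of hypothetical, shortened lines
part1 = ["0,tcp,http,SF,215,45076,normal.", "0,icmp,ecr_i,SF,1032,0,smurf."]
part2 = ["0,tcp,private,S0,0,0,neptune."]
acc = comb_op(reduce(seq_op, part1, (0, 0)), reduce(seq_op, part2, (0, 0)))
print(acc, "{:.2f} %".format(acc[0] / acc[1] * 100))  # (1, 3) 33.33 %
```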


## Question 4: Get the list of labels


In [None]:
Split_rdd = rdd.map(lambda line: line.split(","))
# The label is the 42nd field (index 41): kddcup.names lists 41 features before it
# (https://kdd.ics.uci.edu/databases/kddcup99/kddcup.names), so item[41] works too:
# Label_rdd = Split_rdd.map(lambda item: item[41]).distinct()
Label_rdd = Split_rdd.map(lambda item: item[-1]).distinct()
Label_rdd.collect()

['pod.',
 'neptune.',
 'warezclient.',
 'loadmodule.',
 'smurf.',
 'nmap.',
 'spy.',
 'back.',
 'teardrop.',
 'ipsweep.',
 'phf.',
 'ftp_write.',
 'multihop.',
 'guess_passwd.',
 'normal.',
 'imap.',
 'satan.',
 'land.',
 'portsweep.',
 'warezmaster.',
 'rootkit.',
 'perl.',
 'buffer_overflow.']
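The questions below refer to fields by position. A small lookup, based on the column order in kddcup.names (41 features, then the label at index 41), makes those positions explicit. `FIELD_INDEX` and `parse_record` are hypothetical helpers, and the sample line is rebuilt field by field from the second record returned by takeSample in Question 1.

```python
# Zero-based positions of the fields used in this notebook (order from kddcup.names)
FIELD_INDEX = {
    "protocol_type": 1,
    "service": 2,
    "src_bytes": 4,
    "dst_bytes": 5,
    "root_shell": 13,
    "is_guest_login": 21,
    "dst_host_count": 31,
    "label": 41,
}


def parse_record(line):
    """Split one raw KDD line and return a dict of the fields above."""
    fields = line.split(",")
    if len(fields) != 42:
        raise ValueError("expected 42 comma-separated fields, got %d" % len(fields))
    return {name: fields[i] for name, i in FIELD_INDEX.items()}


sample = ",".join(
    ["910", "udp", "other", "SF", "146", "105"]  # indices 0-5
    + ["0"] * 16                                 # indices 6-21
    + ["1", "1"]                                 # indices 22-23 (count, srv_count)
    + ["0.00"] * 4 + ["1.00", "0.00", "0.00"]    # indices 24-30
    + ["255", "2", "0.01", "0.67", "0.96"]       # indices 31-35
    + ["0.00"] * 5                               # indices 36-40
    + ["normal."]                                # index 41 (label)
)
print(parse_record(sample))
```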

## Question 5: Count the number of connections for each label

In [None]:
rdd.filter(lambda line: 'neptune.' in line).count()
rdd.filter(lambda line: 'perl.' in line).count()  # only this last value is displayed

3

### or

In [None]:
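# Naive approach: one full filter + count pass over the data per label ('pod.', 'neptune.', ...)
def Label_count_Function(items):
  Labels_count = []
  for i in items:
    # Match the label at the end of each line, e.g. ',smurf.'
    Labels_count.append((i, rdd.filter(lambda line: line.endswith(',' + i)).count()))
  return Labels_count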


In [None]:
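%%time
# Solution 1 (slow): 23 labels mean 23 full passes over the data, which took more than 10 minutes.
# The call is commented out, so the timing below only measures an empty cell.
# Label_count_Function(Label_rdd.collect())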

CPU times: user 3 µs, sys: 1e+03 ns, total: 4 µs
Wall time: 11 µs


In [None]:
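%%time
# Solution 2 (fast): create (label, 1) key-value pairs and sum them per key in one pass.
# reduceByKey is a lazy transformation, so this cell only defines the job; the timing below
# does not include the computation, which runs when the result is collected in the next cell.
Label_rdd_KV = Split_rdd.map(lambda x: (x[-1], 1))
Label_RDD_reduce = Label_rdd_KV.reduceByKey(lambda a, b: a + b)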


CPU times: user 8.11 ms, sys: 0 ns, total: 8.11 ms
Wall time: 31.9 ms
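Solution 2 is fast because reduceByKey combines values inside each partition before the shuffle (map-side combine), then merges the small per-partition tables, instead of scanning the whole dataset once per label. A plain-Python imitation of those two steps on hypothetical, shortened split records (the helper name is hypothetical):

```python
from collections import Counter


def reduce_by_key_locally(partitions):
    """Imitate map(lambda f: (f[-1], 1)).reduceByKey(add) on lists of split records."""
    # Step 1: combine within each partition (what happens before the shuffle)
    per_partition = [Counter(fields[-1] for fields in part) for part in partitions]
    # Step 2: merge the per-partition counts (what happens after the shuffle)
    merged = Counter()
    for counts in per_partition:
        merged.update(counts)
    return dict(merged)


partitions = [
    [["0", "icmp", "smurf."], ["0", "tcp", "normal."]],   # hypothetical, shortened records
    [["0", "icmp", "smurf."], ["0", "tcp", "neptune."]],
]
print(reduce_by_key_locally(partitions))
```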


In [None]:
import pandas as pd

# Collect the (label, count) pairs in one job so that labels and counts stay aligned
DF_Labels_KV = pd.DataFrame(Label_RDD_reduce.collect(), columns=['Label', 'Count'])

DF_Labels_KV.sort_values(by="Count", ascending=False)

# smurf. alone accounts for more than 2.8 million connections (about 57% of all records);
# smurf is an ICMP-based denial-of-service attack

| | Label | Count |
|---|---|---|
| 4 | smurf. | 2807886 |
| 1 | neptune. | 1072017 |
| 14 | normal. | 972781 |
| 16 | satan. | 15892 |
| 9 | ipsweep. | 12481 |
| 18 | portsweep. | 10413 |
| 5 | nmap. | 2316 |
| 7 | back. | 2203 |
| 2 | warezclient. | 1020 |
| 8 | teardrop. | 979 |


## Question 6: Get the connection type with successful `root_shell` connections to servers, where the number of data bytes from source (`src_bytes`) is 500 times more than from server (`dst_bytes`)

In [None]:
# Field positions from https://kdd.ics.uci.edu/databases/kddcup99/kddcup.names:
# x[1] = protocol_type (symbolic), x[4] = src_bytes, x[5] = dst_bytes (continuous), x[13] = root_shell
# As written, the first filter keeps records with root_shell != '1', and after the map to
# (protocol, src_bytes, dst_bytes) the second filter keeps dst_bytes > 500 * src_bytes;
# the output below reflects these conditions.
Split_rdd.filter(lambda x: x[13] != '1') \
    .map(lambda x: (x[1], x[4], x[5])) \
    .filter(lambda x: int(x[2]) > int(x[1]) * 500) \
    .collect()
# Alternatives: .count() returns the number of matches, .take(10) the first 10 tuples of the RDD



[('tcp', '227', '238768'),
 ('tcp', '228', '191273'),
 ('tcp', '0', '8081'),
 ('tcp', '0', '14736'),
 ('tcp', '161', '142716'),
 ('tcp', '241', '125015'),
 ('tcp', '251', '171136'),
 ('tcp', '241', '125015'),
 ('tcp', '0', '3796'),
 ('tcp', '240', '125015'),
 ('tcp', '0', '4'),
 ('tcp', '0', '3679'),
 ('tcp', '0', '2321'),
 ('tcp', '585', '2661605'),
 ('tcp', '670', '424558'),
 ('tcp', '293', '1306670'),
 ('tcp', '477', '1731900'),
 ('tcp', '776', '1249426'),
 ('tcp', '245', '756005'),
 ('tcp', '245', '756005'),
 ('tcp', '318', '267694'),
 ('tcp', '132', '212680'),
 ('tcp', '250', '273525'),
 ('tcp', '563', '508929'),
 ('tcp', '675', '1618836'),
 ('tcp', '903', '507630'),
 ('tcp', '443', '353774'),
 ('tcp', '989', '1113826'),
 ('tcp', '598', '1835587'),
 ('tcp', '769', '901353'),
 ('tcp', '551', '4091815'),
 ('tcp', '524', '410401'),
 ('tcp', '556', '363741'),
 ('tcp', '590', '1470899'),
 ('tcp', '755', '417500'),
 ('tcp', '154', '187774'),
 ('tcp', '179', '273525'),
 ('tcp', '0', '4')
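The question asks for root_shell == '1' and src_bytes more than 500 times dst_bytes, while the cell above filters on the opposite conditions. A predicate that follows the question's wording literally could look like the sketch below; the function name is hypothetical, "more than" is read as strictly greater, and its results on the full dataset are not shown here.

```python
ROOT_SHELL, PROTOCOL, SRC_BYTES, DST_BYTES = 13, 1, 4, 5  # positions from kddcup.names


def root_shell_with_heavy_upload(fields, factor=500):
    """True when a root shell was obtained and the source sent more than
    `factor` times the bytes that the destination sent back."""
    return fields[ROOT_SHELL] == "1" and int(fields[SRC_BYTES]) > factor * int(fields[DST_BYTES])


# Usage sketch (not run here):
#   Split_rdd.filter(root_shell_with_heavy_upload) \
#       .map(lambda f: (f[PROTOCOL], f[SRC_BYTES], f[DST_BYTES])) \
#       .collect()
```

Note that a record with dst_bytes of 0 qualifies whenever src_bytes is positive.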

## Question 7: Get the list of `Protocols` that are `normal` and `vulnerable to attacks`, where there is NOT `guest login` to the destination addresses


In [None]:
# Keep connections without a guest login (is_guest_login, index 21, is not '1')
# and count them per protocol, separately for normal and attack traffic
normal_protocols_rdd = Split_rdd.filter(lambda line: line[-1] == "normal." and line[21] != '1') \
    .map(lambda line: (line[1], 1)).reduceByKey(lambda x, y: x + y)


attack_protocols_rdd = Split_rdd.filter(lambda line: line[-1] != "normal." and line[21] != '1') \
    .map(lambda line: (line[1], 1)).reduceByKey(lambda x, y: x + y)

normal_KeyValue = pd.DataFrame({'Label': normal_protocols_rdd.keys().collect(), 'State': 'normal', 'Count': normal_protocols_rdd.values().collect()})

attack_KeyValue = pd.DataFrame({'Label': attack_protocols_rdd.keys().collect(), 'State': 'attack', 'Count': attack_protocols_rdd.values().collect()})

results = pd.concat([normal_KeyValue, attack_KeyValue])
results.sort_values(by='Label', ascending=False)

| | Label | State | Count |
|---|---|---|---|
| 1 | udp | normal | 191348 |
| 1 | udp | attack | 2940 |
| 2 | tcp | normal | 764894 |
| 2 | tcp | attack | 1101613 |
| 0 | icmp | normal | 12763 |
| 0 | icmp | attack | 2820782 |
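The Question 7 cell runs separate filters, reductions and collect() jobs for normal and attack traffic. A sketch of a single-pass alternative keys each record by (protocol, state) and reduces once; `protocol_state_pair` is a hypothetical helper, and the local check uses shortened toy records rather than real data.

```python
from operator import add

PROTOCOL, IS_GUEST_LOGIN, LABEL = 1, 21, -1  # positions from kddcup.names


def protocol_state_pair(fields):
    """Key a split record by (protocol, 'normal' or 'attack') with a count of 1."""
    state = "normal" if fields[LABEL] == "normal." else "attack"
    return ((fields[PROTOCOL], state), 1)


# Usage sketch (not run here):
#   counts = (Split_rdd.filter(lambda f: f[IS_GUEST_LOGIN] != "1")
#                      .map(protocol_state_pair)
#                      .reduceByKey(add)
#                      .collect())

# Local check on hypothetical, shortened records
records = [["0", "tcp", "normal."], ["0", "tcp", "neptune."], ["0", "icmp", "smurf."]]
local = {}
for key, one in map(protocol_state_pair, records):
    local[key] = add(local.get(key, 0), one)
print(local)
```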


## Question 8: Get a summary statistics for the sum of `tcp` connections to the same destination IP address (hint: `protocol_type` and `dst_host_count` features)

In [None]:
# Source: https://spark.apache.org/docs/latest/mllib-statistics.html

from pyspark.mllib.stat import Statistics
from math import sqrt

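# dst_host_count (index 31) of every tcp connection; colStats() expects one vector per row
tcp_counts = Split_rdd.filter(lambda line: line[1] == 'tcp').map(lambda line: [int(line[31])])
summary = Statistics.colStats(tcp_counts)

# mean(), variance(), min() and max() return one-element NumPy arrays, so take element [0]
tcp_mean = round(float(summary.mean()[0]), 3)
tcp_std = round(float(sqrt(summary.variance()[0])), 3)
tcp_min = round(float(summary.min()[0]), 3)
tcp_max = round(float(summary.max()[0]), 3)
print([tcp_mean, tcp_std, tcp_min, tcp_max])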



[201.752, 90.726, 0.0, 255.0]
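To sanity-check what colStats() reports, the same four numbers can be computed in plain Python on a small list. colStats() uses the sample variance (divisor n - 1), which corresponds to `statistics.stdev` rather than `statistics.pstdev`. The function name and the input values are hypothetical, not taken from the dataset.

```python
import statistics


def summarize(values):
    """Mean, sample standard deviation, min and max, rounded to 3 places."""
    return [round(statistics.mean(values), 3),
            round(statistics.stdev(values), 3),
            round(float(min(values)), 3),
            round(float(max(values)), 3)]


print(summarize([255, 255, 1, 100]))  # hypothetical dst_host_count values
```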




## [challenge] Question 9: Filter the number of `icmp`-based attacks for each `service`