# Advanced Certification Program in Computational Data Science
## A program by IISc and TalentSprint
### Mini-Project: Complex Analytics using PySpark

## Problem Statement

Perform complex analytics on a network intrusion dataset using PySpark.

## Learning Objectives

At the end of the mini-project, you will be able to:

* analyze the data using PySpark
* implement RDD-based operations on the data
* derive insights from the complex data

### Dataset

The dataset chosen for this mini-project is a [10% subset](https://www.kdd.org/kdd-cup/view/kdd-cup-1999/Data) of the **[KDD Cup 1999 dataset](http://kdd.ics.uci.edu/databases/kddcup99/task.html)** (Computer network intrusion detection). This is the dataset used for the Third International Knowledge Discovery and Data Mining Tools Competition. The competition task was to build a network intrusion detector, a predictive model capable of distinguishing between ``bad`` connections, called intrusions or attacks, and ``good`` normal connections. This database contains a standard set of data to be audited, which includes a wide variety of intrusions simulated in a military network environment.

## Information

Since 1999, KDD’99 has been the most widely used dataset for evaluating anomaly detection methods. It was prepared by S. J. Stolfo and is built on the data captured in the DARPA’98 IDS evaluation program. DARPA’98 comprises about 4 gigabytes of compressed raw (binary) tcpdump data from 7 weeks of network traffic, which can be processed into about 5 million connection records of about 100 bytes each. The KDD dataset consists of approximately 4,900,000 single connection vectors, each of which contains 41 features and is labeled as either normal or an attack, with exactly one specific attack type. The simulated attacks fall into one of the following four categories:

* Denial of Service Attack (DoS): making some computing or memory resources too busy so that they deny legitimate users access to these resources.
* User to Root Attack (U2R): unauthorized access to local superuser (root) privileges, gained by exploiting a vulnerability in the system.
* Remote to Local Attack (R2L): unauthorized access from a remote machine, gained by exploiting a vulnerability of the target machine.
* Probing Attack: host and port scans as precursors to other attacks. An attacker scans a network to gather information or find known vulnerabilities.
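The four categories can be attached to the raw labels with a simple lookup. The sketch below is a minimal illustration, assuming the commonly cited grouping of the 22 attack labels in the KDD Cup 1999 training data; the names `ATTACK_CATEGORY` and `categorize` are hypothetical helpers, not part of the dataset or of PySpark.

```python
# Label-to-category lookup for the KDD Cup 1999 training labels (assumed grouping).
# Labels in the data file carry a trailing period, for example 'smurf.'.
ATTACK_CATEGORY = {
    "back": "dos", "land": "dos", "neptune": "dos",
    "pod": "dos", "smurf": "dos", "teardrop": "dos",
    "buffer_overflow": "u2r", "loadmodule": "u2r", "perl": "u2r", "rootkit": "u2r",
    "ftp_write": "r2l", "guess_passwd": "r2l", "imap": "r2l", "multihop": "r2l",
    "phf": "r2l", "spy": "r2l", "warezclient": "r2l", "warezmaster": "r2l",
    "ipsweep": "probe", "nmap": "probe", "portsweep": "probe", "satan": "probe",
}

def categorize(label):
    """Map a raw label such as 'smurf.' to 'normal', a category, or 'unknown'."""
    name = label.rstrip(".")
    if name == "normal":
        return "normal"
    return ATTACK_CATEGORY.get(name, "unknown")

print(categorize("smurf."), categorize("normal."), categorize("guess_passwd."))
```

A function like this could be passed to an RDD `map()` to aggregate by category instead of by individual attack type.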

KDD’99 features can be classified into three groups:

1) Basic features: this category encapsulates all the attributes that can be extracted from a TCP/IP connection. Most of these features lead to an implicit delay in detection.

2) Traffic features: this category includes features that are computed with respect to a window interval and is divided into two groups:

  * "same host" features

  * "same service" features

3) Content features: unlike most DoS and Probing attacks, R2L and U2R attacks do not exhibit frequent sequential intrusion patterns. DoS and Probing attacks involve many connections to some host(s) in a very short period of time, whereas R2L and U2R attacks are embedded in the data portions of the packets and normally involve only a single connection. Detecting these attacks requires features that look for suspicious behavior in the data portion, e.g., the number of failed login attempts. These features are called content features.

## Grading = 10 Points

In [None]:
#@title Install packages and Download Dataset
!pip -qq install pyspark
# Download the data
!wget -qq http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz
# Download feature names
!wget -qq https://kdd.ics.uci.edu/databases/kddcup99/kddcup.names
print("Successfully Installed packages and downloaded datasets!")

### Creating Spark Session and Load the data (1 point)

#### Import required packages

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.mllib.stat import Statistics
import seaborn as sns
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
from operator import add

#### Create a Spark session

A Spark session is the combined entry point of a Spark application. It was introduced in Spark 2.0: instead of having various contexts, everything is encapsulated in a Spark session.

In [None]:
# Start spark session
spark = SparkSession.builder.appName('Intrusion_Detector').getOrCreate()
spark

#### Creating an RDD from a File

The most common way of creating an RDD is to load it from a file. Notice that Spark's textFile can handle compressed files directly.
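As a rough local analogue of reading a compressed text file line by line, the sketch below uses Python's standard `gzip` module instead of Spark. The file name and the two records are made up for illustration; they are not rows from the KDD data.

```python
import gzip
import os
import tempfile

# Write a tiny gzip file with two made-up records (not real KDD rows).
path = os.path.join(tempfile.mkdtemp(), "sample.gz")
with gzip.open(path, "wt") as handle:
    handle.write("0,tcp,http,normal.\n0,icmp,ecr_i,smurf.\n")

# Read it back one line at a time; each line corresponds to what
# would be one string element of the RDD returned by textFile.
with gzip.open(path, "rt") as handle:
    lines = [line.rstrip("\n") for line in handle]

print(lines)
```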

In [None]:
#Accessing sparkContext from sparkSession instance.
sc = spark.sparkContext

Load the dataset and show the first 5 records

Hint: sparkContext.textFile()

In [None]:
data_file = "/content/kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

In [None]:
raw_data.count()

In [None]:
raw_data.take(5)

### RDD Basic Operations (4 points)

#### Convert the data to CSV format (list of elements).

To create a DataFrame from the RDD, convert each row into a list by splitting it on commas (,)

Hint: `map()` and `split()`

In [None]:
csv_data = raw_data.map(lambda x: x.split(","))
head_rows = csv_data.take(5)
head_rows[0]

Count how many interactions in the dataset are normal and how many are attacks.

Hint: apply `filter` to the rows. Rows labeled 'normal.' are normal; all remaining rows are attacks.
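The split-then-filter logic can be pictured with plain Python lists before applying it to an RDD. This is only a local sketch with made-up, shortened records (real rows have 42 comma-separated fields, so the notebook indexes the label as column 41, while the sketch uses the last field).

```python
# Made-up, shortened records; the last field is the label.
raw_lines = [
    "0,tcp,http,normal.",
    "0,icmp,ecr_i,smurf.",
    "2,tcp,private,neptune.",
    "1,udp,domain_u,normal.",
]

# Equivalent of map(lambda x: x.split(",")) on an RDD.
rows = [line.split(",") for line in raw_lines]

# Equivalent of the two filter() calls: the label decides the group.
normal_rows = [row for row in rows if row[-1] == "normal."]
attack_rows = [row for row in rows if row[-1] != "normal."]

print(len(normal_rows), len(attack_rows))
```

Because the two predicates are complementary, the two counts always add up to the total number of rows.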

In [None]:
normal_data = csv_data.filter(lambda x: x[41] == 'normal.')
normal_data.count()

In [None]:
attack_data = csv_data.filter(lambda x: x[41] !='normal.')
attack_data.count()

#### Protocol and Service combinations using Cartesian product

We can compute the Cartesian product between two RDDs by using the `cartesian` transformation. It returns all possible pairs of elements between two RDDs. In our case, we will use it to generate all the possible combinations between Service and Protocol in our network interactions.

First of all, isolate each collection of values in two separate RDDs. For that use `distinct` on the CSV-parsed dataset. From the dataset description, we know that protocol is the second column and service is the third.

In [None]:
protocols = csv_data.map(lambda x: x[1]).distinct()
protocols.collect()

In [None]:
services = csv_data.map(lambda x: x[2]).distinct()
services.collect()

Now let's do the Cartesian product

Hint: [cartesian](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.cartesian.html#:~:text=Return%20the%20Cartesian%20product%20of,and%20b%20is%20in%20other%20.)
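For intuition, the same pairing can be reproduced locally with `itertools.product`. The lists below are a hypothetical subset of values chosen for illustration, not the full set of distinct protocols and services in the dataset.

```python
from itertools import product

# Hypothetical distinct values, as distinct() might return them.
protocols = ["tcp", "udp", "icmp"]
services = ["http", "smtp"]

# Every protocol is paired with every service.
pairs = list(product(protocols, services))

print(len(pairs))
print(pairs[:3])
```

The number of pairs is always the product of the two list lengths, which is why the Cartesian product grows quickly and is best applied to small, distinct collections.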

In [None]:
product = protocols.cartesian(services).collect()
print("There are {} combinations of protocol X service".format(len(product)))

#### Inspecting interaction duration

Compute the total duration of interactions for the normal and attack intrusion types.
* Use the filtered normal and attack data from above and convert the duration column to integer type using `map()`
* Get the sum of the durations by applying `reduce` to both datasets with the `add` operator
* Find the mean duration by dividing the sum by the count

Hint: [reduce()](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.RDD.reduce.html)
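The sum-then-divide steps above can be sketched locally with `functools.reduce`, which folds a list with a binary function in the same spirit as `RDD.reduce`. The duration values are made up for illustration.

```python
from functools import reduce
from operator import add

# Made-up durations in seconds for each group.
normal_durations = [0, 12, 3]
attack_durations = [0, 0, 0, 40]

# reduce(add, values) folds the list pairwise into a single sum.
total_normal = reduce(add, normal_durations)
total_attack = reduce(add, attack_durations)

# Mean = sum / count.
mean_normal = total_normal / len(normal_durations)
mean_attack = total_attack / len(attack_durations)

print(total_normal, mean_normal)
print(total_attack, mean_attack)
```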

In [None]:
normal_duration_data = normal_data.map(lambda x: int(x[0]))
attack_duration_data = attack_data.map(lambda x: int(x[0]))

In [None]:
total_normal_duration = normal_duration_data.reduce(add)
total_attack_duration = attack_duration_data.reduce(add)

print("Total duration for 'normal' interactions is {}".format(total_normal_duration))
print("Total duration for 'attack' interactions is {}".format(total_attack_duration))

In [None]:
normal_count = normal_duration_data.count()
attack_count = attack_duration_data.count()
print("Mean duration for 'normal' interactions is {}".format(total_normal_duration / float(normal_count)))
print("Mean duration for 'attack' interactions is {}".format(total_attack_duration / float(attack_count)))

#### Data aggregation with key/value pair RDDs

We can use all the transformations and actions available for normal RDDs with key/value pair RDDs. We just need to make the functions work with pair elements.

* create a key/value pair of intrusion type and duration
* calculate the total duration of each intrusion type using `reduceByKey()`
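Conceptually, `reduceByKey()` merges all values that share a key using the supplied function. A minimal local sketch of that idea, assuming made-up (intrusion type, duration) pairs and plain addition as the merge function:

```python
from collections import defaultdict

# Made-up (intrusion_type, duration) pairs.
pairs = [("normal.", 2.0), ("smurf.", 0.0), ("normal.", 5.0), ("neptune.", 1.0)]

# Accumulate the values per key, as reduceByKey(add) would.
totals = defaultdict(float)
for key, value in pairs:
    totals[key] += value

print(dict(totals))
```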

In [None]:
key_value_duration = csv_data.map(lambda x: (x[41], float(x[0])))
durations_by_key = key_value_duration.reduceByKey(add)

durations_by_key.collect()

### Create a DataFrame with the header as features (2 points)

* Read the features (*kddcup.names*) and preprocess.

    Hints:
    - Each feature description appears row-wise in *kddcup.names*
    - The first row consists of distinct values of intrusion_types
    - Add or move the *intrusion_types* column name to the last, to align with the data.
    - Each feature is represented as *feature_name*: *type*, remove *type* after colon (:)
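The preprocessing hints can be tried on a few lines first. The sketch below assumes the layout described in the hints (a first line listing the intrusion types, then one `feature_name: type.` entry per line); the sample text is a shortened stand-in, not the full *kddcup.names* file.

```python
# Shortened stand-in for the contents of kddcup.names.
names_text = """back,buffer_overflow,normal,smurf.
duration: continuous.
protocol_type: symbolic.
service: symbolic.
"""

lines = names_text.splitlines()

# The first line holds the label values, the rest describe features.
label_line, feature_lines = lines[0], lines[1:]

# Keep only the part before the colon.
feature_names = [line.split(":")[0] for line in feature_lines]

# The label is the last column of the data, so its name goes last.
feature_names.append("intrusion_type")

print(feature_names)
```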

In [None]:
names = sc.textFile("/content/kddcup.names")
firstRow = names.first()
data = names.filter(lambda row:row != firstRow)
data = data.map(lambda row: row.split(':')[0])

In [None]:
names_list = data.collect()
names.count(), data.count()

In [None]:
names_list.append('intrusion_type')
names_list

* Create a dataframe with the data and headers as preprocessed feature names

In [None]:
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame(csv_data).toDF(*names_list)
df.show(5)

#### What is the count of each protocol type?

Hint: apply `groupby` on protocol_type and count the records

In [None]:
df.groupBy('protocol_type').count().orderBy('count', ascending=False).show()

#### Register the DataFrame as a temporary table
Using sql context, write a query to
   * extract the label and their frequencies
   * select the distinct protocol types with their count of transactions which are not normal
   * select count of transactions in each protocol type that last more than 1000 seconds (duration > 1000), with no data transfer from destination (dst_bytes == 0)

 Hint: `SQLContext.sql(query)`
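The shape of these queries can be prototyped on a handful of rows without a Spark cluster. The sketch below uses Python's built-in `sqlite3` module as a stand-in for Spark SQL, which is an assumption made purely for illustration: the table holds four made-up rows and only a subset of the columns, and SQLite's dialect is not identical to Spark SQL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE connections "
    "(duration INTEGER, protocol_type TEXT, dst_bytes INTEGER, intrusion_type TEXT)"
)
# Made-up rows for illustration only.
rows = [
    (0, "tcp", 5450, "normal."),
    (0, "icmp", 0, "smurf."),
    (0, "icmp", 0, "smurf."),
    (2000, "tcp", 0, "neptune."),
]
conn.executemany("INSERT INTO connections VALUES (?, ?, ?, ?)", rows)

# Label frequencies, most frequent first.
freq = conn.execute(
    "SELECT intrusion_type, COUNT(*) AS freq FROM connections "
    "GROUP BY intrusion_type ORDER BY freq DESC, intrusion_type"
).fetchall()

# Long connections with no bytes returned, per protocol.
long_idle = conn.execute(
    "SELECT protocol_type, COUNT(*) FROM connections "
    "WHERE duration > 1000 AND dst_bytes = 0 GROUP BY protocol_type"
).fetchall()

print(freq)
print(long_idle)
```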

* extract the label and their frequencies

In [None]:
# createOrReplaceTempView supersedes the deprecated registerTempTable
df.createOrReplaceTempView("connections")

In [None]:
labels = sqlContext.sql("""SELECT intrusion_type, count(*) as freq FROM connections GROUP BY intrusion_type ORDER BY 2 DESC;""")
labels.show()

* select the distinct protocol types with their count of transactions which are not normal

In [None]:
attack_protocol = sqlContext.sql("""
                           SELECT
                             protocol_type,
                             COUNT(*) as freq
                           FROM connections WHERE intrusion_type != 'normal.'
                           GROUP BY protocol_type
                           """)
attack_protocol.show()

* select count of transactions in each protocol type that last more than 1000 seconds (duration > 1000), with no data transfer from destination (dst_bytes == 0)

In [None]:
# df.select("protocol_type", "duration", "dst_bytes").filter(
#     df.duration>1000).filter(df.dst_bytes==0).groupBy("protocol_type").count().show()

protocols_long_duration = sqlContext.sql("SELECT protocol_type, COUNT(*) FROM connections WHERE duration > 1000 AND dst_bytes==0 GROUP BY protocol_type")
protocols_long_duration.show()

### Find the highly correlated columns (2 points)

* identify the columns which are not numeric (the symbolic columns and the label) and remove those columns
* apply correlation function on the data (Hint: `Statistics.corr()`)
* collect the names of the columns on which correlation is applied
* create a dataframe with correlation matrix with index and columns as names
* get the highly correlated features by considering a correlation value greater than 0.8

    Hint: `np.triu()` , `pd.mask()`
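The thresholding idea can be checked by hand on a tiny example. The sketch below is a pure-Python stand-in for the `Statistics.corr()` plus `np.triu()` approach: it computes Pearson correlations for made-up columns and inspects each unordered pair once, which is what masking one triangle of the matrix achieves. The helper `pearson` and the sample values are assumptions for illustration.

```python
from math import sqrt

def pearson(xs, ys):
    """Pearson correlation of two equal-length lists (0.0 if a column is constant)."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return 0.0  # undefined correlation treated as 0, like filling NaN with 0
    return cov / sqrt(var_x * var_y)

# Made-up columns; the names are borrowed from the dataset for readability.
columns = {
    "count": [1, 2, 3, 4],
    "srv_count": [2, 4, 6, 9],
    "duration": [5, 1, 4, 2],
}

names = list(columns)
high_pairs = []
for i, first in enumerate(names):
    for second in names[i + 1:]:  # each unordered pair once, no diagonal
        r = pearson(columns[first], columns[second])
        if r > 0.8:
            high_pairs.append((first, second, r))

print(high_pairs)
```

Skipping the diagonal matters because every feature correlates perfectly with itself and would otherwise always pass the threshold.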

In [None]:
def parse_interaction(line):
    line_split = line.split(",")
    # keep just numeric and logical values
    symbolic_indexes = [1,2,3,41]
    clean_line_split = [item for i,item in enumerate(line_split) if i not in symbolic_indexes]
    return np.array([float(x) for x in clean_line_split])

vector_data = raw_data.map(parse_interaction)

In [None]:
correlation_matrix = Statistics.corr(vector_data)

In [None]:
# eliminating names of columns having string type
numeric_names = [name for i,name in enumerate(names_list) if i not in [1,2,3,41]]
numeric_names

In [None]:
# creating a dataframe with correlation matrix
corr_df = pd.DataFrame(correlation_matrix, index=numeric_names, columns=numeric_names)
corr_df

In [None]:
#  fill the null values with 0
corr_df.fillna(0,inplace=True)

In [None]:
# Set Up Mask To Hide Upper Triangle
mask = np.triu(np.ones_like(corr_df, dtype=bool))
tri_df = corr_df.mask(mask)

# Find features whose correlation with another feature exceeds the threshold (0.8)
highly_cor_col = [col for col in tri_df.columns if any(tri_df[col] > 0.8 )]
print("Highly correlated columns:", highly_cor_col)

### Analysis report (1 point)

* Find the ratio of attacked transactions vs normal transactions

    Hint: encode the intrusion_type column by replacing normal with 0 and all others with 1

* Describe the statistics of attacked and normal transactions
    
    Hint: Min, Max, Mean
    
* Select any two features that influence the intrusion_type and visualize the scatter plot to see the separation of normal and attacked
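The first two items can be sketched on a few values before running them on the full data. The example assumes an encoding of 0 for normal and 1 for attack, chosen for this sketch, and uses made-up (label, feature value) pairs; `describe` is a hypothetical helper.

```python
from statistics import mean

# Made-up (label, feature value) pairs: label 0 = normal, 1 = attack.
records = [(0, 8), (1, 511), (1, 500), (0, 3), (1, 490)]

attack_values = [value for label, value in records if label == 1]
normal_values = [value for label, value in records if label == 0]

# Ratio of attacked to normal transactions.
attack_to_normal = len(attack_values) / len(normal_values)

def describe(values):
    """Return the min, max and mean of a list of numbers."""
    return {"min": min(values), "max": max(values), "mean": mean(values)}

summary = {"attack": describe(attack_values), "normal": describe(normal_values)}

print(attack_to_normal)
print(summary)
```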

In [None]:
label_encode = udf(lambda x: 0 if x == 'normal.' else 1)
df = df.withColumn('label',label_encode(df['intrusion_type']))

In [None]:
# Build a new list so that highly_cor_col is not modified (safe to re-run the cell)
fea_label = highly_cor_col + ['label']
fea_label

In [None]:
data1 = df.select(fea_label)
data1

In [None]:
# convert to Pandas, cast the string-typed columns to numbers, and fill the null values
df2 = data1.toPandas().apply(pd.to_numeric)
df2.fillna(0,inplace=True)
df2.label = df2.label.astype('int')

In [None]:
df2_normal = df2[df2['label']==0]
df2_attacked = df2[df2['label']==1]
# Share of attacked and of normal transactions in the whole dataset
len(df2_attacked) / len(df2) , len(df2_normal)/len(df2)

In [None]:
# describe
df2.describe()

In [None]:
# compare the min, max, mean of attacked and normal (cast to float in case values are strings)
df_stats = pd.concat({'attacked': df2_attacked.astype(float).agg(['min', 'max', 'mean']),
                      'normal': df2_normal.astype(float).agg(['min', 'max', 'mean'])})
df_stats

In [None]:
# Scatter plot
df2.plot(kind="scatter", x="srv_count", y="count", label="label",c= 'label',cmap='BrBG_r')
plt.legend()
plt.show()

---------------------------------------- (or)  --------------------------------------

In [None]:
plt.scatter(df2['srv_count'],df2['count'],c=df2['label'])
plt.show()