# Traffic Analysis using PySpark
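This is a personal project that analyzes labeled IP network traffic flows (the Unicauca dataset of flows labeled with 87 applications) using PySpark, pandas, and scikit-learn.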

## Install PySpark

In [None]:
!pip install pyspark

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from functools import reduce
from pyspark.sql.functions import isnan, when, count, col
import matplotlib.pyplot as plt

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os

# Print the path of every file found under the Kaggle input directory
for parent, _, names in os.walk('/kaggle/input'):
    for path in (os.path.join(parent, n) for n in names):
        print(path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

## Create Spark Session

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

# Start (or reuse, via getOrCreate) a local Spark session on all cores.
# NOTE(review): spark.driver.memory is a launch-time setting; it presumably
# has no effect if a SparkContext is already running in this kernel — confirm.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('TrafficAnalysisUsingPySpark')
    .config("spark.driver.memory", "15g")
    .getOrCreate()
)
print(spark)

In [None]:
# Location of the labeled IP-flow CSV inside the Kaggle input mount
TRAFFIC_DATA = "/kaggle/input/ip-network-traffic-flows-labeled-with-87-apps/Dataset-Unicauca-Version2-87Atts.csv"

# Read the CSV treating the first line as the header; inferSchema makes Spark
# scan the data an extra time to choose a type for each column.
traffic_df = spark.read.csv(TRAFFIC_DATA, header=True, inferSchema=True)

# Inspect the inferred column names and types
traffic_df.printSchema()

In [None]:
# Show the first three rows (too many columns!)
# The columns have trailing whitespaces! 
# traffic_df.show(3)

## Data Transformation & Exploration

In [None]:
# Normalize column names so they can be referenced without quoting.
# https://www.linkedin.com/pulse/how-deal-white-spaces-column-names-your-raw-dataset-using-anirban-som/
def normalize_column_name(name):
    """Return *name* trimmed, with spaces and dots replaced by '_', upper-cased.

    Whitespace is stripped FIRST: stripping after the replacements would be a
    no-op, because surrounding spaces would already have become underscores.
    Dots are replaced because Spark parses them as struct-field access.
    """
    return name.strip().replace(" ", "_").replace(".", "_").upper()


current_columns = traffic_df.columns
new_columns = [normalize_column_name(c) for c in current_columns]

# toDF renames every column positionally in one projection, instead of
# chaining one withColumnRenamed call per column.
final_df = traffic_df.toDF(*new_columns)

final_df.printSchema()

In [None]:
# Total number of rows in the renamed DataFrame
row_count = final_df.count()
row_count

In [None]:
# Column names after normalization
list(final_df.columns)

In [None]:
# Subset of columns kept for exploration and modelling
selected_columns = [
    'FLOW_ID',
    'SOURCE_IP',
    'SOURCE_PORT',
    'DESTINATION_IP',
    'DESTINATION_PORT',
    'PROTOCOL',
    'TIMESTAMP',
    'FLOW_DURATION',
    'TOTAL_FWD_PACKETS',
    'TOTAL_BACKWARD_PACKETS',
    'TOTAL_LENGTH_OF_FWD_PACKETS',
    'TOTAL_LENGTH_OF_BWD_PACKETS',
    'FLOW_BYTES_S',
    'FLOW_PACKETS_S',
    'AVERAGE_PACKET_SIZE',
    'LABEL',
    'PROTOCOLNAME',
]
sub_df = final_df.select(*selected_columns)
sub_df.show(10)

In [None]:
# Number of flows per application, most frequent first. show() prints only
# 20 rows by default, so pass the number of groups to list every application.
protocol_counts = (
    sub_df.groupBy("PROTOCOLNAME")
    .count()
    .orderBy("count", ascending=False)
)
protocol_counts.show(protocol_counts.count(), truncate=False)

In [None]:
# Summary statistics (count, mean, stddev, min, max) of FLOW_DURATION
sub_df.describe('FLOW_DURATION').show()

In [None]:
# Count missing values per column. Missing CSV fields are read as nulls, which
# isnan() does not flag, and isnan() is only meaningful on float/double
# columns, so check nulls everywhere and NaN only on floating-point columns.
float_columns = {name for name, dtype in sub_df.dtypes if dtype in ('float', 'double')}


def _is_missing(c):
    """Return a boolean Column that is true where column *c* is null or NaN."""
    missing = col(c).isNull()
    if c in float_columns:
        missing = missing | isnan(col(c))
    return missing


# when(cond, 1) yields a non-null value for every missing cell, so count()
# (which skips nulls) tallies exactly the missing cells.
sub_df.select([count(when(_is_missing(c), 1)).alias(c) for c in sub_df.columns]).show()

In [None]:
# Distinct application names, collected to the driver as Row objects
sub_df.select('PROTOCOLNAME').dropDuplicates().collect()

In [None]:
# Social-media applications kept for the rest of the analysis
socmed = ['TWITTER', 'INSTAGRAM', 'FACEBOOK']

# where() is an alias of filter(): keep only flows whose application is listed
records = sub_df.where(sub_df['PROTOCOLNAME'].isin(socmed))
records.show(5)

## Pandas Conversion & Exploration

In [None]:
# Collect the filtered social-media flows to the driver as a pandas DataFrame.
# toPandas() loads every row into driver memory, so this relies on the
# application filter above having reduced the data to a manageable size.
records_df = records.toPandas()

In [None]:
# Display the collected social-media flows
records_df

In [None]:
# Raw timestamps are day/month/year immediately followed by hour:minute:second
# (no separator between the date and time parts).
TIMESTAMP_FORMAT = '%d/%m/%Y%H:%M:%S'
records_df['TIMESTAMP'] = pd.to_datetime(records_df['TIMESTAMP'], format=TIMESTAMP_FORMAT)

In [None]:
# Preview the first five rows after timestamp parsing
records_df.head(n=5)

## Bar Count Plot

In [None]:
# Bar chart of flows per application using pandas' built-in plotting
flow_counts = records_df["PROTOCOLNAME"].value_counts()
flow_counts.plot(kind="bar")

In [None]:
# Use seaborn to plot the same graph above. Passing the value_counts() order
# makes the bars match the pandas plot; without `order`, seaborn infers the
# category order from the data instead of sorting by frequency.
import seaborn as sns
plt.figure(figsize=(10, 4))
sns.countplot(
    x='PROTOCOLNAME',
    data=records_df,
    order=records_df['PROTOCOLNAME'].value_counts().index,
)

## Network Graph Plot
The generated graph becomes too cluttered to read when flows from several `PROTOCOLNAME` values are included, so only `INSTAGRAM` flows are plotted below.

### NetworkX

In [None]:
import networkx as nx

# Undirected graph of INSTAGRAM flows: one node per IP address and one edge
# per distinct source/destination pair (duplicate pairs collapse into a single
# edge in an nx.Graph). Only one application is used to keep it readable.
# NOTE: run this before the label-encoding cell, which replaces PROTOCOLNAME
# strings with integer codes.
instagram_flows = records_df[records_df['PROTOCOLNAME'] == 'INSTAGRAM']
G = nx.from_pandas_edgelist(instagram_flows, 'SOURCE_IP', 'DESTINATION_IP')

plt.figure(figsize=(30, 30))
nx.draw_networkx(G, with_labels=True, node_size=60, font_size=12)

In [None]:
# Same graph drawn with a spring (force-directed) layout
plt.figure(figsize=(30, 30))
nx.draw_spring(G, node_size=60, font_size=12, with_labels=True)

In [None]:
# List the graph's edges as tuples of IP addresses
list(G.edges)

In [None]:
# Same graph drawn with the Kamada-Kawai (path-length based) layout
plt.figure(figsize=(30, 30))
nx.draw_kamada_kawai(G, node_size=60, font_size=12, with_labels=True)

## Time Series Plot

In [None]:
import plotly.express as px

# px.line connects points in row order, so sort by time first; otherwise
# out-of-order rows draw lines that jump back and forth along the x-axis.
instagram_flows = (
    records_df[records_df['PROTOCOLNAME'] == 'INSTAGRAM']
    .sort_values('TIMESTAMP')
)
fig = px.line(instagram_flows, x='TIMESTAMP', y="AVERAGE_PACKET_SIZE",
              title="Average Packet Size vs Time")
fig.show()

## Support Vector Machine (SVM) Classifier

In [None]:
from sklearn.metrics import explained_variance_score
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score

In [None]:
# Label encode protocol name: replace each application name with an integer
# code (index into the sorted encoder.classes_). A single fit_transform both
# learns the classes and encodes; fitting beforehand was redundant.
# NOTE: this overwrites PROTOCOLNAME in place, so earlier cells that filter on
# names such as 'INSTAGRAM' will match nothing if re-run afterwards.
encoder = LabelEncoder()
records_df['PROTOCOLNAME'] = encoder.fit_transform(records_df['PROTOCOLNAME'])
records_df['PROTOCOLNAME']

In [None]:
# Identifier, endpoint, timestamp and label columns are not model inputs;
# the remaining flow statistics form the feature matrix.
non_feature_columns = [
    'FLOW_ID',
    'SOURCE_IP',
    'SOURCE_PORT',
    'DESTINATION_IP',
    'DESTINATION_PORT',
    'PROTOCOL',
    'TIMESTAMP',
    'LABEL',
    'PROTOCOLNAME',
]
Y = records_df['PROTOCOLNAME']
X = records_df.drop(columns=non_feature_columns)

In [None]:
from sklearn.preprocessing import StandardScaler

# Hold out 20% of the flows for evaluation (fixed seed for reproducibility).
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=3)

# Fill missing feature values (SimpleImputer's default mean strategy), fitting
# on the training split only so no test information leaks into training.
imputer = SimpleImputer()
X_train = imputer.fit_transform(X_train)
X_test = imputer.transform(X_test)

# SVC's default RBF kernel is not scale invariant and these features are
# presumably on very different scales (durations, byte counts, rates), so
# standardize them, again fitting on the training split only.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

In [None]:
# Fit a support-vector classifier with scikit-learn's default hyperparameters
# (fit returns the estimator itself, so it can be chained).
model = SVC().fit(X_train, Y_train)
model

In [None]:
from sklearn.metrics import classification_report

pred = model.predict(X_test)

# scikit-learn metrics take (y_true, y_pred) in that order. The targets are
# categorical codes, so report per-class precision/recall/F1 rather than a
# regression score such as explained variance (which was also called with its
# arguments swapped).
print(classification_report(
    Y_test,
    pred,
    labels=np.arange(len(encoder.classes_)),
    target_names=encoder.classes_,
    zero_division=0,
))

In [None]:
# Fraction of test flows whose predicted application matches the true one.
# scikit-learn metrics take (y_true, y_pred); accuracy happens to be symmetric,
# but the conventional order stays correct if this is swapped for another metric.
accuracy_score(Y_test, pred)