Variables to change:

1. jar_file_loc: path to the PostgreSQL JDBC driver (.jar) file. Change it to wherever the file is located.
2. train_data_path & test_data_path: change to path of train/test csv files

In [1]:
jar_file_loc = 'postgresql-42.5.0.jar'

# set up Spark
import pyspark
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession

# Build (or reuse) a local session that uses all cores and has the
# PostgreSQL JDBC driver jar on its classpath.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GenericAppName")
    .config('spark.jars', jar_file_loc)
    .getOrCreate()
)


# Report the application name and master taken from the session's SparkContext
print(f"APP Name :{spark.sparkContext.appName}")
print(f"Master :{spark.sparkContext.master}")

# SQLContext wrapper around the same SparkContext, used by later cells
sqlContext = SQLContext(spark.sparkContext)


APP Name :GenericAppName
Master :local[*]




I tried using the augmented CSVs, but I couldn't successfully read them back from Postgres after writing them to it. The read command took more than 10 minutes and still wasn't able to produce a result. So, I decided to use the reduced dataset instead. I will run this on the cloud for the final submission, so with the higher computation power, I should be able to analyze the augmented files.

In [2]:
# read in data
# Paths of the train/test CSV files -- change these to where the files are located.
train_data_path = 'data_folder/train70_reduced.csv'
test_data_path = 'data_folder/test30_reduced.csv'

# header=True takes column names from the first row; inferSchema=True lets
# Spark infer the column types from the data.
df_train = spark.read.csv(train_data_path, header=True, inferSchema=True)
df_test = spark.read.csv(test_data_path, header=True, inferSchema=True)

In [3]:
# add column to differentiate b/w train and test sets
from pyspark.sql.functions import col, lit

# Tag every row with the split it came from so both sets can share one table.
df_train_cat = df_train.withColumn("data_category", lit("train"))
df_test_cat = df_test.withColumn("data_category", lit("test"))

# combine dfs (union matches columns by position)
df_combined = df_train_cat.union(df_test_cat)

# Row counts per split, then for the combined frame.
for label, frame in (('Train', df_train_cat),
                     ('Test', df_test_cat),
                     ('Combined', df_combined)):
    print(label, frame.count())

Train 231646
Test 84351
Combined 315997


In [4]:
# write into postgresql db

# Connection settings in one place; the write below (and any later read)
# should take every value from this dict so an edit here actually takes effect.
db_properties = {}
# update your db username
db_properties['username'] = "postgres"
# update your db password
db_properties['password'] = "bigdata"
# make sure the host (and port, if it is not the PostgreSQL default) is right here
db_properties['url'] = "jdbc:postgresql://host.docker.internal/postgres"
# make sure you have the Postgres JAR file in the right location
db_properties['driver'] = "org.postgresql.Driver"
db_properties['table'] = "mqtt"

# Write the combined train + test frame to the table; "overwrite" replaces
# any existing table of the same name.
(
    df_combined.write.format("jdbc")
    .mode("overwrite")
    .option("url", db_properties['url'])
    .option("dbtable", db_properties['table'])
    .option("user", db_properties['username'])
    .option("password", db_properties['password'])
    .option("driver", db_properties['driver'])
    .save()
)

In [5]:
# read db to ensure data has been written in correctly
# Uses the SparkSession reader (SQLContext is deprecated) and takes every
# connection setting, including the password, from db_properties.
df_read = (
    spark.read.format("jdbc")
    .option("url", db_properties['url'])
    .option("dbtable", db_properties['table'])
    .option("user", db_properties['username'])
    .option("password", db_properties['password'])
    .option("driver", db_properties['driver'])
    .load()
)

print('Item Count from PostgreSQL Read:', df_read.count())

Item Count from PostgreSQL Read: 315997


# Task II

In [6]:
# average length of a MQTT message in train
# Keep only the rows that were tagged as training data before the write.
df_train = df_read.where(df_read['data_category'] == 'train')

# The aggregation yields a single row whose only field is the mean.
mean_row = df_train.agg({'`mqtt.len`': 'mean'}).first()
print('Average length of a MQtt message in the training dataset is:', mean_row[0])

Average length of a MQtt message in the training dataset is: 31.435725201384873


In [7]:
# average of length of TCP message grouped by target
# Mean tcp.len per target label, largest mean first.
(
    df_read
    .groupBy('`target`')
    .agg({'`tcp.len`': 'mean'})
    .orderBy('`avg(tcp.len)`', ascending=False)
    .show()
)

+----------+------------------+
|    target|      avg(tcp.len)|
+----------+------------------+
|     flood|13357.469178082192|
|       dos| 311.8243034553761|
| malformed| 20.68321195860483|
|legitimate|7.7758887953888305|
|   slowite| 3.937407365180709|
|bruteforce| 3.916684747233673|
+----------+------------------+



In [8]:
# X most frequent TCP flags

def most_frequent(X):
    """Return the X most frequent values of the `tcp.flags` column.

    Counts the rows of the notebook-level ``df_read`` DataFrame per
    ``tcp.flags`` value and returns the flag values ordered from most to
    least frequent.

    Parameters
    ----------
    X : int
        Number of flag values to return.

    Returns
    -------
    list
        At most X flag values, most frequent first.
    """
    flags_by_count = (
        df_read.groupby('`tcp.flags`')
        .count()
        .orderBy('count', ascending=False)
    )

    # For an ordinary non-negative count, let Spark apply the cut-off so only
    # the X needed rows are brought back to the driver.
    if type(X) is int and X >= 0:
        return [row[0] for row in flags_by_count.limit(X).collect()]

    # Any other slice bound (e.g. None or a negative number) keeps plain
    # list-slice semantics on the fully collected result.
    return [row[0] for row in flags_by_count.collect()][0:X]

In [10]:
# most popular target on Twitter

# Distinct values of the target column, pulled to the driver as a plain list;
# these are the terms counted in the tweet stream below.
distinct_target_rows = df_read.select('`target`').distinct().collect()
targets = [row[0] for row in distinct_target_rows]
targets

['slowite', 'bruteforce', 'flood', 'malformed', 'dos', 'legitimate']

In [12]:
!pip install kafka-python

Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m246.5/246.5 kB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: kafka-python
Successfully installed kafka-python-2.0.2


In [29]:
# NOTE(review): no cell visible in this notebook imports `zookeeper`, and the
# recorded output of this cell is a UsageError rather than an install log.
# Presumably this was meant to provide Apache ZooKeeper for Kafka -- confirm
# that the PyPI package of this name is really what is wanted, or drop the cell.
!pip install zookeeper



UsageError: Line magic function `%%` not found.


In [None]:
# set up KafkaConsumer to read tweets from Producer
from kafka import KafkaConsumer
import json
import time
import numpy as np


topic_name = 'project-twitter-streaming'

# Consumer on the local broker; values arrive as bytes and are decoded to str.
# NOTE(review): fetch_max_bytes=128 is a very small fetch cap -- confirm it is intended.
kafka_consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=['127.0.0.1:9092'],
    auto_offset_reset='latest',
    enable_auto_commit=True,
    auto_commit_interval_ms=5000,
    fetch_max_bytes=128,
    max_poll_records=100,
    value_deserializer=lambda x: x.decode('utf-8'),
    api_version=(0, 10, 1))

# create dictionary to keep track of count of search terms
counter_dict = dict.fromkeys(targets, 0)

# No consumer_timeout_ms is set, so this loop keeps waiting for new messages;
# interrupt the kernel to stop consuming before running the cells below.
for message in kafka_consumer:
    print(message.value)

    # split message into lower-cased, whitespace-separated words
    split = message.value.lower().split()

    # count frequencies of every word in this message
    freq_df = spark.createDataFrame(split, "string").groupBy('value').count()\
    .orderBy('count', ascending = False)

    # extract count of search terms
    # (the original referenced an undefined name `freq_dff` here, which raised
    # NameError on the first message)
    freqs = freq_df.filter(freq_df.value.isin(targets)).collect()

    # update count dict with this message's (term, count) pairs
    for term, term_count in freqs:
        counter_dict[term] += term_count

    print(counter_dict)

In [None]:
# after stream is over, plot frequencies
import seaborn as sns

# Pass x/y by keyword: seaborn 0.11 deprecated positional data arguments and
# later releases reject them.
sns.barplot(x=list(counter_dict.keys()), y=list(counter_dict.values()))

In [None]:
# split dict in keys and vals
keys, vals = list(counter_dict), list(counter_dict.values())

# find key with highest val (the first one wins when several share the maximum)
top_index = max(range(len(vals)), key=vals.__getitem__)
print('Most popular attack on Twitter is:', keys[top_index])