<span style="color:red">You should be in a PySpark docker image to do the following!</span>  
If you're not in PySpark, please run the following (presuming you have Docker installed):

In [None]:
!docker run -it --rm -p 8888:8888 jupyter/pyspark-notebook

If you need help, check out [this link](https://medium.com/@suci/running-pyspark-on-jupyter-notebook-with-docker-602b18ac4494)

# Sherlock: Classifying Malicious Cell Phone Sessions 

This subset of the well-known Sherlock dataset contains data extracted from 37 users' cell phones across 3 months at the beginning of 2016. There are two tables involved in this analysis:  
T4.tsv: ~26 GB of data on battery level, memory usage, packet inflows and outflows and the like. Each row represents a scan, and scans are conducted every 5 seconds.  
Moriartyprobe.tsv: ~90 MB of data from an app called Moriarty, which starts "sessions": a variety of realistic attacks on the user's cell phone that stop and start intermittently. The sessions are either benign or malicious.  

Our model looks at the activity on the cell phone while these sessions are occurring. We develop a binary classification model that could be implemented to track T4 cell phone usage stats in real time and identify whether an attack is occurring.  

To use this dataset in a Spark ML pipeline, it must be imported, transformed, cleaned, and subsetted, and then both tables must be combined. The code below uses awk, PySpark, and the Spark SQL API to do all of that.  

## First: Grab the T4 data from my personal dropbox. The links below will automatically download

I'm not paying 11.99 a month for nothing!

<span style="color:red">WARNING: One of the two files, T4, is very large, 25.66 GB to be exact. Please check your disk space before running the next code block.</span>  
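One way to check the available space from inside the notebook is sketched below. It is a minimal example, not part of the original pipeline: the helper name `has_room_for` is hypothetical, and the size constant assumes the 25.66 figure above is in decimal gigabytes.

```python
import shutil

# Size quoted in the warning above, in bytes (decimal gigabytes assumed).
T4_SIZE_BYTES = int(25.66 * 1000**3)


def has_room_for(path=".", needed_bytes=T4_SIZE_BYTES):
    """Return True if the filesystem holding `path` has at least `needed_bytes` free."""
    free_bytes = shutil.disk_usage(path).free
    return free_bytes >= needed_bytes


free_gb = shutil.disk_usage(".").free / 1000**3
print(f"Free space: {free_gb:.1f} GB; enough for T4.tsv: {has_room_for()}")
```

Keep in mind that the transformed csv copy and the joined output will need room as well, so treat a bare pass as a lower bound.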

[T4.tsv](https://www.dropbox.com/s/jf6engr5ayea6ev/T4.tsv?dl=1) (26 GB)  
Alternatively, you can download a sample of T4.  
If you do, make sure the code below refers to "T4randomsample.tsv" rather than "T4.tsv".

[T4randomsample.tsv](https://www.dropbox.com/s/md0qnju02gtxlgc/T4randomsample.tsv?dl=1) (26MB)

## Second: Grab the Moriarty Probe data from my personal dropbox. 

[Moriartyprobe.tsv](https://www.dropbox.com/s/epbcz9wg3nl0twi/Moriartyprobe.tsv?dl=1) (65.2MB)

## Third: Drop T4.tsv (or T4randomsample.tsv) and Moriartyprobe.tsv in the folder labeled "tsvs"

This step is very important, or else the filepaths below will not work (and you will feel like pulling your hair out trying to fix them). Or you'll be really upset with me. Neither is good.
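A quick check like the following can confirm the files landed in the right place before any heavy processing starts. It is only a sketch: `missing_inputs` is a hypothetical helper, and it assumes the notebook is running from the SherLock directory so that "tsvs" resolves as a relative path.

```python
from pathlib import Path


def missing_inputs(folder="tsvs", t4_name="T4.tsv"):
    """Return the expected input files that are not present in `folder`."""
    expected = [t4_name, "Moriartyprobe.tsv"]
    return [name for name in expected if not (Path(folder) / name).is_file()]


# Pass t4_name="T4randomsample.tsv" if you downloaded the sample instead.
print("Missing files:", missing_inputs() or "none")
```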

In [2]:
# Verify that your working directory is "SherLock"
!pwd

/home/jovyan/SherLock


In [11]:
# I use an awk script to do the transformations. It can be viewed at /Sherlock/import.awk
!cat import.awk
# change permissions to make file executable
!chmod +x import.awk

#!/usr/bin/awk -f
BEGIN { FS="\t"; OFS="," } { 
  if (FILENAME ~ /T4/){ 
    rebuilt=0 
    for(i=1; i<=NF; ++i) { 
      if ($i ~ /Hz/){ 
        $i = ($i + 0) 
        if ($i > 100) {$i*=.001} 
      } 
      if ($i ~ /,/ && $i !~ /^".*"$/) { 
        gsub("\"", "\"\"", $i) 
        $i = "\"" $i "\"" 
        rebuilt=1 
      } 
    } 
    if (!rebuilt) { $1=$1 } 
    print $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26 
  } 
  else if (FILENAME ~ /Mor/) { 
    rebuilt=0 
    for(i=1; i<=NF; ++i) 
      if ($i ~ /,/ && $i !~ /^".*"$/) { 
        gsub("\"", "\"\"", $i) 
        $i = "\"" $i "\"" 
        rebuilt=1 
      } 
    if (!rebuilt) { $1=$1 } 
    print $1,$2,$3,$4,$5,$6,$7,$8 
  } 
} 

<span style="color:red">The import cell below reads "tsvs/T4randomsample.tsv". If you downloaded the full T4.tsv instead, change that line to "tsvs/T4.tsv".</span>

## Fourth: Transform the datasets using the awk streaming language

Fun fact, this language was developed in 1977 by AT&T!  
I'm using a script called import.awk to make all transformations.  
This next code calls that script and prints its results to a new file in the correct part of the filesystem.  
The changes made are as follows:  
1. Change the delimiter from tabs (\t) to commas (,).
2. If the filename contains T4, make the Hz column numeric. A number larger than 100 means the value was labeled in MHz, so it is reduced by a factor of 1,000 (multiplied by .001) to express it in GHz. The rest are null.
3. Else if the filename contains Mor, don't do the GHz transformation.
4. If either file has fields containing stray commas, wrap them in double quotes and double any embedded quotation marks so they don't break the csv.
5. Print out a subset of columns (1-26) for T4.tsv, and a subset of columns (1-8) for Moriartyprobe.tsv.
6. There should now be two transformed, usable csv files in the "csvs" folder.
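For readers less familiar with awk, the steps above can be sketched in Python. This is an illustration of the same logic rather than a replacement for import.awk: the names `clean_hz` and `tsv_to_csv` are hypothetical, and `csv.writer` quotes slightly more aggressively than the awk script (it also quotes fields that contain a quotation mark but no comma).

```python
import csv
import io
import re

# Leading number of a field such as "2.4GHz" or "883MHz" (mirrors awk's `$i + 0`).
LEADING_NUMBER = re.compile(r"^\s*([-+]?\d*\.?\d+)")


def clean_hz(field):
    """Strip the unit from a clock-speed field and express it in GHz."""
    match = LEADING_NUMBER.match(field)
    value = float(match.group(1)) if match else 0.0
    # Values above 100 are assumed to be MHz, so scale them down by 1000.
    return value / 1000 if value > 100 else value


def tsv_to_csv(tsv_text, n_columns, fix_hz=False):
    """Convert tab-separated text to CSV, keeping only the first `n_columns` fields."""
    out = io.StringIO()
    # csv.writer quotes any field containing a comma and doubles embedded quotes.
    writer = csv.writer(out, lineterminator="\n")
    for line in tsv_text.splitlines():
        fields = line.split("\t")[:n_columns]
        if fix_hz:
            fields = [clean_hz(f) if "Hz" in f else f for f in fields]
        writer.writerow(fields)
    return out.getvalue()


# Made-up sample row: user id, timestamp, clock speed, free-text field.
sample = "u1\t1451606641135\t883MHz\tplayed a game, solo\n"
print(tsv_to_csv(sample, n_columns=4, fix_hz=True), end="")
```

The `fix_hz` flag plays the role of the filename test in the awk script: it would be True for T4 and False for Moriartyprobe.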


In [7]:
!./import.awk \
tsvs/T4randomsample.tsv \
    > csvs/T4subset.csv 
!./import.awk \
tsvs/Moriartyprobe.tsv \
    > csvs/Moriartyprobe.csv

## Fifth: Load PySpark for further transformations
Use !pip install to install anything you are missing
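To see what is missing before the imports fail, something like this sketch can help. The list of module names is taken from the import cell that follows, and `missing_modules` is a hypothetical helper. Note that `sklearn` is installed under the package name `scikit-learn`.

```python
from importlib.util import find_spec

# Top-level modules imported in the next cell.
REQUIRED = ["pyspark", "pandas", "numpy", "sklearn"]


def missing_modules(names=REQUIRED):
    """Return the module names that cannot be found in the current environment."""
    return [name for name in names if find_spec(name) is None]


print("Missing modules:", missing_modules() or "none")
```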

In [8]:
# Import pyspark
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation
from pyspark.sql import SparkSession
from functools import reduce
import pandas as pd
import numpy as np
import sklearn

In [9]:
# Start a session 
spark = (
    SparkSession.builder
    .master('local[2]')
    .config("spark.executor.memory", "1g")
    .config("spark.driver.memory", "1g")
    .appName('spark_sh_data')
    .getOrCreate()
)

In [10]:
# Import data: t4 and Moriarty
t4 = spark.read.options(header=True, nullValue='NULL', inferSchema=True).csv('csvs/T4subset.csv')
mor = spark.read.options(header=True, nullValue='NULL', inferSchema=True).csv('csvs/Moriartyprobe.csv')

## Sixth: Add column names to csv files

In [12]:
# create colnames T4
t4_colnames = ['userid', 'uuid', 'Version', 'CpuGHz', 'CPU_0', 'CPU_1', 'CPU_2', 'CPU_3', 'Total_CPU', 'TotalMemory_freeSize', 'TotalMemory_max_size',
'TotalMemory_total_size', 'TotalMemory_used_size', 'Traffic_MobileRxBytes', 'Traffic_MobileRxPackets', 'Traffic_MobileTxBytes',
'Traffic_MobileTxPackets','Traffic_TotalRxBytes', 'Traffic_TotalRxPackets', 'Traffic_TotalTxBytes', 'Traffic_TotalTxPackets',
'Traffic_TotalWifiRxBytes', 'Traffic_TotalWifiRxPackets', 'Traffic_TotalWifiTxBytes', 'Traffic_TotalWifiTxPackets',
'Traffic_timestamp', 'Battery_charge_type', 'Battery_current_avg']

# create colnames Moriarty 
mor_colnames = ['userid', 'uuid', 'actionType', 'action', 'behavior', 'sessionType', 'sessionID', 'version']
# Add column names
t4_oldColumns = t4.schema.names
t4_newColumns = t4_colnames

mor_oldColumns = mor.schema.names
mor_newColumns = mor_colnames


t4 = reduce(lambda t4, idx: t4.withColumnRenamed(t4_oldColumns[idx], t4_newColumns[idx]), range(len(t4_oldColumns)), t4)
mor = reduce(lambda mor, idx: mor.withColumnRenamed(mor_oldColumns[idx], mor_newColumns[idx]), range(len(mor_oldColumns)), mor)
t4.printSchema()
mor.printSchema()

root
 |-- userid: string (nullable = true)
 |-- uuid: long (nullable = true)
 |-- Version: string (nullable = true)
 |-- CpuGHz: string (nullable = true)
 |-- CPU_0: string (nullable = true)
 |-- CPU_1: string (nullable = true)
 |-- CPU_2: string (nullable = true)
 |-- CPU_3: string (nullable = true)
 |-- Total_CPU: string (nullable = true)
 |-- TotalMemory_freeSize: string (nullable = true)
 |-- TotalMemory_max_size: string (nullable = true)
 |-- TotalMemory_total_size: string (nullable = true)
 |-- TotalMemory_used_size: string (nullable = true)
 |-- Traffic_MobileRxBytes: string (nullable = true)
 |-- Traffic_MobileRxPackets: string (nullable = true)
 |-- Traffic_MobileTxBytes: string (nullable = true)
 |-- Traffic_MobileTxPackets: string (nullable = true)
 |-- Traffic_TotalRxBytes: string (nullable = true)
 |-- Traffic_TotalRxPackets: string (nullable = true)
 |-- Traffic_TotalTxBytes: string (nullable = true)
 |-- Traffic_TotalTxPackets: string (nullable = true)
 |-- Traffic_Total

In [13]:
# verify successful import
import pandas as pd
pd.set_option('display.max_columns', None)
mor.toPandas().head()

Unnamed: 0,userid,uuid,actionType,action,behavior,sessionType,sessionID,version
0,0a50e09262,1451638991449,App entered onPause(),App Mode change,benign,benign,1.0,21
1,0a50e09262,1451637887475,Application entered onCreate(),Application started,benign,benign,1.0,21
2,0a50e09262,1451637887633,User started to play a game (name);solo,Game stared,benign,benign,1.0,21
3,0a50e09262,1451637921510,App entered onPause(),App Mode change,benign,benign,1.0,21
4,0a50e09262,1451638167470,App entered onResume,App Mode change,benign,benign,1.0,21


## Seventh: Join the t4 and Moriarty datasets. They do not share a common key. 
This is a critical move, and a challenging one. I need to join the tables on their uuid values, matching each T4 scan to the Moriarty session whose time range it falls in. The uuid is a timestamp measured in milliseconds, so I distribute the malicious or benign labels across those time ranges.  
Utilizing a CTE, a window function, and a subquery, I successfully joined the two tables.  
If you want a simple, foobar-style Stack Overflow explanation of the general logic behind what I did, check out [this SO answer](https://dba.stackexchange.com/questions/105736/combine-two-event-tables-into-a-single-timeline). I built on this answer by using a CTE, so that I could join the two tables only on values found in the original t4 dataset. I didn't want duplicate values for any T4 data. 
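The idea behind the join can be seen on a toy example in plain Python. This is a sketch of the logic, not the query itself: `label_scans` is a hypothetical helper, the timestamps are made up, and it assumes each scan should inherit the label of the most recent Moriarty event at or before it. Like the query as written, it does not separate users.

```python
from bisect import bisect_right


def label_scans(mor_events, t4_timestamps):
    """Give each T4 scan the sessionType of the latest Moriarty event at or before it.

    mor_events: list of (uuid_ms, sessionType) pairs.
    t4_timestamps: list of T4 uuid values (milliseconds).
    Scans that precede every Moriarty event get None, like NULLs in the SQL result.
    """
    events = sorted(mor_events)
    event_times = [t for t, _ in events]
    labelled = []
    for ts in t4_timestamps:
        # Index of the last event whose timestamp is <= ts, or -1 if there is none.
        idx = bisect_right(event_times, ts) - 1
        labelled.append((ts, events[idx][1] if idx >= 0 else None))
    return labelled


mor_events = [(1000, "benign"), (2000, "malicious")]
for row in label_scans(mor_events, [500, 1000, 1500, 2500]):
    print(row)
```

In the SQL version, the running `count(m.sessionType)` plays the role of the binary search: it increases by one at every Moriarty event, so all T4 rows that follow an event share its group number and pick up its label.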

In [14]:
# create temp table 
t4.createOrReplaceTempView('t4')
mor.createOrReplaceTempView('mor')
t4_mor_sql = spark.sql("""
with CTE as (SELECT uuid
    , min(sessionType) OVER (PARTITION BY m_grp) as sessionType
FROM (
    SELECT uuid, m.sessionType 
        , count(m.sessionType) OVER (ORDER BY uuid) as m_grp
    FROM mor m
    FULL OUTER JOIN t4 t using(uuid)
    ) sub)
SELECT * 
FROM CTE
JOIN t4 using(uuid);
""")

## Eighth: Save the new dataframe as t4_mor.csv
This requires repartitioning the Spark dataframe into a single partition, which gathers the entire t4_mor dataframe into one file. It's computationally heavy and takes up quite a bit more memory. In a perfect pipeline, this step would not be necessary, but we're going to have to jump over to Spencer and Chidi's ML models from here. 

In [None]:
t4_mor_sql.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save("/home/jovyan/SherLock/pathToT4_mor")

#### The file will save to a folder called pathToT4_mor. Go in there after about 10 minutes and look for the file called _SUCCESS; once it appears, there should be a part-*.csv file available. Rename that file "t4_mor.csv" and move it to "csvs". 
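The rename-and-move can also be scripted. The sketch below assumes Spark wrote exactly one `part-*.csv` file plus a `_SUCCESS` marker into the output folder; `collect_csv` is a hypothetical helper name.

```python
import shutil
from pathlib import Path


def collect_csv(out_dir, dest="csvs/t4_mor.csv"):
    """Move the single part-*.csv that Spark wrote in `out_dir` to `dest`."""
    out_dir = Path(out_dir)
    if not (out_dir / "_SUCCESS").exists():
        raise FileNotFoundError("No _SUCCESS marker yet; the write may still be running.")
    parts = sorted(out_dir.glob("part-*.csv"))
    if len(parts) != 1:
        raise RuntimeError(f"Expected one part file, found {len(parts)}")
    dest = Path(dest)
    dest.parent.mkdir(parents=True, exist_ok=True)
    shutil.move(str(parts[0]), str(dest))
    return dest


# Example call (paths taken from the cells above):
# collect_csv("/home/jovyan/SherLock/pathToT4_mor", "csvs/t4_mor.csv")
```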

In [19]:
# Read back the file that was renamed to t4_mor.csv and moved into csvs
t4_mor = spark.read.options(header=True, nullValue='NULL', inferSchema=True).csv('csvs/t4_mor.csv')

## Ninth and finally: Verify that the dataframe has both a sessionType column populated with values, and a bunch of cell data like CPU and Memory stats

In [20]:
t4_mor.toPandas().head()

Unnamed: 0,uuid,sessionType,userid,Version,CpuGHz,CPU_0,CPU_1,CPU_2,CPU_3,Total_CPU,TotalMemory_freeSize,TotalMemory_max_size,TotalMemory_total_size,TotalMemory_used_size,Traffic_MobileRxBytes,Traffic_MobileRxPackets,Traffic_MobileTxBytes,Traffic_MobileTxPackets,Traffic_TotalRxBytes,Traffic_TotalRxPackets,Traffic_TotalTxBytes,Traffic_TotalTxPackets,Traffic_TotalWifiRxBytes,Traffic_TotalWifiRxPackets,Traffic_TotalWifiTxBytes,Traffic_TotalWifiTxPackets,Traffic_timestamp
0,1451606641135,,1a1a12314b,2.0,2.4,5.882353,0.0,0.0,0.0,1.4705882,4975460,134217728,33985728,29010268,0,0,0,0,0,0,0,0,0,0,0,0,2016-01-01 02:04:01.024
1,1451610231840,malicious,5b76bedcac,2.0,1.7,6.666667,0.0,0.0,0.0,1.6666667,5967672,134217728,28579734,22612062,0,0,0,0,0,0,0,0,0,0,0,0,2016-01-01 03:03:51.735
2,1451612040990,malicious,d35184bc22,2.0,0.883,6.25,0.0,0.0,11.111112,4.3402777,2916574,134217728,30964342,28047768,0,0,0,0,0,0,0,0,0,0,0,0,2016-01-01 03:34:00.829
3,1451647411655,benign,6e200090ee,2.0,2.4,41.17647,50.0,83.33333,25.0,49.87745,1437200,134217728,36993820,35556620,-4364968,-7100,-1946755,-9184,3209,69,68549,72,4368177,7169,2015304,9256,2016-01-01 13:23:31.614
4,1451711185754,malicious,ec3b3592f1,2.0,2.4,6.25,16.666668,0.0,0.0,5.729167,7645592,134217728,29805114,22159522,0,0,0,0,0,0,0,0,0,0,0,0,2016-01-02 07:06:25.709
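Beyond eyeballing the first five rows, a label count gives a fuller picture of how well sessionType is populated. The sketch below reads the saved csv with the standard library only; `session_type_counts` is a hypothetical helper, and the example path assumes the joined file was saved as csvs/t4_mor.csv. For the full T4 data a Spark aggregation would be the better tool, so treat this as a check for the sample-sized output.

```python
import csv
from collections import Counter


def session_type_counts(csv_path):
    """Count rows per sessionType value; empty cells are reported as '(missing)'."""
    with open(csv_path, newline="") as f:
        reader = csv.DictReader(f)
        return Counter(row["sessionType"] or "(missing)" for row in reader)


# Example call, assuming the joined file was saved as csvs/t4_mor.csv:
# print(session_type_counts("csvs/t4_mor.csv"))
```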
