## **Spark Installation And Downloading Data**

**Update Packages Info**

In [None]:
# Refresh the apt package index before the apt-based JDK install in the next code cell.
!apt update

**Installing Pyspark and JDK**

In [None]:
# Python packages: PySpark, plus PyDrive which the Google Drive download cell uses.
!pip install pyspark
!pip install -U -q PyDrive
# Headless Java 8 JDK installed through apt.
!apt install openjdk-8-jdk-headless -qq
import os
# Export JAVA_HOME for this kernel; the path is presumably where the package
# above installs the JDK - verify if the base image changes.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

**Initializing SparkContext**


In [None]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.fpm import FPGrowth
from pyspark.ml.feature import MinHashLSH
import scipy.sparse
from pyspark.ml.linalg import Vectors, _convert_to_vector, VectorUDT
import numpy as np

# Spark configuration: application name plus a fixed UI port (4050), which is
# the port the ngrok tunnel cell exposes.
conf = SparkConf().setAppName("Project")
conf.set("spark.ui.port", "4050")

# Reuse the active context when this cell is executed again; constructing
# SparkContext directly raises if a context already exists in the kernel.
sc = SparkContext.getOrCreate(conf = conf)
spark = SparkSession.builder.getOrCreate()

**Installing Spark UI**

Here we first download ngrok

In [None]:
# Download the ngrok Linux (amd64) archive and unpack it into the working directory.
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

Link to my account on ngrok

In [None]:
# Register the ngrok auth token without storing it in the notebook.
# The token is a credential: it is taken from the NGROK_AUTHTOKEN environment
# variable, or prompted for (input hidden) when that variable is not set.
# NOTE(review): any token that was previously committed in this cell should be
# revoked in the ngrok dashboard.
import os
import shlex
from getpass import getpass

ngrok_authtoken = os.environ.get("NGROK_AUTHTOKEN") or getpass("ngrok authtoken: ")
# shlex.quote keeps the token a single shell argument.
get_ipython().system_raw('./ngrok authtoken ' + shlex.quote(ngrok_authtoken))

Open an ngrok tunnel to the Spark UI port (4050)

In [None]:
# Start an ngrok HTTP tunnel to local port 4050 (the spark.ui.port configured
# for the Spark context); the trailing "&" runs it in the background.
get_ipython().system_raw('./ngrok http 4050 &')


Get the Spark UI link

In [None]:
# Query the tunnel list served on localhost:4040 (presumably the ngrok agent's
# local API, which is separate from the Spark UI port 4050) and print the
# public URL of the first tunnel.
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

**Download Data From Another Google Drive With Link**

In [None]:
# PyDrive is needed for the imports below (it is also installed in the setup cell).
!pip install -U -q PyDrive 
  
from pydrive.auth import GoogleAuth 
from pydrive.drive import GoogleDrive 
from google.colab import auth 
from oauth2client.client import GoogleCredentials 
  
  
# Authenticate the Colab user, then build the PyDrive client from the
# application-default Google credentials.
auth.authenticate_user() 
gauth = GoogleAuth() 
gauth.credentials = GoogleCredentials.get_application_default() 
drive = GoogleDrive(gauth)

In [None]:

from os import path
# Share link of the dataset archive on Google Drive (placeholder value -
# replace it with the real link before running).
url = 'data-path'

# The file id is taken as the second-to-last "/"-separated segment of the link.
# NOTE(review): this presumably expects a link shaped like ".../d/<file-id>/view";
# the placeholder above has no "/" and would raise IndexError here.
# Named file_id (not id) so the builtin id() is not shadowed for later cells.
file_id = url.split("/")[-2]

# Fetch the archive into the working directory through the PyDrive client.
downloaded = drive.CreateFile({'id': file_id})
downloaded.GetContentFile('violationtraffic.rar')


**Mount Google Drive**

In [None]:
# NOTE: this import rebinds the name `drive`; the PyDrive cell earlier in the
# notebook assigned the same name to a GoogleDrive client.
from google.colab import drive
# Mount the user's Google Drive under /content/drive.
drive.mount('/content/drive')

**Unzip Data File**

In [None]:
# Extract the RAR archive from the mounted Drive into the current directory.
# NOTE(review): the EDA cell reads "violationtraffic.csv", so the archive is
# presumably expected to contain a file with that name - confirm.
!unrar x drive/MyDrive/violationtraffic_csv.rar 

## **EDA**



In [None]:
# Where the dataset lives and which reader format to use.
file_location = "violationtraffic.csv"
file_type = "csv"

# Reader settings (CSV-specific; other formats ignore them).
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

# Load the file into a DataFrame, handing all reader settings over in one call.
df = (
    spark.read.format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_location)
)

# Preview the first 20 rows.
df.limit(20).show()

**Number of Records Per CrimeCode**

In [None]:
# Count rows per CrimeCode through (code, 1) pairs, ordered by descending count.
NumRecords_code_df = (
    df.select(col("CrimeCode"))
    .rdd
    .map(lambda row: (row[0], 1))
    .reduceByKey(lambda left, right: left + right)
    .sortBy(lambda code_count: -code_count[1])
    .toDF(["CrimeCode", "NumRecords"])
)

# Save the full table as CSV, then preview the first five rows.
NumRecords_code_df.toPandas().to_csv("records_per_crimecode.csv")
NumRecords_code_df.show(5)

**ImageScore and CarSpeed Details for CrimeCode 2002 (Unauthorized Speed Crime)**

In [None]:
# Summary statistics of ImageScore / CrimeCode / CarSpeed for rows whose
# CrimeCode equals 2002.
(
    df.select(col("ImageScore"), col("CrimeCode"), col("CarSpeed"))
    .where(col("CrimeCode") == 2002)
    .describe()
    .show()
)

**Number Of Records Each Camera Logged**

In [None]:
# Count rows per DeviceId through (device, 1) pairs.
NumRecords_df = (
    df.select(col("DeviceId"))
    .rdd
    .map(lambda row: (row[0], 1))
    .reduceByKey(lambda left, right: left + right)
    .toDF(["CameraId", "NumRecords"])
)

# Scatter plot of record count against camera id.
NumRecords_df.toPandas().plot.scatter(
    x='CameraId',
    y='NumRecords',
    c='DarkBlue',
    title="Number Of Records Each Camera Logged",
)

**Number Of Logged Records Per Day**

In [None]:
# Count rows per day of the year (derived from PassDatetime) through (day, 1) pairs.
NumRecords_day_df = (
    df.select(dayofyear(col("PassDatetime")))
    .rdd
    .map(lambda row: (row[0], 1))
    .reduceByKey(lambda left, right: left + right)
    .toDF(["DayOfYear", "NumRecords"])
)

# The title now describes this plot (records per day); it previously repeated
# the title of the per-camera plot.
NumRecords_day_df.toPandas().plot.scatter(
    x='DayOfYear',
    y='NumRecords',
    c='DarkBlue',
    title="Number Of Logged Records Per Day",
)

**Number Of Logged Records Per Week**

In [None]:
# Count rows per week of the year (derived from PassDatetime) through (week, 1) pairs.
# The column is spelled "WeekOfYear" (it was "WeakOfYear"), which is also the
# x-axis label of the plot below.
NumRecords_week_df = (
    df.select(weekofyear(col("PassDatetime")))
    .rdd
    .map(lambda row: (row[0], 1))
    .reduceByKey(lambda left, right: left + right)
    .toDF(["WeekOfYear", "NumRecords"])
)

# The title now describes this plot (records per week); it previously repeated
# the title of the per-camera plot.
NumRecords_week_df.toPandas().plot.scatter(
    x='WeekOfYear',
    y='NumRecords',
    c='DarkBlue',
    title="Number Of Logged Records Per Week",
)

**Cameras That do not Log Speed**

In [None]:
# Sum CarSpeed per DeviceId and keep the devices whose total is exactly 0.
camera_without_speed = (
    df.select(col("DeviceId"), col("CarSpeed"))
    .rdd
    .reduceByKey(lambda total, speed: total + speed)
    .filter(lambda device_total: device_total[1] == 0)
    .map(lambda device_total: (device_total[0], ))
    .toDF(["DeviceId"])
)

# Save the full list as CSV, then preview five rows.
camera_without_speed.toPandas().to_csv("cameras_without_speed.csv")
camera_without_speed.show(5)

**Number of Logged Records Per Company**

In [None]:
# Count rows per CompanyId through (company, 1) pairs, ordered by descending count.
company_record = (
    df.select(col("CompanyId"))
    .rdd
    .map(lambda row: (row[0], 1))
    .reduceByKey(lambda left, right: left + right)
    .sortBy(lambda company_count: -company_count[1])
    .toDF(["CompanyId", "NumRecords"])
)

# Save the full table as CSV, then preview five rows.
company_record.toPandas().to_csv("company_records.csv")
company_record.show(5)

**Camera Devices That Were Less Active Overall (Fewer Than 100000 Records)**

In [None]:
# Count rows per DeviceId, keep devices with fewer than 100000 rows, and order
# them by descending count.
less_active_df = (
    df.select(col("DeviceId"))
    .rdd
    .map(lambda row: (row[0], 1))
    .reduceByKey(lambda left, right: left + right)
    .filter(lambda device_count: device_count[1] < 100000)
    .sortBy(lambda device_count: -device_count[1])
    .toDF(["CameraId", "NumRecords"])
)

# Save the full table as CSV, then preview five rows.
less_active_df.toPandas().to_csv("less_active_devices.csv")
less_active_df.show(5)

**Camera Devices That Are Broken or Do Not Have an Image Score**

In [None]:
# Sum ImageScore per DeviceId and keep the devices whose total is exactly 0.
score_df = (
    df.select(col("DeviceId"), col("ImageScore"))
    .rdd
    .reduceByKey(lambda total, score: total + score)
    .filter(lambda device_total: device_total[1] == 0)
    .map(lambda device_total: (device_total[0], ))
    .toDF(["DeviceId"])
)

# Save the full list as CSV, then preview five rows.
score_df.toPandas().to_csv("withoutScore_Broken_Devices.csv")
score_df.show(5)

**Devices That Are Broken With High Probability**

In [None]:
# Devices that appear both in score_df (zero total ImageScore) and in
# less_active_df (fewer than 100000 rows).
broken_df = (
    score_df
    .join(less_active_df, score_df.DeviceId == less_active_df.CameraId)
    .select(col("DeviceId"))
)

# Save the full list as CSV, then preview five rows.
broken_df.toPandas().to_csv("HighProb_Broken_Devices.csv")
broken_df.show(5)

**Count of Distinct Cameras**

In [None]:
# Number of distinct DeviceId values; used later as the sparse-vector size.
camera_nums = df.select("DeviceId").distinct().count()

## **Frequent Itemsets**

**Making Baskets**

In [None]:
# One basket per (car, day of year): the set of cameras that logged the car.
basket_holder = (
    df.select(
        col("PassDatetime").alias('time'),
        col("MasterPlateNumber").alias('car'),
        col("DeviceId").alias('camera'),
    )
    .groupby("car", dayofyear("time"))
    .agg(collect_set("camera").alias("items"))
)

basket_holder.show(3)

**Baskets for Specific Car**

In [None]:
# Show every basket belonging to one specific car.
basket_holder.where(col("car") == '11199425').show()

**FP-Growth Algorithm**

In [None]:
# FP-Growth with a minimum support of 0.001 and a minimum confidence of 0.5.
fp = FPGrowth(minSupport=0.001, minConfidence=0.5)
# Fit on the baskets; no itemsCol is passed, so the estimator's default column
# is used (basket_holder names its basket column "items").
fpm = fp.fit(basket_holder)

**Build Dataframe Of Frequent Itemsets**

In [None]:
# Frequent itemsets with the item array rendered as a string column.
freq_df = fpm.freqItemsets.withColumn(
    "items",
    col("items").cast(StringType()),
)

# Save the full table as CSV, then preview three rows.
freq_df.toPandas().to_csv("freq_item_01.csv")
freq_df.show(3)

**Build Dataframe Of Association Rules**

In [None]:
# Association rules with both sides of each rule rendered as strings.
assoc_df = (
    fpm.associationRules
    .withColumn("antecedent", col("antecedent").cast(StringType()))
    .withColumn("consequent", col("consequent").cast(StringType()))
)

# Cached because the rules are joined against in the path-suggestion cell.
assoc_df.cache()

# Save the full table as CSV, then preview three rows.
assoc_df.toPandas().to_csv("assoc_rules_01.csv")
assoc_df.show(3)

**Suggest New Paths**

In [None]:
from itertools import combinations
import numpy as np

# finding all subset of lists with len = [1,n]
def subset(setInp):
    """Return every non-empty combination of `setInp` as a list of strings.

    Combinations are generated by increasing size (1 up to len(setInp)), and
    each one is rendered with str(list(...)), e.g. "[1671, 1950]".
    An empty input yields an empty list.
    """
    return [
        str(list(combo))
        for size in range(1, len(setInp) + 1)
        for combo in combinations(setInp, size)
    ]

# Cameras already visited by the driver we want to suggest a new path for.
new_driver = [1671, 1950]

# One row per non-empty combination of those cameras, in a column named like
# the rules' "antecedent" column so the two can be joined.
new_driver_df = (
    spark.createDataFrame(subset(new_driver), StringType())
    .select(col("value").alias("antecedent"))
)

# Rules whose antecedent matches one of the driver's camera combinations.
new_driver_df.join(assoc_df, on="antecedent").show()


**LSH**

**Data Preparation**

Dataframe containing (time, car, camera)

In [None]:
# (time, car, camera) rows where time is the day of the year of PassDatetime.
time_car_cam_df = df.select(
    dayofyear(col("PassDatetime")).alias("time"),
    col("MasterPlateNumber").alias("car"),
    col("DeviceId").alias("camera"),
)

time_car_cam_df.show(3)


Create a lookup DataFrame that maps each camera id to an index in [0, number of cameras)

In [None]:
# Give every distinct camera a zero-based index via zipWithIndex.
distinct_cameras = (
    df.select(col("DeviceId").alias("camera"))
    .distinct()
    .rdd
    .zipWithIndex()
    .map(lambda row_index: (row_index[0][0], row_index[1]))
    .toDF(["camera", "id"])
)

distinct_cameras.show()

Join two Dfs and change cameraId to New Id

In [None]:
# Attach the camera index and drop the original camera id column.
time_car_newId_df = (
    time_car_cam_df
    .join(distinct_cameras, on="camera")
    .drop("camera")
)

time_car_newId_df.show(3)

Build undirected paths

In [None]:
# Key each row by its first two columns and group the third column under it.
build_path = (
    time_car_newId_df.rdd
    .map(lambda row: ((row[0], row[1]), row[2]))
    .groupByKey()
)
build_path.cache()

build_path.take(3)

Prepare it for Sparse Transformation

In [None]:
def add_value(x):
  """Turn one grouped path into (key part, [(item, 1.0), ...]).

  `x` is a (key, items) pair whose key is itself indexable; the second element
  of the key is kept as the output key. Duplicate items are removed through a
  set, and every remaining item is paired with the value 1.0.
  """
  return (x[0][1], [(item, 1.0) for item in set(x[1])])

# Apply add_value to every grouped path.
path_with_value = build_path.map(add_value)

path_with_value.take(1)

Convert it to SparseVector

In [None]:
from pyspark.ml.linalg import Vectors

# Build one sparse vector of size camera_nums per path from its (index, value) pairs.
sparse_value = (
    path_with_value
    .map(lambda car_pairs: (car_pairs[0], Vectors.sparse(camera_nums, car_pairs[1])))
    .toDF(["car", "sparse_items"])
)

sparse_value.show(1)

**MinHash**

Computing Hash Functions

In [None]:
# MinHash LSH over the sparse path vectors: 5 hash tables and a fixed seed.
mh =  MinHashLSH(inputCol="sparse_items", outputCol="minHash", numHashTables=5, seed=5123)

# Fit the LSH model on the path vectors.
model = mh.fit(sparse_value)

# Add the "minHash" output column to every row.
minhash_basket = model.transform(sparse_value)

Show sample of Hashes

In [None]:
# Preview three rows, including the "minHash" column added by the model.
minhash_basket.show(3)

Extract one vector so we can find its 10 nearest neighbors

In [None]:
# Take the first (car, sparse vector) pair as the query for the neighbour search.
find_near_vects = (
    sparse_value.rdd
    .map(lambda row: (row[0], row[1]))
    .take(1)[0]
)
find_near_vects

10 nearest neighbors based on MinHash

In [None]:
# Approximate 10 nearest neighbours of the query vector (element 1 of
# find_near_vects) among all path vectors, collected to the driver.
model.approxNearestNeighbors(sparse_value, find_near_vects[1], 10).collect()

## **PageRank**

**Building Paths (each car builds one path per day)**

In [None]:
# One row per (car, day of year) holding the set of (time, camera) structs
# logged for that car on that day.
path_df = (
    df.select(
        col("PassDatetime").alias('time'),
        col("MasterPlateNumber").alias('car'),
        col("DeviceId").alias('camera'),
    )
    .groupby("car", dayofyear("time"))
    .agg(collect_set(struct("time", "camera")).alias("list_col"))
)

path_df.show(3)

**Sort the camera ids in each path by timestamp so we get a directed path**

In [None]:
import operator

def path_find(row_detail):
  """Order (time, camera) entries by time and return the camera sequence.

  Entries are sorted on their first element; the second element of each entry
  forms the result. Returns None when the sequence has fewer than two cameras.
  """
  cameras = [entry[1] for entry in sorted(row_detail, key=operator.itemgetter(0))]
  return cameras if len(cameras) > 1 else None

# Wrap path_find as a UDF returning an array of integers.
path_udf = udf(path_find, ArrayType(IntegerType()))

# Add the time-ordered camera sequence of every (car, day) row as "paths".
path_df = path_df.withColumn("paths", path_udf(col("list_col")))

# Keep (car, paths) as an RDD and drop rows for which path_find returned None.
path_df = (
    path_df.select(col("car"), col("paths"))
    .rdd
    .filter(lambda car_path: car_path[1] is not None)
)

**Create Graph of paths ( each node with its adjacency List )**

In [None]:
def path_build(path_list):
  """Return one (node, {next node}) pair per consecutive step of a path.

  A path with fewer than two nodes produces an empty list.
  """
  return [
      (current, {following})
      for current, following in zip(path_list, path_list[1:])
  ]

# Adjacency list: for every node, the union of the successor sets produced
# by path_build over all paths. Cached for reuse.
pred_list = (
    path_df
    .flatMap(lambda car_path: path_build(car_path[1]))
    .reduceByKey(lambda left, right: left | right)
    .cache()
)

pred_list.take(3)

**Exact PageRank Calculator**

In [None]:
def computeContribs(neighbors, rank):
    """Yield (neighbor, share) pairs, splitting `rank` evenly over `neighbors`.

    Each neighbor receives rank / len(neighbors). Nothing is yielded for an
    empty collection.
    """
    out_degree = len(neighbors)
    yield from ((target, rank / out_degree) for target in neighbors)

def pagerank(ranks, alpha, thresh = 0.01, links = None):
  """Run a fixed number of PageRank iterations and return the final ranks.

  Parameters
  ----------
  ranks : RDD of (node, rank) pairs
      Initial rank of every node of the graph.
  alpha : float
      Teleport weight, strictly between 0 and 1. Each update computes
      ``incoming * (1 - alpha) + alpha``.
  thresh : float
      Positive tolerance that fixes the iteration count:
      ``int(log10(thresh) / log10(1 - alpha)) + 1`` iterations are run.
  links : RDD of (node, collection of successor nodes), optional
      Adjacency list of the graph. Defaults to the module-level ``pred_list``.

  Returns
  -------
  RDD of (node, rank) pairs containing every node of the initial ``ranks``.

  Raises
  ------
  ValueError
      If ``alpha`` is not in (0, 1) or ``thresh`` is not positive; the
      iteration count is undefined for those values.
  """
  if not 0 < alpha < 1:
    raise ValueError("alpha must be strictly between 0 and 1")
  if thresh <= 0:
    raise ValueError("thresh must be positive")
  if links is None:
    links = pred_list

  range_convergence = int(np.log10(thresh) / np.log10(1 - alpha)) + 1

  # A zero contribution for every known node. Without it, a node that receives
  # no contribution (no inbound edge) is absent from the reduced result, drops
  # out of `ranks`, and the join below then discards its outgoing contributions
  # in every later iteration.
  zero_contribs = ranks.mapValues(lambda _: 0.0).cache()

  for _ in range(range_convergence):
    # Contribution each node sends to each of its successors.
    contribs = links.join(ranks).flatMap(
        lambda point_neigh_rank: computeContribs(point_neigh_rank[1][0], point_neigh_rank[1][1]))

    # New rank of every node from the contributions it received.
    ranks = contribs.union(zero_contribs).reduceByKey(operator.add).mapValues(lambda rank: rank * (1 - alpha) + alpha)

  return ranks

# Every distinct camera starts with rank 1.0.
ranks = (
    df.select(col("DeviceId"))
    .distinct()
    .rdd
    .map(lambda row: (row[0], 1.0))
)

# Run PageRank (alpha = 0.15, thresh = 0.001) and order by descending rank.
ranks = pagerank(ranks, 0.15, 0.001).sortBy(lambda node_rank: -node_rank[1])

# Export the ranking as CSV.
spark.createDataFrame(ranks, ["cameraId", "PageRank"]).toPandas().to_csv("pagerank_cameras.csv")

## **Recommendation System**

In [None]:
from pyspark.mllib.recommendation import Rating
from pyspark.mllib.recommendation import ALS
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.feature import MinMaxScaler

**Creating (Car, camera, Score)**

In [None]:
# (car, camera, count) triples: how many rows each car has at each camera.
car_camera_rating = (
    df.select(col("MasterPlateNumber").alias('car'), col("DeviceId").alias('camera'))
    .rdd
    .map(lambda row: ((row[0], row[1]), 1))
    .reduceByKey(lambda left, right: left + right)
    .map(lambda pair_count: (pair_count[0][0], pair_count[0][1], pair_count[1]))
)

car_camera_rating.take(3)

**Show Details of Rating**

In [None]:
# Same triples as a DataFrame with named columns.
car_camera_rating_df = car_camera_rating.toDF(["car", "camera", "rating"])

# Summary statistics of the raw rating (count) column.
car_camera_rating_df.select("rating").describe().show()

**GroupBy each camera so we have List of (car, Ratings)**

In [None]:
# Group the (car, rating) pairs of every camera under that camera.
normalize_rating_rdd = (
    car_camera_rating_df.rdd
    .map(lambda row: (row[1], (row[0], row[2])))
    .groupByKey()
)

normalize_rating_rdd.cache()

**Normalize Ratings of Each camera between 0 and 1**

In [None]:
from operator import itemgetter
import builtins as py_builtin


def normalize_func(x):
  """Scale the ratings of one camera by that camera's maximum rating.

  `x` is a (camera, entries) pair where every entry is indexable as
  (car, rating). Returns a list of (car, camera, rating / max rating) tuples.
  The builtin max is reached through `py_builtin` because the wildcard import
  of pyspark.sql.functions shadows `max` in this notebook.
  """
  camera, entries = x[0], x[1]
  top_rating = py_builtin.max(entries, key=itemgetter(1))[1]
  return [(entry[0], camera, entry[1] / top_rating) for entry in entries]

# Flatten the per-camera groups back into (car, camera, normalized rating) triples.
normalized_rdd = normalize_rating_rdd.flatMap(normalize_func)
normalized_rdd.cache()

normalized_rdd.take(3)

**Creating Rating**

In [None]:
# Wrap every (car, camera, normalized rating) triple in an mllib Rating.
ratings = normalized_rdd.map(
    lambda triple: Rating(triple[0], triple[1], triple[2])
)

ratings.take(3)

**Compute ALS Algorithm**

In [None]:
#Split data 80/20
train, test = ratings.randomSplit([0.8, 0.2])

# train the model
trained_model = ALS.train(train, rank = 15, iterations= 20, lambda_=0.01)

<b> Evaluate the model </b>

In [None]:
def rating_regression_metrics(model, rating_rdd):
  """Return RegressionMetrics comparing `model` predictions with `rating_rdd`.

  `rating_rdd` is an RDD of Rating(user, product, rating). The model predicts
  a rating for every (user, product) pair of the RDD; predictions and real
  ratings are joined on that pair, and the (predicted, real) values are passed
  to RegressionMetrics. Pairs for which no prediction comes back are dropped
  by the join.
  """
  # extract just user and product
  features = rating_rdd.map(lambda r: (r.user, r.product))

  # predicted rating keyed by (user, product)
  predicted = model.predictAll(features).map(lambda p: ((p[0], p[1]), p[2]))

  # real rating keyed by (user, product)
  actual = rating_rdd.map(lambda r: ((r.user, r.product), r.rating))

  # join predicted and real rating, keep only the value pair
  return RegressionMetrics(predicted.join(actual).map(lambda keyed: keyed[1]))


# train evaluation
train_metrics = rating_regression_metrics(trained_model, train)
train_mse = train_metrics.meanSquaredError
print("train mse:", train_mse)

# test evaluation (the value is a mean squared error, so it is named test_mse)
test_metrics = rating_regression_metrics(trained_model, test)
test_mse = test_metrics.meanSquaredError
print("test mse:", test_mse)