# MDA 2021
## Pyspark Sample Code
-----------------------------------------------------------------

## Setup
--------------------------------------------------

Let's setup Spark on Colab environment.

In [24]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u292-b10-0ubuntu1~18.04).
0 upgraded, 0 newly installed, 0 to remove and 37 not upgraded.


Now we authenticate a Google Drive client to processing data

In [25]:
from google.colab import drive
# This will prompt for authorization.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Check and extract data
--------------------------------------------------

In [None]:
!ls '/content/drive/My Drive/Sample_Data.zip'

'/content/drive/My Drive/Sample_Data.zip'


In [None]:
!unzip "/content/drive/My Drive/Sample_Data.zip" -d "/content/drive/My Drive/Sampe_Data"

unzip:  cannot find or open /content/drive/My Drive/Sample_Data.zip, /content/drive/My Drive/Sample_Data.zip.zip or /content/drive/My Drive/Sample_Data.zip.ZIP.



the cells above, extract data which is in '/content/drive/My Drive/Sample_Data.zip' 

## Initializing Spark and read data
--------------------------------------------------

In [None]:
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,TimestampType
from pyspark.sql.functions import col,current_timestamp,to_date,hour,dayofweek

spark = SparkSession \
    .builder \
    .appName("Spark_Processor") \
    .master("local[*]") \
    .getOrCreate()

sc=spark.sparkContext

schema = StructType([ \
        StructField("DEVICE_CODE", IntegerType(), True), 
        StructField("SYSTEM_ID",IntegerType(),True), \
        StructField("ORIGINE_CAR_KEY",StringType(),True), \
        StructField("FINAL_CAR_KEY",StringType(),True), \
        StructField("CHECK_STATUS_KEY", IntegerType(), True), \
        StructField("COMPANY_ID", StringType(), True), \
        StructField("PASS_DAY_TIME", TimestampType(), True)
    ])

In [None]:
df=spark.read.csv('/content/drive/My Drive/Sampe_Data/Sample_Traffic.csv',header=True,schema=schema)
df.show(1)

+-----------+---------+---------------+-------------+----------------+----------+-------------------+
|DEVICE_CODE|SYSTEM_ID|ORIGINE_CAR_KEY|FINAL_CAR_KEY|CHECK_STATUS_KEY|COMPANY_ID|      PASS_DAY_TIME|
+-----------+---------+---------------+-------------+----------------+----------+-------------------+
|     200501|       81|       10477885|     10477885|               5|       161|2021-06-01 03:54:39|
+-----------+---------+---------------+-------------+----------------+----------+-------------------+
only showing top 1 row



In [None]:
df.head()

Row(DEVICE_CODE=200501, SYSTEM_ID=81, ORIGINE_CAR_KEY='10477885', FINAL_CAR_KEY='10477885', CHECK_STATUS_KEY=5, COMPANY_ID='161', PASS_DAY_TIME=datetime.datetime(2021, 6, 1, 3, 54, 39))

In [None]:
#part a

import pandas as pd
#first of all, we have to load the data
data  = sc.textFile('/content/drive/My Drive/Sampe_Data/Sample_Traffic.csv')
header = data.first()
data = data.filter(lambda line: line != header)
data = data.sample(False,0.1,101)
#this function is capable of splitting columns of data
#['DEVICE_CODE,SYSTEM_ID,ORIGINE_CAR_KEY,FINAL_CAR_KEY,CHECK_STATUS_KEY,COMPANY_ID,PASS_DAY_TIME']
def parseLine(line):  
  fields = line.split(',')
  DEVICE_CODE = int(fields[0])
  ORIGINE_CAR_KEY = int(fields[2])
  PASS_DAY_TIME = pd.to_datetime(fields[6]).day
  return (ORIGINE_CAR_KEY,PASS_DAY_TIME,DEVICE_CODE)

traffic_rdd = data.map(parseLine)
traffic_rdd.first()
#print (200501,1,10477885) tuple

traffic_rdd = traffic_rdd.map(lambda x:(tuple([x[0],x[1]]),x[2]))
traffic_rdd_gb = sorted(traffic_rdd.groupByKey().mapValues(list).collect())

In [None]:
traffic_rdd_gb

[((7631926, 1), [212501, 900221, 900258]),
 ((7631931, 1), [900207]),
 ((7631986, 1), [100700839]),
 ((7631989, 1), [900244]),
 ((7631997, 1), [900277]),
 ((7632014, 1), [900271]),
 ((7632016, 1), [631346, 900155]),
 ((7632020, 1), [212802, 900273]),
 ((7632036, 1), [900244]),
 ((7632039, 1), [208602, 631795, 900246, 100701130]),
 ((7632049, 1), [900268, 100700853]),
 ((7632060, 1), [900207]),
 ((7632079, 1), [631349, 631349]),
 ((7632084, 1), [900269]),
 ((7632116, 1), [631350]),
 ((7632135, 1), [631795]),
 ((7632139, 1), [631795]),
 ((7632152, 1), [900259]),
 ((7632169, 1), [631776, 22009972]),
 ((7632170, 1), [205802, 114]),
 ((7632177, 1), [631357, 100700839, 631748]),
 ((7632183, 1), [631356, 900236]),
 ((7632187, 1), [900124]),
 ((7632195, 1), [200301]),
 ((7632197, 1), [631352]),
 ((7632213, 1), [900225]),
 ((7632218, 1), [202101]),
 ((7632244, 1), [900268]),
 ((7632251, 1), [631795]),
 ((7632263, 1), [900225]),
 ((7632270, 1), [631349]),
 ((7632275, 1), [900236, 900178]),
 ((76

In [None]:
#part 2
path_baskets = []
baskets = []
#erecting the baskets of paths
for i in range(len(traffic_rdd_gb)):
  path_baskets.append(traffic_rdd_gb[i][1])
  #merging all the items
  baskets.extend(path_baskets[i])
lblitems = sc.parallelize(path_baskets)

In [None]:
lblitems.take(2)

[[212501, 900221, 900258], [900207]]

In [None]:
#parse dataset to rdd
path_rdd = sc.parallelize(baskets)

#unique items in our dataset
uniquePaths = path_rdd.distinct()

#add 1 to each of paths
supportRdd = path_rdd.map(lambda path: (path , 1))

# Sum of values by key
supportRdd = supportRdd.reduceByKey(lambda x,y:x+y)

# First support values
supports = supportRdd.map(lambda item: item[1]) # Return only support values

In [None]:
# Define minimum support value 
minSupport = supports.min()

#filter first supportRDD with minimum support
supportRdd = supportRdd.filter(lambda item: item[1] >= minSupport)

#Creating base RDD, this rdd will be updated in every iteration
baseRdd = supportRdd.map(lambda item: ([item[0]] , item[1])) 

supportRdd = supportRdd.map(lambda item: item[0]) #We will need this rdd in the next sections
                                                  #in order to create combinations of items
supportRddCart = supportRdd

In [None]:
def removeReplica(record):

    if(isinstance(record[0], tuple)):
        x1 = record[0]
        x2 = record[1]
    else:
        x1 = [record[0]]
        x2 = record[1]

    if(any(x == x2 for x in x1) == False):
        a = list(x1)
        a.append(x2)
        a.sort()
        result = tuple(a)
        return result 
    else:
        return x1

In [None]:
combined = supportRdd.cartesian(uniquePaths)

900601

In [None]:
combined = combined.map(lambda item: removeReplica(item))

900601

In [None]:
#in this cell,we will create the combinations of roads

combination_length = 2

while(supportRdd.isEmpty() == False):
  #to generate various combinations 
  combined = supportRdd.cartesian(uniquePaths)
  #remove the duplicated ones
  combined = combined.map(lambda item: removeReplica(item))

  combined = combined.filter(lambda item: len(item) == combination_length)
  combined = combined.distinct()

  combined_2 = combined.cartesian(lblitems)
  combined_2 = combined_2.filter(lambda item: all(x in item[1] for x in item[0]))

  combined_2 = combined_2.map(lambda item: item[0])
  combined_2 = combined_2.map(lambda item: (item , 1))
  combined_2 = combined_2.reduceByKey(lambda x,y: x+y)
  combined_2 = combined_2.filter(lambda item: item[1] >= minSupport)

  baseRdd = baseRdd.union(combined_2)

  combined_2 = combined_2.map(lambda item: item[0])
  supportRdd = combined_2
  combination_length += 1

In [None]:
class Filter():

    def __init__(self):
        
        self.stages = 1


    def filterForConf(self, item , total):
        
        if(len(item[0][0]) > len(item[1][0])  ):
            if(self.checkItemSets(item[0][0] , item[1][0]) == False):
                pass
            else:
                return (item)       
        else:
            pass  
        self.stages = self.stages + 1

    # Check Items sets includes at least one comman item // Example command: # any(l == k for k in z for l in x )
    def checkItemSets(self, item_1 , item_2):

        if(len(item_1) > len(item_2)):
            return all(any(k == l for k in item_1 ) for l in item_2)
        else:
            return all(any(k == l for k in item_2 ) for l in item_1)


    def calculateConfidence(self, item):

        # Parent item list
        parent = set(item[0][0])
        
        # Child item list
        if(isinstance(item[1][0] , str)):
            child  = set([item[1][0]])
        else:
            child  = set(item[1][0])
        # Parent and Child support values
        parentSupport = item[0][1]
        childSupport = item[1][1]
        # Finds the item set confidence is going to be found

        support = (parentSupport / childSupport)*100

        return list([ list(child) ,  list(parent.difference(child)) , support ])

    
calcuItems = baseRdd.cartesian(baseRdd)

# Create Filter Object
ff = Filter()

total = calcuItems.count()
baseRddConfidence = calcuItems.filter(lambda item: ff.filterForConf(item , total))
baseRddConfidence = baseRddConfidence.map(lambda item: ff.calculateConfidence(item))

import pandas as pd

# Create an array with collected baseRddConfidence results
result = baseRddConfidence.collect()

# Create Data Frame
confidenceTable = pd.DataFrame(data = result , columns=["Before", "After" , "Confidence"])

# Show data frame
print(confidenceTable)

          Before       After  Confidence
0       [900244]    [900142]    0.300300
1       [900142]    [900244]    0.423729
2          [114]    [631776]    0.645161
3       [900276]    [900158]    0.909091
4       [900158]    [900276]    1.041667
..           ...         ...         ...
947        [143]    [212802]    3.333333
948   [22010043]  [22010078]    8.333333
949  [100701119]  [22010040]    1.369863
950  [100701119]  [22010080]    1.369863
951   [22010057]  [22010060]    2.941176

[952 rows x 3 columns]


In [35]:
#part 3-SON algorithm

#new support threshold
new_minSupport = supports.min()/3

#splitting the original rdd into 3 sub rdd
#path_rdd --> path_rdd_1, path_rdd_2, path_rdd_3

path_rdd_1 = lblitems.sample(False, 0.33, 1)
path_rdd_2 = lblitems.sample(False, 0.33, 2)
path_rdd_3 = lblitems.sample(False, 0.33, 3)


In [48]:
#run the a-priori algorith on each dataset
import pandas as pd
from mlxtend.preprocessing import TransactionEncoder
from mlxtend.frequent_patterns import apriori

dataset_1 = path_rdd_1.collect()
te = TransactionEncoder()
te_ary = te.fit(dataset_1).transform(dataset_1)
df = pd.DataFrame(te_ary, columns=te.columns_)
results_1 = apriori(df, min_support=new_minSupport)

Unnamed: 0,support,itemsets


In [None]:
import pandas as pd
from mlxtend.preprocessing import TransactionEncoder
from mlxtend.frequent_patterns import apriori

dataset_2 = path_rdd_2.collect()
te = TransactionEncoder()
te_ary = te.fit(dataset_2).transform(dataset_2)
df = pd.DataFrame(te_ary, columns=te.columns_)
results_2 =apriori(df, min_support=new_minSupport)

In [None]:
import pandas as pd
from mlxtend.preprocessing import TransactionEncoder
from mlxtend.frequent_patterns import apriori

dataset_3 = path_rdd_3.collect()
te = TransactionEncoder()
te_ary = te.fit(dataset_3).transform(dataset_3)
df = pd.DataFrame(te_ary, columns=te.columns_)
results_3 =apriori(df, min_support=new_minSupport)

In [None]:
final_rdd=sc.union([results_1, results_2, results_3])
final_rdd = final_rdd.filter(lambda item: item[1] >= minSupport)