# MIE524 - Assignment 1


## Setup

Let's set up Spark on your Colab environment.  Run the cell below!

In [70]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u382-ga-1~22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 18 not upgraded.


Now we authenticate a Google Drive client to download the file we will be processing in our Spark job.

**Make sure to follow the interactive instructions.**

In [71]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
import pandas as pd

# create the Spark Session
spark = SparkSession.builder.getOrCreate()

# create the Spark Context
sc = spark.sparkContext

Put all your imports, and path constants in the next cells.

## Q1 - Word Count in Spark

### Write your function in the next cells

In [72]:
# load the text
rdd = spark.sparkContext.textFile("/content/plotsummaries.txt")

In [73]:
import re

def lower_str(word):
    """
    INPUT:
        word: string
    OUTPUT:
        modified_wrod: string

    NOTE: output the given word in lower case letters.
    """

    # YOUR CODE HERE
    res = word.lower()
    return (res)

def strip_punc(word):
    """
    INPUT:
        word: string
    OUTPUT:
        modified_wrod: string

    NOTE: output the given word with characters stripped.
    """

    # YOUR CODE HERE
    punc = '!"#$%&\'()*+,.:;<=>?@[\]^_`{|}-~/'

    for p in punc:
      word = word.replace(p, '')
    return (word)

# You may have additional functions

def remove_num(word):
  if any(char.isnumeric() for char in word): # isnumeric vs. isdigit - isnumeric cover bigger scope so was selected
    return True
  return(False)

# def remove_nonEng(word):
#   if any(char.isalpha() for char in word):
#     return True
#   return(False)

def remove_nonEnglish(word):
  # initially tried to use .isAlpha() but wasn't fully doing the work
  english_pattern = re.compile(r'^[a-zA-Z]+$') # alphabet
  return(english_pattern.match(word))

Run your function in the next cells to output required content.

In [74]:
# filtering process
rdd = rdd.map(lower_str).map(strip_punc) # to lowercase & remove punctuations
rdd = rdd.flatMap(lambda line: line.split(" ")) # to word
rdd = rdd.filter(lambda word: not remove_num(word)) # remove number
rdd = rdd.filter(lambda word: remove_nonEnglish(word))

rdd = rdd.filter(lambda x:x!='') # remove ''

# reduceByKey(): count how many times each word occurs
rdd_num_init = rdd.map(lambda word:(word,1)) # initialization, there will be repetition
rdd_num = rdd_num_init.reduceByKey(lambda x,y:(x+y)).sortByKey() # condense

## OUTPUT
rdd_num.take(10)

[('a', 183688),
 ('aa', 21),
 ('aaa', 1),
 ('aachan', 2),
 ('aachen', 1),
 ('aachim', 8),
 ('aaden', 1),
 ('aag', 7),
 ('aaianna', 4),
 ('aalim', 4)]

In [75]:
# mask to only leave the first character
masked_rdd = rdd_num.map(lambda word: (word[0][0], word[1]))

# combine the values that share key
res_rdd = masked_rdd.reduceByKey(lambda x, y: (x + y)).sortByKey()

res = res_rdd.collect()

with open('Q1.txt', 'w') as saveFile:
  for r in res:
    saveFile.write(str(r).strip("()"))
    saveFile.write("\n")

## PART 2 - Oxford Covid-19 Government Response Tracker

### Load the dataset

In [76]:
## COMBINE LATER... but as we don't want to load everytime

# load the dataset
rdd = spark.read.option("header", True).csv("/content/OxCGRT_USA_latest.csv")
# print(rdd.columns)
# rdd.printSchema()

### Q2 - Computing Index Score with Spark

In [77]:
from decimal import InvalidOperation
# Treat as Global Variable

indicators = ["C1M_School closing",
    "C2M_Workplace closing",
    "C3M_Cancel public events",
    "C4M_Restrictions on gatherings",
    "C5M_Close public transport",
    "C6M_Stay at home requirements",
    "C7M_Restrictions on internal movement",
    "C8EV_International travel controls",
    "E1_Income support",
    "E2_Debt/contract relief",
    "H1_Public information campaigns",
    "H2_Testing policy",
    "H3_Contact tracing",
    "H6M_Facial Coverings",
    "H7_Vaccination policy",
    "H8M_Protection of elderly people"]

indicator_flags = ["C1M_Flag",
    "C2M_Flag",
    "C3M_Flag",
    "C4M_Flag",
    "C5M_Flag",
    "C6M_Flag",
    "C7M_Flag",
    "E1_Flag",
    "H1_Flag",
    "H6M_Flag",
    "H7_Flag",
    "H8M_Flag"]

indicator_lst = [column[:2] for column in indicators]
indicator_flags_filtered = [flag.replace('M', '') for flag in indicator_flags] # for convinence
FLAG = {flag[:2]: 1 if any(flag[:2] in indic_flags[:2] for indic_flags in indicator_flags) else 0 for flag in indicator_flags_filtered} # C8, H2, H3, H8, E2: flag values dont exist
print(indicator_flags_filtered)

max_lst = [3, 3, 2, 4, 2, 3, 2, 4, 2, 2, 2, 3, 2, 4, 5, 3]
MAX = {indicator: max_lst[indicator_lst.index(indicator)] for indicator in indicator_lst}

needs = ["RegionName", "Date"]

print(FLAG.keys())


['C1_Flag', 'C2_Flag', 'C3_Flag', 'C4_Flag', 'C5_Flag', 'C6_Flag', 'C7_Flag', 'E1_Flag', 'H1_Flag', 'H6_Flag', 'H7_Flag', 'H8_Flag']
dict_keys(['C1', 'C2', 'C3', 'C4', 'C5', 'C6', 'C7', 'E1', 'H1', 'H6', 'H7', 'H8'])


In [78]:
from pyspark.sql import functions as F
from functools import reduce
from collections import Counter

def clean_data(df):
    """
    INPUT:
        df: spark dataframe
    OUTPUT:
        cleaned data: spark dataframe

    NOTE: output the given word with characters stripped.
    """
    # YOUR CODE HERE

    # extract only the columns to use
    extract = needs + indicators + indicator_flags
    extract_df = df.select(*extract)

    # drop the rows that have RegionName empty (NA)
    clean_df = extract_df.na.drop(subset=['RegionName'])

    # cast the value to integer
    for col_name in clean_df.columns:
      if col_name not in needs:
        clean_df = clean_df.withColumn(col_name, col(col_name).cast("int"))

    # fill up the empty values
    clean_df = clean_df.na.fill(0) # fill with minimum value: 0 (indicator and flag)

    return clean_df

from pyspark.sql.types import IntegerType
def personal_mode(nums):
  counts = Counter(nums)
  max_count = max(counts.values())
  mode = [item for item, count in counts.items() if count == max_count] # list of all that have max_count

  if len(mode) > 1: # if multiple modes, take the largest value
      mode = max(mode)
  else:
      mode = mode[0]  # if only one, take that
  return mode

personal_mode = udf(personal_mode, IntegerType())



def impute_data(df):
  # min value to fill NA values -> take "mode" to aggregate by month -> if there is tie in "mode" then break it by taking the bigger one
    """
    INPUT:
        df: spark dataframe
    OUTPUT:
        imputed data: spark dataframe

    NOTE: output the dataframe with nan values replaced with the minimal value ofT the given indicator.
    """
    # YOUR CODE HERE
    clean_indic_df = df.select([col(old).alias(old[:2]) if old in indicators else old for old in df.columns]) # manipulate the header to only leave 'indicator'
    formated_df = clean_indic_df.withColumn("Date", date_format(to_date(col("Date"), "yyyyMMdd"), "yyyyMM")) # format manipulation; to date

    # with default mode ; couldnt find a resource of how it handles tie break
    df_imputed1 = formated_df.groupby("RegionName", "Date").agg(*[mode(indicator).alias(indicator) for indicator in indicator_lst])
    df_imputed2 = formated_df.groupby("RegionName", "Date").agg(*[mode(indicator_flag).alias(indicator_flag.replace('M','')) for indicator_flag in indicator_flags])

    # Attempted with TIE BREAKER mode - wasn't able to implement
    # df_imputed1 = formated_df.groupBy("RegionName", "Date", *indicator_lst).agg(*[personal_mode((col(indicator)).alias(indicator)) for indicator in indicator_lst])
    # df_imputed2 = formated_df.groupBy("RegionName", "Date", *indicator_flags).agg(*[personal_mode((col(indicator_flag)).alias(indicator_flag)) for indicator_flag in indicator_flags])

    df_imputed = df_imputed1.join(df_imputed2, ["RegionName", "Date"], "left")
    return (df_imputed)


def create_I(df):
  # compute I value for each indicator and save to lst
  lst = []
  for ind in indicator_lst:
    v = df[ind]
    n = MAX[ind]
    if v == 0: # if v = 0 -> I = 0
      I = 0

    elif ind not in FLAG.keys(): # if F = 0 -> f = 0
      F = 0
      f = 0
      I = 100 * (v - 0.5*(F - f)) / n

    else:
      F = FLAG[ind]
      f = df[ind+"_Flag"]
      I = 100 * (v - 0.5*(F - f)) / n

    lst.append(I)

  return(lst)


def compute_index_score(df):
    """
    INPUT:
        df: spark dataframe
    OUTPUT:
        list of index scores per region and period: list

    NOTE: output a list of computed scores per region and period based on the algorithm.
    """
    # YOUR CODE HERE

    # traverse over row and apply the function
    res = df.rdd.map(lambda x: create_I(x))

    # calculate the sum of res
    row_sums = res.map(lambda row: reduce(lambda x, y: x + y, row, 0.0)).collect()

    # calculate Government Index
    Gov_Index = [1/16*val for val in row_sums]

    # dataframe of government index
    gov_index_df = spark.createDataFrame([(gov_index,) for gov_index in Gov_Index], ["Gov_Index"])
    # dataframe of RegionName and Date
    selected_df = df.select('RegionName', 'Date').orderBy("RegionName", "Date")


    # add a unique identifier to join them
    gov_index_df = gov_index_df.withColumn("id", monotonically_increasing_id())
    selected_df = selected_df.withColumn("id", monotonically_increasing_id())

    # join by id and drop the extra column
    combined_df = selected_df.join(gov_index_df, "id", "inner").drop("id")

    # reorder the columns as expected output
    combined_df = combined_df.select("RegionName", "Date", "Gov_Index")

    return combined_df

Run your function in the next cells to output required content.

In [79]:
let = clean_data(rdd)
# let.printSchema()
# let.show(10)

In [80]:
let2 = impute_data(let)
let2.show()

+----------+------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|RegionName|  Date| C1| C2| C3| C4| C5| C6| C7| C8| E1| E2| H1| H2| H3| H6| H7| H8|C1_Flag|C2_Flag|C3_Flag|C4_Flag|C5_Flag|C6_Flag|C7_Flag|E1_Flag|H1_Flag|H6_Flag|H7_Flag|H8_Flag|
+----------+------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|   Alabama|202001|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|      0|      0|      0|      0|      0|      0|      0|      0|      0|      0|      0|      0|
|   Alabama|202002|  0|  0|  0|  0|  0|  0|  0|  3|  0|  0|  0|  1|  1|  0|  0|  0|      0|      0|      0|      0|      0|      0|      0|      0|      0|      0|      0|      0|
|   Alabama|202003|  0|  0|  2|  0|  0|  0|  1|  3|  0|  0|  2|  1|  1|  0|  0|  3|      1|      0| 

In [81]:
let3 = compute_index_score(let2)

let3 = let3.collect()
with open('Q2.txt', 'w') as saveFile:
    for row in let3:
        # Extract values from the row
        region_name = row["RegionName"]
        date = row["Date"]
        gov_index = row["Gov_Index"]

        # Format and write the values to the file
        saveFile.write(f'{region_name}, {date}, {gov_index}\n')

### Q3 - Association Rules

In [82]:
## COMBINE LATER... but as we don't want to load everytime

# load the dataset
rdd = spark.read.option("header", True).csv("/content/OxCGRT_USA_latest.csv")
# print(rdd.columns)
# rdd.printSchema()

In [83]:
from pyspark.sql.functions import year, month

def get_data(df): # MODIFIED THE FUNCTION FROM Q2
    # extract only the columns in use
    extract = needs + indicators
    extract_df = df.select(*extract)

    # drop the rows that have RegionName empty (NA)
    clean_df = extract_df.na.drop(subset=['RegionName'])

    # cast the value to integer
    for col_name in clean_df.columns:
      if col_name not in needs:
        clean_df = clean_df.withColumn(col_name, col(col_name).cast("int"))

    # cast datatype of date
    clean_df = clean_df.withColumn('Date', to_date(col('Date'), 'yyyyMMdd'))

    # fill up the None / Null values
    clean_df = clean_df.na.fill(0) # fill with minimum value: 0 (indicator and flag)

    # extract out the year and month, used to do partitioning to extract first day of each month
    clean_df = clean_df.withColumn('year', year(col('Date')))
    clean_df = clean_df.withColumn('month', month(col('Date')))

    # spec for partitioning and ordering
    spec = Window.partitionBy("RegionName", "year", "month").orderBy("Date")

    # add a row number to each row within the partition
    clean_df = clean_df.withColumn("row_num", row_number().over(spec))

    # filter and get the first row in each partition
    clean_df = clean_df.filter(clean_df.row_num == 1)

    # drop unnecesary columns
    clean_df = clean_df.drop("row_num", "year", "month")

    return clean_df

In [84]:
data = get_data(rdd)
data.show(400)
data.printSchema()

row_count = data.rdd.count()
print(row_count)

+-----------+----------+------------------+---------------------+------------------------+------------------------------+--------------------------+-----------------------------+-------------------------------------+----------------------------------+-----------------+-----------------------+-------------------------------+-----------------+------------------+--------------------+---------------------+--------------------------------+
| RegionName|      Date|C1M_School closing|C2M_Workplace closing|C3M_Cancel public events|C4M_Restrictions on gatherings|C5M_Close public transport|C6M_Stay at home requirements|C7M_Restrictions on internal movement|C8EV_International travel controls|E1_Income support|E2_Debt/contract relief|H1_Public information campaigns|H2_Testing policy|H3_Contact tracing|H6M_Facial Coverings|H7_Vaccination policy|H8M_Protection of elderly people|
+-----------+----------+------------------+---------------------+------------------------+------------------------------+-

In [85]:
from itertools import combinations

def apriori(items, min_sup, itemset_size):
    """
    INPUT:
        items: list
        min_sup: the min support
        itemset_size : if want double (2), if want triplet (3)
    OUTPUT:
        list of frequent itemsets: list

    NOTE: output a list of frequent itemsets.
    """
    item_counts = {} # dictionary to count individual items

    # frequency of each item
    for item in items:
        for itm in item:
            if itm in item_counts:
                item_counts[itm] += 1
            else:
                item_counts[itm] = 1

    # print("item_counts")
    # print(item_counts)
    # if itself has less count than min_support, we don't need it
    frequent_items = {item: count for item, count in item_counts.items() if count >= min_sup}


    frequent_itemsets = [] # list for frequent item"sets"

    # create Double
    if itemset_size == 2:
        candidates = []

        # group to make candidate sets
        for item_combination in combinations(frequent_items.keys(), itemset_size):
            candidate_itemset = tuple(sorted(item_combination))
            candidates.append(candidate_itemset)

        # frequency for candidate sets
        candidate_counts = {}
        for transaction in transactions:
            for itemset in candidates:
                if set(itemset).issubset(set(transaction)):
                    if itemset in candidate_counts:
                        candidate_counts[itemset] += 1
                    else:
                        candidate_counts[itemset] = 1

        # only keep the sets with their freq >= min_support
        frequent_itemsets.extend([itemset for itemset, count in candidate_counts.items() if count >= min_sup])


    # create Triplet
    elif itemset_size == 3:
        candidates = []

        # group to make candidate sets
        for item_combination in combinations(frequent_items.keys(), itemset_size):
            candidate_itemset = tuple(sorted(item_combination))
            candidates.append(candidate_itemset)

        # frequency for candidate sets
        candidate_counts = {}
        for transaction in transactions:
            for itemset in candidates:
                if set(itemset).issubset(set(transaction)):
                    if itemset in candidate_counts:
                        candidate_counts[itemset] += 1
                    else:
                        candidate_counts[itemset] = 1

        # only keep the sets with their freq >= min_support
        frequent_itemsets.extend([itemset for itemset, count in candidate_counts.items() if count >= min_sup])

    # with 'if' statements, double and triplet are seperated so the output only contains double / triplet depending on the input.

    return frequent_itemsets

In [86]:
# DOUBLE
min_support = 100
itemset_size = 2

columns = data.columns
transactions = data.rdd.map(lambda row: [column for column in columns if isinstance(row[column], int) and row[column] != 0]).collect()

# for tran in transactions:
#   print(tran)

frequent_doubles = apriori(transactions, min_support, itemset_size)

for double in frequent_doubles:
    print(double)


# =============== #


# # TRIPLET
# min_support = 80
# itemset_size = 3

# columns = data.columns
# transactions = data.rdd.map(lambda row: [column for column in columns if isinstance(row[column], int) and row[column] > 0]).collect()

# frequent_triplets = apriori(transactions, min_support, itemset_size)

# for triplet in frequent_triplets:
#     print(triplet)

('C8EV_International travel controls', 'H2_Testing policy')
('C8EV_International travel controls', 'H3_Contact tracing')
('H2_Testing policy', 'H3_Contact tracing')
('C8EV_International travel controls', 'H1_Public information campaigns')
('H1_Public information campaigns', 'H2_Testing policy')
('H1_Public information campaigns', 'H3_Contact tracing')
('C1M_School closing', 'C8EV_International travel controls')
('C2M_Workplace closing', 'C8EV_International travel controls')
('C3M_Cancel public events', 'C8EV_International travel controls')
('C4M_Restrictions on gatherings', 'C8EV_International travel controls')
('C5M_Close public transport', 'C8EV_International travel controls')
('C6M_Stay at home requirements', 'C8EV_International travel controls')
('C7M_Restrictions on internal movement', 'C8EV_International travel controls')
('C8EV_International travel controls', 'E1_Income support')
('C8EV_International travel controls', 'E2_Debt/contract relief')
('C8EV_International travel contro

Run your function in the next cells to output required content.

3C)

In [92]:
def calculate_freq(group, transactions, double_triplet):
  if double_triplet == 2: # computation for double
    X, Y = group
    count_XY = 0

    for transaction in transactions:
        if X in transaction and Y in transaction:
            count_XY += 1
    return count_XY

  elif double_triplet == 3: # computation for triplet
    X, Y, Z = group
    count_XYZ = 0

    for transaction in transactions:
        if X in transaction and Y in transaction and Z in transaction:
            count_XYZ += 1
    return count_XYZ

def calculate_confidence(A, B, transactions):
  # conf(A -> B) =  P(B|A)
  # P(B|A) = P(A∩B) / P(A)
  #        = (num of transc. containing both / all transc. ) * (all transc. / num of transc. containing A)
  #        = num of transc. containing both / num of transc. containing A
  count_A = 0
  count_A_B = 0

  for transaction in transactions:
    if set(A).issubset(set(transaction)):
        count_A += 1
        if B in transaction:
            count_A_B += 1

  #print("A:", A, "B:", B, count_A_B / count_A)
  return count_A_B / count_A if count_A > 0 else 0


def generate_association_rules(double_triple, frequent_group, transactions, min_support, return_count):
    association_rules = []

    if double_triple == 2: # for double
      for pair in frequent_group[:return_count]:
        X, Y = pair

        count_XY = calculate_freq(pair, transactions, double_triple)

        if count_XY >= min_support:
            confidence_XY = calculate_confidence([X], Y, transactions)
            confidence_YX = calculate_confidence([Y], X, transactions)

            association_rules.append((X, Y, confidence_XY))
            association_rules.append((Y, X, confidence_YX))


    elif double_triple == 3: # for triplet
      for triplet in frequent_group[:return_count]:
          X, Y, Z = triplet

          count_XYZ = calculate_freq(triplet, transactions, double_triple)

          if count_XYZ >= min_support:
              confidence_XY_Z = calculate_confidence([X, Y], Z, transactions)
              confidence_X_Z_Y = calculate_confidence([X, Z], Y, transactions)
              confidence_Y_Z_X = calculate_confidence([Y, Z], X, transactions)

              association_rules.append(((X, Y), Z, confidence_XY_Z))
              association_rules.append(((X, Z), Y, confidence_X_Z_Y))
              association_rules.append(((Y, Z), X, confidence_Y_Z_X))

    # sort the rules in order we want
    association_rules.sort(key= lambda rule: (-rule[2], rule[0], rule[1])) # rule[2] = confidence, add - to sort in descending order, then alphabetically (lexologically) break tie for the same conf level

    return association_rules[:return_count]

# Run the function - Double
print("============== DOUBLE ==============")
min_support = 100
itemset_size = 2

columns = data.columns
transactions = data.rdd.map(lambda row: [column for column in columns if isinstance(row[column], int) and row[column] > 0]).collect()
frequent_group = apriori(transactions, min_support, itemset_size)
res = generate_association_rules(itemset_size, frequent_group, transactions, min_support, return_count=10)

for rule in res:
    A, B, confidence = rule
    print(f"{A} => {B}, Confidence: {confidence:.4f}")


# Run the function - Triple
print("============== TRIPLE ==============")
min_support = 100
itemset_size = 3

columns = data.columns
transactions = data.rdd.map(lambda row: [column for column in columns if isinstance(row[column], int) and row[column] > 0]).collect()
frequent_group = apriori(transactions, min_support, itemset_size)
res = generate_association_rules(itemset_size, frequent_group, transactions, min_support, return_count=15)

for rule in res:
    A, B, confidence = rule
    print(f"{A} => {B}, Confidence: {confidence:.3f}")


C1M_School closing => C8EV_International travel controls, Confidence: 1.0000
C2M_Workplace closing => C8EV_International travel controls, Confidence: 1.0000
C3M_Cancel public events => C8EV_International travel controls, Confidence: 1.0000
C4M_Restrictions on gatherings => C8EV_International travel controls, Confidence: 1.0000
C8EV_International travel controls => H2_Testing policy, Confidence: 1.0000
C8EV_International travel controls => H3_Contact tracing, Confidence: 1.0000
H1_Public information campaigns => C8EV_International travel controls, Confidence: 1.0000
H1_Public information campaigns => H2_Testing policy, Confidence: 1.0000
H1_Public information campaigns => H3_Contact tracing, Confidence: 1.0000
H2_Testing policy => C8EV_International travel controls, Confidence: 1.0000
('C1M_School closing', 'C8EV_International travel controls') => H2_Testing policy, Confidence: 1.000
('C1M_School closing', 'C8EV_International travel controls') => H3_Contact tracing, Confidence: 1.000
('

In [94]:
# OVERALL - SAVE AS TXT FILE


# Save Double
min_support = 100
itemset_size = 2

columns = data.columns
transactions = data.rdd.map(lambda row: [column for column in columns if isinstance(row[column], int) and row[column] > 0]).collect()
frequent_group = apriori(transactions, min_support, itemset_size)
res = generate_association_rules(itemset_size, frequent_group, transactions, min_support, return_count=10)

with open('Q3_b.txt', 'w') as saveFile:
  for rule in res:
    A, B, confidence = rule
    output = f"{A},{B},{confidence:.3f}"
    saveFile.write(str(output))
    saveFile.write("\n")


# Save Triple

min_support = 100
itemset_size = 3

columns = data.columns
transactions = data.rdd.map(lambda row: [column for column in columns if isinstance(row[column], int) and row[column] > 0]).collect()
frequent_group = apriori(transactions, min_support, itemset_size)
res = generate_association_rules(itemset_size, frequent_group, transactions, min_support, return_count=15)

with open('Q3_c.txt', 'w') as saveFile:
  for rule in res:
    A, B, confidence = rule
    output = f"{A},{B},{confidence:.3f}"
    saveFile.write(str(output))
    saveFile.write("\n")


