# MDLE - Exercise 2.1
### Frequent itemsets and association rules - Similar items
##### Authors: Pedro Duarte 97673, Pedro Monteiro 97484

Import necessary modules

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import random, itertools

Declare constants

In [None]:
# Spark Constants
APP_NAME = 'assignment1ex2'
MASTER = 'local[*]'

# Similarity Constants
MIN_HIGH_SIMILARITY = (.9, .85)
MAX_LOW_SIMILARITY = (.05, .6)

# Exercise Input Constants
BANDS_ROWS_MAX_VALUE = 50

Configuration and Initialization of Spark

- Parameters:
    - `APP_NAME` (string): the name of the Spark application
    - `MASTER` (string): the URL of the Spark master node
<br></br>
- Returns:
    - `sc` (SparkContext): the Spark context for the given application and master
    - `spark` (SparkSession): the Spark session for the given application and master

In [None]:
# Driver memory is set on the SparkConf so that it is applied when the context (and its JVM) is created
conf = SparkConf().setAppName(APP_NAME).setMaster(MASTER).set("spark.driver.memory", "15g")
sc = SparkContext.getOrCreate(conf=conf)

spark = SparkSession.builder.config(conf=conf).getOrCreate()

function `similar_probability`
- calculate the probability that two items with similarity `s` agree on all `r` rows of a single band: `s**r`

function `not_similar_in_bands_probability`
- calculate the probability that two items with similarity `s` fail to match in every one of the `b` bands of `r` rows each: `(1 - s**r)**b`

function `similar_in_bands_probability`
- calculate the probability that two items with similarity `s` match in at least one of the `b` bands of `r` rows each, and therefore become a candidate pair: `1 - (1 - s**r)**b`

In [None]:
# Lambda functions to abstract probability calcs
similar_probability = lambda s, r: s**r 
not_similar_in_bands_probability = lambda s, r, b: (1 - similar_probability(s, r))**b
similar_in_bands_probability = lambda s, r, b: 1 - not_similar_in_bands_probability(s, r, b)
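
As a quick sanity check of these formulas, the sketch below evaluates the candidate probability at the two similarity levels used by the thresholds in this notebook. The function name `candidate_probability` and the values of 13 rows and 18 bands are only illustrative assumptions.

```python
def candidate_probability(s: float, r: int, b: int) -> float:
    """Probability that two items with similarity s match in at least one band."""
    return 1 - (1 - s**r) ** b

# Illustrative configuration: 13 rows per band, 18 bands
rows, bands = 13, 18
p_high = candidate_probability(0.85, rows, bands)  # should be at least 0.9
p_low = candidate_probability(0.60, rows, bands)   # should be below 0.05

print(f"s=0.85 -> {p_high:.4f}")
print(f"s=0.60 -> {p_low:.4f}")
```

With this configuration, highly similar items are very likely to become candidates, while items with similarity 0.6 rarely do.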

Generate the list of valid (rows, bands) pairs that satisfy both similarity requirements <br><br>
`parallelize` create every pair `(r, b)` with `r` and `b` in `range(BANDS_ROWS_MAX_VALUE)` <br> <br>
`filter` keep the pairs for which two items with similarity `MIN_HIGH_SIMILARITY[1]` (0.85) become candidates with a probability of at least `MIN_HIGH_SIMILARITY[0]` (0.9) <br> <br>
`filter` keep the pairs for which two items with similarity `MAX_LOW_SIMILARITY[1]` (0.6) become candidates with a probability below `MAX_LOW_SIMILARITY[0]` (0.05) <br> <br>
`reduceByKey` for each number of rows `r`, keep only the smallest valid number of bands `b` <br> <br>
`sortBy` sort the RDD by the number of rows <br> <br>
`cache` keep the RDD in memory for faster access in the future <br> <br>
`collect` retrieve all the elements of the RDD as a list <br> <br>

In [None]:
bands_rows_valid_pairs = sc.parallelize([(r, b) for r in range(BANDS_ROWS_MAX_VALUE) for b in range(BANDS_ROWS_MAX_VALUE)]) \
    .filter(lambda v: similar_in_bands_probability(MIN_HIGH_SIMILARITY[1], v[0], v[1]) >= MIN_HIGH_SIMILARITY[0]) \
    .filter(lambda v: similar_in_bands_probability(MAX_LOW_SIMILARITY[1], v[0], v[1]) < MAX_LOW_SIMILARITY[0]) \
    .reduceByKey(min) \
    .sortBy(lambda v: v[0]) \
    .cache()

bands_rows_valid_pairs.collect()
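
The same search can be sketched without Spark, which may help when checking the result by hand. This is a minimal sketch, not the notebook's implementation: the names `candidate_probability` and `valid_rows_bands` are hypothetical, and the loops start at 1 because zero rows or zero bands can never satisfy both constraints.

```python
def candidate_probability(s, r, b):
    """Probability that two items with similarity s match in at least one band."""
    return 1 - (1 - s**r) ** b

def valid_rows_bands(max_value, high=(0.9, 0.85), low=(0.05, 0.6)):
    """Map each row count r to the smallest band count b meeting both constraints."""
    best = {}
    for r in range(1, max_value):
        for b in range(1, max_value):
            keeps_similar = candidate_probability(high[1], r, b) >= high[0]
            drops_dissimilar = candidate_probability(low[1], r, b) < low[0]
            if keeps_similar and drops_dissimilar:
                # Same effect as reduceByKey(min): smallest b per r
                best[r] = min(b, best.get(r, b))
    return dict(sorted(best.items()))

valid = valid_rows_bands(50)
print(valid)
```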

Choose the number of rows and bands from the valid pairs of the previous step. <br>
Several combinations of rows and bands were tested and the best one was chosen.


N_FUNCTIONS = number of rows * number of bands

In [None]:
r, b = (13, 18)
N_FUNCTIONS = r*b

N_FUNCTIONS

# Exercise 2.2

In [None]:
# Data Constants
TWEET_ID_COLUMN = 'tweet_id'
URL_COLUMN = 'url'
TEXT_COLUMN = 'text'

# Algorithm Constants
MAX_SHINGLE_SIZE = 5

# Input Constants
INPUT_FILE = 'covid_news_small.json.bz2'

Reading and Parsing Data from the JSON File

- Parameters:
    - `INPUT_FILE` (string): the path to the (bzip2-compressed) input JSON file
<br></br>
- Returns:
    - `ds` (DataFrame): the parsed data as a Spark DataFrame

In [None]:
ds = spark.read.json(INPUT_FILE)
ds.schema

Generate the shingles of each tweet's text and store them in an RDD as one set of unique shingles per tweet. <br><br>
`rdd` get the RDD representation of the dataframe <br> <br>
`map` map each row of the RDD to a tuple of two values: the tweet ID and the case-folded tweet text <br> <br>
`filter` remove any tweets that have an empty text <br> <br>
`mapValues` split the tweet text into words and build a list of shingles. A word with at most `MAX_SHINGLE_SIZE` characters is added to the list as is. A longer word is replaced by all of its substrings of `MAX_SHINGLE_SIZE` consecutive characters <br> <br>
`mapValues` sort the shingles in each tweet's list <br> <br>
`mapValues` convert each tweet's list of shingles into a set to remove duplicates (a set is unordered, so the order from the previous step is not kept) <br><br>
`cache` keep the RDD in memory for faster access <br><br>
`count` return the number of elements in the RDD <br><br>

In [None]:
shingles = ds.rdd \
  .map(lambda v: (v[TWEET_ID_COLUMN], v[TEXT_COLUMN].casefold())) \
  .filter(lambda v: len(v[1])) \
  .mapValues(lambda v: [shingle for word in v.split() for shingle in ({word[i:i+MAX_SHINGLE_SIZE] for i in range(len(word) - MAX_SHINGLE_SIZE + 1)} if len(word) > MAX_SHINGLE_SIZE else [word])]) \
  .mapValues(sorted) \
  .mapValues(set) \
  .cache()

shingles.count()
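
To make the shingling rule concrete, here is a small pure-Python sketch of the same logic applied to a single string. The function name `word_shingles` and the sample text are hypothetical.

```python
def word_shingles(text: str, k: int = 5) -> set:
    """Character shingles of size k, built word by word."""
    result = set()
    for word in text.casefold().split():
        if len(word) > k:
            # Every substring of k consecutive characters
            result.update(word[i:i + k] for i in range(len(word) - k + 1))
        else:
            # Short words are kept whole
            result.add(word)
    return result

example = word_shingles("Covid vaccination")
print(sorted(example))
```

The word "covid" has exactly 5 characters and is kept whole, while "vaccination" produces 7 overlapping shingles.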

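Create the MinHashing function using universal hashing of the form `((a*x + b) % p) % n` <br>
a, b - random numbers <br>
24862048 - the modulus `p`, meant to be a prime number larger than the total number of shingles (24862048 is even, so it is not actually prime) <br>
len(shingles) - the number of shingles of the tweet being hashed (`n`) <br>

Return:
- a list with one min hash value per hash function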

In [None]:
def build_hashes(shingles, functions):
    return [min([((a*shingle + b)%24862048)%len(shingles) for shingle in shingles]) for a, b in functions]
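
The sketch below shows the idea on two small shingle sets: the fraction of signature positions on which two tweets agree estimates their Jaccard similarity. It is only an illustration under stated assumptions. The modulus `2**31 - 1` (a Mersenne prime), the use of `zlib.crc32` to turn shingles into integers, the seed, and all names are choices made for this example, not the notebook's.

```python
import random
import zlib

PRIME = 2_147_483_647  # 2**31 - 1, a Mersenne prime (illustrative choice)

def minhash_signature(shingle_set, hash_functions):
    """One minimum per hash function h(x) = (a*x + b) % PRIME."""
    ids = [zlib.crc32(s.encode("utf-8")) for s in shingle_set]
    return [min((a * x + b) % PRIME for x in ids) for a, b in hash_functions]

def estimate_similarity(sig1, sig2):
    """Fraction of signature positions on which both signatures agree."""
    return sum(x == y for x, y in zip(sig1, sig2)) / len(sig1)

rng = random.Random(42)  # fixed seed so the demo is repeatable
demo_functions = [(rng.randint(1, PRIME - 1), rng.randint(0, PRIME - 1)) for _ in range(200)]

sig_a = minhash_signature({"covid", "vacci", "accin", "news"}, demo_functions)
sig_b = minhash_signature({"covid", "vacci", "accin", "today"}, demo_functions)
print(estimate_similarity(sig_a, sig_b))
```

The coefficient `a` is drawn from 1 upwards, because `a = 0` would make the hash function constant.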

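Generate the random coefficients `(a, b)` of each of the `N_FUNCTIONS` hash functions used by the previous function

In [None]:
# a starts at 1: with a = 0 the hash function would be constant for every shingle
functions = [(random.randint(1, 100), random.randint(0, 100)) for _ in range(N_FUNCTIONS)]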

Function `calc_band_hash`
- take a row `(tweet ID, MinHash signature)` and return a list of tuples, one per band. Each tuple has the form `((band hash, band index), tweet ID)`, where the band hash is the hash of the `r` signature values that belong to that band.


In [None]:
def calc_band_hash(row):
    return [((hash(row[1][i*r:(i+1)*r]), i), row[0]) for i in range(b)]
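
The banding step can be illustrated on a tiny signature. This is a sketch with hypothetical names (`band_keys`, `rows`, `bands`) and made-up signatures of 6 values split into 3 bands of 2 rows.

```python
def band_keys(tweet_id, signature, rows, bands):
    """Return one ((band hash, band index), tweet_id) tuple per band."""
    return [
        ((hash(tuple(signature[i * rows:(i + 1) * rows])), i), tweet_id)
        for i in range(bands)
    ]

keys_1 = band_keys("t1", (1, 2, 3, 4, 5, 6), rows=2, bands=3)
keys_2 = band_keys("t2", (7, 8, 3, 4, 9, 9), rows=2, bands=3)

# Both signatures contain (3, 4) in band 1, so that band key is shared
shared = {key for key, _ in keys_1} & {key for key, _ in keys_2}
print(shared)
```

Including the band index in the key ensures that two tweets only collide when the same band matches, not when equal values appear in different bands.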

Compute all candidate pairs of similar tweets by hashing each band of every tweet's MinHash signature, returning the distinct pairs of tweets that share at least one band hash at the same band index <br><br>
`mapValues` replace each tweet's set of shingles with a list of integer hash values, one per shingle <br><br>
`mapValues` compute the MinHash signature of each tweet using the `build_hashes` function and the set of hash functions (`functions`) <br><br>
`flatMap` apply the `calc_band_hash` function to each tweet's signature and produce a list of tuples of the form `((band hash, band index), tweet ID)` <br><br>
`groupByKey` group the tweet IDs by band hash and band index <br><br>
`flatMap` produce every pair of tweets that share a band hash and band index <br><br>
`distinct` remove any duplicate pairs of tweets <br><br>
`collect` return a list of all distinct pairs of tweets that share at least one band hash and band index <br><br>

In [None]:
similar_pairs = shingles \
  .mapValues(lambda v: [hash(s) for s in v]) \
  .mapValues(lambda v: build_hashes(v, functions)) \
  .flatMap(lambda v: calc_band_hash((v[0], tuple(v[1])))) \
  .groupByKey() \
  .flatMap(lambda v: itertools.combinations(v[1], 2)) \
  .distinct()
  
similar_pairs.collect()
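
The grouping and pairing steps can be sketched in plain Python on toy data. The names and band hashes below are made up for illustration. Unlike the pipeline above, this sketch sorts each pair so that `(t1, t2)` and `(t2, t1)` count as the same pair.

```python
from collections import defaultdict
from itertools import combinations

def candidate_pairs_from_keys(keyed_ids):
    """Pair up every two tweet IDs that share a (band hash, band index) key."""
    buckets = defaultdict(list)
    for key, tweet_id in keyed_ids:
        buckets[key].append(tweet_id)
    pairs = set()
    for ids in buckets.values():
        pairs.update(tuple(sorted(pair)) for pair in combinations(ids, 2))
    return pairs

toy_keys = [
    ((111, 0), "t1"), ((222, 1), "t1"),
    ((111, 0), "t2"), ((333, 1), "t2"),
    ((444, 0), "t3"), ((333, 1), "t3"),
]
toy_pairs = candidate_pairs_from_keys(toy_keys)
print(sorted(toy_pairs))  # [('t1', 't2'), ('t2', 't3')]
```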

Create a dictionary of candidate pairs where each key is a tweet ID and its value is the set of tweet IDs that share at least one band with it, based on the output of the previous code. The shingles RDD is also collected into a dictionary for local lookups.

In [None]:
candidate_pairs = similar_pairs.flatMap(lambda v: [v, v[::-1]]).groupByKey().mapValues(set).collectAsMap()
shingles = shingles.collectAsMap()

function `find_similar`
- find the candidates of an article whose Jaccard similarity with it exceeds a threshold

function `calc_similar`
- calculate the Jaccard similarity between an article and each of its candidate pairs

In [None]:
def jaccard(a: set, b: set) -> float:
    # Jaccard similarity: size of the intersection over size of the union
    return len(a & b) / len(a | b)

def find_similar(article_id: str, sim_threshold: float):
    return [pair for pair in candidate_pairs[article_id] if jaccard(shingles[article_id], shingles[pair]) > sim_threshold]

def calc_similar(article_id: str):
    return [jaccard(shingles[article_id], shingles[pair]) for pair in candidate_pairs[article_id]]


Example of running the function

In [None]:
find_similar('1346893198283694085', .85)
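
For reference, the Jaccard similarity of two sets is the size of their intersection divided by the size of their union. A minimal sketch on two hypothetical shingle sets:

```python
def jaccard_similarity(a: set, b: set) -> float:
    """|A ∩ B| / |A ∪ B|, defined here as 0.0 when both sets are empty."""
    union = a | b
    return len(a & b) / len(union) if union else 0.0

x = {"covid", "vacci", "accin"}
y = {"covid", "vacci", "news"}
print(jaccard_similarity(x, y))  # 2 shared shingles / 4 distinct shingles = 0.5
```

The measure is symmetric, so swapping the two arguments gives the same value.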

# Exercise 2.3

Parallelize the first 100 tweet IDs of the shingles dictionary (a sample of the dataset)

In [None]:
subset = sc.parallelize(list(shingles.keys())[:100])

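Calculate the false positive rate: for each sampled tweet that has candidates, the fraction of its candidates whose similarity does not exceed the threshold, averaged over those tweets and expressed as a percentage

In [None]:
# Average over the sampled tweets that have candidates (not over the whole dataset)
false_positive_rate = subset \
  .filter(lambda k: k in candidate_pairs) \
  .map(lambda k: len(candidate_pairs[k] - set(find_similar(k, .85))) / len(candidate_pairs[k])) \
  .mean() * 100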

false_positive_rate

Function to calculate the Jaccard similarity between two sets of shingles

In [None]:
# Jaccard similarity: size of the intersection over size of the union
similarity = lambda v1, v2: len(v1 & v2)/len(v1 | v2)

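Calculate the false negative rate: for each sampled tweet, the fraction of its truly similar tweets (within the sample) that are missing from its candidate pairs, averaged over the sampled tweets that have at least one truly similar tweet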

In [None]:
false_negative_percentages = subset.cartesian(subset) \
  .filter(lambda v: v[0] > v[1]) \
  .filter(lambda v: similarity(shingles[v[0]], shingles[v[1]]) > .85) \
  .flatMap(lambda v: [v, v[::-1]]) \
  .groupByKey() \
  .map(lambda v: (v[0], len([None for p in v[1] if p not in candidate_pairs[v[0]]]) / len(v[1]) if v[0] in candidate_pairs else 1)) \
  .groupByKey() \
  .mapValues(lambda v: sum(v)/len(v)) \
  .map(lambda v: v[1]) \
  .collect()

false_negative_rate = sum(false_negative_percentages)/len(false_negative_percentages)
false_negative_rate
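
The two error rates can be illustrated at the level of pairs with toy data. This is a simplified sketch: the cells above average per-tweet fractions, whereas the hypothetical helpers below count pairs directly, and all tweet IDs are made up.

```python
def pair_false_positive_rate(candidates: set, truly_similar: set) -> float:
    """Fraction of candidate pairs that are not actually similar."""
    return len(candidates - truly_similar) / len(candidates) if candidates else 0.0

def pair_false_negative_rate(candidates: set, truly_similar: set) -> float:
    """Fraction of truly similar pairs that were never proposed as candidates."""
    return len(truly_similar - candidates) / len(truly_similar) if truly_similar else 0.0

toy_candidates = {("t1", "t2"), ("t1", "t3"), ("t2", "t4"), ("t3", "t4")}
toy_similar = {("t1", "t2"), ("t2", "t4"), ("t5", "t6")}

toy_fp = pair_false_positive_rate(toy_candidates, toy_similar)  # 2 of 4 candidates are wrong
toy_fn = pair_false_negative_rate(toy_candidates, toy_similar)  # 1 of 3 similar pairs is missed
print(toy_fp, toy_fn)
```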