# DATASET PROJECT: Counting Frequent Actor Pairs

This Jupyter notebook is the implementation part of a project for the course Algorithms for Massive Datasets, taught by D. Malchiodi at the Università degli Studi di Milano Statale. The course aims to develop methods for dealing with massive datasets. In this project, we apply it to the problem of counting frequent itemsets, here pairs of actors who appear together in many films. The GitHub repository is available at https://github.com/theot-student/Dataset-Project.git

## 1) Loading and preprocessing the data

In [1]:
#don't forget to replace the xxxxx placeholders with your own Kaggle username and API key in order to download the data
import os
os.environ['KAGGLE_USERNAME'] = "xxxxx"
os.environ['KAGGLE_KEY'] = "xxxxx"
!kaggle datasets download -d gsimonx37/letterboxd -f actors.csv


Dataset URL: https://www.kaggle.com/datasets/gsimonx37/letterboxd
License(s): GPL-3.0
Downloading actors.csv.zip to /content
 99% 48.0M/48.6M [00:00<00:00, 110MB/s] 
100% 48.6M/48.6M [00:00<00:00, 102MB/s]


In [2]:
!unzip actors.csv.zip

Archive:  actors.csv.zip
  inflating: actors.csv              


We import the required libraries.

In [3]:
import pandas as pd
import numpy as np
from collections import defaultdict
import math

We load the data with pandas.

In [4]:
actors = pd.read_csv('actors.csv')

In [5]:
actors.id.size

5523327

We group the data into batches, one batch per film id.

In [6]:
#this is the number of rows we want to process. Change the divisor to change the duration of processing
data_size = actors.id.size // 5

In [7]:
actorsBatches = actors[:data_size].groupby(by='id')['name'].apply(np.array).to_numpy()

In [8]:
nb_batches = len(actorsBatches)
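
As a rough illustration of what the grouping step produces, the sketch below rebuilds the same film-to-cast grouping in plain Python on a made-up table. The rows and the names `toy_rows`, `baskets_by_film` and `toy_batches` are invented for this example and are not part of the notebook.

```python
from collections import defaultdict

# Hypothetical (film id, actor name) rows, mimicking the two columns used above.
toy_rows = [
    (1, "A"), (1, "B"), (1, "C"),
    (2, "A"), (2, "B"),
    (3, "C"),
]

# One basket per film id, holding every actor credited in that film.
baskets_by_film = defaultdict(list)
for film_id, name in toy_rows:
    baskets_by_film[film_id].append(name)

toy_batches = list(baskets_by_film.values())
print(toy_batches)       # [['A', 'B', 'C'], ['A', 'B'], ['C']]
print(len(toy_batches))  # 3
```

Each inner list plays the role of one element of `actorsBatches`, and the length of the outer list corresponds to `nb_batches`.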

We need the list of distinct actor names and the frequency of each actor.

In [9]:
actorsN = actors[:data_size].name.to_numpy()

In [10]:
actors_name, frequency = np.unique(actorsN, return_counts=True)

In [11]:
actors_name = actors_name[np.argsort(frequency)]

In [12]:
frequency = np.sort(frequency)

In [13]:
integer2actor = actors_name

We need a dictionary that maps each actor name to an integer.

In [14]:
#each actor is mapped to its position in the frequency-sorted array of names
actor2integer = {actor: i for i, actor in enumerate(actors_name)}

We then replace the names with their integer representation in the batches.

In [15]:
for i in range(len(actorsBatches)):
    for j in range(len(actorsBatches[i])):
        actorsBatches[i][j] = actor2integer[actorsBatches[i][j]]
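
The encoding can be checked on a tiny example. The sketch below is a plain-Python stand-in for the NumPy steps, with invented data; ties in frequency are broken alphabetically here, which is an assumption of the sketch and not something the notebook guarantees.

```python
from collections import Counter

toy_batches = [["A", "B", "C"], ["A", "B"], ["C", "A"]]

# Frequency of each actor over all batches: A appears 3 times, B and C twice.
counts = Counter(name for batch in toy_batches for name in batch)

# Names sorted by ascending frequency (ties broken alphabetically, an assumption).
integer2actor_toy = sorted(counts, key=lambda name: (counts[name], name))
actor2integer_toy = {name: i for i, name in enumerate(integer2actor_toy)}

# Encode every batch, then decode it again to check the round trip.
encoded = [[actor2integer_toy[name] for name in batch] for batch in toy_batches]
decoded = [[integer2actor_toy[i] for i in batch] for batch in encoded]

print(integer2actor_toy)  # ['B', 'C', 'A']
print(encoded)            # [[2, 0, 1], [2, 0], [1, 2]]
```

Because the names are sorted by frequency, the most frequent actors receive the largest integers.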

Finally, we fix the threshold s and compute a boolean mask of the frequent actors (the top 1% by frequency).

In [16]:
s = frequency[-len(frequency) // 100]
bool_frequency = np.zeros(np.size(frequency), dtype=bool)

In [17]:
bool_frequency[-len(frequency) // 100:] = True
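
To see what the negative slice does, here is a small sketch on invented frequencies. The list `toy_frequency` and the names `cut`, `s_toy` and `toy_bool` are hypothetical. Note that `-len(...) // 100` floors towards minus infinity, so the top slice is rounded up in size.

```python
# Hypothetical ascending frequencies for 250 actors: 1, 2, ..., 250.
toy_frequency = list(range(1, 251))

# Floor division of a negative number: -250 // 100 == -3.
cut = -len(toy_frequency) // 100
s_toy = toy_frequency[cut]

# Boolean mask marking the positions that belong to the top slice.
toy_bool = [False] * len(toy_frequency)
toy_bool[cut:] = [True] * (-cut)

print(cut, s_toy, sum(toy_bool))  # -3 248 3
```

The mask is positional: an actor whose frequency equals the threshold but who sits just below the cut is not flagged.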

## 2) PCY algorithm

First, we need a hash function that maps a pair of actors to a bucket.

In [18]:
nb_bucket = len(actorsBatches) // 100

In [19]:
def hash_pair(element1, element2):
    bucket = (hash(element1) + hash(element2)) % nb_bucket
    return bucket
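
A quick sketch of how this formula behaves on integers. The helper `toy_hash_pair` is a hypothetical variant that takes the bucket count as an argument. In CPython, the hash of a small non-negative integer is the integer itself, so the bucket is simply the sum of the two ids modulo the number of buckets.

```python
def toy_hash_pair(a, b, nb_bucket):
    # Same formula as hash_pair, with the bucket count passed explicitly.
    return (hash(a) + hash(b)) % nb_bucket

print(toy_hash_pair(3, 12, 10))  # 5
print(toy_hash_pair(12, 3, 10))  # 5, the order of the pair does not matter
print(toy_hash_pair(7, 8, 10))   # 5, a different pair colliding in the same bucket
```

Collisions such as the last one are expected: a frequent bucket only says that a pair might be frequent, which is why the second pass still counts the pairs exactly.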

We can now implement the PCY algorithm.

In [20]:
def PCY(batches, single_frequency, nb_bucket, treshold):
    #one counter per bucket; hash_pair reads the global nb_bucket, which must match the nb_bucket argument
    hash_counters = np.zeros(nb_bucket)

    #first pass along batches: every pair is hashed to a bucket
    for batch in batches:
        for i in range(len(batch)):
            for j in range(i+1,len(batch)):
                hash_counters[hash_pair(batch[i],batch[j])] += 1

    #we create our bitmap of frequent buckets (the bucket threshold can be changed depending on the amount of data processed)
    bucket_treshold = treshold                                                 #used for small size of data
    #bucket_treshold = np.sort(hash_counters)[int(-len(hash_counters)//2)]       #used for large size of data
    bitmap = [x >= bucket_treshold for x in hash_counters]

    #second pass along batches: we count a pair only if both actors are frequent and its bucket is frequent
    counters = defaultdict(int)
    for batch in batches:
        for i in range(len(batch)):
            for j in range(i+1,len(batch)):
                if (bitmap[hash_pair(batch[i],batch[j])] and single_frequency[batch[i]] and single_frequency[batch[j]]):
                    k = min(batch[i],batch[j])
                    l = max(batch[i],batch[j])
                    counters[(k,l)] += 1

    #then we filter our candidate pairs with the threshold
    counters = {pair: counter for pair, counter in counters.items() if counter >= treshold}

    return hash_counters, bitmap, counters

In [21]:
hash_counters_PCY, bitmap_PCY, counters_PCY = PCY(actorsBatches, bool_frequency, nb_bucket, s)
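
The two passes can be followed by hand on a handful of baskets. The sketch below is a simplified, self-contained variant and not the function above: it counts single items itself during the first pass, hashes pairs with `(a + b) % nb_bucket`, and uses the same support for items, buckets and pairs. All names and data are invented.

```python
from collections import Counter
from itertools import combinations

def toy_pcy(baskets, support, nb_bucket):
    # Pass 1: count single items and hash every pair into a bucket.
    item_counts = Counter()
    bucket_counts = [0] * nb_bucket
    for basket in baskets:
        items = sorted(set(basket))
        item_counts.update(items)
        for a, b in combinations(items, 2):
            bucket_counts[(a + b) % nb_bucket] += 1

    # Between the passes: only a bitmap of frequent buckets is kept.
    bitmap = [count >= support for count in bucket_counts]

    # Pass 2: count a pair only if both items and its bucket are frequent.
    pair_counts = Counter()
    for basket in baskets:
        items = sorted(set(basket))
        for a, b in combinations(items, 2):
            if (item_counts[a] >= support and item_counts[b] >= support
                    and bitmap[(a + b) % nb_bucket]):
                pair_counts[(a, b)] += 1

    return {pair: c for pair, c in pair_counts.items() if c >= support}

toy_baskets = [[0, 1, 2], [0, 1], [0, 1, 3], [2, 3], [0, 2]]
print(toy_pcy(toy_baskets, support=3, nb_bucket=5))  # {(0, 1): 3}
```

Here the bucket counts are `[1, 3, 2, 2, 1]`, so only bucket 1 is frequent and the pair (0, 2), whose items are both frequent, is never counted in the second pass.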

Finally, we make the pairs readable by translating the integers back into the names of the actors.

In [22]:
final_counters_PCY = dict()
for actors_pair, counter in counters_PCY.items():
    final_counters_PCY[integer2actor[actors_pair[0]] + ' and ' + integer2actor[actors_pair[1]]] = counter
sorted_counters_PCY = sorted(final_counters_PCY.items(), key=lambda x:x[1], reverse=True)

In [23]:
sorted_counters_PCY[:10]

[('Harold Miller and Bess Flowers', 67),
 ('Jeff Bennett and Frank Welker', 57),
 ('Grey DeLisle and Frank Welker', 53),
 ('Ikue Otani and Megumi Hayashibara', 53),
 ('Rob Paulsen and Jeff Bennett', 51),
 ('Harold Miller and Sam Harris', 49),
 ('Ikue Otani and Kappei Yamaguchi', 48),
 ('Sam Harris and Bess Flowers', 46),
 ('Bert Stevens and Bess Flowers', 43),
 ('Stan Laurel and Oliver Hardy', 42)]

## 3) SON algorithm

We will see two implementations of the SON algorithm. The first one uses plain NumPy arrays and assumes that the data fits in main memory. The second one uses a map-reduce approach with Spark.

### a) without map reduce

The idea is to first implement the SON algorithm on our data without map-reduce (as in part 2). We fix n, the number of baskets in each chunk, and p = n / nb_batches, the fraction of all baskets that one chunk contains. The data is therefore split into about 1/p chunks.

In [24]:
n = 20000
p = n / nb_batches

In [25]:
p

0.37997530160539567

In [55]:
1/p

2.63175
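
To make these numbers concrete, the sketch below lists the chunk boundaries. It assumes `nb_batches` is 52635, the value implied by the printed p; the names `nb_chunks`, `bounds` and `sizes` are hypothetical.

```python
import math

n = 20000
nb_batches = 52635  # assumption: the value implied by p = n / nb_batches above

p = n / nb_batches
nb_chunks = math.ceil(nb_batches / n)

# Start and end basket index of every chunk; the last chunk is shorter.
bounds = [(i * n, min((i + 1) * n, nb_batches)) for i in range(nb_chunks)]
sizes = [end - start for start, end in bounds]

print(nb_chunks)  # 3
print(sizes)      # [20000, 20000, 12635]
```

Since 1/p is about 2.63, there are two full chunks of n baskets and a third, smaller chunk holding the remainder.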

In [27]:
proba = 1

In [28]:
def simple_Random_Algo(chunk, single_frequency, proba, chunk_treshold):
  counters = defaultdict()

  #we sample the batch if needed
  random_batches = chunk
  if proba != 1:
    random_batches = np.random.choice(chunk, size=int(len(chunk) * proba), replace=False)

  #perform a double loop to recover the frequency of pairs
  for batch in random_batches:
     for i in range(len(batch)):
          for j in range(i+1,len(batch)):
              if  (single_frequency[batch[i]] and single_frequency[batch[j]]):
                  k = min(batch[i],batch[j])
                  l = max(batch[i],batch[j])
                  if (k,l) in counters:
                      counters[(k,l)] += 1
                  else:
                      counters[(k,l)] = 1

  #then we filter our candidate pairs with the treshold
  sample_treshold = chunk_treshold * proba

  for actors_pair, counter in counters.copy().items():
      if counter < sample_treshold:
          counters.pop(actors_pair)

  return set(counters.keys())
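
The effect of sampling on the threshold can be sketched in a few lines. This is a simplified stand-in using the standard `random` module and invented data, without the single-actor filter; `toy_sampled_pairs` is a hypothetical name.

```python
import random
from collections import Counter
from itertools import combinations

def toy_sampled_pairs(baskets, support, proba, seed=0):
    # Keep a random fraction `proba` of the baskets (all of them when proba == 1).
    rng = random.Random(seed)
    if proba == 1:
        sample = baskets
    else:
        sample = rng.sample(baskets, int(len(baskets) * proba))

    counts = Counter()
    for basket in sample:
        counts.update(combinations(sorted(set(basket)), 2))

    # The support threshold shrinks in the same proportion as the sample.
    return {pair for pair, c in counts.items() if c >= support * proba}

toy_baskets = [[0, 1], [0, 1, 2], [0, 1], [1, 2], [0, 1, 2], [0, 2]]
print(toy_sampled_pairs(toy_baskets, support=4, proba=1))  # {(0, 1)}
half = toy_sampled_pairs(toy_baskets, support=4, proba=0.5)
```

With `proba=1` the result is exact. With a smaller `proba` the answer depends on the sample drawn, so it can contain both false positives and false negatives.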

First version of SON algorithm

In [29]:
def SON(batches, single_frequency, treshold, n, p, proba):
    candidates = set()
    counters = dict()
    #each chunk holds a fraction p of the baskets, so its threshold is scaled by p
    chunk_treshold = p * treshold

    #first pass: we apply simple_Random_Algo on every chunk of the complete data
    for i in range(int(1//p)):
        chunk = batches[i*n : (i+1)*n]
        candidates = candidates.union(simple_Random_Algo(chunk, single_frequency, proba, chunk_treshold))
    #the last chunk holds the remaining baskets
    chunk = batches[int((1//p)) * n :]
    candidates = candidates.union(simple_Random_Algo(chunk, single_frequency, proba, chunk_treshold))

    print(len(candidates))

    #second pass: we count every candidate pair over the whole data
    for pair in candidates:
      counters[pair] = 0
    for batch in batches:
      for i in range(len(batch)):
        for j in range(i+1,len(batch)):
          k = min(batch[i],batch[j])
          l = max(batch[i],batch[j])
          if ((k,l) in candidates):
            counters[(k,l)] += 1

    #then we keep only the candidate pairs that reach the global threshold
    for actors_pair, counter in counters.copy().items():
        if counter < treshold:
            counters.pop(actors_pair)

    return counters
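
The two passes of SON can be traced on a toy example. The sketch below is a compact, self-contained variant with invented data and hypothetical names (`count_pairs`, `toy_son`). It scales the threshold of each chunk by the chunk's actual share of the baskets, which is an assumption of this sketch.

```python
from collections import Counter
from itertools import combinations

def count_pairs(baskets):
    counts = Counter()
    for basket in baskets:
        counts.update(combinations(sorted(set(basket)), 2))
    return counts

def toy_son(baskets, support, chunk_size):
    # Pass 1: a pair is a candidate if it reaches the scaled threshold in some chunk.
    candidates = set()
    for start in range(0, len(baskets), chunk_size):
        chunk = baskets[start:start + chunk_size]
        local_support = support * len(chunk) / len(baskets)
        local_counts = count_pairs(chunk)
        candidates |= {pair for pair, c in local_counts.items() if c >= local_support}

    # Pass 2: count only the candidates over all baskets, then apply the full threshold.
    totals = Counter()
    for basket in baskets:
        for pair in combinations(sorted(set(basket)), 2):
            if pair in candidates:
                totals[pair] += 1
    frequent = {pair: c for pair, c in totals.items() if c >= support}
    return candidates, frequent

toy_baskets = [[0, 1], [0, 1, 2], [0, 1], [1, 2], [0, 1, 2], [0, 2]]
toy_candidates, toy_frequent = toy_son(toy_baskets, support=4, chunk_size=3)
print(sorted(toy_candidates))  # [(0, 1), (0, 2), (1, 2)]
print(toy_frequent)            # {(0, 1): 4}
```

The pairs (0, 2) and (1, 2) are frequent in the second chunk only. They are false positives of the first pass and are removed once they are counted over the whole data.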

Second version, with a more efficient second pass: actors that appear in no candidate pair are dropped from each batch before the pairs are enumerated.

In [30]:
def SON_efficient(batches, single_frequency, treshold, n, p, proba):
    candidates = set()
    counters = dict()
    #each chunk holds a fraction p of the baskets, so its threshold is scaled by p
    chunk_treshold = p * treshold

    #first pass: we apply simple_Random_Algo on every chunk of the complete data
    for i in range(int(1//p)):
        chunk = batches[i*n : (i+1)*n]
        candidates = candidates.union(simple_Random_Algo(chunk, single_frequency, proba, chunk_treshold))
    #the last chunk holds the remaining baskets
    chunk = batches[int((1//p)) * n :]
    candidates = candidates.union(simple_Random_Algo(chunk, single_frequency, proba, chunk_treshold))

    #check that the number of candidates is not too large
    print(len(candidates))

    #the actors that appear in at least one candidate pair
    single_candidates = set()
    for pair in candidates:
      single_candidates.add(pair[0])
      single_candidates.add(pair[1])

    for pair in candidates:
      counters[pair] = 0

    #second pass: we drop the actors that are in no candidate pair before enumerating pairs
    for batch in batches:
      #the set intersection also copes with an actor listed twice in the same batch
      batch_set = set(batch) & single_candidates

      if len(batch_set) > 1:
        new_batch = list(batch_set)

        for i in range(len(new_batch)):
          for j in range(i+1,len(new_batch)):
            k = min(new_batch[i],new_batch[j])
            l = max(new_batch[i],new_batch[j])
            if ((k,l) in candidates):
              counters[(k,l)] += 1

    #then we keep only the candidate pairs that reach the global threshold
    for actors_pair, counter in counters.copy().items():
        if counter < treshold:
            counters.pop(actors_pair)

    return counters
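
The gain from pruning a batch before enumerating its pairs can be seen on one invented basket. The values below are hypothetical and `kept` and `pairs` are names introduced for the sketch.

```python
from itertools import combinations

candidates = {(0, 1), (1, 4)}
single_candidates = {actor for pair in candidates for actor in pair}  # {0, 1, 4}

basket = [7, 1, 0, 9, 1, 4, 12]  # hypothetical basket; note the repeated 1

# Intersect first, so duplicates and non-candidate actors disappear together.
kept = sorted(set(basket) & single_candidates)
pairs = [pair for pair in combinations(kept, 2) if pair in candidates]

print(kept)   # [0, 1, 4]
print(pairs)  # [(0, 1), (1, 4)]
```

Only 3 pairs are examined instead of the 15 pairs formed by the 6 distinct actors of the original basket.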

In [31]:
counters_SON = SON(actorsBatches, bool_frequency, s, n, p, proba)

861


In [32]:
counters_SON_efficient = SON_efficient(actorsBatches, bool_frequency, s, n, p, proba)

861


The number printed by the SON functions is the number of candidate pairs after the first pass over the data. It is printed to check that it is not too large for the second pass to run in a reasonable time. It can be changed by changing the number of chunks (here, the number n of batches in each chunk) or the threshold s.

Output of the non-efficient SON algorithm

In [33]:
final_counters_SON = dict()
for actors_pair, counter in counters_SON.items():
    final_counters_SON[integer2actor[actors_pair[0]] + ' and ' + integer2actor[actors_pair[1]]] = counter
sorted_counters_SON = sorted(final_counters_SON.items(), key=lambda x:x[1], reverse=True)

In [34]:
sorted_counters_SON[:20]

[('Harold Miller and Bess Flowers', 67),
 ('Jeff Bennett and Frank Welker', 57),
 ('Ikue Otani and Megumi Hayashibara', 53),
 ('Grey DeLisle and Frank Welker', 53),
 ('Rob Paulsen and Jeff Bennett', 51),
 ('Harold Miller and Sam Harris', 49),
 ('Ikue Otani and Kappei Yamaguchi', 48),
 ('Sam Harris and Bess Flowers', 46),
 ('Bert Stevens and Bess Flowers', 43),
 ('Stan Laurel and Oliver Hardy', 42),
 ('Megumi Hayashibara and Koichi Yamadera', 41),
 ('Jim Cummings and Frank Welker', 39),
 ('Edna Purviance and Charlie Chaplin', 39),
 ('Jim Cummings and Jeff Bennett', 37),
 ('Dee Bradley Baker and Grey DeLisle', 37),
 ('Rob Paulsen and Frank Welker', 37),
 ('Bert Stevens and Harold Miller', 36),
 ('Franklyn Farnum and Bess Flowers', 35),
 ('Kenichi Ogata and Kappei Yamaguchi', 35),
 ('Sherry Lynn and Mickie McGowan', 35)]

Output of the efficient SON algorithm

In [35]:
final_counters_SON_efficient = dict()
for actors_pair, counter in counters_SON_efficient.items():
    final_counters_SON_efficient[integer2actor[actors_pair[0]] + ' and ' + integer2actor[actors_pair[1]]] = counter
sorted_counters_SON_efficient = sorted(final_counters_SON_efficient.items(), key=lambda x:x[1], reverse=True)

In [36]:
sorted_counters_SON_efficient[:20]

[('Harold Miller and Bess Flowers', 67),
 ('Jeff Bennett and Frank Welker', 57),
 ('Ikue Otani and Megumi Hayashibara', 53),
 ('Grey DeLisle and Frank Welker', 53),
 ('Rob Paulsen and Jeff Bennett', 51),
 ('Harold Miller and Sam Harris', 49),
 ('Ikue Otani and Kappei Yamaguchi', 48),
 ('Sam Harris and Bess Flowers', 46),
 ('Bert Stevens and Bess Flowers', 43),
 ('Stan Laurel and Oliver Hardy', 42),
 ('Megumi Hayashibara and Koichi Yamadera', 41),
 ('Jim Cummings and Frank Welker', 39),
 ('Edna Purviance and Charlie Chaplin', 39),
 ('Jim Cummings and Jeff Bennett', 37),
 ('Dee Bradley Baker and Grey DeLisle', 37),
 ('Rob Paulsen and Frank Welker', 37),
 ('Bert Stevens and Harold Miller', 36),
 ('Franklyn Farnum and Bess Flowers', 35),
 ('Kenichi Ogata and Kappei Yamaguchi', 35),
 ('Sherry Lynn and Mickie McGowan', 35)]

### b) using Spark

First we need to install and import Spark. This part reuses the Spark setup from the Spark presentation notebook of the "Algorithms for Massive Datasets" lecture.

In [37]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [38]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [39]:
import findspark
findspark.init("spark-3.1.1-bin-hadoop3.2")# SPARK_HOME
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [40]:
import pyspark
type(spark)
import pandas as pd
import numpy as np
from collections import defaultdict
import math

In [41]:
from pyspark.sql.functions import collect_list
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType

We now load our data.

In [42]:
df = spark.read.option("delimiter", ",").option("header", True).csv('actors.csv')

As before, we cap the amount of data to process.

In [43]:
sp_data_size = df.count() // 5

In [44]:
type(sp_data_size)

int

In [45]:
used_df = df.limit(sp_data_size)

In [56]:
nb_chunk = 3

In [57]:
def is_Frequent_Single(index, bool_spark_frequency):
    return bool_spark_frequency[index]

def first_map(chunk, s, bool_spark_frequency):
    counters = defaultdict()
    length = 0

    #we first pass along our sample/chunk to recover our candidate pair regarding single frequency
    for batch in chunk:
      np_batch = batch[0]
      length += 1
      for i in range(len(np_batch)):
        for j in range(i+1,len(np_batch)):
          if (is_Frequent_Single(np_batch[i], bool_spark_frequency)and is_Frequent_Single(np_batch[j], bool_spark_frequency)):
            k = min(np_batch[i],np_batch[j])
            l = max(np_batch[i],np_batch[j])
            if (k,l) in counters:
              counters[(k,l)] += 1
            else:
              counters[(k,l)] = 1


    #then we filter our candidate pairs with the treshold
    sample_treshold = s
    for actors_pair, counter in counters.copy().items():
        if counter >= sample_treshold:
            yield (1, [actors_pair])

def first_reduce(pair1, pair2):
    if (pair1 == pair2):
      return pair1
    else:
      return pair1 + pair2

def second_map(chunk, candidates, single_candidates):
    counters = defaultdict()

    for pair in candidates:
      counters[pair] = 0

    for batch in chunk:
      #we keep only the actors that appear in at least one candidate pair (this also copes with an actor listed twice in a batch)
      batch_set = set(batch.batch) & single_candidates

      if len(batch_set) > 1:
        new_batch = list(batch_set)

        for i in range(len(new_batch)):
          for j in range(i+1,len(new_batch)):
            k = min(new_batch[i],new_batch[j])
            l = max(new_batch[i],new_batch[j])
            if ((k,l) in candidates):
              counters[(k,l)] += 1

    for actors_pair, counter in counters.copy().items():
      yield (actors_pair, counter)

def second_reduce(counter1, counter2):
    return counter1 + counter2

def check_treshold(counter, s):
    if counter >= s:
      return counter

class MapReduceSON():

  def __init__(self, data_size, df, nb_chunk):
    self.data_size = data_size
    self.df = df
    self.nb_chunk = nb_chunk

  def data_process(self):
    #first we find our single frequency for each actors
    frequency_df = self.df.groupby("name").count()
    frequency_sorted_df = frequency_df.sort("count")

    #then we index our actors names to provide a map from actor names to integer
    frequency_sorted_df = frequency_sorted_df.withColumn("index", monotonically_increasing_id())

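    #we need a mapping from index to name; monotonically_increasing_id only guarantees unique, increasing ids, not consecutive ones, so the positional lookups below assume the ids come out as 0, 1, 2, ...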
    actor_map = np.array(frequency_sorted_df.select("name").collect())
    self.actor_map = actor_map

    #and we use index in our dataframe
    df_indexed = self.df.join(frequency_sorted_df.select("name","index"), on="name", how="left")
    self.df_indexed = df_indexed

    #we create our batches
    batches_df = self.df_indexed.groupby("id").agg(collect_list("index").alias("batch")).repartition(self.nb_chunk, "batch")
    self.batches_df = batches_df

    #we create our treshold keeping only 1% of most frequent single actors
    s = frequency_sorted_df.select("count").take(frequency_sorted_df.count() - (frequency_sorted_df.count() // 100))[-1]
    s = s["count"]
    self.s = s

    #then we create an array list to recover frequent single actors in boolean
    aboveTresholdUDF = udf(lambda x: x >= s,BooleanType())
    frequency_sorted_df = frequency_sorted_df.withColumn("bool", aboveTresholdUDF(col("count")))
    bool_spark_frequency = np.array(frequency_sorted_df.select("index","bool").rdd.map(lambda x : [x[0], x[1]]).collect())
    bool_spark_frequency = bool_spark_frequency[np.argsort(bool_spark_frequency[:,0]),1]
    self.bool_spark_frequency = bool_spark_frequency

    #we also need our number of batches
    nb_batches = batches_df.count()
    self.nb_batches = nb_batches


  def map_reduce(self):
    #we copy the attributes into local variables so that the lambdas below do not capture self (most likely Spark would otherwise try to serialize the whole object, including the DataFrames it holds)
    s = self.s
    nb_batches = self.nb_batches
    nb_chunk = self.nb_chunk
    bool_spark_frequency = self.bool_spark_frequency
    s_chunk = s / nb_chunk

    #we create our rdd
    rdd = self.batches_df.select("batch").rdd

    #then we do our first map reduce to get our candidates
    candidates_MR = rdd.mapPartitions(lambda x : first_map(x, s_chunk, bool_spark_frequency)).reduceByKey(first_reduce).collect()
    candidates = candidates_MR[0][1]
    print(len(candidates))

    #for the efficient version we create a set of single candidate
    single_candidates = set()
    for pair in candidates:
      single_candidates.add(pair[0])
      single_candidates.add(pair[1])

    #and finally we proceed to our second map reduce to get the final counters
    final_counters_MRSON = rdd.mapPartitions(lambda x : second_map(x, candidates, single_candidates)).reduceByKey(second_reduce).filter(lambda x : x[1] >= s).collect()

    #we rename our counters to make them readable
    counters_named = []
    for counter in final_counters_MRSON:
      if counter[1] >= s:
        counters_named += [[str(self.actor_map[counter[0][0], 0]) + " and " + str(self.actor_map[counter[0][1], 0]) + " appear together in " + str(counter[1]) + " movies.", str(counter[1])]]

    return counters_named
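
The two map-reduce rounds can be imitated without Spark to check the logic. In the sketch below the partitions are plain Python lists and `functools.reduce` plays the role of the reduce step. All names and data are invented, and the single-actor filter is left out for brevity.

```python
from collections import Counter
from functools import reduce
from itertools import combinations

def local_candidates(partition, local_support):
    # Round 1 map: pairs that are frequent inside one partition.
    counts = Counter()
    for basket in partition:
        counts.update(combinations(sorted(set(basket)), 2))
    return {pair for pair, c in counts.items() if c >= local_support}

def local_counts(partition, candidates):
    # Round 2 map: occurrences of each candidate inside one partition.
    counts = Counter()
    for basket in partition:
        for pair in combinations(sorted(set(basket)), 2):
            if pair in candidates:
                counts[pair] += 1
    return counts

partitions = [
    [[0, 1], [0, 1, 2], [0, 1]],
    [[1, 2], [0, 1, 2], [0, 2]],
]
support = 4
local_support = support / len(partitions)

# Round 1 reduce: union of the per-partition candidate sets.
mr_candidates = reduce(set.union, (local_candidates(p, local_support) for p in partitions))
# Round 2 reduce: add up the per-partition counters, then apply the full threshold.
totals = reduce(lambda a, b: a + b, (local_counts(p, mr_candidates) for p in partitions))
mr_frequent = {pair: c for pair, c in totals.items() if c >= support}

print(sorted(mr_candidates))  # [(0, 1), (0, 2), (1, 2)]
print(mr_frequent)            # {(0, 1): 4}
```

Merging the candidates as sets rather than as concatenated lists avoids duplicates when the same pair is frequent in several partitions.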

In [58]:
SON = MapReduceSON(sp_data_size, used_df, nb_chunk)
SON.data_process()

In [59]:
final_counters_MRSON = SON.map_reduce()

702


Output of spark SON algorithm

In [60]:
#the counts are stored as strings, so we convert them to integers before sorting
np.flip(np.array(final_counters_MRSON)[np.argsort(np.array(final_counters_MRSON)[:, 1].astype(int)), 0])[:20]

array(['Harold Miller and Bess Flowers appear together in 67 movies.',
       'Jeff Bennett and Frank Welker appear together in 57 movies.',
       'Ikue Otani and Megumi Hayashibara appear together in 53 movies.',
       'Grey DeLisle and Frank Welker appear together in 53 movies.',
       'Rob Paulsen and Jeff Bennett appear together in 51 movies.',
       'Harold Miller and Sam Harris appear together in 49 movies.',
       'Ikue Otani and Kappei Yamaguchi appear together in 48 movies.',
       'Sam Harris and Bess Flowers appear together in 46 movies.',
       'Bert Stevens and Bess Flowers appear together in 43 movies.',
       'Stan Laurel and Oliver Hardy appear together in 42 movies.',
       'Megumi Hayashibara and Koichi Yamadera appear together in 41 movies.',
       'Jim Cummings and Frank Welker appear together in 39 movies.',
       'Edna Purviance and Charlie Chaplin appear together in 39 movies.',
       'Jim Cummings and Jeff Bennett appear together in 37 movies.',
    