# Project 2: Market-basket analysis - IMDB dataset

Project for the course Algorithms for Massive Datasets <br> Nicolas Facchinetti 961648 <br> Antonio Belotti 960822

# Set up the Spark environment

We start by downloading and installing the tools needed to work with Spark. First we need a Java environment: Spark is written in Scala, so it needs a JVM to run. Then we download Apache Spark 3.1.2 with Hadoop 3.2 from the Apache CDN and uncompress it. Finally we install findspark, a small Python module that locates the Spark installation and makes PySpark, the Python interface for Apache Spark, importable.

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark

The next step is to set the paths in our remote environment so that the installed tools can be found.

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

Finally we can import PySpark into the project and create a local Spark session.

In [None]:
import findspark
findspark.init("spark-3.1.2-bin-hadoop3.2")# SPARK_HOME
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Download the dataset from Kaggle

First we install the Kaggle Python module, which lets us download the dataset.

In [None]:
!pip install kaggle

Then upload kaggle.json, the file containing your API credentials, to be able to use the services offered by Kaggle.

In [None]:
from google.colab import files

uploaded = files.upload()
  
# Move kaggle.json into the folder where the API expects to find it.
!mkdir -p ~/.kaggle/ && mv kaggle.json ~/.kaggle/ && chmod 600 ~/.kaggle/kaggle.json

Now we can download the dataset

In [None]:
!kaggle datasets download 'ashirwadsangwan/imdb-dataset'

We now must unzip the compressed archive to use it. Once done we can also remove it.

In [None]:
!unzip imdb-dataset.zip && rm imdb-dataset.zip

# Prepare the data for Spark

We can load the extracted .tsv files directly into Spark DataFrames with read.csv(). Right after loading, we select only the columns we are interested in.

In [None]:
df_principals = spark.read.csv("/content/title.principals.tsv/title.principals.tsv", sep=r'\t', header=True).select('tconst','nconst','category')

In [None]:
df_principals.show(10)

In [None]:
df_basics = spark.read.csv("/content/title.basics.tsv/title.basics.tsv", sep=r'\t', header=True).select('tconst','titleType')

In [None]:
df_basics.show(10)

By inspecting the content of the column 'category' of df_principals we can see that there are many jobs other than actor and actress (which are the two we are interested in).

In [None]:
df_principals.select("category").distinct().show()

Similarly, we can inspect the column 'titleType' of df_basics to see which types a title can have.

In [None]:
df_basics.select("titleType").distinct().show()

Once the data is loaded in a Spark DataFrame we can use the PySpark SQL module to process it. We start by extracting only actors and actresses from df_principals.

In [None]:
pre = df_principals.count()
df_principals.createOrReplaceTempView("PRINCIPALS") # create a temporary table on DataFrame
df_principals = spark.sql("SELECT * from PRINCIPALS WHERE category ='actor' OR category='actress'")
print("We reduced the number of rows from {} to {}".format(pre, df_principals.count()))

Then we do the same thing with df_basics, keeping only the movies.

In [None]:
pre = df_basics.count()
df_basics.createOrReplaceTempView("BASICS") # create a temporary table on DataFrame
df_basics = spark.sql("SELECT * from BASICS WHERE titleType ='movie'")
print("We reduced the number of rows from {} to {}".format(pre, df_basics.count()))

We now have two DataFrames: one containing only the movies, and the other containing only the people who play as actor/actress in a title. To do the desired market-basket analysis we have to pivot the data so that each row stands for one tconst (title identifier) and holds the list of the nconst identifiers of the actors that played in it.

In [None]:
df_basics.show(10)

In [None]:
df_principals.show(10)

We start by joining the two DataFrames, to extract from df_principals only the records whose tconst refers to a movie. We can also discard the category column, since it is no longer useful.

In [None]:
basket_data = df_principals.join(df_basics, "tconst").select(df_principals.tconst, df_principals.nconst).sort("tconst")

In [None]:
basket_data.show(10)

Then we remove any duplicated rows and aggregate the data by the tconst identifier.

In [None]:
from pyspark.sql import functions as F
basket_data = basket_data.dropDuplicates()
basket_data = basket_data.groupBy("tconst").agg(F.collect_list("nconst").alias("nconsts")).sort('tconst')

In [None]:
print("There are {} titleId buckets".format(basket_data.count()))
basket_data.show(10, False)

As we can see above, the data is now in the correct format for our analysis: each row holds the identifier of a movie and, in the second column, the list of the identifiers of the actors that played in it.
Since we have done all the needed pre-processing on the data, we can transform our DataFrame into an RDD to apply map-reduce functions.
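
The pivot described above can be illustrated without Spark on a handful of rows. The following is only a minimal sketch: the identifiers and rows are hypothetical toy values, not taken from the IMDB files, and plain Python collections stand in for the DataFrame operations (filter, join, dropDuplicates, groupBy).

```python
from collections import defaultdict

# Hypothetical toy rows standing in for df_principals and df_basics.
principals = [
    ("tt1", "nm1", "actor"),
    ("tt1", "nm2", "actress"),
    ("tt1", "nm2", "actress"),   # duplicated row
    ("tt1", "nm9", "director"),  # not an actor/actress
    ("tt2", "nm1", "actor"),
    ("tt3", "nm3", "actor"),     # tt3 is not a movie
]
basics = [("tt1", "movie"), ("tt2", "movie"), ("tt3", "tvSeries")]

# Keep only the identifiers of the movies.
movies = {tconst for tconst, title_type in basics if title_type == "movie"}

# Filter, join on tconst and drop duplicates in one step (a set of pairs).
pairs = {
    (tconst, nconst)
    for tconst, nconst, category in principals
    if category in ("actor", "actress") and tconst in movies
}

# Group by tconst: one basket of actor ids per movie.
baskets = defaultdict(list)
for tconst, nconst in sorted(pairs):
    baskets[tconst].append(nconst)

print(dict(baskets))  # {'tt1': ['nm1', 'nm2'], 'tt2': ['nm1']}
```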

We serialize the DataFrame to file and download it, so that the pre-processing does not have to be repeated every time.

In [None]:
basket_data.write.format('json').save("data")

In [None]:
!zip -r data.zip data

In [None]:
from google.colab import files
files.download('data.zip')

# APRIORI MAP-REDUCE

In [None]:
data = spark.read.format("json").option("header", "true").load("data").select('tconst', 'nconsts').rdd

In [None]:
data.take(5)

We access field 1, since field 0 is the basket identifier. We use flatMap because it merges the lists of all the rows into a single collection.

In [None]:
data.flatMap(lambda row: row[1]).take(10)

We map each actor record found to a pair made of the actor itself and 1.

In [None]:
data.flatMap(lambda row: (row[1])) \
    .map(lambda elem: (elem,1)).take(10)

We add a reduce step that sums the counters of each actor.

In [None]:
data.flatMap(lambda row: (row[1])).map(lambda elem: (elem,1)).reduceByKey(lambda a,b: a+b).take(10)

We add a threshold (at least 200 appearances).

In [None]:
res = data.flatMap(lambda row: (row[1])) \
          .map(lambda elem: (elem,1)) \
          .reduceByKey(lambda a,b: a+b) \
          .filter(lambda x: x[1] >=200)
res.take(10)
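
The same first pass can be followed step by step on a toy dataset in plain Python. This is only a sketch under assumptions: the baskets and the threshold of 3 are made up for illustration, and `Counter` plays the role of the map and reduceByKey steps.

```python
from collections import Counter
from itertools import chain

# Hypothetical toy baskets in the same layout as the RDD: (tconst, [nconst, ...])
toy_data = [
    ("tt1", ["nm1", "nm2"]),
    ("tt2", ["nm1", "nm3"]),
    ("tt3", ["nm1", "nm2", "nm3"]),
    ("tt4", ["nm2"]),
]
support_threshold = 3  # illustrative value, far lower than the 200 used above

# flatMap + map + reduceByKey: count how many baskets contain each actor
item_counts = Counter(chain.from_iterable(nconsts for _, nconsts in toy_data))

# filter: keep only the actors that reach the threshold
frequent_items = {item: n for item, n in item_counts.items() if n >= support_threshold}
print(frequent_items)  # {'nm1': 3, 'nm2': 3}
```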

Let us now look at the second pass of A-Priori.

In [None]:
data.take(10)

We take the first record as a test and extract two of its elements. We then write a function that checks whether all the elements of a pair are contained in a row.

In [None]:
coppia = ['nm0063086', 'nm0183823']    # first two actors of the first record

def row_contains_elements(row, elements):
  return all(x in row for x in elements)

data.map(lambda x:x[1]).filter(lambda x: row_contains_elements(x,coppia)).take(5)
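
The containment check can also be phrased with set semantics. The sketch below compares the two formulations on a hypothetical row; `row_contains_elements_set` is a name introduced here only for illustration.

```python
def row_contains_elements(row, elements):
    # Same check as in the cell above: every element must appear in the row
    return all(x in row for x in elements)

def row_contains_elements_set(row, elements):
    # Equivalent formulation: the elements form a subset of the row
    return set(elements).issubset(row)

row = ["nm1", "nm2", "nm3"]  # hypothetical actor ids

for candidate in (["nm1", "nm2"], ["nm1", "nm9"]):
    print(candidate,
          row_contains_elements(row, candidate),
          row_contains_elements_set(row, candidate))
```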


Now we try to generate all the possible pairs from each single row. The a < b check is a trick to avoid duplicates. We use flatMap directly, so that the generated pairs are already unpacked.

In [None]:
data.take(1)

In [None]:
def generate_candidate(x):
  candidates = []
  for a, elemA in enumerate(x):
    for b, elemB in enumerate(x):
      if a < b:
        candidates.append((elemA, elemB))
  return candidates

data.map(lambda x: x[1]).flatMap(lambda x: generate_candidate(x)).take(10)
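
The nested loop with the a < b check produces the same pairs as `itertools.combinations` from the standard library. The sketch below checks this on a hypothetical basket, and also shows that sorting the basket first gives every pair a canonical order, which matters when the same two actors can appear in a different order in different baskets.

```python
from itertools import combinations

def generate_candidate(x):
    # Same function as in the cell above
    candidates = []
    for a, elemA in enumerate(x):
        for b, elemB in enumerate(x):
            if a < b:
                candidates.append((elemA, elemB))
    return candidates

basket = ["nm3", "nm1", "nm2"]  # hypothetical actor ids

# combinations yields each unordered pair once, following the positions in the basket
same_pairs = generate_candidate(basket) == list(combinations(basket, 2))
print(same_pairs)  # True

# Sorting first gives a canonical order, so the same two actors always
# produce the same key regardless of their order inside the basket
canonical_pairs = list(combinations(sorted(basket), 2))
print(canonical_pairs)  # [('nm1', 'nm2'), ('nm1', 'nm3'), ('nm2', 'nm3')]
```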

We then add a check that the generated pair is among the pairs of interest.

In [None]:
copia = [('nm0063086', 'nm0183823'), ('nm0846894', 'nm3002376')]

def generate_candidate(x):
  candidates = []
  for a, elemA in enumerate(x):
    for b, elemB in enumerate(x):
      if a < b:
        candidates.append((elemA, elemB))
  return candidates

data.map(lambda x: x[1]).flatMap(lambda x: generate_candidate(x)).filter(lambda x: x in copia).take(3)

This is the actual map step. Tuples are hashable (unlike lists), so they can be used as keys.

In [None]:
copia = [('nm0063086', 'nm0183823'), ('nm0846894', 'nm3002376')]

def generate_candidate(x):
  candidates = []
  for a, elemA in enumerate(x):
    for b, elemB in enumerate(x):
      if a < b:
        candidates.append((elemA, elemB))
  return candidates

data.map(lambda x: x[1]).flatMap(lambda x: generate_candidate(x)) \
    .filter(lambda x: x in copia).map(lambda x: (x,1)).take(3)
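
Pairs work as keys because a tuple is immutable and therefore hashable, while a list is not. A minimal sketch with hypothetical ids, using a plain dictionary in place of the key-value pairs of the RDD:

```python
pair_counts = {}

# The same tuple value always hashes to the same key, so the counts add up
for pair in [("nm1", "nm2"), ("nm1", "nm2")]:
    pair_counts[pair] = pair_counts.get(pair, 0) + 1

# A list is mutable, so Python refuses to hash it
try:
    hash(["nm1", "nm2"])
    list_is_hashable = True
except TypeError:
    list_is_hashable = False

print(pair_counts, list_is_hashable)  # {('nm1', 'nm2'): 2} False
```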

We add the reduce step and the threshold check.

In [None]:
copia = [('nm0063086', 'nm0183823'), ('nm0846894', 'nm3002376')]

def generate_candidate(x):
  candidates = []
  for a, elemA in enumerate(x):
    for b, elemB in enumerate(x):
      if a < b:
        candidates.append((elemA, elemB))
  return candidates

data.map(lambda x: x[1]).flatMap(lambda x: generate_candidate(x)) \
    .filter(lambda x: x in copia) \
    .map(lambda x: (x,1)) \
    .reduceByKey(lambda a,b: a+b) \
    .filter(lambda x: x[1] >=1) \
    .take(3)
          

In [None]:
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()

def apriorihmap(data, support_threshold):
    """ 
    Two-pass A-Priori for frequent pairs.

    data: Pyspark.rdd 
      [
        [tconst, [nconst,]],
      ]
    returns: Pyspark.rdd of ((nconst, nconst), count)
    """
    nconst_rdd = data.map(lambda x: x[1])

    # first pass: count the single items and keep only the frequent ones
    frequent_items_rdd = nconst_rdd.flatMap(lambda x: x) \
          .map(lambda elem: (elem,1)) \
          .reduceByKey(lambda a,b: a+b) \
          .filter(lambda x: x[1] >= support_threshold)\
          .map(lambda x: x[0])

    def generate_candidate(x):
      # x is sorted, so every pair is always emitted in the same order
      candidates = []
      for a, elemA in enumerate(x):
        for b, elemB in enumerate(x):
          if a < b:
            candidates.append((elemA, elemB))
      return candidates

    #print(f"found {frequent_items_rdd.count()} frequent singletons")
    frequent_singletons_bv = sc.broadcast(set(frequent_items_rdd.collect()))

    # second pass: remove the infrequent items from every basket (map, not
    # filter, which would only drop whole baskets), then count the pairs
    return nconst_rdd \
          .map(lambda x: sorted(elem for elem in x if elem in frequent_singletons_bv.value))\
          .flatMap(lambda x: generate_candidate(x)) \
          .map(lambda x: (x,1)) \
          .reduceByKey(lambda a,b: a+b) \
          .filter(lambda x: x[1] >=support_threshold)

In [None]:
rules = apriorihmap(data, 60)
rules.count()

# SON

In [None]:
!unzip data.zip

In [None]:
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()

data = spark.read.parquet("data.parquet").rdd #format("json").option("header", "true").load("data").select('tconst', 'nconsts').rdd.flatMap(lambda x:x[1])

In [None]:
# empirical sweet-spot for the number of partitions (assuming every executor has 4 cores ...)
num_partitions = sc._jsc.sc().getExecutorMemoryStatus().size() * 4 

In [None]:
# force actual partitioning. If you don't introduce a fake partition-key, the reduceByKey you do when counting elements will count the elements globally.

support_threshold = 150

import random
data.map(lambda x: ((random.randint(1, num_partitions), x), 1))\
    .reduceByKey(lambda x,y: x+y)\
    .filter(lambda x: x[1] >= support_threshold / num_partitions)\
    .map(lambda x: (x[0][1], x[1]))\
    .reduceByKey(lambda x,y: x+y)\
    .filter(lambda x: x[1] >= support_threshold)\
    .first()


    #TODO 
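
The idea behind SON can be sketched in plain Python on a toy dataset. This is only an illustration under assumptions, not the Spark implementation left as TODO above: the baskets are hypothetical, the chunks are built by simple slicing instead of a random partition key, and `frequent_pairs` and `son` are names introduced here. Phase 1 collects the pairs that are frequent in at least one chunk with the threshold scaled by the number of chunks; phase 2 counts those candidates on the whole dataset to discard the false positives.

```python
from collections import Counter
from itertools import combinations

def frequent_pairs(baskets, threshold):
    """In-memory count of the pairs that appear in at least `threshold` baskets."""
    counts = Counter()
    for basket in baskets:
        counts.update(combinations(sorted(set(basket)), 2))
    return {pair: n for pair, n in counts.items() if n >= threshold}

def son(baskets, support_threshold, num_chunks):
    # Phase 1: each chunk proposes its locally frequent pairs, using a
    # threshold scaled down by the number of chunks.
    chunks = [baskets[i::num_chunks] for i in range(num_chunks)]
    local_threshold = support_threshold / num_chunks
    candidates = set()
    for chunk in chunks:
        candidates |= set(frequent_pairs(chunk, local_threshold))

    # Phase 2: count only the candidates on the whole dataset and drop
    # the false positives.
    counts = Counter()
    for basket in baskets:
        for pair in combinations(sorted(set(basket)), 2):
            if pair in candidates:
                counts[pair] += 1
    return {pair: n for pair, n in counts.items() if n >= support_threshold}

# Hypothetical toy baskets of actor ids
toy_baskets = [
    ["nm1", "nm2"], ["nm1", "nm2", "nm3"], ["nm2", "nm3"],
    ["nm1", "nm2"], ["nm1", "nm3"], ["nm1", "nm2", "nm3"],
]
result = son(toy_baskets, support_threshold=4, num_chunks=2)
print(result)  # {('nm1', 'nm2'): 4}
```

A pair that reaches the global threshold must reach the scaled threshold in at least one chunk, otherwise its counts would sum to less than the global threshold. This is why phase 1 cannot miss a truly frequent pair, and phase 2 only has to remove false positives.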

# Demo FP-Growth

In [None]:
from pyspark.ml.fpm import FPGrowth
fpGrowth = FPGrowth(itemsCol="nconsts", minSupport=0.0001, minConfidence=0.0001)
model = fpGrowth.fit(basket_data)

In [None]:
# Display frequent itemsets.
model.freqItemsets.show()
items = model.freqItemsets

In [None]:
# Display generated association rules.
model.associationRules.show()
rules = model.associationRules

In [None]:
# transform checks the input items against all the association rules and summarizes the consequents as the prediction
model.transform(basket_data).show()
transformed = model.transform(basket_data)
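
The minSupport and minConfidence parameters used above are fractions, not absolute counts. The following sketch computes both measures by hand on hypothetical toy baskets; the helper names `support` and `confidence` are introduced here for illustration and are not part of the Spark API.

```python
# Hypothetical toy baskets of actor ids
toy_baskets = [
    {"nm1", "nm2"},
    {"nm1", "nm2", "nm3"},
    {"nm1", "nm3"},
    {"nm2"},
]

def support(itemset, baskets):
    """Fraction of baskets that contain every item of `itemset`."""
    itemset = set(itemset)
    return sum(itemset <= basket for basket in baskets) / len(baskets)

def confidence(antecedent, consequent, baskets):
    """support(antecedent union consequent) / support(antecedent)."""
    both = set(antecedent) | set(consequent)
    return support(both, baskets) / support(antecedent, baskets)

# {nm1, nm2} appears in 2 of the 4 baskets
print(support({"nm1", "nm2"}, toy_baskets))  # 0.5

# nm1 appears in 3 baskets, 2 of which also contain nm2: confidence 2/3
print(confidence({"nm1"}, {"nm2"}, toy_baskets))
```

With minSupport=0.0001, an itemset is reported only if it appears in at least 0.01% of the baskets.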

# Demo Antonio

In [None]:
import pandas as pd

Let's try to load some data into a pandas DataFrame.

In [None]:
actors_cols = {
    "original": [
        "nconst",  # actor unique id
        "knownForTitles"  # titles he/she is known for
    ],
    "renamed": ["actorId", "titles"]
}

actors_df = pd.read_csv(
    "name.basics.tsv.gz",
    compression="gzip",
    sep='\t',
    usecols=actors_cols["original"]
)

# clean and pre-process
actors_df.columns = actors_cols["renamed"]
actors_df.drop(actors_df[actors_df.titles == "\\N"].index, inplace=True)
actors_df.titles = actors_df.titles.apply(lambda x: x.split(","))

In [None]:
actors_df
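
The cleaning steps above (drop the rows whose titles are the `\N` missing-value marker, then split the comma-separated titles into a list) can be checked on a few lines with the standard library alone. This is a sketch on a hypothetical excerpt with fewer columns than the real name.basics.tsv, and the names and ids in it are made up.

```python
import csv
import io

# Hypothetical excerpt in a layout similar to name.basics.tsv
toy_tsv = (
    "nconst\tprimaryName\tknownForTitles\n"
    "nm1\tAlice\ttt1,tt2\n"
    "nm2\tBob\t\\N\n"
    "nm3\tCarol\ttt2\n"
)

reader = csv.DictReader(io.StringIO(toy_tsv), delimiter="\t", quoting=csv.QUOTE_NONE)
actors = [
    {"actorId": row["nconst"], "titles": row["knownForTitles"].split(",")}
    for row in reader
    if row["knownForTitles"] != "\\N"  # \N marks a missing value
]
print(actors)
# [{'actorId': 'nm1', 'titles': ['tt1', 'tt2']}, {'actorId': 'nm3', 'titles': ['tt2']}]
```

Note that in this section the roles are swapped with respect to the Spark part: each actor is a basket and the titles are the items.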

In [None]:
def apriori(transactions, support_threshold):
    singleton_counter = []
    lookup_index_table = {}
    reverse_lookup_index_table = {}

    # count singletons
    for bucket in transactions:
        for elem in bucket:
            if elem not in lookup_index_table:
                # The newly discovered element is appended on the tail of the array counter
                lookup_index_table[elem] = len(singleton_counter)
                reverse_lookup_index_table[len(singleton_counter)] = elem
                singleton_counter.append(0)

            idx = lookup_index_table[elem]
            singleton_counter[idx] += 1

    # a set makes the membership test in the second pass constant-time
    frequent_items_table = {i for i,v in enumerate(singleton_counter) if v > support_threshold}

    # count pairs
    pair_counter = {}
    for bucket in transactions:
        frequent_items = [lookup_index_table[item] for item in bucket 
                          if lookup_index_table[item] in frequent_items_table]

        for x in frequent_items:
            for y in frequent_items:
                if x<y:
                    pair_counter[(x,y)] = pair_counter.get((x,y), 0) +1 

    return [list(map(lambda x: reverse_lookup_index_table[x], i)) for i,c in pair_counter.items() 
            if c > support_threshold] 

In [None]:
# test
rules = apriori(actors_df.titles, 300)

movies_df = pd.read_csv("title.basics.tsv.gz", compression='gzip', sep='\t')
from IPython.display import display

for x,y in rules:
    display(movies_df.loc[((movies_df.tconst == x) | (movies_df.tconst == y))])