<a href="https://colab.research.google.com/github/LorenzoPolli/market-basket-analysis/blob/main/A_Priori_%26_SON_Market_basket_analysis_(eng_newspapers).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Market-basket analysis  🧺

The task is to implement a system finding frequent itemsets (aka market-basket analysis), analyzing the «Old Newspapers» dataset published on Kaggle and released under the public domain license (CC0).

> Project authors: Mathias Cardarello Fierro & Lorenzo Polli

Algorithms for Massive Data


*Università degli Studi di Milano*


15-Dec-2022


## **A-Priori & SON Algorithms**

### **1. Setup and data import**

In [102]:
%%capture
# Download the dataset containing old newspapers
import os

os.environ["KAGGLE_USERNAME"] = "mathiascardarello"
os.environ["KAGGLE_KEY"] = "89f16dcdf267d017756e3a2e5cece19a"
!pip install kaggle --upgrade
!kaggle datasets download alvations/old-newspapers --unzip

In [103]:
# Install Java 8 openjdk

%%capture
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
os.environ['JAVA_HOME'] = "/usr/lib/jvm/java-8-openjdk-amd64"

!pip install py4j

In [3]:
!java -version

openjdk version "11.0.17" 2022-10-18
OpenJDK Runtime Environment (build 11.0.17+8-post-Ubuntu-1ubuntu218.04)
OpenJDK 64-Bit Server VM (build 11.0.17+8-post-Ubuntu-1ubuntu218.04, mixed mode, sharing)


#### **1.1 Setting up PySpark and Spark NLP**


In [104]:
# Install pyspark and spark-nlp

%%capture
!wget http://setup.johnsnowlabs.com/colab.sh -O - | bash

In [105]:
%%time
import sparknlp

spark = sparknlp.start()

print("Spark NLP version", sparknlp.version())
print("Apache Spark version:", spark.version)

Spark NLP version 4.2.4
Apache Spark version: 3.2.1
CPU times: user 5.18 ms, sys: 89 µs, total: 5.27 ms
Wall time: 36.4 ms


In [None]:
# Check Spark environment

spark

#### **1.2. Import the dataset**

In [106]:
# Import the dataset and display only articles in English

%%time
df = spark.read.csv('old-newspaper.tsv', sep='\t', header=True) 
df = df.filter("Language == 'English'")
df.show(5)

+--------+------------+----------+--------------------+
|Language|      Source|      Date|                Text|
+--------+------------+----------+--------------------+
| English| latimes.com|2012/04/29|He wasn't home al...|
| English|stltoday.com|2011/07/10|The St. Louis pla...|
| English|   freep.com|2012/05/07|WSU's plans quick...|
| English|      nj.com|2011/02/05|The Alaimo Group ...|
| English|  sacbee.com|2011/10/02|And when it's oft...|
+--------+------------+----------+--------------------+
only showing top 5 rows

CPU times: user 303 ms, sys: 33.1 ms, total: 336 ms
Wall time: 49.2 s


In [107]:
import pyspark.sql.functions as F
from pyspark.sql.functions import desc

In [None]:
# Check whether there are NULL values or not

%%time
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()

+--------+------+----+----+
|Language|Source|Date|Text|
+--------+------+----+----+
|       0|     0|   0|   0|
+--------+------+----+----+

CPU times: user 498 ms, sys: 64.3 ms, total: 562 ms
Wall time: 1min 27s


### **2. Pre-processing**

We need to prepare our dataset to implement the market-basket analysis, considering values of the "Text" attribute as baskets and the words as items. Therefore, a dataframe is created in which each row represents a basket with their comma-separated items.

Furthermore, we apply text mining techniques to filter out stop words to get a powerful analysis of the text.

In [108]:
# Import libraries

from pyspark.ml import Pipeline
import pyspark.sql.types as T
from typing import List
from sparknlp.annotator import *
from sparknlp.base import *
from sparknlp.common import *
from sparknlp.pretrained import PretrainedPipeline

In [109]:
# Initialize the annotators

document = DocumentAssembler()\
    .setInputCol("Text")\
    .setOutputCol("document")\
    .setCleanupMode("shrink") # remove new lines and tabs, plus merging multiple spaces and blank lines to a single space

sentence = SentenceDetector()\
    .setInputCols(['document'])\
    .setOutputCol('sentence')

token = Tokenizer()\
    .setInputCols(['sentence'])\
    .setOutputCol('token')

normalizer = Normalizer()\
    .setInputCols(["token"])\
    .setOutputCol("normalized")\
    .setLowercase(True)\
    .setCleanupPatterns(["""[^\w\d\s]"""]) # remove punctuations (keep alphanumeric characterss)

stop_words = StopWordsCleaner.pretrained('stopwords_en', 'en')\
    .setInputCols(["normalized"])\
    .setOutputCol("cleanTokens")\
    .setCaseSensitive(False)

lemmatizer = LemmatizerModel.pretrained()\
    .setInputCols(["cleanTokens"])\
    .setOutputCol("lemma")

prediction_pipeline = Pipeline(stages = [document, sentence, token, normalizer, stop_words, lemmatizer])

stopwords_en download started this may take some time.
Approximate size to download 2.9 KB
[OK!]
lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [110]:
%%time
clean_df = prediction_pipeline.fit(df).transform(df)

CPU times: user 132 ms, sys: 23.3 ms, total: 155 ms
Wall time: 1.05 s


In [111]:
# Add a column where each row corresponds to a different basket of items 

%%time
clean_df = clean_df.withColumn("Basket", clean_df.lemma.result) 

CPU times: user 5.15 ms, sys: 914 µs, total: 6.06 ms
Wall time: 76 ms


Note that in the last row, item "love" is repeated twice, and this is not good for further algorithms application.

In [112]:
# Remove duplicates inside baskets
remove_duplicates = F.udf(lambda x: list(set(x)))
clean_df = clean_df.withColumn("BasketNoDup", remove_duplicates(F.col("Basket")))

In [113]:
# Remove strings "[" and "]" from baskets -> otherwise, some items are not easily identified 
clean_df = clean_df.withColumn("BasketNoBrackets", 
                F.regexp_replace(F.regexp_replace(F.regexp_replace("BasketNoDup", "\\]\\[", ""), "\\[", ""), "\\]", ""))

In [114]:
# Add an index column to optimize computation time
from pyspark.sql.functions import monotonically_increasing_id
clean_df = clean_df.withColumn("Index", monotonically_increasing_id()) 

In [15]:
# Show DataFrame after transformations
clean_df.select("Index","Basket","BasketNoDup","BasketNoBrackets").show()

+-----------+--------------------+--------------------+--------------------+
|      Index|              Basket|         BasketNoDup|    BasketNoBrackets|
+-----------+--------------------+--------------------+--------------------+
|85899345920|[wasnt, home, app...|[apparently, home...|apparently, home,...|
|85899345921|[st, louis, plant...|[production, mass...|production, mass,...|
|85899345922|[wsus, plan, quic...|[applaud, center,...|applaud, center, ...|
|85899345923|[alaimo, group, m...|[employee, june, ...|employee, june, g...|
|85899345924|[difficult, predi...|[merit, absolutel...|merit, absolutely...|
|85899345925|[amount, scoff, y...|[year, nfl, round...|year, nfl, round,...|
|85899345926|[14915, charlevoi...|[charlevoix, 1491...|charlevoix, 14915...|
|85899345927|[long, line, fail...|[gop, lonegan, ci...|gop, lonegan, cit...|
|85899345928|[time, report, su...|[employee, improv...|employee, improve...|
|85899345929|[hit, hard, somep...|[rizzo, im, field...|rizzo, im, field,...|

In [115]:
# Create a new object that contains just text and baskets with no duplicated words

clean_df.createOrReplaceTempView("df_view")

baskets = spark.sql("SELECT Index, SPLIT(BasketNoBrackets,', ') AS Basket FROM df_view")

In [None]:
# Column "Basket" must be of the type "Array of strings"
baskets.printSchema()

root
 |-- Index: long (nullable = false)
 |-- Basket: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [116]:
# Obtain a sample to work with that is 0.5% the size of the original dataset
sample = baskets.select("Index","Basket").sample(False, 0.005, 10)

In [None]:
sample.show(5, truncate=False)

+-----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Index      |Basket                                                                                                                                                                              |
+-----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|85899345935|[school, son, student, mentore, child, andrades, opportunity, adult, play, role, intrigue, patrick, life, high, erin]                                                               |
|85899346237|[fish, jeopardize, threaten, adversely, habitat, zidells, plan, conclude, wild, modify]                                                                                             |
|85899346277|[shut, shoul

### **3. Exploratory Data Analysis**

In [None]:
# Total number of articles

%%time
total_rows = spark.sql("""SELECT COUNT(DISTINCT Text) AS total_rows 
                        FROM df_view""")
total_rows.show()

+----------+
|total_rows|
+----------+
|   1010092|
+----------+

CPU times: user 387 ms, sys: 52.1 ms, total: 439 ms
Wall time: 1min 7s


In [None]:
# Total number of articles by source

%%time
top_sources = spark.sql("""SELECT DISTINCT source AS source_name,
                                  COUNT(DISTINCT Text) AS total_rows 
                        FROM df_view
                        GROUP BY source_name
                        ORDER BY total_rows DESC
                        LIMIT 10""")
top_sources.show()

+----------------+----------+
|     source_name|total_rows|
+----------------+----------+
|   cleveland.com|    152716|
|          nj.com|    125100|
|    stltoday.com|    120632|
|  oregonlive.com|    103494|
|     latimes.com|     60637|
|   azcentral.com|     42693|
|      sfgate.com|     42121|
|baltimoresun.com|     39994|
|       freep.com|     36466|
| startribune.com|     31820|
+----------------+----------+

CPU times: user 420 ms, sys: 55.2 ms, total: 475 ms
Wall time: 1min 12s


In [None]:
# Total number of articles by year

%%time
by_year = spark.sql("""SELECT DISTINCT substr(Date, 1, 4) AS year,
                              COUNT(DISTINCT Text) AS total_rows 
                        FROM df_view
                        GROUP BY year
                        ORDER BY year""")
by_year.show()

+----+----------+
|year|total_rows|
+----+----------+
|2005|      3859|
|2006|      7076|
|2007|      8694|
|2008|     17909|
|2009|     42158|
|2010|    122991|
|2011|    263965|
|2012|    543444|
+----+----------+

CPU times: user 456 ms, sys: 55 ms, total: 511 ms
Wall time: 1min 13s


### **4. Market-Basket Analysis**

#### **4.1 A-priori Algorithm**

In [117]:
# A-priori

### FUNCTIONS ###
# Definition of sum function
def sum_items(x,y):
    return x+y

# Definition of filtering function: it checks if a set of items is a subset of a basket
def filtering(basket_list, candidates):
  for item in candidates:
    if set(item).issubset(set(basket_list)):
      return ((item, 1))

##### The Algorithm implementation 

In [118]:
# Convert column containing baskets into a Resilient Distributed Dataset (RDD)
rdd_baskets = sample.select("Basket").rdd

In [20]:
# Create a single array from an array of arrays
flat_list = rdd_baskets.flatMap(lambda xs: [x[0] for x in xs])

In [119]:
minSupport = 50

singletons = flat_list.map(lambda item: (item, 1)) # create a set of (key, value) pairs 
singletons_sum = singletons.reduceByKey(sum_items) # count frequency of each key
singletons_filtered = singletons_sum.filter(lambda item: item[1] >= minSupport) # filter out items that are below the threshold (not frequent)

In [22]:
frequent_items = singletons_filtered.map(lambda item: item[0]) # obtain a list of the frequent items

In [None]:
# Import combinations module
from itertools import combinations

In [None]:
pairs_list = list(combinations(frequent_items.toLocalIterator(),2)) # a list of all the possible frequent items combinations (pairs)

In [None]:
pairs_list

[('work', 'year'),
 ('work', 'start'),
 ('work', 'school'),
 ('year', 'start'),
 ('year', 'school'),
 ('start', 'school')]

Note: the most frequent items are "work", "year", "start", "school".

In [None]:
# Create the support table for the pairs of items by applying the filtering function previously created
supp_table_pairs = basket_list.map(lambda x : filtering(x, pairs_list)).filter(lambda x: x is not None) # apply filtering function to check if a pair appears in the baskets

In [93]:
supp_table_pairs_sum = supp_table_pairs.reduceByKey(sum_items) # sum of values by item as key

In [120]:
### ALGORITHM IMPLEMENTATION ###
def a_priori(rdd_baskets, minSupport):
  '''
  Apply the a-priori algorithm to singletons only to obtain a candidate itemset of frequent items
  '''

  # Create a single array from an array of arrays
  flat_list = rdd_baskets.flatMap(lambda xs: [x[0] for x in xs])

  singletons = flat_list.map(lambda item: (item, 1)).cache() # create a set of (key, value) pairs and cache the results
  singletons_sum = singletons.reduceByKey(sum_items) # count frequency of each key
  singletons_filtered = singletons_sum.filter(lambda item: item[1] >= minSupport) # filter out items that are below the threshold (not frequent)

  candidate_itemset = singletons_filtered.map(lambda item: (item[0], 1)) # obtain the candidate itemset

  # Consider pairs of items -> in this case, the output of the function should be modified

  # frequent_items = singletons_filtered.map(lambda item: (item[0])) # retain only frequent items
  # pairs_list = list(combinations(frequent_items.toLocalIterator(),2)) # a list of all the possible frequent items combinations (pairs)

  # Create the support table for the pairs of items by applying the filtering function previously created
  # supp_table_pairs = rdd_baskets.map(lambda x : filtering(x, pairs_list)).filter(lambda x: x is not None) # apply filtering function to check if a pair appears in the baskets

  return (candidate_itemset)

#### **4.2 SON Algorithm**

In [24]:
# Reduce number of partitions from default value (45) to 10.
rdd_baskets = rdd_baskets.coalesce(10)

In [25]:
minSupport = 50
numPartitions = rdd_baskets.getNumPartitions()
adjSupport = minSupport/numPartitions
adjSupport

5.0

##### The Algorithm implementation

In [None]:
%%time
candidates = spark.sparkContext.parallelize([]) # create an empty RDD to merge the results of a-priori applied on each chunk

for i in range(0, numPartitions-1):
  partition = spark.sparkContext.parallelize(rdd_baskets.glom().collect()[i]) # collect each partition by using glom method
  candidate_chunk = a_priori(partition, adjSupport) # call the a-priori function and apply it to the partition  
  candidates = candidates.union(candidate_chunk) # add local frequent itemset to candidates RDD

In [None]:
# Manually select the minimum global support to filter out infrequent items
minTotalSupport = 1

# Apply reduce function to candidates and filter out not frequent items (false negatives)
candidates_sum = candidates.reduceByKey(sum_items) # count frequency of each key
candidates_filtered = candidates_sum.filter(lambda item: item[1] >= minTotalSupport) # filter out items that are below the threshold (not frequent)

### **5. Results**

In [None]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

# Display SON algorithm results
result_columns = ["Item","Freq"]
results = candidates_filtered.toDF(result_columns)
results.createOrReplaceTempView("results")
sqlContext.sql("SELECT * FROM results ORDER BY Freq DESC").show(truncate = False)

+--------+----+
|Item    |Freq|
+--------+----+
|guard   |1   |
|family  |1   |
|score   |1   |
|10      |1   |
|seventh |1   |
|head    |1   |
|        |1   |
|scott   |1   |
|step    |1   |
|healthy |1   |
|athletic|1   |
|official|1   |
|put     |1   |
|gross   |1   |
|catholic|1   |
|gop     |1   |
|think   |1   |
|stock   |1   |
|federal |1   |
|return  |1   |
+--------+----+
only showing top 20 rows



In [None]:
# Convert RDD containing the most freq items to dataframe
freq_columns = ["Item", "Freq"]
freq_df = singletons_filtered.toDF(freq_columns)
freq_df.createOrReplaceTempView("freq_df")
sqlContext.sql("SELECT * FROM freq_df ORDER BY Freq DESC").show(5, truncate = False)

+------+----+
|Item  |Freq|
+------+----+
|work  |107 |
|year  |73  |
|school|65  |
|start |61  |
+------+----+



In [None]:
# Retrieve total support for a specific singleton
work_support = sqlContext.sql("SELECT Item, Freq FROM freq_df WHERE Item=='work'")
work_support.show()

+----+----+
|Item|Freq|
+----+----+
|work| 107|
+----+----+



In [None]:
# Note: here the error "unhashable type 'list'" occurs. It refers back to the "filtering" function

# Display frequent pairs and their support 
freq_pairs_columns = ["Pair", "Freq"]
freq_pairs_df = supp_table_pairs_sum.toDF(freq_pairs_columns)
freq_pairs_df.createOrReplaceTempView("freq_pairs_df")
sqlContext.sql("SELECT * FROM freq_pairs_df ORDER BY Freq DESC").show(5, truncate = False)

In [121]:
# Note: the same error occurs here. It is a less efficient function
def find_pairs_supp(rdd_baskets, pairs_list):
  iter_rdd = rdd_baskets.toLocalIterator()
  pairs_supp = []

  for pair in pairs_list:
    count_supp = 0
    for basket in iter_rdd:
      if(set(list(pair))).issubset(set(basket)):
        count_supp += 1
    pairs_supp.append(pair, count_supp)

  return pairs_supp

In [None]:
supp_freq_pairs = find_pairs_supp(rdd_baskets, pairs_list)

In [None]:
# Compute the confidence (example of calculated confidence)
confidence_I1_I2 = round(I1_I2_support / I1, 2)
confidence_I2_I1 = round(I1_I2_support / I2, 2)