# Project 2: Market-basket analysis for the course of Algorithms for Massive Datasets

### Installation of packages and API configuration

In [1]:
!pip install kaggle



In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=d6baf05ea27219e474f9a85862e7937e5a4aee4c543babf066cc32f1c108cada
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
#Upload the kaggle.json file with the API key
from google.colab import files
files.upload()

In [4]:
! mkdir ~/.kaggle
! cp kaggle.json ~/.kaggle/
! chmod 600 ~/.kaggle/kaggle.json

In [5]:
import pyspark
import kaggle

import time
import itertools
from itertools import combinations

In [6]:
from pyspark.sql import SparkSession, Row
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

### Download and access to the Yelp data

In [7]:
!kaggle datasets download -d yelp-dataset/yelp-dataset -f yelp_academic_dataset_review.json

Downloading yelp_academic_dataset_review.json.zip to /content
100% 2.06G/2.07G [00:14<00:00, 114MB/s]
100% 2.07G/2.07G [00:14<00:00, 152MB/s]


In [8]:
!unzip '/content/yelp_academic_dataset_review.json.zip'

Archive:  /content/yelp_academic_dataset_review.json.zip
  inflating: yelp_academic_dataset_review.json  


In [9]:
#Loading the JSON file in a Spark DataFrame
data = spark.read.json("/content/yelp_academic_dataset_review.json")
data.show()

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|   0|2018-07-07 22:09:11|    0|KU_O5udG6zpxOg-Vc...|  3.0|If you decide to ...|     0|mh_-eMZ6K5RLWhZyI...|
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|YjUWPpI6HXG530lwP...|   0|2014-02-05 20:30:30|    0|saUsX_uimxRlCVr67...|  3.0|Family diner. Had...|     0|8g_iMtfSiwikVnbP2...|
|kxX2SOes4o-D3ZQBk...|   1|2015-01-04 00:01:03|    0|AqPFMleE6RsU23_au...|  5.0|Wow!  Yummy, diff...|     1|_7bHUi9Uuf5__HHc_...|
|e4Vwtrqf-wpJfwesg...|   1|2017-01-14 20:54:15|    0|Sx8TMOWLNuJBWer-0...|  4.0|Cute inter

Counting how many rows are in the dataset, about 7 millions of rows

In [10]:
data = data.select("text")
data_count = data.count()

In [11]:
data_count

6990280

### Pre-processing of words in text reviews

Import the libraries for pre-processing

In [12]:
import nltk
#Stopwords to remove from the text
from nltk.corpus import stopwords
nltk.download('stopwords')
stop_words = set(stopwords.words("english"))
#To tokenize words
from nltk.tokenize import word_tokenize
nltk.download('punkt')
#To lematize words
from nltk.stem import WordNetLemmatizer
nltk.download('wordnet')
lemmatizer = WordNetLemmatizer()

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...


Pre-processing

In [13]:
def data_preprocessing(text):

  #Make text lower and tokenize it if it is an alphabetic word
  text = [word.lower() for word in word_tokenize(text) if word.isalpha()]
  #Remove stopwords
  text = [word for word in text if word.lower() not in stop_words]
  #Lematize words
  text = [lemmatizer.lemmatize(word) for word in text]
  # Remove duplicates from the list
  text = list(set(text))

  return text

In [14]:
#Number of rows for the sample
count_baskets = 1000

In [15]:
#Sample of the dataset, taking a very small fraction of the dataset to test the algorithm
data_rdd = data.rdd.sample(withReplacement=False, fraction=count_baskets/data_count, seed=151)\
           .map(lambda row: data_preprocessing(row.text)).cache()

In [None]:
data_rdd.take(1)

[['entertaining',
  'fun',
  'go',
  'relaxed',
  'place',
  'good',
  'show',
  'always',
  'atmosphere',
  'price',
  'food',
  'fair',
  'time']]

## Apriori Algorithm

Computing Frequent items

In [None]:
#Function to compute the frequent single items
def freq_items(data, support):

  start_time = time.time()

  #Counting how many times a word appears in the list of reviews
  words = data.flatMap(lambda words: words) \
                       .map(lambda word: (word, 1)) \
                       .reduceByKey(lambda a, b: a + b)

  # Filter words with the support threshold
  freq_items = words.filter(lambda word_count: word_count[1] >= support)

  end_time = time.time()
  execution_time = end_time - start_time
  print("Time of execution to compute frequent items: ", execution_time)

  return freq_items

Testing the freq_items function with a sample of the dataset

In [None]:
threshold = 0.05

In [None]:
support_threshold = int(threshold*count_baskets)
support_threshold

50

In [None]:
freq_items_yelp = freq_items(data_rdd, support_threshold)

Time of execution to compute frequent items:  0.01924896240234375


In [None]:
freq_items_yelp.collect()

[('sandwich', 50),
 ('everything', 91),
 ('would', 216),
 ('two', 75),
 ('restaurant', 129),
 ('something', 60),
 ('bar', 70),
 ('wanted', 58),
 ('side', 66),
 ('tried', 63),
 ('around', 71),
 ('since', 67),
 ('get', 210),
 ('also', 172),
 ('even', 132),
 ('got', 142),
 ('pizza', 53),
 ('different', 58),
 ('last', 65),
 ('table', 72),
 ('sauce', 69),
 ('bit', 64),
 ('server', 70),
 ('people', 98),
 ('definitely', 114),
 ('sure', 84),
 ('could', 130),
 ('wait', 88),
 ('find', 58),
 ('favorite', 72),
 ('salad', 57),
 ('ever', 79),
 ('ca', 67),
 ('customer', 73),
 ('many', 60),
 ('thing', 99),
 ('need', 59),
 ('said', 71),
 ('meal', 63),
 ('look', 58),
 ('year', 72),
 ('done', 56),
 ('much', 99),
 ('asked', 57),
 ('eat', 60),
 ('location', 64),
 ('go', 215),
 ('long', 70),
 ('delicious', 131),
 ('first', 112),
 ('price', 110),
 ('make', 115),
 ('told', 56),
 ('nice', 141),
 ('place', 320),
 ('back', 195),
 ('one', 204),
 ('want', 76),
 ('taste', 53),
 ('come', 125),
 ('ordered', 114),
 ('

In [None]:
len(freq_items_yelp.collect())

147

Frequent pairs itemsets k = 2

In [None]:
def freq_pairs_itemsets(data, frequent_items, support):

  start_time = time.time()

  #Taking the items
  items = frequent_items.map(lambda word: word[0])
  print(items.collect())
  items_broadcast = sc.broadcast(items.collect())

  #Counting how many times a couple of words appears
  count_pairs = data.flatMap(lambda words: [word for word in itertools.combinations(words, 2) if set(word).issubset(set(items_broadcast.value)) ]) \
                       .map(lambda word:(word, 1)) \
                       .reduceByKey(lambda a, b: a + b)

  # Filter pairs with the support threshold to consider just the frequent pairs
  freq_pairs = count_pairs.filter(lambda pair_count: pair_count[1] >= support)

  end_time = time.time()
  execution_time = end_time - start_time
  print("Time of execution to compute frequent pairs itemsets: ", execution_time)

  return freq_pairs

Testing the freq_pairs_itemsets function with a sample of the dataset

In [None]:
freq_pairs_itemsets_yelp = freq_pairs_itemsets(data_rdd, freq_items_yelp, support_threshold)

['sandwich', 'everything', 'would', 'two', 'restaurant', 'something', 'bar', 'wanted', 'side', 'tried', 'around', 'since', 'get', 'also', 'even', 'got', 'pizza', 'different', 'last', 'table', 'sauce', 'bit', 'server', 'people', 'definitely', 'sure', 'could', 'wait', 'find', 'favorite', 'salad', 'ever', 'ca', 'customer', 'many', 'thing', 'need', 'said', 'meal', 'look', 'year', 'done', 'much', 'asked', 'eat', 'location', 'go', 'long', 'delicious', 'first', 'price', 'make', 'told', 'nice', 'place', 'back', 'one', 'want', 'taste', 'come', 'ordered', 'beer', 'see', 'let', 'really', 'staff', 'work', 'like', 'feel', 'great', 'still', 'always', 'area', 'day', 'happy', 'well', 'better', 'review', 'away', 'think', 'right', 'new', 'way', 'visit', 'dish', 'friend', 'small', 'good', 'came', 'excellent', 'food', 'cheese', 'experience', 'lot', 'friendly', 'try', 'take', 'home', 'pretty', 'worth', 'another', 'chicken', 'made', 'going', 'best', 'give', 'hot', 'coming', 'flavor', 'clean', 'hour', 'went'

In [None]:
freq_pairs_itemsets_yelp.collect()

[(('food', 'time'), 85),
 (('also', 'service'), 56),
 (('service', 'food'), 52),
 (('like', 'service'), 65),
 (('also', 'great'), 53),
 (('would', 'good'), 72),
 (('food', 'would'), 78),
 (('food', 'staff'), 50),
 (('great', 'food'), 50),
 (('place', 'staff'), 64),
 (('one', 'good'), 81),
 (('place', 'like'), 66),
 (('place', 'service'), 60),
 (('get', 'go'), 62),
 (('service', 'time'), 54),
 (('back', 'good'), 79),
 (('food', 'nice'), 54),
 (('one', 'service'), 68),
 (('friendly', 'food'), 50),
 (('really', 'good'), 69),
 (('good', 'service'), 67),
 (('like', 'good'), 56),
 (('place', 'nice'), 53),
 (('recommend', 'would'), 52),
 (('place', 'good'), 116),
 (('would', 'time'), 58),
 (('back', 'would'), 63),
 (('go', 'good'), 79),
 (('place', 'great'), 88),
 (('little', 'food'), 57),
 (('place', 'also'), 50),
 (('great', 'time'), 98),
 (('one', 'go'), 58),
 (('place', 'time'), 76),
 (('back', 'time'), 76),
 (('food', 'get'), 51),
 (('food', 'good'), 122),
 (('place', 'love'), 53),
 (('f

In [None]:
len(freq_items_yelp.collect())

147

Apriori algorithm implementation
*   I define a function for the apriori algorithm which:
  *   In the first step compute frequent items
  *   Then I generate the k-combinations that are used next to compute the frequent itemsets and to speed up the process I use a broadcast function
  *   At the end I compute the frequent k-itemsets and the necessary time for the execution of the algorithm

In [None]:
def apriori(data, support_threshold):

  #Counting how many times a word appears in the list of reviews
  freq_items = data.flatMap(lambda words: words) \
                       .map(lambda word: (word, 1)) \
                       .reduceByKey(lambda a, b: a + b) \
                       .filter(lambda word_count: word_count[1] >= support_threshold)

  start_time = time.time()
  freq_itemsets = freq_items

  freq_items = freq_items.map(lambda word: word[0])

  k = 2
  while True:

    print(f"Computing {k}-broadcast items")

    items_broadcast = sc.broadcast(freq_items.collect())

    print(f"Computing the {k}-itemsets")

    #Counting how many times a k-combination of words appears
    new_itemsets = data.flatMap(lambda words: [word for word in itertools.combinations(words, k) if set(word).issubset(set(items_broadcast.value)) ]) \
                       .map(lambda word:(word, 1)) \
                       .reduceByKey(lambda a, b: a + b) \
                       .filter(lambda itemsets_count: itemsets_count[1] >= support_threshold)

    # Combine the new itemsets with the existing ones
    if not new_itemsets.isEmpty():
      freq_itemsets = freq_itemsets.union(new_itemsets)

    # Update freq_items for the next iteration
    freq_items = new_itemsets.map(lambda word: word[0])

    #Check if freq_items is empty and if yes, stop the computation
    if freq_items.isEmpty():
      break

    k += 1

  end_time = time.time()
  execution_time = end_time - start_time
  print("Time of execution of the algorithm: ", execution_time)

  return freq_itemsets

Testing the algorithm with a small sample of the Yelp dataset, small for having a computation in short time because for the lack of computational resources and testing different kind of threshold

Threhold 0.03

In [None]:
threshold = 0.03

In [None]:
support_threshold = int(threshold*count_baskets)
support_threshold

30

In [None]:
freq_itemsets_yelp = apriori(data_rdd, support_threshold)

Computing 2-broadcast items
Computing the 2-itemsets
Computing 3-broadcast items
Computing the 3-itemsets
Time of execution of the algorithm:  604.4841439723969


In [None]:
type(freq_itemsets_yelp)

pyspark.rdd.RDD

In [None]:
freq_itemsets_yelp.collect()

[('sandwich', 50),
 ('everything', 91),
 ('would', 216),
 ('two', 75),
 ('restaurant', 129),
 ('overall', 43),
 ('manager', 30),
 ('something', 60),
 ('family', 31),
 ('bar', 70),
 ('free', 37),
 ('getting', 42),
 ('quality', 45),
 (('food', 'time'), 85),
 (('could', 'get'), 48),
 (('love', 'best'), 30),
 (('food', 'went'), 32),
 (('get', 'place'), 33),
 (('one', 'well'), 42),
 (('make', 'go'), 42),
 (('back', 'made'), 40),
 (('back', 'much'), 31),
 (('place', 'day'), 32),
 (('get', 'ordered'), 32),
 ('today', 34),
 ('else', 39),
 ('wanted', 58),
 ('return', 36),
 ('side', 66),
 ('almost', 37),
 (('staff', 'great'), 49),
 (('another', 'would'), 32),
 (('like', 'great'), 43),
 (('also', 'service'), 56),
 (('food', 'try'), 30),
 (('well', 'service'), 45),
 (('place', 'best'), 37),
 (('give', 'time'), 36),
 (('service', 'food'), 52),
 (('restaurant', 'food'), 30),
 (('time', 'sure'), 38),
 ('tried', 63),
 ('around', 71),
 ('tasty', 40),
 ('room', 46),
 (('place', 'little'), 31),
 (('like'

In [None]:
len(freq_itemsets_yelp.collect())

716

Threshold 0.05

In [None]:
threshold = 0.05

In [None]:
support_threshold = int(threshold*count_baskets)
support_threshold

50

In [None]:
freq_itemsets_yelp = apriori(data_rdd, support_threshold)

Computing 2-broadcast items
Computing the 2-itemsets
Computing 3-broadcast items
Computing the 3-itemsets
Time of execution of the algorithm:  163.11374759674072


In [None]:
freq_itemsets_yelp.collect()

[('sandwich', 50),
 ('everything', 91),
 ('would', 216),
 ('two', 75),
 ('restaurant', 129),
 ('something', 60),
 ('bar', 70),
 (('food', 'time'), 85),
 ('wanted', 58),
 ('side', 66),
 (('also', 'service'), 56),
 (('service', 'food'), 52),
 ('tried', 63),
 ('around', 71),
 (('like', 'service'), 65),
 (('also', 'great'), 53),
 (('would', 'good'), 72),
 ('since', 67),
 ('get', 210),
 ('also', 172),
 (('food', 'would'), 78),
 (('food', 'staff'), 50),
 ('even', 132),
 ('got', 142),
 ('pizza', 53),
 ('different', 58),
 ('last', 65),
 (('great', 'food'), 50),
 ('table', 72),
 ('sauce', 69),
 ('bit', 64),
 ('server', 70),
 ('people', 98),
 (('place', 'staff'), 64),
 (('one', 'good'), 81),
 (('place', 'like'), 66),
 (('place', 'service'), 60),
 (('get', 'go'), 62),
 (('service', 'time'), 54),
 (('back', 'good'), 79),
 (('food', 'nice'), 54),
 (('one', 'service'), 68),
 ('definitely', 114),
 ('sure', 84),
 ('could', 130),
 ('wait', 88),
 ('find', 58),
 (('friendly', 'food'), 50),
 ('favorite', 

In [None]:
len(freq_itemsets_yelp.collect())

236

Threshold 0.07

In [None]:
threshold = 0.07

In [None]:
support_threshold = int(threshold*count_baskets)
support_threshold

70

In [None]:
freq_itemsets_yelp = apriori(data_rdd, support_threshold)

Computing 2-broadcast items
Computing the 2-itemsets
Computing 3-broadcast items
Computing the 3-itemsets
Time of execution of the algorithm:  100.03914141654968


In [None]:
freq_itemsets_yelp.collect()

[('everything', 91),
 ('would', 216),
 ('two', 75),
 ('restaurant', 129),
 ('bar', 70),
 (('food', 'time'), 85),
 ('around', 71),
 (('would', 'good'), 72),
 ('get', 210),
 ('also', 172),
 (('food', 'would'), 78),
 ('even', 132),
 ('got', 142),
 ('table', 72),
 ('server', 70),
 ('people', 98),
 (('one', 'good'), 81),
 (('back', 'good'), 79),
 ('definitely', 114),
 ('sure', 84),
 ('could', 130),
 ('wait', 88),
 ('favorite', 72),
 ('ever', 79),
 ('customer', 73),
 ('thing', 99),
 ('said', 71),
 ('year', 72),
 ('much', 99),
 ('go', 215),
 ('long', 70),
 ('delicious', 131),
 ('first', 112),
 (('place', 'good'), 116),
 ('price', 110),
 ('make', 115),
 ('nice', 141),
 (('go', 'good'), 79),
 (('place', 'great'), 88),
 ('place', 320),
 ('back', 195),
 ('one', 204),
 ('want', 76),
 ('come', 125),
 ('ordered', 114),
 ('really', 141),
 ('staff', 150),
 ('like', 227),
 (('great', 'time'), 98),
 ('great', 318),
 ('always', 121),
 (('place', 'time'), 76),
 (('back', 'time'), 76),
 ('day', 104),
 ('we

In [None]:
len(freq_itemsets_yelp.collect())

113

Results of the experiments with a sample of the dataset

In [None]:
results = [
    Row(threshold=0.03, computationTime= '604s', itemsets=716),
    Row(threshold=0.05, computationTime= '163s', itemsets=236),
    Row(threshold=0.07, computationTime= '100s', itemsets=113),
]

data_results = spark.createDataFrame(results)

data_results.show()

+---------+---------------+--------+
|threshold|computationTime|itemsets|
+---------+---------------+--------+
|     0.03|           604s|     716|
|     0.05|           163s|     236|
|     0.07|           100s|     113|
+---------+---------------+--------+



##FPGrowth Algorithm

The FPGrowth model is a machine learning model made available by pySpark to compute frequent itemsets in large datasets

In [29]:
from pyspark.mllib.fpm import FPGrowth

start_time = time.time()

model = FPGrowth.train(data_rdd, minSupport=0.1)

end_time = time.time()
print("Time of execution of the algorithm: ", end_time - start_time)

Time of execution of the algorithm:  1.3905205726623535


In [30]:
itemsets = model.freqItemsets().collect()

In [31]:
for itemset in itemsets:
  print(itemset.items, itemset.freq)

['menu'] 104
['food'] 336
['day'] 104
['place'] 320
['place', 'food'] 135
['great'] 318
['great', 'place'] 117
['great', 'food'] 132
['amazing'] 100
['much'] 99
['good'] 301
['good', 'great'] 98
['good', 'place'] 116
['good', 'food'] 142
['experience'] 99
['time'] 288
['time', 'great'] 100
['time', 'place'] 106
['time', 'good'] 105
['time', 'food'] 121
['service'] 257
['service', 'great'] 114
['service', 'food'] 122
['thing'] 99
['people'] 98
['like'] 227
['like', 'place'] 101
['would'] 216
['go'] 215
['get'] 210
['one'] 204
['back'] 195
['also'] 172
['staff'] 150
['best'] 149
['got'] 142
['really'] 141
['nice'] 141
['friendly'] 134
['even'] 132
['delicious'] 131
['could'] 130
['restaurant'] 129
['love'] 126
['come'] 125
['made'] 122
['always'] 121
['went'] 120
['well'] 116
['try'] 116
['make'] 115
['definitely'] 114
['u'] 114
['ordered'] 114
['order'] 112
['first'] 112
['price'] 110
['little'] 109
['never'] 107
['recommend'] 105


In [32]:
len(itemsets)

60