<a href="https://colab.research.google.com/github/sofia-sfx/AMD_Project/blob/main/AMD_Project_Sofia_Introzzi.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Apriori implementation for Market Basket Analysis

# Packages

In [5]:
!pip install -q pyspark

[K     |████████████████████████████████| 281.4 MB 43 kB/s 
[K     |████████████████████████████████| 199 kB 34.8 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [6]:
!pip install -q kaggle

In [7]:
import os 

In [8]:
os.environ["KAGGLE_USERNAME"] = "sofiaintrozzi"
os.environ["KAGGLE_KEY"] = "b4c95dfeaa91c11c57a97eb854a2cf0b"

# Dataset import

In [9]:
import pandas as pd 
import numpy as np
import csv

In [10]:
!kaggle datasets download -d bwandowando/ukraine-russian-crisis-twitter-dataset-1-2-m-rows --unzip

Downloading ukraine-russian-crisis-twitter-dataset-1-2-m-rows.zip to /content
100% 13.2G/13.2G [01:09<00:00, 252MB/s]
100% 13.2G/13.2G [01:09<00:00, 203MB/s]


In [11]:
pd.set_option("display.max_columns", 10)
df = pd.read_csv('/content/0928_UkraineCombinedTweetsDeduped.csv.gzip', compression='gzip', index_col=0, encoding='utf-8', quoting=csv.QUOTE_ALL)

In [12]:
data = df[df['language']=='de']['text']

10976

# Spark Session and RDD

In [13]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext
spark

In [14]:
rdd = sc.parallelize(data)
rdd.getNumPartitions()

2

In [15]:
rdd.take(2)

['@KonstantinNotz @JanaSchimke Geht die Frage auch an den ukrainischen Präsidenten oder kommt da die berühmte Grüne Doppelmoral zum Tragen? #Ukraine #Italien #Meloni https://t.co/cqgL1JJOZc',
 '@Armin__Hermann @LViehler Danach beißen sie die #Nordstream Rohre auf… https://t.co/USNocQ2dr4']

# Pre-processing  

In [16]:
from itertools import combinations

import string
puncts = set(string.punctuation)
import nltk
nltk.download("stopwords")
from nltk.corpus import stopwords
stopwords = stopwords.words("german")
from nltk.tokenize import word_tokenize
nltk.download("punkt")
nltk.download("omw-1.4")
from nltk.stem import WordNetLemmatizer
nltk.download("wordnet") 
lemmatizer = WordNetLemmatizer()

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package omw-1.4 to /root/nltk_data...
[nltk_data] Downloading package wordnet to /root/nltk_data...


In [17]:
def pre_process(tweets):
  text = nltk.word_tokenize(tweets, language = 'german')
  text = [w for w in text if not w in stopwords] 
  text = [lemmatizer.lemmatize(word) for word in text] 
  text = " ".join([w for w in text if w not in puncts]) 
  text = text.lower()
  text = text.split()
  return text

In [18]:
preprocessed_rdd = rdd.map(pre_process)

# Algorithm Implementation

## First  

In [19]:
plain_rdd = (preprocessed_rdd.flatMap(lambda tw: tw)
                                  .map(lambda w: (w, 1))
                                  .reduceByKey(lambda w1, w2: 1)
                                  .map(lambda x: x[0])
                                  .collect())

In [20]:
dizionario = dict(zip(plain_rdd, range(len(plain_rdd))))

In [21]:
def convert(testo):
  listaint = [None] * len(testo)
  
  for index in range(len(testo)):
    word = testo[index]
    for k, v in dizionario.items(): 
      if k == word:
        listaint[index] = v
 
  return listaint 

In [22]:
rdd_int = preprocessed_rdd.flatMap(convert)

In [23]:
count_singletons = (rdd_int.map(lambda index: (index, 1))
                           .reduceByKey(lambda x1, x2: x1+x2))

## Between the passes

In [24]:
s = len(plain_rdd) * 0.01

In [25]:
frequent_word2int = count_singletons.filter(lambda x: x[1] >= s)

## Second 

In [72]:
def count_freq(rdd):
  return (rdd.flatMap(lambda w: w)
             .map(lambda w: (w, 1))
             .reduceByKey(lambda x1, x2: x1+x2)
             .filter(lambda x: x[1] >= s))

In [67]:
frequent_singletons = count_freq(preprocessed_rdd).map(lambda x: x[0]).collect() 

In [29]:
def scan(basket):
  basket = [w for w in basket if w in frequent_singletons]
  return basket

In [153]:
rdd_int_basket = (preprocessed_rdd.map(scan)
                                 .map(convert))

In [31]:
rdd_unique_in_basket = rdd_int_basket.map(lambda x: tuple(np.unique(x)))

## Itemsets - Pairs 

In [148]:
def generate_set2(basket, r=2):
  basket = [comb for comb in combinations(basket, r)]
  return basket

In [149]:
count_pairs = count_freq(rdd_unique_in_basket.map(generate_set2)).sortBy(lambda x: -x[1])

In [151]:
count_pairs.count()

32

## Itemsets - Triples 

In [145]:
def generate_set3(basket, r=3):
  basket = [comb for comb in combinations(basket, r)]
  return basket

In [146]:
count_triples = count_freq(rdd_unique_in_basket.map(generate_set3)).sortBy(lambda x: -x[1])

In [134]:
count_triples.count()

3

# Measure Confidence and Interest

In [98]:
single_supp = {(k):v for k,v in count_singletons.collect()}
double_supp = {(k):v for k,v in count_pairs.collect()}
triple_supp = {(k):v for k,v in count_triples.collect()}
triple_supp

{(20, 17953, 17994): 591, (86, 88, 17953): 440, (20, 86, 88): 437}

In [112]:
def make_confidence_interest(target):
  if target in triple_supp.keys():
    supp_IuJ = triple_supp[target]
    if target[:-1] in double_supp.keys():
      supp_I = double_supp[target[:-1]]
      confidence = supp_IuJ/supp_I
      if target[2] in single_supp.keys():
        supp_j = single_supp[target[2]]
      interest = confidence - supp_j/len(data)
      return confidence, interest

In [100]:
target = (20, 17953, 17994)
make_confidence_interest(target)

(0.3265193370165746, 0.1730025731681781)

In [101]:
target = (86, 88, 17953)
make_confidence_interest(target)

(0.45267489711934156, -0.006782099965206567)