<a href="https://colab.research.google.com/github/ranieri-unimi/ukraine-malchiodi-2022/blob/main/ukraine.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### run once

In [None]:
!pip install pyspark
!pip install findspark

In [None]:
import os
from getpass import getpass

# Kaggle API credentials must never live in the notebook itself: they are taken
# from the environment when already set, otherwise asked for interactively
# (getpass does not echo the value, so it does not end up in the cell output).
for _var, _prompt in (("KAGGLE_USERNAME", "Kaggle username: "),
                      ("KAGGLE_KEY", "Kaggle API key: ")):
    if not os.environ.get(_var):
        os.environ[_var] = getpass(_prompt)

In [None]:
ref = 'bwandowando/ukraine-russian-crisis-twitter-dataset-1-2-m-rows'
!mkdir datasets
!kaggle datasets download $ref --unzip -p ./datasets

In [None]:
import nltk

# Fetch only what the cleaning step uses (tokenizer models, stopword list,
# WordNet for the lemmatizer) instead of the whole NLTK collection.
# 'punkt_tab' / 'omw-1.4' are only needed by newer NLTK releases; on versions
# that do not know an id, download() just reports it and returns False.
for _resource in ("punkt", "punkt_tab", "stopwords", "wordnet", "omw-1.4"):
    nltk.download(_resource, quiet=True)

## ukraine

In [None]:
# Number of English tweets drawn at random before processing;
# a falsy value (0 / None) skips the sampling step and keeps every tweet.
SAMPLE_SIZE = 768

In [None]:
import numpy as np
import pandas as pd
import csv
import re
import string
import random

In [None]:
import pyspark
import findspark

## data cleaning

In [None]:
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer
from nltk.stem import WordNetLemmatizer

In [None]:
# https://www.pluralsight.com/guides/building-a-twitter-sentiment-analysis-in-python

def preprocess_tweet_text(tweet):
    """Clean and tokenize one tweet.

    Parameters
    ----------
    tweet : tuple
        ``(index, raw_text)`` pair, as produced by ``enumerate`` over the tweets.

    Returns
    -------
    tuple
        ``(index, tokens)`` where ``tokens`` is the list of lower-cased,
        stopword-free, stemmed and lemmatized tokens (emoji runs are kept as
        separate tokens).
    """
    index, tweet = tweet

    # str.lower() returns a new string: the result must be reassigned,
    # otherwise capitalised stopwords ("The", "And", ...) slip through the filter
    tweet = tweet.lower()

    # cleanings 🧹

    # urls
    tweet = re.sub(r"http\S+|www\S+|https\S+", '', tweet, flags=re.MULTILINE)

    # @ and #
    tweet = re.sub(r'\@\w+|\#','', tweet)

    # punctuation -> spaces (not deleted, so glued words are still split apart)
    punctuation_chars = string.punctuation + '…’”“'
    tweet = tweet.translate(str.maketrans(punctuation_chars, ' ' * len(punctuation_chars)))

    tweet_tokens = word_tokenize(tweet)

    # emojitter: separate runs of word characters from runs of anything else
    wrds = [e for word in tweet_tokens for e in re.findall(r"(\w+|[^\w ]+)", word)]
    # TODO split also emoji-goups

    # stopwords: build the lookup set once per tweet instead of once per word
    english_stopwords = set(stopwords.words('english'))
    filtered_words = [w for w in wrds if w not in english_stopwords]

    # stemmatize
    ps = PorterStemmer()
    stemmed_words = [ps.stem(w) for w in filtered_words]

    # lemmatize (every token is treated as an adjective, pos='a')
    lemmatizer = WordNetLemmatizer()
    lemma_words = [str(lemmatizer.lemmatize(w, pos='a')) for w in stemmed_words]

    return (index, lemma_words)

In [None]:
# load dataset
filename = r"./datasets/UkraineCombinedTweetsDeduped20220227-131611.csv.gzip"
pd.set_option("display.max_columns", None)
df = pd.read_csv(
    filename,
    compression='gzip',
    index_col=0,
    encoding='utf-8',
    quoting=csv.QUOTE_ALL,
    # parse the file in one pass so every column gets a single, consistent
    # dtype instead of a chunk-by-chunk guess
    low_memory=False,
)

  exec(code_obj, self.user_global_ns, self.user_ns)


In [None]:
#lang_hist = {l:df[df.language == l].size for l in df.language.unique()}

In [None]:
# raw text of every tweet whose language tag is English
datalist = df.loc[df["language"] == "en", "text"].tolist()

In [None]:
if SAMPLE_SIZE:
    # - dedicated seeded generator: the same sample is drawn on every run, so the
    #   outputs below are reproducible, and the global `random` state is untouched
    # - min(): random.sample raises ValueError when asked for more items than exist
    datalist = random.Random(42).sample(datalist, min(SAMPLE_SIZE, len(datalist)))

## Spark session (local mode)

In [None]:
# import findspark
# findspark.init("spark-3.1.1-bin-hadoop3.2") # SPARK_HOME
from pyspark.sql import SparkSession

# local session on all available cores; getOrCreate() reuses a running one
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
sc = spark.sparkContext

In [None]:
# distribute the tweets as (index, text) records
rdd = sc.parallelize(list(enumerate(datalist)))

In [None]:
# (index, tokens) for every tweet. The RDD feeds many separate actions below;
# cache() keeps the tokenized tweets after the first one, so the NLTK
# preprocessing is not re-run from scratch for every action.
dataPipe = rdd.map(preprocess_tweet_text).cache()

In [None]:
# indexing = dataset.flatMap(lambda _, v : [(e,e) for e in v]).reduceByKey(lambda k, v : k)

### helper functions

In [None]:
def is_sub(sub, lst):
    """Return True when every element of `sub` also occurs in `lst`."""
    for element in sub:
        if element not in lst:
            return False
    return True
def add(a, b):
    """Binary `+`, usable as a reducer (e.g. with reduceByKey)."""
    total = a + b
    return total
def splat(t):
    """Flatten one level of nested tuples in `t` and return the items as a sorted tuple."""
    flat = []
    for member in t:
        if isinstance(member, tuple):
            flat.extend(member)
        else:
            flat.append(member)
    flat.sort()
    return tuple(flat)
def doubled(t):
    """Return True when `t` has NO repeated element (despite the name), False otherwise."""
    distinct = set(t)
    return len(t) == len(distinct)

In [None]:
# nen_rdd = sc.parallelize(datalist)

In [None]:
# elemListPipe = nen_rdd.flatMap(lambda word_list : word_list).distinct()
# elem_index = sc.parallelize(enumerate(elemListPipe.collect()))

## Apriori steps

### 1ne

In [None]:
# Fraction used to derive the absolute THRESHOLD of the step-by-step Apriori run
# (THRESHOLD is computed as this share of the total token count).
PCENT = .01

In [None]:
# every distinct token seen in the tweets, collected on the driver
candidateOne = (
    dataPipe
    .flatMap(lambda record: record[-1])
    .distinct()
    .collect()
)

In [None]:
# (token, number of occurrences) over all tweets
countOnePipe = (
    dataPipe
    .flatMap(lambda record: record[-1])
    .map(lambda token: (token, 1))
    .reduceByKey(add)
)
countOnePipe.take(5)

[('satellit', 3), ('friday', 7), ('sever', 8), ('larg', 2), ('ground', 4)]

In [None]:
# absolute cut-off: PCENT of the total number of token occurrences
THRESHOLD = PCENT * countOnePipe.map(lambda pair: pair[-1]).reduce(add)

In [None]:
# filter non frequent
frequentOnePipe = countOnePipe.filter(lambda x: x[-1] > THRESHOLD).map(lambda x: (1, x[0]))
frequentOnePipe.take(5)

[(1, 'ukrainian'), (1, 'putin'), (1, 'ukrain'), (1, 'russian'), (1, 'the')]

### 2wo

In [None]:
# generate candidate pairs
candidateTwoPipe = frequentOnePipe.join(frequentOnePipe).map(lambda x : x[-1]).map(splat).distinct().filter(doubled)
candidateTwoPipe.take(5)

[('the', 'ukrain'),
 ('russia', 'russian'),
 ('russian', 'war'),
 ('russia', 'war'),
 ('ukrain', 'ukrainian')]

In [None]:
# count pair frequency
candidateTwo = candidateTwoPipe.collect() # pair in MEM

countTwoPipe = dataPipe.map(lambda x : [(pair, is_sub(pair, x[-1])) for pair in candidateTwo] ).flatMap(lambda x : x).reduceByKey(add)
countTwoPipe.take(5)

[(('the', 'ukrain'), 111),
 (('russia', 'russian'), 108),
 (('russian', 'war'), 35),
 (('russia', 'war'), 61),
 (('putin', 'ukrainian'), 22)]

In [None]:
#THRESHOLD = countTwoPipe.map(lambda x : x[-1]).reduce(lambda a,b : a+b) * PCENT

In [None]:
# filter non frequent
frequentTwoPipe = countTwoPipe.filter(lambda x: x[-1] > THRESHOLD).map(lambda x: (1, x[0]))
frequentTwoPipe.take(5)

[(1, ('russian', 'ukrain')), (1, ('russia', 'ukrain'))]

### 3hree

In [None]:
# generate candidate triples
candidateThreePipe = frequentTwoPipe.join(frequentOnePipe).map(lambda x : x[-1]).map(splat).distinct().filter(doubled)
candidateThreePipe.take(5)

[('russian', 'the', 'ukrain'),
 ('russia', 'the', 'ukrain'),
 ('russian', 'ukrain', 'ukrainian'),
 ('putin', 'russian', 'ukrain'),
 ('russia', 'ukrain', 'ukrainian')]

In [None]:
# count three frequency
candidateThree = candidateThreePipe.collect() # three in MEM

countThreePipe = dataPipe.map(lambda x : [(pair, is_sub(pair, x[-1])) for pair in candidateThree] ).flatMap(lambda x : x).reduceByKey(add)
countThreePipe.take(5)

[(('russian', 'the', 'ukrain'), 57),
 (('russia', 'the', 'ukrain'), 60),
 (('russia', 'russian', 'ukrain'), 99),
 (('russian', 'ukrain', 'war'), 31),
 (('russia', 'ukrain', 'war'), 56)]

In [None]:
# filter non frequent
frequentThreePipe = countThreePipe.filter(lambda x: x[-1] > THRESHOLD).map(lambda x: (1, x[0]))
frequentThreePipe.take(5)

[]

## let's generalize it

In [None]:
def aPriori(data, THRESHOLD = .1):
  """Generate frequent itemsets level by level (Apriori).

  Parameters
  ----------
  data : RDD
      One record per basket; the LAST element of each record is the list of
      items (tokens) in that basket.
  THRESHOLD : float
      Minimum support as a fraction of the number of baskets. An itemset is
      kept when its basket count is strictly greater than
      ``THRESHOLD * data.count()``.

  Yields
  ------
  list
      First the list of ``(item, count)`` for the frequent single items, then,
      for k = 2, 3, ..., the list of ``(sorted_item_tuple, count)`` for the
      frequent k-itemsets. Iteration ends at the first level with no frequent
      itemset (nothing is yielded at all when no single item is frequent).
  """
  # absolute cut-off, kept under its own name so THRESHOLD stays a ratio
  min_count = THRESHOLD * data.count()

  frequent_items_counts = (data
                    # set(): an item repeated inside one basket counts once, so
                    # single-item support is a number of baskets, exactly like
                    # the is_sub-based support of the larger itemsets
                    .flatMap(lambda x: set(x[-1]))
                    .map(lambda x: (x,1))
                    .reduceByKey(add)
                    .filter(lambda x: x[-1] > min_count)
                    .cache()  # reused by collect() and by every join below
                    )

  frequent_itemsets = frequent_items_counts.collect()
  # constant key 1: joining on it pairs every itemset with every frequent item
  frequent_items_pipe = frequent_items_counts.map(lambda x: (1, x[0]))
  frequent_itemsets_pipe = frequent_items_pipe

  while len(frequent_itemsets):
    yield frequent_itemsets

    ### CANDIDATE GENERATION: frequent k-itemsets x frequent items -> (k+1)-tuples
    candidate_itemsets = (frequent_itemsets_pipe
                          .join(frequent_items_pipe)
                          .map(lambda x : x[-1])
                          .map(splat)       # flatten + sort -> canonical tuple
                          .distinct()
                          .filter(doubled)  # drop tuples with a repeated item
                          .collect()
                          )
    if not candidate_itemsets:
      # nothing to count: skip the pass over the data
      return

    ### COUNTING + FILTER PHASE
    frequent_counts = (data
                       # the default argument binds THIS level's candidates to the
                       # lambda, so a later re-evaluation cannot see another list
                       .flatMap(lambda x, candidates=candidate_itemsets:
                                [(itemset, is_sub(itemset, x[-1])) for itemset in candidates])
                       .reduceByKey(add)    # sum of booleans = number of baskets
                       .filter(lambda x: x[-1] > min_count)
                       .cache()             # avoids recounting in the next join
                       )

    frequent_itemsets = frequent_counts.collect()
    frequent_itemsets_pipe = frequent_counts.map(lambda x: (1, x[0]))

In [None]:
fgen = aPriori(dataPipe, 0.04)

# The first level yields bare items: wrap each in a 1-tuple so every key of the
# support table is a tuple. next(..., []) avoids a StopIteration when no item
# reaches the threshold (the generator then yields nothing at all).
singletone_support = {(k,):v for k,v in next(fgen, [])}

# the remaining levels already yield (tuple, count) pairs
itemsets_support = dict(singletone_support)
for level in fgen:
    itemsets_support.update(level)

In [None]:
# number of frequent itemsets found, all sizes together
len(itemsets_support)

195

## measures: confidence and lift

In [None]:
confidence = dict()
lift = dict()

# Supports are absolute basket counts; the number of baskets is needed to turn
# them into probabilities for the lift.
n_baskets = dataPipe.count()

# For every frequent itemset xy and every item y in it, score the rule x -> y
# where x is xy without y.
for xy in itemsets_support:
  if len(xy) > 1:
    for y in xy:
      # items inside an itemset are distinct, so this removes exactly y and
      # keeps the sorted order used for the keys of itemsets_support
      x = tuple(item for item in xy if item != y)

      # confidence(x -> y) = supp(xy) / supp(x)
      confidence[x,y] = itemsets_support[xy] / itemsets_support[x]
      # lift(x -> y) = P(xy) / (P(x) P(y)) = supp(xy) * N / (supp(x) * supp(y));
      # without the factor N the value is not comparable with the neutral value 1
      lift[x,y] = (itemsets_support[xy] * n_baskets)/(itemsets_support[(y, )]*itemsets_support[x])

In [None]:
# confidence of the rule (russia, ukrain) -> war
confidence[('russia', 'ukrain'), 'war']

0.19718309859154928

In [None]:
# lift of the rule (russia, ukrain) -> war
lift[('russia', 'ukrain'), 'war']

0.0014185834431046712