# Mahtab Nejati 98209434

# Problem 4

# NOTICE:
## Rename the dataset file to "q4.csv" before execution.

In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
# NOTE: this star import shadows several builtins (e.g. sum, max, min, abs,
# round) with Spark column functions for the rest of the notebook.
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Memory budget applied to both the driver and the executor.
MAX_MEMORY = "8g"
spark = (
    SparkSession.builder.appName('App Name')
    .master("local[*]")
    .config("spark.executor.memory", MAX_MEMORY)
    .config("spark.driver.memory", MAX_MEMORY)
    .getOrCreate()
)
# Later cells call `sc.broadcast` / `sc.parallelize`; define `sc` explicitly
# instead of relying on the interactive shell to have injected it.
sc = spark.sparkContext
sqlc = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

In [2]:
import nltk
from nltk import pos_tag
from nltk import word_tokenize
from nltk.corpus import stopwords,wordnet
from nltk.stem import WordNetLemmatizer
from nltk.stem import PorterStemmer

# Fetch every NLTK resource used below. "wordnet" is required by
# WordNetLemmatizer and by the `wordnet` POS constants; it was previously
# assumed to be installed already.
for resource in ("wordnet", "omw", "punkt", "stopwords", "averaged_perceptron_tagger"):
    nltk.download(resource)

# English stop words as a set for constant-time membership tests.
stop_words = set(stopwords.words("english"))
lemmatizer = WordNetLemmatizer()
# Only used if the optional stemming step in preprocessText is re-enabled.
stemmer = PorterStemmer()


## Workaround for the issue I've talked about with Mr. Rahimian
# Broadcasting `wordnet` is retried because an attempt may fail (presumably
# while the lazily loaded corpus is still initialising -- TODO confirm).
# The retry is bounded and catches only `Exception`, so a permanent failure
# (or Ctrl-C) surfaces instead of spinning forever.
MAX_BROADCAST_ATTEMPTS = 10
for attempt in range(1, MAX_BROADCAST_ATTEMPTS + 1):
    try:
        sc.broadcast(wordnet)
        break
    except Exception:
        if attempt == MAX_BROADCAST_ATTEMPTS:
            raise

[nltk_data] Downloading package omw to /home/mahtab/nltk_data...
[nltk_data]   Package omw is already up-to-date!
[nltk_data] Downloading package punkt to /home/mahtab/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /home/mahtab/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /home/mahtab/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


In [3]:
import json
import pickle
import numpy as np
import pandas as pd
from collections import OrderedDict
from operator import itemgetter
from itertools import combinations
from functools import reduce

In [4]:
import sklearn.metrics

# Reading in the data

In [5]:
# Read the CSV (first line is the header), drop its 'id' column and rename
# the three remaining columns to id / text / emotion.
df = (
    sqlc.read.option('header', 'true')
    .csv('q4.csv')
    .drop('id')
    .toDF('id', 'text', 'emotion')
)

# Alphabetically sorted list of the distinct emotion labels.
emotions = sorted(
    [record['emotion'] for record in df.select('emotion').dropDuplicates().collect()]
)
dataset_size = df.count()

# df.head(10)
dataset_size

300000

# Part A: Preprocessing the text
### Since stemming totally distorted the words and caused numerous spelling errors and POS distortion, I've chosen not to apply it and to use only lemmatization with the appropriate word POS.
### You can un-comment the stemming to see the results with stemming

In [6]:
def getWordnetPOS(word):
    """Return the WordNet POS constant for a single word.

    The word is tagged on its own with ``nltk.pos_tag``; the first letter of
    the resulting tag selects adjective, noun, verb or adverb. Any other tag
    falls back to noun.
    """
    tagged = nltk.pos_tag([word])
    initial = tagged[0][1][0].upper()
    if initial == "J":
        return wordnet.ADJ
    if initial == "V":
        return wordnet.VERB
    if initial == "R":
        return wordnet.ADV
    # "N" and every unrecognised tag map to noun.
    return wordnet.NOUN

def uniqueList(seq):
    """Return the items of ``seq`` without duplicates, keeping first-seen order."""
    # dict keys are unique and keep insertion order.
    return list(dict.fromkeys(seq))

def preprocessText(row):
    """Turn one dataset row into a basket of distinct, cleaned words.

    Steps: lower-case the text, strip digits, tokenize, drop tokens that are
    shorter than two characters or not alphanumeric, lemmatize with the
    word's own POS, then drop lemmas that are stop words or shorter than two
    characters. Duplicates are removed while keeping first-seen order.

    Args:
        row: mapping-like record with 'id', 'text' and 'emotion' entries.

    Returns:
        Tuple ``(id, words, emotion, len(words))``.
    """
    # A missing text yields an empty basket instead of an AttributeError.
    text = row['text'] or ''

    # Lower-case, then remove all digits.
    sent = ''.join(ch for ch in text.lower() if not ch.isdigit())

    result = []
    for word in nltk.word_tokenize(sent):
        # Skip one-character tokens and punctuation / mixed tokens.
        if len(word) < 2 or not word.isalnum():
            continue

        # Lemmatize using the POS inferred for this word.
        new_word = lemmatizer.lemmatize(word, getWordnetPOS(word))

        # Stemming the word (disabled: it distorted the words too much)
#         new_word = stemmer.stem(new_word)

        # Skip stop words.
        if new_word in stop_words:
            continue

        # Length check on the *lemma*: lemmatization can shorten a word
        # below two characters (the original re-checked `word`, which could
        # never fail at this point).
        if len(new_word) < 2:
            continue

        result.append(new_word)

    result = uniqueList(result)

    return row['id'], result, row['emotion'], len(result)



# One (id, words, emotion, word_count) tuple per dataset row.
rdd = df.rdd.map(preprocessText)

# rdd.take(10)

# Part B: Saving distinct words to a json file

# NOTICE:
### Running the cell below will take quite some time.
### I have pickled the results into the file 'HW2_P04_MahtabNejti_98209434_Pickled_Preproc' for further use. 
### Skip the following cell and execute the next one to load data in or uncomment the code in the cell and execute.

In [7]:
# baskets = rdd.collect()
# with open('HW2_P04_MahtabNejti_98209434_Pickled_Preproc','wb') as f:
#     pickle.dump(baskets,f)
# rdd = sc.parallelize(baskets).map(lambda row: row[1:])

In [8]:
# NOTE: pickle.load executes arbitrary code from the file; only load a pickle
# you produced yourself.
with open('HW2_P04_MahtabNejti_98209434_Pickled_Preproc', 'rb') as pickled_file:
    baskets = pickle.load(pickled_file)

# Drop the leading id from every basket before distributing them.
rdd = sc.parallelize(baskets).map(lambda basket: basket[1:])
# rdd.first()

In [9]:
# The last field of every basket is its word count; take the largest one.
basket_lengths = rdd.map(lambda basket: basket[-1])
max_length = basket_lengths.max()
# max_length

In [10]:
# words = rdd.flatMap(lambda row: row[0])

# distinct_words_df = words.map(lambda row: (row,)).toDF(['word']).dropDuplicates()
# distinct_words_df = distinct_words_df.withColumn("id", 1+monotonically_increasing_id())
# windowSpec = Window.orderBy("id")
# distinct_words_df = distinct_words_df.withColumn("id", row_number().over(windowSpec))
# json_rdd = distinct_words_df.rdd.map(lambda row: {row[0]:row[1]})
# word_dict = json_rdd.reduce(lambda a,b: {**a,**b})
# with open('HW2_P04_MahtabNejti_98209434_ByWords.json', 'w') as f:
#     json.dump(word_dict, f)

### I also chose to store the distinct words as a reverse dictionary for further use

In [11]:
# id_dict = {v:k for k,v in word_dict.items()}
# with open('HW2_P04_MahtabNejti_98209434_ByIDs.json', 'w') as f:
#     json.dump(id_dict, f)

# Part C: How big of a data can I load into my system's memory
### Answers in pdf file.

In [12]:
# len(word_dict)

# Part D: SON Algorithm

## Preparing for count

In [13]:
def toCount(row,emotion_list):
    """Convert a basket into 1-itemsets plus a one-hot count vector.

    Args:
        row: sequence whose first element is the list of words and whose
            second element is the emotion label.
        emotion_list: ordered list of emotion labels; fixes the slot order.

    Returns:
        Tuple ``(itemsets, counts)`` where ``itemsets`` holds one 1-tuple per
        word and ``counts`` is an int array of length
        ``len(emotion_list) + 1``: a 1 in the slot of the row's emotion and a
        1 in the last slot (the total).

    Raises:
        ValueError: if the row's emotion is not in ``emotion_list``.
    """
    counts = np.zeros(len(emotion_list)+1,dtype=int)
    counts[-1] = 1
    # Use the parameter, not the global `emotions`, so the slot order always
    # matches the vector length computed above.
    counts[emotion_list.index(row[1])] = 1
    itemsets = [(word,) for word in row[0]]
    return (itemsets,counts)

# Every basket becomes (list of 1-itemsets, per-emotion count vector).
countable = rdd.map(lambda basket: toCount(basket, emotions))

## Getting itemsets

In [14]:
def getCandidateItemsets(part,size,prevFreq):
    """Replace each basket's itemsets with its candidate itemsets of ``size``.

    A combination of ``size`` words from a basket is a candidate only if every
    one of its ``size - 1`` subsets appears in ``prevFreq``.

    Args:
        part: iterable of ``(itemsets, counts)`` baskets, where ``itemsets``
            is a list of 1-tuples and ``counts`` is a numpy array.
        size: number of words in the itemsets to generate.
        prevFreq: collection of itemsets (sorted tuples) of ``size - 1``.

    Returns:
        List of ``(candidates, counts_copy)`` pairs, one per input basket.
    """
    # Hash-based membership instead of scanning a list for every subset.
    known_frequent = set(prevFreq)
    new_baskets = []
    for basket in part:
        words = [x[0] for x in basket[0]]
        candidates = []
        for combo in combinations(words,size):
            itemset = tuple(sorted(combo))
            # combinations() over a sorted tuple yields sorted tuples, and
            # for size == 2 it yields the 1-tuples stored for single words.
            if all(sub in known_frequent for sub in combinations(itemset,size-1)):
                candidates.append(itemset)
        new_baskets.append((candidates,basket[1].copy()))
    return new_baskets

## Counting itemsets

In [15]:
def getFrequentItemsets(partition,size,s,prevFreq=None):
    """Find the itemsets of ``size`` that are frequent within one partition.

    Args:
        partition: iterator of ``(itemsets, counts)`` baskets.
        size: itemset size to count.
        s: support fraction; the local threshold is ``len(partition) * s``.
        prevFreq: itemsets of ``size - 1`` used to prune candidates when
            ``size > 1``. Defaults to no itemsets.

    Returns:
        A 1-tuple holding a dict ``{itemset: summed_counts}`` restricted to
        itemsets whose total (last slot of the counts) reaches the local
        threshold.
    """
    part = list(partition)
    support = len(part)*s
    if size > 1:
        part = getCandidateItemsets(part,size,[] if prevFreq is None else prevFreq)
    counts = {}
    for basket in part:
        for itemset in basket[0]:
            if itemset in counts:
                # The accumulator is our own copy, so adding in place is safe.
                counts[itemset] += basket[1]
            else:
                counts[itemset] = basket[1].copy()
    freqItemset = {key:value for key,value in counts.items() if value[-1] >= support}
    return (freqItemset,)

## Making sure results are actually frequent

In [16]:
def getSupportedItemset(row,size,supported):
    """List the basket's itemsets of ``size`` that appear in ``supported``.

    Args:
        row: ``(itemsets, counts)`` basket; ``itemsets`` is a list of 1-tuples.
        size: itemset size to look for.
        supported: collection of itemsets to keep.

    Returns:
        List of ``(itemset, counts_copy)`` pairs, one per matching itemset.
    """
    if size == 1:
        itemsets = list(row[0])
    else:
        words = [x[0] for x in row[0]]
        itemsets = [tuple(sorted(combo)) for combo in combinations(words,size)]
    return [(itemset,row[1].copy()) for itemset in itemsets if itemset in supported]
    

def countSupportedItemsets(rdd,size,supported):
    """Count, over the whole RDD, every itemset listed in ``supported``.

    Args:
        rdd: RDD of ``(itemsets, counts)`` baskets.
        size: size of the itemsets being counted.
        supported: collection of candidate itemsets.

    Returns:
        Dict ``{itemset: summed_counts}`` for the candidates that occur in at
        least one basket.
    """
    # Build the set once so each membership test inside the closure is a
    # hash lookup rather than a list scan.
    supported_set = set(supported)
    data = rdd.flatMap(
        lambda row: getSupportedItemset(row,size,supported_set)).reduceByKey(
        lambda a,b : a+b)
    return dict(data.collect())

## SON

In [17]:
def son(rdd,s,dataset_size,max_size):
    """Run the SON algorithm for itemset sizes 1..``max_size``.

    Pass 1 collects, per partition, the itemsets that are locally frequent
    (with the local threshold relaxed to ``0.9 * s``); pass 2 counts those
    candidates over the whole RDD and keeps the ones whose total reaches
    ``s * dataset_size``.

    Args:
        rdd: RDD of ``(itemsets, counts)`` baskets.
        s: global support fraction.
        dataset_size: number of baskets used for the global threshold.
        max_size: largest itemset size to mine.

    Returns:
        Dict mapping ``'size_<k>'`` to ``{itemset: counts}`` for every k in
        1..``max_size``.
    """
    support = s*dataset_size
    freqItemsets = {}
    for i in range(1,max_size+1):
        prevFreq = list(freqItemsets.get('size_'+str(i-1),{}))
        if i > 1 and not prevFreq:
            # No frequent (i-1)-itemsets means no candidates of size i;
            # skip both Spark passes but still record the empty result.
            freqItemsets['size_'+str(i)] = {}
            continue
        # Bind the loop values as defaults so the closure does not depend on
        # the loop variables' later values.
        localFrequents = rdd.mapPartitions(
            lambda part,size=i,prev=prevFreq: getFrequentItemsets(part,size,s*0.9,prev)).collect()
        # Union of the per-partition candidates; also fine when the list of
        # partition results is empty.
        candidates = set()
        for partitionResult in localFrequents:
            candidates.update(partitionResult)
        counted = countSupportedItemsets(rdd,i,list(candidates))
        freqItemsets['size_'+str(i)] = { k:v for k,v in counted.items() if v[-1] >= support}
    return freqItemsets

# Run the algorithm

In [18]:
# Mine itemsets of size 1 and 2 with a 5% support threshold.
results = son(rdd=countable, s=0.05, dataset_size=dataset_size, max_size=2)

In [19]:
# Flatten the SON results into one table row per frequent itemset. The count
# vector holds the six emotions in alphabetical order followed by the total.
count_columns = ('anger', 'fear', 'joy', 'love', 'sadness', 'surprise', 'total')
data = []
for itemset_size in (1, 2):
    for itemset, counts in results['size_' + str(itemset_size)].items():
        entry = {'size': itemset_size, 'itemset': itemset}
        for position, column in enumerate(count_columns):
            entry[column] = counts[position]
        data.append(entry)
freqItemsets = pd.DataFrame(data)
freqItemsets

Unnamed: 0,size,itemset,anger,fear,joy,love,sadness,surprise,total
0,1,"(get,)",3759,2494,7482,1608,6004,768,22115
1,1,"(feel,)",39099,33312,99177,24073,85257,10616,291534
2,1,"(im,)",5912,4784,12928,2963,11228,1232,39047
3,1,"(really,)",2475,1870,5738,1491,5012,661,17247
4,1,"(like,)",6998,4268,16984,5715,14888,1614,50467
5,1,"(go,)",2631,2636,6243,1381,5576,695,19162
6,1,"(time,)",2425,2014,5606,1372,5116,682,17215
7,1,"(make,)",2287,2177,7831,1639,5805,576,20315
8,1,"(know,)",2502,2317,5514,1552,5452,700,18037
9,2,"(feel, make)",2175,2144,7720,1614,5714,570,19937


# Part E: How to compute the probability P(feeling|w1 w2 ... wn)
## Answers in pdf file.

# Part F: Calculate the emotion of the sentences.

In [20]:
# Number of sentences per emotion label.
emotions_counted = dict(rdd.map(lambda row : (row[1],1)).reduceByKey(lambda a,b: a+b).collect())

# Column names in the same order as the slots of the count vectors built by
# toCount: one per emotion, then the total. Derived from `emotions` so the
# code does not assume exactly six labels.
count_columns = emotions + ['total']
words_counted = countable.flatMap(
    lambda row: [(a[0],row[1]) for a in row[0]]).reduceByKey(
    lambda a,b : a+b).map(
    lambda row: dict(
        [('word',row[0])] +
        [(name,row[1][i]) for i,name in enumerate(count_columns)])).collect()
words_df = pd.DataFrame(words_counted)
emotions_counted

{'sadness': 87362,
 'love': 24775,
 'anger': 41315,
 'fear': 34381,
 'joy': 101419,
 'surprise': 10748}

In [21]:
words_df

Unnamed: 0,word,anger,fear,joy,love,sadness,surprise,total
0,feel,39099,33312,99177,24073,85257,10616,291534
1,job,243,226,719,124,543,55,1910
2,position,70,72,180,43,119,14,498
3,happen,323,386,684,151,762,141,2447
4,im,5912,4784,12928,2963,11228,1232,39047
...,...,...,...,...,...,...,...,...
52163,rikki,0,0,1,0,0,0,1
52164,conveyer,0,0,1,0,0,0,1
52165,bebo,0,0,0,0,1,0,1
52166,frugalistas,0,0,1,0,0,0,1


In [22]:
# P(word): share of each word among all counted word occurrences.
words_prob = words_df.copy()
s = words_df['total'].sum()
words_prob['total'] /= s

# Denominator of the emotion priors, taken from the counts themselves instead
# of a hard-coded 300000. A plain loop is used because the builtin `sum` is
# shadowed by the pyspark.sql.functions star import.
total_sentences = 0
for count in emotions_counted.values():
    total_sentences += count

emotions_prob = emotions_counted.copy()
for em in emotions:
    # P(word | emotion): normalise each emotion column by its own total.
    s = words_df[em].sum()
    words_prob[em]/=s
    # P(emotion): share of sentences carrying this label.
    emotions_prob[em]/=total_sentences
emotions_prob

{'sadness': 0.29120666666666667,
 'love': 0.08258333333333333,
 'anger': 0.13771666666666665,
 'fear': 0.11460333333333333,
 'joy': 0.3380633333333333,
 'surprise': 0.035826666666666666}

In [23]:
words_prob

Unnamed: 0,word,anger,fear,joy,love,sadness,surprise,total
0,feel,0.107122,0.109316,0.107256,0.101133,0.112475,0.107420,1.084063e-01
1,job,0.000666,0.000742,0.000778,0.000521,0.000716,0.000557,7.102294e-04
2,position,0.000192,0.000236,0.000195,0.000181,0.000157,0.000142,1.851802e-04
3,happen,0.000885,0.001267,0.000740,0.000634,0.001005,0.001427,9.099117e-04
4,im,0.016197,0.015699,0.013981,0.012448,0.014812,0.012466,1.451954e-02
...,...,...,...,...,...,...,...,...
52163,rikki,0.000000,0.000000,0.000001,0.000000,0.000000,0.000000,3.718478e-07
52164,conveyer,0.000000,0.000000,0.000001,0.000000,0.000000,0.000000,3.718478e-07
52165,bebo,0.000000,0.000000,0.000000,0.000000,0.000001,0.000000,3.718478e-07
52166,frugalistas,0.000000,0.000000,0.000001,0.000000,0.000000,0.000000,3.718478e-07


In [24]:
def getIDEm(row):
    """Pair a basket with every emotion.

    Returns one ``(row[0], emotion, row[1])`` tuple per entry of the global
    ``emotions`` list, in that list's order.
    """
    return [(row[0], emotion, row[1]) for emotion in emotions]

In [25]:
# word -> {column: probability}, filled from `words_prob` on first use.
# Re-run this cell after changing `words_prob` to reset the cache.
_WORD_PROB_TABLE = {}


def getIDEmWord(row):
    """Emit one ``((id, emotion), P(word | emotion))`` pair per word.

    Args:
        row: ``(id, emotion, words)`` tuple as produced by ``getIDEm``.

    Returns:
        List of ``((id, emotion), probability)`` pairs, one per word, with
        the probability read from the emotion's column of ``words_prob``.

    Raises:
        KeyError: if a word has no entry in ``words_prob``.
    """
    if not _WORD_PROB_TABLE:
        # One pass over the DataFrame builds a dictionary; every later lookup
        # is a hash access instead of a boolean scan of the whole table.
        _WORD_PROB_TABLE.update(words_prob.set_index('word').to_dict('index'))
    key = (row[0],row[1])
    emotion = row[1]
    return [(key,_WORD_PROB_TABLE[w][emotion]) for w in row[-1]]

In [26]:
def getEm(a,b):
    """Return whichever of two ``(emotion, score)`` pairs scores higher.

    On a tie the first argument wins.
    """
    return a if a[1] >= b[1] else b

In [27]:
def _logProbability(p):
    """Return ``log(p)``, mapping a zero probability to negative infinity."""
    import math
    return math.log(p) if p > 0 else float('-inf')


def predictEmotion(rdd):
    """Predict the most likely emotion for each basket.

    For every (sentence, emotion) pair the per-word probabilities from
    ``getIDEmWord`` are combined with the emotion prior from
    ``emotions_prob``; the emotion with the highest score wins. Scores are
    accumulated as sums of logarithms rather than products of raw
    probabilities, so long sentences cannot underflow to zero. The ranking
    of emotions is otherwise the same.

    Args:
        rdd: RDD of baskets whose first element is the id and whose last
            element is the list of words.

    Returns:
        RDD of ``{'id': ..., 'prediction': ...}`` dicts. Baskets without
        words produce no entry.
    """
    # ((id, emotion), P(word | emotion)) for every word of every basket.
    word_scores = rdd.flatMap(getIDEm).flatMap(getIDEmWord)
    # Sum of log-likelihoods per (id, emotion).
    log_likelihood = word_scores.mapValues(_logProbability).reduceByKey(
        lambda a,b : a+b)
    # Add the log prior and re-key by id.
    scored = log_likelihood.map(
        lambda row: (row[0][0],(row[0][1],row[1]+_logProbability(emotions_prob[row[0][1]]))))
    id_em = scored.reduceByKey(getEm).map(
        lambda row: {'id':row[0],'prediction':row[1][0]})
    return id_em

# Notice:
## Executing the following cell will take almost an entire day (Took me about 7-8 hours). I have pickled the results into file "HW2_P04_MahtabNejti_98209434_Pickled_Predict". Skip this cell and load the results from the next cell.
## If you want to run the algorithm, make sure you reduce the dataset size by limiting the baskets list.
## I could have dropped the words whose distribution across feelings has too little standard deviation, or used other dimensionality reduction techniques, to reduce the execution time, but I didn't have enough time to do so.

In [28]:
# ready = sc.parallelize(baskets)
# predictions = predictEmotion(ready).collect()
# with open('HW2_P04_MahtabNejti_98209434_Pickled_Predict','wb') as f:
#     pickle.dump(predictions,f)
# predictions_df = sqlc.read.json(sc.parallelize(predictions))

In [None]:
# NOTE: pickle.load executes arbitrary code from the file; only load a pickle
# you produced yourself.
with open('HW2_P04_MahtabNejti_98209434_Pickled_Predict', 'rb') as pickled_file:
    predictions = pickle.load(pickled_file)

# Turn the list of prediction records into a Spark DataFrame.
predictions_rdd = sc.parallelize(predictions)
predictions_df = sqlc.read.json(predictions_rdd)

In [None]:
data = df.alias('data')
pred = predictions_df.alias('pred')

# Inner join on the sentence id: keep every dataset column and append the
# predicted emotion.
same_sentence = data.id == pred.id
joined = data.join(pred, same_sentence)
final = joined.select(data['*'], pred['prediction']).toPandas()
final.to_csv("HW2_P04_MahtabNejti_98209434.csv", header=True, index=False)
final

# Lost some rows! Why?
### As shown below, the emotion of sentences containing only very short or insignificant words (i.e. non-informative sentences) was not predicted.

In [None]:
# Rows that received a prediction ...
matched = data.join(pred, data.id == pred.id)
successful = matched.select(data['*'])
# ... and the dataset rows that did not.
lost = data.exceptAll(successful).toPandas()
lost

# Calculating the Precision, Recall, and F1-Score

In [None]:
# x holds the true labels, y the predicted ones.
x = final['emotion'].tolist()
y = final['prediction'].tolist()
report = sklearn.metrics.classification_report(x, y, digits=2)
print(report)