# Initialize Spark and its config

I set number of partitions each rdd be to 8, because of number of cores for parallelism of SON algorithm. so we have 8 chunks

In [1]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import *

# create conf
conf = SparkConf().setAppName("Frequent Itemset")
conf.set("spark.default.parallelism", 8)
conf.set("spark.sql.shuffle.partitions", 8)

# create the context
sc = pyspark.SparkContext(conf = conf)
spark = SparkSession.builder.getOrCreate()

# Loading Train Data

Using spark function to load data as dataframe

In [6]:
# File location and type
file_location = "train.csv"
file_type = "csv"

# CSV options
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

df = df.drop("_c0")
display(df.toPandas())

Unnamed: 0,id,text,emotions
0,27383,i feel awful about it too because it s my job ...,sadness
1,110083,im alone i feel awful,sadness
2,140764,ive probably mentioned this before but i reall...,joy
3,100071,i was feeling a little low few days back,sadness
4,2837,i beleive that i am much more sensitive to oth...,love
...,...,...,...
299995,103288,i was plagiarized by arbitrage magazine but i ...,sadness
299996,50219,i feel resigned to what happens to me,sadness
299997,65485,i guess because i feel emotionally i have been...,sadness
299998,101735,i only want people in my life who make me feel...,joy


# Part A) Preprocessing

# Installing Package for Text Processing

In [3]:
pip install nltk

Collecting nltk
  Downloading nltk-3.5.zip (1.4 MB)
[K     |████████████████████████████████| 1.4 MB 812 kB/s eta 0:00:01
Collecting regex
  Downloading regex-2020.11.13-cp38-cp38-manylinux2014_x86_64.whl (738 kB)
[K     |████████████████████████████████| 738 kB 678 kB/s eta 0:00:01
Building wheels for collected packages: nltk
  Building wheel for nltk (setup.py) ... [?25ldone
[?25h  Created wheel for nltk: filename=nltk-3.5-py3-none-any.whl size=1434676 sha256=89ee2ae67f121e72d58a151a85654002c1fb20c07110f23cc9371f0fda4f748d
  Stored in directory: /home/jovyan/.cache/pip/wheels/ff/d5/7b/f1fb4e1e1603b2f01c2424dd60fbcc50c12ef918bafc44b155
Successfully built nltk
Installing collected packages: regex, nltk
Successfully installed nltk-3.5 regex-2020.11.13
Note: you may need to restart the kernel to use updated packages.


# Downloading libraries of word processing

In [4]:
import nltk
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('averaged_perceptron_tagger')
nltk.download('omw')

[nltk_data] Downloading package punkt to /home/jovyan/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /home/jovyan/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package wordnet to /home/jovyan/nltk_data...
[nltk_data]   Unzipping corpora/wordnet.zip.
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /home/jovyan/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.
[nltk_data] Downloading package omw to /home/jovyan/nltk_data...
[nltk_data]   Unzipping corpora/omw.zip.


True

Doing all the tasks defined in question sequentially. they are obvious with comments. After that we have a clean text and we add it to first document

In [8]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import re
import string
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet
from nltk import pos_tag
from collections import defaultdict
from nltk.stem import PorterStemmer

def preprocess(text):
    
    #lower
    lower_text = text.lower()
    
    #delete number
    without_num_text = re.sub(r'\d+', '', lower_text)
    
    #delete punctutation
    without_punc_text = without_num_text.translate((None, string.punctuation))
    
    #delete stopwords
    word_tokens = word_tokenize(without_punc_text)
    without_stop_text = [word for word in word_tokens if word not in stop_words]
    
    #delete words with length less than 3
    without_length_text = [word for word in without_stop_text if len(word) > 2]
    
    #stemming
    #stemmer= PorterStemmer()
    #stemmed_text = [stemmer.stem(word) for word in without_length_text]
    
    #lemmatization
    lemmatizer=WordNetLemmatizer()
    lemmatize_text = []
    for token, tag in pos_tag(without_length_text):
        lemmatize_text.append(lemmatizer.lemmatize(token, tag_dict[tag[0]]))
        
    return lemmatize_text

# defining spark function for preprocess the data
preprocess_udf = udf(lambda x: preprocess(x), ArrayType(StringType()))

# list of english stopwords
stop_words = set(stopwords.words('english'))

# tags needed for lemmatization
tag_dict = defaultdict(lambda : wordnet.NOUN)
tag_dict['J'] = wordnet.ADJ
tag_dict['V'] = wordnet.VERB
tag_dict['R'] = wordnet.ADV

# cleaning the text
df = df.withColumn("clean_text", preprocess_udf(col("text")))

print("Adding clean text to df :")
display(df.toPandas())


Adding clean text to df :


Unnamed: 0,id,text,emotions,clean_text
0,27383,i feel awful about it too because it s my job ...,sadness,"[feel, awful, job, get, position, succeed, hap..."
1,110083,im alone i feel awful,sadness,"[alone, feel, awful]"
2,140764,ive probably mentioned this before but i reall...,joy,"[ive, probably, mention, really, feel, proud, ..."
3,100071,i was feeling a little low few days back,sadness,"[feel, little, low, day, back]"
4,2837,i beleive that i am much more sensitive to oth...,love,"[beleive, much, sensitive, people, feeling, te..."
...,...,...,...,...
299995,103288,i was plagiarized by arbitrage magazine but i ...,sadness,"[plagiarize, arbitrage, magazine, one, feel, t..."
299996,50219,i feel resigned to what happens to me,sadness,"[feel, resign, happens]"
299997,65485,i guess because i feel emotionally i have been...,sadness,"[guess, feel, emotionally, beaten, circumstance]"
299998,101735,i only want people in my life who make me feel...,joy,"[want, people, life, make, feel, value, need]"


# Part B) Saving distinct words

In [9]:
import json

# in map phase we iterate on each sentence and for each word we create (word, 1)
word_rdd = df.select(col("clean_text")).rdd.flatMap(lambda list_word: [(word, 1) for word in list_word[0]])

# in reduce phase we just count distinct words and then we another map just return the words
distinct_words = word_rdd.reduceByKey(lambda x,y: x+y).map(lambda word: word[0]).collect()

# as the question wanted, we attach unique id to each distinct word
distinct_dict = dict(zip(list(range(len(distinct_words))), distinct_words))


# save distinct words
with open('distinct_words.json', 'w') as fp:
    json.dump(distinct_dict, fp)
#clean_text



# Part J) SON algorithm

We have 8 chunks. Here we show number of sentences per partition(chunk)

In [10]:
# creating rdd of sentences and emotions for SON algorithm
sentence_rdd = df.select(col("clean_text"), col("emotions")).rdd.map(lambda row: (row['clean_text'], row['emotions']))

# number of sentence per partition
def countp(part):
    baskets = []
    for v in part:
        baskets.append(v)
    return [len(baskets)]

# defining SON support and length of all sentence
supp = 0.05
size_of_sentences = df.count()

# counting number of sentence per partition so we can select a accurate support per partition
num_per_partition = sentence_rdd.mapPartitions(countp).collect()

print("Number of sentences per partitions(chunks) : ")
print(num_per_partition)

Number of sentences per partitions(chunks) : 
[42372, 42381, 41974, 41858, 41860, 41785, 41773, 5997]


## Phase 1 of SON
Here we do first phase of SON algorithm to find candidates for second phase. Our Hypothesis here is, if each frequent item candidate is frequent on more than 2 chunks, is frequent. Other details are in comments.

In [12]:
import pandas as pd
from itertools import combinations

# creating subsets of length 2
def subset(setInp):
    subsets = []
    subsets.extend(list(combinations(setInp, 2)))
    return subsets
 
# find single frequent items in phase 1 of SON
def singleton(sentences, support):
    # holding counting of words
    bag_of_words = dict()
    
    # iterating on each sentence
    for sentence in sentences:
        # iterating on word of sentence
        for word in sentence[0]:
            
            # adding to count of word
            if(word in bag_of_words):
                bag_of_words[word] += 1
        
            else:
                bag_of_words[word] = 1
      
     
    L1 = []
    len_sentence = len(sentences)
    
    # filter base on support that was calculated base on number of sentences per partition
    for word in bag_of_words:
        if( bag_of_words[word] / len_sentence >= support):
              L1.append((word, 1))
  
    return L1

# find frequent items with length 2 in phase 1
def doubleton(sentences, support, candidates):
    # holding counting of doubles
    bag_of_double = dict()
    
    # iterating on each sentence
    for sentence in sentences:
        # iterating on candidates
        for candidate in candidates:
            
            # adding to count of word
            if(set(candidate).issubset(sentence[0])):
                
                if(candidate in bag_of_double):
                    bag_of_double[candidate] +=1
                else:
                    bag_of_double[candidate] = 1
        
    L2 = []
    len_sentence = len(sentences)
    # filter base on support that was calculated base on number of sentences per partition
    for double in bag_of_double:
        if( bag_of_double[double] / len_sentence >= support):
            L2.append((double, 1))
      
    return L2

# map function of phase 1 to find candidates
def freq_candidate(partition_iterator):
    
    # first retrieve sentences from partition
    sentences = []
    for sentence in partition_iterator:
        sentences.append(sentence)
    
    # computing support for each partitions base on fraction of their size to all sentences
    fraction_of_all = len(sentences) / size_of_sentences
    support_per_partition = fraction_of_all * supp 
    
    # computing single frequent itemsets
    single_freq = singleton(sentences, support_per_partition)
    
    # building candaites of doubleton
    freq_list = [word[0] for word in single_freq]
    candidate_double = subset(freq_list)
  
    # finding frequent doubleton
    double_freq = doubleton(sentences, support_per_partition, candidate_double)
  
    return single_freq + double_freq

# SON algorithm (map 1)
son_map1_rdd = sentence_rdd.mapPartitions(freq_candidate)

# SON algorithm (reduce 1)
son_reduce1_rdd = son_map1_rdd.reduceByKey(lambda x,y: x+y).filter(lambda itemset: itemset[1] > 2).map(lambda itemset: itemset[0])
phase1_candidate = son_reduce1_rdd.collect()

print("Show some candidates of Phase 1: ")
print(phase1_candidate[0:10])

Show some candidates of Phase 1: 
['feel', 'really', 'like', 'start', 'thought', 'good', 'two', 'book', 'yet', 'work']


## Second Phase of SON


In [13]:
import pandas as pd
from itertools import combinations
from collections import Counter
  
def count_cand(sentences):
    # dict holding count of each word
    bag_of_words = dict()
    
    # list of available emotions
    emotions = ['sadness', 'love','joy', 'fear', 'surprise', 'anger']
    
    # iterate on candidates of phase 1
    for word in phase1_candidate:
        
        # iterate on sentence
        for sentence in sentences:
            
            # check if it is single or doubleton
            if(type(word) is str):
                
                # check for occurence of word in sentence
                if(word in sentence[0]):
                    if(word in bag_of_words):
                        bag_of_words[word]['Count'] +=1
                        bag_of_words[word]['emotion'][sentence[1]] +=1

                    else:
                        detail = dict()
                        detail['Count'] = 1
                        detail['emotion'] = dict()
                        for i in emotions:
                            detail['emotion'][i] = 0 

                        detail['emotion'][sentence[1]] = 1
                        bag_of_words[word] = detail
            
            # for doubletons
            else:
                if( (word[0] in sentence[0]) and (word[1] in sentence[0]) ):
                    if(word in bag_of_words):
                        bag_of_words[word]['Count'] +=1
                        bag_of_words[word]['emotion'][sentence[1]] +=1

                    else:
                        detail = dict()
                        detail['Count'] = 1
                        detail['emotion'] = dict()
                        for i in emotions:
                            detail['emotion'][i] = 0 

                        detail['emotion'][sentence[1]] = 1
                        bag_of_words[word] = detail

    out_tuple = []
    # prepare data for reduce
    for word in bag_of_words:
        out_tuple.append((word, (bag_of_words[word]['emotion'], bag_of_words[word]['Count'])))
  
    return out_tuple



# check candidates of phase 1 with one more iterate on all data
def freq_itemset(partition_iterator):
    
    # retireve sentences from partition
    sentences = []
    for sentence in partition_iterator:
        sentences.append(sentence)
    
    # counting candidates on all sentence and on each emotion
    count_cand_list = count_cand(sentences)
  
    return count_cand_list

# SON algorithm (map 2)
son_map2_rdd = sentence_rdd.mapPartitions(freq_itemset)

# SON algorithm (reduce 2)
son_reduce2_rdd = son_map2_rdd.reduceByKey(lambda x,y:(dict(Counter(x[0]) + Counter(y[0])), x[1]+y[1])).filter(lambda itemset: itemset[1][1] / size_of_sentences >= supp )
final_freq_list = son_reduce2_rdd.collect()

print("Showing two frequent items :")
print(final_freq_list[0:2])

Showing two frequent items :
[('feel', ({'sadness': 77117, 'love': 22478, 'joy': 92517, 'fear': 29820, 'surprise': 9466, 'anger': 35445}, 266843)), ('really', ({'sadness': 5012, 'love': 1491, 'joy': 5738, 'fear': 1870, 'surprise': 661, 'anger': 2475}, 17247))]


# Showing Freq items as Dataframe

In [33]:
from pyspark.sql.types import *
from builtins import max

# creating schema of dataframe
mySchema = StructType([ StructField("frequent item", StringType(), True)\

                       ,StructField("sadness", IntegerType(), True)\

                       ,StructField("love", IntegerType(), True)\

                       ,StructField("joy", IntegerType(), True)\

                       ,StructField("fear", IntegerType(), True)\

                       ,StructField("surprise", IntegerType(), True)\

                       ,StructField("anger", IntegerType(), True)\
                       ,StructField("count", IntegerType(), True)\
                       ,StructField("max_emotion", StringType(), True)

                       ])

df_list = []
for item in final_freq_list:
    
    dict_inp = dict()
    dict_inp['frequent item'] = str(item[0])
    dict_inp.update(item[1][0])
    dict_inp['count'] = item[1][1]
    dict_inp['max_emotion'] = max(item[1][0], key=item[1][0].get) 
    df_list.append(dict_inp)
   
frequent_items_df = spark.createDataFrame(df_list, schema = mySchema).toPandas()
print("Showing Frequent items :")
display(frequent_items_df)

Showing Frequent items :


Unnamed: 0,frequent item,sadness,love,joy,fear,surprise,anger,count,max_emotion
0,feel,77117,22478,92517,29820,9466,35445,266843,joy
1,really,5012,1491,5738,1870,661,2475,17247,joy
2,like,14859,5621,16950,4254,1609,6985,50278,joy
3,"('feel', 'make')",5412,1534,7330,1983,536,2019,18814,joy
4,"('feel', 'know')",5038,1459,5195,2130,639,2245,16706,joy
5,"('feel', 'really')",4539,1358,5307,1691,599,2195,15689,joy
6,"('feel', 'like')",14070,5227,16290,4013,1538,6588,47726,joy
7,get,6011,1611,7492,2491,765,3752,22122,joy
8,"('feel', 'get')",5061,1335,6378,2072,617,3134,18597,joy
9,think,4286,1324,4690,1946,625,2204,15075,joy


# Part D) Guessing Emotions of sentences

For each sentence we iterate on frequent itemsets. every itemsets that exist in sentence vote about its new emotion. Maximum number of votes will be consider as new emotion. But if there is no frequent itemset in sentence, we don't make any guess and just writing '-' at emotion column of that sentence.

In [53]:
def attach_emotion(sentence):
    # holding value of each emotion candidate
    emotion_holder = dict.fromkeys(['sadness','love','joy', 'fear', 'surprise', 'anger'],0)
    
    # iterate on frequent items
    for item, emotion in zip(frequent_items_df['frequent item'], frequent_items_df['max_emotion']):
        
        # condition for single freq
        if(type(item) is str):
            if(item in sentence):
                emotion_holder[emotion] +=1
                
        else:
            if(item[0] in sentence and item[1] in sentence):
                emotion_holder[emotion] +=1
                
    # emotion candidate with max value is the new label            
    emotion_candid = max(emotion_holder, key=emotion_holder.get)
    
    # if sentence hasn't any frequent itemsets we don't label it
    if(emotion_holder[emotion_candid] == 0 ):
        return '-'
    else:
        return emotion_candid
        
    
# new emotion label    
attach_emotion_udf = udf(lambda x: attach_emotion(x), StringType())
df = df.withColumn("new_emotion", attach_emotion_udf(col("clean_text")))

print("Dataframe with new emotion label :")
display(df.toPandas())

Dataframe with new emotion label :


Unnamed: 0,id,text,emotions,clean_text,new_emotion
0,27383,i feel awful about it too because it s my job ...,sadness,"[feel, awful, job, get, position, succeed, hap...",joy
1,110083,im alone i feel awful,sadness,"[alone, feel, awful]",joy
2,140764,ive probably mentioned this before but i reall...,joy,"[ive, probably, mention, really, feel, proud, ...",joy
3,100071,i was feeling a little low few days back,sadness,"[feel, little, low, day, back]",joy
4,2837,i beleive that i am much more sensitive to oth...,love,"[beleive, much, sensitive, people, feeling, te...",sadness
...,...,...,...,...,...
299995,103288,i was plagiarized by arbitrage magazine but i ...,sadness,"[plagiarize, arbitrage, magazine, one, feel, t...",joy
299996,50219,i feel resigned to what happens to me,sadness,"[feel, resign, happens]",joy
299997,65485,i guess because i feel emotionally i have been...,sadness,"[guess, feel, emotionally, beaten, circumstance]",joy
299998,101735,i only want people in my life who make me feel...,joy,"[want, people, life, make, feel, value, need]",joy


# Saving new emotions

In [55]:
# saving new_labels
new_labels = df.select(col("id"), col("new_emotion")).toPandas()
new_labels.to_csv("emotion.csv")

# Accuracy of our guess

First we select columns of given and new emotions, then just filter sentences that we could guess, then we compute number of true guesses and finally we compute the accuracy

In [62]:
import numpy as np

# remove sentences that we couldn't guess
guessed_rdd = df.select(col("emotions"), col("new_emotion")).rdd.filter(lambda row: row[1] != '-' )

# number of true guesses
true_guess = np.sum(guessed_rdd.map(lambda row: 1 if row[0] == row[1] else 0).collect())

# number of all guesses
guessed_number = guessed_rdd.count()

print("Accuracy of our Guess is = " + str(true_guess/guessed_number))

Accuracy of our Guess is = 0.34462738067959725
