## Simulating MapReduce jobs and computing TF-IDF with Spark

### Reading the file

Create a folder and put both the text file `5000-8.txt` and this notebook in it.

In [None]:
import sys

# Load the whole text into memory as a list of lines (line endings kept).
# ISO-8859-1 maps every byte to a character, so decoding cannot fail.
with open('5000-8.txt', encoding="ISO-8859-1") as text_file:
    lines = list(text_file)

### WordCount

##### Mapper

In [None]:
import string
import re

# Character class matching any ASCII punctuation character. re.escape keeps
# regex metacharacters contained in string.punctuation (']', '\\', '^', '-')
# literal instead of relying on them happening to parse correctly.
PUNCT_RE = re.compile('[' + re.escape(string.punctuation) + ']')

# Emulated mapper output: one "word\t1\n" record per word occurrence, i.e.
# what a Hadoop-streaming mapper would write for the reducer to consume.
WC_mapper_out = []
for line in lines:
    # strip surrounding whitespace, then delete punctuation characters
    line = PUNCT_RE.sub('', line.strip())
    for word in line.split():
        # tab-delimited key/value pair; the trivial word count is 1
        WC_mapper_out.append('%s\t%s\n' % (word, 1))
print(WC_mapper_out[1:100])

#### Reducer

In [None]:
## replicate the sorting of hadoop: the shuffle hands the reducer its input
## sorted by key, and the grouping below depends on that order
WC_mapper_out.sort()

# Reducer: sum the counts of adjacent records that share the same word.
current_word = None
current_count = 0

WC_reducer_out = []
for line in WC_mapper_out:
    # parse a "word\tcount" record produced by the mapper
    try:
        word, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError:
        # no tab or a non-numeric count: silently discard the malformed record
        continue

    # thanks to the sort, all records of one word are consecutive
    if current_word == word:
        current_count += count
    else:
        if current_word is not None:
            # the previous word is complete: emit its total
            WC_reducer_out.append('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# the loop emits a word only when the next one starts, so flush the last one
# (checked against None so a trailing malformed record cannot suppress it)
if current_word is not None:
    WC_reducer_out.append('%s\t%s' % (current_word, current_count))

In [None]:
# Preview a slice of the "word\tcount" reducer output. The loop variable must
# not be named `lines`: that name holds the input text used by later cells.
for record in WC_reducer_out[31000:32000]:
    print(record)

### Random sampling

#### Mapper

In [None]:
from random import randint
import string
import re

# Character class matching any ASCII punctuation; re.escape keeps regex
# metacharacters in string.punctuation (']', '\\', '^', '-') literal.
PUNCT_RE = re.compile('[' + re.escape(string.punctuation) + ']')

# Emulated mapper output for random sampling: every word is emitted with a
# random integer drawn uniformly from 0..10000 (both ends inclusive).
RS_mapper_out = []
for line in lines:
    # strip surrounding whitespace, then delete punctuation characters
    line = PUNCT_RE.sub('', line.strip())
    for word in line.split():
        RS_mapper_out.append('%s\t%s\n' % (word, randint(0, 10000)))

In [None]:
# Preview some randomly keyed records: first one per line, then as a list.
# A plain loop is used for the printing side effect (no throwaway list).
for record in RS_mapper_out[1:1000]:
    print(record)

print(RS_mapper_out[1:1000])

#### Reducers 

In [None]:
import time
import random

## replicate the sorting of hadoop
RS_mapper_out.sort()

RS_reducer_out = []

# Pick 10 distinct keys; records whose random key is among them are kept.
# The mapper draws keys with randint(0, 10000), which includes 10000, so
# sample from range(0, 10001) to give key 10000 the same chance as the rest.
key_val = random.sample(range(0, 10001), 10)
# single output key under which every sampled record is emitted
out_key = random.randint(0, 10000)

# perf_counter is a monotonic, high-resolution clock intended for timing
start_time = time.perf_counter()

for line in RS_mapper_out:
    # parse a "word\tkey" record produced by the mapper
    couple = line.strip().split('\t')
    word = couple[0]
    key = int(couple[1])

    # deliberately a list lookup (linear scan) to compare with the set version
    if key in key_val:
        RS_reducer_out.append('%s\t%s' % (word, out_key))

print(time.perf_counter() - start_time)

In [None]:
# Display the records kept by the list-membership reducer.
RS_reducer_out

In [None]:
import time
import random

## replicate the sorting of hadoop
RS_mapper_out.sort()

RS_reducer_out = []

# Same sampling as the list version, but the chosen keys live in a set so
# each membership test is a hash lookup instead of a linear scan.
# range(0, 10001) matches the mapper's inclusive randint(0, 10000).
key_val = set(random.sample(range(0, 10001), 10))
# single output key under which every sampled record is emitted
out_key = random.randint(0, 10000)
start_time = time.perf_counter()

for line in RS_mapper_out:
    # parse a "word\tkey" record produced by the mapper
    couple = line.strip().split('\t')
    word = couple[0]
    try:
        key = int(couple[1])
    except (IndexError, ValueError):
        # malformed record (missing or non-numeric key): skip it
        continue

    if key in key_val:
        RS_reducer_out.append('%s\t%s' % (word, out_key))

print(time.perf_counter() - start_time)

In [None]:
# Display the records kept by the set-membership reducer.
RS_reducer_out

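The second version, which stores the sampled keys in a set, is much faster: a membership test on a set is a hash lookup that takes constant time on average, whereas a list is scanned element by element (this confirms https://stackoverflow.com/questions/7571635/fastest-way-to-check-if-a-value-exist-in-a-list).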

### PYSPARK TFIDF

In [None]:
import pyspark
from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession rather than a bare SparkContext: later cells
# call `spark.createDataFrame` and `RDD.toDF`, and `toDF` is only attached to
# RDDs once a SparkSession exists. `sc` is the session's underlying context.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [None]:
# Load the same text file as an RDD with one element per line.
# NOTE(review): the pure-Python cells decode the file as ISO-8859-1, while
# textFile presumably decodes as UTF-8 — non-ASCII bytes may differ; confirm.
text_raw = sc.textFile("5000-8.txt")

In [None]:
# Peek at the first raw lines. Wrapped in print because a notebook cell only
# displays its last expression automatically.
print(text_raw.take(5))

# Cleaning text and removing punctuation: keep letters, digits and whitespace.
# Keeping every whitespace character (not only ' ') stops words separated by
# a tab from being glued together before they are split.
text_clean = text_raw.map(lambda x: ''.join(c for c in x if c.isalnum() or c.isspace()))
# drop emptied lines and normalise case
text_clean = text_clean.filter(lambda x: x != '').map(lambda x: x.lower())


In [None]:
# Experiment parameters: how many pseudo-documents to build and the fraction
# of all words sampled (without replacement) into each of them.
doc_nb = 5
sample_size = 0.001

# Flatten the cleaned lines into an RDD holding one word per element.
words = text_clean.flatMap(str.split)

documents = []

In [None]:
# Draw `doc_nb` independent samples of the word stream and tag every word with
# the index of its sample, which serves as the document id. `doc_id=i` binds
# the current index as a default argument, so each lambda keeps its own id even
# though the RDDs are evaluated lazily, after the comprehension has finished.
# This avoids collecting every sample to the driver and re-parallelizing it.
documents = [
    words.sample(False, sample_size).map(lambda x, doc_id=i: (doc_id, x))
    for i in range(doc_nb)
]
# One RDD of (doc_id, word) pairs, cached because several later cells reuse it.
documents = sc.union(documents).cache()

In [None]:
# One list of words per document (the doc_id keys are dropped).
doclist = documents.groupByKey().values().map(list)

#### Built-in hashing TF-IDF on an RDD

In [None]:
from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.feature import HashingTF, IDF

# Term-frequency vectors via the hashing trick, one vector per document.
hashingTF = HashingTF()
# cached because it is traversed twice: once to fit the IDF, once to transform
tf = hashingTF.transform(doclist).cache()

# Fit document frequencies on the corpus, then rescale each TF vector.
idf = IDF().fit(tf)
tfidf = idf.transform(tf)

In [None]:
print("tfidf:")
# Bring every TF-IDF vector back to the driver and print one per line.
rows = tfidf.collect()
for vector in rows:
    print(vector)

#### Built-in TF-IDF on DataFrames

In [None]:

from pyspark.ml.feature import HashingTF, IDF, Tokenizer

# DataFrame with one row per document: its id and its list of words.
test = documents.groupByKey().mapValues(list).toDF(["id", "words"])

# Hashed term frequencies written to the "rawFeatures" column.
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures")
featurizedData = hashingTF.transform(test)

# Fit IDF weights on the corpus and rescale into the "features" column.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)



#### Count Vectorizer

In [None]:

from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, Tokenizer

# Exact term counts over a learned vocabulary (no hash collisions). The input
# column must be "words": the `test` DataFrame built above only has the
# columns "id" and "words", so there is no "text" column to read.
vectorizer = CountVectorizer(inputCol="words", outputCol="features")
model = vectorizer.fit(test)
result = model.transform(test)

# Rescale the raw counts by inverse document frequency.
idf = IDF(inputCol="features", outputCol="outfeatures")
idfModel = idf.fit(result)
rescaledData = idfModel.transform(result)
rescaledData.show()


#### Test TF-IDF on a toy example

In [None]:

from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import SparkSession

# Make sure a SparkSession named `spark` exists; getOrCreate reuses the active
# session if there is one, otherwise it starts a new one.
spark = SparkSession.builder.getOrCreate()

# Three labelled toy sentences.
sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

# Split each sentence into a list of words.
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

# Hashed term frequencies into only 20 buckets (small, so collisions are likely).
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

# Fit IDF on the toy corpus and rescale the term frequencies.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()


#### Manual TF-IDF

In [None]:
import math

# ((doc_id, word), n): how many times each word occurs in each document
doc_word_count = (
    documents
    .map(lambda rec: ((rec[0], rec[1]), 1))
    .reduceByKey(lambda a, b: a + b)
)

# (word, df): number of documents containing the word at least once
occ_count = (
    doc_word_count
    .keys()
    .map(lambda doc_and_word: (doc_and_word[1], 1))
    .reduceByKey(lambda a, b: a + b)
)

# (word, idf) with idf = log(doc_nb / df).
# Note: this rebinds the name IDF, shadowing the pyspark IDF class imported earlier.
IDF = occ_count.mapValues(lambda df: math.log(doc_nb / df))
IDF.collect()

In [None]:
# Attach each word's IDF to its per-document counts:
# (word, ((doc_id, count), idf))
joined = doc_word_count.map(lambda kv: (kv[0][1], (kv[0][0], kv[1]))).join(IDF)
# TF-IDF = raw count in the document * IDF  ->  (word, (doc_id, score))
TFIDF_scores = joined.mapValues(lambda v: (v[0][0], v[0][1] * v[1]))

In [None]:
# Display every (word, (doc_id, tfidf)) score on the driver.
TFIDF_scores.collect()

**Using DataFrames**

In [None]:
# Per-document counts flattened to (doc_id, word, occurence) rows.
wc_rows = doc_word_count.map(lambda kv: (*kv[0], kv[1]))
wc_df = wc_rows.toDF(["doc_id", "word", "occurence"])
# One (word, IDF) row per word.
idf_df = IDF.toDF(["word", "IDF"])

In [None]:
# Inner join on the word, then score = IDF * occurrence count per document.
out = wc_df.join(idf_df, on="word")
out = out.withColumn("TFIDF", out["IDF"] * out["occurence"])

In [None]:
from pyspark.sql.functions import first

# Wide table: one row per document, one column per word holding that word's
# TF-IDF score in the document (null where the word does not occur).
out_pivot = out.groupBy("doc_id").pivot("word").agg(first("TFIDF"))
# (number of documents, number of columns incl. the "doc_id" key column)
print((out_pivot.count(), len(out_pivot.columns)))

# Documents are random samples, so a given word may not appear at all, and
# selecting a column that does not exist would raise an error.
probe_word = "then"
if probe_word in out_pivot.columns:
    out_pivot.select(probe_word).show()
else:
    print("%r does not occur in the sampled documents" % probe_word)

In [None]:
# Long-format rows for a single word, one per document containing it.
out.filter(out["word"] == "then").show()