In [None]:
import sys
import csv
import numpy as np

In [None]:
import findspark
import os
# findspark.init() is deliberately called before the pyspark imports below:
# per findspark's documentation it adds the local Spark installation's
# pyspark package to sys.path so that those imports can succeed.
findspark.init()
import pyspark
from pyspark import SparkContext

In [None]:
# Reuse the SparkContext that is already running in this kernel if there is
# one, otherwise start a new one.  A bare SparkContext() raises
# "Cannot run multiple SparkContexts at once" when this cell is re-run.
sc = SparkContext.getOrCreate()

In [None]:
from pyspark.sql import SQLContext
# SQL entry point bound to the SparkContext `sc`; every sqlContext.sql(...)
# and sqlContext.createDataFrame(...) call in this notebook goes through it.
sqlContext = SQLContext(sc)

In [None]:
from pprint import pprint

In [None]:
# Shell escape: list the files in the notebook's current working directory.
!ls

In [None]:
# Read dull.txt as an RDD with one element per line of the file.  The
# pipeline that builds doc_word treats each line as one document.
text = sc.textFile("dull.txt")

In [None]:
# Show the documentation for RDD.zipWithIndex.  help() is plain Python, so
# this cell also works outside IPython (the `obj?` form does not).
help(text.zipWithIndex)

In [None]:
# Display the repr of the RDD object; this does not print the file contents.
text

In [None]:
# Tokenise each line (treat ',', '.' and '-' as spaces, lower-case, split on
# whitespace), tag it with its position in the file as the document id, and
# explode the word lists into one (doc_id, word) pair per word occurrence.
doc_word = (
    text
    .map(lambda line: line.replace(',', ' ').replace('.', ' ').replace('-', ' ').lower().split())
    .zipWithIndex()                            # -> (words, doc_id)
    .map(lambda pair: (pair[1], pair[0]))      # -> (doc_id, words)
    .flatMapValues(lambda words: words)        # -> (doc_id, word)
)

In [None]:
# Show the documentation for RDD.flatMapValues.  help() is plain Python, so
# this cell also works outside IPython (the `obj?` form does not).
help(text.flatMapValues)

In [None]:
# doc_word.take(300)

In [None]:
# Turn the (doc_id, word) pair RDD into a DataFrame with named columns so it
# can be queried with SQL.
doc_word_df = sqlContext.createDataFrame(doc_word, ['doc_id','word'])

In [None]:
# Display the Python type of the object returned by createDataFrame.
type(doc_word_df)

In [None]:
# Make doc_word_df queryable from SQL under the table name "doc_word".
# NOTE(review): registerTempTable is deprecated as of Spark 2.0 in favour of
# createOrReplaceTempView; consider switching if this runs on Spark >= 2.0.
doc_word_df.registerTempTable("doc_word")

In [None]:
# Corpus-wide frequency: doc_word has one row per word occurrence, so
# counting rows per word gives how often each word occurs across all documents.
word_count_df = sqlContext.sql("""
    SELECT word, count(*) as tot_word_count FROM doc_word GROUP BY word
""")
word_count_df.show()

In [None]:
# Make word_count_df queryable from SQL under the table name "word_count".
word_count_df.registerTempTable("word_count")

In [None]:
# Per-document frequency: how often each word occurs within each document.
# The result has one row per (doc_id, word) pair.
doc_word_count_df = sqlContext.sql("""
    SELECT doc_id, word, count(*) as doc_word_count FROM doc_word GROUP BY doc_id, word
""")
doc_word_count_df.show()

In [None]:
# Make doc_word_count_df queryable from SQL as "doc_word_count".
doc_word_count_df.registerTempTable("doc_word_count")

In [None]:
# Spot check: per-document counts for the word 'of' (one row per document
# that contains it).
doc_word_query = sqlContext.sql("""
    SELECT * FROM doc_word_count WHERE word='of'
""")
doc_word_query.show()

In [None]:
# Spot check: corpus-wide count for the word 'of'.
word_query = sqlContext.sql("""
    SELECT * FROM word_count WHERE word='of'
""")
word_query.show()

In [None]:
# Put the per-document count and the corpus-wide count of each word side by
# side by joining the two aggregate tables on the word.
word_df = sqlContext.sql("""
    SELECT a.doc_id, a.word, a.doc_word_count, b.tot_word_count
    FROM doc_word_count a 
    INNER JOIN word_count b
    ON a.word = b.word
""")
word_df.show()

In [None]:
# Make word_df queryable from SQL under the table name "word".
word_df.registerTempTable("word")

In [None]:
# Spot check: the joined rows for the word 'of'.
word_query = sqlContext.sql("""
    SELECT * FROM word WHERE word='of'
""")
word_query.show()

In [None]:
# First, simplified score: the word's count in this document divided by its
# count in the whole corpus, i.e. the share of the word's occurrences that
# fall in this document.
tfidf_df = sqlContext.sql("""
    SELECT doc_id, word, doc_word_count, tot_word_count, doc_word_count/tot_word_count as simple_tfidf
    FROM word  
""")
tfidf_df.show()

In [None]:
# Make tfidf_df queryable from SQL under the table name "tfidf".
tfidf_df.registerTempTable("tfidf")

In [None]:
# Spot check: simple_tfidf rows for the word 'of' (at most 4 rows shown).
word_query = sqlContext.sql("""
    SELECT * FROM tfidf WHERE word='of'
""")
word_query.show(4)

OK, let's try a more complicated (and more correct) formulation of TF-IDF.

For the word "of" appearing in document 1:
```
TF = The number of times "of" appears in document 1 divided by the number of words in document 1
IDF = log(4/3), since there are 4 documents in the corpus and the word "of" appears in 3 of them.
TF-IDF = TF x IDF
```

The doc_word_count table holds the count of each word within each document.  Summing those counts per document gives each document's total number of words, which is the denominator of TF.

In [None]:
# Print the column names and types of doc_word_count_df.
doc_word_count_df.printSchema()

In [None]:
# TF denominator: total number of words in each document, obtained by summing
# that document's per-word counts.
tf_denom_df = sqlContext.sql("""
    SELECT doc_id, sum(doc_word_count) as tf_denom FROM doc_word_count GROUP BY doc_id
""")
tf_denom_df.show()

In [None]:
# Make tf_denom_df queryable from SQL under the table name "tf_denom".
tf_denom_df.registerTempTable("tf_denom")

Verifying the length of document 3:

In [None]:
# Hand-pasted text used to cross-check tf_denom for doc_id 3.
# NOTE(review): presumably a copy of the corresponding line of dull.txt; confirm.
d3 = "More nonsense followed by yet more nonsense.  Really I'm just writing this to have a fourth example.  Today we'll learn some more spark.  No time to be original.  Really struggling to find language.  I just need one more example.  It shouldn't be that hard.  Did you know if you just start writing and writing and writing and writing, the words just start to fall out.  All these sentences end in a period.  They are all simple sentences.  I think.  These are just examples that might be useful for our spark exercise today. Of mice and men.  Of course I need more of the word of."

In [None]:
# Word count by plain whitespace splitting.  The doc_word pipeline also turns
# ',', '.' and '-' into spaces before splitting, so the two counts agree only
# when those characters never join two words or stand alone as a token.
len(d3.split())

Join the tfidf table to the tf_denom table so that each row has both the word's count in the document and the document's total word count, then take their ratio to get the term frequency.

In [None]:
# Print the column names and types of tfidf_df.
tfidf_df.printSchema()

In [None]:
# Term frequency: attach each document's total word count (tf_denom) and
# divide the word's in-document count by it.
tfidf_2_df = sqlContext.sql("""
    SELECT a.*, b.tf_denom, a.doc_word_count/tf_denom as tf
    FROM tfidf a 
    INNER JOIN tf_denom b
    ON a.doc_id = b.doc_id
""")
# tfidf_2_df.show(1000)

In [None]:
# Make tfidf_2_df queryable from SQL under the table name "tfidf_2".
tfidf_2_df.registerTempTable('tfidf_2')

For inverse document frequency, we need the number of documents each word appears in.  We divide the total number of documents by that count, then take the log.

In [None]:
# Document frequency: tfidf has one row per (doc_id, word) pair, so counting
# rows per word gives the number of documents in which the word appears.
doc_appearances_df = sqlContext.sql("""
    SELECT word, count(*) as doc_appearances FROM tfidf GROUP BY word
""")
doc_appearances_df.show()

In [None]:
# Make doc_appearances_df queryable from SQL as "doc_appearances".
doc_appearances_df.registerTempTable('doc_appearances')

In [None]:
# Spot check: number of documents containing the word 'of'.
word_query = sqlContext.sql("""
    SELECT * FROM doc_appearances WHERE word='of'
""")
word_query.show(4)

We need to join this back to the tfidf_2 table on word.

In [None]:
# Attach each word's document frequency (doc_appearances) to every
# (doc_id, word) row of tfidf_2.
tfidf_3_df = sqlContext.sql("""
    SELECT a.*, b.doc_appearances
    FROM tfidf_2 a 
    INNER JOIN doc_appearances b
    ON a.word = b.word
""")
tfidf_3_df.show(3)

In [None]:
# Make tfidf_3_df queryable from SQL under the table name "tfidf_3".
tfidf_3_df.registerTempTable('tfidf_3')

In [None]:
# Spot check: tfidf_3 rows for the word 'of' (at most 4 rows shown).
word_query = sqlContext.sql("""
    SELECT * FROM tfidf_3 WHERE word='of'
""")
word_query.show(4)

Finally, we count the number of documents in our corpus.  This is just the number of distinct doc_id values.

In [None]:
# Corpus size: number of distinct documents, returned as a one-row table.
# NOTE(review): a line of the input that yields no words has no rows in
# tfidf_3 and so would not be counted here; confirm that is intended.
total_corpus_df = sqlContext.sql("""
    SELECT count(distinct doc_id) as corpus_count FROM tfidf_3
""")
total_corpus_df.show()

In [None]:
# Make total_corpus_df queryable from SQL as "total_corpus".
total_corpus_df.registerTempTable('total_corpus')

In [None]:
# Print the column names and types of tfidf_3_df.
tfidf_3_df.printSchema()

In [None]:
# Attach the corpus-wide document count to every row.  total_corpus is the
# result of a single aggregate with no GROUP BY, so it has exactly one row and
# the cartesian product is intentional.  It is spelled as an explicit
# CROSS JOIN because Spark 2.x rejects an INNER JOIN without an ON clause
# ("Detected cartesian product for INNER join") unless
# spark.sql.crossJoin.enabled is set.
tfidf_4_df = sqlContext.sql("""
    SELECT a.*, b.corpus_count
    FROM tfidf_3 a
    CROSS JOIN total_corpus b
""")
# tfidf_4_df.show(100)

In [None]:
# Make tfidf_4_df queryable from SQL under the table name "tfidf_4".
tfidf_4_df.registerTempTable('tfidf_4')

In [None]:
# Print the column names and types of tfidf_4_df.
tfidf_4_df.printSchema()

In [None]:
# Final score: tf multiplied by an IDF factor.  Note that the factor computed
# here is log(1 + corpus_count/doc_appearances), i.e. the documents-per-word
# ratio is shifted by 1 before the log is taken, rather than the plain
# log(corpus_count/doc_appearances).
tfidf_5_df = sqlContext.sql("""
    SELECT *, tf*(log(1 + corpus_count/doc_appearances)) as tfidf FROM tfidf_4 
""")
# tfidf_5_df.show()

In [None]:
# Make tfidf_5_df queryable from SQL under the table name "tfidf_5".
tfidf_5_df.registerTempTable('tfidf_5')

In [None]:
# Spot check: final TF-IDF score of the word 'of' in each document that
# contains it (at most 4 rows shown).
word_query = sqlContext.sql("""
    SELECT doc_id, word, tfidf FROM tfidf_5 WHERE word='of'
""")
word_query.show(4)