# Let's give Spark a shot!

Your challenge is to calculate TFIDF on the text file in the repo, using Spark.  In that
text file, each line is a document.  You should have 4 total documents.  Don't be
too concerned about getting the formula right; just start by calculating the number
of times each word appears in each document and dividing by the
total number of times the word appears in the whole corpus.  If you can get to that point,
then try to get the calculation correct according to one of the true formulas for TFIDF.

For this simple TFIDF measure, you should get the following for the word "of":

```
document 0: document count: 0.  Corpus count: 24.  Simple TFIDF: 0/24 = 0
document 1: document count: 6.  Corpus count: 24.  Simple TFIDF: 6/24 = 0.25
document 2: document count: 14. Corpus count: 24.  Simple TFIDF: 14/24 = 0.58333...
document 3: document count: 4.  Corpus count: 24.  Simple TFIDF: 4/24 = 0.166....
```

>Hint: `zipWithIndex()` called on an RDD will zip the elements of an RDD together with an index.

In [3]:
import pandas as pd
import numpy as np

In [2]:
import pyspark

# Get the SparkSession used by the DataFrame cells below (builder.getOrCreate()).
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [31]:
# Read dull.txt into a DataFrame of text lines.
doc_df = spark.read.text('dull.txt')

In [32]:
# Rename the 'value' column to 'docs'.
doc_df = doc_df.withColumnRenamed('value','docs')

In [33]:
# Preview the rows; long strings are truncated in the printed table.
doc_df.show()

+--------------------+
|                docs|
+--------------------+
|First line This i...|
| This is second line|
|          third line|
|and subsequent li...|
|          and more..|
+--------------------+



In [34]:
# Print the column names and types of doc_df.
doc_df.printSchema()

root
 |-- docs: string (nullable = true)



In [None]:
# Already got this part wrong
# NOTE(review): abandoned sketch. This cell was never run (In [None]) and is not
# valid Python as written; `docs_concat`, `docs` and `docs_column` are not defined
# anywhere in the visible notebook.

# Calculate tf
for word in docs_concat:
    for doc_line in docs:
        # NOTE(review): this divides the loop variable itself by the document length;
        # presumably the count of `word` in `doc_line` was intended -- confirm.
        tf = word / len(doc_line)
        # NOTE(review): the `if` below has no trailing ':' and no body.
        if word in doc_line
        # NOTE(review): the expression below is unfinished (nothing follows '/').
        idf = len(docs_column) / 

# Class Solution

w(i,j) = tf(i,j) × log( N / df(i) )
  
w(i,j) = TF-IDF weight of word i in document j  
tf(i,j) = count of word i in document j  
N = total # of documents  
df(i) = # of docs containing word i  

In [1]:
import findspark
findspark.init()
import os
import pyspark
from pyspark import SparkContext

In [2]:
# Reuse the running SparkContext when this cell is re-executed: calling
# SparkContext() a second time in the same kernel raises an error, because only
# one context may be active at a time.
sc = SparkContext.getOrCreate()

In [3]:
from pyspark.sql import SQLContext
# SQL entry point bound to `sc`; later cells use it to build DataFrames
# (createDataFrame) and to run SQL queries (sql).
sqlContext = SQLContext(sc)

In [4]:
from pprint import pprint

In [5]:
!ls

dull.txt
pair_tfidf_in_spark.md
pair_tfidf_my_solution.ipynb
readme.md


In [6]:
# Create an RDD over the lines of dull.txt.
text = sc.textFile("dull.txt")

In [8]:
text  
# `text` is an RDD.
# The file has not been read yet: Spark evaluates lazily.


# So far Spark has only recorded the textFile step; no data has been loaded.
# To actually see the file contents you need an action such as .collect().

dull.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [10]:
text.collect() # an action: runs the pending steps and returns the results

# Avoid calling .collect() casually, because it collects EVERYTHING.
# A .collect() pulls every record from every node onto the machine that called it.
# Don't collect a whole dataset; only do it on a small sample!

['First line This is a test file',
 'This is second line',
 'third line',
 'and subsequent line..',
 'and more..']

In [11]:
# Normalise each line (commas, full stops and hyphens become spaces; everything is
# lower-cased) and split it on whitespace into a list of words.
text.map(
    lambda line: line.replace(',', ' ')
                     .replace('.', ' ')
                     .replace('-', ' ')
                     .lower()
                     .split()
).collect()

[['first', 'line', 'this', 'is', 'a', 'test', 'file'],
 ['this', 'is', 'second', 'line'],
 ['third', 'line'],
 ['and', 'subsequent', 'line'],
 ['and', 'more']]

In [13]:
# Tokenise every line, then pair each word list with its position in the RDD:
# zipWithIndex() yields (word_list, index) tuples.
text.map(
    lambda line: line.replace(',', ' ')
                     .replace('.', ' ')
                     .replace('-', ' ')
                     .lower()
                     .split()
).zipWithIndex().collect()

[(['first', 'line', 'this', 'is', 'a', 'test', 'file'], 0),
 (['this', 'is', 'second', 'line'], 1),
 (['third', 'line'], 2),
 (['and', 'subsequent', 'line'], 3),
 (['and', 'more'], 4)]

In [16]:
# Tokenise and index the lines, then swap each (word_list, index) tuple so that
# the index comes first and the word list second.
(
    text.map(
        lambda line: line.replace(',', ' ')
                         .replace('.', ' ')
                         .replace('-', ' ')
                         .lower()
                         .split()
    )
    .zipWithIndex()
    .map(lambda pair: (pair[1], pair[0]))
    .collect()
)

[(0, ['first', 'line', 'this', 'is', 'a', 'test', 'file']),
 (1, ['this', 'is', 'second', 'line']),
 (2, ['third', 'line']),
 (3, ['and', 'subsequent', 'line']),
 (4, ['and', 'more'])]

In [19]:
# An extended exmaple of the code above
text.map(lambda x: x.replace(',',' ').replace('.', " ").replace('-', ' ').lower().split())\
.zipWithIndex()\
.map(lambda x: (x[1], x[0], 'hello')).collect()    # swpa the position to put the index infront and the list behind

[(0, ['first', 'line', 'this', 'is', 'a', 'test', 'file'], 'hello'),
 (1, ['this', 'is', 'second', 'line'], 'hello'),
 (2, ['third', 'line'], 'hello'),
 (3, ['and', 'subsequent', 'line'], 'hello'),
 (4, ['and', 'more'], 'hello')]

In [23]:
# Tokenise and index the lines, put the index first, then flatten each
# (index, word_list) tuple into one (index, word) pair per word.
(
    text.map(
        lambda line: line.replace(',', ' ')
                         .replace('.', ' ')
                         .replace('-', ' ')
                         .lower()
                         .split()
    )
    .zipWithIndex()
    .map(lambda pair: (pair[1], pair[0]))
    .flatMapValues(lambda words: words)
    .collect()
)

[(0, 'first'),
 (0, 'line'),
 (0, 'this'),
 (0, 'is'),
 (0, 'a'),
 (0, 'test'),
 (0, 'file'),
 (1, 'this'),
 (1, 'is'),
 (1, 'second'),
 (1, 'line'),
 (2, 'third'),
 (2, 'line'),
 (3, 'and'),
 (3, 'subsequent'),
 (3, 'line'),
 (4, 'and'),
 (4, 'more')]

In [7]:
# Build the (doc_id, word) pair RDD used by the SQL cells below: tokenise each
# line, attach its line index, move the index to the front, then emit one pair
# per word.
# zipWithIndex must be *called*; without the parentheses the chain continues on
# the bound method itself and fails with
# "AttributeError: 'function' object has no attribute 'map'".
doc_word = text.map(lambda x: x.replace(',',' ').replace('.', " ").replace('-', ' ').lower().split())\
               .zipWithIndex()\
               .map(lambda x: (x[1], x[0])) \
               .flatMapValues(lambda x: x)

In [None]:
# Turn the (doc_id, word) pair RDD into a DataFrame with named columns.
# The RDD and the list of column names are two separate arguments.
doc_word_df = sqlContext.createDataFrame(doc_word, ['doc_id', 'word'])

In [None]:
# Expose the DataFrame to SQL under the table name "doc_word".
# createOrReplaceTempView replaces the deprecated registerTempTable and can be
# re-run safely because it overwrites any existing view of the same name.
doc_word_df.createOrReplaceTempView("doc_word")

In [None]:
# counting ocurence of words (global) (doc_word)
word_count_df = sqlContext.sql("""
    SELECT word, count(*) as tot_word_count 
    FROM doc_word
    GROUP BY word
""")
word_count_.df.show()  # Will get back the global occurence of the word

In [None]:
# Check what kind of object sqlContext.sql() returned.
type(word_count_df)

In [None]:
# counting occruence of words (document level) (doc_Word
doc_word_count_df = sqlContext.sql("""
    SELECT doc_id, word, count(*) as doc_word_count
    FROM doc_word
    
""")

In [None]:
# combien docw ord count and total word ocunt
word_df = sqlContext.sql("""
    SELECT a.doc_id, a.word, a.doc_word_count, b.tot_word_count
    FROM doc_word_count a
    INNER JOIN word_count b 
    ON a.word = b.word
""")