##### Assignment 3: TF-IDF Using Data Frames

Now you will do the TF-IDF calculation one more time. We first did it using MapReduce, then in class using Spark RDDs, and now you will do it using Spark Data Frames.

In Assignment 1 you wrote an interactive query scorer that read the indexed documents and computed TF-IDF for a query line.
For this assignment you will implement the same indexing phase (producing a data frame with the computed TF-IDF values), but you will do query processing in batch:
you will take a file with one query per line, and produce a data frame with the queries, best matching document, and score of the best matching document.
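
As a rough illustration of the quantity being computed, here is a minimal plain-Python sketch (no Spark) of the scoring used in this notebook: term frequency within a document, divided by the number of documents containing the term, then scaled so the largest value is 100. The function name `toy_tfidf` and the two tiny documents are made up for the example; the real assignment must use data frame operations.

```python
from collections import Counter

def termify(word):
    # Keep only the letters a-z, lower-cased (same rule as the notebook's termify).
    return ''.join(c for c in word.lower() if 'a' <= c <= 'z')

def toy_tfidf(docs):
    """docs maps a doc id to its text; returns {(term, doc_id): scaled score}."""
    counts = {d: Counter(t for t in map(termify, text.split()) if t)
              for d, text in docs.items()}
    # Document frequency: number of documents in which each term occurs.
    doc_freq = Counter(term for c in counts.values() for term in c)
    raw = {}
    for d, c in counts.items():
        total = sum(c.values())  # total number of terms in document d
        for term, n in c.items():
            raw[(term, d)] = (n / total) / doc_freq[term]
    top = max(raw.values())
    # Scale so that the largest value over all (term, doc) pairs is 100.
    return {k: 100 * v / top for k, v in raw.items()}

# Hypothetical two-document corpus, for illustration only.
scores = toy_tfidf({"a": "the whale the sea", "b": "the rabbit hole"})
```

In this toy corpus 'the' occurs in both documents, so its weight is halved, while 'rabbit' occurs in only one and ends up with the top score.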

In [2]:
sc

In [3]:
%fs rm -r FileStore/tables/documents

In [4]:
%fs ls FileStore/tables/documents

path,name,size
dbfs:/FileStore/tables/documents/bible_kjv-ef3b8.txt,bible_kjv-ef3b8.txt,4332554
dbfs:/FileStore/tables/documents/carroll_alice-ac78c.txt,carroll_alice-ac78c.txt,144395
dbfs:/FileStore/tables/documents/melville_moby_dick-7006e.txt,melville_moby_dick-7006e.txt,1265914
dbfs:/FileStore/tables/documents/shakespeare_macbeth-08fa9.txt,shakespeare_macbeth-08fa9.txt,100351
dbfs:/FileStore/tables/documents/whitman_leaves-013de.txt,whitman_leaves-013de.txt,711215


In [5]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *  # ArrayType, StringType, FloatType, StructField, StructType
import pyspark.sql.functions as func
# Note: min and max imported here shadow the Python built-ins for the rest of the notebook.
from pyspark.sql.functions import udf, col, lit, explode, regexp_extract, length, desc, min, max

In [6]:
def splitRow(row):
  return row.strip().split()
splitRowUDF = udf(lambda r: splitRow(r), ArrayType(StringType()))

In [7]:
def termify(word):
  return ''.join([c for c in word.lower() if 97 <= ord(c) <= 122])
termifyUDF = udf(lambda w: termify(w), StringType())

In [8]:
# This is your indexing function; it must have this signature.
# The first argument is the name of the directory where the documents reside
# The output is the name of a file that will be the stored TF-IDF data frame -- you 
# will store the file in Parquet format.

# You must use DataFrames only, in the following sense:
#   You can use wholeTextFiles to read the documents, which gives you a list of pairs:  the first element of each pair is
#     the pathname of the file, the second element of the pair is the whole file text.
#   You must immediately convert this list to a data frame with columns 'pathname' and 'text' and use only 
#     data frame operations for the rest of the function.
#   Within the function you should first build a data frame with the columns 'Term', 'DocID', and 'TFIDF';  the first two
#     are strings, the third is a float.  
#   Scale the TF-IDF values as follows:   divide the value by the maximum value over all (term, docID) rows, and multiply by 100.
#   The last step in the function is to write the data frame in parquet format to the specified output file name

def index(indir, outfile): 
  dbutils.fs.rm(outfile,True)
  textFiles = sc.wholeTextFiles(indir)
  sq = SQLContext(sc)
  df = sq.createDataFrame(textFiles).withColumnRenamed("_1", "pathname").withColumnRenamed("_2", "text")
  regex_str = "[\/]([^\/]+)$"
  df = df.withColumn("docid", regexp_extract("pathname",regex_str,1)).drop("pathname") #get file name only
  df = df.withColumn("words", splitRowUDF(col("text"))).drop("text")
  df = df.select(df.docid, explode(df.words).alias("word"))
  df = df.withColumn("term", termifyUDF(col("word"))).drop('word')
  df = df.filter(length(df.term) > 0) # keep only terms with at least one character
  
  #create separate dataframe to help compute tfidf
  termDocCount = df.groupBy("term", "docid").count().withColumnRenamed("count", "termInDocCount")
  docLength = df.groupBy("docid").count().withColumnRenamed("count", "totalTermsInDoc")
  termDocFreq = df.distinct().groupBy('term').count().withColumnRenamed("count", "termInDocFreq")
  
  #Getting tfidf
  #(docid, term, termInDocCount, totalTermsInDoc)
  firstJoin = termDocCount.join(docLength, on =["docid"])
  #(term, docid, termInDocCount, totalTermsInDoc, termInDocFreq)
  tfidf = firstJoin.join(termDocFreq, on =["term"])
  tfidf = tfidf.withColumn("temp", ((tfidf.termInDocCount/tfidf.totalTermsInDoc)/tfidf.termInDocFreq))
  tfidf = tfidf.withColumn("tfidf", (tfidf.temp/(tfidf.select(max("temp")).collect()[0][0])*100))                         
  tfidf.select("term", "docid", "tfidf").write.save(outfile, format='parquet')
  
  print(tfidf.take(30))
  
  
indexFileName = '/FileStore/tfidf.parquet'
index('/FileStore/tables/documents', indexFileName)
  
   # Your code here
   # Yes, this is a trap -- you had better delete these comment lines :-)

In [9]:
from pyspark.sql.functions import min, max

person = spark.createDataFrame([
(0, "Bill Chambers", 0, 100),
(1, "Matei Zaharia", 1, 500), (2, "Michael Armbrust", 1, 250)])\
.toDF("id", "name", "graduate_program", "spark_status")

#person.show()
person.select(max("spark_status")).collect()[0][0]

In [10]:

# This is a helper function that you must implement.  It does the TF-IDF calculation 
# for a single query line.   Its first input is the query line (a string). The second input
# is the data frame containing the indexed TF-IDF information -- i.e. it was created in 
# the index phase, and will be read from parquet as the first line of scoreQueryFile below.

# This helper function must return a data frame with columns DocID and Score and will 
# have no rows with score 0 and will be in descending order of score.  I.e.  first 
# row has the docID and score with highest TF-IDF value.  Remember, the TFIDF of a set of 
# terms for a document is the sum of the TFIDF values for each term in the set.
#
# This function must use only data frames.  It must first convert the queryLine into a data frame with 
# a single column Word and one row for each word in the query.  It then does the TFIDF calculation using 
# only data frame operations on the query dataframe and the TFIDF dataframe.

def scoreQueryLine(queryLine, tfidfTable):
  myList = queryLine.split(" ")
  sq = SQLContext(sc)
  df = sq.createDataFrame(myList, StringType())
  df = df.withColumn("term", termifyUDF(col("value"))).drop('value')
  df = df.filter(length(df.term) > 0) # keep only terms with at least one character
  #score = spark.read.parquet(tfidfTable)
  final = df.join(tfidfTable, on = ["term"])
  # Sum the tfidf column explicitly rather than relying on sum() picking up every numeric column
  docid_score = final.groupBy("docid").agg(func.sum("tfidf").alias("score")).sort(desc("score"))
  #docid_score.show()
  return docid_score

#scoreQueryLine("leaves OF grass", 'FileStore/tfidf.parquet')
   # Your code here


In [11]:

# This is your query processing function.   
# It takes as input the name of a file containing the queries, one line per query.  The second argument
#  is the name of the Parquet file containing the indexed TF-IDF values.  

# This function returns a DataFrame with three columns:   'Query', 'DocID', and 'Score' 
#   There will be one row for each query in the input file, *unless* there are no documents with non-zero TFIDF for the query.
#   The docID and score are for the highest-scoring document.  You can break ties automatically.

# This function does not need to use Data Frame operations only, though it does need to return a data frame.
# It can implement a loop over lines in the query file and call scoreQueryLine sequentially on each line.

def scoreQueryFile(filename, tfidfFileName='FileStore/tfidf.parquet'):
  textFile = sc.textFile(filename)
  tfidf = spark.read.parquet(tfidfFileName)
  #tfidf.show()
  sq = SQLContext(sc)
  field = [StructField("query", StringType(), True),StructField("docid", StringType(), True),StructField("score", FloatType(), True)]
  schema = StructType(field)
  result = sq.createDataFrame(sc.emptyRDD(), schema)
  #result.show()
  for line in textFile.collect():
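    df = scoreQueryLine(line, tfidf).limit(1)
    # union matches columns by position, so select them in the same order (and type) as the schema
    df = df.select(lit(line).alias("query"), col("docid"), col("score").cast(FloatType()))
    #df.show()
    result = result.union(df)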
  result.show()
  return result
    
  

#scoreQueryFile('/FileStore/tables/queries.txt')


Do not include the cells below in your handed-in notebook.

Here are the files in my documents directory:

<pre>
bible_kjv-ef3b8.txt
carroll_alice-ac78c.txt
melville_moby_dick-7006e.txt
shakespeare_macbeth-08fa9.txt
whitman_leaves-013de.txt
</pre>

Notice they have the strange five-character ID that gets appended to the file name when it is uploaded to the Databricks filesystem.
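
The sample output further down lists DocIDs without that suffix, so one way to get there is to strip it from the file name. The sketch below is plain Python and assumes the suffix is always a hyphen, five lowercase alphanumeric characters, and `.txt` at the end of the name; the helper name `doc_id_from_path` is made up for the example.

```python
import re

# Assumed pattern: "-" + five lowercase letters/digits + ".txt" at the end of the name.
_UPLOAD_SUFFIX = re.compile(r"-[0-9a-z]{5}\.txt$")

def doc_id_from_path(path):
    # Take the last path component, then drop the upload suffix if it is present.
    name = path.rsplit("/", 1)[-1]
    return _UPLOAD_SUFFIX.sub("", name)

doc_id = doc_id_from_path("dbfs:/FileStore/tables/documents/melville_moby_dick-7006e.txt")
```

Inside a data frame pipeline the same pattern could probably be applied with a regular-expression column function instead of a Python helper.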

Here is the content of my queries.txt file:
<pre>
Dead baby whales!
leaves OF grass
leaves grass
kill,  king.
rabbit HOLE
mAD hatter
god loves
</pre>
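
Each query line is reduced to terms the same way the documents are, and a document's score for the query is the sum of its TF-IDF values over the query terms. A minimal plain-Python sketch of that lookup follows; the table values are invented for illustration and `best_match` is a hypothetical helper, not part of the required interface.

```python
from collections import defaultdict

def termify(word):
    # Keep only the letters a-z, lower-cased (same rule as the notebook's termify).
    return ''.join(c for c in word.lower() if 'a' <= c <= 'z')

def best_match(query_line, tfidf):
    """tfidf maps (term, doc_id) -> score; returns (doc_id, score) or None."""
    terms = [t for t in map(termify, query_line.split()) if t]
    totals = defaultdict(float)
    for term in terms:
        for (t, doc_id), score in tfidf.items():
            if t == term:
                totals[doc_id] += score  # sum of per-term scores for this document
    if not totals:
        return None  # no document has a non-zero score for this query
    return max(totals.items(), key=lambda kv: kv[1])

# Hypothetical TF-IDF values, for illustration only.
toy_table = {
    ("rabbit", "carroll_alice"): 6.0,
    ("hole", "carroll_alice"): 3.5,
    ("hole", "melville_moby_dick"): 1.0,
}
match = best_match("rabbit HOLE", toy_table)
```

Punctuation and case are removed by `termify`, so a line such as `kill,  king.` yields the two terms `kill` and `king`.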

This is how I will run your code, and here is some sample output

In [16]:
indexFileName = '/FileStore/tfidf.parquet'
index('/FileStore/tables/documents', indexFileName)
scoreQueryFile('/FileStore/tables/queries.txt', indexFileName).show()

<pre>
+-----------------+-------------------+------------------+
|            Query|              DocID|             Score|
+-----------------+-------------------+------------------+
|Dead baby whales!| melville_moby_dick|6.1125700619618115|
|  leaves OF grass|          bible_kjv| 53.26585244325753|
|     leaves grass|     whitman_leaves|1.3585601754910759|
|     kill,  king.|shakespeare_macbeth|3.7715007463115304|
|      rabbit HOLE|      carroll_alice|  9.86284835004943|
|       mAD hatter|      carroll_alice| 6.949867558290645|
|        god loves|          bible_kjv| 8.513498219172556|
+-----------------+-------------------+------------------+
</pre>

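Notice the weird behavior of the second and third lines: 'leaves OF grass' matches the Bible with a very high score, while 'leaves grass' matches Whitman with a low one.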

The problem is that the word 'of' has very high term frequency in the bible, and that dwarfs the effect from the other words.  

Dividing by document frequency is supposed to correct for this -- i.e. it gives less weight to words like 'of' that appear in most/all documents.  But with only five documents in the index set, it doesn't "penalize" these common words enough.  
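
To make the size of that effect concrete, here is a small sketch with made-up numbers. With five documents, document frequency can divide a term's weight by at most 5, so a word whose term frequency is far more than five times that of a rare word still wins. The term frequencies below are hypothetical and only chosen to show the arithmetic.

```python
NUM_DOCS = 5  # size of the index set in this assignment

def weight(term_freq, doc_freq):
    # Raw (unscaled) weight: term frequency divided by document frequency.
    return term_freq / doc_freq

# Hypothetical term frequencies, chosen only to illustrate the effect.
common = weight(term_freq=0.03, doc_freq=NUM_DOCS)  # a word like 'of', in every document
rare = weight(term_freq=0.0005, doc_freq=1)         # a word found in a single document

# Largest possible penalty relative to a term that occurs in one document.
max_penalty = NUM_DOCS / 1
```

Here the common word still ends up with a larger weight than the rare one, because its term frequency is 60 times higher and the penalty is only a factor of 5.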

The common solution is to define 'stop words' that are left out of indexing altogether.
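
A minimal sketch of that idea in plain Python, assuming a tiny hand-picked stop-word set (real lists are much longer, and this one is invented for the example):

```python
# Hypothetical stop-word list for illustration; real lists are much longer.
STOP_WORDS = {"of", "the", "and", "a", "to", "in"}

def termify(word):
    # Keep only the letters a-z, lower-cased (same rule as the notebook's termify).
    return ''.join(c for c in word.lower() if 'a' <= c <= 'z')

def index_terms(text):
    # Turn text into terms and drop empty terms and stop words.
    terms = (termify(w) for w in text.split())
    return [t for t in terms if t and t not in STOP_WORDS]

kept = index_terms("leaves OF grass")
```

With 'of' removed at both indexing and query time, the queries 'leaves OF grass' and 'leaves grass' reduce to the same terms. In the data frame version this would presumably be one extra filter on the term column.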