ZHAW, CAS Information Engineering, Module Big Data, December 2019  
  
Final Project **Christophe Otter & Severin Troesch**

## Movie Finder: Performance gain with Spark
  
##### *V7 - 06.01.2019*

### 1. Preparation Python

What we want in this section is the following:  
  
- Install and load the necessary modules
- Load the dataset (written plots of 35k movies)
- Convert the data into a python list for the "baseline" assessment of the performance

First, let's download and import the necessary modules:

In [4]:
## install the external modules via the Databricks library utilities (available at notebook-session level)
dbutils.library.installPyPI('nltk')
import nltk
nltk.download('stopwords')
nltk.download('punkt') #used for nltk.word_tokenize function

In [5]:
## import modules
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import time
import os
import re
# import nltk #language processing, imported above
from nltk.corpus import stopwords
stopw = set(stopwords.words('english')) # set of stopwords
import math
import json
import numpy

Then, let's get the data (upload the movies database as .csv manually to /FileStore/tables/):  
Path: /FileStore/tables/movies_df.csv

Now, we load the csv with pandas - and convert the relevant column into a python-list.

In [8]:
## Then, load csv, but with regular pandas-commands - for speed comparison later
movies_pd = pd.read_csv("/dbfs/FileStore/tables/movies_df.csv")

In [9]:
## have a look
movies_pd.head() #looks ok - a regular pandas df that is

Unnamed: 0,Release Year,Title,Origin/Ethnicity,Director,Cast,Genre,Wiki Page,Plot,All
0,1901,Kansas Saloon Smashers,American,Unknown,,unknown,https://en.wikipedia.org/wiki/Kansas_Saloon_Sm...,"A bartender is working at a saloon, serving dr...","A bartender is working at a saloon, serving dr..."
1,1901,Love by the Light of the Moon,American,Unknown,,unknown,https://en.wikipedia.org/wiki/Love_by_the_Ligh...,"The moon, painted with a smiling face hangs ov...","The moon, painted with a smiling face hangs ov..."
2,1901,The Martyred Presidents,American,Unknown,,unknown,https://en.wikipedia.org/wiki/The_Martyred_Pre...,"The film, just over a minute long, is composed...","The film, just over a minute long, is composed..."
3,1901,"Terrible Teddy, the Grizzly King",American,Unknown,,unknown,"https://en.wikipedia.org/wiki/Terrible_Teddy,_...",Lasting just 61 seconds and consisting of two ...,Lasting just 61 seconds and consisting of two ...
4,1902,Jack and the Beanstalk,American,"George S. Fleming, Edwin S. Porter",,unknown,https://en.wikipedia.org/wiki/Jack_and_the_Bea...,The earliest known adaptation of the classic f...,The earliest known adaptation of the classic f...


In [10]:
# and make list of "All" variable to iterate over later (for the baseline comparison)
movies_ls = movies_pd.All.tolist()
type(movies_ls)

In [11]:
## check content
movies_ls[10] #good

### 2. Python helper Functions

What we want in this section is the following:

- Define the helper functions for the movie finder - the same functions that were used in the python-implementation.

Now, define the different helper functions necessary for the analysis of the dataset and for the movie-selection.  
  
For now, just take the same functions as in the Spyder/Python version of the movie recommender:

In [14]:
## normalizer function

def nor(txt):
    
    ''' doc:
       input (txt) = text string that needs to be normalised
       output = a list of normalised tokens - NOT UNIQUE!
    '''
    
    ## tokenize
    #txt_2 = nltk.word_tokenize(txt) # this causes problems in .map functionality later...
    txt_2 = txt.split() #tokenize 
    
    ## remove stopwords (note: matched before lower-casing, so capitalised stopwords such as "The" are kept)
    txt_2 = [word for word in txt_2 if word not in stopw]
    
    ## remove special characters and capitals
    txt_2 = [re.sub(r'\W+', '', word).lower() for word in txt_2]
    
    ## remove empty strings and single-character tokens
    txt_2 = [word for word in txt_2 if len(word)>1]
    
    # return result: a list of normalised tokens - NOT UNIQUE!
    return txt_2

In [15]:
## check
nor("bla is a word") #works

In [16]:
## function for TF out of list of not-unique normalised tokens (result from nor() function)
    
def tf(lis):
    
    ''' doc:
       input: 
           lis = list of not-unique normalised tokens (result from nor() function)
       output = a dict with tokens and term frequencies
    '''
        
    res = {} #dict of tokens in lis (key) and relative # of occurrences in lis (value)
    cindex = set() #set of tokens already seen in lis
    
    nt = len(lis) # number of tokens in lis
    
    # for every word in list
    for t in lis:
        if t not in cindex:
            cindex.add(t)
            res[t] = 1/nt # /nt gives the normalisation for the term-frequency
        else:
            res[t] += 1/nt
    
    ## return result - a dict of tokens and term frequencies
    return res
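
As a quick sanity check of the normalisation, here is a minimal sketch on a hand-made token list. The name `tf_counter` and the toy tokens are made up for illustration; it is an equivalent formulation with `collections.Counter`, not the function used for the timings in this notebook.

```python
from collections import Counter

def tf_counter(tokens):
    """Relative term frequency: occurrences of a token / total number of tokens."""
    n_tokens = len(tokens)
    counts = Counter(tokens)
    return {tok: cnt / n_tokens for tok, cnt in counts.items()}

# hypothetical toy input, standing in for the output of nor()
toy_tokens = ["saloon", "bartender", "saloon", "beer"]
toy_tf = tf_counter(toy_tokens)
print(toy_tf)  # saloon -> 0.5, bartender -> 0.25, beer -> 0.25
```

The values of one document always sum to 1, which is what the division by the number of tokens is meant to achieve.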


In [17]:
## idf function

# function that calculates idf for tokens in a collection of documents
# returns a dict (key = token, value = idf)

def idf(coll):
    
    """doc:
    in: coll = list of documents (strings of multiple tokens)
    out: dictionary of tokens to idf of token in collection
    """ 
    ts1 = round(time.time(),0) #timestamp 1
    
    N = len(coll)  
    print("Analysing {} documents...".format(N))
    print()
    
    dic = {}
    
    # get a list of sets of normalised tokens
    lss = [set(nor(doc)) for doc in coll] #returns a list of sets
    
    ts2 = round(time.time(),0) #timestamp 2
    print("step 1 (normalising & TF of all movie-plot strings) done ... (took {} s)".format(ts2-ts1))
    
    # set of all words in coll
    settot = set()
    for s in lss:
        settot = settot.union(s)
        
    ts3 = round(time.time(),0) #timestamp 3
    print("step 2 (merge all sets of normalised strings to one set) done ... (took {} s)".format(ts3-ts2))
        
    # for all tokens in collection 
    for tok in list(settot):
        nrdoc = 0
        
        # for all sets of tokens (i.e. documents)
        for tokset in lss:
            if tok in tokset:
                nrdoc += 1 #nrdoc: in how many docs is the token?
        
        dic[tok] = N/nrdoc #idf = N / number of docs that contain tok
    
    ts4 = round(time.time(),0) #timestamp 4 
    print("step 3 (for all tokens in all movies: determine idf) done ... (took {} s)".format(ts4-ts3))
    print()
    print("finished. (overall analysis time: {} s)".format(ts4 - ts1))
    return dic
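
To make the idf definition used here concrete (a plain ratio N / document frequency, without a logarithm), here is a small worked sketch on a toy corpus. The function name `idf_single_pass` and the toy documents are assumptions for illustration; it counts document frequencies in one pass and is not the baseline that is timed in this notebook.

```python
from collections import Counter

def idf_single_pass(token_sets):
    """IDF as defined in this notebook: N / (number of documents containing the token)."""
    n_docs = len(token_sets)
    doc_freq = Counter()
    for tokens in token_sets:
        doc_freq.update(set(tokens))  # each document counts a token at most once
    return {tok: n_docs / df for tok, df in doc_freq.items()}

# hypothetical toy corpus: three documents, already normalised
toy_docs = [{"saloon", "beer"}, {"moon", "beer"}, {"beer", "film"}]
toy_idf = idf_single_pass(toy_docs)
print(toy_idf["beer"], toy_idf["saloon"])  # 1.0 3.0
```

A token that appears in every document gets the minimum value 1.0, a token that appears in only one of N documents gets N.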

In [18]:
## tfidf function: combine tf with idf - function takes string and gives dict of tokens:tfidf

def tfidf(txt, idfs):
    """ Compute TF-IDF
    Args:
        txt: string
        idfs (dictionary): token to IDF value
    Returns:
        dictionary: a dictionary of tokens to TF-IDF values
    """
    tfs = tf(nor(txt)) #a dict of token and term frequencies
    
    resdict = {}
    
    for t in tfs:
        if t in idfs:
            resdict[t] = tfs[t] * idfs[t] #calculate TF-IDF
        else:
            resdict[t] = 0 #zero if word is NOT in idfs reference
            
    return resdict

Now the ranking function (only cossim-function is applied, as the comparison between the ranking functions is not the point of this project):

In [20]:
## implement cosine-similarity function(s)

# dot product and normalizing funs
def dotprod(a, b):
    return sum([a[t] * b[t] for t in a if t in b])

def norm(a):
    return math.sqrt(dotprod(a, a))

# and finally, the actual fun to calculate cossim from two dicts
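def cossim(dict1, dict2):
    """ Compute cosine similarity between two tf-idf dicts
    Args:
        dict1: first dict of tokens and tf-idf values
        dict2: second dict of tokens and tf-idf values

    Returns:
        cossim: cosine similarity value (0.0 if one of the dicts has zero norm)
    """

    denom = norm(dict1) * norm(dict2)
    if denom == 0: # e.g. a query without any known token - avoid ZeroDivisionError
        return 0.0
    return dotprod(dict1, dict2) / denom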
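
A small worked example may help to see what the cosine similarity does with sparse token dicts. This is a self-contained sketch with hypothetical names (`dot`, `cosine`) and made-up vectors; returning 0.0 for a zero-norm vector is an assumption of this sketch.

```python
import math

def dot(a, b):
    """Dot product over the tokens that both sparse vectors share."""
    return sum(a[t] * b[t] for t in a if t in b)

def cosine(a, b):
    """Cosine similarity of two sparse vectors stored as dicts."""
    denom = math.sqrt(dot(a, a)) * math.sqrt(dot(b, b))
    return dot(a, b) / denom if denom else 0.0

# hypothetical tf-idf vectors
query_vec = {"france": 1.0, "drama": 1.0}
film_a = {"france": 2.0, "drama": 2.0}    # same direction as the query
film_b = {"france": 1.0, "western": 1.0}  # shares one of two tokens
film_c = {"western": 3.0}                 # shares no token

for name, vec in [("film_a", film_a), ("film_b", film_b), ("film_c", film_c)]:
    print(name, round(cosine(query_vec, vec), 3))
```

Only the direction of the vectors matters, not their length: `film_a` scores (up to rounding) 1 although its values are twice as large as those of the query.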

In [21]:
## build ranking-function to compare query and corpus  - using cosine similarity
    
def get_ranking_cossim(query, ref_dict, threshold, idf_dict):
    
    """ calculate and return a ranking of movies relative to searchstring (query)
    input:
        query = searchstring
        ref_dict = reference dictionary of movies (title:dict(token:tfidf))
        threshold = threshold relevance score above which results are returned (no default, must be passed)
        idf_dict = reference dict with term idfs
    output:
        a list of movie titles with score above threshold, sorted by decreasing similarity score
    """
    
    ## first, apply tf() and nor() function to query
    q = tfidf(query, idf_dict) #dict of query tokens:tfidf
    
    ## second, iterate over ref_dict and compare query to values (calculate relevance score)
        ## the result is a dict with movie titles as key and the relevance score (rs) as value
    res = {}
    
    for film in ref_dict: #all films in ref_dict
        
        
        # calculate relevance score (cosine similarity)
        rs = cossim(q,ref_dict[film])
        
        # add relevance score for film to res-dict
        res[film] = rs 
        
    ## define a sorted list with decreasing relevance scores
    result1 = sorted(res.items(), key=lambda kv: kv[1], reverse = True)
    # and choose items over threshold
    result2 = [mov[0] for mov in result1 if mov[1] > threshold]
             
    ## finally, return result
    return result2
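
If only the best few hits are needed, the full sort can be replaced by a top-k selection. The following is a minimal sketch with `heapq.nlargest`; the function name `top_k` and the toy scores are assumptions for illustration and are not part of the timed baseline.

```python
import heapq

def top_k(scores, k, threshold=0.0):
    """Return the k best (title, score) pairs with score > threshold, best first."""
    candidates = ((title, s) for title, s in scores.items() if s > threshold)
    return heapq.nlargest(k, candidates, key=lambda kv: kv[1])

# hypothetical relevance scores
toy_scores = {"Film A": 0.40, "Film B": 0.02, "Film C": 0.75, "Film D": 0.10}
best = top_k(toy_scores, k=2, threshold=0.03)
print(best)  # [('Film C', 0.75), ('Film A', 0.4)]
```

The same idea is what `takeOrdered(n, key)` offers on the Spark side: it returns the n smallest elements under the key, so a negated score yields the n best matches.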

### 3. Apply and time code from python-implementation

What we want in this section is the following:

- Apply the above-defined helper functions - build the same movie recommender as in the python-implementation.
- Time the crucial steps - as baseline for later performance improvement (the two crucial parts are: idfs and res_dict)

In [23]:
## first, prepare the timing of the python-implementation
py_pre = time.time()

In [24]:
## define number of rows in df that should be used for development
n_test = 1000

In [25]:
## idf-dict: make dict of idfs for movies (for lookup)

# apply to n_test cases
#idfs = idf(movies_ls[:n_test])

# apply to whole list of movies
idfs = idf(movies_ls)

In [26]:
## check content
idfs['steamboat'] #worked

In [27]:
## apply functions to dataset - result is a dict (title: dict of token:tfidf)

res_dict = {} #dictionary with titles (keys) and result dict (values). result dict is token:tfidf
ts_resdict1 = time.time() #timestamp 1

# loop over movies  
#for i, row in movies_pd.head(n_test).iterrows(): #only for n_test cases
for i, row in movies_pd.iterrows(): #for all cases

    txt = row["All"]
    n_mov = i+1
    
    res_dict[row["Title"]] = tfidf(txt, idfs) 

ts_resdict2 = time.time() #timestamp 2
secs = round(ts_resdict2 - ts_resdict1,1) #analysis time

print("Analysed {} movies in {} seconds.".format(n_mov, secs))

In [28]:
## check content of result
res_dict['The Girl Who Stayed at Home'] # looks ok

Recommender should now work:

In [30]:
## check if it works
get_ranking_cossim("France action drama new test movie tarantino", 
                   ref_dict = res_dict, 
                   threshold = 0.03, 
                   idf_dict = idfs)

# works.

In [31]:
## and record the timing for the python implementation
py_post = time.time()
print("The python-implementation took {} minutes overall.".format(round((py_post - py_pre)/60,2)))

### 4. Create the spark data structures

What we want in this section is the following:

- Create a spark-dataframe of movie-df for later parallelization
- Create an RDD of the "All" variable list for later parallelization

So let's create the spark dataframe:

In [34]:
## load data from csv file - create a Spark DataFrame directly (nota bene: a DataFrame is built on top of RDDs, but it is not an RDD itself)
'''
movies = sqlContext.read.format("csv")\
.option("header","true")\
.option("delimiter", ",")\
.option("inferSchema", "true")\
.load("/FileStore/tables/movies_df.csv")
'''

# this does not work somehow - the resulting dataframe has weirdly shifted columns
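
One possible explanation for the shifted columns, stated here as an assumption and not verified on this dataset, is that the plot texts contain quoted line breaks and commas. A reader that treats every physical line as one record then splits a single movie over several rows. The sketch below reproduces this effect with a made-up two-movie CSV and Python's `csv` module. Spark's CSV reader has a `multiLine` option (together with `quote` and `escape`) for such files; whether it fixes the problem here was not tested.

```python
import csv
import io

# hypothetical CSV content: the plot of "Film A" contains a line break and a comma
raw = 'Title,Plot\n"Film A","First line\nsecond line, with a comma"\n"Film B","Short plot"\n'

# naive approach: one record per physical line, split on every comma
naive_rows = [line.split(",") for line in raw.splitlines()]

# quote-aware approach: quoted line breaks and commas stay inside one field
proper_rows = list(csv.reader(io.StringIO(raw, newline="")))

print(len(naive_rows), len(proper_rows))  # 4 3
print(proper_rows[1])
```

The naive parse yields one row too many, and the extra row starts in the middle of a plot text, which looks like shifted columns in the resulting table.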

In [35]:
## better: create spark dataframe from pandas df
movies_spk = sqlContext.createDataFrame(movies_pd.astype(str))

In [36]:
## cache the df
movies_spk.cache()

In [37]:
## check form
movies_spk.show(36) # looks ok

In [38]:
## check type
movies_spk #its a spark dataframe - good

In [39]:
## have a look at schema
movies_spk.printSchema() #looks good

In [40]:
## To test: Perform first SQL-query on dataframe (the result is again a Spark DataFrame)
movies_spk.createOrReplaceTempView("mov") # registerTempTable() is deprecated since Spark 2.0
res1 = sqlContext.sql("select * from mov\
                      where Title == 'Inception'")
res1.show()

# (takes about 4 s)

In [41]:
## further test: run some spark transformations and actions:
movies_spk.select("Title", "Release Year").filter(movies_spk["Release Year"] > 2016).show(10) #works

Try to save the dataframe as a parquet file... this does not work so far.

In [43]:
## write a parquet file for later use - does not work so far

# Remove the file if it exists
#dbutils.fs.rm("/tmp/movies_pq", True)

# then write
#movies_spk.write.parquet("/tmp/movies_pq")
#movies_spk.write.parquet('C:\\Users\\sever\\Google Drive\\Dokumente PC\\ZHAW\\CAS Information Engineering\\Modul Big Data\\project\\performance_analysis_movie_recommender\\movies_pq')

Now, save the python list with the "All" variable as RDD:

In [45]:
## python list to RDD
par_deg = "automatically chosen" #degree of parallelization - or is it stupid to set that manually?
movies_ls_rdd = sc.parallelize(movies_ls) #degree of parallelization chosen automatically

In [46]:
## check the data type
movies_ls_rdd # an RDD - good

In [47]:
## run some basic operations on RDD
print(movies_ls_rdd.count())
print(movies_ls_rdd.takeSample(False, 1))
print(movies_ls_rdd.take(1))
#works

In [48]:
## now a .map operation - which will be important later

# goal: only take first 10 characters of text and show the first three movies
print(movies_ls_rdd.map(lambda text: text[0:10]).take(3)) #works

# goal: apply nor() function, take first token of first three movies
print(movies_ls_rdd.map(lambda text: nor(text)).map(lambda ls: ls[0]).take(3)) #works

### 5. Create the spark helper functions

What we want in this section is the following:

- Parallelize the analysis in the two crucial steps (i.e. formulate the python helper functions as parallelizable spark-functions)

So now, let's re-formulate the python helper functions for the recommender in "spark language", so that the analysis can be parallelized across multiple worker nodes:

In [51]:
## normalizer function
## THIS STAYS THE SAME AS IN THE PYTHON IMPLEMENTATION. NO CHANGE NEEDED.


def nor(txt):
    
    ''' doc:
       input (txt) = text string that needs to be normalised
       output = a list of normalised tokens - NOT UNIQUE!
    '''
    
    ## tokenize
    #txt_2 = nltk.word_tokenize(txt) # this causes problems in .map functionality later...
    txt_2 = txt.split() #tokenize 
    
    ## remove stopwords (note: matched before lower-casing, so capitalised stopwords such as "The" are kept)
    txt_2 = [word for word in txt_2 if word not in stopw]
    
    ## remove special characters and capitals
    txt_2 = [re.sub(r'\W+', '', word).lower() for word in txt_2]
    
    ## remove empty strings and single-character tokens
    txt_2 = [word for word in txt_2 if len(word)>1]
    
    # return result: a list of normalised tokens - NOT UNIQUE!
    return txt_2

In [52]:
## function for TF out of list of not-unique normalised tokens (result from nor() function)
## THIS STAYS THE SAME AS IN THE PYTHON IMPLEMENTATION. NO CHANGE NEEDED.
    
def tf(lis):
    
    ''' doc:
       input: 
           lis = list of not-unique normalised tokens (result from nor() function)
       output = a dict with tokens and term frequencies
    '''
        
    res = {} #dict of tokens in lis (key) and relative # of occurrences in lis (value)
    cindex = set() #set of tokens already seen in lis
    
    nt = len(lis) # number of tokens in lis
    
    # for every word in list
    for t in lis:
        if t not in cindex:
            cindex.add(t)
            res[t] = 1/nt # /nt gives the normalisation for the term-frequency
        else:
            res[t] += 1/nt
    
    ## return result - a dict of tokens and term frequencies
    return res


In [53]:
## idf function - SPARK VERSION

# function that calculates idf for tokens in a collection of documents
# returns an RDD of pairs (token, idf)

def idf_spk(coll):
    
    """doc:
    in: coll = RDD of documents (documents = strings of multiple tokens)
    out: RDD of pairs (token, idf of token in collection)
    """ 
    
    # count number of documents in collection (count() is an action, so this part is executed immediately)
    N = float(coll.count())
    
    # get the normalised tokens of all documents (unique within each document)
    lss = coll.flatMap(lambda doc: list(set(nor(doc)))) 
    
    # build tuples: (token, 1) for all tokens in lss
    token_count = lss.map(lambda token: (token, 1))
    
    # reduce: build tuple with (token, number of documents containing the token)
    token_sum = token_count.reduceByKey(lambda a, b : a + b)
    
    # calculate idf and return pair with (token, idf)
    return (token_sum.map(lambda tok: (tok[0], float(N/tok[1]))))
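
To see what the flatMap, map and reduceByKey steps do to the data, the same pipeline can be emulated in plain Python on a toy corpus. This is only a sketch for intuition: the toy documents are made up, nothing runs in parallel here, and sorting plus `groupby` merely stands in for the shuffle that Spark performs.

```python
from functools import reduce
from itertools import chain, groupby
from operator import itemgetter

# hypothetical toy corpus: three documents, already normalised (tokens may repeat)
toy_docs = [["saloon", "beer", "beer"], ["moon", "beer"], ["beer", "film"]]
n_docs = float(len(toy_docs))

# flatMap: unique tokens of every document, flattened into one sequence
flat_tokens = list(chain.from_iterable(set(doc) for doc in toy_docs))

# map: (token, 1) pairs
pairs = [(tok, 1) for tok in flat_tokens]

# reduceByKey: sum the ones per token (sorting stands in for the shuffle)
pairs.sort(key=itemgetter(0))
doc_freq = {
    tok: reduce(lambda a, b: a + b, (cnt for _, cnt in grp))
    for tok, grp in groupby(pairs, key=itemgetter(0))
}

# final map: N / document frequency
toy_idf = {tok: n_docs / df for tok, df in doc_freq.items()}
print(toy_idf)
```

Because every document contributes each token at most once, the summed ones are document frequencies, not raw token counts.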

In [54]:
## test function
idf_spk(sc.parallelize(movies_ls[:3],2)).collect() # seems ok
#sc.parallelize(movies_ls[:10],2).flatMap(lambda doc: list(set(nor(doc)))).map(lambda token: (token, 1)).reduceByKey(lambda a, b : a + b).collect()

In [55]:
# compare results of python and spark function!

res_py = idf(movies_ls[:3]) # this is the python result
print(res_py)

res_spk = idf_spk(sc.parallelize(movies_ls[:3],2))
print(res_spk.collect())

# seems ok!

In [56]:
## tfidf function: combine tf with idf - function takes string and gives dict of tokens:tfidf
## THIS STAYS THE SAME AS IN THE PYTHON IMPLEMENTATION. NO CHANGE NEEDED.

def tfidf(txt, idfs):
    """ Compute TF-IDF
    Args:
        txt: string
        idfs (dictionary): token to IDF value
    Returns:
        dictionary: a dictionary of tokens to TF-IDF values
    """
    tfs = tf(nor(txt)) #a dict of token and term frequencies
    
    resdict = {}
    
    for t in tfs:
        if t in idfs:
            resdict[t] = tfs[t] * idfs[t] #calculate TF-IDF
        else:
            resdict[t] = 0 #zero if word is NOT in idfs reference
            
    return resdict

Now the ranking function (only cossim-function is applied, as the comparison between the ranking functions is not the point of this project):

In [58]:
## implement cosine-similarity function(s)
## THESE STAY THE SAME AS IN THE PYTHON IMPLEMENTATION. NO CHANGE NEEDED.

# dot product and normalizing funs
def dotprod(a, b):
    return sum([a[t] * b[t] for t in a if t in b])

def norm(a):
    return math.sqrt(dotprod(a, a))

# and finally, the actual fun to calculate cossim from two dicts
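def cossim(dict1, dict2):
    """ Compute cosine similarity between two tf-idf dicts
    Args:
        dict1: first dict of tokens and tf-idf values
        dict2: second dict of tokens and tf-idf values

    Returns:
        cossim: cosine similarity value (0.0 if one of the dicts has zero norm)
    """

    denom = norm(dict1) * norm(dict2)
    if denom == 0: # e.g. a query without any known token - avoid ZeroDivisionError
        return 0.0
    return dotprod(dict1, dict2) / denom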

In [59]:
## build ranking-function to compare query and corpus  - using cosine similarity
    
def get_ranking_cossim_spk(query, ref_pairs, threshold, idf_dict):
    
    """ calculate the relevance of all movies relative to searchstring (query)
    input:
        query = searchstring
        ref_pairs = reference RDD of pairs (title, dict(token:tfidf))
        threshold = threshold reference score (currently not applied inside this function)
        idf_dict = reference dict with term idfs (token:idf)
    output:
        an unordered RDD of pairs (title, relevance score); ordering is left to the caller
    """
    
    ## first, apply tfidf() function to query
    q = tfidf(query, idf_dict) #yields dict of query tokens:tfidf 
    
    ## second, go over ref_pairs and compare query to values (calculate relevance score)
        ## the result is a pair with movie titles as key and the relevance score (rs) as value
      
    result = (ref_pairs 
              .map(lambda film: (film[0], cossim(q,film[1]))))
              
    ## and return unordered result (the sorting, e.g. .takeOrdered(10, lambda s: -1*s[1]), is done by the caller - inside the function it took forever)
    return result


### 6. Apply the spark helper functions

What we want in this section is the following:

- Parallelize the analysis in the two crucial steps (i.e. apply the helper functions)
- Time the (parallelized) crucial steps
- Plot speed of parallelized version compared to baseline (maybe in next section...)

In [61]:
## prepare the timing of the spark-implementation
spk_pre = time.time()

First, the idfs-reference dict

In [63]:
##First, apply the idf_spk function and build the spark idf-reference
idfs_spk = idf_spk(movies_ls_rdd)#.cache() #lazy evaluation - only executed when action is called

In [64]:
## check type
idfs_spk #RDD, good

In [65]:
## and show the results 
# idfs_spk.takeSample(False,3) #this is an action

#takes only 2 min (on databricks community version)?! even without multiple clusters??

In [66]:
# check result - seems ok
# print(idfs_spk.count()) # number of tokens with idf-result

In [67]:
## collect idfs dictionary as python dict to driver, and BROADCAST the variable
## idf_spk() function is evaluated here - therefore it takes a while

idfs_spk_dict = idfs_spk.collectAsMap() #we use the collectAsMap() action to return the IDFs to the driver as a Python dictionary.
idfs_spk_dict_bc = sc.broadcast(idfs_spk_dict) #and we save it as broadcast variable

In [68]:
## check types
print(type(idfs_spk_dict))
print(type(idfs_spk_dict_bc))

Now, make the "movie reference dict"

In [70]:
## apply tfidf to  dataset (lazy eval)
ts_resdict_spk1 = time.time() #timestamp 1

# go over movies (lazy evaluation)
movies_res_spk = (movies_spk #the dataframe of the whole dataset
                  .select("Title","All") #select the two important columns
                  .rdd #build an rdd (map functions cannot be applied to dataframes, it seems...)
                  .map(lambda row: (row[0],tfidf(row[1], idfs_spk_dict_bc.value))) #map to make pair of (title, tfidf-dict of text) ------------- SROCKINGER: here the broadcast variable is used, correct? (not if the movies_res variable is broadcast as well?!)
                  .cache()) 

ts_resdict_spk2 = time.time() #timestamp 2
secs_spk = round(ts_resdict_spk2 - ts_resdict_spk1,1) #analysis time

print("Analysed all movies in {} seconds. No surprise with lazy evaluation ;-)".format(secs_spk))

In [71]:
## check result - takes a while
print(movies_res_spk) #an RDD, good
#print(movies_res_spk.take(1)) #looks good
#print(movies_res_spk.count()) #seems correct

In [72]:
## collect and broadcast the movies_res_spk RDD (broadcasting the RDD directly does not work: "Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, ...")

#movies_res_spk_coll = movies_res_spk.collect() #collect to driver - takes a while - use this later to time the effect of the broadcast var

In [73]:
#  movies_res_spk_coll_bc = sc.broadcast(movies_res_spk_coll) # and broadcast
#  movies_res_spk_coll_bc #show type

# this works. however, i think its not necessary as the get_ranking_cossim_spk function takes an rdd as input

*Note:* The whole thing is way faster than with the python code - even without parallelization

##### Now test the whole recommender:

In [76]:
#quickly re-test tfidf() function
tfidf("a test bond", idfs_spk_dict_bc.value) # good

In [77]:
## then apply ranking-function (lazy evaluation again)
query_test = "France action drama new test movie tarantino"

result_q1 = get_ranking_cossim_spk(query = query_test, ref_pairs = movies_res_spk, threshold = 0.1, idf_dict = idfs_spk_dict_bc.value)
# result_q1 = get_ranking_cossim_spk(query = query_test, ref_pairs = movies_res_spk_coll_bc.value, threshold = 0.1, idf_dict = idfs_spk_dict_bc.value) #------- does not work with the broadcasted movies_res_spk_coll_bc

In [78]:
## type?
result_q1 #rdd

In [79]:
## and show results (get_ranking_cossim_spk function is evaluated here (and the definition of movies_res_spk is also only evaluated here) - therefore this step takes long with cold cache (i.e. not-yet-cached movies_res_spk))

#result_q1.take(100) #takes only 3s

# do the ordering on the result of the get_ranking_cossim_spk function:
result_q1.takeOrdered(10, lambda s: -1*s[1]) # this (sorting) used to take long! why??

# works!

## note:
# cold cache (first time): ca. 1.3 min
# warm cache (starting second time): under 2 seconds

# I think the point here might be the caching of the movies_res_spk RDD. Once this has happened (and it happens in the first iteration of the code) the sorting is way faster...
# when the caching does not happen (in the definition of the movies_res_spk variable) the code takes 1.2 min in any iteration!
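
The cold-cache versus warm-cache behaviour can be illustrated without Spark. The class below is a toy stand-in written for this note (`LazyResult` is not a Spark class): the computation runs only when an "action" is called, and it is repeated on every action unless the result is kept.

```python
class LazyResult:
    """Toy stand-in for a lazily evaluated dataset (not a Spark class)."""

    def __init__(self, compute, cached=False):
        self._compute = compute   # function that produces the data
        self._cached = cached     # mimics calling .cache() on the dataset
        self._store = None
        self.n_computations = 0   # how often the expensive step really ran

    def collect(self):
        """An 'action': triggers the computation unless a cached copy exists."""
        if self._cached and self._store is not None:
            return self._store
        self.n_computations += 1
        data = self._compute()
        if self._cached:
            self._store = data
        return data

def expensive_step():
    # placeholder for the tf-idf mapping over all movies
    return [x * x for x in range(5)]

uncached = LazyResult(expensive_step, cached=False)
cached = LazyResult(expensive_step, cached=True)
for _ in range(3):
    uncached.collect()
    cached.collect()

print(uncached.n_computations, cached.n_computations)  # 3 1
```

In this picture the first action on the cached dataset pays the full cost (cold cache), and every later action reuses the stored result (warm cache).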

In [80]:
## and record the timing for the spark implementation
spk_post = time.time()
print("The spark-implementation (20 cores) took {} seconds overall.".format(round((spk_post - spk_pre),1)))

### 7. Multiply the dataset - for speed scaling experiment

What we want in this section is the following:

- multiply the two important RDDs (movies_spk, movies_ls_rdd). That is, create a new RDD that is just x times the old RDDs repeated.
- run the critical steps (i.e. ...) on 1, 2 and 4 nodes and see how the speed of the multiplied RDDs scale.

*Note:* "par_deg" in Cmd 43 has to be changed for the sensible use of multiple clusters... Best option is to choose the degree automatically.

In [82]:
## check degree of parallelization
par_deg

In [83]:
## multiply the spark-datasets x 2

# multiply how many times?
multipl_1 = 2

# apply for movies_ls_rdd
movies_ls_rdd_mult2 = movies_ls_rdd.flatMap(lambda x: [x]*multipl_1)

# and for movies_spk
movies_spk_mult2 = movies_spk
itera1 = 1
while itera1 < multipl_1:
  movies_spk_mult2 = movies_spk_mult2.union(movies_spk)
  itera1 += 1

In [84]:
## multiply the spark-datasets x 4

# multiply how many times?
multipl_2 = 4

# apply for movies_ls_rdd
movies_ls_rdd_mult4 = movies_ls_rdd.flatMap(lambda x: [x]*multipl_2)

# and for movies_spk
movies_spk_mult4 = movies_spk
itera2 = 1
while itera2 < multipl_2:
  movies_spk_mult4 = movies_spk_mult4.union(movies_spk)
  itera2 += 1

In [85]:
## multiply the spark-datasets x 10

# multiply how many times?
multipl_3 = 10

# apply for movies_ls_rdd
movies_ls_rdd_mult10 = movies_ls_rdd.flatMap(lambda x: [x]*multipl_3)

# and for movies_spk
movies_spk_mult10 = movies_spk
itera3 = 1
while itera3 < multipl_3:
  movies_spk_mult10 = movies_spk_mult10.union(movies_spk)
  itera3 += 1

In [86]:
## see if it worked (numbers should be the same)
print(movies_ls_rdd.count() * 4)
print(movies_ls_rdd_mult2.count() * 2)
print(movies_ls_rdd_mult4.count())
print()
print(movies_spk.count() * 4)
print(movies_spk_mult2.count() * 2)
print(movies_spk_mult4.count())
print()
print(movies_spk_mult10.count()/10*4)

# looks ok

Now apply the "critical function" (i.e. the calculation of the idfs ref dict) for the multiplied datasets:

In [88]:
##First, apply the idf_spk function and build the spark idf-reference
idfs_spk_mult1 = idf_spk(movies_ls_rdd) #lazy. this has been done above - but now new name for speed comparison
idfs_spk_mult2 = idf_spk(movies_ls_rdd_mult2) #a lazy evaluation
idfs_spk_mult4 = idf_spk(movies_ls_rdd_mult4) #again a lazy evaluation
idfs_spk_mult10 = idf_spk(movies_ls_rdd_mult10) #again a lazy evaluation

Now evaluate and compare running time - always make a list of 10 iterations of the *cold cache*-version of the respective analysis (action: count of total tokens - should be the same in all four cases as there are the same tokens)
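
Why the token count (and in fact every idf value) stays the same when the dataset is repeated can be checked on a toy corpus: repeating every document k times multiplies both N and each document frequency by k, so the ratio N / df is unchanged. The sketch below uses made-up documents and a hypothetical helper `idf_ratio`.

```python
from collections import Counter
from itertools import chain

def idf_ratio(docs):
    """N / document frequency for every token (same definition as in this notebook)."""
    n_docs = len(docs)
    doc_freq = Counter(chain.from_iterable(set(d) for d in docs))
    return {tok: n_docs / df for tok, df in doc_freq.items()}

# hypothetical toy corpus
base_docs = [["saloon", "beer"], ["moon", "beer"], ["film"]]
k = 4

# same idea as rdd.flatMap(lambda x: [x] * k): every document is repeated k times
replicated = list(chain.from_iterable([doc] * k for doc in base_docs))

print(len(base_docs), len(replicated))               # 3 12
print(idf_ratio(base_docs) == idf_ratio(replicated))  # True
```

So the multiplied datasets change the amount of work, but not the result, which makes them suitable for a pure scaling comparison.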

In [90]:
## dataset x 1

# result list
times_dx1 = []

# timing, 10 times
for i in range(10):
  
  # re-define idfs - so that the count() action later is always "cold cache" and the whole calculation is actually executed
  idfs_spk_mult1 = idf_spk(movies_ls_rdd) #lazy.
  
  # then the timing
  t1_1 = time.time()
  idfs_spk_mult1.count()
  t2_1 = time.time()

  #append result to list
  times_dx1.append(t2_1 - t1_1)
  
  # show progress
  print("Iteration Nr. {} done ...".format(i+1))

print()
print("The mean for the data x1 is {} seconds.".format(round(numpy.mean(times_dx1),1)))

In [91]:
## dataset x 2

# result list
times_dx2 = []

# timing, 10 times
for i in range(10):
  
  # re-define idfs - so that the count() action later is always "cold cache" and the whole calculation is actually executed
  idfs_spk_mult2 = idf_spk(movies_ls_rdd_mult2)
  
  # then the timing
  t1_2 = time.time()
  idfs_spk_mult2.count()
  t2_2 = time.time()

  #append result to list
  times_dx2.append(t2_2 - t1_2)
  
  # show progress
  print("Iteration Nr. {} done ...".format(i+1))

print()
print("The mean for the data x2 is {} seconds.".format(round(numpy.mean(times_dx2),1)))

In [92]:
## dataset x 4

# result list
times_dx4 = []

# timing, 10 times
for i in range(10):
  
  # re-define idfs - so that the count() action later is always "cold cache" and the whole calculation is actually executed
  idfs_spk_mult4 = idf_spk(movies_ls_rdd_mult4)
  
  # then the timing
  t1_4 = time.time()
  idfs_spk_mult4.count()
  t2_4 = time.time()

  #append result to list
  times_dx4.append(t2_4 - t1_4)
  
  # show progress
  print("Iteration Nr. {} done ...".format(i+1))

print()
print("The mean for the data x4 is {} seconds.".format(round(numpy.mean(times_dx4),1)))

In [93]:
## dataset x 10

# result list
times_dx10 = []

# timing, 10 times
for i in range(10):
  
  # re-define idfs - so that the count() action later is always "cold cache" and the whole calculation is actually executed
  idfs_spk_mult10 = idf_spk(movies_ls_rdd_mult10)
  
  # then the timing
  t1_10 = time.time()
  idfs_spk_mult10.count()
  t2_10 = time.time()

  #append result to list
  times_dx10.append(t2_10 - t1_10)
  
  # show progress
  print("Iteration Nr. {} done ...".format(i+1))

print()
print("The mean for the data x10 is {} seconds.".format(round(numpy.mean(times_dx10),1)))
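
The four timing cells above follow the same pattern, so they could be folded into one helper. The following is a sketch under assumptions: `time_repeated` is a hypothetical name, and the toy action merely stands in for something like `lambda: idf_spk(movies_ls_rdd_mult2).count`.

```python
import time
from statistics import mean

def time_repeated(make_action, repeats=10):
    """Build a fresh action `repeats` times, run it, and return the wall-clock durations."""
    durations = []
    for _ in range(repeats):
        action = make_action()        # rebuilt every time, so nothing is reused between runs
        start = time.perf_counter()
        action()                      # only this call is timed
        durations.append(time.perf_counter() - start)
    return durations

# toy stand-in for the Spark action
toy_times = time_repeated(lambda: (lambda: sum(range(10_000))), repeats=5)
print("The mean is {:.4f} seconds.".format(mean(toy_times)))
```

As in the cells above, building the action happens outside the timed part; only the final call is measured.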

Keep this for the DAG-visualization

In [95]:
# dataset x 10
idfs_spk_mult10.count()

Now, time the effect of the broadcast-variable in the

In [97]:
## apply tfidf to 10x dataset (lazy eval) - same as above, just again to time the broadcast-effect

# go over movies (lazy evaluation) - WITH BROADCAST
movies_res_spk_bc = (movies_spk_mult10 #the dataframe of the whole dataset
                  .select("Title","All") #select the two important columns
                  .rdd #build an rdd (map functions cannot be applied to dataframes, it seems...)
                  .map(lambda row: (row[0],tfidf(row[1], idfs_spk_dict_bc.value))) #here be the broadcast
                  .cache()) 

# go over movies (lazy evaluation) - NO BROADCAST
movies_res_spk_nbc = (movies_spk_mult10 #the dataframe of the whole dataset
                  .select("Title","All") #select the two important columns
                  .rdd #build an rdd (map functions cannot be applied to dataframes, it seems...)
                  .map(lambda row: (row[0],tfidf(row[1], idfs_spk_dict))) #no broadcast here: the plain dict is captured in the closure
                  .cache()) 

In [98]:
## broadcasted
movies_res_spk_coll_bc = movies_res_spk_bc.count() #count() is the action that triggers the evaluation (returns the number of rows, nothing is collected)

In [99]:
movies_res_spk_coll_bc

In [100]:
## non-broadcasted
movies_res_spk_coll_nbc = movies_res_spk_nbc.count() #count() is the action that triggers the evaluation (returns the number of rows, nothing is collected)

In [101]:
movies_res_spk_coll_nbc

The broadcast does not seem to do much... For reasons see the following text (from big data exercise week 4):

"The solution in (3c) works well for small datasets, but it requires Spark to (automatically) send the idfsSmallWeights variable to all the workers. If we didn't cache() similarities, then it might have to be recreated if we run similar() multiple times. This would cause Spark to send idfsSmallWeights every time.
Instead, we can use a broadcast variable - we define the broadcast variable in the driver and then we can refer to it in each worker. Spark saves the broadcast variable at each worker, so it is only sent once."

Maybe, in our case, the idfs dict is sent only once to each worker anyway... so it is no wonder that the broadcast does not do much...?
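
A rough back-of-the-envelope model may help to judge when a broadcast can matter at all. It rests on simplifying assumptions that were not measured here: without a broadcast the dictionary travels with every task, with a broadcast it is shipped once per executor. The toy dictionary, the task counts and the executor counts below are all hypothetical.

```python
import pickle

# toy stand-in for the idf dictionary (the real one maps every token to its idf)
toy_idf = {"token{}".format(i): float(i + 1) for i in range(1000)}
payload_bytes = len(pickle.dumps(toy_idf))

def shipped_bytes(payload, n_tasks, n_executors, broadcast):
    """Rough model: without broadcast one copy per task, with broadcast one copy per executor."""
    copies = n_executors if broadcast else n_tasks
    return payload * copies

# hypothetical cluster shapes, not measured values
for n_tasks, n_executors in [(8, 4), (2000, 4)]:
    plain = shipped_bytes(payload_bytes, n_tasks, n_executors, broadcast=False)
    bcast = shipped_bytes(payload_bytes, n_tasks, n_executors, broadcast=True)
    print("tasks={}, executors={}: factor {:.0f}".format(n_tasks, n_executors, plain / bcast))
```

Under this model the saving is only a small factor when a job has few tasks and the result is cached after the first run, which would be consistent with the small difference observed above.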