In [1]:
from pyspark.sql import SparkSession

from pyspark import SparkConf, SparkContext
from pyspark.ml.feature import MinMaxScaler
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
import numpy as np
import random
from pyspark.sql.functions import udf
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
import matplotlib.pyplot as plt
import pyspark
print(pyspark.__version__)
import re

2.4.4


In [2]:
# Memory sizes must be supplied *before* the JVM is launched. Calling spark.conf.set()
# on an already-running session cannot resize the driver/executor heaps (the JVM's
# -Xmx is fixed at startup), so the settings go on the builder instead.
# In local mode all work runs inside the driver JVM, so spark.driver.memory is the one
# that actually sizes the heap; the executor setting is kept for non-local masters.
# NOTE(review): getOrCreate() returns an existing session unchanged if one is already
# running in this kernel -- restart the kernel for changed memory settings to apply.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Assign1")
    .config("spark.executor.memory", "30g")
    .config("spark.driver.memory", "30g")
    .getOrCreate()
)

### Part 1: Load the Amazon and Google product datasets

In [3]:
# Number of rows kept from each catalogue so the notebook iterates quickly.
# Set to None to process the full files. The null-description filters in later
# cells run *after* this limit, so fewer than SAMPLE_ROWS rows may remain.
SAMPLE_ROWS = 100


def _read_catalogue(path):
    """Read a product CSV (header row, comma-separated, inferred column types),
    truncated to SAMPLE_ROWS rows unless SAMPLE_ROWS is None."""
    catalogue = spark.read.csv(path, header=True, sep=",", inferSchema=True)
    return catalogue if SAMPLE_ROWS is None else catalogue.limit(SAMPLE_ROWS)


AmazonDF = _read_catalogue("Amazon.csv")
GoogleDF = _read_catalogue("Google.csv")

### Part 2: Tokenization

#### (a) Implement a function that takes a string and returns its non-empty tokens, splitting it with a regular expression.


In [5]:
def tokenize(string, stopWordsList=None, lowercase=False):
    """Split text into non-empty word tokens and drop stopwords.

    The text is split on runs of non-word characters (anything outside letters,
    digits and underscore), which already covers whitespace and periods. Empty
    fragments produced at the start/end of the string are discarded.

    Args:
        string: Text to tokenize. ``None`` or ``""`` yields ``[]``.
        stopWordsList: Iterable of tokens to remove, compared exactly as given
            (case-sensitive). A bare string is treated as a single stopword.
            ``None`` or an empty value removes nothing.
        lowercase: If True, lowercase the text before splitting so capitalised
            words can match a lowercase stopword list. Defaults to False, which
            keeps the original casing of every token.

    Returns:
        List of the remaining tokens, in their original order.
    """
    if not string:
        return []
    if lowercase:
        string = string.lower()

    # A bare string would otherwise be iterated character by character.
    if isinstance(stopWordsList, str):
        stopWordsList = [stopWordsList]
    # Build the lookup set once per call: O(1) membership per token instead of
    # scanning the whole stopword list for every word.
    stop_words = frozenset(stopWordsList) if stopWordsList else frozenset()

    # Raw string avoids invalid-escape warnings. \W+ alone matches whitespace and
    # '.' too, so no extra alternatives are needed.
    return [word for word in re.split(r'\W+', string)
            if word and word not in stop_words]

tokenize('he hello know','')

['he', 'hello', 'know']

In [7]:
# Load the stopword file as plain text, one word per line. spark.read.text always
# yields a single column named "value", so this does not depend on the file's first
# line (read.csv(header=True) would consume that line as a column name instead of
# keeping it as a stopword). Whitespace such as trailing '\r' is stripped so words
# compare equal to tokens, and blank lines are skipped.
stopWordsList = [
    row.value.strip()
    for row in spark.read.text("stopwords.txt").collect()
    if row.value and row.value.strip()
]

In [8]:
# Tokenize the Google product descriptions.
# Rows without a description are dropped first because tokenize() needs text.
GoogleDF = GoogleDF.filter(GoogleDF.description.isNotNull())

# Pair RDD of (id, [tokens]); kept as an RDD because the term-frequency cells reuse it.
googleTokens = GoogleDF.select('id', 'description').rdd.map(
    lambda row: (row['id'], tokenize(row['description'], stopWordsList))
)

# Explicit schema rather than column names only: PySpark then skips sampling the RDD
# to infer types (an extra Spark job). An empty token list (all stopwords) carries
# no element type, and an empty RDD has no rows to sample, so neither can break
# schema inference. Ids here are URL strings.
googleTokensDf = spark.createDataFrame(
    googleTokens, "id string, tokenised_description array<string>"
)
googleTokensDf.show()

+--------------------+---------------------+
|                  id|tokenised_description|
+--------------------+---------------------+
|http://www.google...| [learning, quickb...|
|http://www.google...| [fun, reading, wr...|
|http://www.google...| [qb, pos, 6, 0, b...|
|http://www.google...| [save, spectacle,...|
|http://www.google...| [adobe, cs3, prod...|
|http://www.google...| [corel, video, st...|
|http://www.google...| [whether, working...|
|http://www.google...| [qb, pos, 6, 0, p...|
|http://www.google...| [quickbooks, cred...|
|http://www.google...| [sony, media, sof...|
|http://www.google...| [qb, pos, 6, 0, p...|
|http://www.google...| [decide, fate, ga...|
|http://www.google...| [based, tween, li...|
|http://www.google...| [cisco, systems, ...|
|http://www.google...| [wasp, bar, code,...|
|http://www.google...| [axis, communicat...|
|http://www.google...| [hp, eu063av, aba...|
|http://www.google...| [ibm, bb0gyna, us...|
|http://www.google...| [equisys, eqzfn07...|
|http://ww

In [9]:
# Tokenize the Amazon product descriptions.
# Rows without a description are dropped first because tokenize() needs text.
AmazonDF = AmazonDF.filter(AmazonDF.description.isNotNull())

# Pair RDD of (id, [tokens]); kept as an RDD because the term-frequency cells reuse it.
amazonTokens = AmazonDF.select('id', 'description').rdd.map(
    lambda row: (row['id'], tokenize(row['description'], stopWordsList))
)

# Explicit schema rather than column names only: PySpark then skips sampling the RDD
# to infer types (an extra Spark job). An empty token list (all stopwords) carries
# no element type, and an empty RDD has no rows to sample, so neither can break
# schema inference. Ids here are ASIN-style strings.
amazonTokensDf = spark.createDataFrame(
    amazonTokens, "id string, tokenised_description array<string>"
)
amazonTokensDf.show()

+----------+---------------------+
|        id|tokenised_description|
+----------+---------------------+
|b0006zf55o| [oem, arcserve, b...|
|b000g80lqo| [peachtree, premi...|
|b0006se5bq| [singing, coach, ...|
|b000ehpzv8| [emc, retrospect,...|
|b00021xhzw| [upgrade, install...|
|b000gzwjgc| [marketing, infor...|
|b0000dbykm| [mia, s, math, ad...|
|b00029bqa2| [disney, s, 1st, ...|
|b0007prnjo| [many, times, hea...|
|b000aazr5i| [marketing, infor...|
|b000bhl1r8| [sql, server, com...|
|b00006hmwc| [reference, domin...|
|b00006hvvo| [today, enterpris...|
|b0000ycfcw| [topics, presents...|
|b00002sac9| [now, featuring, ...|
|b000bcz8ng| [world, book, enc...|
|b000fm18vi| [chord, display, ...|
|b00009apna| [complete, easy, ...|
|b0009rgzgm| [use, computer, r...|
|b000o24l3q| [note, upgrade, v...|
+----------+---------------------+
only showing top 20 rows



### Part 3: Term frequency per document

In [49]:
from collections import Counter



def getTermFrequency(inpList):
    """Count how often each term appears in a token list.

    Args:
        inpList: Iterable of tokens (e.g. the output of ``tokenize``).
            ``None`` is treated as empty.

    Returns:
        A plain ``dict`` mapping each distinct term to its count, with keys in
        first-occurrence order.
    """
    term_counts = Counter(inpList)
    return {term: count for term, count in term_counts.items()}


# Pair RDD of (id, {token: count}) for each Google description. It gets its own
# name so googleTermFreq always refers to the DataFrame below.
googleTermFreqRdd = googleTokens.map(lambda x: (x[0], getTermFrequency(x[1])))

# Explicit schema: counts are Python ints (stored as bigint). This avoids sampling
# the RDD to infer the map type, which fails when sampled rows have empty dicts.
googleTermFreq = spark.createDataFrame(
    googleTermFreqRdd, "id string, value map<string,bigint>"
)
googleTermFreq.show(3)


+--------------------+--------------------+
|                  id|               value|
+--------------------+--------------------+
|http://www.google...|[quickbooks -> 1,...|
|http://www.google...|[solving -> 1, re...|
|http://www.google...|[retailers -> 1, ...|
+--------------------+--------------------+
only showing top 3 rows



In [48]:
# Pair RDD of (id, {token: count}) for each Amazon description. It gets its own
# name so amazonTermFreq always refers to the DataFrame below.
amazonTermFreqRdd = amazonTokens.map(lambda x: (x[0], getTermFrequency(x[1])))

# Explicit schema: counts are Python ints (stored as bigint). This avoids sampling
# the RDD to infer the map type, which fails when sampled rows have empty dicts.
amazonTermFreq = spark.createDataFrame(
    amazonTermFreqRdd, "id string, value map<string,bigint>"
)
amazonTermFreq.show(3)

+----------+--------------------+
|        id|               value|
+----------+--------------------+
|b0006zf55o|[1 -> 1, arcserve...|
|b000g80lqo|[year -> 1, advan...|
|b0006se5bq|[singing -> 1, nt...|
+----------+--------------------+
only showing top 3 rows



#### Using Spark ML's `HashingTF` and `IDF` transformers (Amazon descriptions only)

In [43]:
# Hash every token list into a fixed-size sparse term-count vector.
# 1 << 18 (262144) buckets is HashingTF's default size, spelled out here.
hashingTF = HashingTF(numFeatures=1 << 18,
                      inputCol="tokenised_description",
                      outputCol="rawFeatures")
featurizedData = hashingTF.transform(amazonTokensDf)


In [25]:
# Learn document frequencies from the hashed Amazon vectors, then rescale each
# raw count vector into a TF-IDF vector stored in the "features" column.
idf = IDF().setInputCol("rawFeatures").setOutputCol("features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

In [27]:
# Preview the TF-IDF vectors: first 20 rows, long cell values truncated.
rescaledData.select(["id", "features"]).show(n=20, truncate=True)

+----------+--------------------+
|        id|            features|
+----------+--------------------+
|b0006zf55o|(262144,[39881,55...|
|b000g80lqo|(262144,[15,9886,...|
|b0006se5bq|(262144,[47491,59...|
|b000ehpzv8|(262144,[13087,63...|
|b00021xhzw|(262144,[15,353,4...|
|b000gzwjgc|(262144,[329,1879...|
|b0000dbykm|(262144,[19492,19...|
|b00029bqa2|(262144,[20495,21...|
|b0007prnjo|(262144,[1998,578...|
|b000aazr5i|(262144,[573,2437...|
|b000bhl1r8|(262144,[966,1799...|
|b00006hmwc|(262144,[15013,17...|
|b00006hvvo|(262144,[15,4525,...|
|b0000ycfcw|(262144,[30425,34...|
|b00002sac9|(262144,[413,5795...|
|b000bcz8ng|(262144,[1652,105...|
|b000fm18vi|(262144,[39221,80...|
|b00009apna|(262144,[15664,17...|
|b0009rgzgm|(262144,[15,13828...|
|b000o24l3q|(262144,[15,353,2...|
+----------+--------------------+
only showing top 20 rows



### Part 4
Combine the two datasets into a single corpus. Each element of the corpus is a `(key, value)` pair, where the key is the record ID and the value is that record's tokens; records from both datasets are included.

In [54]:
# Stack both corpora into one DataFrame of (id, tokenised_description) rows.
# unionByName matches columns by name rather than by position, so the result stays
# correct even if one side's columns are ever reordered (plain union would silently
# put ids into the token column).
combinedText = googleTokensDf.unionByName(amazonTokensDf)

combinedText.show(10)


+--------------------+---------------------+
|                  id|tokenised_description|
+--------------------+---------------------+
|http://www.google...| [learning, quickb...|
|http://www.google...| [fun, reading, wr...|
|http://www.google...| [qb, pos, 6, 0, b...|
|http://www.google...| [save, spectacle,...|
|http://www.google...| [adobe, cs3, prod...|
|http://www.google...| [corel, video, st...|
|http://www.google...| [whether, working...|
|http://www.google...| [qb, pos, 6, 0, p...|
|http://www.google...| [quickbooks, cred...|
|http://www.google...| [sony, media, sof...|
+--------------------+---------------------+
only showing top 10 rows



### Part 5
Write an IDF function that returns a pair RDD in which each key is a unique token and each value is that token's IDF value. Plot a histogram of the IDF values.

In [None]:
def getIDF()