# Data Engineer Test (Part 3)

In [1]:
#importing all required libraries ahead of time :)
#easy to spot any libraries not currently not installed in ur machine
%run kaggle.py
import json
from pyspark.sql import SparkSession,Row
from pyspark.ml.feature import HashingTF, IDF, Tokenizer,StopWordsRemover, CountVectorizer,VectorAssembler
from pyspark.ml import Pipeline

## Question 4
String similarity (code in python):<br>
a) Download test.csv from https://www.kaggle.com/rishisankineni/text-similarity/data<br>
b) Load the data to a Spark/Pandas data frame<br>
c) Calculate similarity between description_x and description_y and store resultant<br>
scores in a new column<br>
d) Parallelise the matching process (bonus)<br>

In [2]:
#Kaggle credentials
# $ cp cred.json.example cred.json
# $ vim cred.json 
cred = json.load(open('cred.json'))

#login first and create browser instance
kaggle = Kaggle()
kaggle.login(cred['UserName'],cred['Password'])
local_file = "q4_test.csv"
dataset_url = "https://www.kaggle.com/rishisankineni/text-similarity/downloads/test.csv"
kaggle.download_dataset(dataset_url,local_file)

In [3]:
spark = SparkSession.builder.appName('test').getOrCreate()

In [4]:
q4_df = spark.read.csv("q4_test.csv",inferSchema=True,header=True)

In [5]:
q4_df.show()

+-------+--------------------+--------------------+-------------+
|test_id|       description_x|       description_y|same_security|
+-------+--------------------+--------------------+-------------+
|      0|        semtech corp| semtech corporation|         null|
|      1|vanguard mid cap ...|vanguard midcap i...|         null|
|      2|spdr gold trust g...|spdr gold trust s...|         null|
|      3|vanguard total bo...|vanguard total bo...|         null|
|      4|oakmark internati...|oakmark internati...|         null|
|      5|pfizer inc div: 1...|      pfizer inc com|         null|
|      6|spartan global ex...|sptn glb xus idx adv|         null|
|      7|vanguard total bo...|vanguard total bo...|         null|
|      8|banco latinoameri...|banco latinoameri...|         null|
|      9|baidu inc fadr 1 ...|baidu inc spons a...|         null|
|     10|  whole foods market|whole foods marke...|         null|
|     11|walgreens boots a...|walgreens boots alli|         null|
|     12|d

#### Pipeline

In [6]:
tokenizer = Tokenizer(inputCol="description_x", outputCol="x_token")
stopremove = StopWordsRemover(inputCol='x_token',outputCol='x_stop_tokens')
#count_vec = CountVectorizer(inputCol='x_stop_tokens',outputCol='x_vec')
hash_tf = HashingTF(inputCol='x_stop_tokens',outputCol='x_vec',numFeatures=20)
idf = IDF(inputCol="x_vec", outputCol="x_tf_idf")
#clean_up = VectorAssembler(inputCols=['test_id','x_tf_idf'],outputCol='features')

data_prep_pipe = Pipeline(stages=[tokenizer,stopremove,hash_tf,idf])
pipe = data_prep_pipe.fit(q4_df)
_ = pipe.transform(q4_df)

In [7]:
tokenizer = Tokenizer(inputCol="description_y", outputCol="y_token")
stopremove = StopWordsRemover(inputCol='y_token',outputCol='y_stop_tokens')
hash_tf = HashingTF(inputCol='y_stop_tokens',outputCol='y_vec',numFeatures=20)
idf = IDF(inputCol="y_vec", outputCol="y_tf_idf")
#clean_up = VectorAssembler(inputCols=['test_id','y_tf_idf'],outputCol='features')

data_prep_pipe = Pipeline(stages=[tokenizer,stopremove,hash_tf,idf])
pipe = data_prep_pipe.fit(_)
_ = pipe.transform(_)

In [8]:
df = _.select(['test_id','x_tf_idf','y_tf_idf'])
df.head(1)

[Row(test_id=0, x_tf_idf=SparseVector(20, {7: 1.0223, 13: 1.5031}), y_tf_idf=SparseVector(20, {5: 1.7821, 13: 1.5031}))]

In [10]:
# The map operation should be able to distribute to other clusters (if got any)
# cosine similarity
#1 - x['x_tf_idf'].dot(x['y_tf_idf'])/(x['x_tf_idf'].norm(2)*x['y_tf_idf'].norm(2))
#x['x_tf_idf'].squared_distance(x['y_tf_idf'])
# choose the built in squared distance
new_df = df.select(['test_id','x_tf_idf','y_tf_idf']).rdd.map(
    lambda x : Row( distance=float(1 - x['x_tf_idf'].dot(x['y_tf_idf'])/(x['x_tf_idf'].norm(2)*x['y_tf_idf'].norm(2))), test_id=x['test_id'])
).toDF()

new_df.show()

+-------------------+-------+
|           distance|test_id|
+-------------------+-------+
|0.46688613087875275|      0|
| 0.5382639097773301|      1|
|  0.030247583431531|      2|
| 0.2503203464381226|      3|
| 0.6191473640349485|      4|
|0.27824635258317754|      5|
|0.47763156314799937|      6|
|0.31785068032330166|      7|
| 0.1576517596167727|      8|
| 0.5400476750834667|      9|
| 0.4467747348264962|     10|
|0.36047764793855985|     11|
|0.15010684463071922|     12|
|0.14710627737376425|     13|
|  0.374305168167954|     14|
| 0.5894585797698484|     15|
|0.17665286616467601|     16|
| 0.2369017380111672|     17|
| 0.4979437935526283|     18|
| 0.5078347878293663|     19|
+-------------------+-------+
only showing top 20 rows



In [11]:
# sorted desc
new_df.orderBy(new_df.distance.desc()).show()

+------------------+-------+
|          distance|test_id|
+------------------+-------+
|               1.0|    425|
|               1.0|     22|
|               1.0|    431|
|               1.0|    246|
|               1.0|    187|
|               1.0|    139|
|               1.0|    269|
|0.9416581659608296|    202|
|0.9361048010849069|    245|
|0.9141509546609883|    155|
|0.9135757522099837|    157|
| 0.912422554583911|    197|
|0.9104215165809344|    416|
|0.9104215165809344|    234|
|0.9102134450999559|     71|
|0.9031583971532898|    256|
|0.8902769483214518|     82|
|0.8894599278033617|     33|
|0.8729578504863073|    237|
|0.8722041236986973|    448|
+------------------+-------+
only showing top 20 rows



In [12]:
#Let't inspect manually
df1 = new_df.alias('df1')
df2 = q4_df.alias('df2')

df1.join(df2, df1.test_id == df2.test_id).select(["df1.*","df2.description_x","df2.description_y"]).orderBy(df1.distance.desc()).show()

+------------------+-------+--------------------+--------------------+
|          distance|test_id|       description_x|       description_y|
+------------------+-------+--------------------+--------------------+
|               1.0|    425|vanguard extended...|vang ext mkt idx ins|
|               1.0|     22|vanguard total bo...|vang tot bd mk is pl|
|               1.0|    431|     fid sel biotech|fidelity select b...|
|               1.0|    246|the growth fund o...|amer fds grwth fd...|
|               1.0|    187| vang tot bd mkt adm|vanguard total bo...|
|               1.0|    139|        coca cola co|   coca-cola company|
|               1.0|    269|vanguard total bo...|vang tot bd mkt inst|
|0.9416581659608296|    202|ishares core msci...|harding loevner e...|
|0.9361048010849069|    245|vanguard short te...|vanguard short-te...|
|0.9141509546609883|    155|vanguard total bo...|vang tot bd mk is pl|
|0.9135757522099837|    157|vanguard mid-cap ...|vang midcap idx inst|
| 0.91