In [1]:
import csv
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
from sklearn.ensemble import RandomForestRegressor, BaggingRegressor
from nltk.stem.snowball import SnowballStemmer

In [4]:
sc

<pyspark.context.SparkContext at 0x10bbbab38>

In [5]:
spark = SparkSession(sc)

In [7]:
spark

<pyspark.sql.session.SparkSession at 0x110edf860>

### Load Data
```
In this step, we load the four dataset, and join the attributes, description, and traning data
```

In [137]:
test_data = pd.read_csv("test.csv",encoding ='latin1')
train_data = pd.read_csv('train.csv',encoding ='latin1')
attributes = pd.read_csv("attributes.csv",encoding ='latin1')
product_description = pd.read_csv("product_descriptions.csv",encoding ='latin1') 

In [138]:
def merge(attr):
    return " ".join(list(attr["value"]))

In [139]:
df_without1 = train_data[train_data["product_title"].str.contains("without")]
df_without2 = test_data[test_data["product_title"].str.contains("without")]
df_without3 = train_data[train_data["search_term"].str.contains("without")]
df_without4 = test_data[test_data["search_term"].str.contains("without")]


In [140]:
print(df_without1.size/train_data.size)
print(df_without2.size/test_data.size)
print(df_without3.size/train_data.size)
print(df_without4.size/test_data.size)

0.00102609799236
0.0011338208563
0.000337532234328
0.000269957346739


In [141]:
attributes.dropna(how="all", inplace=True)
attributes["product_uid"] = attributes["product_uid"].astype(int)
attributes["value"] = attributes["value"].astype(str)
product_attributes = attributes.groupby("product_uid").apply(merge)
product_attributes = product_attributes.reset_index(name="product_attributes")

In [142]:
train_data = pd.merge(train_data, product_attributes, how="left", on="product_uid")

In [143]:
train_data = pd.merge(train_data, product_description, how="left", on="product_uid")

In [88]:
train_data.head(2)

Unnamed: 0,id,product_uid,product_title,search_term,relevance,product_attributes,product_description
0,2,100001,Simpson Strong-Tie 12-Gauge Angle,angle bracket,3.0,Versatile connector for various 90Â° connectio...,"Not only do angles make joints stronger, they ..."
1,3,100001,Simpson Strong-Tie 12-Gauge Angle,l bracket,2.5,Versatile connector for various 90Â° connectio...,"Not only do angles make joints stronger, they ..."


In [144]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType,StringType,IntegerType

sql_sc = SQLContext(sc)

trainSchema = StructType([
    StructField("id", IntegerType()),
    StructField("uid", IntegerType()),
    StructField("title", StringType()),
    StructField("term", StringType()),
    StructField("score", DoubleType()),
    StructField("attributes", StringType()),
    StructField("description", StringType())
])

rawTrain = sql_sc.createDataFrame(train_data, trainSchema)
rawTrain.show(3)

+---+------+--------------------+-------------+-----+--------------------+--------------------+
| id|   uid|               title|         term|score|          attributes|         description|
+---+------+--------------------+-------------+-----+--------------------+--------------------+
|  2|100001|Simpson Strong-Ti...|angle bracket|  3.0|Versatile connect...|Not only do angle...|
|  3|100001|Simpson Strong-Ti...|    l bracket|  2.5|Versatile connect...|Not only do angle...|
|  9|100002|BEHR Premium Text...|    deck over|  3.0|Brush,Roller,Spra...|BEHR Premium Text...|
+---+------+--------------------+-------------+-----+--------------------+--------------------+
only showing top 3 rows



### Correct the spell mistakes in search terms
```
In this step we replace the words that is spelled wrong in search terms, with correct terms. The spell check dictionary is shared by a kaggle team: www.kaggle.com/steubk/fixing-typos
We learned to use broadcast variable in Spark.

```

In [145]:
from spellCheckDict import spell_check_dict  

In [146]:
bc_search_term = sc.broadcast(spell_check_dict)

In [147]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType,StringType,IntegerType
def spellCheck(val):
    if val in bc_search_term.value:
        return bc_search_term.value.get(val)
    else:
        return val
scUDF =udf(spellCheck, StringType())

rawTrain = rawTrain.withColumn("term", scUDF("term")) #
rawTrain.show(5)

+---+------+--------------------+------------------+-----+--------------------+--------------------+
| id|   uid|               title|              term|score|          attributes|         description|
+---+------+--------------------+------------------+-----+--------------------+--------------------+
|  2|100001|Simpson Strong-Ti...|     angle bracket|  3.0|Versatile connect...|Not only do angle...|
|  3|100001|Simpson Strong-Ti...|         l bracket|  2.5|Versatile connect...|Not only do angle...|
|  9|100002|BEHR Premium Text...|         deck over|  3.0|Brush,Roller,Spra...|BEHR Premium Text...|
| 16|100005|Delta Vero 1-Hand...|  rain shower head| 2.33|Combo Tub and Sho...|Update your bathr...|
| 17|100005|Delta Vero 1-Hand...|shower only faucet| 2.67|Combo Tub and Sho...|Update your bathr...|
+---+------+--------------------+------------------+-----+--------------------+--------------------+
only showing top 5 rows



### with, without
```
There is a situation, where in product_title it says the product is "without ABC" while in the search term the user is searching for "ABC". When using our machine learning algorithm, the algorithm will think the product matches users need because the search words appear in the title. This is apparently wrong.

To deal with this, we could have seperate out the words appears after the "without" and give it negative weight. Yet for computation purpose we decide to remove the "without ABC" from the title and search words.

Example data：

"id","product_uid","product_title","search_term","relevance"

22888,104429,"Ryobi One+ 18-Volt Lithium-ion Shaft Cordless Electric String Trimmer and Edger without Battery and Charger","18v battery charger",2
2050,100368,"Niza Pro 2-piece 1.28 GPF Single Flush Round Toilet without Toilet Seat in White","buikids toilet seat",2


```

In [152]:
def find_without(sentence):
    sentenceL = sentence.split()
    #print(sentenceL)
    for i in range(len(sentenceL)):
        #print(sentenceL[i])
        if sentenceL[i] == "without":
            sentenceL = sentenceL[:i]
            return " ".join(sentenceL)
    return sentence
withoutUDF = udf(find_without,StringType())

In [153]:
rawTrain1 = rawTrain.withColumn("term", withoutUDF(rawTrain["term"])) 
#rawTrain = rawTrain.withColumn("title", withoutUDF("title")) 

In [155]:
rawTrain1.show(1)

+---+------+--------------------+-------------+-----+--------------------+--------------------+
| id|   uid|               title|         term|score|          attributes|         description|
+---+------+--------------------+-------------+-----+--------------------+--------------------+
|  2|100001|Simpson Strong-Ti...|angle bracket|  3.0|Versatile connect...|Not only do angle...|
+---+------+--------------------+-------------+-----+--------------------+--------------------+
only showing top 1 row



### Feature Engineering
```
We tokenize the sentences, remove stop words, stemmed the words, and generated 1-2-3-4-5 grams and compare ngram match. We also believe matching longer continuous words refers a higher correlation between product and search term. So we also use the length of words match and the length of the search term as our features.
We picked 5 as maximum n because we believe matching 5 continuous words should already mean that the two sentences are very similar, thus will not introduce more than 5 grams, which will demand more computational power.  
```

In [156]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType,StringType,IntegerType,ArrayType
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import NGram
from pyspark.ml.feature import CountVectorizer

sb = SnowballStemmer("english") 

def stem_word(text):
    return [sb.stem(word) for word in text]
stemUDF = udf(stem_word,ArrayType(StringType())) #不写type就默认string了，虽然是"[]"

#process title
tokenizer = Tokenizer(inputCol="title", outputCol="title_1")
temp = tokenizer.transform(rawTrain1)

remover1 = StopWordsRemover(inputCol="title_1", outputCol="title_2")
temp = remover1.transform(temp)

temp = temp.withColumn("title_words", stemUDF("title_2"))

#hashingTF = HashingTF(inputCol="title_words", outputCol="rawFeatures", numFeatures=16)
#temp = hashingTF.transform(temp)
#idf = IDF(inputCol = "rawFeatures", outputCol="title_idf")
#idfModel = idf.fit(temp)
#temp = idfModel.transform(temp)

ngram = NGram(n=2, inputCol="title_words", outputCol="title_2grams")
temp = ngram.transform(temp)

ngram = NGram(n=3, inputCol="title_words", outputCol="title_3grams")
temp = ngram.transform(temp)

ngram = NGram(n=4, inputCol="title_words", outputCol="title_4grams")
temp = ngram.transform(temp)

ngram = NGram(n=5, inputCol="title_words", outputCol="title_5grams")
temp = ngram.transform(temp)

temp=temp.drop("rawFeatures").drop("title").drop("title_1").drop("title_2")



In [157]:
#process search term

tokenizer = Tokenizer(inputCol="term", outputCol="term_1")
temp = tokenizer.transform(temp)

remover1 = StopWordsRemover(inputCol="term_1", outputCol="term_2")
temp = remover1.transform(temp)

temp = temp.withColumn("term_words", stemUDF("term_2"))

#hashingTF = HashingTF(inputCol="term_words", outputCol="rawFeatures", numFeatures=16)
#temp = hashingTF.transform(temp)
#idf = IDF(inputCol = "rawFeatures", outputCol="term_idf")
#idfModel = idf.fit(temp)
#temp = idfModel.transform(temp)

ngram = NGram(n=2, inputCol="term_words", outputCol="term_2grams")
temp = ngram.transform(temp)

ngram = NGram(n=3, inputCol="term_words", outputCol="term_3grams")
temp = ngram.transform(temp)

ngram = NGram(n=4, inputCol="term_words", outputCol="term_4grams")
temp = ngram.transform(temp)

ngram = NGram(n=5, inputCol="term_words", outputCol="term_5grams")
temp = ngram.transform(temp)

temp=temp.drop("rawFeatures").drop("term").drop("term_1").drop("term_2")

In [158]:
#process description
tokenizer = Tokenizer(inputCol="description", outputCol="description_1")
temp = tokenizer.transform(temp)

remover1 = StopWordsRemover(inputCol="description_1", outputCol="description_2")
temp = remover1.transform(temp)

temp = temp.withColumn("description_words", stemUDF("description_2"))

#hashingTF = HashingTF(inputCol="description_words", outputCol="rawFeatures", numFeatures=16)
#temp = hashingTF.transform(temp)
#idf = IDF(inputCol = "rawFeatures", outputCol="description_idf")
#idfModel = idf.fit(temp)
#temp = idfModel.transform(temp)

ngram = NGram(n=2, inputCol="description_words", outputCol="description_2grams")
temp = ngram.transform(temp)

ngram = NGram(n=3, inputCol="description_words", outputCol="description_3grams")
temp = ngram.transform(temp)

ngram = NGram(n=4, inputCol="description_words", outputCol="description_4grams")
temp = ngram.transform(temp)

ngram = NGram(n=5, inputCol="description_words", outputCol="description_5grams")
temp = ngram.transform(temp)

temp=temp.drop("rawFeatures").drop("description").drop("description_1").drop("description_2")

In [159]:
#process attributes

tokenizer = Tokenizer(inputCol="attributes", outputCol="attributes_1")
temp = tokenizer.transform(temp)

remover1 = StopWordsRemover(inputCol="attributes_1", outputCol="attributes_2")
temp = remover1.transform(temp)

temp = temp.withColumn("attributes_words", stemUDF("attributes_2"))

#hashingTF = HashingTF(inputCol="attributes_words", outputCol="rawFeatures", numFeatures=16)
#temp = hashingTF.transform(temp)
#idf = IDF(inputCol = "rawFeatures", outputCol="attributes_idf")
#idfModel = idf.fit(temp)
#temp = idfModel.transform(temp)

ngram = NGram(n=2, inputCol="attributes_words", outputCol="attributes_2grams")
temp = ngram.transform(temp)

ngram = NGram(n=3, inputCol="attributes_words", outputCol="attributes_3grams")
temp = ngram.transform(temp)

ngram = NGram(n=4, inputCol="attributes_words", outputCol="attributes_4grams")
temp = ngram.transform(temp)

ngram = NGram(n=5, inputCol="attributes_words", outputCol="attributes_5grams")
temp = ngram.transform(temp)

temp=temp.drop("rawFeatures").drop("attributes").drop("attributes_1").drop("attributes_2")

In [160]:
from pyspark.sql.functions import length  #string
from pyspark.sql.functions import size  #array

temp = temp.withColumn("search_term_length",size("term_words"))

In [162]:
#This is the step verifying the relevance score when the search term length is zero after preprocessing.
#We can see that these scores are very low, so the result is verified. 
temp.where(temp["search_term_length"]==0).select("id","score").show()

+------+-----+
|    id|score|
+------+-----+
| 11752|  1.0|
| 16601| 1.67|
| 27425|  1.0|
| 27953| 1.67|
| 28230|  2.0|
| 55100| 1.67|
| 55372|  1.0|
| 55762| 1.67|
| 55898| 1.33|
| 68566|  1.0|
| 69261|  1.0|
| 70056| 2.33|
| 76500| 1.67|
| 81246| 1.67|
| 95573| 2.33|
|105334| 1.67|
|108561| 1.67|
|114426| 1.67|
|119352|  2.0|
|155970|  1.0|
+------+-----+



```
Example rows with "zero" search term length after preprocessing the text:

"id","product_uid","product_title","search_term","relevance"

16601,102930,"Hampton Bay Adonia 52 in. Oil Rubbed Bronze Ceiling Fan","or",1.67
27425,105706,"Hampton Bay Waterton II 52 in. Oil Rubbed Bronze Ceiling Fan","or",1
```

In [163]:
temp.columns

['id',
 'uid',
 'score',
 'title_words',
 'title_2grams',
 'title_3grams',
 'title_4grams',
 'title_5grams',
 'term_words',
 'term_2grams',
 'term_3grams',
 'term_4grams',
 'term_5grams',
 'description_words',
 'description_2grams',
 'description_3grams',
 'description_4grams',
 'description_5grams',
 'attributes_words',
 'attributes_2grams',
 'attributes_3grams',
 'attributes_4grams',
 'attributes_5grams',
 'search_term_length']

In [164]:
temp.show(1)

+---+------+-----+--------------------+--------------------+--------------------+--------------------+------------+---------------+--------------+-----------+-----------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+
| id|   uid|score|         title_words|        title_2grams|        title_3grams|        title_4grams|title_5grams|     term_words|   term_2grams|term_3grams|term_4grams|term_5grams|   description_words|  description_2grams|  description_3grams|  description_4grams|  description_5grams|    attributes_words|   attributes_2grams|   attributes_3grams|   attributes_4grams|   attributes_5grams|search_term_length|
+---+------+-----+--------------------+--------------------+--------------------+--------------------+------------+---------------+--------------+-----------+-----------+------

In [166]:
from pyspark.sql.types import DoubleType,StringType,IntegerType

def compareTerms(a,b):
    match = 0
    for word in a:
        match += b.count(a)
    return match    
compareudf = udf(compareTerms, IntegerType())

In [176]:



temp=temp.withColumn("term_title_1gram",compareudf("term_words","title_words"))
temp=temp.withColumn("term_title_2gram",compareudf("term_2grams","title_2grams"))
temp=temp.withColumn("term_title_3gram",compareudf("term_3grams","title_3grams"))
temp=temp.withColumn("term_title_4gram",compareudf("term_4grams","title_4grams"))
temp=temp.withColumn("term_title_5gram",compareudf("term_5grams","title_5grams"))

temp=temp.withColumn("term_description_1gram",compareudf("term_words","description_words"))
temp=temp.withColumn("term_description_2gram",compareudf("term_2grams","description_2grams"))
temp=temp.withColumn("term_description_3gram",compareudf("term_3grams","description_3grams"))
temp=temp.withColumn("term_description_4gram",compareudf("term_4grams","description_4grams"))
temp=temp.withColumn("term_description_5gram",compareudf("term_5grams","description_5grams"))

temp=temp.withColumn("term_attributes_1gram",compareudf("term_words","attributes_words"))
temp=temp.withColumn("term_attributes_1gram",compareudf("term_2grams","attributes_2grams"))
temp=temp.withColumn("term_attributes_3gram",compareudf("term_3grams","attributes_3grams"))
temp=temp.withColumn("term_attributes_4gram",compareudf("term_4grams","attributes_4grams"))
temp=temp.withColumn("term_attributes_5gram",compareudf("term_5grams","attributes_5grams"))

In [169]:
temp.describe("term_description_5gram")

DataFrame[summary: string, term_description_5gram: string]

In [172]:
temp.head()

Row(id=2, uid=100001, score=3.0, title_words=['simpson', 'strong-ti', '12-gaug', 'angl'], title_2grams=['simpson strong-ti', 'strong-ti 12-gaug', '12-gaug angl'], title_3grams=['simpson strong-ti 12-gaug', 'strong-ti 12-gaug angl'], title_4grams=['simpson strong-ti 12-gaug angl'], title_5grams=[], term_words=['angl', 'bracket'], term_2grams=['angl bracket'], term_3grams=[], term_4grams=[], term_5grams=[], description_words=['angl', 'make', 'joint', 'stronger,', 'also', 'provid', 'consistent,', 'straight', 'corners.', 'simpson', 'strong-ti', 'offer', 'wide', 'varieti', 'angl', 'various', 'size', 'thick', 'handl', 'light-duti', 'job', 'project', 'structur', 'connect', 'needed.', 'bent', '(skewed)', 'match', 'project.', 'outdoor', 'project', 'moistur', 'present,', 'use', 'zmax', 'zinc-coat', 'connectors,', 'provid', 'extra', 'resist', 'corros', '(look', '"z"', 'end', 'model', 'number).versatil', 'connector', 'various', '90', 'connect', 'home', 'repair', 'projectsstrong', 'angl', 'nail', '

In [72]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer,StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark import SparkContext
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType,StringType,IntegerType
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler

features = ["term_title_match", "term_attr_match","term_descr_match","term_title_dice","term_attr_dice","term_descr_dice","term_title_idfCS", "term_attr_idfCS", "term_descr_idfCS"]

assember_features = VectorAssembler(inputCols=features, outputCol="features")
data = assember_features.transform(result)


In [173]:
temp.select("term_description_5gram").show(3,truncate=False)

+----------------------+
|term_description_5gram|
+----------------------+
|0                     |
|0                     |
|0                     |
+----------------------+
only showing top 3 rows



In [174]:
temp.select("term_description_5gram")

DataFrame[term_description_5gram: int]

In [74]:
data.registerTempTable("data")
trainData = sql_sc.sql("SELECT * from data where score is not NULL")
testData = sql_sc.sql("SELECT * from data where score is NULL")
trainData.select("id","uid","features").show(3,truncate=False)
testData.select("id","uid","features").show(3,truncate=False)

+---+------+---------------------------------------------------------------------------------------------------+
|id |uid   |features                                                                                           |
+---+------+---------------------------------------------------------------------------------------------------+
|2  |100001|[2.0,1.0,3.0,0.3333333333333333,0.0,0.0,0.7109604509799033,0.5010182036373578,0.3850999386018893]  |
|3  |100001|[1.0,12.0,16.0,0.0,0.0,0.0,0.4442547693388415,0.4024369606020151,0.2140468853236035]               |
|9  |100002|[2.0,3.0,5.0,0.0,0.0,0.017857142857142856,0.5860035067962868,0.4011036658482023,0.4539311687927065]|
+---+------+---------------------------------------------------------------------------------------------------+
only showing top 3 rows

+---+---+--------+
|id |uid|features|
+---+---+--------+
+---+---+--------+



In [None]:
(trainD, validD) = trainData.randomSplit([0.8, 0.2])
#featureIndexer =VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=5).fit(trainD)
rf = RandomForestRegressor(featuresCol="features",labelCol='score', numTrees=11,maxDepth=5)
pipeline = Pipeline(stages=[rf])
model = pipeline.fit(trainD)
predictions = model.transform(validD)

In [36]:
evaluator = RegressionEvaluator(labelCol="score",predictionCol="prediction")
rmse = evaluator.evaluate(predictions,{evaluator.metricName:"rmse"})
mse = evaluator.evaluate(predictions,{evaluator.metricName:"mse"})
r2 = evaluator.evaluate(predictions,{evaluator.metricName:"r2"})
mae = evaluator.evaluate(predictions,{evaluator.metricName:"mae"})
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
print("Mean Squared Error (MSE) on test data = %g" % mse)
print("R^2 metric (R2) on test data = %g" % r2)
print("Mean Absolute Error (MAE) on test data = %g" % mae)

Root Mean Squared Error (RMSE) on test data = 0.506655
Mean Squared Error (MSE) on test data = 0.2567
R^2 metric (R2) on test data = 0.0950811
Mean Absolute Error (MAE) on test data = 0.416563
