### Environment setup

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorIndexer,VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator,MulticlassClassificationEvaluator
    
spark = SparkSession.builder \
            .master("local") \
            .appName("kaggle") \
            .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/final.prediction") \
            .getOrCreate()

In [2]:
import pymongo
from pymongo import MongoClient

client = MongoClient("127.0.0.1",27017,maxPoolSize=10000)
outputCollection = client['final']['prediction']

### Read training data from MongoDB

In [3]:
prd_train_raw = client['final']['train']
prd_train = []
for document in prd_train_raw.find({}):
    prd_train.append((int(document['id']),int(document['product_uid']), document['product_title'], \
                      str(document['search_term']), float(document['relevance'])))

# define schema for training data
training_schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("product_uid", IntegerType(), False),
        StructField("product_title", StringType(), False),
        StructField("search_term", StringType(), False),
        StructField("relevance", DoubleType(), False),
    ])

training_rdd = sc.parallelize(prd_train)
training_df = spark.createDataFrame(training_rdd, training_schema)

## check label distribution

In [4]:
training_df.describe(['relevance']).show()

+-------+------------------+
|summary|         relevance|
+-------+------------------+
|  count|             74067|
|   mean|2.3816337910269922|
| stddev|0.5339839484172036|
|    min|               1.0|
|    max|               3.0|
+-------+------------------+



In [5]:
training_df.cube('relevance').count().dropna().orderBy('count', ascending = False).show()

+---------+-----+
|relevance|count|
+---------+-----+
|      3.0|19125|
|     2.33|16060|
|     2.67|15202|
|      2.0|11730|
|     1.67| 6780|
|     1.33| 3006|
|      1.0| 2105|
|      2.5|   19|
|     2.75|   11|
|     2.25|   11|
|     1.75|    9|
|      1.5|    5|
|     1.25|    4|
+---------+-----+



In [6]:
# check the label imbalance
training_df.cube(round('relevance')).count().dropna().orderBy('count', ascending = False).show()

+-------------------+-----+
|round(relevance, 0)|count|
+-------------------+-----+
|                2.0|34595|
|                3.0|34357|
|                1.0| 5115|
+-------------------+-----+



### Read test data from MongoDB

In [7]:
prd_test_raw = client['final']['test']
prd_test = []
for document in prd_test_raw.find({}):
    prd_test.append((int(document['id']), int(document['product_uid']), \
                     document['product_title'], str(document['search_term'])))

# define schema for test
test_schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("product_uid", IntegerType(), False),
    StructField("product_title", StringType(), False),
    StructField("search_term", StringType(), False),
    ])

test_rdd = sc.parallelize(prd_test)
test_df = spark.createDataFrame(test_rdd, test_schema)

### Read product description from MongoDB

In [8]:
prd_description_raw = client['final']['product_description']
prd_description = []

for document in prd_description_raw.find({}):
    prd_description.append((int(document['product_uid']), str(document['product_description'])))  
    
# define schema for product_description",
prd_schema = StructType([
    StructField("product_uid",IntegerType(), False),
    StructField("product_description", StringType(), True),
])

prd_rdd = sc.parallelize(prd_description)
prd_df = spark.createDataFrame(prd_rdd, prd_schema)

### UDF tokenize method

In [9]:
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem.wordnet import WordNetLemmatizer

english_stopwords = stopwords.words('english')
english_punctuations = [',','.',':',';','?','!','(',')','&','#','%']
lmtz = WordNetLemmatizer()

def tokenizer(text):
    tokenizedWords = [word for word in word_tokenize(text.strip())]
    
    # remove stop words
    filteredWords = [word for word in tokenizedWords if word not in english_stopwords]

    # remove punctuations
    filtered = [word for word in filteredWords if word not in english_punctuations]
    
    # lemmatize word
    lemmatized = [lmtz.lemmatize(word) for word in filtered]
    
    return ','.join(lemmatized)

# define tokenizer
udfTokenize = udf(tokenizer, StringType())

### Preprocess training data

In [10]:
# Join training data with product description
training_full_df = training_df.join(prd_df, 'product_uid', "left_outer")
training_full_df.cache()

DataFrame[product_uid: int, id: int, product_title: string, search_term: string, relevance: double, product_description: string]

In [11]:
# convert product_title & product_description to lower case
training_preprocessedDF = training_full_df.select('id','product_uid', lower(training_full_df.product_title), \
                                                 lower(training_full_df.product_description), \
                                                 lower(training_full_df.search_term), 'relevance') \
                                   .withColumn("tokenized_title", udfTokenize("lower(product_title)")) \
                                   .withColumn("tokenized_description", udfTokenize("lower(product_description)")) \
                                   .withColumn("tokenized_search", udfTokenize("lower(search_term)")) \
                                   .select('id','product_uid','tokenized_title','tokenized_description','tokenized_search','relevance')

training_preprocessedDF.cache()

DataFrame[id: int, product_uid: int, tokenized_title: string, tokenized_description: string, tokenized_search: string, relevance: double]

### split training & validation set

In [12]:
training_prepDF, validation_prepDF = training_preprocessedDF.randomSplit([0.8,0.2], seed = 11)

### Preprocess test data

In [13]:
test_full_df = test_df.join(prd_df, 'product_uid', "left_outer")
test_full_df.cache()

DataFrame[product_uid: int, id: int, product_title: string, search_term: string, product_description: string]

In [14]:
# convert product_title & product_description to lower case
test_prepDF = test_full_df.select('id', 'product_uid', lower(test_full_df.product_title), \
                                         lower(test_full_df.search_term), lower(test_full_df.product_description)) \
                                 .withColumn("tokenized_title", udfTokenize("lower(product_title)")) \
                                 .withColumn("tokenized_description", udfTokenize("lower(product_description)")) \
                                 .withColumn("tokenized_search", udfTokenize("lower(search_term)")) \
                                 .select('id', 'product_uid', 'tokenized_title','tokenized_description','tokenized_search')
test_prepDF.cache()

DataFrame[id: int, product_uid: int, tokenized_title: string, tokenized_description: string, tokenized_search: string]

### Similarity and distance features

In [15]:
# define ratio of matches between search term and target field
def simCalc(src_text, target_text):
    src_words = src_text.split(",")
    target_words = target_text.split(",")

    # Jaccard similarity
    intersection = set(src_words).intersection(set(target_words))
    return float(len(intersection))/len(src_words)

jaccardSim = udf(simCalc, DoubleType())

In [16]:
# define occurrence of matches between search term and target field 
def simOccur(src_text, target_text):
    src_words = src_text.split(",")
    target_words = target_text.split(",")
    intersection = set(src_words).intersection(set(target_words))
    return len(intersection)

occurSim = udf(simOccur, IntegerType())

In [17]:
def applySim(df):
    return df.withColumn('title_search_hit_ratio',jaccardSim('tokenized_search','tokenized_title')) \
             .withColumn('title_search_hit_occur', occurSim('tokenized_search','tokenized_title')) \
             .withColumn('desc_search_hit_ratio', jaccardSim('tokenized_search','tokenized_description')) \
             .withColumn('desc_search_hit_occur', occurSim('tokenized_search','tokenized_description'))

In [26]:
# calculate the search hit in title field
searchHitTrainingDF = applySim(training_prepDF) \
                    .select('id','product_uid','title_search_hit_occur','title_search_hit_ratio', \
                            'desc_search_hit_occur','desc_search_hit_ratio', \
                            'tokenized_search','tokenized_title','tokenized_description','relevance')

searchHitValidationDF = applySim(validation_prepDF) \
                    .select('id','product_uid','title_search_hit_occur','title_search_hit_ratio', \
                            'desc_search_hit_occur','desc_search_hit_ratio', \
                            'tokenized_search','tokenized_title','tokenized_description','relevance')

searchHitTestDF = applySim(test_prepDF) \
                        .select('id','product_uid','title_search_hit_occur','title_search_hit_ratio', \
                            'desc_search_hit_occur','desc_search_hit_ratio', \
                            'tokenized_search','tokenized_title','tokenized_description')

searchHitTrainingDF.cache()
searchHitValidationDF.cache()
searchHitTestDF.cache()

DataFrame[id: int, product_uid: int, title_search_hit_occur: int, title_search_hit_ratio: double, desc_search_hit_occur: int, desc_search_hit_ratio: double, tokenized_search: string, tokenized_title: string, tokenized_description: string]

### Topic model with TF-IDF

In [28]:
# add row_id for retrieving LDA similar document
searchHitTrainingDF_with_row_id = searchHitTrainingDF.withColumn("row_id", monotonically_increasing_id())
searchHitTrainingDF_with_row_id.cache()

searchHitValidationDF_with_row_id = searchHitValidationDF.withColumn("row_id", monotonically_increasing_id())
searchHitValidationDF_with_row_id.cache()

searchHitTestDF_with_row_id = searchHitTestDF.withColumn("row_id", monotonically_increasing_id())
searchHitTestDF_with_row_id.cache()

DataFrame[id: int, product_uid: int, title_search_hit_occur: int, title_search_hit_ratio: double, desc_search_hit_occur: int, desc_search_hit_ratio: double, tokenized_search: string, tokenized_title: string, tokenized_description: string, row_id: bigint]

In [20]:
from gensim import corpora, models, similarities

def ldaSim(docs):
    # bag of words
    dictionary = corpora.Dictionary(docs)
    
    # map document to (word, frequency)
    corpus = [dictionary.doc2bow(text) for text in docs]
    
    # conver (word,frequency) to tfidf model
    tfidf = models.TfidfModel(corpus)
    
    # (word,tfidf value)
    corpus_tfidf = tfidf[corpus]

    lda = models.LdaModel(corpus_tfidf, id2word=dictionary, num_topics=5)
    
    # create index 
    index = similarities.MatrixSimilarity(lda[corpus])

    return (index, dictionary, lda)

In [21]:
# get description field - list operation, memory consuming
desc_docs = searchHitTrainingDF_with_row_id.select('tokenized_description').rdd.flatMap(lambda x:x).collect()

desc_index, desc_dictionary, desc_lda = ldaSim([doc.split(',') for doc in desc_docs])

In [22]:
# define sim
def ldaDescSimilarity(q, pid):
    # vectorize query terms
    query_bow = desc_dictionary.doc2bow(q.lower().split())
    
    # map query to 2-dimensional space
    query_lda = desc_lda[query_bow]
    
    # get cosine similarity of query with each document
    sims = desc_index[query_lda]
    
    sorted_sims = sorted(enumerate(sims))
    
    return float(sorted_sims[pid][1]) if pid < len(sorted_sims) else 0.0

searchDescLDAsim = udf(ldaDescSimilarity, FloatType())

In [23]:
# get title field - list operation memory consuming
title_docs = searchHitTrainingDF_with_row_id.select('tokenized_title').rdd.flatMap(lambda x:x).collect()

title_index, title_dictionary, title_lda = ldaSim([doc.split(',') for doc in title_docs])

In [24]:
# define sim
def ldaTitleSimilarity(q, pid):
    sims = title_index[title_lda[title_dictionary.doc2bow(q.split())]]
    
    sorted_sims = sorted(enumerate(sims))
    
    return float(sorted_sims[pid][1]) if pid < len(sorted_sims) else 0.0
    
searchTitleLDAsim = udf(ldaTitleSimilarity, FloatType())

In [33]:
trainingReadyDF = searchHitTrainingDF_with_row_id \
                .withColumn('searchTitleLDASim', searchTitleLDAsim('tokenized_search','row_id')) \
                .withColumn('searchDescLDASim', searchDescLDAsim('tokenized_search','row_id')) \
                .select('id','product_uid','title_search_hit_occur','title_search_hit_ratio', \
                        'desc_search_hit_occur','desc_search_hit_ratio', \
                        'searchTitleLDASim','searchDescLDASim','relevance')
trainingReadyDF.cache()

DataFrame[id: int, product_uid: int, title_search_hit_occur: int, title_search_hit_ratio: double, desc_search_hit_occur: int, desc_search_hit_ratio: double, searchTitleLDASim: float, searchDescLDASim: float, relevance: double]

In [30]:
validationReadyDF = searchHitValidationDF_with_row_id \
                .withColumn('searchTitleLDASim', searchTitleLDAsim('tokenized_search','row_id')) \
                .withColumn('searchDescLDASim', searchDescLDAsim('tokenized_search','row_id')) \
                .select('id','product_uid','title_search_hit_occur','title_search_hit_ratio', \
                        'desc_search_hit_occur','desc_search_hit_ratio', \
                        'searchTitleLDASim','searchDescLDASim','relevance')
validationReadyDF.cache()

DataFrame[id: int, product_uid: int, title_search_hit_occur: int, title_search_hit_ratio: double, desc_search_hit_occur: int, desc_search_hit_ratio: double, searchTitleLDASim: float, searchDescLDASim: float, relevance: double]

In [31]:
testReadyDF = searchHitTestDF_with_row_id \
                .withColumn('searchTitleLDASim', searchTitleLDAsim('tokenized_search','row_id')) \
                .withColumn('searchDescLDASim', searchDescLDAsim('tokenized_search','row_id')) \
                .select('id','product_uid','title_search_hit_occur','title_search_hit_ratio', \
                        'desc_search_hit_occur','desc_search_hit_ratio', \
                        'searchTitleLDASim','searchDescLDASim')
testReadyDF.cache()

DataFrame[id: int, product_uid: int, title_search_hit_occur: int, title_search_hit_ratio: double, desc_search_hit_occur: int, desc_search_hit_ratio: double, searchTitleLDASim: float, searchDescLDASim: float]

### Schema of training & test data

In [34]:
trainingReadyDF.printSchema()
testReadyDF.printSchema()

root
 |-- id: integer (nullable = false)
 |-- product_uid: integer (nullable = false)
 |-- title_search_hit_occur: integer (nullable = true)
 |-- title_search_hit_ratio: double (nullable = true)
 |-- desc_search_hit_occur: integer (nullable = true)
 |-- desc_search_hit_ratio: double (nullable = true)
 |-- searchTitleLDASim: float (nullable = true)
 |-- searchDescLDASim: float (nullable = true)
 |-- relevance: double (nullable = false)

root
 |-- id: integer (nullable = false)
 |-- product_uid: integer (nullable = false)
 |-- title_search_hit_occur: integer (nullable = true)
 |-- title_search_hit_ratio: double (nullable = true)
 |-- desc_search_hit_occur: integer (nullable = true)
 |-- desc_search_hit_ratio: double (nullable = true)
 |-- searchTitleLDASim: float (nullable = true)
 |-- searchDescLDASim: float (nullable = true)



### train regression model

In [37]:
featureCols = ['title_search_hit_occur','title_search_hit_ratio','desc_search_hit_occur', \
               'desc_search_hit_ratio','searchTitleLDASim','searchDescLDASim']

assembler = VectorAssembler(inputCols=featureCols, outputCol="features")

training_DF = trainingReadyDF.select(col('title_search_hit_occur'), col('title_search_hit_ratio'), \
                                     col('desc_search_hit_occur'), col('desc_search_hit_ratio'), \
                                     col('searchTitleLDASim'), col('searchDescLDASim'), \
                                     col('relevance').alias('label'))

In [38]:
regressor = RandomForestRegressor(featuresCol = 'features', numTrees=20, maxDepth=2)

regPipeline = Pipeline(stages = [assembler, regressor])

regModel = regPipeline.fit(training_DF)

### regression model evaluation

In [42]:
regValidations = regModel.transform(validationReadyDF)

regValidations.printSchema()

regEvaluator = RegressionEvaluator(labelCol = "relevance", predictionCol = "prediction", metricName = "rmse")

print regEvaluator.evaluate(regValidations)

root
 |-- id: integer (nullable = false)
 |-- product_uid: integer (nullable = false)
 |-- title_search_hit_occur: integer (nullable = true)
 |-- title_search_hit_ratio: double (nullable = true)
 |-- desc_search_hit_occur: integer (nullable = true)
 |-- desc_search_hit_ratio: double (nullable = true)
 |-- searchTitleLDASim: float (nullable = true)
 |-- searchDescLDASim: float (nullable = true)
 |-- relevance: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- prediction: double (nullable = true)

0.501531720921


### search optimal paramter for regression model

In [44]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = ParamGridBuilder() \
                .addGrid(regressor.maxDepth, [2,3,4,5]) \
                .addGrid(regressor.numTrees, [15,20,25]) \
                .build()

In [45]:
regEvaluator = RegressionEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "rmse")
cv = CrossValidator(estimator = regressor, estimatorParamMaps = paramGrid, evaluator = regEvaluator, numFolds = 5)

df = Pipeline(stages = [assembler]) \
        .fit(training_DF) \
        .transform(validationReadyDF.select(col('title_search_hit_occur'), col('title_search_hit_ratio'), \
                                     col('desc_search_hit_occur'), col('desc_search_hit_ratio'), \
                                     col('searchTitleLDASim'), col('searchDescLDASim'), \
                                     col('relevance').alias('label')))        
df.printSchema()
cvModel = cv.fit(df)

optimal_tree_depth = cvModel.bestModel.trees[0].depth
optimal_tree_num = cvModel.bestModel.getNumTrees
print "regression best tree depth: ", optimal_tree_depth
print "regression best tree num: ", optimal_tree_num

root
 |-- title_search_hit_occur: integer (nullable = true)
 |-- title_search_hit_ratio: double (nullable = true)
 |-- desc_search_hit_occur: integer (nullable = true)
 |-- desc_search_hit_ratio: double (nullable = true)
 |-- searchTitleLDASim: float (nullable = true)
 |-- searchDescLDASim: float (nullable = true)
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)

regression best tree depth:  5
regression best tree num:  20


### predict on test set using optimal regressor

In [46]:
# train classifier using optimal parameters
optimalReg = RandomForestRegressor(labelCol = "label",featuresCol = "features", numTrees = optimal_tree_num, maxDepth=optimal_tree_depth)

optimalRegPipeline = Pipeline(stages = [assembler, optimalReg])

optimalRegPredictions = optimalRegPipeline.fit(training_DF) \
                            .transform(testReadyDF.select(col('id'), col('title_search_hit_occur'), \
                                     col('title_search_hit_ratio'), col('desc_search_hit_occur'), \
                                     col('desc_search_hit_ratio'), col('searchTitleLDASim'), col('searchDescLDASim')))

optimalRegPredict = optimalRegPredictions.withColumn('predict', round('prediction')).select('id', 'predict')

### save to MongoDB

In [47]:
def formatRow(row):
    ret = {}
    ret['title'] = 'optimal_regressor'
    ret['id'] = int(row[0])
    ret['prediction'] = int(row[1])
    return ret
    
l = optimalRegPredict.rdd.map(list).collect()
formattedResult = map(formatRow, l)

# save prediction result
result = outputCollection.insert_many(formattedResult)

print "Inserted %d prediction results to db" %len(result.inserted_ids)

Inserted 166693 prediction results to db


### train classification model

In [57]:
classifier = RandomForestClassifier(labelCol = "label",featuresCol = "features", numTrees = 20)

classifierPipeline = Pipeline(stages = [assembler, classifier])

classificationModel = classifierPipeline.fit(trainingReadyDF.select('title_search_hit_occur', \
                                     'title_search_hit_ratio', 'desc_search_hit_occur', \
                                     'desc_search_hit_ratio', 'searchTitleLDASim', 'searchDescLDASim', \
                                     round('relevance').alias('label')))




In [61]:
# convert label to [1.0,2.0,3.0] for both training and test set
clsPredictions = classificationModel.transform(validationReadyDF.withColumn('label', round('relevance')))

clsPredict = clsPredictions.withColumn('predict', round('prediction'))

clsEvaluator = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "predict", metricName = "accuracy")

print "accuracy of classification model: ", clsEvaluator.evaluate(clsPredict)

 accuracy of classification model:  0.583846309951


### Search optimal parameter for classification model

In [65]:
clsEvaluator = MulticlassClassificationEvaluator(labelCol = "label", predictionCol = "prediction", metricName = "accuracy")
cv = CrossValidator(estimator = classifier, estimatorParamMaps = paramGrid, evaluator = clsEvaluator, numFolds = 5)

trDF = trainingReadyDF.select('title_search_hit_occur', \
                                     'title_search_hit_ratio', 'desc_search_hit_occur', \
                                     'desc_search_hit_ratio', 'searchTitleLDASim', 'searchDescLDASim', \
                                     round('relevance').alias('label')).cache()

vrDF = validationReadyDF.select('id', 'title_search_hit_occur', \
                                     'title_search_hit_ratio', 'desc_search_hit_occur', \
                                     'desc_search_hit_ratio', 'searchTitleLDASim', 'searchDescLDASim', \
                                           round('relevance').alias('label')).cache()

df = Pipeline(stages = [assembler]).fit(trDF).transform(vrDF)
    
cvModel = cv.fit(df)

optimal_tree_depth = cvModel.bestModel.trees[0].depth
optimal_tree_num = cvModel.bestModel.getNumTrees
print "classifier best tree depth: ", optimal_tree_depth
print "classifier best tree num: ", optimal_tree_num

classifier best tree depth:  5
classifier best tree num:  20


### prediction on test set using classification model

In [66]:
# train classifier using optimal parameters
optimalCls = RandomForestClassifier(labelCol = "label",featuresCol = "features", numTrees = optimal_tree_num, maxDepth=optimal_tree_depth)

optimalClsPipeline = Pipeline(stages = [assembler, optimalCls])

trDF = trainingReadyDF.select('title_search_hit_occur', \
                                     'title_search_hit_ratio', 'desc_search_hit_occur', \
                                     'desc_search_hit_ratio', 'searchTitleLDASim', 'searchDescLDASim', \
                                     round('relevance').alias('label')).cache()

optimalClsPredictions = optimalClsPipeline.fit(trDF).transform(testReadyDF)

optimalClsPredict = optimalClsPredictions.withColumn('predict', round('prediction')).select('id', 'predict')

### Save result to MongoDB

In [68]:
def formatRow(row):
    ret = {}
    ret['title'] = 'optimal_classification'
    ret['id'] = int(row[0])
    ret['predict'] = int(row[1])
    return ret
    
l = optimalClsPredict.rdd.map(list).collect()
formattedResult = map(formatRow, l)

# save prediction result
result = outputCollection.insert_many(formattedResult)

print "Inserted %d prediction results to db" %len(result.inserted_ids)

Inserted 166693 prediction results to db


![alt text](score.png "Title")