In [18]:
from pyspark.sql import HiveContext
import pyspark.sql 
import pyspark.sql.functions as func
import pyspark.sql.types as types
import re
import pandas as pd

In [28]:
df = spark.read.format("csv").option("header", "true").load("s3://queries.csv")

In [29]:
df = df.cache()

In [30]:
df.count()

4638

### Let's figure out number of collaborative interactions

In [594]:
df.groupby('user_id').count().select('count').describe().show()

+-------+------------------+
|summary|             count|
+-------+------------------+
|  count|              4422|
|   mean|1.0488466757123474|
| stddev|0.8972865070785064|
|    min|                 1|
|    max|                37|
+-------+------------------+



on average most of our users posted only one search query, on the other hand there are "outliers" with max=37.

In [671]:
colb_ids = df.groupby('user_id').count().where(func.col('count') > 1)

In [672]:
colb_ids.count()

83

In [612]:
outliers = df.groupby('user_id').count().where(func.col('count') > 3).orderBy('count')

In [620]:
outliers.show(25, False)

+----------------------------------------+-----+
|user_id                                 |count|
+----------------------------------------+-----+
|f368479f91e7488a388de792cccade153f61760d|4    |
|b9071fe201c0b81ee8993a0a2c2c217776017aa5|4    |
|889311ad9b4be5d00ef1a5ac93a6580f1a85d11e|4    |
|aad89e151e8c0b4f6cbc5775c2574ded6941dbc1|7    |
|8dd0b980793105e4b1d003de5b4b7c3ae2b56672|7    |
|dc6a1e7e8f267d99f2ad3f0cb3423faf87dd5c74|26   |
|ac00424bc343c923a34dc5d8edc44e534127d963|28   |
|88ec4549b00311ba4e46f3a1bdde085e86a1cfe1|28   |
|dab67f7f4aba7247c8d32c09086565eac33103eb|37   |
+----------------------------------------+-----+



In [622]:
outliers_details = df.join(outliers.select(['user_id', 'count']), on=['user_id'], how='inner').orderBy('user_id')

In [629]:
outliers_details.where(func.col('user_id') == 'dab67f7f4aba7247c8d32c09086565eac33103eb').show(37, False)

+----------------------------------------+---------------------------+-----+
|user_id                                 |searched_term              |count|
+----------------------------------------+---------------------------+-----+
|dab67f7f4aba7247c8d32c09086565eac33103eb|murmansk                   |37   |
|dab67f7f4aba7247c8d32c09086565eac33103eb|palau                      |37   |
|dab67f7f4aba7247c8d32c09086565eac33103eb|caruaru                    |37   |
|dab67f7f4aba7247c8d32c09086565eac33103eb|lorch                      |37   |
|dab67f7f4aba7247c8d32c09086565eac33103eb|st albans                  |37   |
|dab67f7f4aba7247c8d32c09086565eac33103eb|waikato                    |37   |
|dab67f7f4aba7247c8d32c09086565eac33103eb|paldiski                   |37   |
|dab67f7f4aba7247c8d32c09086565eac33103eb|eger                       |37   |
|dab67f7f4aba7247c8d32c09086565eac33103eb|palanga                    |37   |
|dab67f7f4aba7247c8d32c09086565eac33103eb|milos                      |37   |

#### well... the serach interaction above doesn't look like coherent one?

In [663]:
outliers_ids = [r.user_id for r in outliers.where(func.col('count') > 7).select('user_id').collect()]

In [673]:
colb_ids = colb_ids.where(func.col('user_id').isin(outliers_ids) == False)

In [674]:
colb_ids.count()

79

In [675]:
colb_df = df.join(colb_ids, on='user_id', how='inner').cache()

In [676]:
clb_count = colb_df.count()

In [677]:
print('we have {} records where user was logged at least twice'.format(clb_count))

we have 180 records where user was logged at least twice


In [678]:
colb_df = colb_df.withColumn("term_length", func.length(func.col('searched_term'))).orderBy('term_length', ascending=False)

In [679]:
colb_df.show(10, False)

+----------------------------------------+----------------------------------------------------+-----+-----------+
|user_id                                 |searched_term                                       |count|term_length|
+----------------------------------------+----------------------------------------------------+-----+-----------+
|97a0ef8dc71810e4f5702bd50e5c7c86b7a4f94a|from kraków: auschwitz-birkenau full-day guided tour|2    |52         |
|8dd0b980793105e4b1d003de5b4b7c3ae2b56672|american museum of natural history: skip the line   |7    |49         |
|8dd0b980793105e4b1d003de5b4b7c3ae2b56672|caribe aquatic park + roundtrip from barcelona      |7    |46         |
|d57970604dea75a6788373b7a4eea5f0e09f9fc3|sydney aquarium and sydney wildlife                 |2    |35         |
|bc8e3f60266538b6665f567dcf0e79d7c2ef2d50|uffizzi museum with a skip the line                 |2    |35         |
|8dd0b980793105e4b1d003de5b4b7c3ae2b56672|altes museum: skip the line                   

In [680]:
colb_df.select('term_length').describe().show()

+-------+------------------+
|summary|       term_length|
+-------+------------------+
|  count|               180|
|   mean|11.066666666666666|
| stddev| 7.339036211517593|
|    min|                 2|
|    max|                52|
+-------+------------------+



### FPGrowth alg

In [70]:
rdd = colb_df.rdd.map(lambda r: (r.user_id, [r.searched_term]))\
             .reduceByKey(lambda x,y: list(set(x+y)))
       
items = rdd.toDF(['user_id', 'items'])

In [242]:
items.show(2, False)

+----------------------------------------+------------------------------+
|user_id                                 |items                         |
+----------------------------------------+------------------------------+
|d498c32f64ab864f986b6918442daed1171050d4|[cambridgeshire, lincolnshire]|
|dcba62ca77599889e0faf9bdd7fe056d136e970c|[jammu and kashmir, gilgit]   |
+----------------------------------------+------------------------------+
only showing top 2 rows



#### FPGrowth could not accept large number of products

In [250]:
small = [(r.user_id, r.items) for r in items.take(6)[2:6]]

In [252]:
small = sqlContext.createDataFrame(small, ['id', 'items'])

In [253]:
from pyspark.ml.fpm import FPGrowth

fpGrowth = FPGrowth(itemsCol="items", minSupport=0.3, minConfidence=0.2)
model = fpGrowth.fit(small)

In [254]:
model.freqItemsets.show(10)

+-----+----+
|items|freq|
+-----+----+
+-----+----+



## Collaborative filtering

In [681]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

#### encode string to integers

In [687]:
from pyspark.ml.feature import IndexToString, StringIndexer

id_indexere = StringIndexer(inputCol='user_id', outputCol="user_id_idx")
id_model = id_indexere.fit(colb_df)
indexed_ids = id_model.transform(colb_df)

term_indexer = StringIndexer(inputCol='searched_term', outputCol="searched_term_idx")
term_mdoel = term_indexer.fit(colb_df)
indexed_terms = term_mdoel.transform(indexed_ids)


In [713]:
indexed_terms.orderBy("term_length").show(10, False)

+----------------------------------------+-------------+-----+-----------+-----------+-----------------+
|user_id                                 |searched_term|count|term_length|user_id_idx|searched_term_idx|
+----------------------------------------+-------------+-----+-----------+-----------+-----------------+
|01c491119eb128a0a5089de299c5c3108b656160|kà           |2    |2          |57.0       |75.0             |
|b278ec37a585afb95ff7bc1fdca2fc61275d0de6|ski          |2    |3          |29.0       |99.0             |
|e925116f9d22f6914fe789b3f065d76540f8a7e2|kep          |2    |3          |21.0       |25.0             |
|0eeec901593030c2be416cd01dba1bf5ed23abdd|nan          |2    |3          |49.0       |94.0             |
|aad89e151e8c0b4f6cbc5775c2574ded6941dbc1|perm         |7    |4          |0.0        |135.0            |
|ffcdabd3aa19efcc3998260e40f79131c31533f6|napa         |2    |4          |71.0       |53.0             |
|9b9ce2929317c1722d3ed209e1a4a671020fa0fc|harz         

In [690]:
train_data = indexed_terms.rdd.map(lambda r: Rating(int(r.user_id_idx), int(r.searched_term_idx), 1.0))

In [691]:
rank = 10
numIterations = 20
model = ALS.train(train_data, rank, numIterations)

### Recomendations

In [714]:
prob1 = model.recommendProducts(22, 4)

In [715]:
prob1

[Rating(user=22, product=8, rating=0.9899546405474968),
 Rating(user=22, product=104, rating=0.9899546405474968),
 Rating(user=22, product=105, rating=0.777370871443289),
 Rating(user=22, product=170, rating=0.777370871443289)]

In [717]:
indexed_terms.where((func.col('searched_term_idx').isin([i.product for i in prob1]) )).show(truncate=False)

+----------------------------------------+------------------+-----+-----------+-----------+-----------------+
|user_id                                 |searched_term     |count|term_length|user_id_idx|searched_term_idx|
+----------------------------------------+------------------+-----+-----------+-----------+-----------------+
|11c13059781f4d1c058df2f1f43de255f4eb5244|gaudi house museum|3    |18         |6.0        |170.0            |
|11c13059781f4d1c058df2f1f43de255f4eb5244|santa cruz        |3    |10         |6.0        |105.0            |
|487998adfd10e8ac474497e57e14eca7749a8bef|new yourk         |2    |9          |22.0       |104.0            |
|487998adfd10e8ac474497e57e14eca7749a8bef|hakone            |2    |6          |22.0       |8.0              |
+----------------------------------------+------------------+-----+-----------+-----------+-----------------+



### Cosine similarity

In [694]:
features = model.productFeatures()

In [695]:
features.take(5)

[(0,
  array('d', [0.002898055361583829, 0.5327853560447693, -0.18894001841545105, -0.13316021859645844, -0.4272618293762207, 0.024094535037875175, -0.3080541789531708, 0.0180001612752676, 0.6004667282104492, -0.09628759324550629])),
 (32,
  array('d', [0.22591079771518707, -0.014748246408998966, 0.30071818828582764, 0.21335169672966003, 0.2107064425945282, -0.6190462112426758, 0.005165599752217531, 0.26987746357917786, 0.5202518105506897, 0.16475209593772888])),
 (64,
  array('d', [0.5905473232269287, -0.04534277319908142, 0.10780686140060425, 0.38446810841560364, -0.039201073348522186, -0.3314572870731354, 0.2194383144378662, 0.23788738250732422, -0.31411656737327576, 0.4005388915538788])),
 (96,
  array('d', [-0.4721476137638092, -0.1280212253332138, -0.5046680569648743, 0.17404226958751678, 0.02250622771680355, 0.36595582962036133, -0.03131099045276642, -0.10302021354436874, -0.2929040789604187, -0.4789309799671173])),
 (128,
  array('d', [-0.4909956157207489, -0.052700504660606384

In [696]:
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix,CoordinateMatrix, MatrixEntry

pred = IndexedRowMatrix(features.map(lambda x: IndexedRow(x[0],x[1]))).toBlockMatrix().transpose().toIndexedRowMatrix()
pred_sims = pred.columnSimilarities()

In [697]:
simdf = pred_sims.entries.map(lambda e: (e.i, e.j, e.value)).toDF(['i', 'j', 'similarity']).cache()

In [698]:
simdf.count()

15931

### Let's explore similarity

similarity between **i and j** search terms. 

In [718]:
simdf.show(5)

+---+---+--------------------+
|  i|  j|          similarity|
+---+---+--------------------+
|163|170|  0.4728120117977583|
|128|138|  0.2724593035496108|
| 98|119|  0.4299654549562856|
| 68|168|-0.15087527294611158|
| 35| 65|0.026965612347701437|
+---+---+--------------------+
only showing top 5 rows



In [722]:
simdf.groupby('i').count().show(5)

+---+-----+
|  i|count|
+---+-----+
| 26|  152|
| 29|  149|
| 65|  113|
| 54|  124|
| 19|  159|
+---+-----+
only showing top 5 rows



In [703]:
import sys
from pyspark.sql.window import Window

window = Window.partitionBy('i').orderBy(func.col('similarity').desc())

recom = simdf.withColumn("rank", func.rank().over(window))\
             .where((func.col('rank') < 4) & ((func.col('similarity') > 0.97))).cache()

#### first 3 in alph order

In [704]:
window = Window.partitionBy(['i', 'similarity']).orderBy(func.col('similarity').desc())

recom = recom.withColumn("rown", func.row_number().over(window))\
             .where((func.col('rown') < 4)).cache()

In [706]:
recom.count()

133

In [707]:
rec1 = recom.join(indexed_terms.select(['searched_term', 'searched_term_idx'])\
           , on=(recom.i == indexed_terms.searched_term_idx)).cache()
     

In [708]:
j_term = indexed_terms.select([func.col('searched_term').alias('j_term')\
                               , func.col('searched_term_idx').alias('j_term_idx')])

In [709]:
recom_terms = rec1.join(j_term, on=(rec1.j == j_term.j_term_idx))

In [710]:
recds = recom_terms.select(['searched_term', 'j_term']).groupby('searched_term').agg(func.collect_list('j_term'))

In [711]:
recds.show(25, False)

+----------------------------------------------+-----------------------------------------------------+
|searched_term                                 |collect_list(j_term)                                 |
+----------------------------------------------+-----------------------------------------------------+
|cheyenne                                      |[laramie]                                            |
|dinant                                        |[luxembourg dinant]                                  |
|lampang                                       |[nan]                                                |
|torbay                                        |[bunbury]                                            |
|donetsk                                       |[biertan, bradford, perm]                            |
|pasadena                                      |[winnipeg]                                           |
|uffizzi museum with a skip the line           |[brisbane airport brisban