In [91]:
import numpy as np
import copy
from pyspark.ml.fpm import FPGrowth
import pandas as pd
from numpy import random
import sys
from scipy.sparse import find
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import lit
from pyspark.sql import SQLContext


sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

### Read the dataset in spark

In [2]:
df = spark.read.json("reviews_Clothing_Shoes_and_Jewelry_5.json.gz")

In [3]:
df_subset = df.select(df['reviewerID'],df['asin'],df['overall'])

### Generate the feature index for every user in the dataset

In [4]:
indexer = StringIndexer(inputCol="reviewerID", outputCol="userIndex")
indexed_user = indexer.fit(df_subset).transform(df_subset)
indexed_user.show()

+--------------+----------+-------+---------+
|    reviewerID|      asin|overall|userIndex|
+--------------+----------+-------+---------+
|A1KLRMWW2FWPL4|0000031887|    5.0|  10516.0|
|A2G5TCU2WDFZ65|0000031887|    5.0|   4003.0|
|A1RLQXYNCMWRWN|0000031887|    5.0|   2710.0|
| A8U3FAMSJVHS5|0000031887|    5.0|   5929.0|
|A3GEOILWLK86XM|0000031887|    5.0|  34460.0|
| A27UF1MSF3DB2|0000031887|    4.0|   5588.0|
|A16GFPNVF4Y816|0000031887|    5.0|  10532.0|
|A2M2APVYIB2U6K|0000031887|    5.0|  35600.0|
|A1NJ71X3YPQNQ9|0000031887|    4.0|  28181.0|
| A3EERSWHAI6SO|0000031887|    5.0|  18411.0|
| AX1QE6IR7CHXM|0000031887|    5.0|   5910.0|
|A2A2WZYLU528RO|0000031887|    5.0|  27539.0|
|A391EXIT5TFP72|0000031887|    5.0|  22608.0|
|A34ATJR9KFIXL9|0000031887|    5.0|   1513.0|
| AJ6B83I4YJHYW|0000031887|    5.0|    259.0|
|A26A4KKLAVTMCC|0000031887|    3.0|  17353.0|
|A1MXJVYXE2QU6H|0000031887|    5.0|  35969.0|
|A2XJ13PIXVJFJH|0000031887|    1.0|  20428.0|
|A287XY94U7JDM8|0000031887|    5.0

### Generate the feature index for every item in the dataset

In [5]:
indexer = StringIndexer(inputCol="asin", outputCol="itemIndex")
indexed_df = indexer.fit(indexed_user).transform(indexed_user)
indexed_df.show()

+--------------+----------+-------+---------+---------+
|    reviewerID|      asin|overall|userIndex|itemIndex|
+--------------+----------+-------+---------+---------+
|A1KLRMWW2FWPL4|0000031887|    5.0|  10516.0|   2210.0|
|A2G5TCU2WDFZ65|0000031887|    5.0|   4003.0|   2210.0|
|A1RLQXYNCMWRWN|0000031887|    5.0|   2710.0|   2210.0|
| A8U3FAMSJVHS5|0000031887|    5.0|   5929.0|   2210.0|
|A3GEOILWLK86XM|0000031887|    5.0|  34460.0|   2210.0|
| A27UF1MSF3DB2|0000031887|    4.0|   5588.0|   2210.0|
|A16GFPNVF4Y816|0000031887|    5.0|  10532.0|   2210.0|
|A2M2APVYIB2U6K|0000031887|    5.0|  35600.0|   2210.0|
|A1NJ71X3YPQNQ9|0000031887|    4.0|  28181.0|   2210.0|
| A3EERSWHAI6SO|0000031887|    5.0|  18411.0|   2210.0|
| AX1QE6IR7CHXM|0000031887|    5.0|   5910.0|   2210.0|
|A2A2WZYLU528RO|0000031887|    5.0|  27539.0|   2210.0|
|A391EXIT5TFP72|0000031887|    5.0|  22608.0|   2210.0|
|A34ATJR9KFIXL9|0000031887|    5.0|   1513.0|   2210.0|
| AJ6B83I4YJHYW|0000031887|    5.0|    259.0|   

In [6]:
assembler = VectorAssembler(
    inputCols=["userIndex", "itemIndex", "overall"],
    outputCol="features")

### Generate the feature vector for every user-item pair of the dataset.

In [7]:
output = assembler.transform(indexed_df)

In [8]:
output = output.withColumn("id", monotonically_increasing_id())

In [9]:
review_features = output.select(output['id'],output['features'])

In [10]:
review_features.show()

+---+--------------------+
| id|            features|
+---+--------------------+
|  0|[10516.0,2210.0,5.0]|
|  1| [4003.0,2210.0,5.0]|
|  2| [2710.0,2210.0,5.0]|
|  3| [5929.0,2210.0,5.0]|
|  4|[34460.0,2210.0,5.0]|
|  5| [5588.0,2210.0,4.0]|
|  6|[10532.0,2210.0,5.0]|
|  7|[35600.0,2210.0,5.0]|
|  8|[28181.0,2210.0,4.0]|
|  9|[18411.0,2210.0,5.0]|
| 10| [5910.0,2210.0,5.0]|
| 11|[27539.0,2210.0,5.0]|
| 12|[22608.0,2210.0,5.0]|
| 13| [1513.0,2210.0,5.0]|
| 14|  [259.0,2210.0,5.0]|
| 15|[17353.0,2210.0,3.0]|
| 16|[35969.0,2210.0,5.0]|
| 17|[20428.0,2210.0,1.0]|
| 18|[17322.0,2210.0,5.0]|
| 19|[27510.0,2210.0,5.0]|
+---+--------------------+
only showing top 20 rows



### Number of feature vectors available

In [11]:
review_features.count()

278677

In [12]:
data_vectorized = []
list(map(lambda x: data_vectorized.append((x.id,
                                           Vectors.dense(x.features),)),
         (row for row in review_features.rdd.collect())))

[None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,

In [13]:
df_reviews = spark.createDataFrame(data_vectorized, ["id", "features"])

### Implementation of Bucketed Random Projection LSH Algorithm

In [17]:
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
                                  numHashTables=3)

In [18]:
model = brp.fit(df_reviews)

In [17]:
hashed_df = model.transform(df_reviews)

In [18]:
hashed_df.count()

278677

### Key corresponding toa user for query

In [20]:
key = Vectors.dense([6748.0,0,0])

In [19]:
model.approxNearestNeighbors(df_reviews, key, 10).show()

+------+-----------------+--------------------+------------------+
|    id|         features|              hashes|           distCol|
+------+-----------------+--------------------+------------------+
|161610| [6749.0,0.0,5.0]|[[-3240.0], [-204...|5.0990195135927845|
|193267| [6749.0,5.0,1.0]|[[-3240.0], [-205...| 5.196152422706632|
|196590|[6746.0,11.0,5.0]|[[-3238.0], [-202...| 12.24744871391589|
|170556|[6749.0,16.0,1.0]|[[-3240.0], [-204...| 16.06237840420901|
| 14053|[6736.0,14.0,5.0]|[[-3233.0], [-202...|  19.1049731745428|
|220518| [6779.0,2.0,2.0]|[[-3255.0], [-206...| 31.12876483254676|
| 13951|[6706.0,14.0,5.0]|[[-3219.0], [-201...|44.553338819890925|
|170525|[6801.0,16.0,5.0]|[[-3264.0], [-203...| 55.58776843874919|
|196588|[6685.0,11.0,4.0]|[[-3209.0], [-201...| 64.07807737440318|
|149784| [6824.0,3.0,5.0]|[[-3276.0], [-206...| 76.22335600063802|
+------+-----------------+--------------------+------------------+

