In [13]:
!pip install numpy

Collecting numpy
  Downloading numpy-1.21.2-cp39-cp39-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (15.8 MB)
     |████████████████████████████████| 15.8 MB 7.9 MB/s            
[?25hInstalling collected packages: numpy
Successfully installed numpy-1.21.2


In [25]:
from pyspark.sql import *
import gzip
import json
import os
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkConf
import numpy
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# !rm -r elasticsearch-hadoop-7.6.2

In [None]:
#Import data
!apt-get install -y wget

#Review (14GB)
!wget http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Appliances.json.gz

#Product metadata (12GB)
!wget http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles2/meta_Appliances.json.gz
    
#Download ES jar for Spark
!wget https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-spark-30_2.12/7.12.0/elasticsearch-spark-30_2.12-7.12.0.jar

In [2]:
# Initializing spark context
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars ./elasticsearch-spark-30_2.12-7.12.0.jar pyspark-shell'

conf = SparkConf()
conf.set("spark.es.nodes","elasticsearch")
conf.set("spark.es.port","9200")

spark = SparkSession.builder.config(conf=conf).appName("recommendation_system").getOrCreate()

21/10/15 14:54:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
def parse(path):
    g = gzip.open(path, 'rb')
    for l in g:
        yield json.loads(l)

# function to read data and convert to spark dataframe     
def getMetaData(path):
    data = []
    data_schema =  [
                       StructField("asin", StringType(), True),
                       StructField("title", StringType(), True),
                       StructField("brand", StringType(), True),
                       StructField("category", ArrayType(StringType(), True), True),
                       StructField("main_category", StringType(), True),
#                        StructField("image", ArrayType(StringType(), True), True)
                   ]
    final_schema = StructType(fields=data_schema)
    for d in parse(path):
        review = {}
        review['asin'] = d['asin']
        review['title'] = d['title']
        review['brand'] = d['brand']
        review['category'] = d['category']
        review['main_category'] = next(reversed(d['category']), None) if len(d['category'])!= 0 else ''
#         review['image'] = d['image']
        data.append(review)
    return spark.createDataFrame(data, schema=final_schema)

In [4]:
product_data = getMetaData('meta_Appliances.json.gz')

In [5]:
product_data = product_data.dropDuplicates(['asin'])

In [6]:
product_data.show(n=1)

21/10/15 14:54:27 WARN TaskSetManager: Stage 0 contains a task of very large size (2384 KiB). The maximum recommended task size is 1000 KiB.
[Stage 0:>                                                          (0 + 2) / 2]

+----------+--------------------+------+--------------------+---------------+
|      asin|               title| brand|            category|  main_category|
+----------+--------------------+------+--------------------+---------------+
|B000BEZV7M|Extech RH401 Trip...|Extech|[Appliances, Part...|Humidity Meters|
+----------+--------------------+------+--------------------+---------------+
only showing top 1 row



                                                                                

In [29]:
!pip install elasticsearch
from elasticsearch import Elasticsearch



In [7]:
VECTOR_DIM = 25

# Index mapping
product_mapping = {
    # this mapping definition sets up the metadata fields for the products
    "mappings": {
        "properties": {
            "asin": {
                "type": "keyword"
            },
            "title": {
                "type": "keyword"
            },
            "brand": {
                "type": "keyword"
            },
            "category": {
                "type": "keyword"
            },
            "main_category": {
                "type": "keyword"
            },
            # the following fields define our model factor vectors and metadata
            "model_factor": {
                "type": "dense_vector",
                "dims" : VECTOR_DIM
            },
            "model_version": {
                "type": "keyword"
            },
            "model_timestamp": {
                "type": "date"
            }          
        }
    }
}

In [30]:
es = Elasticsearch(
    ['elasticsearch'],
    scheme="http",
    port=9200,
)

In [None]:
# Creating ES index (1st time)
es.indices.create(index="products", body=product_mapping)

In [15]:
es.search(index="products")

{'took': 6,
 'timed_out': False,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},
 'hits': {'total': {'value': 0, 'relation': 'eq'},
  'max_score': None,
  'hits': []}}

In [7]:
# Indexing data to ES
product_data.write.format("es").option("es.mapping.id", "asin").save("products")

21/10/15 14:54:39 WARN TaskSetManager: Stage 2 contains a task of very large size (2384 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

### Preparing feature vectors

In [9]:
# function to read rating data
def getRatingData(path):
    data = []
    data_schema = [
               StructField("asin", StringType(), True),
               StructField("reviewerId", StringType(), True),
               StructField("rating", FloatType(), True)]
    final_schema = StructType(fields=data_schema)
    for d in parse(path):
        review = {}
        review['asin'] = d['asin']
        review['reviewerId'] = d['reviewerID']
        review['rating'] = d['overall']
        data.append(review)
    return spark.createDataFrame(data, schema=final_schema)

df_rating= getRatingData('Appliances.json.gz')

In [15]:
# convert strings to vectors
indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in list(set(df_rating.columns)-set(['rating'])) ]
pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(df_rating).transform(df_rating)

21/10/15 14:58:22 WARN TaskSetManager: Stage 4 contains a task of very large size (11992 KiB). The maximum recommended task size is 1000 KiB.
21/10/15 14:58:29 WARN TaskSetManager: Stage 6 contains a task of very large size (11992 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [22]:
#creating an item vector using ALS 
als=ALS(maxIter=5,regParam=0.09,rank=25,userCol="reviewerId_index",itemCol="asin_index",ratingCol="rating",coldStartStrategy="drop",nonnegative=True)
model=als.fit(transformed)

21/10/15 15:00:43 WARN DAGScheduler: Broadcasting large task binary with size 20.4 MiB
21/10/15 15:00:43 WARN TaskSetManager: Stage 8 contains a task of very large size (11992 KiB). The maximum recommended task size is 1000 KiB.
21/10/15 15:00:45 WARN DAGScheduler: Broadcasting large task binary with size 20.4 MiB
21/10/15 15:00:45 WARN TaskSetManager: Stage 9 contains a task of very large size (11992 KiB). The maximum recommended task size is 1000 KiB.
21/10/15 15:00:49 WARN DAGScheduler: Broadcasting large task binary with size 20.4 MiB
21/10/15 15:00:53 WARN DAGScheduler: Broadcasting large task binary with size 20.4 MiB
21/10/15 15:01:02 WARN DAGScheduler: Broadcasting large task binary with size 20.4 MiB
21/10/15 15:01:04 WARN DAGScheduler: Broadcasting large task binary with size 20.4 MiB
21/10/15 15:01:10 WARN DAGScheduler: Broadcasting large task binary with size 20.4 MiB
21/10/15 15:01:11 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS


In [26]:
#Evaluating the model
evaluator=RegressionEvaluator(metricName="rmse",labelCol="rating",predictionCol="prediction")
predictions=model.transform(transformed)
rmse=evaluator.evaluate(predictions)

21/10/15 15:03:09 WARN DAGScheduler: Broadcasting large task binary with size 20.4 MiB
21/10/15 15:03:10 WARN TaskSetManager: Stage 55 contains a task of very large size (11992 KiB). The maximum recommended task size is 1000 KiB.
21/10/15 15:03:13 WARN DAGScheduler: Broadcasting large task binary with size 20.4 MiB
21/10/15 15:03:15 WARN DAGScheduler: Broadcasting large task binary with size 20.4 MiB
21/10/15 15:03:22 WARN DAGScheduler: Broadcasting large task binary with size 20.5 MiB
21/10/15 15:05:58 WARN DAGScheduler: Broadcasting large task binary with size 20.5 MiB
21/10/15 15:07:46 WARN DAGScheduler: Broadcasting large task binary with size 20.5 MiB
                                                                                

### Get Similar

In [36]:
def vector_query(query_vec, category,vector_field, cosine=False):
    """
    Construct an Elasticsearch script score query using `dense_vector` fields
    
    The script score query takes as parameters the query vector (as a Python list)
    
    Parameters
    ----------
    query_vec : list
        The query vector
    vector_field : str
        The field name in the document against which to score `query_vec`
    q : str, optional
        Query string for the search query (default: '*' to search across all documents)
    cosine : bool, optional
        Whether to compute cosine similarity. If `False` then the dot product is computed (default: False)
     
    Note: Elasticsearch cannot rank negative scores. Therefore, in the case of the dot product, a sigmoid transform
    is applied. In the case of cosine similarity, 1.0 is added to the score. In both cases, documents with no 
    factor vectors are ignored by applying a 0.0 score.
    
    The query vector passed in will be the user factor vector (if generating recommended items for a user)
    or product factor vector (if generating similar items for a given item)
    """
    
    if cosine:
        score_fn = "doc['{v}'].size() == 0 ? 0 : cosineSimilarity(params.vector, '{v}') + 1.0"
    else:
        score_fn = "doc['{v}'].size() == 0 ? 0 : sigmoid(1, Math.E, -dotProduct(params.vector, '{v}'))"
       
    score_fn = score_fn.format(v=vector_field, fn=score_fn)
    
    return {
    "query": {
        "script_score": {
            "query" : { 
                "bool" : {
                      "filter" : {
                            "term" : {
                              "main_category" : category
                            }
                        }
                }
            },
            "script": {
                "source": score_fn,
                "params": {
                    "vector": query_vec
                }
            }
        }
    }
}


def get_similar(the_id, num=10, index="products", vector_field='model_factor'):
    """
    Given a item id, execute the recommendation script score query to find similar items,
    ranked by cosine similarity. We return the `num` most similar, excluding the item itself.
    """
    response = es.get(index=index, id=the_id)
    src = response['_source']
    if vector_field in src:
        query_vec = src[vector_field]
        category = src['main_category']
        q = vector_query(query_vec, category,vector_field, cosine=True)
#         print(q)
        results = es.search(index=index, body=q)
        hits = results['hits']['hits']
        return src,hits[1:num+1]

def display_similar(the_id, num=10, es_index="products"):
    """
    Display query product, together with similar product and similarity scores, in a table
    """
    product, recs = get_similar(the_id, num, es_index)
       
    display(HTML("<h2>Get similar products for:</h2>"))
    display(HTML("<h4>%s (ASIN - %s)</h4>" % (product['title'], product['asin'])))
    display(HTML("<br>"))
    display(HTML("<h2>People who liked this product also liked these:</h2>"))
    sim_html = "<table border=0>"
    i = 0
    pd_data = []
    for rec in recs:
        r_score = rec['_score']
        r_title = rec['_source']['title']
        r = {}
        r['asin'] = rec['_source']['asin']
        r['title'] = r_title
        r['score'] = r_score
        pd_data.append(r)
#         r_im_url = next(iter(rec['_source']['image']), '')
#         sim_html += "<tr><td><h5>%s</h5><img src=%s width=150></img></td><td><h5>%2.3f</h5></td></tr>" % (r_title, r_im_url, r_score)
        i += 1
    sim_html += "</table>"
    pd.set_option('display.max_colwidth', -1) 
    pd_df = pd.DataFrame (pd_data)
    display(HTML(pd_df.to_html()))
    display(HTML(sim_html))

In [38]:
data = display_similar('B000P9BY2E', num=5)
data

TypeError: cannot unpack non-iterable NoneType object