In [None]:
import getpass
import os
import uuid
from functools import reduce

import pandas as pd
from elasticsearch import Elasticsearch, helpers
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col

In [None]:
# Obtain the Spark session (existing one is reused, otherwise a new one is
# created) with the submit deploy mode set to "client".
session_builder = SparkSession.builder.config("spark.submit.deployMode", "client")
spark = session_builder.getOrCreate()

In [None]:
# Load the persisted ALS model from its saved location.
als_model_path = "/user/amm9801_nyu_edu/project/als_model_merged"
model = ALSModel.load(als_model_path)

In [None]:
# Rebuild the model's item-factor table as a new DataFrame from its underlying RDD.
item_factor_rows = model.itemFactors.rdd
itemfactors = spark.createDataFrame(item_factor_rows)

In [None]:
# Keep the item factors as a Spark DataFrame (no toPandas()): the following
# cell joins this frame against a Spark DataFrame and calls .select() on the
# result, neither of which works on a pandas DataFrame. Staying in Spark also
# avoids collecting every factor vector onto the driver.
# The "id" column is exposed as "asinIdx" so it can serve as the join key.
items_frame = itemfactors.select(col("id").alias("asinIdx"), col("features"))

In [None]:
# Read the review data as CSV (first row is the header, column types inferred).
merged_reviews_filtered_transformed = spark.read.csv(
    "/user/amm9801_nyu_edu/project/merged_reviews_filtered_transformed",
    inferSchema=True,
    header=True,
)
# Bare expression on its own line so the cell displays the resulting DataFrame;
# it was previously fused onto the assignment line, which is a syntax error.
merged_reviews_filtered_transformed

In [None]:
# Reduce the review table to its distinct (asinIdx, asin) pairs before joining.
# NOTE(review): the review data presumably holds many rows per item — confirm.
# Joining the full table would repeat each item's factor vector once per
# matching row, and those duplicates would flow into every downstream step.
asin_lookup = (
    merged_reviews_filtered_transformed
    .select("asinIdx", "asin")
    .distinct()
)

# Map each factor vector from its numeric index back to the product's ASIN.
review_joined_df = (
    items_frame
    .join(asin_lookup, "asinIdx", "inner")
    .select("asin", "features")
)

In [None]:
# Session-wide setting: Spark SQL resolves column names case-sensitively from here on.
spark.conf.set("spark.sql.caseSensitive", "true")

# Book metadata: load the raw JSON, then keep only the four catalog fields.
books_metadata_path = '/user/amm9801_nyu_edu/project/meta_Books.json'
raw_metadata_books = spark.read.json(books_metadata_path)
book_fields = ['asin', 'title', 'description', 'brand']
book_metadata = raw_metadata_books.select(*book_fields)

In [None]:
# Clothing/shoes/jewelry metadata: load the raw JSON, then keep only the four catalog fields.
clothing_metadata_path = '/user/sg6482_nyu_edu/project/meta_Clothing_Shoes_and_Jewelry.json'
raw_metadata_clothing = spark.read.json(clothing_metadata_path)
clothing_fields = ['asin', 'title', 'description', 'brand']
clothing_metadata = raw_metadata_clothing.select(*clothing_fields)


In [None]:
# Electronics metadata: load the raw JSON, then keep only the four catalog fields.
electronics_metadata_path = '/user/sa6142_nyu_edu/project/electronics/meta_Electronics.json'
raw_metadata_electronic = spark.read.json(electronics_metadata_path)
electronic_fields = ['asin', 'title', 'description', 'brand']
electronic_metadata = raw_metadata_electronic.select(*electronic_fields)


In [None]:
def unionAll(*dfs):
    """Fold any number of DataFrames into one by chaining DataFrame.unionAll.

    The frames are combined left to right; a single frame is returned as-is.
    Calling with no frames raises the TypeError that functools.reduce raises
    for an empty sequence without an initial value.
    """
    def _stack(upper, lower):
        # Append `lower` to the frames accumulated so far.
        return DataFrame.unionAll(upper, lower)

    return reduce(_stack, dfs)

In [None]:
# Stack the three per-category metadata frames into a single catalog frame.
metadata_frames = [book_metadata, clothing_metadata, electronic_metadata]
merged_metadata = unionAll(*metadata_frames)

In [None]:
# Attach the factor vectors to the metadata; the inner join keeps only ASINs
# that appear in both frames.
final_data = merged_metadata.join(other=review_joined_df, on="asin", how="inner")

In [None]:
# Preview the first five joined rows.
final_data.show(n=5)

In [None]:
from elasticsearch import Elasticsearch

In [None]:
# Connection settings are read from the environment so that no secret lives in
# the notebook. ES_URL and ES_USER fall back to the values this notebook has
# always used; the password has no default and is prompted for when
# ES_PASSWORD is unset.
# NOTE(review): the password that used to be hard-coded in this cell has been
# exposed in source control and should be rotated.
ES_URL = os.environ.get(
    "ES_URL", "https://my-deployment-ccde32.es.us-east4.gcp.elastic-cloud.com"
)
ES_USER = os.environ.get("ES_USER", "elastic")
ES_PASSWORD = os.environ.get("ES_PASSWORD") or getpass.getpass("Elasticsearch password: ")

es_client = Elasticsearch(ES_URL, http_auth=(ES_USER, ES_PASSWORD))


In [None]:
index_name = "amazon_product_index"

# Start from a clean slate. ignore_unavailable turns "index does not exist"
# (e.g. the very first run) into a no-op, while genuine failures such as
# authentication or connectivity errors surface here instead of being printed
# and then ignored.
es_client.indices.delete(index=index_name, ignore_unavailable=True)

# Text fields share one custom analyzer: standard tokenizer, then lowercasing,
# English stop-word removal and English stemming.
index_body = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0,
        "analysis": {
            "filter": {
                "english_stop": {
                    "type": "stop",
                    "stopwords": "english",
                },
                "english_stemmer": {
                    "type": "stemmer",
                    "language": "english",
                },
            },
            "analyzer": {
                "stem_english": {
                    "type": "custom",
                    "tokenizer": "standard",
                    "filter": [
                        "lowercase",
                        "english_stop",
                        "english_stemmer",
                    ],
                },
            },
        },
    },
    "mappings": {
        "properties": {
            "asin": {"type": "text"},
            "description": {
                "type": "text",
                "analyzer": "stem_english",
            },
            "title": {
                "type": "text",
                "analyzer": "stem_english",
            },
            "brand": {
                "type": "text",
                "analyzer": "stem_english",
            },
            "profile_vector": {
                "type": "dense_vector",
                # The vectors stored here are the ALS item factors, whose
                # length is the model's rank. Reading it from the model keeps
                # the mapping correct if the model is retrained with a
                # different rank (this value used to be hard-coded as 48).
                "dims": model.rank,
            },
        },
    },
}
es_client.indices.create(index=index_name, body=index_body)

In [None]:
# Build the bulk-index actions lazily, one per joined row. Rows are streamed
# from Spark with toLocalIterator() instead of being collected into a pandas
# frame and then copied into a list, so the driver never holds the whole
# dataset in memory. Each document gets a fresh random UUID (as a string) as
# its _id.
# Note: es_dataset is a generator and can be consumed only once.
es_dataset = (
    {
        "_index": index_name,
        "_id": str(uuid.uuid4()),
        "_source": {
            "title": row["title"],
            "description": row["description"],
            "asin": row["asin"],
            "brand": row["brand"],
            "profile_vector": row["features"],
        },
    }
    for row in final_data.toLocalIterator()
)
# Bulk insert the documents; helpers.bulk pulls actions from the generator in chunks.
helpers.bulk(es_client, es_dataset)