## 1. Import libraries and initialize the Elasticsearch client

In [14]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col
from pyspark.sql.types import *
from elasticsearch import Elasticsearch
es = Elasticsearch()

Set the Elasticsearch index and document type names.

In [2]:
# Elasticsearch index and document type shared by every cell below.
esIndex, esDocType = "yelpindex", "yelp"

## 2. Create the data schema in ES
This step only needs to be executed once per data set, because it initializes the data schema (index settings and mappings) in ES.

In [3]:
def initIndex():
    '''Create the ES index with the payload analyzer and the review mapping.

    `@model.factor` is analysed by `payload_analyzer` (whitespace tokenizer +
    `delimited_payload_filter`), so a string like "0|1.2 1|-0.3" produced by
    `convert_vector` is stored as tokens with payloads attached; the native
    `payload_vector_score` script used in section 5 scores documents from
    this field.

    Idempotent: if the index already exists it is left untouched instead of
    letting `es.indices.create` fail, so "Restart & Run All" keeps working.
    '''
    if es.indices.exists(index=esIndex):
        print("ES index(%s) already exists, skipping creation" % esIndex)
        return
    create_index = {
        "settings": {
            "analysis": {
                "analyzer": {
                    "payload_analyzer": {
                        "type": "custom",
                        "tokenizer": "whitespace",
                        "filter": "delimited_payload_filter"
                    }
                }
            }
        },
        "mappings": {
            # mapping type name must match the doc type used when indexing/searching
            esDocType: {
                "properties": {
                    "text": {
                        "type": "text"
                    },
                    # NOTE(review): "not_analyzed" is a legacy string-style value; the
                    # cell output shows the target cluster accepted it, but newer ES
                    # versions expect a boolean here — confirm before upgrading.
                    "userId": {
                        "type": "integer",
                        "index": "not_analyzed"
                    },
                    "itemId": {
                        "type": "integer",
                        "index": "not_analyzed"
                    },
                    "stars": {
                        "type": "double"
                    },
                    "is_open": {
                        "type": "double"
                    },
                    "@model": {
                        "properties": {
                            "factor": {
                                "type": "text",
                                "term_vector": "with_positions_offsets_payloads",
                                "analyzer": "payload_analyzer"
                            },
                            "version": {
                                "type": "keyword"
                            }
                        }
                    }
                }
            }
        }
    }
    # create index with the settings & mappings above
    es.indices.create(index=esIndex, body=create_index)
    print("ES index(%s) create success" % esIndex)
initIndex()

ES index(yelpindex) create success


## 3. Prepare the data and index it in the ES server
### 3.1 Load original data from files

This step is only used to load the initial dataset into ES; it only needs to be executed once.

In [4]:
# Load only the columns we need from the raw Yelp review and business dumps.
yelp_review = (
    spark.read.json("/root/yelp/yelp_academic_dataset_review.json")
    .select("business_id", "stars", "user_id", "text")
)
yelp_business = (
    spark.read.json("/root/yelp/yelp_academic_dataset_business.json")
    .select("business_id", "name", "address", "city", "categories", "is_open")
)
yelp_review.show(5)
yelp_business.show(5)

+--------------------+-----+--------------------+--------------------+
|         business_id|stars|             user_id|                text|
+--------------------+-----+--------------------+--------------------+
|2aFiy99vNLklCx3T_...|    5|KpkOkG6RIf4Ra25Lh...|If you enjoy serv...|
|2aFiy99vNLklCx3T_...|    5|bQ7fQq1otn9hKX-gX...|After being on th...|
|2aFiy99vNLklCx3T_...|    5|r1NUhdNmL6yU9Bn-Y...|Great service! Co...|
|2LfIuF3_sX6uwe-IR...|    5|aW3ix1KNZAvoM8q-W...|Highly recommende...|
|2LfIuF3_sX6uwe-IR...|    4|YOo-Cip8HqvKp_p9n...|I walked in here ...|
+--------------------+-----+--------------------+--------------------+
only showing top 5 rows

+--------------------+-------------------+--------------------+---------+--------------------+-------+
|         business_id|               name|             address|     city|          categories|is_open|
+--------------------+-------------------+--------------------+---------+--------------------+-------+
|0DI8Dt2PJp07XkVvI...|  Inn

### 3.2 Join the review and business data so that we can get all the ratings for each business

We only use businesses that are open and located in Las Vegas. The full dataset is too large to process in one pass in Python, so restricting it to Las Vegas reduces the data-preparation effort.

In [5]:
# Attach business details to each review and keep only open Las Vegas businesses.
yelp_data = (
    yelp_review
    .join(yelp_business, yelp_review.business_id == yelp_business.business_id)
    .select(
        yelp_review.business_id, yelp_review.stars, yelp_review.user_id, yelp_review.text,
        yelp_business.name, yelp_business.address, yelp_business.city,
        yelp_business.categories, yelp_business.is_open,
    )
    .filter((yelp_business.city == 'Las Vegas') & (yelp_business.is_open == 1))
)
num_yelp = yelp_data.count()
yelp_data.show(5)

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+---------+--------------------+-------+
|         business_id|stars|             user_id|                text|                name|             address|     city|          categories|is_open|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+---------+--------------------+-------+
|--9e1ONYQuAa-CB_R...|    1|0XVzm4kVIAaH4eQAx...|I mainly went for...|Delmonico Steakhouse|3355 Las Vegas Bl...|Las Vegas|[Steakhouses, Res...|      1|
|--9e1ONYQuAa-CB_R...|    5|5aFBj0emFzoXsUcKb...|"WOW!!!" that's w...|Delmonico Steakhouse|3355 Las Vegas Bl...|Las Vegas|[Steakhouses, Res...|      1|
|--9e1ONYQuAa-CB_R...|    1|n0y7p7B1NMia_3lpk...|I visited this pl...|Delmonico Steakhouse|3355 Las Vegas Bl...|Las Vegas|[Steakhouses, Res...|      1|
|--9e1ONYQuAa-CB_R...|    4|aP4BkNgP4wzQ5woQM...|Delmonico is a te...|Delmonico Steakhou

In [6]:
# Number of open Las Vegas reviews (reuses the count computed in the previous
# cell instead of re-running the whole join) and the total number of businesses.
print(num_yelp)
print(yelp_business.count())

735029
144072


### 3.3 Split the dataset into training data and backup data

In [7]:
# Keep a 10% sample of the Las Vegas reviews to work with (despite its name,
# `yelp_test` is the data indexed into ES and used for training below); the
# other 90% is kept as a backup split. A fixed seed makes the split
# reproducible across kernel restarts.
yelp_test, yelp_backup = yelp_data.randomSplit([0.1, 0.9], seed=42)
num_test = yelp_test.count()
print("The number of training data is  %s" % num_test)

The number of training data is  73405


### 3.4 Create integer id for business and user

The original business id and user id are strings, but ES needs integer business and user ids to improve performance (Spark's ALS also requires integer user and item ids). So we use a pipeline of `StringIndexer` transformers to create an integer sequence id for each business and each user.

In [8]:
# Map the string business/user ids to dense numeric indices, then cast them to
# integers (and stars to double) for ES and ALS.
businessIndexer = StringIndexer(inputCol="business_id", outputCol="itemId")
userIndexer = StringIndexer(inputCol="user_id", outputCol="userId")
pipeline = Pipeline(stages=[businessIndexer, userIndexer])
review_df = (
    pipeline.fit(yelp_test)
    .transform(yelp_test)
    .select(
        "text", "name", "address", "city", "categories", "is_open",
        col("userId").cast(IntegerType()),
        col("itemId").cast(IntegerType()),
        col("stars").cast(DoubleType()),
    )
)
review_df.show(5)
review_df.printSchema()

+--------------------+--------------------+--------------------+---------+--------------------+-------+------+------+-----+
|                text|                name|             address|     city|          categories|is_open|userId|itemId|stars|
+--------------------+--------------------+--------------------+---------+--------------------+-------+------+------+-----+
|Very greasy, lack...|Delmonico Steakhouse|3355 Las Vegas Bl...|Las Vegas|[Steakhouses, Res...|      1| 53438|    75|  1.0|
|Ohhh where to bei...|Delmonico Steakhouse|3355 Las Vegas Bl...|Las Vegas|[Steakhouses, Res...|      1| 20251|    75|  1.0|
|very disappointed...|Delmonico Steakhouse|3355 Las Vegas Bl...|Las Vegas|[Steakhouses, Res...|      1| 53568|    75|  2.0|
|Review is for lun...|Delmonico Steakhouse|3355 Las Vegas Bl...|Las Vegas|[Steakhouses, Res...|      1|  7626|    75|  2.0|
|Lobster bisque, s...|Delmonico Steakhouse|3355 Las Vegas Bl...|Las Vegas|[Steakhouses, Res...|      1| 25757|    75|  2.0|
+-------

### 3.5 Insert and index data into ES

In [9]:
# Index every sampled review into ES, one request per row.
# NOTE(review): the ES document id is the business `itemId`, so each review of a
# business overwrites the previous one and only the last review per business
# survives in the index — which is also what section 4 reads back to train ALS.
# Section 4.4 relies on one doc per itemId, so confirm before changing the id.
data = review_df.collect()
for i, row in enumerate(data, 1):
    doc = {
        "itemId": row.itemId,
        "id": row.itemId,
        "name": row.name,
        "address": row.address,
        "city": row.city,
        "categories": row.categories,
        "userId": row.userId,
        "text": row.text,
        "stars": row.stars,
        "is_open": row.is_open
    }
    es.index(esIndex, esDocType, id=doc['itemId'], body=doc)
    if i % 5000 == 0:
        print("Indexed %s items of %s" % (i, num_test))


Indexed 5000 items of 73405
Indexed 10000 items of 73405
Indexed 15000 items of 73405
Indexed 20000 items of 73405
Indexed 25000 items of 73405
Indexed 30000 items of 73405
Indexed 35000 items of 73405
Indexed 40000 items of 73405
Indexed 45000 items of 73405
Indexed 50000 items of 73405
Indexed 55000 items of 73405
Indexed 60000 items of 73405
Indexed 65000 items of 73405
Indexed 70000 items of 73405


## 4. Create model and save model into ES

### 4.1 Load the data from ES that is used to train the model

In [3]:
yelp_df = (
    spark.read.format("es")
    # read `categories` as an array column (see array<string> in the schema below)
    .option("es.read.field.as.array.include", "categories")
    .load(esIndex + "/" + esDocType)
)
yelp_df.printSchema()
yelp_df.show(5)

root
 |-- @model: struct (nullable = true)
 |    |-- factor: string (nullable = true)
 |    |-- version: string (nullable = true)
 |-- address: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- city: string (nullable = true)
 |-- id: long (nullable = true)
 |-- is_open: double (nullable = true)
 |-- itemId: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- userId: integer (nullable = true)

+--------------------+--------------------+--------------------+---------+---+-------+------+--------------------+-----+--------------------+------+
|              @model|             address|          categories|     city| id|is_open|itemId|                name|stars|                text|userId|
+--------------------+--------------------+--------------------+---------+---+-------+------+--------------------+-----+--------------------+------+


### 4.2 Train ALS model

In [4]:
# Matrix-factorisation model over the (userId, itemId, stars) ratings read from ES.
als = ALS(
    userCol="userId",
    itemCol="itemId",
    ratingCol="stars",
    rank=10,
    regParam=0.1,
    seed=42,
)
model = als.fit(yelp_df)

In [5]:
# Peek at the learned latent factors: users first, then items.
model.userFactors.show(n=10)
model.itemFactors.show(n=10)

+---+--------------------+
| id|            features|
+---+--------------------+
|  0|[0.41811794, -0.0...|
| 10|[1.204584, 0.0556...|
| 20|[0.7733746, 0.558...|
| 30|[-0.51777494, -0....|
| 40|[-1.2260402, 0.00...|
| 50|[0.3837663, -0.48...|
| 60|[-0.020416455, -0...|
| 70|[-0.37953523, -0....|
| 80|[-0.36325774, -0....|
| 90|[0.44119626, -0.0...|
+---+--------------------+
only showing top 10 rows

+---+--------------------+
| id|            features|
+---+--------------------+
|  0|[-1.6613755, 0.56...|
| 10|[-0.5781848, -0.2...|
| 20|[1.1038743, 0.636...|
| 30|[-0.24086721, 1.0...|
| 40|[-0.82231486, -0....|
| 50|[0.758447, 0.5330...|
| 60|[1.1222466, -0.17...|
| 70|[0.4667988, 0.827...|
| 80|[-0.97184336, -1....|
| 90|[-0.20555457, 1.3...|
+---+--------------------+
only showing top 10 rows



### 4.3 Convert the model data in order to save it into ES

In [6]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf, lit
def convert_vector(x):
    '''Convert a list or numpy array to delimited token filter format.

    Each element becomes an "index|value" token; tokens are space-separated,
    e.g. [1.5, -2.0] -> "0|1.5 1|-2.0". An empty input gives "".
    '''
    return " ".join(["%s|%s" % (i, v) for i, v in enumerate(x)])
def reverse_convert(s):
    '''Convert a delimited token filter format string back to list format.

    Inverse of `convert_vector`: "0|1.5 1|-2.0" -> [1.5, -2.0]. Values are
    taken in token order (the index part is not used for reordering).
    Splitting on any whitespace run means "" (an empty vector) round-trips
    to [] and stray/trailing whitespace is tolerated.
    '''
    return [float(token.split("|", 1)[1]) for token in s.split()]
def vector_to_struct(x, version):
    '''Convert a vector to a (factor, version) pair for the `@model` struct.

    `factor` is the `convert_vector` string form of `x`; `version` is passed
    through unchanged.
    '''
    return (convert_vector(x), version)
# Spark UDF producing the `@model` struct (factor string + version) stored in ES.
vector_struct = udf(
    vector_to_struct,
    returnType=StructType([
        StructField("factor", StringType(), nullable=True),
        StructField("version", StringType(), nullable=True),
    ]),
)

Show the model data format

In [7]:
# test out the vector conversion function
test_vec = model.itemFactors.select("features").first().features
print test_vec
print
print convert_vector(test_vec)

[-1.6613755226135254, 0.5610125660896301, -0.9656401872634888, -0.3149564862251282, 0.5551421046257019, -1.5766654014587402, -0.15945157408714294, -0.9169647693634033, 1.0778717994689941, -0.2776658535003662]

0|-1.66137552261 1|0.56101256609 2|-0.965640187263 3|-0.314956486225 4|0.555142104626 5|-1.57666540146 6|-0.159451574087 7|-0.916964769363 8|1.07787179947 9|-0.2776658535


### 4.4 Save model into ES

In [15]:
# Tag every item vector with the model uid so stored factors can be traced to a model.
ver = model.uid
item_vectors = model.itemFactors.select(
    "id", vector_struct("features", lit(ver)).alias("@model")
)

# Partially update the existing ES docs:
# - "id" column is used as the ES document id
# - ES write operation "update" (only @model is written)
# - Spark save mode "append"
(item_vectors.write
    .format("es")
    .option("es.mapping.id", "id")
    .option("es.write.operation", "update")
    .mode("append")
    .save(esIndex + "/" + esDocType))

Search for one document to check that the model was saved successfully

In [8]:
es.search(index=esIndex, doc_type=esDocType, size=1, q="Target")  # one hit is enough to inspect @model

{u'_shards': {u'failed': 0, u'successful': 5, u'total': 5},
 u'hits': {u'hits': [{u'_id': u'1570',
    u'_index': u'yelpindex',
    u'_score': 9.771611,
    u'_source': {u'@model': {u'factor': u'0|-0.884719610214 1|-0.233291909099 2|2.21565961838 3|0.411309123039 4|-1.25451374054 5|-0.683289051056 6|-0.137509047985 7|-0.59401679039 8|-0.115472383797 9|0.815175294876',
      u'version': u'ALS_4a918e586b2296bde3f1'},
     u'address': u'8750 Charleston Blvd',
     u'categories': [u'Eyewear & Opticians',
      u'Shopping',
      u'Optometrists',
      u'Health & Medical'],
     u'city': u'Las Vegas',
     u'id': 1570,
     u'is_open': 1,
     u'itemId': 1570,
     u'name': u'Target Optical',
     u'stars': 5.0,
     u'text': u"This is my local target at Summerlin. Always good service and well organized. Lately they've hired some sketchy people. I hope they passed the background test.",
     u'userId': 8986},
    u'_type': u'yelp'}],
  u'max_score': 9.771611,
  u'total': 30},
 u'timed_out':

## 5. Search for similar businesses in ES

### 5.1 Search for similar businesses in ES using cosine similarity

In [9]:

def fn_query(query_vec, q="*", cosine=False):
    '''Build an ES function_score query that ranks documents by factor-vector similarity.

    Args:
        query_vec: query vector as a plain list of floats.
        q: query_string that selects candidate documents ("*" = all).
        cosine: passed to the `payload_vector_score` native script as its
            `cosine` parameter.

    Returns:
        The request body dict. `boost_mode` is "replace", so the script score
        replaces the text-query score.
    '''
    script = {
        "inline": "payload_vector_score",
        "lang": "native",
        "params": {
            "field": "@model.factor",
            "vector": query_vec,
            "cosine": cosine,
        },
    }
    function_score = {
        "query": {"query_string": {"query": q}},
        "script_score": {"script": script},
        "boost_mode": "replace",
    }
    return {"query": {"function_score": function_score}}

def get_similar(the_id, q="*", num=10, index=esIndex, dt=esDocType):
    '''Find the documents whose stored factor vectors are most similar to `the_id`'s.

    Args:
        the_id: ES document id of the query business.
        q: query_string restricting the candidate documents ("*" = all).
        num: maximum number of similar documents to return.
        index, dt: ES index and document type to read and search.

    Returns:
        (source, hits): the query document's `_source` dict and a list of at
        most `num` search hits, best first, excluding the query document
        itself. `hits` is empty when the document has no `@model.factor`.
    '''
    response = es.get(index=index, doc_type=dt, id=the_id)
    src = response['_source']
    model_fields = src.get('@model') or {}
    if 'factor' not in model_fields:
        # No factor vector stored for this document (model not saved for it).
        return src, []
    # our script actually uses the list form for the query vector and handles conversion internally
    query_vec = reverse_convert(model_fields['factor'])
    body = fn_query(query_vec, q=q, cosine=True)
    # ES returns 10 hits by default; request one extra because the query
    # document normally matches itself and is removed below.
    results = es.search(index=index, doc_type=dt, body=body, size=num + 1)
    # Drop the query document by id rather than assuming it is ranked first.
    self_id = response['_id']
    hits = [hit for hit in results['hits']['hits'] if hit['_id'] != self_id]
    return src, hits[:num]



In [10]:
def yelp_similar(the_id, q="*", num=10, index=esIndex, dt=esDocType):
    '''Print a business and the businesses most similar to it.

    Arguments are forwarded unchanged to `get_similar`; for each similar
    business its score, id, location, categories and the stored review
    (user, stars, text) are printed.
    '''
    result = get_similar(the_id, q, num, index, dt)
    if result is None:
        # get_similar may return nothing when the document has no stored factor vector.
        print("Business %s has no model factor stored in ES" % the_id)
        return
    business, recs = result
    # display query
    print("Business:  %s" % business['id'])
    print("Business Name:  %s" % business['name'])
    print("Address:  %s" % business['address'])
    print("Category:  %s" % business['categories'])
    print("***************************")
    print("Similar Business List:")
    for rank, rec in enumerate(recs, 1):
        src = rec['_source']
        print("===================================")
        print("No %s:" % rank)
        print("Score:  %s" % rec['_score'])
        print("Business ID: %s" % src['id'])
        print("City:  %s" % src['city'])
        print("Name:  %s" % src['name'])
        print("Address:   %s" % src['address'])
        print("Category:  %s" % src['categories'])
        print("UserId:  %s" % src['userId'])
        print("Stars:  %s" % src['stars'])
        print("User Comment: ")
        print("----------")
        print(src['text'])


### 5.2 Search for similar businesses in ES

We can find the top 10 businesses most similar to a given business ID in ES.
Other applications can also get these results directly from ES, because the model has been saved there.

In [11]:
yelp_similar(the_id=188)  # Pink Box Doughnuts

Business:  188
Business Name:  Pink Box Doughnuts
Address:  7531 W Lake Mead Blvd
Category:  [u'Donuts', u'Food']
***************************
Similar Business List:
No 1:
Score:  0.8855039
Business ID: 2017
City:  Las Vegas
Name:  St. Tropez International Buffet
Address:   9090 Alta Dr
Category:  [u'Buffets', u'Restaurants']
UserId:  17306
Stars:  3.0
User Comment: 
----------
Big improvement on the Suncoast Buffet.  Something is there for everyone.
No 2:
Score:  0.87698865
Business ID: 6827
City:  Las Vegas
Name:  The Joint Chiropractic
Address:   7120 N Durango, Ste H-170
Category:  [u'Health & Medical', u'Chiropractors']
UserId:  38791
Stars:  5.0
User Comment: 
----------
Stacy always remembers my name. I have improved movement in my neck 50 percent since starting my routine w/The Joint.  Love it!!
No 3:
Score:  0.85656875
Business ID: 1407
City:  Las Vegas
Name:  Gorilla Sushi
Address:   1801 E Tropicana Ave, Ste 2
Category:  [u'Restaurants', u'Sushi Bars', u'Asian Fusion', u'Japa

In [13]:
yelp_similar(the_id=2391)  # Big Elvis

Business:  2391
Business Name:  Big Elvis
Address:  3475 Las Vegas Blvd S
Category:  [u'Arts & Entertainment', u'Local Flavor', u'Performing Arts']
***************************
Similar Business List:
No 1:
Score:  0.91857845
Business ID: 7784
City:  Las Vegas
Name:  Danka K. Michaels, MD
Address:   3320 N Buffalo Dr, Ste 106
Category:  [u'Doctors', u'Health & Medical']
UserId:  7811
Stars:  5.0
User Comment: 
----------
Dr Michael's is a great doctor.  I have been seeing her for four years and she and her PA Roberto are both gems.
No 2:
Score:  0.8838447
Business ID: 9172
City:  Las Vegas
Name:  Fancy Fine European Bakery
Address:   420 S Rampart Blvd
Category:  [u'Desserts', u'Ice Cream & Frozen Yogurt', u'Bakeries', u'Food']
UserId:  18221
Stars:  1.0
User Comment: 
----------
Awful!!! Ordered the red velvet cake it was like a month old it was so dry my kids wouldn't even eat it, same with the chocolate cake and the mini cheesecakes don't even taste like cheesecake! Won't be ordering 