In [1]:
# Amazon product-data files from SNAP, as (url, name) pairs. `name` is used as
# the file name (name + '.json.gz') when the download cell moves each file into
# dbfs:/tmp/Data (reviews) or dbfs:/tmp/Metadata (metadata).
#
# Smaller review categories that can be swapped in for quick experiments:
#   ('http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Musical_Instruments_5.json.gz', 'Musical_Instruments')
#   ('http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Amazon_Instant_Video_5.json.gz', 'Instant_Videos')
review_list = [
    ('http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Books_5.json.gz', 'Books'),
    ('http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Electronics_5.json.gz', 'Electronics'),
    ('http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Movies_and_TV_5.json.gz', 'Movies_and_TV'),
    ('http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_CDs_and_Vinyl_5.json.gz', 'CDs_and_Vinyl'),
]

# Product metadata. Alternative:
#   ('http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/meta_Amazon_Instant_Video.json.gz', 'Instant_Video')
metadata_list = [
    ('http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/meta_Electronics.json.gz', 'Electronics'),
]

# English stop-word list, one word per line.
stop_words_url = 'http://tacit.usc.edu/resources/stopwords_eng.txt'

In [2]:
import urllib
load_data = False
load_metadata = False
load_stop_words = False

if load_data:
  for url_tuple in review_list:
    url = url_tuple[0]
    folder_path = '/tmp/' + url_tuple[1] + '.json.gz'
    urllib.urlretrieve(url, folder_path)
    dbutils.fs.mv('file:' + folder_path, 'dbfs:/tmp/Data/' + url_tuple[1] + '.json.gz')
    display(dbutils.fs.ls("dbfs:/tmp/Data"))
    
if load_metadata:
  for url_tuple in metadata_list:
    url = url_tuple[0]
    folder_path = '/tmp/' + url_tuple[1] + '.json.gz'
    urllib.urlretrieve(url, folder_path)
    dbutils.fs.mv('file:' + folder_path, 'dbfs:/tmp/Metadata/' + url_tuple[1] + '.json.gz')
    display(dbutils.fs.ls("dbfs:/tmp/Metadata"))
    
if load_stop_words:
  urllib.urlretrieve(stop_words_url, '/tmp/stopwords_eng.txt')
  dbutils.fs.mv('file:/tmp/stopwords_eng.txt', 'dbfs:/tmp/stopwords_eng.txt')
  #   dbutils.fs.mv("file:/tmp/test.json.gz", "dbfs:/tmp/Data/test.json.gz")
  display(dbutils.fs.ls("dbfs:/tmp"))

In [3]:
from pyspark.sql.types import *

# Columns kept from each review record. create_review_df drops 'reviewerName'
# and 'unixReviewTime' and parses 'reviewTime' before applying this schema.
ratings_df_schema = StructType([
    StructField('asin', StringType()),
    StructField('helpful', ArrayType(IntegerType())),
    StructField('overall', FloatType()),
    StructField('reviewText', StringType()),
    StructField('reviewTime', DateType()),
    StructField('reviewerID', StringType()),
    StructField('summary', StringType()),
])

# Columns kept from each product-metadata record; create_metadata_df keeps only
# the first inner list of the raw 'categories' list-of-lists.
metadata_df_schema = StructType([
    StructField('asin', StringType()),
    StructField('description', StringType()),
    StructField('title', StringType()),
    StructField('categories', ArrayType(StringType())),
])


In [4]:
#import 5-core review data

import gzip
from pyspark.sql import Row
from collections import OrderedDict
from datetime import datetime

def parse_review(path):
  """Yield one review dict per line of a gzipped Amazon review dump.

  Each line is a Python dict literal, so it is parsed with ast.literal_eval
  rather than eval: literal_eval only accepts literals and cannot execute code
  embedded in a downloaded file. Blank lines are skipped.

  Args:
    path: local filesystem path of a ``*.json.gz`` file.

  Yields:
    dict: one parsed record per non-blank line.

  Raises:
    ValueError / SyntaxError: if a line is not a plain Python literal.
  """
  import ast
  # `with` closes the gzip handle when iteration finishes (previously leaked).
  with gzip.open(path, 'rb') as g:
    for line in g:
      if not isinstance(line, str):  # Python 3 yields bytes; literal_eval needs text
        line = line.decode('utf-8')
      if line.strip():
        yield ast.literal_eval(line)

def create_review_df(path):
  """Load a gzipped 5-core review dump into a Spark DataFrame.

  Each record is trimmed to the columns of ``ratings_df_schema``. 'reviewerName'
  (absent from some records) and 'unixReviewTime' are dropped. 'reviewTime', a
  string in '%m %d, %Y' form, is converted to a ``datetime.date`` to match the
  schema's DateType column.

  Args:
    path: local path of the ``*.json.gz`` review file.

  Returns:
    DataFrame with the columns of ``ratings_df_schema``.

  Raises:
    KeyError: if a record has no 'reviewTime'.
    ValueError: if 'reviewTime' does not match '%m %d, %Y'.
  """
  ratings = []
  # Clean each record while streaming instead of buffering then re-walking by index.
  for review in parse_review(path):
    review.pop('reviewerName', None)
    review.pop('unixReviewTime', None)
    review['reviewTime'] = datetime.strptime(review['reviewTime'], '%m %d, %Y').date()
    ratings.append(review)
  return sqlContext.createDataFrame(ratings, schema=ratings_df_schema)
  
# Only the Electronics reviews are loaded. The other downloaded categories live
# at '/dbfs/tmp/Data/<name>.json.gz' and load the same way via create_review_df.
electronics_df = create_review_df('/dbfs/tmp/Data/Electronics.json.gz')

# Cache, materialise and report the number of reviews, then preview a few rows.
print(electronics_df.cache().count())

electronics_df.show(n=3, truncate=True)


In [5]:

#import metadata files

import gzip
from pyspark.sql import Row
from collections import OrderedDict
from datetime import datetime

def parse_metadata(path):
  """Yield one product-metadata dict per line of a gzipped Amazon metadata dump.

  Lines are Python dict literals, so they are parsed with ast.literal_eval
  (literals only, so no code in the downloaded file can run) instead of eval.
  Blank lines are skipped.

  Args:
    path: local filesystem path of a ``*.json.gz`` metadata file.

  Yields:
    dict: one parsed record per non-blank line.

  Raises:
    ValueError / SyntaxError: if a line is not a plain Python literal.
  """
  import ast
  with gzip.open(path, 'rb') as g:
    for line in g:
      if not isinstance(line, str):  # Python 3 yields bytes
        line = line.decode('utf-8')
      if line.strip():
        yield ast.literal_eval(line)

def _select_metadata_columns(entry, cols_to_keep):
  """Project one raw metadata dict onto cols_to_keep; absent keys become None."""
  row = {}
  for col in cols_to_keep:
    value = entry.get(col)
    if col == 'categories' and value is not None:
      # Raw categories are a list of lists; only the first inner list is kept.
      # An empty outer list becomes [] instead of raising IndexError.
      value = value[0] if value else []
    row[col] = value
  return row


def create_metadata_df(path, discard_missing_info_col=False):
  """Load a gzipped product-metadata dump into a DataFrame matching metadata_df_schema.

  Only 'asin', 'description', 'title' and 'categories' are kept. Not every
  record has every key. For example, an earlier key count on the Electronics
  file found 'description' in 459470 and 'title' in 491194 of 498196 records.

  Args:
    path: local path of the ``*.json.gz`` metadata file.
    discard_missing_info_col: if True, drop records lacking any kept column;
      otherwise keep them with None in the missing columns (the default,
      matching the previous hard-coded behavior).

  Returns:
    DataFrame with the columns of ``metadata_df_schema``.
  """
  cols_to_keep = ['asin', 'description', 'title', 'categories']
  filtered_metadata = []
  # Stream records instead of first buffering the whole file in a second list.
  for entry in parse_metadata(path):
    if discard_missing_info_col and not all(col in entry for col in cols_to_keep):
      continue
    filtered_metadata.append(_select_metadata_columns(entry, cols_to_keep))
  return sqlContext.createDataFrame(filtered_metadata, schema=metadata_df_schema)
  

# Load the Electronics product metadata. Other categories downloaded to
# '/dbfs/tmp/Metadata/<name>.json.gz' would be loaded with create_metadata_df
# (not create_review_df).
electronics_meta_df = create_metadata_df('/dbfs/tmp/Metadata/Electronics.json.gz')

# Cache and materialise; the row count is this cell's displayed output.
electronics_meta_df.cache().count()





In [6]:
# Number of metadata rows, then a preview of the first 20.
print(electronics_meta_df.count())

electronics_meta_df.show(n=20)



In [7]:
# Peek at the first product title. take(1) fetches a single row instead of
# collecting every title to the driver; `a` is still a list of Rows.
a = electronics_meta_df.select('title').take(1)
first_title = a[0]['title']
print(type(first_title))
# Some products have no title (None), and an empty title has no first word.
if first_title:
  print(first_title.split())
  print(first_title.split()[0])

In [8]:
#import stopwords
# Load the English stop-word list (one word per line) saved by the download cell.
stopwords_path = 'dbfs:/tmp/stopwords_eng.txt'
stopwords_rdd = sc.textFile(stopwords_path)
# Strip whitespace (including a stray '\r'), lower-case entries and drop blank lines,
# so they compare equal to the lower-cased tokens that parse_row checks. A set
# makes each membership test O(1) instead of a scan of the list.
stopwords = set(word.strip().lower() for word in stopwords_rdd.collect() if word.strip())
print(len(stopwords))
print(sorted(stopwords))


In [9]:
# feature hashing
#convert each row of metadata_df to list of tuple format 
from pyspark.sql.functions import udf, array, split

def parse_row(description, title, categories, stop_words=None):
  """Turn one product's text fields into (featureID, word) tuples for feature hashing.

  featureID is the index of the source field: 0 = description words,
  1 = title words, 2 = category names. Words are lower-cased, and those found
  in the stop-word collection are dropped.

  Args:
    description: description string, or None.
    title: title string, or None.
    categories: list of category strings, or None (create_metadata_df writes
      None when a record has no 'categories' key).
    stop_words: collection of lower-case words to drop; defaults to the
      notebook-level `stopwords`.

  Returns:
    list of (int, str) tuples, ordered by field and then by word position.
  """
  if stop_words is None:
    stop_words = stopwords
  # split() with no argument splits on any whitespace run, so repeated spaces no
  # longer yield '' tokens and newline/tab-separated words are separated.
  des_words = description.split() if description is not None else []
  title_words = title.split() if title is not None else []
  all_words = [des_words, title_words, categories or []]

  tuple_list = []
  for feature_id, words in enumerate(all_words):
    for word in words:
      lower_case_word = word.lower()
      if lower_case_word not in stop_words:
        tuple_list.append((feature_id, lower_case_word))
  return tuple_list

# UDF returning parse_row's (featureID, word) pairs as array<struct<_1: bigint, _2: string>>.
parse_row_udf = udf(parse_row, ArrayType(StructType([StructField('_1', LongType()),
                                                     StructField('_2', StringType())])))
tuple_electronics_meta_df = electronics_meta_df.select(
    electronics_meta_df.asin,
    parse_row_udf(electronics_meta_df.description,
                  electronics_meta_df.title,
                  electronics_meta_df.categories).alias('features'))
# cache() is lazy and show(3) computes only a few rows. Materialise the new
# cache before releasing the parent's, otherwise later full passes recompute
# the parent from scratch.
tuple_electronics_meta_df.cache().count()
electronics_meta_df.unpersist()
tuple_electronics_meta_df.show(3)


In [10]:
# The (featureID, word) tuples of the first product, used to try out hash_function.
a = tuple_electronics_meta_df.select('features').first()['features']
print(a)

In [11]:
from collections import defaultdict
import hashlib

def hash_function(raw_feats, num_buckets, print_mapping=False):
    """Calculate a feature dictionary for an observation's features based on hashing.

    Each (featureID, value) tuple becomes the string ``value + ':' + str(featureID)``.
    Its MD5 digest modulo ``num_buckets`` selects the bucket.

    Note:
        Use print_mapping=True for debug purposes and to better understand how the hashing works.
        Feature strings are collected in a dict first, so identical (featureID, value) tuples
        count once per observation, not once per occurrence.

    Args:
        raw_feats (list of (int, str)): A list of features for an observation.  Represented as
            (featureID, value) tuples.
        num_buckets (int): Number of buckets to use as features.
        print_mapping (bool, optional): If true, the mappings of featureString to index will be
            printed.

    Returns:
        dict of int to float:  The keys will be integers which represent the buckets that the
            features have been hashed to.  The value for a given key will contain the count of the
            distinct (featureID, value) tuples that have hashed to that key.
    """
    def bucket_for(feature_string):
        # md5 needs bytes. Encoding text as UTF-8 lets non-ASCII words hash instead of
        # raising (Python 2 implicitly used ASCII; Python 3 rejects str). Pure-ASCII
        # strings encode to the same bytes, so their buckets are unchanged.
        if isinstance(feature_string, bytes):
            data = feature_string
        else:
            data = feature_string.encode('utf-8')
        return int(int(hashlib.md5(data).hexdigest(), 16) % num_buckets)

    mapping = {}
    for ind, category in raw_feats:
        feature_string = category + ':' + str(ind)
        mapping[feature_string] = bucket_for(feature_string)
    if print_mapping:
        print(mapping)

    # Count how many distinct feature strings landed in each bucket.
    sparse_features = defaultdict(float)
    for bucket in mapping.values():
        sparse_features[bucket] += 1.0
    return dict(sparse_features)

In [12]:
hash_function(a, num_buckets=10, print_mapping=True)

In [13]:
from pyspark.ml.linalg import Vectors, VectorUDT


# Length of every hashed feature vector (number of hash buckets).
num_hash_buckets = 2 ** 3

# Maps an array of (featureID, value) tuples to a SparseVector of bucket counts.
tuples_to_hash_features_udf = udf(
    lambda feature_tuples: Vectors.sparse(
        num_hash_buckets, hash_function(feature_tuples, num_hash_buckets)),
    VectorUDT(),
)

def add_hashed_features(df):
    """Return a DataFrame of product ids and hashed feature vectors.

    The result is not cached; callers that reuse it should cache it themselves.

    Args:
        df (DataFrame): must have an 'asin' column and a 'features' column holding
            an array of (featureID, value) tuples.

    Returns:
        DataFrame: columns 'asin' and 'features', where 'features' is a SparseVector
            of length num_hash_buckets produced by tuples_to_hash_features_udf.
    """
    return df.select(df.asin, tuples_to_hash_features_udf(df.features).alias('features'))

hash_electronics_meta_df = add_hashed_features(tuple_electronics_meta_df)
# cache() is lazy and show(3) computes only a few rows. Materialise the hashed
# frame before releasing its parent, so later full passes (e.g. the KMeans fit)
# do not recompute the tuple frame.
hash_electronics_meta_df.cache().count()
tuple_electronics_meta_df.unpersist()
hash_electronics_meta_df.show(3)

In [14]:
# First two (asin, hashed SparseVector) rows.
print(hash_electronics_meta_df.take(2))

In [15]:
# Show KMeans parameters and their defaults. A fresh estimator is used because
# `kmeans` is only created in the following cell, so referencing it here
# failed on a fresh kernel.
from pyspark.ml.clustering import KMeans
print(KMeans().explainParams())

In [16]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
# Cluster the hashed feature vectors into 2 groups (fixed seed for reproducibility).
kmeans = KMeans(k=2, seed=1, featuresCol="features")
model = kmeans.fit(hash_electronics_meta_df)

# K-means cost on the training data. It was computed (a full pass) and then
# discarded; keep and show it so it can be compared across k.
cost = model.computeCost(hash_electronics_meta_df)
print(cost)

centers = model.clusterCenters()
print(centers)

In [17]:
transformed = model.transform(hash_electronics_meta_df).select("features", "prediction")

# Size of every cluster in one aggregation job, for any k, instead of one
# filter + count job per hard-coded cluster id.
transformed.groupBy('prediction').count().orderBy('prediction').show()

# Only a handful of rows are inspected afterwards, so fetch just those rather
# than collecting every row to the driver.
rows = transformed.take(5)

In [18]:
# First few (features, prediction) rows.
rows[0:5]

In [19]:
from pyspark.sql.functions import year, month, avg, count
# `ratings_df` was referenced here but never defined earlier in this notebook,
# so the cell raised NameError. The review DataFrame loaded above is electronics_df.
ratings_df = electronics_df


def _rating_stats_by(reviews_df, period_fn, period_name):
    """Average rating and review count per calendar period, ordered by period.

    Args:
        reviews_df: DataFrame with 'overall' (rating) and 'reviewTime' (date) columns.
        period_fn: Spark SQL function that extracts the period from a date column
            (e.g. year or month).
        period_name: name of the resulting period column.

    Returns:
        DataFrame with columns period_name, 'Avg_rating' and 'Num_reviews'.
    """
    return (reviews_df
            .select(reviews_df['overall'].alias('Rating'),
                    period_fn('reviewTime').alias(period_name))
            .groupby(period_name)
            .agg(avg('Rating').alias('Avg_rating'), count('Rating').alias('Num_reviews'))
            .orderBy(period_name))


ratings_by_year_df = _rating_stats_by(ratings_df, year, 'Year')
ratings_by_month_df = _rating_stats_by(ratings_df, month, 'Month')

ratings_by_year_df.show()
ratings_by_month_df.show()


In [20]:
import matplotlib.pyplot as plt
import numpy as np
# plt.scatter(ratings_by_year_df.Year.Values, ratings_by_year_df.Rating.Values)
# Collect (Year, Avg_rating) in one job so x and y come from the same result.
# The alias is 'Avg_rating'; 'Avg_Rating' only resolved because Spark is
# case-insensitive by default.
year_stats = ratings_by_year_df.select('Year', 'Avg_rating').collect()
x = np.array([r['Year'] for r in year_stats])
y = np.array([r['Avg_rating'] for r in year_stats])
fig, ax = plt.subplots()
ax.scatter(x, y)
ax.set_xlabel('Year')
ax.set_ylabel('Average rating')
# The data is the Electronics reviews; the old 'Beauty' title was a leftover label.
ax.set_title('Electronics: average rating by year')
display(fig)


In [21]:
# Collect (Month, Avg_rating) in one job so x and y come from the same result.
month_stats = ratings_by_month_df.select('Month', 'Avg_rating').collect()
x = np.array([r['Month'] for r in month_stats])
y = np.array([r['Avg_rating'] for r in month_stats])
fig2, ax2 = plt.subplots()
ax2.scatter(x, y)
ax2.set_xlabel('Month')
ax2.set_ylabel('Average rating')
# The data is the Electronics reviews; the old 'Beauty' title was a leftover label.
ax2.set_title('Electronics: average rating by month')
display(fig2)

In [22]:
# Summary statistics of the star rating. electronics_df is the loaded review
# frame; `ratings_df` was never defined earlier in this notebook.
electronics_df.describe('overall').show()