In [1]:
import pandas as pd
import numpy as np
import scipy as sp

import matplotlib.pyplot as plt

from datetime import datetime

%matplotlib inline

In [2]:
import findspark
findspark.init()  # locate the local Spark installation and make pyspark importable
import pyspark

import pyspark.sql.functions as fn
from pyspark.sql import *        # SparkSession, SQLContext, Window, Row, ...
from pyspark.sql.types import *  # StructType, StructField, StringType, FloatType, IntegerType, DateType, TimestampType, ...
# Caution: this wildcard import shadows Python built-ins such as min, max, sum and round
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)  # legacy entry point; `spark` offers the same functionality

<h1>Data Collection</h1>

Amazon Data: collection and analysis of Amazon data (product reviews and metadata) associated with the theme of ecology, bio, renewables, etc. (see the keywords vector)

__Review data__ : 
Download from http://jmcauley.ucsd.edu/data/amazon/links.html: go to the 'Per Category Files' section and download the 'reviews' file for a chosen category (or the complete review data, 18 GB). It is better not to take the 5-core data: it only keeps users and products with at least 5 reviews each, so products with fewer reviews are missing.

Features:

- reviewerID - ID of the reviewer, e.g. A2SUAM1J3GNN3B
- asin - ID of the product, e.g. 0000013714
- reviewerName - name of the reviewer
- helpful - helpfulness rating of the review, e.g. 2/3
- reviewText - text of the review
- overall - rating of the product
- summary - summary of the review
- unixReviewTime - time of the review (unix time)
- reviewTime - time of the review (raw)

__Metadata (Product)__ : 
Download from http://jmcauley.ucsd.edu/data/amazon/links.html: in the 'Per Category Files' section, download the 'metadata' file for the chosen category (or the complete metadata file).


Features:

- asin - ID of the product, e.g. 0000031852
- title - name of the product
- price - price in US dollars (at time of crawl)
- imUrl - url of the product image
- related - related products (also bought, also viewed, bought together, buy after viewing)
- salesRank - sales rank information
- brand - brand name
- categories - list of categories the product belongs to



<h3> TO DO LIST </h3>

In the following items, 'extracted' means filtered by the theme: products/reviews associated with bio/ecology/renewables, etc.

- Choose a category to focus on: Books? Health Care? ...
- Show the proportion of extracted data relative to the full data of the category
- Compare extracted data between categories (which one contains the most theme-related products/reviews)
- Price distribution of extracted data (per category) --> compare between categories and with non-extracted data in the same category
- salesRank distribution of extracted data (best per category, mean, proportion in the top 10%, top 20%, etc.) --> compare between categories and with non-extracted data in the same category

- __Keywords__: how can the selection of related data be implemented efficiently? It currently uses a list of keywords; regular expressions or better-written keywords could help.

- Associate metadata with reviews: __Join__
> - Extract the year of the first review (which approximately gives the publication year of the product)
> - Histogram of the number of extracted products per year
> - Histogram of the number of extracted reviews per year
> - Number of reviews per product per category --> distribution; compare between categories and with non-extracted data to see whether reviewers are more interested/active with theme-related products than with others
> - Helpfulness rating: compare between categories / overall
> - Mean product rating: compare with other products


- __Prediction__
> - Number of products for the following years (linear regression)
> - Sentiment analysis on reviews (experiment?)
> - Prediction of the overall rating vs. price for the following years? (LR)


Create a folder named 'data', store the downloaded gzipped JSON files (`.json.gz`) in it, and unzip them there to get the `.json` files.

In [3]:
DATA_DIR = 'data/'

In [4]:
# Leading/trailing spaces act as crude word boundaries (e.g. " bio " does not match "biography")
keywords = [" global warming", " solar energy", " recycling ", " pollution ", "solar power",
            " endangered species", "air pollution", " water pollution", " wind energy",
            " climate change", " wind power", " recycle ", " deforestation", " greenhouse effect",
            "environment", " sustainability ", " natural resources", "alternative energy",
            " climate ", "global warming", "renewable energy", " ecology", "composting",
            " carbon footprint", " bio ", " biosphere ", " renewable "]

The full JSON file only needs to be loaded once. It probably cannot be stored as Parquet directly, because some nested features (e.g. 'related', 'categories', 'salesRank') are arrays/structs with unusual field names. For instance, `salesRank` is keyed by category names containing spaces (such as 'Grocery & Gourmet Food'), and older Spark versions refuse spaces in Parquet column names.
Clean the data first, then store it as Parquet. If you later want to retrieve some of the dropped features ('related', for example), do the filtering and extraction on the cleaned dataset. Then join or filter the full dataset with the result.

> __SAMPLE WITH THE CATEGORY GROCERY AND GOURMET FOOD METADATA__

Download the meta_Grocery_and_Gourmet_Food.json file

In [5]:
meta_products = spark.read.json(DATA_DIR+"meta_Grocery_and_Gourmet_Food.json")

In [6]:
# To see an example of the data
meta_products.take(2)

[Row(asin=None, brand=None, categories=None, description=None, imUrl=None, price=None, related=None, salesRank=None, title=None),
 Row(asin='0700026444', brand=None, categories=[['Grocery & Gourmet Food']], description='Silverpot Tea, Pure Darjeeling, is an exquisite tea enjoyed at leisure, when you want to relax or celebrate.   Its mellow yet layered taste will constantly surprise and delight.  This subtle and refined tea is of incomparable taste and flavor.  \n\nPackaged in an exotic handmade pinewood chestlet, this magnificent tea was rolled from tender leaves grown and hand plucked in the legendary mist covered fields of Darjeeling.\n\nFor authentic experience of this luxury tea, it is presented as 100 gms  loose leaf, and is a great way to show your appreciation, celebrate an occasion or send your best wishes.\n\nSilverpot is different - it embraces the pioneering spirit of the Ghose family, personifies the highest standards of excellence, and is proof that this is only possible f

In [7]:
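# describe() only returns a (lazy) DataFrame, whose schema is printed below; append .show()
# to compute the statistics. Nested columns (categories, related, salesRank) are skipped.
meta_products.describe()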

DataFrame[summary: string, asin: string, brand: string, description: string, imUrl: string, price: string, title: string]

These are the positional indices of the features, used to access them in Spark filter/flatMap lambda functions.

Example: `lambda r: r[0]`, `r[1]`, etc.

- asin 0
- brand 1
- categories 2
- description 3
- imUrl 4
- price 5
- related 6
- salesRank 7
- title 8
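The order of these indices is not arbitrary. When Spark infers a schema from JSON, it sorts the inferred fields by name using a plain, case-sensitive string comparison, so uppercase letters sort before lowercase ones. A minimal stdlib sketch of that rule, which reproduces the list above as well as the review list further down; `field_indices` is a hypothetical helper, not a Spark function.

```python
def field_indices(field_names):
    """Map each field name to its position, mimicking Spark's JSON schema inference,
    which orders the inferred fields by name (case-sensitive string comparison)."""
    return {name: i for i, name in enumerate(sorted(field_names))}

# Field names as they appear in the raw JSON files (their order in the file does not matter).
meta_fields = ["asin", "title", "price", "imUrl", "related", "salesRank",
               "brand", "categories", "description"]
review_fields = ["reviewerID", "asin", "reviewerName", "helpful", "reviewText",
                 "overall", "summary", "unixReviewTime", "reviewTime"]

meta_idx = field_indices(meta_fields)
review_idx = field_indices(review_fields)
print(meta_idx["salesRank"], meta_idx["title"])            # 7 8
print(review_idx["reviewText"], review_idx["reviewerID"])  # 3 5 ('T' sorts before 'e')
```

Accessing fields by name instead (`r['asin']` or `r.asin` on a pyspark `Row`) avoids depending on these positions at all.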

> The next cell extracts the relevant, writable features and stores them as Parquet.

In [8]:
# Keep only the features that can be written to Parquet and flatten them into readable columns:
#   categories -> first entry of the first category path, salesRank -> rank in this category.
# Rows with salesRank = None are dropped because they caused problems when writing to Parquet.
# Features removed: _corrupt_record, imUrl, related
data_cleaned = meta_products.rdd \
    .filter(lambda r: r[7] is not None) \
    .map(lambda r: (r[0], r[1], r[2][0][0], r[3], r[5], r[7]['Grocery & Gourmet Food'], r[8]))
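The positional expressions in the cell above are easier to read on a concrete record. `r[2][0][0]` takes the first entry of the first category path, because `categories` is a list of lists. `r[7]['Grocery & Gourmet Food']` takes the rank within this category from the `salesRank` struct. Below is a stdlib sketch of the same flattening applied to a plain dict. The helper `flatten_meta` and the sample record are hypothetical. Unlike the Spark lambda, the sketch returns None for a missing `categories` field instead of raising.

```python
CATEGORY = "Grocery & Gourmet Food"

def flatten_meta(record, category=CATEGORY):
    """Flatten one metadata record into (asin, brand, category, description, price, salesRank, title).

    Returns None when the record has no salesRank, mirroring the filter in the Spark cell.
    """
    sales_rank = record.get("salesRank")
    if sales_rank is None:
        return None
    categories = record.get("categories") or [[None]]  # list of category paths
    return (
        record.get("asin"),
        record.get("brand"),
        categories[0][0],             # first element of the first category path
        record.get("description"),
        record.get("price"),
        sales_rank.get(category),     # rank within the chosen category, None if absent
        record.get("title"),
    )

# Hypothetical record shaped like one line of the metadata file.
sample = {
    "asin": "B000EXAMPLE",
    "categories": [["Grocery & Gourmet Food", "Beverages"]],
    "price": 11.99,
    "salesRank": {"Grocery & Gourmet Food": 268754},
    "title": "Example product",
}
print(flatten_meta(sample))
```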

In [9]:
# Schema of the DataFrame built from the RDD extracted above

schema = StructType([
    StructField("asin", StringType(), True),
    StructField("brand", StringType(), True),
    StructField("category", StringType(), True),
    StructField("description", StringType(), True),
    StructField("price", FloatType(), True),
    StructField("salesRank", IntegerType(), True),
    StructField("title", StringType(), True)
])

In [10]:
# Transform the RDD data into DataFrame (we'll then be able to store it in Parquet)
datacleaned_DF = spark.createDataFrame(data_cleaned, schema=schema)

In [11]:
# Example of the cleaned data (columns named after the StructType schema defined above)
datacleaned_DF.show(3)

+----------+-----+--------------------+--------------------+-----+---------+--------------------+
|      asin|brand|            category|         description|price|salesRank|               title|
+----------+-----+--------------------+--------------------+-----+---------+--------------------+
|0700026444| null|Grocery & Gourmet...|Silverpot Tea, Pu...| null|   620307|Pure Darjeeling T...|
|141278509X| null|Grocery & Gourmet...|Infused with Vita...| null|   620322|Archer Farms Stra...|
|1453060375|  Mio|Grocery & Gourmet...|MiO Energy is you...|11.99|   268754|Mio Energy Liquid...|
+----------+-----+--------------------+--------------------+-----+---------+--------------------+
only showing top 3 rows



In [12]:
# Save as Parquet so that later sessions can skip parsing the JSON file
datacleaned_DF.write.mode('overwrite').parquet("meta_Grocery_and_Gourmet_Food.parquet")

In [13]:
# Read from the parquet data
datacleaned_DF = spark.read.parquet("meta_Grocery_and_Gourmet_Food.parquet")

In [14]:
# Check the structure of the data read back (the StructType schema is preserved)
datacleaned_DF.take(1)

[Row(asin='B001EO5U92', brand=None, category='Grocery & Gourmet Food', description="Naturally nutritious. Made with organic rice flour. Wheat free. Gluten free, a special blend. For those of us on gluten free diets, this mix is ideal. Enjoy pancakes hot off the griddle or flavorful muffins from your oven whenever you choose. We're sure you'll agree that we've made it easy and satisfying! From America's heartland to your heart. Arrowhead Mills has been the pioneer and leader in organic baking mixes, grains, cereals and nut butters sine 1960. We believe in nature's abundance and treat food with respect - not chemicals! Capturing the essence if the earth with organically grown ingredients, Arrowhead Mills takes you back to the basics with the best-tasting, most diverse selection of products for home-baked goodness. Made with organic rice flour. Good source of calcium. No preservatives. Made with no genetically engineered ingredients. Certified organic by the Texas Department of Agricultur

Now that the new dataset has been created, here are its feature indices.

These are the positional indices of the features, used to access them in Spark filter/flatMap lambda functions.
Example: `lambda r: r[0]`, `r[1]`, etc.

- asin 0
- brand 1
- category 2
- description 3
- price 4 
- salesRank 5
- title 6

In [15]:
print("Number of data samples : " + str(datacleaned_DF.count()))

Number of data samples : 145854


> We will now extract the data related to our theme: ecology, bio, renewables, etc.

In [16]:
# Keep products whose title (r[6]) and description (r[3]) are both present,
# then test whether either field contains a word from the keyword vector,
# which represents the theme we want: ecology, bio, etc.
filter_products_bio = datacleaned_DF.rdd \
    .filter(lambda r: r[6] is not None and r[3] is not None) \
    .filter(lambda r: any(word in r[6].lower() for word in keywords)
                      or any(word in r[3].lower() for word in keywords))
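The substring test in the filter cell relies on the spaces padding some keywords to approximate word boundaries. As a result, " recycle " misses "recycle." at the end of a sentence, and " bio " misses "bio-based". The TODO list suggests regular expressions instead. Below is a minimal stdlib sketch, assuming the padding spaces are stripped and each keyword is wrapped in `\b` word boundaries. `build_keyword_pattern`, `matches_theme` and the shortened `KEYWORDS` list are hypothetical, not part of the original notebook.

```python
import re

# Subset of the keyword vector, without the padding spaces used for substring matching.
KEYWORDS = ["global warming", "solar energy", "recycling", "pollution", "recycle",
            "climate change", "renewable energy", "renewable", "bio", "composting"]

def build_keyword_pattern(keywords):
    """Compile one case-insensitive regex matching any keyword as a whole word/phrase."""
    # Longest phrases first, so the alternation prefers 'renewable energy' over 'renewable'.
    ordered = sorted({k.strip() for k in keywords}, key=len, reverse=True)
    alternation = "|".join(re.escape(k) for k in ordered)
    return re.compile(r"\b(?:" + alternation + r")\b", re.IGNORECASE)

def matches_theme(text, pattern):
    """True if `text` (possibly None) contains at least one keyword."""
    return text is not None and pattern.search(text) is not None

pattern = build_keyword_pattern(KEYWORDS)
print(matches_theme("Made from recycled paper. Please recycle.", pattern))  # True
print(matches_theme("A bio-based cleaner", pattern))                         # True
print(matches_theme("Biography of a chef", pattern))                         # False
```

Spark's `Column.rlike` takes a Java regular expression, which also supports `\b` and the inline `(?i)` flag. The expression `'(?i)' + pattern.pattern` could therefore probably be reused in a DataFrame filter, but this has not been tested here.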



In [None]:
print("Number of data samples related to ecology/bio/renewable etc... : " + str(filter_products_bio.count()) )

In [None]:
filter_products_bio.take(3)

In [None]:
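# Transform the RDD into a DataFrame (we can then work with it and join it with the review data).
# Reusing the cleaned schema avoids re-inferring the column types from the Row objects.
DF_filter_products_bio = spark.createDataFrame(filter_products_bio, schema=datacleaned_DF.schema)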

> __SAMPLE WITH THE CATEGORY GROCERY AND GOURMET FOOD REVIEWS__

Download the reviews_Grocery_and_Gourmet_Food.json file

In [None]:
reviews = spark.read.json(DATA_DIR+"reviews_Grocery_and_Gourmet_Food.json")

In [None]:
# To see an example of the data
reviews.take(1)

These are the positional indices of the features, used to access them in Spark filter/flatMap lambda functions.

Example: `lambda r: r[0]`, `r[1]`, etc.

- asin 0
- helpful 1
- overall 2
- reviewText 3
- reviewTime 4
- reviewerID 5
- reviewerName 6 
- summary 7 
- unixReviewTime 8

In [None]:
# Save as Parquet so that later sessions can skip parsing the JSON file
reviews.write.mode('overwrite').parquet("reviews_Grocery_and_Gourmet_Food.parquet")

In [None]:
# Read from the parquet data
reviews = spark.read.parquet("reviews_Grocery_and_Gourmet_Food.parquet")

In [None]:
print("Number of reviews : " + str(reviews.count()))

__Join Reviews and Metadata__ 

We will now join the metadata and review datasets on the product ID (`asin`), which gives the reviews of every product concerned. Here we join with the filtered data, i.e. the products related to the theme (bio, recycling, ecology, etc.).

Each row of the result then holds one review together with the metadata (description, price, ...) of the product it refers to.

In [None]:
# Passing the join key as a list keeps a single 'asin' column (no duplicated key column)
review_product_join = DF_filter_products_bio.join(reviews, ['asin'])

In [None]:
print("Number of reviews related to bio/renewable/ecology etc... : " + str(review_product_join.count()))

In [None]:
# Example of joined data
review_product_join.take(1)

In [None]:
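def minUnixTime(accum, n):
    # Reducer keeping the earlier of two Unix timestamps (usable with reduceByKey).
    # The explicit comparison avoids min(), which is shadowed by the
    # `from pyspark.sql.functions import *` import at the top of the notebook.
    return accum if accum < n else n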

In [None]:
import pyspark.sql.functions as sqlf

In [None]:
# Earliest review of each product: a proxy for its publication year
firstreview = review_product_join.groupBy("asin").agg(sqlf.min("unixReviewTime"))
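The aggregation above keeps, for each product, the timestamp of its earliest review. Below is a plain-Python sketch of the same computation over made-up `(asin, unixReviewTime)` pairs; `first_review_times` is a hypothetical helper. It performs the same comparison that the `minUnixTime` reducer defined earlier would apply through `reduceByKey`.

```python
def first_review_times(pairs):
    """Return {asin: earliest unixReviewTime} from an iterable of (asin, unixReviewTime) pairs."""
    first = {}
    for asin, ts in pairs:
        if asin not in first or ts < first[asin]:
            first[asin] = ts
    return first

# Made-up (asin, unixReviewTime) pairs for illustration.
reviews_sample = [("A1", 1356998400), ("A1", 1262304000), ("B2", 1230768000)]
print(first_review_times(reviews_sample))  # {'A1': 1262304000, 'B2': 1230768000}
```

On Spark, the RDD equivalent would look something like `review_product_join.rdd.map(lambda r: (r.asin, r.unixReviewTime)).reduceByKey(minUnixTime)`.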

In [None]:
firstreview.take(5)

In [None]:
year_reviews_Pandas = firstreview.toPandas()

In [None]:
year_reviews_Pandas.head()

In [None]:
year_reviews_Pandas['Year'] = pd.to_datetime(year_reviews_Pandas['min(unixReviewTime)'], unit='s').dt.year
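`pd.to_datetime(..., unit='s')` interprets the timestamps as seconds since the Unix epoch, in UTC. Below is the same conversion with the standard library, which is useful for sanity-checking a few values; `year_of` is a hypothetical helper.

```python
from datetime import datetime, timezone

def year_of(unix_seconds):
    """Calendar year (UTC) of a Unix timestamp expressed in seconds."""
    return datetime.fromtimestamp(unix_seconds, tz=timezone.utc).year

print(year_of(1262304000))  # 2010 (2010-01-01 00:00:00 UTC)
print(year_of(1262303999))  # 2009 (one second earlier)
```

The year could also be computed in Spark before `toPandas()`, with `fn.year(fn.from_unixtime(...))`. However, `from_unixtime` uses the session time zone rather than UTC, so years can differ for reviews posted around New Year.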

In [None]:
year_reviews_Pandas.hist("Year", color="lime", log=True,bins=14)

Number of bio/renewable/ecology-related products per year of their first review (an approximation of their release year)

In [None]:
globalfirstreview = reviews.groupBy("asin").agg(sqlf.min("unixReviewTime"))

In [None]:
globalfirstreview.take(5)

In [None]:
globalyear_reviews_Pandas = globalfirstreview.toPandas()

In [None]:
globalyear_reviews_Pandas['Year'] = pd.to_datetime(globalyear_reviews_Pandas['min(unixReviewTime)'], unit='s').dt.year

In [None]:
globalyear_reviews_Pandas.hist("Year", log=True, color="sienna", bins = 14)

The merged DataFrame contains both the product features from the metadata and the review features. Here are the feature indices of the joined data:

Example: `lambda r: r[0]`, `r[1]`, etc.

- asin 0
- brand 1
- category 2
- description 3
- price 4
- salesRank 5
- title 6
- helpful 7 
- overall 8
- reviewText 9
- reviewTime 10
- reviewerID 11
- reviewerName 12
- summary 13
- unixReviewTime 14
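This column order follows from how `join(other, ['asin'])` builds its output. The join key comes first, then the remaining columns of the left DataFrame, then those of the right one, each group in its original order. Below is a stdlib sketch that derives the list above from the two schemas; `joined_columns` is a hypothetical helper that mimics this rule and is not a Spark API.

```python
def joined_columns(left, right, keys):
    """Column order of an inner join on a list of key names:
    keys first, then the other left columns, then the other right columns."""
    return (list(keys)
            + [c for c in left if c not in keys]
            + [c for c in right if c not in keys])

products_cols = ["asin", "brand", "category", "description", "price", "salesRank", "title"]
reviews_cols = ["asin", "helpful", "overall", "reviewText", "reviewTime",
                "reviewerID", "reviewerName", "summary", "unixReviewTime"]

cols = joined_columns(products_cols, reviews_cols, ["asin"])
print({name: i for i, name in enumerate(cols)}["unixReviewTime"])  # 14
```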



In [None]:
# Count reviews per product: emit (asin, 1) for every review, then sum the 1s per key
nbreviews_perproduct = review_product_join.rdd.map(lambda r: (r[0], 1)) \
            .reduceByKey(lambda a, b: a + b)
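`map` to `(asin, 1)` followed by `reduceByKey(lambda a, b: a + b)` is the classic word-count pattern: every review contributes a 1 under its product ID, and the 1s are summed per key. The DataFrame API expresses the same computation as `review_product_join.groupBy('asin').count()`. Below is a stdlib sketch of the counting step using `collections.Counter`, run on made-up product IDs; `reviews_per_product` is a hypothetical helper.

```python
from collections import Counter

def reviews_per_product(asins):
    """Count how many reviews each product ID has (same result as map + reduceByKey)."""
    return Counter(asins)

# Made-up asin column of a joined review table: one entry per review.
asin_column = ["A1", "B2", "A1", "C3", "A1", "B2"]
counts = reviews_per_product(asin_column)
print(counts.most_common(1))  # [('A1', 3)]
```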

In [None]:
nbreviews_DF = spark.createDataFrame(nbreviews_perproduct, ['productID','freq'])

In [None]:
nbreviews_Pandas = nbreviews_DF.toPandas()

In [None]:
fig, axes = plt.subplots(figsize=(16, 4))

# Reverse-cumulative histogram (cumulative=-1): each bar counts the products
# having at least as many reviews as the bin's lower edge
nbreviews_Pandas.hist(column="freq", log=True, bins=100, color="coral", cumulative=-1, ax=axes)

axes.set_xlabel("Nb of reviews / product")
axes.set_ylabel("Nb of products (reverse cumulative)")
axes.set_title("Reviews per product (reverse cumulative), products associated with ecology/bio etc.")
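With `cumulative=-1`, each bar counts the products whose number of reviews falls in that bin or in any higher one. In other words, each bar shows how many products have at least the bin's lower-edge number of reviews. Below is a stdlib sketch of that reverse-cumulative count, run on made-up per-product counts; `products_with_at_least` is a hypothetical helper.

```python
def products_with_at_least(review_counts, thresholds):
    """For each threshold x, count the products having at least x reviews (reverse-cumulative count)."""
    return {x: sum(1 for c in review_counts if c >= x) for x in thresholds}

counts_per_product = [1, 1, 2, 5, 12]  # made-up reviews-per-product values
print(products_with_at_least(counts_per_product, [1, 2, 5, 10]))  # {1: 5, 2: 3, 5: 2, 10: 1}
```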