<h1>1. Creating an RDD</h1>

We start by creating the 3 RDDs from the different datasets from Amazon product reviews. Note that it does not move the data at this stage due to the lazy evaluation nature.

In [2]:
sc

<pyspark.context.SparkContext at 0x1113cb1d0>

We load the data using the Spark context.

In [3]:
fashion = sc.textFile('Data/Reviews/fashion.json')
electronics = sc.textFile('Data/Reviews/electronics.json')
sports = sc.textFile('Data/Reviews/sports.json')

Nothing has happened, why is that?
In Spark, some operations are *transformations*, which are lazily evaluated and others are *actions*.

Read more here: http://spark.apache.org/docs/latest/programming-guide.html#transformations

In [4]:
# Example of a basic transformation
print "Result 1: ", fashion.map(lambda x: len(x))

# Example of an action:
print "Result 2: ", fashion.count()

Result 1:  PythonRDD[6] at RDD at PythonRDD.scala:43
Result 2:  10000


We do some basic data exploration.

In [5]:
print "fashion has {0} rows, electronics {1} rows and sports {2} rows\n".format(fashion.count(), electronics.count(), sports.count())
print "fashion first row:"
fashion.first()

fashion has 10000 rows, electronics 10000 rows and sports 10000 rows

fashion first row:


u'{"reviewerID": "A2XVJBSRI3SWDI", "asin": "0000031887", "reviewerName": "abigail", "helpful": [0, 0], "reviewText": "Perfect red tutu for the price. I baught it as part of my daughters Halloween costume and it looked great on her.", "overall": 5.0, "summary": "Nice tutu", "unixReviewTime": 1383523200, "reviewTime": "11 4, 2013"}'

We can union them.

In [6]:
union_of_rdds = fashion.union(electronics).union(sports)
print union_of_rdds.first()

{"reviewerID": "A2XVJBSRI3SWDI", "asin": "0000031887", "reviewerName": "abigail", "helpful": [0, 0], "reviewText": "Perfect red tutu for the price. I baught it as part of my daughters Halloween costume and it looked great on her.", "overall": 5.0, "summary": "Nice tutu", "unixReviewTime": 1383523200, "reviewTime": "11 4, 2013"}


We can now parse the file using the json library.

In [7]:
import json
parsed_fashion = fashion.map(lambda x: json.loads(x))
parsed_fashion.first()

{u'asin': u'0000031887',
 u'helpful': [0, 0],
 u'overall': 5.0,
 u'reviewText': u'Perfect red tutu for the price. I baught it as part of my daughters Halloween costume and it looked great on her.',
 u'reviewTime': u'11 4, 2013',
 u'reviewerID': u'A2XVJBSRI3SWDI',
 u'reviewerName': u'abigail',
 u'summary': u'Nice tutu',
 u'unixReviewTime': 1383523200}

Another way of loading files is by using a list of comma-separated file paths or a wildcard.

In [8]:
data = sc.textFile('Data/Reviews/fashion.json,Data/Reviews/electronics.json,Data/Reviews/sports.json').map(lambda x: json.loads(x))

# QUESTION: How many partitions does the rdd have?
data.getNumPartitions()

3

Now let's imagine we want to know the number of lines in each partition. For that, we need to access the data in each single partition and run operations on them instead of on each row.

For this, we will use mapPartitionsWithIndex which takes a partition index and an iterator over the data as arguments. Each function in the API is documented in: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.mapPartitionsWithIndex

In [9]:
indexed_data = data.mapPartitionsWithIndex(lambda splitIndex, it: [(splitIndex, len([x for x in it]))])

for num_partition, count_partition in indexed_data.collect():
    print "partition {0} has {1} rows".format(num_partition, count_partition)

partition 0 has 10000 rows
partition 1 has 10000 rows
partition 2 has 10000 rows


<h1>2. Reducers</h1>

The next thing we have been tasked to do is **to get the minimum and maximum number of reviews per product**.

In [10]:
product_num = data.map(lambda x: (x['asin'], 1)).reduceByKey(lambda x,y: x+y)
# The rdd product_num will contain (product_asin, total_number_reviews)

# What are the maximum and minimum number of reviews?
max_num = product_num.map(lambda x: x[1]).max()
min_num = product_num.map(lambda x: x[1]).min()

print "Max number of reviews is {0}, min number of reviews is {1}".format(max_num, min_num)

Max number of reviews is 2033, min number of reviews is 1


![Alt text](Images/reducebykey.png)

**EXERCISE**: what is the max score for each product?

<h1>3. Joining multiple sources</h1>

We want to join the product reviews by users to the product metadata.

In [11]:
product_metadata = sc.textFile('Data/Products/sample_metadata.json').map(lambda x: json.loads(x))
print product_metadata.first()

{u'asin': u'0000037214', u'title': u'Purple Sequin Tiny Dancer Tutu Ballet Dance Fairy Princess Costume Accessory', u'price': 6.99, u'imUrl': u'http://ecx.images-amazon.com/images/I/31mCncNuAZL.jpg', u'related': {u'also_viewed': [u'B00JO8II76', u'B00DGN4R1Q', u'B00E1YRI4C']}, u'salesRank': {u'Clothing': 1233557}, u'brand': u'Big Dreams', u'categories': [[u'Clothing, Shoes & Jewelry', u'Girls'], [u'Clothing, Shoes & Jewelry', u'Novelty, Costumes & More', u'Costumes & Accessories', u'More Accessories', u'Kids & Baby']]}


In [12]:
def flatten_categories(line):
    old_cats = line['categories']
    line['categories'] = [item for sublist in old_cats for item in sublist]
    return line

product_metadata = product_metadata.map(flatten_categories)

We want to join the review data to the metadata about the product. We can use the ASIN for that, which is a unique identifier for each product. In order to do a join, we need to turn each structure into key-value pairs.

In [13]:
key_val_data = data.map(lambda x: (x['asin'], x))
key_val_metadata = product_metadata.map(lambda x: (x['asin'], x))

print "We are joining {0} product reviews to {1} rows of metadata information about the products.\n".format(key_val_data.count(),key_val_metadata.count())
print "First row of key_val_data:"
print key_val_data.first()

We are joining 30000 product reviews to 2469 rows of metadata information about the products.

First row of key_val_data:
(u'0000031887', {u'reviewerID': u'A2XVJBSRI3SWDI', u'asin': u'0000031887', u'reviewerName': u'abigail', u'helpful': [0, 0], u'reviewText': u'Perfect red tutu for the price. I baught it as part of my daughters Halloween costume and it looked great on her.', u'overall': 5.0, u'summary': u'Nice tutu', u'unixReviewTime': 1383523200, u'reviewTime': u'11 4, 2013'})


In [14]:
print "number partitions key_val_data: ", 
print key_val_data.getNumPartitions()
print "number partitions key_val_metadata: ", 
print key_val_metadata.getNumPartitions()
print

joined = key_val_data.join(key_val_metadata)

key, (review, product) = joined.first()
print "For key {0}:\n\nthe review is {1}\n\nthe product metadata is {2}.\n".format(key, review, product)

number partitions key_val_data:  3
number partitions key_val_metadata:  2

For key 8179050874:

the review is {u'reviewerID': u'A1IQJSHCMW69O5', u'asin': u'8179050874', u'reviewerName': u'Jose Perez', u'helpful': [0, 0], u'reviewText': u"I bought this item because of the description that is for the Blackberry bold, to my surprise is for the curve it doesn't fit the screen there is like one inch of screen not protected by the screen, also it reflects sunlight making the screen virtually unusable when outdoors, and looks ugly..", u'overall': 1.0, u'summary': u'This is not for Bold is for Curve', u'unixReviewTime': 1242518400, u'reviewTime': u'05 17, 2009'}

the product metadata is {u'asin': u'8179050874', u'salesRank': {u'Electronics': 324466}, u'imUrl': u'http://ecx.images-amazon.com/images/I/41f2QHnWYNL._SY300_.jpg', u'categories': [u'Electronics', u'Computers & Accessories', u'Laptop & Netbook Computer Accessories', u'Batteries'], u'title': u'PRIVACY Screen Saver for your BLACKBERRY B

What is the number of output partitions of the join? To understand this, the best is to refer back to the Pyspark source code: https://github.com/apache/spark/blob/branch-1.3/python/pyspark/join.py

In [15]:
# QUESTION: what is the number of partitions of the joined dataset?

print "There are {0} partitions".format(joined.getNumPartitions())

There are 5 partitions


To make it easier to manipulate, we will change the structure of the joined rdd to be a single dictionary.

In [16]:
def merge_dictionaries(metadata_line, review_line):
    new_dict = review_line
    new_dict.update(metadata_line)
    return new_dict

nice_joined = joined.map(lambda x: merge_dictionaries(x[1][0], x[1][1]))
row0, row1 = nice_joined.take(2)

print "row 0:\n\n{0}\n\nrow 1:\n\n{1}\n".format(row0, row1)

row 0:

{u'reviewerID': u'A1IQJSHCMW69O5', u'asin': u'8179050874', u'reviewerName': u'Jose Perez', u'helpful': [0, 0], u'title': u'PRIVACY Screen Saver for your BLACKBERRY Bold 9000 ! Shield and Prevent others from viewing your information while protecting your phone!', u'imUrl': u'http://ecx.images-amazon.com/images/I/41f2QHnWYNL._SY300_.jpg', u'reviewText': u"I bought this item because of the description that is for the Blackberry bold, to my surprise is for the curve it doesn't fit the screen there is like one inch of screen not protected by the screen, also it reflects sunlight making the screen virtually unusable when outdoors, and looks ugly..", u'overall': 1.0, u'summary': u'This is not for Bold is for Curve', u'unixReviewTime': 1242518400, u'salesRank': {u'Electronics': 324466}, u'reviewTime': u'05 17, 2009', u'categories': [u'Electronics', u'Computers & Accessories', u'Laptop & Netbook Computer Accessories', u'Batteries']}

row 1:

{u'reviewerID': u'A2HC8YQVZ4HMF5', u'asin': u

<h1>4. GroupByKey</h1>

Now that we have joined two data sources, we can start doing some ad-hoc analysis of the data! Now the task is **to get the average product review length for each category**. The categories are encoded as a list of categories, so we first need to 'flatten them out'.

In [17]:
nice_joined.cache()
nice_joined.count()

30000

In [18]:
original_categories = nice_joined.map(lambda x: x['categories'])
flat_categories = nice_joined.flatMap(lambda x: x['categories'])

print "original_categories.take(5):\n"
print '\n'.join([str(x) for x in original_categories.take(5)]) + '\n'

print "flat_categories.take(5):\n"
print '\n'.join([str(x) for x in flat_categories.take(5)]) + '\n'

num_categories = flat_categories.distinct().count()
print "There are {0} distinct categories.".format(num_categories)

original_categories.take(5):

[u'Electronics', u'Computers & Accessories', u'Laptop & Netbook Computer Accessories', u'Batteries']
[u'Electronics', u'Computers & Accessories', u'Laptop & Netbook Computer Accessories', u'Batteries']
[u'Clothing, Shoes & Jewelry', u'Novelty, Costumes & More', u'Costumes & Accessories', u'Costumes', u'Kids & Baby', u'Infants & Toddlers', u'Baby Boys']
[u'Clothing, Shoes & Jewelry', u'Novelty, Costumes & More', u'Costumes & Accessories', u'Costumes', u'Kids & Baby', u'Infants & Toddlers', u'Baby Boys']
[u'Sports & Outdoors', u'Outdoor Gear', u'Camping & Hiking', u'Camp Bedding', u'Sleeping Pads', u'Foam Pads']

flat_categories.take(5):

Electronics
Computers & Accessories
Laptop & Netbook Computer Accessories
Batteries
Electronics

There are 925 categories.


Next, in order to get the average review length across all categories, we will use a new function: groupByKey!

In [67]:
category_review = nice_joined.flatMap(lambda x: [(y, len(x['reviewText'])) for y in x['categories']])
print "After the flatMap: " + str(category_review.first())
print "After the groupByKey: " + str(category_review.groupByKey().map(lambda x: (x[0], list(x[1]))).first())
print

grouped_category_review = category_review.groupByKey().map(lambda x: (x[0], sum(x[1])/float(len(x[1]))))
print "grouped_category_review.first(): " + str(grouped_category_review.first()) + '\n'

### Now we can sort the categories by average product review length
print "The top 10 categories are: " + str(sorted(grouped_category_review.collect(), key=lambda x: x[1], reverse=True)[:10])

After the flatMap: (u'Electronics', 293)
After the groupByKey: (u'Screen Protectors', [191, 208, 135, 163, 108, 782, 161, 1002, 446, 85])

grouped_category_review.first(): (u'Screen Protectors', 328.1)

The top 10 categories are: [(u'Photos', 4305.5), (u'Motets', 3404.0), (u'Free-Weight Racks', 3404.0), (u'Weight Racks', 3404.0), (u'Magnificats', 3404.0), (u'Bags, Packs & Accessories', 3281.5), (u'Rugby', 3156.0), (u'Rifles', 2066.6666666666665), (u'Soul-Jazz & Boogaloo', 1945.0), (u'Sonatinas', 1908.8)]


**EXERCISE**: Do the same thing, but this time you are not allowed to use groupByKey()!

<h1>Optional: Data skewness</h1>

In [18]:
from math import exp
from datetime import datetime

def get_part_index(splitIndex, iterator):
    for it in iterator:
        yield (splitIndex, it)
       
def count_elements(splitIndex, iterator):
    n = sum(1 for _ in iterator)
    yield (splitIndex, n)
        
print "***Creating the large rdd***"
num_parts = 16
# create the large skewed rdd
skewed_large_rdd = sc.parallelize(range(0,num_parts), num_parts).flatMap(lambda x: range(0, int(exp(x)))).mapPartitionsWithIndex(lambda ind, x: get_part_index(ind, x)).cache()
print "first 5 items:" + str(skewed_large_rdd.take(5))
print "num rows: " + str(skewed_large_rdd.count())
print "num partitions: " + str(skewed_large_rdd.getNumPartitions())
print "The distribution of elements per partition is " + str(skewed_large_rdd.mapPartitionsWithIndex(lambda ind, x: count_elements(ind, x)).collect())
print

print "***Creating the small rdd***"
small_rdd = sc.parallelize(range(0,num_parts), num_parts).map(lambda x: (x, x))
print "first 5 items:" + str(small_rdd.take(5))
print "num rows: " + str(small_rdd.count())
print "num partitions: " + str(small_rdd.getNumPartitions())
print "The distribution of elements per partition is " + str(small_rdd.mapPartitionsWithIndex(lambda ind, x: count_elements(ind, x)).collect())

print

print "Joining them"
t0 = datetime.now()
result = skewed_large_rdd.leftOuterJoin(small_rdd)
result.count() 
print "The direct join takes %s"%(str(datetime.now() - t0))
print "The joined rdd has {0} partitions and {1} rows".format(result.getNumPartitions(), result.count())

***Creating the large rdd***
first 5 items:[(0, 0), (1, 0), (1, 1), (2, 0), (2, 1)]
num rows: 5171502
num partitions: 16
The distribution of elements per partition is [(0, 1), (1, 2), (2, 7), (3, 20), (4, 54), (5, 148), (6, 403), (7, 1096), (8, 2980), (9, 8103), (10, 22026), (11, 59874), (12, 162754), (13, 442413), (14, 1202604), (15, 3269017)]

***Creating the small rdd***
first 5 items:[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
num rows: 16
num partitions: 16
The distribution of elements per partition is [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1), (10, 1), (11, 1), (12, 1), (13, 1), (14, 1), (15, 1)]

Joining them
The direct join takes 0:00:37.777983
The joined rdd has 32 partitions and 5171502 rows


<h1>Optional: Integrating Spark with popular Python libraries</h1>

In [77]:
import sklearn
import pickle

model = pickle.load(open('Data/classifiers/classifier.pkl', 'r'))
model
bla = fashion.map(lambda x: eval(x)['reviewText']).first()
model_b = sc.broadcast(model)
fashion.map(lambda x: eval(x)['reviewText']).map(lambda x: (x, model_b.value.predict([x])[0])).first()

('Perfect red tutu for the price. I baught it as part of my daughters Halloween costume and it looked great on her.',
 'fashion')

<h1> Part 2: Spark DataFrame API and Spark SQL</h1>

<h1>Introduction</h1>

This is the latter part of the tutorial. The main focus will be on Spark DataFrames and Spark SQL.

In [19]:
review_filepaths = 'Data/Reviews/*'
textRDD = sc.textFile(review_filepaths)

print 'number of reviews : {0}'.format(textRDD.count())

print 'sample row : \n{0}'.format(textRDD.first())

number of reviews : 30000
sample row : 
{"reviewerID": "AKM1MP6P0OYPR", "asin": "0132793040", "reviewerName": "Vicki Gibson \"momo4\"", "helpful": [1, 1], "reviewText": "Corey Barker does a great job of explaining Blend Modes in this DVD. All of the Kelby training videos are great but pricey to buy individually. If you really want bang for your buck just subscribe to Kelby Training online.", "overall": 5.0, "summary": "Very thorough", "unixReviewTime": 1365811200, "reviewTime": "04 13, 2013"}


<h1>5. Loading Data into a DataFrame</h1>

A DataFrame requires schema. There are two main functions that can be used to assign schema into an RDD. 
+ Inferring Schema : This functions infers the schema of the RDD by observing it
+ Applying Schema  : This function applies a manually defined schema an RDD

In [20]:
# You need SQL context do 
from pyspark.sql import SQLContext

# # Instantiate SQL Context
sqlContext = SQLContext(sc)
sqlContext
# sqlContext

# print sqc

<pyspark.sql.context.SQLContext at 0x1134d2390>

<h2>Inferring the Schema Using Reflection</h2>

In [21]:
inferredDF = sqlContext.read.json(review_filepaths)
inferredDF.first()

Row(asin=u'0132793040', helpful=[1, 1], overall=5.0, reviewText=u'Corey Barker does a great job of explaining Blend Modes in this DVD. All of the Kelby training videos are great but pricey to buy individually. If you really want bang for your buck just subscribe to Kelby Training online.', reviewTime=u'04 13, 2013', reviewerID=u'AKM1MP6P0OYPR', reviewerName=u'Vicki Gibson "momo4"', summary=u'Very thorough', unixReviewTime=1365811200)

In [23]:
inferredDF.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



<h2>Manually Specifying the Schema</h2>

The Documentation about different data types can be found at [Spark SQL DataTypes section](https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types "Spark SQL DataTypes Documentation") 
+ Defining the schema can be useful

In [22]:
# Export the modules
from pyspark.sql.types import *

# Define Schema
REVIEWS_SCHEMA_DEF = StructType([
        StructField('reviewerID', StringType(), True),
        StructField('asin', StringType(), True),
        StructField('reviewerName', StringType(), True),
        StructField('helpful', ArrayType(
                IntegerType(), True), 
            True),
        StructField('reviewText', StringType(), True),
        StructField('reviewTime', StringType(), True),
        StructField('overall', DoubleType(), True),
        StructField('summary', StringType(), True),
        StructField('unixReviewTime', LongType(), True)
    ])

print REVIEWS_SCHEMA_DEF

StructType(List(StructField(reviewerID,StringType,true),StructField(asin,StringType,true),StructField(reviewerName,StringType,true),StructField(helpful,ArrayType(IntegerType,true),true),StructField(reviewText,StringType,true),StructField(reviewTime,StringType,true),StructField(overall,DoubleType,true),StructField(summary,StringType,true),StructField(unixReviewTime,LongType,true)))


*QUESTION*: What do you think will happen if *QUESTION*: What do you think will happen if we remove some fields from this schema?

1. The schema fails
2. The schema works fine

ANSWER???

In [23]:
# Using a handcrafted schema with to create a DataFrame
appliedDF = sqlContext.read.json(review_filepaths,schema=REVIEWS_SCHEMA_DEF)
appliedDF.first()

Row(reviewerID=u'AKM1MP6P0OYPR', asin=u'0132793040', reviewerName=u'Vicki Gibson "momo4"', helpful=[1, 1], reviewText=u'Corey Barker does a great job of explaining Blend Modes in this DVD. All of the Kelby training videos are great but pricey to buy individually. If you really want bang for your buck just subscribe to Kelby Training online.', reviewTime=u'04 13, 2013', overall=5.0, summary=u'Very thorough', unixReviewTime=1365811200)

<h1>6. DataFrame operations</h1>

Spark DataFrame API allow you to do multiple operations on the Data. The primary advantage of using the DataFrame API is that you can do data transoformations with the high level API without having to use Python. Using the high level API has its advantages which will be explained later in the tutorial.

DataFrame API have functionality similar to that of Core RDD API. For example: 
+ map                     : foreach, Select
+ mapPartition            : foreachPartition
+ filter                  : filter
+ groupByKey, reduceByKey : groupBy 

<h2>6.1. Selecting Columns</h2>

You can use SELECT statement to select columns from your dataframe

In [1]:
columnDF = appliedDF.select(appliedDF.asin,
                            appliedDF.overall,
                            appliedDF.reviewText,
                            appliedDF.helpful[0]/appliedDF.helpful[1],
                            appliedDF.reviewerID,
                            appliedDF.unixReviewTime).\
                    withColumnRenamed('(helpful[0] / helpful[1])','helpful')
columnDF.show()

NameError: name 'appliedDF' is not defined

<h2>6.2. Missing Values</h2>

Similar to Pandas, DataFrames come equipped with functions to address missing data.
+ dropna function: can be used to remove observations with missing values
+ fillna function: can be used to fill missing values with a default value

In [25]:
# get null observations out
densedDF=columnDF.dropna(subset=["overall"]).fillna(0.0,subset=["helpful"]) 
densedDF.show()

+----------+-------+--------------------+------------------+--------------+--------------+
|      asin|overall|          reviewText|           helpful|    reviewerID|unixReviewTime|
+----------+-------+--------------------+------------------+--------------+--------------+
|0132793040|    5.0|Corey Barker does...|               1.0| AKM1MP6P0OYPR|    1365811200|
|0321732944|    5.0|While many beginn...|               0.0|A2CX7LUOHB2NDG|    1341100800|
|0439886341|    1.0|It never worked. ...|               1.0|A2NWSAGRHCP8N5|    1367193600|
|0439886341|    3.0|Some of the funct...|               1.0|A2WNBOD3WNDNKT|    1374451200|
|0439886341|    1.0|Do not waste your...|               1.0|A1GI0U4ZRJA8WN|    1334707200|
|0511189877|    5.0|Dog got the old r...|               0.0|A1QGNMC6O1VW39|    1397433600|
|0511189877|    2.0|This remote, for ...|               1.0|A3J3BRHTDRFJ2G|    1397433600|
|0511189877|    5.0|We had an old Tim...|               0.0|A2TY0BTJOTENPG|    1395878400|

<h2>6.3. Filtering rows</h2>

Filtering lets you select rows based on arguments. The implementation pattern is similar to filtering RDDs, But simpler. 

In [26]:
filteredDF=densedDF.filter(densedDF.overall>=3)
filteredDF.show()

+----------+-------+--------------------+------------------+--------------+--------------+
|      asin|overall|          reviewText|           helpful|    reviewerID|unixReviewTime|
+----------+-------+--------------------+------------------+--------------+--------------+
|0132793040|    5.0|Corey Barker does...|               1.0| AKM1MP6P0OYPR|    1365811200|
|0321732944|    5.0|While many beginn...|               0.0|A2CX7LUOHB2NDG|    1341100800|
|0439886341|    3.0|Some of the funct...|               1.0|A2WNBOD3WNDNKT|    1374451200|
|0511189877|    5.0|Dog got the old r...|               0.0|A1QGNMC6O1VW39|    1397433600|
|0511189877|    5.0|We had an old Tim...|               0.0|A2TY0BTJOTENPG|    1395878400|
|0511189877|    5.0|This unit works j...|               0.0|A34ATBPOK6HCHY|    1395532800|
|0511189877|    5.0|It is an exact du...|               0.0| A89DO69P0XZ27|    1395446400|
|0511189877|    5.0|Works on my t.v. ...|               0.0| AZYNQZ94U6VDB|    1401321600|

<h2>6.4. Grouping by overall scores</h2>

Grouping is equivalent to the groupByKey in the core RDD API. You can transform the grouped values using a summary action such as:
+ count
+ sum
+ average
+ max and so on ...

In [27]:
grouped = filteredDF.groupBy("overall").count()
grouped.show()

+-------+-----+
|overall|count|
+-------+-----+
|    3.0| 2128|
|    5.0|18503|
|    4.0| 5324|
+-------+-----+



<h2>6.5. Joining DataFrames together</h2>

You can join two DataFrames together by using a common key.

In [28]:
product_filepaths = 'Data/Products/*'
productRDD = sc.textFile(product_filepaths)
productRDD.first()

u'{"asin": "0000037214", "title": "Purple Sequin Tiny Dancer Tutu Ballet Dance Fairy Princess Costume Accessory", "price": 6.9900000000000002, "imUrl": "http://ecx.images-amazon.com/images/I/31mCncNuAZL.jpg", "related": {"also_viewed": ["B00JO8II76", "B00DGN4R1Q", "B00E1YRI4C"]}, "salesRank": {"Clothing": 1233557}, "brand": "Big Dreams", "categories": [["Clothing, Shoes & Jewelry", "Girls"], ["Clothing, Shoes & Jewelry", "Novelty, Costumes & More", "Costumes & Accessories", "More Accessories", "Kids & Baby"]]}'

In [29]:
# Load Dataset2 : Amazon Product information
# First, define Schema for second Dataset
PRODUCTS_SCHEMA_DEF = StructType([
        StructField('asin', StringType(), True),
        StructField('title', StringType(), True),
        StructField('price', DoubleType(), True),
        StructField('categories', ArrayType(ArrayType(
            StringType(), True),True),True)
    ])

# Load the dataset
productDF = sqlContext.read.json(product_filepaths,PRODUCTS_SCHEMA_DEF)
productDF.show()
# productDF.first()

+----------+--------------------+-----+--------------------+
|      asin|               title|price|          categories|
+----------+--------------------+-----+--------------------+
|0000037214|Purple Sequin Tin...| 6.99|[WrappedArray(Clo...|
|0000032069|Adult Ballet Tutu...| 7.89|[WrappedArray(Spo...|
|0000031909|Girls Ballet Tutu...|  7.0|[WrappedArray(Spo...|
|0000032034|Adult Ballet Tutu...| 7.87|[WrappedArray(Spo...|
|0000031852|Girls Ballet Tutu...| 3.17|[WrappedArray(Spo...|
|0000032050|Adult Ballet Tutu...|12.85|[WrappedArray(Spo...|
|0000031887|Ballet Dress-Up F...| 6.79|[WrappedArray(Clo...|
|0000031895|Girls Ballet Tutu...| 2.99|[WrappedArray(Spo...|
|0123456479|SHINING IMAGE HUG...|64.98|[WrappedArray(Clo...|
|0132793040|Kelby Training DV...| null|[WrappedArray(Ele...|
|0188477284|Klean Kanteen Cla...| null|[WrappedArray(Spo...|
|0321732944|Kelby Training DV...| null|[WrappedArray(Ele...|
|0439886341|Digital Organizer...| 8.15|[WrappedArray(Ele...|
|0456844570|RiZ Women's 

In [30]:
enrichedReviews = filteredDF.join(productDF, productDF.asin==filteredDF.asin).dropna(subset="title")
enrichedReviews.count()

25566

When you join two RDDs, you have to restructure the data into (k,V) pairs where the key is the join key. This may involve two additional map transformations. This is not necessary in DataFrames.  

In [31]:
enrichedReviews

DataFrame[asin: string, overall: double, reviewText: string, helpful: double, reviewerID: string, unixReviewTime: bigint, asin: string, title: string, price: double, categories: array<array<string>>]

In [32]:
enrichedReviews.show()

+----------+-------+--------------------+------------------+--------------+--------------+----------+--------------------+------+--------------------+
|      asin|overall|          reviewText|           helpful|    reviewerID|unixReviewTime|      asin|               title| price|          categories|
+----------+-------+--------------------+------------------+--------------+--------------+----------+--------------------+------+--------------------+
|0132793040|    5.0|Corey Barker does...|               1.0| AKM1MP6P0OYPR|    1365811200|0132793040|Kelby Training DV...|  null|[WrappedArray(Ele...|
|0321732944|    5.0|While many beginn...|               0.0|A2CX7LUOHB2NDG|    1341100800|0321732944|Kelby Training DV...|  null|[WrappedArray(Ele...|
|0439886341|    3.0|Some of the funct...|               1.0|A2WNBOD3WNDNKT|    1374451200|0439886341|Digital Organizer...|  8.15|[WrappedArray(Ele...|
|0511189877|    5.0|Dog got the old r...|               0.0|A1QGNMC6O1VW39|    1397433600|0511

<h1>7. Saving your DataFrame</h1> 

Now that we have done some operations on the data, we can save the file for later use. Standard data formats are a great way to opening up valuable data to your entire organization. Spark DataFrames can be saved in many different formats including and not limited to JSON, parquet, Hive and etc... 

In [33]:
try:
    columnDF.write.parquet('Data/Outputs/reviews_filtered.parquet')
    print "Saved as parquet successfully"
except:
    print "ERROR !!"



ERROR !!


<h1>8. Using Spark SQL</h1>

Spark DataFrames also allow you to use Spark SQL to query from Petabytes of data. Spark comes with a SQL like query language which can be used to query from Distributed DataFrames. A key advantage of using Spark SQL is that the [Catelyst query optimizer](https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html "Catelyst") under the hood transforms your SQL query to run it most efficiently. 

<h2>8.1. Example Queries</h2>

Spark SQL can leverage the same functionality as the DataFrame API provides. In fact, it provides more functionality via SQL capabilities and HQL capabilities that are available to Spark SQL environment. 

For the sake of time constrains, I will explain different functions available in Spark SQL environment by using examples that use multiple functions. This will benefit by:
+ Covering many functions that are possible via spark SQL
+ Giving an understanding about how to pipe multiple functions together


In [34]:
# Read the reviews parquet file
reviewsDF = sqlContext.read.parquet('Data/Outputs/reviews_filtered.parquet')

# Register the DataFrames to be used in sql
reviewsDF.registerTempTable("reviews")
productDF.registerTempTable("products")

print 'There are {0} reviews about {1} products'.format(reviewsDF.count(),productDF.count())

There are 30000 reviews about 2469 products


In [36]:
sql_query = """SELECT reviews.asin, overall, reviewText, price
            FROM reviews JOIN products ON  reviews.asin=products.asin
            WHERE price > 50.00
"""

result = sqlContext.sql(sql_query)
result.show()

+----------+-------+--------------------+------+
|      asin|overall|          reviewText| price|
+----------+-------+--------------------+------+
|0528881469|    5.0|Love it has every...|299.99|
|0528881469|    1.0|I have owned two ...|299.99|
|0528881469|    5.0|We got this GPS f...|299.99|
|0528881469|    1.0|I'm a professiona...|299.99|
|0528881469|    4.0|This is a great t...|299.99|
|0528881469|    3.0|Well, what can I ...|299.99|
|0528881469|    2.0|Not going to writ...|299.99|
|0528881469|    2.0|My brother is a t...|299.99|
|0528881469|    4.0|This unit is a fa...|299.99|
|0528881469|    5.0|I did a lot of co...|299.99|
|0528881469|    4.0|I purchased this ...|299.99|
|0528881469|    5.0|EXCELLENT. BEST T...|299.99|
|0528881469|    1.0|I was real psyche...|299.99|
|0528881469|    4.0|Well as one of th...|299.99|
|0528881469|    1.0|Thought the unit ...|299.99|
|0528881469|    4.0|Was fast and what...|299.99|
|0528881469|    2.0|Twice this item h...|299.99|
|0528881469|    1.0|

<h1>Optional: User Defined Functions</h1>

Spark SQL also provides the functionality similar to User Defined Functions (UDF) offering in Hive. Spark uses registerFunction() function to register python functions in SQLContext.

In [101]:
import re

def transform_review(review):
    x1 = re.sub('[^0-9a-zA-Z\s]+','',review)
    return x1.lower()

result.registerTempTable("result")
sqlContext.registerFunction("to_lowercase", lambda x:transform_review(x), returnType=StringType())

sql_query_transform = """SELECT asin, reviewText, to_lowercase(reviewText) as cleaned
            FROM result
"""

result_transform = sqlContext.sql(sql_query_transform)
result_transform.show()

AnalysisException: u"cannot resolve 'cleaned' given input columns asin, overall, reviewText, price;"

<h1>Optional : Mix and Match!!</h1>

You can also mix DataFrames, RDDs and SparkSQL to make it work for you. 

<h2>Scenario</h2>

We want to investigate the average rating of reviews in terms of the categories they belong to. In order to do this, we:
+ query the needed data using DataFrames API
+ classify the reviews into different categories using core RDD API
+ query the avearage rating for each category using Spark SQL

In [119]:
import sklearn
import cPickle

from pyspark.sql import Row

model = cPickle.load(open('Data/classifiers/classifier.pkl', 'r'))
classifier_b = sc.broadcast(model)

classifiedRDD = result_transform.filter("cleaned <> ''")\
                                .map(lambda row: 
                                     (row.asin,row.reviewText,str(classifier_b.value.predict([row.reviewText])[0]))
                                    )

CLASSIFIED_SCHEMA = StructType([
        StructField('asin', StringType(), True),
        StructField('review', StringType(), True),
        StructField('category', StringType(), True)
    ])

classifiedDF = sqlContext.createDataFrame(classifiedRDD,CLASSIFIED_SCHEMA)

classifiedDF.show()

+----------+--------------------+-----------+
|      asin|              review|   category|
+----------+--------------------+-----------+
|0528881469|Love it has every...|electronics|
|0528881469|I have owned two ...|electronics|
|0528881469|We got this GPS f...|electronics|
|0528881469|I'm a professiona...|electronics|
|0528881469|This is a great t...|electronics|
|0528881469|Well, what can I ...|electronics|
|0528881469|Not going to writ...|electronics|
|0528881469|My brother is a t...|electronics|
|0528881469|This unit is a fa...|     sports|
|0528881469|I did a lot of co...|electronics|
|0528881469|I purchased this ...|electronics|
|0528881469|EXCELLENT. BEST T...|electronics|
|0528881469|I was real psyche...|electronics|
|0528881469|Well as one of th...|electronics|
|0528881469|Thought the unit ...|electronics|
|0528881469|Was fast and what...|     sports|
|0528881469|Twice this item h...|     sports|
|0528881469|DONT WAIST YOUR M...|electronics|
|0528881469|We had the GPS fo...|e

In [120]:
classifiedDF.registerTempTable('enrichedReviews')

sql_query_test = """SELECT category, avg(overall) as avgRating
            FROM reviews 
            JOIN products ON reviews.asin=products.asin 
            JOIN enrichedReviews ON products.asin=enrichedReviews.asin
            WHERE price > 50.0
            GROUP BY enrichedReviews.category
"""

resultTest = sqlContext.sql(sql_query_test)
resultTest.show()

+-----------+-----------------+
|   category|        avgRating|
+-----------+-----------------+
|electronics|3.767588990706229|
|    fashion| 4.00352086576236|
|     sports|4.287197603795619|
+-----------+-----------------+

