# Introduction

We start by creating three RDDs from the different Amazon product review datasets. Note that no data is read or moved at this stage, because Spark evaluates RDD transformations lazily.

First, let's look at the core concepts of Apache Spark. The most essential component in Spark is the SparkContext. In the PySpark shell, a SparkContext is already instantiated and available as the variable `sc`.

In [None]:
sc

## Partitions

We can use the `sc.parallelize()` function to convert a simple Python list into an RDD. Let's first create a dataset in Python.

In [None]:
num = 5

partition_data = []

for i in range(0,num):
    for j in range(0,i):
        partition_data.append((i,j))

print partition_data

Now let's use `sc.parallelize()` to convert this list into a Spark RDD, and then `rdd.getNumPartitions()` to find out how many partitions the RDD has.

In [None]:
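# Distribute the local list across the cluster as an RDD
partition_rdd = sc.parallelize(partition_data)
print partition_rdd.getNumPartitions()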
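`sc.parallelize()` also accepts an optional second argument, `numSlices`, to choose the number of partitions; without it, Spark falls back to `sc.defaultParallelism`. As a rough mental model, the list is cut into contiguous, roughly equal slices. The plain-Python sketch below assumes exactly that even split (the helper `slice_into_partitions` is our own, hypothetical name). PySpark's internal batching may place the boundaries differently, so treat it as an illustration and compare it with what `to_human_readable()` prints.

```python
def slice_into_partitions(items, num_partitions):
    """Split a list into contiguous, roughly equal slices (simplified model)."""
    n = len(items)
    return [items[(i * n) // num_partitions:((i + 1) * n) // num_partitions]
            for i in range(num_partitions)]

# Same construction as partition_data above: num = 5 gives 10 pairs
partition_data = [(i, j) for i in range(5) for j in range(i)]

for index, partition in enumerate(slice_into_partitions(partition_data, 4)):
    print("partition {0}: {1}".format(index, partition))
```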

The function below lets us visualise how the data is laid out across the partitions of an RDD.

In [None]:
def to_human_readable(rdd_obj):
    """Takes in an RDD and prints the contents of each partition in a single line
    
    Args: 
        rdd_obj (RDD): spark RDD with data
    """
    partition_view = rdd_obj.mapPartitions(lambda l: [l]).map(list).collect()
    
    print "Number of Partitions in the RDD : {}".format(len(partition_view))
    print "\n\n The partition contents are as follows: \n"

    for partition in partition_view:
        print partition

Now let's look at `partition_rdd`.

In [None]:
to_human_readable(partition_rdd)

## Transformations with no shuffles

When transformations with no shuffles are carried out, the partition structure doesn't change: the data remains in the same partitions.

Now let's apply a `map()` function to the `(k, v)` key-value pairs to replace `v` with `v+1`.

In [None]:
mapped_rdd = partition_rdd.map(lambda kv: (kv[0], kv[1] + 1))  # (k, v) -> (k, v+1)
to_human_readable(mapped_rdd)

Now let's apply a `filter()` function to the `(k, v)` key-value pairs so that only pairs whose `v` is divisible by 2 are kept.

In [None]:
filtered_rdd = partition_rdd.filter(lambda kv: kv[1] % 2 == 0)  # keep pairs where v is even
to_human_readable(filtered_rdd)
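Narrow transformations such as `map()` and `filter()` work on each partition independently, which is why the partition structure above does not change. The sketch below models partitions as plain Python lists of lists (hand-written, hypothetical contents) and applies the same two operations partition by partition. Note that a filter may leave a partition smaller or even empty, but it never moves a record into another partition.

```python
# Hypothetical partitions, written out by hand as a list of lists
partitions = [[(1, 0), (2, 0)], [(2, 1), (3, 0), (3, 1)],
              [(3, 2), (4, 0)], [(4, 1), (4, 2), (4, 3)]]

def map_partitions(parts, func):
    # Apply func to every record, partition by partition; no record moves
    return [[func(record) for record in part] for part in parts]

def filter_partitions(parts, predicate):
    # Keep matching records in the partition they already live in
    return [[record for record in part if predicate(record)] for part in parts]

mapped = map_partitions(partitions, lambda kv: (kv[0], kv[1] + 1))
filtered = filter_partitions(partitions, lambda kv: kv[1] % 2 == 0)

print(mapped)
print(filtered)
```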

# Real world examples

Now we are going to use Apache Spark to process a real-world dataset from Amazon about its products and product reviews, and extract information from it.

## Loading and managing RDDs

We load the data using the Spark context.

In [None]:
fashion = sc.textFile('Data/Reviews/fashion.json')
electronics = sc.textFile('Data/Reviews/electronics.json')
sports = sc.textFile('Data/Reviews/sports.json')

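Nothing has happened yet. Why is that?
In Spark, some operations are *transformations*, which are evaluated lazily, and others are *actions*, which trigger the actual computation. `sc.textFile()` only defines the RDD, so the files are read only when an action such as `first()` or `count()` runs.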

Read more here: http://spark.apache.org/docs/latest/programming-guide.html#transformations
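The toy class below (hypothetical, not part of PySpark) mimics this split between transformations and actions: `map()` and `filter()` only record steps, and nothing is read until the `collect()` action runs. Real RDDs also keep this lineage for fault tolerance and pipeline steps within a stage, which the sketch ignores.

```python
reads = []  # records each time the toy source is actually read

def read_source():
    reads.append("read")
    return [1, 2, 3, 4, 5]

class LazyDataset(object):
    """Toy stand-in for an RDD: transformations only record steps."""

    def __init__(self, source, steps=None):
        self.source = source
        self.steps = steps or []

    def map(self, func):
        # Transformation: return a new dataset with one more recorded step
        return LazyDataset(self.source, self.steps + [("map", func)])

    def filter(self, predicate):
        return LazyDataset(self.source, self.steps + [("filter", predicate)])

    def collect(self):
        # Action: only now is the source read and each recorded step applied
        records = iter(self.source())
        for kind, func in self.steps:
            if kind == "map":
                records = map(func, records)
            else:
                records = filter(func, records)
        return list(records)

# Building the pipeline only records two steps; the source is not read yet
pipeline = LazyDataset(read_source).map(lambda x: x * 10).filter(lambda x: x > 20)
print("reads before action: {0}".format(len(reads)))

# collect() is the action that triggers reading and computing
result = pipeline.collect()
print("reads after action: {0}, result: {1}".format(len(reads), result))
```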

Now let's look at the first line of one of the files.

In [None]:
fashion.first()

Now that it's clear that the data is loading properly, we can do some basic data exploration. Let's count the number of items in each dataset using the `rdd.count()` function.

In [None]:
print "fashion has {0} rows, electronics {1} rows and sports {2} rows\n".format(fashion.count(), electronics.count(), sports.count())

As all three datasets contain product reviews, we can treat them as a single dataset by taking the union of the RDDs with the `union()` function.

In [None]:
union_of_rdds = fashion.union(electronics).union(sports)
print union_of_rdds.count()

Another way of loading several files is to pass a comma-separated list of file paths.

In [None]:
text_records = sc.textFile('Data/Reviews/fashion.json,Data/Reviews/electronics.json,Data/Reviews/sports.json')
text_records.count()

You can also use POSIX-style wildcards in the file path passed to `sc.textFile()`.

In [None]:
text_records = sc.textFile('Data/Reviews/*.json')
text_records.count()

From the first line, we can see that the data is in JSON format, so we can parse it with the `json` library. To parse every line in parallel, we apply a line-wise transformation (`map()`) to `text_records`, which gives us the `data` RDD.

In [None]:
import json
data = text_records.map(json.loads)  # parse each JSON line into a dict

Now let's imagine we want to know the number of lines in each partition. For that, we need to access the data of each partition as a whole and run operations on it, instead of on each row.

For this, we will use `mapPartitionsWithIndex()`, which calls a function with a partition index and an iterator over that partition's data. Each function in the API is documented in: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.mapPartitionsWithIndex

In [None]:
indexed_data = data.mapPartitionsWithIndex(lambda splitIndex, it: [(splitIndex, len([x for x in it]))])

for num_partition, count_partition in indexed_data.collect():
    print "partition {0} has {1} rows".format(num_partition, count_partition)
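The function passed to `mapPartitionsWithIndex()` is called once per partition with the partition index and an iterator; whatever iterable it returns becomes that partition's output. The plain-Python model below (hypothetical helper name, hand-made partitions) reproduces that contract. It also counts with `sum(1 for _ in it)`, which walks the iterator without first building a list, something worth considering when partitions are large.

```python
def map_partitions_with_index(partitions, func):
    """Call func(index, iterator) once per partition and concatenate results."""
    output = []
    for index, part in enumerate(partitions):
        output.extend(func(index, iter(part)))
    return output

# Hypothetical partitions standing in for the parsed reviews
partitions = [["r1", "r2", "r3"], ["r4"], ["r5", "r6"]]

# Count records per partition without materialising a list
counts = map_partitions_with_index(partitions, lambda idx, it: [(idx, sum(1 for _ in it))])

for num_partition, count_partition in counts:
    print("partition {0} has {1} rows".format(num_partition, count_partition))
```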

## Reducers

The next thing we have been tasked to do is **to get the minimum and maximum number of reviews per product**.

In [None]:
ASIN_FIELD = 'asin'

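# The rdd product_num will contain (product_asin, total_number_reviews)
product_num = data.map(lambda x: (x[ASIN_FIELD], 1)).reduceByKey(lambda a, b: a + b)


# What are the maximum and minimum number of reviews?
max_num = product_num.map(lambda x: x[1]).max()
min_num = product_num.map(lambda x: x[1]).min()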

print "Max number of reviews is {0}, min number of reviews is {1}".format(max_num, min_num)

![Illustration of reduceByKey](Images/reducebykey.png)
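`reduceByKey()` is efficient because values are first combined inside each partition, so at most one record per key per partition has to be shuffled. The plain-Python model below (hypothetical helper `reduce_by_key`, made-up `(asin, 1)` pairs) shows the two stages and counts how many records would cross the network; it leaves out how the merged keys are hash-partitioned on the reduce side.

```python
def reduce_by_key(partitions, func):
    """Model of reduceByKey: combine inside each partition, then merge."""
    # Map-side combine: at most one value per key leaves each partition
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)
    # Reduce side: merge the per-partition partial results
    merged = {}
    for local in partials:
        for key, value in local.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged, partials

# Hypothetical (asin, 1) pairs spread over two partitions
partitions = [[("A", 1), ("B", 1), ("A", 1)], [("A", 1), ("C", 1)]]
counts, partials = reduce_by_key(partitions, lambda a, b: a + b)

raw = sum(len(p) for p in partitions)
shuffled = sum(len(p) for p in partials)
print(counts)
print("records shuffled: {0} instead of {1}".format(shuffled, raw))
```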

We can also replace the `map()` transformations above with the `values()` and `min(key=...)` functions.

In [None]:
# What are the maximum and minimum number of reviews?
max_num = product_num.values().max()

# To keep the key alongside the value, pass a key function to min()
min_asin, min_num = product_num.min(key=lambda x: x[1])

print "Max number of reviews is {0}, min number of reviews is {1}".format(max_num, min_num)

**EXERCISE**: Find the maximum score that has been given to a review of each product in the product reviews dataset

## Joining multiple sources

Joining datasets on a shared column is very useful in data munging. In this section we join the product reviews written by users to the product metadata.

In [None]:
product_metadata = (sc.textFile('Data/Products/sample_metadata.json').
                       map(lambda x: json.loads(x)))

print product_metadata.first()

Notice that the product categories field is a list of lists. The `flatten_categories()` function below flattens these multilevel categories into a single list.

In [None]:
CATEGORIES_FIELD = 'categories'

def flatten_categories(line):
    """Takes the product category record and modifies it by converting the multilevel product category
    field to flattened version
    
    Args: 
        line (dict): product record
    Returns:
        line (dict): modified product record
        
    """
    
    old_cats = line[CATEGORIES_FIELD]
    line[CATEGORIES_FIELD] = [item for sublist in old_cats for item in sublist]
    return line

product_metadata = product_metadata.map(flatten_categories)

We want to join the review data to the metadata about the product. We can use the ASIN for that, which is a unique identifier for each product. In order to do a join, we need to turn each structure into key-value pairs.

In [None]:
key_val_data = data.map(lambda x: (x[ASIN_FIELD], x))

key_val_metadata = product_metadata.map(lambda x: (x[ASIN_FIELD], x))

print "We are joining {0} product reviews to {1} rows of metadata information about the products.\n".format(key_val_data.count(),
                                                                                                            key_val_metadata.count())
print "First row of key_val_data:"
print key_val_data.first()

In [None]:
print "number partitions key_val_data: ", 
print key_val_data.getNumPartitions()
print "number partitions key_val_metadata: ", 
print key_val_metadata.getNumPartitions()
print

joined = key_val_data.join(key_val_metadata)

key, (review, product) = joined.first()
print "For key {0}:\n\nthe review is {1}\n\nthe product metadata is {2}.\n".format(key, review, product)

What is the number of output partitions of the join? To understand this, the best reference is the PySpark source code: https://github.com/apache/spark/blob/branch-1.3/python/pyspark/join.py
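The linked `join.py` builds the join from simpler pieces: it tags each value with the side it came from, takes the `union()` of the two RDDs and runs `groupByKey()`, then pairs every left value with every right value per key. The sketch below reproduces that tag/union/group idea on local lists; the function name and the sample reviews are ours, and partitioning is left out entirely.

```python
from collections import defaultdict

def python_style_join(left, right):
    """Inner join of two lists of (key, value) pairs via tag, union and group."""
    # Tag each value with its side: 1 for left, 2 for right, then union
    tagged = [(k, (1, v)) for k, v in left] + [(k, (2, w)) for k, w in right]
    # Group by key (in Spark this groupByKey is the shuffle)
    groups = defaultdict(list)
    for key, tagged_value in tagged:
        groups[key].append(tagged_value)
    # For each key, pair every left value with every right value
    result = []
    for key, values in groups.items():
        lefts = [v for side, v in values if side == 1]
        rights = [v for side, v in values if side == 2]
        result.extend((key, (v, w)) for v in lefts for w in rights)
    return result

reviews = [("B001", "great"), ("B001", "poor"), ("B002", "ok"), ("B009", "orphan")]
metadata = [("B001", "Shoe"), ("B002", "Cable")]
print(sorted(python_style_join(reviews, metadata)))
```

Keys that appear on only one side (`B009` here) produce no output, which matches the inner-join behaviour of `RDD.join()`.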

In [None]:
# QUESTION: what is the number of partitions of the joined dataset?

print "There are {0} partitions".format(joined.getNumPartitions())

To make it easier to manipulate, we will change the structure of the joined rdd to be a single dictionary.

In [None]:
def merge_dictionaries(metadata_line, review_line):
    """merges the two dictionaries together
    
    Args:
        metadata_line (dict): product metadata record
        review_line (dict): review record line
    
    Returns:
        new_dict (dict): merged version of product review and metadata
    """
    # Copy first so the original review record is not modified in place
    new_dict = dict(review_line)
    new_dict.update(metadata_line)
    return new_dict

# The values of joined are (review, metadata) pairs
nice_joined = joined.mapValues(lambda (review, meta): merge_dictionaries(meta, review)).values()
row0, row1 = nice_joined.take(2)

print "row 0:\n\n{0}\n\nrow 1:\n\n{1}\n".format(row0, row1)

## GroupByKey

Now that we have joined the two data sources, we can start some ad-hoc analysis of the data. The task is **to get the average product review length for each category**. The categories are encoded as a list, so we first need to flatten them out.

In [None]:
nice_joined.cache()
nice_joined.count()

In [None]:
original_categories = nice_joined.map(lambda x: x[CATEGORIES_FIELD])
flat_categories = nice_joined.flatMap(lambda x: x[CATEGORIES_FIELD])

print "original_categories.take(5):\n"
print '\n'.join([str(x) for x in original_categories.take(5)]) + '\n'

print "flat_categories.take(5):\n"
print '\n'.join([str(x) for x in flat_categories.take(5)]) + '\n'

# How many distinct categories are there??? 
num_categories = flat_categories.distinct().count()
print "There are {0} distinct categories.".format(num_categories)

In [None]:
nice_joined.first()

Next, to get the average review length for each category, we will use a new function: `groupByKey()`.

In [None]:
TEXT_FIELD = 'reviewText'

category_review = nice_joined.flatMap(lambda x: [(y, len(x[TEXT_FIELD])) for y in x[CATEGORIES_FIELD]])
print "After the flatMap: " + str(category_review.first())
print "After the groupByKey: " + str(category_review.groupByKey().mapValues(list).first())
print

grouped_category_review = category_review.groupByKey().mapValues(lambda x: (sum(x)/float(len(x))))
print "grouped_category_review.first(): " + str(grouped_category_review.first()) + '\n'

### Now we can sort the categories by average product review length
# takeOrdered() returns the 10 largest averages without collecting every category to the driver
print "The top 10 categories are: " + str(grouped_category_review.takeOrdered(10, key=lambda x: -x[1]))
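`groupByKey()` is a wide transformation: every `(category, length)` pair is sent across the network to the partition that owns its key, and all values of one key end up together in that partition. The sketch below models that routing with Python's built-in `hash()`; PySpark uses its own `portable_hash` by default, so real placement will differ. The helper names and the sample pairs are hypothetical.

```python
def hash_partition(pairs, num_partitions):
    """Route each (key, value) pair to partition hash(key) % num_partitions."""
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        buckets[hash(key) % num_partitions].append((key, value))
    return buckets

def group_by_key(pairs, num_partitions):
    # Shuffle first, then group within each destination partition
    grouped = []
    for bucket in hash_partition(pairs, num_partitions):
        groups = {}
        for key, value in bucket:
            groups.setdefault(key, []).append(value)
        grouped.append(groups)
    return grouped

pairs = [("Clothing", 120), ("Shoes", 40), ("Clothing", 80), ("Watches", 60)]
parts = group_by_key(pairs, 3)

# Average per key, computed after all values of a key are in one place
averages = {}
for part in parts:
    for key, values in part.items():
        averages[key] = sum(values) / float(len(values))
print(parts)
print(averages)
```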

**EXERCISE**: Do the same thing, but this time you are not allowed to use groupByKey()!

## Optional: Data skewness

Data skewness is one of the common problems with Big Data. Skewed data can affect both the computation cost and the stability of the cluster.

To understand skew, let's first create a normal dataset.

In [None]:
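num = 1000

# Named normal_data so the reviews RDD `data` from earlier is not overwritten
normal_data = []

for i in range(0,num):
    for j in range(0,i):
        normal_data.append((i,j))

len(normal_data)

Now, we introduce a skewed key to this dataset.

In [None]:
big_num = 1000000

# Copy the list: assigning it directly would also append the skewed rows to normal_data
skew_data = list(normal_data)

for i in range(0, big_num):
    skew_data.append((big_num, i))

len(skew_data)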

We can now load this data into a Spark RDD and run a shuffle (`groupByKey()`) to see how the skew affects the use of computation resources.

In [None]:
dataset = skew_data

rdd = sc.parallelize(dataset)
rdd.getNumPartitions()

In [None]:
grouped_rdd = rdd.groupByKey().cache()
mapped_rdd = grouped_rdd.map(lambda (k, v): (k, [(i + 10) for i in v]))
mapped_rdd.count()
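The skewed key's values all hash to the same partition, so a single task receives most of the shuffled records and `groupByKey()` has to hold that key's values together. The scaled-down sketch below measures this locally: it finds the hottest key and counts records per hash partition. It relies on Python's `hash()` of a small non-negative integer being the integer itself; the sizes are illustrative, not Spark's actual numbers.

```python
from collections import Counter

def partition_sizes(pairs, num_partitions):
    """Count how many records land in each hash partition."""
    sizes = [0] * num_partitions
    for key, _ in pairs:
        sizes[hash(key) % num_partitions] += 1
    return sizes

# Scaled-down version of the cells above: a normal dataset plus one hot key
normal = [(i, j) for i in range(100) for j in range(i)]
skewed = normal + [(10000, i) for i in range(10000)]

key_counts = Counter(key for key, _ in skewed)
hot_key, hot_count = key_counts.most_common(1)[0]
print("hottest key {0} holds {1:.1%} of records".format(hot_key, hot_count / float(len(skewed))))
print("records per partition: {0}".format(partition_sizes(skewed, 4)))
```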

## Optional: Integrating Spark with popular Python libraries

In [None]:
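import json
import pickle

import sklearn  # the pickled model is a scikit-learn object

# Pickle files are binary, so open in 'rb' mode
with open('Data/classifiers/classifier.pkl', 'rb') as model_file:
    model = pickle.load(model_file)
model

# Parse each JSON line with json.loads rather than eval()
sample_review = fashion.map(lambda x: json.loads(x)['reviewText']).first()

# Broadcast the model once to every executor instead of shipping it with each task
model_b = sc.broadcast(model)
fashion.map(lambda x: json.loads(x)['reviewText']).map(lambda x: (x, model_b.value.predict([x])[0])).first()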

<h1> Part 2: Spark DataFrame API and Spark SQL</h1>

## Introduction

This is the second part of the tutorial. Its main focus is Spark DataFrames and Spark SQL.

In [None]:
review_filepaths = 'Data/Reviews/*'
textRDD = sc.textFile(review_filepaths)

print 'number of reviews : {0}'.format(textRDD.count())

print 'sample row : \n{0}'.format(textRDD.first())

## Loading Data into a DataFrame

A DataFrame requires a schema. There are two main ways to assign a schema to the data:
+ Inferring the schema: Spark infers the schema by inspecting the data
+ Applying a schema: you define the schema manually and Spark applies it to the data
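Schema inference has a cost: Spark has to read through the JSON records before it knows the schema. The plain-Python sketch below imitates the idea on a few hand-written lines. The type names mirror Spark SQL's (`long`, `double`, `string`, ...), while the merge rules (nulls are ignored, `long` and `double` widen to `double`, any other conflict falls back to `string`) are a simplified assumption of ours rather than a copy of Spark's implementation.

```python
import json

def infer_schema(json_lines):
    """Infer a field -> type-name mapping by scanning every record."""
    type_names = {bool: "boolean", int: "long", float: "double",
                  str: "string", list: "array", dict: "struct"}
    schema = {}
    for line in json_lines:
        for field, value in json.loads(line).items():
            if value is None:
                continue  # nulls say nothing about the type
            name = type_names.get(type(value), "string")
            previous = schema.get(field)
            if previous is None or previous == name:
                schema[field] = name
            elif set([previous, name]) == set(["long", "double"]):
                schema[field] = "double"  # widen numeric types
            else:
                schema[field] = "string"  # conflicting types fall back to string
    return schema

lines = [
    '{"asin": "B001", "overall": 5.0, "helpful": [1, 2]}',
    '{"asin": "B002", "overall": 4, "reviewText": null}',
    '{"asin": "B003", "overall": 3.0, "reviewText": "ok"}',
]
print(infer_schema(lines))
```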

In [None]:
# Spark SQL functionality is accessed through an SQLContext
from pyspark.sql import SQLContext

# Instantiate the SQLContext on top of the existing SparkContext
sqlContext = SQLContext(sc)

## Inferring the Schema from the JSON Data

In [None]:
inferredDF = sqlContext.read.json(review_filepaths)
inferredDF.first()

In [None]:
inferredDF.printSchema()

## Manually Specifying the Schema

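Documentation about the different data types can be found in the [Spark SQL DataTypes section](https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types "Spark SQL DataTypes Documentation"). Defining the schema explicitly can be useful: Spark no longer needs an extra pass over the data to infer it, and each column gets exactly the type you choose.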

In [None]:
# Import the Spark SQL data types (StructType, StructField, StringType, ...)
from pyspark.sql.types import *

# Define Schema
REVIEWS_SCHEMA_DEF = StructType([
        StructField('reviewerID', StringType(), True),
        StructField('asin', StringType(), True),
        StructField('reviewerName', StringType(), True),
        StructField('helpful', ArrayType(
                IntegerType(), True), 
            True),
        StructField('reviewText', StringType(), True),
        StructField('reviewTime', StringType(), True),
        StructField('overall', DoubleType(), True),
        StructField('summary', StringType(), True),
        StructField('unixReviewTime', LongType(), True)
    ])

print REVIEWS_SCHEMA_DEF

*QUESTION*: What do you think will happen if we remove some fields from this schema?

1. The schema fails
2. The schema works fine

ANSWER???

In [None]:
# Use the handcrafted schema to create a DataFrame
appliedDF = sqlContext.read.json(review_filepaths, schema=REVIEWS_SCHEMA_DEF)
appliedDF.first()
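With a schema supplied, Spark reads only the declared fields: fields present in the JSON but absent from the schema are dropped, and declared fields missing from a record come back as null. The plain-Python sketch below models that projection for one line. The three-field `schema` list is a hypothetical subset of `REVIEWS_SCHEMA_DEF`, and the Python converters only stand in for Spark's type handling.

```python
import json

# Hypothetical subset of REVIEWS_SCHEMA_DEF: (field name, converter)
schema = [("asin", str), ("overall", float), ("unixReviewTime", int)]

def apply_schema(json_line, schema):
    """Keep only schema fields in schema order; missing fields become None."""
    record = json.loads(json_line)
    row = []
    for name, convert in schema:
        value = record.get(name)
        row.append(None if value is None else convert(value))
    return tuple(row)

line = '{"asin": "B001", "overall": 5, "reviewText": "fits well"}'
print(apply_schema(line, schema))  # reviewText is dropped, unixReviewTime is None
```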

<h1>6. DataFrame operations</h1>

The Spark DataFrame API lets you perform many operations on the data. Its primary advantage is that transformations written with the high-level API are executed by Spark's optimized engine in the JVM instead of in Python worker processes. Using the high-level API has further advantages, which are explained later in the tutorial.

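The DataFrame API has functionality similar to that of the core RDD API. For example:
+ `map`                       : `select` (or `foreach` for side effects only)
+ `mapPartitions`             : `foreachPartition` (side effects only)
+ `filter`                    : `filter`
+ `groupByKey`, `reduceByKey` : `groupBy` followed by an aggregation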

<h2>6.1. Selecting Columns</h2>

You can use the `select()` method to select columns from your DataFrame.

In [None]:
columnDF = appliedDF.select(appliedDF.asin,
                            appliedDF.overall,
                            appliedDF.reviewText,
                            # fraction of users who found the review helpful (null when helpful[1] is 0)
                            (appliedDF.helpful[0] / appliedDF.helpful[1]).alias('helpful'),
                            appliedDF.reviewerID,
                            appliedDF.unixReviewTime)
columnDF.show()

## Missing Values

Similar to Pandas, DataFrames come equipped with functions to address missing data.
+ dropna function: can be used to remove observations with missing values
+ fillna function: can be used to fill missing values with a default value

In [None]:
# Drop rows that don't have an overall score,
# then use 0.0 as the default value in the helpful column
densedDF = columnDF.dropna(subset=['overall']).fillna({'helpful': 0.0})
densedDF.show()
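To make the semantics concrete, the sketch below models `dropna(subset=...)` with its default `how='any'` (drop a row if any listed column is null) and `fillna()` with a per-column dictionary, using plain Python dictionaries as rows. The helper names mirror the DataFrame methods, but they are our own functions and the rows are made up.

```python
rows = [
    {"asin": "B001", "overall": 5.0, "helpful": 0.5},
    {"asin": "B002", "overall": None, "helpful": 1.0},
    {"asin": "B003", "overall": 3.0, "helpful": None},
]

def dropna(rows, subset):
    # Drop a row if any column in subset is null (like how='any')
    return [r for r in rows if all(r.get(c) is not None for c in subset)]

def fillna(rows, defaults):
    # Replace nulls in the given columns with per-column defaults
    return [dict(r, **{c: d for c, d in defaults.items() if r.get(c) is None})
            for r in rows]

dense = fillna(dropna(rows, ["overall"]), {"helpful": 0.0})
print(dense)
```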

## Filtering rows

Filtering lets you select rows based on a condition. The pattern is similar to filtering RDDs, but simpler.

In [None]:
filteredDF = densedDF.filter(densedDF.overall >= 3)
filteredDF.show()

## Grouping by overall scores

Grouping is equivalent to the groupByKey in the core RDD API. You can transform the grouped values using a summary action such as:
+ count
+ sum
+ average
+ max and so on ...


Let's count the number of reviews for each overall score in the reviews dataset.

In [None]:
grouped = densedDF.groupBy('overall').count().orderBy('overall')
grouped.show()
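Semantically, `groupBy('overall').count()` produces one row per distinct score together with the number of reviews that have it. On a small local sample the same result can be computed with `collections.Counter`, which is a handy way to sanity-check the Spark output; the scores below are made up.

```python
from collections import Counter

# Hypothetical overall scores from a handful of reviews
overall_scores = [5.0, 4.0, 5.0, 3.0, 5.0, 4.0]

# Local equivalent of df.groupBy('overall').count(), sorted by score
counts = sorted(Counter(overall_scores).items())
for overall, count in counts:
    print("{0}\t{1}".format(overall, count))
```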

## Joining DataFrames together

You can join two DataFrames together by using a common key.

In [None]:
product_filepaths = 'Data/Products/*'
productRDD = sc.textFile(product_filepaths)
productRDD.first()

In [None]:
# Load Dataset2 : Amazon Product information
# First, define Schema for second Dataset
PRODUCTS_SCHEMA_DEF = StructType([
        StructField('asin', StringType(), True),
        StructField('title', StringType(), True),
        StructField('price', DoubleType(), True),
        StructField('categories', ArrayType(ArrayType(
            StringType(), True),True),True)
    ])

# Load the dataset
productDF = sqlContext.read.json(product_filepaths,PRODUCTS_SCHEMA_DEF)
productDF.show()
# productDF.first()

Now let us join the two datasets together based on `asin`

In [None]:
enrichedReviews = (filteredDF.join(productDF,
                                   filteredDF.asin == productDF.asin).
                              dropna(subset="title"))
enrichedReviews.count()

When you join two RDDs, you have to restructure the data into `(k, v)` pairs where the key is the join key, which may require two additional `map` transformations. This is not necessary with DataFrames.

In [None]:
enrichedReviews.show()

## Saving your DataFrame 

Now that we have done some operations on the data, we can save the result for later use. Standard data formats are a great way to open up valuable data to your entire organization. Spark DataFrames can be saved in many formats, including but not limited to JSON, Parquet and Hive tables.

In [None]:
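try:
    # mode('overwrite') lets the cell be re-run when the output already exists
    columnDF.write.mode('overwrite').parquet('Data/Outputs/reviews_filtered.parquet')
    print "Saved as parquet successfully"
except Exception as e:
    print "ERROR: {0}".format(e)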



# Using Spark SQL

Spark DataFrames also allow you to use Spark SQL to query petabytes of data. Spark comes with a SQL-like query language that can be used to query distributed DataFrames. A key advantage of using Spark SQL is that the [Catalyst query optimizer](https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html "Catalyst") transforms your SQL query under the hood so that it runs as efficiently as possible.

## Example Queries

Spark SQL offers the same functionality as the DataFrame API. In fact, it provides more through its SQL capabilities, plus HiveQL capabilities when a `HiveContext` is used.

Because of time constraints, I will explain the different functions available in the Spark SQL environment through examples that combine multiple functions. This approach:
+ Covers many of the functions available in Spark SQL
+ Shows how to pipe multiple functions together


In [None]:
# Read the reviews parquet file
reviewsDF = sqlContext.read.parquet('Data/Outputs/reviews_filtered.parquet')

# Register the DataFrames to be used in sql
reviewsDF.registerTempTable("reviews")
productDF.registerTempTable("products")

print 'There are {0} reviews about {1} products'.format(reviewsDF.count(),productDF.count())

In [None]:
sql_query = """SELECT reviews.asin, overall, reviewText, price
            FROM reviews JOIN products ON  reviews.asin=products.asin
            WHERE price > 50.00
"""

result = sqlContext.sql(sql_query)
result.show()

<h1>Optional: User Defined Functions</h1>

Spark SQL also provides functionality similar to the User Defined Functions (UDF) offered in Hive. `sqlContext.registerFunction()` registers a Python function so that it can be called from SQL queries.

In [None]:
import re

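def transform_review(review):
    """Removes non-alphanumeric characters (keeping whitespace) and lowercases the text"""
    if review is None:
        return None  # SQL NULL values reach Python UDFs as None
    cleaned = re.sub(r'[^0-9a-zA-Z\s]+', '', review)
    return cleaned.lower()

result.registerTempTable("result")
sqlContext.registerFunction("to_lowercase", transform_review, returnType=StringType())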

sql_query_transform = """SELECT asin, reviewText, to_lowercase(reviewText) as cleaned
            FROM result
"""

result_transform = sqlContext.sql(sql_query_transform)
result_transform.show()
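Because a Python UDF is ordinary Python code executed on the workers, its logic can be checked locally before registering it. The helper below is named `clean_review` (a hypothetical name) so it does not clash with `transform_review`; it copies the same regular-expression rule and adds a `None` guard, since SQL `NULL` values are passed to Python UDFs as `None`.

```python
import re

def clean_review(review):
    """Same cleaning rule as transform_review: drop non-alphanumerics, lowercase."""
    if review is None:
        return None  # SQL NULL arrives in Python as None
    return re.sub(r'[^0-9a-zA-Z\s]+', '', review).lower()

samples = ["Great FIT!!", "5 stars :-)", None]
print([clean_review(s) for s in samples])
```

Note that whitespace is kept, so `"5 stars :-)"` becomes `"5 stars "` with a trailing space.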

<h1>Optional : Mix and Match!!</h1>

You can also mix DataFrames, RDDs and SparkSQL to make it work for you. 

<h2>Scenario</h2>

We want to investigate the average rating of reviews in terms of the categories they belong to. In order to do this, we:
+ query the needed data using DataFrames API
+ classify the reviews into different categories using core RDD API
+ query the average rating for each category using Spark SQL

In [None]:
import sklearn
import cPickle

from pyspark.sql import Row

# Pickle files are binary, so open in 'rb' mode
with open('Data/classifiers/classifier.pkl', 'rb') as model_file:
    model = cPickle.load(model_file)
classifier_b = sc.broadcast(model)

classifiedRDD = result_transform.filter("cleaned <> ''")\
                                .rdd.map(lambda row: 
                                     (row.asin,row.reviewText,str(classifier_b.value.predict([row.reviewText])[0]))
                                    )

CLASSIFIED_SCHEMA = StructType([
        StructField('asin', StringType(), True),
        StructField('review', StringType(), True),
        StructField('category', StringType(), True)
    ])

classifiedDF = sqlContext.createDataFrame(classifiedRDD,CLASSIFIED_SCHEMA)

classifiedDF.show()

In [None]:
classifiedDF.registerTempTable('enrichedReviews')

sql_query_test = """SELECT category, avg(overall) as avgRating
            FROM reviews 
            JOIN products ON reviews.asin=products.asin 
            JOIN enrichedReviews ON reviews.asin=enrichedReviews.asin AND reviews.reviewText=enrichedReviews.review
            WHERE price > 50.0
            GROUP BY enrichedReviews.category
"""

resultTest = sqlContext.sql(sql_query_test)
resultTest.show()
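Joining `enrichedReviews` to `reviews` on `asin` alone is a many-to-many join whenever a product has several reviews: each review row pairs with every classified review of the same product, so ratings get averaged into categories they do not belong to. The sketch below demonstrates the effect with Python's built-in `sqlite3` as a stand-in for Spark SQL (the join semantics are standard SQL) and two made-up reviews, contrasting it with a join that also matches the review text.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE reviews (asin TEXT, overall REAL, reviewText TEXT)")
cur.execute("CREATE TABLE enrichedReviews (asin TEXT, review TEXT, category TEXT)")

# Two reviews of the same product, each classified separately
cur.executemany("INSERT INTO reviews VALUES (?, ?, ?)",
                [("B001", 5.0, "love it"), ("B001", 1.0, "broke")])
cur.executemany("INSERT INTO enrichedReviews VALUES (?, ?, ?)",
                [("B001", "love it", "positive"), ("B001", "broke", "negative")])

# Joining on asin alone pairs every review with every classification of the product
by_asin = cur.execute("""SELECT category, avg(overall) FROM reviews
    JOIN enrichedReviews ON reviews.asin = enrichedReviews.asin
    GROUP BY category ORDER BY category""").fetchall()

# Also matching the review text keeps each review with its own category
by_review = cur.execute("""SELECT category, avg(overall) FROM reviews
    JOIN enrichedReviews ON reviews.asin = enrichedReviews.asin
                         AND reviews.reviewText = enrichedReviews.review
    GROUP BY category ORDER BY category""").fetchall()

print(by_asin)
print(by_review)
conn.close()
```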