In [1]:
import json
import os
import pprint

import pyspark as ps
from pyspark import SparkContext
from pyspark.sql import HiveContext

## Part 1: Introduction to SparkSQL

In [2]:
# Create the single SparkContext for this kernel, then wrap it in a
# HiveContext: the HiveQL features used later (LATERAL VIEW / explode)
# come from the Hive-flavoured SQL context rather than the plain one.
sc = SparkContext()
hive_contxt = HiveContext(sparkContext=sc)

In [3]:
# AWS credentials are taken from the environment instead of being embedded in
# the notebook. The key pair that used to be hardcoded here was committed in
# plain text and should be treated as compromised (revoke and rotate it).
# NOTE(review): credentials placed in an s3n URL are known to break when the
# secret key contains '/'; if a rotated key does, configure them through
# Hadoop's fs.s3n.* settings instead.
s3_prefix = 's3n://{}:{}@sparkdatasets'.format(os.environ['AWS_ACCESS_KEY_ID'],
                                               os.environ['AWS_SECRET_ACCESS_KEY'])
yelp_business_schema_rdd = hive_contxt.jsonFile(s3_prefix + '/yelp_academic_dataset_business.json')

In [4]:
# Show the schema Spark inferred from the JSON records; nested objects such as
# `attributes` appear as structs, which is why the SQL below uses dotted paths.
yelp_business_schema_rdd.printSchema()

root
 |-- attributes: struct (nullable = true)
 |    |-- Accepts Credit Cards: string (nullable = true)
 |    |-- Accepts Insurance: boolean (nullable = true)
 |    |-- Ages Allowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: struct (nullable = true)
 |    |    |-- casual: boolean (nullable = true)
 |    |    |-- classy: boolean (nullable = true)
 |    |    |-- divey: boolean (nullable = true)
 |    |    |-- hipster: boolean (nullable = true)
 |    |    |-- intimate: boolean (nullable = true)
 |    |    |-- romantic: boolean (nullable = true)
 |    |    |-- touristy: boolean (nullable = true)
 |    |    |-- trendy: boolean (nullable = true)
 |    |    |-- upscale: boolean (nullable = true)
 |    |-- Attire: string (nullable = true)
 |    |-- BYOB: boolean (nullable = true)
 |    |-- BYOB/Corkage: string (nullable = true)
 |    |-- By Appointment Only: boolean (nullable = true)
 |    |-- Caters: boolean (nullable = true)
 |    |-- Coat Check

In [5]:
# Make the loaded business data queryable from SQL under the name
# `yelp_business` (used by the query in the next cell).
yelp_business_schema_rdd.registerTempTable('yelp_business')

In [6]:
hive_contxt.sql("""SELECT DISTINCT name from yelp_business
                   LATERAL VIEW explode(categories) c AS category
                   WHERE stars = 5 
                   AND city = 'Phoenix'
                   AND attributes.`Accepts Credit Cards` = 'true'
                   AND category = 'Restaurants'""").collect()

[Row(name=u'Auslers Grill'),
 Row(name=u'Panini Bread and Grill'),
 Row(name=u'WY Market'),
 Row(name=u'Couscous Express'),
 Row(name=u'Las Jicaras Mexican Grill'),
 Row(name=u"Adela's Italian"),
 Row(name=u"Jimmy John's"),
 Row(name=u'Los Primos Carniceria'),
 Row(name=u'Saffron JAK Original Stonebread Pizzas'),
 Row(name=u"Filiberto's Mexican Food"),
 Row(name=u'Little Miss BBQ'),
 Row(name=u'The Brown Bag'),
 Row(name=u'Sunfare'),
 Row(name=u'Altamimi Restutant'),
 Row(name=u'Tacos Huicho'),
 Row(name=u'Coe Casa'),
 Row(name=u'The Loaded Potato'),
 Row(name=u'Gluten Free Creations Bakery'),
 Row(name=u'Pollo Sabroso'),
 Row(name=u"Lil Cal's"),
 Row(name=u'Helpings Cafe, Market and Catering'),
 Row(name=u"Mulligan's Restaurant"),
 Row(name=u'Ten Handcrafted American Fare & Spirits'),
 Row(name=u'Frenchys Caribbean Dogs'),
 Row(name=u"Bertie's Of Arcadia"),
 Row(name=u'Queen Creek Olive Mill Oils & Olives Biltmore Fashion Park'),
 Row(name=u'Banh Mi Bistro Vietnamese Eatery'),
 Row(na

## Part 2: Spark and SparkSQL in practice

In [7]:
# AWS credentials are taken from the environment instead of being embedded in
# the notebook. The key pair that used to be hardcoded here was committed in
# plain text and should be treated as compromised (revoke and rotate it).
s3_prefix = 's3n://{}:{}@sparkdatasets'.format(os.environ['AWS_ACCESS_KEY_ID'],
                                               os.environ['AWS_SECRET_ACCESS_KEY'])
user_rdd = sc.textFile(s3_prefix + '/users.txt')
transaction_rdd = sc.textFile(s3_prefix + '/transactions.txt')

In [8]:
def make_json_single_row(row, field_names, strip_money_sign=False,
                         delimiter=';', money_index=1):
    """Convert one delimited text line into a JSON object string.

    Parameters
    ----------
    row : str
        One line of text, e.g. ``'815581247;$144.82;2015-09-05'``.
    field_names : sequence of str
        Keys assigned, in order, to the split fields. As with ``zip``, surplus
        fields or surplus names are silently dropped.
    strip_money_sign : bool
        If True, the field at ``money_index`` is treated as a money amount:
        surrounding whitespace and a leading '$' are removed and the value is
        stored as a float. All other fields stay strings.
    delimiter : str
        Field separator (default ';', matching the original input format).
    money_index : int
        Position of the money field (default 1, the original hard-coded slot).

    Returns
    -------
    str
        ``json.dumps`` of the field-name -> value mapping.

    Raises
    ------
    IndexError
        If ``strip_money_sign`` is set and the row has no field at ``money_index``.
    ValueError
        If the money field is not numeric once the '$' is removed.
    """
    fields = row.split(delimiter)
    if strip_money_sign:
        # Strip whitespace first so a value like ' $12.50' still loses its '$'
        # (lstrip alone would leave it, and float() would abort the Spark job).
        fields[money_index] = float(fields[money_index].strip().lstrip('$'))
    return json.dumps(dict(zip(field_names, fields)))


def make_json(rdd, field_names, strip_money_sign=False, delimiter=';', money_index=1):
    """Map every line of ``rdd`` through ``make_json_single_row``.

    All keyword parameters are forwarded unchanged; see
    ``make_json_single_row`` for their meaning. Returns the result of
    ``rdd.map``.
    """
    return rdd.map(lambda row: make_json_single_row(
        row, field_names,
        strip_money_sign=strip_money_sign,
        delimiter=delimiter,
        money_index=money_index))

In [9]:
# User rows have no money column, so the default (no '$' stripping) applies.
user_field_names = ['user_id',
                    'name',
                    'email',
                    'phone']
user_split_rdd = make_json(user_rdd, user_field_names)

In [10]:
# amount_paid sits in the second column; strip_money_sign makes make_json
# drop its '$' prefix (presumably present in the raw file) and store a float.
transaction_field_names = ['user_id',
                           'amount_paid',
                           'date']
transaction_split_rdd = make_json(transaction_rdd,
                                  transaction_field_names,
                                  strip_money_sign=True)

In [11]:
# Turn the JSON-string RDDs into SQL-queryable objects; jsonRDD infers the
# column types from the JSON values (amount_paid becomes a double and the
# remaining fields strings, as the schemas printed in the next cell show).
user_schema_rdd = hive_contxt.jsonRDD(user_split_rdd)
transaction_schema_rdd = hive_contxt.jsonRDD(transaction_split_rdd)

In [12]:
# printSchema() writes to stdout rather than returning a value, so each
# schema dump is preceded by a plain-text heading. print(...) with a single
# argument produces the same output under Python 2 and Python 3.
print('USER SCHEMA:')
print('--------------------')
user_schema_rdd.printSchema()
print('\n')
print('TRANSACTION SCHEMA:')
print('--------------------')
transaction_schema_rdd.printSchema()

USER SCHEMA:
--------------------
root
 |-- email: string (nullable = true)
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- user_id: string (nullable = true)



TRANSACTION SCEHMA:
--------------------
root
 |-- amount_paid: double (nullable = true)
 |-- date: string (nullable = true)
 |-- user_id: string (nullable = true)



In [13]:
# Expose both datasets to SQL under the names used by the queries below.
# NOTE(review): `user` is a reserved word in some Hive/Spark SQL versions; the
# queries here run as-is, but names like `users` / `transactions` would be
# safer if the Spark version changes (every query would need updating too).
user_schema_rdd.registerTempTable('user')
transaction_schema_rdd.registerTempTable('transaction')

In [14]:
# Show the first two rows of each registered table, each under its own
# heading, with a blank separator between the two previews.
row_previews = [
    ('USER ROWS:', """SELECT * FROM user LIMIT 2"""),
    ('TRANSACTION ROWS:', """SELECT * FROM transaction LIMIT 2"""),
]
for preview_number, (heading, preview_sql) in enumerate(row_previews):
    if preview_number > 0:
        print('\n')
    print(heading)
    print('--------------------')
    pprint.pprint(hive_contxt.sql(preview_sql).collect())

USER ROWS:
--------------------
[Row(email=u'prometheus.barwis@me.com', name=u'Prometheus Barwis', phone=u'(533) 072-2779', user_id=u'1106214172'),
 Row(email=u'ashraf.bainbridge@gmail.com', name=u'Ashraf Bainbridge', phone=u'', user_id=u'527133132')]


TRANSACTION ROWS:
--------------------
[Row(amount_paid=144.82, date=u'2015-09-05', user_id=u'815581247'),
 Row(amount_paid=140.93, date=u'2014-03-11', user_id=u'1534673027')]


In [16]:
hive_contxt.sql("""SELECT user.name, top_transaction.amount_paid
                   FROM (SELECT * FROM transaction ORDER BY amount_paid DESC LIMIT 10) top_transaction
                   INNER JOIN user
                   ON top_transaction.user_id = user.user_id
                """).collect()

[Row(name=u'Landri Fulshur', amount_paid=999.99),
 Row(name=u'Raziel Merk', amount_paid=999.99),
 Row(name=u'Leilani Cranstoun', amount_paid=999.98),
 Row(name=u'Zasia Scrivens', amount_paid=999.98),
 Row(name=u'Samyrah Milbourne', amount_paid=999.98),
 Row(name=u'Vishwak Farrow', amount_paid=999.98),
 Row(name=u'Ori Horrage', amount_paid=999.98),
 Row(name=u'Kianu Dyneley', amount_paid=999.99),
 Row(name=u'Andrian Waite', amount_paid=999.99),
 Row(name=u'Veida Hubbard', amount_paid=999.98)]