# Part 0: Initiating a `SparkSession`

1\. Initiate a `SparkSession`. A `SparkSession` initializes both a `SparkContext` and a `SQLContext`, so you can use both the RDD-based and the DataFrame-based functionality of Spark.

In [1]:
import pyspark as ps    # for the pyspark suite

spark = ps.sql.SparkSession.builder \
            .master("local[4]") \
            .appName("df lecture") \
            .getOrCreate()

# Part 1: Introduction to SparkSQL

SparkSQL allows you to execute relational queries on **structured** data using 
Spark. Today we'll get some practice with this by running some queries on a 
Yelp dataset. To begin, you will load data into a Spark `DataFrame`, which can 
then be queried as a SQL table. 

1\. Load the Yelp business data using the function `.read.json()` from the `SparkSession()` object, with input file `data/yelp_academic_dataset_business.json.gz`.

2\. Print the schema and register the `yelp_business_df` as a temporary 
table named `yelp_business` (this will enable us to query the table later using 
our `SparkSession()` object).

Now, you can run SQL queries on the `yelp_business` table. For example:

```python
result = spark.sql("SELECT name, city, state, stars FROM yelp_business LIMIT 10")
result.show()
```

In [2]:
yelp_business_df = spark.read.json('data/yelp_academic_dataset_business.json.gz')

yelp_business_df.printSchema()

print(yelp_business_df.count())

yelp_business_df.createOrReplaceTempView("yelp_business")

root
 |-- attributes: struct (nullable = true)
 |    |-- Accepts Credit Cards: string (nullable = true)
 |    |-- Accepts Insurance: boolean (nullable = true)
 |    |-- Ages Allowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: struct (nullable = true)
 |    |    |-- casual: boolean (nullable = true)
 |    |    |-- classy: boolean (nullable = true)
 |    |    |-- divey: boolean (nullable = true)
 |    |    |-- hipster: boolean (nullable = true)
 |    |    |-- intimate: boolean (nullable = true)
 |    |    |-- romantic: boolean (nullable = true)
 |    |    |-- touristy: boolean (nullable = true)
 |    |    |-- trendy: boolean (nullable = true)
 |    |    |-- upscale: boolean (nullable = true)
 |    |-- Attire: string (nullable = true)
 |    |-- BYOB: boolean (nullable = true)
 |    |-- BYOB/Corkage: string (nullable = true)
 |    |-- By Appointment Only: boolean (nullable = true)
 |    |-- Caters: boolean (nullable = true)
 |    |-- Coat Check

In [3]:
result = spark.sql("SELECT name, city, state, stars FROM yelp_business LIMIT 10")
result.show()

+--------------------+-----------+-----+-----+
|                name|       city|state|stars|
+--------------------+-----------+-----+-----+
|   Eric Goldberg, MD|    Phoenix|   AZ|  3.5|
|        Clancy's Pub| Dravosburg|   PA|  3.5|
|Cool Springs Golf...|Bethel Park|   PA|  2.5|
|    Verizon Wireless| Pittsburgh|   PA|  3.5|
|       Emil's Lounge|   Braddock|   PA|  4.5|
|Alexion's Bar & G...|   Carnegie|   PA|  4.0|
|Flynn's E W Tire ...|   Carnegie|   PA|  1.5|
|Forsythe Miniatur...|   Carnegie|   PA|  4.0|
|Quaker State Cons...|   Carnegie|   PA|  2.5|
|Kings Family Rest...|   Carnegie|   PA|  3.5|
+--------------------+-----------+-----+-----+



3\. Write a query or a sequence of transformations that returns the `name` of entries that fulfill the following 
conditions:

   - Rated at 5 `stars`
   - In the `city` of Phoenix
   - Accepts credit cards (reference the `Accepts Credit Cards` field as 
   ``` attributes.`Accepts Credit Cards` ```.  **NOTE**: We are looking for the string value `'true'`, not the boolean value `True`! The schema printed above lists this field as a string.)
   - Contains `'Restaurants'` in the `categories` array.  

   Hint: `LATERAL VIEW explode()` can be used to access the individual elements
   of an array (i.e. the `categories` array). For reference, you can see the 
   [first example](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView) on this page.
   
   Hint: In Spark, when using `filter()` or `where()`, you can create a condition that tests whether an array column contains a given value. The function is [pyspark.sql.functions.array_contains](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.array_contains).

In [4]:
from pyspark.sql.functions import col, array_contains

out = yelp_business_df.filter((col('stars') == 5) &
                        (col('city') == 'Phoenix') &
                        (col('attributes.`Accepts Credit Cards`') == 'true') &
                        (array_contains(col('categories'),'Restaurants'))).select('name')
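# print each matching row, then the same result as a formatted table
for row in out.collect():
    print(row)
print('___'*10)
out.show()    # show() prints the table itself and returns None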

Row(name='Auslers Grill')
Row(name="Mulligan's Restaurant")
Row(name='Sunfare')
Row(name='Subway')
Row(name="Lil Cal's")
Row(name="Ed's")
Row(name='Frenchys Caribbean Dogs')
Row(name='WY Market')
Row(name='Pollo Sabroso')
Row(name='Queen Creek Olive Mill Oils & Olives Biltmore Fashion Park')
Row(name='Gluten Free Creations Bakery')
Row(name='Panini Bread and Grill')
Row(name='One Eighty Q')
Row(name='Saffron JAK Original Stonebread Pizzas')
Row(name='Los Primos Carniceria')
Row(name="Bertie's Of Arcadia")
Row(name='Little Miss BBQ')
Row(name='Las Jicaras Mexican Grill')
Row(name='Santos Lucha Libre')
Row(name='Taqueria El Chino')
Row(name="Filiberto's Mexican Food")
Row(name='Helpings Cafe, Market and Catering')
Row(name='Altamimi Restutant')
Row(name='Tacos Huicho')
Row(name="Jimmy John's")
Row(name='Ten Handcrafted American Fare & Spirits')
Row(name='The Brown Bag')
Row(name='Coe Casa')
Row(name="Adela's Italian")
Row(name='The Loaded Potato')
Row(name='Banh Mi Bistro Vietnamese Eate

In [5]:
spark.sql("""SELECT DISTINCT name from yelp_business
                   LATERAL VIEW explode(categories) c AS category
                   WHERE stars = 5 
                   AND city = 'Phoenix'
                   AND attributes.`Accepts Credit Cards` = 'true'
                   AND category = 'Restaurants'""").collect()

[Row(name='Panini Bread and Grill'),
 Row(name='Gluten Free Creations Bakery'),
 Row(name='Ten Handcrafted American Fare & Spirits'),
 Row(name='One Eighty Q'),
 Row(name='Subway'),
 Row(name='Banh Mi Bistro Vietnamese Eatery'),
 Row(name='Tacos Huicho'),
 Row(name='Helpings Cafe, Market and Catering'),
 Row(name='Sunfare'),
 Row(name="Bertie's Of Arcadia"),
 Row(name='Los Primos Carniceria'),
 Row(name='The Brown Bag'),
 Row(name='Little Miss BBQ'),
 Row(name='Auslers Grill'),
 Row(name="Adela's Italian"),
 Row(name='Coe Casa'),
 Row(name='Altamimi Restutant'),
 Row(name='Las Jicaras Mexican Grill'),
 Row(name='Queen Creek Olive Mill Oils & Olives Biltmore Fashion Park'),
 Row(name='Saffron JAK Original Stonebread Pizzas'),
 Row(name='Couscous Express'),
 Row(name='The Loaded Potato'),
 Row(name='Santos Lucha Libre'),
 Row(name="Mulligan's Restaurant"),
 Row(name='Taqueria El Chino'),
 Row(name="Jimmy John's"),
 Row(name="Lil Cal's"),
 Row(name='Frenchys Caribbean Dogs'),
 Row(name="E

# Part 2: Spark and SparkSQL in Practice

Now that we have a basic knowledge of how SparkSQL works, let's try a real-life scenario where some data manipulation and cleaning is required before we can query the data with SparkSQL. We will be using a dataset of user information and a dataset of purchases that our users have made. We'll clean the purchase data in a regular Spark RDD before querying it with SparkSQL.

   1\. Load a dataframe `users` from `data/users.txt` using `spark.read.csv` with the following parameters: no headers, use separator `";"`, and infer the schema of the underlying data (for now). Use `.show(5)` and `.printSchema()` to check the result.
   
   2\. Create a schema for this dataset using proper names and types for the columns, using types from the `pyspark.sql.types` module (see lecture). Use that schema to read the `users` dataframe again and use `.printSchema()` to check the result.
   
   Note: Each row in the `users` file represents one user, with the fields `user_id, name, email, phone`.

In [6]:
# import only the data types used in the schemas below
from pyspark.sql.types import (StructType, StructField,
                               IntegerType, StringType, FloatType)

# create a schema of your own
users_schema = StructType( [
    StructField('user_id',IntegerType(),True),
    StructField('name',StringType(),True),
    StructField('email',StringType(),True),
    StructField('phone',StringType(),True) ] )

In [7]:
users = spark.read.csv('data/users.txt',
                         header=False,       # use headers or not
                         quote='"',         # char for quotes
                         sep=";",           # char for separation
                         schema=users_schema, # using predefined schema
                         inferSchema=False)  # do we infer schema or not ?

users.count()
users.show(5)
users.printSchema()

+----------+-----------------+--------------------+--------------------+
|   user_id|             name|               email|               phone|
+----------+-----------------+--------------------+--------------------+
|1106214172|Prometheus Barwis|prometheus.barwis...|      (533) 072-2779|
| 527133132|Ashraf Bainbridge|ashraf.bainbridge...|                null|
|1290614884|   Alain Hennesey|alain.hennesey@fa...|(942) 208-8460,(8...|
|1700818057| Hamed Fingerhuth|hamed.fingerhuth@...|                null|
|  17378782|    Annamae Leyte|annamae.leyte@msn...|                null|
+----------+-----------------+--------------------+--------------------+
only showing top 5 rows

root
 |-- user_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)



   3\. Load an RDD `transactions_rdd` from `data/transactions.txt` using `spark.sparkContext.textFile`. Use `.take(5)` to check the result.
   
   Use `.map()` to split those csv-like lines, to strip the dollar sign on the second column, and to cast each column to its proper type.
   
   4\. Create a schema for this dataset using proper names and types for the columns, using types from the `pyspark.sql.types` module (see lecture). Use that schema to convert `transactions_rdd` into a dataframe `transactions`  and use `.show(5)` and `.printSchema()` to check the result.
   
   Each row in the `transactions` file has the columns  `user_id, amount_paid, date`.
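
The cleaning described in step 3 can be written as a small named function, which is easier to test on its own than a chain of lambdas. This is a minimal sketch: `parse_transaction` is a hypothetical helper, and the sample line assumes the raw format is `user_id;$amount;date`, inferred from the `;` separator and the dollar sign mentioned above.

```python
def parse_transaction(line, sep=";"):
    """Split one raw line and cast each column to its proper type."""
    user_id, amount, date = line.split(sep)
    # Strip the leading dollar sign before converting the amount to a float.
    return int(user_id), float(amount.lstrip("$")), date


# Assumed example of a raw line; the real file may differ in detail.
sample = "815581247;$144.82;2015-09-05"
parsed = parse_transaction(sample)
print(parsed)
```

With an RDD of lines, the same function could be passed directly to `.map(parse_transaction)`.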

In [8]:
transactions_rdd = spark.sparkContext.textFile('data/transactions.txt')\
                        .map(lambda s : s.split(';'))\
                        .map(lambda row : (int(row[0]),float(row[1].lstrip("$")),row[2]))

transactions_rdd.take(5)

[(815581247, 144.82, '2015-09-05'),
 (1534673027, 140.93, '2014-03-11'),
 (842468364, 104.26, '2014-05-06'),
 (1720001139, 194.6, '2015-08-24'),
 (1397891675, 307.72, '2015-09-25')]

In [9]:
# create a schema of your own
transactions_schema = StructType( [
    StructField('user_id',IntegerType(),True),
    StructField('amount_paid',FloatType(),True),
    StructField('date',StringType(),True) ] )

In [10]:
transactions = spark.createDataFrame(transactions_rdd, transactions_schema)

transactions.count()
transactions.show(5)
transactions.printSchema()

+----------+-----------+----------+
|   user_id|amount_paid|      date|
+----------+-----------+----------+
| 815581247|     144.82|2015-09-05|
|1534673027|     140.93|2014-03-11|
| 842468364|     104.26|2014-05-06|
|1720001139|      194.6|2015-08-24|
|1397891675|     307.72|2015-09-25|
+----------+-----------+----------+
only showing top 5 rows

root
 |-- user_id: integer (nullable = true)
 |-- amount_paid: float (nullable = true)
 |-- date: string (nullable = true)
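
One detail of this schema may be worth keeping in mind: `FloatType` is Spark's 32-bit floating-point type, while a Python `float` is 64-bit, so amounts are rounded slightly when they enter the DataFrame. The sketch below uses only the standard library (no Spark) to show the size of that rounding; `to_float32` is a hypothetical helper written for this illustration.

```python
import struct


def to_float32(value):
    """Round a 64-bit Python float to the nearest 32-bit float and back."""
    return struct.unpack("f", struct.pack("f", value))[0]


amount = 144.82
stored = to_float32(amount)
error = abs(stored - amount)

print(repr(stored))   # the 32-bit value, widened back to 64 bits
print(error)          # small but nonzero rounding error
```

If that rounding matters for monetary amounts, `DoubleType` or `DecimalType` from `pyspark.sql.types` could be considered instead.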



5\. Write a sequence of transformations or a SQL query that returns the names and the amount paid for the users with the **top 10** transaction amounts.

In [11]:
users.join(transactions.orderBy(col('amount_paid'), ascending=False).limit(10),
           transactions.user_id == users.user_id).show()

+----------+-----------------+--------------------+--------------------+----------+-----------+----------+
|   user_id|             name|               email|               phone|   user_id|amount_paid|      date|
+----------+-----------------+--------------------+--------------------+----------+-----------+----------+
|1093225999|   Landri Fulshur|landri.fulshur@me...|(898) 198-1781,(6...|1093225999|     999.99|2015-03-04|
| 504736332|      Raziel Merk|raziel.merk@faceb...|(275) 456-4661,(7...| 504736332|     999.99|2015-01-10|
|1009490315|Leilani Cranstoun|leilani.cranstoun...|                null|1009490315|     999.98|2014-09-05|
|1378643543|   Zasia Scrivens|zasia.scrivens@ms...|      (880) 354-8779|1378643543|     999.98|2014-04-04|
|  50874512|Samyrah Milbourne|samyrah.milbourne...|                null|  50874512|     999.98|2015-03-07|
| 420754422|   Vishwak Farrow|vishwak.farrow@me...|(979) 784-6613,(9...| 420754422|     999.98|2015-11-23|
| 740624030|      Ori Horrage|ori.hor

In [12]:
users.createOrReplaceTempView('users')
transactions.createOrReplaceTempView('transactions')

spark.sql("""SELECT users.name, top_transactions.amount_paid
                FROM (SELECT * FROM transactions ORDER BY amount_paid DESC LIMIT 10) top_transactions
                   INNER JOIN users 
                   ON top_transactions.user_id = users.user_id
                """).show()

+-----------------+-----------+
|             name|amount_paid|
+-----------------+-----------+
|   Landri Fulshur|     999.99|
|      Raziel Merk|     999.99|
|Leilani Cranstoun|     999.98|
|   Zasia Scrivens|     999.98|
|Samyrah Milbourne|     999.98|
|   Vishwak Farrow|     999.98|
|      Ori Horrage|     999.98|
|    Kianu Dyneley|     999.99|
|    Andrian Waite|     999.99|
|    Veida Hubbard|     999.98|
+-----------------+-----------+

