# Part 0: Initiating a `SparkSession`

1\. Initiate a `SparkSession`. A `SparkSession` wraps both a `SparkContext` and a `SQLContext`, giving access to both the RDD-based and the DataFrame-based functionality of Spark.

In [115]:
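import pyspark as ps

# Local Spark with 4 worker threads; getOrCreate() reuses an existing session if there is one
spark = ps.sql.SparkSession.builder \
        .master("local[4]") \
        .appName("df lecture") \
        .getOrCreate()

sc = spark.sparkContext   # underlying SparkContext, used for RDD operations
sq = ps.SQLContext(sc)    # legacy SQL entry point; spark.sql(...) covers the same functionality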


# Part 1: Introduction to SparkSQL

SparkSQL allows you to execute relational queries on **structured** data using 
Spark. Today we'll get some practice with this by running some queries on a 
Yelp dataset. To begin, you will load data into a Spark `DataFrame`, which can 
then be queried as a SQL table. 

1\. Load the Yelp business data using `.read.json()` on the `SparkSession` object (`spark`), with the input file `data/yelp_academic_dataset_business.json.gz`.

In [22]:
df = spark.read.json('/Users/datascientist/Downloads/spark_files/data/yelp_academic_dataset_business.json.gz')

df.createOrReplaceTempView("yelp_business")  # replaces the deprecated registerTempTable

2\. Print the schema and register the loaded DataFrame as a temporary 
table named `yelp_business` (this lets us query the table later through our 
`SparkSession` object, `spark`).

Now, you can run SQL queries on the `yelp_business` table. For example:

```python
result = spark.sql("SELECT name, city, state, stars FROM yelp_business LIMIT 10")
result.show()
```

In [79]:
result = spark.sql("SELECT categories FROM yelp_business LIMIT 1")
result.show()

+--------------------+
|          categories|
+--------------------+
|[Doctors, Health ...|
+--------------------+



3\. Write a query or a sequence of transformations that returns the `name` of entries that fulfill the following 
conditions:

   - Rated at 5 `stars`
   - In the `city` of Phoenix
   - Accepts credit cards (reference the `'Accepts Credit Cards'` field with 
   ``` attributes.`Accepts Credit Cards` ```)
   - Contains `Restaurants` in the `categories` array

   Hint: `LATERAL VIEW explode()` can be used to access the individual elements
   of an array (here, the `categories` array). For reference, see the 
   [first example](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView) on this page.

   Hint: In Spark, when using `filter()` or `where()`, you can build a condition that tests whether an array column contains a given value. The function is [pyspark.sql.functions.array_contains](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.array_contains).
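For comparison with the SQL answer, the same filter can be written as DataFrame transformations, using `array_contains` instead of `LATERAL VIEW explode()`. This is a minimal sketch assuming `df` is the Yelp business DataFrame loaded above and that `stars` is numeric; the helper name `phoenix_five_star_restaurants` is hypothetical. Comparing the credit-card attribute to the string `'true'` mirrors the SQL query in this notebook; if that field is a boolean in your copy of the data, comparing to `True` is the more direct test.

```python
def phoenix_five_star_restaurants(df):
    """Names of 5-star Phoenix businesses that accept credit cards and are tagged 'Restaurants'."""
    # Local import keeps this helper self-contained
    from pyspark.sql import functions as F

    return (df
            .where(F.col('stars') == 5.0)
            .where(F.col('city') == 'Phoenix')
            # The nested field name contains spaces, so fetch it with getField()
            .where(F.col('attributes').getField('Accepts Credit Cards') == 'true')
            # array_contains tests membership without exploding the array into extra rows
            .where(F.array_contains(F.col('categories'), 'Restaurants'))
            .select('name'))


# Usage, with `df` from the loading cell above:
# phoenix_five_star_restaurants(df).show()
```

Unlike `explode()`, `array_contains` keeps one row per business, so no de-duplication is needed afterwards.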

In [70]:
df.categories.like('Restaurants')

Column<categories LIKE Restaurants>

In [93]:
result = spark.sql("SELECT name, city, categories, attributes FROM yelp_business LATERAL VIEW explode(categories) as cat WHERE stars = '5.0' AND city = 'Phoenix' AND attributes.`Accepts Credit Cards` = 'true' AND cat='Restaurants'")
result.show()

+--------------------+-------+--------------------+--------------------+
|                name|   city|          categories|          attributes|
+--------------------+-------+--------------------+--------------------+
|       Auslers Grill|Phoenix|       [Restaurants]|[true,null,null,f...|
|Mulligan's Restau...|Phoenix|       [Restaurants]|[true,null,null,b...|
|             Sunfare|Phoenix|[Food Delivery Se...|[true,null,null,n...|
|              Subway|Phoenix|[Fast Food, Sandw...|[true,null,null,n...|
|           Lil Cal's|Phoenix|       [Restaurants]|[true,null,null,f...|
|                Ed's|Phoenix|[American (Tradit...|[true,null,null,f...|
|Frenchys Caribbea...|Phoenix|[Food, Hot Dogs, ...|[true,null,null,n...|
|           WY Market|Phoenix|[American (Tradit...|[true,null,null,b...|
|       Pollo Sabroso|Phoenix|[Fast Food, Ameri...|[true,null,null,n...|
|Queen Creek Olive...|Phoenix|[Food, Specialty ...|[true,null,null,b...|
|Gluten Free Creat...|Phoenix|[Bakeries, Food, ...|

# Part 2: Spark and SparkSQL in Practice

Now that we have a basic understanding of how SparkSQL works, let's work through a more realistic scenario in which some data manipulation and cleaning is required before we can query the data with SparkSQL. We will use one dataset of user information and another of the purchases those users have made, and we'll clean the data in a regular Spark RDD before querying it with SparkSQL.

   1\. Load a DataFrame `users` from the S3 link `s3a://sparkdatasets/users.txt` (no credentials needed; if you encounter any problem, use the local copy `data/users.txt` instead) using `spark.read.csv` with the following parameters: no header, separator `";"`, and an inferred schema (for now). Use `.show(5)` and `.printSchema()` to check the result.

In [128]:
users=spark.read.csv('../data/users.txt',header=False,sep=';',inferSchema=True)
users.take(10)
type(users)

pyspark.sql.dataframe.DataFrame

   2\. Create a schema for this dataset with proper column names and types, using types from the `pyspark.sql.types` module (see lecture). Use that schema to read the `users` DataFrame again, and use `.printSchema()` to check the result.

   Note: Each row in the `users` file represents one user, with the fields `user_id, name, email, phone`.
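One way to build the schema this step asks for is a `StructType` of `StructField`s. This is a sketch rather than the lecture's solution: the helper name `users_schema` is hypothetical, the id column is named `id` (not `user_id`) to match the `toDF` names used in this notebook, and `phone` stays a `StringType` because, as the output below shows, some users have several comma-separated numbers or none at all.

```python
def users_schema():
    """Explicit schema for users.txt: id, name, email, phone."""
    # Local import keeps this helper self-contained
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType

    return StructType([
        StructField('id', IntegerType(), True),     # user id
        StructField('name', StringType(), True),
        StructField('email', StringType(), True),
        StructField('phone', StringType(), True),   # may hold several numbers, or be null
    ])


# Usage, assuming the `spark` session from Part 0 and the local copy of the file:
# users = spark.read.csv('data/users.txt', header=False, sep=';', schema=users_schema())
# users.printSchema()
```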

In [163]:
users = users.toDF('id','name','email','phone')
users.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)



In [166]:
users.show()

+----------+--------------------+--------------------+--------------------+
|        id|                name|               email|               phone|
+----------+--------------------+--------------------+--------------------+
|1106214172|   Prometheus Barwis|prometheus.barwis...|      (533) 072-2779|
| 527133132|   Ashraf Bainbridge|ashraf.bainbridge...|                null|
|1290614884|      Alain Hennesey|alain.hennesey@fa...|(942) 208-8460,(8...|
|1700818057|    Hamed Fingerhuth|hamed.fingerhuth@...|                null|
|  17378782|       Annamae Leyte|annamae.leyte@msn...|                null|
|1723254379|         Chao Peachy|chao.peachy@me.co...|      (510) 121-0098|
|1946358537|Somtochukwu Mouri...|somtochukwu.mouri...|      (669) 504-8080|
|  33663453|     Elisabeth Berry|elisabeth.berry@f...|      (802) 973-8267|
|1329323232|       Jalan Blakely|jalan.blakely@gma...|                null|
|  68524725|         Lyric Boddy|lyric.boddy@yahoo...|      (273) 077-4039|
| 629898066|

   3\. Load an RDD `transactions_rdd` from the S3 link `s3a://sparkdatasets/transactions.txt` (no credentials needed; if you encounter any problem, use the local copy `data/transactions.txt` instead) using `spark.sparkContext.textFile`. Use `.take(5)` to check the result.

   Use `.map()` to split those CSV-like lines, strip the dollar sign from the second column, and cast each column to its proper type.
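A sketch of the RDD route described in this step. The parsing lives in a plain Python function, so it can be tried on a single line before being mapped over the RDD. It assumes fields are separated by `;` (the separator used to read this file elsewhere in the notebook) and that dates look like `YYYY-MM-DD HH:MM:SS`; the DataFrame output below only shows the first characters of each date, so check the raw file and adjust the format string if needed. The names `parse_transaction` and `load_transactions_rdd` are hypothetical.

```python
from datetime import datetime


def parse_transaction(line):
    """Split one 'user_id;amount;date' line and cast each field to a proper type.

    Assumes dates are formatted as 'YYYY-MM-DD HH:MM:SS'.
    """
    user_id, amount, date = (field.strip() for field in line.split(';'))
    return (
        int(user_id),
        float(amount.lstrip('$')),                     # '$144.82' -> 144.82
        datetime.strptime(date, '%Y-%m-%d %H:%M:%S'),  # assumed date format
    )


def load_transactions_rdd(spark, path='data/transactions.txt'):
    """Read the file as an RDD of text lines, then parse each line."""
    # Pass 's3a://sparkdatasets/transactions.txt' as `path` to read from S3 instead
    return spark.sparkContext.textFile(path).map(parse_transaction)


# Try the parser on one line built from the first row shown below (seconds assumed)
print(parse_transaction('815581247;$144.82;2015-09-05 00:00:00'))
# (815581247, 144.82, datetime.datetime(2015, 9, 5, 0, 0))
```

With a session available, `transactions_rdd = load_transactions_rdd(spark)` followed by `transactions_rdd.take(5)` gives the check the exercise asks for.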

In [145]:
transactions=spark.read.csv('../data/transactions.txt',header=False,sep=';',inferSchema=True)
transactions.show()

+----------+-------+--------------------+
|       _c0|    _c1|                 _c2|
+----------+-------+--------------------+
| 815581247|$144.82|2015-09-05 00:00:...|
|1534673027|$140.93|2014-03-11 00:00:...|
| 842468364|$104.26|2014-05-06 00:00:...|
|1720001139|$194.60|2015-08-24 00:00:...|
|1397891675|$307.72|2015-09-25 00:00:...|
| 926282663| $36.69|2014-10-24 00:00:...|
| 694853136| $39.59|2014-11-26 00:00:...|
| 636287877|$430.94|2015-06-12 00:00:...|
|1396310477|  $31.4|2014-12-05 00:00:...|
|1279939289|$180.69|2015-03-26 00:00:...|
| 859061953|$383.35|2014-06-06 00:00:...|
|1983919868| $256.2|2015-09-28 00:00:...|
| 589339046|$930.56|2014-09-21 00:00:...|
|1559785598|$423.77|2015-05-18 00:00:...|
| 347589978|$309.53|2015-10-11 00:00:...|
| 963722938|$299.19|2014-04-06 00:00:...|
|1808365853|$426.21|2015-09-10 00:00:...|
| 417552135|$732.27|2015-09-30 00:00:...|
| 744965566|$186.33|2015-12-30 00:00:...|
|1513020241| $925.8|2014-10-06 00:00:...|
+----------+-------+--------------

   4\. Create a schema for this dataset with proper column names and types, using types from the `pyspark.sql.types` module (see lecture). Use that schema to convert `transactions_rdd` into a DataFrame `transactions`, and use `.show(5)` and `.printSchema()` to check the result.

   Each row in the `transactions` file has the columns `user_id, amount_paid, date`.
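A sketch of this step, assuming `transactions_rdd` already holds parsed `(int, float, datetime)` tuples. Declaring `amount_paid` as a `DoubleType` means later sorts on it are numeric rather than string-based. The helper names `transactions_schema` and `transactions_to_df` are hypothetical, and the user-id column is called `id` to match the `users` DataFrame in this notebook.

```python
def transactions_schema():
    """Schema for parsed (user_id, amount_paid, date) tuples."""
    # Local import keeps this helper self-contained
    from pyspark.sql.types import (StructType, StructField, IntegerType,
                                   DoubleType, TimestampType)

    return StructType([
        StructField('id', IntegerType(), True),          # user id, same name as in `users`
        StructField('amount_paid', DoubleType(), True),  # dollar sign already stripped
        StructField('date', TimestampType(), True),
    ])


def transactions_to_df(spark, parsed_rdd):
    """Turn an RDD of (int, float, datetime) tuples into a typed DataFrame."""
    return spark.createDataFrame(parsed_rdd, schema=transactions_schema())


# Usage, assuming `transactions_rdd` holds parsed tuples:
# transactions = transactions_to_df(spark, transactions_rdd)
# transactions.show(5)
# transactions.printSchema()
```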

In [146]:
transactions = transactions.toDF('id','amount_paid','date')
transactions.printSchema()

root
 |-- id: integer (nullable = true)
 |-- amount_paid: string (nullable = true)
 |-- date: timestamp (nullable = true)



5\. Write a sequence of transformations or a SQL query that returns the names and the amount paid for the users with the **top 10** transaction amounts.
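In the cells below, `amount_paid` is still a string such as `'$999.99'` (see the printed schema above), so ordering by it directly is lexicographic, not numeric. A small pure-Python illustration using amounts from the transactions output; `dollars_to_float` is a hypothetical helper:

```python
def dollars_to_float(amount):
    """Convert a dollar string such as '$930.56' to the float 930.56."""
    return float(amount.lstrip('$'))


amounts = ['$36.69', '$307.72', '$930.56', '$31.4']

# String order compares character by character, so '$36.69' beats '$307.72'
print(sorted(amounts, reverse=True))
# ['$930.56', '$36.69', '$31.4', '$307.72']

# Numeric order, which is what "top 10 transaction amounts" needs
print(sorted(amounts, key=dollars_to_float, reverse=True))
# ['$930.56', '$307.72', '$36.69', '$31.4']
```

In Spark, the equivalent fix is to order by `regexp_replace(amount_paid, '[$]', '')` cast to `double`, or to parse the amount into a numeric column as described in step 3.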

In [172]:
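from pyspark.sql import functions as F

# createOrReplaceTempView replaces the deprecated registerTempTable
transactions.createOrReplaceTempView("trans")
users.createOrReplaceTempView("use")

# amount_paid is a string such as '$999.99'; strip the '$' and cast so the sort is numeric
out = (transactions.join(users, transactions.id == users.id)
       .orderBy(F.regexp_replace('amount_paid', '[$]', '').cast('double').desc()))
out.show(10)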

+----------+-----------+--------------------+----------+-----------------+--------------------+--------------------+
|        id|amount_paid|                date|        id|             name|               email|               phone|
+----------+-----------+--------------------+----------+-----------------+--------------------+--------------------+
|1093225999|    $999.99|2015-03-04 00:00:...|1093225999|   Landri Fulshur|landri.fulshur@me...|(898) 198-1781,(6...|
| 225990677|    $999.99|2014-07-11 00:00:...| 225990677|    Andrian Waite|andrian.waite@gma...|                null|
| 197275390|    $999.99|2014-09-09 00:00:...| 197275390|    Kianu Dyneley|kianu.dyneley@gma...|                null|
| 504736332|    $999.99|2015-01-10 00:00:...| 504736332|      Raziel Merk|raziel.merk@faceb...|(275) 456-4661,(7...|
| 420754422|    $999.98|2015-11-23 00:00:...| 420754422|   Vishwak Farrow|vishwak.farrow@me...|(979) 784-6613,(9...|
|1378643543|    $999.98|2014-04-04 00:00:...|1378643543|   Zasia

In [None]:
query = "SELECT u.name, t.amount_paid FROM trans AS t JOIN `use` AS u ON u.id = t.id ORDER BY CAST(regexp_replace(t.amount_paid, '[$]', '') AS DOUBLE) DESC LIMIT 10"
spark.sql(query).show()