## SQL at Scale with Spark SQL

Welcome to the SQL mini project. For this project, you will use the Domino Data Lab Platform and work through a series of exercises using Spark SQL. The dataset size may not be too big but the intent here is to familiarize yourself with the Spark SQL interface which scales easily to huge datasets, without you having to worry about changing your SQL queries. 

The data you need is present in the mini-project folder in the form of three CSV files. You need to make sure that these datasets are uploaded and present in the same directory as this notebook file, since we will be importing these files in Spark and create the following tables under the __`country_club`__ database using Spark SQL.

1. The __`bookings`__ table,
2. The __`facilities`__ table, and
3. The __`members`__ table.

You will be uploading these datasets shortly into Spark to understand how to create a database within minutes! Once the database and the tables are populated, you will be focusing on the mini-project questions.

In the mini project, you'll be asked a series of questions. You can solve them using the Domino platform, but for the final deliverable, please download this notebook as an IPython notebook (__`File -> Export -> IPython Notebook`__) and upload it to your GitHub.

# Checking Existence of Spark Environment Variables

Make sure your notebook is loaded using a PySpark Workspace. If you open up a regular Jupyter workspace the following variables might not exist

In [4]:
spark

In [5]:
sqlContext

<pyspark.sql.context.SQLContext at 0x7f626d10c668>

### Run the following if you failed to open a notebook in the PySpark Workspace

This will work assuming you are using Spark in the cloud on domino or you might need to configure with your own spark instance if you are working offline

In [6]:
if 'sc' not in locals():
    from pyspark.context import SparkContext
    from pyspark.sql.context import SQLContext
    from pyspark.sql.session import SparkSession
    
    sc = SparkContext()
    sqlContext = SQLContext(sc)
    spark = SparkSession(sc)

# Create a utility function to run SQL commands

Instead of typing the same python functions repeatedly, we build a small function where you can just pass your query to get results.

- Remember we are using Spark SQL in PySpark
- We can't run multiple SQL statements in one go (no semi-colon ';' separated SQL statements)
- We can run multi-line SQL queries (but still has to be a single statement)

In [7]:
def run_sql(statement):
    try:
        result = sqlContext.sql(statement)
    except Exception as e:
        print(e.desc, '\n', e.stackTrace)
        return
    return result

# Creating the Database

We will first create our database in which we will be creating our three tables of interest

In [8]:
run_sql('drop database if exists country_club cascade')
run_sql('create database country_club')
dbs = run_sql('show databases')
dbs.toPandas()

Unnamed: 0,databaseName
0,country_club
1,default


# Creating the Tables

In this section, we will be creating the three tables of interest and populate them with the data from the CSV files already available to you.

To get started, first make sure you have already uploaded the three CSV files and they are present in the same directory as the notebook.

Once you have done this, please remember to execute the following code to build the dataframes which will be saved as tables in our database

In [9]:
# File location and type
file_location_bookings = "./Bookings.csv"
file_location_facilities = "./Facilities.csv"
file_location_members = "./Members.csv"

file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
bookings_df = (spark.read.format(file_type) 
                    .option("inferSchema", infer_schema) 
                    .option("header", first_row_is_header) 
                    .option("sep", delimiter) 
                    .load(file_location_bookings))

facilities_df = (spark.read.format(file_type) 
                      .option("inferSchema", infer_schema) 
                      .option("header", first_row_is_header) 
                      .option("sep", delimiter) 
                      .load(file_location_facilities))

members_df = (spark.read.format(file_type) 
                      .option("inferSchema", infer_schema) 
                      .option("header", first_row_is_header) 
                      .option("sep", delimiter) 
                      .load(file_location_members))

### Viewing the dataframe schemas

We can take a look at the schemas of our potential tables to be written to our database soon

In [10]:
print('Bookings Schema')
bookings_df.printSchema()
print('Facilities Schema')
facilities_df.printSchema()
print('Members Schema')
members_df.printSchema()

Bookings Schema
root
 |-- bookid: integer (nullable = true)
 |-- facid: integer (nullable = true)
 |-- memid: integer (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- slots: integer (nullable = true)

Facilities Schema
root
 |-- facid: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- membercost: double (nullable = true)
 |-- guestcost: double (nullable = true)
 |-- initialoutlay: integer (nullable = true)
 |-- monthlymaintenance: integer (nullable = true)

Members Schema
root
 |-- memid: integer (nullable = true)
 |-- surname: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- address: string (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- telephone: string (nullable = true)
 |-- recommendedby: integer (nullable = true)
 |-- joindate: timestamp (nullable = true)



# Create permanent tables
We will be creating three permanent tables here in our __`country_club`__ database as we discussed previously with the following code

In [12]:
permanent_table_name_bookings = "country_club.Bookings"
bookings_df.write.format("parquet").saveAsTable(permanent_table_name_bookings)

permanent_table_name_facilities = "country_club.Facilities"
facilities_df.write.format("parquet").saveAsTable(permanent_table_name_facilities)

permanent_table_name_members = "country_club.Members"
members_df.write.format("parquet").saveAsTable(permanent_table_name_members)

### Refresh tables and check them

In [13]:
run_sql('use country_club')
run_sql('REFRESH table bookings')
run_sql('REFRESH table facilities')
run_sql('REFRESH table members')
tbls = run_sql('show tables')
tbls.toPandas()

Unnamed: 0,database,tableName,isTemporary
0,country_club,bookings,False
1,country_club,facilities,False
2,country_club,members,False


# Test a sample SQL query

__Note:__ You can use multi-line SQL queries (but still a single statement) as follows

In [14]:
result = run_sql('''
                    SELECT * 
                    FROM bookings 
                    LIMIT 3
                 ''')
result.toPandas()

Unnamed: 0,bookid,facid,memid,starttime,slots
0,0,3,1,2012-07-03 11:00:00,2
1,1,4,1,2012-07-03 08:00:00,2
2,2,6,0,2012-07-03 18:00:00,2


# Your Turn: Solve the following questions with Spark SQL

- Make use of the `run_sql(...)` function as seen in the previous example
- You can write multi-line SQL queries but it has to be a single statement (no use of semi-colons ';')
- Make use of the `toPandas()` function as depicted in the previous example to display the query results

#### Q1: Some of the facilities charge a fee to members, but some do not. Please list the names of the facilities that do.

In [15]:
result = run_sql('''
                   select * from facilities where membercost!=0
                 ''')
result.toPandas()

Unnamed: 0,facid,name,membercost,guestcost,initialoutlay,monthlymaintenance
0,0,Tennis Court 1,5.0,25.0,10000,200
1,1,Tennis Court 2,5.0,25.0,8000,200
2,4,Massage Room 1,9.9,80.0,4000,3000
3,5,Massage Room 2,9.9,80.0,4000,3000
4,6,Squash Court,3.5,17.5,5000,80


####  Q2: How many facilities do not charge a fee to members?

In [8]:
result = run_sql('''
                   select count(*) from facilities where membercost==0
                 ''')
result.toPandas()

Unnamed: 0,count(1)
0,4


#### Q3: How can you produce a list of facilities that charge a fee to members, where the fee is less than 20% of the facility's monthly maintenance cost? 
#### Return the facid, facility name, member cost, and monthly maintenance of the facilities in question.

In [30]:
result = run_sql('''
                   select facid,name,membercost,monthlymaintenance from facilities where membercost<monthlymaintenance*20/100 
                ''')
result.toPandas()

Unnamed: 0,facid,name,membercost,monthlymaintenance
0,0,Tennis Court 1,5.0,200
1,1,Tennis Court 2,5.0,200
2,2,Badminton Court,0.0,50
3,3,Table Tennis,0.0,10
4,4,Massage Room 1,9.9,3000
5,5,Massage Room 2,9.9,3000
6,6,Squash Court,3.5,80
7,7,Snooker Table,0.0,15
8,8,Pool Table,0.0,15


#### Q4: How can you retrieve the details of facilities with ID 1 and 5? Write the query without using the OR operator.

In [33]:
result = run_sql('''
                   select * from facilities where facid in ('1','5')
                ''')
result.toPandas()

Unnamed: 0,facid,name,membercost,guestcost,initialoutlay,monthlymaintenance
0,1,Tennis Court 2,5.0,25.0,8000,200
1,5,Massage Room 2,9.9,80.0,4000,3000


#### Q5: How can you produce a list of facilities, with each labelled as 'cheap' or 'expensive', depending on if their monthly maintenance cost is more than $100? 
#### Return the name and monthly maintenance of the facilities in question.

In [30]:
result = run_sql('''
                   select facid,name,monthlymaintenance,
                   CASE 
                       WHEN monthlymaintenance<100 THEN 'cheap'
                       when monthlymaintenance>100 THEN 'expensive'
                   END AS monthly_categ 
                   from facilities
                 ''')
result.toPandas()

Unnamed: 0,facid,name,monthlymaintenance,monthly_categ
0,0,Tennis Court 1,200,expensive
1,1,Tennis Court 2,200,expensive
2,2,Badminton Court,50,cheap
3,3,Table Tennis,10,cheap
4,4,Massage Room 1,3000,expensive
5,5,Massage Room 2,3000,expensive
6,6,Squash Court,80,cheap
7,7,Snooker Table,15,cheap
8,8,Pool Table,15,cheap


#### Q6: You'd like to get the first and last name of the last member(s) who signed up. Do not use the LIMIT clause for your solution.

In [56]:
result = run_sql('''
                    select * from members where memid not in (select distinct(recommendedby) from members where recommendedby is not null)
                
                 ''')
result.toPandas()

Unnamed: 0,memid,surname,firstname,address,zipcode,telephone,recommendedby,joindate
0,0,GUEST,GUEST,GUEST,0,(000) 000-0000,,2012-07-01 00:00:00
1,7,Dare,Nancy,"6 Hunting Lodge Way, Boston",10383,(833) 776-4001,4.0,2012-07-25 08:59:12
2,8,Boothe,Tim,"3 Bloomsbury Close, Reading, 00234",234,(811) 433-2547,3.0,2012-07-25 16:02:35
3,10,Owen,Charles,"52 Cheshire Grove, Winchester, 28563",28563,(855) 542-5251,1.0,2012-08-03 19:42:37
4,12,Baker,Anne,"55 Powdery Street, Boston",80743,844-076-5141,9.0,2012-08-10 14:23:22
5,14,Smith,Jack,"252 Binkington Way, Boston",69302,(822) 163-3254,1.0,2012-08-10 16:22:05
6,17,Pinker,David,"5 Impreza Road, Boston",65332,811 409-6734,13.0,2012-08-16 11:32:47
7,21,Mackenzie,Anna,"64 Perkington Lane, Reading",64577,(822) 661-2898,1.0,2012-08-26 09:32:05
8,22,Coplin,Joan,"85 Bard Street, Bloomington, Boston",43533,(822) 499-2232,16.0,2012-08-29 08:32:41
9,24,Sarwin,Ramnaresh,"12 Bullington Lane, Boston",65464,(822) 413-1470,15.0,2012-09-01 08:44:42


####  Q7: How can you produce a list of all members who have used a tennis court?
- Include in your output the name of the court, and the name of the member formatted as a single column. 
- Ensure no duplicate data
- Also order by the member name.

In [37]:
result = run_sql('''
                   select * from members where memid in (select memid from bookings where facid in (select facid from facilities where name like '%Tennis%'))
                   
                 ''')
result.toPandas()



Unnamed: 0,memid,surname,firstname,address,zipcode,telephone,recommendedby,joindate
0,0,GUEST,GUEST,GUEST,0,(000) 000-0000,,2012-07-01 00:00:00
1,1,Smith,Darren,"8 Bloomsbury Close, Boston",4321,555-555-5555,,2012-07-02 12:02:05
2,2,Smith,Tracy,"8 Bloomsbury Close, New York",4321,555-555-5555,,2012-07-02 12:08:23
3,3,Rownam,Tim,"23 Highway Way, Boston",23423,(844) 693-0723,,2012-07-03 09:32:15
4,4,Joplette,Janice,"20 Crossing Road, New York",234,(833) 942-4710,1.0,2012-07-03 10:25:05
5,5,Butters,Gerald,"1065 Huntingdon Avenue, Boston",56754,(844) 078-4130,1.0,2012-07-09 10:44:09
6,6,Tracy,Burton,"3 Tunisia Drive, Boston",45678,(822) 354-9973,,2012-07-15 08:52:55
7,7,Dare,Nancy,"6 Hunting Lodge Way, Boston",10383,(833) 776-4001,4.0,2012-07-25 08:59:12
8,8,Boothe,Tim,"3 Bloomsbury Close, Reading, 00234",234,(811) 433-2547,3.0,2012-07-25 16:02:35
9,9,Stibbons,Ponder,"5 Dragons Way, Winchester",87630,(833) 160-3900,6.0,2012-07-25 17:09:05


#### Q8: How can you produce a list of bookings on the day of 2012-09-14 which will cost the member (or guest) more than $30? 

- Remember that guests have different costs to members (the listed costs are per half-hour 'slot')
- The guest user's ID is always 0. 

#### Include in your output the name of the facility, the name of the member formatted as a single column, and the cost.

- Order by descending cost, and do not use any subqueries.

In [170]:
result = run_sql('''
                   
                   select facid,memid,starttime,slots,membercost,guestcost,
                   IF(memid=0,slots*guestcost,slots*membercost) AS total_cost
                   from bookings join facilities using(facid) where bookings.starttime like '%2012-09-14%'
                   and IF(memid=0,slots*guestcost,slots*membercost) >30
                   
                 ''')
result.toPandas()

Unnamed: 0,facid,memid,starttime,slots,membercost,guestcost,total_cost
0,0,0,2012-09-14 16:00:00,3,5.0,25.0,75.0
1,0,0,2012-09-14 19:00:00,3,5.0,25.0,75.0
2,1,0,2012-09-14 14:00:00,3,5.0,25.0,75.0
3,1,0,2012-09-14 17:00:00,6,5.0,25.0,150.0
4,4,0,2012-09-14 09:00:00,2,9.9,80.0,160.0
5,4,0,2012-09-14 13:00:00,2,9.9,80.0,160.0
6,4,13,2012-09-14 14:00:00,4,9.9,80.0,39.6
7,4,0,2012-09-14 16:00:00,2,9.9,80.0,160.0
8,5,0,2012-09-14 11:00:00,4,9.9,80.0,320.0
9,6,0,2012-09-14 09:30:00,4,3.5,17.5,70.0


#### Q9: This time, produce the same result as in Q8, but using a subquery.

In [166]:
result = run_sql('''
                   
                   select facid,memid,starttime,slots,membercost,guestcost,total_cost from (select facid,memid,starttime,slots,membercost,guestcost,
                   case memid
                       when 0 then slots*guestcost
                       else slots*membercost
                   end as total_cost
                   from bookings join facilities using(facid) where bookings.starttime like '%2012-09-14%') where total_cost>30.0
                   
                 ''')
result.toPandas()

Unnamed: 0,facid,memid,starttime,slots,membercost,guestcost,total_cost
0,0,0,2012-09-14 16:00:00,3,5.0,25.0,75.0
1,0,0,2012-09-14 19:00:00,3,5.0,25.0,75.0
2,1,0,2012-09-14 14:00:00,3,5.0,25.0,75.0
3,1,0,2012-09-14 17:00:00,6,5.0,25.0,150.0
4,4,0,2012-09-14 09:00:00,2,9.9,80.0,160.0
5,4,0,2012-09-14 13:00:00,2,9.9,80.0,160.0
6,4,13,2012-09-14 14:00:00,4,9.9,80.0,39.6
7,4,0,2012-09-14 16:00:00,2,9.9,80.0,160.0
8,5,0,2012-09-14 11:00:00,4,9.9,80.0,320.0
9,6,0,2012-09-14 09:30:00,4,3.5,17.5,70.0


#### Q10: Produce a list of facilities with a total revenue less than 1000.
- The output should have facility name and total revenue, sorted by revenue. 
- Remember that there's a different cost for guests and members!

In [190]:
result = run_sql('''
                 
                 select facid,name,sum(IF(memid=0,slots*guestcost,slots*membercost)) as revenue
                 from facilities join bookings using(facid)
                 group by facid,name
                 order by revenue
                 
                 ''')
result.toPandas()

Unnamed: 0,facid,name,revenue
0,3,Table Tennis,180.0
1,7,Snooker Table,240.0
2,8,Pool Table,270.0
3,2,Badminton Court,1906.5
4,6,Squash Court,13468.0
5,0,Tennis Court 1,13860.0
6,1,Tennis Court 2,14310.0
7,5,Massage Room 2,14454.6
8,4,Massage Room 1,50351.6


In [None]:
#query to check the correctness of above result.
# result = run_sql('''
                 
#                  select 
#                  sum(IF(memid=0,slots*guestcost,slots*membercost)) as total_cost
#                  from facilities join bookings using(facid) where facid=8
                 
                 
#                  ''')
# result.toPandas()