# SQL and Spark Recitation 2023

A relational database stores data in tables (also called relations) made of rows and columns. Rows in different tables are linked through shared key values, which lets us identify and access a piece of data in relation to other data in the database.

### SQL recap

Recall that SQL is a language for performing operations on tabular data (e.g., selection, projection, and joins).

- We write queries in SQL to retrieve data and answer questions about it.
- Declarative language (not procedural): you describe what result you want, NOT how to obtain it.

With SQL you can create, modify, and delete tables, and you can select, insert, update, and delete rows in existing tables.

NOTE: The exact SQL syntax varies somewhat between database systems (SQLite, PostgreSQL, Spark SQL, etc.), but the core language is very similar across them.
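
To make the list of operations concrete, here is a minimal sketch using Python's built-in `sqlite3` module with a throwaway in-memory database and a cut-down, hypothetical `crew` table (not the one built below). It creates a table, inserts, updates, and deletes rows, queries the result, and finally drops the table. The connection is named `demo_conn` so it does not clash with the notebook's `conn`.

```python
import sqlite3

# In-memory database: nothing is written to disk
demo_conn = sqlite3.connect(":memory:")

# Create a table (schema definition)
demo_conn.execute("CREATE TABLE crew (crew_id INTEGER PRIMARY KEY, name TEXT, rank INTEGER)")

# Insert, update, and delete rows; ? placeholders let sqlite3 quote the values safely
demo_conn.executemany(
    "INSERT INTO crew VALUES (?, ?, ?)",
    [(1, "Jane", 10), (2, "Dan", 9), (3, "Alex", 4)],
)
demo_conn.execute("UPDATE crew SET rank = 5 WHERE name = ?", ("Alex",))
demo_conn.execute("DELETE FROM crew WHERE crew_id = ?", (2,))
demo_conn.commit()

# Declarative query: we say which rows and columns we want, SQLite decides how to get them
crud_rows = demo_conn.execute("SELECT name, rank FROM crew ORDER BY crew_id").fetchall()
print(crud_rows)  # [('Jane', 10), ('Alex', 5)]

# Remove the table entirely, then close the connection
demo_conn.execute("DROP TABLE crew")
demo_conn.close()
```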


In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import sqlite3


In [None]:
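# pandasql 0.7.3 does not work with SQLAlchemy 2.x, so pin a 1.4.x release.
# The ipython-sql conflict reported below can be ignored: this notebook does not use ipython-sql.
!pip install SQLAlchemy==1.4.46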

Collecting SQLAlchemy==1.4.46
  Downloading SQLAlchemy-1.4.46-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.6 MB)
Installing collected packages: SQLAlchemy
  Attempting uninstall: SQLAlchemy
    Found existing installation: SQLAlchemy 2.0.20
    Uninstalling SQLAlchemy-2.0.20:
      Successfully uninstalled SQLAlchemy-2.0.20
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
ipython-sql 0.5.0 requires sqlalchemy>=2.0, but you have sqlalchemy 1.4.46 which is incompatible.
Successfully installed SQLAlchemy-1.4.46


In [None]:
!pip install pandasql


Collecting pandasql
  Downloading pandasql-0.7.3.tar.gz (26 kB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pandasql
  Building wheel for pandasql (setup.py) ... done
  Created wheel for pandasql: filename=pandasql-0.7.3-py3-none-any.whl size=26771 sha256=4eefe9ca3a77eb178e2164c742011e56c4bca1911ece6334c0089406ed88c966
  Stored in directory: /root/.cache/pip/wheels/e9/bc/3a/8434bdcccf5779e72894a9b24fecbdcaf97940607eaf4bcdf9
Successfully built pandasql
Installing collected packages: pandasql
Successfully installed pandasql-0.7.3


In [None]:
import pandasql as ps


In [None]:
# Connect to the SQLite database file (sqlite3 creates the file if it does not exist)
conn = sqlite3.connect('recitationTest.db')

### Our Dataset

#### Spaceship Management Database

We want to keep track of:
- Crew members and their roles (captain, scientist, etc.).
- Equipment (centrifuge, lab gloves, soldering stations, etc.).
- The days crew members worked and how many hours they worked each day.
- Which crew members manage which equipment.

We then want to ask questions about this data.

### Setting up our data tables
      

In [None]:
### Create Tables with Schema
# Data on Crew
conn.execute("""
DROP TABLE IF EXISTS crew
""")

conn.execute('''
CREATE TABLE crew (
crew_id INTEGER PRIMARY KEY,
name TEXT,
rank INTEGER,
role_id INTEGER
)''')

print('done')

done


In [None]:
# Roles
conn.execute("""
DROP TABLE IF EXISTS roles
""")

conn.execute('''
CREATE TABLE roles (
role_id INTEGER PRIMARY KEY,
name TEXT
)''')

print('done')

done


In [None]:
#Equipment
conn.execute("""
DROP TABLE IF EXISTS equipment
""")

conn.execute('''
CREATE TABLE equipment (
equip_id INTEGER PRIMARY KEY,
name TEXT
)''')

print('done')

done


In [None]:
# Manages: links each crew member to the equipment they manage (one row per pair)

conn.execute("""
DROP TABLE IF EXISTS manages
""")

conn.execute('''
CREATE TABLE manages (
id INTEGER PRIMARY KEY,
crew_id INTEGER,
equip_id INTEGER
)
''')

print('done')

done


In [None]:
#Worklog
conn.execute("""
DROP TABLE IF EXISTS worklog
""")

conn.execute('''
CREATE TABLE worklog (
id INTEGER PRIMARY KEY,
crew_id INTEGER,
day INTEGER,
hours INTEGER
)
''')

print('done')

done


Now let's insert rows into each table.

In [None]:
# String literals use single quotes: standard SQL reserves double quotes for identifiers
conn.execute('''
INSERT INTO crew VALUES
(1, 'Jane', 10, 1),
(2, 'Dan', 9, 2),
(3, 'Alex', 4, 3),
(4, 'Jen', 4, 4),
(5, 'Brandon', 1, NULL)
''')


conn.execute('''
INSERT INTO roles VALUES
(1, 'captain'),
(2, 'scientist'),
(3, 'engineer'),
(4, 'engineer 2')
''')


conn.execute('''
INSERT INTO equipment VALUES
(1, 'Centrifuge'),
(2, 'Soldering Station'),
(3, 'Notebook'),
(4, 'Chemical Z')
''')


conn.execute('''
INSERT INTO manages VALUES
(1, 2, 1),
(2, 3, 2),
(3, 1, 3),
(4, 2, 4),
(5, 1, 4)
''')


conn.execute('''
INSERT INTO worklog VALUES
(1, 1, 1, 10),
(2, 2, 1, 5),
(3, 3, 1, 8),
(4, 4, 1, 12),
(5, 1, 2, 5),
(6, 2, 2, 8),
(7, 3, 2, 9),
(8, 4, 2, 8),
(9, 4, 2, 2)
''')

# Save the inserted rows to recitationTest.db
conn.commit()




### Examining the Data

In [None]:
#Load the crew table
crew_df = pd.read_sql('''SELECT *
                        FROM crew''', conn) #Selecting all columns here
crew_df

|   | crew_id | name | rank | role_id |
|---|---|---|---|---|
| 0 | 1 | Jane | 10 | 1.0 |
| 1 | 2 | Dan | 9 | 2.0 |
| 2 | 3 | Alex | 4 | 3.0 |
| 3 | 4 | Jen | 4 | 4.0 |
| 4 | 5 | Brandon | 1 | NaN |


In [None]:
crew_df.dtypes

crew_id      int64
name        object
rank         int64
role_id    float64
dtype: object

In [None]:
roles_df = pd.read_sql('''SELECT *
                        FROM roles''', conn) #Selecting all columns here
roles_df

|   | role_id | name |
|---|---|---|
| 0 | 1 | captain |
| 1 | 2 | scientist |
| 2 | 3 | engineer |
| 3 | 4 | engineer 2 |


In [None]:
roles_df.dtypes

role_id     int64
name       object
dtype: object

In [None]:
equipment_df = pd.read_sql('''SELECT *
                        FROM equipment''', conn) #Selecting all columns here
equipment_df

|   | equip_id | name |
|---|---|---|
| 0 | 1 | Centrifuge |
| 1 | 2 | Soldering Station |
| 2 | 3 | Notebook |
| 3 | 4 | Chemical Z |


In [None]:
manages_df = pd.read_sql('''SELECT *
                        FROM manages''', conn) #Selecting all columns here
manages_df

|   | id | crew_id | equip_id |
|---|---|---|---|
| 0 | 1 | 2 | 1 |
| 1 | 2 | 3 | 2 |
| 2 | 3 | 1 | 3 |
| 3 | 4 | 2 | 4 |
| 4 | 5 | 1 | 4 |


In [None]:
worklog_df = pd.read_sql('''SELECT *
                        FROM worklog''', conn) #Selecting all columns here
worklog_df

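|   | id | crew_id | day | hours |
|---|---|---|---|---|
| 0 | 1 | 1 | 1 | 10 |
| 1 | 2 | 2 | 1 | 5 |
| 2 | 3 | 3 | 1 | 8 |
| 3 | 4 | 4 | 1 | 12 |
| 4 | 5 | 1 | 2 | 5 |
| 5 | 6 | 2 | 2 | 8 |
| 6 | 7 | 3 | 2 | 9 |
| 7 | 8 | 4 | 2 | 8 |
| 8 | 9 | 4 | 2 | 2 |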


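Now that we have DataFrames for our tables, we can query them with pandasql.
The idea of pandasql is to make Python speak SQL: `ps.sqldf(query, locals())` copies the DataFrames referenced in the query into a temporary in-memory SQLite database, runs the query there, and returns the result as a new DataFrame. Passing `locals()` tells `sqldf` where to look up those DataFrame names.
You can find more information here: https://community.alteryx.com/t5/Data-Science-Blog/pandasql-Make-python-speak-SQL/ba-p/138435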
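
A rough, stdlib-only sketch of that idea: copy some rows into a fresh in-memory SQLite database, run the query, and hand back the column names and rows. `query_tables` is a hypothetical helper written for illustration; it is not pandasql's API, and it works on plain tuples rather than DataFrames. Table and column names are pasted directly into the SQL, so it assumes they come from trusted code.

```python
import sqlite3

def query_tables(query, tables):
    """Hypothetical helper: load `tables` into a throwaway SQLite database and run `query`.

    `tables` maps a table name to (column_names, list_of_row_tuples).
    Returns (result_column_names, result_rows).
    """
    db = sqlite3.connect(":memory:")
    try:
        for table_name, (columns, rows) in tables.items():
            col_list = ", ".join(columns)
            placeholders = ", ".join("?" for _ in columns)
            # SQLite accepts columns declared without a type
            db.execute(f"CREATE TABLE {table_name} ({col_list})")
            db.executemany(f"INSERT INTO {table_name} VALUES ({placeholders})", rows)
        cursor = db.execute(query)
        result_columns = [description[0] for description in cursor.description]
        return result_columns, cursor.fetchall()
    finally:
        db.close()

crew_rows = (["crew_id", "name", "rank"], [(1, "Jane", 10), (2, "Dan", 9), (3, "Alex", 4)])
demo_cols, demo_rows = query_tables(
    "SELECT name FROM crew_table WHERE rank > 5 ORDER BY crew_id",
    {"crew_table": crew_rows},
)
print(demo_cols, demo_rows)  # ['name'] [('Jane',), ('Dan',)]
```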


Suppose we just want to list the names of the crew members. `SELECT` lets us choose which columns to retrieve (a projection).

In [None]:
### Select only the names of crew members
query_crew_names = '''SELECT name FROM crew_df'''
crew_names_df = ps.sqldf(query_crew_names, locals())
crew_names_df

|   | name |
|---|---|
| 0 | Jane |
| 1 | Dan |
| 2 | Alex |
| 3 | Jen |
| 4 | Brandon |


#### Conditional Retrieval

We use the `WHERE` clause to keep only the rows that satisfy a condition.

In [None]:
# Retrieve all rows where the crew member's rank is either 10 or 4 and their name starts with the letter J
query_conditional = '''SELECT *
                        FROM crew_df
                        WHERE rank IN (4,10) AND name LIKE 'j%'
                        '''
crew_rank_df = ps.sqldf(query_conditional, locals())
crew_rank_df

|   | crew_id | name | rank | role_id |
|---|---|---|---|---|
| 0 | 1 | Jane | 10 | 1.0 |
| 1 | 4 | Jen | 4 | 4.0 |
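
The lowercase pattern `'j%'` still matched `Jane` and `Jen` because SQLite, which pandasql runs on, treats `LIKE` as case-insensitive for ASCII letters by default, while `GLOB` (Unix-style `*` and `?` wildcards) is always case-sensitive. The sketch below checks this with Python's `sqlite3` on a small hypothetical list of names; other database engines may handle case differently.

```python
import sqlite3

like_conn = sqlite3.connect(":memory:")
like_conn.execute("CREATE TABLE crew (name TEXT)")
like_conn.executemany("INSERT INTO crew VALUES (?)", [("Jane",), ("Jen",), ("jo",), ("Dan",)])

# LIKE: % matches any run of characters, _ matches exactly one; ASCII case is ignored
like_rows = like_conn.execute(
    "SELECT name FROM crew WHERE name LIKE 'j%' ORDER BY name"
).fetchall()

# GLOB: * and ? wildcards, and case must match exactly
glob_rows = like_conn.execute(
    "SELECT name FROM crew WHERE name GLOB 'j*' ORDER BY name"
).fetchall()
like_conn.close()

print(like_rows)  # [('Jane',), ('Jen',), ('jo',)]
print(glob_rows)  # [('jo',)]
```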


#### Ordering

You can order your results by values in the columns.

Let's retrieve the `manages` table sorted by `crew_id` in ascending order, breaking ties by `equip_id` in descending order.

In [None]:
query_ordering2 = '''SELECT * FROM manages_df ORDER BY crew_id ASC, equip_id DESC '''
manages_order_df = ps.sqldf(query_ordering2, locals())
manages_order_df

|   | id | crew_id | equip_id |
|---|---|---|---|
| 0 | 5 | 1 | 4 |
| 1 | 3 | 1 | 3 |
| 2 | 4 | 2 | 4 |
| 3 | 1 | 2 | 1 |
| 4 | 2 | 3 | 2 |


`ASC` (ascending) is the default; use `DESC` for descending order.

You can order by multiple columns, listed from highest to lowest priority: when two rows have equal values in one column, the next column in the list breaks the tie. E.g. `ORDER BY name, id`.
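
A small check of how the sort keys interact, using Python's `sqlite3` and a copy of the `manages` rows inserted above: swapping which column comes first in `ORDER BY` changes the result, because the second key only breaks ties in the first.

```python
import sqlite3

order_conn = sqlite3.connect(":memory:")
order_conn.execute("CREATE TABLE manages (id INTEGER PRIMARY KEY, crew_id INTEGER, equip_id INTEGER)")
order_conn.executemany(
    "INSERT INTO manages VALUES (?, ?, ?)",
    [(1, 2, 1), (2, 3, 2), (3, 1, 3), (4, 2, 4), (5, 1, 4)],
)

# crew_id decides first; equip_id only orders rows that share a crew_id
by_crew_then_equip = order_conn.execute(
    "SELECT crew_id, equip_id FROM manages ORDER BY crew_id ASC, equip_id DESC"
).fetchall()

# Same keys in the opposite priority order
by_equip_then_crew = order_conn.execute(
    "SELECT crew_id, equip_id FROM manages ORDER BY equip_id DESC, crew_id ASC"
).fetchall()
order_conn.close()

print(by_crew_then_equip)  # [(1, 4), (1, 3), (2, 4), (2, 1), (3, 2)]
print(by_equip_then_crew)  # [(1, 4), (2, 4), (1, 3), (3, 2), (2, 1)]
```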

#### Distinct Values

`DISTINCT` removes duplicates so that only unique values (or unique rows) are returned. For example, let's retrieve all the ranks assigned to our crew members, first with duplicates and then without.

In [None]:
query_allRanks = '''SELECT rank FROM crew_df '''
all_ranks_df = ps.sqldf(query_allRanks, locals())
all_ranks_df

|   | rank |
|---|---|
| 0 | 10 |
| 1 | 9 |
| 2 | 4 |
| 3 | 4 |
| 4 | 1 |


In [None]:
query_distinctRanks = '''SELECT DISTINCT rank FROM crew_df '''
distinct_ranks_df = ps.sqldf(query_distinctRanks, locals())
distinct_ranks_df

|   | rank |
|---|---|
| 0 | 10 |
| 1 | 9 |
| 2 | 4 |
| 3 | 1 |
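
`DISTINCT` is closely related to counting and grouping. This sketch (Python's `sqlite3`, with the crew names and ranks from above) compares `COUNT(*)` with `COUNT(DISTINCT rank)` and shows that `GROUP BY rank` produces one row per distinct rank while also allowing an aggregate for each group.

```python
import sqlite3

distinct_conn = sqlite3.connect(":memory:")
distinct_conn.execute("CREATE TABLE crew (name TEXT, rank INTEGER)")
distinct_conn.executemany(
    "INSERT INTO crew VALUES (?, ?)",
    [("Jane", 10), ("Dan", 9), ("Alex", 4), ("Jen", 4), ("Brandon", 1)],
)

# COUNT(*) counts rows; COUNT(DISTINCT rank) counts unique non-NULL ranks
row_count, unique_ranks = distinct_conn.execute(
    "SELECT COUNT(*), COUNT(DISTINCT rank) FROM crew"
).fetchone()

# One output row per distinct rank, plus how many crew members hold it
per_rank = distinct_conn.execute(
    "SELECT rank, COUNT(*) FROM crew GROUP BY rank ORDER BY rank DESC"
).fetchall()
distinct_conn.close()

print(row_count, unique_ranks)  # 5 4
print(per_rank)  # [(10, 1), (9, 1), (4, 2), (1, 1)]
```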


#### Null Values

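Unless the schema declares a column `NOT NULL` (e.g. in `CREATE TABLE`), any column can hold NULL. The `INTEGER PRIMARY KEY` columns used here are an exception: SQLite never stores NULL in them. Test for missing values with `IS NULL`, not `= NULL`.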

In [None]:
query_null = '''SELECT * FROM crew_df WHERE role_id IS NULL'''
null_row_df = ps.sqldf(query_null, locals())
null_row_df

|   | crew_id | name | rank | role_id |
|---|---|---|---|---|
| 0 | 5 | Brandon | 1 |  |
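
Why `IS NULL` and not `= NULL`? In SQL, comparing anything with NULL yields NULL (unknown), and `WHERE` keeps only rows whose condition is true. This sketch, using Python's `sqlite3` and two hypothetical rows (Python's `None` is stored as NULL), shows that `= NULL` matches nothing and that even `<> 1` skips the NULL row.

```python
import sqlite3

null_conn = sqlite3.connect(":memory:")
null_conn.execute("CREATE TABLE crew (name TEXT, role_id INTEGER)")
null_conn.executemany("INSERT INTO crew VALUES (?, ?)", [("Jane", 1), ("Brandon", None)])

# role_id = NULL is never true, so no rows come back
eq_rows = null_conn.execute("SELECT name FROM crew WHERE role_id = NULL").fetchall()
# IS NULL / IS NOT NULL are the correct tests for missing values
is_rows = null_conn.execute("SELECT name FROM crew WHERE role_id IS NULL").fetchall()
not_rows = null_conn.execute("SELECT name FROM crew WHERE role_id IS NOT NULL").fetchall()
# NULL <> 1 is also unknown, so Brandon is left out here too
ne_rows = null_conn.execute("SELECT name FROM crew WHERE role_id <> 1").fetchall()
null_conn.close()

print(eq_rows, is_rows, not_rows, ne_rows)  # [] [('Brandon',)] [('Jane',)] []
```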


### Relationships

Tables can be related to one another in three ways.

One to One: A record in one table is associated with exactly one record in another table. (In our data, each crew member has at most one role, and no two crew members share a role.)

One to Many: A record in one table is associated with multiple records in another table. (A crew member can have multiple entries in `worklog`.)

Many to Many: Multiple records in one table are associated with multiple records in another table. (A crew member can manage multiple pieces of equipment, and a piece of equipment can be managed by multiple crew members; the `manages` table records these pairs.)

<p align="center">
<img src="https://imgur.com/5kbMODk.png" width="900" align="center"/>
</p>
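
The tables in this recitation store linking ids (`role_id`, `crew_id`, `equip_id`) without declaring them as foreign keys, so SQLite would not stop us from inserting a `manages` row for a crew member who does not exist. As an alternative schema (a sketch, not the one used above), here is how the one-to-many and many-to-many relationships could be declared with `REFERENCES`, using Python's `sqlite3` and an in-memory database. SQLite only enforces foreign keys on a connection after `PRAGMA foreign_keys = ON`.

```python
import sqlite3

fk_conn = sqlite3.connect(":memory:")
# SQLite enforces foreign keys only when this pragma is on (it is set per connection)
fk_conn.execute("PRAGMA foreign_keys = ON")

fk_conn.executescript("""
CREATE TABLE crew (
    crew_id INTEGER PRIMARY KEY,
    name    TEXT NOT NULL
);
CREATE TABLE equipment (
    equip_id INTEGER PRIMARY KEY,
    name     TEXT NOT NULL
);
-- One-to-many: each worklog row belongs to exactly one crew member
CREATE TABLE worklog (
    id      INTEGER PRIMARY KEY,
    crew_id INTEGER NOT NULL REFERENCES crew(crew_id),
    day     INTEGER,
    hours   INTEGER
);
-- Many-to-many: a junction table with one foreign key per side
CREATE TABLE manages (
    crew_id  INTEGER NOT NULL REFERENCES crew(crew_id),
    equip_id INTEGER NOT NULL REFERENCES equipment(equip_id),
    PRIMARY KEY (crew_id, equip_id)
);
INSERT INTO crew VALUES (1, 'Jane');
INSERT INTO equipment VALUES (3, 'Notebook');
INSERT INTO manages VALUES (1, 3);
""")

try:
    # crew_id 99 does not exist, so this insert violates the foreign key
    fk_conn.execute("INSERT INTO manages VALUES (99, 3)")
    rejected = False
except sqlite3.IntegrityError:
    rejected = True

valid_links = fk_conn.execute("SELECT COUNT(*) FROM manages").fetchone()[0]
fk_conn.close()
print(rejected, valid_links)  # True 1
```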

#### Joins

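What are the roles of the crew members? To answer this, we join `crew` and `roles` on their shared `role_id` column. A plain `JOIN` is an inner join: it keeps only rows that have a match in both tables, so Brandon (whose `role_id` is NULL) does not appear in the result.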

In [None]:
crew_df

|   | crew_id | name | rank | role_id |
|---|---|---|---|---|
| 0 | 1 | Jane | 10 | 1.0 |
| 1 | 2 | Dan | 9 | 2.0 |
| 2 | 3 | Alex | 4 | 3.0 |
| 3 | 4 | Jen | 4 | 4.0 |
| 4 | 5 | Brandon | 1 | NaN |


In [None]:
roles_df

|   | role_id | name |
|---|---|---|
| 0 | 1 | captain |
| 1 | 2 | scientist |
| 2 | 3 | engineer |
| 3 | 4 | engineer 2 |


In [None]:
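# Alias the two `name` columns so the result has distinct, descriptive column names
query_cremember_role = '''SELECT crew_df.name AS crew_name, roles_df.name AS role
                        FROM crew_df JOIN roles_df ON crew_df.role_id=roles_df.role_id'''
crewMember_role_df = ps.sqldf(query_cremember_role, locals())

crewMember_role_df

|   | crew_name | role |
|---|---|---|
| 0 | Jane | captain |
| 1 | Dan | scientist |
| 2 | Alex | engineer |
| 3 | Jen | engineer 2 |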
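
If we also want crew members without a role, a `LEFT JOIN` keeps every row from the left table and fills the missing right-hand columns with NULL. A minimal sketch with Python's `sqlite3` and a subset of the crew and roles rows from above:

```python
import sqlite3

join_conn = sqlite3.connect(":memory:")
join_conn.executescript("""
CREATE TABLE crew (crew_id INTEGER PRIMARY KEY, name TEXT, role_id INTEGER);
CREATE TABLE roles (role_id INTEGER PRIMARY KEY, name TEXT);
INSERT INTO crew VALUES (1, 'Jane', 1), (2, 'Dan', 2), (5, 'Brandon', NULL);
INSERT INTO roles VALUES (1, 'captain'), (2, 'scientist');
""")

# Inner join: only crew rows with a matching role survive
inner_rows = join_conn.execute("""
    SELECT c.name AS crew_name, r.name AS role
    FROM crew AS c JOIN roles AS r ON c.role_id = r.role_id
    ORDER BY c.crew_id
""").fetchall()

# Left join: every crew row survives; unmatched role columns are NULL (None in Python)
left_rows = join_conn.execute("""
    SELECT c.name AS crew_name, r.name AS role
    FROM crew AS c LEFT JOIN roles AS r ON c.role_id = r.role_id
    ORDER BY c.crew_id
""").fetchall()
join_conn.close()

print(inner_rows)  # [('Jane', 'captain'), ('Dan', 'scientist')]
print(left_rows)   # [('Jane', 'captain'), ('Dan', 'scientist'), ('Brandon', None)]
```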


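Find the equipment managed by each crew member. Because `manages` links crew members to equipment (a many-to-many relationship), this takes two joins: `crew` to `manages`, then `manages` to `equipment`.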

In [None]:
query_manyTomany = '''SELECT c.name AS name, e.name AS equipment
                      FROM crew_df AS c
                       JOIN manages_df AS m ON c.crew_id=m.crew_id
                         JOIN equipment_df AS e ON m.equip_id=e.equip_id'''


crewMember_equipment = ps.sqldf(query_manyTomany, locals())
crewMember_equipment

|   | name | equipment |
|---|---|---|
| 0 | Jane | Notebook |
| 1 | Jane | Chemical Z |
| 2 | Dan | Centrifuge |
| 3 | Dan | Chemical Z |
| 4 | Alex | Soldering Station |


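Find the share of all equipment maintained by each crew member. The final table should have the columns `name` and `percentage`, where `percentage` is expressed as a fraction (0.5 means 50%). The `*1.0` makes the division floating-point.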

In [None]:
query_complex = '''SELECT c.name AS name, COUNT(*)*1.0/(SELECT COUNT(*) FROM equipment_df) as percentage
                      FROM crew_df AS c
                       JOIN manages_df AS m ON c.crew_id=m.crew_id
                         JOIN equipment_df AS e ON m.equip_id=e.equip_id
                         GROUP BY c.name'''


crewMember_equipment_percentage = ps.sqldf(query_complex, locals())
crewMember_equipment_percentage

|   | name | percentage |
|---|---|---|
| 0 | Alex | 0.25 |
| 1 | Dan | 0.5 |
| 2 | Jane | 0.5 |
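
The `*1.0` in the query above matters because of how SQLite divides: integer divided by integer is truncated, so without it every fraction here (1/4 or 2/4) would come out as 0. A quick check with Python's `sqlite3`:

```python
import sqlite3

div_conn = sqlite3.connect(":memory:")
# Both operands are integers, so the result is truncated
int_div = div_conn.execute("SELECT 1 / 4").fetchone()[0]
# Multiplying by 1.0 first makes the whole expression floating-point
real_div = div_conn.execute("SELECT 1 * 1.0 / 4").fetchone()[0]
# An explicit CAST has the same effect
cast_div = div_conn.execute("SELECT CAST(1 AS REAL) / 4").fetchone()[0]
div_conn.close()

print(int_div, real_div, cast_div)  # 0 0.25 0.25
```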


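Find, for each crew member, the ratio of the number of equipment items they maintain to their total worklog hours. The final table has the columns `name`, `total_hours`, and `percentage` (the ratio). The `WITH` clause defines a common table expression (CTE), `temp`, that first sums the hours per crew member; joining against this one-row-per-member summary avoids the duplicated rows we would get by joining `worklog` directly.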

In [None]:
worklog_df

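|   | id | crew_id | day | hours |
|---|---|---|---|---|
| 0 | 1 | 1 | 1 | 10 |
| 1 | 2 | 2 | 1 | 5 |
| 2 | 3 | 3 | 1 | 8 |
| 3 | 4 | 4 | 1 | 12 |
| 4 | 5 | 1 | 2 | 5 |
| 5 | 6 | 2 | 2 | 8 |
| 6 | 7 | 3 | 2 | 9 |
| 7 | 8 | 4 | 2 | 8 |
| 8 | 9 | 4 | 2 | 2 |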


In [None]:
query_more_complex = '''

WITH temp as (
  SELECT crew_id, SUM(hours) AS total_hours
  FROM worklog_df
  GROUP BY crew_id
)
SELECT c.name AS name, t.total_hours, COUNT(*)*1.0/t.total_hours as percentage
FROM crew_df AS c
JOIN manages_df AS m ON c.crew_id=m.crew_id
JOIN equipment_df AS e ON m.equip_id=e.equip_id
JOIN temp t ON t.crew_id = m.crew_id
GROUP BY c.name

'''


crewMember_equipment_ratio = ps.sqldf(query_more_complex, locals())
crewMember_equipment_ratio

|   | name | total_hours | percentage |
|---|---|---|---|
| 0 | Alex | 17 | 0.058824 |
| 1 | Dan | 13 | 0.153846 |
| 2 | Jane | 15 | 0.133333 |
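
Why pre-aggregate `worklog` in a CTE? Joining `manages` and `worklog` directly would pair every equipment row with every worklog row for the same crew member, inflating both `COUNT(*)` and `SUM(hours)`. The sketch below reproduces this with Jane's rows from above in Python's `sqlite3` (reduced, hypothetical tables with only the columns needed).

```python
import sqlite3

fan_conn = sqlite3.connect(":memory:")
fan_conn.executescript("""
CREATE TABLE manages (crew_id INTEGER, equip_id INTEGER);
CREATE TABLE worklog (crew_id INTEGER, day INTEGER, hours INTEGER);
INSERT INTO manages VALUES (1, 3), (1, 4);          -- Jane manages 2 items
INSERT INTO worklog VALUES (1, 1, 10), (1, 2, 5);   -- Jane worked 15 hours in total
""")

# Naive join: 2 manages rows x 2 worklog rows = 4 joined rows, so hours are counted twice
naive = fan_conn.execute("""
    SELECT COUNT(*), SUM(w.hours)
    FROM manages AS m JOIN worklog AS w ON m.crew_id = w.crew_id
    WHERE m.crew_id = 1
""").fetchone()

# CTE: collapse worklog to one row per crew member before joining
with_cte = fan_conn.execute("""
    WITH temp AS (
        SELECT crew_id, SUM(hours) AS total_hours FROM worklog GROUP BY crew_id
    )
    SELECT COUNT(*), t.total_hours
    FROM manages AS m JOIN temp AS t ON m.crew_id = t.crew_id
    WHERE m.crew_id = 1
    GROUP BY m.crew_id
""").fetchone()
fan_conn.close()

print(naive)     # (4, 30)
print(with_cte)  # (2, 15)
```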


### Spark SQL

**Note**: the following cells are for Recitation 5, feel free to ignore them for now.

#### Loading Spark

***Warning:*** The following cell will take approx. 2 minutes to run!

In [None]:
# Install system packages first (-y answers apt's confirmation prompt automatically)
!apt update
!apt install -y gcc python3-dev libkrb5-dev
# Download Spark from the Apache archive; downloads.apache.org removes releases once they are archived
!wget https://archive.apache.org/dist/spark/spark-3.2.2/spark-3.2.2-bin-hadoop3.2.tgz
!tar xf spark-3.2.2-bin-hadoop3.2.tgz
!pip install findspark
!pip install pyspark==3.2.2

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

import os
os.environ['SPARK_HOME'] = "/content/spark-3.2.2-bin-hadoop3.2"
import pyspark
from pyspark.sql import SQLContext



In [None]:
os.environ['SPARK_HOME']

In [None]:
spark = SparkSession.builder.appName('Recitation5').getOrCreate()

In [None]:
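# Fallback: create a session if `spark` is missing or None.
# getOrCreate() returns the already-running session when there is one.
try:
    if spark is None:
        spark = SparkSession.builder.appName('Initial').getOrCreate()
        # SQLContext wraps a SparkContext, not a SparkSession (it is deprecated; spark.sql works directly)
        sqlContext = SQLContext(spark.sparkContext)
except NameError:
    spark = SparkSession.builder.appName('Initial').getOrCreate()
    sqlContext = SQLContext(spark.sparkContext)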


#### Conditional Retrieval

In [None]:
crew_sdf = spark.createDataFrame(crew_df)

In [None]:
crew_sdf.show()

In [None]:
crew_sdf.createOrReplaceTempView('crew_spark_table')

In [None]:
# Retrieve all rows where crew members have rank either 10 or 4 and their name starts with the letter J
# (unlike SQLite, Spark SQL's LIKE is case-sensitive, so the pattern needs an uppercase 'J')
crew_rank_sdf = spark.sql("SELECT * FROM crew_spark_table WHERE rank IN (4, 10) AND name LIKE 'J%'")

In [None]:
crew_rank_sdf.show()

#### Ordering

In [None]:
# Let’s retrieve the names on the equipment list in increasing lexicographic order.
equipment_sdf = spark.createDataFrame(equipment_df)
equipment_sdf.createOrReplaceTempView('equipment_spark_table')
equip_order_sdf = spark.sql('SELECT name FROM equipment_spark_table ORDER BY name ASC')
equip_order_sdf.show()

#### Where Clause and sdf.count()

In [None]:
# Retrieve the row(s) in crew_rank for Jane
crew_rank_name_row_sdf = crew_rank_sdf.where(crew_rank_sdf.name == 'Jane')
crew_rank_name_row_sdf.show()
print(crew_rank_name_row_sdf.count())

#### Aggregate Operation

In [None]:
# Find out the sum and average rank of all crew members
rankSum = crew_sdf.agg({'rank':'sum'})
rankSum.show()
avgRank = crew_sdf.agg({'rank':'avg'})
avgRank.show()

#### Renaming Column in sdf

In [None]:
# Rename the column "name" to "Equipment Name" in equipment_sdf
equipment_renamed_sdf = equipment_sdf.withColumnRenamed('name','Equipment Name')
equipment_renamed_sdf.show()

#### Grouping

In [None]:
# Find out the count of each rank, name the columns to be "rank" and "count", order by count descending
rank_count_sdf = (
    crew_sdf.groupBy("rank")
    .agg({'name': 'count'})
    .withColumnRenamed('count(name)', 'count')
    .orderBy("count", ascending=False)
)
rank_count_sdf.show()

#### Creating an Empty Spark Data frame

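The schema below has an `Id`, a `Name`, and `Classes`: a list of structs, each holding a class name, time, and room number. We use it to create an empty DataFrame.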

In [None]:
schema = StructType([
            StructField("Id", IntegerType(), False),
            StructField("Name", StringType(), True),

            StructField("Classes", ArrayType(StructType([
                        StructField("Class", StringType(), nullable=True),
                        StructField("Time", StringType(), nullable=True),
                        StructField("Room number", StringType(), nullable=True)])), nullable=True)
            ])
empty_sdf = spark.createDataFrame([], schema)
empty_sdf.show()

In [None]:
# Inspect the (nested) column types of the empty DataFrame
empty_sdf.printSchema()

#### Union Operation

In [None]:
conn.execute("""
DROP TABLE IF EXISTS engineering_buildings
""")

# TODO: Complete the code for DType of classes
conn.execute('''
CREATE TABLE engineering_buildings (
id INTEGER PRIMARY KEY,
Name TEXT,
Classes TEXT)
''')
conn.execute('''
INSERT INTO engineering_buildings VALUES
(1, 'Skirkanich', '[{"CIS 5500", "1:45 PM", "Berger Auditorium"}, {"CIS 7000", "3:30 PM", "Berger Auditorium"}]'),
(2, 'Towne', '[{"CIS 5450", "1:45 PM", "Towne 100"}, {"CIS 1230", "3:30 PM", "Towne 319"}]'),
(3, 'Levine', '[{"CIS 2340", "1:45 PM", "Levine 101"}, {"CIS 3450", "3:30 PM", "Levine 612"}]')
''')
#Load the Building table
building_df = pd.read_sql('''SELECT *
                        FROM engineering_buildings''', conn) #Selecting all columns here
building_sdf = spark.createDataFrame(building_df)
building_sdf.show()

In [None]:
schema = StructType([
            StructField("Id", IntegerType(), False),
            StructField("Name", StringType(), True),
            StructField("Classes", StringType(), True)
            ])
schema_sdf = spark.createDataFrame([], schema)
schema_sdf.show()

In [None]:
# union() matches columns by position, not by name; use unionByName() to match by name
buildings_sdf = schema_sdf.union(building_sdf)
buildings_sdf.show()

#### Timing Example

**Note:** Please download the two CSV files, `driver_standings.csv` and `drivers.csv`, and either upload them with the upload button in the left panel or place them in your Google Drive.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

If you put the files in Google Drive, update `driver_standings_filepath` and `drivers_filepath` below with the paths to `driver_standings.csv` and `drivers.csv`. If you uploaded them through the left panel, the default `/content/...` paths should work.

In [None]:
# TODO: update with appropriate filepaths to driver_standings.csv and drivers.csv in your Google Drive!
# driver_standings_filepath = '/content/drive/Shareddrives/CIS 5450 2022 Fall/Recitations/Recitation 5/driver_standings.csv'
driver_standings_filepath = '/content/driver_standings.csv'
# drivers_filepath = '/content/drive/Shareddrives/CIS 5450 2022 Fall/Recitations/Recitation 5/drivers.csv'
drivers_filepath = '/content/drivers.csv'
# Without .option("inferSchema", "true"), Spark reads every CSV column as a string
f1_drivers_standings_sdf = spark.read.format("csv").option("header","true").load(driver_standings_filepath)
f1_drivers_sdf = spark.read.format("csv").option("header","true").load(drivers_filepath)

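Before `spark.sql` can refer to a DataFrame by name, we must register it as a temporary view. The view is just a name in the current session's catalog for the DataFrame's query plan: it does not copy or cache the data, and it disappears when the session ends.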

In [None]:
f1_drivers_standings_sdf.createOrReplaceTempView('driver_standings')
f1_drivers_sdf.createOrReplaceTempView('drivers')

In [None]:
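# dob is read as a string; assuming ISO dates (YYYY-MM-DD), the smallest string is also the earliest date
test = spark.sql('''
SELECT MIN(dob) AS earliest_dob
FROM drivers
''')
test.show()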

In [None]:
f1_drivers_sdf.show()

In [None]:
f1_drivers_standings_sdf.show()

In [None]:
import time
f1_drivers_df = f1_drivers_sdf.toPandas()
f1_drivers_standings_df = f1_drivers_standings_sdf.toPandas()


In [None]:
start_time = time.perf_counter()
time_test = ps.sqldf('SELECT * FROM f1_drivers_standings_df WHERE driverId = 1')
time_elapsed_pandas = time.perf_counter() - start_time
time_elapsed_pandas

In [None]:
start_time = time.perf_counter()
result = spark.sql('SELECT * FROM driver_standings WHERE driverId = 1')
# spark.sql() is lazy and only builds a query plan; collect() forces the query to actually run
result.collect()
time_elapsed_spark = time.perf_counter() - start_time
time_elapsed_spark
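
Timing a single call is noisy, and the first Spark query can also pay one-off start-up costs. A sketch of a slightly more careful measurement with the standard `timeit` module, which repeats a call and reports the best run. `sum_of_squares` is a hypothetical stand-in workload; replace it with whatever you want to time (for Spark, an action such as `.collect()` rather than the lazy `spark.sql(...)` call alone).

```python
import timeit

def sum_of_squares(n=100_000):
    # Hypothetical stand-in workload; swap in the call you want to benchmark
    return sum(i * i for i in range(n))

# Each measurement runs the callable once; take five measurements
timings = timeit.repeat(sum_of_squares, number=1, repeat=5)
best = min(timings)  # the fastest run is the one least disturbed by background noise
print(f"best of {len(timings)}: {best:.4f} s")
```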

In [None]:
driver_join_sdf = spark.sql('''
SELECT drivers.driverRef, driver_standings.wins, driver_standings.points,
       driver_standings.position, driver_standings.raceId
FROM driver_standings
JOIN drivers
ON driver_standings.driverId = drivers.driverId
''')
driver_join_sdf.show()

#### CASE Statement

In [None]:
# Bucket drivers by decade of birth. Each range includes its lower bound and excludes its upper bound;
# ELSE catches births from 2000 onward, before 1890, or a missing dob.
query = '''SELECT *,
                  CASE
                        WHEN (YEAR(dob) >= 1890 AND YEAR(dob) < 1900) THEN '1890-1900'
                        WHEN (YEAR(dob) >= 1900 AND YEAR(dob) < 1910) THEN '1900-1910'
                        WHEN (YEAR(dob) >= 1910 AND YEAR(dob) < 1920) THEN '1910-1920'
                        WHEN (YEAR(dob) >= 1920 AND YEAR(dob) < 1930) THEN '1920-1930'
                        WHEN (YEAR(dob) >= 1930 AND YEAR(dob) < 1940) THEN '1930-1940'
                        WHEN (YEAR(dob) >= 1940 AND YEAR(dob) < 1950) THEN '1940-1950'
                        WHEN (YEAR(dob) >= 1950 AND YEAR(dob) < 1960) THEN '1950-1960'
                        WHEN (YEAR(dob) >= 1960 AND YEAR(dob) < 1970) THEN '1960-1970'
                        WHEN (YEAR(dob) >= 1970 AND YEAR(dob) < 1980) THEN '1970-1980'
                        WHEN (YEAR(dob) >= 1980 AND YEAR(dob) < 1990) THEN '1980-1990'
                        WHEN (YEAR(dob) >= 1990 AND YEAR(dob) < 2000) THEN '1990-2000'
                        ELSE 'Other'
                   END AS Decade
           FROM drivers
           '''


# Run the query and register the result as a temp view for later SQL queries
decade_sdf = spark.sql(query)
decade_sdf.createOrReplaceTempView("driver_decades")



In [None]:
decade_sdf.show()
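
Since each `WHEN` branch covers one decade, the label can also be computed by rounding the birth year down to a multiple of 10. A plain-Python sketch of that rule (`decade_label` is a hypothetical helper, not part of the recitation); in Spark SQL a similar expression could be built from `FLOOR(YEAR(dob) / 10) * 10`.

```python
def decade_label(year):
    """Return a label such as '1950-1960' for the decade containing `year` (None if missing)."""
    if year is None:
        return None
    start = (year // 10) * 10  # round down to the first year of the decade
    return f"{start}-{start + 10}"

for birth_year in (1899, 1900, 1957, 2001):
    print(birth_year, decade_label(birth_year))
# 1899 1890-1900
# 1900 1900-1910
# 1957 1950-1960
# 2001 2000-2010
```

Unlike the fixed list of `WHEN` branches, this rule needs no catch-all bucket for years outside the listed decades.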

#### UDF (User-Defined Function)

In [None]:
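from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

def podium(position):
    """Return 1 for a top-3 finish, 0 otherwise, and None if position is missing or not a number."""
    try:
        # position arrives as a string because the CSVs were read without inferSchema
        return 1 if int(position) <= 3 else 0
    except (TypeError, ValueError):
        return None

# Register udf as a SQL function (usable inside spark.sql queries). DO NOT EDIT
spark.udf.register("Podium", podium, IntegerType())

In [None]:
# Wrap the same Python function for use with the DataFrame API
pod = udf(podium, IntegerType())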

In [None]:
driver_join_sdf.withColumn("Podium",pod(driver_join_sdf.position)).show()