# CSC-634 Graduate Research Project
## Using Spark SQL in PySpark
Author: [Yunting Chiu](https://www.linkedin.com/in/yuntingchiu)\
Email: yc6705a@american.edu


# Start Spark in Google Colab

Spark SQL is Apache Spark's module for working with structured data. PySpark is the Python interface to Apache Spark. It includes the PySpark shell for interactively examining data in a distributed environment, and it lets you develop Spark applications with Python APIs. PySpark supports most of Spark's features, including Spark SQL, DataFrame, Streaming, MLlib (machine learning), and Spark Core.

In [None]:
# Installation
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Older Spark releases are served from the Apache archive rather than downloads.apache.org
!wget -q https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar -xf spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install pyspark

In [7]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

In [8]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [9]:
# Check the pyspark version
import pyspark
print(pyspark.__version__)

3.1.1


# Create Bank Database and Insert Queries
As an example, I create only the `account` and `loan` tables. The original **bank database** is defined [here](https://github.com/twyunting/CSC-634_Database-Management-Systems/blob/main/Assignments/hw2/hw2_bank_database.sql); we used it to practice MySQL queries in the CSC-634 class.


In [12]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# create account table: each StructField is (column name, type, nullable)
account_attr = StructType([
    StructField("account_number", StringType(), True),
    StructField("branch_name", StringType(), True),
    StructField("balance", IntegerType(), True),
])

# insert the values
account_val = [("A-101","Downtown", 500),
    ('A-102', 'Perryridge', 400),
    ('A-201', 'Brighton', 900),
    ('A-215', 'Mianus', 700),
    ('A-217', 'Brighton', 750),
    ('A-222', 'Redwood', 700),
    ('A-305', 'Round Hill', 350)
  ]

account = spark.createDataFrame(data = account_val, schema = account_attr)
# account.printSchema()
account.show()

+--------------+-----------+-------+
|account_number|branch_name|balance|
+--------------+-----------+-------+
|         A-101|   Downtown|    500|
|         A-102| Perryridge|    400|
|         A-201|   Brighton|    900|
|         A-215|     Mianus|    700|
|         A-217|   Brighton|    750|
|         A-222|    Redwood|    700|
|         A-305| Round Hill|    350|
+--------------+-----------+-------+



In [13]:
# create loan table: each StructField is (column name, type, nullable)
loan_attr = StructType([
    StructField("loan_number", StringType(), True),
    StructField("branch_name", StringType(), True),
    StructField("amount", IntegerType(), True),
])

# insert the values
loan_val = [("L-11","Round Hill", 900),
    ('L-14', 'Downtown', 1500),
    ('L-15', 'Perryridge', 1500),
    ('L-16', 'Perryridge', 1300),
    ('L-17', 'Downtown', 1000),
    ('L-23', 'Redwood', 2000),
    ('I-93', 'Mianus', 500)
  ]

loan = spark.createDataFrame(data = loan_val, schema = loan_attr)
# loan.printSchema()
loan.show()

+-----------+-----------+------+
|loan_number|branch_name|amount|
+-----------+-----------+------+
|       L-11| Round Hill|   900|
|       L-14|   Downtown|  1500|
|       L-15| Perryridge|  1500|
|       L-16| Perryridge|  1300|
|       L-17|   Downtown|  1000|
|       L-23|    Redwood|  2000|
|       I-93|     Mianus|   500|
+-----------+-----------+------+



# Retrieval Queries
1. Find all loan numbers for loans made at the Perryridge branch with loan amounts greater than $1100.

In [15]:
# SQL
#select loan_number from loan
#where branch_name = "Perryridge" and amount > 1100;

# Spark SQL
loan.select("loan_number").where("amount > 1100 and branch_name == 'Perryridge'").show()

+-----------+
|loan_number|
+-----------+
|       L-15|
|       L-16|
+-----------+



2. Find the loan numbers of loans with amounts between \$1,000 and \$1,500 (that is, >= \$1,000 and <= \$1,500).

In [16]:
# SQL
#select loan_number from loan
#where amount between 1000 and 1500;

# Spark SQL
loan.select("loan_number").where("amount >= 1000 and amount <= 1500").show()

+-----------+
|loan_number|
+-----------+
|       L-14|
|       L-15|
|       L-16|
|       L-17|
+-----------+



3. Using an aggregate function: find each `branch_name` in the loan table whose total loan amount is greater than 1000.

In [17]:
# SQL
# select branch_name , sum(amount) from loan
# group by branch_name
# having sum(amount) > 1000;

# Spark SQL
import pyspark.sql.functions as func
loan.groupBy("branch_name").agg(func.sum("amount")).where("sum(amount) > 1000").show()

+-----------+-----------+
|branch_name|sum(amount)|
+-----------+-----------+
| Perryridge|       2800|
|   Downtown|       2500|
|    Redwood|       2000|
+-----------+-----------+
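
The SQL statements quoted in the comments of queries 1 to 3 can be cross-checked without Spark. The sketch below is only a sanity check: it assumes an in-memory SQLite database (Python's built-in `sqlite3`) as a stand-in for MySQL, and it adds `order by` clauses, which are not in the original queries, so that the row order is deterministic.

```python
import sqlite3

# Same rows as the `loan` DataFrame above
loan_val = [
    ("L-11", "Round Hill", 900),
    ("L-14", "Downtown", 1500),
    ("L-15", "Perryridge", 1500),
    ("L-16", "Perryridge", 1300),
    ("L-17", "Downtown", 1000),
    ("L-23", "Redwood", 2000),
    ("I-93", "Mianus", 500),
]

# Assumption: SQLite stands in for MySQL; these three queries use only standard SQL
conn = sqlite3.connect(":memory:")
conn.execute("create table loan (loan_number text, branch_name text, amount integer)")
conn.executemany("insert into loan values (?, ?, ?)", loan_val)

# Query 1: Perryridge loans above 1100
q1 = conn.execute(
    "select loan_number from loan "
    "where branch_name = 'Perryridge' and amount > 1100 "
    "order by loan_number"
).fetchall()

# Query 2: loans between 1000 and 1500, both bounds included
q2 = conn.execute(
    "select loan_number from loan "
    "where amount between 1000 and 1500 "
    "order by loan_number"
).fetchall()

# Query 3: branches whose total loan amount exceeds 1000
q3 = conn.execute(
    "select branch_name, sum(amount) from loan "
    "group by branch_name having sum(amount) > 1000 "
    "order by sum(amount) desc"
).fetchall()

print(q1)
print(q2)
print(q3)
conn.close()
```

The three result lists contain the same rows as the PySpark outputs shown for queries 1 to 3.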



4. Inner join the `account` and `loan` tables on `branch_name`.



In [18]:
# SQL
# select * from account inner join loan on account.branch_name = loan.branch_name;

# Spark SQL
account.join(loan, account.branch_name == loan.branch_name, "inner").show()

+--------------+-----------+-------+-----------+-----------+------+
|account_number|branch_name|balance|loan_number|branch_name|amount|
+--------------+-----------+-------+-----------+-----------+------+
|         A-102| Perryridge|    400|       L-15| Perryridge|  1500|
|         A-102| Perryridge|    400|       L-16| Perryridge|  1300|
|         A-215|     Mianus|    700|       I-93|     Mianus|   500|
|         A-305| Round Hill|    350|       L-11| Round Hill|   900|
|         A-101|   Downtown|    500|       L-14|   Downtown|  1500|
|         A-101|   Downtown|    500|       L-17|   Downtown|  1000|
|         A-222|    Redwood|    700|       L-23|    Redwood|  2000|
+--------------+-----------+-------+-----------+-----------+------+
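
To see why the join above returns seven rows, the following plain-Python sketch reproduces inner-join semantics on the same data. It illustrates the matching rule only and is not how Spark executes a join; the helper name `loans_by_branch` is made up for this example.

```python
from collections import defaultdict

# Same rows as the `account` and `loan` DataFrames above
account_val = [
    ("A-101", "Downtown", 500),
    ("A-102", "Perryridge", 400),
    ("A-201", "Brighton", 900),
    ("A-215", "Mianus", 700),
    ("A-217", "Brighton", 750),
    ("A-222", "Redwood", 700),
    ("A-305", "Round Hill", 350),
]
loan_val = [
    ("L-11", "Round Hill", 900),
    ("L-14", "Downtown", 1500),
    ("L-15", "Perryridge", 1500),
    ("L-16", "Perryridge", 1300),
    ("L-17", "Downtown", 1000),
    ("L-23", "Redwood", 2000),
    ("I-93", "Mianus", 500),
]

# Group the loans by the join key (hypothetical helper for this sketch)
loans_by_branch = defaultdict(list)
for loan_number, branch_name, amount in loan_val:
    loans_by_branch[branch_name].append((loan_number, amount))

# Inner join: emit one row per (account, loan) pair that shares a branch_name;
# accounts whose branch has no loan (Brighton) produce no rows at all
joined = [
    (account_number, branch_name, balance, loan_number, amount)
    for account_number, branch_name, balance in account_val
    for loan_number, amount in loans_by_branch.get(branch_name, [])
]

for row in joined:
    print(row)
print(len(joined))
```

Downtown and Perryridge each have one account and two loans, so they contribute two rows each, while the two Brighton accounts have no matching loan and drop out. This sketch keeps a single `branch_name` value per row; in PySpark, passing the column name as the join key (for example `account.join(loan, "branch_name", "inner")`) likewise avoids the duplicated `branch_name` column seen in the output above.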



# Update Queries and View Queries
- Spark DataFrames are immutable, so they do not support in-place **update** or **delete** queries. Instead, we apply transformations from the PySpark DataFrame API, such as `withColumn` and `filter`, which return a new DataFrame.


If the loan amount is greater than $1,000, the loan amount will be increased by 5%.

### Before the query

In [19]:
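# Register the DataFrame as a global temporary view
# (createOrReplaceGlobalTempView lets this cell be re-run without a "view already exists" error)
loan.createOrReplaceGlobalTempView("loanTMP")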

# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.loanTMP").show()

+-----------+-----------+------+
|loan_number|branch_name|amount|
+-----------+-----------+------+
|       L-11| Round Hill|   900|
|       L-14|   Downtown|  1500|
|       L-15| Perryridge|  1500|
|       L-16| Perryridge|  1300|
|       L-17|   Downtown|  1000|
|       L-23|    Redwood|  2000|
|       I-93|     Mianus|   500|
+-----------+-----------+------+



### After the query

In [20]:
from pyspark.sql import functions as F

# SQL
# update loan
# set amount = amount + amount * 0.05 where amount > 1000;

# DataFrame API: build a new DataFrame with the adjusted column
loan = loan.withColumn(
    "amount",
    F.when(F.col("amount") > 1000, F.col("amount") + F.col("amount") * 0.05)
     .otherwise(F.col("amount")),
)
# show() returns None, so call it separately instead of assigning its result to `loan`
loan.show()

+-----------+-----------+------+
|loan_number|branch_name|amount|
+-----------+-----------+------+
|       L-11| Round Hill| 900.0|
|       L-14|   Downtown|1575.0|
|       L-15| Perryridge|1575.0|
|       L-16| Perryridge|1365.0|
|       L-17|   Downtown|1000.0|
|       L-23|    Redwood|2100.0|
|       I-93|     Mianus| 500.0|
+-----------+-----------+------+
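
The update rule can be checked by hand with a few lines of plain Python. This is a worked-arithmetic sketch, not Spark code; the function name `raise_large_loans` and its parameters are made up for illustration.

```python
# Same rows as the `loan` DataFrame before the update
loan_val = [
    ("L-11", "Round Hill", 900),
    ("L-14", "Downtown", 1500),
    ("L-15", "Perryridge", 1500),
    ("L-16", "Perryridge", 1300),
    ("L-17", "Downtown", 1000),
    ("L-23", "Redwood", 2000),
    ("I-93", "Mianus", 500),
]


def raise_large_loans(amount, threshold=1000, rate=0.05):
    """Raise `amount` by `rate` only when it is strictly greater than `threshold`."""
    if amount > threshold:
        return amount + amount * rate
    # Unchanged amounts are returned as floats so every result has the same type
    return float(amount)


updated = [
    (loan_number, branch_name, raise_large_loans(amount))
    for loan_number, branch_name, amount in loan_val
]

for row in updated:
    print(row)
```

Two details of the Spark output show up here as well. Loan L-17 stays at 1000 because the condition is strictly greater than 1000. Every amount is now a floating-point value: multiplying by 0.05 produces a double, and the untouched integer amounts are converted to match, which is why the `amount` column prints as `900.0` rather than `900`.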



# Delete Queries
### Before the query

In [21]:
account.show()

+--------------+-----------+-------+
|account_number|branch_name|balance|
+--------------+-----------+-------+
|         A-101|   Downtown|    500|
|         A-102| Perryridge|    400|
|         A-201|   Brighton|    900|
|         A-215|     Mianus|    700|
|         A-217|   Brighton|    750|
|         A-222|    Redwood|    700|
|         A-305| Round Hill|    350|
+--------------+-----------+-------+



### After the query

In [22]:
# SQL
# delete from account where branch_name != "Downtown";

# DataFrame API: keep only the rows that the SQL delete would leave in the table
account_updated = account.filter(F.col("branch_name") == "Downtown")
account_updated.show()

+--------------+-----------+-------+
|account_number|branch_name|balance|
+--------------+-----------+-------+
|         A-101|   Downtown|    500|
+--------------+-----------+-------+
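
Rewriting the delete as a filter on the complementary condition gives the same result here because no `branch_name` is missing. With NULL values the two can differ: SQL's `delete ... where branch_name != 'Downtown'` leaves a NULL row in place, because the comparison evaluates to NULL rather than true, whereas a filter on `branch_name == "Downtown"` drops that row. The sketch below shows the SQL side only, assuming an in-memory SQLite database as a stand-in for MySQL and a hypothetical extra account `A-999` that is not part of the original data.

```python
import sqlite3

# Same rows as the `account` DataFrame above
account_val = [
    ("A-101", "Downtown", 500),
    ("A-102", "Perryridge", 400),
    ("A-201", "Brighton", 900),
    ("A-215", "Mianus", 700),
    ("A-217", "Brighton", 750),
    ("A-222", "Redwood", 700),
    ("A-305", "Round Hill", 350),
]

# Hypothetical row with a missing branch, added only to show NULL handling
rows = account_val + [("A-999", None, 100)]

conn = sqlite3.connect(":memory:")
conn.execute(
    "create table account (account_number text, branch_name text, balance integer)"
)
conn.executemany("insert into account values (?, ?, ?)", rows)

# NULL != 'Downtown' evaluates to NULL, not true, so the A-999 row is not deleted
conn.execute("delete from account where branch_name != 'Downtown'")

remaining = conn.execute(
    "select account_number, branch_name, balance from account order by account_number"
).fetchall()
print(remaining)
conn.close()
```

If a column can contain nulls, one option is to make the intended handling explicit in the filter condition instead of relying on the complement.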



# References
- https://spark.apache.org/docs/latest/api/python/
- https://youtu.be/5dARTeE6OpU
- https://medium.com/analytics-vidhya/ultimate-guide-for-setting-up-pyspark-in-google-colab-7637f697daf1