
<h1><center>Introduction to DataFrame-based PySpark Functions</center></h1>

## Objective
The objective of this notebook is to:
- Understand SQL operations in Spark.


## Installing Spark

Install Dependencies


```python
!pip install pyspark
```

```
Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     317.0/317.0 MB 4.9 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=d2b66347d39ed2afb2c79e644be0a1c734e5821289e5e12ac5abc6526e22d930
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
```


Create a SparkSession:

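```python
# A PySpark application starts by creating a SparkSession, the entry point
# to Spark's DataFrame and SQL APIs. (If you run code in the interactive
# PySpark shell launched by the `pyspark` executable, the shell creates the
# session for you in the variable `spark`.)

from pyspark.sql import SparkSession

# Run Spark locally with as many worker threads as there are logical cores.
# getOrCreate() returns the existing session if one is already running.
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Do not truncate cell contents (0 = no truncation) and enable eager
# evaluation, so a DataFrame left as the last expression of a notebook
# cell is rendered as a table.
spark.conf.set("spark.sql.repl.eagerEval.truncate", 0)
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

print(spark)
```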

```
<pyspark.sql.session.SparkSession object at 0x7e536425a950>
```


<a id='spark-sql'></a>
## Spark SQL

SQL has been around since the 1970s. A large number of developers have mastered it and are familiar with its usage. As Big Data came onto the landscape, new computing platforms were created, and most of them chose to support SQL.

>Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations.

In short, Spark SQL is used to execute SQL queries on large, distributed datasets. With Hive support enabled, it can also read data from existing Hive tables and views.


```python
!wget https://raw.githubusercontent.com/annesjyu/dataengr2023/main/bank_marketing_1000.csv
```

```
--2024-08-08 07:50:08--  https://raw.githubusercontent.com/annesjyu/dataengr2023/main/bank_marketing_1000.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.110.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 117439 (115K) [text/plain]
Saving to: ‘bank_marketing_1000.csv’

2024-08-08 07:50:09 (12.5 MB/s) - ‘bank_marketing_1000.csv’ saved [117439/117439]
```



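```python
bank_filename = "bank_marketing_1000.csv"

# Load the CSV file into a Spark DataFrame, using the first line as column names.
# Without inferSchema=True, every column is read as a string.
df = spark.read.csv(bank_filename, header=True, sep=",")

print(f"type(df) = {type(df)}")
```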

```
type(df) = <class 'pyspark.sql.dataframe.DataFrame'>
```


## Create a Temporary View

```python
# Register the DataFrame as a temporary view named "marketing" so SQL can refer to it
df.createOrReplaceTempView("marketing")

# Count the rows in the view
spark.sql("select count(*) as total_count from marketing").show()
```

```
+-----------+
|total_count|
+-----------+
|        999|
+-----------+
```



```python
# Select 5 rows from the temporary view
# (without ORDER BY, which 5 rows are returned is not guaranteed)
sql_df = spark.sql("select * from marketing limit 5")

sql_df.show()
```

```
+---+---------+-------+-----------+-------+-------+----+---------+-----+-----------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|age|      job|marital|  education|default|housing|loan|  contact|month|day_of_week|campaign|pdays|previous|   poutcome|emp.var.rate|cons.price.idx|cons.conf.idx|euribor3m|nr.employed|  y|
+---+---------+-------+-----------+-------+-------+----+---------+-----+-----------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
| 56|housemaid|married|   basic.4y|     no|     no|  no|telephone|  may|        mon|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0| no|
| 57| services|married|high.school|unknown|     no|  no|telephone|  may|        mon|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0| no|
| 37| services|married|high.school|     no|    yes|  no
```

```python
# Calculate the average age of customers
spark.sql("select round(avg(age)) as avg_age from marketing").show()

# Calculate the average age according to job
spark.sql("select job, round(avg(age)) as avg_age from marketing group by job").show()
```

```
+-------+
|avg_age|
+-------+
|   42.0|
+-------+

+-------------+-------+
|          job|avg_age|
+-------------+-------+
|   management|   46.0|
|      retired|   54.0|
|      unknown|   47.0|
|self-employed|   42.0|
|      student|   32.0|
|  blue-collar|   42.0|
| entrepreneur|   45.0|
|       admin.|   41.0|
|   technician|   42.0|
|     services|   39.0|
|    housemaid|   45.0|
|   unemployed|   42.0|
+-------------+-------+
```



## Create a Global Temporary View

Temporary views in Spark SQL are **session-scoped** and *disappear* when the session that created them terminates. If you want a temporary view that is shared among all sessions and kept alive until the Spark application terminates, you can create a **global temporary view**. A global temporary view is tied to the system-preserved database *global_temp*, and we must use the qualified name to refer to it, e.g. `SELECT * FROM global_temp.view1`.

```python
# Drop the global temporary view if it already exists, so this cell can be re-run.
# dropGlobalTempView() returns True if a view was dropped and False otherwise.
if spark.catalog.dropGlobalTempView("global_marketing"):
    print("Global temporary view global_marketing has been dropped.")

# Create a global temporary view containing only customers from the services sector.
spark.sql("CREATE GLOBAL TEMPORARY VIEW global_marketing AS SELECT * FROM marketing WHERE job = 'services'")

# Global temporary views must be referenced through the global_temp database.
spark.sql("SELECT * FROM global_temp.global_marketing").show()
spark.sql("SELECT COUNT(*) FROM global_temp.global_marketing").show()
```

Global temporary view global_marketing has been dropped.
+---+--------+--------+-------------------+-------+-------+----+---------+-----+-----------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
|age|     job| marital|          education|default|housing|loan|  contact|month|day_of_week|campaign|pdays|previous|   poutcome|emp.var.rate|cons.price.idx|cons.conf.idx|euribor3m|nr.employed|  y|
+---+--------+--------+-------------------+-------+-------+----+---------+-----+-----------+--------+-----+--------+-----------+------------+--------------+-------------+---------+-----------+---+
| 57|services| married|        high.school|unknown|     no|  no|telephone|  may|        mon|       1|  999|       0|nonexistent|         1.1|        93.994|        -36.4|    4.857|     5191.0| no|
| 37|services| married|        high.school|     no|    yes|  no|telephone|  may|        mon|       1|  999|       0|nonexistent|         1.1|        93.994