### **1. Set up the Colab environment to run SQL queries with PySpark**

In [1]:
# One-time Colab setup: install headless Java 8, download and unpack the
# Spark 3.1.1 (Hadoop 3.2) distribution, and install findspark.
# The commands are left commented out; presumably they only need to be
# uncommented and run on a fresh runtime where these pieces are missing.
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
# !tar xf spark-3.1.1-bin-hadoop3.2.tgz
# !pip install -q findspark

In [2]:
import os

# Point JAVA_HOME at the Java 8 install and SPARK_HOME at the unpacked Spark directory.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
    }
)

In [3]:
# List the working directory; the unpacked Spark folder should show up here.
!ls

sample_data  spark-3.1.1-bin-hadoop3.2


In [4]:
import findspark

# Let findspark locate the Spark installation so that pyspark can be imported below.
findspark.init()

from pyspark.sql import SparkSession

# Start (or reuse) a session running on the local machine.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

# Eager evaluation makes a DataFrame render as a formatted table when it is
# the last expression of a cell.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

### **2. Download the datasets and load them into Spark DataFrames for this exercise**

In [5]:
!wget https://raw.githubusercontent.com/socratica/sql/master/earthquake.csv 
!wget https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv
# !wget https://raw.githubusercontent.com/vincentarelbundock/Rdatasets/904227f6891e20b2bf74693e46032259edbf732f/csv/medicaldata/covid_testing.csv
!wget https://raw.github.com/vincentarelbundock/Rdatasets/master/csv/Ecdat/BudgetUK.csv


--2023-02-22 12:07:31--  https://raw.githubusercontent.com/socratica/sql/master/earthquake.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2323215 (2.2M) [text/plain]
Saving to: ‘earthquake.csv’


2023-02-22 12:07:31 (38.5 MB/s) - ‘earthquake.csv’ saved [2323215/2323215]

--2023-02-22 12:07:32--  https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv
Resolving jacobceles.github.io (jacobceles.github.io)... 185.199.108.153, 185.199.109.153, 185.199.110.153, ...
Connecting to jacobceles.github.io (jacobceles.github.io)|185.199.108.153|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://jacobcelestine.com/knowledge_repo/colab_and_pyspark/cars.csv [following]
--2023-02-22 12:07:32--  https://jaco

In [6]:
# inferSchema=True asks Spark to infer a proper data type for each column;
# without it every column would be loaded as a string.
EQdf = spark.read.csv(
    '/content/earthquake.csv',
    header=True,
    inferSchema=True,
)
EQdf.printSchema()

root
 |-- earthquake_id: integer (nullable = true)
 |-- occurred_on: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- depth: double (nullable = true)
 |-- magnitude: double (nullable = true)
 |-- calculation_method: string (nullable = true)
 |-- network_id: string (nullable = true)
 |-- place: string (nullable = true)
 |-- cause: string (nullable = true)



In [7]:
# sort() and orderBy() only take the `ascending` keyword (default True),
# so descending order is requested with ascending=False.
(
    EQdf
    .where('magnitude > 6')
    .orderBy(['magnitude'], ascending=False)
    .limit(10)
    .show()
)

+-------------+-------------------+--------+---------+-----+---------+------------------+--------------------+--------------------+----------+
|earthquake_id|        occurred_on|latitude|longitude|depth|magnitude|calculation_method|          network_id|               place|     cause|
+-------------+-------------------+--------+---------+-----+---------+------------------+--------------------+--------------------+----------+
|        15909| 2004-12-26 0:58:53|   3.295|   95.982| 30.0|      9.1|                mw|official200412260...|off the west coas...|earthquake|
|        19328| 2011-03-11 5:46:24|  38.297|  142.373| 29.0|      9.1|               mww|official201103110...|near the east coa...|earthquake|
|        18756| 2010-02-27 6:34:12| -36.122|  -72.898| 22.9|      8.8|               mww|official201002270...|offshore Bio-Bio,...|earthquake|
|        16154|2005-03-28 16:09:37|   2.085|   97.108| 30.0|      8.6|               mww|official200503281...|northern Sumatra,...|earthquake|

In [8]:
# Register the DataFrame as a temporary view inside this Spark session so it can be
# queried with ordinary SQL text passed as a string to spark.sql().
EQdf.createOrReplaceTempView('EQdf')

# SQL-style query against the temp view.
spark.sql(
    "SELECT * FROM EQdf WHERE magnitude>6 ORDER BY magnitude DESC LIMIT 10"
).show()

+-------------+-------------------+--------+---------+-----+---------+------------------+--------------------+--------------------+----------+
|earthquake_id|        occurred_on|latitude|longitude|depth|magnitude|calculation_method|          network_id|               place|     cause|
+-------------+-------------------+--------+---------+-----+---------+------------------+--------------------+--------------------+----------+
|        15909| 2004-12-26 0:58:53|   3.295|   95.982| 30.0|      9.1|                mw|official200412260...|off the west coas...|earthquake|
|        19328| 2011-03-11 5:46:24|  38.297|  142.373| 29.0|      9.1|               mww|official201103110...|near the east coa...|earthquake|
|        18756| 2010-02-27 6:34:12| -36.122|  -72.898| 22.9|      8.8|               mww|official201002270...|offshore Bio-Bio,...|earthquake|
|        16154|2005-03-28 16:09:37|   2.085|   97.108| 30.0|      8.6|               mww|official200503281...|northern Sumatra,...|earthquake|

In [9]:
# Combine two column conditions with &; each comparison is parenthesised because
# & binds more tightly than == and < in Python.
(
    EQdf
    .where((EQdf['magnitude'] == 6) & (EQdf['depth'] < 15))
    .sort(['depth'], ascending=False)
    .limit(5)
    .show()
)

+-------------+-------------------+--------+---------+-----+---------+------------------+------------+--------------------+----------+
|earthquake_id|        occurred_on|latitude|longitude|depth|magnitude|calculation_method|  network_id|               place|     cause|
+-------------+-------------------+--------+---------+-----+---------+------------------+------------+--------------------+----------+
|        20808| 2013-11-25 7:21:18|-53.8708| -53.9107|14.83|      6.0|               mwc|  usb000l633|South Atlantic Ocean|earthquake|
|         8629|1990-01-06 21:44:56| -10.681|   92.987| 14.8|      6.0|                mw|  usp00043z4|  South Indian Ocean|earthquake|
|          527| 1970-07-29 5:50:58|  39.802|   77.679| 14.7|      6.0|                mw|iscgem794098|southern Xinjiang...|earthquake|
|         9683|1992-04-08 13:36:56| -16.784|   168.31| 14.3|      6.0|                mw|  usp00055gp|             Vanuatu|earthquake|
|        15820| 2004-10-27 1:40:50|  37.284|  138.885| 

In [10]:
# With spark.sql(), multiple filter conditions are combined using SQL operators
# (here AND) inside the query text.
spark.sql(
    "SELECT * FROM EQdf WHERE ((magnitude == 6) AND (depth < 15 )) ORDER BY depth DESC LIMIT 5"
).show()

+-------------+-------------------+--------+---------+-----+---------+------------------+------------+--------------------+----------+
|earthquake_id|        occurred_on|latitude|longitude|depth|magnitude|calculation_method|  network_id|               place|     cause|
+-------------+-------------------+--------+---------+-----+---------+------------------+------------+--------------------+----------+
|        20808| 2013-11-25 7:21:18|-53.8708| -53.9107|14.83|      6.0|               mwc|  usb000l633|South Atlantic Ocean|earthquake|
|         8629|1990-01-06 21:44:56| -10.681|   92.987| 14.8|      6.0|                mw|  usp00043z4|  South Indian Ocean|earthquake|
|          527| 1970-07-29 5:50:58|  39.802|   77.679| 14.7|      6.0|                mw|iscgem794098|southern Xinjiang...|earthquake|
|         9683|1992-04-08 13:36:56| -16.784|   168.31| 14.3|      6.0|                mw|  usp00055gp|             Vanuatu|earthquake|
|        15820| 2004-10-27 1:40:50|  37.284|  138.885| 

In [11]:
# cars.csv is read with ';' as the field separator instead of the default comma.
car = spark.read.csv(
    '/content/cars.csv',
    sep=';',
    header=True,
    inferSchema=True,
)

# Row count, followed by the last five rows as a list of Row objects.
print(car.count())
car.tail(5)

406


[Row(Car='Ford Mustang GL', MPG=27.0, Cylinders=4, Displacement=140.0, Horsepower=86.0, Weight=Decimal('2790'), Acceleration=15.6, Model=82, Origin='US'),
 Row(Car='Volkswagen Pickup', MPG=44.0, Cylinders=4, Displacement=97.0, Horsepower=52.0, Weight=Decimal('2130'), Acceleration=24.6, Model=82, Origin='Europe'),
 Row(Car='Dodge Rampage', MPG=32.0, Cylinders=4, Displacement=135.0, Horsepower=84.0, Weight=Decimal('2295'), Acceleration=11.6, Model=82, Origin='US'),
 Row(Car='Ford Ranger', MPG=28.0, Cylinders=4, Displacement=120.0, Horsepower=79.0, Weight=Decimal('2625'), Acceleration=18.6, Model=82, Origin='US'),
 Row(Car='Chevy S-10', MPG=31.0, Cylinders=4, Displacement=119.0, Horsepower=82.0, Weight=Decimal('2720'), Acceleration=19.4, Model=82, Origin='US')]

In [12]:
# Average MPG, Horsepower and Acceleration per (Model, Cylinders) group,
# expressed as a {column: aggregate} mapping passed to agg().
(
    car
    .groupby('Model', 'Cylinders')
    .agg({'MPG': 'avg', 'Horsepower': 'avg', 'Acceleration': 'avg'})
    .sort('Model', 'Cylinders')
)

Model,Cylinders,avg(MPG),avg(Horsepower),avg(Acceleration)
70,4,22.125,91.125,16.1875
70,6,20.5,91.75,15.5
70,8,11.043478260869565,178.8695652173913,11.021739130434783
71,4,25.5,69.92857142857143,17.178571428571427
71,6,18.0,98.875,14.75
71,8,13.428571428571429,166.85714285714286,12.214285714285714
72,3,19.0,97.0,13.5
72,4,23.428571428571427,85.14285714285714,17.214285714285715
72,8,13.615384615384617,159.69230769230768,13.0
73,3,18.0,90.0,13.5


In [13]:
# The same per-(Model, Cylinders) averages, this time via the avg() shortcut
# on the grouped data.
(
    car
    .groupBy('Model', 'Cylinders')
    .avg('MPG', 'Horsepower', 'Acceleration')
    .sort('Model', 'Cylinders')
)

Model,Cylinders,avg(MPG),avg(Horsepower),avg(Acceleration)
70,4,22.125,91.125,16.1875
70,6,20.5,91.75,15.5
70,8,11.043478260869565,178.8695652173913,11.021739130434783
71,4,25.5,69.92857142857143,17.178571428571427
71,6,18.0,98.875,14.75
71,8,13.428571428571429,166.85714285714286,12.214285714285714
72,3,19.0,97.0,13.5
72,4,23.428571428571427,85.14285714285714,17.214285714285715
72,8,13.615384615384617,159.69230769230768,13.0
73,3,18.0,90.0,13.5


In [14]:
# Correlation of Cylinders with Acceleration, MPG and Horsepower, one value per line.
# sep=' \n ' reproduces the spacing of printing the values with '\n' arguments between them.
print(
    *(car.corr('Cylinders', other) for other in ('Acceleration', 'MPG', 'Horsepower')),
    sep=' \n ',
)

-0.5224515124210515 
 -0.7355626558680717 
 0.8234670042496819


In [15]:
# Load the BudgetUK data with inferred column types. The file's first column
# appears as `_c0` in the output below.
budget = spark.read.csv(
    '/content/BudgetUK.csv',
    header=True,
    inferSchema=True,
)

# Preview the first ten rows.
budget.limit(10)

_c0,wfood,wfuel,wcloth,walc,wtrans,wother,totexp,income,age,children
1,0.4272,0.1342,0.0,0.0106,0.1458,0.2822,50,130,25,2
2,0.3739,0.1686,0.0091,0.0825,0.1215,0.2444,90,150,39,2
3,0.1941,0.4056,0.0012,0.0513,0.2063,0.1415,180,230,47,2
4,0.4438,0.1258,0.0539,0.0397,0.0652,0.2716,80,100,33,2
5,0.3331,0.0824,0.0399,0.1571,0.2403,0.1473,90,100,31,1
6,0.3752,0.0481,0.117,0.021,0.0955,0.3431,70,70,24,1
7,0.2568,0.0909,0.0453,0.0153,0.0227,0.5689,140,190,46,1
8,0.4533,0.2105,0.1131,0.0161,0.0,0.207,50,100,25,1
9,0.3279,0.1053,0.1671,0.0293,0.0433,0.3272,100,260,30,1
10,0.4608,0.0612,0.023,0.0338,0.1901,0.231,90,110,41,1


In [16]:
# Number of rows in the BudgetUK DataFrame.
budget.count()

1519

In [18]:
# Correlations for selected pairs of the income, totexp, age and children columns,
# printed one value per line in the order listed.
print(
    *(
        budget.corr(left, right)
        for left, right in (
            ('income', 'totexp'),
            ('age', 'income'),
            ('age', 'totexp'),
            ('age', 'children'),
            ('income', 'children'),
            ('children', 'totexp'),
        )
    ),
    sep=' \n ',
)

0.4487403479324104 
 0.2184940446284285 
 0.18945071841034233 
 0.008092094758592572 
 0.025439105020058497 
 0.07141528176731633


In [19]:
# Correlation of income with each of the w* columns, printed one value per line.
print(
    *(
        budget.corr('income', share)
        for share in ('wfood', 'wfuel', 'wcloth', 'walc', 'wtrans', 'wother')
    ),
    sep=' \n ',
)

-0.23466673649177305 
 -0.028700702561703238 
 0.07343379091366131 
 0.03894605861711469 
 0.008442806796694105 
 0.1533707200514744


In [20]:
# Correlation of age with each of the w* columns, printed one value per line.
print(
    *(
        budget.corr('age', share)
        for share in ('wfood', 'wfuel', 'wcloth', 'walc', 'wtrans', 'wother')
    ),
    sep=' \n ',
)

0.021403727996632888 
 -0.039518423637423604 
 0.035377204639060945 
 -0.1433995887179236 
 0.02692876149417738 
 0.02606428805055384


**References:**
1. https://www.youtube.com/watch?v=3-pnWVWyH-s&list=PPSV; https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv
2. https://www.youtube.com/watch?v=YufocuHbYZo&list=PPSV; https://www.youtube.com/watch?v=PkJKzR_sClM; https://raw.githubusercontent.com/socratica/sql/master/earthquake.csv
3. https://www.youtube.com/watch?v=2qz3mDt1ibI&t=307s
4. https://spark.apache.org/docs/latest/api/python/reference/index.html