## Lab 7 Exercise 2 - EDA & Spark SQL in PySpark

First, configure the PySpark environment below; then we'll load the data from S3 into our Spark session.

In [1]:
%%configure -f
{
    "conf": {
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
    }
}

Next, load the NOAA Global Historical Climatology Network (GHCN) Daily Weather Data. The dataset is documented at https://docs.opendata.aws/noaa-ghcn-pds/readme.html

In [2]:
df = spark.read.csv('s3://noaa-ghcn-pds/csv/by_year/2024.csv', header=True)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1714068429575_0002,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


Take a look at the data:

In [3]:
df

DataFrame[ID: string, DATE: string, ELEMENT: string, DATA_VALUE: string, M_FLAG: string, Q_FLAG: string, S_FLAG: string, OBS_TIME: string]

In [4]:
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- ELEMENT: string (nullable = true)
 |-- DATA_VALUE: string (nullable = true)
 |-- M_FLAG: string (nullable = true)
 |-- Q_FLAG: string (nullable = true)
 |-- S_FLAG: string (nullable = true)
 |-- OBS_TIME: string (nullable = true)

In [5]:
df.count() # return total count of rows in dataset

9616274

Because Spark DataFrames are built on top of RDDs, we can still use methods like `take` to bring a small amount of our data back as a list. (We will almost never want to `collect` our data when working with big data, though; **ask**: Why?)

Spark DataFrames also have a `show` method that prints the rows as a formatted table, which is a bit prettier:

In [6]:
df.show(5)

+-----------+--------+-------+----------+------+------+------+--------+
|         ID|    DATE|ELEMENT|DATA_VALUE|M_FLAG|Q_FLAG|S_FLAG|OBS_TIME|
+-----------+--------+-------+----------+------+------+------+--------+
|AE000041196|20240101|   TMAX|       278|  null|  null|     S|    null|
|AE000041196|20240101|   TMIN|       182|  null|  null|     S|    null|
|AE000041196|20240101|   PRCP|         0|     D|  null|     S|    null|
|AE000041196|20240101|   TAVG|       236|     H|  null|     S|    null|
|AEM00041194|20240101|   TMAX|       277|  null|  null|     S|    null|
+-----------+--------+-------+----------+------+------+------+--------+
only showing top 5 rows

## Exploring Data with Spark SQL

In [7]:
import pyspark.sql.functions as F

In [11]:
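# Convert column types (both columns were read as strings)
df = df.withColumn("DATE", F.col("DATE").cast("int"))
df = df.withColumn("DATA_VALUE", F.col("DATA_VALUE").cast("double"))
# Temperatures are stored in tenths of degrees Celsius; divide by 10 to get degrees Celsius.
# Note that this rescales DATA_VALUE for every ELEMENT, not only the temperature rows.
df = df.withColumn("DATA_VALUE", F.col("DATA_VALUE") / 10.0)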

For the following two tasks, you can use either SQL queries or built-in Spark functions.

TODO 1: Print the maximum temperature (rows where ELEMENT is "TMAX") for each date, ordered by date.

In [12]:
df.createOrReplaceTempView("weather")
query = """
            SELECT DATE, MAX(DATA_VALUE) as MaxTemperature
            FROM weather
            WHERE ELEMENT = 'TMAX'
            GROUP BY DATE
            ORDER BY DATE
        """
max_temperatures = spark.sql(query)
max_temperatures.show()

+--------+--------------+
|    DATE|MaxTemperature|
+--------+--------------+
|20240101|          45.0|
|20240102|          45.5|
|20240103|          42.3|
|20240104|          41.6|
|20240105|          42.7|
|20240106|          42.4|
|20240107|          43.0|
|20240108|          47.8|
|20240109|          43.9|
|20240110|          44.3|
|20240111|          45.0|
|20240112|          43.0|
|20240113|          44.6|
|20240114|          43.1|
|20240115|          43.7|
|20240116|          43.7|
|20240117|          44.0|
|20240118|          42.0|
|20240119|          44.4|
|20240120|          46.2|
+--------+--------------+
only showing top 20 rows

TODO 2: Add another condition that filters out maximum temperatures below 47 degrees Celsius (the raw data stores temperature in tenths of degrees Celsius, but DATA_VALUE was already divided by 10 above), and order the result by maximum temperature, descending.

In [13]:
df.createOrReplaceTempView("weather")
query = """
            SELECT DATE, MAX(DATA_VALUE) as MaxTemperature
            FROM weather
            WHERE ELEMENT = 'TMAX'
            GROUP BY DATE
            HAVING MAX(DATA_VALUE) >= 47
            ORDER BY MaxTemperature DESC
        """
max_temperatures = spark.sql(query)
max_temperatures.show()

+--------+--------------+
|    DATE|MaxTemperature|
+--------+--------------+
|20240314|         202.2|
|20240310|          67.2|
|20240304|          52.8|
|20240417|          50.8|
|20240324|          50.0|
|20240218|          49.3|
|20240403|          48.5|
|20240108|          47.8|
|20240219|          47.7|
|20240311|          47.2|
+--------+--------------+

Use built-in methods and functions from Spark SQL to do the same thing on our Spark DataFrame directly:

In [14]:
max_temperatures = (df
                    .filter(F.col("ELEMENT") == "TMAX")  
                    .groupBy("DATE") 
                    .agg(F.max("DATA_VALUE").alias("MaxTemperature")) 
                    .orderBy("DATE"))
max_temperatures.show()

+--------+--------------+
|    DATE|MaxTemperature|
+--------+--------------+
|20240101|          45.0|
|20240102|          45.5|
|20240103|          42.3|
|20240104|          41.6|
|20240105|          42.7|
|20240106|          42.4|
|20240107|          43.0|
|20240108|          47.8|
|20240109|          43.9|
|20240110|          44.3|
|20240111|          45.0|
|20240112|          43.0|
|20240113|          44.6|
|20240114|          43.1|
|20240115|          43.7|
|20240116|          43.7|
|20240117|          44.0|
|20240118|          42.0|
|20240119|          44.4|
|20240120|          46.2|
+--------+--------------+
only showing top 20 rows

In [16]:
max_temperatures = (df
                    .filter(F.col("ELEMENT") == "TMAX") 
                    .groupBy("DATE") 
                    .agg(F.max("DATA_VALUE").alias("MaxTemperature")) 
                    .filter(F.col("MaxTemperature") >= 47) 
                    .orderBy(F.col("MaxTemperature").desc()))

max_temperatures.show()

+--------+--------------+
|    DATE|MaxTemperature|
+--------+--------------+
|20240314|         202.2|
|20240310|          67.2|
|20240304|          52.8|
|20240417|          50.8|
|20240324|          50.0|
|20240218|          49.3|
|20240403|          48.5|
|20240108|          47.8|
|20240219|          47.7|
|20240311|          47.2|
+--------+--------------+