### Spark notebook ###

This notebook will only work in a Jupyter session running on `mathmadslinux2p`.

You can start your own Jupyter session on `mathmadslinux2p` and open this notebook in Chrome on the MADS Windows server by

**Steps**

1. Login to the MADS Windows server using https://mathportal.canterbury.ac.nz/.
2. Download or copy this notebook to your home directory.
3. Open powershell and run `ssh mathmadslinux2p`.
4. Run `start_pyspark_notebook` or `/opt/anaconda3/bin/jupyter-notebook --ip 132.181.129.68 --port $((8000 + $((RANDOM % 999))))`.
5. Copy / paste the url provided in the shell window into Chrome on the MADS Windows server.
6. Open the notebook from the Jupyter root directory (which is your home directory).
7. Run `start_spark()` to start a spark session in the notebook.
8. Run `stop_spark()` before closing the notebook or kill your spark application by hand using the link in the Spark UI.

In [23]:
# Run this cell to import pyspark and to define start_spark() and stop_spark()

import findspark

findspark.init()

import getpass
import pandas
import pyspark
import random
import re

from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Functions used below

def username():
    """Get username with any domain information removed.
    """

    return re.sub('@.*', '', getpass.getuser())


def dict_to_html(d):
    """Convert a Python dictionary into a two column table for display.
    """

    html = []

    html.append(f'<table width="100%" style="width:100%; font-family: monospace;">')
    for k, v in d.items():
        html.append(f'<tr><td style="text-align:left;">{k}</td><td>{v}</td></tr>')
    html.append(f'</table>')

    return ''.join(html)


def show_as_html(df, n=20):
    """Leverage existing pandas jupyter integration to show a spark dataframe as html.
    
    Args:
        n (int): number of rows to show (default: 20)
    """

    display(df.limit(n).toPandas())

    
def display_spark():
    """Display the status of the active Spark session if one is currently running.
    """
    
    if 'spark' in globals() and 'sc' in globals():

        name = sc.getConf().get("spark.app.name")
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'<li><a href="{sc.uiWebUrl}" target="_blank">Spark Application UI</a></li>',
            f'</ul>',
            f'<p><b>Config</b></p>',
            dict_to_html(dict(sc.getConf().getAll())),
            f'<p><b>Notes</b></p>',
            f'<ul>',
            f'<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
            f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))
        
    else:
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username() + " (jupyter)"}</code> is under the completed applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))


# Functions to start and stop spark

def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    
    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory (default: 1)
        master_memory (float): master memory (default: 1)
    """

    global spark
    global sc

    user = username()
    
    cores = executor_instances * executor_cores
    partitions = cores * 4
    port = 4000 + random.randint(1, 999)

    spark = (
        SparkSession.builder
        .master("spark://masternode2:7077")
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{user}/spark/")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.executor.memory", f"{worker_memory}g")
        .config("spark.driver.memory", f"{master_memory}g")
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.ui.port", str(port))
        .appName(user + " (jupyter)")
        .getOrCreate()
    )
    sc = SparkContext.getOrCreate()
    
    display_spark()

    
def stop_spark():
    """Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).
    """

    global spark
    global sc

    if 'spark' in globals() and 'sc' in globals():

        spark.stop()

        del spark
        del sc

    display_spark()


# Make css changes to improve spark output readability

html = [
    '<style>',
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
    '</style>',
]
display(HTML(''.join(html)))

### Example notebook ###

The code below provides a template for how you would use a notebook to start spark, run some code, and then stop spark.

**Steps**

- Run `start_spark()` to start a spark session in the notebook (only change the default resources when advised to do so for an exercise or assignment)
- Write and run code interactively, creating additional cells as needed.
- Run `stop_spark()` before closing the notebook or kill your spark application by hand using the link in the [Spark UI](http://mathmadslinux2p.canterbury.ac.nz:8080/).

In [24]:
# Run this cell to start a spark session in this notebook

start_spark(executor_instances=4, executor_cores=2, worker_memory=4, master_memory=4)

0,1
spark.dynamicAllocation.enabled,false
spark.executor.instances,4
spark.driver.memory,4g
spark.executor.memory,4g
spark.master,spark://masternode2:7077
spark.driver.port,38059
spark.executor.id,driver
spark.app.id,app-20240915164201-1109
spark.executor.cores,2
spark.driver.host,mathmadslinux2p.canterbury.ac.nz


In [25]:
# Import needed modules

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.types import StructType, StructField, StringType, FloatType

In [26]:
# The daily schema was defined in another notebook but is needed again here for processing

daily_schema = StructType([
    StructField("ID", StringType(), True),
    StructField("Date", StringType(), True), 
    StructField("Element", StringType(), True),
    StructField("Value", FloatType(), True),
    StructField("MeasurementFlag", StringType(), True),
    StructField("QualityFlag", StringType(), True),
    StructField("SourceFlag", StringType(), True),
    StructField("ObservationTime", StringType(), True)
])

# Analysis

## Q1(a)

In [27]:
# Load the DataFrame from the Parquet file
stations_df = spark.read.parquet("./output/enriched_stations.parquet")
countries_df = spark.read.parquet("./output/countries.parquet")
states_df = spark.read.parquet("./output/states.parquet")
inventory_df = spark.read.parquet("./output/inventory.parquet")
daily_df = spark.read.parquet("./output/daily.parquet")

# Count the total number of distinct stations based on the 'ID' column
total_stations = stations_df.select("ID").distinct().count()
print(f"Total number of stations: {total_stations}")

# Stations Active in 2024
active_stations_2024 = stations_df.filter(stations_df["LastYear"] == 2024).select("ID").distinct().count()
print(f"Stations active in 2024: {active_stations_2024}")

Total number of stations: 127994
Stations active in 2024: 36516


In [28]:
# Stations in Specific Networks (GSN, HCN, CRN)
gsn_stations = stations_df.filter(stations_df["GSN_Flag"] == "GSN").select("ID").distinct().count()
hcn_stations = stations_df.filter(stations_df["HCN_CRN_Flag"] == "HCN").select("ID").distinct().count()
crn_stations = stations_df.filter(stations_df["HCN_CRN_Flag"] == "CRN").select("ID").distinct().count()

print(f"GSN stations: {gsn_stations}")
print(f"HCN stations: {hcn_stations}")
print(f"CRN stations: {crn_stations}")

# Stations in Multiple Networks
multiple_network_stations = stations_df.filter(
    (stations_df["GSN_Flag"] == "GSN") & 
    (stations_df["HCN_CRN_Flag"].isin(["HCN", "CRN"]))
).select("ID").distinct().count()

print(f"Stations in more than one network: {multiple_network_stations}")

GSN stations: 991
HCN stations: 1218
CRN stations: 234
Stations in more than one network: 15


## Q1(b)

In [29]:
# Stations in the Southern Hemisphere
southern_hemisphere_stations = stations_df.filter(stations_df["Latitude"] < 0).select("ID").distinct().count()
print(f"Stations in the Southern Hemisphere: {southern_hemisphere_stations}")

#Stations in US Territories (excluding the US)
us_territories = stations_df.filter(
    stations_df["CountryName"].contains("United States") & 
    (stations_df["CountryName"] != "United States")
)
us_territories_count = us_territories.select("ID").distinct().count()
print(f"Stations in US territories (excluding the US): {us_territories_count}")

Stations in the Southern Hemisphere: 25357
Stations in US territories (excluding the US): 399


## Q1(c) 

In [30]:
# Count Stations by Country
stations_by_country = stations_df.groupBy("CountryCode").count().withColumnRenamed("count", "StationCount")

# Join the counts with the countries DataFrame
countries_with_station_counts = countries_df.join(
    stations_by_country, 
    countries_df["Code"] == stations_by_country["CountryCode"], 
    "left"
).select(
    countries_df["Code"].alias("CountryCode"), 
    countries_df["Name"].alias("CountryName"), 
    stations_by_country["StationCount"]
).dropDuplicates()

# Show the result
show_as_html(countries_with_station_counts)
countries_with_station_counts.write.mode("overwrite").parquet("./output/countries_with_station_counts.parquet")

Unnamed: 0,CountryCode,CountryName,StationCount
0,FR,France,111
1,GY,Guyana,2
2,MI,Malawi,20
3,NE,Niue [New Zealand],1
4,PA,Paraguay,24
5,PS,Palau,12
6,RE,Reunion [France],1
7,SA,Saudi Arabia,32
8,SZ,Switzerland,10
9,IR,Iran,35


In [31]:
# Count the total number of stations in each state
stations_by_state = stations_df.groupBy("State").count().withColumnRenamed("count", "StationCount")

# Join the counts with the states DataFrame
states_with_station_counts = states_df.join(
    stations_by_state, 
    states_df["StateCode"] == stations_by_state["State"], 
    "left"
).select(
    states_df["StateCode"].alias("State"), 
    states_df["StateName"], 
    stations_by_state["StationCount"]
).dropDuplicates()

# Show the result
show_as_html(states_with_station_counts)

# Save the result
states_with_station_counts.write.mode("overwrite").parquet("./output/states_with_station_counts.parquet")

Unnamed: 0,State,StateName,StationCount
0,AS,AMERICAN SAMOA,21.0
1,GU,GUAM,29.0
2,AB,ALBERTA,1446.0
3,IL,ILLINOIS,2193.0
4,NB,NEW BRUNSWICK,309.0
5,VT,VERMONT,394.0
6,YT,YUKON TERRITORY,128.0
7,BC,BRITISH COLUMBIA,1714.0
8,KY,KENTUCKY,995.0
9,PI,PACIFIC ISLANDS,


## Q2 (a)

In [32]:
import math

# Define the Haversine formula as a Python function
def haversine(lat1, lon1, lat2, lon2):
    R = 6371.0  # Radius of the Earth in kilometers
    
    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
    
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    
    a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
    c = 2 * math.asin(math.sqrt(a))
    
    return R * c

# Convert the Python function into a Spark UDF
haversine_udf = udf(haversine, DoubleType())

In [33]:
# Apply the function to compute distances between all pairs of stations in New Zealand
stations_nz = stations_df.filter(stations_df.CountryCode == "NZ")

# Select a small subset of stations to test
subset_stations_df = stations_nz.limit(3)

# Show the subset to ensure the correct stations are selected
show_as_html(subset_stations_df)

# Cross join on the small subset of stations
cross_joined_subset = subset_stations_df.alias("a").crossJoin(
    subset_stations_df.alias("b").select(
        [col(c).alias(f"b_{c}") for c in subset_stations_df.columns]
    )
)

# Compute the distance using the Haversine UDF
distances_subset_df = cross_joined_subset.withColumn(
    "Distance_km", 
    haversine_udf("a.Latitude", "a.Longitude", "b_Latitude", "b_Longitude")
)

# Show the results
show_as_html(distances_subset_df.select("ID", "b_ID", "Distance_km"))

Unnamed: 0,ID,CountryCode,Latitude,Longitude,Elevation,State,Name,GSN_Flag,HCN_CRN_Flag,WMO_ID,CountryName,StateName,FirstYear,LastYear,TotalElementsCount,CoreElementsCount,OtherElementsCount
0,NZ000933090,NZ,-39.017,174.183,32.0,,NEW PLYMOUTH AWS,GSN,,93309,New Zealand,,1944,2024,4,3,1
1,NZ000093292,NZ,-38.65,177.983,5.0,,GISBORNE AERODROME,GSN,,93292,New Zealand,,1962,2024,4,3,1
2,NZ000093844,NZ,-46.417,168.333,2.0,,INVERCARGILL AIRPOR,GSN,,93845,New Zealand,,1948,2024,4,3,1


Unnamed: 0,ID,b_ID,Distance_km
0,NZ000093292,NZ000933090,331.642103
1,NZ000093292,NZM00093110,334.360819
2,NZ000093292,NZ000093994,1111.194587
3,NZ000093844,NZ000933090,950.925285
4,NZ000093844,NZM00093110,1175.722511
5,NZ000093844,NZ000093994,2251.340913
6,NZM00093929,NZ000933090,1416.90708
7,NZM00093929,NZM00093110,1644.84014
8,NZM00093929,NZ000093994,2705.420997


## Q2 (b)

(b) Apply this function to compute the pairwise distances between all stations in New Zealand, and save the result to your output directory.
What two stations are geographically closest together in New Zealand?

In [41]:
# Cross join to get all pairs of stations, with renamed columns to avoid duplication
cross_joined_stations = stations_nz.alias("a").crossJoin(
    stations_nz.alias("b").select(
        [F.col(c).alias(f"b_{c}") for c in stations_nz.columns]  # Alias all columns from 'b'
    )
)

# Compute the distance using the UDF
distances_df = cross_joined_stations.withColumn(
    "Distance_km", 
    haversine_udf("a.Latitude", "a.Longitude", "b_Latitude", "b_Longitude")
)

# Select the required columns 
distances_df.select(
    "a.ID", "b_ID", "Distance_km"
).show()


+-----------+-----------+------------------+
|         ID|       b_ID|       Distance_km|
+-----------+-----------+------------------+
|NZ000933090|NZ000933090|               0.0|
|NZ000933090|NZ000093292| 331.6421027650992|
|NZ000933090|NZ000093844| 950.9252849528349|
|NZ000933090|NZM00093929|1416.9070798854752|
|NZ000933090|NZM00093781| 516.0307573412485|
|NZ000933090|NZ000093012|443.06207568697187|
|NZ000933090|NZ000936150| 491.5122428537933|
|NZ000933090|NZ000939450| 1553.294704214664|
|NZ000933090|NZ000093417|220.19979430792796|
|NZ000933090|NZM00093110| 230.7008604393771|
|NZ000933090|NZ000093994|1305.7041762875858|
|NZ000933090|NZM00093439|262.80637803404636|
|NZ000933090|NZ000937470| 706.9954177260232|
|NZ000933090|NZ000939870| 944.8875093692133|
|NZ000933090|NZM00093678| 380.2458664738792|
|NZ000093292|NZ000933090| 331.6421027650992|
|NZ000093292|NZ000093292|               0.0|
|NZ000093292|NZ000093844|1169.2052395763972|
|NZ000093292|NZM00093929|1604.5048627638557|
|NZ0000932

In [42]:
# Filter out pairs where the station is compared to itself
filtered_distances_df = distances_df.filter(distances_df["a.ID"] != distances_df["b_ID"])

# Find the closest pair
closest_stations = filtered_distances_df.orderBy("Distance_km").limit(1)

# Show the closest pair of stations
closest_stations.show()

+-----------+-----------+--------+---------+---------+-----+---------------+--------+------------+------+-----------+---------+---------+--------+------------------+-----------------+------------------+-----------+-------------+----------+-----------+-----------+-------+-------------------+----------+--------------+--------+-------------+-----------+-----------+----------+--------------------+-------------------+--------------------+-----------------+
|         ID|CountryCode|Latitude|Longitude|Elevation|State|           Name|GSN_Flag|HCN_CRN_Flag|WMO_ID|CountryName|StateName|FirstYear|LastYear|TotalElementsCount|CoreElementsCount|OtherElementsCount|       b_ID|b_CountryCode|b_Latitude|b_Longitude|b_Elevation|b_State|             b_Name|b_GSN_Flag|b_HCN_CRN_Flag|b_WMO_ID|b_CountryName|b_StateName|b_FirstYear|b_LastYear|b_TotalElementsCount|b_CoreElementsCount|b_OtherElementsCount|      Distance_km|
+-----------+-----------+--------+---------+---------+-----+---------------+--------+---

## Q3 (a)

In [16]:
# Count the Number of Rows in Daily
total_rows = daily_df.count()
print(f"Total number of rows in daily: {total_rows}")

Total number of rows in daily: 3119374043


## Q3(b)

In [17]:
# Filter the Core Elements
core_elements = ["PRCP", "TMAX", "TMIN", "SNOW", "SNWD"]
core_elements_df = daily_df.filter(daily_df['Element'].isin(core_elements))
show_as_html(core_elements_df)

Unnamed: 0,ID,Date,Element,Value,MeasurementFlag,QualityFlag,SourceFlag,ObservationTime
0,AE000041196,20100101,TMAX,259,,,S,
1,AE000041196,20100101,TMIN,120,,,S,
2,AEM00041194,20100101,TMAX,250,,,S,
3,AEM00041194,20100101,TMIN,168,,,S,
4,AEM00041194,20100101,PRCP,0,,,S,
5,AEM00041217,20100101,TMAX,250,,,S,
6,AEM00041217,20100101,TMIN,146,,,S,
7,AEM00041218,20100101,TMAX,265,,,S,
8,AG000060390,20100101,TMAX,180,,,S,
9,AG000060390,20100101,PRCP,0,,,S,


In [19]:
counts_by_element = core_elements_df.groupBy('Element').count()
show_as_html(counts_by_element)

# Determine which element has the most observations
max_observations = counts_by_element.orderBy(col('count').desc()).first()
print(f"Element with the most observations: {max_observations['Element']} with {max_observations['count']} observations")


Unnamed: 0,Element,count
0,SNWD,299076145
1,SNOW,356187192
2,TMIN,456739567
3,PRCP,1073530896
4,TMAX,457927581


Element with the most observations: PRCP with 1073530896 observations


## Q3(c)

In [24]:
# Create DataFrames for TMAX and TMIN
tmax_df = core_elements_df.filter(core_elements_df['Element'] == 'TMAX')
tmin_df = core_elements_df.filter(core_elements_df['Element'] == 'TMIN')

# Perform a left anti join to find TMAX without a corresponding TMIN
tmax_without_tmin_df = tmax_df.join(tmin_df, ['ID', 'Date'], 'left_anti')

# Count the number of TMAX without TMIN
tmax_without_tmin_count = tmax_without_tmin_df.count()
print(f"Number of TMAX observations without a corresponding TMIN: {tmax_without_tmin_count}")

Number of TMAX observations without a corresponding TMIN: 10567304


In [25]:
# Count the number of unique stations
unique_stations = tmax_without_tmin_df.select('ID').distinct().count()

print(f"Number of unique stations contributing to these observations: {unique_stations}")

Number of unique stations contributing to these observations: 28716


In [43]:
# Run this cell before closing the notebook or kill your spark application by hand using the link in the Spark UI

stop_spark()