### Spark notebook ###

This notebook will only work in a Jupyter notebook or Jupyter lab session running on the cluster master node in the cloud.

Follow the instructions on the computing resources page to start a cluster and open this notebook.

**Steps**

1. Connect to the Windows server using Windows App.
2. Connect to Kubernetes.
3. Start Jupyter and open this notebook from Jupyter in order to connect to Spark.

In [1]:
# Run this cell to import pyspark and to define start_spark() and stop_spark()

import findspark

findspark.init()

import getpass
import pandas
import pyspark
import random
import re

from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Constants used to interact with Azure Blob Storage using the hdfs command or Spark

global username

username = re.sub('@.*', '', getpass.getuser())

global azure_account_name
global azure_data_container_name
global azure_user_container_name
global azure_user_token

azure_account_name = "madsstorage002"
azure_data_container_name = "campus-data"
azure_user_container_name = "campus-user"
azure_user_token = r"sp=racwdl&st=2024-09-19T08:03:31Z&se=2025-09-19T16:03:31Z&spr=https&sv=2022-11-02&sr=c&sig=kMP%2BsBsRzdVVR8rrg%2BNbDhkRBNs6Q98kYY695XMRFDU%3D"


# Functions used below

def dict_to_html(d):
    """Convert a Python dictionary into a two column table for display.
    """

    html = []

    html.append(f'<table width="100%" style="width:100%; font-family: monospace;">')
    for k, v in d.items():
        html.append(f'<tr><td style="text-align:left;">{k}</td><td>{v}</td></tr>')
    html.append(f'</table>')

    return ''.join(html)


def show_as_html(df, n=20):
    """Leverage existing pandas jupyter integration to show a spark dataframe as html.
    
    Args:
        n (int): number of rows to show (default: 20)
    """

    display(df.limit(n).toPandas())

    
def display_spark():
    """Display the status of the active Spark session if one is currently running.
    """
    
    if 'spark' in globals() and 'sc' in globals():

        name = sc.getConf().get("spark.app.name")

        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://localhost:{sc.uiWebUrl.split(":")[-1]}" target="_blank">Spark Application UI</a></li>',
            f'</ul>',
            f'<p><b>Config</b></p>',
            dict_to_html(dict(sc.getConf().getAll())),
            f'<p><b>Notes</b></p>',
            f'<ul>',
            f'<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
            f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))
        
    else:
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username} (notebook)</code> is under the completed applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))


# Functions to start and stop spark

def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    
    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory (default: 1)
        master_memory (float): master memory (default: 1)
    """

    global spark
    global sc

    cores = executor_instances * executor_cores
    partitions = cores * 4
    port = 4000 + random.randint(1, 999)

    spark = (
        SparkSession.builder
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{username}/spark/")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.driver.memory", f'{master_memory}g')
        .config("spark.executor.memory", f'{worker_memory}g')
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.kubernetes.container.image", "madsregistry001.azurecr.io/hadoop-spark:v3.3.5-openjdk-8")
        .config("spark.kubernetes.container.image.pullPolicy", "IfNotPresent")
        .config("spark.kubernetes.memoryOverheadFactor", "0.3")
        .config("spark.memory.fraction", "0.1")
        .config(f"fs.azure.sas.{azure_user_container_name}.{azure_account_name}.blob.core.windows.net",  azure_user_token)
        .config("spark.app.name", f"{username} (notebook)")
        .getOrCreate()
    )
    sc = SparkContext.getOrCreate()
    
    display_spark()

    
def stop_spark():
    """Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).
    """

    global spark
    global sc

    if 'spark' in globals() and 'sc' in globals():

        spark.stop()

        del spark
        del sc

    display_spark()


# Make css changes to improve spark output readability

html = [
    '<style>',
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
    '</style>',
]
display(HTML(''.join(html)))

In [3]:
# Run this cell to start a spark session in this notebook

#start_spark(executor_instances=4, executor_cores=2, worker_memory=4, master_memory=4)
start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1)

0,1
spark.dynamicAllocation.enabled,false
spark.fs.azure.sas.uco-user.madsstorage002.blob.core.windows.net,"""sp=racwdl&st=2024-09-19T08:00:18Z&se=2025-09-19T16:00:18Z&spr=https&sv=2022-11-02&sr=c&sig=qtg6fCdoFz6k3EJLw7dA8D3D8wN0neAYw8yG4z4Lw2o%3D"""
spark.kubernetes.driver.pod.name,spark-master-driver
spark.app.submitTime,1743799780092
spark.fs.azure.sas.campus-user.madsstorage002.blob.core.windows.net,"""sp=racwdl&st=2024-09-19T08:03:31Z&se=2025-09-19T16:03:31Z&spr=https&sv=2022-11-02&sr=c&sig=kMP%2BsBsRzdVVR8rrg%2BNbDhkRBNs6Q98kYY695XMRFDU%3D"""
spark.kubernetes.container.image.pullPolicy,IfNotPresent
spark.driver.memory,1g
spark.driver.extraJavaOptions,-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Dderby.system.home=/tmp/twa78/spark/
fs.azure.sas.campus-user.madsstorage002.blob.core.windows.net,sp=racwdl&st=2024-09-19T08:03:31Z&se=2025-09-19T16:03:31Z&spr=https&sv=2022-11-02&sr=c&sig=kMP%2BsBsRzdVVR8rrg%2BNbDhkRBNs6Q98kYY695XMRFDU%3D
spark.executor.memory,1g


In [4]:
# Write your imports here or insert cells below

from pyspark.sql import functions as F
from pyspark.sql.types import *

In [5]:
# Define the input path for daily

daily_relative_path = f'ghcnd/daily/*.csv.gz'
daily_path = f'wasbs://{azure_data_container_name}@{azure_account_name}.blob.core.windows.net/{daily_relative_path}'

print(daily_path)

wasbs://campus-data@madsstorage002.blob.core.windows.net/ghcnd/daily/*.csv.gz


In [6]:
# Use the hdfs command to explore the data in Azure Blob Storage

!hdfs dfs -ls wasbs://{azure_user_container_name}@{azure_account_name}.blob.core.windows.net/{username}/stations/

2025-04-05 10:22:22,619 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2025-04-05 10:22:22,879 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2025-04-05 10:22:22,926 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2025-04-05 10:22:22,926 INFO impl.MetricsSystemImpl: azure-file-system metrics system started
Found 4 items
-rw-r--r--   1 twa78 supergroup          0 2025-03-23 12:15 wasbs://campus-user@madsstorage002.blob.core.windows.net/twa78/stations/_SUCCESS
-rw-r--r--   1 twa78 supergroup    1595800 2025-03-23 12:15 wasbs://campus-user@madsstorage002.blob.core.windows.net/twa78/stations/part-00000-306a0bde-153c-4181-a159-46062114d309-c000.snappy.parquet
-rw-r--r--   1 twa78 supergroup    1608860 2025-03-23 12:15 wasbs://campus-user@madsstorage002.blob.core.windows.net/twa78/stations/part-00001-306a0bde-153c-4181-a159-46062114d309-c000.snappy.par

In [7]:
# Define the input path for stations dataframe defined in Processing
stations_relative_path = f'{username}/stations'
stations_path = f'wasbs://{azure_user_container_name}@{azure_account_name}.blob.core.windows.net/{stations_relative_path}'

print(stations_path)

wasbs://campus-user@madsstorage002.blob.core.windows.net/twa78/stations


In [8]:
# Load the stations metadata into Spark from Azure Blob Storage using spark.read.text
stations = spark.read.parquet(stations_path)

# Show the resulting DataFrame
print(type(stations))
stations.printSchema()
print(stations)
stations.show(20, False)

                                                                                

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- ID: string (nullable = true)
 |-- STATE_ID: string (nullable = true)
 |-- COUNTRY_ID: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- ELEVATION: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- NETWORK: string (nullable = true)
 |-- CODE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- first_year: string (nullable = true)
 |-- last_year: string (nullable = true)
 |-- distinct_elements_count: long (nullable = true)
 |-- core_elements_count: long (nullable = true)
 |-- other_elements_count: long (nullable = true)

DataFrame[ID: string, STATE_ID: string, COUNTRY_ID: string, LATITUDE: string, LONGITUDE: string, ELEVATION: string, NAME: string, NETWORK: string, CODE: string, COUNTRY: string, STATE: string, first_year: string, last_year: string, distinct_elements_count: bigint, core_elements_count: bigint

                                                                                

+-----------+--------+----------+--------+---------+---------+------------------------+-------+-----+--------------------+-----+----------+---------+-----------------------+-------------------+--------------------+
|ID         |STATE_ID|COUNTRY_ID|LATITUDE|LONGITUDE|ELEVATION|NAME                    |NETWORK|CODE |COUNTRY             |STATE|first_year|last_year|distinct_elements_count|core_elements_count|other_elements_count|
+-----------+--------+----------+--------+---------+---------+------------------------+-------+-----+--------------------+-----+----------+---------+-----------------------+-------------------+--------------------+
|ACW00011647|        |AC        |17.1333 |-61.7833 |19.      |ST JOHNS                |       |     |Antigua and Barbuda |NULL |1957      |1970     |7                      |5                  |2                   |
|AE000041196|        |AE        |25.3330 |55.5170  |34.      |SHARJAH INTER. AIRP     |GSN    |41196|United Arab Emirates|NULL |1944      |2

In [9]:
# Total number of stations
print(f'{stations.count()} stations in stations.')

129657 stations in stations.


In [9]:
# Filter stations active in 2025
active_2025 = stations.filter(F.col('last_year') >= 2025)
active_2025_count = active_2025.count()
print(f"Number of active stations in 2025: {active_2025_count}")



Number of active stations in 2025: 30735


                                                                                

In [10]:
# Group by network and count the number of stations for each network
network_station_count = stations.groupBy('NETWORK').count()

# Show the results
network_station_count.show()

+-------+------+
|NETWORK| count|
+-------+------+
|       |127229|
|    HCN|  1203|
|    GSN|   976|
|GSN HCN|    15|
|    CRN|   234|
+-------+------+



In [11]:
# Count how many stations are in the southern hemisphere (lat<0)

southern_stations = stations.filter(F.col('LATITUDE') < 0)
southern_stations_count = southern_stations.count()
print(f"Number of stations in the Southern Hemisphere: {southern_stations_count}")

Number of stations in the Southern Hemisphere: 25316


In [12]:
# Count how many stations have country containing United States but not equal to.

territory_stations = stations.filter(
    (F.col("Country") != "United States") & (F.col("Country").contains('United States'))
)
territory_stations_count = territory_stations.count()
print(f"Number of stations in US territories: {territory_stations_count}")

#territory_stations.show(20, False)

Number of stations in US territories: 414


In [13]:
# Count the number of stations by country, join with 'countries' and save to storage

# Count
country_station_counts = stations.groupBy("country").agg(
    F.count("*").alias("total_stations_in_country")
)

# Set countries input path
countries_relative_path = f'ghcnd/ghcnd-countries.txt'
countries_path = f'wasbs://{azure_data_container_name}@{azure_account_name}.blob.core.windows.net/{countries_relative_path}'

# Load the countries metadata into Spark from Azure Blob Storage using spark.read.text
countries = spark.read.text(countries_path)

# Use substring to extract parts of the string into new columns
countries = countries.withColumn("COUNTRY_ID", F.trim(F.substring("value", 1, 2))) \
                     .withColumn("COUNTRY", F.trim(F.substring("value", 4, 100)))
countries = countries.drop('value')

# Join
countries_enriched = countries.join(country_station_counts, on="country", how="left")
#countries_enriched.show(20, False)

# Define output path
output_relative_path = f'{username}/countries'
output_path = f'wasbs://{azure_user_container_name}@{azure_account_name}.blob.core.windows.net/{output_relative_path}'

# Save to output
countries_enriched.write.parquet(output_path, mode="overwrite")

25/03/23 11:34:23 WARN AzureFileSystemThreadPoolExecutor: Disabling threads for Delete operation as thread count 0 is <= 1
25/03/23 11:34:25 WARN AzureFileSystemThreadPoolExecutor: Disabling threads for Delete operation as thread count 0 is <= 1


In [15]:
# Count the number of stations by state (excluding NULL states), join with 'states' and save to storage

# Count
state_station_counts = stations.groupBy("state").agg(
    F.count("*").alias("total_stations_in_state")
)

# Set states input path
states_relative_path = f'ghcnd/ghcnd-states.txt'
states_path = f'wasbs://{azure_data_container_name}@{azure_account_name}.blob.core.windows.net/{states_relative_path}'

# Load the states metadata into Spark from Azure Blob Storage using spark.read.text
states = spark.read.text(states_path)

# Use substring to extract parts of the string into new columns
states = states.withColumn("STATE_ID", F.trim(F.substring("value", 1, 2))) \
                     .withColumn("STATE", F.trim(F.substring("value", 4, 100)))
states = states.drop('value')

# Join
states_enriched = states.join(state_station_counts, on="state", how="left")
#states_enriched.show(20, False)

# Define output path
output_relative_path = f'{username}/states'
output_path = f'wasbs://{azure_user_container_name}@{azure_account_name}.blob.core.windows.net/{output_relative_path}'

# Save to output
states_enriched.write.parquet(output_path, mode="overwrite")

25/03/23 11:35:08 WARN AzureFileSystemThreadPoolExecutor: Disabling threads for Delete operation as thread count 0 is <= 1
25/03/23 11:35:09 WARN AzureFileSystemThreadPoolExecutor: Disabling threads for Delete operation as thread count 0 is <= 1


In [14]:
# Use the hdfs command to explore the data in Azure Blob Storage

!hdfs dfs -ls wasbs://{azure_user_container_name}@{azure_account_name}.blob.core.windows.net/{username}/countries/

2025-03-23 11:35:00,578 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2025-03-23 11:35:00,842 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2025-03-23 11:35:00,891 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2025-03-23 11:35:00,891 INFO impl.MetricsSystemImpl: azure-file-system metrics system started
Found 2 items
-rw-r--r--   1 twa78 supergroup          0 2025-03-23 11:34 wasbs://campus-user@madsstorage002.blob.core.windows.net/twa78/countries/_SUCCESS
-rw-r--r--   1 twa78 supergroup       4954 2025-03-23 11:34 wasbs://campus-user@madsstorage002.blob.core.windows.net/twa78/countries/part-00000-a009368e-c2b5-4db3-95b7-be9543b87879-c000.snappy.parquet
2025-03-23 11:35:01,269 INFO impl.MetricsSystemImpl: Stopping azure-file-system metrics system...
2025-03-23 11:35:01,269 INFO impl.MetricsSystemImpl: azure-file-system metrics system stopped.


In [11]:
import math

# Define the Haversine function
def haversine(lat1, lon1, lat2, lon2):
    # Radius of the Earth in kilometers
    R = 6371.0
    
    # Convert latitude and longitude from degrees to radians
    lat1_rad = math.radians(lat1)
    lon1_rad = math.radians(lon1)
    lat2_rad = math.radians(lat2)
    lon2_rad = math.radians(lon2)
    
    # Differences in coordinates
    dlat = lat2_rad - lat1_rad
    dlon = lon2_rad - lon1_rad
    
    # Haversine formula
    a = math.sin(dlat / 2)**2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(dlon / 2)**2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    
    # Distance in kilometers
    distance = R * c
    return distance

# Define the UDF
haversine_udf = F.udf(haversine, FloatType())

In [12]:
# Convert 'latitude' and 'longitude' from strings to floats
stations = stations.withColumn("latitude", F.col("latitude").cast("float"))
stations = stations.withColumn("longitude", F.col("longitude").cast("float"))

# Take a small subset of stations for testing
stations_subset = stations.limit(5)

# Perform a CROSS JOIN to get all pairs of stations
station_pairs = stations_subset.alias("station1").crossJoin(stations_subset.alias("station2"))

# Compute the distance for each pair of stations using the UDF
distances = station_pairs.withColumn(
    "distance_km", 
    haversine_udf(
        station_pairs["station1.LATITUDE"], station_pairs["station1.LONGITUDE"],
        station_pairs["station2.LATITUDE"], station_pairs["station2.LONGITUDE"]
    )
)

# Show the result
distances.select("station1.ID", "station2.ID", "distance_km").show()

+-----------+-----------+-----------+
|         ID|         ID|distance_km|
+-----------+-----------+-----------+
|ACW00011647|ACW00011647|        0.0|
|ACW00011647|AE000041196|  11749.993|
|ACW00011647|AEM00041194|  11740.502|
|ACW00011647|AEM00041218|  11814.122|
|ACW00011647|AG000060590|  6656.2026|
|AE000041196|ACW00011647|  11749.993|
|AE000041196|AE000041196|        0.0|
|AE000041196|AEM00041194|  17.658552|
|AE000041196|AEM00041218|  119.45143|
|AE000041196|AG000060590|   5158.444|
|AEM00041194|ACW00011647|  11740.502|
|AEM00041194|AE000041196|  17.658552|
|AEM00041194|AEM00041194|        0.0|
|AEM00041194|AEM00041218|  113.15391|
|AEM00041194|AG000060590|   5146.737|
|AEM00041218|ACW00011647|  11814.122|
|AEM00041218|AE000041196|  119.45143|
|AEM00041218|AEM00041194|  113.15391|
|AEM00041218|AEM00041218|        0.0|
|AEM00041218|AG000060590|   5206.985|
+-----------+-----------+-----------+
only showing top 20 rows



In [23]:
nz_stations = stations.where(F.col('COUNTRY') == 'New Zealand')

# Perform a CROSS JOIN to get all pairs of stations
nz_station_pairs = nz_stations.alias("station1").crossJoin(nz_stations.alias("station2"))

# Compute the distance for each pair of stations using the UDF
nz_distances = nz_station_pairs.withColumn(
    "distance_km", 
    haversine_udf(
        nz_station_pairs["station1.LATITUDE"], nz_station_pairs["station1.LONGITUDE"],
        nz_station_pairs["station2.LATITUDE"], nz_station_pairs["station2.LONGITUDE"]
    )
)

# Show the result
temp = nz_distances.select("station1.NAME", "station1.ID", "station2.NAME", "station2.ID", "distance_km").orderBy('distance_km').filter(F.col('distance_km') > 0)
nz_dist = temp.withColumnRenamed("station=2.NAME", "name2")

nz_dist.show()

25/04/05 11:01:04 WARN ExtractPythonUDFFromJoinCondition: The join condition:(haversine(LATITUDE#150, LONGITUDE#167, LATITUDE#759, LONGITUDE#757)#792 > 0.0) of the join plan contains PythonUDF only, it will be moved out and the join plan will be turned to cross join.


+-------------------+-----------+-------------------+-----------+-----------+
|               NAME|         ID|               NAME|         ID|distance_km|
+-------------------+-----------+-------------------+-----------+-----------+
|WELLINGTON AERO AWS|NZM00093439|    PARAPARAUMU AWS|NZ000093417|   50.52885|
|    PARAPARAUMU AWS|NZ000093417|WELLINGTON AERO AWS|NZM00093439|   50.52885|
|WELLINGTON AERO AWS|NZM00093439|           KAIKOURA|NZM00093678|   151.0717|
|           KAIKOURA|NZM00093678|WELLINGTON AERO AWS|NZM00093439|   151.0717|
| HOKITIKA AERODROME|NZ000936150|  CHRISTCHURCH INTL|NZM00093781|  152.25804|
|  CHRISTCHURCH INTL|NZM00093781| HOKITIKA AERODROME|NZ000936150|  152.25804|
|  CHRISTCHURCH INTL|NZM00093781|           KAIKOURA|NZM00093678|  152.45882|
|           KAIKOURA|NZM00093678|  CHRISTCHURCH INTL|NZM00093781|  152.45882|
|    PARAPARAUMU AWS|NZ000093417|           KAIKOURA|NZM00093678|  199.52968|
|           KAIKOURA|NZM00093678|    PARAPARAUMU AWS|NZ000093417

In [14]:
# Count the number of rows in 'daily'

# Define the schema
schema = StructType([
    StructField("ID", StringType(), True),
    StructField("DATE", IntegerType(), True),
    StructField("ELEMENT", StringType(), True),
    StructField("VALUE", IntegerType(), True),
    StructField("M-FLAG", StringType(), True), 
    StructField("Q-FLAG", StringType(), True), 
    StructField("S-FLAG", StringType(), True),
    StructField("OBS-TIME", IntegerType(), True)
])

# Load and count    
daily = spark.read.csv(daily_path, schema=schema)
row_count = daily.count()
print(f"The total number of rows in 'daily is: {row_count}")



The total number of rows in 'daily is: 3139143397


                                                                                

In [15]:
daily.show(20,False)

+-----------+--------+-------+-----+------+------+------+--------+
|ID         |DATE    |ELEMENT|VALUE|M-FLAG|Q-FLAG|S-FLAG|OBS-TIME|
+-----------+--------+-------+-----+------+------+------+--------+
|ASN00037091|20100101|PRCP   |0    |NULL  |NULL  |a     |NULL    |
|ASN00037104|20100101|PRCP   |86   |NULL  |NULL  |a     |NULL    |
|ASN00037107|20100101|PRCP   |0    |NULL  |NULL  |a     |NULL    |
|ASN00037112|20100101|PRCP   |50   |NULL  |NULL  |a     |NULL    |
|ASN00037119|20100101|PRCP   |0    |NULL  |NULL  |a     |NULL    |
|ASN00037120|20100101|PRCP   |295  |NULL  |NULL  |a     |NULL    |
|ASN00038000|20100101|TMAX   |404  |NULL  |NULL  |a     |NULL    |
|ASN00038000|20100101|TMIN   |250  |NULL  |NULL  |a     |NULL    |
|ASN00038000|20100101|PRCP   |4    |NULL  |NULL  |a     |NULL    |
|ASN00038003|20100101|TMAX   |383  |NULL  |NULL  |a     |NULL    |
|ASN00038003|20100101|TMIN   |236  |NULL  |NULL  |a     |NULL    |
|ASN00038003|20100101|PRCP   |0    |NULL  |NULL  |a     |NULL 

In [16]:
# Obtain subset of 'daily' with only core elements, and calculate which is reported the most.

# Define the core elements (from the inventory example)
core_elements = ["TMAX", "TMIN", "PRCP", "SNOW", "SNWD"]

# Filter the 'daily' dataset to include only rows with core elements
daily_filtered = daily.filter(F.col("ELEMENT").isin(core_elements))

# Count the number of observations for each core element
element_counts = daily_filtered.groupBy("ELEMENT").count()

# Show the counts for each core element
element_counts.show()

                                                                                

+-------+----------+
|ELEMENT|     count|
+-------+----------+
|   SNWD| 300711620|
|   SNOW| 359249644|
|   TMIN| 458928768|
|   PRCP|1079767077|
|   TMAX| 460114659|
+-------+----------+





The element with the most observations is: PRCP with 1079767077 observations


                                                                                

In [18]:
# How many observations of TMIN don't have a corresponding TMAX

# Filter the 'daily' dataset for only TMAX and TMIN elements
daily_filtered = daily.filter(F.col("ELEMENT").isin(["TMAX", "TMIN"]))

# Group by ID (station) and DATE (date), and collect the set of distinct elements reported
grouped_data = daily_filtered.groupBy("ID", "DATE").agg(
    F.collect_set("ELEMENT").alias("elements")
)

# Filter rows where TMAX is reported but TMIN is missing
missing_tmin = grouped_data.filter(~F.array_contains(grouped_data["elements"], "TMIN")) \
    .filter(F.array_contains(grouped_data["elements"], "TMAX"))

# Count the number of observations where TMIN is missing for TMAX
missing_tmin_count = missing_tmin.count()

# Count the number of unique stations that contributed to these missing TMIN observations
unique_stations = missing_tmin.select("ID").distinct().count()

# Output the results
print(f"Number of observations of TMAX without a corresponding TMIN: {missing_tmin_count}")
print(f"Number of unique stations contributing to these observations: {unique_stations}")




Number of observations of TMAX without a corresponding TMIN: 10660214
Number of unique stations contributing to these observations: 28754


                                                                                

In [25]:
# Run this cell before closing the notebook or kill your spark application by hand using the link in the Spark UI

stop_spark()

25/04/05 11:04:16 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
