In [3]:
# Run this cell to import pyspark and to define start_spark() and stop_spark()

import findspark

findspark.init()

import getpass
import pandas
import pyspark
import random
import re

from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Functions used below

def username():
    """Return the current login name with any '@domain' suffix stripped off.
    """

    login = getpass.getuser()
    domain_suffix = re.compile('@.*')

    return domain_suffix.sub('', login)


def dict_to_html(d):
    """Convert a Python dictionary into a two column HTML table for display.

    Keys and values are converted with str() and HTML-escaped, so text such as
    '<' or '&' inside a value is shown literally instead of being parsed as markup.

    Args:
        d (dict): mapping to render, one table row per item in iteration order

    Returns:
        str: the HTML for the table
    """

    # Imported here rather than at module level because this file also uses
    # `html` as a variable name for lists of markup fragments.
    from html import escape

    rows = [
        f'<tr><td style="text-align:left;">{escape(str(k))}</td><td>{escape(str(v))}</td></tr>'
        for k, v in d.items()
    ]

    return ''.join([
        '<table width="100%" style="width:100%; font-family: monospace;">',
        *rows,
        '</table>',
    ])


def show_as_html(df, n=20):
    """Show the leading rows of a spark dataframe using the pandas notebook display.

    Args:
        df: spark dataframe to preview
        n (int): number of rows to show (default: 20)
    """

    preview = df.limit(n)
    display(preview.toPandas())

    
def display_spark():
    """Show an HTML status panel: session details when spark is running, a stopped notice otherwise.
    """

    # Link to the cluster-wide Spark UI, shown in both states
    spark_ui_item = '<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>'

    # Guard: without both globals there is no session to describe
    if not ('spark' in globals() and 'sc' in globals()):
        stopped = (
            '<p><b>Spark</b></p>'
            + f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username() + " (jupyter)"}</code> is under the completed applications section in the Spark UI.</p>'
            + '<ul>'
            + spark_ui_item
            + '</ul>'
        )
        display(HTML(stopped))
        return

    conf = sc.getConf()
    name = conf.get("spark.app.name")

    status = (
        '<p><b>Spark</b></p>'
        + f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>'
        + '<ul>'
        + spark_ui_item
        + f'<li><a href="{sc.uiWebUrl}" target="_blank">Spark Application UI</a></li>'
        + '</ul>'
    )
    config = '<p><b>Config</b></p>' + dict_to_html(dict(conf.getAll()))
    notes = (
        '<p><b>Notes</b></p>'
        + '<ul>'
        + '<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>'
        + f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>'
        + '</ul>'
    )

    display(HTML(status + config + notes))


# Functions to start and stop spark

def _memory_setting(gigabytes):
    """Format a memory amount given in gigabytes as a Spark size string.

    Whole amounts keep the original "<n>g" form. Fractional amounts are converted to
    whole mebibytes ("<n>m") because Spark size strings do not accept fractional
    values such as "1.5g".
    """

    size = float(gigabytes)
    if size.is_integer():
        return f"{int(size)}g"
    return f"{int(size * 1024)}m"


def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).

    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): memory per executor in gigabytes, may be fractional (default: 1)
        master_memory (float): driver memory in gigabytes, may be fractional (default: 1)
    """

    global spark
    global sc

    user = username()

    # Total cores requested across all executors; shuffle partitions are set to 4 per core
    cores = executor_instances * executor_cores
    partitions = cores * 4
    # Random UI port in 4001-4999 so that several notebooks are unlikely to collide
    port = 4000 + random.randint(1, 999)

    spark = (
        SparkSession.builder
        .master("spark://masternode2:7077")
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{user}/spark/")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.executor.memory", _memory_setting(worker_memory))
        .config("spark.driver.memory", _memory_setting(master_memory))
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.ui.port", str(port))
        .appName(user + " (jupyter)")
        .getOrCreate()
    )
    sc = SparkContext.getOrCreate()

    display_spark()

    
def stop_spark():
    """Shut down the running Spark session, remove the spark and sc globals, then show the status.
    """

    global spark, sc

    # Only act when both globals created by start_spark() are present
    if {'spark', 'sc'} <= globals().keys():
        spark.stop()
        del spark, sc

    display_spark()


# Make css changes to improve spark output readability

# Style rules injected into the notebook page:
#   - <pre> text and dataframe table cells are kept on one line (no wrapping)
#   - the first header cell and the body header cells of dataframe tables are hidden
#     (presumably the pandas index column - confirm against the rendered output)
html = [
    '<style>',
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
    '</style>',
]
display(HTML(''.join(html)))


# Print the docstrings of the helper functions defined in this cell

help(start_spark)
help(stop_spark)
help(display_spark)
help(show_as_html)

Help on function start_spark in module __main__:

start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1)
    Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    
    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory (default: 1)
        master_memory (float): master memory (default: 1)

Help on function stop_spark in module __main__:

stop_spark()
    Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).

Help on function display_spark in module __main__:

display_spark()
    Display the status of the active Spark session if one is currently running.

Help on function show_as_html in module __main__:

show_as_html(df, n=20)
    Leverage existing pandas jupyter integration to show a spark dataframe as html.
    
    Args:
        n 

In [4]:
# Run this cell to start a spark session in this notebook

start_spark(executor_instances=4, executor_cores=2, worker_memory=4, master_memory=4)

0,1
spark.dynamicAllocation.enabled,false
spark.executor.instances,4
spark.driver.memory,4g
spark.executor.memory,4g
spark.master,spark://masternode2:7077
spark.executor.id,driver
spark.sql.warehouse.dir,file:/users/home/mda205/spark-warehouse
spark.executor.cores,2
spark.driver.host,mathmadslinux2p.canterbury.ac.nz
spark.sql.shuffle.partitions,32


In [5]:
# Write your imports and code here or insert cells below

from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
import textwrap
import glob

## Processing Q1

In [4]:
!hdfs dfs -ls -h hdfs:///data/ghcnd #next 4 for Q1

Found 6 items
drwxr-xr-x   - jsw93 supergroup          0 2023-03-13 15:09 hdfs:///data/ghcnd/daily
drwxr-xr-x   - jsw93 supergroup          0 2023-04-19 19:33 hdfs:///data/ghcnd/daily-uncompressed
-rw-r--r--   8 jsw93 supergroup      3.6 K 2023-03-13 14:52 hdfs:///data/ghcnd/ghcnd-countries.txt
-rw-r--r--   8 jsw93 supergroup     32.4 M 2023-03-13 14:52 hdfs:///data/ghcnd/ghcnd-inventory.txt
-rw-r--r--   8 jsw93 supergroup      1.1 K 2023-03-13 14:52 hdfs:///data/ghcnd/ghcnd-states.txt
-rw-r--r--   8 jsw93 supergroup     10.2 M 2023-03-13 14:52 hdfs:///data/ghcnd/ghcnd-stations.txt


In [5]:
# Summarised (-s), human readable (-h) disk usage of the daily directory.
# NOTE(review): the two sizes printed are presumably the data size and the size across all replicas - confirm.
!hdfs dfs -du -s -h hdfs:///data/ghcnd/daily

12.2 G  97.6 G  hdfs:///data/ghcnd/daily


In [6]:
# Count the lines of the directory listing.
# NOTE: the listing starts with a "Found N items" header line, so the number of files is this count minus one.
!hdfs dfs -ls -h hdfs:///data/ghcnd/daily |wc -l

263


In [7]:
!hdfs dfs -ls -h hdfs:///data/ghcnd/daily

Found 262 items
-rw-r--r--   8 jsw93 supergroup     63.6 K 2023-03-13 14:59 hdfs:///data/ghcnd/daily/1750.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2023-03-13 14:59 hdfs:///data/ghcnd/daily/1763.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.2 K 2023-03-13 14:59 hdfs:///data/ghcnd/daily/1764.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2023-03-13 14:59 hdfs:///data/ghcnd/daily/1765.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2023-03-13 14:59 hdfs:///data/ghcnd/daily/1766.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2023-03-13 14:59 hdfs:///data/ghcnd/daily/1767.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.2 K 2023-03-13 14:59 hdfs:///data/ghcnd/daily/1768.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2023-03-13 14:59 hdfs:///data/ghcnd/daily/1769.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2023-03-13 14:59 hdfs:///data/ghcnd/daily/1770.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2023-03-13 14:59 hdfs:///data/ghcnd/daily/1771.csv.gz
-rw-r--r--   8

## Processing Q2

Processing Q2 (a)

In [8]:
#Q2 a
# Explicit schema for the daily csv files: every column is nullable
schema_daily = StructType([
    StructField(column, dtype, True)
    for column, dtype in [
        ("Station_ID", StringType()),
        ("DATE", DateType()),
        ("Element", StringType()),
        ("Element_Value", IntegerType()),
        ("MEASUREMENT_FLAG", StringType()),
        ("QUALITY_FLAG", StringType()),
        ("SOURCE_FLAG", StringType()),
        ("OBSERVATION_TIME", StringType()),
    ]
])


Processing Q2 (b)

In [9]:
#Q2 b
# Load the 2023 daily file with the explicit schema and keep only the first 1000 rows
dailyfirst1k = (
    spark.read.format("csv")
    .option("header", "false")
    .option("inferSchema", "false")
    # Month must be upper-case "MM": lower-case "mm" is minute-of-hour, which leaves
    # the month unparsed so every date would fall back to January.
    .option("dateFormat", "yyyyMMdd")
    .schema(schema_daily)
    .load("hdfs:///data/ghcnd/daily/2023.csv.gz")
    .limit(1000)
)

# Preview only the first 20 of the 1000 rows to keep the notebook output readable
dailyfirst1k.show(20)

+-----------+----------+-------+-------------+----------------+------------+-----------+----------------+
| Station_ID|      DATE|Element|Element_Value|MEASUREMENT_FLAG|QUALITY_FLAG|SOURCE_FLAG|OBSERVATION_TIME|
+-----------+----------+-------+-------------+----------------+------------+-----------+----------------+
|AE000041196|2023-01-01|   TMAX|          252|            null|        null|          S|            null|
|AE000041196|2023-01-01|   TMIN|          149|            null|        null|          S|            null|
|AE000041196|2023-01-01|   PRCP|            0|               D|        null|          S|            null|
|AE000041196|2023-01-01|   TAVG|          207|               H|        null|          S|            null|
|AEM00041194|2023-01-01|   TMAX|          255|            null|        null|          S|            null|
|AEM00041194|2023-01-01|   TMIN|          186|            null|        null|          S|            null|
|AEM00041194|2023-01-01|   PRCP|            0|

Processing Q2 (c)

In [10]:
#Q2 c

# Bringing in the small metadata files (fixed-width text)

def _read_fixed_width(path, fields):
    """Read a fixed-width text file into a dataframe of string columns.

    Args:
        path (str): location of the text file
        fields (list): (column name, 1-based start position, length) triples,
            given in the order the columns should appear

    Returns:
        DataFrame: one string column per field, sliced from each line of the file;
            values are neither trimmed nor cast
    """

    return spark.read.text(path).select(
        *[F.substring("value", start, length).alias(name) for name, start, length in fields]
    )


states = _read_fixed_width("hdfs:///data/ghcnd/ghcnd-states.txt", [
    ("State_Code", 1, 2),
    ("State_Name", 4, 47),
])
#--------
countries = _read_fixed_width("hdfs:///data/ghcnd/ghcnd-countries.txt", [
    ("Country_Code", 1, 2),
    ("Country_Name", 4, 60),
])
#--------
inventory = _read_fixed_width("hdfs:///data/ghcnd/ghcnd-inventory.txt", [
    ("Station_ID", 1, 11),
    ("Lat", 13, 8),
    ("Lon", 22, 9),
    ("Element", 32, 4),
    ("First_Year", 37, 4),
    ("Last_Year", 42, 4),
])
#------
stations = _read_fixed_width("hdfs:///data/ghcnd/ghcnd-stations.txt", [
    ("Station_ID", 1, 11),
    ("Lat", 13, 8),
    ("Lon", 22, 9),
    ("Elevation", 32, 6),
    ("State_Code", 39, 2),
    ("Station_name", 42, 30),
    ("GSN_flag", 73, 3),
    ("HCN_flag", 77, 3),
    ("WMO_ID", 81, 5),
])

In [11]:
#also part of Q2c 
# Row counts of the four metadata tables
print(f"Inventory has {inventory.count()} rows of data")
print(f"Countries has {countries.count()} rows of data")
print(f"States has {states.count()} rows of data")
print(f"Stations has {stations.count()} rows of data")

# WMO_ID is a 5 character slice, so a station without an id holds five spaces
wmo = stations.select(
    F.count(F.when(F.col("WMO_ID") == "     ", 1)).alias("# Stations without a WMO_id"),
    F.count(F.when(F.col("WMO_ID") != "     ", 1)).alias("# Stations with a WMO_id"),
)
wmo.show()

Inventory has 737925 rows of data
Countries has 219 rows of data
States has 74 rows of data
Stations has 124247 rows of data
+---------------------------+------------------------+
|# Stations without a WMO_id|# Stations with a WMO_id|
+---------------------------+------------------------+
|                     116288|                    7959|
+---------------------------+------------------------+



## Processing Q3

In [12]:
# !mkdir outputs

Processing Q3 (a)

In [13]:
#Q3 a
# The country code is the first two characters of the station id
station_wCountry_Codes = stations.withColumn(
    "Country_Code", F.substring("Station_ID", 1, 2))

#Q3 b
# Attach country names; stations without a matching code are kept
station_and_countries = station_wCountry_Codes.join(countries, "Country_Code", "left")

#Q3 c
# Attach state names, then replace nulls left by unmatched rows with empty strings
station_country_states = (
    station_and_countries
    .join(states, "State_Code", "left")
    .fillna('')
)

Processing Q3 (d)

In [14]:
# Element codes treated as "core" when counting each station's elements
core_elements=["PRCP","SNOW","SNWD","TMAX","TMIN"]


In [15]:
#3 d
# One row per station: the set of elements it reports, the span of years covered by
# any element, and how many of its elements are core / not core.
modified_inventory = (
    inventory
    .groupBy("Station_ID")
    .agg(
        F.collect_set("Element").alias("Station_Elements"),
        # Earliest first year and latest last year over all of the station's elements.
        # first()/last() would return whichever row the shuffle happened to deliver.
        F.min(F.col("First_Year").cast("int")).alias("FirstYear"),
        F.max(F.col("Last_Year").cast("int")).alias("LastYear"),
        F.countDistinct("Element").alias("Element_Count"),
    )
    .withColumn(
        "Core_Element_Count",
        F.size(F.array_intersect(
            F.col("Station_Elements"),
            F.array([F.lit(w) for w in core_elements]))))
    .withColumn(
        "Not_Core_Elements",
        F.col("Element_Count") - F.col("Core_Element_Count"))
)

In [16]:
modified_inventory.show()

+-----------+--------------------+---------+--------+-------------+------------------+-----------------+
| Station_ID|    Station_Elements|FirstYear|LastYear|Element_Count|Core_Element_Count|Not_Core_Elements|
+-----------+--------------------+---------+--------+-------------+------------------+-----------------+
|AGE00147719|[TMAX, TMIN, PRCP...|     1957|    2023|            4|                 3|                1|
|ALE00100939|        [TMAX, PRCP]|     1940|    1990|            2|                 2|                0|
|AQC00914873|[WT01, TMAX, TMIN...|     1955|    1959|           12|                 5|                7|
|AR000000002|              [PRCP]|     1981|    2000|            1|                 1|                0|
|AR000875850|[TMAX, TMIN, PRCP...|     1973|    2023|            5|                 4|                1|
|ARM00087022|[TMAX, TMIN, PRCP...|     1977|    2023|            4|                 3|                1|
|ARM00087480|[TMAX, TMIN, PRCP...|     1998|    2023|  

In [18]:
# Q3 d
# The prcp_only predicate was written with help from Hazel.
def prcp_only(s):
    """Column predicate: the element code equals "PRCP"."""
    return s == "PRCP"


# Count stations reporting all five core elements, and stations whose every element is PRCP
q = modified_inventory.select(
    F.count(
        F.when(F.col("Core_Element_Count") == "5", 1)
    ).alias("# stations collecting all 5 core element"),
    F.count(
        F.when(F.forall(F.col("Station_Elements"), prcp_only), 1)
    ).alias("# stations only collecting Precip"),
)

q.show()

+----------------------------------------+---------------------------------+
|# stations collecting all 5 core element|# stations only collecting Precip|
+----------------------------------------+---------------------------------+
|                                   20449|                            16272|
+----------------------------------------+---------------------------------+



Processing Q3 (e)

In [19]:
# Q3 e
# Add the per-station inventory summary; stations without inventory rows are kept
stations = station_country_states.join(modified_inventory, "Station_ID", "left")

# The array column has to become a plain string before the table can be saved as csv
stations_with_array_removed = stations.withColumn(
    "Station_Elements", stations["Station_Elements"].cast("string"))

In [20]:
def savefile(filename, tablename, name="mda205"):
    """Write a spark dataframe to the user's HDFS outputs directory as gzip-compressed csv.

    Any existing output at the destination is overwritten.

    Args:
        filename (str): name of the output directory created under outputs/
        tablename: spark dataframe to write
        name (str): HDFS user directory to write under (default: "mda205")
    """

    data_path = f"hdfs:///user/{name}/outputs/{filename}/"

    # Write once, compressed. An uncompressed write to the same path beforehand
    # would only be overwritten by this one and cost an extra job.
    (tablename.write
        .option("compression", "gzip")
        .mode("overwrite")
        .csv(data_path))

In [21]:
savefile("no_array_stations", stations_with_array_removed) 

Processing Q3 (f)

In [22]:
# Q3 f
# Keep every daily row and attach the station metadata where the station is known.
# dailyfirst1k already names its key column Station_ID, so no rename is needed.
stations_with_dailyfirst1k = dailyfirst1k.join(stations, on="Station_ID", how="left")

# Daily rows whose station is missing from stations have null station columns after
# the left join; a non-zero count means the daily subset refers to unknown stations.
# NOTE(review): assumes the question asks for daily stations absent from stations - confirm.
stations_with_dailyfirst1k.filter(F.col("Station_name").isNull()).count()

0

In [23]:
# Mark stations for broadcast joins and cache it.
# stations has no "ID" column (its key is already Station_ID), so no rename is needed.
stations_broadcasted = F.broadcast(stations).cache()

In [24]:
# 3 f but not actually running - probably very expensive

# stations_daily=(
#     stations_broadcasted
#     .join(daily
#           .withColumnRenamed("ID", "Station_ID"), 
#           on="Station_ID", how="left"))

In [6]:
# Schema for the enriched stations table saved to outputs/no_array_stations.
# The field order follows the column order of the saved csv (there is no header row).
schema_stations = StructType([
    StructField("Station_ID", StringType(), True),
    StructField("State_Code", StringType(), True),
    StructField("Country_Code", StringType(), True),
    StructField("Lat", DoubleType(), True),
    StructField("Lon", DoubleType(), True),
    # GHCN-Daily station elevations are decimal values, which an integer column cannot parse
    StructField("Elevation", DoubleType(), True),
    StructField("Station_Name", StringType(), True),
    StructField("GSN_flag", StringType(), True),
    StructField("HCN_flag", StringType(), True),
    StructField("WMO_ID", IntegerType(), True),
    StructField("Country_Name", StringType(), True),
    StructField("State_Name", StringType(), True),
    StructField("Station_Elements", StringType(), True),
    StructField("FirstYear", IntegerType(), True),
    StructField("LastYear", IntegerType(), True),
    StructField("Element_Count", IntegerType(), True),
    StructField("Core_Element_Count", IntegerType(), True),
    StructField("Not_Core_Elements", IntegerType(), True)
])

# Reload the saved table; this rebinds the name `stations` to the csv-backed dataframe
stations = (
    spark.read.format("csv")
    .option("header", "False")
    .option("inferSchema", "False")
    .schema(schema_stations)
    .load("hdfs:///user/mda205/outputs/no_array_stations")
)

In [7]:
# Number of rows in the reloaded stations table whose Country_Code is "EK"
stations.filter(F.col("Country_Code")=="EK").count()

2

In [8]:
# Run this cell before closing the notebook or kill your spark application by hand using the link in the Spark UI

stop_spark()