# GHCN Data Analysis using Spark

In this notebook, we will study some of the weather data contained in the Global Historical Climate
Network (GHCN), an integrated database of climate summaries from land surface stations around the
world. The data covers the last 175 years and is collected from more than 20 independent sources,
each of which have been subjected to quality assurance reviews.

The daily climate summaries contain records from over 100,000 stations in 180 countries and territories
around the world. There are several daily variables, including maximum and minimum temperature, total
daily precipitation, snowfall, and snow depth; however, about half of the stations report precipitation only.
The records vary by station and cover intervals ranging from less than a year to 175 years in total.
The daily climate summaries are supplemented by metadata further identifying the stations, countries,
states, and elements inventory specific to each station and time period. These provide human readable
names, geographical coordinates, elevations, and date ranges for each station variable in the inventory.

**We first use the hdfs command to explore hdfs without actually loading any data into memory.**

In [1]:
# The command line is credit for https://stackoverflow.com/questions/14515266/how-to-print-file-tree-with-hadoop

# The directory tree
!hdfs dfs -ls -R /data/ghcnd | awk '{print $8}' | sed -e 's/[^-][^\/]*\//--/g' -e 's/^/ /' -e 's/-/|/'

 |---daily
 |-----1763.csv.gz
 |-----1764.csv.gz
 |-----1765.csv.gz
 |-----1766.csv.gz
 |-----1767.csv.gz
 |-----1768.csv.gz
 |-----1769.csv.gz
 |-----1770.csv.gz
 |-----1771.csv.gz
 |-----1772.csv.gz
 |-----1773.csv.gz
 |-----1774.csv.gz
 |-----1775.csv.gz
 |-----1776.csv.gz
 |-----1777.csv.gz
 |-----1778.csv.gz
 |-----1779.csv.gz
 |-----1780.csv.gz
 |-----1781.csv.gz
 |-----1782.csv.gz
 |-----1783.csv.gz
 |-----1784.csv.gz
 |-----1785.csv.gz
 |-----1786.csv.gz
 |-----1787.csv.gz
 |-----1788.csv.gz
 |-----1789.csv.gz
 |-----1790.csv.gz
 |-----1791.csv.gz
 |-----1792.csv.gz
 |-----1793.csv.gz
 |-----1794.csv.gz
 |-----1795.csv.gz
 |-----1796.csv.gz
 |-----1797.csv.gz
 |-----1798.csv.gz
 |-----1799.csv.gz
 |-----1800.csv.gz
 |-----1801.csv.gz
 |-----1802.csv.gz
 |-----1803.csv.gz
 |-----1804.csv.gz
 |-----1805.csv.gz
 |-----1806.csv.gz
 |-----1807.csv.gz
 |-----1808.csv.gz
 |-----1809.csv.gz
 |-----1810.csv.gz
 |-----1811.csv.gz
 |-----1812.csv.gz
 |-----1813.csv.gz
 |-----1814.csv.gz
 

In [2]:
!hdfs dfs -ls -h /data/ghcnd/daily

Found 260 items
-rw-r--r--   8 jsw93 supergroup      3.3 K 2021-08-09 15:08 /data/ghcnd/daily/1763.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.2 K 2021-08-09 15:03 /data/ghcnd/daily/1764.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2021-08-09 15:03 /data/ghcnd/daily/1765.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2021-08-09 14:56 /data/ghcnd/daily/1766.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2021-08-09 15:06 /data/ghcnd/daily/1767.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.2 K 2021-08-09 15:02 /data/ghcnd/daily/1768.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2021-08-09 15:03 /data/ghcnd/daily/1769.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2021-08-09 15:07 /data/ghcnd/daily/1770.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2021-08-09 15:06 /data/ghcnd/daily/1771.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2021-08-09 15:05 /data/ghcnd/daily/1772.csv.gz
-rw-r--r--   8 jsw93 supergroup      3.3 K 2021-08-09 15:08 /data/ghcnd/daily/1773.c

-rw-r--r--   8 jsw93 supergroup    155.6 M 2022-08-08 01:19 /data/ghcnd/daily/2018.csv.gz
-rw-r--r--   8 jsw93 supergroup    154.4 M 2022-08-08 01:19 /data/ghcnd/daily/2019.csv.gz
-rw-r--r--   8 jsw93 supergroup    154.5 M 2022-08-08 01:19 /data/ghcnd/daily/2020.csv.gz
-rw-r--r--   8 jsw93 supergroup    152.2 M 2022-08-08 01:19 /data/ghcnd/daily/2021.csv.gz
-rw-r--r--   8 jsw93 supergroup     84.1 M 2022-08-08 01:20 /data/ghcnd/daily/2022.csv.gz


There is __260 years__ in the daily directory. The file size in increasing significantly overtime. The file size started from 
around __3.3 KBytes__ per file (year 1763) up to 100 times bigger in year 1870 (107 years), where the file size is __346.8 KBytes__. 
The file for each year kept expanding exponentially, and reach 100 times larger in just 34 years, the file of year 1904 has the size
of __33.2Mbytes__. The file size per year keeps increasing rapidly to it peak in year 2010, where the file size of that year is 
__221.3MBytes__. Afterthat, the file size is declining to __152.2 MBytes__ in year 2021. The file of year 2022 is only __84.1MBytes__
as the year has not completed yet. 

In [3]:
!hdfs dfs -du -s -v /data/ghcnd

SIZE         DISK_SPACE_CONSUMED_WITH_ALL_REPLICAS  FULL_PATH_NAME
16974679804  135797438432                           /data/ghcnd


In [4]:
!hdfs dfs -du -v /data/ghcnd

SIZE         DISK_SPACE_CONSUMED_WITH_ALL_REPLICAS  FULL_PATH_NAME
16930794333  135446354664                           /data/ghcnd/daily
3659         29272                                  /data/ghcnd/ghcnd-countries.txt
33384684     267077472                              /data/ghcnd/ghcnd-inventory.txt
1086         8688                                   /data/ghcnd/ghcnd-states.txt
10496042     83968336                               /data/ghcnd/ghcnd-stations.txt


The total size of all of the data is __16,974,679,804 Bytes__, and __16,930,794,333 Bytes__ is for daily

**We then define a schema for daily data based on the description in the GHCN Daily README. This schema should use the data types defined in pyspark.sql**

We define some functions to load spark in jupyter notebook

In [5]:
# Import pyspark and to define start_spark() and stop_spark()

import findspark

findspark.init()

import getpass
import pandas
import pyspark
import random
import re

from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Functions used below

def username():
    """Get username with any domain information removed.
    """

    return re.sub('@.*', '', getpass.getuser())


def dict_to_html(d):
    """Convert a Python dictionary into a two column table for display.
    """

    html = []

    html.append(f'<table width="100%" style="width:100%; font-family: monospace;">')
    for k, v in d.items():
        html.append(f'<tr><td style="text-align:left;">{k}</td><td>{v}</td></tr>')
    html.append(f'</table>')

    return ''.join(html)


def show_as_html(df, n=20):
    """Leverage existing pandas jupyter integration to show a spark dataframe as html.
    
    Args:
        n (int): number of rows to show (default: 20)
    """

    display(df.limit(n).toPandas())

    
def display_spark():
    """Display the status of the active Spark session if one is currently running.
    """
    
    if 'spark' in globals() and 'sc' in globals():

        name = sc.getConf().get("spark.app.name")
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'<li><a href="{sc.uiWebUrl}" target="_blank">Spark Application UI</a></li>',
            f'</ul>',
            f'<p><b>Config</b></p>',
            dict_to_html(dict(sc.getConf().getAll())),
            f'<p><b>Notes</b></p>',
            f'<ul>',
            f'<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
            f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))
        
    else:
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username() + " (jupyter)"}</code> is under the completed applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))


# Functions to start and stop spark

def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    
    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory (default: 1)
        master_memory (float): master memory (default: 1)
    """

    global spark
    global sc

    user = username()
    
    cores = executor_instances * executor_cores
    partitions = cores * 4
    port = 4000 + random.randint(1, 999)

    spark = (
        SparkSession.builder
        .master("spark://masternode2:7077")
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{user}/spark/")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.executor.memory", f"{worker_memory}g")
        .config("spark.driver.memory", f"{master_memory}g")
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.ui.port", str(port))
        .appName(user + " (jupyter)")
        .getOrCreate()
    )
    sc = SparkContext.getOrCreate()
    
    display_spark()

    
def stop_spark():
    """Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).
    """

    global spark
    global sc

    if 'spark' in globals() and 'sc' in globals():

        spark.stop()

        del spark
        del sc

    display_spark()


# Make css changes to improve spark output readability

html = [
    '<style>',
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
    '</style>',
]
display(HTML(''.join(html)))


In [None]:
# Start Spark
start_spark(executor_instances=4, executor_cores=2, worker_memory=4, master_memory=4)

In [7]:
# Import 
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [8]:
# Core elements condition

core_elms = ['PRCP','SNOW','SNWD','TMAX','TMIN']
core_cond = F.col('ELEMENT') == core_elms[0]
for i in range(1,len(core_elms)):
    core_cond = core_cond | (F.col('ELEMENT') == core_elms[i])


mm_elms = ['TMAX','TMIN']
mm_cond = F.col('ELEMENT') == mm_elms[0]
for i in range(1,len(mm_elms)):
    mm_cond = mm_cond | (F.col('ELEMENT') == mm_elms[i])

The schema for daily showed as below:

In [9]:
# daily Schema

schema_daily = StructType([
    StructField("ID", StringType(), True),
    StructField("DATE", StringType(), True),
    StructField("ELEMENT", StringType(), True),
    StructField("VALUE", FloatType(), True),
    StructField("MEASUREMENT_FLAG", StringType(), True),
    StructField("QUALITY_FLAG", StringType(), True),
    StructField("SOURCE_FLAG", StringType(), True),
    StructField("OBSERVATION_TIME", StringType(), True),
])

We then load 1000 rows of the2022.csv.gz file into Spark by using the limit command immediately after the read command, to verify the data description accuracy

In [10]:
daily2022 = (
    spark.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("inferSchema", "false")
    .option("delimiter", ",")
    .schema(schema_daily)
    .load("hdfs:///data/ghcnd/daily/2022.csv.gz")
    .limit(1000)
)
show_as_html(daily2022, 5)

Unnamed: 0,ID,DATE,ELEMENT,VALUE,MEASUREMENT_FLAG,QUALITY_FLAG,SOURCE_FLAG,OBSERVATION_TIME
0,AE000041196,20220101,TAVG,204.0,H,,S,
1,AEM00041194,20220101,TAVG,211.0,H,,S,
2,AEM00041217,20220101,TAVG,209.0,H,,S,
3,AEM00041218,20220101,TAVG,207.0,H,,S,
4,AG000060390,20220101,TAVG,121.0,H,,S,


The description of the data is partially correct. The VALUE variable seems to be integer type rather than Real type. 

**Next, we load each of stations, states, countries, and inventory into Spark as well. However, those files are in text formatting, and this format is not included in the standard spark.read library. We will use pyspark.sql.functions.substring to read files to Spark.**

In [11]:
# Loading ghcnd-countries.txt

countries = spark.read.text("/data/ghcnd/ghcnd-countries.txt")
countries = countries.select(
        countries.value.substr(1, 2).alias('CODE'),
        countries.value.substr(4, 61).alias('NAME'))

show_as_html(countries,5)

print(f'\nNumber of Rows are: {countries.count()}')

countries.printSchema()

Unnamed: 0,CODE,NAME
0,AC,Antigua and Barbuda
1,AE,United Arab Emirates
2,AF,Afghanistan
3,AG,Algeria
4,AJ,Azerbaijan



Number of Rows are: 219
root
 |-- CODE: string (nullable = true)
 |-- NAME: string (nullable = true)



In [12]:
# Loading ghcnd-inventory.txt

inventory = spark.read.text("/data/ghcnd/ghcnd-inventory.txt")
inventory = inventory.select(
    inventory.value.substr(1, 11).alias('ID'),
    inventory.value.substr(13, 8).cast(FloatType()).alias('LATITUDE'),
    inventory.value.substr(22, 8).cast(FloatType()).alias('LONGITUDE'),
    inventory.value.substr(32, 4).alias('ELEMENT'),
    inventory.value.substr(37, 4).cast(IntegerType()).alias('FIRSTYEAR'),
    inventory.value.substr(42, 4).cast(IntegerType()).alias('LASTYEAR')
)
   
show_as_html(inventory,5)

inventory.printSchema()

print(f'\nNumber of Rows are: {inventory.count()}')

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEMENT,FIRSTYEAR,LASTYEAR
0,ACW00011604,17.116699,-61.783001,TMAX,1949,1949
1,ACW00011604,17.116699,-61.783001,TMIN,1949,1949
2,ACW00011604,17.116699,-61.783001,PRCP,1949,1949
3,ACW00011604,17.116699,-61.783001,SNOW,1949,1949
4,ACW00011604,17.116699,-61.783001,SNWD,1949,1949


root
 |-- ID: string (nullable = true)
 |-- LATITUDE: float (nullable = true)
 |-- LONGITUDE: float (nullable = true)
 |-- ELEMENT: string (nullable = true)
 |-- FIRSTYEAR: integer (nullable = true)
 |-- LASTYEAR: integer (nullable = true)


Number of Rows are: 725754


In [13]:
# Loading ghcnd-states.txt

states = spark.read.text("/data/ghcnd/ghcnd-states.txt")
states = states.select(
    states.value.substr(1, 2).alias('CODE'),
    states.value.substr(4, 47).alias('NAME')
)

show_as_html(states,5)

states.printSchema()

print(f'\nNumber of Rows are: {states.count()}')

Unnamed: 0,CODE,NAME
0,AB,ALBERTA
1,AK,ALASKA
2,AL,ALABAMA
3,AR,ARKANSAS
4,AS,AMERICAN SAMOA


root
 |-- CODE: string (nullable = true)
 |-- NAME: string (nullable = true)


Number of Rows are: 74


In [14]:
# Loading ghcnd-stations.txt

stations = spark.read.text("/data/ghcnd/ghcnd-stations.txt")
stations = stations.select(
    stations.value.substr(1, 11).alias('ID'),
    stations.value.substr(13, 8).cast(FloatType()).alias('LATITUDE'),
    stations.value.substr(22, 8).cast(FloatType()).alias('LONGITUDE'),
    stations.value.substr(32, 6).cast(FloatType()).alias('ELEVATION'),
    stations.value.substr(39, 2).alias('STATE'),
    stations.value.substr(42, 30).alias('NAME'),
    stations.value.substr(73, 3).alias('GSN_FLAG'),
    stations.value.substr(77, 3).alias('HCN_CRN_FLAG'),
    stations.value.substr(81, 5).alias('WMO_ID'),
)

# Replace empty string with None/null
stations = (stations
            .select([
                F.when(
                (F.col(c)=="   ") | (F.col(c)=="  ") | (F.col(c)=="     "),
                None)
            .otherwise(F.col(c)).alias(c) for c in stations.columns])
           )

show_as_html(stations,5)

print(f'\nNumber of Rows are: {stations.count()}')


stations.printSchema()

# WMOID Null value count
null_count = stations.select([F.count(F.when(F.col('WMO_ID').isNull(),True))
                              .alias("Number of Stations that do not have WMO_ID")])
show_as_html(null_count)




Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID
0,ACW00011604,17.116699,-61.783001,10.1,,ST JOHNS COOLIDGE FLD,,,
1,ACW00011647,17.133301,-61.783001,19.200001,,ST JOHNS,,,
2,AE000041196,25.333,55.516998,34.0,,SHARJAH INTER. AIRP,GSN,,41196.0
3,AEM00041194,25.254999,55.363998,10.4,,DUBAI INTL,,,41194.0
4,AEM00041217,24.433001,54.651001,26.799999,,ABU DHABI INTL,,,41217.0



Number of Rows are: 122047
root
 |-- ID: string (nullable = true)
 |-- LATITUDE: float (nullable = true)
 |-- LONGITUDE: float (nullable = true)
 |-- ELEVATION: float (nullable = true)
 |-- STATE: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- GSN_FLAG: string (nullable = true)
 |-- HCN_CRN_FLAG: string (nullable = true)
 |-- WMO_ID: string (nullable = true)



Unnamed: 0,Number of Stations that do not have WMO_ID
0,113953


We then extract the two character country code from each station code in stations and store the output as a new column using the withColumn command.

In [15]:
stations = stations.withColumn('COUNTRY_CODE', F.substring('ID', 1,2))
show_as_html(stations,5)

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,COUNTRY_CODE
0,ACW00011604,17.116699,-61.783001,10.1,,ST JOHNS COOLIDGE FLD,,,,AC
1,ACW00011647,17.133301,-61.783001,19.200001,,ST JOHNS,,,,AC
2,AE000041196,25.333,55.516998,34.0,,SHARJAH INTER. AIRP,GSN,,41196.0,AE
3,AEM00041194,25.254999,55.363998,10.4,,DUBAI INTL,,,41194.0,AE
4,AEM00041217,24.433001,54.651001,26.799999,,ABU DHABI INTL,,,41217.0,AE


LEFT JOIN stations with countries using your output above

In [16]:
stations = (
  stations
    .join(
        countries.withColumnRenamed('CODE','COUNTRY_CODE')
                 .withColumnRenamed('NAME','COUNTRY'),
        on = 'COUNTRY_CODE',
        how="left"
    )
)

show_as_html(stations,5)

Unnamed: 0,COUNTRY_CODE,ID,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,COUNTRY
0,AC,ACW00011604,17.116699,-61.783001,10.1,,ST JOHNS COOLIDGE FLD,,,,Antigua and Barbuda
1,AC,ACW00011647,17.133301,-61.783001,19.200001,,ST JOHNS,,,,Antigua and Barbuda
2,AE,AE000041196,25.333,55.516998,34.0,,SHARJAH INTER. AIRP,GSN,,41196.0,United Arab Emirates
3,AE,AEM00041194,25.254999,55.363998,10.4,,DUBAI INTL,,,41194.0,United Arab Emirates
4,AE,AEM00041217,24.433001,54.651001,26.799999,,ABU DHABI INTL,,,41217.0,United Arab Emirates


Then LEFT JOIN stations and states, allowing for the fact that state codes are only provided for stations in the US.

_Note: Stations from CA also have state codes_

In [17]:
stations = (
  stations
    .join(
        states.withColumnRenamed('CODE','STATE')
              .withColumnRenamed('NAME','STATE_NAME'), 
        on = 'STATE',
        how="left"
    )
)

print('Brief view on stations from US')
show_as_html(stations.filter(stations.COUNTRY_CODE == "US"),5)

print('\n\n\nBrief view on stations from CA')
show_as_html(stations.filter(stations.COUNTRY_CODE == "CA"),5)

Brief view on stations from US


Unnamed: 0,STATE,COUNTRY_CODE,ID,LATITUDE,LONGITUDE,ELEVATION,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,COUNTRY,STATE_NAME
0,SD,US,US009052008,43.733299,-96.633003,482.0,SIOUX FALLS (ENVIRON. CANADA),,,,United States,SOUTH DAKOTA
1,CO,US,US10RMHS145,40.526798,-105.111,1569.099976,RMHS 1.6 SSW,,,,United States,COLORADO
2,NE,US,US10adam001,40.568001,-98.505997,598.0,JUNIATA 1.5 S,,,,United States,NEBRASKA
3,NE,US,US10adam002,40.5093,-98.549004,601.099976,JUNIATA 6.0 SSW,,,,United States,NEBRASKA
4,NE,US,US10adam003,40.466301,-98.653,615.099976,HOLSTEIN 0.1 NW,,,,United States,NEBRASKA





Brief view on stations from CA


Unnamed: 0,STATE,COUNTRY_CODE,ID,LATITUDE,LONGITUDE,ELEVATION,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,COUNTRY,STATE_NAME
0,BC,CA,CA001010066,48.866699,-123.282997,4.0,ACTIVE PASS,,,,Canada,BRITISH COLUMBIA
1,BC,CA,CA001010235,48.400002,-123.483002,17.0,ALBERT HEAD,,,,Canada,BRITISH COLUMBIA
2,BC,CA,CA001010595,48.583302,-123.515999,85.0,BAMBERTON OCEAN CEMENT,,,,Canada,BRITISH COLUMBIA
3,BC,CA,CA001010720,48.5,-124.0,351.0,BEAR CREEK,,,,Canada,BRITISH COLUMBIA
4,BC,CA,CA001010774,48.5,-123.349998,61.0,BEAVER LAKE,,,,Canada,BRITISH COLUMBIA


Analysing the inventory files to see what was the first and last year that each station was active and collected any element at all

In [18]:
new_inventory = (inventory
                 .groupby('ID')
                     .agg(
                         F.min('FIRSTYEAR').cast(IntegerType()).alias('START_YEAR'),
                         F.max('LASTYEAR').cast(IntegerType()).alias('END_YEAR'),
                         F.countDistinct('ELEMENT').cast(IntegerType()).alias('TOTAL_NUM_ELM'),
                         # count distinct ELEMENT Code, no dropDuplicates needed
                         F.countDistinct(F.when(core_cond,F.col('ELEMENT'))).cast(IntegerType()).alias('NUM_CORE_ELM'),
                         F.collect_set(F.col('ELEMENT')).alias('ELM_SET')
                     )
                )

new_inventory = new_inventory.withColumn('NUM_OTHER_ELM',(new_inventory['TOTAL_NUM_ELM'] - new_inventory['NUM_CORE_ELM']).cast(IntegerType()))

show_as_html(new_inventory,5)

Unnamed: 0,ID,START_YEAR,END_YEAR,TOTAL_NUM_ELM,NUM_CORE_ELM,ELM_SET,NUM_OTHER_ELM
0,AGE00147719,1888,2022,4,3,"[TMAX, TMIN, PRCP, TAVG]",1
1,ALE00100939,1940,2000,2,2,"[TMAX, PRCP]",0
2,AQC00914873,1955,1967,12,5,"[WT03, TMAX, TMIN, PRCP, SNWD, MDPR, DAPR, SNO...",7
3,AR000000002,1981,2000,1,1,[PRCP],0
4,AR000875850,1908,2022,5,4,"[TMAX, TMIN, PRCP, SNWD, TAVG]",1


In [19]:
all_core_cnt = new_inventory.filter((F.col('NUM_CORE_ELM')==5)).select('ID').count()
only_PRCP_cnt = new_inventory.filter((F.col('TOTAL_NUM_ELM')==1)&(F.array_contains(F.col('ELM_SET'),'PRCP'))).select('ID').count()

print(f'Number of stations that collect all five core elements: {all_core_cnt}\n')
print(f'Number of stations that only collect precipitation: {only_PRCP_cnt}')


Number of stations that collect all five core elements: 20300

Number of stations that only collect precipitation: 16159


LEFT JOIN stations and the output above

In [20]:
print(f'Number of stations in the inventory metafile: {inventory.dropDuplicates(["ID"]).count()}\n')

print(f'Number of stations in the station metafile: {stations.dropDuplicates(["ID"]).count()}')

prejoin_inventory = new_inventory.drop('ELM_SET')

stations_full = (
  stations
    .join(
        prejoin_inventory, 
        on = 'ID',
        how="left"
    )
)



#Some stations do not have records in inventory

show_as_html(stations_full.filter(stations_full.START_YEAR.isNull()),5)

Number of stations in the inventory metafile: 122010

Number of stations in the station metafile: 122047


Unnamed: 0,ID,STATE,COUNTRY_CODE,LATITUDE,LONGITUDE,ELEVATION,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,COUNTRY,STATE_NAME,START_YEAR,END_YEAR,TOTAL_NUM_ELM,NUM_CORE_ELM,NUM_OTHER_ELM
0,USC00517052,HI,US,21.377501,-157.75,45.099998,OLOMANA FIRE STN 790.7,,,,United States,HAWAII,,,,,
1,US1VTBN0016,VT,US,44.244598,-72.970001,331.899994,STARKSBORO 4.4 ENE,,,,United States,VERMONT,,,,,
2,USC00516938,HI,US,21.349701,-157.822006,249.899994,NUUANU UPPER 782.3,,,,United States,HAWAII,,,,,
3,USC00518053,HI,US,19.714399,-155.134003,262.100006,PIIHONUA KPUA 89.11,,,,United States,HAWAII,,,,,
4,USC00130046,IA,US,41.5014,-94.635002,420.600006,ADAIR,,,,United States,IOWA,,,,,


In [21]:
# Save output to file
stations_full.write.mode("overwrite").parquet(f"hdfs:///user/{name}/Assignment2Data/stations_enriched.parquet")

We then LEFT JOIN 1000 rows subset of daily and your output above to test if any stations in the subset of daily that are not in stations at all

In [22]:
stations_enr = spark.read.parquet(f'/user/{name}/Assignment2Data/stations_enriched.parquet')

daily2022_stations = (
  daily2022
    .join(
        stations_enr, 
        on = 'ID',
        how="left"
    )
)

show_as_html(daily2022_stations,5)

Unnamed: 0,ID,DATE,ELEMENT,VALUE,MEASUREMENT_FLAG,QUALITY_FLAG,SOURCE_FLAG,OBSERVATION_TIME,STATE,COUNTRY_CODE,...,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,COUNTRY,STATE_NAME,START_YEAR,END_YEAR,TOTAL_NUM_ELM,NUM_CORE_ELM,NUM_OTHER_ELM
0,AE000041196,20220101,TAVG,204.0,H,,S,,,AE,...,GSN,,41196,United Arab Emirates,,1944,2022,4,3,1
1,AEM00041194,20220101,TAVG,211.0,H,,S,,,AE,...,,,41194,United Arab Emirates,,1983,2022,4,3,1
2,AEM00041217,20220101,TAVG,209.0,H,,S,,,AE,...,,,41217,United Arab Emirates,,1983,2022,4,3,1
3,AEM00041218,20220101,TAVG,207.0,H,,S,,,AE,...,,,41218,United Arab Emirates,,1994,2022,4,3,1
4,AG000060390,20220101,TAVG,121.0,H,,S,,,AG,...,GSN,,60390,Algeria,,1940,2022,5,4,1


In [23]:
# Check for stations in daily subset that are not in the stations_enriched dataframe
show_as_html(daily2022_stations.filter(daily2022_stations.START_YEAR.isNull()),5)

Unnamed: 0,ID,DATE,ELEMENT,VALUE,MEASUREMENT_FLAG,QUALITY_FLAG,SOURCE_FLAG,OBSERVATION_TIME,STATE,COUNTRY_CODE,...,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,COUNTRY,STATE_NAME,START_YEAR,END_YEAR,TOTAL_NUM_ELM,NUM_CORE_ELM,NUM_OTHER_ELM


We can see that all the stations in the subset are in or stations dataframme.

Since joining require shuffering of RRD, and in our case , the daily is huge 126.1GBytes, then this operation is expensive. Even
when our stations dataframe is not too big, and it can be broadcasted,the Spark still need to copy it to all the nodes in the
cluster.

We could determine if there are any stations in daily that are not in stations by using __pyspark.sql.DataFrame.subtract__ or by 
__pyspark.sql.DataFrame.exceptAll__. By subtracting or exceptAll, distinct IDs in daily to IDs in stations, we can find the the 
station's IDs in daily but not in stations.

## Analysis


**First, we analyse how many stations are in each of the GCOS Surface Network (GSN), the US Historical Climatology Network (HCN), and the US Climate Reference Network (CRN)? Are there any stations that are in more than one of these networks?**

In [24]:
temp = stations_enr.select(F.countDistinct('ID'))

print('Total number of stations:'+ str(temp.collect()[0][0]))
print(f'\nNumber of stations were active in 2021: {stations_enr.filter(stations_enr.END_YEAR >2021).count()}')
print(f'\nNumber of stations are in the GCOS Surface Network (GSN):{stations_enr.filter(stations_enr.GSN_FLAG == "GSN").count()}')
print(f'\nNumber of stations are in the US Historical Climatology Network (HCN):{stations_enr.filter(stations_enr.HCN_CRN_FLAG == "HCN").count()}')
print(f'\nNumber of stations are in the US Climate Reference Network (CRN):{stations_enr.filter(stations_enr.HCN_CRN_FLAG == "CRN").count()}')

Total number of stations:122047

Number of stations were active in 2021: 38145

Number of stations are in the GCOS Surface Network (GSN):991

Number of stations are in the US Historical Climatology Network (HCN):1218

Number of stations are in the US Climate Reference Network (CRN):0


Count the total number of stations in each country, and store the output in countries using the with ColumnRenamed command. 


In [25]:
stations_country = (stations_enr.groupby('COUNTRY_CODE')
                        .agg(F.countDistinct('ID').cast(IntegerType()).alias('NUM_OF_STATIONS'))
                   )

countries = (
  countries
    .join(stations_country.withColumnRenamed('COUNTRY_CODE','CODE'), on = 'CODE',how="left")
)
show_as_html(countries,5)

Unnamed: 0,CODE,NAME,NUM_OF_STATIONS
0,AC,Antigua and Barbuda,2
1,AE,United Arab Emirates,4
2,AF,Afghanistan,4
3,AG,Algeria,87
4,AJ,Azerbaijan,66


In [26]:
stations_state = (stations_enr.groupby('STATE')
                        .agg(F.countDistinct('ID').cast(IntegerType()).alias('NUM_OF_STATIONS'))
                   )
states = (
  states
    .join(stations_state.withColumnRenamed('STATE','CODE'), on = 'CODE',how="left")
)
show_as_html(states,5)

Unnamed: 0,CODE,NAME,NUM_OF_STATIONS
0,AB,ALBERTA,1440
1,AK,ALASKA,1018
2,AL,ALABAMA,1059
3,AR,ARKANSAS,901
4,AS,AMERICAN SAMOA,21


In [27]:
# Write the temporary data
stations_full.write.mode("overwrite").parquet(f"hdfs:///user/{name}/countries_enriched.parquet")
stations_full.write.mode("overwrite").parquet(f"hdfs:///user/{name}/states_enriched.parquet")

**Some of the countries in the database are territories of the United States as indicated by the name of the country. How many stations are there in total in the territories of the United States around the world, excluding the United States itself?**

In [28]:

print(f'\nNumber of stations in the Southern Hemisphere: {stations_enr.filter(stations_enr.LATITUDE < 0 ).count()}')

countsta=stations_enr.filter(stations_enr.COUNTRY.contains('[United States]')).count()
print(f'\nNumber of stations in the territories of the United States around the world: {countsta}')


Number of stations in the Southern Hemisphere: 25337

Number of stations in the territories of the United States around the world: 352


**We now can try to play around with some Spark functions. We will write a Spark function that computes the geographical distance between two stations using their latitude and longitude as arguments, taking into account that the earth is spherical.**


In [29]:
show_as_html(stations_enr,5)

Unnamed: 0,ID,STATE,COUNTRY_CODE,LATITUDE,LONGITUDE,ELEVATION,NAME,GSN_FLAG,HCN_CRN_FLAG,WMO_ID,COUNTRY,STATE_NAME,START_YEAR,END_YEAR,TOTAL_NUM_ELM,NUM_CORE_ELM,NUM_OTHER_ELM
0,AGM00060402,,AG,36.712002,5.07,6.1,SOUMMAM,,,60402,Algeria,,1973,2022,5,4,1
1,AGM00060461,,AG,35.700001,-0.65,22.0,ORAN-PORT,,,60461,Algeria,,1995,2022,4,3,1
2,AJ000037734,,AJ,40.799999,46.0,404.0,SHAMHOR,,,37734,Azerbaijan,,1936,1991,1,1,0
3,AJ000037740,,AJ,40.983002,47.867001,682.0,QABALA,,,37740,Azerbaijan,,1936,2017,5,4,1
4,AM000037618,,AM,41.117001,44.283001,1509.0,TASHIR,,,37618,Armenia,,1928,2022,5,4,1


In [30]:
# Create a dataframe to work on
stations_NZ = (stations_enr
               .filter(stations_enr.COUNTRY_CODE=='NZ')
               .select('ID','NAME','LATITUDE','LONGITUDE')

)
show_as_html(stations_NZ,5)

Unnamed: 0,ID,NAME,LATITUDE,LONGITUDE
0,NZ000939450,CAMPBELL ISLAND AWS,-52.549999,169.167007
1,NZM00093929,ENDERBY ISLAND AWS,-50.483002,166.300003
2,NZ000933090,NEW PLYMOUTH AWS,-39.016998,174.182999
3,NZ000093844,INVERCARGILL AIRPOR,-46.417,168.332993
4,NZM00093781,CHRISTCHURCH INTL,-43.488998,172.531998


In [31]:
# Cross join
NZ_pairwise = (stations_NZ.crossJoin(stations_NZ).toDF('ID_A','NAME_A','LATITUDE_A','LONGITUDE_A',
                                                       'ID_B','NAME_B','LATITUDE_B','LONGITUDE_B'))

# Remove same ID pairs
NZ_pairwise = (NZ_pairwise.filter(NZ_pairwise.ID_A != NZ_pairwise.ID_B))

show_as_html(NZ_pairwise,5)

Unnamed: 0,ID_A,NAME_A,LATITUDE_A,LONGITUDE_A,ID_B,NAME_B,LATITUDE_B,LONGITUDE_B
0,NZ000939450,CAMPBELL ISLAND AWS,-52.549999,169.167007,NZM00093929,ENDERBY ISLAND AWS,-50.483002,166.300003
1,NZ000939450,CAMPBELL ISLAND AWS,-52.549999,169.167007,NZ000933090,NEW PLYMOUTH AWS,-39.016998,174.182999
2,NZ000939450,CAMPBELL ISLAND AWS,-52.549999,169.167007,NZ000093844,INVERCARGILL AIRPOR,-46.417,168.332993
3,NZ000939450,CAMPBELL ISLAND AWS,-52.549999,169.167007,NZM00093781,CHRISTCHURCH INTL,-43.488998,172.531998
4,NZ000939450,CAMPBELL ISLAND AWS,-52.549999,169.167007,NZ000093417,PARAPARAUMU AWS,-40.900002,174.983002


In [32]:
# UDF to calculate spherical geometry distance
from math import radians, cos, sin, asin, sqrt

def comp_distance(latiA,longA,latiB,longB):
    latiA,longA,latiB,longB = map(radians,[latiA,longA,latiB,longB])
    # Distance 
    distLati = latiB - latiA
    distLong = longB - longA
    # Area
    area = sin(distLati/2)**2 + cos(latiA) * cos(latiB) * sin(distLong/2)**2
    centAngle = 2 * asin(sqrt(area))
    # Spherical distance (earth radius = 6371km)
    shpDist = abs(round(centAngle * 6371,5))
    return shpDist

udf_distance = F.udf(comp_distance)

We use the function above to compute the pairwise distances between all stations in New Zealand, and save the result to our directory.Beside, we try to findout what two stations are geographically the closest together in New Zealand?

In [33]:
NZ_stations_dist = (NZ_pairwise.withColumn('DISTANCE', udf_distance(
    NZ_pairwise.LATITUDE_A, NZ_pairwise.LONGITUDE_A, NZ_pairwise.LATITUDE_B, NZ_pairwise.LONGITUDE_B)
    ))

show_as_html(NZ_stations_dist,5)

Unnamed: 0,ID_A,NAME_A,LATITUDE_A,LONGITUDE_A,ID_B,NAME_B,LATITUDE_B,LONGITUDE_B,DISTANCE
0,NZ000939450,CAMPBELL ISLAND AWS,-52.549999,169.167007,NZM00093929,ENDERBY ISLAND AWS,-50.483002,166.300003,303.56667
1,NZ000939450,CAMPBELL ISLAND AWS,-52.549999,169.167007,NZ000933090,NEW PLYMOUTH AWS,-39.016998,174.182999,1553.29464
2,NZ000939450,CAMPBELL ISLAND AWS,-52.549999,169.167007,NZ000093844,INVERCARGILL AIRPOR,-46.417,168.332993,684.60169
3,NZ000939450,CAMPBELL ISLAND AWS,-52.549999,169.167007,NZM00093781,CHRISTCHURCH INTL,-43.488998,172.531998,1037.8551
4,NZ000939450,CAMPBELL ISLAND AWS,-52.549999,169.167007,NZ000093417,PARAPARAUMU AWS,-40.900002,174.983002,1368.0573


However, the spherical geometry distance can also be computed by Spark SQL built-in. This method will be less expensive in compare
to the UDF, and importing of math library. The code below will be credited for __blackbishop__ (
https://stackoverflow.com/questions/60086180/pyspark-how-to-apply-a-python-udf-to-pyspark-dataframe-columns )

In [34]:
NZ_stations_dist_noUDF = (NZ_pairwise
                          .withColumn('dlon', F.radians(F.col('LONGITUDE_B')) - F.radians(F.col('LONGITUDE_A'))) 
                          .withColumn('dlat', F.radians(F.col('LATITUDE_B')) - F.radians(F.col('LATITUDE_A')))
                          .withColumn('areas', (((F.sin(F.col('dlat')/2))**2) + F.cos(F.radians(F.col('LATITUDE_A'))) *
                                                F.cos(F.radians(F.col('LATITUDE_B'))) * ((F.sin(F.col('dlon')/2))**2)
                                               ))
                          .withColumn('angle', 2 * (F.asin(F.sqrt(F.col('areas')))))
                          .withColumn('DISTANCE', F.col('angle')* 6371)
                          .drop('dlon', 'dlat','areas','angle')
                         )

show_as_html(NZ_stations_dist_noUDF,5)

Unnamed: 0,ID_A,NAME_A,LATITUDE_A,LONGITUDE_A,ID_B,NAME_B,LATITUDE_B,LONGITUDE_B,DISTANCE
0,NZ000939450,CAMPBELL ISLAND AWS,-52.549999,169.167007,NZM00093929,ENDERBY ISLAND AWS,-50.483002,166.300003,303.566667
1,NZ000939450,CAMPBELL ISLAND AWS,-52.549999,169.167007,NZ000933090,NEW PLYMOUTH AWS,-39.016998,174.182999,1553.294641
2,NZ000939450,CAMPBELL ISLAND AWS,-52.549999,169.167007,NZ000093844,INVERCARGILL AIRPOR,-46.417,168.332993,684.601688
3,NZ000939450,CAMPBELL ISLAND AWS,-52.549999,169.167007,NZM00093781,CHRISTCHURCH INTL,-43.488998,172.531998,1037.855098
4,NZ000939450,CAMPBELL ISLAND AWS,-52.549999,169.167007,NZ000093417,PARAPARAUMU AWS,-40.900002,174.983002,1368.057301


The two stations which are geographically the closest together in New Zealand showed below

In [35]:
show_as_html(NZ_stations_dist_noUDF.sort('DISTANCE'),1)

Unnamed: 0,ID_A,NAME_A,LATITUDE_A,LONGITUDE_A,ID_B,NAME_B,LATITUDE_B,LONGITUDE_B,DISTANCE
0,NZM00093439,WELLINGTON AERO AWS,-41.333,174.800003,NZ000093417,PARAPARAUMU AWS,-40.900002,174.983002,50.52885


**We now work on whole dataset, we first try to count the number of rows in daily.**

In [41]:
years = list(range(1763,2023))

daily_full = spark.read.csv([f"/data/ghcnd/daily/{year}.csv.gz" for year in years], schema=schema_daily)

print(f"daily 1763 to 2022 observations count = {daily_full.count()}")
print(f"daily 1763 to 2022 num partitions count = {daily_full.rdd.getNumPartitions()}")

daily 1763 to 2022 observations count = 3018826504
daily 1763 to 2022 num partitions count = 111


We then filter daily using the filter command to obtain the subset of observations containing the five core elements described in inventory, and findout how many observations are there for each of the five core elements? Which element has the most observations?


In [42]:
daily_subset = (daily_full.filter(core_cond))
show_as_html(daily_subset,5)

Unnamed: 0,ID,DATE,ELEMENT,VALUE,MEASUREMENT_FLAG,QUALITY_FLAG,SOURCE_FLAG,OBSERVATION_TIME
0,CA002303986,20100101,TMAX,205.0,,G,C,
1,CA002303986,20100101,TMIN,-300.0,,,C,
2,CA002303986,20100101,PRCP,4.0,,,C,
3,CA002303986,20100101,SNOW,4.0,,,C,
4,CA002303986,20100101,SNWD,0.0,,I,C,


In [43]:
core_elms_count = daily_subset.groupBy('ELEMENT').count()
show_as_html(core_elms_count.sort('count'))

Unnamed: 0,ELEMENT,count
0,SNWD,290998195
1,SNOW,344268930
2,TMIN,445687425
3,TMAX,447084093
4,PRCP,1048156273


The __PRCP__ element has the most observations.

Many stations collect TMIN and TMAX, but do not necessarily report them simultaneously due to issues with data collection or coverage. Determine how many observations of TMIN do not have a corresponding observation of TMAX. How many unique stations contributed to these observations?

In [44]:
daily_minmax = daily_full.filter(mm_cond).sort(F.col('DATE'),F.col('ID'))

show_as_html(daily_minmax,5)

Unnamed: 0,ID,DATE,ELEMENT,VALUE,MEASUREMENT_FLAG,QUALITY_FLAG,SOURCE_FLAG,OBSERVATION_TIME
0,ITE00100554,17630101,TMAX,-36.0,,,E,
1,ITE00100554,17630101,TMIN,-50.0,,,E,
2,ITE00100554,17630102,TMAX,-26.0,,,E,
3,ITE00100554,17630102,TMIN,-40.0,,,E,
4,ITE00100554,17630103,TMAX,-9.0,,,E,


In [45]:
daily_minmax_merge = (
    daily_minmax
    .filter(F.col('ELEMENT')=='TMIN')
    .select(F.col('ID'), F.col('DATE'), F.col('VALUE').alias('TMIN'))
    .join(
        (daily_minmax
            .filter(F.col('ELEMENT')=='TMAX')
            .select(F.col('ID'), F.col('DATE'), F.col('VALUE').alias('TMAX'))
        ),
        on = ['ID','DATE'],
        how = 'fullouter'
    )
)

show_as_html(daily_minmax_merge,5)

Unnamed: 0,ID,DATE,TMIN,TMAX
0,ACW00011604,19490314,206.0,278.0
1,ACW00011604,19490316,222.0,283.0
2,ACW00011604,19490401,194.0,278.0
3,ACW00011604,19490409,233.0,283.0
4,ACW00011604,19490506,233.0,283.0


In [46]:
minNmax = daily_minmax_merge.filter((daily_minmax_merge.TMIN.isNotNull())&(daily_minmax_merge.TMAX.isNull()))
minNmax_dist_count = minNmax.select(F.countDistinct("ID"))

print(f'Number of observations of TMIN do not have a corresponding observation of TMAX: {minNmax.count()}')
print(f'Number of unique stations contributed to these observations: {str(minNmax_dist_count.collect()[0][0])}')

Number of observations of TMIN do not have a corresponding observation of TMAX: 8848299
Number of unique stations contributed to these observations: 27678


Number of observations of TMIN do not have a corresponding observation of TMAX: 8848299 

Number of unique stations contributed to these observations: 27678

Filter daily to obtain all observations of TMIN and TMAX for all stations in New Zealand, and save the result to directory. We try to findout how many observations are there, and how many years are covered by the observations?

In [47]:
NZ_Tminmax = daily_minmax_merge.filter(F.substring('ID',1,2)=='NZ')

NZ_station = (
    stations.filter(F.substring('ID',1,2)=='NZ')
    .select(stations.ID,stations.NAME)
)

NZ_Tminmax = NZ_Tminmax.join(NZ_station, on = 'ID',how="left")

show_as_html(NZ_Tminmax,5)

NZ_Tminmax.repartition(1).write.mode("overwrite").option('header','true').csv(f'hdfs:///user/{name}/NZ_Tminmax.csv')

year_count = NZ_Tminmax.select(F.countDistinct(F.substring('DATE',1,4)))

print(f'Number of observations of New Zealand TMIN and TMAX: {NZ_Tminmax.count()}')
print(f'Number of years are covered by the observations: {str(year_count.collect()[0][0])}')

Unnamed: 0,ID,DATE,TMIN,TMAX,NAME
0,NZ000936150,19640128,116.0,183.0,HOKITIKA AERODROME
1,NZ000936150,19640312,122.0,163.0,HOKITIKA AERODROME
2,NZ000936150,19640506,26.0,150.0,HOKITIKA AERODROME
3,NZ000936150,19640528,67.0,128.0,HOKITIKA AERODROME
4,NZ000936150,19640629,8.0,123.0,HOKITIKA AERODROME


Number of observations of New Zealand TMIN and TMAX: 254742
Number of years are covered by the observations: 83


In [48]:
!hdfs dfs -cat /user/NZ_Tminmax.csv/part-00000-df264a49-342d-4be8-83e3-b51f1fda7526-c000.csv | wc -l

254743


In [49]:
NZ_Tminmax_avg = (
    NZ_Tminmax
        .groupby('DATE')
        .agg(
            F.avg(F.col('TMIN')).cast(FloatType()).alias('TMIN_A'),
            F.avg(F.col('TMAX')).cast(FloatType()).alias('TMAX_A')
        )
)
show_as_html(NZ_Tminmax_avg,5)

NZ_Tminmax_avg.repartition(1).write.mode("overwrite").option('header','true').csv(f'hdfs:///user/{name}/NZ_TminmaxA.csv')

Unnamed: 0,DATE,TMIN_A,TMAX_A
0,19660519,39.099998,121.777779
1,19670922,81.800003,145.777771
2,19680524,56.888889,145.142853
3,19680729,57.375,132.875
4,19680809,55.0,137.714279


We now group the precipitation observations by year and country. Compute the average rainfall in each year for each country, and save this result to directory. Besides, we also try to findout Which country has the highest average rainfall in a single year across the entire dataset? Is this result sensible? Is this result consistent with our previous analysis?

In [50]:
world_rainfall = (
    daily_subset
        .filter(F.col('ELEMENT')=='PRCP')
        .withColumn('COUNTRY',F.substring('ID',1,2))
        .withColumn('YEAR',F.substring('DATE',1,4))
        .groupby('COUNTRY','YEAR')
        .agg(
            F.avg(F.col('VALUE')).alias('AVG_PRCP')
        )
)

In [51]:
show_as_html(world_rainfall.sort(world_rainfall.AVG_PRCP.desc()),10)
world_rainfall.write.mode("overwrite").parquet(f"hdfs:///user/{name}/Assignment2Data/world_rainfall.parquet")

Unnamed: 0,COUNTRY,YEAR,AVG_PRCP
0,EK,2000,4361.0
1,DR,1975,3414.0
2,LA,1974,2480.5
3,BH,1978,2244.714286
4,NN,1979,1967.0
5,CS,1974,1820.0
6,BH,1979,1755.545455
7,NS,1973,1710.0
8,UC,1978,1675.038462
9,BH,1977,1541.714286


Equatorial Guinea has the highest average rainfall of 4361.00 in year 2000.

In [52]:
# Verify the result of Equatorial Guinea in year 2000
rain_sub =  (daily_subset
        .filter(F.col('ELEMENT')=='PRCP')
        .withColumn('COUNTRY',F.substring('ID',1,2))
        .withColumn('YEAR',F.substring('DATE',1,4)))

show_as_html(rain_sub.filter((F.col('COUNTRY')=='EK')&(F.col('YEAR')=='2000')))

Unnamed: 0,ID,DATE,ELEMENT,VALUE,MEASUREMENT_FLAG,QUALITY_FLAG,SOURCE_FLAG,OBSERVATION_TIME,COUNTRY,YEAR
0,EKM00064810,20000622,PRCP,4361.0,,G,S,,EK,2000


There is only one record for Equatorial Guinea's rainfall in year 2000, therefore it caused the problem

In [53]:
world_rainfall_2021 = (
    world_rainfall.filter(F.col('YEAR')=='2021')
        .join(
            countries.select(F.col('CODE').alias('COUNTRY'), F.col('NAME')),
            on = 'COUNTRY',
            how = 'left'
        )
)

show_as_html(world_rainfall_2021,10)
world_rainfall_2021.repartition(1).write.mode("overwrite").option('header','true').csv(f'hdfs:///user/{name}/world_rainfall_2021.csv')


Unnamed: 0,COUNTRY,YEAR,AVG_PRCP,NAME
0,TI,2021,71.173469,Tajikistan
1,BG,2021,0.0,Bangladesh
2,CA,2021,26.304228,Canada
3,MX,2021,20.586373,Mexico
4,MZ,2021,67.330448,Mozambique
5,NI,2021,148.647482,Nigeria
6,SW,2021,18.821426,Sweden
7,WQ,2021,0.0,Wake Island [United States]
8,GH,2021,76.07177,Ghana
9,GM,2021,21.919924,Germany


In [54]:
# Stop Spark
stop_spark()