**Spark notebook**

This notebook will only work in a Jupyter session running on `mathmadslinux2p`.

You can start your own Jupyter session on `mathmadslinux2p` and open this notebook in Chrome on the MADS Windows server by

1. Login to the MADS Windows server using https://mathportal.canterbury.ac.nz/.
2. Download or copy this notebook to your home directory.
3. Open powershell and run `ssh mathmadslinux2p`.
4. Run `start_pyspark_notebook` or `/opt/anaconda3/bin/jupyter-notebook --ip 132.181.129.68 --port $((8000 + $((RANDOM % 999))))`.
5. Copy / paste the url provided in the shell window into Chrome on the MADS Windows server.
6. Open the notebook from the Jupyter root directory (which is your home directory).
7. Run `start_spark()` to start a spark session in the notebook.
8. Run `stop_spark()` before closing the notebook or kill your spark application by hand using the link in the Spark UI.

In [67]:
# Run this cell to import pyspark and to define start_spark() and stop_spark()

import findspark

findspark.init()

import getpass
import pandas
import pyspark
import random
import re

from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Functions used below

def username():
    """Get username with any domain information removed.
    """

    return re.sub('@.*', '', getpass.getuser())


def dict_to_html(d):
    """Convert a Python dictionary into a two column table for display.
    """

    html = []

    html.append(f'<table width="100%" style="width:100%; font-family: monospace;">')
    for k, v in d.items():
        html.append(f'<tr><td style="text-align:left;">{k}</td><td>{v}</td></tr>')
    html.append(f'</table>')

    return ''.join(html)


def show_as_html(df, n=20):
    """Leverage existing pandas jupyter integration to show a spark dataframe as html.
    
    Args:
        n (int): number of rows to show (default: 20)
    """

    display(df.limit(n).toPandas())

    
def display_spark():
    """Display the status of the active Spark session if one is currently running.
    """
    
    if 'spark' in globals() and 'sc' in globals():

        name = sc.getConf().get("spark.app.name")
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'<li><a href="{sc.uiWebUrl}" target="_blank">Spark Application UI</a></li>',
            f'</ul>',
            f'<p><b>Config</b></p>',
            dict_to_html(dict(sc.getConf().getAll())),
            f'<p><b>Notes</b></p>',
            f'<ul>',
            f'<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
            f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))
        
    else:
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username() + " (jupyter)"}</code> is under the completed applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))


# Functions to start and stop spark

def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    
    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory (default: 1)
        master_memory (float): master memory (default: 1)
    """

    global spark
    global sc

    user = username()
    
    cores = executor_instances * executor_cores
    partitions = cores * 4
    port = 4000 + random.randint(1, 999)

    spark = (
        SparkSession.builder
        .master("spark://masternode2:7077")
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{user}/spark/")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.executor.memory", f"{worker_memory}g")
        .config("spark.driver.memory", f"{master_memory}g")
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.ui.port", str(port))
        .appName(user + " (jupyter)")
        .getOrCreate()
    )
    sc = SparkContext.getOrCreate()
    
    display_spark()

    
def stop_spark():
    """Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).
    """

    global spark
    global sc

    if 'spark' in globals() and 'sc' in globals():

        spark.stop()

        del spark
        del sc

    display_spark()


# Make css changes to improve spark output readability

html = [
    '<style>',
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
    '</style>',
]
display(HTML(''.join(html)))


# Print function docstrings

help(start_spark)
help(stop_spark)
help(display_spark)
help(show_as_html)

Help on function start_spark in module __main__:

start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1)
    Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    
    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory (default: 1)
        master_memory (float): master memory (default: 1)

Help on function stop_spark in module __main__:

stop_spark()
    Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).

Help on function display_spark in module __main__:

display_spark()
    Display the status of the active Spark session if one is currently running.

Help on function show_as_html in module __main__:

show_as_html(df, n=20)
    Leverage existing pandas jupyter integration to show a spark dataframe as html.
    
    Args:
        n 

In [243]:
# Run this cell to start a spark session in this notebook

start_spark(executor_instances=4, executor_cores=2, worker_memory=4, master_memory=4)

0,1
spark.dynamicAllocation.enabled,false
spark.app.id,app-20220921213921-0997
spark.executor.instances,4
spark.driver.memory,4g
spark.driver.extraJavaOptions,-Dderby.system.home=/tmp/ndu31/spark/
spark.executor.memory,4g
spark.master,spark://masternode2:7077
spark.executor.id,driver
spark.sql.warehouse.dir,file:/users/home/ndu31/spark-warehouse
spark.executor.cores,2


In [244]:
# Write your imports and code here or insert cells below

from pyspark.sql import functions as F
from pyspark.sql.types import *

In [5]:
!hdfs dfs -ls /data/ghcnd

Found 5 items
drwxr-xr-x   - jsw93 supergroup          0 2022-08-08 01:20 /data/ghcnd/daily
-rw-r--r--   8 jsw93 supergroup       3659 2022-08-08 01:21 /data/ghcnd/ghcnd-countries.txt
-rw-r--r--   8 jsw93 supergroup   33384684 2022-08-08 01:21 /data/ghcnd/ghcnd-inventory.txt
-rw-r--r--   8 jsw93 supergroup       1086 2022-08-08 01:21 /data/ghcnd/ghcnd-states.txt
-rw-r--r--   8 jsw93 supergroup   10496042 2022-08-08 01:21 /data/ghcnd/ghcnd-stations.txt


In [6]:
!hdfs dfs -ls /data/ghcnd/daily

Found 260 items
-rw-r--r--   8 jsw93 supergroup       3358 2021-08-09 15:08 /data/ghcnd/daily/1763.csv.gz
-rw-r--r--   8 jsw93 supergroup       3327 2021-08-09 15:03 /data/ghcnd/daily/1764.csv.gz
-rw-r--r--   8 jsw93 supergroup       3335 2021-08-09 15:03 /data/ghcnd/daily/1765.csv.gz
-rw-r--r--   8 jsw93 supergroup       3344 2021-08-09 14:56 /data/ghcnd/daily/1766.csv.gz
-rw-r--r--   8 jsw93 supergroup       3356 2021-08-09 15:06 /data/ghcnd/daily/1767.csv.gz
-rw-r--r--   8 jsw93 supergroup       3325 2021-08-09 15:02 /data/ghcnd/daily/1768.csv.gz
-rw-r--r--   8 jsw93 supergroup       3418 2021-08-09 15:03 /data/ghcnd/daily/1769.csv.gz
-rw-r--r--   8 jsw93 supergroup       3357 2021-08-09 15:07 /data/ghcnd/daily/1770.csv.gz
-rw-r--r--   8 jsw93 supergroup       3373 2021-08-09 15:06 /data/ghcnd/daily/1771.csv.gz
-rw-r--r--   8 jsw93 supergroup       3419 2021-08-09 15:05 /data/ghcnd/daily/1772.csv.gz
-rw-r--r--   8 jsw93 supergroup       3368 2021-08-09 15:08 /data/ghcnd/d

-rw-r--r--   8 jsw93 supergroup  142350529 2021-08-09 15:03 /data/ghcnd/daily/1953.csv.gz
-rw-r--r--   8 jsw93 supergroup  145311928 2021-08-09 15:06 /data/ghcnd/daily/1954.csv.gz
-rw-r--r--   8 jsw93 supergroup  148575733 2021-08-09 15:05 /data/ghcnd/daily/1955.csv.gz
-rw-r--r--   8 jsw93 supergroup  151453029 2021-08-09 15:03 /data/ghcnd/daily/1956.csv.gz
-rw-r--r--   8 jsw93 supergroup  154780951 2021-08-09 15:01 /data/ghcnd/daily/1957.csv.gz
-rw-r--r--   8 jsw93 supergroup  156547861 2021-08-09 15:00 /data/ghcnd/daily/1958.csv.gz
-rw-r--r--   8 jsw93 supergroup  160538756 2021-08-09 15:05 /data/ghcnd/daily/1959.csv.gz
-rw-r--r--   8 jsw93 supergroup  163916212 2021-08-09 15:07 /data/ghcnd/daily/1960.csv.gz
-rw-r--r--   8 jsw93 supergroup  169644718 2021-08-09 15:05 /data/ghcnd/daily/1961.csv.gz
-rw-r--r--   8 jsw93 supergroup  173399072 2021-08-09 15:01 /data/ghcnd/daily/1962.csv.gz
-rw-r--r--   8 jsw93 supergroup  177698967 2021-08-09 15:00 /data/ghcnd/daily/1963.csv.gz

In [7]:
!hdfs dfs -du -h -v /data/ghcnd

SIZE    DISK_SPACE_CONSUMED_WITH_ALL_REPLICAS  FULL_PATH_NAME
15.8 G  126.1 G                                /data/ghcnd/daily
3.6 K   28.6 K                                 /data/ghcnd/ghcnd-countries.txt
31.8 M  254.7 M                                /data/ghcnd/ghcnd-inventory.txt
1.1 K   8.5 K                                  /data/ghcnd/ghcnd-states.txt
10.0 M  80.1 M                                 /data/ghcnd/ghcnd-stations.txt


In [8]:
!hdfs dfs -du -s -h /data/ghcnd

15.8 G  126.5 G  /data/ghcnd


In [9]:
!hdfs dfs -du -h -v /data/ghcnd/daily

SIZE     DISK_SPACE_CONSUMED_WITH_ALL_REPLICAS  FULL_PATH_NAME
3.3 K    26.2 K                                 /data/ghcnd/daily/1763.csv.gz
3.2 K    26.0 K                                 /data/ghcnd/daily/1764.csv.gz
3.3 K    26.1 K                                 /data/ghcnd/daily/1765.csv.gz
3.3 K    26.1 K                                 /data/ghcnd/daily/1766.csv.gz
3.3 K    26.2 K                                 /data/ghcnd/daily/1767.csv.gz
3.2 K    26.0 K                                 /data/ghcnd/daily/1768.csv.gz
3.3 K    26.7 K                                 /data/ghcnd/daily/1769.csv.gz
3.3 K    26.2 K                                 /data/ghcnd/daily/1770.csv.gz
3.3 K    26.4 K                                 /data/ghcnd/daily/1771.csv.gz
3.3 K    26.7 K                                 /data/ghcnd/daily/1772.csv.gz
3.3 K    26.3 K                                 /data/ghcnd/daily/1773.csv.gz
3.3 K    26.5 K                                 /data/ghcnd/daily/1774.csv.gz
6

In [71]:
# DateTime and Timestamp does not work for the schema

daily_schema = StructType([
    StructField('ID', StringType()),
    StructField('DATE', StringType()),
    StructField('ELEMENT', StringType()),
    StructField('VALUE', FloatType()),
    StructField('MEASUREMENT_FLAG', StringType()),
    StructField('QUALITY_FLAG', StringType()),
    StructField('SOURCE_FLAG', StringType()),
    StructField('OBSERVATION_TIME', StringType())
])

In [72]:
daily_info = (spark.read.format("com.databricks.spark.csv")
              .option("header", "false")
              .option("inferSchema", "false")
              .schema(daily_schema)
              .load("hdfs:///data/ghcnd/daily/2022.csv.gz")
              .limit(1000))
daily_info.cache()
daily_info.show(15, False)

+-----------+--------+-------+-----+----------------+------------+-----------+----------------+
|ID         |DATE    |ELEMENT|VALUE|MEASUREMENT_FLAG|QUALITY_FLAG|SOURCE_FLAG|OBSERVATION_TIME|
+-----------+--------+-------+-----+----------------+------------+-----------+----------------+
|AE000041196|20220101|TAVG   |204.0|H               |null        |S          |null            |
|AEM00041194|20220101|TAVG   |211.0|H               |null        |S          |null            |
|AEM00041217|20220101|TAVG   |209.0|H               |null        |S          |null            |
|AEM00041218|20220101|TAVG   |207.0|H               |null        |S          |null            |
|AG000060390|20220101|TAVG   |121.0|H               |null        |S          |null            |
|AG000060590|20220101|TAVG   |151.0|H               |null        |S          |null            |
|AG000060611|20220101|TAVG   |111.0|H               |null        |S          |null            |
|AGE00147708|20220101|TMIN   |73.0 |null

In [12]:
!hdfs dfs -cat /data/ghcnd/daily/2022.csv.gz | zcat | head

AE000041196,20220101,TAVG,204,H,,S,
AEM00041194,20220101,TAVG,211,H,,S,
AEM00041217,20220101,TAVG,209,H,,S,
AEM00041218,20220101,TAVG,207,H,,S,
AG000060390,20220101,TAVG,121,H,,S,
AG000060590,20220101,TAVG,151,H,,S,
AG000060611,20220101,TAVG,111,H,,S,
AGE00147708,20220101,TMIN,73,,,S,
AGE00147708,20220101,PRCP,0,,,S,
AGE00147708,20220101,TAVG,133,H,,S,

gzip: stdout: Broken pipe
cat: Unable to write to output stream.


In [73]:
station_info = (
spark.read.format("text")
.load("hdfs:///data/ghcnd/ghcnd-stations.txt"))
station_info.show(15,False)

+-------------------------------------------------------------------------------------+
|value                                                                                |
+-------------------------------------------------------------------------------------+
|ACW00011604  17.1167  -61.7833   10.1    ST JOHNS COOLIDGE FLD                       |
|ACW00011647  17.1333  -61.7833   19.2    ST JOHNS                                    |
|AE000041196  25.3330   55.5170   34.0    SHARJAH INTER. AIRP            GSN     41196|
|AEM00041194  25.2550   55.3640   10.4    DUBAI INTL                             41194|
|AEM00041217  24.4330   54.6510   26.8    ABU DHABI INTL                         41217|
|AEM00041218  24.2620   55.6090  264.9    AL AIN INTL                            41218|
|AF000040930  35.3170   69.0170 3366.0    NORTH-SALANG                   GSN     40930|
|AFM00040938  34.2100   62.2280  977.2    HERAT                                  40938|
|AFM00040948  34.5660   69.2120 

In [138]:
stations = station_info.select(station_info.value.substr(1,11).cast("string").alias("ID"),
                    station_info.value.substr(13,7).cast("float").alias("LATITUDE"),
                    station_info.value.substr(22,8).cast("float").alias("LONGITUDE"),
                    station_info.value.substr(32,7).cast("float").alias("ELEVATION"),
                    station_info.value.substr(39,2).cast("string").alias("STATE_CODE"),
                    station_info.value.substr(42,31).cast("string").alias("STATION_NAME"),
                    station_info.value.substr(73,3).cast("string").alias("GSN_FLAG"),
                    station_info.value.substr(77,3).cast("string").alias("HCN_CRN_FLAG"),
                    station_info.value.substr(81,6).cast("string").alias("WMO_ID")
                   )
stations.show(15,False)
stations.count()


+-----------+--------+---------+---------+----------+-------------------------------+--------+------------+------+
|ID         |LATITUDE|LONGITUDE|ELEVATION|STATE_CODE|STATION_NAME                   |GSN_FLAG|HCN_CRN_FLAG|WMO_ID|
+-----------+--------+---------+---------+----------+-------------------------------+--------+------------+------+
|ACW00011604|17.116  |-61.783  |10.1     |          |ST JOHNS COOLIDGE FLD          |        |            |      |
|ACW00011647|17.133  |-61.783  |19.2     |          |ST JOHNS                       |        |            |      |
|AE000041196|25.333  |55.517   |34.0     |          |SHARJAH INTER. AIRP            |GSN     |            |41196 |
|AEM00041194|25.255  |55.364   |10.4     |          |DUBAI INTL                     |        |            |41194 |
|AEM00041217|24.433  |54.651   |26.8     |          |ABU DHABI INTL                 |        |            |41217 |
|AEM00041218|24.262  |55.609   |264.9    |          |AL AIN INTL                

122047

In [93]:
stations.filter(stations.WMO_ID == "     ").count()

113953

In [139]:
state_info = (
spark.read.format("text")
.load("hdfs:///data/ghcnd/ghcnd-states.txt"))
state_info.show(15,False)

+--------------------------------------------------+
|value                                             |
+--------------------------------------------------+
|AB ALBERTA                                        |
|AK ALASKA                                         |
|AL ALABAMA                                        |
|AR ARKANSAS                                       |
|AS AMERICAN SAMOA                                 |
|AZ ARIZONA                                        |
|BC BRITISH COLUMBIA                               |
|CA CALIFORNIA                                     |
|CO COLORADO                                       |
|CT CONNECTICUT                                    |
|DC DISTRICT OF COLUMBIA                           |
|DE DELAWARE                                       |
|FL FLORIDA                                        |
|FM MICRONESIA                                     |
|GA GEORGIA                                        |
+---------------------------------------------

In [140]:
states = state_info.select(state_info.value.substr(1,2).cast("string").alias("STATE_CODE"),
                    state_info.value.substr(4,46).cast("string").alias("STATE_NAME")
                    )

states.show(15,False)
states.count()

+----------+----------------------------------------------+
|STATE_CODE|STATE_NAME                                    |
+----------+----------------------------------------------+
|AB        |ALBERTA                                       |
|AK        |ALASKA                                        |
|AL        |ALABAMA                                       |
|AR        |ARKANSAS                                      |
|AS        |AMERICAN SAMOA                                |
|AZ        |ARIZONA                                       |
|BC        |BRITISH COLUMBIA                              |
|CA        |CALIFORNIA                                    |
|CO        |COLORADO                                      |
|CT        |CONNECTICUT                                   |
|DC        |DISTRICT OF COLUMBIA                          |
|DE        |DELAWARE                                      |
|FL        |FLORIDA                                       |
|FM        |MICRONESIA                  

74

In [78]:
country_info = (
spark.read.format("text")
.load("hdfs:///data/ghcnd/ghcnd-countries.txt"))
country_info.show(15,False)

+----------------------------------+
|value                             |
+----------------------------------+
|AC Antigua and Barbuda            |
|AE United Arab Emirates           |
|AF Afghanistan                    |
|AG Algeria                        |
|AJ Azerbaijan                     |
|AL Albania                        |
|AM Armenia                        |
|AO Angola                         |
|AQ American Samoa [United States] |
|AR Argentina                      |
|AS Australia                      |
|AU Austria                        |
|AY Antarctica                     |
|BA Bahrain                        |
|BB Barbados                       |
+----------------------------------+
only showing top 15 rows



In [144]:
countries = country_info.select(country_info.value.substr(1,2).cast("string").alias("CODE"),
                    country_info.value.substr(4,60).cast("string").alias("COUNTRY_NAME")
                    )

countries.cache
countries.show(15,False)
countries.count()


+----+-------------------------------+
|CODE|COUNTRY_NAME                   |
+----+-------------------------------+
|AC  |Antigua and Barbuda            |
|AE  |United Arab Emirates           |
|AF  |Afghanistan                    |
|AG  |Algeria                        |
|AJ  |Azerbaijan                     |
|AL  |Albania                        |
|AM  |Armenia                        |
|AO  |Angola                         |
|AQ  |American Samoa [United States] |
|AR  |Argentina                      |
|AS  |Australia                      |
|AU  |Austria                        |
|AY  |Antarctica                     |
|BA  |Bahrain                        |
|BB  |Barbados                       |
+----+-------------------------------+
only showing top 15 rows



219

In [145]:
inventory_info = (
spark.read.format("text")
.load("hdfs:///data/ghcnd/ghcnd-inventory.txt"))
inventory_info.show(15,False)

+---------------------------------------------+
|value                                        |
+---------------------------------------------+
|ACW00011604  17.1167  -61.7833 TMAX 1949 1949|
|ACW00011604  17.1167  -61.7833 TMIN 1949 1949|
|ACW00011604  17.1167  -61.7833 PRCP 1949 1949|
|ACW00011604  17.1167  -61.7833 SNOW 1949 1949|
|ACW00011604  17.1167  -61.7833 SNWD 1949 1949|
|ACW00011604  17.1167  -61.7833 PGTM 1949 1949|
|ACW00011604  17.1167  -61.7833 WDFG 1949 1949|
|ACW00011604  17.1167  -61.7833 WSFG 1949 1949|
|ACW00011604  17.1167  -61.7833 WT03 1949 1949|
|ACW00011604  17.1167  -61.7833 WT08 1949 1949|
|ACW00011604  17.1167  -61.7833 WT16 1949 1949|
|ACW00011647  17.1333  -61.7833 TMAX 1961 1961|
|ACW00011647  17.1333  -61.7833 TMIN 1961 1961|
|ACW00011647  17.1333  -61.7833 PRCP 1957 1970|
|ACW00011647  17.1333  -61.7833 SNOW 1957 1970|
+---------------------------------------------+
only showing top 15 rows



In [159]:
inventories = inventory_info.select(inventory_info.value.substr(1,11).cast("string").alias("ID"),
                    inventory_info.value.substr(13,8).cast("float").alias("LATITUDE"),
                      inventory_info.value.substr(22,9).cast("float").alias("LONGITUDE"),
                      inventory_info.value.substr(32,4).cast("string").alias("ELEMENT"),
                      inventory_info.value.substr(37,4).cast("integer").alias("FIRSTYEAR"),
                      inventory_info.value.substr(42,4).cast("integer").alias("LASTYEAR")
                    )

inventories.show(15,False)
inventories.count()

+-----------+--------+---------+-------+---------+--------+
|ID         |LATITUDE|LONGITUDE|ELEMENT|FIRSTYEAR|LASTYEAR|
+-----------+--------+---------+-------+---------+--------+
|ACW00011604|17.1167 |-61.7833 |TMAX   |1949     |1949    |
|ACW00011604|17.1167 |-61.7833 |TMIN   |1949     |1949    |
|ACW00011604|17.1167 |-61.7833 |PRCP   |1949     |1949    |
|ACW00011604|17.1167 |-61.7833 |SNOW   |1949     |1949    |
|ACW00011604|17.1167 |-61.7833 |SNWD   |1949     |1949    |
|ACW00011604|17.1167 |-61.7833 |PGTM   |1949     |1949    |
|ACW00011604|17.1167 |-61.7833 |WDFG   |1949     |1949    |
|ACW00011604|17.1167 |-61.7833 |WSFG   |1949     |1949    |
|ACW00011604|17.1167 |-61.7833 |WT03   |1949     |1949    |
|ACW00011604|17.1167 |-61.7833 |WT08   |1949     |1949    |
|ACW00011604|17.1167 |-61.7833 |WT16   |1949     |1949    |
|ACW00011647|17.1333 |-61.7833 |TMAX   |1961     |1961    |
|ACW00011647|17.1333 |-61.7833 |TMIN   |1961     |1961    |
|ACW00011647|17.1333 |-61.7833 |PRCP   |

725754

In [147]:
stations = (
stations
.withColumn("CODE", stations.ID[0:2]))
stations.show(15, False)

+-----------+--------+---------+---------+----------+-------------------------------+--------+------------+------+----+
|ID         |LATITUDE|LONGITUDE|ELEVATION|STATE_CODE|STATION_NAME                   |GSN_FLAG|HCN_CRN_FLAG|WMO_ID|CODE|
+-----------+--------+---------+---------+----------+-------------------------------+--------+------------+------+----+
|ACW00011604|17.116  |-61.783  |10.1     |          |ST JOHNS COOLIDGE FLD          |        |            |      |AC  |
|ACW00011647|17.133  |-61.783  |19.2     |          |ST JOHNS                       |        |            |      |AC  |
|AE000041196|25.333  |55.517   |34.0     |          |SHARJAH INTER. AIRP            |GSN     |            |41196 |AE  |
|AEM00041194|25.255  |55.364   |10.4     |          |DUBAI INTL                     |        |            |41194 |AE  |
|AEM00041217|24.433  |54.651   |26.8     |          |ABU DHABI INTL                 |        |            |41217 |AE  |
|AEM00041218|24.262  |55.609   |264.9   

In [148]:
stations_countries = stations.join(
  countries,
  on="CODE",
  how="left"
)

stations_countries.show(15,False)

+----+-----------+--------+---------+---------+----------+-------------------------------+--------+------------+------+---------------------+
|CODE|ID         |LATITUDE|LONGITUDE|ELEVATION|STATE_CODE|STATION_NAME                   |GSN_FLAG|HCN_CRN_FLAG|WMO_ID|COUNTRY_NAME         |
+----+-----------+--------+---------+---------+----------+-------------------------------+--------+------------+------+---------------------+
|AC  |ACW00011604|17.116  |-61.783  |10.1     |          |ST JOHNS COOLIDGE FLD          |        |            |      |Antigua and Barbuda  |
|AC  |ACW00011647|17.133  |-61.783  |19.2     |          |ST JOHNS                       |        |            |      |Antigua and Barbuda  |
|AE  |AE000041196|25.333  |55.517   |34.0     |          |SHARJAH INTER. AIRP            |GSN     |            |41196 |United Arab Emirates |
|AE  |AEM00041194|25.255  |55.364   |10.4     |          |DUBAI INTL                     |        |            |41194 |United Arab Emirates |
|AE  |

In [155]:
stations_states = (stations_countries.join(
  states,
  on="STATE_CODE",
  how="left"
).fillna({"STATE_NAME":""}))


stations_states.show(15,False)

+----------+----+-----------+--------+---------+---------+-------------------------------+--------+------------+------+---------------------+----------+
|STATE_CODE|CODE|ID         |LATITUDE|LONGITUDE|ELEVATION|STATION_NAME                   |GSN_FLAG|HCN_CRN_FLAG|WMO_ID|COUNTRY_NAME         |STATE_NAME|
+----------+----+-----------+--------+---------+---------+-------------------------------+--------+------------+------+---------------------+----------+
|          |AC  |ACW00011604|17.116  |-61.783  |10.1     |ST JOHNS COOLIDGE FLD          |        |            |      |Antigua and Barbuda  |          |
|          |AC  |ACW00011647|17.133  |-61.783  |19.2     |ST JOHNS                       |        |            |      |Antigua and Barbuda  |          |
|          |AE  |AE000041196|25.333  |55.517   |34.0     |SHARJAH INTER. AIRP            |GSN     |            |41196 |United Arab Emirates |          |
|          |AE  |AEM00041194|25.255  |55.364   |10.4     |DUBAI INTL              

In [156]:
for colname in stations_states.columns:
    if isinstance(stations_states.schema[colname].dataType,
                 StringType):
        stations_states = stations_states.withColumn(colname, F.trim(F.col(colname)))
        
stations_states.show(15,False)

+----------+----+-----------+--------+---------+---------+----------------------+--------+------------+------+--------------------+----------+
|STATE_CODE|CODE|ID         |LATITUDE|LONGITUDE|ELEVATION|STATION_NAME          |GSN_FLAG|HCN_CRN_FLAG|WMO_ID|COUNTRY_NAME        |STATE_NAME|
+----------+----+-----------+--------+---------+---------+----------------------+--------+------------+------+--------------------+----------+
|          |AC  |ACW00011604|17.116  |-61.783  |10.1     |ST JOHNS COOLIDGE FLD |        |            |      |Antigua and Barbuda |          |
|          |AC  |ACW00011647|17.133  |-61.783  |19.2     |ST JOHNS              |        |            |      |Antigua and Barbuda |          |
|          |AE  |AE000041196|25.333  |55.517   |34.0     |SHARJAH INTER. AIRP   |GSN     |            |41196 |United Arab Emirates|          |
|          |AE  |AEM00041194|25.255  |55.364   |10.4     |DUBAI INTL            |        |            |41194 |United Arab Emirates|          |

In [157]:
null_filled_df = stations_states.withColumn("STATE_CODE", 
                                            F.when(F.col("STATE_CODE")== "" , "N/A")
                                            .otherwise(F.col("STATE_CODE")))


    
null_filled_df.show(15,False)

+----------+----+-----------+--------+---------+---------+----------------------+--------+------------+------+--------------------+----------+
|STATE_CODE|CODE|ID         |LATITUDE|LONGITUDE|ELEVATION|STATION_NAME          |GSN_FLAG|HCN_CRN_FLAG|WMO_ID|COUNTRY_NAME        |STATE_NAME|
+----------+----+-----------+--------+---------+---------+----------------------+--------+------------+------+--------------------+----------+
|N/A       |AC  |ACW00011604|17.116  |-61.783  |10.1     |ST JOHNS COOLIDGE FLD |        |            |      |Antigua and Barbuda |          |
|N/A       |AC  |ACW00011647|17.133  |-61.783  |19.2     |ST JOHNS              |        |            |      |Antigua and Barbuda |          |
|N/A       |AE  |AE000041196|25.333  |55.517   |34.0     |SHARJAH INTER. AIRP   |GSN     |            |41196 |United Arab Emirates|          |
|N/A       |AE  |AEM00041194|25.255  |55.364   |10.4     |DUBAI INTL            |        |            |41194 |United Arab Emirates|          |

In [183]:
years = inventories.groupby("ID").agg(F.min("FIRSTYEAR").alias("FIRSTYEAR"), F.max("LASTYEAR").alias("LASTYEAR"),
                                      F.countDistinct("ELEMENT").alias("TOTAL_ELEMENTS"))
years.show()

+-----------+---------+--------+--------------+
|         ID|FIRSTYEAR|LASTYEAR|TOTAL_ELEMENTS|
+-----------+---------+--------+--------------+
|AEM00041217|     1983|    2022|             4|
|AGE00147708|     1879|    2022|             5|
|AGE00147714|     1896|    1938|             3|
|AGM00060452|     1985|    2022|             4|
|AGM00060511|     1983|    2022|             5|
|AJ000037749|     1936|    2022|             5|
|ALE00100939|     1940|    2000|             2|
|AQC00914021|     1955|    1957|            10|
|AQC00914424|     1969|    1975|             5|
|AQC00914873|     1955|    1967|            12|
|AR000000002|     1981|    2000|             1|
|AR000087374|     1956|    2022|             5|
|AR000870470|     1956|    2022|             5|
|AR000877500|     1956|    2022|             5|
|ARM00087480|     1965|    2022|             5|
|ARM00087509|     1973|    2022|             5|
|ARM00087679|     1973|    2022|             4|
|ASN00001006|     1951|    2022|        

In [173]:
for colname in inventories.columns:
    if isinstance(inventories.schema[colname].dataType,
                 StringType):
        inventories = inventories.withColumn(colname, F.trim(F.col(colname)))
        
inventories.filter(inventories.ELEMENT == "").show(truncate=False)


+---+--------+---------+-------+---------+--------+
|ID |LATITUDE|LONGITUDE|ELEMENT|FIRSTYEAR|LASTYEAR|
+---+--------+---------+-------+---------+--------+
+---+--------+---------+-------+---------+--------+



In [191]:
core_elements = inventories.select(["ID", "ELEMENT"]).where(
    (inventories.ELEMENT == "PRCP") |
    (inventories.ELEMENT == "SNOW") |
    (inventories.ELEMENT == "SNWD") |
    (inventories.ELEMENT == "TMAX") |
    (inventories.ELEMENT == "TMIN")).groupBy("ID").pivot("ELEMENT").agg({"ELEMENT":"count"})

In [193]:
other_elements = inventories.select(["ID", "ELEMENT"]).where(
    (inventories.ELEMENT != "PRCP") |
    (inventories.ELEMENT != "SNOW") |
    (inventories.ELEMENT != "SNWD") |
    (inventories.ELEMENT != "TMAX") |
    (inventories.ELEMENT != "TMIN")).groupBy("ID").agg({"ELEMENT":"count"}).select(F.col("ID"), F.col("count(ELEMENT)").alias("OTHER_ELEMENTS"))

In [195]:
inventory_elements = (years.join(
  core_elements,
  on="ID",
  how="left"
)
  .join(
  other_elements,
  on="ID",
  how="left"
).fillna(0))

inventory_elements.show()

+-----------+---------+--------+--------------+----+----+----+----+----+--------------+
|         ID|FIRSTYEAR|LASTYEAR|TOTAL_ELEMENTS|PRCP|SNOW|SNWD|TMAX|TMIN|OTHER_ELEMENTS|
+-----------+---------+--------+--------------+----+----+----+----+----+--------------+
|AEM00041217|     1983|    2022|             4|   1|   0|   0|   1|   1|             4|
|AGE00147708|     1879|    2022|             5|   1|   0|   1|   1|   1|             5|
|AGE00147710|     1909|    2009|             4|   1|   0|   0|   1|   1|             4|
|AGE00147714|     1896|    1938|             3|   1|   0|   0|   1|   1|             3|
|AGE00147719|     1888|    2022|             4|   1|   0|   0|   1|   1|             4|
|AGM00060360|     1945|    2022|             4|   1|   0|   0|   1|   1|             4|
|AGM00060445|     1957|    2022|             5|   1|   0|   1|   1|   1|             5|
|AGM00060452|     1985|    2022|             4|   1|   0|   0|   1|   1|             4|
|AGM00060511|     1983|    2022|

In [207]:
inv_count = inventory_elements.select(["ID", "PRCP", "SNOW", "SNWD", "TMAX", "TMIN"]).where(
    (inventory_elements.PRCP != 0) &
    (inventory_elements.SNOW != 0) &
    (inventory_elements.SNWD != 0) &
    (inventory_elements.TMAX != 0) &
    (inventory_elements.TMIN != 0)
)

inv_count.count()

20300

In [205]:
prcp_count = (
    inventory_elements.where(
    (inventory_elements.PRCP != 0) &
    (inventory_elements.TOTAL_ELEMENTS == inventory_elements.PRCP)
))
prcp_count.count()


16159

In [209]:
data = (null_filled_df.join(
  inventory_elements,
  on="ID",
  how="left"
))
        
data.show()

+-----------+----------+----+--------+---------+---------+--------------------+--------+------------+------+--------------------+----------+---------+--------+--------------+----+----+----+----+----+--------------+
|         ID|STATE_CODE|CODE|LATITUDE|LONGITUDE|ELEVATION|        STATION_NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|        COUNTRY_NAME|STATE_NAME|FIRSTYEAR|LASTYEAR|TOTAL_ELEMENTS|PRCP|SNOW|SNWD|TMAX|TMIN|OTHER_ELEMENTS|
+-----------+----------+----+--------+---------+---------+--------------------+--------+------------+------+--------------------+----------+---------+--------+--------------+----+----+----+----+----+--------------+
|AEM00041217|       N/A|  AE|  24.433|   54.651|     26.8|      ABU DHABI INTL|        |            | 41217|United Arab Emirates|          |     1983|    2022|             4|   1|   0|   0|   1|   1|             4|
|AGE00147708|       N/A|  AG|   36.72|     4.05|    222.0|          TIZI OUZOU|        |            | 60395|             Algeria|          |

In [226]:
name = "ndu31"

data.write.parquet(f"hdfs:///user/{name}/outputs/ghcnd/stations.parquet", mode="overwrite")



In [231]:
!hdfs dfs -rmr /user/ndu31/outputs/ghcnd/stations

rmr: DEPRECATED: Please use '-rm -r' instead.
Deleted /user/ndu31/outputs/ghcnd/stations


In [232]:
!hdfs dfs -ls /user/ndu31/outputs/ghcnd/

Found 1 items
drwxr-xr-x   - ndu31 ndu31          0 2022-09-21 21:23 /user/ndu31/outputs/ghcnd/stations.parquet


In [234]:
stations = (
spark.read.format("parquet")
.load(f"hdfs:///user/{name}/outputs/ghcnd/stations.parquet"))
stations.show(15,False)

+-----------+----------+----+--------+---------+---------+-------------------+--------+------------+------+--------------------+----------+---------+--------+--------------+----+----+----+----+----+--------------+
|ID         |STATE_CODE|CODE|LATITUDE|LONGITUDE|ELEVATION|STATION_NAME       |GSN_FLAG|HCN_CRN_FLAG|WMO_ID|COUNTRY_NAME        |STATE_NAME|FIRSTYEAR|LASTYEAR|TOTAL_ELEMENTS|PRCP|SNOW|SNWD|TMAX|TMIN|OTHER_ELEMENTS|
+-----------+----------+----+--------+---------+---------+-------------------+--------+------------+------+--------------------+----------+---------+--------+--------------+----+----+----+----+----+--------------+
|AE000041196|N/A       |AE  |25.333  |55.517   |34.0     |SHARJAH INTER. AIRP|GSN     |            |41196 |United Arab Emirates|          |1944     |2022    |4             |1   |0   |0   |1   |1   |4             |
|AEM00041218|N/A       |AE  |24.262  |55.609   |264.9    |AL AIN INTL        |        |            |41218 |United Arab Emirates|          |1994 

In [235]:
daily = (
    daily_info.join(
    stations,
    on="ID",
    how="left")
)

daily.show()

+-----------+--------+-------+-----+----------------+------------+-----------+----------------+----------+----+--------+---------+---------+-------------------+--------+------------+------+--------------------+----------+---------+--------+--------------+----+----+----+----+----+--------------+
|         ID|    DATE|ELEMENT|VALUE|MEASUREMENT_FLAG|QUALITY_FLAG|SOURCE_FLAG|OBSERVATION_TIME|STATE_CODE|CODE|LATITUDE|LONGITUDE|ELEVATION|       STATION_NAME|GSN_FLAG|HCN_CRN_FLAG|WMO_ID|        COUNTRY_NAME|STATE_NAME|FIRSTYEAR|LASTYEAR|TOTAL_ELEMENTS|PRCP|SNOW|SNWD|TMAX|TMIN|OTHER_ELEMENTS|
+-----------+--------+-------+-----+----------------+------------+-----------+----------------+----------+----+--------+---------+---------+-------------------+--------+------------+------+--------------------+----------+---------+--------+--------------+----+----+----+----+----+--------------+
|AE000041196|20220101|   TAVG|204.0|               H|        null|          S|            null|       N/A|  AE| 

In [240]:
nullcount = (daily.select(["ID", "STATION_NAME"])
            .where(daily.STATION_NAME.isNull()))

nullcount.count()



0

In [241]:
nullcount2 = (daily.select("ID")
              .subtract(stations.select("ID")))
              
nullcount2.count()

0

In [245]:
# Run this cell before closing the notebook or kill your spark application by hand using the link in the Spark UI

stop_spark()