In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import subprocess
from multiprocessing import cpu_count

In [2]:
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [3]:
# Absolute path of the `hdfs` command-line client used by the preview cells.
HDFS_EXE = "/home/ubuntu/hadoop-3.3.6/bin/hdfs"
# Host used for both the Spark master URL and the HDFS URI.
IP = "192.168.1.1"  # remember to set the real IP!
# CPU count of the machine running this notebook; later passed to spark.executor.cores.
cores = cpu_count()

In [4]:
# Create (or reuse) the Spark session attached to the standalone master at IP.
spark_session = (
    SparkSession.builder
    .master(f"spark://{IP}:7077")
    .appName("ECAtemp")
    # Dynamic allocation backed by shuffle tracking; the external shuffle
    # service is switched off.
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", False)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.executor.cores", cores)
    # Fixed driver / block-manager ports.
    .config("spark.driver.port", 9999)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)

# Shortcut to the underlying context (needed for the RDD API).
sc = spark_session.sparkContext

# Only report errors from here on.
sc.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/01 15:35:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Data at a glance

All the data should have been extracted to HDFS: ```/data/tx/``` and ```/data/tn/``` for maximum and minimum temperatures, respectively. In addition, there is a station metadata file: ```/data/station_data.txt```.

Preview the contents of the directories:

In [5]:
# Show the top of the HDFS listing of each temperature directory.
for data_id in ("tx", "tn"):
    listing = subprocess.run(
        [HDFS_EXE, "dfs", "-ls", f"/data/{data_id}"],
        check=True,
        capture_output=True,
    )
    # Pipe the listing through `head` so only its first lines are printed.
    listing_head = subprocess.run("head", input=listing.stdout, capture_output=True)
    print(listing_head.stdout.decode())

Found 2064 items
-rw-r--r--   1 ubuntu supergroup    1637898 2024-08-01 10:01 /data/tx/TX_STAID000002.txt
-rw-r--r--   1 ubuntu supergroup    1874571 2024-08-01 10:01 /data/tx/TX_STAID000004.txt
-rw-r--r--   1 ubuntu supergroup    1874582 2024-08-01 10:01 /data/tx/TX_STAID000005.txt
-rw-r--r--   1 ubuntu supergroup    1080231 2024-08-01 10:01 /data/tx/TX_STAID000007.txt
-rw-r--r--   1 ubuntu supergroup    1874586 2024-08-01 10:01 /data/tx/TX_STAID000008.txt
-rw-r--r--   1 ubuntu supergroup    1874592 2024-08-01 10:01 /data/tx/TX_STAID000009.txt
-rw-r--r--   1 ubuntu supergroup    1874571 2024-08-01 10:01 /data/tx/TX_STAID000010.txt
-rw-r--r--   1 ubuntu supergroup    1979781 2024-08-01 10:01 /data/tx/TX_STAID000011.txt
-rw-r--r--   1 ubuntu supergroup    1716813 2024-08-01 10:01 /data/tx/TX_STAID000012.txt

Found 2064 items
-rw-r--r--   1 ubuntu supergroup    1637896 2024-08-01 09:56 /data/tn/TN_STAID000002.txt
-rw-r--r--   1 ubuntu supergroup    1874571 2024-08-01 09:56 /data/tn/TN_ST

Get a preview of the first file in ```/data/tx/```:

In [6]:
# List /data/tx and keep only the top of the listing.
listing = subprocess.run([HDFS_EXE, "dfs", "-ls", "/data/tx"],
                         check=True, capture_output=True)
listing_head = subprocess.run("head", input=listing.stdout, capture_output=True)

# Line 0 of the listing is the "Found N items" banner; the file path is the
# last whitespace-separated field of line 1.
first_entry = listing_head.stdout.decode().split("\n")[1]
sample_file = first_entry.strip().split()[-1]

# Fetch that file and keep its first 30 lines for display.
contents = subprocess.run([HDFS_EXE, "dfs", "-cat", sample_file],
                          check=True, capture_output=True)
contents_head = subprocess.run(["head", "-30"], input=contents.stdout,
                               capture_output=True)
preview = contents_head.stdout.decode()
print(preview)

EUROPEAN CLIMATE ASSESSMENT & DATASET (ECA&D), file created on: 13-07-2024
THESE DATA CAN BE USED FOR NON-COMMERCIAL RESEARCH AND EDUCATION PROVIDED THAT THE FOLLOWING SOURCE IS ACKNOWLEDGED: 

Klein Tank, A.M.G. and Coauthors, 2002. Daily dataset of 20th-century surface
air temperature and precipitation series for the European Climate Assessment.
Int. J. of Climatol., 22, 1441-1453.
Data and metadata available at http://www.ecad.eu

FILE FORMAT (MISSING VALUE CODE = -9999):

01-06 STAID: Station identifier
08-13 SOUID: Source identifier
15-22 DATE : Date YYYYMMDD
24-28 TX   : Maximum temperature in 0.1 &#176;C
30-34 Q_TX : quality code for TX (0='valid'; 1='suspect'; 9='missing')

This is the blended series of station FALUN, SWEDEN (STAID: 2)
Blended and updated with sources:7 36438 
See files sources.txt and stations.txt for more info.

STAID, SOUID,    DATE,   TX, Q_TX
     2, 36438,19000101,   18,    0
     2, 36438,19000102,  -10,    0
     2, 36438,19000103,  -20,    0
     2, 36

Get a preview of the metadata file:

In [7]:
# Fetch the station metadata file from HDFS ...
meta_contents = subprocess.run(
    [HDFS_EXE, "dfs", "-cat", "/data/station_data.txt"],
    check=True,
    capture_output=True,
)
# ... and show only its first lines.
meta_head = subprocess.run("head", input=meta_contents.stdout, capture_output=True)
preview_meta = meta_head.stdout.decode()
print(preview_meta)

STAID,STANAME,CN,LAT,LON,HGHT
2,FALUN,SE,60.616667,15.616667,160
4,LINKOEPING,SE,58.4,15.533056,93
5,LINKOEPING-MALMSLAETT,SE,58.4,15.533056,93
7,KARLSTAD-AIRPORT,SE,59.444444,13.3375,107
8,OESTERSUND,SE,63.183056,14.483056,376
9,OESTERSUND-FROESOEN,SE,63.183056,14.483056,376
10,STOCKHOLM,SE,59.35,18.05,44
11,KREMSMUENSTER (TAWES),AT,48.055,14.130833,382
12,GRAZ-UNIVERSITAET,AT,47.077778,15.448889,367



### Load & preprocess data

In [8]:
data_root = f"hdfs://{IP}:9000/data/"

# Raw text lines per temperature kind; `dfs` is filled by the preprocessing cells.
rdds, dfs = {}, {}

for data_id in ("tx", "tn"):
    data_dir = f"{data_root}{data_id}"
    # Reading the directory yields the lines of every file in it as one RDD.
    rdds[data_id] = sc.textFile(data_dir)

We need to preprocess the max and min temperature data by excluding the first 20 lines from each file and converting the rest into a proper dataframe:

In [9]:
def Preprocess_step1(rdd, n_header_lines=20):
    """Convert the raw text lines of the station files into an integer dataframe.

    `rdd` holds the lines of *all* files of one directory back to back, so a
    global line index cannot be used to strip the preamble of every file: it
    would only remove the preamble of the first one, and the remaining
    preambles would end up as all-NULL rows after the integer cast. Instead,
    the column names are read from the first file and only lines that consist
    of exactly one integer per column are kept.

    Parameters
    ----------
    rdd : RDD of str
        Lines of the data files, as produced by ``sc.textFile``.
    n_header_lines : int, default 20
        Number of free-text lines preceding the column-name line in the first
        file.

    Returns
    -------
    DataFrame
        One integer column per name found on the column-name line, in the
        same order.

    Raises
    ------
    ValueError
        If `rdd` has no line at position `n_header_lines`.
    """
    import re  # local import: the notebook's import cells do not provide it

    # The column names sit on the line right after the preamble of the first file.
    head = rdd.take(n_header_lines + 1)
    try:
        header = head[n_header_lines]
    except IndexError:
        raise ValueError(
            f"expected more than {n_header_lines} lines of input"
        ) from None
    column_names = [name.strip() for name in header.split(",")]

    # A data row is one optionally negative integer per column, padded with
    # blanks. Preamble and column-name lines of every file fail this match.
    int_field = r"\s*-?\d+\s*"
    data_row = re.compile(",".join(int_field for _ in column_names))

    df = rdd\
        .filter(lambda line: data_row.fullmatch(line) is not None)\
        .map(lambda line: (line, ))\
        .toDF()

    # Split each line once and cast every piece to its integer column.
    fields = split(df["_1"], ",")
    return df.select([fields.getItem(i).cast("int").alias(name)
                      for i, name in enumerate(column_names)])

In [10]:
# Run the first preprocessing step on both datasets and preview each result.
for data_id in ("tx", "tn"):
    step1_df = Preprocess_step1(rdds[data_id])
    dfs[data_id] = step1_df
    step1_df.show()

                                                                                

+-----+-----+--------+----+----+
|STAID|SOUID|    DATE|  TX|Q_TX|
+-----+-----+--------+----+----+
|    2|36438|19000101|  18|   0|
|    2|36438|19000102| -10|   0|
|    2|36438|19000103| -20|   0|
|    2|36438|19000104| -30|   0|
|    2|36438|19000105| -60|   0|
|    2|36438|19000106|-150|   0|
|    2|36438|19000107| -90|   0|
|    2|36438|19000108| -40|   0|
|    2|36438|19000109| -40|   0|
|    2|36438|19000110| -10|   0|
|    2|36438|19000111| -24|   0|
|    2|36438|19000112|-120|   0|
|    2|36438|19000113|-100|   0|
|    2|36438|19000114| -60|   0|
|    2|36438|19000115| -70|   0|
|    2|36438|19000116| -10|   0|
|    2|36438|19000117| -20|   0|
|    2|36438|19000118| -20|   0|
|    2|36438|19000119| -34|   0|
|    2|36438|19000120| -20|   0|
+-----+-----+--------+----+----+
only showing top 20 rows



                                                                                

+-----+-----+--------+----+----+
|STAID|SOUID|    DATE|  TN|Q_TN|
+-----+-----+--------+----+----+
|    2|36439|19000101| -30|   0|
|    2|36439|19000102| -50|   0|
|    2|36439|19000103| -70|   0|
|    2|36439|19000104|-110|   0|
|    2|36439|19000105|-190|   0|
|    2|36439|19000106|-220|   0|
|    2|36439|19000107|-200|   0|
|    2|36439|19000108|-110|   0|
|    2|36439|19000109| -80|   0|
|    2|36439|19000110| -60|   0|
|    2|36439|19000111|-160|   0|
|    2|36439|19000112|-210|   0|
|    2|36439|19000113|-220|   0|
|    2|36439|19000114|-130|   0|
|    2|36439|19000115|-190|   0|
|    2|36439|19000116|-110|   0|
|    2|36439|19000117| -60|   0|
|    2|36439|19000118| -60|   0|
|    2|36439|19000119|-130|   0|
|    2|36439|19000120|-120|   0|
+-----+-----+--------+----+----+
only showing top 20 rows



Then we need to convert the date from string to an appropriate type and place NAs wherever the quality code is not 0, i.e. where values are missing or flagged as suspect:

In [13]:
def Preprocess_step2(df, data_id):
    """Parse the DATE column and blank out readings whose quality code is not 0.

    Parameters
    ----------
    df : DataFrame
        Frame with a ``DATE`` column in ``yyyyMMdd`` form plus the reading and
        quality columns named after `data_id` (e.g. ``TX`` / ``Q_TX``).
    data_id : str
        Dataset identifier; its upper-case form is the reading column name.

    Returns
    -------
    DataFrame
        Same columns, with ``DATE`` converted by ``to_date`` and the reading
        set to NULL wherever the quality column differs from 0.
    """
    value_col = data_id.upper()
    quality_col = f"Q_{value_col}"

    dated = df.withColumn("DATE", to_date(df["DATE"], format="yyyyMMdd"))

    # Keep the reading only when its quality code is 0; NULL otherwise.
    trusted_reading = when(dated[quality_col]==0, dated[value_col]).otherwise(None)
    return dated.withColumn(value_col, trusted_reading)

In [15]:
# Apply the second preprocessing step to both datasets, then show data and schema.
# NOTE(review): the entries of `dfs` are replaced in place, so re-running this
# cell feeds already-converted frames back into Preprocess_step2.
for data_id in ("tx", "tn"):
    step2_df = Preprocess_step2(dfs[data_id], data_id)
    dfs[data_id] = step2_df
    step2_df.show()
    step2_df.printSchema()

                                                                                

+-----+-----+----------+----+----+
|STAID|SOUID|      DATE|  TX|Q_TX|
+-----+-----+----------+----+----+
|    2|36438|1900-01-01|  18|   0|
|    2|36438|1900-01-02| -10|   0|
|    2|36438|1900-01-03| -20|   0|
|    2|36438|1900-01-04| -30|   0|
|    2|36438|1900-01-05| -60|   0|
|    2|36438|1900-01-06|-150|   0|
|    2|36438|1900-01-07| -90|   0|
|    2|36438|1900-01-08| -40|   0|
|    2|36438|1900-01-09| -40|   0|
|    2|36438|1900-01-10| -10|   0|
|    2|36438|1900-01-11| -24|   0|
|    2|36438|1900-01-12|-120|   0|
|    2|36438|1900-01-13|-100|   0|
|    2|36438|1900-01-14| -60|   0|
|    2|36438|1900-01-15| -70|   0|
|    2|36438|1900-01-16| -10|   0|
|    2|36438|1900-01-17| -20|   0|
|    2|36438|1900-01-18| -20|   0|
|    2|36438|1900-01-19| -34|   0|
|    2|36438|1900-01-20| -20|   0|
+-----+-----+----------+----+----+
only showing top 20 rows

root
 |-- STAID: integer (nullable = true)
 |-- SOUID: integer (nullable = true)
 |-- DATE: date (nullable = true)
 |-- TX: integer (n

Lastly, we need to read the station metadata.

In [16]:
# Station metadata: read the CSV using its header row, then give the
# numeric columns numeric types.
meta = spark_session.read.csv(data_root + "station_data.txt", header=True)
# The loop variable is deliberately not named `col`: that would rebind the
# `col` function brought in by `from pyspark.sql.functions import *`.
for coord_name in ("LAT", "LON"):
    meta = meta.withColumn(coord_name, meta[coord_name].cast("float"))
meta = meta.withColumn("HGHT", meta["HGHT"].cast("int"))
meta.show()
meta.printSchema()

+-----+--------------------+---+---------+---------+----+
|STAID|             STANAME| CN|      LAT|      LON|HGHT|
+-----+--------------------+---+---------+---------+----+
|    2|               FALUN| SE| 60.61667|15.616667| 160|
|    4|          LINKOEPING| SE|     58.4|15.533056|  93|
|    5|LINKOEPING-MALMSL...| SE|     58.4|15.533056|  93|
|    7|    KARLSTAD-AIRPORT| SE|59.444443|  13.3375| 107|
|    8|          OESTERSUND| SE|63.183056|14.483056| 376|
|    9| OESTERSUND-FROESOEN| SE|63.183056|14.483056| 376|
|   10|           STOCKHOLM| SE|    59.35|    18.05|  44|
|   11|KREMSMUENSTER (TA...| AT|   48.055|14.130833| 382|
|   12|   GRAZ-UNIVERSITAET| AT|47.077778|15.448889| 367|
|   13|      INNSBRUCK-UNIV| AT|    47.26|11.384167| 578|
|   14|  SALZBURG-FLUGHAFEN| AT|47.789165|13.008056| 430|
|   15|   SONNBLICK (TAWES)| AT| 47.05417|  12.9575|3109|
|   16|     WIEN-HOHE WARTE| AT|48.248333|16.356388| 198|
|   21|         ZAGREB-GRIC| HR|45.816666|15.978056| 156|
|   28| HELSIN

## TEST CELLS

In [17]:
# Toy frame: column "a" plays the reading, column "b" its quality code.
tst = spark_session.createDataFrame(
    [(18, 0), (-9999, 9), (45, 0), (-56, 1), (-9999, 9)],
    ("a", "b"),
)
tst.show()

# Null out "a" wherever "b" is not 0, then check that dropna removes those rows.
masked_a = when(tst["b"]==0, tst["a"]).otherwise(None)
tst = tst.withColumn("a", masked_a)
tst.show()
tst.dropna("any").show()

+-----+---+
|    a|  b|
+-----+---+
|   18|  0|
|-9999|  9|
|   45|  0|
|  -56|  1|
|-9999|  9|
+-----+---+

+----+---+
|   a|  b|
+----+---+
|  18|  0|
|NULL|  9|
|  45|  0|
|NULL|  1|
|NULL|  9|
+----+---+

+---+---+
|  a|  b|
+---+---+
| 18|  0|
| 45|  0|
+---+---+



In [63]:
# Shut the Spark session down once the notebook is finished.
spark_session.stop()