# Process with Spark
2023.01.26

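Objective: Clean the parquetized weather station files: unstack the hourly columns into one row per station, date, and hour; replace the -9999 and blank placeholders with nulls; and write the result partitioned by year and month.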

In [100]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, DateType, DoubleType
from pyspark.sql.functions import col, lit, array, explode, arrays_zip, posexplode, \
    sum as sum_, max as max_, min as min_, year, month

In [2]:
# Get (or reuse) a Spark session running locally with the `local[*]` master
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('process-weather-stations')
    .getOrCreate()
)

23/01/28 13:18:51 WARN Utils: Your hostname, hp-ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.1.167 instead (on interface wlp1s0)
23/01/28 13:18:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/28 13:18:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
!ls ../data

AQC00914594.csv      CQC00914080.csv	  CQC00914855.csv
AQC00914594.parquet  CQC00914080.parquet  CQC00914855.parquet
AQC00914902.csv      CQC00914801.csv
AQC00914902.parquet  CQC00914801.parquet


In [85]:
# First look: read every parquet file in ../data without supplying a schema
df_t = spark.read.parquet('../data/*.parquet')

In [86]:
# total number of rows read across all matched parquet files
df_t.count()

27799

In [10]:
# inspect the column names and types Spark picked up from the files
df_t.printSchema()

root
 |-- STATION: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- ELEVATION: double (nullable = true)
 |-- DATE: date (nullable = true)
 |-- HR00Val: long (nullable = true)
 |-- HR00MF: string (nullable = true)
 |-- HR00QF: string (nullable = true)
 |-- HR00S1: string (nullable = true)
 |-- HR00S2: string (nullable = true)
 |-- HR01Val: long (nullable = true)
 |-- HR01MF: string (nullable = true)
 |-- HR01QF: string (nullable = true)
 |-- HR01S1: string (nullable = true)
 |-- HR01S2: string (nullable = true)
 |-- HR02Val: long (nullable = true)
 |-- HR02MF: string (nullable = true)
 |-- HR02QF: string (nullable = true)
 |-- HR02S1: string (nullable = true)
 |-- HR02S2: string (nullable = true)
 |-- HR03Val: long (nullable = true)
 |-- HR03MF: string (nullable = true)
 |-- HR03QF: string (nullable = true)
 |-- HR03S1: string (nullable = true)
 |-- HR03S2: string (nullable = true)
 |-- HR0

In [12]:
# the same schema as a StructType object (its repr is shown as the cell output)
df_t.schema

StructType([StructField('STATION', StringType(), True), StructField('NAME', StringType(), True), StructField('LATITUDE', DoubleType(), True), StructField('LONGITUDE', DoubleType(), True), StructField('ELEVATION', DoubleType(), True), StructField('DATE', DateType(), True), StructField('HR00Val', LongType(), True), StructField('HR00MF', StringType(), True), StructField('HR00QF', StringType(), True), StructField('HR00S1', StringType(), True), StructField('HR00S2', StringType(), True), StructField('HR01Val', LongType(), True), StructField('HR01MF', StringType(), True), StructField('HR01QF', StringType(), True), StructField('HR01S1', StringType(), True), StructField('HR01S2', StringType(), True), StructField('HR02Val', LongType(), True), StructField('HR02MF', StringType(), True), StructField('HR02QF', StringType(), True), StructField('HR02S1', StringType(), True), StructField('HR02S2', StringType(), True), StructField('HR03Val', LongType(), True), StructField('HR03MF', StringType(), True), 

In [11]:
# show a single record, untruncated, one "column | value" line per field
df_t.show(n=1, truncate=False, vertical=True)

23/01/26 19:57:59 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

-RECORD 0--------------------
 STATION   | AQC00914594     
 NAME      | MALAELOA, AS AQ 
 LATITUDE  | -14.33333       
 LONGITUDE | -170.76667      
 ELEVATION | 42.4            
 DATE      | 2019-09-29      
 HR00Val   | -9999           
 HR00MF    |                 
 HR00QF    |                 
 HR00S1    | H               
 HR00S2    |                 
 HR01Val   | -9999           
 HR01MF    |                 
 HR01QF    |                 
 HR01S1    | H               
 HR01S2    |                 
 HR02Val   | -9999           
 HR02MF    |                 
 HR02QF    |                 
 HR02S1    | H               
 HR02S2    |                 
 HR03Val   | -9999           
 HR03MF    |                 
 HR03QF    |                 
 HR03S1    | H               
 HR03S2    |                 
 HR04Val   | -9999           
 HR04MF    |                 
 HR04QF    |                 
 HR04S1    | H               
 HR04S2    |                 
 HR05Val   | -9999           
 HR05MF   

In [3]:
# Explicit schema for the station parquet files, built programmatically:
#   * six station/date columns,
#   * for each hour 00-23: HR<hh>Val (long) followed by HR<hh>MF/QF/S1/S2 (string),
#   * the daily total: DlySum (long) followed by DlySumMF/QF/S1/S2 (string).
# Every field is nullable. The field order matches the column order of the files.
_FLAG_SUFFIXES = ('MF', 'QF', 'S1', 'S2')


def _value_and_flags(value_name, flag_prefix):
    """Return one nullable LongType value field followed by its four nullable StringType flag fields.

    value_name  -- name of the value column (e.g. 'HR00Val' or 'DlySum')
    flag_prefix -- prefix the flag suffixes are appended to (e.g. 'HR00' or 'DlySum')
    """
    fields = [StructField(value_name, LongType(), True)]
    for suffix in _FLAG_SUFFIXES:
        fields.append(StructField(flag_prefix + suffix, StringType(), True))
    return fields


_schema_fields = [
    StructField('STATION', StringType(), True),
    StructField('NAME', StringType(), True),
    StructField('LATITUDE', DoubleType(), True),
    StructField('LONGITUDE', DoubleType(), True),
    StructField('ELEVATION', DoubleType(), True),
    StructField('DATE', DateType(), True),
]

for _hour in range(24):
    _prefix = f'HR{_hour:02}'  # HR00 ... HR23
    _schema_fields.extend(_value_and_flags(_prefix + 'Val', _prefix))

# the daily value column is plain 'DlySum' (no 'Val' suffix), its flags are 'DlySum<suffix>'
_schema_fields.extend(_value_and_flags('DlySum', 'DlySum'))

station_schema = StructType(_schema_fields)

In [87]:
# Re-read the same files, this time passing the explicit station_schema to the reader
df_t = spark.read.schema(station_schema).parquet('../data/*.parquet')

In [5]:
# show a single record, untruncated, one "column | value" line per field
df_t.show(n=1, truncate=False, vertical=True)

23/01/28 13:20:04 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

-RECORD 0--------------------
 STATION   | AQC00914594     
 NAME      | MALAELOA, AS AQ 
 LATITUDE  | -14.33333       
 LONGITUDE | -170.76667      
 ELEVATION | 42.4            
 DATE      | 2019-09-29      
 HR00Val   | -9999           
 HR00MF    |                 
 HR00QF    |                 
 HR00S1    | H               
 HR00S2    |                 
 HR01Val   | -9999           
 HR01MF    |                 
 HR01QF    |                 
 HR01S1    | H               
 HR01S2    |                 
 HR02Val   | -9999           
 HR02MF    |                 
 HR02QF    |                 
 HR02S1    | H               
 HR02S2    |                 
 HR03Val   | -9999           
 HR03MF    |                 
 HR03QF    |                 
 HR03S1    | H               
 HR03S2    |                 
 HR04Val   | -9999           
 HR04MF    |                 
 HR04QF    |                 
 HR04S1    | H               
 HR04S2    |                 
 HR05Val   | -9999           
 HR05MF   

In [None]:
# Plan:
# 1. Separate the station metrics from the hourly values (the station metrics are also available in HPD_v02r02_stationinv_c20221129.csv).
# 2. Unstack the hourly values.
# 3. Replace -9999 with null (Python None).
# 4. Partition the output by year and month.

In [7]:
# cols_nested = [[f'HR{i:02}Val', f'HR{i:02}MF', f'HR{i:02}QF', f'HR{i:02}S1', f'HR{i:02}S2'] for i in range(24)]
# cols = [col for col_list in cols_nested for col in col_list]

In [40]:
# function to generate hourly columns
def generate_cols_as_array(col_name):
    """Collect the 24 hourly ``HR<hh><col_name>`` columns into a single array column.

    Parameters
    ----------
    col_name : str
        Suffix shared by the hourly columns; e.g. ``'Val'`` selects
        ``HR00Val`` through ``HR23Val``.

    Returns
    -------
    Column
        An array column aliased as ``col_name`` whose element ``i`` is the
        column for hour ``i``.
    """
    hourly_columns = []
    for hour in range(24):
        hourly_columns.append(col(f'HR{hour:02}{col_name}'))
    return array(hourly_columns).alias(col_name)

In [105]:
# Unstack the hourly columns: one output row per (STATION, DATE, hour)
col_types = ['Val', 'MF', 'QF', 'S1', 'S2']

# one 24-element array per column type, zipped so that element i is the struct for hour i
hourly_structs = arrays_zip(*(generate_cols_as_array(c) for c in col_types)).alias('zip')

df_t2 = (
    df_t
    .select('STATION', 'DATE', hourly_structs)
    # posexplode yields the array position (used as the hour) and the struct at that position
    .select('STATION', 'DATE', posexplode('zip').alias('hr', 'exp'))
    # pull each struct field back out into its own top-level column
    .select('STATION', 'DATE', 'hr', *(col('exp')[c].alias(c) for c in col_types))
)

In [89]:
# preview the first 50 unstacked rows
df_t2.show(50)

+-----------+----------+---+---+---+---+---+---+
|    STATION|      DATE| hr|Val| MF| QF| S1| S2|
+-----------+----------+---+---+---+---+---+---+
|CQC00914855|1979-09-01|  0|  0|  g|   |  4|   |
|CQC00914855|1979-09-01|  1|  0|  Z|   |  4|   |
|CQC00914855|1979-09-01|  2|  0|  Z|   |  4|   |
|CQC00914855|1979-09-01|  3|  0|  Z|   |  4|   |
|CQC00914855|1979-09-01|  4|  0|  Z|   |  4|   |
|CQC00914855|1979-09-01|  5|  0|  Z|   |  4|   |
|CQC00914855|1979-09-01|  6|  0|  Z|   |  4|   |
|CQC00914855|1979-09-01|  7|  0|  Z|   |  4|   |
|CQC00914855|1979-09-01|  8|  0|  Z|   |  4|   |
|CQC00914855|1979-09-01|  9|  0|  Z|   |  4|   |
|CQC00914855|1979-09-01| 10|  0|  Z|   |  4|   |
|CQC00914855|1979-09-01| 11|  0|  Z|   |  4|   |
|CQC00914855|1979-09-01| 12|  0|  Z|   |  4|   |
|CQC00914855|1979-09-01| 13|  0|  Z|   |  4|   |
|CQC00914855|1979-09-01| 14|  0|  Z|   |  4|   |
|CQC00914855|1979-09-01| 15|  0|  Z|   |  4|   |
|CQC00914855|1979-09-01| 16|  0|  Z|   |  4|   |
|CQC00914855|1979-09

In [106]:
# Turn placeholder entries into nulls: -9999 in the value column,
# and a single-space string in each of the flag columns
df_t2 = df_t2.na.replace({-9999: None}, subset=['Val'])
df_t2 = df_t2.na.replace({' ': None}, subset=['MF', 'QF', 'S1', 'S2'])

In [91]:
# preview again after the placeholder replacement
df_t2.show(50)

+-----------+----------+---+---+---+----+---+----+
|    STATION|      DATE| hr|Val| MF|  QF| S1|  S2|
+-----------+----------+---+---+---+----+---+----+
|CQC00914855|1979-09-01|  0|  0|  g|null|  4|null|
|CQC00914855|1979-09-01|  1|  0|  Z|null|  4|null|
|CQC00914855|1979-09-01|  2|  0|  Z|null|  4|null|
|CQC00914855|1979-09-01|  3|  0|  Z|null|  4|null|
|CQC00914855|1979-09-01|  4|  0|  Z|null|  4|null|
|CQC00914855|1979-09-01|  5|  0|  Z|null|  4|null|
|CQC00914855|1979-09-01|  6|  0|  Z|null|  4|null|
|CQC00914855|1979-09-01|  7|  0|  Z|null|  4|null|
|CQC00914855|1979-09-01|  8|  0|  Z|null|  4|null|
|CQC00914855|1979-09-01|  9|  0|  Z|null|  4|null|
|CQC00914855|1979-09-01| 10|  0|  Z|null|  4|null|
|CQC00914855|1979-09-01| 11|  0|  Z|null|  4|null|
|CQC00914855|1979-09-01| 12|  0|  Z|null|  4|null|
|CQC00914855|1979-09-01| 13|  0|  Z|null|  4|null|
|CQC00914855|1979-09-01| 14|  0|  Z|null|  4|null|
|CQC00914855|1979-09-01| 15|  0|  Z|null|  4|null|
|CQC00914855|1979-09-01| 16|  0

In [92]:
# row count after unstacking
df_t2.count()

                                                                                

667176

In [93]:
# row count of the wide (one row per station and date) input, for comparison
df_t.count()

27799

In [94]:
# sanity check: each wide row is exploded into 24 hourly rows, so this should equal df_t2.count()
df_t.count() * 24

667176

In [68]:
# sum / min / max of the value column after placeholder replacement (Spark aggregates ignore nulls)
df_t2.select(sum_('Val'), min_('Val'), max_('Val')).show()

+--------+--------+--------+
|sum(Val)|min(Val)|max(Val)|
+--------+--------+--------+
|    7685|       0|     178|
+--------+--------+--------+



In [95]:
# earliest and latest DATE present in the data
df_t2.select(min_('DATE'), max_('DATE')).show()



+----------+----------+
| min(DATE)| max(DATE)|
+----------+----------+
|1979-09-01|2021-02-09|
+----------+----------+



                                                                                

In [98]:
# Write the cleaned hourly rows partitioned by year and month derived from DATE.
# Partitioning directly on DATE would create one output directory per distinct date,
# which for a multi-decade date range means a very large number of tiny partitions.
(
    df_t2
    .withColumn('year', year('DATE'))
    .withColumn('month', month('DATE'))
    .write
    .mode('overwrite')
    .partitionBy('year', 'month')
    .parquet('../data/out')
)

                                                                                

In [107]:
# derive the partition keys from DATE
df_t2 = (
    df_t2
    .withColumn('year', year(col('DATE')))
    .withColumn('month', month(col('DATE')))
)

In [108]:
# write the result as parquet, one directory level per year and month, replacing any previous output
(
    df_t2.write
    .partitionBy('year', 'month')
    .mode('overwrite')
    .parquet('../data/out')
)

                                                                                