In [32]:
from pathlib import Path
import pandas as pd
from datetime import timedelta
import argparse
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types
from pyspark.sql import SQLContext
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

In [2]:
credentials_location = '/Users/admin/Downloads/data-pipeline-396809-583622d42f37.json'

conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('Monkey D.Luffy') \
    .set("spark.jars", "/Users/admin/Public/project/noaa-climate-datasets/lib/gcs-connector-hadoop3-latest.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

sc = SparkContext(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

23/08/27 14:42:07 WARN Utils: Your hostname, ThienLes-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.4 instead (on interface en0)
23/08/27 14:42:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/08/27 14:42:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
spark = SparkSession.builder \
    .appName('Monkey D.Luffy') \
    .getOrCreate()

In [73]:
df_vm = spark.read.parquet("gs://noaa_ghcn_data_lake_data-pipeline-396809/data/pq/climate/2022/countryCode=VM/*")\
    .withColumn('date', F.to_date('DateStr', "yyyyMMdd"))

In [57]:
df_station = spark.read.parquet("gs://noaa_ghcn_data_lake_data-pipeline-396809/data/pq/station/stations.parquet")

                                                                                

In [52]:
sqlContext = SQLContext(spark.sparkContext)
sqlContext.sql("set spark.sql.caseSensitive=true")

DataFrame[key: string, value: string]

In [58]:
df_vm = df_vm.withColumn('date', F.to_date('DateStr', "yyyyMMdd"))

In [76]:
df_vm = df_vm \
    .join(df_station, df_vm.StationId == df_station.stationId) \
    .drop('stationId')

In [74]:
df_vm.take(5)

                                                                                

[Row(StationId='VM000048900', DateStr='20220330', Element='TAVG', Value=289.0, M-Flag='H', Q-Flag=None, S-Flag='S', Obs-Time=None, date=datetime.date(2022, 3, 30)),
 Row(StationId='VM000048900', DateStr='20220513', Element='TAVG', Value=299.0, M-Flag='H', Q-Flag=None, S-Flag='S', Obs-Time=None, date=datetime.date(2022, 5, 13)),
 Row(StationId='VMM00048806', DateStr='20220330', Element='TMAX', Value=265.0, M-Flag=None, Q-Flag=None, S-Flag='S', Obs-Time=None, date=datetime.date(2022, 3, 30)),
 Row(StationId='VMM00048806', DateStr='20220513', Element='TMAX', Value=318.0, M-Flag=None, Q-Flag=None, S-Flag='S', Obs-Time=None, date=datetime.date(2022, 5, 13)),
 Row(StationId='VMM00048806', DateStr='20220330', Element='TMIN', Value=180.0, M-Flag=None, Q-Flag=None, S-Flag='S', Obs-Time=None, date=datetime.date(2022, 3, 30))]

In [77]:
df_vm.createOrReplaceTempView("VM")

In [78]:
countryCode = "VM"
df = spark.sql(f"""
select 
    StationId, 
    Element, 
    year(date) AS Year, 
    name as City,
    AVG(Value)/10 as average_tmp,
    MAX(Value)/10 as max_tmp,
    Min(Value)/10 as min_tmp
from 
    {countryCode}
where 
    Element = 'TAVG'
group by 
    1, 2, 3, 4
""")

In [79]:
df.take(100)

                                                                                

[Row(StationId='VMM00048887', Element='TAVG', Year=2022, City='PHAN THIET', average_tmp=27.628169014084506, max_tmp=30.3, min_tmp=24.2),
 Row(StationId='VMM00048826', Element='TAVG', Year=2022, City='PHU LIEN', average_tmp=23.69774647887324, max_tmp=31.5, min_tmp=7.4),
 Row(StationId='VMM00048825', Element='TAVG', Year=2022, City='HA DONG', average_tmp=24.5087323943662, max_tmp=34.1, min_tmp=8.9),
 Row(StationId='VMM00048840', Element='TAVG', Year=2022, City='THANH HOA', average_tmp=24.434084507042254, max_tmp=33.8, min_tmp=9.0),
 Row(StationId='VMW00041010', Element='TAVG', Year=2022, City='NHA TRANG', average_tmp=27.31605633802817, max_tmp=31.2, min_tmp=23.3),
 Row(StationId='VMM00048845', Element='TAVG', Year=2022, City='VINH', average_tmp=24.8487323943662, max_tmp=34.6, min_tmp=10.1),
 Row(StationId='VM000048900', Element='TAVG', Year=2022, City='TAN SON HOA', average_tmp=28.247945205479454, max_tmp=31.7, min_tmp=24.9),
 Row(StationId='VMM00048917', Element='TAVG', Year=2022, City=

In [26]:
df_all = spark.read.parquet("gs://noaa_ghcn_data_lake_swift-arcadia-387709/data/pq/climate/2022/*")

                                                                                

In [27]:
df_all = df_all.withColumn('countryCode', F.substring("StationId", 0, 2))

In [28]:
df_all.createOrReplaceTempView("noaa_all")

In [31]:
count_data = spark.sql("""
select Element, countryCode, count(*) from noaa_all as num_records
group by Element, countryCode
order by 3 desc
""")

In [33]:
count_data.take(100)

                                                                                

[Row(Element='PRCP', countryCode='US', count(1)=7789214),
 Row(Element='SNOW', countryCode='US', count(1)=5199590),
 Row(Element='TMAX', countryCode='US', count(1)=2656927),
 Row(Element='TMIN', countryCode='US', count(1)=2653396),
 Row(Element='SNWD', countryCode='US', count(1)=2461821),
 Row(Element='TOBS', countryCode='US', count(1)=1616423),
 Row(Element='PRCP', countryCode='AS', count(1)=1391158),
 Row(Element='TAVG', countryCode='US', count(1)=833241),
 Row(Element='WESD', countryCode='US', count(1)=528067),
 Row(Element='PRCP', countryCode='CA', count(1)=484711),
 Row(Element='AWND', countryCode='US', count(1)=418447),
 Row(Element='WSF2', countryCode='US', count(1)=394645),
 Row(Element='WDF2', countryCode='US', count(1)=394547),
 Row(Element='WSF5', countryCode='US', count(1)=382282),
 Row(Element='WDF5', countryCode='US', count(1)=381795),
 Row(Element='TMAX', countryCode='CA', count(1)=335544),
 Row(Element='TMIN', countryCode='CA', count(1)=335122),
 Row(Element='WESF', cou