In [None]:
from pyspark.sql import SparkSession
# Get (or create) the SparkSession used by every cell below; the application
# name is set to 'weather'.
spark = (
    SparkSession.builder
    .appName('weather')
    .getOrCreate()
)

In [None]:
# Load the station file as plain text: every input line becomes one row with a
# single string column named `value`, which the next cell slices by position.
df = spark.read.format("text").load("s3://noaa-ghcn-pds/ghcnd-stations.txt")

In [None]:
from pyspark.sql.functions import trim

# Slice each fixed-width text line into typed station columns.
# substr(start, length) is 1-based; the numeric fields are cast to double and
# the text fields (except ID) are trimmed of surrounding whitespace.
stations = df.select(
    df["value"].substr(1, 11).alias("ID"),                          # chars 1-11
    df["value"].substr(13, 8).cast("double").alias("LATITUDE"),     # chars 13-20
    df["value"].substr(22, 9).cast("double").alias("LONGITUDE"),    # chars 22-30
    df["value"].substr(32, 6).cast("double").alias("ELEVATION"),    # chars 32-37
    trim(df["value"].substr(39, 2)).alias("STATE"),                 # chars 39-40
    trim(df["value"].substr(42, 30)).alias("NAME"),                 # chars 42-71
    trim(df["value"].substr(73, 3)).alias("GSN_FLAG"),              # chars 73-75
    trim(df["value"].substr(77, 3)).alias("NETWORK_FLAG"),          # chars 77-79
    trim(df["value"].substr(81, 5)).alias("WMO_ID"),                # chars 81-85
)

In [None]:
# Cache the parsed station table; it is displayed and filtered in the cells
# that follow.
stations.cache()

In [None]:
# Preview the parsed stations (first 20 rows, long values truncated -- the
# defaults of show(), spelled out here).
stations.show(n=20, truncate=True)

In [None]:
# List Ohio stations whose name contains "COLUMBUS".
stations.where("NAME like '%COLUMBUS%' AND STATE = 'OH'").show()

In [None]:
# Narrow the search to Ohio stations whose name contains "PORT COLUMBUS".
stations.where("NAME like '%PORT COLUMBUS%' AND STATE = 'OH'").show()

In [None]:
# Import only the type classes this cell uses instead of `import *`, so it is
# clear where each name comes from and the notebook namespace stays clean.
from pyspark.sql.types import (
    DateType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Explicit schema for the headerless observation CSV: one row per
# (station ID, date, element) with an integer value, three flag columns and an
# observation-time string.
schema = StructType([
    StructField("ID", StringType(), False),
    StructField("OBS_DATE", DateType(), False),
    StructField("ELEMENT", StringType(), False),
    StructField("DATA_VALUE", IntegerType(), True),
    StructField("M_FLAG", StringType(), True),
    StructField("Q_FLAG", StringType(), True),
    StructField("S_FLAG", StringType(), True),
    StructField("OBS_TIME", StringType(), True),
])

# Read the 2018 observations; dateFormat tells the CSV reader how to parse
# OBS_DATE into a DateType value.
df2 = spark.read.csv(
    "s3://noaa-ghcn-pds/csv.gz/2018.csv.gz",
    schema=schema,
    dateFormat='yyyyMMdd',
)

In [None]:
# Cache the 2018 observations; `df2` is displayed and then filtered in the
# cells that follow.
df2.cache()

In [None]:
# Preview the observations (first 20 rows, long values truncated -- the
# defaults of show(), spelled out here).
df2.show(n=20, truncate=True)

In [None]:
# Keep only station USW00014821's observations whose OBS_DATE falls in month 7.
port_cmh_df = df2.where("ID = 'USW00014821' and MONTH(OBS_DATE) = 7")

In [None]:
# Cache the single-station July subset; the TMIN and TMAX cells below both
# derive from it.
port_cmh_df.cache()

In [None]:
# Preview the filtered observations (first 20 rows, long values truncated --
# the defaults of show(), spelled out here).
port_cmh_df.show(n=20, truncate=True)

In [None]:
from pyspark.sql.functions import format_number

# Daily minimum temperatures for the selected station.
# The code treats DATA_VALUE as tenths of a degree Celsius:
#   LOW_TEMP_C = DATA_VALUE / 10
#   LOW_TEMP_F = DATA_VALUE * .18 + 32   (i.e. C * 9/5 + 32)
# format_number renders the Fahrenheit value as a string with one decimal place.
cmh_min_temps = (
    port_cmh_df
    .where("ELEMENT = 'TMIN'")
    .withColumn('LOW_TEMP_C', port_cmh_df['DATA_VALUE'] / 10)
    .withColumn(
        'LOW_TEMP_F',
        format_number(port_cmh_df['DATA_VALUE'] * .18 + 32, 1),
    )
)

In [None]:
# Display the date and both low-temperature columns (show() prints at most
# 20 rows by default; the defaults are spelled out here).
(
    cmh_min_temps
    .select('OBS_DATE', 'LOW_TEMP_C', 'LOW_TEMP_F')
    .show(n=20, truncate=True)
)

In [None]:
# Daily maximum temperatures for the selected station.
# The code treats DATA_VALUE as tenths of a degree Celsius:
#   HIGH_TEMP_C = DATA_VALUE / 10
#   HIGH_TEMP_F = DATA_VALUE * .18 + 32   (i.e. C * 9/5 + 32)
# format_number renders the Fahrenheit value as a string with one decimal place.
cmh_max_temps = (
    port_cmh_df
    .where("ELEMENT = 'TMAX'")
    .withColumn('HIGH_TEMP_C', port_cmh_df['DATA_VALUE'] / 10)
    .withColumn(
        'HIGH_TEMP_F',
        format_number(port_cmh_df['DATA_VALUE'] * .18 + 32, 1),
    )
)

In [None]:
# Display the date and both high-temperature columns (show() prints at most
# 20 rows by default; the defaults are spelled out here).
(
    cmh_max_temps
    .select('OBS_DATE', 'HIGH_TEMP_C', 'HIGH_TEMP_F')
    .show(n=20, truncate=True)
)

In [None]:
# Combine lows and highs into one row per date by joining the two projections
# on their shared OBS_DATE column.
cmh_2018_07_temps = (
    cmh_min_temps
    .select('OBS_DATE', 'LOW_TEMP_C', 'LOW_TEMP_F')
    .join(
        cmh_max_temps.select('OBS_DATE', 'HIGH_TEMP_C', 'HIGH_TEMP_F'),
        on='OBS_DATE',
    )
)

In [None]:
# Preview the joined low/high table (first 20 rows, long values truncated --
# the defaults of show(), spelled out here).
cmh_2018_07_temps.show(n=20, truncate=True)

In [None]:
# Write the joined table to S3 as CSV with a header row.
# Uses Spark's built-in CSV writer rather than the legacy
# "com.databricks.spark.csv" package name; this notebook already relies on the
# built-in CSV source via spark.read.csv above.
# The save mode is left at its default, so the write fails if the target path
# already exists -- choose a fresh path or add .mode(...) deliberately.
(
    cmh_2018_07_temps.write
    .option("header", "true")
    .csv("s3://YOUR-BUCKET-NAME-HERE/cmh-temps-csv")
)