<a href="https://colab.research.google.com/github/wdpressplus-bigdata/wdpressplus-bigdata/blob/main/notebooks/7-1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# p.xxx List 7.2

import pathlib
import requests

def download_file(filename):
    prefix = 'https://github.com/wdpressplus-bigdata/uscrn/raw/main'
    r = requests.get(f"{prefix}/2020/{filename}")
    r.raise_for_status()
    path = pathlib.Path('./raw')
    path.mkdir(parents=True, exist_ok=True)
    with open(path / filename, 'wb') as f:
        f.write(r.content)
    print(f"Saved {path / filename}")

FILES = [
    'CRNS0101-05-2020-AK_Aleknagik_1_NNE.txt',
    'CRNS0101-05-2020-AK_Bethel_87_WNW.txt',
]
for filename in FILES:
    download_file(filename)

In [None]:
# p.xx

!pip install pandas

In [None]:
import glob
import pandas as pd

def read_tables():
    for path in glob.glob('./raw/*.txt'):
        yield pd.read_table(path, delimiter='\s+', header=None, dtype='str')

df = pd.concat(read_tables())
df.head(2)

In [None]:
df1 = df[[0, 1, 2, 8]]
df1.columns = ['wbanno', 'utc_date', 'utc_time', 'temperature']
df1.head(2)

In [None]:
df2 = df1.copy()
df2.index = pd.to_datetime(df2['utc_date'] + df2['utc_time'])
df2.drop(columns=['utc_date', 'utc_time'], inplace=True)
df2.head(2)

In [None]:
df2.describe().T

In [None]:
df2['temperature'] = df2['temperature'].astype('float')
df2.describe().T

In [None]:
df3 = df2.copy()
df3.loc[df3['temperature'] == -9999.0, 'temperature'] = None
df3.describe().T

In [None]:
# p.xx

!pip install pyspark

In [None]:
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
spark

In [None]:
rdd = spark.sparkContext.textFile('./raw/*')
rdd.take(2)

In [None]:
rdd.count()

In [None]:
from datetime import datetime, timezone
from pyspark.sql import Row

def parse_line(line):
    f = line.split()
    wbanno = f[0]
    dt = datetime.strptime(f[1] + f[2], '%Y%m%d%H%M')
    dt = dt.replace(tzinfo=timezone.utc)
    temperature = None if f[8] == '-9999.0' else float(f[8])
    return Row(timestamp=dt, wbanno=wbanno, temperature=temperature)

rows = rdd.map(parse_line)
rows.take(2)

In [None]:
df = rdd.map(parse_line).toDF()
df

In [None]:
spark.conf.set("spark.sql.session.timeZone", 'UTC')

df.show(2)

In [None]:
df.describe().show()

In [None]:
df.createOrReplaceTempView('uscrn')

In [None]:
query = '''
SELECT
  wbanno,
  min_by(timestamp, temperature) timestamp_min,
  min(temperature) t_min,
  max_by(timestamp, temperature) timestamp_max,
  max(temperature) t_max
FROM
  uscrn
GROUP by
  1
'''
spark.sql(query).show()

In [None]:
df.write.save('./uscrn-parquet')

In [None]:
!ls ./uscrn-parquet

In [None]:
df = spark.read.load('./uscrn-parquet')
df.groupBy('wbanno').avg('temperature').show()

In [None]:
# Spark Dataframe

from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.session.timeZone", 'UTC')

df = spark.read.load('./uscrn-parquet')
df1 = df.groupBy('timestamp').avg().toPandas()
df1.sort_values(by='avg(temperature)', ascending=False).head(2)

In [None]:
# Pandas Dataframe

!pip install pyarrow

In [None]:
import pandas as pd
df = pd.read_parquet('./uscrn-parquet')

df1 = df.groupby('timestamp').mean()
df1.sort_values(by='temperature', ascending=False).head(2)

In [None]:
# p.xx

from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.session.timeZone", 'UTC')

df = spark.read.load('./uscrn-parquet')
df1 = df.where("timestamp >= '2020-01-01' AND timestamp < '2020-04-01'")
df1.count()

In [None]:
df1.coalesce(1).write.save('./export', format='csv', header=True)

In [None]:
!ls ./export