In [None]:
from pyspark.sql import SparkSession
from functools import reduce
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd

# Obtain the Spark session used by every exercise cell below.
spark = (
    SparkSession.builder
    .appName("Ch09 - ex09")
    .getOrCreate()
)

In [None]:
# ex 9.1
ex9_1 = pd.Series(["red", "blue", "blue", "yellow"])

def color_to_num(colors: pd.Series) -> pd.Series:
    """Map color names to their numeric codes.

    Args:
        colors: Series of color names ("red", "blue" or "yellow").

    Returns:
        A Series with the same index as ``colors`` holding 1 for "red",
        2 for "blue" and 3 for "yellow". Names that are not in the table
        produce a missing value, because ``dict.get`` returns ``None`` for them.
    """
    # Build the lookup table once per call rather than once per element.
    codes = {"red": 1, "blue": 2, "yellow": 3}
    return colors.apply(codes.get)

In [None]:
# Run the plain pandas function on the sample Series and display the result.
color_to_num(ex9_1)

In [None]:
# Wrap the pandas function as a pandas UDF that returns integers.
color_to_num_udf = F.pandas_udf(f=color_to_num, returnType=T.IntegerType())
# The Series is unnamed, so the resulting Spark column is called "0".
ex9_1_df = spark.createDataFrame(data=ex9_1.to_frame())

In [None]:
# Print the Spark DataFrame built from the sample Series.
ex9_1_df.show()

In [None]:
# Apply the UDF to column "0" and print up to five rows of the result.
(
    ex9_1_df
    .select(color_to_num_udf(F.col("0")).alias("num"))
    .show(5)
)

In [None]:
# Load one parquet file per year and stack them by column name; columns
# missing from one of the years are filled with nulls.
gsod = (
    reduce(
        lambda stacked, yearly: stacked.unionByName(
            yearly, allowMissingColumns=True
        ),
        (
            spark.read.parquet(f"/opt/spark/data/gsod_noaa/gsod{year}.parquet")
            for year in (2019, 2020)
        ),
    )
    # Keep only rows that carry a full date and a temperature reading.
    .dropna(subset=["year", "mo", "da", "temp"])
    # NOTE(review): 9999.9 is presumably the data set's missing-value
    # sentinel for temp; confirm against the GSOD documentation.
    .filter(F.col("temp") != 9999.9)
    .drop("date")
)

In [None]:
# ex 9.2
def temp_to_temp(value: pd.Series, from_temp: str, to_temp: str) -> pd.Series:
    """Convert temperatures between Fahrenheit, Celsius, Rankine and Kelvin.

    Args:
        value: Temperatures expressed in ``from_temp`` units.
        from_temp: Source unit, one of "F", "C", "R" or "K" (case-insensitive).
        to_temp: Target unit, same accepted values as ``from_temp``.

    Returns:
        The converted temperatures. If either unit is not recognized, a
        Series of ``None`` with the same index as ``value``. If both units
        are the same, ``value`` itself is returned unchanged.
    """
    from_temp = from_temp.upper()
    to_temp = to_temp.upper()

    acceptable_values = ["F", "C", "R", "K"]
    if (
        to_temp not in acceptable_values
        or from_temp not in acceptable_values
    ):
        # Unknown unit: answer with nulls rather than raising.
        return value.apply(lambda _: None)

    # Same unit on both sides: there is nothing to convert. Without this
    # guard the lookup below raises KeyError, since the table only holds
    # pairs of distinct units.
    if from_temp == to_temp:
        return value

    # Each entry is a thunk so that only the requested conversion is computed.
    from_to = {
        ("C", "F"): lambda: value * (9 / 5) + 32,
        ("F", "C"): lambda: (value - 32) * (5 / 9),
        ("C", "K"): lambda: value + 273.15,
        ("K", "C"): lambda: value - 273.15,
        ("C", "R"): lambda: value * (9 / 5) + 491.67,
        ("R", "C"): lambda: (value - 491.67) * (5 / 9),
        ("F", "K"): lambda: (value - 32) * (5 / 9) + 273.15,
        ("K", "F"): lambda: (value - 273.15) * (9 / 5) + 32,
        ("F", "R"): lambda: value + 459.67,
        ("R", "F"): lambda: value - 459.67,
        ("K", "R"): lambda: value * (9 / 5),
        ("R", "K"): lambda: value * (5 / 9),
    }

    convert = from_to[(from_temp, to_temp)]
    return convert()

In [None]:
# temp_to_temp is called directly on a Spark column here, not through a UDF.
(
    gsod
    .select(
        "temp",
        temp_to_temp(F.col("temp"), "F", "C").alias("temp_c"),
    )
    .show(n=5, truncate=False)
)

In [None]:
# ex 9.3
def scale_temperature_c(temp_by_day: pd.DataFrame) -> pd.DataFrame:
    """Min-max normalize the temperatures of one group of rows.

    The ``temp`` column is converted with the Fahrenheit-to-Celsius formula
    before scaling. The returned frame keeps the "stn", "year", "mo", "da"
    and "temp" columns exactly as received and adds ``temp_norm``. When the
    minimum and maximum temperature of the group are equal, ``temp_norm``
    is 0.5 for every row.
    """
    celsius = (temp_by_day.temp - 32.0) * 5.0 / 9.0
    kept = temp_by_day[["stn", "year", "mo", "da", "temp"]]

    coldest = celsius.min()
    warmest = celsius.max()

    if coldest == warmest:
        # A zero-width range cannot be scaled; use the midpoint instead.
        normalized = 0.5
    else:
        normalized = (celsius - coldest) / (warmest - coldest)

    return kept.assign(temp_norm=normalized)

In [None]:
# ex 9.4
# Apply the group-map function once per (year, mo) group. The schema lists
# the columns that scale_temperature_c returns.
# The DataFrame is assigned first and displayed separately: show() only
# prints and returns None, so chaining it onto the assignment would leave
# gsod_ex bound to None.
gsod_ex = gsod.groupby("year", "mo").applyInPandas(scale_temperature_c, schema=T.StructType([
    T.StructField("stn", T.StringType()),
    T.StructField("year", T.StringType()),
    T.StructField("mo", T.StringType()),
    T.StructField("da", T.StringType()),
    T.StructField("temp", T.DoubleType()),
    T.StructField("temp_norm", T.DoubleType())
]))

gsod_ex.show(5, False)

In [None]:
# ex 9.5
from sklearn.linear_model import LinearRegression
from typing import Sequence

@F.pandas_udf(T.ArrayType(T.DoubleType()))
def rate_of_change_temp(day: pd.Series, temp: pd.Series) -> Sequence[float]:
    """Fit a straight line of temperature against day for one group.

    Args:
        day: Day values; cast to int before fitting.
        temp: Temperature observed on each corresponding day.

    Returns:
        A pair ``(slope, intercept)`` from the fitted linear regression.
    """
    # scikit-learn expects a 2-D feature matrix, hence the single-column reshape.
    day_feature = day.astype("int").values.reshape(-1, 1)

    model = LinearRegression()
    model.fit(X=day_feature, y=temp)

    return model.coef_[0], model.intercept_
    

In [None]:
# One regression per station and month; the (slope, intercept) pair is
# stored as an array column.
result = (
    gsod
    .groupby("stn", "year", "mo")
    .agg(rate_of_change_temp(gsod["da"], gsod["temp"]).alias("sol_9_5"))
)

result.show(n=5, truncate=False)

In [None]:
# Stop the Spark session at the end of the notebook.
spark.stop()