In [10]:
from pyspark.sql import SparkSession
from functools import reduce
import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T
from time import sleep
from typing import Iterator, Sequence, Tuple
from sklearn.linear_model import LinearRegression

In this chapter, we work with 10 years' (2010 to 2020) worth of NOAA weather data located in Google BigQuery, which totals over 40 million records. The `bigquery-public-data` is a project available to all.

Here, we read in a large amount of data from a warehouse and assemble a single data frame representing weather information across the globe for a period of 10 years.

In [3]:
# The BigQuery connector is fetched through spark.jars.packages; we pick the
# connector build closest to our installed version of Spark (3.4.1).
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "com.google.cloud.spark:spark-3.3-bigquery:0.32.0")
    .config("parentProject", "cool-wharf-393713")
    .getOrCreate()
)

In [4]:
# Abstract the table reading routine into a reusable function, returning the resulting data frame
def read_df_from_bq(
    year,
    credentials_file="../../../../cool-wharf-393713-73800a184f10.json",
):
    """Reads one yearly NOAA GSOD table from BigQuery.

    Args:
        year: Suffix of the table to read; the table queried is
            ``bigquery-public-data.noaa_gsod.gsod{year}``.
        credentials_file: Path handed to the connector's ``credentialsFile``
            option. Defaults to the key-file path this notebook used
            originally, so existing calls behave the same; pass another path
            to run the notebook with a different key file.

    Returns:
        The data frame loaded by the ``bigquery`` reader for that table.
    """
    return (
        spark.read.format("bigquery")
        .option("table", f"bigquery-public-data.noaa_gsod.gsod{year}")
        .option("credentialsFile", credentials_file)
        .load()
    )

In [20]:
# Read the yearly tables for 2010 through 2020 and fold them into a single
# data frame with unionByName (columns missing from a table are allowed).
gsod = (
    reduce(
        lambda stacked, yearly: stacked.unionByName(yearly, allowMissingColumns=True),
        map(read_df_from_bq, range(2010, 2021)),
    )
    # Keep only the rows that have a full date and a temperature
    .dropna(subset=["year", "mo", "da", "temp"])
    # Rows whose temp equals 9999.9 are discarded
    # (presumably the data set's missing-value marker; confirm against the GSOD docs)
    .where(F.col("temp") != 9999.9)
    .drop("date")
)

In [13]:
# Series to Series (scalar) pandas UDF: Fahrenheit in, Celsius out
@F.pandas_udf(T.DoubleType())
def f_to_c(degrees: pd.Series) -> pd.Series:
    """Transforms Fahrenheit to Celsius."""
    above_freezing = degrees - 32
    return above_freezing * 5 / 9

In [21]:
# Peek at a few distinct Fahrenheit readings
gsod.select("temp").distinct().show(n=5)

+----+
|temp|
+----+
|69.8|
|74.5|
|64.2|
|76.4|
|15.5|
+----+
only showing top 5 rows



In [22]:
# Apply the scalar UDF to temp and store the result in a new temp_c column
gsod = gsod.withColumn(
    "temp_c",
    f_to_c(F.col("temp")),
)
# Compare a few distinct (Fahrenheit, Celsius) pairs
gsod.select("temp", "temp_c").distinct().show(n=5)

+----+-------------------+
|temp|             temp_c|
+----+-------------------+
|37.2| 2.8888888888888906|
|71.6| 21.999999999999996|
|70.4| 21.333333333333336|
|29.6|-1.3333333333333326|
|-1.1| -18.38888888888889|
+----+-------------------+
only showing top 5 rows



Iterator of Series UDFs are very useful when you have an expensive cold start operation you
need to perform. By cold start, we mean an operation we need to perform once at the
beginning of the processing step, before working through the data. Deserializing a
local ML model (fitted with scikit-learn or another Python modeling library) is an
example: we would need to unpack and read the model once for the whole data
frame, and then it could be used to process all records.

In [26]:
# Iterator of Series to Iterator of Series pandas UDF
@F.pandas_udf(T.DoubleType())
def f_to_c2(degrees: Iterator[pd.Series]) -> Iterator[pd.Series]:
    """Transforms Fahrenheit to Celsius, one batch at a time."""
    # Simulated cold start: the five-second sleep runs once, before any batch
    # is consumed, rather than once per batch.
    sleep(5)
    # Each incoming batch is converted and yielded in turn (yield, not return)
    yield from ((batch - 32) * 5 / 9 for batch in degrees)

In [27]:
# Apply the iterator UDF inside select() and preview distinct pairs
gsod.select(
    "temp",
    f_to_c2(F.col("temp")).alias("temp_c"),
).distinct().show(n=5)

+----+-------------------+
|temp|             temp_c|
+----+-------------------+
|37.2| 2.8888888888888906|
|71.6| 21.999999999999996|
|70.4| 21.333333333333336|
|29.6|-1.3333333333333326|
|-1.1| -18.38888888888889|
+----+-------------------+
only showing top 5 rows



We can use an Iterator of multiple Series to Iterator of Series UDF to assemble the `year`, `mo` and `da` columns (representing year, month and day) into a single column:

1) `year_mo_da` is an Iterator of a tuple of Series, representing all the batches of values contained in the `year`, `mo`, and `da` columns.
2) To access each batch, we use a for loop over the iterator, the same principle as
for the Iterator of Series UDF.
3) To extract each individual series from the tuple, we use multiple assignments.
In this case, `year` will map to the first Series of the tuple, `mo` to the second, and
`da` to the third.
4) Since `pd.to_datetime` requests a data frame containing the year, month, and
day columns, we create the data frame via a dictionary, giving the keys the relevant column names. `pd.to_datetime` returns a Series.
5) Finally, we yield the answer to build the Iterator of Series, fulfilling our contract.

In [29]:
@F.pandas_udf(T.DateType())
def create_date(
    year_mo_da: Iterator[Tuple[pd.Series, pd.Series, pd.Series]]
) -> Iterator[pd.Series]:
    """Merges three cols (Y-M-D of a date) into a Date col."""
    for year, mo, da in year_mo_da:
        # pd.to_datetime assembles dates from a frame with year/month/day columns
        date_parts = pd.DataFrame({"year": year, "month": mo, "day": da})
        yield pd.to_datetime(date_parts)

In [30]:
# Build the date column from its three parts and preview distinct rows
gsod.select(
    "year",
    "mo",
    "da",
    create_date(F.col("year"), F.col("mo"), F.col("da")).alias("date"),
).distinct().show(n=5)

+----+---+---+----------+
|year| mo| da|      date|
+----+---+---+----------+
|2010| 12| 20|2010-12-20|
|2010| 08| 31|2010-08-31|
|2010| 03| 17|2010-03-17|
|2010| 12| 16|2010-12-16|
|2010| 04| 03|2010-04-03|
+----+---+---+----------+
only showing top 5 rows



### Exercise 9.1

What are the values of `WHICH_TYPE` and `WHICH_SIGNATURE` in the following code block?

```
exo9_1 = pd.Series(["red", "blue", "blue", "yellow"])
def color_to_num(colors: WHICH_SIGNATURE) -> WHICH_SIGNATURE:
 return colors.apply(
 lambda x: {"red": 1, "blue": 2, "yellow": 3}.get(x)
 )
color_to_num(exo9_1)
# 0 1
# 1 2
# 2 2
# 3 3
color_to_num_udf = F.pandas_udf(color_to_num, WHICH_TYPE)

```

Answer: 

`WHICH_SIGNATURE`: `pd.Series`

`WHICH_TYPE`: `T.IntegerType()`

In [35]:
# Grouped aggregate UDF: fit a linear regression with sklearn and return its slope
@F.pandas_udf(T.DoubleType())
def rate_of_change_temperature(day: pd.Series, temp: pd.Series) -> float:
    """Returns the slope of the daily temperature for a given period of time."""
    # sklearn expects a 2-D feature matrix, hence the reshape to a single column
    day_feature = day.astype(int).values.reshape(-1, 1)
    fitted = LinearRegression().fit(X=day_feature, y=temp)
    # There is only one feature, so the first coefficient is the slope
    return fitted.coef_[0]

In [36]:
# One group per station, year and month; the grouped aggregate UDF reduces
# each group to the rate of change of its temperature.
result = (
    gsod
    .groupBy("stn", "year", "mo")
    .agg(rate_of_change_temperature(gsod["da"], gsod["temp"]).alias("rt_chg_temp"))
)

In [37]:
# Show five rows without truncating the cell contents
result.show(n=5, truncate=False)

+------+----+---+-------------------+
|stn   |year|mo |rt_chg_temp        |
+------+----+---+-------------------+
|007032|2013|01 |-0.2795031055900618|
|008268|2010|07 |-2.1999999999999877|
|008400|2010|02 |1.7828571428571434 |
|008400|2010|08 |0.8514508067970873 |
|008401|2011|03 |0.7514516129032259 |
+------+----+---+-------------------+
only showing top 5 rows



In [39]:
# Group map UDF that scales temperature values.
# The function has to hand back a complete DataFrame: every column we want to
# see in the result must be returned, including the ones we grouped against.
def scale_temperature(temp_by_day: pd.DataFrame) -> pd.DataFrame:
    """Returns a simple normalisation of the temperature for a site.

    If the temperature is constant for the whole window, defaults to 0.5."""
    readings = temp_by_day.temp
    lowest, highest = readings.min(), readings.max()
    kept = temp_by_day[["stn", "year", "mo", "da", "temp"]]
    if lowest == highest:
        # A zero range cannot be divided by, so use the documented default
        temp_norm = 0.5
    else:
        temp_norm = (readings - lowest) / (highest - lowest)
    return kept.assign(temp_norm=temp_norm)

In [40]:
# groupBy() splits the data into manageable batches; applyInPandas() then runs
# our function on each batch and assembles the results using the given schema.
gsod_map = gsod.groupBy("stn", "year", "mo").applyInPandas(
    scale_temperature,
    schema="stn string, year string, mo string, da string, temp double, temp_norm double",
)

In [41]:
# Show five rows without truncating the cell contents
gsod_map.show(n=5, truncate=False)

### Exercise 9.2

Using the following definitions, create a `temp_to_temp(value, from_temp, to_temp)`
that takes a numerical value in `from_temp` degrees and converts it to `to_temp` degrees. Use
a pandas UDF this time (we did the same exercise in chapter 8).
- `C = (F - 32) * 5 / 9` (Celsius)
- `K = C + 273.15` (Kelvin)
- `R = F + 459.67` (Rankine)

In [7]:
def temp_to_temp(value: pd.Series, from_temp: str, to_temp: str) -> pd.Series:
    """Converts a Series of temperatures from one scale to another.

    Supported scales are 'F' (Fahrenheit), 'C' (Celsius), 'K' (Kelvin) and
    'R' (Rankine).

    Args:
        value: Temperatures expressed in ``from_temp`` degrees.
        from_temp: Scale of the input values.
        to_temp: Scale to convert the values to.

    Returns:
        A Series of the converted temperatures. When the two scales are the
        same, a copy of ``value`` is returned. When either scale is not
        supported, a Series of ``None`` values aligned with ``value`` is
        returned.
    """
    conversion_formulas = {
        ('F', 'C'): lambda x: (x - 32) * 5 / 9,
        ('F', 'K'): lambda x: (x + 459.67) * 5 / 9,
        ('F', 'R'): lambda x: x + 459.67,
        ('C', 'F'): lambda x: x * 9 / 5 + 32,
        ('C', 'K'): lambda x: x + 273.15,
        ('C', 'R'): lambda x: (x + 273.15) * 9 / 5,
        ('K', 'F'): lambda x: x * 9 / 5 - 459.67,
        ('K', 'C'): lambda x: x - 273.15,
        ('K', 'R'): lambda x: x * 9 / 5,
        ('R', 'F'): lambda x: x - 459.67,
        # R -> F subtracts 459.67 and F -> C subtracts a further 32,
        # so the combined offset is 491.67.
        ('R', 'C'): lambda x: (x - 491.67) * 5 / 9,
        ('R', 'K'): lambda x: x * 5 / 9,
    }

    # The dictionary is keyed by (from, to) pairs, so the individual unit
    # names have to be collected from the pairs before validating the inputs.
    known_units = {unit for pair in conversion_formulas for unit in pair}
    if from_temp not in known_units or to_temp not in known_units:
        return value.apply(lambda _: None)

    # Converting a scale to itself has no entry in the table: it is the identity.
    if from_temp == to_temp:
        return value.copy()

    return conversion_formulas[(from_temp, to_temp)](value)

### Exercise 9.3

Modify the following code block to use Celsius degrees instead of Fahrenheit. How is
the result of the UDF different if applied to the same data frame?

```
def scale_temperature(temp_by_day: pd.DataFrame) -> pd.DataFrame:
 """Returns a simple normalization of the temperature for a site.
 If the temperature is constant for the whole window, defaults to 0.5."""
 temp = temp_by_day.temp
 answer = temp_by_day[["stn", "year", "mo", "da", "temp"]]
 if temp.min() == temp.max():
 return answer.assign(temp_norm=0.5)
 return answer.assign(
 temp_norm=(temp - temp.min()) / (temp.max() - temp.min())
 )
```

Answer: the output will be the same since the normalisation process does not change based on the units of temperature.

In [8]:
def scale_temperature(temp_by_day: pd.DataFrame) -> pd.DataFrame:
    """Returns a simple normalization of the temperature for a site.

    The normalization is computed on the Celsius equivalent of the ``temp``
    column; the ``temp`` column itself is returned unchanged.
    If the temperature is constant for the whole window, defaults to 0.5."""
    # Fahrenheit to Celsius, applied to the whole column at once
    celsius = (temp_by_day.temp - 32.0) * 5.0 / 9.0
    coldest, warmest = celsius.min(), celsius.max()
    kept = temp_by_day[["stn", "year", "mo", "da", "temp"]]
    if coldest == warmest:
        temp_norm = 0.5
    else:
        temp_norm = (celsius - coldest) / (warmest - coldest)
    return kept.assign(temp_norm=temp_norm)

### Exercise 9.4

Complete the schema of the following code block, using scale_temperature_C from
the previous exercise. What happens if we apply our group map UDF like so instead?

```
gsod_exo = gsod.groupby("year", "mo").applyInPandas(scale_temperature, schema=???)
```

In [9]:
# The schema has to describe every column that scale_temperature returns
# (stn, year, mo, da, temp and temp_norm), not only the grouping keys and
# the temperature columns; otherwise the returned DataFrame does not match it.
schema = (
    "stn string, year string, mo string, "
    "da string, temp double, temp_norm double"
)

### Exercise 9.5

Modify the following code block to return both the intercept of the linear regression
as well as the slope in an ArrayType. (Hint: The intercept is in the intercept_ attribute of the fitted model.)

```
@F.pandas_udf(T.DoubleType())
def rate_of_change_temperature(day: pd.Series, temp: pd.Series) -> float:
 """Returns the slope of the daily temperature for a given period of 
time."""
 return (
 LinearRegression()
 .fit(X=day.astype("int").values.reshape(-1, 1), y=temp)
 .coef_[0]
 )
```

In [None]:
@F.pandas_udf(T.ArrayType(T.DoubleType()))
def rate_of_change_temperature(day: pd.Series, temp: pd.Series) -> Sequence[float]:
    """Returns the intercept and slope of the daily temperature for a given period of time."""
    # sklearn expects a 2-D feature matrix, hence the reshape to a single column
    day_feature = day.astype("int").values.reshape(-1, 1)
    fitted = LinearRegression().fit(X=day_feature, y=temp)
    # Intercept first, then the slope (the only coefficient of the model)
    intercept, slope = fitted.intercept_, fitted.coef_[0]
    return intercept, slope