# [Serverless Machine Learning in Action](https://www.manning.com/books/serverless-machine-learning-in-action?a_aid=osipov&a_bid=fa913283&)
## by Carl Osipov

## **Work In Progress** Source Code for [Chapter 10](https://livebook.manning.com/book/serverless-machine-learning-in-action/chapter-2?a_aid=osipov&a_bid=fa913283&) 

## <font color=red>Upload the `BUCKET_ID` file</font>

Before proceeding, ensure that you have a backup copy of the `BUCKET_ID` file created in the [Chapter 2](https://colab.research.google.com/github/osipov/smlbook/blob/master/ch2.ipynb) notebook. The contents of the `BUCKET_ID` file are reused later in this notebook and in the other notebooks.


In [None]:
import os
from pathlib import Path
assert Path('BUCKET_ID').exists(), "Place the BUCKET_ID file in the current directory before proceeding"

BUCKET_ID = Path('BUCKET_ID').read_text().strip()
os.environ['BUCKET_ID'] = BUCKET_ID
os.environ['BUCKET_ID']

## **OPTIONAL:** Download and install AWS CLI

This step is unnecessary if you have already installed the AWS CLI in a preceding notebook.

In [None]:
%%bash
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip -o awscliv2.zip
sudo ./aws/install

## Specify AWS credentials

Modify the contents of the next cell to specify your AWS credentials as strings. 

If you see the following exception:

`TypeError: str expected, not NoneType`

it means that the credentials were not specified as strings, most likely because a `None` was left in place.

In [None]:
import os
# *** REPLACE None in the next 2 lines with your AWS key values ***
os.environ['AWS_ACCESS_KEY_ID'] = None
os.environ['AWS_SECRET_ACCESS_KEY'] = None
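
The `TypeError` described above comes from `os.environ` itself, which accepts only string values. The following minimal sketch reproduces it; `EXAMPLE_KEY` is a hypothetical variable name used only for illustration and is not used elsewhere in this notebook.

```python
import os

# EXAMPLE_KEY is a hypothetical name, chosen so that no real setting is touched
try:
    os.environ['EXAMPLE_KEY'] = None  # same mistake as leaving None in place
except TypeError as err:
    message = str(err)
    print(message)

# Assigning a string value succeeds
os.environ['EXAMPLE_KEY'] = 'some-string-value'
```

If the credentials cell raises this exception, re-check that both values are quoted strings.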

## Confirm the credentials

Run the next cell to validate your credentials.

In [None]:
%%bash
aws sts get-caller-identity

If you have specified the correct credentials as values for the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables, then the `aws sts get-caller-identity` command in the previous cell should have returned the `UserId`, `Account`, and `Arn` for the credentials, resembling the following:

```
{
    "UserId": "█████████████████████",
    "Account": "████████████",
    "Arn": "arn:aws:iam::████████████:user/█████████"
}
```

## Specify the region

Replace the `None` in the next cell with your AWS region name, for example `us-west-2`.

In [None]:
# *** REPLACE None in the next line with your AWS region ***
os.environ['AWS_DEFAULT_REGION'] = None

If you have specified the region correctly, the following cell should return the region that you have specified.

In [None]:
%%bash
echo $AWS_DEFAULT_REGION

## Start with the standard header for PySpark

As with previous PySpark jobs, start with the regular imports and argument initialization, then load the DataFrame instance from the `BUCKET_SRC_PATH` argument specified at job start time.

In [None]:
%%writefile dctaxi_feateng.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job


args = getResolvedOptions(sys.argv, ['JOB_NAME',
                                     'BUCKET_SRC_PATH',
                                     'BUCKET_DST_PATH',
                                     'SEED',
                                     'BINS'])

BUCKET_SRC_PATH = args['BUCKET_SRC_PATH']
BUCKET_DST_PATH = args['BUCKET_DST_PATH']

SEED = int(args['SEED'])
BINS = int(args['BINS'])

sc = SparkContext()
glueContext = GlueContext(sc)
logger = glueContext.get_logger()
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args['JOB_NAME'], args)

df = ( spark.read.format("parquet")
  .option("header", True)
  .option("inferSchema", True)
  .load("{}".format(BUCKET_SRC_PATH))
  )



To retrieve the 4-tuple of minimum and maximum latitude and longitude values directly from the dataset, you can use the following compound SQL statement through the PySpark SQL API.

In [None]:
%%writefile -a dctaxi_feateng.py
def get_bounding_box(df):
  df.createOrReplaceTempView("dctaxi")
  row = spark.sql("""
  SELECT 
      MIN(lat) AS min_lat, 
      MAX(lat) AS max_lat, 
      MIN(lon) AS min_lon, 
      MAX(lon) AS max_lon 

  FROM (

    SELECT 
      MIN(origin_block_latitude_double)      AS lat,
      MIN(origin_block_longitude_double)     AS lon FROM dctaxi UNION

    SELECT 
      MIN(destination_block_latitude_double) AS lat, 
      MIN(destination_block_longitude_double)AS lon FROM dctaxi UNION

    SELECT 
      MAX(origin_block_latitude_double)      AS lat, 
      MAX(origin_block_longitude_double)     AS lon FROM dctaxi UNION

    SELECT 
      MAX(destination_block_latitude_double) AS lat, 
      MAX(destination_block_longitude_double)AS lon FROM dctaxi

  ) LIMIT 1
  """.replace('\n', '')).first()
  return row.min_lat, row.max_lat, row.min_lon, row.max_lon
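
To make the intent of the SQL statement concrete, here is a minimal plain-Python sketch of the same computation: the bounding box is the minimum and maximum over both the origin and the destination coordinates. The column names match the dataset, but the `trips` values and the `bounding_box` helper are made up for illustration.

```python
# Hypothetical trips; the coordinate values are invented for this example
trips = [
    {"origin_block_latitude_double": 38.90,
     "origin_block_longitude_double": -77.03,
     "destination_block_latitude_double": 38.95,
     "destination_block_longitude_double": -77.00},
    {"origin_block_latitude_double": 38.85,
     "origin_block_longitude_double": -77.10,
     "destination_block_latitude_double": 38.88,
     "destination_block_longitude_double": -77.05},
]

def bounding_box(rows):
    """Return (min_lat, max_lat, min_lon, max_lon) across pickups and drop-offs."""
    lats = [row[prefix + "_block_latitude_double"]
            for row in rows for prefix in ("origin", "destination")]
    lons = [row[prefix + "_block_longitude_double"]
            for row in rows for prefix in ("origin", "destination")]
    return min(lats), max(lats), min(lons), max(lons)

min_lat, max_lat, min_lon, max_lon = bounding_box(trips)
print(min_lat, max_lat, min_lon, max_lon)
```

The SQL version reaches the same result by taking the union of the per-column minimums and maximums and then aggregating over that small intermediate table.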



When executed against the cleaned-up dataset from chapter 4, the SQL statement should output the following values:

| min_lat  | max_lat   | min_lon    | max_lon    |
| -------- | --------- | ---------- | ---------- |
| 38.81138 | 38.994909 | -77.113633 | -76.910012 |


Assuming that the NumPy `linspace` function was used to define the binning intervals for the latitude and longitude coordinates in the `lat_bins` and `lon_bins` variables respectively, the built-in PySpark `pyspark.ml.feature.Bucketizer` class can transform the source PySpark DataFrame (`df`) and create columns that contain a representation of the binned coordinates.

In [None]:
%%writefile -a dctaxi_feateng.py
def add_binned_coordinate_features(df, lat_bins, lon_bins):
  from pyspark.ml.feature import Bucketizer
  olat_bucketizer = Bucketizer(splits = lat_bins, 
                              inputCol="origin_block_latitude_double", 
                              outputCol="origin_latitude_bin")
  olat_bucketizer.setHandleInvalid("keep")

  olon_bucketizer = Bucketizer(splits = lon_bins, 
                              inputCol="origin_block_longitude_double", 
                              outputCol="origin_longitude_bin")
  olon_bucketizer.setHandleInvalid("keep")

  dlat_bucketizer = Bucketizer(splits = lat_bins, 
                              inputCol="destination_block_latitude_double", 
                              outputCol="destination_latitude_bin")
  dlat_bucketizer.setHandleInvalid("keep")

  dlon_bucketizer = Bucketizer(splits = lon_bins, 
                              inputCol="destination_block_longitude_double", 
                              outputCol="destination_longitude_bin")
  dlon_bucketizer.setHandleInvalid("keep")

  df = olat_bucketizer.transform(df)
  df = olon_bucketizer.transform(df)
  df = dlat_bucketizer.transform(df)
  df = dlon_bucketizer.transform(df)

  return df
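
The binning itself can be illustrated without Spark. The sketch below is a pure-Python stand-in, not the PySpark implementation: the hypothetical `linspace` helper mimics `numpy.linspace`, and the hypothetical `bucketize` helper follows the documented `Bucketizer` convention that each bucket is the half-open interval `[x, y)`, except the last one, which also includes its upper edge. It returns an integer index, whereas `Bucketizer` emits the index as a double.

```python
from bisect import bisect_right

def linspace(start, stop, num):
    """Stand-in for numpy.linspace: num evenly spaced values, endpoints included."""
    step = (stop - start) / (num - 1)
    return [start + i * step for i in range(num - 1)] + [stop]

def bucketize(value, splits):
    """Index of the interval [splits[i], splits[i+1]) that contains value.

    The last interval also includes its upper edge.
    """
    if value == splits[-1]:
        return len(splits) - 2
    if not splits[0] <= value < splits[-1]:
        raise ValueError("value is outside of the split range")
    return bisect_right(splits, value) - 1

BINS = 8
# Latitude bounds taken from the bounding box table above
lat_bins = linspace(38.81138, 38.994909, BINS + 1)

print(len(lat_bins))                   # BINS + 1 edges define BINS buckets
print(bucketize(38.81138, lat_bins))   # lower edge falls in the first bucket
print(bucketize(38.994909, lat_bins))  # upper edge falls in the last bucket
```

Note that `BINS + 1` edges are needed to define `BINS` buckets, which is why the job script passes `BINS + 1` to `linspace`.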



While the `OneHotEncoder` class included with PySpark can convert categorical values into a one hot encoded representation, the class has an important disadvantage. The output of `OneHotEncoder` is a Spark-specific `SparseVector` data structure that does not natively convert to the CSV format and has limited utility outside of Spark. Fortunately, it is straightforward to implement one hot encoding in PySpark (`map_label_enc_to_one_hot_enc` below) so that it mirrors the encoding behavior of frameworks like Pandas or NumPy, creating a column per distinct value of a one hot encoded categorical feature.


In [None]:
%%writefile -a dctaxi_feateng.py
from pyspark.sql import Row
def map_label_enc_to_one_hot_enc(cols, upper, lower = 0):
    def label_enc_to_one_hot_enc(row):
        row_dict = row.asDict()
        for col in cols:
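            # start with a zero-valued column for every value in [lower, upper)
            col_dict = dict(zip( [col + "_" + str(i) for i in range(lower, upper) ],
                                [0] * (upper - lower) ) )

            # set the matching one hot column to 1; skip missing or out-of-range
            # values (e.g. the 0 used to fill nulls) so every row has the same columns
            value = row_dict[col]
            if value is not None and lower <= int(value) < upper:
                col_dict[col + "_" + str(int(value))] = 1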
    
            row_dict.update(col_dict)
        
        return Row(**row_dict)
    return label_enc_to_one_hot_enc
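
The effect of the encoding on a single record can be sketched in plain Python, without Spark. The `one_hot` helper and the sample record below are hypothetical and serve only to show the shape of the output: one 0/1 column per value in `range(lower, upper)`, with at most one of them set to 1.

```python
def one_hot(record, col, lower, upper):
    """Return a copy of record with one 0/1 column per value in range(lower, upper)."""
    encoded = dict(record)
    for i in range(lower, upper):
        encoded[f"{col}_{i}"] = 0
    value = record[col]
    # Missing or out-of-range values leave all of the one hot columns at 0
    if value is not None and lower <= int(value) < upper:
        encoded[f"{col}_{int(value)}"] = 1
    return encoded

# Hypothetical record with a label-encoded day of the week
row = one_hot({"fareamount_double": 12.5, "dow_integer": 3},
              "dow_integer", lower=1, upper=8)
print(row)
```

Because every record receives the same set of columns, the result maps directly onto CSV columns.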



Once the `map_label_enc_to_one_hot_enc` function is defined, it can be used with the `map` method of a Spark RDD (Resilient Distributed Dataset) to one hot encode one or more columns. Although the function can one hot encode multiple integer columns, the following example applies it to just the `dow_integer` column.

In [None]:
%%writefile -a dctaxi_feateng.py
def add_dow_feature(df):
  from pyspark.sql.functions import dayofweek
  from pyspark.sql.types import IntegerType
  df = ( df
          .withColumn('dow_integer', 
                      dayofweek('origindatetime_ts').cast(IntegerType()) )
          .fillna( 0, ['dow_integer'] ) )

  rdd = df.rdd.map(map_label_enc_to_one_hot_enc(['dow_integer'], 
                                                lower = 1, 
                                                upper = 8)) # 7 days of the week
  return rdd.toDF()
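
Spark's `dayofweek` function numbers the days from 1 (Sunday) through 7 (Saturday), which is why the function above uses `lower = 1` and `upper = 8`. The following minimal sketch reproduces that numbering in plain Python; `spark_style_dayofweek` is a hypothetical helper name used only for illustration.

```python
from datetime import date

def spark_style_dayofweek(d):
    """Number the days as Sunday=1 ... Saturday=7."""
    # isoweekday() returns Monday=1 ... Sunday=7, so shift Sunday to the front
    return d.isoweekday() % 7 + 1

sunday = spark_style_dayofweek(date(2021, 6, 6))
saturday = spark_style_dayofweek(date(2021, 6, 5))
print(sunday, saturday)
```

Since valid values never include 0, filling nulls in `dow_integer` with 0 marks rows with a missing timestamp without colliding with a real day of the week.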



In [None]:
%%writefile -a dctaxi_feateng.py
min_lat, max_lat, min_lon, max_lon = get_bounding_box(df)  

import numpy as np
lat_bins = np.linspace(min_lat, max_lat, BINS + 1)
lon_bins = np.linspace(min_lon, max_lon, BINS + 1)

df = add_binned_coordinate_features(df, lat_bins, lon_bins)

df = add_dow_feature(df)

cols = ['fareamount_double',
        'origin_latitude_bin', 
        'origin_longitude_bin', 
        'destination_latitude_bin', 
        'destination_longitude_bin',
        'dow_integer'] \
+ list(filter(lambda s: s.startswith('dow_integer_') \
              if s is not None else False, df.schema.names))

df = df.select(cols)

(df
 .write
 .option('header', True)
 .csv("{}".format(BUCKET_DST_PATH), mode="overwrite"))

from pyspark.sql.functions import spark_partition_id
summary_df = (df.withColumn("_partitionId", spark_partition_id())
    .groupBy("_partitionId")
    .count())

(summary_df
  .repartition(1)
  .write
  .option('header', True)
  .csv("{}/summary".format(BUCKET_DST_PATH), mode="overwrite"))

job.commit()



## Download a Utility Script to Run PySpark Jobs

The script is downloaded as `utils.sh` and is loaded in the upcoming cells using the `source utils.sh` command.

In [None]:
%%bash
wget -q https://raw.githubusercontent.com/osipov/smlbook/master/utils.sh

## Run the PySpark job as specified by `dctaxi_feateng.py`

In [None]:
%%bash
source utils.sh

PYSPARK_SRC_NAME=dctaxi_feateng.py \
PYSPARK_JOB_NAME=dc-taxi-feateng-job \
BUCKET_SRC_PATH=s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/parquet/clean \
BUCKET_DST_PATH=s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/csv/feateng \
BINS=8 \
SEED=42 \
run_job

If the job completes successfully, the last cell should produce output similar to the following:

```
2021-06-01 23:34:56       1840 dctaxi_feateng.py
{
    "JobName": "dc-taxi-feateng-job"
}
{
    "Name": "dc-taxi-feateng-job"
}
{
    "JobRunId": "jr_59eee7f229f448b39286f1bd19428c9082aaf6bed232342cc05e68f9246d131e"
}
Waiting for the job to finish...............SUCCEEDED
```

Copyright 2021 CounterFactual.AI LLC. All Rights Reserved.

Licensed under the GNU General Public License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 

You may obtain a copy of the License at

https://github.com/osipov/smlbook/blob/master/LICENSE

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.