# Batch Model Pipelines: Training and Predicting

## 0. Preparing Data in BigQuery

I use *db/store-sales-time-series-forecasting_training.csv*, which contains sales from 2016-08-01 to 2017-07-31, and load it into BigQuery through the Console by uploading the CSV file.

* **Dataset ID** = store_sales

* **Table ID** = simplified_data_table

```python
import os

from google.cloud import bigquery

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'newacc_gcp_credential.json'
query = """
    SELECT date, store_nbr, family, sales, onpromotion
    FROM `scalable-model-piplines.store_sales.simplified_data_table`
    LIMIT 100
"""

client = bigquery.Client()

train_data = client.query(query).to_dataframe()
train_data.head()
```

Output of `train_data.head()`:

| | date | store_nbr | family | sales | onpromotion |
|---|---|---|---|---|---|
| 0 | 2016-08-03 | 1 | PERSONAL CARE | 200.0 | 0 |
| 1 | 2016-08-04 | 1 | PRODUCE | 1909.962 | 0 |
| 2 | 2016-08-07 | 1 | PRODUCE | 1016.462 | 0 |
| 3 | 2016-08-09 | 1 | PRODUCE | 2044.128 | 0 |
| 4 | 2016-08-14 | 1 | PERSONAL CARE | 47.0 | 0 |


## 1. Training Model from BigQuery


### 1.1 Model Trainer

The full demo code for the `Model Trainer` is in */demo-code/prediction_service/model_trainer.py*.

1. After loading the data from BigQuery, we perform *feature engineering*, including seasonality features.

2. Fit the model and temporarily save the trained model and its metadata to the *tmp/* directory.

3. Upload the trained model to Google Cloud Storage using the *gcsfs* library:

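```python
import gcsfs


def upload_File_to_Cloud_Storage(src_dir: str, gcs_dst: str, recursive=False):
    """Copy a local file (or a whole directory with recursive=True) to Cloud Storage."""
    fs = gcsfs.GCSFileSystem()
    fs.put(src_dir, gcs_dst, recursive=recursive)


bucket_name = "gcs://scalable-model-piplines-trained_model/"

# `path`, `model_name`, `dp_name` and `today` are defined elsewhere in model_trainer.py.
# The saved model is a directory, so it is uploaded recursively.
upload_File_to_Cloud_Storage(path + model_name + today,
                             bucket_name + model_name + today, recursive=True)

# The `dp_name` artifact is a single pickle file.
upload_File_to_Cloud_Storage(path + dp_name + today + '.pkl',
                             bucket_name + dp_name + today + '.pkl')
```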

4. Clean up the temporary files. 
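Step 1 only names the feature engineering. As an illustration, a minimal sketch of seasonality features derived from the `date` column could look like this; the feature names and the 365.25-day yearly period are assumptions, not the contents of *model_trainer.py*:

```python
import math
from datetime import date


def seasonal_features(d: date) -> dict:
    """Hypothetical seasonality features for one sales date."""
    day_of_year = d.timetuple().tm_yday
    # Encode the yearly cycle on a circle so that Dec 31 and Jan 1 end up close together.
    angle = 2 * math.pi * day_of_year / 365.25
    return {
        "day_of_week": d.weekday(),            # Monday=0 ... Sunday=6
        "month": d.month,
        "is_weekend": int(d.weekday() >= 5),   # Saturday or Sunday
        "year_sin": math.sin(angle),
        "year_cos": math.cos(angle),
    }
```

With pandas, the same columns can be computed for the whole `date` column at once through the `.dt` accessor (`dt.dayofweek`, `dt.month`, `dt.dayofyear`).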

I assume the model is retrained every day, so the model's file name has the format "sale_forecasting_sklearn" + today's date, for example *sale_forecasting_sklearn2023-02-28*.

Alternatively, we can pin a specific model version, or overwrite the previous model so that downstream services always load it from the same, consistent location.
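With the dated naming scheme, the newest model can be found without a separate registry. A minimal sketch, assuming names follow exactly the `prefix + YYYY-MM-DD` format above; the helper names are hypothetical:

```python
from datetime import date
from typing import Iterable, Optional

MODEL_PREFIX = "sale_forecasting_sklearn"


def model_name_for(day: date, prefix: str = MODEL_PREFIX) -> str:
    """Dated model name, e.g. sale_forecasting_sklearn2023-02-28."""
    return f"{prefix}{day.isoformat()}"


def latest_model_name(names: Iterable[str], prefix: str = MODEL_PREFIX) -> Optional[str]:
    """Return the newest dated model name, or None if there is none.

    This works because ISO dates (YYYY-MM-DD) sort chronologically as plain strings.
    """
    # Compare base names only, so full GCS paths such as "bucket/<name>/" also work.
    base_names = [n.rstrip("/").rsplit("/", 1)[-1] for n in names]
    candidates = sorted(n for n in base_names if n.startswith(prefix))
    return candidates[-1] if candidates else None
```

With gcsfs, the input could come from listing the model bucket (for example `fs.ls(...)`), and a serving job would then load the returned model instead of a hard-coded date.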

Before moving a model to production, we also make sure it was trained properly and is accurate enough. Separate services can double-check the model and alert developers to any emerging issues; such cross-checks are good practice for catching problems early.
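The exact checks are not specified here. One simple possibility, sketched with hypothetical thresholds and function names, is to compare a held-out error metric against an absolute ceiling and against the model currently in production, and only upload (step 3) when both checks pass; a failed check could trigger the alert to developers:

```python
from typing import Optional, Sequence


def mean_absolute_error(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Average absolute difference between actual and predicted sales."""
    if len(y_true) != len(y_pred) or not y_true:
        raise ValueError("y_true and y_pred must be non-empty and of equal length")
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)


def should_promote(new_mae: float, current_mae: Optional[float],
                   max_mae: float, tolerance: float = 0.05) -> bool:
    """Hypothetical promotion rule for a freshly trained model.

    - Reject any model whose error exceeds the absolute ceiling `max_mae`.
    - If a production model exists, reject the new one when it is more than
      `tolerance` (5% by default) worse than the current model.
    """
    if new_mae > max_mae:
        return False
    if current_mae is None:
        return True
    return new_mae <= current_mae * (1 + tolerance)
```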

### 1.2 Containerize the Model Trainer

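```bash
# Build the image
$ docker image build -t "prediction_service" .

# Run the container locally (host port 8080 -> container port 80)
$ docker run -d -p 8080:80 prediction_service

# Tag the image for Container Registry and push it
$ docker tag prediction_service us.gcr.io/scalable-model-piplines/prediction_service

$ docker push us.gcr.io/scalable-model-piplines/prediction_service
```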

Now we can deploy this image to `Google Kubernetes Engine` (GKE), expose it with a `load balancer`, and map the service to port 80, the port the container listens on.

![gke-prediction-service-deployment](images/gke-prediction-service-deployment.png)

```python
import requests

result = requests.get("http://34.69.150.86/run")
print(result.json())
```

Output:

```
{'success': True}
```


In this demonstration, the Model Trainer can now be triggered through an HTTP endpoint.
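The trainer's web layer is not shown in this section. A minimal sketch of an equivalent `/run` endpoint, assuming only the Python standard library (the actual service may use a framework such as Flask) and a hypothetical `run_training()` that wraps steps 1 to 4 of section 1.1:

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def run_training() -> bool:
    """Hypothetical placeholder: load data, fit, upload to GCS, clean up tmp/."""
    return True


class TrainerHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/run":
            # Training runs synchronously; the caller waits for the result.
            status, payload = 200, {"success": run_training()}
        else:
            status, payload = 404, {"success": False, "error": "not found"}
        body = json.dumps(payload).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


def serve(port: int = 80) -> None:
    """Container entrypoint; port 80 matches `docker run -p 8080:80` above."""
    HTTPServer(("0.0.0.0", port), TrainerHandler).serve_forever()
```

Because the handler answers with `{"success": true}`, the `requests` call above prints `{'success': True}` after `result.json()`.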

### 1.3 Scheduler with `Cloud Composer`

Once the Model Trainer is running on GKE, we set up a schedule to run it every day; the DAG below uses the cron expression `0 8 * * *`, which Airflow evaluates in UTC by default. Airflow can be self-hosted on Kubernetes, but it is complex to set up. Fully managed versions of Airflow are also available on cloud platforms, such as Cloud Composer on GCP.

1. Enable the `Google Cloud Composer API` and create a new environment named *run-model-trainer-everday*.

2. Add a DAG that runs the Model Trainer on the GKE cluster. In the `Cloud Composer` UI >> select the Composer cluster >> 

For demonstration purposes, we call the `Model Trainer` through its HTTP endpoint. Below is a snippet of `task1_run_Model_Trainer.py`:

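```python
def main():
    import requests

    # Trigger one training run on the GKE service.
    result = requests.get("http://34.69.150.86/run")
    # Raise on HTTP errors so that Airflow marks the task as failed.
    result.raise_for_status()
    return result.json()
```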

However, `Cloud Composer` can do much more. For example, it can run one Python app to build and train a model, then run another Python app that either uploads the trained model to storage or packages it into a Docker image.

For the Docker route, GCP provides `Cloud Build`, which runs a job that builds the Docker image and pushes it to Artifact Registry.

#### Multiple Tasks with Multiple Python Scripts

`Cloud Composer` stores and runs DAG scripts from a `Google Cloud Storage` bucket. In this case, the bucket is *us-west1-run-model-trainer--5f3a0641-bucket*:

```
us-west1-run-model-trainer--5f3a0641-bucket/
    dags/
        train_model.py
        tasks/
             task1_run_Model_Trainer.py
             task2_upload_Model.py
```

The template for `train_model.py` is:

```python
from datetime import datetime, timedelta

from airflow.models import DAG
# Airflow 2 import path (airflow.operators.python_operator is deprecated).
from airflow.operators.python import PythonOperator

from tasks import task1_run_Model_Trainer, task2_upload_Model

default_dag_args = {}

with DAG(
    dag_id="train_model",
    default_args=default_dag_args,
    start_date=datetime(2023, 2, 25, 0, 0),
    # schedule_interval=timedelta(days=1),  # every day
    # At 08:00 every day (UTC, Airflow's default timezone)
    schedule_interval='0 8 * * *',
    catchup=False,  # do not backfill the runs missed since start_date
) as dag:
    do_stuff1 = PythonOperator(
        task_id="task_1",
        python_callable=task1_run_Model_Trainer.main,  # assume entrypoint is main()
    )
    do_stuff2 = PythonOperator(
        task_id="task_2",
        python_callable=task2_upload_Model.main,  # assume entrypoint is main()
    )
    do_stuff1 >> do_stuff2
```

The `>>` operator sets the execution order: `do_stuff1 >> do_stuff2` runs `task_2` only after `task_1` has succeeded.

Shortly after `train_model.py` is placed in the `dags/` folder of the Cloud Composer bucket, `Cloud Composer` picks up the new DAG and runs it according to its schedule.

![Google_Composer.png](images/Google_Composer_dags.png)

`Cloud Composer` also provides the Airflow web UI:

![Google_Composer_aiflow_ui.png](images/Google_Composer_aiflow_ui.png)

### 1.4 Batch Training Model

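When a BigQuery table has a very large number of records, we can train in batches, feeding the model one batch at a time. BigQuery also supports *batch query jobs*: with batch priority, a query is queued and starts as soon as idle resources are available, instead of running immediately like an interactive query. The demo code: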

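```python
import time

from google.cloud import bigquery

client = bigquery.Client()
# Example query: the table from section 0.
sql = "SELECT * FROM `scalable-model-piplines.store_sales.simplified_data_table`"

job_config = bigquery.QueryJobConfig(
    priority=bigquery.QueryPriority.BATCH
)
query_job = client.query(sql, job_config=job_config)  # Make an API request.

# Poll until the queued batch job finishes, instead of busy-waiting.
while query_job.state != 'DONE':
    time.sleep(10)  # Do something useful here.
    query_job = client.get_job(
        query_job.job_id, location=query_job.location
    )  # Make the next API request.

rows = query_job.result()  # Raises an exception if the job failed.
```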

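Several Python libraries support this kind of incremental training; for example, scikit-learn estimators that implement `partial_fit`, such as `SGDRegressor`, can be fit one batch at a time.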
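A minimal sketch of mini-batch training over a query result, assuming a scikit-learn estimator with `partial_fit` and hypothetical feature columns taken from the section 0 query; the function names are illustrative only:

```python
from typing import Any, Iterable, Iterator, Tuple

# Hypothetical feature and label columns (from the query in section 0).
FEATURE_COLUMNS = ["store_nbr", "onpromotion"]
LABEL_COLUMN = "sales"


def frames_to_batches(frames: Iterable[Any]) -> Iterator[Tuple[Any, Any]]:
    """Turn an iterable of pandas DataFrames into (X, y) mini-batches."""
    for df in frames:
        if len(df) == 0:
            continue
        yield df[FEATURE_COLUMNS].to_numpy(), df[LABEL_COLUMN].to_numpy()


def train_in_batches(model: Any, batches: Iterable[Tuple[Any, Any]]) -> int:
    """Feed each (X, y) batch to an estimator that implements partial_fit().

    Returns the number of batches consumed.
    """
    n_batches = 0
    for X, y in batches:
        model.partial_fit(X, y)
        n_batches += 1
    return n_batches


# Usage sketch (needs google-cloud-bigquery, pandas and scikit-learn):
#   from sklearn.linear_model import SGDRegressor
#   rows = query_job.result(page_size=10_000)
#   model = SGDRegressor()
#   train_in_batches(model, frames_to_batches(rows.to_dataframe_iterable()))
```

`RowIterator.to_dataframe_iterable()` yields DataFrames as result pages arrive, so the full table never has to fit in memory. SGD-based models are sensitive to feature scale, so in practice the features would be standardized first.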


### 1.5 TensorFlow Model

If the model is built with TensorFlow and needs to read data from BigQuery, we can use [`tfio.bigquery.BigQueryClient`](https://www.tensorflow.org/io/api_docs/python/tfio/bigquery/BigQueryClient).

[There is an example of using it here.](https://www.tensorflow.org/io/tutorials/bigquery)


### 1.6 AI Platform Training

Besides general cloud environments, GCP provides `AI Platform Training` to run TensorFlow, scikit-learn, and XGBoost training applications in the cloud. AI Platform Training provides the dependencies required to train machine learning models using these hosted frameworks in their runtime versions. Additionally, we can use custom containers to run training jobs with other machine learning frameworks.

`AI Platform Training` also supports [distributed training with TensorFlow](https://www.tensorflow.org/guide/distributed_training).

## 2. Spark: Batch Model with PySpark and MLlib

[`Dataproc`](https://cloud.google.com/dataproc) is a fully managed and highly scalable service for running Apache Hadoop, Apache Spark, Apache Flink, Presto, and 30+ open source tools and frameworks. Use Dataproc for data lake modernization, ETL, and secure data science, at scale, integrated with Google Cloud, at a fraction of the cost.

### 2.0 MLlib

MLlib contains many algorithms including:


* Classification: logistic regression, naive Bayes,...
* Regression: generalized linear regression, survival regression,...
* Decision trees, random forests, and gradient-boosted trees
* Recommendation: alternating least squares (ALS)
* Clustering: K-means, Gaussian mixtures (GMMs),...
* Topic modeling: latent Dirichlet allocation (LDA)
* Frequent itemsets, association rules, and sequential pattern mining

### 2.1 Train Model with MLlib



The code below is `pyspark-training-model/run_trainning.py`:

```python
import time

from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession


# Collect the features of interest (date, store_nbr) into a vector and
# package it in a tuple together with the label (`sales`) for that row.
def vector_from_inputs(r):
    return (float(r["sales"]),
            Vectors.dense(time.mktime(r["date"].timetuple()),
                          float(r["store_nbr"])))


spark = SparkSession.builder.appName("sales-model-trainer").getOrCreate()

# Read the data from BigQuery as a Spark DataFrame.
sales_data = (spark.read.format("bigquery")
              .option("project", "scalable-model-piplines")
              .option("table", "store_sales.simplified_data_table")
              .load())
# Create a view so that Spark SQL queries can be run against the data.
sales_data.createOrReplaceTempView("sales_data")

# Example cleaning query (assumption: keep only rows that have a label).
query = """
    SELECT date, store_nbr, sales
    FROM sales_data
    WHERE sales IS NOT NULL
"""
clean_data = spark.sql(query)

# Create an input DataFrame for Spark ML using the above function.
training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label", "features"])
training_data.cache()

# Construct a new LinearRegression object and fit the training data.
lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal")
model = lr.fit(training_data)
```

### 2.2 Submit Job in `Dataproc`

To run this Python code, we need to submit it as a job to the Dataproc service.

1. Copy the code file (run_trainning.py) to a bucket and note its full path (gs://scalable-model-piplines-dataproc-model-trainer/run_trainning.py).

2. On the main Dataproc page in the Console, choose `SUBMIT JOB`.

3. Select PySpark as the job type and enter the path of the Python file.

4. To read from BigQuery, the job needs the *spark-bigquery-connector*: enter `gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar` in the **Jar files** field.

If everything is configured correctly, the job finishes with a success status.

![spark-job-submit-success.png](images/spark-job-submit-success.png)
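The same submission can also be scripted. A sketch using the `google-cloud-dataproc` client library (`JobControllerClient.submit_job_as_operation`), assuming an existing cluster; the cluster name and region are hypothetical placeholders:

```python
from typing import List


def build_pyspark_job(cluster_name: str, main_python_file_uri: str,
                      jar_file_uris: List[str]) -> dict:
    """Describe a PySpark job with the same fields as the SUBMIT JOB form."""
    return {
        "placement": {"cluster_name": cluster_name},
        "pyspark_job": {
            "main_python_file_uri": main_python_file_uri,
            "jar_file_uris": jar_file_uris,
        },
    }


def submit_pyspark_job(project_id: str, region: str, job: dict):
    """Submit the job and block until it finishes (needs google-cloud-dataproc)."""
    from google.cloud import dataproc_v1

    client = dataproc_v1.JobControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )
    operation = client.submit_job_as_operation(
        request={"project_id": project_id, "region": region, "job": job}
    )
    return operation.result()


job = build_pyspark_job(
    cluster_name="my-cluster",  # hypothetical cluster name
    main_python_file_uri="gs://scalable-model-piplines-dataproc-model-trainer/run_trainning.py",
    jar_file_uris=["gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"],
)
# submit_pyspark_job("scalable-model-piplines", "us-central1", job)  # region is an assumption
```

Keeping the job description separate from the API call makes it easy to reuse the same job from a script or from a scheduler.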

### 2.3 Scheduler

`Cloud Composer` can also schedule jobs in Dataproc by running [`Dataproc Serverless`](https://cloud.google.com/dataproc-serverless/docs/overview) workloads; a guide is [provided here](https://cloud.google.com/composer/docs/composer-2/run-dataproc-workloads).

It is also possible to [dynamically create a Dataproc cluster using Cloud Composer](https://www.linkedin.com/pulse/dynamically-create-dataproc-cluster-using-cloud-composer-kanungo), run the scheduled job on it, and automatically delete the cluster once the job has finished.

## 3. Batch Predicting Model

### 3.1 Google Cloud Dataflow

Dataflow is GCP's managed service for running data pipelines built with Apache Beam; the same pipelines can run locally or scale up to large clusters in a managed environment. We briefly talked about it in the last chapter.

Now we will use Apache Beam to build a prediction service that runs on `Google Cloud Dataflow`.

### 3.2 Process Data with `DoFn`

Below is a template for a `DoFn` class that makes a prediction for each row (element) of the data, for example rows loaded from BigQuery. Because each element is processed independently, Beam can distribute the work across many workers, so the pipeline scales to massive datasets.

#### Prediction

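```python
import apache_beam as beam


def load_model():
    """Hypothetical helper: load the trained model (e.g. from the GCS bucket in 1.1)."""
    raise NotImplementedError


def transform(element):
    """Hypothetical helper: convert one input row into the model's feature matrix."""
    raise NotImplementedError


class ApplyDoFn(beam.DoFn):

    def __init__(self):
        self._model = None

    def setup(self):
        # Runs once per DoFn instance on each worker, so the model is
        # loaded once rather than for every element.
        self._model = load_model()

    def process(self, element):
        new_x = transform(element)
        prediction = self._model.predict(new_x)[0]
        yield {'guid': element['guid'], 'prediction': float(prediction)}


# `data` is a PCollection of dicts, e.g. rows read from BigQuery.
predictions = data | 'Apply Model' >> beam.ParDo(ApplyDoFn())
```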

We can then save the prediction results anywhere, for example in another BigQuery table or in a NoSQL database such as `Google Cloud Datastore`.

#### Save to `Cloud Datastore`

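```python
from datetime import datetime, timezone

import apache_beam as beam


class PublishDoFn(beam.DoFn):

    def setup(self):
        # Create the Datastore client once per worker instead of per element.
        from google.cloud import datastore
        self._ds = datastore
        self._client = datastore.Client()

    def process(self, element):
        # A key needs at least a kind; 'Prediction' is an assumed kind name.
        key = self._client.key('Prediction', element['guid'])
        entity = self._ds.Entity(key=key)
        entity['prediction'] = element['prediction']
        # ApplyDoFn's output has no 'time' field, so store when the prediction was saved.
        entity['time'] = datetime.now(timezone.utc)
        self._client.put(entity)


predictions | 'Create entities' >> beam.ParDo(PublishDoFn())
```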


### 3.3 Predicting Service 

To create a workflow with Beam, we chain steps together with Python's pipe operator (`|`), using `>>` to give each step a label. The result is a DAG of operations that can be distributed across machines in a cluster.

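```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# e.g. --runner=DataflowRunner --project=... --region=... --temp_location=gs://...
pipeline_options = PipelineOptions()
# Hypothetical output schema matching ApplyDoFn's output.
schema = 'guid:STRING,prediction:FLOAT'

# define the pipeline steps
p = beam.Pipeline(options=pipeline_options)
# INPUT_QUERY (hypothetical): SQL selecting the rows to score, including a `guid` column.
data = p | 'Read Data' >> beam.io.ReadFromBigQuery(query=INPUT_QUERY, use_standard_sql=True)
scored = data | 'Apply Model for Each Element/User' >> beam.ParDo(ApplyDoFn())
# WriteToBigQuery is itself a transform, so it is applied directly.
scored | 'Save to BigQuery' >> beam.io.WriteToBigQuery(
    'prediction_Table', dataset='dataset_ID', schema=schema,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
scored | 'Save to Other DB like Datastore' >> beam.ParDo(PublishDoFn())

# run the pipeline
result = p.run()
result.wait_until_finish()
```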