# Recommendations on GCP with TensorFlow and WALS with Cloud Composer
***
This lab is adapted from the original [solution](https://github.com/GoogleCloudPlatform/tensorflow-recommendation-wals) created by [lukmanr](https://github.com/GoogleCloudPlatform/tensorflow-recommendation-wals/commits?author=lukmanr).

This project deploys a solution for a recommendation service on GCP, using the WALS algorithm in TensorFlow. Components include:

- Recommendation model code, and scripts to train and tune the model on ML Engine
- A REST endpoint using Google Cloud Endpoints for serving recommendations
- An Airflow server managed by Cloud Composer for running scheduled model training
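WALS (weighted alternating least squares) factorizes the user-article interaction matrix `R` into user factors `U` and item factors `V` so that `U @ V.T` approximates `R`. Per-cell weights make observed interactions count more than missing ones. With `V` held fixed, each user's factor vector has a closed-form weighted ridge-regression solution, and vice versa, so the algorithm alternates between the two. The NumPy sketch below shows that idea on a small dense matrix. It is not the lab's TensorFlow trainer, which works on sparse inputs and likely weights entries differently. The weights `w_obs`/`w0`, the regularization `reg`, and the toy matrix are illustrative assumptions.

```python
import numpy as np

def wals(R, k=2, reg=0.1, w_obs=1.0, w0=0.1, iters=20, seed=0):
    """Weighted ALS sketch: cells with R > 0 get weight w_obs, all others w0."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, k))  # user (row) factors
    V = rng.normal(scale=0.1, size=(n_items, k))  # item (column) factors
    W = np.where(R > 0, w_obs, w0)                # per-cell confidence weights
    I = np.eye(k)
    for _ in range(iters):
        # Hold V fixed: each user row is a weighted ridge regression.
        for i in range(n_users):
            A = V.T @ (W[i][:, None] * V) + reg * I
            U[i] = np.linalg.solve(A, V.T @ (W[i] * R[i]))
        # Hold U fixed: each item column is a weighted ridge regression.
        for j in range(n_items):
            A = U.T @ (W[:, j][:, None] * U) + reg * I
            V[j] = np.linalg.solve(A, U.T @ (W[:, j] * R[:, j]))
    return U, V

def weighted_loss(R, U, V, reg=0.1, w_obs=1.0, w0=0.1):
    """The objective WALS minimizes: weighted squared error plus an L2 penalty."""
    W = np.where(R > 0, w_obs, w0)
    return (W * (R - U @ V.T) ** 2).sum() + reg * ((U ** 2).sum() + (V ** 2).sum())

# Toy 4 users x 4 articles; 0 means "no interaction observed".
R = np.array([[5, 3, 0, 1],
              [4, 0, 0, 1],
              [1, 1, 0, 5],
              [0, 1, 5, 4]], dtype=float)
U, V = wals(R)
print(np.round(U @ V.T, 2))  # predicted scores, including the unobserved cells
print(weighted_loss(R, U, V))
```

Each half-step solves its sub-problem exactly, so the weighted loss cannot increase from one sweep to the next.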


## Confirm Prerequisites

### Create a Cloud Composer Instance
- Create a Cloud Composer [instance](https://console.cloud.google.com/composer/environments/create?project=)
    1. Specify 'composer' for name
    2. Choose a location
    3. Keep the remaining settings at their defaults
    4. Select Create

Provisioning takes 15-20 minutes. Continue with the rest of the lab while it runs; you will use Cloud Composer near the end.

In [1]:
%%bash
pip install sh --upgrade pip # needed to execute shell scripts later

Collecting sh
  Downloading https://files.pythonhosted.org/packages/4a/22/17b22ef5b049f12080f5815c41bf94de3c229217609e469001a8f80c1b3d/sh-1.12.14-py2.py3-none-any.whl
Collecting pip
  Downloading https://files.pythonhosted.org/packages/d7/41/34dd96bd33958e52cb4da2f1bf0818e396514fd4f4725a79199564cd0c20/pip-19.0.2-py2.py3-none-any.whl (1.4MB)
Installing collected packages: sh, pip
  Found existing installation: pip 18.1
    Uninstalling pip-18.1:
      Successfully uninstalled pip-18.1
Successfully installed pip-19.0.2 sh-1.12.14


### Set up environment variables
<span style="color: blue">__Replace the settings below with your own.__</span> Note: you can leave AIRFLOW_BUCKET blank for now. Cloud Composer creates an Airflow bucket for you automatically when your instance is created, and you will set AIRFLOW_BUCKET in Part Two once it exists. <br><br>

### 1. Make a GCS bucket with the name recserve_[YOUR-PROJECT-ID]:

In [2]:
import os
BUCKET = 'qwiklabs-gcp-226104af6ccf7ec4' # REPLACE WITH A BUCKET NAME (PUT YOUR PROJECT ID AND WE CREATE THE BUCKET ITSELF NEXT)
PROJECT = 'qwiklabs-gcp-226104af6ccf7ec4' # REPLACE WITH YOUR PROJECT ID
REGION = 'us-east1' # REPLACE WITH YOUR REGION e.g. us-central1

# do not change these
os.environ['PROJECT'] = PROJECT
os.environ['BUCKET'] = 'recserve_' + BUCKET
os.environ['REGION'] = REGION

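The notebook derives the bucket name by prefixing `recserve_` to BUCKET. Cloud Storage bucket names may contain only lowercase letters, digits, dashes, underscores and dots. They must start and end with a letter or digit and be 3-63 characters long (dotted names allow more). The helper below is a hypothetical, simplified pre-check of those rules that you can run before `gsutil mb`. It deliberately skips rarer restrictions such as IP-address-like names.

```python
import re

# Simplified Cloud Storage naming rules; not every restriction is checked.
_BUCKET_RE = re.compile(r'^[a-z0-9][a-z0-9._-]*[a-z0-9]$')

def is_valid_bucket_name(name):
    """Return True if `name` passes the common GCS bucket naming rules."""
    if not _BUCKET_RE.match(name):
        return False
    if name.startswith('goog'):
        return False
    if '.' in name:
        # Dotted names may be up to 222 chars, each component at most 63.
        return len(name) <= 222 and all(len(part) <= 63 for part in name.split('.'))
    return 3 <= len(name) <= 63

project = 'qwiklabs-gcp-226104af6ccf7ec4'
print(is_valid_bucket_name('recserve_' + project))  # True
```

Project IDs use only lowercase letters, digits and hyphens, so `recserve_` plus a project ID normally passes.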
In [3]:
%%bash

gcloud config set project $PROJECT
gcloud config set compute/region $REGION

Updated property [core/project].
Updated property [compute/region].


In [4]:
%%bash

# create GCS bucket with recserve_PROJECT_NAME if not exists
exists=$(gsutil ls -d | grep -w gs://${BUCKET}/)
if [ -n "$exists" ]; then
   echo "Not creating recserve_bucket since it already exists."
else
   echo "Creating recserve_bucket"
   gsutil mb -l ${REGION} gs://${BUCKET}
fi

Creating recserve_bucket


Creating gs://recserve_qwiklabs-gcp-226104af6ccf7ec4/...


### Set up Google App Engine permissions
1. In [IAM](https://console.cloud.google.com/iam-admin/iam?project=), __change permissions for "Compute Engine default service account" from Editor to Owner__. This is required so you can create and deploy App Engine versions from within Cloud Datalab. Note: the alternative is to run all App Engine commands directly in Cloud Shell instead of from within Cloud Datalab.<br/><br/>

2. Create an App Engine instance, if you have not already, by uncommenting and running the code below

In [None]:
# %%bash
# run app engine creation commands
# gcloud app create --region ${REGION} # see: https://cloud.google.com/compute/docs/regions-zones/
# gcloud app update --no-split-health-checks

# Part One: Set Up and Train the WALS Model

## Upload sample data to BigQuery 
This tutorial comes with a sample Google Analytics data set containing page tracking events from the Austrian news site Kurier.at. The schema file `ga_sessions_sample_schema.json` is located in the `data` folder of the tutorial code, and the data file `ga_sessions_sample.json.gz` is located in a public Cloud Storage bucket associated with this tutorial. To upload this data set to BigQuery:

### Copy sample data files into our bucket

In [5]:
%%bash

gsutil -m cp gs://cloud-training-demos/courses/machine_learning/deepdive/10_recommendation/endtoend/data/ga_sessions_sample.json.gz gs://${BUCKET}/data/ga_sessions_sample.json.gz
gsutil -m cp gs://cloud-training-demos/courses/machine_learning/deepdive/10_recommendation/endtoend/data/recommendation_events.csv data/recommendation_events.csv
gsutil -m cp gs://cloud-training-demos/courses/machine_learning/deepdive/10_recommendation/endtoend/data/recommendation_events.csv gs://${BUCKET}/data/recommendation_events.csv


Copying gs://cloud-training-demos/courses/machine_learning/deepdive/10_recommendation/endtoend/data/ga_sessions_sample.json.gz [Content-Type=application/json]...
Operation completed over 1 objects/121.3 MiB.                                    
Copying gs://cloud-training-demos/courses/machine_learning/deepdive/10_recommendation/endtoend/data/recommendation_events.csv...
Operation completed over 1 objects/10.0 MiB.                                     
Copying gs://cloud-training-demos/courses/machine_learning/deepdive/10_recommendation/endtoend/data/recommendation_events.csv [Content

### 2. Create an empty BigQuery dataset and load the sample JSON data
Note: ingesting the 400K rows of sample data usually takes 5-7 minutes.

In [6]:
%%bash

# create BigQuery dataset if it doesn't already exist
exists=$(bq ls -d | grep -w GA360_test)
if [ -n "$exists" ]; then
   echo "Not creating GA360_test since it already exists."
else
   echo "Creating GA360_test dataset."
   bq --project_id=${PROJECT} mk GA360_test 
fi

# create the schema and load our sample Google Analytics session data
bq load --source_format=NEWLINE_DELIMITED_JSON \
 GA360_test.ga_sessions_sample \
 gs://${BUCKET}/data/ga_sessions_sample.json.gz \
 data/ga_sessions_sample_schema.json # can't load schema files from GCS

Creating GA360_test dataset.
Dataset 'qwiklabs-gcp-226104af6ccf7ec4:GA360_test' successfully created.



Waiting on bqjob_r118d244dbe778795_00000168f081adc3_1 ... (5s) Current status: RUNNING

## Install WALS model training package and model data

### 1. Create a distributable package. Copy the package up to the code folder in the bucket you created previously.

In [7]:
%%bash

cd wals_ml_engine

echo "creating distributable package"
python setup.py sdist

echo "copying ML package to bucket"
gsutil cp dist/wals_ml_engine-0.1.tar.gz gs://${BUCKET}/code/

creating distributable package
running sdist
running egg_info
creating wals_ml_engine.egg-info
writing requirements to wals_ml_engine.egg-info/requires.txt
writing wals_ml_engine.egg-info/PKG-INFO
writing top-level names to wals_ml_engine.egg-info/top_level.txt
writing dependency_links to wals_ml_engine.egg-info/dependency_links.txt
writing manifest file 'wals_ml_engine.egg-info/SOURCES.txt'
reading manifest file 'wals_ml_engine.egg-info/SOURCES.txt'
writing manifest file 'wals_ml_engine.egg-info/SOURCES.txt'
running check
creating wals_ml_engine-0.1
creating wals_ml_engine-0.1/trainer
creating wals_ml_engine-0.1/wals_ml_engine.egg-info
copying files to wals_ml_engine-0.1...
copying README.md -> wals_ml_engine-0.1
copying setup.py -> wals_ml_engine-0.1
copying trainer/__init__.py -> wals_ml_engine-0.1/trainer
copying trainer/model.py -> wals_ml_engine-0.1/trainer
copying trainer/task.py -> wals_ml_engine-0.1/trainer
copying trainer/util.py -> wals_ml_engine-0.1/trainer
copying trainer/



Copying file://dist/wals_ml_engine-0.1.tar.gz [Content-Type=application/x-tar]...
Operation completed over 1 objects/8.3 KiB.                                      


### 2. Run the WALS model on the sample data set:

In [8]:
%%bash

# view the ML train local script before running
cat wals_ml_engine/mltrain.sh

# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


usage () {
  echo "usage: mltrain.sh [local | train | tune] [gs://]<input_file>.csv
                  [--data-type ratings|web_views]
                  [--delimiter <delim>]
                  [--use-optimized]
                  [--headers]

Use 'local' to train locally with a local data file, and 'train' and 'tune' to
run on ML Engine.  For ML Engine jobs the input file must reside on GCS.

Optional

In [9]:
%%bash

cd wals_ml_engine

# train locally with optimized hyperparams
./mltrain.sh local ../data/recommendation_events.csv --data-type web_views --use-optimized

# Options if we wanted to train on Cloud ML Engine (CMLE). We will do this with Cloud Composer later.
# For ML Engine jobs the input file must reside on GCS.
# train on ML Engine with optimized hyperparams
# ./mltrain.sh train gs://${BUCKET}/data/recommendation_events.csv --data-type web_views --use-optimized

# tune hyperparams on ML Engine:
# ./mltrain.sh tune gs://${BUCKET}/data/recommendation_events.csv --data-type web_views


Fri Feb 15 09:39:59 UTC 2019
Fri Feb 15 09:41:32 UTC 2019


  from ._conv import register_converters as _register_converters
INFO:tensorflow:Train Start: 2019-02-15 09:40:48
  frac = np.array(1.0/(data > 0.0).sum(axis))
2019-02-15 09:40:52.051815: I tensorflow/core/platform/cpu_feature_guard.cc:140] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2019-02-15 09:40:54.131366: W tensorflow/core/framework/allocator.cc:101] Allocation of 277426800 exceeds 10% of system memory.
2019-02-15 09:40:56.240732: W tensorflow/core/framework/allocator.cc:101] Allocation of 277426800 exceeds 10% of system memory.
2019-02-15 09:40:58.022367: W tensorflow/core/framework/allocator.cc:101] Allocation of 277426800 exceeds 10% of system memory.
2019-02-15 09:40:59.780613: W tensorflow/core/framework/allocator.cc:101] Allocation of 277426800 exceeds 10% of system memory.
2019-02-15 09:41:01.560994: W tensorflow/core/framework/allocator.cc:101] Allocation of 277426800 exceeds 10% of system memory.
2019-02-15 09:41:03.329632

Training takes a couple of minutes and creates a job directory under `wals_ml_engine/jobs` like `wals_ml_local_20180102_012345/model`, containing the model files saved as NumPy arrays.

### View the locally trained model directory

In [10]:
ls wals_ml_engine/jobs

[0m[01;34mwals_ml_local_20190215_093959[0m/


### 3. Copy the model files from this directory to the model folder in the project bucket:
In the case of multiple models, take the most recent (tail -1)
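`find` makes no promise about output order, so `tail -1` alone is only reliable when the list is sorted. Because `mltrain.sh local` names job directories `wals_ml_local_YYYYMMDD_HHMMSS` (see the listing above), sorting the names also sorts them chronologically. Below is a small Python sketch of the same selection, assuming that naming pattern. `latest_job_model_dir` is a hypothetical helper.

```python
import re

# Assumed naming pattern from the listing above: wals_ml_local_YYYYMMDD_HHMMSS
_JOB_RE = re.compile(r'^wals_ml_local_(\d{8}_\d{6})$')

def latest_job_model_dir(job_names, jobs_root='wals_ml_engine/jobs'):
    """Return '<jobs_root>/<newest job>/model', or None if no name matches."""
    stamped = []
    for name in job_names:
        match = _JOB_RE.match(name)
        if match:
            stamped.append((match.group(1), name))
    if not stamped:
        return None
    # Zero-padded timestamps compare lexicographically in chronological order.
    _, newest = max(stamped)
    return '{}/{}/model'.format(jobs_root, newest)

print(latest_job_model_dir(['wals_ml_local_20190101_120000',
                            'wals_ml_local_20190215_093959',
                            'README.md']))
# wals_ml_engine/jobs/wals_ml_local_20190215_093959/model
```

In the notebook you could call it as `latest_job_model_dir(os.listdir('wals_ml_engine/jobs'))`.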

In [11]:
%%bash
# job directory names embed a timestamp, so sorting puts the newest one last
export JOB_MODEL=$(find wals_ml_engine/jobs -name "model" | sort | tail -1)
gsutil cp ${JOB_MODEL}/* gs://${BUCKET}/model/
  
echo "Recommendation model file numpy arrays in bucket:"  
gsutil ls gs://${BUCKET}/model/

Recommendation model file numpy arrays in bucket:
gs://recserve_qwiklabs-gcp-226104af6ccf7ec4/model/col.npy
gs://recserve_qwiklabs-gcp-226104af6ccf7ec4/model/item.npy
gs://recserve_qwiklabs-gcp-226104af6ccf7ec4/model/row.npy
gs://recserve_qwiklabs-gcp-226104af6ccf7ec4/model/user.npy


Copying file://wals_ml_engine/jobs/wals_ml_local_20190215_093959/model/col.npy [Content-Type=application/octet-stream]...
Copying file://wals_ml_engine/jobs/wals_ml_local_20190215_093959/model/item.npy [Content-Type=application/octet-stream]...
Copying file://wals_ml_engine/jobs/wals_ml_local_20190215_093959/model/row.npy [Content-Type=application/octet-stream]...
Copying file://wals_ml_engine/jobs/wals_ml_local_20190215_093959/model/user.npy [Content-Type=application/octet-stream]...

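The four arrays in the bucket are what the serving app loads. This sketch assumes that `row.npy` holds the user factors, `col.npy` the item factors, and that `user.npy`/`item.npy` map matrix indices back to visitor and article IDs (check `wals_ml_engine/trainer/model.py` to confirm). Under that assumption, a recommendation is the top-k items by dot product, skipping articles the user has already seen. The function below sketches that scoring step with hypothetical names. It is not the service's actual code.

```python
import numpy as np

def top_k_items(user_idx, row_factor, col_factor, k=5, seen=()):
    """Indices of the k highest-scoring items the user has not seen yet.

    Assumes row_factor has one row per user and col_factor one row per item,
    both with the same number of latent dimensions.
    """
    scores = col_factor @ row_factor[user_idx]  # dot product with every item
    seen = {int(j) for j in seen}
    ranked = np.argsort(-scores)                # best score first
    return [int(j) for j in ranked if int(j) not in seen][:k]

# Hypothetical factors: 2 users, 4 items, 2 latent dimensions.
row_factor = np.array([[1.0, 0.0], [0.0, 1.0]])
col_factor = np.array([[0.9, 0.1], [0.2, 0.8], [0.5, 0.5], [0.1, 0.9]])
print(top_k_items(0, row_factor, col_factor, k=2, seen=[0]))  # [2, 1]
print(top_k_items(1, row_factor, col_factor, k=2))            # [3, 1]
```

With the real files, `np.load('row.npy')` and `np.load('col.npy')` would supply the factors. Indexing the array from `item.npy` with the returned indices would then turn them into article IDs, again assuming that file layout.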
# Install the recserve endpoint

### 1. Prepare the deploy template for the Cloud Endpoint API:

In [12]:
%%bash
cd scripts
cat prepare_deploy_api.sh

#!/bin/bash
# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -euo pipefail

source util.sh

main() {
  # Get our working project, or exit if it's not set.
  local project_id=$(get_project_id)
  if [[ -z "$project_id" ]]; then
    exit 1
  fi
  local temp_file=$(mktemp)
  export TEMP_FILE="${temp_file}.yaml"
  mv "$temp_file" "$TEMP_FILE"

  # Because the included API is a template, we have to do some string
  # substitution before we can deploy 

In [13]:
%%bash
printf "\nCopy and run the deploy script generated below:\n"
cd scripts
./prepare_deploy_api.sh                         # Prepare config file for the API.


Copy and run the deploy script generated below:
Preparing config for deploying service in ../app/openapi.yaml...
To deploy:  gcloud endpoints services deploy /tmp/tmp.zuKGTZGtgy.yaml


This will output something like:

```To deploy:  gcloud endpoints services deploy /var/folders/1m/r3slmhp92074pzdhhfjvnw0m00dhhl/T/tmp.n6QVl5hO.yaml```

### 2. Run the endpoints deploy command printed above:
<span style="color: blue">Be sure to __replace the file path below__ with the one printed by the previous cell before running.</span>

In [15]:
%%bash
gcloud endpoints services deploy /tmp/tmp.zuKGTZGtgy.yaml

Waiting for async operation operations/serviceConfigs.qwiklabs-gcp-226104af6ccf7ec4.appspot.com:df38bc24-f87d-4069-b213-e333b141a481 to complete...
Operation finished successfully. The following command can describe the Operation details:
 gcloud endpoints operations describe operations/serviceConfigs.qwiklabs-gcp-226104af6ccf7ec4.appspot.com:df38bc24-f87d-4069-b213-e333b141a481

Waiting for async operation operations/rollouts.qwiklabs-gcp-226104af6ccf7ec4.appspot.com:3601edae-b191-43f1-8839-d79134185d87 to complete...
Operation finished successfully. The following command can describe the Operation details:
 gcloud endpoints operations describe operations/rollouts.qwiklabs-gcp-226104af6ccf7ec4.appspot.com:3601edae-b191-43f1-8839-d79134185d87

Enabling service [endpoints.googleapis.com] on project [qwiklabs-gcp-226104af6ccf7ec4]...
Waiting for async operation operations/acf.cd277535-2030-458e-b4a8-e6b899d5e519 to complete...
Operation finished successfully. The following command can de

### 3. Prepare the deploy template for the App Engine App:

In [16]:
%%bash
# view the app deployment script
cat scripts/prepare_deploy_app.sh

#!/bin/bash
# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -euo pipefail

source util.sh

main() {
  # Get our working project, or exit if it's not set.
  local project_id="$(get_project_id)"
  if [[ -z "$project_id" ]]; then
    exit 1
  fi
  # Try to create an App Engine project in our selected region.
  # If it already exists, return a success ("|| true").
  echo "gcloud app create --region=$REGION"
  gcloud app create --region="$REGION" ||

In [17]:
%%bash
# prepare to deploy 
cd scripts

./prepare_deploy_app.sh

gcloud app create --region=us-east1
To deploy:  gcloud -q app deploy ../app/app_template.yaml_deploy.yaml


You are creating an app for project [qwiklabs-gcp-226104af6ccf7ec4].
cannot be changed. More information about regions is at
<https://cloud.google.com/appengine/docs/locations>.

ERROR: (gcloud.app.create) The project [qwiklabs-gcp-226104af6ccf7ec4] already contains an App Engine application. You can deploy your application using `gcloud app deploy`.


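You can ignore the script output "ERROR: (gcloud.app.create) The project [...] already contains an App Engine application. You can deploy your application using gcloud app deploy." This is expected: the script always attempts `gcloud app create` first and continues if the project already has an App Engine application.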

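The script will output something like:

```To deploy:  gcloud -q app deploy ../app/app_template.yaml_deploy.yaml```

That path is relative to the `scripts` folder. The next cell runs from the notebook's own folder, so it uses `app/app_template.yaml_deploy.yaml` instead.

### 4. Run the deploy command: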

In [18]:
%%bash
gcloud -q app deploy app/app_template.yaml_deploy.yaml

Services to deploy:

descriptor:      [/content/datalab/notebooks/training-data-analyst/courses/machine_learning/deepdive/10_recommend/labs/endtoend/app/app_template.yaml_deploy.yaml]
source:          [/content/datalab/notebooks/training-data-analyst/courses/machine_learning/deepdive/10_recommend/labs/endtoend/app]
target project:  [qwiklabs-gcp-226104af6ccf7ec4]
target service:  [default]
target version:  [20190215t094835]
target url:      [https://qwiklabs-gcp-226104af6ccf7ec4.appspot.com]


Enabling service [appengineflex.googleapis.com] on project [qwiklabs-gcp-226104af6ccf7ec4]...
Waiting for async operation operations/acf.74605317-cfc8-4a8a-af96-ba5cb4beddfc to complete...
Operation finished successfully. The following command can describe the Operation details:
 gcloud services operations describe operations/tmo-acf.74605317-cfc8-4a8a-af96-ba5cb4beddfc
Beginning deployment of service [default]...
Building and pushing image for service [default]
Started cloud build [777fa2c9-7668

This will take 7 - 10 minutes to deploy the app. While you wait, consider starting on Part Two below and completing the Cloud Composer DAG file.

## Query the API for Article Recommendations
Finally, test the recommendation model API by submitting a query request. Note the example `userId` and the desired number of recommendations, `numRecs`, passed as URL parameters for the model input.

In [20]:
%%bash
cd scripts
./query_api.sh          # Query the API.
./generate_traffic.sh   # Send traffic to the API.

curl "https://qwiklabs-gcp-226104af6ccf7ec4.appspot.com/recommendation?userId=5448543647176335931&numRecs=5"
{"articles":["299824032","299865757","299935287","299959410","1701682"]}

This command will exit automatically in 300 seconds.
Generating traffic to https://qwiklabs-gcp-226104af6ccf7ec4.appspot.com/recommendation?userId=5448543647176335931&numRecs=5...
Press Ctrl-C to stop.




If the call is successful, you will see the article IDs recommended for that specific user by the WALS ML model <br/>
(Example: curl "https://qwiklabs-gcp-12345.appspot.com/recommendation?userId=5448543647176335931&numRecs=5"
{"articles":["299824032","1701682","299935287","299959410","298157062"]} )
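The query is a plain HTTPS GET with `userId` and `numRecs` as query parameters, and the response is JSON with an `articles` list. The standard-library sketch below builds the same URL and parses the sample response shown above. `build_rec_url` and `parse_articles` are hypothetical helper names, and no request is actually sent.

```python
import json
from urllib.parse import urlencode

def build_rec_url(project_id, user_id, num_recs=5):
    """Build the recommendation URL on the project's default appspot.com host."""
    query = urlencode({'userId': user_id, 'numRecs': num_recs})
    return 'https://{}.appspot.com/recommendation?{}'.format(project_id, query)

def parse_articles(body):
    """Extract the list of recommended article IDs from a JSON response body."""
    return json.loads(body)['articles']

url = build_rec_url('qwiklabs-gcp-226104af6ccf7ec4', '5448543647176335931')
print(url)
# https://qwiklabs-gcp-226104af6ccf7ec4.appspot.com/recommendation?userId=5448543647176335931&numRecs=5
sample = '{"articles":["299824032","299865757","299935287","299959410","1701682"]}'
print(parse_articles(sample))
```

To send the request for real, you could pass `url` to `urllib.request.urlopen` and feed the decoded body to `parse_articles`.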

__Part One is done!__ You have successfully created the back-end architecture for serving your ML recommendation system. But we're not done yet: we still need to automatically retrain and redeploy our model once new data comes in. For that we will use [Cloud Composer](https://cloud.google.com/composer/) and [Apache Airflow](https://airflow.apache.org/).<br/><br/>

***
# Part Two: Set up a scheduled workflow with Cloud Composer
In this section you will complete a partially written training.py DAG file and copy it to the `dags` folder of your Composer instance's bucket.

## Copy your Airflow bucket name
1. Navigate to your Cloud Composer [instance](https://console.cloud.google.com/composer/environments?project=)<br/><br/>
2. Select __DAGs Folder__<br/><br/>
3. You will be taken to the Google Cloud Storage bucket that Cloud Composer has created automatically for your Airflow instance<br/><br/>
4. __Copy the bucket name__ into the variable below (example: us-central1-composer-08f6edeb-bucket)

In [19]:
AIRFLOW_BUCKET = 'asia-northeast1-mlcomposer-67ae19d4-bucket' # REPLACE WITH AIRFLOW BUCKET NAME
os.environ['AIRFLOW_BUCKET'] = AIRFLOW_BUCKET

## Complete the training.py DAG file
Apache Airflow orchestrates tasks out to other services through a [DAG (Directed Acyclic Graph)](https://airflow.apache.org/concepts.html) file which specifies what services to call, what to do, and when to run these tasks. DAG files are written in Python and are loaded automatically into Airflow once present in the `dags/` folder of your Cloud Composer bucket.

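Conceptually, the `set_upstream` calls at the end of the DAG file define the edges t1 → t2 → t3 → t4, and a task only starts once all of its upstream tasks have succeeded. The pure-Python sketch below is not Airflow code. It uses Kahn's topological sort to show how such edges determine a valid execution order, and why a cycle makes the graph unschedulable.

```python
from collections import deque

def execution_order(upstream):
    """Kahn's algorithm. `upstream` maps task -> list of tasks it depends on."""
    tasks = set(upstream)
    for deps in upstream.values():
        tasks.update(deps)
    indegree = {t: 0 for t in tasks}
    downstream = {t: [] for t in tasks}
    for task, deps in upstream.items():
        for dep in deps:
            indegree[task] += 1
            downstream[dep].append(task)
    ready = deque(sorted(t for t in tasks if indegree[t] == 0))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for nxt in sorted(downstream[task]):
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    if len(order) != len(tasks):
        raise ValueError('cycle detected: not a DAG')
    return order

# Mirrors t2.set_upstream(t1), t3.set_upstream(t2), t4.set_upstream(t3).
dag_edges = {
    'bq_export_op': ['bq_rec_training_data'],
    'ml_engine_training_op': ['bq_export_op'],
    'app_engine_deploy_version': ['ml_engine_training_op'],
}
print(execution_order(dag_edges))
# ['bq_rec_training_data', 'bq_export_op', 'ml_engine_training_op', 'app_engine_deploy_version']
```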
Your task is to complete the partially written DAG file below which will enable the automatic retraining and redeployment of our WALS recommendation model. 

__Complete the #TODOs__ in the Airflow DAG file below and execute the code block to save the file
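One TODO asks for a cron expression that fires once a day at 21:00. The five cron fields are minute, hour, day of month, month and day of week, so `00 21 * * *` means minute 0 of hour 21 on every day. The sketch below handles only this daily `M H * * *` form and is not a general cron parser. It computes the next fire time after a given moment, assuming times are in UTC, Airflow's default.

```python
from datetime import datetime, timedelta

def next_daily_run(cron_expr, after):
    """Next fire time for a daily 'M H * * *' expression, strictly after `after`."""
    fields = cron_expr.split()
    if len(fields) != 5 or fields[2:] != ['*', '*', '*']:
        raise ValueError('sketch only supports "M H * * *" expressions')
    minute, hour = int(fields[0]), int(fields[1])
    if not (0 <= minute <= 59 and 0 <= hour <= 23):
        raise ValueError('minute must be 0-59 and hour 0-23')
    candidate = after.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= after:
        candidate += timedelta(days=1)  # today's slot has passed, use tomorrow's
    return candidate

print(next_daily_run('00 21 * * *', datetime(2019, 2, 15, 9, 48)))   # 2019-02-15 21:00:00
print(next_daily_run('00 21 * * *', datetime(2019, 2, 15, 21, 30)))  # 2019-02-16 21:00:00
```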

In [29]:
%%writefile airflow/dags/training.py

# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""DAG definition for recserv model training."""

import airflow
from airflow import DAG

# Reference for all available airflow operators: 
# https://github.com/apache/incubator-airflow/tree/master/airflow/contrib/operators
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.hooks.base_hook import BaseHook
# from airflow.contrib.operators.mlengine_operator import MLEngineTrainingOperator
# above mlengine_operator currently doesnt support custom MasterType so we import our own plugins:

# custom plugins
from airflow.operators.app_engine_admin_plugin import AppEngineVersionOperator
from airflow.operators.ml_engine_plugin import MLEngineTrainingOperator


import datetime

def _get_project_id():
  """Get project ID from default GCP connection."""

  extras = BaseHook.get_connection('google_cloud_default').extra_dejson
  key = 'extra__google_cloud_platform__project'
  if key in extras:
    project_id = extras[key]
  else:
    raise ValueError('Must configure project_id in google_cloud_default '
                     'connection from Airflow Console')
  return project_id

PROJECT_ID = _get_project_id()

# Data set constants, used in BigQuery tasks.  You can change these
# to conform to your data.

# TODO: Specify your BigQuery dataset name and table name
DATASET = 'GA360_test'
TABLE_NAME = 'ga_sessions_sample'
ARTICLE_CUSTOM_DIMENSION = '10'

# TODO: Confirm bucket name and region
# GCS bucket names and region, can also be changed.
BUCKET = 'gs://recserve_' + PROJECT_ID
REGION = 'us-east1'

# The code package name comes from the model code in the wals_ml_engine
# directory of the solution code base.
PACKAGE_URI = BUCKET + '/code/wals_ml_engine-0.1.tar.gz'
JOB_DIR = BUCKET + '/jobs'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': datetime.timedelta(minutes=5)
}

# Default schedule interval using cronjob syntax - can be customized here
# or in the Airflow console.

# TODO: Specify a schedule interval in CRON syntax to run once a day at 2100 hours (9pm)
# Reference: https://airflow.apache.org/scheduler.html
schedule_interval = '00 21 * * *' # fields: minute hour day-of-month month day-of-week

# TODO: Title your DAG to be recommendations_training_v1
dag = DAG('recommendations_training_v1', 
          default_args=default_args,
          schedule_interval=schedule_interval)

dag.doc_md = __doc__


#
#
# Task Definition
#
#

# BigQuery training data query

bql='''
#legacySql
SELECT
 fullVisitorId as clientId,
 ArticleID as contentId,
 (nextTime - hits.time) as timeOnPage,
FROM(
  SELECT
    fullVisitorId,
    hits.time,
    MAX(IF(hits.customDimensions.index={0},
           hits.customDimensions.value,NULL)) WITHIN hits AS ArticleID,
    LEAD(hits.time, 1) OVER (PARTITION BY fullVisitorId, visitNumber
                             ORDER BY hits.time ASC) as nextTime
  FROM [{1}.{2}.{3}]
  WHERE hits.type = "PAGE"
) HAVING timeOnPage is not null and contentId is not null;
'''

bql = bql.format(ARTICLE_CUSTOM_DIMENSION, PROJECT_ID, DATASET, TABLE_NAME)

# TODO: Complete the BigQueryOperator task to truncate the table if it already exists before writing
# Reference: https://airflow.apache.org/integration.html#bigqueryoperator
t1 = BigQueryOperator( # correct the operator name
    task_id='bq_rec_training_data',
    bql=bql,
    destination_dataset_table='%s.recommendation_events' % DATASET,
    write_disposition='WRITE_TRUNCATE', # specify to truncate on writes
    dag=dag)

# BigQuery training data export to GCS

# TODO: Fill in the missing operator name for task #2 which
# takes a BigQuery dataset and table as input and exports it to GCS as a CSV
training_file = BUCKET + '/data/recommendation_events.csv'
t2 = BigQueryToCloudStorageOperator( # correct the name
    task_id='bq_export_op',
    source_project_dataset_table='%s.recommendation_events' % DATASET,
    destination_cloud_storage_uris=[training_file],
    export_format='CSV',
    dag=dag
)


# ML Engine training job

job_id = 'recserve_{0}'.format(datetime.datetime.now().strftime('%Y%m%d%H%M'))
job_dir = BUCKET + '/jobs/' + job_id
output_dir = BUCKET
training_args = ['--job-dir', job_dir,
                 '--train-files', training_file,
                 '--output-dir', output_dir,
                 '--data-type', 'web_views',
                 '--use-optimized']

# TODO: Fill in the missing operator name for task #3 which will
# start a new training job to Cloud ML Engine
# Reference: https://airflow.apache.org/integration.html#cloud-ml-engine
# https://cloud.google.com/ml-engine/docs/tensorflow/machine-types
t3 = MLEngineTrainingOperator( # complete the name
    task_id='ml_engine_training_op',
    project_id=PROJECT_ID,
    job_id=job_id,
    package_uris=[PACKAGE_URI],
    training_python_module='trainer.task',
    training_args=training_args,
    region=REGION,
    scale_tier='CUSTOM',
    master_type='complex_model_m_gpu',
    dag=dag
)

# App Engine deploy new version

t4 = AppEngineVersionOperator(
    task_id='app_engine_deploy_version',
    project_id=PROJECT_ID,
    service_id='default',
    region=REGION,
    service_spec=None,
    dag=dag
)

# TODO: Be sure to set_upstream dependencies for all tasks
t2.set_upstream(t1)
t3.set_upstream(t2)
t4.set_upstream(t3) # complete


Overwriting airflow/dags/training.py

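Task `t1` turns raw page hits into implicit feedback. Within each visit (partitioned by `fullVisitorId` and `visitNumber` and ordered by `hits.time`), `LEAD` fetches the next hit's time, and `timeOnPage` is the gap to it. The last hit of a visit has no successor, so it is dropped, along with hits that lack an article ID. The pure-Python re-implementation below applies that logic to in-memory rows, for illustration only. The field names follow the query, and the sample rows are made up.

```python
from collections import defaultdict

def time_on_page(hits):
    """hits: dicts with fullVisitorId, visitNumber, time, articleId (PAGE hits only).

    Returns (clientId, contentId, timeOnPage) tuples like the bql query.
    """
    visits = defaultdict(list)
    for hit in hits:  # PARTITION BY fullVisitorId, visitNumber
        visits[(hit['fullVisitorId'], hit['visitNumber'])].append(hit)
    rows = []
    for (visitor, _), visit_hits in visits.items():
        visit_hits.sort(key=lambda h: h['time'])  # ORDER BY hits.time ASC
        # Pair each hit with the next one: LEAD(hits.time, 1); the last hit has none.
        for cur, nxt in zip(visit_hits, visit_hits[1:]):
            if cur['articleId'] is not None:  # contentId is not null
                rows.append((visitor, cur['articleId'], nxt['time'] - cur['time']))
    return rows

sample_hits = [  # hypothetical rows
    {'fullVisitorId': 'v1', 'visitNumber': 1, 'time': 0, 'articleId': 'a1'},
    {'fullVisitorId': 'v1', 'visitNumber': 1, 'time': 30000, 'articleId': 'a2'},
    {'fullVisitorId': 'v1', 'visitNumber': 1, 'time': 45000, 'articleId': None},
    {'fullVisitorId': 'v1', 'visitNumber': 2, 'time': 5000, 'articleId': 'a3'},
]
print(time_on_page(sample_hits))  # [('v1', 'a1', 30000), ('v1', 'a2', 15000)]
```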

### Copy local Airflow DAG file and plugins into the DAGs folder

In [30]:
%%bash
gsutil cp airflow/dags/training.py gs://${AIRFLOW_BUCKET}/dags # overwrite if it exists
gsutil cp -r airflow/plugins gs://${AIRFLOW_BUCKET} # copy custom plugins

Copying file://airflow/dags/training.py [Content-Type=text/x-python]...
Operation completed over 1 objects/6.0 KiB.                                      
Copying file://airflow/plugins/ml_engine_plugin.py [Content-Type=text/x-python]...
Copying file://airflow/plugins/gae_admin_plugin.py [Content-Type=text/x-python]...
Operation completed over 2 objects/17.9 KiB.                                     


2. Navigate to your Cloud Composer [instance](https://console.cloud.google.com/composer/environments?project=)<br/><br/>

3. Trigger a __manual run__ of your DAG for testing<br/><br/>

4. Ensure your DAG runs successfully (all nodes outlined in dark green and a 'success' tag shows)

![Successful Airflow DAG run](./img/airflow_successful_run.jpg "Successful Airflow DAG run")


## Troubleshooting your DAG

DAG not executing successfully? Follow the steps below to troubleshoot.

Click on the name of a DAG to view a run (e.g. recommendations_training_v1)

1. Select a node in the DAG (red borders indicate failed tasks; yellow ones are waiting to retry)
2. Select View Log
3. Scroll to the bottom of the log to diagnose
4. Optional: after diagnosing and fixing the issue, select Clear on the failed task to restart it immediately

Tips:
- If bq_rec_training_data immediately fails without logs, your DAG file is missing key parts and is not compiling
- ml_engine_training_op will take 9 - 12 minutes to run. Monitor the training job in [ML Engine](https://console.cloud.google.com/mlengine/jobs?project=)
- Lastly, check the [solution endtoend.ipynb](../endtoend/endtoend.ipynb) to compare your lab answers

![Viewing Airflow logs](./img/airflow_viewing_logs.jpg "Viewing Airflow logs")

# Congratulations!
You have made it to the end of the end-to-end recommendation system lab. You have successfully set up an automated workflow to retrain and redeploy your recommendation model.

***
# Challenges

Looking to solidify your Cloud Composer skills even more? Complete the __optional challenges__ below
<br/><br/>
### Challenge 1
Use either the [BigQueryCheckOperator](https://airflow.apache.org/integration.html#bigquerycheckoperator) or the [BigQueryValueCheckOperator](https://airflow.apache.org/integration.html#bigqueryvaluecheckoperator) to create a new task in your DAG that ensures the SQL query for training data is returning valid results before it is passed to Cloud ML Engine for training. 
<br/><br/>
Hint: Check for COUNT() = 0 or other health check
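`BigQueryCheckOperator` runs a query and fails the task if the query returns no row, or if any value in the first returned row is falsy under Python's `bool()` (for example a count of 0). The pure-Python sketch below mimics that rule so you can reason about which health-check queries would pass. `passes_check` is a hypothetical helper, and the SQL string is just one possible check against the `recommendation_events` table written by `t1`.

```python
def passes_check(first_row):
    """Mimic the check rule: fail on no row, or on any falsy value in the row."""
    if not first_row:
        return False
    return all(bool(value) for value in first_row)

# One possible health check, in legacy SQL to match the DAG's query
# (table name assumed from t1's destination_dataset_table).
check_sql = 'SELECT COUNT(*) FROM [GA360_test.recommendation_events]'

print(passes_check((1000,)))  # True: rows were produced
print(passes_check((0,)))     # False: empty training data fails the task
```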
<br/><br/><br/>
### Challenge 2
Create a Cloud Function to [automatically trigger](https://cloud.google.com/composer/docs/how-to/using/triggering-with-gcf) your DAG when a new recommendation_events.csv file is loaded into your Google Cloud Storage Bucket. 
<br/><br/>
Hint: Check the [composer_gcf_trigger.ipynb lab](../composer_gcf_trigger/composertriggered.ipynb) for inspiration
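A Cloud Storage-triggered Cloud Function receives the changed object's metadata, including its `bucket` and `name`. Before calling Composer, the function should ignore unrelated uploads. Below is a minimal sketch of just that filtering step, assuming the training file lands at `data/recommendation_events.csv` as in the DAG. `should_trigger` is a hypothetical helper, and the call that actually triggers the DAG (covered in the linked guide) is omitted.

```python
TRAINING_OBJECT = 'data/recommendation_events.csv'  # path used by the DAG's t2 export

def should_trigger(event, expected_bucket):
    """Return True only for the training CSV landing in the expected bucket."""
    return (event.get('bucket') == expected_bucket
            and event.get('name') == TRAINING_OBJECT)

bucket = 'recserve_qwiklabs-gcp-226104af6ccf7ec4'
print(should_trigger({'bucket': bucket, 'name': 'data/recommendation_events.csv'}, bucket))  # True
print(should_trigger({'bucket': bucket, 'name': 'model/row.npy'}, bucket))                   # False
```

Note that task `t2` in this DAG exports to that same path. A real trigger on it would therefore fire again after every DAG run unless you guard against it, for example by watching a separate upload path for new raw data.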
<br/><br/><br/>
### Challenge 3
Modify the BigQuery query in the DAG to only train on a portion of the data available in the dataset using a WHERE clause filtering on date. Next, parameterize the WHERE clause to be based on when the Airflow DAG is run
<br/><br/>
Hint: Make use of prebuilt [Airflow macros](https://airflow.incubator.apache.org/_modules/airflow/macros.html) like the below:

_constants or can be dynamic based on Airflow macros_ <br/>
max_query_date = '2018-02-01' # {{ macros.ds_add(ds, -1) }} <br/>
min_query_date = '2018-01-01' # {{ macros.ds_add(ds, -7) }} 

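Airflow's `macros.ds_add(ds, days)` shifts a `YYYY-MM-DD` date string by a number of days. The sketch below re-implements that behavior with `datetime` to show how a 7-day training window ending the day before the run date would be computed. `training_window` and the `date` column in the WHERE clause are hypothetical, since the sample table's date field is not described here. Because the DAG fills `bql` with `str.format`, any Jinja braces you add to that string must be doubled to survive formatting, as the last lines show.

```python
from datetime import datetime, timedelta

def ds_add(ds, days):
    """Same contract as Airflow's macros.ds_add: shift a YYYY-MM-DD string by `days`."""
    shifted = datetime.strptime(ds, '%Y-%m-%d') + timedelta(days=days)
    return shifted.strftime('%Y-%m-%d')

def training_window(ds, lookback_days=7):
    """Return (min_query_date, max_query_date) for the run date `ds`."""
    return ds_add(ds, -lookback_days), ds_add(ds, -1)

min_d, max_d = training_window('2018-02-08')
print(min_d, max_d)  # 2018-02-01 2018-02-07
where = "WHERE date BETWEEN '{}' AND '{}'".format(min_d, max_d)  # hypothetical column

# str.format turns doubled braces into single ones, leaving valid Jinja for Airflow.
jinja_fragment = "'{{{{ macros.ds_add(ds, -7) }}}}'".format()
print(jinja_fragment)  # '{{ macros.ds_add(ds, -7) }}'
```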

## Additional Resources

- Follow the latest [Airflow operators](https://github.com/apache/incubator-airflow/tree/master/airflow/contrib/operators) on github