# Train a CTR Model

In [44]:
"""Python script to train sklearn model using the criteo dataset."""

import argparse
import csv
from datetime import date
from datetime import datetime
from datetime import time
from datetime import timedelta
import subprocess
import os

from sklearn.externals import joblib
from sklearn.feature_extraction import DictVectorizer
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline

from tensorflow.python.lib.io import file_io

import xgboost

# Options are declared with argparse even though this runs in a notebook; the
# same --base-dir / --event-date flags are passed on the command line when the
# training job is submitted further down.
default_event_date = date.today() - timedelta(1)

parser = argparse.ArgumentParser()
parser.add_argument('--base-dir', default='.')
parser.add_argument('--event-date',
                    default=default_event_date.strftime('%Y%m%d'))
# Parse an explicit empty argv so the defaults are used and the notebook
# kernel's own command line is never inspected.
args = parser.parse_args([])


# Dataset: http://labs.criteo.com/2014/02/kaggle-display-advertising-challenge-dataset/

# Upper bound on the number of rows read across all daily files.
MAX_SAMPLES = 70000


# Read in the 7 days of data that precede the event date (which defaults to yesterday) into a key-value format.

# NB: end_date is exclusive
def daterange(end_date, num_days):
    """Yield the `num_days` consecutive days leading up to `end_date`.

    Days are produced oldest first; `end_date` itself is never yielded.
    Nothing is yielded when `num_days` is zero or negative.
    """
    for days_back in range(num_days, 0, -1):
        yield end_date - timedelta(days_back)

# Data in key value format. Each element is a dict mapping a column index
# (as a string) to that column's value; empty columns are left out.
data = []

# Labels for the data (column 0 of each row, parsed as an int).
y = []

num_samples = 0
for d in daterange(datetime.strptime(args.event_date, '%Y%m%d'), 7):
  data_fn = os.path.join(args.base_dir, 'logs', d.strftime('%Y%m%d'), 'click.txt')
  print('Reading file {}'.format(data_fn))
  with file_io.FileIO(data_fn, 'r') as f:
    reader = csv.reader(f, delimiter='\t')
    while num_samples < MAX_SAMPLES:
      # next() with a default works on both Python 2 and 3 and, unlike
      # reader.next(), does not raise StopIteration when this file runs out
      # before MAX_SAMPLES is reached; we simply move on to the next day.
      line = next(reader, None)
      if line is None:
        break
      if not line:
        # csv yields an empty list for a blank line; there is nothing to parse.
        continue
      row = {}
      y.append(int(line[0]))
      # Columns 1-13 are stored as ints.
      for i in range(1, 13 + 1):
        if line[i]:
          row[str(i)] = int(line[i])
      # Columns 14-39 are kept as the raw strings read from the file.
      for i in range(14, 39 + 1):
        if line[i]:
          row[str(i)] = line[i]
      data.append(row)
      num_samples += 1

# Split data into training (80%) and testing (20%).
# NOTE: no random_state is passed, so the split differs from run to run.
X_train, X_test, y_train, y_test = train_test_split(data, y, test_size=0.2)
# print(...) with a single argument behaves the same on Python 2 and 3 and
# matches the other print calls in this cell.
print('training data size: {}'.format(len(X_train)))
print('test data size: {}'.format(len(X_test)))

# Settings forwarded as keyword arguments to the XGBoost scikit-learn wrapper.
# Grouping follows the XGBoost parameter documentation.
param = {
    # Learning task and reporting.
    'objective': 'binary:logistic',
    'eval_metric': 'logloss',
    'silent': False,
    # Tree construction.
    'tree_method': 'hist',
    'max_depth': 7,
    'min_child_weight': 20,
    'eta': 0.2,
    # Row / column sampling.
    'subsample': 0.8,
    'colsample_bytree': 0.8,
    # Regularization.
    'gamma': 0.4,
    'alpha': 3,
    'lambda': 100,
}
# Wrapped as a scikit-learn style estimator so it can be used as a Pipeline step.
xgb = xgboost.XGBRegressor(n_estimators=2, **param)

# Two-stage pipeline: turn each feature dict into a vector, then fit XGBoost.
steps = [
    ("dict_vect", DictVectorizer()),
    ("xgboost", xgb),
]
pipeline = Pipeline(steps=steps)

print("Training...")
pipeline.fit(X_train, y_train)
print("Done!")

Reading file ./logs/20180606/click.txt
Reading file ./logs/20180607/click.txt
Reading file ./logs/20180608/click.txt
Reading file ./logs/20180609/click.txt
Reading file ./logs/20180610/click.txt
Reading file ./logs/20180611/click.txt
Reading file ./logs/20180612/click.txt
training data size: 56000
test data size: 14000
Training...
Done!


# Predict Locally

In [47]:
# A single hand-written feature dict in the same shape as the training rows:
# keys are column indices as strings, values for "1".."13" are ints and the
# remaining values are strings. Keys that are absent here (e.g. "12", "32")
# correspond to empty columns, which the training loop also leaves out.
example = {
    "1": 26,
    "2": 5,
    "3": 7,
    "4": 7,
    "5": 250,
    "6": 7,
    "7": 161,
    "8": 45,
    "9": 965,
    "10": 2,
    "11": 30,
    "13": 7,
    "14": "5a9ed9b0",
    "15": "421b43cd",
    "16": "8d7d4f0a",
    "17": "29998ed1",
    "18": "25c83c98",
    "19": "fbad5c96",
    "20": "db55c967",
    "21": "0b153874",
    "22": "a73ee510",
    "23": "3b08e48b",
    "24": "4ce044d9",
    "25": "6aaba33c",
    "26": "a4fb9828",
    "27": "b28479f6",
    "28": "e1ac77f7",
    "29": "b041b04a",
    "30": "e5ba7672",
    "31": "2804effd",
    "34": "723b4dfd",
    "36": "bcdee96c",
    "37": "b34f3128",
}
# predict() is given a list of feature dicts; here it is a batch of one.
predictions = pipeline.predict([example])
print("Prediction: {}".format(predictions))

Prediction: [0.4866918]


# Cloud Setup

In [53]:
# Environment variables read by the shell commands and os.environ lookups in
# the cells below. Written as `%env VAR=value` with no space after `%`, which
# matches the form used in the "Train on Cloud" section.
%env PROJECT_ID=op-beta-walkthrough
%env MODEL_DEST=gs://op-beta-walkthrough-mlengine/ctr
%env MODEL_NAME=ctr
%env VERSION_NAME=v2
%env REGION=us-central1

env: PROJECT_ID=op-beta-walkthrough
env: MODEL_DEST=gs://op-beta-walkthrough-mlengine/ctr
env: MODEL_NAME=ctr
env: VERSION_NAME=v2
env: REGION=us-central1


# Save and Upload the Model

In [50]:
# Export the model.
# joblib.dump writes the whole fitted pipeline (vectorizer + XGBoost step) to
# one file; its return value, the list of written paths, is the cell output.
model = './model.joblib'
joblib.dump(pipeline, model)

['./model.joblib']

In [54]:
# Copy the exported pipeline to the Cloud Storage location held in MODEL_DEST.
! gsutil cp model.joblib ${MODEL_DEST}/model.joblib

Copying file://model.joblib [Content-Type=application/octet-stream]...
-
Operation completed over 1 objects/6.1 MiB.                                      


# Deploy to Cloud

In [84]:
# Create the model resource that versions are deployed under. The region comes
# from the REGION variable set in "Cloud Setup" instead of being repeated here.
! gcloud ml-engine models create $MODEL_NAME --enable-logging --regions $REGION --project $PROJECT_ID

Created ml engine model [projects/op-beta-walkthrough/models/ctr].


In [85]:
# Create a model version through the REST API. deploymentUri is the Cloud
# Storage directory that model.joblib was uploaded to, and the bearer token is
# taken from the active gcloud credentials.
# NOTE(review): presumably "framework": "SCIKIT_LEARN" is what makes the
# service load the joblib file as a scikit-learn model -- confirm against the
# versions.create documentation.
! curl -X POST -H "Content-Type: application/json" \
   -d '{"name": "'${VERSION_NAME}'", "deploymentUri": "'${MODEL_DEST}'", "runtimeVersion": "1.8", "framework": "SCIKIT_LEARN"}' \
   -H "Authorization: Bearer `gcloud auth print-access-token`" \
    https://ml.googleapis.com/v1/projects/$PROJECT_ID/models/$MODEL_NAME/versions

{
  "name": "projects/op-beta-walkthrough/operations/create_ctr_v2-1529073392835",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.ml.v1.OperationMetadata",
    "createTime": "2018-06-15T14:36:33Z",
    "operationType": "CREATE_VERSION",
    "modelName": "projects/op-beta-walkthrough/models/ctr",
    "version": {
      "name": "projects/op-beta-walkthrough/models/ctr/versions/v2",
      "deploymentUri": "gs://op-beta-walkthrough-mlengine/ctr",
      "createTime": "2018-06-15T14:36:32Z",
      "runtimeVersion": "1.8",
      "framework": "SCIKIT_LEARN",
      "pythonVersion": "2.7"
    }
  }
}


# Predict on Cloud

In [88]:
import googleapiclient.discovery

# Identify the deployed version using the values exported in "Cloud Setup".
PROJECT_ID = os.environ['PROJECT_ID']
VERSION_NAME = os.environ['VERSION_NAME']
MODEL_NAME = os.environ['MODEL_NAME']

service = googleapiclient.discovery.build('ml', 'v1')
name = 'projects/{}/models/{}'.format(PROJECT_ID, MODEL_NAME)
name += '/versions/{}'.format(VERSION_NAME)

response = service.projects().predict(
    name=name,
    body={'instances': [example]}
).execute()

# A failed prediction is reported inside the response body under 'error'
# rather than as an exception, so surface it here instead of failing below
# with an unhelpful KeyError on 'predictions'.
if 'error' in response:
    raise RuntimeError(response['error'])

cloud_prediction = response['predictions'][0]

# Compare cloud prediction to local prediction
# use the same `example` from the "Predict locally" section
# NOTE(review): the two values only agree if the deployed model.joblib was
# exported from this same fitted pipeline; training is unseeded, so a model
# uploaded from an earlier run will score differently -- confirm which one is
# deployed.
local_prediction = pipeline.predict([example])[0]
print("Cloud: {}\nLocal: {}".format(cloud_prediction, local_prediction))

Cloud: 0.522804915905
Local: 0.48669180274


# Train on Cloud

## Package up the code

In [77]:
# Locations used by the packaging and job-submission cells below: the data root
# passed to the trainer and the Cloud Storage path of the trainer package.
# Written with no space after `%`.
%env BASE_DIR=gs://ml-engine-airflow/criteo
%env TRAIN_BIN=gs://ml-engine-airflow/criteo/bin/criteo-trainer-0.1.tar.gz

env: BASE_DIR=gs://ml-engine-airflow/criteo
env: TRAIN_BIN=gs://ml-engine-airflow/criteo/bin/criteo-trainer-0.1.tar.gz


In [69]:
# Build a source distribution of the trainer package, then upload the resulting
# tarball to the Cloud Storage path held in TRAIN_BIN.
! python setup.py sdist
! gsutil cp dist/criteo-trainer-0.1.tar.gz ${TRAIN_BIN}

running sdist
running egg_info
writing criteo_trainer.egg-info/PKG-INFO
writing top-level names to criteo_trainer.egg-info/top_level.txt
writing dependency_links to criteo_trainer.egg-info/dependency_links.txt
reading manifest file 'criteo_trainer.egg-info/SOURCES.txt'
writing manifest file 'criteo_trainer.egg-info/SOURCES.txt'

running check


creating criteo-trainer-0.1
creating criteo-trainer-0.1/criteo_trainer.egg-info
creating criteo-trainer-0.1/trainer
copying files to criteo-trainer-0.1...
copying setup.py -> criteo-trainer-0.1
copying criteo_trainer.egg-info/PKG-INFO -> criteo-trainer-0.1/criteo_trainer.egg-info
copying criteo_trainer.egg-info/SOURCES.txt -> criteo-trainer-0.1/criteo_trainer.egg-info
copying criteo_trainer.egg-info/dependency_links.txt -> criteo-trainer-0.1/criteo_trainer.egg-info
copying criteo_trainer.egg-info/top_level.txt -> criteo-trainer-0.1/criteo_trainer.egg-info
copying trainer/__init__.py -> criteo-trainer-0.1/trainer
copying trainer/train.py -> crite

## Submit a training job

In [83]:
# Submit the packaged trainer as a training job. The job name is suffixed with
# the current Unix timestamp (`date +%s`), and the arguments after the bare
# `--` are the --base-dir / --event-date flags.
# NOTE(review): presumably trainer.train parses these flags the same way as the
# training cell at the top of this notebook -- confirm against trainer/train.py.
! gcloud ml-engine jobs submit training ctr_`date +%s` \
    --module-name=trainer.train \
    --packages=${TRAIN_BIN} \
    --runtime-version=1.8 \
    --region=us-central1 \
    --project=ml-engine-airflow \
    -- \
      --base-dir=${BASE_DIR} \
      --event-date=20180608

Job [ctr_1529013637] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ml-engine jobs describe ctr_1529013637

or continue streaming the logs with the command

  $ gcloud ml-engine jobs stream-logs ctr_1529013637
jobId: ctr_1529013637
state: QUEUED
