In [1]:
%matplotlib inline
from google.cloud import storage
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import io
import numpy as np
from pathlib import Path
import tensorflow as tf
import os
from PIL import Image
import json
import time
import re
import googleapiclient.discovery as discovery
from googleapiclient import errors

In [2]:
# Define bucket and blob prefix

project = 'optimum-treat-262616'
photo_bucket_name = 'catflap-photos-raw'
model_bucket_name = 'cat-detection-models'
prefix = '2020-06-07'

# Create input json file

In [3]:
# Set up buckets

client = storage.Client()
photo_bucket = client.get_bucket(photo_bucket_name)
model_bucket = client.get_bucket(model_bucket_name)

In [4]:
# Get list of blob names

blobs = photo_bucket.list_blobs(prefix=prefix)
blob_list = [blob.name for blob in blobs]
print(len(blob_list))

3056


In [5]:
# Read labels into pandas dataframe

batch_input_filename = f'/home/jupyter/batch-input-{prefix}.json'
with open(batch_input_filename, 'w') as fp:
    for idx, blob_name in enumerate(blob_list[:]):

        # Read blob from GCS
        blob = photo_bucket.blob(blob_name)
        blob_str = blob.download_as_string()
        bytes_io = io.BytesIO(blob_str)
        img = mpimg.imread(bytes_io, format='jpg')
        img_red_downsample = img[::10,::10,0]

        # Write to file
        json_instances_dict = {'flatten_input': img_red_downsample.tolist(), 'key': blob_name}
        json.dump(json_instances_dict, fp)
        fp.write('\n')
        
        if idx % 100 == 0:
            print(pd.Timestamp.now(), idx)

2020-06-09 20:55:03.678352 0
2020-06-09 20:55:15.888998 100
2020-06-09 20:55:27.641290 200
2020-06-09 20:55:40.221318 300
2020-06-09 20:55:52.608006 400
2020-06-09 20:56:04.347015 500
2020-06-09 20:56:16.099585 600
2020-06-09 20:56:27.871750 700
2020-06-09 20:56:39.236348 800
2020-06-09 20:56:50.389495 900
2020-06-09 20:57:02.570344 1000
2020-06-09 20:57:14.500155 1100
2020-06-09 20:57:26.351837 1200
2020-06-09 20:57:38.547688 1300
2020-06-09 20:57:50.421774 1400
2020-06-09 20:58:01.784858 1500
2020-06-09 20:58:13.583922 1600
2020-06-09 20:58:25.073078 1700
2020-06-09 20:58:36.785304 1800
2020-06-09 20:58:48.776435 1900
2020-06-09 20:59:00.307508 2000
2020-06-09 20:59:13.150699 2100
2020-06-09 20:59:25.511384 2200
2020-06-09 20:59:37.998924 2300
2020-06-09 20:59:49.472976 2400
2020-06-09 21:00:00.844571 2500
2020-06-09 21:00:12.510390 2600
2020-06-09 21:00:23.996578 2700
2020-06-09 21:00:35.567576 2800
2020-06-09 21:00:47.081896 2900
2020-06-09 21:00:58.374019 3000


In [6]:
# Upload batch input json file to GCS

batch_input_blob = model_bucket.blob('batch-input-keys/'+prefix+'.json')
batch_input_blob.upload_from_filename(batch_input_filename)

# Submit batch prediction job

In [7]:
def make_batch_job_body(project_name, input_paths, output_path,
        model_name, region, data_format='JSON',
        version_name=None, max_worker_count=None,
        runtime_version=None):

    project_id = 'projects/{}'.format(project_name)
    model_id = '{}/models/{}'.format(project_id, model_name)
    if version_name:
        version_id = '{}/versions/{}'.format(model_id, version_name)

    # Make a jobName of the format "model_name_batch_predict_YYYYMMDD_HHMMSS"
    timestamp = time.strftime('%Y%m%d_%H%M%S', time.gmtime())

    # Make sure the project name is formatted correctly to work as the basis
    # of a valid job name.
    clean_project_name = re.sub(r'\W+', '_', project_name)

    job_id = '{}_{}_{}'.format(clean_project_name, model_name,
                           timestamp)

    # Start building the request dictionary with required information.
    body = {'jobId': job_id,
            'predictionInput': {
                'dataFormat': data_format,
                'inputPaths': input_paths,
                'outputPath': output_path,
                'region': region}}

    # Use the version if present, the model (its default version) if not.
    if version_name:
        body['predictionInput']['versionName'] = version_id
    else:
        body['predictionInput']['modelName'] = model_id

    # Only include a maximum number of workers or a runtime version if specified.
    # Otherwise let the service use its defaults.
    if max_worker_count:
        body['predictionInput']['maxWorkerCount'] = max_worker_count

    if runtime_version:
        body['predictionInput']['runtimeVersion'] = runtime_version

    return body

In [8]:
# Create batch job body

batch_predict_body = make_batch_job_body(
    project_name = project, 
    input_paths = f'gs://{model_bucket_name}/batch-input-keys/{prefix}.json', 
    output_path = f'gs://{model_bucket_name}/batch-output-keys/{prefix}/',
    model_name = 'logistic_regression_v2', 
    region = 'europe-west2',
    version_name='logistic_regression_v2', 
    max_worker_count=20)

batch_predict_body

{'jobId': 'optimum_treat_262616_logistic_regression_v2_20200609_210105',
 'predictionInput': {'dataFormat': 'JSON',
  'inputPaths': 'gs://cat-detection-models/batch-input-keys/2020-06-07.json',
  'outputPath': 'gs://cat-detection-models/batch-output-keys/2020-06-07/',
  'region': 'europe-west2',
  'versionName': 'projects/optimum-treat-262616/models/logistic_regression_v2/versions/logistic_regression_v2',
  'maxWorkerCount': 20}}

In [9]:
# Submit batch prediction job

project_id = 'projects/{}'.format(project)

ml = discovery.build('ml', 'v1')
request = ml.projects().jobs().create(parent=project_id, body=batch_predict_body)

try:
    response = request.execute()

    print('Job requested.')

    # The state returned will almost always be QUEUED.
    print('state : {}'.format(response['state']))

except errors.HttpError as err:
    # Something went wrong, print out some information.
    print('There was an error getting the prediction results.' +
          'Check the details:')
    print(err._get_reason())

Job requested.
state : QUEUED
