# End-to-end triplet loss using Kubeflow pipelines

## Introduction

This notebook is intended to be run from the JupyterLab environment in a GKE-based Kubeflow cluster.  It started as a thought experiment on what an "end-to-end" ML pipeline looks like within a single notebook.

Kubeflow Pipelines uses containers to execute the steps of a pipeline. Instead of developing container code separately outside the notebook, we explore ways to write code once inside the notebook, so that it can be both executed in the notebook and built into Docker containers, all from the notebook interface.


## Learning using triplet loss

To prove out the concept, we try to implement an end-to-end ML pipeline for a "triplet learning" model.  The triplet loss model is trained using face images from the [PubFig dataset](http://www.cs.columbia.edu/CAVE/databases/pubfig/).  The trained model predicts an embedded representation of a face image, such that similar faces result in more similar embeddings.  This could be useful for applications such as face clustering and recognition.

Find out more about triplet loss [here](https://www.coursera.org/lecture/convolutional-neural-networks/triplet-loss-HuUtN).
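
As a rough illustration of the idea (not the training code used later in this notebook), the loss for a single anchor/positive/negative triple can be sketched in plain Python. The squared Euclidean distance, the `margin` default of 1.0 and the function names are assumptions made for this sketch.

```python
def squared_distance(a, b):
    """Squared Euclidean distance between two equal-length vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def triplet_loss(anchor, positive, negative, margin=1.0):
    """Hinge-style triplet loss for a single triple of embeddings.

    The loss is zero once the negative is at least `margin` further from
    the anchor than the positive is (measured in squared distance).
    """
    d_pos = squared_distance(anchor, positive)
    d_neg = squared_distance(anchor, negative)
    return max(d_pos - d_neg + margin, 0.0)


# Toy 2-D embeddings: the positive is close to the anchor, the negative is far.
anchor = [0.0, 0.0]
positive = [0.0, 1.0]
negative = [3.0, 0.0]
print(triplet_loss(anchor, positive, negative))  # 1 - 9 + 1 < 0, so the loss is 0.0
print(triplet_loss(anchor, negative, positive))  # 9 - 1 + 1 = 9.0
```

Swapping the positive and negative in the second call shows the loss growing when a "different" face sits closer to the anchor than a "same" face.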

## Before you run
Since we use Cloud Build directly in the notebook (avoiding the need for the `docker` binary to be available inside Jupyter), the GKE worker nodes should be created with the `https://www.googleapis.com/auth/cloud-platform` scope.  The Kubeflow user service account should also be granted the `Cloud Build Service Account` IAM role, so that Cloud Build jobs can be submitted from the notebook.

In [None]:
%%bash
echo $GOOGLE_APPLICATION_CREDENTIALS 

## Define custom magics

We explore the use of Jupyter magics to easily work with Docker containers within the notebook environment. The following magics are defined:

`%%containerize [docker-container-tag] [filename]`

Adds the contents of the cell to a file named `[filename]`.  `[filename]` can be referenced within the container's Dockerfile (for example, `COPY [filename] /app`).

`%%containerize_and_run [docker-container-tag] [filename]`

Adds the contents of the cell to a file named `[filename]`, and additionally runs the cell's contents as normal.  This allows code to be executed within the notebook context, and avoids having to `docker build ... / docker run ...` code just for testing.

`%containerize_file [docker-container-tag] [source_filename] [container_filename]`

Adds a file from the local file system to a container. This works just like `%%containerize`, except that it is a line magic and the contents of `source_filename` are used for the file instead of the cell's contents.

`%cloud_build [docker-container-tag]`

Submits a container build job to Cloud Build. If the job is successful, the container image will be available in Google Container Registry.

`%docker_build [docker-container-tag]`

Uses a locally installed Docker to build the container.  Assumes that you have Docker installed (which the JupyterLab environment does not).

`%push_container [docker-container-tag]`

Pushes the container to a Docker registry.  Assumes that you have Docker installed (which the JupyterLab environment does not).

`%run_container [docker-container-tag]`

Runs the container using the locally installed Docker.  Assumes that you have Docker installed (which the JupyterLab environment does not).

The following code defines how our custom magics work. **It is included directly in the notebook for demonstrative purposes**, but you may want to externalise this into a separate file and import it instead.

In [1]:
from IPython.core import magic_arguments
from IPython.core.magic import line_magic, cell_magic, line_cell_magic, Magics, magics_class

@magics_class
class ContainerMagics(Magics):
    
    def __init__(self, shell):
        super(ContainerMagics, self).__init__(shell)        
        self.containers = {}


    def get_container(self, name):
        if name in self.containers.keys():
            return self.containers[name]

        
    def update_or_add_container(self, name, container):
        self.containers[name] = container
        
    
    class Container(object):
        def __init__(self, name):
            self.name = name
            self.files = {}
        
        
        def add_file(self, file_name, contents):
            self.files[file_name] = contents
            
        
        def add_from_file(self, source_file, dest_file):
            with open(source_file, 'rb') as f:
                contents = f.read()
            self.files[dest_file] = contents.decode("utf-8")
            
            
        def has_dockerfile(self):
            return 'Dockerfile' in self.files.keys()
        
        
        def build(self, args, use_gcp=False):
            import os
            import logging
            import subprocess
            import tempfile
            assert 'Dockerfile' in self.files.keys(), 'No Dockerfile defined'

            print("Building container \'{}\' ...".format(self.name))            
            td = tempfile.mkdtemp()
            for base_name in self.files.keys():
                full_path = os.path.join(td, base_name)
                # The with-block closes the file, so no explicit close() is needed
                with open(full_path, 'w') as f:
                    f.write(self.files[base_name])
                    print("==> Wrote {}".format(full_path))
            
            try:
                if use_gcp:
                    print("==> Submitting cloud build ...")                    
                    out = subprocess.check_output(['gcloud', 'builds', 'submit', '--tag', self.name, '.'], 
                                                  cwd=td, stderr=subprocess.STDOUT)
                else:
                    print("==> Running docker build ...")                    
                    out = subprocess.check_output(['docker', 'build', '-t', self.name, '.'], 
                                                  cwd=td, stderr=subprocess.STDOUT)
                print(out.decode('utf-8'))
            except subprocess.CalledProcessError as e:
                raise RuntimeError("Command '{}' return with error (code {}): {}".format(e.cmd, e.returncode, e.output.decode('utf-8')))
                
            
        def push(self, args):
            import subprocess          
            try:            
                print("==> Running docker push ...")
                out = subprocess.check_output(['docker', 'push', self.name])
                print(out.decode('utf-8'))
            except subprocess.CalledProcessError as e:
                raise RuntimeError("Command '{}' return with error (code {}): {}".format(e.cmd, e.returncode, e.output.decode('utf-8')))
            
            
        def run(self, args):
            import subprocess
            out = subprocess.check_output(['docker', 'run', self.name] + args)
            print(out.decode('utf-8'))
            
        
    @cell_magic    
    def containerize(self, line='', cell=None):
        tokens = line.split(' ')
        
        assert len(tokens) == 2, 'Expecting 2 arguments, got {}'.format(str(len(tokens)))
        container_name, file_name = tokens[0], tokens[1]
        
        container = self.get_container(container_name)
        if not container:
            container = self.Container(container_name)
            
        container.add_file(file_name, cell)
        self.update_or_add_container(container_name, container)
        
    @cell_magic    
    def containerize_and_run(self, line='', cell=None):
        self.containerize(line, cell)
        ip = get_ipython()
        ip.run_cell(cell)        

    
    @line_magic
    def docker_build(self, line=''):
        tokens = line.split(' ')
        assert len(tokens) >= 1, 'Too few arguments, expecting at least [container-name]'
        container = self.get_container(tokens[0])
        assert container is not None, 'No container defined: {}'.format(tokens[0])
        container.build(tokens[1:], use_gcp=False)
        
        
    @line_magic
    def containerize_file(self, line=''):
        tokens = line.split(' ')
        assert len(tokens) == 3, 'Expecting 3 arguments, got {}'.format(str(len(tokens)))
        container_name, source_file, dest_file = tokens[0], tokens[1], tokens[2]
        
        container = self.get_container(container_name)
        if not container:
            container = self.Container(container_name)
            
        container.add_from_file(source_file, dest_file)
        self.update_or_add_container(container_name, container)
        

    @line_magic
    def cloud_build(self, line=''):
        tokens = line.split(' ')
        assert len(tokens) >= 1, 'Too few arguments, expecting at least [container-name]'
        container = self.get_container(tokens[0])
        assert container is not None, 'No container defined: {}'.format(tokens[0])
        container.build(tokens[1:], use_gcp=True)
        
        
    @line_magic
    def push_container(self, line=''):
        tokens = line.split(' ')
        assert len(tokens) >= 1, 'Too few arguments, expecting at least [container-name]'
        container = self.get_container(tokens[0])
        assert container is not None, 'No container defined: {}'.format(tokens[0])
        container.push(tokens[1:])    
        
        
    @line_magic
    def run_container(self, line=''):
        tokens = line.split(' ')
        assert len(tokens) >= 1, 'Too few arguments, expecting at least [container-name]'    
        container = self.get_container(tokens[0])
        assert container is not None, 'No container defined: {}'.format(tokens[0])
        container.run(tokens[1:])
 

ip = get_ipython()
ip.register_magics(ContainerMagics)
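
The magics above split their argument line with `line.split(' ')`. With that call a doubled space produces an empty token, a file name containing a space cannot be passed, and an empty line yields `['']`, so the `len(tokens) >= 1` assertions never fire. A possible alternative, sketched here with the standard library's `shlex.split`, is shown below. The helper name `parse_magic_args` and the `gcr.io/my-project/...` tags are hypothetical and not part of the notebook.

```python
import shlex


def parse_magic_args(line, expected=None):
    """Split a magic's argument line into tokens.

    shlex.split collapses runs of whitespace and honours quotes, so
    'tag  "my file.txt"' yields ['tag', 'my file.txt'].
    """
    tokens = shlex.split(line)
    if expected is not None and len(tokens) != expected:
        raise ValueError(
            'Expecting {} arguments, got {}'.format(expected, len(tokens)))
    return tokens


# A doubled space no longer creates an empty token.
print(parse_magic_args('gcr.io/my-project/hello-world  main.py', expected=2))
# Quoting lets a source file name contain a space.
print(parse_magic_args('gcr.io/my-project/data "my urls.txt" urls.txt', expected=3))
```

Raising `ValueError` instead of using `assert` is a design choice for this sketch: assertions are skipped when Python runs with `-O`.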

## Magic testing

Before we do any real work, let's test our magics.  The `%%containerize_and_run` cell magic adds the cell's contents to a specified file within a container, and additionally runs it in the notebook.  To _only_ add the file to a container and not run it in the notebook, use `%%containerize`.

Running the cell below should print `hello there world!`.  At the same time, its contents are added as a file to the container, ready to be built later.

In [None]:
%%containerize_and_run gcr.io/wwoo-gcp/hello-world main.py
if __name__ == '__main__':
    print('hello there world!')

Let's turn this into a container.  To do so, we need a Dockerfile.  Adding a Dockerfile is just like adding any other file.

In [None]:
%%containerize gcr.io/wwoo-gcp/hello-world Dockerfile
FROM python:3
COPY main.py /
ENTRYPOINT [ "python", "/main.py" ]

The `%cloud_build` line magic submits a container build job to Cloud Build using your `$GOOGLE_APPLICATION_CREDENTIALS`.  In Kubeflow, it uses the `user-gcp-sa` secret.

`%docker_build` runs `docker build` locally to build your container image.  We don't use that here - and Jupyterlab within Kubeflow doesn't have docker installed anyway.
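
To make the build step less opaque, here is a minimal sketch of what both build magics do before invoking a tool: the files collected for a container are written into a temporary directory, and a command is run from there. The sketch only assembles the command list and does not run it. The function names and the `gcr.io/my-project/hello-world` tag are placeholders; the two command lines are the ones used by the `Container.build` method defined earlier.

```python
import os
import tempfile


def stage_build_context(files):
    """Write {file_name: contents} into a fresh temporary directory."""
    build_dir = tempfile.mkdtemp()
    for file_name, contents in files.items():
        with open(os.path.join(build_dir, file_name), 'w') as f:
            f.write(contents)
    return build_dir


def build_command(tag, use_gcp=True):
    """Return the command list that would be run from the build directory."""
    if use_gcp:
        return ['gcloud', 'builds', 'submit', '--tag', tag, '.']
    return ['docker', 'build', '-t', tag, '.']


files = {
    'main.py': "print('hello there world!')\n",
    'Dockerfile': 'FROM python:3\nCOPY main.py /\nENTRYPOINT [ "python", "/main.py" ]\n',
}
build_dir = stage_build_context(files)
print(sorted(os.listdir(build_dir)))
print(build_command('gcr.io/my-project/hello-world'))
```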

In [None]:
%cloud_build gcr.io/wwoo-gcp/hello-world

## Download PubFig Dataset

First, let's download a suitable dataset.  We'll use the "eval" dataset from PubFig, since this appears to have lots of example images of individual people.

Let's use the `%%containerize_and_run` magic to define some code that we can run in the notebook (for testing) and also build into a container.  The download code reads an input file of URLs (which we'll get from the PubFig site) and spawns a number of download threads. This runs as a single container in the Kubeflow cluster.
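
The threading pattern used by the downloader can be sketched in isolation: daemon worker threads pull items from a `Queue`, and the main thread blocks on `Queue.join()` until every item has been marked done. This is a simplified stand-in, not the downloader itself; `run_workers` and the `len` handler are hypothetical names chosen for the example.

```python
import threading
from queue import Queue


def run_workers(items, handler, num_threads=4):
    """Process every item on a pool of daemon threads and wait for completion."""
    work_queue = Queue()
    results = Queue()
    for item in items:
        work_queue.put(item)

    def worker():
        while True:
            item = work_queue.get()
            try:
                results.put(handler(item))
            except Exception as e:  # keep the worker alive on bad items
                results.put(e)
            finally:
                # task_done must be called once per get(), or join() never returns
                work_queue.task_done()

    for _ in range(num_threads):
        threading.Thread(target=worker, daemon=True).start()

    work_queue.join()  # blocks until every queued item has been marked done
    return [results.get() for _ in range(results.qsize())]


# Stand-in for a download: just measure the length of each URL.
print(sorted(run_workers(['http://a', 'http://bb', 'http://ccc'], len)))
```

Calling `task_done()` in a `finally` block matters: if a failed item skipped it, `join()` would wait forever.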

An alternative (maybe better) approach could be to parallelise this as a Beam pipeline in Dataflow, but here we're just demonstrating how to run this within the Kubeflow cluster.  An example of running a Dataflow job (to convert the labelled images into TFRecords) is lower in the notebook. :)

For each image, we're interested in the face only.  Luckily, PubFig supplies the face coordinates for each image, so we'll use that to produce crops of all the faces.  We'll use PIL to do the image manipulation.
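
As a small sketch of how one line of the URL file maps to a download task, the helper below pulls out the fields the downloader relies on. The column positions (0 for the person, 2 for the URL, 3 for the face rectangle) mirror the downloader code; the sample lines, the remaining column names and the function names are made up for illustration and are not real PubFig entries.

```python
def parse_url_line(line):
    """Parse one tab-separated line into the fields the downloader uses.

    Returns None for comment lines starting with '#'.
    """
    if line.startswith('#'):
        return None
    tokens = line.rstrip('\n').split('\t')
    # The face rectangle is stored as 'x0,y0,x1,y1'
    box = tuple(float(v) for v in tokens[3].split(','))
    return {'rel_dir': tokens[0], 'url': tokens[2], 'crop_box': box}


def build_class_index(records):
    """Assign each distinct person a label in order of first appearance."""
    classes = {}
    for record in records:
        classes.setdefault(record['rel_dir'], len(classes))
    return classes


# Made-up sample lines in the assumed layout.
lines = [
    '# person\timagenum\turl\trect\tmd5sum\n',
    'Person A\t1\thttp://example.com/a1.jpg\t10,20,110,140\tabc\n',
    'Person B\t1\thttp://example.com/b1.jpg\t0,0,50,50\tdef\n',
    'Person A\t2\thttp://example.com/a2.jpg\t5,5,65,85\tghi\n',
]
records = [r for r in map(parse_url_line, lines) if r is not None]
print(records[0]['crop_box'])
print(build_class_index(records))
```

The resulting 4-tuple has the (left, upper, right, lower) shape that PIL's `Image.crop` accepts, which is how the downloader uses it.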

In [None]:
!pip3 install Pillow==5.3.0

In [35]:
%%containerize_and_run gcr.io/wwoo-gcp/pubfig-download downloader.py

import logging
import os
import socket
import threading
import urllib3
from queue import Queue
from PIL import Image

NUM_THREADS = 4
URL_TIMEOUT = 4
IMAGE_CROP = True 
ESCAPE_SPACES = False


class LabelWriterThread(threading.Thread):
    def __init__(self, queue, dest_dir):
        super(LabelWriterThread, self).__init__()
        self.queue = queue
        self.daemon = True
        self.dest_dir = dest_dir

    def run(self):
        file_path = os.path.join(self.dest_dir, "manifest.txt")
        with open(file_path, 'w') as f:
            while True:
                f.write(self.queue.get() + '\n')
                f.flush()
                self.queue.task_done()

        
class DownloadThread(threading.Thread):
    def __init__(self, url_queue, print_queue, classes, image_crop, dest_dir):
        super(DownloadThread, self).__init__()
        socket.setdefaulttimeout(URL_TIMEOUT)
        self.http = urllib3.PoolManager()
        self.url_queue = url_queue
        self.classes = classes
        self.dest_dir = dest_dir
        self.daemon = True
        self.image_crop = image_crop
        self.print_queue = print_queue

    def run(self):
        while True:
            dict = self.url_queue.get()
            
            try:
                name = dict["url"].split('/')[-1]
                person_dir = os.path.join(self.dest_dir, dict["rel_dir"])
                
                try:
                    if not os.path.exists(person_dir):
                        os.makedirs(person_dir)
                except Exception as e:
                    # Another thread may have created the directory first
                    logging.info('[%s] Path exists already.', self.ident)

                dest_file = os.path.join(person_dir, name)
                self.download_url(dest_file, dict["url"])

                if os.path.isfile(dest_file):
                    if self.image_crop:
                
                        crop_dir = os.path.join(person_dir, "crop")

                        if not os.path.exists(crop_dir):
                            os.makedirs(crop_dir)

                        out_filename = os.path.join(crop_dir, 'crop_' + name)

                        self.crop_image(dest_file, out_filename, dict["crop_dims"])

                        if ESCAPE_SPACES:
                            out_filename = out_filename.replace(' ', '\\ ')
                        
                        label = str(self.classes[dict["rel_dir"]])
                        self.print_queue.put(out_filename + '|' + label)
                        
                    else:
                        if ESCAPE_SPACES:
                            dest_file = dest_file.replace(' ', '\\ ')

                        self.print_queue.put(dest_file)
                
                
            except Exception as e:
                logging.error("[%s] Error: %s" % (self.ident, e))
           
            finally:
                self.url_queue.task_done()
                

    def download_url(self, dest_file, url):
        try:
            logging.info("[%s] Downloading %s -> %s" % (self.ident, url, dest_file))
            response = self.http.request('GET', url, preload_content=False)
            with open(dest_file, "wb") as f:
                while True:
                    data = response.read(256)
                    if not data:
                        break    
                    f.write(data)
            
        except urllib3.exceptions.HTTPError as e:
            logging.error("[%s] HTTP Error: %s, %s" % (self.ident, str(e), url))


    def crop_image(self, dest_file, crop_file, crop_dims):
        logging.info("[%s] Cropping %s -> %s" % (self.ident, dest_file, crop_file))
        c = crop_dims.split(',')
        img = Image.open(dest_file)
        img2 = img.crop((float(c[0]), float(c[1]), float(c[2]), float(c[3])))
        img2.save(crop_file)

        
def read_url_file(file_path):
    queue = Queue()
    classes = {}

    # Columns 0, 2 and 3 hold the person name, image URL and crop box
    with open(file_path) as f:
        for line in f:
            if not line.startswith('#'):
                tokens = line.split('\t')
                queue.put({ "rel_dir": tokens[0], "url": tokens[2], "crop_dims": tokens[3]})
                # Labels are assigned in order of first appearance
                if tokens[0] not in classes:
                    classes[tokens[0]] = len(classes)

    return queue, classes


def write_class_file(classes, file):
    with open(file, 'w') as f:
        for key in classes:
            f.write(key + "\n")
            f.flush()

   
def start_download(url_file, dest_dir):
    class_file = os.path.join(dest_dir, "classes.txt")
    url_queue, classes = read_url_file(url_file)
    
    write_class_file(classes, class_file)
    print_queue = Queue()

    for i in range(NUM_THREADS):
        t = DownloadThread(url_queue, print_queue, classes, IMAGE_CROP, dest_dir)
        t.start()

    t = LabelWriterThread(print_queue, dest_dir)
    t.start()

    url_queue.join()
    print_queue.join()
    

In the notebook, we can just call `start_download` with some appropriate arguments.  We'll actually do that later on, just to show it works.

In a container, we need to pass some command line arguments into a python script with a main entrypoint.  So let's create a `main.py` file in the container to do this:

In [None]:
%%containerize gcr.io/wwoo-gcp/pubfig-download main.py
import argparse
import downloader
import sys

def main(argv):
    args_parser = argparse.ArgumentParser()
    args_parser.add_argument('--url_file', dest='url_file', required=True,
                        help='Text file containing URLs to download.')    
    args_parser.add_argument('--output_dir', dest='output_dir', required=True,
                        help='Output directory.')
    args = args_parser.parse_args(argv)
    
    downloader.start_download(args.url_file, args.output_dir)    
    

if __name__ == '__main__':
    main(sys.argv[1:])

In [None]:
%%containerize gcr.io/wwoo-gcp/pubfig-download __init__.py
## Empty file

Create a simple wrapper bash script as the entrypoint to the container.

In [None]:
%%containerize gcr.io/wwoo-gcp/pubfig-download entrypoint.sh
#!/bin/bash
BUCKET=$1
echo == Bucket is $BUCKET ==
mkdir -p /tmp/pubfig_eval
python3 /main.py --url_file /eval_urls.txt --output_dir /tmp/pubfig_eval
sed -i "s/\/tmp\/pubfig_eval\//gs\:\/\/${BUCKET}\/pubfig_eval\//" /tmp/pubfig_eval/manifest.txt
gsutil -m cp -R /tmp/pubfig_eval gs://${BUCKET}/pubfig_eval
echo "gs://${BUCKET}/pubfig_eval/manifest.txt" > /output.txt
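
The `sed` line in the script rewrites each manifest entry so that it points at the copy in Cloud Storage rather than the container's local disk. For readers less familiar with `sed`, the same substitution can be sketched in Python. The function name, the `my-bucket` bucket and the sample manifest line are placeholders for this example.

```python
def to_gcs_paths(manifest_lines, bucket,
                 local_prefix='/tmp/pubfig_eval/', gcs_dir='pubfig_eval'):
    """Swap the first occurrence of the local prefix for a gs:// prefix on each line."""
    gcs_prefix = 'gs://{}/{}/'.format(bucket, gcs_dir)
    # count=1 matches sed's default of replacing only the first match per line
    return [line.replace(local_prefix, gcs_prefix, 1) for line in manifest_lines]


lines = ['/tmp/pubfig_eval/Some Person/crop/crop_img.jpg|7']
print(to_gcs_paths(lines, 'my-bucket'))
```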

Download the dataset URL file from the PubFig site.  Another approach would be to download the file when the container is run, but here we're going to just include the file inside the container image.

In [None]:
%%bash
echo [ Downloading PubFig url file and create a sample subset ... ]
curl -O http://www.cs.columbia.edu/CAVE/databases/pubfig/download/eval_urls.txt
tail -n 200 eval_urls.txt > sample_urls.txt

Since we've downloaded the file and it exists in the local file system, we can just use `%containerize_file` to add it to a container.

In [None]:
%containerize_file gcr.io/wwoo-gcp/pubfig-download eval_urls.txt eval_urls.txt

In [None]:
%containerize_file gcr.io/wwoo-gcp/pubfig-download sample_urls.txt sample_urls.txt

Add a `requirements.txt` file for the python dependencies, using `%%containerize` to add the cell contents to a container file.  Then define a Dockerfile and use Cloud Build to build the container!

In [None]:
%%containerize gcr.io/wwoo-gcp/pubfig-download requirements.txt
urllib3==1.24.1
Pillow==5.3.0

In [None]:
%%containerize gcr.io/wwoo-gcp/pubfig-download Dockerfile
FROM google/cloud-sdk:latest
COPY *.py /
COPY eval_urls.txt /
COPY sample_urls.txt /
COPY entrypoint.sh /
COPY requirements.txt /
RUN apt-get update && apt-get -y install python3-pip
RUN pip3 install -r requirements.txt
ENTRYPOINT [ "bash", "/entrypoint.sh" ]

In [None]:
%cloud_build gcr.io/wwoo-gcp/pubfig-download

Since we used `%%containerize_and_run`, the `start_download` function is defined in the notebook.  We can download the dataset within the notebook, or launch the container we just built to run the code.  For example, we could include it in the Kubeflow pipeline, which we do later on!

In the following cells, we'll call `start_download` to test running code that has been containerised but is also callable in the notebook.

In [24]:
%%bash
echo [ Installing python dependencies ... ]
#pip3 install urllib3==1.24.1
#pip3 install Pillow==5.3.0

echo [ Removing old files ... ]
rm -rf /tmp/pubfig
mkdir -p /tmp/pubfig

[ Installing python dependencies ... ]
[ Removing old files ... ]


There will be lots of ERRORs, as the url file contains lots of broken links.  Nothing we can do there - but we'll have some data.

In [29]:
start_download('eval_urls.txt', '/tmp/pubfig')

Let's see the output. The `manifest.txt` has a written record for each image successfully downloaded and cropped.

In [34]:
%%bash
# ls -R /tmp/pubfig
tail /tmp/pubfig/manifest.txt

/tmp/pubfig/William Macy/crop/crop_wildhogsprempic33.jpg|139
/tmp/pubfig/William Macy/crop/crop_wlcl5b.jpg|139
/tmp/pubfig/William Macy/crop/crop_207526936_d9d5d4a75e_o.jpg|139
/tmp/pubfig/William Macy/crop/crop_243909925_1aeba9954c_o.jpg|139
/tmp/pubfig/William Macy/crop/crop_245712735_4d83608e75_o.jpg|139
/tmp/pubfig/William Macy/crop/crop_39937487_72abc68cc2_o.jpg|139
/tmp/pubfig/William Macy/crop/crop_39937508_3fec58c312_o.jpg|139
/tmp/pubfig/William Macy/crop/crop_243909925_1aeba9954c_o.jpg|139
/tmp/pubfig/William Macy/crop/crop_244004030_4711d5f04b_o.jpg|139
/tmp/pubfig/William Macy/crop/crop_23878732_579d04d823_o.jpg|139


## Data preprocessing container

Let's run a Beam job on Dataflow to convert the dataset into TFRecords.  The job will read a list of image paths and labels from a text file with one `path|label` entry per line (the code calls this the input CSV), then output TFRecords which we'll use for training, evaluation and prediction later on.
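
Two small pieces of the job's logic can be tried without Beam: turning a `path|label` line into a `(label, path)` pair, and randomly routing each record to the train or eval set. The sketch below mirrors the `_add_keys` and `SplitDataset` steps of the `main.py` defined further down, with two deliberate differences that are assumptions of this example: it uses `rsplit` so a `|` inside a path would not break parsing, and it takes a seeded random generator so the run is repeatable. The bucket name and file names are placeholders.

```python
import random


def add_key(line):
    """Turn 'path|label' into a (label, path) pair."""
    path, label = line.rsplit('|', 1)
    return (label, path)


def partition_index(eval_percent, rng=random):
    """Return 1 (eval) with probability eval_percent/100, else 0 (train)."""
    return int(rng.uniform(0, 100) < eval_percent)


rng = random.Random(0)  # seeded only so the sketch is repeatable
lines = ['gs://my-bucket/data/img_{}.jpg|{}'.format(i, i % 3) for i in range(1000)]
splits = {0: [], 1: []}
for line in lines:
    splits[partition_index(20, rng)].append(add_key(line))

# Roughly 80% of the records should land in train (0) and 20% in eval (1)
print(len(splits[0]), len(splits[1]))
```

Because the split is random per record, the eval fraction is only approximately `eval_percent`, and small classes may end up with no eval examples at all.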

In [None]:
%%containerize gcr.io/wwoo-gcp/df-preproc Dockerfile
FROM gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:latest    
RUN mkdir /app
COPY main.py /app
COPY setup.py /app
RUN mkdir /data
COPY sample.txt /data
ENTRYPOINT ["python", "/app/main.py", "--setup_file", "/app/setup.py"]

Sample snippet of the input CSV. We'll include it in the container image using our custom magics, just as a demonstration.

In [None]:
%%containerize gcr.io/wwoo-gcp/df-preproc sample.txt
gs://wwoo-pubfig/data/Alicia Keys/crop/crop_040830_keys_vmed_1p.widec.jpg|0
gs://wwoo-pubfig/data/Alicia Keys/crop/crop_081608alicia.jpg|0
gs://wwoo-pubfig/data/Alicia Keys/crop/crop_100_alicia_keys.jpg|0
gs://wwoo-pubfig/data/Alicia Keys/crop/crop_1023041_Alicia%20Keys9.jpg|0
gs://wwoo-pubfig/data/Alicia Keys/crop/crop_111708alicia.jpg|0
gs://wwoo-pubfig/data/John Travolta/crop/crop_john_travolta_004_wenn1460435.jpg|25
gs://wwoo-pubfig/data/John Travolta/crop/crop_john_travolta_06-02.jpg|25
gs://wwoo-pubfig/data/John Travolta/crop/crop_john_travolta_12215142.jpg|25
gs://wwoo-pubfig/data/John Travolta/crop/crop_john_travolta_1734222.jpg|25

Python setup file of job dependencies.

In [None]:
%%containerize gcr.io/wwoo-gcp/df-preproc setup.py
from setuptools import find_packages
from setuptools import setup

REQUIRED_PACKAGES = ['Pillow==5.3.0']

setup(
    name='df-preproc',
    version='0.1',
    install_requires=REQUIRED_PACKAGES,
    packages=find_packages(),
    include_package_data=True,
    description=''
)

The code for the Beam job is all in a file called `main.py`, to keep our cell-to-file mapping simple.  We could split things into multiple files too, by using separate cells.

In [None]:
%%containerize gcr.io/wwoo-gcp/df-preproc main.py

import apache_beam as beam
import argparse
import logging
import sys


class ReadImagesAndConvertToJpegDoFn(beam.DoFn):
    def __init__(self, prepend_path=None, image_width=100, image_height=100):
        self.prepend_path = prepend_path
        self.image_width = image_width
        self.image_height = image_height


    def process(self, elem):
        # local scoped imports (os is needed for os.path.join below)
        import io
        import os
        import numpy as np
        from PIL import Image

        key, uri = elem
        file_path = uri if self.prepend_path is None else os.path.join(self.prepend_path, uri)

        # TF will enable 'rb' in future versions, but until then, 'r' is
        # required.
        def _open_file_read_binary(uri):
            # local scoped imports
            import os
            from tensorflow.python.framework import errors
            from tensorflow.python.lib.io import file_io

            try:
                return file_io.FileIO(file_path, mode='rb')
            except errors.InvalidArgumentError:
                return file_io.FileIO(file_path, mode='r')

        try:
            img_raw = []
            
            with _open_file_read_binary(file_path) as f:
                image_bytes = f.read()
                image_pil = Image.open(io.BytesIO(image_bytes)).convert('RGB')
                image_pil = image_pil.resize((self.image_width, self.image_height))
                img = np.array(image_pil)
                # tobytes() is the non-deprecated equivalent of tostring()
                img_raw.append(img.tobytes())

            # A variety of different calling libraries throw different exceptions here.
            # They all correspond to an unreadable file so we treat them equivalently.
        except Exception as e:  # pylint: disable=broad-except
            logging.exception('Error processing image %s: %s', uri, str(e))
            return

        # key is the class label
        # value is a dict containing the raw image bytes and source image path
        yield (key, { 'img_raw': img_raw, 'file_path': file_path })

        
class CreateTFRecords(beam.PTransform):

    def __init__(self, out_path, dataset_name):
        super(CreateTFRecords, self).__init__()
        self.out_path = out_path
        self.dataset_name = dataset_name

    def expand(self, pcol):
        import os

        class TFExampleDoFn(beam.DoFn):
            def process(self, elem):
                # local scoped imports
                import tensorflow as tf

                def _bytes_feature(value):
                    return tf.train.Feature(bytes_list=tf.train.BytesList(value=value))

                def _int64_feature(value):
                    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))
                
                def _str_feature(value):
                    # Strings are stored as a bytes list, not an int64 list
                    return tf.train.Feature(bytes_list=tf.train.BytesList(
                        value=[v.encode('utf8') for v in value]))

                key, value = elem
                example = tf.train.Example(features=tf.train.Features(feature={
                    'bytes': _bytes_feature(value['img_raw']),
                    'path': _bytes_feature([value['file_path'].encode('utf8')]),
                    'label': _int64_feature([int(key)])
                }))

                yield example

        (pcol
            | 'ImageToExample' >> beam.ParDo(TFExampleDoFn())
            | 'SerializeToString' >> beam.Map(lambda x: x.SerializeToString())
            | 'WriteTFRecord' >> beam.io.WriteToTFRecord(
                os.path.join(self.out_path, self.dataset_name),
                file_name_suffix='.tfrecord.gz'))   
        
def run_pipeline(p, opt=None):
    import random

    def _add_keys(x):
        t = x.split('|')
        return (t[1], t[0])

    def _unwind_samples(kv):
        # Tuple parameters in a def are Python 2 only, so unpack explicitly
        key, values = kv
        for v in values:
            yield (key, v)

    all_data = (p | 'ReadInputCSV' >> beam.io.ReadFromText(opt.input_csv))
    train_set, eval_set = (all_data
        | 'AddKeys' >> beam.Map(_add_keys)
        | 'ReadImagesAndConvertToJpeg' >> beam.ParDo(ReadImagesAndConvertToJpegDoFn(
            prepend_path=opt.prepend_path,
            image_width=opt.image_width,
            image_height=opt.image_height))
        | 'GroupByKey' >> beam.GroupByKey()
        | 'UnwindSamples' >> beam.FlatMap(_unwind_samples)
        | 'SplitDataset' >> beam.Partition(
            lambda x, _: int(random.uniform(0, 100) < int(opt.eval_percent)), 2))

    train_set | 'CreateTrainSet' >> CreateTFRecords(out_path=opt.out_path, dataset_name='train')

    eval_set | 'CreateEvalSet' >> CreateTFRecords(out_path=opt.out_path, dataset_name='eval')

    p.run().wait_until_finish() 


def main(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_csv', dest='input_csv', required=True,
                        help='Input CSV file.')
    parser.add_argument('--out_path', dest='out_path', required=True,
                        help='Output path location for TFRecord files.')
    parser.add_argument('--path_prefix', dest='path_prefix',
                        help='Image path prefix for reading source image files.')
    parser.add_argument('--image_width', dest='image_width', type=int,
                        help='Image width resize', default=64)
    parser.add_argument('--image_height', dest='image_height', type=int,
                        help='Image height resize', default=64)
    parser.add_argument('--prepend_path', dest='prepend_path',
                        help='Path to prepend for reading images')
    parser.add_argument('--eval_percent', dest='eval_percent',
                        help='Percentage of dataset to use for evaluation', required=True)
    known_args, pipeline_args = parser.parse_known_args(argv)

    p = beam.Pipeline(argv=pipeline_args)
    run_pipeline(p, known_args)
    
    # the next step of pipeline will look for this file
    with open("/output.txt", "w") as output_file:
        output_file.write(known_args.out_path)
        print("Done!")       


if __name__ == '__main__':
    main(sys.argv[1:])

The `%cloud_build` line magic submits a container build job to Cloud Build.

In [None]:
# Build the container
%cloud_build gcr.io/wwoo-gcp/df-preproc

## Generate metadata for embedding projector

In [57]:
import io
import math
import os
import numpy as np
import tensorflow as tf

from PIL import Image


def generate_sprite_image(array, ncols=30):
    nindex, height, width, channels = array.shape
    nrows = math.ceil(nindex/ncols)
    result = np.zeros((height*nrows, width*ncols, channels), dtype=array.dtype)
    
    for i in range(0, nrows):
        for j in range(0, ncols):
            sprite_num = i*ncols+j
            if (sprite_num >= nindex):
                break
            result[i*height:(i+1)*height, j*width:(j+1)*width, :] = array[sprite_num]
    
    print('Total sprites: {}'.format(str(nindex)))
    
    return result        
    
    
def make_meta(input_path, output_dir):
    def parser(example_proto):
        features = {
            "bytes": tf.FixedLenFeature((), tf.string, default_value=""),
            "label": tf.FixedLenFeature((), tf.int64, default_value=0),
            "path": tf.FixedLenFeature((), tf.string, default_value='')
        }
        parsed_features = tf.parse_single_example(example_proto, features)
        image = tf.decode_raw(parsed_features["bytes"], tf.uint8)
        # image = tf.image.convert_image_dtype(image, dtype=tf.float32)
        return image, parsed_features["label"], parsed_features['path']
    
    sprite_array = []
    sprite_image = None
    
    image = None
    
    with tf.Session() as sess:        
        
        filenames = tf.matching_files(input_path)

        dataset = tf.data.TFRecordDataset(filenames=filenames,
                compression_type="GZIP")
        dataset = dataset.map(parser)

        iterator = dataset.make_one_shot_iterator()
        _image, _label, _path = iterator.get_next()
        
        with open(os.path.join(output_dir, 'meta.tsv'), 'wb') as f:        
            while True:
                try:
                    image, label, path = sess.run([_image, _label, _path])
                    f.write((str(label)+'\n').encode("utf-8"))
                    sprite_array.append(image.reshape([64, 64, 3]))

                except tf.errors.OutOfRangeError:
                    sprite_image = generate_sprite_image(np.asarray(sprite_array))
                    img = Image.fromarray(sprite_image, 'RGB')
                    img.save(os.path.join(output_dir, 'sprites.png'))
                    break
                    
            # The with block flushes and closes the file on exit.



In [None]:
make_meta('gs://wwoo-pubfig/tfrecords/eval-00000-of-*.tfrecord.gz', '/tmp')
Image.open('/tmp/sprites.png')

The dataset is far from perfect.  We can see there are a lot of non-faces here.

<img src="sprites.png" width="800"/>

In [65]:
%%bash
gsutil cp /tmp/sprites.png gs://wwoo-pubfig/embeddings
gsutil cp /tmp/meta.tsv gs://wwoo-pubfig/embeddings

Copying file:///tmp/sprites.png [Content-Type=image/png]...
/ [1 files][  6.2 MiB/  6.2 MiB]                                                
Operation completed over 1 objects/6.2 MiB.                                      
Copying file:///tmp/meta.tsv [Content-Type=text/tab-separated-values]...
/ [1 files][  2.8 KiB/  2.8 KiB]                                                
Operation completed over 1 objects/2.8 KiB.                                      


## Training container

This container holds our training code.  Kubeflow Pipelines schedules it like any other container; it does not use TFJob (that would be a nice TODO).
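
As a rough illustration of the objective the training code minimizes, here is a minimal NumPy sketch of a plain triplet loss with a margin.  It is not the `tf.contrib` loss used in `task.py`, which also mines semi-hard negatives within each batch; the helper names, the toy 2-D embeddings, and the choice of squared Euclidean distance are assumptions made for this example.

```python
import numpy as np


def l2_normalize(x):
    # Scale each row to unit length, mirroring the normalization applied to the embeddings.
    return x / np.linalg.norm(x, axis=1, keepdims=True)


def triplet_loss(anchor, positive, negative, margin=1.0):
    # Squared Euclidean distances for the anchor/positive and anchor/negative pairs
    # (the distance metric is an assumption of this sketch).
    pos_dist = np.sum((anchor - positive) ** 2, axis=1)
    neg_dist = np.sum((anchor - negative) ** 2, axis=1)
    # Hinge: the loss is zero once the negative is at least `margin` farther away
    # than the positive.
    return np.mean(np.maximum(pos_dist - neg_dist + margin, 0.0))


# Toy 2-D embeddings (illustrative values only).
anchor = l2_normalize(np.array([[1.0, 0.0]]))
positive = l2_normalize(np.array([[1.0, 0.0]]))
negative = l2_normalize(np.array([[-1.0, 0.0]]))

# Well-separated triplet: the negative is far away, so nothing is left to learn.
easy = triplet_loss(anchor, positive, negative, margin=1.0)
# Swapping positive and negative produces a badly violated triplet.
hard = triplet_loss(anchor, negative, positive, margin=1.0)
print(easy, hard)
```

The `margin` argument plays the same role as the `--margin` flag of the training task: a larger margin demands a bigger gap between matching and non-matching faces before a triplet stops contributing to the loss.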

In [None]:
%%containerize gcr.io/wwoo-gcp/triplet-loss Dockerfile
FROM gcr.io/ml-pipeline/ml-pipeline-kubeflow-trainer:latest
RUN mkdir -p /app/meta
COPY task.py /app
RUN pip install "tensorflow-hub>=0.1.1"
ENTRYPOINT ["python", "/app/task.py"]

In [None]:
%%containerize gcr.io/wwoo-gcp/triplet-loss task.py
import argparse
import os
import numpy as np
import sys
import tensorflow as tf
import tensorflow_hub as hub

from tensorflow.contrib.tensorboard.plugins import projector


def initialize_hyper_params(args):
    args_parser = argparse.ArgumentParser()
    
    args_parser.add_argument(
        '--margin',
        help='Triplet loss margin',
        type=float,
        default=1.0)

    args_parser.add_argument(
        '--learning-rate',
        help='Learning rate',
        type=float,
        default=0.0001)

    args_parser.add_argument(
        '--input-files',
        help='Path to input files',
        type=str,
        required=True)

    args_parser.add_argument(
        '--batch-size',
        help='Batch size',
        type=int,
        default=200)
    
    args_parser.add_argument(
        '--image-size',
        help='Image size',
        type=int,
        default=299)    

    args_parser.add_argument(
        '--train-steps',
        help='Training steps',
        type=int,
        default=200)

    args_parser.add_argument(
        '--model-dir',
        help='Model directory',
        type=str,
        required=True)

    args_parser.add_argument(
        '--save-summary-steps',
        help='Number of steps before saving summary',
        type=int,
        default=100)

    args_parser.add_argument(
        '--num-epochs',
        help='Number of training epochs',
        type=int,
        default=None)     
    
    args_parser.add_argument(
        '--max-predictions',
        help='Maximum number of predictions',
        type=int,
        default=None)    
    
    args_parser.add_argument(
        '--embedding-dir',
        help='Predicted embeddings directory',
        type=str,
        required=False)    
       
    args_parser.add_argument(
        '--verbosity',
        choices=[
            'DEBUG',
            'ERROR',
            'FATAL',
            'INFO',
            'WARN'
        ],
        default='INFO',
    )
    
    args_parser.add_argument(
        '--mode',
        choices=[
            'train',
            'evaluate',
            'predict'
        ],
        default='train',
    )

    return args_parser.parse_args(args)


HYPER_PARAMS = None


def parser(example_proto):
    features = {
        "bytes": tf.FixedLenFeature((), tf.string, default_value=""),
        "path": tf.FixedLenFeature((), tf.string, default_value=""),        
        "label": tf.FixedLenFeature((), tf.int64, default_value=0)
    }
    parsed_features = tf.parse_single_example(example_proto, features)
    image = tf.decode_raw(parsed_features["bytes"], tf.uint8)
    image = tf.image.convert_image_dtype(image, dtype=tf.float32)
    return image, parsed_features["label"]


def generate_input_fn(file_pattern,
                      shuffle=True,
                      num_epochs=1,
                      batch_size=200,
                      multi_threading=True):

    def _input_fn():
        file_names = tf.matching_files(file_pattern)
        buffer_size = 2 * batch_size + 1

        dataset = tf.data.TFRecordDataset(filenames=file_names,
            compression_type="GZIP")
        dataset = dataset.map(parser)

        if shuffle:
            dataset = dataset.shuffle(buffer_size)

        dataset = dataset.batch(batch_size)
        dataset = dataset.prefetch(buffer_size)
        dataset = dataset.repeat(num_epochs)

        iterator = dataset.make_one_shot_iterator()
        features, labels = iterator.get_next()

        return features, labels

    return _input_fn


def build_model(images, is_training=True):
    images = tf.image.resize_images(images, (299, 299))    
    images = tf.map_fn(lambda x: tf.image.per_image_standardization(x), images, parallel_iterations=10)
    m = hub.Module("https://tfhub.dev/google/imagenet/inception_v3/feature_vector/1",
        tags={"train"}, trainable=is_training)
    embeddings = m(images)
    return tf.nn.l2_normalize(embeddings, axis=1)


def model_fn(features, labels, mode):
    is_training = (mode == tf.estimator.ModeKeys.TRAIN)
    images = tf.reshape(features, [-1, HYPER_PARAMS.image_size, HYPER_PARAMS.image_size, 3])
    assert images.shape[1:] == [HYPER_PARAMS.image_size, HYPER_PARAMS.image_size, 3], "{}".format(images.shape)

    # trainable = True if mode == tf.estimator.ModeKeys.TRAIN else False 
    trainable = True
    
    with tf.variable_scope('model'):
        embeddings = build_model(images, trainable) 
        assert embeddings.shape[1:] == [2048], "{}".format(embeddings.shape)
    
    if mode == tf.estimator.ModeKeys.TRAIN or mode == tf.estimator.ModeKeys.EVAL:
        loss = tf.contrib.losses.metric_learning.triplet_semihard_loss(
            labels=labels, embeddings=embeddings, margin=HYPER_PARAMS.margin)

        if mode == tf.estimator.ModeKeys.EVAL:
            return tf.estimator.EstimatorSpec(mode=mode, loss=loss)
        
        # Minimize loss
        optimizer = tf.train.AdamOptimizer(HYPER_PARAMS.learning_rate)
        global_step = tf.train.get_global_step()
        train_op = optimizer.minimize(loss, global_step=global_step)     
        
        # Provide an estimator spec for `ModeKeys.TRAIN` (the EVAL spec is returned above).
        return tf.estimator.EstimatorSpec(mode=mode,
                                          train_op=train_op,
                                          loss=loss)

    if mode == tf.estimator.ModeKeys.PREDICT:
        predictions = {'embeddings': embeddings}
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)


def main(argv):
    global HYPER_PARAMS
    HYPER_PARAMS = initialize_hyper_params(argv)

    tf.logging.set_verbosity(HYPER_PARAMS.verbosity)
    
    config = tf.estimator.RunConfig(tf_random_seed=3423458,
                                    model_dir=HYPER_PARAMS.model_dir,
                                    save_summary_steps=HYPER_PARAMS.save_summary_steps)

    estimator = tf.estimator.Estimator(model_fn=model_fn, config=config)
    
    if HYPER_PARAMS.mode == 'train':
        my_input_fn = generate_input_fn(file_pattern=HYPER_PARAMS.input_files,
                                        shuffle=True,
                                        num_epochs=HYPER_PARAMS.num_epochs,
                                        batch_size=HYPER_PARAMS.batch_size)  
        
        estimator.train(my_input_fn, max_steps=int(HYPER_PARAMS.train_steps))

        # the next step of pipeline will look for this file
        with open("/model_dir.txt", "w") as output_file:
            output_file.write(HYPER_PARAMS.model_dir)
            print("Done!")
        
        
    elif HYPER_PARAMS.mode == 'evaluate':
        my_input_fn = generate_input_fn(file_pattern=HYPER_PARAMS.input_files,
                                        shuffle=False,
                                        num_epochs=1,
                                        batch_size=HYPER_PARAMS.batch_size)        
        
        estimator.evaluate(my_input_fn)

        # the next step of pipeline will look for this file
        with open("/output.txt", "w") as output_file:
            output_file.write('done')
            print("Done!")

    elif HYPER_PARAMS.mode == 'predict':
        my_input_fn = generate_input_fn(file_pattern=HYPER_PARAMS.input_files,
                                        shuffle=False,
                                        num_epochs=1,
                                        batch_size=HYPER_PARAMS.batch_size)        
        
        predictions = estimator.predict(my_input_fn)

        # --max-predictions must be set in predict mode; the default of None fails here.
        embeddings = np.zeros((HYPER_PARAMS.max_predictions, 2048))
        for i, p in enumerate(predictions):
            if i >= HYPER_PARAMS.max_predictions:
                break
            embeddings[i] = p['embeddings']
        
        with tf.Graph().as_default():
            # Visualize test embeddings
            embedding_var = tf.Variable(embeddings, name='face_embedding')

            config = projector.ProjectorConfig()
            embedding = config.embeddings.add()
            embedding.tensor_name = embedding_var.name
            embedding.sprite.image_path = 'sprites.png' # sprite sheet stored alongside the embeddings
            embedding.sprite.single_image_dim.extend([64, 64])    
            embedding.metadata_path = 'meta.tsv'

            summary_writer = tf.summary.FileWriter(HYPER_PARAMS.embedding_dir)        
            projector.visualize_embeddings(summary_writer, config)

            saver = tf.train.Saver()
            with tf.Session() as sess:
                sess.run(tf.global_variables_initializer())
                # saver = tf.train.Saver([embedding_var])
                saver.save(sess, os.path.join(HYPER_PARAMS.embedding_dir, "embeddings.ckpt"))

        # the next step of pipeline will look for this file
        with open("/embedding_dir.txt", "w") as output_file:
            output_file.write(HYPER_PARAMS.embedding_dir)
            print("Done!")

    
if __name__ == '__main__':
    main(sys.argv[1:])

In [None]:
%cloud_build gcr.io/wwoo-gcp/triplet-loss

## Creating the Kubeflow pipeline

At this point, all the containers we need are built.  All that's left is to define a pipeline that links the steps together!
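
Steps are linked through small output files: the training task writes its model directory to `/model_dir.txt`, and the pipeline definition declares that path under `file_outputs` so later steps can consume the value.  The sketch below imitates that hand-off in plain Python.  It is only an illustration of the idea, not how Kubeflow Pipelines is implemented; the function names and the temporary directory are hypothetical.

```python
import os
import tempfile


def train_step(model_dir, output_file):
    # Stand-in for the training container: after the work is done,
    # record where the result lives in a small text file.
    with open(output_file, "w") as f:
        f.write(model_dir)


def read_file_output(output_file):
    # Stand-in for the orchestrator collecting a declared output file
    # once the step has exited.
    with open(output_file) as f:
        return f.read().strip()


def evaluate_step(model_dir):
    # Stand-in for a downstream container that receives the value
    # as a command-line argument.
    return ["--mode", "evaluate", "--model-dir", model_dir]


workdir = tempfile.mkdtemp()
output_file = os.path.join(workdir, "model_dir.txt")

train_step("gs://YOUR_BUCKET/model", output_file)
model_dir = read_file_output(output_file)
args = evaluate_step(model_dir)
print(args)
```

Because a downstream step consumes the upstream step's output, the dependency between the two is implied by the data flow rather than declared separately.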

The final pipeline should look like this:

<img src="triplet-loss-pipeline.png" width="400"/>



In [None]:
#KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.3-rc.2/kfp.tar.gz'
KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.4/kfp.tar.gz'

# Install Pipeline SDK
!pip3 install $KFP_PACKAGE --upgrade

In [61]:
import kfp
import kfp.dsl as dsl
import kfp.gcp as gcp
import os

PROJECT = 'wwoo-gcp'
REGION = 'us-central1'
BUCKET = 'wwoo-pubfig'
EXPERIMENT_NAME = 'triplet-learning'

TRAIN_STEPS = 5000
BATCH_SIZE = 100
IMAGE_SIZE = 64

client = kfp.Client()
#exp = client.create_experiment(name=EXPERIMENT_NAME)
exp = client.get_experiment('9ae33307-b66d-49c6-bc59-8b3296f30927')

In [67]:
class ObjectDict(dict):
    def __getattr__(self, name):
        if name in self:
            return self[name]
        else:
            raise AttributeError("No such attribute: " + name)


@dsl.pipeline(
  name='triplet_learning',
  description='Triplet learning pipeline'
)
def triplet_learning(
    project=dsl.PipelineParam(name='project', value=PROJECT),
    bucket=dsl.PipelineParam(name='bucket', value=BUCKET),
    train_steps=dsl.PipelineParam(name='train-steps', value=TRAIN_STEPS),
    batch_size=dsl.PipelineParam(name='train-batch-size', value=BATCH_SIZE),
    image_size=dsl.PipelineParam(name='image-size', value=IMAGE_SIZE)
):
    start_step = 0
    
    if start_step == 0:
        downloader = dsl.ContainerOp(
            name='pubfig-download',
            image='gcr.io/wwoo-gcp/pubfig-download:latest',
            arguments=[
                str(bucket)
              ],
            file_outputs={'manifest': '/output.txt'}
        )
    else:
        downloader = ObjectDict({
            'outputs': {
                'manifest': os.path.join('gs://', str(bucket), 'pubfig_eval', 'manifest.txt')
            }
        })
        
        
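    # Note: with start_step = 0 this condition is False, so preprocessing is skipped
    # and the existing TFRecords are reused.  Use `start_step <= 1` to run it as well.
    if start_step == 1: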
        preprocess = dsl.ContainerOp(
            name='preprocess-on-df',
            image='gcr.io/wwoo-gcp/df-preproc:latest',
            arguments=[
                '--project', project,
                '--runner', 'DataflowRunner',
                '--staging-location', os.path.join('gs://', str(bucket), 'staging'),
                '--experiments', 'shuffle_mode=service',
                '--eval_percent', 20,
                '--image_width', image_size,
                '--image_height', image_size,
                '--temp_location', os.path.join('gs://', str(bucket), 'temp'),
                '--job_name', 'df-preprocess',
                '--input_csv', downloader.outputs['manifest'],
                '--out_path', os.path.join('gs://', str(bucket), 'tfrecords')

              ],
            file_outputs={'outpath': '/output.txt'}
        )
    else:
        preprocess = ObjectDict({
            'outputs': {
                'outpath': os.path.join('gs://', str(bucket), 'tfrecords')
            }
        })
        
    if start_step <= 2:
        train = dsl.ContainerOp(
            name='train',
            image='gcr.io/wwoo-gcp/triplet-loss',
            arguments=[
                '--mode', 'train',
                '--input-files', os.path.join(str(preprocess.outputs['outpath']), 'train-*.tfrecord.gz'),
                '--batch-size', batch_size,
                '--learning-rate', 0.001,
                '--image-size', image_size,
                '--train-steps', train_steps,
                '--model-dir', os.path.join('gs://', str(bucket), 'model')
            ],
            file_outputs={
                'model-dir': '/model_dir.txt',
            }
        )
        train.set_memory_request('4G')
        train.set_cpu_request('3')
    else:
        train = ObjectDict({
            'outputs': {
                'model-dir': os.path.join('gs://', str(bucket), 'model')
            }
        })        


    if start_step <= 3:
        evaluate = dsl.ContainerOp(
            name='evaluate',
            image='gcr.io/wwoo-gcp/triplet-loss',
            arguments=[
                '--mode', 'evaluate',
                '--input-files', os.path.join(str(preprocess.outputs['outpath']), 'eval-*.tfrecord.gz'),
                '--batch-size', batch_size,
                '--image-size', image_size,                
                '--model-dir', train.outputs['model-dir']
            ],
            file_outputs={
                'output': '/output.txt',
            }
        )   
        evaluate.set_memory_request('12G')
        evaluate.set_cpu_request('3')     
        
   
    if start_step <= 3:
        predict_eval = dsl.ContainerOp(
            name='predict-eval',
            image='gcr.io/wwoo-gcp/triplet-loss',
            arguments=[
                '--mode', 'predict',
                '--input-files', os.path.join(str(preprocess.outputs['outpath']), 'eval-00000-of-*.tfrecord.gz'),
                '--max-predictions', 907,
                '--batch-size', batch_size,                
                '--image-size', image_size,
                '--model-dir', train.outputs['model-dir'],
                '--embedding-dir', os.path.join('gs://', str(bucket), 'embeddings')
            ],
            file_outputs={
                'embedding-dir': '/embedding_dir.txt',
            }
        )    
        predict_eval.set_memory_request('4G')
        predict_eval.set_cpu_request('3')           

In [68]:
import kfp.compiler as compiler
    
# Compile the pipeline into a tar.gz package, then submit a run to the experiment.
compiler.Compiler().compile(triplet_learning, 'triplet_learning.tar.gz')
run = client.run_pipeline(exp.id, 'triplet_learning', 'triplet_learning.tar.gz', params={})

## Visualise embeddings

Once the pipeline run finishes, you can launch TensorBoard on your laptop and point it at the embeddings directory.

`tensorboard --logdir gs://YOUR_BUCKET/embeddings`

Example t-SNE visualization of the evaluation set.  This is one of the better-looking "clusters".


<img src="t-sne-triplets-eval.png" width="800"/>



Example t-SNE visualization of a (scrubbed!) training set.  I removed all the non-face images and used the Cloud Vision API to crop the faces instead of using the provided face coordinates.


<img src="t-sne-triplets-train.png" width="800"/>