# Cloudbutton Moments in Time dataset example
## Video/image prediction
In this notebook we will process video clips from the Moments in Time (MiT) dataset at scale with the Cloudbutton toolkit,  
predicting the top 5 actions in each clip with a pretrained ResNet50 neural network model.


In [None]:
import os
import time
import pprint
import builtins
import operator
import functools
import urllib.request

import torch.optim
import torch.nn.parallel
from torch import save, load
from torch.nn import functional as F

from utils import extract_frames, CloudFileProxy
from models import load_model, load_transform, load_categories

from cloudbutton import Pool, CloudStorage
from cloudbutton.util import get_uuid

### Download the pretrained ResNet50 model weights

In [None]:
ROOT_URL = 'http://moments.csail.mit.edu/moments_models'
WEIGHTS_FILE = 'moments_RGB_resnet50_imagenetpretrained.pth.tar'

# Download the pretrained weights once; later runs reuse the cached local copy.
if not os.access(WEIGHTS_FILE, os.R_OK):
    # Fetch into a temporary name and rename only on success. A failed or interrupted
    # download then raises here, instead of silently leaving a truncated file behind
    # that the os.access() check above would accept on the next run.
    partial_file = WEIGHTS_FILE + '.part'
    try:
        urllib.request.urlretrieve('/'.join([ROOT_URL, WEIGHTS_FILE]), partial_file)
        os.replace(partial_file, WEIGHTS_FILE)
    finally:
        if os.path.exists(partial_file):
            os.remove(partial_file)

### Backends
The same program can run either in a local environment with processes or as
functions in the cloud. After we choose a backend, only a few file locations need
to change. In this example we will be using the cloud functions backend.

First, we have to specify the prefix in our bucket where the dataset is located.
We will be using a custom runtime for our functions which has torch, torchvision,
ffmpeg and opencv-python modules already installed.
We will store the pretrained weights in the cloud so that functions can access them.
Then, after the functions get the model weights, they will start preprocessing the input
videos and running inference on them one by one.
  
Later in this notebook, we will see a small improvement to this process.  


In [None]:
# True: run with local processes; False: run on cloud functions.
LOCAL_EXEC = False
# Local directory (local mode) or bucket prefix (cloud mode) holding the input videos.
INPUT_DATA_DIR = 'momentsintime/input_data'
# Upper bound on the number of video chunks, i.e. on the number of predict_video calls.
CONCURRENCY = 1000

In [None]:
if LOCAL_EXEC:
    # Worker processes on this machine, with local storage.
    initargs = {
        'backend': 'localhost',
        'storage_backend': 'localhost'
    }
    # Share the weights with the local worker processes through /dev/shm (shared memory).
    weights_location = '/dev/shm/' + WEIGHTS_FILE
    # os.listdir() returns entries in arbitrary order. Sorting makes the split of
    # videos across workers, and the printed sample of results, reproducible.
    video_locations = [os.path.abspath(os.path.join(INPUT_DATA_DIR, name))
                       for name in sorted(os.listdir(INPUT_DATA_DIR))]
    open = builtins.open

else:
    # IBM Cloud Functions + IBM Cloud Object Storage. The custom runtime image ships
    # torch, torchvision, ffmpeg and opencv-python.
    CUSTOM_RUNTIME = 'dhak/pywren-runtime-pytorch:3.6'
    initargs = {
        'backend': 'ibm_cf',
        'storage_backend': 'ibm_cos',
        'runtime': CUSTOM_RUNTIME,
        'runtime_memory': 1280  # presumably MB — confirm against cloudbutton docs
    }
    # Object key the weights are uploaded to.
    weights_location = 'momentsintime/models/' + WEIGHTS_FILE
    cloud_storage = CloudStorage()
    video_locations = cloud_storage.list_tmp_data(prefix=INPUT_DATA_DIR)
    # Shadow the built-in open with a proxy, so the same code reads and writes objects
    # in cloud storage. builtins.open remains available for genuinely local files.
    open = CloudFileProxy(cloud_storage)

As you can see, in cloud mode we have masked the built-in `open` function with a proxy
that accesses objects in cloud storage (in local mode, `open` is simply `builtins.open`).  
From now on we will use `builtins.open` whenever we explicitly need to access a local file.

### Save model weights to the cloud object storage / shared memory (local)

In [None]:
# The downloaded weights are always a local file, hence builtins.open. The destination
# goes through the possibly-proxied `open`: the cloud object storage in cloud mode,
# /dev/shm in local mode.
with builtins.open(WEIGHTS_FILE, 'rb') as local_weights, \
        open(weights_location, 'wb') as shared_weights:
    shared_weights.write(local_weights.read())

### Function code


In [None]:
# Passed to extract_frames as the number of segments (presumably frames) to sample per video.
NUM_SEGMENTS = 16

# Category names and the per-frame transform. Both are loaded once in the notebook and
# read as globals by predict_video.
categories, transform = load_categories(), load_transform()

def predict_video(open, weights_location, video_locations, top_k=5):
    """Predict the most likely action categories for each video in ``video_locations``.

    The model weights and the videos are both read through ``open``. This lets the same
    function work with local files (``builtins.open``) or cloud objects (the
    ``CloudFileProxy``).

    Args:
        open: File-opening callable used for the weights and the input videos.
        weights_location: Path/key of the pretrained model weights.
        video_locations: Paths/keys of the videos this call should process.
        top_k: How many highest-probability categories to report per video. Capped
            at the number of categories the model outputs.

    Returns:
        A list with one dict per video, containing:
        ``key`` (the video location), ``predictions`` (category name -> probability
        rounded to 5 decimals, inserted in descending probability order) and
        ``iter_duration`` (seconds spent on that video).
    """
    with open(weights_location, 'rb') as f:
        model = load_model(f)
    model.eval()

    results = []
    # Unique scratch name, presumably so that workers sharing a filesystem (local
    # backend) don't clobber each other's copy.
    local_video_loc = 'video_to_predict_{}.mp4'.format(get_uuid())

    try:
        for video_loc in video_locations:
            start = time.time()
            # extract_frames takes a local path, so copy the video locally first.
            with open(video_loc, 'rb') as f_in:
                with builtins.open(local_video_loc, 'wb') as f_out:
                    f_out.write(f_in.read())

            # Obtain video frames
            frames = extract_frames(local_video_loc, NUM_SEGMENTS)

            # Prepare input tensor [num_frames, 3, 224, 224]
            input_v = torch.stack([transform(frame) for frame in frames])

            # Make video prediction: average the per-frame class probabilities
            with torch.no_grad():
                logits = model(input_v)
                h_x = F.softmax(logits, 1).mean(dim=0)
                probs, idx = h_x.sort(0, True)

            # Output the top-k prediction (never more than the number of classes)
            output = dict(key=video_loc, predictions={})
            for i in range(min(top_k, len(probs))):
                output['predictions'][categories[int(idx[i])]] = round(float(probs[i]), 5)
            output['iter_duration'] = time.time() - start
            results.append(output)
    finally:
        # Remove the scratch copy so that repeated (local) runs don't litter the working dir.
        if os.path.exists(local_video_loc):
            os.remove(local_video_loc)

    return results

### Map functions
Similar to the `multiprocessing` module API, we use a Pool to map the video keys
across n workers (concurrency). However, we do not have to instantiate a Pool of
n workers *specifically*: the map function invokes as many workers as there are
elements in the iterable.

In [None]:
with Pool(initargs=initargs) as pool:
    # Deal the videos round-robin into at most CONCURRENCY chunks (never more chunks
    # than videos); each chunk becomes the arguments of one predict_video call.
    iterable = [(open, weights_location, video_locations[chunk::CONCURRENCY])
                for chunk in range(min(CONCURRENCY, len(video_locations)))]
    start = time.time()
    res = pool.map_async(func=predict_video, iterable=iterable)
    results = res.get()
    end = time.time()

print('\nDone.')
print('Videos processed:', len(video_locations))
print('Total duration:', round(end - start, 2), 'sec\n')

# Every call returned a list of per-video dicts; flatten them into a single list.
results = [prediction for chunk_results in results for prediction in chunk_results]
pprint.pprint(results[:10])


---------------

## Improvement
Now, since we know every function will have to pull the model weights from
the cloud storage, we can actually pack these weights with the runtime image
and reduce the start-up cost substantially.

In [None]:
# Only the cloud functions run on the runtime image that has the weights packed in at
# /momentsintime/model_weights. In local mode that path does not exist, so keep using
# the /dev/shm copy, which the local processes can already read directly.
if not LOCAL_EXEC:
    initargs['runtime'] = 'dhak/pywren-runtime-resnet'
    weights_location = '/momentsintime/model_weights'

In [None]:
def predict_video(open, weights_location, video_locations, top_k=5):
    """Predict the most likely action categories for each video, with locally stored weights.

    Same as the first version, except that the weights are always read with
    ``builtins.open``: they are expected to be on the worker's own filesystem (e.g.
    packed into the runtime image), so there is no per-call download from object
    storage. The videos are still read through ``open``.

    Args:
        open: File-opening callable used for the input videos.
        weights_location: Local path of the pretrained model weights.
        video_locations: Paths/keys of the videos this call should process.
        top_k: How many highest-probability categories to report per video. Capped
            at the number of categories the model outputs.

    Returns:
        A list with one dict per video, containing:
        ``key`` (the video location), ``predictions`` (category name -> probability
        rounded to 5 decimals, inserted in descending probability order) and
        ``iter_duration`` (seconds spent on that video).
    """
    # Force loading the weights from a local file
    with builtins.open(weights_location, 'rb') as f:
        model = load_model(f)
    model.eval()

    results = []
    # Unique scratch name, presumably so that workers sharing a filesystem (local
    # backend) don't clobber each other's copy.
    local_video_loc = 'video_to_predict_{}.mp4'.format(get_uuid())

    try:
        for video_loc in video_locations:
            start = time.time()
            # extract_frames takes a local path, so copy the video locally first.
            with open(video_loc, 'rb') as f_in:
                with builtins.open(local_video_loc, 'wb') as f_out:
                    f_out.write(f_in.read())

            # Obtain video frames
            frames = extract_frames(local_video_loc, NUM_SEGMENTS)

            # Prepare input tensor [num_frames, 3, 224, 224]
            input_v = torch.stack([transform(frame) for frame in frames])

            # Make video prediction: average the per-frame class probabilities
            with torch.no_grad():
                logits = model(input_v)
                h_x = F.softmax(logits, 1).mean(dim=0)
                probs, idx = h_x.sort(0, True)

            # Output the top-k prediction (never more than the number of classes)
            output = dict(key=video_loc, predictions={})
            for i in range(min(top_k, len(probs))):
                output['predictions'][categories[int(idx[i])]] = round(float(probs[i]), 5)
            output['iter_duration'] = time.time() - start
            results.append(output)
    finally:
        # Remove the scratch copy so that repeated (local) runs don't litter the working dir.
        if os.path.exists(local_video_loc):
            os.remove(local_video_loc)

    return results

In [None]:
with Pool(initargs=initargs) as pool:
    # Deal the videos round-robin into at most CONCURRENCY chunks (never more chunks
    # than videos); each chunk becomes the arguments of one predict_video call.
    iterable = [(open, weights_location, video_locations[chunk::CONCURRENCY])
                for chunk in range(min(CONCURRENCY, len(video_locations)))]
    start = time.time()
    res = pool.map_async(func=predict_video, iterable=iterable)
    results = res.get()
    end = time.time()

print('\nDone.')
print('Videos processed:', len(video_locations))
print('Total duration:', round(end - start, 2), 'sec\n')

# Every call returned a list of per-video dicts; flatten them into a single list.
results = [prediction for chunk_results in results for prediction in chunk_results]
pprint.pprint(results[:10])


### Clean

In [None]:
# Remove the locally downloaded weights.
if os.path.isfile(WEIGHTS_FILE):
    os.remove(WEIGHTS_FILE)

# The Improvement section may have repointed `weights_location` at the path inside the
# runtime image, so rebuild the location the weights were actually uploaded to.
# These prefixes must stay in sync with the backend setup cell.
if LOCAL_EXEC:
    uploaded_weights_location = '/dev/shm/' + WEIGHTS_FILE
    if os.path.isfile(uploaded_weights_location):
        os.remove(uploaded_weights_location)
else:
    uploaded_weights_location = 'momentsintime/models/' + WEIGHTS_FILE
    cloud_storage.delete_cobject(key=uploaded_weights_location)

### Dockerfile and build scripts for both runtimes can be found in the docker/ folder.

### Much of this program's code comes from the demonstration in https://github.com/zhoubolei/moments_models

### Moments in Time article: http://moments.csail.mit.edu/#paper
