In [10]:
# PyCloud

# Example of distributed video processing with PyCloud - detecting imagenet classess from video frames over queue

import numpy as np
import tempfile
import os
import cv2
from skimage.transform import resize
import tensorflow as tf
import time
from pycloud.core import PyCloud
import logging

CLOUD = PyCloud.get_instance()

LOGGER = logging.getLogger("Video processing")

@CLOUD.endpoint("resnet50")
def preprocess(img):
    LOGGER.info("Image shape: %s",img.shape)
    img = np.expand_dims(img, axis=0)
    img = tf.keras.applications.resnet50.preprocess_input(img)
    return img

@CLOUD.init_service("resnet50")
def initialize_resnet():
    model = tf.keras.applications.ResNet50(
        include_top=True, weights='imagenet', input_tensor=None, input_shape=None,
        pooling=None, classes=1000
    )
    return {'resnet50': model}

@CLOUD.queue_consumer("resnet50")
def classify(img, video_id, frame_number):
    result= CLOUD.initialized_data()['resnet50'].predict(preprocess(img))
    result = np.argmax(result)
    CLOUD.enqueue(get_frame_result, video_id, frame_number, result)

In [11]:
# PyCloud

    
@CLOUD.init_service("api")
def initialize_api():
    return {"videos": {}}
    
@CLOUD.endpoint("api")
def detect(video):
    video_id = str(time.time()).split(".")[0]
    distribute_frames(video, video_id)
    frames = CLOUD.initialized_data()["videos"][video_id]
    while True:
        # wait for detection of all frames
        for frame_id, recognized_class in frames.items():
            if recognized_class is None:
                break
        else:
            return frames
        time.sleep(0.5)
    
@CLOUD.queue_consumer("api")
def get_frame_result(video_id, frame_id, recognized_class):
    frames = CLOUD.initialized_data()["videos"][video_id]
    frames[frame_id] = recognized_class
    
@CLOUD.endpoint("api")
def distribute_frames(video, video_id):
    tempfile = '/tmp/{}'.format(time.time())
    with open(tempfile, "wb") as temp:
        temp.write(video)
    vidcap = cv2.VideoCapture(tempfile)
    success,image = vidcap.read()
    count = 0
    frames = {}
    CLOUD.initialized_data()["videos"][video_id] = frames
    while success: 
        frames[count] = None
        CLOUD.enqueue(classify, image, video_id, count)
        success, image = vidcap.read()
        LOGGER.info('Read a new frame: %s', success)
        count += 1
    os.remove(tempfile)
      

In [13]:
# PyCloud

def build_app():
    
    # local testing
    initialize_api()
    initialize_resnet()
    with open("./your_video.mp4", "rb") as file:
        video_bytes = file.read()
    print(detect(video_bytes))
    
    # service configuration
    CLOUD.configure_service("api", exposed=True, preferred_ports={'HTTP': 5001})
    CLOUD.configure_service("resnet50", package_deps=['libgl1-mesa-glx'])
    CLOUD.set_basic_auth_credentials("pycloud", "demo")
    
CLOUD.build(build_app)

In [9]:
from pycloud_cli.eks import EKSLauncher

launcher = EKSLauncher(CLOUD)
launcher.exec()

In [3]:
!time curl localhost:5001 -F "endpoint_id=detect@process_video.ipynb" -F "video=@./your_video.mp4"
    