In [1]:
# For Property 4 check, @David-Lor, see: https://github.com/David-Lor/FastAPI_LightningTalk-Notebook/blob/master/FastAPI.ipynb
import uvicorn
from fastapi import FastAPI
import socket

MY_IP_ADDRESS = '10.0.1.7'
PORT = 8080
app = FastAPI()

def run():
    uvicorn.run(app, port=PORT, host=MY_IP_ADDRESS)
    
from multiprocessing import Process
from wait4it import wait_for

_api_process = None

def start_api():
    """Stop the API if running; Start the API; Wait until API (port) is available (reachable)"""
    global _api_process
    if _api_process:
        _api_process.terminate()
        _api_process.join()

    _api_process = Process(target=run, daemon=True)
    _api_process.start()
    wait_for(host=MY_IP_ADDRESS,port=PORT)

def delete_route(method: str, path: str):
    """Delete the given route from the API. This must be called on cells that re-define a route"""
    [app.routes.remove(route) for route in app.routes if method in route.methods and route.path == path]

In [2]:
# This notebook is a test of several properties that are needed 
# to stand up an end to end pool of online learners
from vowpalwabbit import pyvw

In [3]:
# Property 1: Inital test to verify that more than one vw can be used
# and that they do not share model memory

first_vw = pyvw.vw(quiet=True)
second_vw = pyvw.vw(quiet=True)
print(f"first vw has {hex(id(first_vw))} and second vw has {hex(id(second_vw))}")
id(first_vw) == id(second_vw), "First and second instances point to the same memory!"

# But they don't seem to share model memory? Note how even the examples are in scope of
# the instance
ex = first_vw.example('1 | a b c')
first_vw.learn(ex)

first_predict = first_vw.predict(ex)
second_predict = second_vw.predict(ex)

print(f"first model predicted {first_predict}, second predicted  {second_predict}")
assert first_predict != second_predict, "Models seems equivalent because same prediction was given"

first vw has 0x7fcc1d9524a0 and second vw has 0x7fcc3471b360
first model predicted 0.632030725479126, second predicted  0.0


In [4]:
# Property 2: Referencing a config file, create a pool of learners
# The pool should be threadsafe (deduction from locked_dict library)
from dynaconf import settings
import locked_dict.locked_dict as locked_dict
from coolname import generate_slug

pool_of_learners = locked_dict.LockedDict()

a_key = None
for _ in range(settings.NUMBER_OF_LEARNERS):
    name = generate_slug(2)
    print(f"\t ...adding {name}")
    pool_of_learners[name] = pyvw.vw(quiet=True)
    a_key = name
    
assert type(pool_of_learners[a_key]) == type(pyvw.vw(quiet=True)), "Pool contains different types than vw!"

	 ...adding towering-kudu
	 ...adding ochre-jaybird


In [28]:
# Property 3: ImageNet can recieve and output features that a learner can accept

# class code attribution @sansi95 (https://github.com/robinsonkwame/kente-cloth-authentication/src/features/feature_processor.py)
# modified
from abc import ABC, abstractmethod
import numpy as np
from keras.applications.mobilenet_v2 import MobileNetV2
from keras.applications import imagenet_utils
from keras.preprocessing.image import img_to_array
from keras.preprocessing.image import load_img
from dynaconf import settings # because we frequently update the file 
import PIL
from PIL import Image
import cv2

class FeatureProcessor(ABC):
    def __init__(self,
                 batch_size,
                 flattened_size,
                 feature_file_format
                ):
        super().__init__()
        self.batch_size = batch_size
        self.flattened_size = flattened_size
        self.feature_file_format = feature_file_format

    @staticmethod
    def create(
        feature_processor_name,
        batch_size,
        feature_file_format
    ):
        if feature_processor_name == "MobileNet":
            flattened_size = 7 * 7 * 1280
            return MobileNetFeatureProcessor(batch_size, flattened_size, feature_file_format)

    def initialize_output_processor(self, labels, feature_file_path):
        if self.feature_file_format == "npy":
            self.output_processor = NpyOutput(labels,
            self.flattened_size, self.batch_size, feature_file_path)
        elif self.feature_file_format == "csv":
            self.output_processor = CsvOutput(labels,
            self.batch_size, feature_file_path)

    @abstractmethod
    def process_image(self):
        pass

    @abstractmethod
    def create_features(self):
        pass

class MobileNetFeatureProcessor(FeatureProcessor):
    def __init__(self, batch_size, flattened_size, feature_file_format):
        super().__init__(batch_size, flattened_size, feature_file_format)
        self.model = MobileNetV2(weights="imagenet",
                                 include_top=False, 
                                 input_shape=(224, 224, 3)
                    )
        self.name = "mobile"

    def process_image(self, image_path):
        image = load_img(
            image_path,
            target_size=(224, 224)
        )
        image = img_to_array(image)
        image = np.expand_dims(image, axis=0)
        image = imagenet_utils.preprocess_input(image)
        return image

    def process_in_memory_image(self, image, dsize=(224, 224)):
        # See: https://stackoverflow.com/questions/55873174/how-do-i-return-an-image-in-fastapi
        image = cv2.resize(image, dsize=dsize)
        print('... about to convert to array')        
        image = img_to_array(image)        
        image = np.expand_dims(image, axis=0)
        print('... about to preprocess')                
        image = imagenet_utils.preprocess_input(image)
        print('about to return image...')
        return image
    
    
    def create_features(self, batch_images):
        features = self.model.predict(
            batch_images,
            batch_size= self.batch_size
        )
        features = features.reshape(
            (features.shape[0], self.flattened_size)
        )
        return features

    def create_features_for_an_image(self, the_image):
        print('about to call mobilenet model directly')
        features = self.model(
            the_image,
            training=False
        )
        print('reshaping mobilenet featurs...')
        features = features.reshape(
            (features.shape[0], self.flattened_size)
        )
        return features
    
    
def construct_vw_example(label, features):
    return f"{label} |" + np.array2string(
        features,
        precision=4,
        separator=' ',
        suppress_small=True
    )

    
the_feature_processor = FeatureProcessor.create(
        feature_processor_name=settings.FEATURE_PROCESSOR_NAME,
        batch_size=settings.BATCH_SIZE,
        feature_file_format=None
)

the_image_features = the_feature_processor.create_features(
    the_feature_processor.process_image('./real_miami_a_25.jpg') # note: 2016x2016 size! 
)
assert the_image_features.shape[1] == 7 * 7 * 1280, "feature vector is not the expected length"

TRUE = 1
image_feature_ex = pool_of_learners[a_key].example(
    construct_vw_example(TRUE, the_image_features)
)
pool_of_learners[a_key].learn(image_feature_ex)

decision = pool_of_learners[a_key].predict(image_feature_ex)
assert decision == TRUE, "The learned example was predicted to be something other than the example!"
print(f"the decision was {decision}")

the decision was 1.0


In [35]:
# Property 4: Stand up an API that passes a list of image files to a learner
# Note: this code requires a server to standup, see first block

from typing import List

from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import HTMLResponse
from PIL import Image
from io import BytesIO
import numpy as np

app = FastAPI()

def load_image_into_numpy_array(data):
    return np.array(Image.open(BytesIO(data)))

app = FastAPI()

# TODO: create new endpint that teaches
# * accepts list of files
# X * accepts list of fake/real indicators <--- text box? radio buttons?
# * calls image net on them (list of)
# * passes as exampels to vw

# TODO: create new endpoint that predicts
# * accepts list of files
# X * accepts list of fake/real indicators
# * passes to vw, returns predictions with
# * F1 metric, accuracy, etc.

# ...
# * 

def get_or_assign_learner():
    # to do: associate session or authenticated login
    # with
    return a_key

@app.post("/teach/")
async def create_teach(files: List[UploadFile] = File(...), labels: str = Form(...)):
    # Todo: move some of these to long running tasks
    individual_labels = labels.split(',')

    individual_image_features = []
    for the_file in files:
        print(f"working on {the_file.filename}")
        image = load_image_into_numpy_array(await the_file.read())
        print(f" loaded to numpy array! {image.shape}")

        image_features =\
                the_feature_processor.create_features_for_an_image(
                    the_feature_processor.process_in_memory_image(
                        image
                    )
                )

        print(image_features[1])
        
#         individual_image_features.append(
#             the_feature_processor.create_features(
#                 the_feature_processor.process_in_memory_image(
#                     image
#                 )
#         )
#     )
        print(f"... taught {the_file.filename}!")

    
#     a_key = get_or_assign_learner()
#     image_feature_ex = pool_of_learners[a_key].example(
#         construct_vw_example(TRUE, the_image_features)
#     )
#     pool_of_learners[a_key].learn(image_feature_ex)

#     decision = pool_of_learners[a_key].predict(image_feature_ex)

    
    return {
        "files": [file.filename for file in files],
        "labels": individual_labels
    }
    #return {"filenames": [file.filename for file in files]}

@app.get("/")
async def main():
    content = """
<body>
<form action="/teach/" enctype="multipart/form-data" method="post">
<input name="files" type="file" multiple>
<input name="labels" type="text" required>
<input type="submit">
</form>
</body>
    """
    return HTMLResponse(content=content)

In [36]:
start_api()

INFO:     Started server process [16197]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://10.0.1.7:8080 (Press CTRL+C to quit)


INFO:     10.0.1.36:53964 - "GET / HTTP/1.1" 200 OK
working on fake_001_0.jpg
 loaded to numpy array! (224, 224, 3)
... about to convert to array
... about to preprocess
about to return image...
about to call mobilenet model directly


In [38]:
# seems to be deamon'ed or auto-restart
_api_process.terminate()
_api_process.kill()
# Free socket