In [1]:
import asyncio
import glob
import json
import os
import time
from io import BytesIO
from statistics import mean

import aiohttp
import numpy as np
import tensorflow as tf
from google.cloud import aiplatform
from PIL import Image

In [24]:
# Setup endpoint

# CHANGE THESE
PROJECT_NUMBER = "827354046383"
ENDPOINT_ID = "4703099415160684544" #1node8core7.5gb

# Credentials must never be stored in the notebook itself: anything committed
# here is readable by everyone with access to the file. Export a fresh token
# before starting the kernel, e.g.
#   export VERTEX_AUTH_TOKEN="$(gcloud auth print-access-token)"
auth_token = os.environ.get("VERTEX_AUTH_TOKEN", "")
if not auth_token:
    raise RuntimeError(
        "VERTEX_AUTH_TOKEN is not set; export an OAuth access token "
        "(e.g. from `gcloud auth print-access-token`) before running this cell."
    )

endpoint = aiplatform.Endpoint(
    endpoint_name=f"projects/{PROJECT_NUMBER}/locations/us-central1/endpoints/{ENDPOINT_ID}")

# REST URL used by the aiohttp-based prediction requests
url="https://us-central1-aiplatform.googleapis.com/v1beta1/projects/{}/locations/us-central1/endpoints/{}:predict".format(PROJECT_NUMBER, ENDPOINT_ID)

# Download image labels
labels_path = tf.keras.utils.get_file('ImageNetLabels.txt','https://storage.googleapis.com/download.tensorflow.org/data/ImageNetLabels.txt')
# Read through a context manager so the file handle is closed deterministically
with open(labels_path) as labels_file:
    imagenet_labels = np.array(labels_file.read().splitlines())

In [20]:
# Image prep, Set number of jobs in a batch

file = 'imagenet/*.jpg'
# glob.glob returns matches in arbitrary (filesystem-dependent) order; sort so
# that `offset` selects the same images on every run and every machine.
file_string = sorted(glob.glob(file))
IMAGE_SIZE = (200, 200)
njobs = 10 # number of jobs per batch
offset = 100 # index of the first image (in sorted order) to use

# Fail early with a readable message instead of an IndexError mid-loop
if len(file_string) < offset + njobs:
    raise ValueError(
        "Need at least {} images matching {!r}, found {}".format(
            offset + njobs, file, len(file_string)))

# Rebuilt from scratch on every run so re-executing the cell is idempotent
instances = []
for image_path in file_string[offset:offset + njobs]:
    # Context manager closes the underlying file handle once pixels are read
    with Image.open(image_path) as im:
        # Force 3 channels so grayscale/CMYK JPEGs yield the same
        # (height, width, 3) layout as the RGB ones.
        # NOTE(review): assumes the served model expects RGB input - confirm.
        pixels = np.array(im.convert("RGB").resize(IMAGE_SIZE)) / 255.0
    # Nested Python lists of float32-rounded values, JSON-serialisable
    instances.append(pixels.astype(np.float32).tolist())

In [25]:
nbatches = 5 # number of batches
INTER_ARRIVAL_SECONDS = 3 # gap between the arrivals of consecutive batches

# Batch response times (one slot per batch, filled in by predict_batch)
batch_response_times = [0] * nbatches

# interarrival times for batches; the first batch arrives immediately.
# Built in one expression so nbatches == 0 yields an empty schedule instead of
# raising IndexError on the first-element assignment.
inter_arrival_times = ([0] + [INTER_ARRIVAL_SECONDS] * (nbatches - 1)) if nbatches else []

# arrival times of batches: running sum of the interarrival times, rounded to
# 2 decimals at each step (matters only for fractional gaps)
arrival_times = []
elapsed = 0
for gap in inter_arrival_times:
    elapsed = round(elapsed + gap, 2)
    arrival_times.append(elapsed)

print("Batch interarrival times {}".format(inter_arrival_times))
print("Batch arrival times      {}".format(arrival_times))

Batch interarrival times [0, 3, 3, 3, 3]
Batch arrival times      [0, 3, 6, 9, 12]


In [23]:
# Single prediction request
# NOTE: no @asyncio.coroutine here - it is redundant on an `async def` and was
# removed from asyncio in Python 3.11, where it raises AttributeError.
async def predict_single(session: aiohttp.ClientSession, instances: list, curCust: int, offset: int, start_time: float) -> dict:
    """Send one prediction request for a single instance and return the parsed JSON reply.

    Args:
        session: Open aiohttp client session used to issue the POST.
        instances: List of prepared instances; only element ``curCust`` is sent.
        curCust: Index into ``instances`` of the instance to send.
        offset: Unused here; accepted so all request helpers share one signature.
        start_time: Unused here; accepted so all request helpers share one signature.

    Returns:
        The response body decoded from JSON.
    """
    headers = {"Authorization": "Bearer {}".format(auth_token)}
    # Slice (rather than index) so the payload stays a one-element list
    payload = {"instances" : instances[curCust:(curCust+1)]}
    # `async with` releases the connection back to the pool even if decoding fails
    async with session.post(url, json=payload, headers=headers) as resp:
        return await resp.json()

# Batch prediction request
# NOTE: no @asyncio.coroutine here - it is redundant on an `async def` and was
# removed from asyncio in Python 3.11, where it raises AttributeError.
async def predict_batch(session, instances, b, offset, start_time):
    """Wait for batch ``b``'s arrival time, then fire ``njobs`` requests concurrently.

    Args:
        session: Open aiohttp client session shared by all requests.
        instances: List of prepared instances; indices ``0..njobs-1`` are sent.
        b: Batch index; selects the delay from ``arrival_times`` and the slot
            in ``batch_response_times`` that receives the measured duration.
        offset: Passed through unchanged to ``predict_single``.
        start_time: ``time.monotonic()`` reading taken when the run started;
            used only for the progress message.

    Returns:
        List with one entry per job: the job's result, or the exception it
        raised (``return_exceptions=True``).
    """
    # Simulating arrival times
    await asyncio.sleep(arrival_times[b])
    print("Starting batch {} at {}s after start".format(b, time.monotonic() - start_time))
    batch_start_time = time.monotonic()
    tasks = [
        predict_single(session=session, instances=instances, curCust=c, offset=offset, start_time=start_time)
        for c in range(njobs)
    ]
    # Individual request failures are returned in place, not raised
    jobs = await asyncio.gather(*tasks, return_exceptions=True)
    batch_finish_time = time.monotonic() - batch_start_time
    batch_response_times[b] = batch_finish_time
    print("Batch {} complete after {}s".format(b,batch_finish_time))

    return jobs

async def main(instances):
    """Run every batch concurrently over one HTTP session and print timing stats.

    Args:
        instances: List of prepared instances handed to each batch.

    Returns:
        List with one entry per batch: the value returned by ``predict_batch``,
        or the exception it raised (``return_exceptions=True``).
    """
    async with aiohttp.ClientSession() as session:
        start_time = time.monotonic()
        # One coroutine per batch; each sleeps until its own arrival time
        batch_coros = [
            predict_batch(session, instances, batch_index, offset, start_time)
            for batch_index in range(nbatches)
        ]
        batches = await asyncio.gather(*batch_coros, return_exceptions=True)
        total_time = time.monotonic() - start_time

        # Summary of the whole run
        print("{} batches of {} jobs each took {}s".format(nbatches, njobs, total_time))
        print("Response times {}".format(batch_response_times))
        print("Mean batch response time {}".format(mean(batch_response_times)))

        return batches

# Top-level `await` is valid here only because Jupyter/IPython executes cells
# inside a running event loop; in a plain script use asyncio.run(main(instances)).
await main(instances)

Initiating batch 0 coroutine at 3.6460000956139993e-06s after start
Initiating batch 1 coroutine at 0.00010630299993863446s after start
Initiating batch 2 coroutine at 0.0001255259999197733s after start
Initiating batch 3 coroutine at 0.00014236700008041225s after start
Initiating batch 4 coroutine at 0.00015557699998680619s after start
Initiating batch 5 coroutine at 0.00016791899997770088s after start
Initiating batch 6 coroutine at 0.00018009499990512268s after start
Initiating batch 7 coroutine at 0.00019236300022384967s after start
Initiating batch 8 coroutine at 0.00020460599989746697s after start
Initiating batch 9 coroutine at 0.00021667800001523574s after start
Initiating batch 10 coroutine at 0.00022850999994261656s after start
Initiating batch 11 coroutine at 0.00024039500021899585s after start
Initiating batch 12 coroutine at 0.0002544769999985874s after start
Initiating batch 13 coroutine at 0.0002673270000741468s after start
Initiating batch 14 coroutine at 0.000280461999

In [None]:
# Setup for inter-arrival times between individual jobs (not needed; kept for reference)

# job_response_times = []
# job_response_times = [0 for i in range(njobs)]

# interarrival times for jobs
# inter_arrival_times = []
# inter_arrival_times = [0 for i in range(njobs)]

# arrival_times = []
# arrival_times = [0 for i in range(njobs)]
# arrival_times[0]=round(inter_arrival_times[0],2)#arrival of first customer
# #Generate arrival times
# for i in range(1,njobs):
#     arrival_times[i]=round((arrival_times[i-1]+inter_arrival_times[i]),2)

In [None]:
# async def predict(session: aiohttp.ClientSession, instances: list, curCust: int, offset: int, start_time: float) -> dict:
    
#     # Simulating arrival times
#     # await asyncio.sleep(arrival_times[curCust])

#     image = file_string[curCust+offset]
#     # print("{} started {}s after start".format(image, time.monotonic() - start_time))
    
#     job_sent_time = time.monotonic()
#     headers = {"Authorization": "Bearer {}".format(auth_token)}
#     payload = {"instances" : instances[curCust:(curCust+1)]}
#     resp = await session.post(url, json=payload, headers=headers)
#     job_response_times[curCust] = time.monotonic() - job_sent_time
    
#     data = await resp.json()
    
#     # print("{} finished in {}s".format(file_string[curCust+offset],job_response_times[curCust]))
#     # return data # Uncomment to see if your request is working correctly
#     return image


# async def main(instances):
#     async with aiohttp.ClientSession() as session:
        
#         start_time = time.monotonic()
#         tasks = []
        
#         for c in range(0,njobs):
#             tasks.append(predict(session=session, instances=instances, curCust=c, offset=offset, start_time=start_time))
            
#         # asyncio.gather() will wait on the entire task set to be
#         # completed.  If you want to process results greedily as they come in,
#         # loop over asyncio.as_completed()
#         labels = await asyncio.gather(*tasks, return_exceptions=True)
#         total_time = time.monotonic() - start_time
        
#         print("{} predictions took {} seconds".format(njobs, total_time))
#         # print("Response times {}".format(job_response_times))
#         print("Mean response time {}".format(mean(job_response_times)))
        
#         return labels
        
# await main(instances)