# Visual image search

_**Using a Convolutional Neural Net and Elasticsearch k-Nearest Neighbors Index to retrieve visually similar images**_

---


## Contents

1. [Background](#Background)
1. [Setup](#Setup)
1. [TensorFlow Model Preparation](#TensorFlow-Model-Preparation)
1. [SageMaker Model Hosting](#SageMaker-Model-Hosting)
1. [Build a KNN Index in Elasticsearch](#Build-a-KNN-Index-in-Elasticsearch)
1. [Evaluate Index Search Results](#Evaluate-Index-Search-Results)
1. [Extensions](#Extensions)

## Background

In this notebook, we'll build the core components of a visual image search application. Visual image search is used in interfaces where instead of asking for something by voice or text, you show a photographic example of what you are looking for.

One of the core components of visual image search is a convolutional neural net (CNN) model that generates “feature vectors” representing both a query image and the reference item images to be compared against the query. The reference item feature vectors typically are generated offline and must be stored in a database of some sort, so they can be efficiently searched. For small reference item datasets, it is possible to use a brute force search that compares the query against every reference item. However, this is not feasible for large data sets where brute force search would become prohibitively slow.
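To make the cost of brute force search concrete, here is a minimal sketch in plain Python. The function name `brute_force_knn` and the toy 3-dimensional vectors are assumptions made for illustration; the embeddings produced later in this notebook have 2048 dimensions.

```python
import math

def brute_force_knn(query, references, k):
    """Return the k reference ids closest to `query` by Euclidean distance."""
    scored = []
    for ref_id, vector in references.items():
        # One distance computation per reference item: O(N * d) work per query.
        distance = math.dist(query, vector)
        scored.append((distance, ref_id))
    scored.sort()
    return [ref_id for _, ref_id in scored[:k]]

# Toy "feature vectors", made up for this example.
references = {
    "dress_a": [0.0, 0.0, 1.0],
    "dress_b": [0.9, 0.1, 0.0],
    "dress_c": [1.0, 0.0, 0.0],
}
nearest = brute_force_knn([1.0, 0.0, 0.1], references, k=2)
print(nearest)
```

Every query touches every reference vector, so the work grows linearly with the size of the catalog. That is the cost a dedicated nearest-neighbor index is meant to avoid.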

To enable efficient searches for visually similar images, we'll use Amazon SageMaker to generate “feature vectors” from images and use the KNN algorithm in Amazon Elasticsearch Service. KNN for Amazon Elasticsearch Service lets you search for points in a vector space and find the "nearest neighbors" for those points by Euclidean distance or cosine similarity (the default is Euclidean distance). Use cases include recommendations (for example, an "other songs you might like" feature in a music application), image recognition, and fraud detection.
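The two metrics mentioned above can rank the same candidates differently. The following sketch, with made-up 2-dimensional points, is one way to see this; the helper names are assumptions for illustration and are not part of any Elasticsearch API.

```python
import math

def euclidean_distance(a, b):
    """Straight-line distance: smaller means more similar."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def cosine_similarity(a, b):
    """Cosine of the angle between the vectors: larger means more similar."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

query = [1.0, 1.0]
same_direction = [3.0, 3.0]  # points the same way as the query, but is farther away
close_by = [1.0, 0.5]        # nearby point with a different direction

print(euclidean_distance(query, same_direction), euclidean_distance(query, close_by))
print(cosine_similarity(query, same_direction), cosine_similarity(query, close_by))
```

Euclidean distance favors `close_by`, while cosine similarity favors `same_direction` because it ignores vector length.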

Here are the steps we'll follow to build the visual image search. After some initial setup, we'll prepare a TensorFlow model for generating feature vectors, then generate feature vectors for fashion images from _**feidegger**_, a _**zalandoresearch**_ dataset. Those feature vectors will be imported into an Amazon Elasticsearch Service KNN index. Next, we'll run some test image queries and visualize the results.


## Setup

In [None]:
%%capture
%pip install tqdm opensearch-py requests sagemaker==2.182.0 -U

In [None]:
import boto3
import sagemaker
from sagemaker import get_execution_role

role = get_execution_role()

s3_resource = boto3.resource("s3")
s3 = boto3.client('s3')

In [None]:
cfn = boto3.client('cloudformation')

def get_cfn_outputs(stackname):
    outputs = {}
    for output in cfn.describe_stacks(StackName=stackname)['Stacks'][0]['Outputs']:
        outputs[output['OutputKey']] = output['OutputValue']
    return outputs

## Setup variables to use for the rest of the demo
cloudformation_stack_name = "vis-search"

outputs = get_cfn_outputs(cloudformation_stack_name)

bucket = outputs['s3BucketTraining']
es_host = outputs['esHostName']

outputs

### Downloading Zalando Research data

The dataset consists of 8732 high-resolution images, each depicting a dress available on the Zalando shop against a white background.

**Source:** The data is originally from https://github.com/zalandoresearch/feidegger

**Citation:** <br>
_@inproceedings{lefakis2018feidegger,_ <br>
_title={FEIDEGGER: A Multi-modal Corpus of Fashion Images and Descriptions in German},_ <br>
_author={Lefakis, Leonidas and Akbik, Alan and Vollgraf, Roland},_ <br>
_booktitle = {{LREC} 2018, 11th Language Resources and Evaluation Conference},_ <br>
_year = {2018}_ <br>
_}_


In [None]:
## Data Preparation

import os 
import json
import urllib.request
from multiprocessing import cpu_count
from tqdm.contrib.concurrent import process_map

images_path = 'data/feidegger/fashion'
filename = 'metadata.json'

my_bucket = s3_resource.Bucket(bucket)

if not os.path.isdir(images_path):
    os.makedirs(images_path)

def download_metadata(url):
    if not os.path.exists(filename):
        urllib.request.urlretrieve(url, filename)
        
# download metadata.json to local notebook
download_metadata('https://raw.githubusercontent.com/zalandoresearch/feidegger/master/data/FEIDEGGER_release_1.2.json')

def generate_image_list(filename):
    # read the metadata file and collect the image URL of every entry
    with open(filename, 'r') as metadata:
        data = json.load(metadata)
    url_lst = []
    for i in data:
        url_lst.append(i['url'])
    return url_lst


def download_image(url):
    urllib.request.urlretrieve(url, images_path + '/' + url.split("/")[-1])
                    
# generate image list            
url_lst = generate_image_list(filename)     

workers = 2 * cpu_count()

# downloading images to local disk; This process will take approximately 2-5 minutes on a t3.medium notebook instance
_ = process_map(download_image, url_lst, max_workers=workers, chunksize=1)

In [None]:
# Uploading dataset to S3

files_to_upload = []
dirName = 'data'
for path, subdirs, files in os.walk('./' + dirName):
    path = path.replace("\\","/")
    directory_name = path.replace('./',"")
    for file in files:
        files_to_upload.append({
            "filename": os.path.join(path, file),
            "key": directory_name+'/'+file
        })

def upload_to_s3(file):
    my_bucket.upload_file(file['filename'], file['key'])

# uploading images to s3; This process will take approximately 2-5 minutes on a t3.medium notebook instance
_ = process_map(upload_to_s3, files_to_upload, max_workers=workers, chunksize=1)

## TensorFlow Model Preparation

We'll use the TensorFlow backend to prepare a model for "featurizing" images into feature vectors. TensorFlow has a native Module API, as well as a higher-level Keras API.

We will start with a pretrained model, which avoids spending time and money training a model from scratch. Accordingly, as a first step in preparing the model, we'll import a pretrained model from Keras Applications. Researchers have experimented with various pretrained CNN architectures with different numbers of layers, discovering that there are several good possibilities.

In this notebook, we'll select a model based on the ResNet architecture, a commonly used choice. Of the various choices for the number of layers, ranging from 18 to 152, we'll use 50 layers. This is also a common choice that balances the expressiveness of the resulting feature vectors (embeddings) against computational efficiency (fewer layers means greater efficiency at the cost of less expressiveness).


In [None]:
import tensorflow.keras as keras
from tensorflow.keras.applications.resnet50 import ResNet50
from PIL import Image

Now we'll load a reference ResNet50 model, pretrained on the ImageNet dataset, without its final classifier so that it acts as a feature extractor. More specifically, we'll use the output of its average pooling layer to generate a row vector of floating point numbers as an "embedding", or representation of the features of the image. We'll also save the model in _SavedModel_ format under **model/1** so it can be served through the SageMaker TensorFlow Serving API.
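Loading ResNet50 without its classifier and with average pooling means the last convolutional feature map is averaged over its spatial positions, leaving one number per channel (2048 channels for ResNet50). The sketch below illustrates that pooling step on a toy 2 x 2 feature map with 3 channels. It is written in plain Python for clarity, and the function name `global_average_pool` is an assumption for this example, not the Keras implementation.

```python
def global_average_pool(feature_map):
    """Average a height x width x channels feature map over its spatial positions."""
    height = len(feature_map)
    width = len(feature_map[0])
    channels = len(feature_map[0][0])
    pooled = [0.0] * channels
    for row in feature_map:
        for cell in row:
            for c, value in enumerate(cell):
                pooled[c] += value
    # One averaged value per channel becomes one entry of the embedding.
    return [total / (height * width) for total in pooled]

# Toy feature map: 2 rows x 2 columns x 3 channels, values made up for illustration.
feature_map = [
    [[1.0, 0.0, 2.0], [3.0, 0.0, 2.0]],
    [[5.0, 4.0, 2.0], [7.0, 0.0, 2.0]],
]
embedding = global_average_pool(feature_map)
print(embedding)
```

The resulting vector has a fixed length regardless of where in the image a feature appeared, which is what makes it usable as a search key.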


In [None]:
keras.backend.set_image_data_format(data_format='channels_last')

# Import Resnet50 model
model = ResNet50(weights='imagenet', include_top=False, pooling='avg', input_shape=(224,224,3))

In [None]:
# Creating the directory structure
dirName = 'model/1'
if not os.path.exists(dirName):
    os.makedirs(dirName)
    print("Directory " , dirName ,  " Created ")
else:
    print("Directory " , dirName ,  " already exists")    

In [None]:
%%time
# Save the model in SavedModel format
model.save('./model/1/', save_format='tf')

In [None]:
# Check the model Signature
!/home/ec2-user/anaconda3/envs/tensorflow2_p38/bin/saved_model_cli show --dir ./model/1/ --tag_set serve --signature_def serving_default

## SageMaker Model Hosting

After saving the feature extractor model, we will deploy it using the SageMaker TensorFlow Serving API. TensorFlow Serving is a flexible, high-performance serving system for machine learning models, designed for production environments. It makes it easy to deploy new algorithms and experiments while keeping the same server architecture and APIs. TensorFlow Serving provides out-of-the-box integration with TensorFlow models, but can be easily extended to serve other types of models and data. We will define **inference.py** to customize the input data to the TensorFlow Serving API. We also need to add a **requirements.txt** file for additional libraries in the TensorFlow Serving container.
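The client code later in this notebook talks to the endpoint with JSON: it sends images under an `"instances"` key and reads feature vectors from a `"predictions"` key. The sketch below shows that round trip on toy data without calling any endpoint. The helper names and the mock response text are assumptions made for illustration.

```python
import json

def build_request_body(image_batch):
    """Serialize a batch of images (nested lists of pixel values) as a JSON request body."""
    return json.dumps({"instances": image_batch})

def parse_predictions(response_text):
    """Return one prediction (here, one feature vector) per input instance."""
    return json.loads(response_text)["predictions"]

# Toy batch: a single 2 x 2 RGB "image" instead of a 224 x 224 one.
batch = [[[[0, 0, 0], [255, 255, 255]], [[128, 128, 128], [64, 64, 64]]]]
body = build_request_body(batch)

# Hypothetical response text, made up for this example; real vectors have 2048 values.
mock_response = '{"predictions": [[0.12, 0.0, 1.7]]}'
vectors = parse_predictions(mock_response)
print(len(vectors), vectors[0])
```

Note the leading batch dimension: even a single image is sent as a list containing one image, and the response contains one vector per image in the batch.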


In [None]:
import tarfile

# Package the model directory as a gzipped tar archive (model.tar.gz)
model_version = '1'
export_dir = 'model/' + model_version
with tarfile.open('model.tar.gz', mode='w:gz') as archive:
    archive.add(export_dir, recursive=True)

In [None]:
# Upload the model to S3
sagemaker_session = sagemaker.Session()
model_path = sagemaker_session.upload_data(path='model.tar.gz', key_prefix='vis-search/tf/model')
model_path

After we upload the model to S3, we will use the TensorFlow Serving container to host the model. We are using the ml.m5.xlarge instance type. You may need to raise a support ticket to increase the service quotas for this SageMaker hosting instance type. We will use this endpoint to generate features and import them into Elasticsearch. You can also choose a small instance type such as "ml.m4.xlarge" to save cost.


In [None]:
# Deploy the model in Sagemaker Endpoint. This process will take ~10 min.
from sagemaker.tensorflow import TensorFlowModel

sagemaker_model = TensorFlowModel(
    model_data=model_path,
    role=role,
    framework_version='2.8'
)

predictor = sagemaker_model.deploy(initial_instance_count=3, instance_type='ml.m5.xlarge')

In [None]:
from io import BytesIO
import numpy as np
import requests

sm_runtime_client = boto3.client("sagemaker-runtime")

# get the features for a sample image
def download_file(url):
    r = requests.get(url)
    if r.status_code == 200:
        file = r.content
        return file
    else:
        print("file failed to download")
        return None
    
def get_s3_obj(s3_uri):
    key = s3_uri.replace(f's3://{bucket}/', '')
    payload = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
    return payload

def image_preprocessing(img_bytes, return_body=True):
    img = Image.open(BytesIO(img_bytes)).convert("RGB")
    img = img.resize((224, 224))
    img = np.asarray(img)
    img = np.expand_dims(img, axis=0)
    if return_body:
        body = json.dumps({"instances": img.tolist()})
        return body
    else:
        return img
    
def get_features(img_bytes, sagemaker_endpoint=predictor.endpoint_name):
    res = image_preprocessing(img_bytes, return_body=True)
    response = sm_runtime_client.invoke_endpoint(
        EndpointName=sagemaker_endpoint,
        ContentType="application/json",
        Body=res,
    )
    response_body = json.loads((response["Body"].read()))
    features = response_body["predictions"][0]
    return features

# Sample image from the training bucket; the file name keeps the query string of its download URL
image_bytes = get_s3_obj(f's3://{bucket}/data/feidegger/fashion/0000723855b24fbe806c20a1abd9d5dc.jpg?imwidth=400&filter=packshot')
    
features = get_features(image_bytes)
features

## Build a KNN Index in Elasticsearch

KNN for Amazon Elasticsearch Service lets you search for points in a vector space and find the "nearest neighbors" for those points by Euclidean distance or cosine similarity (default is Euclidean distance). Use cases include recommendations (for example, an "other songs you might like" feature in a music application), image recognition, and fraud detection.

KNN requires Elasticsearch 7.1 or later. Full documentation for the Elasticsearch feature, including descriptions of settings and statistics, is available in the Open Distro for Elasticsearch documentation. For background information about the k-nearest neighbors algorithm

In this step, we'll extract the features of all the Zalando images and import those features into an Elasticsearch 7.4 domain.
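The import code later in this notebook sends one indexing request per image. As a possible alternative, documents can be grouped into bulk actions. The sketch below only builds the action dictionaries, so it runs without a cluster. The function name `build_bulk_actions` and the sample pairs are assumptions for illustration, while the index and field names match the ones used in this notebook.

```python
def build_bulk_actions(feature_pairs, index_name="idx_zalando"):
    """Yield one bulk action per (s3_uri, feature_vector) pair."""
    for s3_uri, vector in feature_pairs:
        yield {
            "_index": index_name,
            "_source": {"zalando_img_vector": vector, "image": s3_uri},
        }

# Hypothetical pairs with 3-dimensional vectors; the notebook's vectors have 2048 values.
sample_pairs = [
    ("s3://example-bucket/data/a.jpg", [0.1, 0.2, 0.3]),
    ("s3://example-bucket/data/b.jpg", [0.4, 0.5, 0.6]),
]
actions = list(build_bulk_actions(sample_pairs))
print(actions[0])

# With a live client, the generator could be handed to the bulk helper, for example:
# from opensearchpy import helpers
# helpers.bulk(oss, build_bulk_actions(img_feature_vectors))
```

Whether bulk indexing is faster here depends on the domain size and vector dimension, so treat it as an option to measure rather than a guaranteed improvement.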


In [None]:
# return all s3 keys
def get_all_s3_keys(bucket):
    """Get a list of all keys in an S3 bucket."""    
    keys = []

    kwargs = {'Bucket': bucket}
    while True:
        resp = s3.list_objects_v2(**kwargs)
        # 'Contents' is absent when the bucket (or page) has no objects
        for obj in resp.get('Contents', []):
            keys.append('s3://' + bucket + '/' + obj['Key'])

        try:
            kwargs['ContinuationToken'] = resp['NextContinuationToken']
        except KeyError:
            break

    return keys

In [None]:
# get the keys of all the Zalando images in the bucket as a list
s3_uris = get_all_s3_keys(bucket)

In [None]:
# define a function to extract image features
from time import sleep

def extract_features(s3_uri):
    key = s3_uri.replace(f's3://{bucket}/', '')
    payload = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
    try:
        response = get_features(payload)
    except Exception:
        # Retry once after a short pause if the first call fails
        sleep(0.1)
        response = get_features(payload)

    del payload
    feature_lst = response
    
    return s3_uri, feature_lst

In [None]:
# This process cell will take approximately 24-25 minutes on a t3.medium notebook instance
# with 3 m5.xlarge SageMaker Hosted Endpoint instances
from multiprocessing import cpu_count
from tqdm.contrib.concurrent import process_map

workers = 2 * cpu_count()
img_feature_vectors = process_map(extract_features, s3_uris, max_workers=workers, chunksize=1)

In [None]:
# setting up the Elasticsearch connection
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth

region = boto3.Session().region_name # e.g. us-east-1
credentials = boto3.Session().get_credentials()
awsauth = AWSV4SignerAuth(credentials, region)

oss = OpenSearch(
    hosts = [{'host': es_host, 'port': 443}],
    http_auth = awsauth,
    use_ssl = True,
    verify_certs = True,
    connection_class = RequestsHttpConnection
)

In [None]:
# Define KNN Elasticsearch index mapping
knn_index = {
    "settings": {
        "index.knn": True
    },
    "mappings": {
        "properties": {
            "zalando_img_vector": {
                "type": "knn_vector",
                "dimension": 2048
            }
        }
    }
}

In [None]:
# Creating the Elasticsearch index
oss.indices.create(index="idx_zalando",body=knn_index,ignore=400)
oss.indices.get(index="idx_zalando")

In [None]:
# defining a function to import the feature vector corresponding to each S3 URI into the Elasticsearch KNN index
# This process will take around ~3 min.

def es_import(elem):
    oss.index(index='idx_zalando',
             body={
                "zalando_img_vector": elem[1], 
                "image": elem[0]
             })

_ = process_map(es_import, img_feature_vectors, max_workers=workers, chunksize=1)

## Evaluate Index Search Results

In this step, we will use the SageMaker SDK as well as the Boto3 SDK to query Elasticsearch and retrieve the nearest neighbors. One thing to mention: the **zalando** dataset has pretty good similarity with the ImageNet dataset. If you have a very domain-specific problem, you need to train on your own dataset on top of a pretrained feature extractor model, such as VGG, ResNet, Xception, or MobileNet, and build a new feature extractor model.
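The search cells that follow send a `knn` query and read the image location from each hit. The sketch below isolates those two steps so they can be checked without a cluster. The helper names and the mock response are assumptions made for illustration, and the response is trimmed to the fields used here.

```python
def build_knn_query(field, vector, k):
    """Build a search body asking for the k vectors closest to `vector`."""
    return {"size": k, "query": {"knn": {field: {"vector": vector, "k": k}}}}

def extract_hits(response):
    """Return (image, score) pairs from a search response, in the order returned."""
    return [(hit["_source"]["image"], hit["_score"]) for hit in response["hits"]["hits"]]

query_body = build_knn_query("zalando_img_vector", [0.1, 0.2, 0.3], k=2)

# Hypothetical response, made up for this example.
mock_response = {"hits": {"hits": [
    {"_score": 0.98, "_source": {"image": "s3://example-bucket/data/a.jpg"}},
    {"_score": 0.75, "_source": {"image": "s3://example-bucket/data/b.jpg"}},
]}}
results = extract_hits(mock_response)
print(results)
```

Keeping the score next to each image makes it easier to judge how close the matches are when you inspect results.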


In [None]:
# define display_image function
def display_image(bucket, key, size=(300, 300)):
    response = s3.get_object(Bucket=bucket,Key=key)['Body']
    img = Image.open(response)
    img = img.resize(size)
    return display(img)

In [None]:
import requests
import random


urls = url_lst[0:10]

img_bytes = download_file(random.choice(urls))
features = get_features(img_bytes)

In [None]:
k = 5
idx_name = 'idx_zalando'
res = oss.search(request_timeout=30, index=idx_name,
                body={'size': k, 
                      'query': {'knn': {'zalando_img_vector': {'vector': features, 'k': k}}}})

In [None]:
for i in range(k):
    key = res['hits']['hits'][i]['_source']['image']
    key = key.replace(f's3://{bucket}/','')
    img = display_image(bucket, key)

# Deploying a full-stack visual search application


In [None]:
# download ready-made lambda package for backend api
!aws s3 cp s3://aws-ml-blog/artifacts/visual-search/function.zip ./

s3_resource.Object(bucket, 'backend/function.zip').upload_file('./function.zip')

In [None]:
s3_resource.Object(bucket, 'backend/template.yaml').upload_file('./backend/template.yaml')

sam_template_url = f'https://{bucket}.s3.amazonaws.com/backend/template.yaml'

# Generate the CloudFormation Quick Create Link

print("Click the URL below to create the backend API for visual search:\n")
print((
    f'https://console.aws.amazon.com/cloudformation/home?region={region}#/stacks/create/review'
    f'?templateURL={sam_template_url}'
    '&stackName=vis-search-api'
    f'&param_BucketName={outputs["s3BucketTraining"]}'
    f'&param_DomainName={outputs["esDomainName"]}'
    f'&param_OpenSearchURL={outputs["esHostName"]}'
    f'&param_SagemakerEndpoint={predictor.endpoint_name}'
    f'&param_LambdaCodeFile=backend/function.zip'
))

Now that you have a working Amazon SageMaker endpoint for extracting image features and a KNN index on Elasticsearch, you are ready to build a real-world full-stack ML-powered web app. The SAM template you just created will deploy an Amazon API Gateway and AWS Lambda function. The Lambda function runs your code in response to HTTP requests that are sent to the API Gateway.
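To give a rough idea of how a Lambda function behind API Gateway can be shaped, here is a minimal sketch. It is not the code in `backend/lambda/app.py`: the request fields (`url`, `k`), the response layout, and the placeholder `find_similar_images` are all assumptions made for illustration.

```python
import json

def find_similar_images(url, k):
    """Placeholder for the real work: fetch the image, call the endpoint, query the index."""
    return [{"image": f"s3://example-bucket/match-{i}.jpg"} for i in range(k)]

def lambda_handler(event, context):
    """Handle an API Gateway proxy request whose JSON body carries an image URL."""
    try:
        payload = json.loads(event.get("body") or "{}")
        url = payload["url"]
    except (json.JSONDecodeError, KeyError, TypeError):
        return {
            "statusCode": 400,
            "body": json.dumps({"error": "expected a JSON body with a 'url' field"}),
        }
    matches = find_similar_images(url, k=int(payload.get("k", 3)))
    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"images": matches}),
    }

# Local invocation with a hand-built event, no AWS resources involved.
event = {"body": json.dumps({"url": "https://example.com/dress.jpg", "k": 2})}
response = lambda_handler(event, None)
print(response["statusCode"], response["body"])
```

Calling the handler locally with a hand-built event like this is a quick way to check request parsing before deploying.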


In [None]:
# Review the content of the Lambda function code.
!pygmentize backend/lambda/app.py

### Once the CloudFormation Stack shows **CREATE_COMPLETE**, proceed to this cell below:


In [None]:
# Save the REST endpoint for the search API to a config file, to be used by the frontend build

api_endpoint = get_cfn_outputs('vis-search-api')['ImageSimilarityApi']

with open('./frontend/src/config/config.json', 'w') as outfile:
    json.dump({'apiEndpoint': api_endpoint}, outfile)

## Step 2: Deploy frontend services


In [None]:
# add NPM to the path so we can assemble the web frontend from our notebook code

from os import environ

npm_path = ':/home/ec2-user/anaconda3/envs/JupyterSystemEnv/bin'

if npm_path not in environ['PATH']:
    ADD_NPM_PATH = environ['PATH']
    ADD_NPM_PATH = ADD_NPM_PATH + npm_path
else:
    ADD_NPM_PATH = environ['PATH']
    
%set_env PATH=$ADD_NPM_PATH

In [None]:
%cd ./frontend/

!npm i --omit=dev

In [None]:
!npm run-script build

In [None]:
hosting_bucket = f"s3://{outputs['s3BucketHostingBucketName']}"

!aws s3 sync ./build/ $hosting_bucket

## Step 3: Browse your frontend service, and upload an image


In [None]:
%cd ../

In [None]:
print('Click the URL below:\n')
print(f'https://{outputs["cfDomain"]}/index.html')

You should see the following page:

![Website](pi3small.png)

On the website, try pasting the following URL in the URL text field.

`https://img01.ztat.net/article/spp-media-p1/3c8812d8b6233a55a5da06b19d780302/dc58460c157b426b817f13e7a2f087c5.jpg?imwidth=400&filter=packshot`


## Extensions

We have used a pretrained ResNet50 model, which is trained on the ImageNet dataset. Depending on your use case, you can fine-tune any pretrained model, such as VGG, Inception, or MobileNet, with your own dataset and host the model in Amazon SageMaker.

You can also use an Amazon SageMaker batch transform job to extract features in bulk from images stored in S3, and then use AWS Glue to import that data into an Elasticsearch domain.


### Cleanup

Make sure that you stop the notebook instance, delete the Amazon SageMaker endpoint and delete the Elasticsearch domain to prevent any additional charges.


In [None]:
# Delete the endpoint
predictor.delete_endpoint()

In [None]:
# Empty S3 Contents
training_bucket_resource = s3_resource.Bucket(bucket)
training_bucket_resource.objects.all().delete()

hosting_bucket_resource = s3_resource.Bucket(outputs['s3BucketHostingBucketName'])
hosting_bucket_resource.objects.all().delete()