# Analyze images to detect anomalies using Lookout for Vision Model

1. Change the values of the following variables in the main() function to match your Lookout for Vision model parameters:
bucketname (bucket with the images for inference), anomaly_bucketname (new bucket that images with anomalies are copied to after inference), model, and lkvproject

2. This code block analyzes images to detect anomalies against the Lookout for Vision model that was previously trained



In [None]:
import boto3
import io
from PIL import Image, ImageDraw, ExifTags, ImageColor, ImageFont
from IPython.display import display 
import pandas as pd
import numpy as np
import cv2
%matplotlib inline

# Lookout for Vision client; detect_lookout_anomalies() uses it for the inference call
lkvclient = boto3.client('lookoutvision')
# S3 resource handle (boto3 high-level interface)
s3_connection = boto3.resource('s3')

# Column layout of the results table: the object key of the image, the anomaly
# flag and the confidence reported for it
header = {'filename': [], 'anomaly': "", 'confidence': ""}

# Module-level results table; detect_lookout_anomalies() writes one row per
# analyzed image into it through df.loc[index]
df = pd.DataFrame(header)


#Function to analyze image with api call to lookout for vision model endpoint#
def detect_lookout_anomalies(model, lkvproject, contenttype, index, photopath, photo):
    """Run one local image through a Lookout for Vision model and record the result.

    Args:
        model: Model version passed as ``ModelVersion`` to ``detect_anomalies``.
        lkvproject: Lookout for Vision project name.
        contenttype: MIME type sent as ``ContentType`` for the image bytes.
        index: Row label under which the result is stored in the module-level ``df``.
        photopath: Path of the local image file whose bytes are sent to the model.
        photo: Name of the image; used in the printed output and in the results row.

    Returns:
        The ``IsAnomalous`` flag of the response converted to a string.
    """
    with open(photopath, "rb") as image_file:
        response = lkvclient.detect_anomalies(
            ProjectName=lkvproject,
            ModelVersion=model,
            Body=image_file.read(),
            ContentType=contenttype,
        )

    print('Detected anomaly for ' + photo)
    result = response["DetectAnomalyResult"]

    anomaly = str(result["IsAnomalous"])
    print("Anomalous?: " + anomaly)

    confidence = str(result["Confidence"])
    print("Confidence: " + confidence)

    # Store the outcome as one row of the shared results DataFrame
    df.loc[index] = [photo, anomaly, confidence]
    return anomaly


#Function to call to copy anomalous images from source bucket to new destination bucket.
def copy_to_bucket(bucket_from_name, bucket_to_name, file_name):
    """Copy an S3 object to another bucket, keeping the same key.

    Args:
        bucket_from_name: Name of the bucket that currently holds the object.
        bucket_to_name: Name of the bucket the object is copied into.
        file_name: Key of the object; used for both source and destination.
    """
    destination = boto3.resource('s3').Object(bucket_to_name, file_name)
    destination.copy({'Bucket': bucket_from_name, 'Key': file_name})



#Main function
def main():
    """Run Lookout for Vision inference on every image in the source bucket.

    Each object in ``bucketname`` is downloaded once, displayed, re-encoded as
    JPEG in a temporary file and sent to the model through
    detect_lookout_anomalies(). Images reported as anomalous are copied to
    ``anomaly_bucketname``. Objects that are empty or cannot be decoded as an
    image (for example "folder" placeholder keys) are skipped instead of
    aborting the whole run.
    """

    #Variables
    bucketname = "anaomaly-detection-and-localization-inference"
    anomaly_bucketname = "circuit-board-custom-labels-anomaly-experiment-2"
    model = "1"
    lkvproject = "circuit-board-anomaly-detection-experiment-2"
    contenttype = "image/jpeg"
    # temporary (writable) location of the JPEG that is sent to the model
    photopath = "/tmp/image.jpeg"
    index = 0

    clients3 = boto3.client('s3')
    paginator = clients3.get_paginator('list_objects_v2')
    result = paginator.paginate(Bucket=bucketname)

    for page in result:
        # a page without objects has no "Contents" entry
        for key in page.get("Contents", []):
            photo = key["Key"]
            print(photo)

            # download the object once and reuse the bytes for display and inference
            file_obj = clients3.get_object(Bucket=bucketname, Key=photo)
            file_content = file_obj["Body"].read()

            # creating 1D array from bytes data range between[0,255] and decoding it;
            # an empty buffer is never handed to the decoder
            image_np = None
            if file_content:
                np_array = np.frombuffer(file_content, np.uint8)
                image_np = cv2.imdecode(np_array, cv2.IMREAD_COLOR)
            if image_np is None:
                print("Skipping " + photo + ": not a readable image")
                continue

            Image.open(io.BytesIO(file_content)).show()

            # saving image to tmp (writable) directory; the .jpeg extension makes
            # OpenCV encode it as JPEG, matching contenttype
            cv2.imwrite(photopath, image_np)

            #call lookout for vision model function to detect anomalies
            anomaly = detect_lookout_anomalies(model, lkvproject, contenttype, index, photopath, photo)
            print(anomaly)
            if anomaly == 'True':
                print("Anomaly detected! Copy to Anomaly folder")
                copy_to_bucket(bucketname, anomaly_bucketname, photo)
            else:
                print("No anomalies!")
            index += 1
main()


# Print the results of the Lookout for Vision Anomaly Detection inference

Output the results of the inference to a csv file. This csv file can be found in your main directory of the Jupyter notebook.

In [None]:
df.to_csv("circuitboard_lfv_anomaly_results.csv")

# Identify specific anomalies using Rekognition Custom Labels model inference

1. Change the values of the following variables to match your Rekognition Custom Labels model parameters in the main() function
model_rekcl , anomaly_bucketname (name of the bucket where Lookout for Vision code will copy the images with anomalies)

2. Change the values of the following variables in the show_custom_labels() function to match your labels:
solder_defect, scratch, damaged_board, bent_pins

3. Change the columns of the pandas DataFrame to reflect the labels of your dataset, and make the corresponding changes where entries are added to the DataFrame after inference.

4. This code block takes the images that Lookout for Vision detected as anomalous, runs them through the Rekognition Custom Labels model endpoint, and identifies specific anomalies such as bent pins, damaged boards, solder defects, and scratches

In [13]:
import boto3
import io
from PIL import Image, ImageDraw, ExifTags, ImageColor, ImageFont
from IPython.display import display 
import pandas as pd
import numpy as np
import cv2
%matplotlib inline

# Rekognition client; show_custom_labels() uses it for the DetectCustomLabels call
rekclclient=boto3.client('rekognition')
# S3 resource handle; show_custom_labels() uses it to download each image
s3_connection = boto3.resource('s3')

# Column layout of the results table: the object key of the image, one
# confidence column per custom label, and the image width and height
header = {'filename': [], 'solder defect': "", 'damaged board': "", 'bent pins': "", 'scratch': "", 'width':"", 'height':""}

# Module-level results table; show_custom_labels() writes one row per analyzed
# image into it through df.loc[index]
df = pd.DataFrame(header)


#Function to call api of Rekognition custom labels model endpoint
def show_custom_labels(model_rekcl, bucketname, photo, min_confidence, max_results, index):
    """Classify one S3 image with a Rekognition Custom Labels model and record the result.

    The image is downloaded to read its dimensions and display it, then
    DetectCustomLabels is called on the S3 object. The confidence of each known
    label is written, together with the image size, as one row of the
    module-level ``df``.

    Args:
        model_rekcl: ARN of the Rekognition Custom Labels project version.
        bucketname: Name of the bucket that holds the image.
        photo: Key of the image object.
        min_confidence: Passed as ``MinConfidence`` to ``detect_custom_labels``.
        max_results: Passed as ``MaxResults`` to ``detect_custom_labels``.
        index: Row label under which the result is stored in ``df``.

    Returns:
        Number of custom labels contained in the response.
    """
    s3_object = s3_connection.Object(bucketname, photo)
    s3_response = s3_object.get()
    stream = io.BytesIO(s3_response['Body'].read())
    image = Image.open(stream)
    imgWidth, imgHeight = image.size
    image.show()

    #Variables
    solder_defect = 'solder defect'
    scratch = 'scratch'
    damaged_board = 'damaged board'
    bent_pins = 'bent pins'

    # Every known label starts at 0 so that a label missing from the response
    # (filtered out by min_confidence / max_results) still yields a complete row.
    # Keys are whitespace-stripped because label names may carry stray spaces.
    confidences = {
        solder_defect.strip(): 0,
        damaged_board.strip(): 0,
        bent_pins.strip(): 0,
        scratch.strip(): 0,
    }

    #Call DetectCustomLabels
    response = rekclclient.detect_custom_labels(
        Image={'S3Object': {'Bucket': bucketname, 'Name': photo}},
        MaxResults=max_results,
        MinConfidence=min_confidence,
        ProjectVersionArn=model_rekcl,
    )

    #Print out custom label properties
    for customLabel in response['CustomLabels']:
        label = str(customLabel['Name'])
        confidence = customLabel['Confidence']
        print('Label ' + label)
        print('Confidence ' + str(confidence))
        print(f'width={imgWidth}, height={imgHeight}')

        normalized = label.strip()
        if normalized in confidences:
            confidences[normalized] = confidence
        else:
            # do not book an unknown label under one of the known columns
            print('Ignoring unexpected label ' + label)

    # add entries to the pandas dataframe
    df.loc[index] = [
        photo,
        confidences[solder_defect.strip()],
        confidences[damaged_board.strip()],
        confidences[bent_pins.strip()],
        confidences[scratch.strip()],
        imgWidth,
        imgHeight,
    ]
    return len(response['CustomLabels'])

#Main function
def main():
    """Run Rekognition Custom Labels inference on every image in the anomaly bucket.

    Lists all objects in ``anomaly_bucketname`` and passes each key to
    show_custom_labels(), which downloads the image itself and stores the
    per-label confidences in the module-level ``df``.
    """

    #Variables
    model_rekcl='arn:aws:rekognition:us-east-1:083291618045:project/circuit-board-custom-labels-experiment-2/version/circuit-board-custom-labels-experiment-2.2021-04-27T06.27.15/1619530035592'
    min_confidence=0
    max_results = 4
    anomaly_bucketname = "circuit-board-custom-labels-anomaly-experiment-2"
    index = 0

    clients3 = boto3.client('s3')
    paginator = clients3.get_paginator('list_objects_v2')
    result = paginator.paginate(Bucket=anomaly_bucketname)

    for page in result:
        # a page without objects has no "Contents" entry
        for key in page.get("Contents", []):
            photo = key["Key"]
            print(photo)
            #call function (it fetches the image on its own, so no download is needed here)
            label_count = show_custom_labels(model_rekcl, anomaly_bucketname, photo, min_confidence, max_results, index)
            print("Custom labels detected: " + str(label_count))
            index += 1
               
               
main()


extra_images-anomaly_10.jpg
Label solder defect
Confidence 73.2249984741211
width=4000, height=2667
Label bent pins
Confidence 9.4350004196167
width=4000, height=2667
Label scratch
Confidence 8.744999885559082
width=4000, height=2667
Label  damaged board
Confidence 8.595999717712402
width=4000, height=2667
Custom labels detected: 4
extra_images-anomaly_2.jpg
Label solder defect
Confidence 72.0929946899414
width=4000, height=2667
Label bent pins
Confidence 9.959000587463379
width=4000, height=2667
Label scratch
Confidence 9.660000801086426
width=4000, height=2667
Label  damaged board
Confidence 8.288999557495117
width=4000, height=2667
Custom labels detected: 4
extra_images-anomaly_3.jpg
Label solder defect
Confidence 76.30500030517578
width=4000, height=2667
Label  damaged board
Confidence 8.496000289916992
width=4000, height=2667
Label scratch
Confidence 7.8939995765686035
width=4000, height=2667
Label bent pins
Confidence 7.304999828338623
width=4000, height=2667
Custom labels detect

# Print the results of Rekognition Custom Labels model inference

Output the results of the inference to a csv file. This csv file can be found in your main directory of the Jupyter notebook.

In [14]:
df.to_csv("circuitboard_rekcl_anomaly_results.csv")