In [1]:
import cv2
import os, time
from datetime import datetime, timedelta
from azure.storage.blob import BlobServiceClient, PublicAccess, BlobType, generate_blob_sas, BlobSasPermissions
from azure.storage.queue import QueueServiceClient
from dotenv import load_dotenv
import os
import uuid
from azure.cognitiveservices.vision.customvision.prediction import CustomVisionPredictionClient
from azure.cognitiveservices.vision.customvision.training import CustomVisionTrainingClient
from azure.cognitiveservices.vision.customvision.training.models import CustomVisionErrorException
from msrest.authentication import ApiKeyCredentials
from twilio.rest import Client

Setting up environment variables and client

In [2]:
# Load key=value pairs from a local .env file into the process environment.
load_dotenv()

# Required settings are read with os.environ[...] so a missing variable fails
# right here with a KeyError naming it, instead of an AttributeError/TypeError
# raised by calling .strip() or int() on None.
CONNECTION_STRING = os.environ['CONNECTION_STRING'].strip()
# SOURCE stays optional: the capture cell checks it for None and reports it.
SOURCE = os.getenv('SOURCE')
# Pause passed to time.sleep() by the capture loop.
TIME_DELAY = int(os.environ['TIME_DELAY'])
# The capture loop compares this with 1 to choose the manual preview mode.
MANUAL_MODE = int(os.environ['MANUAL_MODE'])

# Twilio credentials and sender number.
ACCOUNT_SID = os.environ['TWILIO_ACCOUNT_SID']
AUTH_TOKEN = os.environ['TWILIO_AUTH_TOKEN']
PHONE_NUMBER = os.environ['TWILIO_PHONE_NUMBER']

# Custom Vision keys, project and endpoint.
PREDICTION_KEY = os.environ['PREDICTION_KEY']
TRAINING_KEY = os.environ['TRAINING_KEY']
PROJECT_ID = os.environ['PROJECT_ID']
ENDPOINT = "https://southcentralus.api.cognitive.microsoft.com"

# Storage account name and key, used when generating blob SAS tokens.
ACCOUNT_NAME = os.environ['ACCOUNT_NAME']
STORAGE_ACCOUNT_KEY = os.environ['STORAGE_ACCOUNT_KEY']

# Prediction client: used to run detection on frames.
prediction_credentials = ApiKeyCredentials(in_headers={"Prediction-key": PREDICTION_KEY})
predictor = CustomVisionPredictionClient(ENDPOINT, prediction_credentials)

# Training client: used to look up the project's iterations.
training_credentials = ApiKeyCredentials(in_headers={"Training-key": TRAINING_KEY})
trainer = CustomVisionTrainingClient(ENDPOINT, training_credentials)

# Twilio REST client used to send the SMS alert.
client = Client(ACCOUNT_SID, AUTH_TOKEN)

KeyError: 'STORAGE_ACCOUNT_KEY'

In [34]:
# Lower-case working copies of the configuration constants used by the later cells.
connection_string, source = CONNECTION_STRING, SOURCE
time_delay, manual_mode = TIME_DELAY, MANUAL_MODE

# Storage client built from the connection string.
blob_service_client = BlobServiceClient.from_connection_string(connection_string)

# Placeholders; both names receive real values in the container/queue setup cell.
container_name = queue_service = None

Creating a blob container and a storage queue, both named 'fromcamera' + the current date and time (for example `fromcamera20230519-231034`).

In [35]:
# One timestamped name is shared by the blob container and the queue.
timestr = time.strftime("%Y%m%d-%H%M%S")
container_name = f'fromcamera{timestr}'

# Create the container, then keep a client bound to it for uploads and listing.
blob_service_client.create_container(container_name)
container_client = blob_service_client.get_container_client(container_name)
# NOTE(review): this requests container-level public access; confirm that is
# intended, given that send_sms shares images through SAS URLs.
container_client.set_container_access_policy(
    signed_identifiers={},
    public_access=PublicAccess.Container,
)

# Create the queue under the same name as the container.
queue_service = QueueServiceClient.from_connection_string(connection_string)
queue_service.create_queue(container_name)

<azure.storage.queue._queue_client.QueueClient at 0x1c3284d3bd0>

Inference function:
- Gets the name of the published iteration from Custom Vision.
- Encodes the frame as a JPEG (compressed) and keeps the resulting bytes in memory.
- Returns the results received from the Custom Vision prediction client that we created in [2] as 'predictor'.

In [36]:
def inference(frame):
    """Run the published Custom Vision iteration on a single frame.

    Args:
        frame: Image handed to ``cv2.imencode`` for JPEG encoding.

    Returns:
        Whatever ``predictor.detect_image`` returns for the encoded frame.

    Raises:
        SystemExit: If none of the project's iterations has a publish name.
    """
    iterations = trainer.get_iterations(PROJECT_ID)
    # First non-empty publish name, or None when nothing has been published.
    publish_iteration_name = next(
        (iteration.publish_name for iteration in iterations if iteration.publish_name),
        None,
    )
    if publish_iteration_name is None:
        # Raised explicitly instead of calling exit(): exit() is the interactive
        # helper from the site module and is not meant for program code.
        raise SystemExit("No published iteration found. Please publish an iteration in the Custom Vision portal.")

    # JPEG-encode in memory; element [1] of imencode's result is the encoded buffer.
    image_jpg = cv2.imencode('.jpg', frame)[1].tobytes()
    return predictor.detect_image(PROJECT_ID, publish_iteration_name, image_jpg)

Upload frame function:
- Takes the frame, a counter 'i', and an optional suffix.
- Encodes the frame as a JPEG and stores the bytes in memory as 'image_jpg'.
- Creates the blob name (adds the suffix at the end of the name if provided).
- Creates a client, 'blob_client', and uses it to upload the image to storage.

In [37]:
def upload_frame(frame, i, sufix=''):
    """Encode a frame as JPEG and upload it to the current blob container.

    Args:
        frame: Image to encode with ``cv2.imencode``. ``None`` is reported in
            the log line and then skipped.
        i: Counter used in the blob name and in the progress message.
        sufix: Optional text inserted between the counter and ``.jpg``.
    """
    print("frame capture function returned :: " +  str(frame is not None) + " storing to container :: " + container_name)
    if frame is None:
        # Nothing to encode; cv2.imencode cannot handle a missing frame.
        return

    image_jpg = cv2.imencode('.jpg', frame)[1].tobytes()
    # An empty (or None) suffix contributes nothing to the name.
    blob_name = 'image' + str(i) + (sufix or '') + '.jpg'
    blob_client = container_client.get_blob_client(blob_name)
    # overwrite=True: uploading under a name that already exists (for example a
    # second detection in the same frame) would otherwise raise.
    blob_client.upload_blob(image_jpg, blob_type=BlobType.BlockBlob, overwrite=True)
    print("Total files stored :: " + str(i))

'save_to_filesystem' function saves the frame as jpg to local storage, and sets the frame name to current date time before storing.

In [38]:
def save_to_filesystem(frame):
    """Save a frame as a JPEG under the local ``test`` directory.

    The file is named after the current local time (``YYYY-MM-DD HH-MM-SS.jpg``),
    and the directory is resolved relative to the current working directory.

    Args:
        frame: Image handed to ``cv2.imwrite``.
    """
    current_time = time.strftime("%Y-%m-%d %H-%M-%S")
    local_image_location = "test"
    # Make sure the target directory exists before writing into it.
    os.makedirs(local_image_location, exist_ok=True)
    image_path = os.path.join(local_image_location, f"{current_time}.jpg")
    # imwrite reports failure through its return value, so surface it.
    if not cv2.imwrite(image_path, frame):
        print("Unable to save frame to :: " + image_path)

Send SMS function sends an SMS alert to the user with the image URL from storage:

  Creates an empty list -> Gets all the blobs from the container -> Checks if the blob name has 'without' in it (as we are storing two images, one with the bounding box and one without) -> If 'without' is not found in the name, adds the blob to the list -> Sorts the blobs in the list by creation time, newest first -> Generates a SAS token for the first (most recent) image -> then creates the 'sas_url'.
- body: the 'string' that you want to send in the SMS.
- from_: the phone number given by Twilio.
- to: the recipient's phone number.

In [39]:
def send_sms(to='+917303879964'):
    """Text a read-only SAS link to the newest annotated image in the container.

    Blobs whose name contains 'without' (the copies uploaded without the
    bounding-box overlay) are ignored. If no other blob exists, nothing is sent.

    Args:
        to: Recipient phone number. The default keeps the number this notebook
            originally hard-coded.
            NOTE(review): consider moving the default into configuration.
    """
    # Local import: the notebook's import cell only brings in datetime/timedelta.
    from datetime import timezone

    candidates = [blob for blob in container_client.list_blobs() if 'without' not in blob.name]
    if not candidates:
        # Nothing to link to; indexing an empty list would raise IndexError.
        print("No image available to send from container :: " + container_name)
        return

    # Only the newest blob is needed, so take the maximum instead of sorting.
    latest_blob = max(candidates, key=lambda blob: blob.creation_time)

    sas_token = generate_blob_sas(
        account_name=ACCOUNT_NAME,
        container_name=container_name,
        blob_name=latest_blob.name,
        account_key=STORAGE_ACCOUNT_KEY,
        permission=BlobSasPermissions(read=True),
        # 8760 hours = 365 days. A timezone-aware "now" replaces the deprecated
        # datetime.utcnow().
        expiry=datetime.now(timezone.utc) + timedelta(hours=8760),
    )
    sas_url = 'https://' + ACCOUNT_NAME + '.blob.core.windows.net/' + container_name + '/' + latest_blob.name + '?' + sas_token

    client.messages.create(
        body=f'A Box is detected at you door! In case you are out you can view the image here: {sas_url}',
        from_=PHONE_NUMBER,
        to=to,
    )
    print(sas_url)

1. Checking the source: if the source is 'usb', the stream is captured from the webcam; otherwise the source is treated as a path to a video file (for example an .mp4).
2. Checking the manual mode: if manual_mode = 0, frames are sent automatically; otherwise frames can be sent manually from the camera preview.
3. Sending the frame to the 'inference' function (defined above), which returns the predictions.
4. Setting the probability threshold to 80%.
5. Looping through each prediction and checking the probability, so that predictions with a probability below 80% are skipped.
6. Adding a bounding-box overlay over the detected object.
7. Uploading the detected frame using the upload_frame function (defined above).
8. Saving the frame as a jpg using the save_to_filesystem function (defined above).
9. Sending a notification to the user using the send_sms function (defined above).
10. Adding the time_delay to pause the code for the defined time_delay in seconds.

In [40]:
# Capture loop: read frames from the configured source and either upload them
# on demand (manual mode) or run detection and alert on 'Box' detections.
if source is None:
    # Raised directly instead of calling exit(), which is the interactive
    # helper from the site module and is not meant for program code.
    raise SystemExit("Please set the valuse of SOURCE variable in .env file")

# 'usb' selects camera index 0; any other value is handed to OpenCV as given.
cap = cv2.VideoCapture(0 if source == 'usb' else source)

i = 0
# Minimum probability (0.8 = 80%) for a 'Box' prediction to count as a detection.
probability_threshold = 0.8
print('Created stream')

try:
    while True:
        # Exactly one read per iteration, always followed by the validity check.
        ret, frame = cap.read()
        if not ret or frame is None:
            print("Unable to capture frame from source :" + source)
            print("Please check correct SOURCE variable is set in .env file")
            break

        if manual_mode == 1:
            window_name = "Press SPACE to capture or ESC to quit"
            cv2.namedWindow(window_name, cv2.WINDOW_AUTOSIZE)
            cv2.imshow(window_name, frame)

            key = cv2.waitKey(1) % 256
            if key == 27:
                # ESC pressed
                print("Escape hit, closing...")
                cv2.destroyAllWindows()
                break
            if key == 32:
                # SPACE pressed
                i += 1
                upload_frame(frame, i)
            continue

        i += 1
        results = inference(frame)

        # Keep only confident 'Box' predictions; report other tags as before.
        detections = []
        for prediction in results.predictions:
            if prediction.tag_name != 'Box':
                print('Something else was detected')
            elif prediction.probability >= probability_threshold:
                detections.append(prediction)

        if detections:
            # Upload the untouched frame once, before any overlay is drawn on it.
            upload_frame(frame, i, '_without_overlay')

            # Bounding-box values are normalised to 0..1. frame.shape is
            # (height, width, ...), so horizontal values scale by the width and
            # vertical values by the height.
            frame_height, frame_width = frame.shape[:2]
            for prediction in detections:
                bbox = prediction.bounding_box
                left = int(bbox.left * frame_width)
                top = int(bbox.top * frame_height)
                right = left + int(bbox.width * frame_width)
                bottom = top + int(bbox.height * frame_height)

                # Draw the box and label it with the predicted tag name.
                frame = cv2.rectangle(frame, (left, top), (right, bottom), (0, 0, 255), 2)
                frame = cv2.putText(frame, prediction.tag_name, (left + 5, top + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,0,255), 1, cv2.LINE_AA, False)

                print("\t" + prediction.tag_name + ": {0:.2f}% bbox.left = {1:.2f}, bbox.top = {2:.2f}, bbox.width = {3:.2f}, bbox.height = {4:.2f}".format(prediction.probability * 100, bbox.left, bbox.top, bbox.width, bbox.height))

            # One annotated upload, one local copy and one SMS per frame,
            # however many boxes were found in it.
            upload_frame(frame, i)
            save_to_filesystem(frame)
            send_sms()
        else:
            print("No object detected")

        # Pause once per processed frame.
        time.sleep(time_delay)
finally:
    # Release the stream exactly once, after the loop ends, including when the
    # cell is interrupted.
    cap.release()
    print('Released stream')

Created stream
frame capture function returned :: True storing to container :: fromcamera20230519-231034
Total files stored :: 1
	Box: 100.00% bbox.left = 0.42, bbox.top = 0.46, bbox.width = 0.29, bbox.height = 0.29
frame capture function returned :: True storing to container :: fromcamera20230519-231034
Total files stored :: 1
https://mlcohort.blob.core.windows.net/fromcamera20230519-231034/image1.jpg?se=2024-05-18T17%3A40%3A42Z&sp=r&sv=2022-11-02&sr=b&sig=9o2ODPoaExrXAgDsMbUBX6U0J8G5epyegVy2WClZ8G8%3D


KeyboardInterrupt: 