In [1]:
# First run will required runtime restart.
# Have to restart and run again.

!pip install streamlit
!pip install pyngrok
!pip install ultralytics==8.0.196
!pip install ffmpeg-python
!pip install roboflow
!pip install py7zr

from IPython.display import clear_output
clear_output()

In [None]:
import requests
import py7zr
import os

def download_model_via_api(url, save_path):
    response = requests.get(url)
    if response.status_code == 200:
        with open(save_path, 'wb') as f:
            f.write(response.content)
    else:
        print(f"Failed to download model. Status code: {response.status_code}, Response: {response.text}")
        raise Exception("Failed to download model weights.")

# Adjusted for direct download from GitHub
model_weights_url = 'https://github.com/leekaize/COS30018/raw/main/02_FinalModel/FinalMulti.7z'
model_weights_path = '/content/FinalMulti.7z'

# Adjusted for direct download from GitHub
or_model_weights_url = 'https://github.com/leekaize/COS30018/raw/main/ObjectRecognitionModel/latest_prod_recog_best.7z'
or_model_weights_path = '/content/ProductRecognition.7z'

# Adjusted for direct download from GitHub
reference_plan_img_url = 'https://github.com/leekaize/COS30018/raw/main/04_Planogram/ReferencePlanogramPerspective.jpg'
reference_plan_img_path = '/content/ReferencePlanogramImagePerspective.jpg'

# Adjusted for direct download from GitHub
reference_plan_label_url = 'https://github.com/leekaize/COS30018/raw/main/04_Planogram/ReferencePlanogramLabel.txt'
reference_plan_label_path = '/content/reference_plan_label.txt'

# Adjusted for direct download from GitHub
planogram_functions_url = 'https://github.com/leekaize/COS30018/raw/main/04_Planogram/planogram.py'
planogram_functions_path = '/content/planogram_function.py'

# Download the missing and imprecise model weight
download_model_via_api(model_weights_url, model_weights_path)

# Download the object recognition model weight
download_model_via_api(or_model_weights_url, or_model_weights_path)

# Download the reference planogram image's perspective
download_model_via_api(reference_plan_img_url, reference_plan_img_path)

# Download the reference planogram bounding box labels
download_model_via_api(reference_plan_label_url, reference_plan_label_path)

# Download the .py file for planogram functions
download_model_via_api(planogram_functions_url, planogram_functions_path)

# Extract the 7zip files
with py7zr.SevenZipFile(model_weights_path, 'r') as archive:
    archive.extractall('/content/model_weights')
with py7zr.SevenZipFile(or_model_weights_path, 'r') as archive:
    archive.extractall('/content/or_model_weights')

# Check the extracted files
print(os.listdir('/content/model_weights'))
print(os.listdir('/content/or_model_weights'))

In [None]:
%%writefile app.py
import ultralytics
import streamlit as st
from ultralytics import YOLO
from PIL import Image, ImageOps
import cv2
import tempfile
import numpy as np
import os
import ffmpeg
from roboflow import Roboflow
from planogram_function import *

# Load the YOLO model
# Missing and Imprecise
model = YOLO('/content/model_weights/FinalMulti.pt')
# Object recognition
recognition_model = YOLO('/content/or_model_weights/latest_prod_recog_best.pt')
    
rf = Roboflow(api_key="Qcpny2sVf8g33aLNl9iL")
project = rf.workspace("cos30018").project("incrementallearning")

# load the reference planogram bounding box locations
reference_plan_path = "/content/reference_plan_label.txt"
data  = read_reference_file(reference_plan_path)

reference_img = cv2.imread('/content/ReferencePlanogramImagePerspective.jpg')

# Define the new width and height for reference image(for downsampling)
reference_new_width = int(reference_img.shape[1] / 2)
reference_new_height = int(reference_img.shape[0] / 2)

# Downsample the image using resize function
downsampled_reference_image = cv2.resize(reference_img, (reference_new_width, reference_new_height))

# Convert images to grayscale (to optimize sift detection)
reference_image_gray = cv2.cvtColor(downsampled_reference_image, cv2.COLOR_BGR2GRAY)

# Detects and compute descriptors using sift
sift = cv2.SIFT_create()
keypoints2, descriptors2 = sift.detectAndCompute(reference_image_gray, None)

def upload_to_roboflow(image_path, annotation_path, project):
    try:
        response = project.upload(image_path=image_path, annotation_path=annotation_path, annotation_format="yolo")
        print(f"Uploaded {image_path} and {annotation_path} successfully.")
        print("Upload response:", response)
        # Delete files after successful upload
        os.remove(image_path)
        os.remove(annotation_path)
        print(f"Deleted {image_path} and {annotation_path} after upload.")
    except Exception as e:
        print(f"Failed to upload {image_path} and {annotation_path}: {e}")

def detection_page():
    st.title("Detection")
    col1, col2 = st.columns(2)

    with col1:
        uploaded_file = st.file_uploader("Upload an image or video...", type=["png", "jpg", "jpeg", "mp4"])

    if uploaded_file is not None:
        # Reset session state for valid_detections on new file upload
        st.session_state.valid_detections = []

        file_type = uploaded_file.type.split('/')[0]
        if file_type == 'image':
            with col2:
                image = Image.open(uploaded_file).convert('RGB')
                image = ImageOps.exif_transpose(image)
                results = model.predict(image, conf=0.5)
                np_array = results[0].plot()
                np_array = cv2.cvtColor(np_array, cv2.COLOR_BGR2RGB)
                predicted_image = Image.fromarray(np_array, 'RGB')
                st.image(predicted_image, caption='Processed Image with Detections.', use_column_width=True)
        elif file_type == 'video':
            with col2:
                tfile = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
                tfile.write(uploaded_file.read())
                results = model.predict(tfile.name, save=True, project="videos")
                processed_video_path = results[0].save_dir + results[0].path
                processed_video_path = processed_video_path.replace("/tmp/", "/")
                temp_video_path = processed_video_path.replace(".mp4", ".avi")

                (
                    ffmpeg
                    .input(temp_video_path)
                    .output(processed_video_path, vcodec='libx264', crf=28, preset='ultrafast', movflags='faststart', acodec='aac', strict='experimental')
                    .run()
                )

                st.video(processed_video_path)

def planogram_page(): 
    st.title("Planogram Compliance")
    col1, col2 = st.columns(2)
    
    with col1:
        uploaded_file = st.file_uploader("Upload an image or video...", type=["png", "jpg", "jpeg", "mp4"])
    
    if uploaded_file is not None:
        # Reset session state for valid_detections on new file upload
        st.session_state.valid_detections = []

        file_type = uploaded_file.type.split('/')[0]
        if file_type == 'image':
            with col2:
                image = Image.open(uploaded_file).convert('RGB')
                image = ImageOps.exif_transpose(image)
                image = np.array(image)
                # Downsample the image to speed up keypoints detection
                image_new_width = int(image.shape[1]/ 2)
                image_new_height = int(image.shape[0]/2 )
                downsampled_image = cv2.resize(image, (image_new_width, image_new_height))

                # Convert images to grayscale (to optimize sift detection)
                downsampled_image_gray = cv2.cvtColor(downsampled_image, cv2.COLOR_BGR2GRAY)

                # Detect keypoints and compute descriptors using sift
                keypoints1, descriptors1 = sift.detectAndCompute(downsampled_image_gray, None)
                
                # Match keypoints
                matcher = cv2.BFMatcher(cv2.NORM_L2, crossCheck=True)
                matches = matcher.match(descriptors1, descriptors2)
                
                #store the minimum distance for best fit
                min_dist = float('inf')
                # Iterate over all matches to find the minimum distance
                for m in matches:
                    if m.distance < min_dist:
                        min_dist = m.distance
                
                # Filter weak matches
                good_matches = []
                distance_threshold = 2  # Threshold for point to be considered inliers
                for m in matches:
                    if m.distance < distance_threshold * min_dist:
                        good_matches.append(m)
                        
                # Extract matched keypoints using the good_matches(source and destination points)
                src_pts = np.float32([keypoints1[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
                dst_pts = np.float32([keypoints2[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)
                
                # Estimate homography using RANSAC with ransac reprojection threshold of 10 to limit distortion
                homography, _ = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, ransacReprojThreshold=10)

                # Apply perspective transformation to source image and convert to RGB format
                transformed_img = cv2.warpPerspective(downsampled_image, homography, (downsampled_reference_image.shape[1], downsampled_reference_image.shape[0]))
                transformed_img_rgb = cv2.cvtColor(transformed_img, cv2.COLOR_BGR2RGB)
                        
                # Perform detection
                results = recognition_model.predict(transformed_img_rgb, conf=0.4)
                
                # read the predicted result from object recognition into array as tuples(classID, xmin, ymin, xmax, ymax)(normalized coordinates)
                objects_predicted_tuples = []
                # attach classID to their respective bouunding boxes
                for result in results:
                    for box in result.boxes:
                        objects_predicted_tuples.append((int(box.cls), float(box.xyxyn[0][0]), float(box.xyxyn[0][1]), float(box.xyxyn[0][2]), float(box.xyxyn[0][3])))
                
                # convert tuples into arrays
                object_predicted_array = np.array(objects_predicted_tuples)
                
                # Extract matching bounding boxes
                bounding_boxes = []
                for reference in data:
                    checkMatch = 0
                    for prediction in object_predicted_array:
                        if prediction[0] == reference[0]:
                            if compute_iou(prediction[1:],reference[1:]) > 0.5:
                                checkMatch = 1
                    if checkMatch == 0:
                        bounding_boxes.append(reference)
                
                enlarge_bbox = []
                for bbox in bounding_boxes:
                    enlarge_bbox.append(convert_normalized_to_pixel(bbox[1:], transformed_img_rgb.shape[0], transformed_img_rgb.shape[1]))
    
                for bbox in bounding_boxes:
                    cv2.rectangle(image, (int(bbox[1]*transformed_img_rgb.shape[0]), int(bbox[2]*transformed_img_rgb.shape[1])), (int(bbox[3]*transformed_img_rgb.shape[0]), int(bbox[4]*transformed_img_rgb.shape[1])), (0, 255, 0), 2)
                
                # Compute the inverse of the homography matrix
                inverse_homography = np.linalg.inv(homography)
                
                transformed_bbox = []
                for bbox in enlarge_bbox:    
                    # Inverse transform the bounding boxes from the warped perspective to original
                    transformed_bbox.append(transform_bounding_box(bbox, inverse_homography,downsampled_image_gray))

                for bbox in transformed_bbox:
                    # Draw the transformed bounding box on the image
                    cv2.rectangle(downsampled_image, (int(bbox[0]), int(bbox[1])),(int(bbox[2]), int(bbox[3])), (0, 255, 0), 2)
                    cv2.rectangle(downsampled_image, (int(bbox[0]), int(bbox[1])  - 10), (int(bbox[0]) + 100, int(bbox[1])), (0, 255, 0), cv2.FILLED)
                    cv2.putText(downsampled_image, "Non-compliant", (int(bbox[0]), int(bbox[1])-2), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 0, 0), 1)

                predicted_image = Image.fromarray(downsampled_image, 'RGB')
                st.image(predicted_image, caption='Processed Image with Detections.', use_column_width=True)

def contribute_page():
    st.title("Contribute")
    st.write("Contribute to incremental learning model by providing feedback.")
    col1, col2 = st.columns(2)

    if 'valid_detections' not in st.session_state:
        st.session_state.valid_detections = []

    with col1:
        uploaded_file = st.file_uploader("Upload an image", type=["png", "jpg", "jpeg"])

    if uploaded_file is not None:
      with col2:
        # Reset session state for valid_detections on new file upload
        st.session_state.valid_detections = []

        image = Image.open(uploaded_file).convert('RGB')
        image = ImageOps.exif_transpose(image)
        results = model.predict(image, conf=0.3)
        detections = results[0].boxes  # Detected bounding boxes

        # Show the full image with detections in the first column
        np_array = results[0].plot()
        np_array = cv2.cvtColor(np_array, cv2.COLOR_BGR2RGB)
        predicted_image = Image.fromarray(np_array, 'RGB')
        with col1:
            st.image(predicted_image, caption='Processed Image with Detections.', use_column_width=True)

        if detections is not None and len(detections) > 0:
            with st.form(key='detections_form'):
                for i, box in enumerate(detections):
                    bbox = box.xyxy[0].cpu().numpy().tolist()  # Convert tensor to list
                    x1, y1, x2, y2 = map(int, bbox)
                    margin = 50  # Margin to make the cropped image slightly larger
                    x1, y1 = max(0, x1 - margin), max(0, y1 - margin)
                    x2, y2 = min(image.width, x2 + margin), min(image.height, y2 + margin)
                    cropped_image = image.crop((x1, y1, x2, y2))
                    label = int(box.cls.cpu().item())  # Get the class label
                    label_name = "Imprecise" if label == 0 else "Missing"  # Adjust based on your class labels

                    col_image, col_radio = st.columns([1, 2])
                    with col_image:
                        st.image(cropped_image, caption=f"Detection {i+1}", width=150)
                    with col_radio:
                        if st.radio(f"[{label_name}] Is detection {i+1} correct?", ('Yes', 'No'), key=f"radio_{i}") == 'Yes':
                            st.session_state.valid_detections.append({
                                "bbox": bbox,
                                "class": int(box.cls.cpu().item())  # Convert tensor to int
                            })

                submit_button = st.form_submit_button(label='Finish')
                if submit_button:
                    # After all detections are rated
                    annotations = []
                    for detection in st.session_state.valid_detections:
                        x1, y1, x2, y2 = detection["bbox"]
                        label = detection["class"]  # Assuming 'class' is already an int
                        x_center = (x1 + x2) / 2 / image.width
                        y_center = (y1 + y2) / 2 / image.height
                        width = (x2 - x1) / image.width
                        height = (y2 - y1) / image.height
                        annotations.append(f"{label} {x_center} {y_center} {width} {height}")

                    # Save the annotated image
                    annotated_image_path = os.path.splitext(uploaded_file.name)[0] + "_annotated.jpg"
                    annotations_path = os.path.splitext(uploaded_file.name)[0] + ".txt"
                    image.save(annotated_image_path)

                    with open(annotations_path, 'w') as f:
                        f.write("\n".join(annotations))

                    # Upload to Roboflow
                    upload_to_roboflow(annotated_image_path, annotations_path, project)

                    st.success("All detections have been rated and uploaded to Roboflow.")
                    # Reset session state
                    st.session_state.valid_detections = []

def main():
    st.set_page_config(page_title="AI Retail Vision", layout="wide")

    tab_names = ["Detection", "Planogram", "Contribute"]
    selected_tab = st.sidebar.radio("Navigation", tab_names)

    if selected_tab == "Detection":
        detection_page()
    elif selected_tab == "Planogram":
        planogram_page()
    elif selected_tab == "Contribute":
        contribute_page()

if __name__ == "__main__":
    main()

In [None]:
from pyngrok import ngrok
# Kill previous ngrok instances
ngrok.kill()
ngrok.set_auth_token('2g80v7Wvh6Wrlh2yFO25i3BUpy0_37mg8RF51YAddgyXE3eag')

# Start the Streamlit app in the background
get_ipython().system_raw('streamlit run --server.port 8501 app.py &')

# Set up the ngrok tunnel to the streamlit port 8501
public_url = ngrok.connect(addr="8501", proto='http', bind_tls=True)
print("Streamlit is running at ", public_url)