In [None]:
# Environment setup: upgrade the notebook tooling, then install every third-party
# package imported by the cells below. No versions are pinned, so each run
# installs whatever release is current at that time.
!pip install --upgrade pip jupyter ipywidgets
!pip install opencv-python mediapipe numpy pandas seaborn torch torchvision matplotlib diffusers colorspacious ipywidgets transformers ultralytics kaggle

In [None]:
import os
import time
import urllib.request
from base64 import b64encode

import cv2
import matplotlib.pyplot as plt
import mediapipe as mp
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as T
from colorspacious import cspace_convert
from diffusers import DiffusionPipeline
from IPython.display import HTML, display
from ipywidgets import IntSlider, fixed, interact
from PIL import Image
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms, models
from torchvision.models.segmentation import deeplabv3_resnet101
from transformers import AutoProcessor, AutoModelForImageTextToText
from ultralytics import YOLO

import google.colab.files as google_files

In [None]:
# Pick the compute target once; later cells move models and tensors to `device`.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"
print(f"Using device: {device}")

# **Classification**

In [None]:
# Step 4: Download and extract the dataset
!chmod 600 .kaggle/kaggle.json
!kaggle datasets download -d praveen2084/leather-defect-classification
!unzip -qo leather-defect-classification.zip -d leather_defects

# Dynamically locate the dataset directory
data_path = None
for root, dirs, files in os.walk("leather_defects"):
    if "Leather Defect" in root:
        data_path = root
        break

if not data_path:
    raise FileNotFoundError("Could not locate the Leather Defects dataset folder.")

print(f"Dataset found at: {data_path}")

# Step 5: Define data transformations and load dataset
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])

# Load the dataset
dataset = datasets.ImageFolder(root=data_path, transform=transform)

# Step 6: Explore class distributions
class_names = dataset.classes
class_counts = {class_name: 0 for class_name in class_names}
for _, label in dataset:
    class_counts[class_names[label]] += 1

# Convert to DataFrame for analysis
class_df = pd.DataFrame(list(class_counts.items()), columns=["Class", "Count"])
print(class_df)

# Plot class distribution
plt.figure(figsize=(10, 6))
sns.barplot(x="Class", y="Count", data=class_df, palette="viridis")
plt.title("Class Distribution in Leather Defects Dataset", fontsize=16)
plt.xticks(rotation=45)
plt.xlabel("Class", fontsize=12)
plt.ylabel("Number of Images", fontsize=12)
plt.show()


# Step 7: Visualize sample images from each class
def show_samples_by_class(dataset, class_names, num_samples=3):
    """
    Display up to `num_samples` images for each class in the dataset.

    Args:
        dataset: Indexable dataset whose items are (image_tensor, label) pairs,
            with the image laid out channel-first (it is permuted to
            height x width x channel for display). If the dataset exposes a
            `targets` sequence it is used to find samples without loading images.
        class_names (list[str]): Class labels; position i names label i.
        num_samples (int): Number of columns, i.e. images shown per class.

    Returns:
        None
    """
    # Labels only: reading them from `targets` avoids decoding every image just
    # to filter by class. Datasets without it (e.g. Subset) fall back to indexing.
    targets = getattr(dataset, "targets", None)
    if targets is None:
        targets = [dataset[i][1] for i in range(len(dataset))]

    # squeeze=False keeps `axes` two-dimensional even for one row or one column.
    fig, axes = plt.subplots(
        len(class_names),
        num_samples,
        figsize=(15, len(class_names) * 3),
        squeeze=False,
    )
    for class_idx, class_name in enumerate(class_names):
        sample_indices = [
            i for i, target in enumerate(targets) if int(target) == class_idx
        ][:num_samples]
        for sample_idx, ax in enumerate(axes[class_idx]):
            ax.axis('off')  # also hides the frame of cells that receive no image
            if sample_idx >= len(sample_indices):
                continue
            img = dataset[sample_indices[sample_idx]][0]
            ax.imshow(img.permute(1, 2, 0))
            if sample_idx == 0:
                ax.set_title(f"Class: {class_name}", fontsize=12, loc='left')
    plt.tight_layout()
    plt.show()


show_samples_by_class(dataset=dataset, class_names=class_names)

# Step 8: Analyze image dimensions and pixel statistics
# One pass over the dataset collects each image's shape plus its mean and
# standard deviation over all pixels and channels.
image_shapes, pixel_means, pixel_stds = [], [], []

for img, _ in dataset:
    img_np = img.permute(1, 2, 0).numpy()
    image_shapes.append(img_np.shape)
    pixel_means.append(img_np.mean())
    pixel_stds.append(img_np.std())

# Print summary statistics
unique_shapes = set(image_shapes)
print("\nImage Shape Statistics:")
print(f"  Unique Shapes: {unique_shapes}")

print("\nPixel Value Statistics:")
print(f"  Mean Pixel Value: {np.mean(pixel_means):.2f}")
print(f"  Std Pixel Value: {np.mean(pixel_stds):.2f}")

# Plot pixel statistics distribution: one histogram panel per statistic
plt.figure(figsize=(12, 6))

stat_panels = (
    (pixel_means, "blue", "Distribution of Mean Pixel Values", "Mean Pixel Value"),
    (pixel_stds, "orange", "Distribution of Pixel Std Values", "Pixel Std Value"),
)
for stat_position, (stat_values, stat_color, stat_title, stat_xlabel) in enumerate(stat_panels, start=1):
    plt.subplot(1, 2, stat_position)
    sns.histplot(stat_values, kde=True, color=stat_color, bins=30)
    plt.title(stat_title, fontsize=14)
    plt.xlabel(stat_xlabel, fontsize=12)
    plt.ylabel("Frequency", fontsize=12)

plt.tight_layout()
plt.show()

# Step 9: GPU check for dataset processing (Optional)
# Only reports the selected device; no tensor is moved here.
print(f"\nSample image tensor is on device: {device}")

In [None]:
# Step 5: Define data transformations
# Resize to the 224x224 network input and normalise each channel with the fixed
# mean/std values below.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Step 6: Load and split the dataset (80% train / 20% validation)
dataset = datasets.ImageFolder(root=data_path, transform=transform)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size

# Seed the split so the same images land in train/validation on every run;
# otherwise reported metrics are not comparable between runs.
SPLIT_SEED = 42
train_dataset, val_dataset = random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Pinned host memory only speeds up host-to-GPU copies, so enable it on CUDA only.
use_pinned_memory = device == "cuda"
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, pin_memory=use_pinned_memory)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False, pin_memory=use_pinned_memory)

# Step 7: Define the model
# Pretrained MobileNetV3-Small with its last classifier layer replaced so the
# output size equals the number of dataset classes.
model = models.mobilenet_v3_small(weights=models.MobileNet_V3_Small_Weights.DEFAULT)
num_classes = len(dataset.classes)
model.classifier[3] = nn.Linear(model.classifier[3].in_features, num_classes)

# Move the model to GPU if available
model = model.to(device)

# Step 8: Define loss function, optimizer, and learning rate scheduler
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Multiply the learning rate by 0.1 every 5 epochs
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)


# Step 9: Training function with loss and accuracy tracking
def _run_epoch(model, loader, criterion, run_device, optimizer=None):
    """Run one pass over `loader`; train when `optimizer` is given, else evaluate.

    Returns:
        tuple[float, float]: (mean loss, accuracy), both averaged per sample.
        Both are NaN when the loader yields no samples.
    """
    is_training = optimizer is not None
    model.train(is_training)  # train(False) is equivalent to eval()

    loss_sum, correct, total = 0.0, 0, 0
    with torch.set_grad_enabled(is_training):
        for images, labels in loader:
            images, labels = images.to(run_device), labels.to(run_device)

            if is_training:
                optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            if is_training:
                loss.backward()
                optimizer.step()

            # Weight the batch loss by batch size so a short final batch does
            # not skew the epoch average (assumes `criterion` returns a batch mean).
            batch_size = labels.size(0)
            loss_sum += loss.item() * batch_size
            total += batch_size
            correct += (outputs.argmax(dim=1) == labels).sum().item()

    if total == 0:
        return float("nan"), float("nan")
    return loss_sum / total, correct / total


def train_and_validate(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=10):
    """Train `model` for `num_epochs` epochs, validating after each one.

    Args:
        model (nn.Module): Network to optimise; batches are moved to the device
            that holds its parameters.
        train_loader: Iterable of (images, labels) batches used for training.
        val_loader: Iterable of (images, labels) batches used for validation.
        criterion: Loss callable applied to (outputs, labels).
        optimizer: Optimiser stepped once per training batch.
        scheduler: Learning-rate scheduler stepped once per epoch.
        num_epochs (int): Number of train/validate cycles.

    Returns:
        dict[str, list[float]]: Per-epoch "train_loss", "val_loss",
        "train_acc" and "val_acc" values.
    """
    # Follow the model's own device instead of relying on a notebook global.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    history = {"train_loss": [], "val_loss": [], "train_acc": [], "val_acc": []}

    for epoch in range(num_epochs):
        train_loss, train_acc = _run_epoch(model, train_loader, criterion, run_device, optimizer)
        val_loss, val_acc = _run_epoch(model, val_loader, criterion, run_device)

        history["train_loss"].append(train_loss)
        history["train_acc"].append(train_acc)
        history["val_loss"].append(val_loss)
        history["val_acc"].append(val_acc)

        # Step the scheduler once per epoch, after the optimizer updates
        scheduler.step()

        # Print epoch results
        print(f"Epoch {epoch + 1}/{num_epochs}: "
              f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    return history


# Run the 10-epoch training schedule and keep the per-epoch metrics for plotting
history = train_and_validate(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    num_epochs=10,
)


# Step 10: Visualize loss and accuracy
def plot_training_history(history):
    """Draw train/validation loss and accuracy curves side by side.

    Args:
        history (dict): Must contain the lists "train_loss", "val_loss",
            "train_acc" and "val_acc", one entry per epoch.

    Returns:
        None
    """
    epochs = range(1, len(history["train_loss"]) + 1)

    # (subplot position, train key, train label, val key, val label, title, y label)
    panels = (
        (1, "train_loss", "Train Loss", "val_loss", "Val Loss", "Loss per Epoch", "Loss"),
        (2, "train_acc", "Train Accuracy", "val_acc", "Val Accuracy", "Accuracy per Epoch", "Accuracy"),
    )

    plt.figure(figsize=(12, 5))
    for position, train_key, train_label, val_key, val_label, title, y_label in panels:
        plt.subplot(1, 2, position)
        plt.plot(epochs, history[train_key], label=train_label, marker="o")
        plt.plot(epochs, history[val_key], label=val_label, marker="o")
        plt.title(title)
        plt.xlabel("Epoch")
        plt.ylabel(y_label)
        plt.legend()

    plt.tight_layout()
    plt.show()


# Render the curves for the run recorded above
plot_training_history(history=history)


# Step 11: Test the model
def test_model(model, val_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, preds = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (preds == labels).sum().item()

    print(f"Test Accuracy: {100 * correct / total:.2f}%")


# Report accuracy on the held-out split
test_model(model=model, val_loader=val_loader)


# Step 12: Visualize predictions
def visualize_predictions(model, val_loader, class_names, num_images=5):
    """Show images from the first batch of `val_loader` with true and predicted labels.

    Args:
        model (nn.Module): Trained classifier; the batch is moved to the device
            that holds its parameters.
        val_loader: Iterable of (images, labels) batches; only the first batch is used.
        class_names (list[str]): Class labels; position i names label i.
        num_images (int): Maximum number of images to draw. Fewer are drawn
            when the batch is smaller.

    Returns:
        None
    """
    # Follow the model's own device instead of relying on a notebook global.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    images, labels = next(iter(val_loader))
    images, labels = images.to(run_device), labels.to(run_device)
    with torch.no_grad():  # inference only: no autograd graph needed
        outputs = model(images)
    preds = outputs.argmax(dim=1)

    # Per-channel constants used to invert the Normalize transform for display
    channel_mean = np.array([0.485, 0.456, 0.406])
    channel_std = np.array([0.229, 0.224, 0.225])

    # Never index past the end of a short batch
    shown = min(num_images, images.size(0))

    # Plot the images with predictions (squeeze=False keeps `axes` 2-D for shown == 1)
    fig, axes = plt.subplots(1, shown, figsize=(15, 5), squeeze=False)
    for i, ax in enumerate(axes[0]):
        img = images[i].cpu().numpy().transpose((1, 2, 0))
        img = np.clip(img * channel_std + channel_mean, 0, 1)
        ax.imshow(img)
        ax.set_title(f"True: {class_names[int(labels[i])]}\nPred: {class_names[int(preds[i])]}")
        ax.axis("off")

    plt.show()


# Show a few validation images with their true and predicted classes
visualize_predictions(model=model, val_loader=val_loader, class_names=dataset.classes)


# **Object Detection**

In [None]:
# Step 4: Load the YOLOv8 model
# Load a pre-trained YOLOv8 model (nano version for speed and efficiency)
# NOTE: this rebinds the notebook-global `model`, replacing the classifier
# assigned to the same name earlier; the detection cells below read this name.
model = YOLO('yolov8n.pt')  # YOLOv8 Nano - lightweight and fast
print("YOLOv8 model loaded successfully!")

In [None]:
# Step 5: Upload an image
print("Please upload an image for object detection...")
uploaded = google_files.upload()
# A cancelled upload returns an empty mapping; fail with a clear message
# instead of an IndexError from taking the first key.
if not uploaded:
    raise ValueError("No file uploaded. Please try again.")
image_path = next(iter(uploaded))
print(f"Uploaded image: {image_path}")

# Step 6: Perform object detection
results = model(image_path)

# Step 7: Display the results
# Annotate the image with detected objects
annotated_image = results[0].plot()

# Display the annotated image (converted from BGR to RGB for matplotlib)
plt.figure(figsize=(10, 10))
plt.imshow(cv2.cvtColor(annotated_image, cv2.COLOR_BGR2RGB))
plt.axis('off')
plt.title('Detected Objects')
plt.show()

# Step 8: Print the detected objects with labels
print("Detected Objects:")
print("-----------------")
for obj in results[0].boxes:
    class_id = int(obj.cls.item())  # Convert class ID to integer
    label = model.names[class_id]  # Get the class label from model's label map
    confidence = obj.conf.item()  # Convert confidence to float
    bbox = obj.xyxy.tolist()  # Get the bounding box coordinates
    print(f"Label: {label}, Confidence: {confidence:.2f}, BBox: {bbox}")

In [None]:
# Step 5: Define a function for video-based object detection
def video_object_detection(video_path, output_path="annotated_output.mp4", display_interval=50):
    """
    Run YOLO detection on every frame of a video and save the annotated result.

    Uses the notebook-global `model` for inference and the notebook-global
    `device` to choose where it runs.

    Args:
        video_path (str): Path to the input video.
        output_path (str): Path to save the annotated video.
        display_interval (int): Print progress every this many frames;
            0 disables progress output.

    Returns:
        None
    """
    # Open the video file
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Could not open video file.")
        cap.release()
        return

    out = None
    frame_index = 0
    try:
        # Get video properties
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"Video Properties: {width}x{height}, FPS: {fps}, Total Frames: {total_frames}")

        # Some containers report 0 (or NaN) FPS; fall back to a usable rate so
        # the writer can still be created.
        if not fps > 0:
            fps = 30.0

        # Define a video writer to save the annotated video
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")  # Codec for .mp4 files
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            print("Error: Could not open output video for writing.")
            return

        # Request FP16 inference only when running on CUDA
        use_half = str(device).startswith("cuda")

        print("Processing video with YOLOv8...")
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Perform object detection on the current frame
            results = model.predict(frame, device=device, half=use_half)

            # Annotate the frame with detections
            annotated_frame = results[0].plot()

            # Ensure frame dimensions match the video writer output
            if annotated_frame.shape[:2] != (height, width):
                annotated_frame = cv2.resize(annotated_frame, (width, height))

            # Write the annotated frame to the output video
            out.write(annotated_frame)

            # Print progress every `display_interval` frames
            frame_index += 1
            if display_interval and frame_index % display_interval == 0:
                print(f"Processed {frame_index}/{total_frames} frames...")
    finally:
        # Release resources even when inference or writing raises
        cap.release()
        if out is not None:
            out.release()

    print(f"Video processing complete. Annotated video saved as {output_path}")


# Step 6: Display the annotated video
def display_video(video_path):
    """
    Display a video file inline using an HTML5 video player.

    The file is read fully into memory and embedded as a base64 data URL.
    Any failure is reported with a printed message instead of being raised.

    Args:
        video_path (str): Path to the video file.

    Returns:
        None
    """
    try:
        # Context manager closes the file handle as soon as the bytes are read
        with open(video_path, "rb") as video_file:
            mp4 = video_file.read()
        data_url = f"data:video/mp4;base64,{b64encode(mp4).decode()}"
        display(HTML(f"""
            <video width="640" height="480" controls>
                <source src="{data_url}" type="video/mp4">
            </video>
        """))
    except Exception as e:
        print(f"Error displaying video: {e}")


# Step 7: Upload a video file to Colab
print("Please upload a video file for object detection...")
uploaded = google_files.upload()

# Step 8: Perform object detection and display the video
# Take the first uploaded name, or None when nothing was uploaded, so the
# "no file" branch below is actually reachable.
video_path = next(iter(uploaded), None)
output_video_path = "annotated_output.mp4"

if video_path:
    video_object_detection(video_path, output_video_path, display_interval=50)
    print("Displaying the annotated video...")
    display_video(output_video_path)
else:
    print("Error: No video file uploaded.")

# **Segmentation**

In [None]:
# Step 4: Load the pre-trained DeepLabV3 model
# Uses the `weights=` API (as the MobileNet cell does) instead of the
# deprecated `pretrained=True` flag. The model is put in eval mode for inference.
# NOTE: this rebinds the notebook-global `model`.
model = deeplabv3_resnet101(
    weights=models.segmentation.DeepLabV3_ResNet101_Weights.DEFAULT
).to(device).eval()
print("DeepLabV3 model loaded successfully!")

In [None]:
# Step 5: Define utility functions
def preprocess_image(image_path):
    """
    Load an image and turn it into a normalised single-image batch for DeepLabV3.

    The tensor is moved to the notebook-global `device`.

    Args:
        image_path (str): Path to the input image.

    Returns:
        input_tensor (torch.Tensor): Resized (520x520), normalised image with a
            leading batch dimension.
        input_image (PIL.Image.Image): The loaded RGB image at its original size.
    """
    input_image = Image.open(image_path).convert("RGB")

    steps = [
        T.Resize((520, 520)),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
    to_model_input = T.Compose(steps)

    image_tensor = to_model_input(input_image)
    batched = image_tensor.unsqueeze(0)  # add the batch dimension
    return batched.to(device), input_image


def create_segmentation_overlay(output_predictions, original_size):
    """
    Generate a colorized segmentation overlay.

    Args:
        output_predictions (np.ndarray): 2-D map of predicted class indices.
        original_size (tuple): Original image dimensions (width, height).

    Returns:
        segmentation_colormap_resized (np.ndarray): uint8 RGB overlay resized
            to `original_size`.
    """
    segmentation_colormap = np.zeros((*output_predictions.shape, 3), dtype=np.uint8)
    # 21-entry palette: one colour per class index 0-20.
    # `plt.get_cmap` is used because `matplotlib.cm.get_cmap` was removed in Matplotlib 3.9.
    cmap = plt.get_cmap("jet", 21)
    for label in np.unique(output_predictions):
        # cmap returns RGBA floats in [0, 1]; keep RGB and scale to 0-255
        segmentation_colormap[output_predictions == label] = np.array(cmap(label)[:3]) * 255

    # Resize the segmentation map back to the original image size.
    # Nearest-neighbour keeps class colours crisp (no blended edge colours).
    segmentation_colormap_resized = cv2.resize(
        segmentation_colormap.astype(np.uint8),
        original_size,
        interpolation=cv2.INTER_NEAREST
    )
    return segmentation_colormap_resized


def blend_images(original_image, segmentation_overlay):
    """
    Mix the original image with the segmentation overlay (70% image, 30% overlay).

    Args:
        original_image (np.ndarray): Original image as a NumPy array.
        segmentation_overlay (np.ndarray): Segmentation overlay as a NumPy array.

    Returns:
        blended_image (np.ndarray): Weighted sum of the two inputs.
    """
    image_weight, overlay_weight, offset = 0.7, 0.3, 0
    blended_image = cv2.addWeighted(
        original_image, image_weight, segmentation_overlay, overlay_weight, offset
    )
    return blended_image


# Step 6: Upload an image
print("Please upload an image...")
uploaded = google_files.upload()
# A cancelled upload returns an empty mapping; fail with a clear message
# instead of an IndexError from taking the first key.
if not uploaded:
    raise ValueError("No file uploaded. Please try again.")
image_path = next(iter(uploaded))

# Step 7: Perform segmentation
try:
    input_tensor, input_image = preprocess_image(image_path)
    original_size = input_image.size  # PIL reports (width, height)
    input_image_np = np.array(input_image)

    # Perform segmentation
    with torch.no_grad():
        output = model(input_tensor)['out'][0]
    # Per-pixel class index = argmax over the first (class) dimension
    output_predictions = output.argmax(0).byte().cpu().numpy()

    # Create segmentation overlay and blend it with the original image
    segmentation_overlay = create_segmentation_overlay(output_predictions, original_size)
    blended_image = blend_images(input_image_np, segmentation_overlay)

    # Step 8: Visualize the results
    plt.figure(figsize=(15, 10))
    plt.subplot(1, 2, 1)
    plt.title("Original Image")
    plt.imshow(input_image)
    plt.axis("off")
    plt.subplot(1, 2, 2)
    plt.title("Segmented Overlay at Original Scale")
    plt.imshow(blended_image)
    plt.axis("off")
    plt.tight_layout()
    plt.show()

except Exception as e:
    print(f"An error occurred: {e}")


In [None]:
# Function to calculate usable area and visualize results
def process_and_visualize(image_path, threshold_value=120):
    """Threshold an image and show the share of pixels brighter than the threshold.

    Pixels whose grayscale value exceeds `threshold_value` count as usable area.
    Three panels are drawn: the original image, the binary mask, and the
    original with usable pixels painted green.

    Args:
        image_path (str): Path to the image file.
        threshold_value (int): Grayscale threshold passed to cv2.threshold.

    Returns:
        None

    Raises:
        ValueError: If the image cannot be read.
    """
    # Load the image; cv2.imread returns None instead of raising on failure
    image = cv2.imread(image_path)
    if image is None:
        raise ValueError(f"Error: Could not load image '{image_path}'. Ensure it is a valid image file.")
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Apply thresholding
    _, binary_threshold = cv2.threshold(gray, threshold_value, 255, cv2.THRESH_BINARY)

    # Calculate usable area as a percentage of all pixels
    usable_area = np.sum(binary_threshold == 255)
    total_area = binary_threshold.size
    usable_percentage = (usable_area / total_area) * 100

    # Create an overlay of the segmented area on the original image
    overlay = image.copy()
    overlay[binary_threshold == 255] = [0, 255, 0]  # Highlight usable areas in green

    # Visualization
    plt.figure(figsize=(16, 8))
    plt.subplot(1, 3, 1)
    plt.title("Original Image")
    plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    plt.axis("off")

    plt.subplot(1, 3, 2)
    plt.title(f"Segmented Usable Area ({usable_percentage:.2f}%)")
    plt.imshow(binary_threshold, cmap="gray")
    plt.axis("off")

    plt.subplot(1, 3, 3)
    plt.title("Overlay on Original Image")
    plt.imshow(cv2.cvtColor(overlay, cv2.COLOR_BGR2RGB))
    plt.axis("off")

    plt.tight_layout()
    plt.show()


# Upload an image
print("Please upload an image...")
uploaded = google_files.upload()
# A cancelled upload returns an empty mapping; fail with a clear message
# instead of an IndexError from taking the first key.
if not uploaded:
    raise ValueError("No file uploaded. Please try again.")
image_path = next(iter(uploaded))

# Use an interactive slider to adjust the threshold.
# `fixed(image_path)` binds the current path to the widget, so the slider keeps
# working on this image even after later cells rebind the global `image_path`.
print("Adjust the threshold using the slider below:")
interact(
    process_and_visualize,
    image_path=fixed(image_path),
    threshold_value=IntSlider(value=120, min=0, max=255, step=5),
);


# **Pose Estimation**

In [None]:
# Step 3: Initialize Mediapipe Pose Estimation
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

uploaded = google_files.upload()
# A cancelled upload returns an empty mapping; fail with a clear message
# instead of an IndexError from taking the first key.
if not uploaded:
    raise ValueError("No file uploaded. Please try again.")

# Load the uploaded image
image_path = next(iter(uploaded))
image = cv2.imread(image_path)
# cv2.imread returns None instead of raising when the file is not a readable image
if image is None:
    raise ValueError(f"Error: Could not load image '{image_path}'. Ensure it is a valid image file.")
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

# Step 5: Perform pose estimation
# The context manager closes the solution once the single image is processed.
with mp_pose.Pose(static_image_mode=True) as pose:
    results = pose.process(image_rgb)

# Check if landmarks are detected
if not results.pose_landmarks:
    print("No pose landmarks detected. Ensure the image contains a visible person.")
else:
    print("Pose landmarks detected.")

# Step 6: Draw pose landmarks on the image
annotated_image = image.copy()
if results.pose_landmarks:
    mp_drawing.draw_landmarks(
        annotated_image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

# Convert BGR to RGB for display
annotated_image_rgb = cv2.cvtColor(annotated_image, cv2.COLOR_BGR2RGB)

# Step 7: Display the image with pose estimation
plt.figure(figsize=(10, 10))
plt.imshow(annotated_image_rgb)
plt.axis('off')
plt.title("Pose Estimation")
plt.show()

# Step 8: Print keypoint coordinates (Optional)
if results.pose_landmarks:
    for idx, landmark in enumerate(results.pose_landmarks.landmark):
        print(f"Landmark {idx}: (x={landmark.x:.2f}, y={landmark.y:.2f}, z={landmark.z:.2f})")

In [None]:
# Step 3: Initialize Mediapipe Hand Tracking
mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils

uploaded = google_files.upload()
# A cancelled upload returns an empty mapping; fail with a clear message
# instead of an IndexError from taking the first key.
if not uploaded:
    raise ValueError("No file uploaded. Please try again.")

# Load the uploaded image
image_path = next(iter(uploaded))
image = cv2.imread(image_path)
# cv2.imread returns None instead of raising when the file is not a readable image
if image is None:
    raise ValueError(f"Error: Could not load image '{image_path}'. Ensure it is a valid image file.")
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

# Step 5: Perform hand tracking (up to 5 hands)
# The context manager closes the solution once the single image is processed.
with mp_hands.Hands(static_image_mode=True, max_num_hands=5) as hands:
    results = hands.process(image_rgb)

# Step 6: Draw hand landmarks
annotated_image = image.copy()
if results.multi_hand_landmarks:
    for hand_landmarks in results.multi_hand_landmarks:
        mp_drawing.draw_landmarks(
            annotated_image, hand_landmarks, mp_hands.HAND_CONNECTIONS
        )

# Convert BGR to RGB for display
annotated_image_rgb = cv2.cvtColor(annotated_image, cv2.COLOR_BGR2RGB)

# Step 7: Display the image with hand tracking
plt.figure(figsize=(10, 10))
plt.imshow(annotated_image_rgb)
plt.axis("off")
plt.title("Hand Tracking")
plt.show()


# **Super Resolution**

In [None]:
# Function to upload and load an image
def upload_image():
    """Prompt for a file upload and return the name of the first uploaded file.

    Returns:
        str: Name of the first uploaded file.

    Raises:
        ValueError: If nothing was uploaded.
    """
    print("Please upload a low-resolution image...")
    uploaded = google_files.upload()
    if not uploaded:
        raise ValueError("No file uploaded. Please try again.")

    image_path = next(iter(uploaded))
    print(f"Uploaded file: {image_path}")
    return image_path


# Function to perform super-resolution using DiffusionPipeline
def super_resolve_image(image_path, model_name="CompVis/ldm-super-resolution-4x-openimages"):
    """Upscale an image with a pretrained diffusion super-resolution pipeline.

    The pipeline is loaded on every call and moved to the notebook-global `device`.

    Args:
        image_path (str): Path to the low-resolution input image.
        model_name (str): Pipeline identifier passed to DiffusionPipeline.from_pretrained.

    Returns:
        tuple: (low_res_image, high_res_image); the first is the loaded RGB
        input, the second is the first image produced by the pipeline.

    Raises:
        Exception: Any loading or inference error is printed and re-raised.
    """
    try:
        print("Loading the DiffusionPipeline for super-resolution...")
        pipeline = DiffusionPipeline.from_pretrained(model_name)
        pipeline = pipeline.to(device)
        print("Pipeline loaded successfully!")

        # Open and process the uploaded image
        low_res_image = Image.open(image_path).convert("RGB")

        # Perform super-resolution (this pipeline takes an image only, no prompt)
        print("Generating high-resolution image...")
        with torch.no_grad():
            pipeline_output = pipeline(image=low_res_image)
        high_res_image = pipeline_output.images[0]
    except Exception as exc:
        print(f"Error during processing: {exc}")
        raise

    return low_res_image, high_res_image


# Function to display images side by side
def display_images(low_res_image, high_res_image):
    """Show the low- and high-resolution images next to each other.

    Args:
        low_res_image: Image drawn in the left panel.
        high_res_image: Image drawn in the right panel.

    Returns:
        None
    """
    panels = (
        ("Low Resolution Image", low_res_image),
        ("High Resolution Image", high_res_image),
    )

    plt.figure(figsize=(12, 6))
    for position, (panel_title, panel_image) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.title(panel_title, fontsize=14)
        plt.imshow(panel_image)
        plt.axis("off")

    plt.tight_layout()
    plt.show()


# Main Script
# Upload -> super-resolve -> display. Any failure is reported as a single
# message rather than a traceback.
if __name__ == "__main__":
    try:
        # Step 4: Upload and load the low-resolution image
        image_path = upload_image()

        # Step 5: Perform super-resolution
        low_res_image, high_res_image = super_resolve_image(image_path=image_path)

        # Step 6: Display the results
        display_images(low_res_image=low_res_image, high_res_image=high_res_image)

    except Exception as error:
        print(f"An error occurred: {error}")


# **Video Optical Flow Detection**

In [None]:
# Step 3: Upload a video
def upload_video():
    """Prompt for a video upload and return the first uploaded file name.

    Returns:
        str | None: Name of the first uploaded file, or None (after printing a
        notice) when nothing was uploaded.
    """
    print("Please upload a video file...")
    uploaded = google_files.upload()
    if not uploaded:
        print("No file uploaded. Please try again.")
        return None

    video_path = next(iter(uploaded))
    print(f"Uploaded video: {video_path}")
    return video_path


# Step 4: Process video for optical flow
def process_video_for_optical_flow(video_path, output_path="output_optical_flow.avi", display=True):
    """Compute dense Farneback optical flow for a video and save it as a colour video.

    Each output frame encodes flow direction as hue and flow magnitude as
    brightness. A summary (frames processed, elapsed time, throughput) is
    printed at the end.

    Args:
        video_path (str): Path to the input video.
        output_path (str): Path of the XVID-encoded output video.
        display (bool): When True, also show each flow frame in an OpenCV
            window (requires a GUI-capable OpenCV build); press 'q' to stop early.

    Returns:
        None
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Unable to open video file.")
        cap.release()
        return

    out = None
    frame_count = 0
    start_time = time.time()
    try:
        # Get video properties
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Keep the fractional frame rate (e.g. 29.97) so output timing matches
        # the input; fall back when the container reports 0 or NaN.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = 30.0

        # Initialize video writer
        fourcc = cv2.VideoWriter_fourcc(*"XVID")
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        # Read the first frame
        ret, first_frame = cap.read()
        if not ret:
            print("Error: Unable to read the first frame of the video.")
            return  # the finally block releases both capture and writer

        # Convert to grayscale
        prev_gray = cv2.cvtColor(first_frame, cv2.COLOR_BGR2GRAY)

        # Initialize HSV image with full saturation; hue and value are filled per frame
        hsv = np.zeros_like(first_frame)
        hsv[..., 1] = 255

        start_time = time.time()

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Convert to grayscale
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            # Calculate optical flow using Farneback method
            flow = cv2.calcOpticalFlowFarneback(
                prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0
            )

            # Convert flow to HSV for visualization:
            # angle (radians) -> hue in OpenCV's 0-180 range, magnitude -> brightness
            mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
            hsv[..., 0] = ang * 180 / np.pi / 2
            hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
            flow_image = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

            # Write the frame with optical flow to the output video
            out.write(flow_image)

            # Display the video if required
            if display:
                cv2.imshow("Optical Flow", flow_image)
                if cv2.waitKey(1) & 0xFF == ord("q"):  # Press 'q' to quit
                    break

            prev_gray = gray
            frame_count += 1
    finally:
        # Release resources on every exit path, including errors
        cap.release()
        if out is not None:
            out.release()
        # Only touch the GUI layer when a window may have been opened
        if display:
            cv2.destroyAllWindows()

    elapsed = time.time() - start_time
    processing_fps = frame_count / elapsed if elapsed > 0 else 0.0

    print("\nProcessing Complete")
    print("-------------------")
    print(f"Total Frames Processed: {frame_count}")
    print(f"Elapsed Time: {elapsed:.2f} seconds")
    print(f"Approx. FPS: {processing_fps:.2f}")
    print(f"Output video saved as {output_path}")


# Main Script
# Run the optical-flow pipeline on the uploaded video without opening a preview window.
if __name__ == "__main__":
    video_path = upload_video()
    if video_path:
        process_video_for_optical_flow(video_path=video_path, display=False)


# **Image Captioning**

In [None]:
# Step 4: Load the BLIP image-captioning model and processor
# (checkpoint: Salesforce/blip-image-captioning-base). Rebinds the
# notebook-global `model`; load failures are printed and re-raised.
try:
    processor = AutoProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
    model = AutoModelForImageTextToText.from_pretrained("Salesforce/blip-image-captioning-base")
    model = model.to(device)
except Exception as load_error:
    print(f"Error loading model or processor: {load_error}")
    raise
else:
    print("Model and processor loaded successfully!")


# Function to generate a caption for an image
def generate_caption(image_path):
    """Caption an image with the loaded captioning model and display both.

    Uses the notebook-global `processor`, `model` and `device`.

    Args:
        image_path (str): Path to the image file.

    Returns:
        str: The decoded caption.

    Raises:
        Exception: Any loading, inference or plotting error is printed and re-raised.
    """
    try:
        # Step 5: Load and preprocess the image
        image = Image.open(image_path).convert("RGB")
        inputs = processor(images=image, return_tensors="pt").to(device)

        # Step 6: Generate caption (beam search, at most 50 tokens)
        generation_args = {
            "pixel_values": inputs["pixel_values"],
            "max_length": 50,
            "num_beams": 5,
        }
        with torch.no_grad():
            token_ids = model.generate(**generation_args)
        caption = processor.decode(token_ids[0], skip_special_tokens=True)

        # Step 7: Display the image with the generated caption
        plt.figure(figsize=(10, 10))
        plt.imshow(image)
        plt.axis("off")
        plt.title(f"Generated Caption: {caption}", fontsize=16)
        plt.show()

        return caption
    except Exception as exc:
        print(f"An error occurred while generating caption: {exc}")
        raise


# Main script
if __name__ == "__main__":
    # Step 8: Upload an image
    print("Please upload an image...")
    uploaded = google_files.upload()

    if not uploaded:
        print("No image uploaded. Please try again.")
    else:
        image_path = next(iter(uploaded))
        print(f"Image uploaded: {image_path}")

        # Generate caption and display
        try:
            caption = generate_caption(image_path)
        except Exception as error:
            print(f"Error: {error}")
        else:
            # Step 9: Additional details and analysis
            print("\nCaption Analysis:")
            print("-----------------")
            print(f"Generated Caption: {caption}")

# **Color Consistency**

In [None]:
# Upload the reference and sample images
def upload_images():
    """Prompt for the reference image, then the sample image.

    Returns:
        tuple[str, str]: (reference_path, sample_path), the first file name
        from each upload.

    Raises:
        ValueError: If either upload contains no file.
    """
    def _prompt_for_upload(prompt):
        # One upload round: show the prompt, return the first uploaded name
        print(prompt)
        uploaded = google_files.upload()
        if not uploaded:
            # Clear error instead of an IndexError from indexing an empty list
            raise ValueError("No file uploaded. Please try again.")
        return next(iter(uploaded))

    reference_path = _prompt_for_upload("Upload the reference image...")
    sample_path = _prompt_for_upload("Upload the sample image...")

    return reference_path, sample_path


# Function to calculate the ΔE heatmap between two Lab images
def calculate_delta_e(reference_lab, sample_lab):
    """Per-pixel colour difference between two images of the same shape.

    The difference is the Euclidean distance along axis 2 (the channel axis),
    which is the CIE76 ΔE when both inputs are CIELab images.

    Args:
        reference_lab (np.ndarray): Reference image, channels on axis 2.
        sample_lab (np.ndarray): Sample image with the same shape.

    Returns:
        tuple: (delta_e_map, mean_delta_e) - the 2-D distance map and its mean.
    """
    channel_diff = reference_lab - sample_lab
    delta_e_map = np.sqrt(np.sum(channel_diff ** 2, axis=2))
    return delta_e_map, np.mean(delta_e_map)


# Main Script
reference_path, sample_path = upload_images()

# Load images (OpenCV returns BGR arrays, or None when a file cannot be read)
reference = cv2.imread(reference_path)
sample = cv2.imread(sample_path)

if reference is None or sample is None:
    raise ValueError("Error: One or both images could not be loaded. Ensure valid image files.")

# Resize the sample to the reference's dimensions when they differ
if reference.shape != sample.shape:
    print("Resizing images to match dimensions...")
    target_height, target_width = reference.shape[:2]
    sample = cv2.resize(sample, (target_width, target_height))

# Convert images to LAB color space using colorspacious
# (inputs are RGB scaled to the 0-1 range expected by "sRGB1")
reference_rgb = cv2.cvtColor(reference, cv2.COLOR_BGR2RGB)
sample_rgb = cv2.cvtColor(sample, cv2.COLOR_BGR2RGB)
reference_lab = cspace_convert(reference_rgb / 255.0, "sRGB1", "CIELab")
sample_lab = cspace_convert(sample_rgb / 255.0, "sRGB1", "CIELab")

# Calculate ΔE heatmap and mean ΔE
delta_e_map, mean_delta_e = calculate_delta_e(reference_lab, sample_lab)

# Display results: reference, sample, then the ΔE heatmap
plt.figure(figsize=(18, 8))

image_panels = (("Reference Image", reference_rgb), ("Sample Image", sample_rgb))
for panel_position, (panel_title, panel_rgb) in enumerate(image_panels, start=1):
    plt.subplot(1, 3, panel_position)
    plt.title(panel_title)
    plt.imshow(panel_rgb)
    plt.axis("off")

# ΔE Heatmap
plt.subplot(1, 3, 3)
plt.title(f"ΔE Heatmap (Mean ΔE: {mean_delta_e:.2f})")
plt.imshow(delta_e_map, cmap="viridis")
plt.colorbar(label="ΔE")
plt.axis("off")

plt.tight_layout()
plt.show()


# **Thickness Estimation**

In [None]:
# Default Middlebury Stereo Images (cones pair)
# Fallback inputs fetched by download_default_stereo_images(): im2.png is saved
# as the left view (cones_left.png) and im6.png as the right view (cones_right.png).
DEFAULT_LEFT_IMAGE_URL = "https://vision.middlebury.edu/stereo/data/scenes2003/newdata/cones/im2.png"
DEFAULT_RIGHT_IMAGE_URL = "https://vision.middlebury.edu/stereo/data/scenes2003/newdata/cones/im6.png"


# Download default Middlebury stereo images
def download_default_stereo_images():
    """Make sure the default stereo pair exists in the working directory.

    Each image is downloaded only when its local file is missing.

    Returns:
        tuple[str, str]: ("cones_left.png", "cones_right.png").
    """
    left_image_path, right_image_path = "cones_left.png", "cones_right.png"

    for side, destination in (("left", left_image_path), ("right", right_image_path)):
        if os.path.exists(destination):
            continue  # already cached locally

        if side == "left":
            print("Downloading default left stereo image...")
            source_url = DEFAULT_LEFT_IMAGE_URL
        else:
            print("Downloading default right stereo image...")
            source_url = DEFAULT_RIGHT_IMAGE_URL
        urllib.request.urlretrieve(source_url, destination)

    return left_image_path, right_image_path


# Upload or use default stereo images
def upload_or_use_default_stereo_images():
    """Ask for a left and a right stereo image, falling back to the default pair.

    If either upload is cancelled, empty, or fails, BOTH paths are replaced by
    the default pair from download_default_stereo_images(), so a user image is
    never mixed with a default one.

    Returns:
        tuple[str, str]: (left_image_path, right_image_path).
    """
    def _try_upload(side):
        # Returns the first uploaded file name, or None to request the default
        print(f"Upload the {side} stereo image (or press Cancel to use default)...")
        try:
            uploaded = google_files.upload()
        except Exception as exc:
            # `Exception` rather than a bare `except`, so KeyboardInterrupt and
            # SystemExit still stop the cell instead of silently using defaults.
            print(f"Upload failed: {exc}")
            uploaded = None
        if not uploaded:
            print(f"Using default {side} stereo image.")
            return None
        return next(iter(uploaded))

    left_image_path = _try_upload("left")
    right_image_path = _try_upload("right")

    # Use default images if not uploaded
    if left_image_path is None or right_image_path is None:
        left_image_path, right_image_path = download_default_stereo_images()

    return left_image_path, right_image_path


# Block-matching disparity from a pair of stereo image files
def calculate_depth_map(left_image_path, right_image_path, num_disparities=64, block_size=15):
    """Load a stereo pair as grayscale and compute its disparity map with StereoBM.

    Args:
        left_image_path (str): Path of the left image.
        right_image_path (str): Path of the right image.
        num_disparities (int): Passed to ``cv2.StereoBM_create`` as ``numDisparities``.
        block_size (int): Passed to ``cv2.StereoBM_create`` as ``blockSize``.

    Returns:
        tuple: ``(left_image, right_image, disparity)`` where the images are the
        grayscale arrays as loaded and ``disparity`` is a float32 array.

    Raises:
        ValueError: If either image could not be read.
    """
    grayscale_pair = [
        cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        for path in (left_image_path, right_image_path)
    ]
    if any(image is None for image in grayscale_pair):
        raise ValueError("Error: One or both images could not be loaded. Ensure valid image files.")
    left_image, right_image = grayscale_pair

    matcher = cv2.StereoBM_create(numDisparities=num_disparities, blockSize=block_size)

    # The matcher's raw output is divided by 16 to obtain disparities in pixels
    raw_disparity = matcher.compute(left_image, right_image)
    disparity = raw_disparity.astype(np.float32) / 16.0

    return left_image, right_image, disparity


# Function to estimate thickness from disparity
def disparity_to_thickness(disparity_map, focal_length=800, baseline=0.1):
    """
    Converts disparity to a per-pixel distance using stereo camera calibration.

    The value computed for each pixel is ``focal_length * baseline / disparity``;
    the notebook uses this map as its thickness estimate.

    The input is never modified: the clamping of non-positive disparities is
    applied to a private copy, so the caller's disparity map can still be
    inspected or plotted afterwards.

    Args:
        disparity_map (np.array): Disparity map (pixel disparities).
        focal_length (float): Focal length of the camera (in pixels).
        baseline (float): Distance between the stereo cameras (in meters).

    Returns:
        np.array: Thickness map (in meters), same shape as ``disparity_map``.
    """
    # Work on a copy so the caller's array keeps its original values
    safe_disparity = np.array(disparity_map, copy=True)

    # In an integer array the 0.1 clamp below would truncate to 0 and
    # reintroduce the division by zero it is meant to prevent
    if not np.issubdtype(safe_disparity.dtype, np.floating):
        safe_disparity = safe_disparity.astype(np.float64)

    # Avoid division by zero
    safe_disparity[safe_disparity <= 0] = 0.1

    # Depth = (focal_length * baseline) / disparity
    depth_map = (focal_length * baseline) / safe_disparity

    return depth_map


# Plot the stereo pair, the disparity and thickness maps, and a thickness histogram
def visualize_results(left_image, right_image, disparity_map, thickness_map):
    """Show all intermediate and final results in one 2x3 figure.

    Panels 1-4 are images (left, right, disparity, thickness); the two maps get
    a colorbar. Panel 5 is a histogram of the strictly positive thickness values.

    Args:
        left_image: Left image, drawn with the "gray" colormap.
        right_image: Right image, drawn with the "gray" colormap.
        disparity_map: Disparity map, drawn with the "plasma" colormap.
        thickness_map: Thickness map, drawn with the "viridis" colormap and
            summarised in the histogram.
    """
    # (title, data, colormap, colorbar label or None when no colorbar is drawn)
    image_panels = [
        ("Left Stereo Image", left_image, "gray", None),
        ("Right Stereo Image", right_image, "gray", None),
        ("Disparity Map", disparity_map, "plasma", "Disparity"),
        ("Thickness Map (meters)", thickness_map, "viridis", "Thickness (meters)"),
    ]

    plt.figure(figsize=(20, 10))
    for slot, (heading, pixels, colormap, bar_label) in enumerate(image_panels, start=1):
        plt.subplot(2, 3, slot)
        plt.title(heading)
        plt.imshow(pixels, cmap=colormap)
        if bar_label is not None:
            plt.colorbar(label=bar_label)
        plt.axis("off")

    # Histogram over the positive entries only
    positive_values = thickness_map[thickness_map > 0].flatten()
    plt.subplot(2, 3, len(image_panels) + 1)
    plt.title("Thickness Histogram")
    plt.hist(positive_values, bins=50, color="blue", alpha=0.7)
    plt.xlabel("Thickness (meters)")
    plt.ylabel("Frequency")

    plt.tight_layout()
    plt.show()


# Main Script
# Tunable settings, named in one place instead of being repeated as bare numbers.
NUM_DISPARITIES = 64  # numDisparities handed to the StereoBM matcher
BLOCK_SIZE = 15  # blockSize handed to the StereoBM matcher
# NOTE(review): presumably placeholder calibration values, not measurements of
# the camera that produced the images - confirm before trusting absolute numbers.
FOCAL_LENGTH_PX = 800  # focal length in pixels
BASELINE_M = 0.1  # distance between the two cameras in meters

print("Upload stereo images for depth map calculation (or press Cancel to use default Middlebury images):")
left_image_path, right_image_path = upload_or_use_default_stereo_images()

# Calculate the depth map
left_image, right_image, disparity_map = calculate_depth_map(
    left_image_path,
    right_image_path,
    num_disparities=NUM_DISPARITIES,
    block_size=BLOCK_SIZE,
)

# Estimate thickness from disparity.
# A copy is passed so the raw disparity map plotted below keeps its original
# values even if the conversion clamps non-positive disparities in place.
thickness_map = disparity_to_thickness(
    disparity_map.copy(),
    focal_length=FOCAL_LENGTH_PX,
    baseline=BASELINE_M,
)

# Visualize all results
visualize_results(left_image, right_image, disparity_map, thickness_map)
