In [None]:
from ultralytics import YOLO
from PIL import Image
import glob

import cv2
import torch
import numpy as np
import matplotlib.pyplot as plt

# ---------------------------
# Configuration
# ---------------------------
# Use a dedicated, project-local torch.hub cache to avoid conflicts on Dataflow workers.
torch_cache_dir = './torch_cache'
torch.hub.set_dir(torch_cache_dir)

# ---------------------------
# Model Initialization
# ---------------------------
# YOLO detector checkpoint (ensure this path is correct; alternative: "checkpoints/yolo11n.pt").
YOLO_CHECKPOINT = "checkpoints/yolov8n.pt"
model = YOLO(YOLO_CHECKPOINT)

# MiDaS depth model loaded from torch.hub. Adjust model_type as needed
# (e.g. "DPT_Large" for higher accuracy).
MIDAS_REPO = "intel-isl/MiDaS"
model_type = "MiDaS_small"
midas = torch.hub.load(MIDAS_REPO, model_type, trust_repo=True)

# Run MiDaS on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"PyTorch using {device.type.upper()}")

midas.to(device)
midas.eval()  # inference mode for the depth model

# Pick the input transform that matches the chosen MiDaS model: the DPT variants
# use dpt_transform, every other model type here falls back to small_transform.
# NOTE(review): other hub model types (e.g. "MiDaS" or the BEiT/Swin variants) may
# need a different transform — confirm against the MiDaS hub docs before switching.
midas_transforms = torch.hub.load(MIDAS_REPO, "transforms", trust_repo=True)
transform = (
    midas_transforms.dpt_transform
    if model_type in ("DPT_Large", "DPT_Hybrid")
    else midas_transforms.small_transform
)

# ---------------------------
# Helpers
# ---------------------------
# YOLO class index treated as "person". Assumes the COCO label order of the stock
# YOLOv8 checkpoints — re-check if a custom-trained model is used.
PERSON_CLASS_ID = 0


def detect_people(image_pil):
    """Run the module-level YOLO `model` on a PIL image and keep only person boxes.

    The in-memory image is passed instead of the file path so that YOLO and MiDaS
    work on exactly the same decoded pixels, keeping box coordinates aligned with
    the depth map.

    Returns:
        list of YOLO box objects whose class equals PERSON_CLASS_ID.
    """
    results = model(
        image_pil,
        imgsz=1280,  # Increase resolution to help detect small/distant objects
        conf=0.25,   # Lower confidence threshold to catch smaller or partially occluded persons
    )
    return [
        box
        for result in results
        for box in result.boxes
        if int(box.cls[0]) == PERSON_CLASS_ID
    ]


def estimate_depth(image_rgb):
    """Run MiDaS on an RGB uint8 array and return a float map of the same (H, W).

    Uses the module-level `transform`, `midas` and `device`.
    NOTE(review): MiDaS is documented as predicting *relative inverse* depth, so
    larger values presumably mean closer — confirm before reading them as distances.
    """
    input_batch = transform(image_rgb).to(device)
    with torch.no_grad():
        prediction = midas(input_batch)
        # Resize the prediction back to the original image resolution.
        prediction = torch.nn.functional.interpolate(
            prediction.unsqueeze(1),
            size=image_rgb.shape[:2],
            mode="bicubic",
            align_corners=False,
        ).squeeze()
    return prediction.cpu().numpy()


def clamp_box(xyxy, width, height):
    """Truncate (x1, y1, x2, y2) to ints and clip every coordinate into the image.

    Both ends of each coordinate are clipped: an out-of-range x2/y2 below zero would
    otherwise become a negative (wrap-around) numpy slice index.

    Returns:
        (x1, y1, x2, y2) ints with 0 <= x <= width and 0 <= y <= height.
    """
    x1, y1, x2, y2 = (int(v) for v in xyxy)
    return (
        min(max(x1, 0), width),
        min(max(y1, 0), height),
        min(max(x2, 0), width),
        min(max(y2, 0), height),
    )


def mean_depth(depth_map, box):
    """Mean of depth_map inside an already-clipped (x1, y1, x2, y2) box.

    Returns NaN for an empty box instead of letting np.mean warn on an empty slice.
    """
    x1, y1, x2, y2 = box
    crop = depth_map[y1:y2, x1:x2]
    return float(crop.mean()) if crop.size else float("nan")


def colorize_depth(depth_map):
    """Min-max normalize depth_map to 0-255 and apply the PLASMA colormap (BGR image)."""
    depth_min, depth_max = depth_map.min(), depth_map.max()
    # The epsilon avoids division by zero on a constant depth map.
    depth_norm = (depth_map - depth_min) / (depth_max - depth_min + 1e-6)
    return cv2.applyColorMap((depth_norm * 255).astype(np.uint8), cv2.COLORMAP_PLASMA)


# ---------------------------
# Process Images from Glob
# ---------------------------
# Sorted so images are processed in a deterministic order (glob order is arbitrary).
img_paths = sorted(glob.glob("./datasets/[AC]_*.*"))

if not img_paths:
    print("No images found with the given glob pattern.")

for img_path in img_paths:
    print(f"\nProcessing image: {img_path}")

    # Decode once; the `with` block closes the file handle, keeping only the RGB image.
    with Image.open(img_path) as img:
        image_pil = img.convert("RGB")
    image_np = np.array(image_pil)
    height, width = image_np.shape[:2]

    # Detection and depth estimation both run on the same decoded image.
    person_boxes = detect_people(image_pil)
    depth_map = estimate_depth(image_np)
    boxes = [clamp_box(box.xyxy.cpu().numpy()[0], width, height) for box in person_boxes]

    # ---------------------------
    # Compute and Print Pedestrian Depth Information
    # ---------------------------
    print("\n--- Pedestrian Depth Estimates ---")
    for idx, (x1, y1, x2, y2) in enumerate(boxes):
        avg_depth = mean_depth(depth_map, (x1, y1, x2, y2))
        print(f"Pedestrian {idx+1}: Bounding Box [x1: {x1}, y1: {y1}, x2: {x2}, y2: {y2}], Average Depth: {avg_depth:.4f}")

    # ---------------------------
    # Optional Visualization
    # ---------------------------
    # Overlay the person boxes on the colorized depth map.
    depth_vis = colorize_depth(depth_map)
    for x1, y1, x2, y2 in boxes:
        cv2.rectangle(depth_vis, (x1, y1), (x2, y2), (0, 255, 0), 2)

    fig, ax = plt.subplots(figsize=(10, 10))
    ax.imshow(cv2.cvtColor(depth_vis, cv2.COLOR_BGR2RGB))
    ax.set_title(f"Depth Map with Pedestrian Bounding Boxes: {img_path}")
    ax.axis("off")
    plt.show()
    plt.close(fig)  # release each figure so a long image list does not accumulate memory

In [None]:
# Terminal setup (bash commands, not Python): installs Python 3.11 and creates a
# virtual environment with Apache Beam. Run these in a shell session; `source`
# only activates the venv for the shell it runs in.

# Add the deadsnakes PPA (source of the python3.11 packages installed below)
sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt update

# Install Python 3.11 together with venv support and development headers
sudo apt install python3.11 python3.11-venv python3.11-dev

# Create a Python 3.11 virtual environment and activate it
python3.11 -m venv beam_env
source beam_env/bin/activate

# Install Apache Beam with the GCP extras. The argument is quoted so shells such as
# zsh do not treat the [gcp] brackets as a glob pattern.
pip install 'apache-beam[gcp]==2.63.0'

# Check the Python version (inside the activated venv this should be 3.11)
python3 --version

In [None]:
# Build the SDK container image from the current directory (ignoring the layer cache)
# and push it; the same URI is passed as --sdk_container_image when launching the job.
IMAGE_URI=northamerica-northeast2-docker.pkg.dev/cloud-computing-449706/cloud-computing/dataflow/m3:latest
docker build . --no-cache --tag "$IMAGE_URI"
docker push "$IMAGE_URI"

In [None]:
# Launch the streaming Dataflow job (bash commands, not Python).
# NOTE(review): PYTHON_VERSION / BEAM_VERSION are presumably read by setup.py or other
# tooling not visible here — confirm they are still needed.
export CLOUDSDK_PYTHON=python3.11
export PYTHON_VERSION=3.11
export BEAM_VERSION=2.63.0

PROJECT=$(gcloud config list project --format "value(core.project)")
BUCKET="gs://$PROJECT-bucket"
IMAGE_URI=northamerica-northeast2-docker.pkg.dev/cloud-computing-449706/cloud-computing/dataflow/m3:latest
REGION=northamerica-northeast2

if [ -z "$PROJECT" ]; then
    # Without a project, BUCKET would silently become gs://-bucket and the job would fail later.
    echo "No default gcloud project set; run 'gcloud config set project <PROJECT_ID>' first." >&2
else
    # NOTE(review): use_unsupported_python_version may be unnecessary for Python 3.11
    # with this Beam version — confirm before removing.
    python src/dataflow.py \
        --runner DataflowRunner \
        --sdk_container_image="$IMAGE_URI" \
        --sdk_location=container \
        --project "$PROJECT" \
        --staging_location "$BUCKET/staging" \
        --temp_location "$BUCKET/temp" \
        --setup_file ./setup.py \
        --input_topic "projects/$PROJECT/topics/m3-image" \
        --output_topic "projects/$PROJECT/topics/m3-output" \
        --region "$REGION" \
        --experiments=use_unsupported_python_version \
        --experiments=use_runner_v2 \
        --streaming
fi