In [None]:
import cv2
import numpy as np
import pandas as pd
import logging
from pathlib import Path
from shapely.geometry import Polygon, box
from datetime import datetime
from ultralytics import YOLO
from tqdm.auto import tqdm

def ensure_directory_exists(path):
    """Create directory if it doesn't exist"""
    Path(path).parent.mkdir(parents=True, exist_ok=True)

class DetectionValidator:
    """Handles validation of detections and durations"""
    def __init__(self, config):
        self.config = config
        
    def validate_detection(self, confidence, area_ratio):
        return (confidence >= self.config['min_confidence'] and 
                area_ratio >= self.config['area_intersection_threshold'])

class TramAnalytics:
    """Handles statistical analysis of tram data"""
    def __init__(self, arrivals_df):
        self.df = arrivals_df
        
    def get_basic_stats(self):
        if len(self.df) == 0:
            return {
                'total_arrivals': 0,
                'avg_duration': 0,
                'min_duration': 0,
                'max_duration': 0
            }
        
        return {
            'total_arrivals': len(self.df),
            'avg_duration': round(self.df['Duration (Seconds)'].mean(), 2),
            'min_duration': round(self.df['Duration (Seconds)'].min(), 2),
            'max_duration': round(self.df['Duration (Seconds)'].max(), 2)
        }
    
    def get_hourly_distribution(self):
        self.df['Hour'] = pd.to_datetime(self.df['Enter Time']).dt.hour
        return self.df.groupby('Hour').size()

class TramTracker:
    def __init__(self, config):
        self.config = config
        self.validator = DetectionValidator(config['detection_params'])
        self.area_a = self.load_area_a(config['paths']['area_coordinates'])
        self.results = []
        self.current_confidence = 0
        self.setup_logging()
        
    def setup_logging(self):
        # Ensure log directory exists
        ensure_directory_exists(self.config['paths']['output']['log'])
        
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(self.config['paths']['output']['log']),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def load_area_a(self, coordinates_file):
        try:
            points = np.loadtxt(coordinates_file, dtype=np.int32)
            return Polygon(points)
        except Exception as e:
            self.logger.error(f"Error loading area coordinates: {e}")
            raise

    def is_in_area_a(self, bbox):
        try:
            tram_box = box(bbox[0], bbox[1], bbox[2], bbox[3])
            intersection_area = self.area_a.intersection(tram_box).area
            tram_area = tram_box.area
            return intersection_area / tram_area > self.config['detection_params']['area_intersection_threshold']
        except Exception as e:
            self.logger.error(f"Error checking area intersection: {e}")
            return False

    def extract_timestamp(self, filename):
        """Extract timestamp from filename"""
        try:
            parts = filename.stem.split('_')
            date_str = parts[-2]
            time_str = parts[-1]
            return datetime.strptime(f"{date_str}_{time_str}", "%Y%m%d_%H%M%S")
        except Exception as e:
            self.logger.error(f"Error extracting timestamp from {filename}: {e}")
            raise

    def process_frame(self, image_path, yolo_model, save_debug_frames=False):
        try:
            timestamp = self.extract_timestamp(Path(image_path))
            results = yolo_model.predict(
                source=str(image_path),
                conf=self.config['detection_params']['min_confidence'],
                verbose=False
            )
            
            tram_detected = False
            tram_in_area_a = False
            bbox = None
            self.current_confidence = 0
            
            if len(results) > 0:
                result = results[0]
                for box in result.boxes:
                    class_id = int(box.cls[0].item())
                    self.current_confidence = box.conf[0].item()
                    
                    if class_id == 0 and self.validator.validate_detection(self.current_confidence, 1.0):
                        tram_detected = True
                        bbox = box.xyxy[0].cpu().numpy()
                        tram_in_area_a = self.is_in_area_a(bbox)
                        break
            
            if save_debug_frames:
                self.save_debug_frame(str(image_path), bbox, tram_in_area_a, timestamp, image_path)
            
            self.results.append({
                'timestamp': timestamp,
                'tram_detected': tram_detected,
                'in_area_a': tram_in_area_a,
                'confidence': self.current_confidence
            })
            
        except Exception as e:
            self.logger.error(f"Error processing frame {image_path}: {e}")

    def calculate_duration(self):
        """Calculate durations for all tram arrivals"""
        if not self.results:
            return pd.DataFrame()
            
        arrivals_data = []
        entry_time = None
        exit_time = None
        arrival_count = 0
        consecutive_detections = 0
        min_duration = self.config['detection_params']['min_duration']
        max_duration = self.config['detection_params']['max_duration']
        MIN_CONSECUTIVE_FRAMES = self.config['detection_params']['min_consecutive_frames']
        
        for i, result in enumerate(self.results):
            current_time = result['timestamp']
            
            if result['in_area_a'] and result['tram_detected']:
                consecutive_detections += 1
                
                if entry_time is None and consecutive_detections >= MIN_CONSECUTIVE_FRAMES:
                    for j in range(i-MIN_CONSECUTIVE_FRAMES, i):
                        if j >= 0 and self.results[j]['in_area_a']:
                            entry_time = self.results[j]['timestamp']
                            break
            else:
                if consecutive_detections >= MIN_CONSECUTIVE_FRAMES and entry_time is not None:
                    for j in range(i-1, max(-1, i-5), -1):
                        if j >= 0 and self.results[j]['in_area_a']:
                            exit_time = self.results[j]['timestamp']
                            break
                    
                    if exit_time:
                        duration = (exit_time - entry_time).total_seconds()
                        
                        if min_duration <= duration <= max_duration:
                            arrival_count += 1
                            arrivals_data.append({
                                'Arrival No.': arrival_count,
                                'Enter Time': entry_time.strftime('%H:%M:%S'),
                                'Exit Time': exit_time.strftime('%H:%M:%S'),
                                'Duration (Seconds)': round(duration, 1),
                                'Confidence': result['confidence']
                            })
                    
                    entry_time = None
                    exit_time = None
                
                consecutive_detections = 0
        
        if entry_time is not None and consecutive_detections >= MIN_CONSECUTIVE_FRAMES:
            exit_time = self.results[-1]['timestamp']
            duration = (exit_time - entry_time).total_seconds()
            
            if min_duration <= duration <= max_duration:
                arrival_count += 1
                arrivals_data.append({
                    'Arrival No.': arrival_count,
                    'Enter Time': entry_time.strftime('%H:%M:%S'),
                    'Exit Time': exit_time.strftime('%H:%M:%S'),
                    'Duration (Seconds)': round(duration, 1),
                    'Confidence': self.results[-1]['confidence']
                })
        
        return pd.DataFrame(arrivals_data)

    def save_debug_frame(self, image_path, bbox, in_area_a, timestamp, original_path):
        """Save debug frame with visualization"""
        debug_image = cv2.imread(image_path)
        
        area_points = np.array(self.area_a.exterior.coords[:-1], dtype=np.int32)
        cv2.polylines(debug_image, [area_points], True, (0, 255, 0), 2)
        
        height = debug_image.shape[0]
        cv2.putText(debug_image, 
                timestamp.strftime('%Y-%m-%d %H:%M:%S'),
                (10, height - 30),
                cv2.FONT_HERSHEY_SIMPLEX, 
                .5, 
                (255, 255, 255), 
                2)
        
        if bbox is not None:
            x1, y1, x2, y2 = map(int, bbox)
            color = (0, 255, 0) if in_area_a else (0, 0, 255)
            cv2.rectangle(debug_image, (x1, y1), (x2, y2), color, 2)
            
            status = "In Area A" if in_area_a else "Outside Area A"
            cv2.putText(debug_image, status, (x1, y1-10), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)
        else:
            cv2.putText(debug_image, 
                    "No Tram Detected", 
                    (10, 60), 
                    cv2.FONT_HERSHEY_SIMPLEX, 
                    1, 
                    (0, 0, 255), 
                    2)
        
        debug_path = Path(self.config['paths']['debug_frames']) / f"debug_{Path(original_path).stem}.jpg"
        debug_path.parent.mkdir(parents=True, exist_ok=True)
        cv2.imwrite(str(debug_path), debug_image)

CONFIG = {
    'detection_params': {
        'min_confidence': 0.5,
        'min_consecutive_frames': 3,
        'min_duration': 3,
        'max_duration': 600,
        'area_intersection_threshold': 0.65
    },
    'paths': {
        'model_path': "build_custom_model/runs/train/ayana_tram_stop_detection/weights/best.pt",
        'area_coordinates': "area_coordinates.txt",
        'debug_frames': "dataset/debug_frames",
        'demo': "demo",
        'captured_frames': "dataset/captured_frames",
        'output': {
            'excel': "report/tram_arrivals.xlsx",
            'log': "report/tram_tracker.log"
        }
    },
    'testing': False,
    'target_date': '2024-12-02'
}

# Initialize model and tracker
model_path = CONFIG['paths']['model_path']
yolo_model = YOLO(model_path)
tracker = TramTracker(CONFIG)

# Process frames
if not CONFIG['testing']:
    image_folder = Path(CONFIG['paths']['captured_frames']) / CONFIG['target_date']
    image_subfolders = [f for f in image_folder.glob('*') if f.is_dir()]
    image_files = []
    for image_subfolder in image_subfolders:
        image_files.extend(sorted(image_subfolder.glob('ayana_tram_stop_*.jpg')))
    image_files = sorted(image_files)
else:
    image_subfolder = Path(CONFIG['paths']['demo'])
    image_files = sorted(image_subfolder.glob('*.jpg'))

for image_file in tqdm(image_files, desc="Processing frames"):
    tracker.process_frame(image_file, yolo_model, save_debug_frames=True)

# Generate results
arrivals_df = tracker.calculate_duration()

# Ensure output directory exists and save results
ensure_directory_exists(CONFIG['paths']['output']['excel'])
arrivals_df.to_excel(CONFIG['paths']['output']['excel'], index=False)
print("\nTram Arrivals Summary:")
print(arrivals_df)

# Generate analytics
analytics = TramAnalytics(arrivals_df)
stats = analytics.get_basic_stats()
print("\nStatistics:")
print(f"Total Arrivals: {stats['total_arrivals']}")
print(f"Average Duration: {stats['avg_duration']} seconds")
print(f"Min Duration: {stats['min_duration']} seconds")
print(f"Max Duration: {stats['max_duration']} seconds")

In [None]:
import os
target_date = '2024-12-02'
path = '/Users/fchrulk/AHM/Production/ayana-tram-detection-cctv/dataset/captured_frames/' + target_date

incorrect = []
for subpath in os.listdir(path):
    subpath_hour = subpath.split('_')[-1][:2]
    subpath = os.path.join(path, subpath)
    if os.path.isdir(subpath):
        image_folder = Path(subpath)
        image_files = sorted(image_folder.glob('ayana_tram_stop_*.jpg'))[:20]
        for img in image_files:
            # img_hour = img.as_posix().split('/')[-1].split('_')[-1].split('.')[0][:2]
            # if img_hour != subpath_hour:
            #     incorrect.append(subpath)
            img_year = img.as_posix().split('/')[-1].split('_')[-2][:4]
            if img_year != '2024':
                incorrect.append(subpath)
            
incorrect = sorted(list(set(incorrect)))
incorrect

In [11]:
import os
from pathlib import Path

# Your existing code
subpath = '/Users/fchrulk/AHM/Production/ayana-tram-detection-cctv/dataset/captured_frames/2024-12-02/ayana_tram_stop_20241202_080033'
image_folder = Path(subpath)
image_files = sorted(image_folder.glob('ayana_tram_stop_*.jpg'))

# Renaming logic
for file_path in image_files:
    # Get the old filename
    old_name = file_path.name
    
    # Create new filename by replacing '00' with '08' in the hour position
    new_name = old_name.replace('_20141202_', '_20241202_')
    
    # Create full paths
    old_path = file_path
    new_path = file_path.parent / new_name
    
    # # Rename the file
    os.rename(old_path, new_path)
    # print(f'Renamed: {old_name} → {new_name}')

In [None]:
import cv2
import numpy as np

class LineCoordinatesGetter:
    def __init__(self):
        self.points = []
        self.image = None
        self.original = None
        self.width = None
        self.height = None
        self.done = False
        self.confirmed = False

    def get_click_coordinates(self, event, x, y, flags, param):
        if event == cv2.EVENT_LBUTTONDOWN and not self.confirmed:
            if len(self.points) < 2:
                self.points.append((x, y))
                print(f"Point {len(self.points)}: ({x}, {y})")
                
                # Draw the point
                cv2.circle(self.image, (x, y), 5, (0, 0, 255), -1)
                
                # If we have two points, draw the line
                if len(self.points) == 2:
                    cv2.line(self.image, self.points[0], self.points[1], (0, 255, 255), 2)
                    self.done = True
                    print("Press 'c' to confirm or 'r' to reset")
                
                cv2.imshow('Image', self.image)

    def get_line_coordinates(self, image_path):
        # Load image
        self.original = cv2.imread(image_path)
        self.height, self.width = self.original.shape[:2]
        self.image = self.original.copy()
        
        # Create window and set mouse callback
        cv2.imshow('Image', self.image)
        cv2.setMouseCallback('Image', self.get_click_coordinates)
        
        print("Click two points to define the stop line.")
        print("Press 'r' to reset, 'c' to confirm when done, 'q' to quit.")
        
        while True:
            key = cv2.waitKey(1) & 0xFF
            
            # Reset points
            if key == ord('r'):
                self.points = []
                self.done = False
                self.confirmed = False
                self.image = self.original.copy()
                cv2.imshow('Image', self.image)
                print("Points reset. Click two points to define the stop line.")
            
            # Confirm points
            elif key == ord('c') and self.done:
                self.confirmed = True
                print("Line coordinates confirmed!")
                break
            
            # Quit
            elif key == ord('q'):
                self.points = []  # Clear points if quitting
                break
        
        cv2.destroyAllWindows()
        cv2.waitKey(1)  # Additional waitKey for MacBook
        
        if len(self.points) == 2 and self.confirmed:
            return self.points
        return None

def save_line_coordinates(points, filename='stop_line_coordinates.txt'):
    """Save the line coordinates to a file"""
    if points:
        np.savetxt(filename, points, fmt='%d')
        print(f"Line coordinates saved to {filename}")

# # Example usage
# if __name__ == "__main__":
image_path = "ayana_tram_stop_get_coordinates.jpg"  # Replace with your image path

# Get line coordinates
line_getter = LineCoordinatesGetter()
points = line_getter.get_line_coordinates(image_path)

if points:
    print("\nStop line coordinates:")
    print(f"Point 1: {points[0]}")
    print(f"Point 2: {points[1]}")
    
    # Save coordinates to file
    save_line_coordinates(points)
else:
    print("No line coordinates obtained")