# CSV Counting & Aggregation

This notebook recursively collects and counts all `.csv` files below the camera output directories matching:

`pipeline_outputs/monitoring_pipeline/piglet_rearing/Kamera*`

It aggregates:
- Number of CSV files
- Total number of rows (across all files)
- Total sum of columns `num_tail_detections` and `num_pig_detections` (missing columns treated as 0)
- Total analyzed video duration, computed per row as `end_timestamp − start_timestamp` and summed over all rows (reported in seconds, hours, and days)

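Files are processed one at a time: each CSV is read fully, aggregated, and then discarded. Peak memory is therefore bounded by the largest single file rather than by the whole dataset. The per-file loop could be parallelized if runtime becomes an issue.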


In [16]:
from __future__ import annotations
import os
from pathlib import Path
import json
import logging
from typing import List, Dict, Any, Optional, Tuple

import pandas as pd

# tqdm is optional: without it, progress bars degrade to plain iteration.
try:
    from tqdm import tqdm
except ImportError:
    TQDM_AVAILABLE = False

    def tqdm(x, **_options):
        """No-op stand-in for tqdm: return the iterable untouched, ignoring keyword options."""
        return x
else:
    TQDM_AVAILABLE = True

# ------------------ Dynamic path setup ------------------
# Resolve the repository root by walking up from the starting directory until a
# directory containing BOTH README.md and a pipeline_outputs/ folder is found.
# Inside Jupyter `__file__` is usually undefined, so the kernel's current working
# directory is used as the starting point.
CURRENT_NOTEBOOK_DIR = Path(__file__).resolve().parent if '__file__' in globals() else Path.cwd()
CANDIDATE = CURRENT_NOTEBOOK_DIR
REPO_ROOT = None
for parent in [CANDIDATE, *CANDIDATE.parents]:
    if (parent / 'README.md').exists() and (parent / 'pipeline_outputs').exists():
        REPO_ROOT = parent
        break
REPO_ROOT_FOUND = REPO_ROOT is not None
if not REPO_ROOT_FOUND:
    # Fallback: treat the starting directory itself as the repository root.
    REPO_ROOT = CURRENT_NOTEBOOK_DIR

# Base directory pattern for camera CSV files (no user-specific absolute path)
BASE_DIR = REPO_ROOT / 'pipeline_outputs' / 'monitoring_pipeline' / 'piglet_rearing'
# Relative to BASE_DIR; passed to Path.rglob(), which adds an implicit leading '**/'.
CAMERA_GLOB_PATTERN = 'Kamera*/**/*.csv'
# Summary files are written to the starting directory (notebook dir / cwd).
OUTPUT_DIR = CURRENT_NOTEBOOK_DIR
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# ------------------ Logging configuration ------------------
logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s')
logger = logging.getLogger('csv_aggregation')
if not REPO_ROOT_FOUND:
    # Surface the fallback here, so a mis-resolved root is noticed immediately
    # rather than only as a missing base directory / zero CSV files later.
    logger.warning(
        f"No directory at or above {CURRENT_NOTEBOOK_DIR} contains README.md and "
        "pipeline_outputs/; falling back to it as repository root."
    )
logger.info(f"Repository root resolved: {REPO_ROOT}")
logger.info(f"Base directory: {BASE_DIR}")


[INFO] Repository root resolved: /home/jan-hendrik/home/PigMonitoringResearch
[INFO] Base directory: /home/jan-hendrik/home/PigMonitoringResearch/pipeline_outputs/monitoring_pipeline/piglet_rearing
[INFO] Base directory: /home/jan-hendrik/home/PigMonitoringResearch/pipeline_outputs/monitoring_pipeline/piglet_rearing


In [23]:
# List all relevant CSV files.
# Path.rglob() implicitly prefixes the pattern with '**/', so a Kamera* directory at
# any depth below BASE_DIR matches, not only its direct children.
if not BASE_DIR.exists():
    logger.warning(f"Base directory does not exist: {BASE_DIR}")
    csv_files: List[Path] = []
else:
    csv_files = [match for match in BASE_DIR.rglob(CAMERA_GLOB_PATTERN) if match.is_file()]

# Deterministic ordering keeps runs (and progress output) reproducible.
csv_files.sort()
num_csv_files = len(csv_files)
print(f"Number of CSV files found: {num_csv_files}")
logger.info(f"CSV files found: {num_csv_files}")


[INFO] CSV files found: 1411


Number of CSV files found: 1411


In [24]:
# Helper function: load & validate
REQUIRED_COLUMNS = ['num_tail_detections', 'num_pig_detections']
# Older exports used a capitalised header; it is renamed to the current name on load.
LEGACY_COLUMN_ALIASES = {'Num_tail_detections': 'num_tail_detections'}

def load_and_validate(path: Path, usecols: Optional[List[str]] = None) -> pd.DataFrame:
    """Read one CSV file and guarantee that every REQUIRED_COLUMNS entry exists.

    Parameters
    ----------
    path : Path
        CSV file to read.
    usecols : list of str, optional
        Column names to load; ``None`` loads every column. Unlike
        ``pd.read_csv(usecols=[...])``, names absent from the file are ignored
        instead of raising. Requesting ``num_tail_detections`` also loads the
        legacy ``Num_tail_detections`` header, so it can be renamed below.

    Returns
    -------
    pd.DataFrame
        File contents with ``Num_tail_detections`` renamed to
        ``num_tail_detections`` (when only the legacy name exists) and every
        missing required column added, filled with 0.

    Raises
    ------
    Exception
        Whatever ``pd.read_csv`` raises; a warning naming the file is logged first.
    """
    if usecols is None:
        column_filter = None
    else:
        wanted = set(usecols)
        wanted.update(legacy for legacy, current in LEGACY_COLUMN_ALIASES.items() if current in wanted)
        # A callable usecols keeps only matching headers and, unlike a list, does not
        # fail when a name is absent -- those columns are zero-filled below instead.
        column_filter = lambda name: name in wanted

    try:
        df = pd.read_csv(path, low_memory=False, usecols=column_filter)
    except Exception as e:
        logger.warning(f"Error reading {path.name}: {e}")
        raise

    # Normalize column naming in case of legacy headers.
    for legacy, current in LEGACY_COLUMN_ALIASES.items():
        if legacy in df.columns and current not in df.columns:
            df = df.rename(columns={legacy: current})
    for col in REQUIRED_COLUMNS:
        if col not in df.columns:
            df[col] = 0
    return df

In [29]:
# Aggregation: total rows & sums (serial) + video duration
from datetime import datetime

# Both columns must be present for a file to contribute to the video duration.
TIME_COLUMNS = ('start_timestamp', 'end_timestamp')

def parse_duration_seconds(df: pd.DataFrame) -> int:
    """Return the summed per-row duration ``end_timestamp - start_timestamp`` in whole seconds.

    - Rows where either timestamp is missing or unparseable contribute 0.
    - Rows whose end precedes their start also contribute 0, instead of reducing the total.
    - Returns 0 when either column of TIME_COLUMNS is absent.
    - Returns 0 when the timestamp arithmetic fails; that failure is logged as a warning.
    """
    if not all(c in df.columns for c in TIME_COLUMNS):
        return 0
    # Expect strings like '2021-12-28 07:47:04'; unparseable values become NaT.
    try:
        start = pd.to_datetime(df['start_timestamp'], errors='coerce')
        end = pd.to_datetime(df['end_timestamp'], errors='coerce')
        deltas = (end - start).dt.total_seconds()
    except Exception as exc:
        # Best-effort: keep the aggregation running, but do not hide the problem.
        logger.warning(f"Could not compute durations from {TIME_COLUMNS}: {exc}")
        return 0
    # NaT differences are NaN -> 0. Negative spans are clipped, so an inverted row
    # cannot cancel out valid video time.
    return int(deltas.fillna(0).clip(lower=0).sum())

# Running totals, accumulated one file at a time so only one DataFrame is held in memory.
total_rows = 0
total_tail = 0
total_pig = 0
skipped_files = 0        # files that load_and_validate could not read
total_video_seconds = 0
missing_ts_files = 0     # readable files lacking the start/end timestamp columns

if num_csv_files == 0:
    logger.warning('No CSV files found – aborting aggregation.')
else:
    iterator = tqdm(csv_files, desc='Processing CSV', disable=not TQDM_AVAILABLE)
    for csv_path in iterator:
        try:
            df = load_and_validate(csv_path)
        except Exception:
            # Read errors are already logged by load_and_validate; count and move on.
            skipped_files += 1
            continue
        rows = len(df)
        total_rows += rows
        # Coerce to numeric: one stray non-numeric cell makes the column object dtype,
        # and a raw .sum() would then raise or concatenate strings, aborting the whole
        # run outside the try above. Unparseable cells become NaN, which .sum() skips.
        total_tail += pd.to_numeric(df['num_tail_detections'], errors='coerce').sum()
        total_pig += pd.to_numeric(df['num_pig_detections'], errors='coerce').sum()
        if all(c in df.columns for c in TIME_COLUMNS):
            total_video_seconds += parse_duration_seconds(df)
        else:
            missing_ts_files += 1

# NOTE(review): in the recorded run, total_video_seconds equals total_rows exactly, i.e.
# rows average 1 s each (presumably one row per second). Confirm that summing
# per-row (end - start) is the intended definition of analysed video time.
video_hours = total_video_seconds / 3600 if total_video_seconds else 0
video_days = video_hours / 24 if total_video_seconds else 0

print(f"Total rows: {total_rows}")
print(f"Sum num_tail_detections: {total_tail}")
print(f"Sum num_pig_detections: {total_pig}")
print(f"Total video seconds: {total_video_seconds}")
print(f"Total video hours: {video_hours:.2f}")
print(f"Total video days: {video_days:.2f}")
print(f"Files missing timestamp columns: {missing_ts_files}")
print(f"Skipped files: {skipped_files}")


Processing CSV: 100%|██████████| 1411/1411 [00:43<00:00, 32.59it/s]

Total rows: 38560799
Sum num_tail_detections: 285018249
Sum num_pig_detections: 764990250
Total video seconds: 38560799
Total video hours: 10711.33
Total video days: 446.31
Files missing timestamp columns: 0
Skipped files: 0





In [32]:
# Gather every aggregate into one flat record. Keys are cast to plain Python
# numbers so the record is JSON-serialisable; key order defines the table's column order.
summary = dict(
    csv_files=num_csv_files,
    total_rows=total_rows,
    sum_num_tail_detections=int(total_tail),
    sum_num_pig_detections=int(total_pig),
    total_video_seconds=int(total_video_seconds),
    total_video_hours=round(video_hours, 2),
    total_video_days=round(video_days, 2),
    files_missing_timestamps=missing_ts_files,
    skipped_files=skipped_files,
)
single_row = [summary]
summary_df = pd.DataFrame(single_row)
summary_df


Unnamed: 0,csv_files,total_rows,sum_num_tail_detections,sum_num_pig_detections,total_video_seconds,total_video_hours,total_video_days,files_missing_timestamps,skipped_files
0,1411,38560799,285018249,764990250,38560799,10711.33,446.31,0,0


In [31]:
# Persist the one-row summary both as a table (CSV) and as a machine-readable record (JSON).
csv_out = OUTPUT_DIR / 'csv_aggregat_summary.csv'
json_out = OUTPUT_DIR / 'csv_aggregat_summary.json'
summary_df.to_csv(csv_out, index=False)
json_out.write_text(json.dumps(summary, indent=2))
for written_path in (csv_out, json_out):
    print(f"Summary written: {written_path}")


Summary written: /home/jan-hendrik/home/PigMonitoringResearch/data_exploration/csv_aggregat_summary.csv
Summary written: /home/jan-hendrik/home/PigMonitoringResearch/data_exploration/csv_aggregat_summary.json
