# Horse Feature DataFrame Generation

This notebook contains functions to load and analyze ground truth data for horse detection. It processes GeoJSON data and associated orthomosaic tiles to create a feature dataset suitable for machine learning.

## Setup and Imports

In [None]:
import geopandas as gpd
import pandas as pd
import numpy as np
import rasterio
from pathlib import Path
import os
from io import BytesIO
from tqdm import tqdm
import json

In [None]:
# Output locations used by this notebook. The paths are relative, so they
# resolve against the current working directory (whose parent is presumably
# the project root).
DATA_DIR = "../data"
PROCESSED_DIR = os.path.join(DATA_DIR, "processed")
ENCODED_TILES_DIR = os.path.join(DATA_DIR, "processed", "encoded_tiles")

## Data Loading Functions

These functions handle loading and analyzing the ground truth data from GeoJSON files. The data contains point features representing horse presence/absence observations.

### `load_ground_truth`
Loads ground truth data from a GeoJSON file into a GeoDataFrame. The function includes basic error handling for file existence.

In [None]:
def load_ground_truth(
    filepath: str = "../data/vector/groundtruth.geojson",
) -> gpd.GeoDataFrame:
    """Read the ground truth GeoJSON file into a GeoDataFrame.

    Args:
        filepath: Location of the GeoJSON file to read.

    Returns:
        The GeoDataFrame produced by ``gpd.read_file`` for ``filepath``.

    Raises:
        FileNotFoundError: If nothing exists at ``filepath``.
    """
    # Check up front so a missing file is reported with the path that was asked for.
    if Path(filepath).exists():
        return gpd.read_file(filepath)
    raise FileNotFoundError(f"File not found: {filepath}")

### `get_point_info`
Extracts basic information about point features from the GeoDataFrame, including:
- Total number of points
- Available columns
- Spatial bounds of the data

In [None]:
def get_point_info(gdf: gpd.GeoDataFrame) -> dict:
    """Summarise the Point features contained in ``gdf``.

    Only rows whose geometry type is exactly ``"Point"`` are considered.

    Args:
        gdf: GeoDataFrame to inspect.

    Returns:
        A dict with three entries: ``total_points`` (number of Point rows),
        ``columns`` (column names as a list) and ``bounds`` (the
        ``total_bounds`` of the Point rows converted to a list).
    """
    is_point = gdf.geometry.type == "Point"
    point_rows = gdf[is_point]

    summary = {}
    summary["total_points"] = len(point_rows)
    summary["columns"] = list(point_rows.columns)
    summary["bounds"] = point_rows.total_bounds.tolist()
    return summary

In [None]:
# Read the ground truth file from its default location
gdf = load_ground_truth()

# Summarise the point features and print one "name: value" line per entry
info = get_point_info(gdf)
print("Ground Truth Information:")
for key in info:
    value = info[key]
    print(f"{key}: {value}")

# Preview the leading rows of the GeoDataFrame
print("\nFirst few rows of the ground truth data:")
display(gdf.head())

## Tile Processing Functions

This section contains functions for processing and encoding GeoTIFF tiles. These functions handle the conversion of image data into a format suitable for machine learning.

### `encode_tile`
Reads a GeoTIFF tile and encodes it as compressed bytes. The function:
- Loads the raster data using rasterio
- Preserves both pixel data and metadata
- Compresses the data using numpy's savez_compressed
- Returns the encoded data as bytes

In [None]:
def encode_tile(tile_path: str) -> bytes:
    """Serialise a GeoTIFF tile into a compressed in-memory ``.npz`` archive.

    The raster returned by ``read()`` is stored under the name ``data``; every
    entry of the dataset's ``meta`` mapping is stored next to it under its own
    key.

    Args:
        tile_path: Path of the tile to read.

    Returns:
        The raw bytes of the compressed archive.

    Raises:
        FileNotFoundError: If ``tile_path`` does not exist.
    """
    if not os.path.exists(tile_path):
        raise FileNotFoundError(f"Tile not found: {tile_path}")

    buffer = BytesIO()
    with rasterio.open(tile_path) as dataset:
        pixels = dataset.read()
        metadata = dataset.meta
        np.savez_compressed(buffer, data=pixels, **metadata)

    # getvalue() yields the full buffer contents irrespective of the cursor position
    return buffer.getvalue()

## Feature DataFrame Creation

This section handles the creation of a comprehensive DataFrame that combines ground truth data with orthomosaic information. The process involves:
- Copying the ground truth GeoDataFrame (the geometry column is retained)
- Processing orthomosaic dates and directories
- Creating feature rows based on temporal relationships

### `create_feature_dataframe`
Creates a DataFrame by combining ground truth points with orthomosaic information. Key features:
- Uses all available orthomosaics for both presence and absence points
- Generates tile paths for each point-orthomosaic combination
- Adds an `observation_offset` column: the absolute (non-negative) number of whole days between the orthomosaic date and the ground truth date

In [None]:
def create_feature_dataframe(
    gdf: gpd.GeoDataFrame, tiles_dir: str = "../data/raster/tiles"
) -> gpd.GeoDataFrame:
    """Pair every ground truth row with every orthomosaic found in ``tiles_dir``.

    Each non-hidden sub-directory of ``tiles_dir`` is treated as one
    orthomosaic; the part of its name before the first ``_`` is parsed as a
    ``YYMMDD`` date. For every (row, orthomosaic) pair one output row is
    produced that carries all original columns plus:

    * ``orthomosaic`` - the sub-directory name,
    * ``tile_path`` - ``<tiles_dir>/<orthomosaic>/<presence|absence>/<idx>.tif``
      with ``idx`` zero-padded to four digits,
    * ``observation_offset`` - absolute number of calendar days between the
      orthomosaic date and the date of the row's ``Datetime`` value.

    Args:
        gdf: Ground truth data. The columns ``Datetime``, ``Presence`` and
            ``idx`` are read from every row.
        tiles_dir: Directory containing one sub-directory per orthomosaic.

    Returns:
        A GeoDataFrame with ``len(gdf) * number_of_orthomosaics`` rows that
        keeps the geometry column and CRS of ``gdf``. ``tile_path`` is kept
        because the tile-encoding step reads it.
    """
    geometry_name = gdf.geometry.name

    # Every visible sub-directory of tiles_dir is one orthomosaic
    orthomosaics = sorted(
        entry
        for entry in os.listdir(tiles_dir)
        if os.path.isdir(os.path.join(tiles_dir, entry)) and not entry.startswith(".")
    )

    # Directory names start with the acquisition date (YYMMDD)
    ortho_dates = pd.to_datetime(
        [name.split("_")[0] for name in orthomosaics], format="%y%m%d", utc=True
    ).tz_localize(None)

    rows = []
    for _, row in gdf.iterrows():
        # Compare calendar dates only. Timedelta.days floors towards negative
        # infinity, so keeping the time of day would report an offset of 1 for
        # an orthomosaic flown on the same day as the observation.
        row_date = pd.to_datetime(row["Datetime"]).tz_localize(None).normalize()

        # These values depend on the row only, so compute them once per row
        base = row.to_dict()
        label_dir = "presence" if row["Presence"] == 1 else "absence"
        tile_name = f"{int(row['idx']):04d}.tif"

        for ortho, ortho_date in zip(orthomosaics, ortho_dates):
            rows.append(
                {
                    **base,
                    "orthomosaic": ortho,
                    "tile_path": os.path.join(tiles_dir, ortho, label_dir, tile_name),
                    "observation_offset": abs((ortho_date - row_date).days),
                }
            )

    if not rows:
        # No points or no orthomosaics: return an empty frame with the same schema
        columns = list(
            dict.fromkeys(
                [*gdf.columns, "orthomosaic", "tile_path", "observation_offset"]
            )
        )
        return gpd.GeoDataFrame(columns=columns, geometry=geometry_name, crs=gdf.crs)

    # Each row dict already carries its own geometry. Passing gdf.geometry here
    # instead would be aligned by index against a frame with a different number
    # of rows and attach the wrong (or no) geometry to most rows.
    return gpd.GeoDataFrame(rows, geometry=geometry_name, crs=gdf.crs)

## Tile Encoding Functions

This section contains functions for batch processing and encoding multiple tiles. These functions handle the bulk conversion of image data and provide progress tracking and error handling.

### `encode_all_tiles`
Processes and encodes all tiles in the DataFrame. Key features:
- Uses tqdm for progress tracking
- Handles encoding errors gracefully
- Provides summary statistics of successful/failed encodings
- Returns a filtered DataFrame containing only successfully encoded tiles
- Accepts a `batch_size` argument, which is currently unused: tiles are encoded one at a time

In [None]:
def encode_all_tiles(df: pd.DataFrame, batch_size: int = 100) -> pd.DataFrame:
    """Encode the tile referenced by every row and keep the rows that succeeded.

    Args:
        df: Frame with a ``tile_path`` column holding one tile path per row.
        batch_size: Currently unused; kept so existing callers that pass it
            keep working.

    Returns:
        A copy of ``df`` with an added ``encoded_tile`` column (bytes),
        restricted to the rows whose tile could be encoded.
    """
    encoded = []
    failed_encodings = []

    for tile_path in tqdm(df["tile_path"], total=len(df), desc="Encoding tiles"):
        try:
            encoded.append(encode_tile(tile_path))
        except (OSError, ValueError) as e:
            # OSError covers FileNotFoundError as well as I/O errors raised
            # while opening or reading a tile, so one bad tile does not abort
            # the whole run.
            encoded.append(None)
            failed_encodings.append((tile_path, str(e)))

    result_df = df.copy()
    # Assign the column once instead of writing cell by cell; dtype=object
    # keeps the bytes values untouched.
    result_df["encoded_tile"] = pd.Series(encoded, index=result_df.index, dtype=object)

    if failed_encodings:
        print(f"\nFailed to encode {len(failed_encodings)} tiles")
        # Show a few examples so failures can actually be investigated
        for tile_path, reason in failed_encodings[:5]:
            print(f"  {tile_path}: {reason}")

    success_mask = result_df["encoded_tile"].notna()
    print(f"\nSuccessfully encoded {success_mask.sum()} out of {len(df)} tiles")
    return result_df[success_mask].copy()

## Data Storage Functions

This section handles the efficient storage of processed data. The functions here manage the serialization of large DataFrames into manageable chunks and maintain metadata about the saved data.

### `save_chunked_parquet`
Saves the DataFrame to parquet files in chunks for efficient storage and loading. Key features:
- Automatically determines optimal chunk size based on target size in MB
- Creates directory structure if it doesn't exist
- Saves each chunk as a separate parquet file
- Generates metadata JSON file containing:
  - Number of chunks
  - Total number of rows
  - Total size in MB
  - List of chunk file names

In [None]:
def save_chunked_parquet(
    gdf: gpd.GeoDataFrame, output_dir: str = ENCODED_TILES_DIR, target_size_mb: int = 500
) -> None:
    """Write ``gdf`` to ``output_dir`` as a series of parquet chunk files.

    The ``geometry`` column is stored as WKT text (``None`` for missing or
    empty geometries). The number of chunks is derived from the in-memory
    size of the frame and ``target_size_mb``. Every row is written to exactly
    one chunk. A ``chunks_metadata.json`` file listing the chunk files is
    written next to them.

    Args:
        gdf: Frame to store; must have a ``geometry`` column.
        output_dir: Directory for the chunk files; created if missing.
        target_size_mb: Approximate in-memory size of one chunk in MB.

    Raises:
        ValueError: If ``target_size_mb`` is not positive.
    """
    if target_size_mb <= 0:
        raise ValueError(f"target_size_mb must be positive, got {target_size_mb}")

    os.makedirs(output_dir, exist_ok=True)

    # Work on a plain DataFrame: the geometry column is about to hold text, and
    # a GeoDataFrame expects real geometries in its active geometry column.
    df = pd.DataFrame(gdf).copy()
    df["geometry"] = df["geometry"].apply(lambda geom: geom.wkt if geom else None)

    n_rows = len(df)
    df_size = float(df.memory_usage(deep=True).sum()) / (1024 * 1024)

    # Round the rows-per-chunk UP. Floor division here would silently drop the
    # trailing rows (or all rows when there are fewer rows than chunks).
    target_chunks = max(1, int(np.ceil(df_size / target_size_mb)))
    chunk_size = max(1, -(-n_rows // target_chunks))
    # Recompute the count so no empty trailing chunk is written; an empty
    # frame still produces a single (empty) chunk file.
    n_chunks = max(1, -(-n_rows // chunk_size))

    chunk_files = []
    for i in tqdm(range(n_chunks), desc="Saving chunks"):
        file_name = f"tiles_chunk_{i:03d}.parquet"
        chunk = df.iloc[i * chunk_size : (i + 1) * chunk_size]
        chunk.to_parquet(os.path.join(output_dir, file_name), index=True)
        chunk_files.append(file_name)

    # Save metadata
    metadata = {
        "n_chunks": n_chunks,
        "total_rows": n_rows,
        "total_size_mb": df_size,
        "chunk_files": chunk_files,
    }

    with open(os.path.join(output_dir, "chunks_metadata.json"), "w") as f:
        json.dump(metadata, f, indent=2)

## Main Processing Pipeline

This section executes the complete data processing pipeline:
1. Loads and analyzes ground truth data
2. Creates the feature DataFrame
3. Displays sample data and statistics
4. Encodes all tiles
5. Saves the processed data in chunked parquet format

The pipeline includes comprehensive error handling and progress reporting at each step.

In [None]:
# End-to-end run: load ground truth, build the feature frame, encode the tiles
# and store the result as parquet chunks. Any failure is reported as a single
# "Error: ..." line instead of a traceback.
if __name__ == "__main__":
    try:
        # Load and analyze ground truth data
        gdf = load_ground_truth()

        # Print basic information
        info = get_point_info(gdf)
        print("\nGround Truth Information:")
        for key, value in info.items():
            print(f"{key}: {value}")

        # Create and display feature DataFrame
        df = create_feature_dataframe(gdf)
        print("\nFeature DataFrame Info:")
        # info() prints its report itself and returns None, so it is not
        # wrapped in print() (that would add a stray "None" line)
        df.info()

        # Explicitly print the column names of the Feature DataFrame
        print("\nFeature DataFrame Columns:")
        print(df.columns.tolist())

        # Show samples of presence and absence rows with observation_offset
        sample_columns = [
            "Presence",
            "Datetime",
            "orthomosaic",
            "tile_path",
            "observation_offset",
        ]

        print("\nSample of Presence Rows (Presence == 1):")
        presence_sample = df[df["Presence"] == 1].head(3)
        print(presence_sample[sample_columns].to_string())

        print("\nSample of Absence Rows (Presence == 0):")
        absence_sample = df[df["Presence"] == 0].head(3)
        print(absence_sample[sample_columns].to_string())

        # Print some statistics
        print("\nSummary:")
        print(f"Total rows: {len(df)}")
        print(f"Presence rows: {len(df[df['Presence'] == 1])}")
        print(f"Absence rows: {len(df[df['Presence'] == 0])}")
        print(f"Unique orthomosaics: {df['orthomosaic'].nunique()}")

        # Encode all tiles
        print("\nEncoding all tiles...")
        encoded_df = encode_all_tiles(df)
        print("\nEncoded DataFrame Info:")
        encoded_df.info(show_counts=True)

        # Remove the tile_path column after encoding
        encoded_df.drop(columns=["tile_path"], inplace=True)

        # Print the columns of the encoded frame (the one tile_path was just
        # dropped from) to confirm the column is gone
        print("\nFeature DataFrame Columns:")
        print(encoded_df.columns.tolist())

        # Show sample of encoded tile sizes
        print("\nSample of encoded tile sizes (bytes):")
        encoded_sizes = encoded_df["encoded_tile"].apply(len)
        print(f"Mean size: {encoded_sizes.mean():.0f}")
        print(f"Min size: {encoded_sizes.min()}")
        print(f"Max size: {encoded_sizes.max()}")
        print(f"Total size: {encoded_sizes.sum() / (1024*1024*1024):.2f} GB")

        # Save encoded tiles to parquet chunks
        save_chunked_parquet(encoded_df)  # Using default output directory

    except Exception as e:
        # Include the exception type: str() of e.g. a KeyError is only the
        # bare key, which is not enough to tell what went wrong
        print(f"Error: {type(e).__name__}: {e}")