# Stats shots parquet exploration

This notebook loads a `stats.parquet` file produced by `ingest_bi.py` (stored at `output/bi/<colony_id>/stats.parquet`) and shows basic per-tick time-series plots: creature coverage, creature count by original color, health/food/age averages, and key genes (creature size, kill ratio, move ratio).



In [9]:
import os
from pathlib import Path

import pandas as pd

# Heuristic: this notebook lives under `<repo_root>/bi`, and parquet files
# live under `<repo_root>/output/bi/<colony_id>/stats.parquet`, so the project
# root is the parent directory of the current working directory. This only
# holds when the kernel's working directory is the notebook's own folder.
PROJECT_ROOT = Path.cwd().parent
ANALYTICS_DIR = PROJECT_ROOT / "output" / "bi"

# Find all colony directories (each contains a stats.parquet file), in sorted
# order so that COLONY_INDEX refers to the same colony on every run.
# A missing ANALYTICS_DIR is treated like an empty one, so the explanatory
# error below is raised instead of the bare error coming out of iterdir().
colony_dirs = []
if ANALYTICS_DIR.is_dir():
    colony_dirs = [
        colony_dir
        for colony_dir in sorted(ANALYTICS_DIR.iterdir())
        if colony_dir.is_dir() and (colony_dir / "stats.parquet").exists()
    ]

if not colony_dirs:
    raise FileNotFoundError(f"No colony directories with stats.parquet found under {ANALYTICS_DIR}. Run ingest_bi.py from the repo root first.")

# Choose which colony directory to load (by index into the list above)
COLONY_INDEX = 0  # adjust this if you want a different colony

# Negative indices keep their usual Python meaning (count from the end);
# anything outside the list gets an error that says how many colonies exist.
if not -len(colony_dirs) <= COLONY_INDEX < len(colony_dirs):
    raise IndexError(
        f"COLONY_INDEX={COLONY_INDEX} is out of range: "
        f"found {len(colony_dirs)} colony directories under {ANALYTICS_DIR}."
    )

# Despite the name, this is the colony *directory*; stats.parquet lives inside it.
bi_data_file_path = colony_dirs[COLONY_INDEX]

In [10]:
# Make sure plotly is importable, installing it on first use if necessary.
try:
    import plotly.express as px  # pyright: ignore[reportMissingImports]
except ImportError:
    import subprocess
    import sys

    # Invoke pip through the interpreter running this kernel so the package is
    # installed for that interpreter; pip's own output is discarded.
    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", "plotly"],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    import plotly.express as px  # pyright: ignore[reportMissingImports]


In [11]:
# Read the selected colony's stats.parquet and collect its distinct tick values.

df = pd.read_parquet(bi_data_file_path.joinpath("stats.parquet"))

# Every plot below uses "tick" as its time axis, so fail early without it.
if "tick" not in df.columns:
    raise KeyError("Column 'tick' not found in parquet file.")

# Distinct, non-null ticks in ascending order.
unique_ticks = list(df["tick"].dropna().unique())
unique_ticks.sort()


In [12]:
# Show the colony ID as a large, centered title.
import html

from IPython.display import display, HTML, Markdown  # pyright: ignore[reportMissingImports]

# Pick the ID source in priority order: a "colony_id" column, then a
# "colony_instance_id" column. If neither column exists, fall back to the name
# of the directory the parquet file was loaded from.
id_column = next(
    (name for name in ("colony_id", "colony_instance_id") if name in df.columns),
    None,
)

if id_column is None:
    colony_id = bi_data_file_path.name
elif len(df) > 0:
    colony_id = df[id_column].iloc[0]
else:
    # The column exists but there are no rows to read an ID from.
    colony_id = "Unknown"

# The value comes from file contents or a directory name, so escape it before
# placing it inside markup; ordinary IDs render exactly as before.
display(HTML(
    '<h1 style="font-size: 48px; font-weight: bold; text-align: center; margin: 20px 0;">'
    f'Colony ID: {html.escape(str(colony_id))}</h1>'
))

In [13]:
# Creature coverage over time: percentage of grid cells occupied, per tick.

import plotly.express as px  # pyright: ignore[reportMissingImports]
import plotly.graph_objects as go  # pyright: ignore[reportMissingImports]

# The coverage calculation needs the creature count and both grid dimensions.
required_cols = {"creatures_count", "colony_width", "colony_height"}
missing = required_cols - set(df.columns)
if missing:
    raise KeyError(
        f"Missing columns {missing} in parquet file. "
        "Regenerate parquet with updated stats_shots_to_parquet.py to include colony_width/colony_height."
    )

# Order rows by tick so the series reads left to right in time, then derive
# the total cell count and the share of cells that hold a creature.
plot_df = df.sort_values("tick").copy()
plot_df = plot_df.assign(
    grid_cells=lambda frame: frame["colony_width"] * frame["colony_height"],
    creature_pct=lambda frame: (frame["creatures_count"] / frame["grid_cells"]) * 100.0,
)

fig = go.Figure()

# Lower band: occupied share, filled from the X axis up to the coverage line.
fig.add_trace(
    go.Scatter(
        x=plot_df["tick"],
        y=plot_df["creature_pct"],
        fill="tozeroy",
        mode="lines",
        line={"width": 1.5, "color": "blue"},
        fillcolor="rgba(100, 149, 237, 0.3)",  # translucent light blue
        name="Creature coverage",
        showlegend=False,
    )
)

# Upper band: an invisible line at a constant 100, filled down to the
# previous trace, so the remainder of the chart reads as empty cells.
fig.add_trace(
    go.Scatter(
        x=plot_df["tick"],
        y=[100] * len(plot_df),
        fill="tonexty",
        mode="lines",
        line={"width": 0},
        fillcolor="rgba(240, 240, 240, 0.5)",  # translucent off-white
        name="Empty cells",
        showlegend=False,
    )
)

fig.update_layout(
    title="Creature coverage (%)",
    xaxis_title="Tick",
    yaxis_title="Creature % of cells occupied",
    height=400,
    yaxis_range=[0, 100],
    xaxis={"showgrid": False},
    yaxis={"showgrid": False},
    template="plotly_white",
)
fig.show()



In [14]:
# Creature count per tick, one filled curve per original color.
# Each curve is drawn in the RGB value encoded in its color label ("R_G_B");
# the area under it is filled down to zero at alpha=0.5 so overlapping curves
# stay visible.

import plotly.graph_objects as go  # pyright: ignore[reportMissingImports]

TOP_N_COLORS = 5  # the file carries original_color_top1..top5 (+ matching *_count) columns
FALLBACK_RGB = (128, 128, 128)  # gray, used when a label is not in "R_G_B" form


def _parse_color_label(color_str):
    """Return the (r, g, b) ints encoded in an "R_G_B" label.

    Labels that do not consist of exactly three integer parts separated by
    underscores yield FALLBACK_RGB instead of raising.
    """
    parts = color_str.split("_")
    if len(parts) != 3:
        return FALLBACK_RGB
    try:
        return tuple(int(part) for part in parts)
    except ValueError:
        return FALLBACK_RGB


plot_df = df.sort_values("tick").copy()

# (label column, count column) pairs whose label column exists, in rank order.
color_columns = [
    (f"original_color_top{rank}", f"original_color_top{rank}_count")
    for rank in range(1, TOP_N_COLORS + 1)
    if f"original_color_top{rank}" in plot_df.columns
]

# Collect all unique original colors from the top-N columns across all ticks
all_colors = set()
for label_col, _ in color_columns:
    all_colors.update(plot_df[label_col].dropna().unique())

if not all_colors:
    print("No original_color data found in parquet file. Regenerate parquet with updated stats_shots_to_parquet.py.")
else:
    # For each color, the list of (tick, count) points in tick order
    color_data = {color_str: [] for color_str in all_colors}

    for _, row in plot_df.iterrows():
        tick = row["tick"]

        # Build this tick's label -> count map in one pass over the top-N
        # columns instead of rescanning the row once per color. If a label
        # appears in more than one rank, the lowest rank wins.
        counts_this_tick = {}
        for label_col, count_col in color_columns:
            label = row[label_col]
            if label in counts_this_tick:
                continue
            count = row[count_col] if count_col in row else None
            counts_this_tick[label] = count if count is not None else 0

        # A color that is absent from this tick's top N is recorded as 0.
        for color_str in all_colors:
            color_data[color_str].append((tick, counts_this_tick.get(color_str, 0)))

    # Create Plotly figure
    fig = go.Figure()

    for color_str in sorted(all_colors):
        r, g, b = _parse_color_label(color_str)
        rgb_color = f"rgb({r}, {g}, {b})"
        rgba_color = f"rgba({r}, {g}, {b}, 0.5)"  # alpha=0.5 for the fill

        # Extract ticks and counts for this color
        ticks = [t for t, _ in color_data[color_str]]
        counts = [c for _, c in color_data[color_str]]

        # Only plot if there's at least one non-zero count
        if any(c > 0 for c in counts):
            fig.add_trace(go.Scatter(
                x=ticks,
                y=counts,
                mode='lines',
                name=color_str,
                line=dict(width=1.5, color=rgb_color),
                fill='tozeroy',
                fillcolor=rgba_color,
                opacity=0.7,
                showlegend=False
            ))

    fig.update_layout(
        title="Creature Count by Color",
        xaxis_title="Tick",
        yaxis_title="Creature Count",
        height=500,
        template="plotly_white",
        xaxis=dict(showgrid=False),
        yaxis=dict(showgrid=False)
    )
    fig.show()



In [15]:
# Health, Food, and Age averages - 3 smoothed line charts side by side

from plotly.subplots import make_subplots  # pyright: ignore[reportMissingImports]
import plotly.graph_objects as go  # pyright: ignore[reportMissingImports]

plot_df = df.sort_values("tick").copy()

fig = make_subplots(
    rows=1, cols=3,
    subplot_titles=("Health average", "Food average", "Age average"),
    horizontal_spacing=0.08,
    shared_xaxes=True
)

window = 5  # smoothing window size (rows), centered rolling mean

# One entry per panel, left to right:
# (label, candidate data columns in order of preference, line color, shadow color)
metric_panels = [
    ("Health", ("health_avg", "health_mean"), "red", "rgba(255, 0, 0, 0.3)"),
    ("Food", ("food_avg", "food_mean"), "green", "rgba(0, 128, 0, 0.3)"),
    ("Age", ("age_avg", "age_mean"), "blue", "rgba(100, 149, 237, 0.3)"),
]

for col, (label, candidates, line_color, shadow_color) in enumerate(metric_panels, start=1):
    # Use the first candidate column present in the file, if any.
    y_col = next((name for name in candidates if name in plot_df.columns), None)

    if y_col is None:
        # No data for this panel: leave a placeholder note in its subplot.
        fig.add_annotation(
            text=f"{label} average not found", xref=f"x{col}", yref=f"y{col}",
            x=0.5, y=0.5, showarrow=False, row=1, col=col,
            xanchor='center', yanchor='middle'
        )
        continue

    smooth_col = f"{y_col}_smooth"
    plot_df[smooth_col] = plot_df[y_col].rolling(window=window, min_periods=1, center=True).mean()

    # Draw the same smoothed series twice: first a wide translucent "shadow",
    # then the thin main line on top of it.
    for line_width, color in ((4, shadow_color), (1, line_color)):
        fig.add_trace(
            go.Scatter(x=plot_df["tick"], y=plot_df[smooth_col], mode='lines',
                       line=dict(width=line_width, color=color), name=label, showlegend=False),
            row=1, col=col
        )

    # Pin the lower bound at 0 and leave the upper bound to autorange.
    fig.update_yaxes(title_text=label, row=1, col=col, range=[0, None])

fig.update_layout(height=300, template="plotly_white", showlegend=False)
# With no row/col selector this applies to the x-axis of all three panels.
fig.update_xaxes(title_text="Tick", showgrid=False)
fig.show()


In [16]:
# Compact view: 3 charts horizontally (creature size avg, kill ratio, move ratio)

from plotly.subplots import make_subplots  # pyright: ignore[reportMissingImports]
import plotly.graph_objects as go  # pyright: ignore[reportMissingImports]

plot_df = df.sort_values("tick").copy()

fig = make_subplots(
    rows=1, cols=3,
    subplot_titles=("Creature size average", "Kill ratio", "Move ratio"),
    horizontal_spacing=0.08
)

window = 5  # smoothing window size (rows), centered rolling mean

# 1) Numerical gene: creature_size_avg (or creature_size_mean fallback)
y_col_creature = next(
    (name for name in ("creature_size_avg", "creature_size_mean") if name in plot_df.columns),
    None,
)

if y_col_creature is not None:
    smooth_col = f"{y_col_creature}_smooth"
    plot_df[smooth_col] = plot_df[y_col_creature].rolling(window=window, min_periods=1, center=True).mean()
    # Wide translucent "shadow" line behind the main line
    fig.add_trace(
        go.Scatter(x=plot_df["tick"], y=plot_df[smooth_col], mode='lines',
                   line=dict(width=4, color="rgba(128, 128, 128, 0.3)"), name="Creature size", showlegend=False),
        row=1, col=1
    )
    # Thin main line on top (color left to the template's default)
    fig.add_trace(
        go.Scatter(x=plot_df["tick"], y=plot_df[smooth_col], mode='lines',
                   line=dict(width=1), name="Creature size", showlegend=False),
        row=1, col=1
    )
    # Pin the lower bound at 0 and leave the upper bound to autorange.
    fig.update_yaxes(title_text="Value", row=1, col=1, range=[0, None])
else:
    fig.add_annotation(text="Creature size average not found", xref="x1", yref="y1", x=0.5, y=0.5, showarrow=False, row=1, col=1, xanchor='center', yanchor='middle')

# 2) and 3) Boolean genes: the *_true_fraction column is scaled by 100 and
# drawn as a percentage with two stacked area fills.
# Each entry: (subplot column, data column, label)
boolean_gene_panels = [
    (2, "can_kill_true_fraction", "Kill ratio"),
    (3, "can_move_true_fraction", "Move ratio"),
]

for col, fraction_col, label in boolean_gene_panels:
    if fraction_col not in plot_df.columns:
        # No data for this panel: leave a placeholder note in its subplot.
        fig.add_annotation(
            text=f"{label} not found", xref=f"x{col}", yref=f"y{col}",
            x=0.5, y=0.5, showarrow=False, row=1, col=col,
            xanchor='center', yanchor='middle'
        )
        continue

    # Convert to percentage
    true_pct = plot_df[fraction_col] * 100

    # "True" area (light red): from 0 up to the line
    fig.add_trace(
        go.Scatter(x=plot_df["tick"], y=true_pct, fill='tozeroy', mode='lines',
                   line=dict(width=0), fillcolor='rgba(255, 182, 193, 0.4)',
                   name="True", showlegend=False),
        row=1, col=col
    )

    # "False" area (light green): an invisible line at a constant 100, filled
    # down to the previous trace
    fig.add_trace(
        go.Scatter(x=plot_df["tick"], y=[100] * len(plot_df), fill='tonexty', mode='lines',
                   line=dict(width=0), fillcolor='rgba(144, 238, 144, 0.4)',
                   name="False", showlegend=False),
        row=1, col=col
    )

    # Visible line on top of both fills
    fig.add_trace(
        go.Scatter(x=plot_df["tick"], y=true_pct, mode='lines', line=dict(width=1),
                   name=label, showlegend=False),
        row=1, col=col
    )
    fig.update_yaxes(title_text=f"{label} (%)", row=1, col=col, range=[0, 100])

fig.update_layout(height=300, template="plotly_white", showlegend=False)
# With no row/col selector this applies to the x-axis of all three panels.
fig.update_xaxes(title_text="Tick", showgrid=False)
fig.show()

