
# Turning Movement Counts — Intersection Analytics

This notebook ingests **turning movement count (TMC)** data, parses it into a typed data model, and computes:
- Seasonal adjustments
- Growth factors (annual growth to a target year)
- Peak-hour windows for **AM**, **Midday (MD)**, and **PM**
- **Heavy vehicle (HV)** percentages
- **Peak Hour Factors (PHF)** for each movement, each approach, and the full intersection, for arbitrary time periods

> **Your file**: `SR 544 at Charlotte Rd (EXCEL EXPORT).xls` (already uploaded alongside this notebook). If you're running locally, put your file path in the `SOURCE_FILE` variable below.


In [None]:

# If your environment needs these, uncomment:
# !pip install pandas numpy xlrd --quiet

import math
from dataclasses import dataclass, field
from typing import Dict, List, Tuple, Optional, Literal
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# Render floats with thousands separators and three decimals in displayed tables.
pd.set_option("display.float_format", "{:,.3f}".format)

# Path of the TMC workbook read by the parsing cell further down.
SOURCE_FILE = "SR 544 at Charlotte Rd (EXCEL EXPORT).xls"  # Change if needed



## Data model

We represent an intersection with approaches (`NB`, `SB`, `EB`, `WB`) and movements (Left, Thru, Right — coded `L`, `T`, `R`). The **minimum required** columns are the movement volumes in 15‑minute bins; HV columns are optional but encouraged.


In [None]:

# Closed sets of codes used as keys throughout the data model.
Approach = Literal["NB", "SB", "EB", "WB"]  # approach codes
Movement = Literal["L", "T", "R"]  # Left, Thru, Right

def phf_from_series(q15: pd.Series, bins_per_hour: int = 4) -> Optional[float]:
    """Compute the Peak Hour Factor (PHF) for exactly one hour of count bins.

    PHF = hourly volume / (bins_per_hour * largest single-bin volume).

    Args:
        q15: Volumes for one hour, one value per bin.
        bins_per_hour: How many bins make up the hour. Defaults to 4, i.e.
            15-minute bins; pass 12 for 5-minute bins, and so on.

    Returns:
        The PHF, or None when it is not computable: `q15` is None or empty,
        does not hold exactly `bins_per_hour` values, contains a missing
        value, or its largest bin is not positive.
    """
    if q15 is None or bins_per_hour <= 0 or len(q15) != bins_per_hour:
        return None
    # sum()/max() skip NaN, so a missing bin would silently understate the
    # hourly volume (or yield NaN when every bin is missing). Treat as not computable.
    if q15.isna().any():
        return None
    peak_bin = float(q15.max())
    if peak_bin <= 0:
        return None
    return float(q15.sum()) / (bins_per_hour * peak_bin)

@dataclass
class MovementCounts:
    """Binned vehicle counts for a single movement on a single approach.

    `start` / `end` arguments of the methods are handed to a `.loc[start:end]`
    label slice, so both ends are inclusive.
    """
    approach: Approach
    movement: Movement  # L, T, or R
    volumes_15min: pd.Series              # index: datetime-like, freq 15min, values: veh count
    hv_15min: Optional[pd.Series] = None  # optional heavy vehicle 15-min counts

    def total(self, start=None, end=None) -> float:
        """Return the summed volume, restricted to `start..end` when either bound is given."""
        volumes = self.volumes_15min.loc[start:end] if (start or end) else self.volumes_15min
        return float(volumes.sum())

    def hv_percent(self, start=None, end=None) -> Optional[float]:
        """Return heavy vehicles as a percentage of all vehicles.

        Returns None when no heavy-vehicle series is attached or when the
        total volume in the selected range is not positive.
        """
        if self.hv_15min is None:
            return None
        volumes, heavy = self.volumes_15min, self.hv_15min
        if start or end:
            volumes, heavy = volumes.loc[start:end], heavy.loc[start:end]
        all_vehicles = float(volumes.sum())
        heavy_vehicles = float(heavy.sum())
        if all_vehicles <= 0:
            return None
        return 100.0 * heavy_vehicles / all_vehicles

@dataclass
class ApproachCounts:
    """All movements of one approach, with approach-level aggregates.

    `start` / `end` arguments are handed to a `.loc[start:end]` label slice,
    so both ends are inclusive.
    """
    approach: Approach
    movements: Dict[Movement, MovementCounts] = field(default_factory=dict)

    def series(self, part: Literal["vol","hv"] = "vol") -> pd.Series:
        """Return the per-bin sum over this approach's movements.

        `part="vol"` sums the volume series; any other value sums the
        heavy-vehicle series, counting a movement without HV data as zero.
        An approach without movements yields an empty float Series.
        """
        frames = []
        for mc in self.movements.values():
            if part == "vol":
                frames.append(mc.volumes_15min)
            elif mc.hv_15min is not None:
                # Explicit None test: `series or fallback` would evaluate the
                # truth value of a Series, which pandas rejects with ValueError.
                frames.append(mc.hv_15min)
            else:
                frames.append(0 * mc.volumes_15min)
        if not frames:
            return pd.Series(dtype=float)
        return sum(frames)

    def _sliced(self, part: str, start=None, end=None) -> pd.Series:
        """Return `series(part)` restricted to `start..end` when a bound is given."""
        s = self.series(part)
        # The empty placeholder Series has no datetime index, so slicing it by
        # timestamp can raise; there is nothing to slice anyway.
        if s.empty or not (start or end):
            return s
        return s.loc[start:end]

    def total(self, start=None, end=None) -> float:
        """Return the approach's summed volume (0.0 when it has no movements)."""
        return float(self._sliced("vol", start, end).sum())

    def hv_percent(self, start=None, end=None) -> Optional[float]:
        """Return heavy vehicles as a percentage of all vehicles on the approach.

        Returns None when no movement carries heavy-vehicle data (mirroring
        MovementCounts.hv_percent), when the approach has no movements, or
        when the total volume in the selected range is not positive.
        """
        if all(mc.hv_15min is None for mc in self.movements.values()):
            return None
        tot = float(self._sliced("vol", start, end).sum())
        hv = float(self._sliced("hv", start, end).sum())
        if tot <= 0:
            return None
        return 100.0 * hv / tot

@dataclass
class Intersection:
    """A named intersection: one MovementCounts per (approach, movement) key.

    `interval_minutes` is the length of one count bin. Timestamps are treated
    as bin starts: a one-hour window starting at t covers the bins stamped
    t .. t + 60 min - interval_minutes.
    """
    name: str
    interval_minutes: int
    movements: Dict[Tuple[Approach, Movement], MovementCounts]  # key e.g., ("NB","L")
    meta: Dict = field(default_factory=dict)

    @property
    def index(self) -> pd.DatetimeIndex:
        """Return the union of all movements' indexes (empty when there are no movements)."""
        idx = None
        for mc in self.movements.values():
            idx = mc.volumes_15min.index if idx is None else idx.union(mc.volumes_15min.index)
        # Honour the declared return type instead of handing callers None.
        return pd.DatetimeIndex([]) if idx is None else idx

    def approach(self, approach: Approach) -> ApproachCounts:
        """Return the movements of one approach, keyed by movement code."""
        mv = {m: mc for (a, m), mc in self.movements.items() if a == approach}
        return ApproachCounts(approach=approach, movements=mv)

    def as_dataframe(self) -> pd.DataFrame:
        """Return a wide DataFrame with columns like 'NB_L', 'NB_T', 'NB_R' and optional 'NB_L_HV', etc."""
        cols = {}
        for (a, m), mc in self.movements.items():
            cols[f"{a}_{m}"] = mc.volumes_15min
            if mc.hv_15min is not None:
                cols[f"{a}_{m}_HV"] = mc.hv_15min
        return pd.DataFrame(cols).sort_index()

    def _bins_per_hour(self) -> int:
        """Return how many bins span 60 minutes; fall back to 4 when the interval cannot tile an hour."""
        step = self.interval_minutes
        if step > 0 and 60 % step == 0:
            return 60 // step
        return 4

    # --- Adjustments ---
    def apply_seasonal(self, factors: Dict[str, float], period_map: Dict[pd.Timestamp, str]) -> "Intersection":
        """Return a new Intersection with volumes multiplied by seasonal factors.

        Each timestamp is looked up in `period_map` to get a period id
        (e.g., 'AM','MD','PM'), and that id is looked up in `factors`.
        Timestamps or period ids that are missing get a factor of 1.0.
        Heavy-vehicle series, when present, are scaled the same way.
        """
        def scaled(series: pd.Series) -> pd.Series:
            # Build the multipliers from the series' own index so volume and
            # HV series with different indexes are each scaled correctly.
            mult = [factors.get(period_map.get(ts, ""), 1.0) for ts in series.index]
            return series * pd.Series(mult, index=series.index, dtype=float)

        new = {}
        for key, mc in self.movements.items():
            hv = None if mc.hv_15min is None else scaled(mc.hv_15min)
            new[key] = MovementCounts(approach=mc.approach, movement=mc.movement,
                                      volumes_15min=scaled(mc.volumes_15min), hv_15min=hv)
        return Intersection(name=self.name+" (seasonal)", interval_minutes=self.interval_minutes, movements=new, meta=self.meta|{"seasonal": factors})

    def apply_growth(self, annual_rate: float, years: float) -> "Intersection":
        """Return a new Intersection grown by compound growth: multiplier = (1 + r)^years."""
        g = (1.0 + annual_rate) ** years
        new = {}
        for key, mc in self.movements.items():
            vol = mc.volumes_15min * g
            hv  = None if mc.hv_15min is None else mc.hv_15min * g
            new[key] = MovementCounts(approach=mc.approach, movement=mc.movement, volumes_15min=vol, hv_15min=hv)
        return Intersection(name=self.name+f" (grown {annual_rate:.3%} for {years:.2f}y)", interval_minutes=self.interval_minutes, movements=new, meta=self.meta|{"growth": {"annual_rate": annual_rate, "years": years, "multiplier": g}})

    # --- Peak windows & PHF ---
    def find_peak_window(self, start: str, end: str) -> Tuple[pd.Timestamp, float]:
        """Find the 60-minute window with the maximum *intersection* total volume (all movements).

        Only bins stamped within [start, end] are considered. This is an
        inclusive label slice, so a bin stamped exactly `end` takes part.
        The window is a run of consecutive rows (one hour's worth of bins
        according to `interval_minutes`), so the data is assumed to have no
        gaps.

        Returns:
            (window_start, total_volume), or (NaT, nan) when there are no
            movements, no rows in range, or fewer rows than one full hour.
        """
        nothing = (pd.NaT, float("nan"))
        if not self.movements:
            return nothing
        df = self.as_dataframe().loc[start:end]
        if df.empty:
            return nothing
        bins = self._bins_per_hour()
        totals = df[[c for c in df.columns if not c.endswith("_HV")]].sum(axis=1)
        # Sum across one hour of consecutive bins; incomplete windows stay NaN.
        win = totals.rolling(window=bins, min_periods=bins).sum()
        if win.isna().all():
            return nothing
        idx = win.idxmax()  # label of the LAST bin in the best window
        step = self.interval_minutes if 60 // bins == self.interval_minutes else 15
        return idx - pd.Timedelta(minutes=step * (bins - 1)), float(win.max())

    def phf_by(self, level: Literal["movement","approach","intersection"], window_start: pd.Timestamp) -> Dict[str, Optional[float]]:
        """Compute PHF over the 60-min window starting at `window_start`.

        Returns a dict mapping keys to PHF values: 'NB_L'-style keys for
        "movement", approach codes for "approach", and the single key
        'intersection' for "intersection". A value is None when
        `phf_from_series` cannot compute it or when an approach has no
        movements. Any other `level` yields an empty dict.
        """
        t0 = pd.Timestamp(window_start)
        # Label slices are inclusive, so stop at the start of the last bin.
        t1 = t0 + pd.Timedelta(hours=1) - pd.Timedelta(minutes=self.interval_minutes)
        phfs = {}
        if level == "movement":
            for (a, m), mc in self.movements.items():
                phfs[f"{a}_{m}"] = phf_from_series(mc.volumes_15min.loc[t0:t1])
        elif level == "approach":
            for a in ["NB","SB","EB","WB"]:
                counts = self.approach(a)
                # An approach without movements has only an empty placeholder
                # series; slicing that by timestamp can raise, so skip it.
                phfs[a] = phf_from_series(counts.series("vol").loc[t0:t1]) if counts.movements else None
        elif level == "intersection":
            if self.movements:
                s = self.as_dataframe().filter(regex=r"^(?!.*_HV$).*$").sum(axis=1).loc[t0:t1]
                phfs["intersection"] = phf_from_series(s)
            else:
                phfs["intersection"] = None
        return phfs



## Parsing your Excel

FDOT/typical TMC exports vary. The parser below is **adapter-driven**: supply a **column map** from source columns to canonical keys like `NB_L`, `NB_T`, `NB_R`, etc., and (optionally) `NB_L_HV` for heavy vehicles.

1. Set `SHEET` and `HEADER_ROW` to the sheet name and the zero-based header row of the time-series table.
2. Provide `TIME_COLUMN` containing timestamps (or time-of-day).
3. Fill `COLUMN_MAP` with your file's movement column names.


In [None]:

# ---- Configure these for your file ----
# Sheet to read: a sheet name such as 'Sheet1', or 0 for the first sheet.
# (Passing None straight to pandas.read_excel loads EVERY sheet and returns a dict,
# not the first sheet, so the default here is the position 0.)
SHEET = 0
HEADER_ROW = 0 # 0-based row index where the time-series headers begin
TIME_COLUMN = "Time"  # Name of the time column in the sheet

# Example map: canonical key -> column name in the sheet.
# Replace values on the right with your exact column names from the sheet.
COLUMN_MAP = {
    # Movement volumes
    "NB_L": "NB Left",
    "NB_T": "NB Thru",
    "NB_R": "NB Right",
    "SB_L": "SB Left",
    "SB_T": "SB Thru",
    "SB_R": "SB Right",
    "EB_L": "EB Left",
    "EB_T": "EB Thru",
    "EB_R": "EB Right",
    "WB_L": "WB Left",
    "WB_T": "WB Thru",
    "WB_R": "WB Right",
    # Optional HV columns (comment out the ones your file lacks)
    "NB_L_HV": "NB Left HV",
    "NB_T_HV": "NB Thru HV",
    "NB_R_HV": "NB Right HV",
    "SB_L_HV": "SB Left HV",
    "SB_T_HV": "SB Thru HV",
    "SB_R_HV": "SB Right HV",
    "EB_L_HV": "EB Left HV",
    "EB_T_HV": "EB Thru HV",
    "EB_R_HV": "EB Right HV",
    "WB_L_HV": "WB Left HV",
    "WB_T_HV": "WB Thru HV",
    "WB_R_HV": "WB Right HV",
}

# Optional: if the time column holds only a time of day, set the count date here
# and it is prepended to each time value to form full timestamps.
DATE_FOR_COUNTS = None  # e.g., '2025-03-18' to anchor time-of-day into full timestamps


In [None]:

def read_tmc_dataframe(source_file: str = SOURCE_FILE,
                       sheet: Optional[str] = SHEET,
                       header_row: int = HEADER_ROW,
                       time_column: str = TIME_COLUMN,
                       column_map: Dict[str,str] = COLUMN_MAP) -> pd.DataFrame:
    """Read the Excel sheet and return a canonical wide DataFrame indexed by timestamp.

    Args:
        source_file: Path of the workbook.
        sheet: Sheet name; None selects the first sheet.
        header_row: Zero-based row holding the column headers.
        time_column: Name of the column holding the bin timestamps. When the
            module-level DATE_FOR_COUNTS is set, that date is prepended to the
            column's string form before parsing.
        column_map: Canonical key -> source column name. Keys whose source
            column is absent from the sheet are skipped.

    Returns:
        A DataFrame sorted by timestamp with one float column per mapped key
        that was found; non-numeric cells become 0.

    Raises:
        KeyError: If `time_column` is not a column of the sheet.
    """
    # pandas interprets sheet_name=None as "load all sheets" and returns a dict
    # of DataFrames; position 0 is what actually selects the first sheet.
    sheet_name = 0 if sheet is None else sheet
    df = pd.read_excel(source_file, sheet_name=sheet_name, header=header_row)
    if time_column not in df.columns:
        raise KeyError(
            f"Time column {time_column!r} not found; available columns: {list(df.columns)}"
        )
    # Normalize time
    raw_time = df[time_column]
    if DATE_FOR_COUNTS is not None:
        # time-of-day to full timestamps
        ts = pd.to_datetime(DATE_FOR_COUNTS + " " + raw_time.astype(str))
    else:
        ts = pd.to_datetime(raw_time)
    df.index = pd.DatetimeIndex(ts)
    df = df.drop(columns=[time_column])
    # Build canonical
    out = pd.DataFrame(index=df.index)
    for canon, src in column_map.items():
        if src in df.columns:
            out[canon] = pd.to_numeric(df[src], errors="coerce").fillna(0).astype(float)
    return out.sort_index()

def build_intersection_from_df(name: str, wide_df: pd.DataFrame) -> Intersection:
    """Build an Intersection from a canonical wide DataFrame.

    Args:
        name: Display name of the intersection.
        wide_df: Timestamp-indexed frame with columns such as 'NB_L' and,
            optionally, 'NB_L_HV'. Movements whose volume column is absent
            are left out.

    Returns:
        An Intersection whose `interval_minutes` is the most common positive
        spacing between timestamps (15 when it cannot be inferred).
    """
    dt = 15
    if len(wide_df.index) >= 2:
        # Use the most common positive gap rather than only the first two rows,
        # so a leading gap or duplicate timestamp does not skew the interval.
        # total_seconds() also keeps whole days that `.seconds` would drop.
        gaps = wide_df.index.to_series().sort_values().diff().dropna()
        gaps = gaps[gaps > pd.Timedelta(0)]
        if not gaps.empty:
            dt = int(gaps.mode().iloc[0].total_seconds() // 60)
    movements = {}
    for approach in ["NB","SB","EB","WB"]:
        for mv in ["L","T","R"]:
            key = f"{approach}_{mv}"
            if key not in wide_df.columns:
                continue
            hv_key = f"{key}_HV"
            # Copy both series so the model does not alias the caller's frame.
            hv = wide_df[hv_key].copy() if hv_key in wide_df.columns else None
            movements[(approach, mv)] = MovementCounts(approach=approach, movement=mv,
                                                       volumes_15min=wide_df[key].copy(), hv_15min=hv)
    return Intersection(name=name, interval_minutes=dt, movements=movements, meta={})



## Period definitions (AM / MD / PM) and seasonal factors

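Edit the period clock-times and factors as needed. The `period_map` assigns each timestamp to a period name (e.g., `"AM"`), then `apply_seasonal` uses `factors[period]`. Each period is half-open: a timestamp belongs to it when `start <= time < end`. Timestamps that fall outside every period keep a factor of 1.0.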


In [None]:

# Define daily periods — tweak as needed. Each is (start, end) clock time,
# start inclusive and end exclusive.
AM = ("06:00", "10:00")
MD = ("10:00", "15:00")
PM = ("15:00", "19:00")
PERIODS = {"AM": AM, "MD": MD, "PM": PM}

def build_period_map(index: pd.DatetimeIndex, periods: Dict[str, Tuple[str,str]] = PERIODS) -> Dict[pd.Timestamp, str]:
    """Label every timestamp in `index` with the period its clock time falls in.

    Args:
        index: Timestamps to label.
        periods: Period name -> (start, end) clock strings. A timestamp
            belongs to a period when start <= time-of-day < end. Periods are
            tested in dict order and the first match wins.

    Returns:
        A dict from timestamp to period name, with "" for timestamps that
        fall in no period.
    """
    # Parse each clock string once, not once per timestamp/period pair.
    bounds = [
        (name, pd.to_datetime(s).time(), pd.to_datetime(e).time())
        for name, (s, e) in periods.items()
    ]
    labels = {}
    for ts in index:
        clock = ts.time()
        labels[ts] = next((name for name, lo, hi in bounds if lo <= clock < hi), "")
    return labels

# Example seasonal multipliers keyed by period name; 1.00 leaves volumes unchanged.
SEASONAL_FACTORS = dict(AM=1.05, MD=0.98, PM=1.03)



## Load, build, and example analytics


In [None]:

# 1) Read the source. `wide` stays an empty frame when reading fails.
#    If your environment lacks 'xlrd' for .xls, install it above or save as .xlsx/.csv and change SOURCE_FILE.
wide = pd.DataFrame()
try:
    wide = read_tmc_dataframe()
except Exception as e:
    print("Reading failed:", e)
    print("Tip: install xlrd for .xls:  !pip install xlrd  OR save to .xlsx/.csv and update SOURCE_FILE.")

# 2) Build the Intersection model (None when nothing was read)
ix = None if wide.empty else build_intersection_from_df("SR 544 at Charlotte Rd", wide)

# 3) Seasonal + growth examples
if ix is None:
    print("No intersection was built yet.")
else:
    pmap = build_period_map(ix.index)
    ix_seasonal = ix.apply_seasonal(SEASONAL_FACTORS, pmap)
    # Example: 2% annual growth for 5 years, applied on top of the seasonal adjustment
    ix_future = ix_seasonal.apply_growth(annual_rate=0.02, years=5.0)

    display(ix.as_dataframe().head(8))



## Peak windows and PHFs

The `find_peak_window()` method scans a time range and returns the **start** of the highest‑volume 60‑minute window together with that window's total intersection volume. Pass the start to `phf_by()` to compute PHFs for movements, approaches, or the whole intersection within that window.


In [None]:

def report_peak_and_phf(ix: Intersection, label: str, start_clock: str, end_clock: str):
    """Print the peak hour inside a clock-time period and display its PHF tables.

    The period is half-open, [start_clock, end_clock): the reported peak hour
    never extends past `end_clock`. The date is taken from the first
    timestamp of `ix`, so only that day is analysed.

    Args:
        ix: Intersection to analyse; None or an intersection without
            movements prints a "No data" line.
        label: Tag used in the printed lines (e.g. "AM").
        start_clock: Period start as a clock string such as "06:00".
        end_clock: Period end as a clock string such as "10:00".
    """
    if ix is None or not ix.movements:
        print(f"[{label}] No data")
        return
    day = ix.index[0].normalize()
    start = pd.to_datetime(str(day.date()) + " " + start_clock)
    end   = pd.to_datetime(str(day.date()) + " " + end_clock)
    # find_peak_window slices with inclusive labels, so a bin stamped exactly
    # `end` would push the last candidate hour one bin past the period.
    # Stop at the start of the last bin that still lies inside the period.
    last_bin = end - pd.Timedelta(minutes=max(ix.interval_minutes, 0))
    w0, vol = ix.find_peak_window(start=str(start), end=str(last_bin))
    if pd.isna(w0):
        print(f"[{label}] No window found in {start_clock}-{end_clock}")
        return
    print(f"[{label}] Peak window start: {w0}, total = {vol:,.0f}")
    # Tabular views, one table per aggregation level
    for level in ("movement", "approach", "intersection"):
        phfs = ix.phf_by(level, w0)
        display(pd.DataFrame.from_dict(phfs, orient="index", columns=["PHF"]).sort_index())

# Run the peak-hour / PHF report for the standard AM, MD and PM periods, in that order
if ix is not None:
    for period_label, period_clocks in (("AM", AM), ("MD", MD), ("PM", PM)):
        report_peak_and_phf(ix, period_label, *period_clocks)



## Heavy vehicle (HV) percentages

If HV columns are provided (e.g., `NB_L_HV`), the notebook reports HV% by movement and approach for any period.


In [None]:

def hv_percentages(ix: Intersection, start_clock: str, end_clock: str) -> pd.DataFrame:
    """Return HV% per movement and per approach for a clock-time period.

    The period is half-open, [start_clock, end_clock), so a bin stamped
    exactly `end_clock` is left to the following period. The date is taken
    from the first timestamp of `ix`.

    Args:
        ix: Intersection to analyse.
        start_clock: Period start as a clock string such as "06:00".
        end_clock: Period end as a clock string such as "10:00".

    Returns:
        A DataFrame indexed by key ('NB_L'-style for movements, approach code
        for approaches) with columns 'level' and 'HV_%'. 'HV_%' is None where
        it cannot be computed. An empty DataFrame is returned when `ix` is
        None or has no movements.
    """
    if ix is None or not ix.movements:
        return pd.DataFrame()
    day = ix.index[0].normalize()
    start = pd.to_datetime(str(day.date()) + " " + start_clock)
    end   = pd.to_datetime(str(day.date()) + " " + end_clock)
    # The hv_percent methods slice with inclusive labels; step back one bin so
    # the boundary bin is not counted in two adjacent periods.
    last_bin = end - pd.Timedelta(minutes=max(ix.interval_minutes, 0))
    rows = [
        {"key": f"{a}_{m}", "level": "movement", "HV_%": mc.hv_percent(start, last_bin)}
        for (a, m), mc in ix.movements.items()
    ]
    for a in ["NB","SB","EB","WB"]:
        counts = ix.approach(a)
        # An approach with no movements has nothing to slice; report None.
        pct = counts.hv_percent(start, last_bin) if counts.movements else None
        rows.append({"key": a, "level": "approach", "HV_%": pct})
    return pd.DataFrame(rows).set_index("key").sort_index()

# Show the HV% table for the standard AM, MD and PM periods, in that order
if ix is not None:
    for period_label, period_clocks in (("AM", AM), ("MD", MD), ("PM", PM)):
        print(f"HV% — {period_label} window:")
        display(hv_percentages(ix, *period_clocks))



## Export

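You can export the 15‑minute volumes of any `Intersection` (raw, seasonally adjusted, or grown) to CSV with `export_intersection()`. Summary tables such as the PHF and HV% DataFrames can be written the same way with `DataFrame.to_csv`.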


In [None]:

def export_intersection(ix: Intersection, path: str) -> None:
    """Write the intersection's wide volume table (timestamp index included) to a CSV file at `path`."""
    ix.as_dataframe().to_csv(path, index=True)
    print(f"Wrote {path}")

# Example export (commented)
# if ix is not None:
#     export_intersection(ix_future, "intersection_adjusted.csv")
