Merged
624 changes: 311 additions & 313 deletions eta_prediction/feature_engineering/dataset_builder.py

Large diffs are not rendered by default.
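The rewritten builder is not rendered above, but from the call site in the management command further down, its new entry point appears to be shaped roughly like the sketch below (parameter names are taken from that call; types, defaults, and the docstring are assumptions):

def build_vp_training_dataset(
    route_ids,                 # list of GTFS route_id strings to include
    start_date,                # tz-aware datetime, lower bound on VP timestamps
    end_date,                  # tz-aware datetime, upper bound on VP timestamps
    distance_threshold=50.0,   # meters at which a vehicle counts as arrived at a stop
    max_stops_ahead=5,         # upcoming stops emitted per vehicle position
    attach_weather=True,       # join weather features onto each row
):
    """Build a time-to-arrival training DataFrame from VehiclePosition data."""
    ...

Two further parameters, min_observations_per_stop and vp_sample_interval_seconds, appear commented out at the call site, so they may not be wired up yet.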

35 changes: 35 additions & 0 deletions eta_prediction/feature_engineering/pyproject.toml
@@ -0,0 +1,35 @@
[project]
name = "feature_engineering"
version = "0.1.0"
description = "Feature engineering utilities for GTFS realtime and schedule data (part of gtfs-django repository)"
readme = "README.md"
requires-python = ">=3.10"
authors = [
{ name = "maintainer", email = "maintainer@example.com" }
]
license = { text = "MIT" }

dependencies = [
"numpy>=1.26,<2",
"pandas>=2.2,<3",
"scikit-learn>=1.3,<2",
"scipy>=1.11,<2",
"joblib>=1.3,<2",
"tqdm>=4.66,<5",
"pyproj>=3.6,<4",
"shapely>=2.1,<3",
"geopandas>=0.14,<1",
]

[project.optional-dependencies]
dev = [
"pytest>=7.4,<8",
"black>=24.3,<25",
"isort>=5.13,<6",
"mypy>=1.11,<2",
"pre-commit>=3.4,<4",
]

[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
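For local development against these pins, an editable install with the dev extras should be enough, assuming the package path shown in the diff header:

pip install -e "eta_prediction/feature_engineering[dev]"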
Binary file not shown.
@@ -17,7 +17,7 @@
sys.path.insert(0, str(ETA_PREDICTION_ROOT))

try:
from feature_engineering.dataset_builder import build_training_dataset, save_dataset
from feature_engineering.dataset_builder import build_vp_training_dataset, save_dataset
from sch_pipeline.utils import top_routes_by_scheduled_trips
except ImportError as e:
print(f"ERROR: Failed to import required modules: {e}")
@@ -28,7 +28,7 @@


class Command(BaseCommand):
help = "Build a small ETA training dataset for the top-N busiest routes (by scheduled trips)."
help = "Build ETA training dataset from VehiclePosition data for the top-N busiest routes."

def add_arguments(self, parser):
parser.add_argument(
@@ -49,10 +49,28 @@ def add_arguments(self, parser):
default=10,
help="Min observations per stop"
)
parser.add_argument(
"--distance-threshold",
type=float,
default=50.0,
help="Distance threshold (meters) to consider vehicle 'arrived' at stop"
)
parser.add_argument(
"--max-stops-ahead",
type=int,
default=5,
help="Maximum number of upcoming stops to include per VP"
)
parser.add_argument(
"--vp-sample-interval",
type=int,
default=30,
help="Sample VPs every N seconds per vehicle (0=no sampling, use all VPs)"
)
parser.add_argument(
"--out",
type=str,
default="eta_sample.parquet",
default="eta_vp_sample.parquet",
help="Output parquet path"
)
parser.add_argument(
@@ -65,18 +83,49 @@
type=str,
help="Comma-separated route IDs (overrides --top-routes)"
)
parser.add_argument(
"--start-date",
type=str,
help="Start date (YYYY-MM-DD format, overrides --days)"
)
parser.add_argument(
"--end-date",
type=str,
help="End date (YYYY-MM-DD format, overrides --days)"
)
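# A representative invocation combining the new flags might look like the
# following (the management command's name is not visible in this diff, so
# "build_eta_dataset" is a guess; --route-ids is inferred from opts["route_ids"]):
#
#   python manage.py build_eta_dataset --route-ids 10,14 \
#       --start-date 2025-10-08 --end-date 2025-10-09 \
#       --distance-threshold 75 --max-stops-ahead 5 \
#       --vp-sample-interval 30 --out eta_vp_sample.parquet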

def handle(self, *args, **opts):
n = opts["top_routes"]
days = opts["days"]
min_obs = opts["min_observations"]
distance_threshold = opts["distance_threshold"]
max_stops_ahead = opts["max_stops_ahead"]
# vp_sample_interval = opts["vp_sample_interval"]
out = opts["out"]
attach_weather = not opts["no_weather"]
manual_routes = opts.get("route_ids")
start_date_str = opts.get("start_date")
end_date_str = opts.get("end_date")

# or for a fixed day window matching your data:
start = timezone.datetime(2025, 10, 8, 0, 0, tzinfo=UTC)
end = timezone.datetime(2025, 10, 9, 0, 0, tzinfo=UTC)
# Determine date range
if start_date_str and end_date_str:
try:
start = timezone.datetime.strptime(start_date_str, "%Y-%m-%d").replace(tzinfo=UTC)
end = timezone.datetime.strptime(end_date_str, "%Y-%m-%d").replace(tzinfo=UTC)
except ValueError as e:
self.stdout.write(
self.style.ERROR(f"Invalid date format: {e}. Use YYYY-MM-DD")
)
return
else:
# Default: use fixed date range or calculate from --days
# For testing, using fixed dates:
start = timezone.datetime(2025, 10, 8, 0, 0, tzinfo=UTC)
end = timezone.datetime(2025, 10, 9, 0, 0, tzinfo=UTC)
# Or calculate from days:
# end = timezone.now()
# start = end - timedelta(days=days)

# Determine which routes to use
if manual_routes:
route_ids = [r.strip() for r in manual_routes.split(",")]
@@ -112,21 +161,49 @@ def handle(self, *args, **opts):
self.stdout.write("\n" + "="*60)
self.stdout.write(self.style.NOTICE("Configuration:"))
self.stdout.write(f" Routes: {', '.join(route_ids)}")
self.stdout.write(f" Date range: {start.date()} to {end.date()} ({days} days)")
self.stdout.write(f" Date range: {start.date()} to {end.date()}")
self.stdout.write(f" Distance threshold: {distance_threshold}m")
self.stdout.write(f" Max stops ahead: {max_stops_ahead}")
# # self.stdout.write(f" VP sample interval: {vp_sample_interval}s ({'all VPs' if vp_sample_interval == 0 else 'sampled'})")
self.stdout.write(f" Min observations/stop: {min_obs}")
self.stdout.write(f" Weather features: {'enabled' if attach_weather else 'disabled'}")
self.stdout.write(f" Output: {out}")
self.stdout.write("="*60 + "\n")

# Check for VehiclePosition data
from rt_pipeline.models import VehiclePosition
vp_count = VehiclePosition.objects.filter(
ts__gte=start,
ts__lt=end
).count()

if vp_count == 0:
self.stdout.write(
self.style.WARNING(
f"No VehiclePosition data found in date range {start.date()} to {end.date()}\n"
"Check data availability:\n"
" python manage.py shell -c 'from rt_pipeline.models import VehiclePosition; "
"from django.db.models import Min, Max; "
"print(VehiclePosition.objects.aggregate(min=Min(\"ts\"), max=Max(\"ts\")))'"
)
)
return
else:
self.stdout.write(
self.style.SUCCESS(f"Found {vp_count:,} VehiclePosition records in date range")
)

# Build dataset
try:
self.stdout.write(self.style.NOTICE("Building dataset..."))
df = build_training_dataset(
provider_id=None,
self.stdout.write(self.style.NOTICE("\nBuilding dataset..."))
df = build_vp_training_dataset(
route_ids=route_ids,
start_date=start,
end_date=end,
min_observations_per_stop=min_obs,
distance_threshold=distance_threshold,
max_stops_ahead=max_stops_ahead,
# min_observations_per_stop=min_obs,
# vp_sample_interval_seconds=vp_sample_interval,
attach_weather=attach_weather,
)
except Exception as e:
@@ -141,12 +218,22 @@ def handle(self, *args, **opts):
self.stdout.write(
self.style.WARNING(
"Resulting dataset is empty. Possible issues:\n"
" 1. No TripUpdate data in the date range\n"
" 2. No matching stop_sequences between StopTime and TripUpdate\n"
" 3. All data filtered out by min_observations threshold\n"
" 1. No VehiclePosition data in the date range\n"
" 2. VPs not matching any trips with stop sequences\n"
" 3. Vehicles never came close enough to stops (try increasing --distance-threshold)\n"
" 4. All data filtered out by --min-observations threshold\n"
" 5. No future VPs available to detect arrivals (incomplete trips)\n"
"\nDebug queries:\n"
" - Check TripUpdate count: python manage.py shell -c 'from rt_pipeline.models import TripUpdate; print(TripUpdate.objects.count())'\n"
" - Check date range: python manage.py shell -c 'from rt_pipeline.models import TripUpdate; print(TripUpdate.objects.aggregate(min=Min(\"ts\"), max=Max(\"ts\")))'"
" - Check VP count: python manage.py shell -c 'from rt_pipeline.models import VehiclePosition; print(VehiclePosition.objects.count())'\n"
" - Check date range: python manage.py shell -c 'from rt_pipeline.models import VehiclePosition; from django.db.models import Min, Max; print(VehiclePosition.objects.aggregate(min=Min(\"ts\"), max=Max(\"ts\")))'\n"
" - Check StopTime data: python manage.py shell -c 'from sch_pipeline.models import StopTime, Stop; print(f\"StopTimes: {{StopTime.objects.count()}}, Stops with coords: {{Stop.objects.exclude(stop_lat__isnull=True).count()}}\")'\n"
"\nTry adjusting parameters:\n"
" - Increase --distance-threshold (current: {})m\n"
" - Reduce --min-observations (current: {})\n"
" - Increase --max-stops-ahead (current: {})\n"
" - Set --vp-sample-interval to 0 to use all VPs".format(
distance_threshold, min_obs, max_stops_ahead
)
)
)
return
@@ -156,17 +243,33 @@ def handle(self, *args, **opts):
self.stdout.write(self.style.SUCCESS("Dataset Summary:"))
self.stdout.write(f" Total rows: {len(df):,}")
self.stdout.write(f" Unique trips: {df['trip_id'].nunique():,}")
self.stdout.write(f" Unique routes: {df['route_id'].nunique()}")
self.stdout.write(f" Unique vehicles: {df['vehicle_id'].nunique():,}")
self.stdout.write(f" Unique stops: {df['stop_id'].nunique():,}")
self.stdout.write(f" Routes: {df['route_id'].nunique()}")

if "delay_seconds" in df.columns:
delay_stats = df["delay_seconds"].describe()
self.stdout.write(f"\n Delay statistics (seconds):")
self.stdout.write(f" Mean: {delay_stats['mean']:.1f}")
self.stdout.write(f" Median: {delay_stats['50%']:.1f}")
self.stdout.write(f" Std: {delay_stats['std']:.1f}")
self.stdout.write(f" Min: {delay_stats['min']:.1f}")
self.stdout.write(f" Max: {delay_stats['max']:.1f}")
if "time_to_arrival_seconds" in df.columns:
tta_stats = df["time_to_arrival_seconds"].describe()
self.stdout.write(f"\n Time-to-arrival statistics:")
self.stdout.write(f" Mean: {tta_stats['mean']:.1f}s ({tta_stats['mean']/60:.1f} min)")
self.stdout.write(f" Median: {tta_stats['50%']:.1f}s ({tta_stats['50%']/60:.1f} min)")
self.stdout.write(f" Std: {tta_stats['std']:.1f}s")
self.stdout.write(f" Min: {tta_stats['min']:.1f}s")
self.stdout.write(f" Max: {tta_stats['max']:.1f}s ({tta_stats['max']/60:.1f} min)")

if "distance_to_stop" in df.columns:
dist_stats = df["distance_to_stop"].describe()
self.stdout.write(f"\n Distance-to-stop statistics:")
self.stdout.write(f" Mean: {dist_stats['mean']:.1f}m")
self.stdout.write(f" Median: {dist_stats['50%']:.1f}m")
self.stdout.write(f" Min: {dist_stats['min']:.1f}m")
self.stdout.write(f" Max: {dist_stats['max']:.1f}m")

if "current_speed_kmh" in df.columns:
speed_stats = df[df["current_speed_kmh"] > 0]["current_speed_kmh"].describe()
if not speed_stats.empty:
self.stdout.write(f"\n Speed statistics (km/h):")
self.stdout.write(f" Mean: {speed_stats['mean']:.1f}")
self.stdout.write(f" Median: {speed_stats['50%']:.1f}")

missing = df.isnull().sum()
if missing.any():
@@ -183,6 +286,19 @@ def handle(self, *args, **opts):
self.stdout.write(
self.style.SUCCESS(f"✓ Successfully saved to {out}")
)

# Provide guidance on next steps
self.stdout.write("\n" + self.style.NOTICE("Next steps:"))
self.stdout.write(" 1. Inspect the dataset: ")
self.stdout.write(f" import pandas as pd; df = pd.read_parquet('{out}'); df.head()")
self.stdout.write(" 2. Check feature distributions and correlations")
self.stdout.write(" 3. Train a model predicting 'time_to_arrival_seconds' from:")
self.stdout.write(" - distance_to_stop")
self.stdout.write(" - current_speed_kmh")
self.stdout.write(" - temporal features (hour, is_peak_hour, etc.)")
self.stdout.write(" - operational features (headway_seconds)")
self.stdout.write(" - weather features (if enabled)")

except Exception as e:
self.stdout.write(
self.style.ERROR(f"Failed to save dataset: {e}")
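The "Next steps" guidance printed by the command translates into a short baseline. A minimal sketch, assuming the column names shown in the dataset summary (time_to_arrival_seconds, distance_to_stop, current_speed_kmh, hour, is_peak_hour, headway_seconds) exist in the saved parquet:

import pandas as pd
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split

# Feature/target names follow the command's summary output; adjust to the real schema.
df = pd.read_parquet("eta_vp_sample.parquet")
features = ["distance_to_stop", "current_speed_kmh", "hour", "is_peak_hour", "headway_seconds"]
X = df[features].fillna(0.0)
y = df["time_to_arrival_seconds"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = GradientBoostingRegressor(random_state=42).fit(X_train, y_train)
print(f"Baseline MAE: {mean_absolute_error(y_test, model.predict(X_test)):.1f} s")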
73 changes: 0 additions & 73 deletions eta_prediction/gtfs_rt_bindings_SA.py

This file was deleted.

18 changes: 0 additions & 18 deletions eta_prediction/gtfs_rt_bindings_TU.py

This file was deleted.
