# 00 — End-to-End AIS Demo (CSV → Stats → Events → Map → Streaming)

This notebook shows a **complete workflow** using the `aistk` toolkit:

1. **Configure paths** to your AIS CSV file(s)
2. **Load & filter** with `AISDataset`
3. **Compute statistics** (eager and streaming-friendly)
4. **Detect events**
5. **Generate a quick map preview**
6. **Simulate online streaming from CSV** and emit events

> Works with a single CSV or a folder pattern like `data/ais/*.csv`.


## 1) Prerequisites

Make sure you have the package and optional extras installed.

```bash
pip install aistk polars folium shapely "dask[complete]" pyspark
```


In [None]:
# 2) Configure data paths and parameters
CSV_ROOT = 'data/ais'           # directory with CSV(s) OR path to a single CSV
CSV_PATTERN = '*.csv'           # e.g. '*.csv' or '2024.csv'
DATE_FROM = '2024-01-01'
DATE_TO   = '2024-02-01'
MMSI_LIST = [244660000]         # set to [] or None to include all

OUT_PARQUET = 'out/demo_data.parquet'
OUT_STATS   = 'out/demo_stats.parquet'
OUT_EVENTS  = 'out/demo_events.parquet'
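OUT_MAP     = 'out/demo_map.html'

from pathlib import Path
Path('out').mkdir(parents=True, exist_ok=True)  # make sure the output folder exists before anything is written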
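`CSV_ROOT` may name either a single CSV or a directory. A minimal sketch of turning that pair of settings into an explicit, sorted file list, assuming a non-recursive glob is enough; `resolve_csv_files` is a hypothetical helper, not part of `aistk`.

```python
from pathlib import Path


def resolve_csv_files(root: str, pattern: str = "*.csv") -> list:
    """Return the CSV files selected by (root, pattern).

    Hypothetical helper: a file path is returned on its own; a directory is
    globbed with `pattern` (non-recursive).
    """
    p = Path(root)
    if p.is_file():
        return [p]
    if p.is_dir():
        # sorted() gives a stable, reproducible file order across runs
        return sorted(f for f in p.glob(pattern) if f.is_file())
    raise FileNotFoundError(f"{root!r} is neither a file nor a directory")
```

For example, `resolve_csv_files(CSV_ROOT, CSV_PATTERN)` lists exactly the files a run would read, which is a quick way to catch a wrong path before a long load.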

## 3) Load & Filter (AISDataset)

We use the high-level dataset wrapper, which builds a **Polars LazyFrame** under the hood and only materializes the data on `collect()`.

In [None]:
from aistk.core import AISDataset
import polars as pl

# Build pipeline
ds = AISDataset(CSV_ROOT, pattern=CSV_PATTERN)
if MMSI_LIST:
    ds = ds.filter(mmsi=MMSI_LIST)

ds = ds.between(DATE_FROM, DATE_TO)

df = ds.collect()
df.head()

In [None]:
print('Rows:', df.height)
print('Columns:', df.columns)
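The deferred execution behind `collect()` can be illustrated with a toy query builder in plain Python: each `filter` call only records a predicate, and no row is read until `collect()`. This is a hypothetical illustration for intuition, not how Polars or `aistk` are implemented; Polars additionally optimizes the recorded plan (for example by pushing filters down into the scan).

```python
from dataclasses import dataclass
from typing import Callable, Iterable


@dataclass(frozen=True)
class LazyQuery:
    """Toy deferred query: a row source plus predicates recorded for later."""

    source: Callable[[], Iterable[dict]]  # called only when collect() runs
    predicates: tuple = ()

    def filter(self, pred: Callable[[dict], bool]) -> "LazyQuery":
        # Record the predicate and return a new query; no data is read here.
        return LazyQuery(self.source, self.predicates + (pred,))

    def collect(self) -> list:
        # Execution happens only now: read the source and apply every predicate.
        return [row for row in self.source() if all(p(row) for p in self.predicates)]


rows = [
    {"MMSI": 244660000, "ts": "2024-01-05", "SOG": 12.1},
    {"MMSI": 111111111, "ts": "2024-01-06", "SOG": 3.4},
    {"MMSI": 244660000, "ts": "2024-03-01", "SOG": 0.2},
]

query = (
    LazyQuery(lambda: iter(rows))
    .filter(lambda r: r["MMSI"] in {244660000})
    # ISO-8601 date strings compare correctly as plain strings
    .filter(lambda r: "2024-01-01" <= r["ts"] < "2024-02-01")
)
print(query.collect())  # only the 2024-01-05 row for MMSI 244660000 survives
```

Because `filter` returns a new object instead of mutating, the same base query can be reused with different filters, which mirrors how `ds = ds.filter(...)` is chained above.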

## 4) Save filtered subset (optional)

This writes the filtered subset to a single Parquet file, so later runs can load it without re-parsing the CSVs.

In [None]:
ds.write_parquet(OUT_PARQUET)
OUT_PARQUET

## 5) Stats (eager Polars DataFrame)

Compute per-MMSI metrics using the eager path (`compute_stats_df`).

In [None]:
from aistk.stats import compute_stats_df
stats_df = compute_stats_df(df, level='mmsi')
stats_df.sort('distance_km', descending=True).head()

In [None]:
# Persist stats
stats_df.write_parquet(OUT_STATS)
OUT_STATS
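The table above is sorted by `distance_km`, but the notebook does not show how that column is derived. One common approach is to sort each vessel's fixes by time and sum the great-circle (haversine) distance between consecutive positions. A minimal sketch, assuming records with `MMSI`, `ts`, `LAT`, `LON` in degrees and a mean Earth radius of 6371 km; `compute_stats_df` may compute it differently.

```python
import math
from collections import defaultdict

EARTH_RADIUS_KM = 6371.0  # mean Earth radius; an assumption, not taken from aistk


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two (lat, lon) points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


def track_distance_km(records):
    """Sum consecutive-fix distances per MMSI (records: iterable of dicts)."""
    by_vessel = defaultdict(list)
    for r in records:
        by_vessel[r["MMSI"]].append(r)
    out = {}
    for mmsi, fixes in by_vessel.items():
        fixes.sort(key=lambda r: r["ts"])  # chronological order within each vessel
        out[mmsi] = sum(
            haversine_km(a["LAT"], a["LON"], b["LAT"], b["LON"])
            for a, b in zip(fixes, fixes[1:])
        )
    return out


print(round(haversine_km(0, 0, 1, 0), 2))  # 111.19 (one degree of latitude)
```

Sorting inside each vessel matters: CSV rows are often not in time order, and summing out-of-order fixes inflates the distance.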

## 6) Stats (streaming-friendly with Polars Lazy)

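The same metrics, computed as **expressions** on a `LazyFrame`. Nothing runs until `collect()`, so Polars can optimize the whole query and, with streaming enabled, process the data in batches instead of loading it all into memory. This makes the lazy path a better fit for very large datasets.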


In [None]:
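from datetime import datetime

from aistk.stats_streaming import compute_stats_lazy

# _build() is private (leading underscore); here it exposes the dataset's LazyFrame.
lf = AISDataset(CSV_ROOT, pattern=CSV_PATTERN)._build()
if MMSI_LIST:
    lf = lf.filter(pl.col('MMSI').is_in(MMSI_LIST))
# Compare 'ts' against datetime values, not raw strings (assumes a timezone-naive Datetime column).
start, end = datetime.fromisoformat(DATE_FROM), datetime.fromisoformat(DATE_TO)
lf = lf.filter(pl.col('ts').is_between(start, end, closed='left'))  # window [DATE_FROM, DATE_TO)
res_lazy = compute_stats_lazy(lf, level='mmsi').collect(streaming=True)
res_lazy.sort('distance_km', descending=True).head()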
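Per-vessel statistics stream well when each metric can be updated one row at a time with a small, fixed amount of state, and when partial states from different chunks can be merged. A plain-Python sketch of that idea with a hypothetical metric set (`n_fixes`, `max_sog`, `mean_sog`, first and last timestamp); the columns `compute_stats_lazy` actually produces may differ.

```python
from dataclasses import dataclass


@dataclass
class VesselAgg:
    """Constant-size running state for one MMSI (hypothetical metric set)."""

    n_fixes: int = 0
    sog_sum: float = 0.0
    max_sog: float = float("-inf")
    first_ts: object = None
    last_ts: object = None

    def update(self, ts, sog):
        # O(1) work per row, independent of how many rows were seen before.
        self.n_fixes += 1
        self.sog_sum += sog
        self.max_sog = max(self.max_sog, sog)
        self.first_ts = ts if self.first_ts is None else min(self.first_ts, ts)
        self.last_ts = ts if self.last_ts is None else max(self.last_ts, ts)

    def merge(self, other: "VesselAgg") -> "VesselAgg":
        # Combining two partial states equals one pass over both chunks.
        firsts = [t for t in (self.first_ts, other.first_ts) if t is not None]
        lasts = [t for t in (self.last_ts, other.last_ts) if t is not None]
        return VesselAgg(
            n_fixes=self.n_fixes + other.n_fixes,
            sog_sum=self.sog_sum + other.sog_sum,
            max_sog=max(self.max_sog, other.max_sog),
            first_ts=min(firsts) if firsts else None,
            last_ts=max(lasts) if lasts else None,
        )

    @property
    def mean_sog(self):
        # Derived from sum and count, which merge exactly; averaging means would not.
        return self.sog_sum / self.n_fixes if self.n_fixes else float("nan")


def aggregate(rows):
    state = {}
    for r in rows:
        state.setdefault(r["MMSI"], VesselAgg()).update(r["ts"], r["SOG"])
    return state


rows = [
    {"MMSI": 1, "ts": 3, "SOG": 10.0},
    {"MMSI": 1, "ts": 1, "SOG": 4.0},
    {"MMSI": 2, "ts": 2, "SOG": 0.5},
    {"MMSI": 1, "ts": 2, "SOG": 7.0},
]
whole = aggregate(rows)
a, b = aggregate(rows[:2]), aggregate(rows[2:])  # two "chunks"
merged = {k: a.get(k, VesselAgg()).merge(b.get(k, VesselAgg())) for k in a.keys() | b.keys()}
print(merged == whole)  # True
```

Metrics such as medians do not have a small mergeable state, which is why they are harder to compute in a single streaming pass.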

## 7) Event Detection (batch)

Detect `sharp_turn`, `stop`, `gap`, and `draft_change` events on the collected DataFrame.

In [None]:
from aistk.events import detect_events_df
# Units inferred from the parameter names: degrees, knots (AIS SOG), minutes, metres.
ev = detect_events_df(df, turn_deg=30.0, stop_sog=0.5, stop_min=15, draft_jump_m=0.3)
ev.head()
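A `sharp_turn` is presumably flagged when course over ground changes by more than `turn_deg` between consecutive fixes. The subtle part is the 0/360 wrap: going from 350° to 10° is a 20° turn, not 340°. A minimal sketch of that check, assuming one vessel's records sorted by time with `COG` in degrees; it illustrates the idea and is not the code inside `detect_events_df`.

```python
def heading_delta_deg(a: float, b: float) -> float:
    """Smallest absolute angle between two headings, in [0, 180]."""
    d = (b - a) % 360.0
    return min(d, 360.0 - d)


def sharp_turns(fixes, turn_deg=30.0):
    """Yield (previous, current, delta) for consecutive fixes turning > turn_deg.

    `fixes` is assumed to be one vessel's records sorted by timestamp.
    """
    fixes = list(fixes)
    for prev, cur in zip(fixes, fixes[1:]):
        a, b = prev.get("COG"), cur.get("COG")
        # In AIS, COG = 360.0 means "not available"; skip missing or unavailable values.
        if a is None or b is None or a >= 360.0 or b >= 360.0:
            continue
        delta = heading_delta_deg(a, b)
        if delta > turn_deg:
            yield prev, cur, delta


print(heading_delta_deg(350.0, 10.0))  # 20.0
```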

In [None]:
# Persist events
ev.write_parquet(OUT_EVENTS)
OUT_EVENTS
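The `stop_sog` and `stop_min` parameters suggest that a stop is a run of consecutive fixes with SOG below `stop_sog` lasting at least `stop_min` minutes, and a `gap` is a silence between two fixes longer than some threshold. A sketch of both under those assumptions; the `gap_min` parameter and the returned tuple layout are hypothetical, and `detect_events_df` may define these events differently.

```python
from datetime import datetime, timedelta


def find_stops(fixes, stop_sog=0.5, stop_min=15):
    """Return (start_ts, end_ts) for low-speed runs lasting >= stop_min minutes.

    `fixes`: one vessel's records sorted by `ts` (datetime), with `SOG` in knots.
    """
    stops = []
    run_start = run_end = None
    min_len = timedelta(minutes=stop_min)
    for f in fixes:
        if f["SOG"] < stop_sog:
            if run_start is None:
                run_start = f["ts"]  # a low-speed run begins
            run_end = f["ts"]
            continue
        # Speed is back above the threshold: close the run if it was long enough.
        if run_start is not None and run_end - run_start >= min_len:
            stops.append((run_start, run_end))
        run_start = run_end = None
    if run_start is not None and run_end - run_start >= min_len:
        stops.append((run_start, run_end))  # stop still open at the end of the data
    return stops


def find_gaps(fixes, gap_min=30):
    """Return (prev_ts, next_ts) pairs separated by more than gap_min minutes."""
    fixes = list(fixes)
    limit = timedelta(minutes=gap_min)
    return [(a["ts"], b["ts"]) for a, b in zip(fixes, fixes[1:]) if b["ts"] - a["ts"] > limit]


t0 = datetime(2024, 1, 1)
track = [
    {"ts": t0 + timedelta(minutes=m), "SOG": s}
    for m, s in [(0, 5.0), (5, 0.2), (10, 0.1), (25, 0.3), (30, 6.0), (90, 0.2), (95, 0.1)]
]
print(find_stops(track))  # one stop, from minute 5 to minute 25
print(find_gaps(track))   # one gap, from minute 30 to minute 90
```

Note that a stop is measured between the first and last low-speed fix, so a long reporting gap inside a stop can make it look shorter or longer than it really was; combining both checks helps spot such cases.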

## 8) Quick Map Preview

Render a Folium map for a selected MMSI (or all points if not provided).

In [None]:
html_path = ds.plot_map(OUT_MAP, mmsi=MMSI_LIST[0] if MMSI_LIST else None)
html_path
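To use a track outside this HTML preview (for example in a GIS tool, or as a layer you add yourself), a per-vessel GeoJSON `LineString` is a convenient format. The main pitfall is axis order: GeoJSON (RFC 7946) stores positions as `[longitude, latitude]`, while Folium's `location` arguments take `[latitude, longitude]`. A stdlib-only sketch, assuming time-sorted records with `LAT`/`LON` in degrees; the property names are illustrative.

```python
import json


def track_geojson(mmsi, fixes):
    """Build a GeoJSON Feature holding one vessel's track as a LineString."""
    # GeoJSON positions are [lon, lat]; swapping them is a very common bug.
    coords = [[f["LON"], f["LAT"]] for f in fixes
              if f.get("LAT") is not None and f.get("LON") is not None]
    if len(coords) < 2:
        raise ValueError("a LineString needs at least two positions")
    return {
        "type": "Feature",
        "geometry": {"type": "LineString", "coordinates": coords},
        "properties": {"MMSI": mmsi, "n_points": len(coords)},
    }


def to_feature_collection(features):
    return {"type": "FeatureCollection", "features": list(features)}


feature = track_geojson(244660000, [{"LAT": 52.0, "LON": 4.0}, {"LAT": 52.1, "LON": 4.2}])
geojson_text = json.dumps(to_feature_collection([feature]))  # ready to save as a .geojson file
```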

## 9) Streaming Simulation from CSV

Read the CSV in chunks, feed rows to the online detector, and print JSON events as they occur.


In [None]:
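import json
from pathlib import Path

from aistk.streaming.events_online import process_stream

# Re-scan the raw CSV(s) lazily; CSV_ROOT may be a single file or a directory.
csv_src = CSV_ROOT if Path(CSV_ROOT).is_file() else f"{CSV_ROOT}/{CSV_PATTERN}"
lf_stream = pl.scan_csv(
    csv_src,
    has_header=True,
    infer_schema_length=0,  # 0 reads every column as a string
    ignore_errors=True,
    try_parse_dates=True,
)
if MMSI_LIST and 'MMSI' in lf_stream.columns:
    # MMSI is a string here, so cast it before comparing with the integer list
    lf_stream = lf_stream.filter(pl.col('MMSI').cast(pl.Int64, strict=False).is_in(MMSI_LIST))

chunk_size = 20_000

def iter_records():
    """Yield one dict per row, collecting chunk_size rows at a time."""
    offset = 0
    while True:
        # Each slice re-scans the CSV up to `offset`: fine for a demo, slow for huge files.
        chunk = lf_stream.slice(offset, chunk_size).collect(streaming=True)
        if chunk.height == 0:
            break
        # Raw CSV headers may differ from AISDataset's names (e.g. 'ts'); rename if needed.
        cols = [c for c in ['MMSI', 'ts', 'LAT', 'LON', 'COG', 'SOG', 'Draft'] if c in chunk.columns]
        yield from (dict(zip(cols, row)) for row in chunk.select(cols).iter_rows())
        offset += chunk_size

# One detector call over the whole stream, so its state is not reset between chunks.
for event in process_stream(iter_records(), stop_min=10):
    print(json.dumps(event, default=str))  # default=str serializes datetimes and other non-JSON values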
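An online detector has to keep per-vessel state (such as the time of the previous fix) across chunk boundaries, otherwise events that straddle two chunks are missed. A stdlib-only sketch of that pattern: rows are read in fixed-size chunks with `csv.DictReader` and `itertools.islice`, and a generator keeps a `last_seen` dict for the whole stream. It only detects reporting gaps; the column names, ISO 8601 timestamps, and event layout are assumptions, not the `aistk` schema.

```python
import csv
import io
import json
from datetime import datetime, timedelta
from itertools import islice


def read_chunks(fh, chunk_size=20_000):
    """Yield lists of up to chunk_size row dicts from an open CSV file."""
    reader = csv.DictReader(fh)
    while True:
        chunk = list(islice(reader, chunk_size))
        if not chunk:
            return
        yield chunk


def online_gaps(chunks, gap_min=30, time_col="ts"):
    """Yield a 'gap' event whenever a vessel is silent for more than gap_min minutes.

    State (last timestamp per MMSI) lives outside the chunk loop, so a gap that
    straddles two chunks is still detected. Assumes rows arrive in time order.
    """
    limit = timedelta(minutes=gap_min)
    last_seen = {}  # MMSI -> datetime of the previous fix
    for chunk in chunks:
        for row in chunk:
            mmsi = row["MMSI"]
            ts = datetime.fromisoformat(row[time_col])
            prev = last_seen.get(mmsi)
            if prev is not None and ts - prev > limit:
                yield {"event": "gap", "MMSI": mmsi, "from": prev.isoformat(),
                       "to": ts.isoformat(), "minutes": (ts - prev).total_seconds() / 60}
            last_seen[mmsi] = ts


sample = io.StringIO(
    "MMSI,ts,SOG\n"
    "1,2024-01-01T00:00:00,5.0\n"
    "1,2024-01-01T00:10:00,5.1\n"
    "1,2024-01-01T01:00:00,4.9\n"
)
# chunk_size=2 puts the 00:10 and 01:00 fixes in different chunks
for ev in online_gaps(read_chunks(sample, chunk_size=2)):
    print(json.dumps(ev))  # one gap event (50.0 minutes) spanning the chunk boundary
```

Unlike slicing a `LazyFrame` at growing offsets, `islice` over a single reader never re-reads earlier rows, so the cost stays linear in the file size.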

---

## (Optional) CLI inside the notebook
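If the `aistk` console entry point is installed, you can run the same steps directly from notebook cells. The leading `!` is IPython's shell escape; drop it when running the commands in a regular terminal.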

```bash
!aistk scan data/ais --from 2024-01-01 --to 2024-02-01 --to-parquet out/data.parquet
!aistk stats data/ais --engine polars-stream --out out/stats.parquet
!aistk events data/ais --out out/events.csv
!aistk stream-csv data/ais/2024.csv --chunk-size 5000
```
