The idea here is that measurements close together in time and space probably come from the same train.
If we seed a trip with unambiguous measurements (those that clearly belong to a single train),
we can estimate the train's speed and keep extending the trip in its direction of travel until we have built a full trip.
This approach is only useful when we have RTM data but no MTPS data.

This notebook implements only the first few steps: after getting this far we received the MTPS data and switched to working on that instead.

In [14]:
import numpy as np
import polars as pl

In [15]:
# Eagerly load the full cleaned RTM sample into memory.
# The next cell overwrites `df`; here it mainly provides the row count used
# to size the per-row timestamp jitter.
_rtm_sample_path = "../docs/rtm_sample_cleaned.pq"
df = pl.scan_parquet(_rtm_sample_path).collect()

In [16]:
# Reload the sample, shift every timestamp by a random offset in [0, 1) s,
# then sort by time and attach a positional row id (`id` == row position).
# NOTE(review): the jitter presumably exists to break ties between equal
# timestamps so each rolling window's first row is its own anchor — confirm.
# The generator is seeded so the jitter, and every link derived from it,
# is reproducible across runs.
# Requires `df` from the previous cell: its height sizes the jitter vector.
_jitter_rng = np.random.default_rng(seed=0)
# Whole nanoseconds (truncated), as pl.duration expects integer counts.
_jitter_ns = (_jitter_rng.random(df.height) * 1e9).astype(np.int64)
df = (
    pl.scan_parquet("../docs/rtm_sample_cleaned.pq")
    .with_columns(pl.col("time") + pl.duration(nanoseconds=pl.lit(_jitter_ns)))
    .sort(pl.col("time"))
    .with_row_index("id")
    .collect()
)

In [19]:
# Polars expression: approximate planar distance in metres from the first row
# of the current group/window to each subsequent row (n - 1 values for an
# n-row window).
# Flat-earth approximation: 111 km per degree of latitude and a fixed 68 km
# per degree of longitude. 68 km ≈ 111.3 km * cos(lat) only near ~52° latitude,
# so distances are less accurate far from that band.
# NOTE(review): presumably 68 km was chosen for the study region — confirm.
_d_lat_m = (pl.col("lat").first() - pl.col("lat").slice(1)) * (111 * 1000)
_d_lon_m = (pl.col("lon").first() - pl.col("lon").slice(1)) * (68 * 1000)
calculate_distance = (_d_lat_m.pow(2) + _d_lon_m.pow(2)).sqrt()

In [None]:
# Naive one-hop links, computed for the first 5 000 rows of `df` (which an
# earlier cell sorts by time).
# For every measurement (the anchor) at time t, polars opens the window
# [t, t + 20 s) (offset 0, period 20 s, closed on the left). It keeps the ids
# of the later rows in that window that lie less than 1000 m from the anchor.
# "Naive" because this includes indirect successors, not just the
# immediate next measurement.
# The anchor is the window's first row, which holds while timestamps are
# unique. check_sorted=False skips the sortedness check on `time`.
_link_1_windows = (
    df.lazy()
    .head(5_000)
    .rolling(
        index_column="time",
        period="20s",
        offset="0s",
        closed="left",
        check_sorted=False,
    )
)
df_link_1_naive: pl.DataFrame = _link_1_windows.agg(
    pl.col("id").first(),
    pl.col("id").slice(1).filter(calculate_distance.lt(1000)).alias("next_ids"),
).collect(streaming=True)

In [21]:
# Two-hop links: for every measurement, the concatenation of the `next_ids`
# lists of all of its one-hop successors. No deduplication is done, so ids
# may repeat.
# `vals.gather` looks successors up by row POSITION. That is only correct
# because `id` is the row index of the time-sorted frame, and (assumed)
# `rolling` returns one row per input row in input order — verify if the
# upstream cells change.
# map_elements runs a Python callback per row, so this does not scale to
# the full dataset.
vals = df_link_1_naive["next_ids"]
_successors_of_successors = (
    pl.col("next_ids")
    .map_elements(vals.gather, return_dtype=pl.List(pl.List(pl.UInt32)))
    # Collapse the per-row list-of-lists into a single flat list of ids.
    .list.eval(pl.element().flatten())
)
df_link_2_naive: pl.DataFrame = (
    df_link_1_naive.lazy()
    .select(pl.col("id"), _successors_of_successors)
    .collect(streaming=True)
)
df_link_2_naive

id,next_ids
u32,list[u32]
0,"[6, 11, … 47]"
1,"[8, 13, … 49]"
2,"[10, 15, … 51]"
3,"[11, 14, … 50]"
4,"[12, 16, … 52]"
…,…
4995,[]
4996,[]
4997,[]
4998,[]


In [None]:
# Keep only unambiguous *immediate* successors.
# - A one-hop successor that is also reachable in two hops is an indirect
#   successor. The set difference next_ids \ link_2 therefore leaves the
#   candidate immediate successors (`link_1`).
# - When exactly one candidate remains it becomes `maybe_next`; otherwise
#   `maybe_next` is null.
# - The inner join attaches the successor's columns (suffixed `_next` on
#   name clashes). It drops rows whose `maybe_next` is null, because null join
#   keys do not match by default.
#   NOTE(review): confirm dropping ambiguous/terminal rows is intended.
# The row count is taken from df_link_1_naive instead of a repeated literal:
# the Series columns below are aligned to these rows by position, so both
# must cover exactly the same leading rows of `df`.
df_link_1_true = (
    df.lazy()
    .head(df_link_1_naive.height)
    .with_columns(df_link_1_naive["next_ids"], link_2=df_link_2_naive["next_ids"])
    .with_columns(link_1=pl.col("next_ids").list.set_difference("link_2"))
    .drop(["next_ids", "link_2"])
    .with_columns(
        maybe_next=pl.when(pl.col("link_1").list.len().eq(1)).then(
            pl.col("link_1").list.first()
        )
    )
    .join(
        df.lazy(),
        left_on="maybe_next",
        right_on="id",
        how="inner",
        suffix="_next",
    )
    .collect()
)