In [None]:
from __future__ import annotations

# Data processing status example

This example demonstrates how to set up a data flow that incrementally processes the partitions of a
dataset. It uses two tables: a status table and a results table. The status table is kept small so that it
is cheap to query for partitions that have not yet been processed; the results table holds the values
computed for each processed partition.

In this example, we first create these two tables. Then we collect the available partitions and compare them
against the status table. To demonstrate how you could batch process a portion of your available data, we
take a small subset (three partitions) of the unprocessed partitions on each iteration and repeat until none
remain. In real workflows you will likely either pass all of the unprocessed partitions in a single batch or
send off one partition at a time. How you select which partitions to process is up to the individual workflow.

The code below produces a few lines of status output and then displays both the results and status tables.

## Setup

This example assumes you have started the open-source (OSS) server with the example dataset from the test
assets directory. From the root of the rerun repository you can start it with the following command:

```shell
rerun server --dataset ./tests/assets/rrd/dataset
```

The example below stores its tables in a temporary directory, which is deleted once this notebook has been
executed. The tables stay registered with the server, and on a later run the notebook reuses any table whose
name is already registered even though its files are gone, so you will need to restart your server if you want
to run the example multiple times. If you would prefer to persist the created tables, remove the
`with tempfile.TemporaryDirectory()` line and instead set a specific location for your files.

In [None]:
from datafusion import col, functions as F, DataFrame
from datetime import datetime
from pathlib import Path
from rerun.catalog import CatalogClient, DatasetEntry
from typing import List
import pyarrow as pa
import tempfile

# Catalog server to connect to (see the Setup section for how to start it) and the
# dataset on that server whose partitions are processed.
CATALOG_URL = "rerun+http://localhost:51234"
DATASET_NAME = "dataset"

# Names of the two tables this notebook creates, or reuses if already registered.
STATUS_TABLE_NAME, RESULTS_TABLE_NAME = "status", "results"

In [None]:
def create_table(
    client: CatalogClient, directory: Path, table_name: str, schema: pa.Schema
) -> DataFrame:
    """
    Return the DataFrame for `table_name`, creating a lance table under `directory` if needed.

    If a table named `table_name` is already registered with the catalog, it is returned as-is
    and `directory` and `schema` are not used.

    Parameters
    ----------
    client:
        Catalog client used to look up and create the table.
    directory:
        Directory in which the table's files are stored. Relative paths are resolved against the
        current working directory; a plain `str` is also accepted.
    table_name:
        Name under which the table is registered.
    schema:
        Arrow schema for a newly created table.

    Returns
    -------
    DataFrame
        The table's DataFrame.
    """
    if table_name in client.table_names():
        return client.get_table(name=table_name)

    # `Path.as_uri()` produces a well-formed `file:` URL on every platform (forward slashes,
    # percent-escaped special characters). It requires an absolute path, hence `resolve()`:
    # formatting a relative path into "file://..." would turn its first component into the host.
    url = (Path(directory).resolve() / table_name).as_uri()

    return client.create_table(table_name, schema, url).df()

In [None]:
def create_status_table(client: CatalogClient, directory: Path) -> DataFrame:
    """
    Create (or reuse) the status table and return its DataFrame.

    Each row records, for one partition, whether processing has completed and when the row
    was written.
    """
    status_fields = [
        pa.field("rerun_partition_id", pa.utf8()),
        pa.field("is_complete", pa.bool_()),
        pa.field("update_time", pa.timestamp("ms")),
    ]
    status_schema = pa.schema(status_fields)
    return create_table(client, directory, STATUS_TABLE_NAME, status_schema)

In [None]:
def create_results_table(client: CatalogClient, directory: Path) -> DataFrame:
    """
    Create (or reuse) the results table and return its DataFrame.

    Each row holds the per-partition aggregates: the first and last log times, and the first
    logged position of each of `/obj1`, `/obj2` and `/obj3` as a fixed-size list of three float32s.
    """
    point3 = pa.list_(pa.float32(), 3)
    result_fields = [
        pa.field("rerun_partition_id", pa.utf8()),
        pa.field("first_log_time", pa.timestamp("ns")),
        pa.field("last_log_time", pa.timestamp("ns")),
    ]
    result_fields += [pa.field(f"first_position_obj{i}", point3) for i in range(1, 4)]
    return create_table(client, directory, RESULTS_TABLE_NAME, pa.schema(result_fields))

In [None]:
def find_missing_partitions(
    partition_table: DataFrame, status_table: DataFrame
) -> List[str]:
    """
    Query the status table for partitions that have not been processed yet.

    A partition counts as processed once the status table holds at least one row for it with
    `is_complete` set to true. Rows written when work merely started (`is_complete` false) do
    not count, so such partitions are returned again.

    Parameters
    ----------
    partition_table:
        DataFrame with a `rerun_partition_id` column listing every partition to consider.
        Any other columns are ignored.
    status_table:
        DataFrame with (at least) `rerun_partition_id` and `is_complete` columns.

    Returns
    -------
    list of str
        IDs from `partition_table` that have no completed status row. Null IDs are skipped.
    """
    completed = status_table.filter(col("is_complete"))
    batches = (
        partition_table.join(completed, on="rerun_partition_id", how="anti")
        .select("rerun_partition_id")
        .collect()
    )
    # Read only the ID column: flattening every column of every batch would mix values from any
    # additional column of `partition_table` into the returned IDs.
    return [
        str(partition_id)
        for batch in batches
        for partition_id in batch.column("rerun_partition_id").to_pylist()
        if partition_id is not None
    ]

In [None]:
def _append_status(
    client: CatalogClient, partition_list: list[str], is_complete: bool
) -> None:
    """Append one status row per partition, all stamped with the current (local) time."""
    now = datetime.now()
    client.append_to_table(
        STATUS_TABLE_NAME,
        rerun_partition_id=partition_list,
        is_complete=[is_complete] * len(partition_list),
        update_time=[now] * len(partition_list),
    )


def _first_position(entity_path: str):
    """Aggregate expression: first non-null `Points3D` position of `entity_path`, ordered by `time_1`."""
    positions = col(f"{entity_path}:Points3D:positions")
    return F.first_value(
        positions[0],
        filter=positions.is_not_null(),
        order_by=col("time_1"),
    )


def process_partitions(
    client: CatalogClient, dataset: DatasetEntry, partition_list: list[str]
) -> None:
    """
    Example code for processing some partitions within a dataset.

    This example performs a simple aggregation of some of the values stored in the dataset that
    might be useful for further processing or metrics extraction. In this work flow we first write
    to the status table that we have started work but set the `is_complete` column to `False`.
    When the work is complete we write an additional row setting this column to `True`. Alternate
    workflows may only include writing to the table when work is complete. It is sometimes favorable
    to keep track of when jobs start and finish so you can produce additional metrics around
    when the jobs ran and how long they took.

    Parameters
    ----------
    client:
        Catalog client used to append rows to the status table.
    dataset:
        Dataset whose partitions are queried.
    partition_list:
        IDs of the partitions to process. An empty list is a no-op.
    """
    if not partition_list:
        # Nothing to do. Returning early also avoids appending empty status batches and issuing
        # a query with an empty partition filter, which may not restrict the query at all.
        return

    _append_status(client, partition_list, is_complete=False)

    df = (
        dataset.dataframe_query_view(index="time_1", contents="/**")
        .filter_partition_id(*partition_list)
        .df()
    )

    df = df.aggregate(
        "rerun_partition_id",
        [
            F.min(col("log_time")).alias("first_log_time"),
            F.max(col("log_time")).alias("last_log_time"),
            _first_position("/obj1").alias("first_position_obj1"),
            _first_position("/obj2").alias("first_position_obj2"),
            _first_position("/obj3").alias("first_position_obj3"),
        ],
    )

    # Results are written before the partitions are marked complete. If this cell is interrupted
    # in between, the partitions are processed again and their result rows are appended a second time.
    df.write_table(RESULTS_TABLE_NAME)

    # The `True` rows are what prevent these partitions from being processed again.
    _append_status(client, partition_list, is_complete=True)

In [None]:
# This code block is the main execution. The tables are written to a temporary directory that is
# deleted when the `with` block exits. If you wish to persist them, remove the
# `tempfile.TemporaryDirectory()` line and set `temp_path` to a directory on your machine.

# Maximum number of partitions handed to `process_partitions` per iteration. Kept small to
# demonstrate batching; real workflows may process everything at once or one partition at a time.
BATCH_SIZE = 3

with tempfile.TemporaryDirectory() as temp_dir:
    temp_path = Path(temp_dir)

    client = CatalogClient(CATALOG_URL)
    dataset = client.get_dataset(name=DATASET_NAME)

    status_table = create_status_table(client, temp_path)
    results_table = create_results_table(client, temp_path)

    # TODO(tsaucer) replace with partition table query
    partition_table = (
        dataset.dataframe_query_view(index="time_1", contents="/**")
        .df()
        .select("rerun_partition_id")
        .distinct()
    )
    # The set of partitions does not change while the loop below runs, so count it only once.
    total_partitions = partition_table.count()

    # Process one batch at a time until the status table marks every partition complete.
    previous_missing_count = None
    while True:
        missing_partitions = find_missing_partitions(partition_table, status_table)
        print(
            f"{len(missing_partitions)} of {total_partitions} partitions have not processed."
        )
        if not missing_partitions:
            break

        # Every batch should mark at least one partition complete. If the count did not drop,
        # looping again would never terminate, so fail loudly instead.
        if (
            previous_missing_count is not None
            and len(missing_partitions) >= previous_missing_count
        ):
            raise RuntimeError(
                f"No progress: {len(missing_partitions)} partitions are still unprocessed after "
                "the previous batch; the status table does not appear to reflect completed work."
            )
        previous_missing_count = len(missing_partitions)

        process_partitions(client, dataset, missing_partitions[:BATCH_SIZE])

    # Show the final results
    display(results_table)

    # Show the final status table
    display(status_table)