In [None]:
import orcapod as op
import shutil

We will also make heavy use of PyArrow:

In [None]:
import pyarrow as pa

### Preparing the environment

In this notebook, we will create a local directory called `pipeline_data` and store results in there. To make sure we get reproducibile results, we start by making sure that this directory does not exist locally.

In [None]:
shutil.rmtree("./pipeline_data", ignore_errors=True)

### Creating streams

At the moment, there is only one way to create stream and that is by wrapping a PyArrow table.

In [None]:
table = pa.Table.from_pydict(
    {
        "a": [1, 2, 3],
        "b": ["x", "y", "z"],
        "c": [True, False, True],
        "d": [1.1, 2.2, 3.3],
    }
)

Use `op.streams.ImmutableTableStream` to turn table into a stream. You will also have to specify which columns are the tags.

In [None]:
stream = op.streams.ImmutableTableStream(table, tag_columns=["a", "b"])

### Working with streams

Once you have a stream, you can iterate through tag, packet pair:

In [None]:
for tag, packet in stream:
    print(f"Tag: {tag}, Packet: {packet}")

You can also get all tag packet pairs as a list of tuples by calling `.flow()`

In [None]:
stream.flow()

Every stream can be converted into a table with `as_table()` method

In [None]:
stream.as_table()

Optionally, you can pass in arguments to `as_table` to have system columns included in the table

`include_source` adds `source` column for each data (non-tag) column patterned like `_source_{column}` and will contain information about where that particular value orginated from.

In [None]:
stream.as_table(include_source=True)

`include_content_hash` will compute `content_hash` for each packet and include it as `_content_hash` column

In [None]:
stream.as_table(include_content_hash=True)

Alternatively, you can pass in a custom column name to use for the content hash column

In [None]:
stream.as_table(include_content_hash="my_hash_values")

Finally, `include_data_context` adds data context column as `_context_key` which captures information about the OrcaPod version, hasher version etc that were used when generting that packet.

In [None]:
stream.as_table(include_data_context=True)

### Tags and Packets

The tags and packets returned by the streams can be thought of as special dictionary.

In [None]:
all_tags_and_packets = stream.flow()

In [None]:
tag, packet = all_tags_and_packets[0]

In [None]:
tag

In [None]:
packet

The element of tag/packet can be accessed just like dictionary:

In [None]:
tag["a"]

In [None]:
tag["b"]

In [None]:
packet["c"]

In [None]:
packet["d"]

They have a few methods that will provide additional insights:

In [None]:
# Returns typespec (dictionary of key to type)
packet.types()

In [None]:
# entry names as strings
packet.keys()

They can also be converted to an Arrow table by calling `as_table`

In [None]:
packet.as_table()

And schema is conveniently available as:

In [None]:
packet.arrow_schema()

You can also get a plain dictionary from tag/packet with `as_dict`

In [None]:
tag.as_dict()

Packet contains some additional data such as `source_info`

In [None]:
packet.source_info()

These additional data can be included when converting to dict or table

In [None]:
packet.as_dict(include_source=True)

In [None]:
packet.as_table(include_source=True)

The hash of tag/packet can be computed with `content_hash()` method. The result will be cached so that it won't be computed again unnecessarily.

In [None]:
tag.content_hash()

## Working with operators

We start getting into orcapod computation when we use operators. At the time of the writing, only `Join` operator is implemented fully but more are to come very shortly.

Let's prepare two streams:

In [None]:
table1 = pa.Table.from_pydict(
    {
        "id": [0, 1, 4],
        "a": [1, 2, 3],
        "b": ["x", "y", "z"],
    }
)

table2 = pa.Table.from_pydict(
    {
        "id": [0, 1, 2],
        "c": [True, False, True],
        "d": [1.1, 2.2, 3.3],
    }
)

stream1 = op.streams.ImmutableTableStream(table1, tag_columns=["id"])
stream2 = op.streams.ImmutableTableStream(table2, tag_columns=["id"])

We now join the two streams by instantiating the Join operator and then passing in the two streams:

In [None]:
join = op.operators.Join()

In [None]:
joined_stream = join(stream1, stream2)

Calling an operator on stream(s) immediately performs checks to make sure that the input streams are comaptible with the operator but otherwise it does NOT trigger any computation. Computation occurs only when you try to **access the output stream's content via iteration, flow, or through conversion to table**.

In [None]:
for tag, packet in joined_stream:
    print(f"Tag: {tag}, Packet: {packet}")

The output of the computation is automatically cached so that as long as you access the same output stream, you won't be triggering unnecessary recomputation!

In [None]:
joined_stream.as_table()

## Working with Function Pods

Now we have explored the basics of streams, tags, packets, and operators (i.e. Join), it's time to explore the meat of `orcapod` -- `FunctionPod`s! Let's start by defining a very simple function pod that takes in two numbers and return the sum.

In [None]:
@op.function_pod(output_keys=["sum"])
def add_numbers(a: int, b: int) -> int:
    """A simple function pod that adds two numbers."""
    return a + b

You'll notice that, aside from the `op.function_pod` decorator, this is nothing but an ordinary Python function with type hints! The type hints are crucial however, as this will be used by `orcapod` system to validate the input streams into your pods and to be able to predict if the output of your pod can be fed into another operator/pod without an issue.

Once you have function pod defined, you can already use it on streams just like operators. Let's prepare a stream that has entries for `a` and `b` and then feed them into the function pod.

In [None]:
input_table = pa.Table.from_pydict(
    {
        "id": [0, 1, 2, 3, 4],
        "a": [1, 2, 3, 4, 5],
        "b": [10, 20, 30, 40, 50],
    }
)

input_stream = op.streams.ImmutableTableStream(input_table, tag_columns=["id"])

In [None]:
# run the stream through the function pod!
output_stream = add_numbers(input_stream)

And that's it! Believe it or not, that is all it takes to set up the computation. The actual computation will be triggered the first time you access the content of the output stream.

In [None]:
output_stream

In [None]:
for t, p in output_stream:
    print(f"Tag: {t}, Packet: {p}")

Simple, right?

## Chaining operators and pods into a pipeline

Now that we have seen how to define and run pods, it's time to put them together into a concrete pipeline. To do so, we will construct a `Pipeline` instance. When doing so, we have to pass in a place to save data to, so we will also prepare a data store.

In [None]:
data_store = op.stores.BatchedDeltaTableArrowStore(base_path="./pipeline_data")

pipeline = op.Pipeline(name="MyPipelin", pipeline_store=data_store)

Once we have the pipeline ready, we can define the pipeline by simply running & chaining operators and pods **inside the pipeline context**. Typically, you'd want to define your function pods before hand:

In [None]:
@op.function_pod(output_keys=["sum"])
def add_numbers(a: int, b: int) -> int:
    """A simple function pod that adds two numbers."""
    return a + b


@op.function_pod(output_keys=["product"])
def multiply_numbers(a: int, b: int) -> int:
    """A simple function pod that multiplies two numbers."""
    return a * b


@op.function_pod(output_keys=["result"])
def combine_results(sum: int, product: int) -> str:
    """A simple function pod that combines results."""
    return f"Sum: {sum}, Product: {product}"

In [None]:
# now defien the pipeline
with pipeline:
    sum_results = add_numbers(input_stream)
    product_results = multiply_numbers(input_stream)
    final_results = combine_results(sum_results, product_results)

You can access individual elements of the pipeline as an attribute. By default, the attribute is named after the operator/pod name.

In [None]:
pipeline.add_numbers

Notice that elements of the pipeline is wrapped in a `Node`, being either `PodNode` or `KernelNode`.

You can fetch results of the pipeline through these nodes. For example, you can access the saved results of the pipeline as Polars dataframe by access the `df` attribute.

In [None]:
pipeline.add_numbers.df

You'll notice that `df` comes back empty because the pipeline is yet to run. Let's now trigger the pipeline to fill the nodes with computation results!

In [None]:
pipeline.run()

This will cause all nodes in the pipeline to run and store the results.

Now let's take a look at the computed results:

In [None]:
pipeline.add_numbers.df

You now have the computations saved at each node!

### Labeling nodes in the pipeline

When constructing the pipeline, each invocation of the operator/pod results in a new node getting added, with the name of the node defaulting to the name of the operator/pod. If you use the same pod multiple times, then the nodes will be given names of form `{pod_name}_0`, `{pod_name}_1`, and so on.

While this is helpful default behavior, you'd likely want to explicitly name each node so you can more easily understand what you are accessing within the pipeline. To achieve this, you can explicitly label each invocation with `label=` argument in the call.

In [None]:
data_store = op.stores.BatchedDeltaTableArrowStore(base_path="./pipeline_data")

pipeline2 = op.Pipeline(name="MyPipelin", pipeline_store=data_store)

In [None]:
# now defien the pipeline
with pipeline2:
    sum_results = add_numbers(input_stream, label="my_summation")
    product_results = multiply_numbers(input_stream, label="my_product")
    final_results = combine_results(
        sum_results, product_results, label="my_final_result"
    )

In [None]:
pipeline2.my_summation.df

In [None]:
pipeline2.my_product.df

In [None]:
pipeline2.my_final_result.df

Notice that despite just freshly creating the pipeline, each node already had results filled in! This is because the results from the previous pipeline execution was smartly fetched back. Critically, this was done only because Orcapod noticed that you had an identical pipeline with the same inputs and same operators/pods so that you can reuse the result as is. Should the structure of pipeline been different, the wront results would not be loaded.