<center>
<a href="https://github.com/kamu-data/kamu-cli">
<img alt="kamu" src="https://raw.githubusercontent.com/kamu-data/kamu-cli/master/docs/readme_files/kamu_logo.png" width=270/>
</a>
</center>

<br/>

<div align="center">
<a href="https://github.com/kamu-data/kamu-cli">Repo</a> | 
<a href="https://docs.kamu.dev/cli/">Docs</a> | 
<a href="https://docs.kamu.dev/cli/learn/learning-materials/">Tutorials</a> | 
<a href="https://docs.kamu.dev/cli/learn/examples/">Examples</a> |
<a href="https://docs.kamu.dev/cli/get-started/faq/">FAQ</a> |
<a href="https://discord.gg/nU6TXRQNXC">Discord</a> |
<a href="https://kamu.dev">Website</a>
</div>

<center>
<br/>
    
# 2. Watermarks

</center>

<div class="alert alert-block alert-info">
If you skipped the previous chapter or are continuing after a break, use the following commands to get your environment ready for this chapter:
    
<p style="background:black">
<code style="background:black;color:white">cd "02 - Web3 Data (Ethereum trading example)"
./init-chapter-2.sh
</code>
</p>
</div>

# Up-to-date data is harder than you think

With `kamu` it's very easy to keep all your data up-to-date.

<div class="alert alert-block alert-success">

To refresh your datasets simply run:

<p style="background:black">
<code style="background:black;color:white">&dollar; kamu pull --all
</code>
</p>
</div>

This command will traverse all datasets in the pipeline in depth-first order and:
- pull data from remote repositories
- ingest data into root datasets from external sources
- advance computations in derivative datasets

And just to confirm:

In [None]:
%%local
import pandas as pd
import hvplot.pandas
pd.set_option('max_colwidth', None)

In [None]:
%load_ext kamu
%import_dataset net.rocketpool.reth.mint-burn
%import_dataset account.tokens.portfolio.market-value
%import_dataset account.tokens.portfolio

In [None]:
%%sql
select 
    event_time, eth_amount, amount
from `net.rocketpool.reth.mint-burn`
order by 1 desc
limit 1

Above you should see an `event_time` value close to the current time in UTC.

# A Bug?

But what about the market value dataset?

In [None]:
%%sql
select
    event_time, token_symbol, token_balance, token_market_value_eth, token_market_value_usd
from `account.tokens.portfolio.market-value` 
order by event_time desc
limit 1

Hm, for some reason the `event_time` is **way behind** (about a month as of this writing).

Remember that to get the market value of `rETH` we JOIN'ed our `portfolio` dataset onto every value in the `reth.mint-burn` dataset. Since `reth.mint-burn`, as we just saw, contains recent values - one would expect to get an up-to-date market value too.

_"So much for near real-time"_ you say...

# Time relativity in data
Now abandon your database mindset and think about this from a decentralized data perspective.

The `portfolio` and `reth.mint-burn` datasets **are owned by different people and operated using independent infrastructures**.

You have just ingested the `portfolio` data from Etherscan, but <mark>is it "latest data"</mark>? 

What if the Ethereum blockchain was not producing any blocks for some time? What if blocks are being produced, but the Etherscan node we're pulling data from was failing to synchronize them? Similarly, we told you that `reth.mint-burn` is ingested by a recurrent job, but what if that job didn't run for a while due to some technical issue?

> The concept of time and "now" suddenly becomes very blurry... 

What we can tell is that data in the datasets appears with some **latency**, and this latency can vary greatly under some circumstances.

<div class="alert alert-block alert-info">

A good mental model is to imagine that both of these datasets have their **own wall clock**, and the times these clocks tell might always be different and advance at very **different speeds**.<br/>
For example, a stock ticker dataset might advance its clock every second, but some population census dataset might only advance its clock once every four years.<br/><br/>
This is as close as data science gets to the theory of relativity :)
</div>

# Comparing results to batch processing
Now that we agree that the `portfolio` and `reth.mint-burn` datasets advance at different paces, meaning that one of them can be slightly or even much further ahead of the other - what happens if we JOIN them together using a typical relational batch JOIN?

Let's write down the batch SQL version of the market value computation:

In [None]:
%%sql -o mv_batch -q

with 

--## We only care about rETH tokens
--## so let's filter out all other types
reth_portfolio as (
    select * 
    from `account.tokens.portfolio`
    where token_symbol = "rETH"
),

--## Join every exchange rate data point
--## with every portfolio transaction that precedes it in time
joined_with_preceding_dates as (
    select
        reth.event_time as event_time,
        pf.block_number as last_tx_block_number,
        pf.block_time as last_tx_block_time,
        pf.transaction_hash,
        pf.token_symbol,
        pf.token_balance,
        reth.eth_amount / reth.amount * pf.token_balance as token_market_value_eth
    from `net.rocketpool.reth.mint-burn` as reth
    join reth_portfolio as pf
    on reth.event_time >= pf.block_time
),

--## For every exchange rate data point
--## rank transactions in reverse order by time
with_dates_ranked as (
    select
        *,
        row_number() over (partition by event_time order by last_tx_block_time desc) as rank
    from joined_with_preceding_dates
)

--## Finally, select only the closest preceding transactions
--## using the rank computed in the previous step
select
    *
from with_dates_ranked
where rank = 1
order by event_time desc

There are multiple ways to perform a JOIN based on the closest preceding timestamp, but all of them will give you the same result.
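
As one illustration of such an alternative, here is a minimal sketch of the same "closest preceding timestamp" logic in plain Python, using a binary search instead of a ranked self-join. The function name `asof_join` and the sample data are made up for this example, and times are plain integers for brevity.

```python
from bisect import bisect_right

def asof_join(rates, transactions):
    """For each (time, rate) pair, attach the balance of the latest
    transaction whose time is <= the rate's time.

    Both inputs are lists of (time, value) tuples sorted by time.
    Rates that precede every transaction are dropped, like an inner JOIN.
    """
    tx_times = [t for t, _ in transactions]
    result = []
    for rate_time, rate in rates:
        # Number of transactions with time <= rate_time
        idx = bisect_right(tx_times, rate_time)
        if idx == 0:
            continue  # no preceding transaction yet
        tx_time, balance = transactions[idx - 1]
        result.append((rate_time, tx_time, balance * rate))
    return result

# Made-up sample data
transactions = [(10, 1.0), (20, 3.0)]                      # (block_time, token_balance)
rates = [(5, 1.02), (10, 1.03), (15, 1.04), (25, 1.05)]    # (event_time, ETH per rETH)

market_value = asof_join(rates, transactions)
for row in market_value:
    print(row)  # (event_time, last_tx_block_time, token_market_value_eth)
```

Like the SQL above, this treats both inputs as complete: every rate point is matched against whatever transactions happen to be present at the moment of the call.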

Let's compare this "batch" result to the "streaming" result we get from `kamu`:

In [None]:
%%sql -o mv_streaming -q
select * from `account.tokens.portfolio.market-value`

In [None]:
%%local
mv_batch.hvplot.line(
    x="event_time", 
    y="token_market_value_eth",
    line_dash="dashed",
    xlabel="Time",
    ylabel="ETH",
    title="Temporal JOIN: Batch vs Streaming",
    height=500,
    width=800,
) * mv_streaming.hvplot.line(
    x="event_time", 
    y="token_market_value_eth",
) * mv_batch.hvplot.scatter(
    x="last_tx_block_time", 
    y="token_market_value_eth",
    color="red",
    alpha=0.01,
) 

Aha! We can clearly see that the batch JOIN (the dashed blue line) **returned more data**!

<mark>So is batch more real-time than stream processing?</mark>

# Data (in)completeness
In our case the `portfolio` dataset's clock is lagging significantly behind the exchange rate dataset. This means that for `portfolio` the interval after `max(portfolio.event_time)` is still undefined - it might still get any number of new transactions in this interval. And if it does - the results of `market-value` in that interval <mark>will be completely different from what the "batch" results show us now!</mark>

See, batch SQL doesn't know anything about time and doesn't care about these funky dataset "wall clocks". To it, time columns are no more significant than any others, so a batch query gives them no special treatment.

> **Batch processing is non-temporal**. It works under the assumption that every input (table) is complete. In reality most data is constantly delayed, reordered, and backfilled and rarely reaches the state of "completeness".

This makes batch a poor choice for dynamic data - it will constantly produce incorrect results on incomplete data.
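
To make this concrete, here is a small sketch with made-up numbers showing how a single late-arriving transaction changes an answer that a batch query has already produced. The helper `latest_balance_at` is hypothetical and times are plain integers.

```python
def latest_balance_at(transactions, time):
    """Balance after the last known transaction at or before `time`."""
    known = [balance for tx_time, balance in sorted(transactions) if tx_time <= time]
    return known[-1] if known else None

# Made-up data: (block_time, token_balance)
transactions = [(10, 1.0), (20, 3.0)]

# An exchange-rate point far ahead of the last known transaction
rate_time, rate = 40, 1.05

# Batch answer computed now, treating the table as complete
before = latest_balance_at(transactions, rate_time) * rate

# A transaction with block_time 30 is ingested later
transactions.append((30, 0.5))
after = latest_balance_at(transactions, rate_time) * rate

# The two values differ although the query and the rate are unchanged
print(before, after)
```

The first result was reported with full confidence even though the interval between time 20 and time 40 was still undefined for the portfolio.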

# Watermarks
Stream processing engines used by `kamu` are (bi)temporal. They are aware of these "dataset clocks" and keep track of them. 

These clocks are called **Watermarks**.

A watermark `(Ts, Te)` tells us
- that at a certain system time `Ts`
- with a high probability
- the system has observed all events prior to event time `Te`
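
The sketch below illustrates how a consumer might use such a pair. It is not how `kamu` or its engines implement watermarks: the `Watermark` class and `split_by_watermark` function are hypothetical, times are plain integers, and "prior to `Te`" is taken literally as strictly earlier than `Te`.

```python
from dataclasses import dataclass

@dataclass
class Watermark:
    system_time: int  # Ts: when the claim was made
    event_time: int   # Te: events before this event time are assumed complete

def split_by_watermark(events, watermark):
    """Split (event_time, value) tuples into those that are safe to
    process now and those that must wait for the watermark to advance."""
    ready = [e for e in events if e[0] < watermark.event_time]
    pending = [e for e in events if e[0] >= watermark.event_time]
    return ready, pending

# Made-up exchange-rate points: (event_time, rate)
rates = [(10, 1.03), (15, 1.04), (25, 1.05), (40, 1.06)]

# The portfolio dataset claims completeness only up to event time 20
portfolio_wm = Watermark(system_time=50, event_time=20)
ready, pending = split_by_watermark(rates, portfolio_wm)
print(ready)    # rate points that can be joined now
print(pending)  # rate points held back until the portfolio catches up

# Once the portfolio watermark advances past them, nothing is held back
later_ready, later_pending = split_by_watermark(
    rates, Watermark(system_time=60, event_time=45)
)
```

Holding back the pending points is what makes the streaming result shorter than the batch result, and also what keeps it correct.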

<div class="alert alert-block alert-info">

For a brief introduction to stream processing and watermarks see [this video](https://www.youtube.com/watch?v=XxKnTusccUM&t=577s)

</div>


You can see the current watermarks of every dataset in `kamu`.

<div class="alert alert-block alert-success">

Try running:

<p style="background:black">
<code style="background:black;color:white">&dollar; kamu list --wide
</code>
</p>
</div>

Example output:
```
┌───────────────────────────────────────┬──────────────┬──────────────┬─────────────┐
│                 Name                  │     Kind     │    Pulled    │  Watermark  │
├───────────────────────────────────────┼──────────────┼──────────────┼─────────────┤
│ account.tokens.portfolio              │  Derivative  │ 2 hours ago  │ a month ago │
│ account.tokens.portfolio.market-value │  Derivative  │ a minute ago │ a month ago │
│ account.tokens.portfolio.usd          │  Derivative  │ a minute ago │ a month ago │
│ account.tokens.transfers              │     Root     │ 2 hours ago  │ a month ago │
│ account.transactions                  │     Root     │ 2 hours ago  │ a month ago │
│ com.cryptocompare.ohlcv.eth-usd       │ Remote(Root) │ an hour ago  │ an hour ago │
│ net.rocketpool.reth.mint-burn         │ Remote(Root) │ 3 hours ago  │ 3 hours ago │
└───────────────────────────────────────┴──────────────┴──────────────┴─────────────┘
```

<div class="alert alert-block alert-info">

You can also see `kamu` adjusting the watermarks after each `ExecuteQuery` step:

<p style="background:black">
<code style="background:black;color:white">&dollar; kamu log account.tokens.portfolio
</code>
</p>
</div>

Despite being pulled just recently, the `account.tokens.transfers` and `account.transactions` datasets have watermarks that are a month old. And since their data is still undefined beyond that point - the same goes for all derivative datasets that depend on them (`account.tokens.portfolio.*`).
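
This dependency can be sketched as a depth-first walk over the pipeline. The sketch assumes the rule commonly used in stream processing, where a derivative's watermark is the minimum of its inputs' watermarks. The dependency graph, the function `resolve_watermark`, and the integer "event times" below are assumptions made for illustration, not output of `kamu`.

```python
def resolve_watermark(name, inputs, root_watermarks, resolved=None):
    """Depth-first: resolve all inputs of `name` first, then take their minimum."""
    if resolved is None:
        resolved = {}
    if name not in resolved:
        if name in root_watermarks:
            resolved[name] = root_watermarks[name]
        else:
            resolved[name] = min(
                resolve_watermark(dep, inputs, root_watermarks, resolved)
                for dep in inputs[name]
            )
    return resolved[name]

# Dependency graph assumed for illustration (derivative -> its inputs)
inputs = {
    "account.tokens.portfolio": [
        "account.tokens.transfers",
        "account.transactions",
    ],
    "account.tokens.portfolio.market-value": [
        "account.tokens.portfolio",
        "net.rocketpool.reth.mint-burn",
    ],
    "account.tokens.portfolio.usd": [
        "account.tokens.portfolio.market-value",
        "com.cryptocompare.ohlcv.eth-usd",
    ],
}

# Made-up root watermarks as day numbers; larger means more recent
roots = {
    "account.tokens.transfers": 100,
    "account.transactions": 100,
    "net.rocketpool.reth.mint-burn": 130,
    "com.cryptocompare.ohlcv.eth-usd": 131,
}

# The slowest root holds back everything downstream of it
stale = resolve_watermark("account.tokens.portfolio.usd", inputs, roots)

# Advancing the two lagging roots lets the derivatives catch up
roots["account.tokens.transfers"] = 131
roots["account.transactions"] = 131
fresh = resolve_watermark("account.tokens.portfolio.usd", inputs, roots)
print(stale, fresh)
```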

# How watermarks are set
Currently `kamu` automatically sets watermarks for root datasets using the highest observed event time - so for the `portfolio` dataset the watermark matches the `event_time` of the last transaction. This is a safe default choice that works for most datasets.

Watermarks can also be assigned using a fixed offset from the latest observed event time (to tolerate some degree of out-of-order arrivals), or predictively - being constantly adjusted by the system based on the observed event delays.
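
The first two strategies can be compared with a short simulation. This is a sketch under simplifying assumptions: the function `count_late` and the arrival sequence are made up, event times are integers, and an event counts as late when it arrives with an event time already behind the current watermark.

```python
def count_late(arrivals, allowed_lateness=0):
    """Replay events in arrival order and track the watermark.

    The watermark is the highest observed event time minus
    `allowed_lateness`. Returns (number of late events, final watermark).
    """
    watermark = None
    late = 0
    for event_time in arrivals:
        if watermark is not None and event_time < watermark:
            late += 1
        candidate = event_time - allowed_lateness
        watermark = candidate if watermark is None else max(watermark, candidate)
    return late, watermark

# Made-up event times in arrival order; 17 arrives after 20
arrivals = [10, 12, 20, 17, 25]

# Highest observed event time: one event arrives behind the watermark
eager = count_late(arrivals)

# Fixed offset of 5: no late events, but the watermark trails by 5
tolerant = count_late(arrivals, allowed_lateness=5)

print(eager, tolerant)
```

With no offset the out-of-order event is missed by anything already computed up to the watermark. With the offset it is accepted, at the cost of the watermark trailing the newest data.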

There is a lot of room for experimentation, but the key idea of this mechanism is to find a good **balance between correctness and latency**:
- Advancing the watermark too eagerly may result in processing taking place before all data has arrived, producing **incorrect results**
- Setting the watermark too conservatively to let all data "settle" will introduce a lot of **latency**

As the owners of our crypto wallet - we are in the best position to know when all transactions have made it into the dataset, so we can manually advance the watermark with `kamu`.

<div class="alert alert-block alert-success">

Advance the watermark of both datasets manually (adjust the command to the current date):

<p style="background:black">
<code style="background:black;color:white">kamu pull --set-watermark 2023-09-01T00:00:00Z account.tokens.transfers
kamu pull --set-watermark 2023-09-01T00:00:00Z account.transactions
</code>
</p>
</div>
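
If you would rather not type the timestamp by hand, the following sketch prints the same two commands with the current UTC time filled in. The helper `set_watermark_command` is made up for this example. It only builds the command string in the format shown above and does not run `kamu`.

```python
from datetime import datetime, timezone

def set_watermark_command(dataset, now=None):
    """Build a `kamu pull --set-watermark` command for `dataset`.

    `now` must be a timezone-aware datetime; it defaults to the current time.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    # Normalize to UTC and format like 2023-09-01T00:00:00Z
    stamp = now.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return f"kamu pull --set-watermark {stamp} {dataset}"

for dataset in ("account.tokens.transfers", "account.transactions"):
    print(set_watermark_command(dataset))
```

Only do this when you are confident that all transactions up to that moment have already been ingested.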

<div class="alert alert-block alert-success">

Propagate the new watermarks through the pipeline:

<p style="background:black">
<code style="background:black;color:white">kamu pull --all
</code>
</p>
</div>

Let's have a look now:

In [None]:
%import_dataset account.tokens.portfolio.market-value

In [None]:
%%sql -o mv_streaming -q
select * from `account.tokens.portfolio.market-value`
order by event_time desc

In [None]:
%%local
mv_batch.hvplot.line(
    x="event_time", 
    y="token_market_value_eth",
    line_dash="dashed",
    xlabel="Time",
    ylabel="ETH",
    title="Temporal JOIN: Batch vs Streaming",
    height=500,
    width=800,
) * mv_streaming.hvplot.line(
    x="event_time", 
    y="token_market_value_eth",
) * mv_batch.hvplot.scatter(
    x="last_tx_block_time", 
    y="token_market_value_eth",
    color="red",
    alpha=0.01,
) 

Here we go, the `market-value` dataset is now fully up-to-date and **correct**!

Most importantly, our computations did not make any bold assumptions about completeness of data - we achieved it safely by "hinting" to the system that our two root datasets are complete, and `kamu` **propagated the watermarks through the entire pipeline**.

<div class="alert alert-block alert-info">
    
There is obvious **room for improvement** here:
    
The data we get from Etherscan API comes from the Ethereum blockchain, which is strongly ordered. If we could tell what the latest block number processed by Etherscan's backend was - we would be able to set the watermark automatically.
    
This can be achieved using [container-based fetch](XXX) and is left as an exercise for the reader.
    
</div>

# Summary
Every dataset in `kamu` has a **watermark**. Watermarks prevent processing from producing incorrect results by holding it off until data is complete.

The ability to **balance latency and correctness** is the <mark>holy grail of data processing</mark>:
- It makes data more **composable** - allows us to build complex pipelines without having a compounding effect of correctness issues
- It makes processing more **autonomous** - after the query is written it requires minimal maintenance, unlike batch pipelines that require lots of babysitting