# Advanced topics

## Caching

Vaex can cache task results, such as aggregations, as well as the internal hashmaps used for `groupby` operations. This makes recurring calculations much faster, at the cost of calculating cache keys and storing and retrieving the cached values.

Internally, Vaex calculates fingerprints (e.g. hashes of data, or file paths and mtimes) to create cache keys that are stable across processes, so that a restarted process will most likely produce the same cache keys.
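The fingerprinting idea can be illustrated with a small standard-library sketch. This is not Vaex's implementation: the helper names `file_fingerprint` and `cache_key`, and the choice of SHA-256 over a JSON payload, are assumptions made purely for illustration.

```python
import hashlib
import json
import os
import tempfile


def file_fingerprint(path):
    """Hash the absolute path, size and mtime of a file (not its contents)."""
    stat = os.stat(path)
    identity = [os.path.abspath(path), stat.st_size, stat.st_mtime_ns]
    return hashlib.sha256(json.dumps(identity).encode()).hexdigest()


def cache_key(data_fingerprint, task, **params):
    """Combine a data fingerprint with a task description into one key."""
    # sort_keys makes the key independent of keyword-argument order.
    payload = json.dumps([data_fingerprint, task, params], sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


# Demonstrate on a throwaway file.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "data.csv")
    with open(path, "w") as f:
        f.write("age\n29\n31\n")

    key_first = cache_key(file_fingerprint(path), "mean", column="age")
    key_again = cache_key(file_fingerprint(path), "mean", column="age")

    # Appending a row changes the file size (and mtime), so the key changes.
    with open(path, "a") as f:
        f.write("47\n")
    key_after_edit = cache_key(file_fingerprint(path), "mean", column="age")

print(key_first == key_again)       # same file, same task
print(key_first == key_after_edit)  # file was modified in between
```

A cryptographic digest is used here instead of Python's built-in `hash()`, because string hashes from `hash()` are randomized per process by default and would therefore not survive a restart.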

[See configuration of the cache.](conf.html#cache)

Caches can be turned on globally like this:

In [1]:
import vaex
df = vaex.datasets.titanic()
vaex.cache.memory();  # turn the in-memory cache on globally

One can verify that the cache is turned on via:

In [2]:
vaex.cache.is_on()

True

The cache can be globally turned off again:

In [3]:
vaex.cache.off()
vaex.cache.is_on()

False

The cache can also be turned on with a context manager, which turns it off again on exit. Here we use a disk cache. A disk cache is shared among processes, and is ideal for processes that restart, or when using Vaex in a web service with multiple workers. Consider the following example:

In [4]:
with vaex.cache.disk(clear=True):
    print(df.age.mean())  # The very first time the mean is computed

29.8811345124283


In [5]:
# outside of the context manager, the cache is still off
vaex.cache.is_on()

False

In [6]:
with vaex.cache.disk():
    print(df.age.mean())  # The second time the result is read from the cache

29.8811345124283


In [7]:
vaex.cache.is_on()

False
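The behaviour seen above (a result stored in one `with` block, read back in a later one, and the cache being off in between) can be mimicked with a toy cache. The `ToyCache` class below is a hypothetical stand-in written for this illustration; it does not reflect how `vaex.cache` is implemented.

```python
from contextlib import contextmanager


class ToyCache:
    """Hypothetical cache with an on/off switch and a dict as its backend."""

    def __init__(self):
        self.store = {}        # stand-in for the memory or disk backend
        self.enabled = False   # global on/off switch
        self.computations = 0  # counts how often the real work was done

    @contextmanager
    def on(self, clear=False):
        """Enable the cache inside the block, then restore the old state."""
        if clear:
            self.store.clear()
        previous, self.enabled = self.enabled, True
        try:
            yield self
        finally:
            self.enabled = previous

    def mean(self, key, values):
        """Return the mean, reusing a stored result when the cache is on."""
        if self.enabled and key in self.store:
            return self.store[key]
        self.computations += 1
        result = sum(values) / len(values)
        if self.enabled:
            self.store[key] = result
        return result


cache = ToyCache()
ages = [22.0, 38.0, 26.0, 35.0]

with cache.on(clear=True):
    first = cache.mean("ages-mean", ages)   # computed and stored

with cache.on():
    second = cache.mean("ages-mean", ages)  # read back from the store

print(first, second, cache.computations, cache.enabled)
```

Restoring the previous state in a `finally` clause means the switch is reset even if the body of the `with` block raises.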

## Progress bars

Usually progress bars give an idea of how long a computation might take. [Rich](https://rich.readthedocs.io/)-based progress bars take this idea to the next level. With Rich one gets to see a tree structure of progress bars that gives the user an idea of what Vaex does internally, and how long each step takes. Each leaf in this tree is a `Task`, while the nodes are used to group tasks logically. For instance, in the following example the last node named 'mean' uses the mean aggregation, which creates two tasks: sum and count aggregations.

In [8]:
df = vaex.datasets.taxi()

with vaex.progress.tree('rich', title="My Vaex computations"):
    result_1 = df.groupby('passenger_count', agg='count')
    result_2 = df.groupby('vendor_id', agg=vaex.agg.sum('tip_amount'))    
    result_3 = df.tip_amount.mean()

In the last column (between brackets) we also see how many passes over the data Vaex had to do to compute all results. The last two tasks are done together in the 5th pass.

If we want to do all computations in a single pass over the data for performance reasons, we can use Vaex's async approach by adding the `delay=True` argument (see [Async programming with Vaex](...) for more details).

In [9]:
df = vaex.datasets.taxi()

with vaex.progress.tree('rich', title="My Vaex computations"):
    result_1 = df.groupby('passenger_count', agg='count', delay=True)
    result_2 = df.groupby('vendor_id', agg=vaex.agg.sum('tip_amount'), delay=True)    
    result_3 = df.tip_amount.mean(delay=True)
    df.execute()
result_1 = result_1.get()
result_2 = result_2.get()
result_3 = result_3.get()

We see that all computations are done in a single pass over the data, which is slightly faster in this case because we are not IO bound. On slower disks, or slower formats (e.g. parquet) this difference will be larger.
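Why batching helps can be seen in a plain-Python sketch that counts passes over chunked data. `CountingSource` is a hypothetical data source invented for this example; Vaex's executor is considerably more involved.

```python
class CountingSource:
    """Hypothetical data source that counts full passes made over it."""

    def __init__(self, values, chunk_size=3):
        self.values = values
        self.chunk_size = chunk_size
        self.passes = 0

    def scan(self):
        """Yield the data chunk by chunk, as if it were read from disk."""
        self.passes += 1
        for start in range(0, len(self.values), self.chunk_size):
            yield self.values[start:start + self.chunk_size]


def mean_two_passes(source):
    """Each aggregation scans the data on its own."""
    total = sum(sum(chunk) for chunk in source.scan())
    count = sum(len(chunk) for chunk in source.scan())
    return total / count


def mean_single_pass(source):
    """Both aggregations are updated while each chunk is in memory."""
    total = 0.0
    count = 0
    for chunk in source.scan():
        total += sum(chunk)
        count += len(chunk)
    return total / count


tips = [1.0, 2.5, 0.0, 3.0, 1.5, 4.0, 2.0]
eager_source = CountingSource(tips)
batched_source = CountingSource(tips)

mean_eager = mean_two_passes(eager_source)
mean_batched = mean_single_pass(batched_source)
print(mean_eager, mean_batched)
print(eager_source.passes, batched_source.passes)
```

Both functions return the same mean, but the batched version reads every chunk only once, which matters most when reading the data is the expensive part.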

Combining this with the [caching](...) feature, we can clearly see the effect on later calculations, and the efficiency of Vaex:

In [10]:
vaex.cache.disk(clear=True)  # turn on cache, and delete all cache entries

with vaex.progress.tree('rich', title="Warm up cache"):
    result_1 = df.groupby('passenger_count', agg='count', delay=True)
    result_2 = df.groupby('vendor_id', agg=vaex.agg.sum('tip_amount'), delay=True)    
    df.execute()


with vaex.progress.tree('rich', title="My Vaex computations"):
    result_1 = df.groupby('passenger_count', agg='count', delay=True)
    result_2 = df.groupby('vendor_id', agg=vaex.agg.sum('tip_amount'), delay=True)    
    result_3 = df.tip_amount.mean(delay=True)
    df.execute()
vaex.cache.off();

[Learn more about caching in Vaex.](...)

## Async programming with Vaex

Using the [Rich based progress bar](...) we can see that if we call two methods on a dataframe, we get two passes over the data (as indicated by the `[1]` and `[2]`). 

In [11]:
df = vaex.datasets.taxi()
with vaex.progress.tree('rich', title="Two passes"):
    print(df.tip_amount.sum())
    print(df.passenger_count.sum())

119031704.4405651
206151178


### Using delay=True

If we pass `delay=True`, Vaex will not start executing the tasks it created internally, but will return a [promise](https://en.wikipedia.org/wiki/Futures_and_promises) instead. After calling `df.execute()`, all tasks execute and the promises are resolved, meaning that you can use the `.get()` method to get the final value, or the `.then()` method to continue processing the result.

In [12]:
with vaex.progress.tree('rich', title="Single pass using delay"):
    tip_sum_promise = df.tip_amount.sum(delay=True)
    passengers_promise = df.passenger_count.sum(delay=True)
    df.execute()
    tip_per_passenger = tip_sum_promise.get() / passengers_promise.get()
    print(f"tip_per_passenger = {tip_per_passenger}")

tip_per_passenger = 0.5774000691888608
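To make the promise idea from this section more concrete, here is a deliberately minimal sketch of an object with `.get()` and `.then()`. `ToyPromise` and its `resolve()` method are hypothetical and written only for this illustration; the promise objects Vaex returns are richer (error handling, for instance, is left out here).

```python
class ToyPromise:
    """Hypothetical, minimal promise; not the class Vaex uses."""

    _PENDING = object()  # sentinel meaning "no value yet"

    def __init__(self):
        self._value = self._PENDING
        self._callbacks = []

    def resolve(self, value):
        """Store the value and run every callback registered via then()."""
        self._value = value
        for callback in self._callbacks:
            callback(value)
        self._callbacks.clear()

    def get(self):
        """Return the value, or fail if the promise is still pending."""
        if self._value is self._PENDING:
            raise RuntimeError("promise is not resolved yet")
        return self._value

    def then(self, func):
        """Return a new promise that resolves to func(value)."""
        chained = ToyPromise()

        def on_value(value):
            chained.resolve(func(value))

        if self._value is self._PENDING:
            self._callbacks.append(on_value)
        else:
            on_value(self._value)
        return chained


tip_sum = ToyPromise()
in_cents = tip_sum.then(lambda total: round(total * 100))

# Nothing has been computed yet; resolving plays the role of df.execute().
tip_sum.resolve(12.5)
print(tip_sum.get(), in_cents.get())
```

The chained promise lets follow-up work be declared before the value exists, which is what allows several computations to be queued and then executed together.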


### Using delayed decorator

To make life easier, Vaex implements the [`@vaex.delayed`](https://vaex.io/docs/api.html#vaex.delayed) decorator. Once all arguments are resolved, the decorated function will be executed automatically.

In [13]:
with vaex.progress.tree('rich', title="Single pass using delay + using delayed"):
    @vaex.delayed
    def compute(tip_sum, passengers):
        return tip_sum/passengers

    tip_per_passenger_promise = compute(df.tip_amount.sum(delay=True),
                                        df.passenger_count.sum(delay=True))
    df.execute()
    print(f"tip_per_passenger = {tip_per_passenger_promise.get()}")

tip_per_passenger = 0.5774000691888608
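The "run once all arguments are resolved" behaviour can be sketched with the standard library's `concurrent.futures.Future` as a stand-in for a promise. The `toy_delayed` decorator below is hypothetical and is not how `@vaex.delayed` is implemented; it is single-threaded and omits error propagation.

```python
import functools
from concurrent.futures import Future


def toy_delayed(func):
    """Hypothetical decorator: call func once every Future argument is done."""

    @functools.wraps(func)
    def wrapper(*args):
        result = Future()
        pending = [arg for arg in args if isinstance(arg, Future)]
        remaining = len(pending)

        def finish():
            # Swap each Future for its value, then call the wrapped function.
            values = [a.result() if isinstance(a, Future) else a for a in args]
            result.set_result(func(*values))

        def on_done(_future):
            nonlocal remaining
            remaining -= 1
            if remaining == 0:
                finish()

        if not pending:
            finish()  # only plain values were passed, so run immediately
        for future in pending:
            # Runs on_done right away if the future is already finished.
            future.add_done_callback(on_done)
        return result

    return wrapper


@toy_delayed
def compute(tip_sum, passengers):
    return tip_sum / passengers


tip_sum = Future()
passengers = Future()
ratio = compute(tip_sum, passengers)

tip_sum.set_result(30.0)
still_waiting = not ratio.done()  # only one of the two arguments is ready
passengers.set_result(60)
print(still_waiting, ratio.result())
```

The decorated function itself stays an ordinary function of plain values; all the waiting logic lives in the wrapper.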


### Async await

In all of the above cases, we called `df.execute()`, which synchronously executes all tasks using threads. However, if you are using asyncio in Python, this means you are blocking all other coroutines from running.

To allow other coroutines to continue running (e.g. in a FastAPI context), we can instead `await df.execute_async()`. On top of that, we can also `await` the promise to get the result instead of calling `.get()`, which makes the code look more asyncio-like.


In [14]:
with vaex.progress.tree('rich', title="Single pass using delay + using delayed and await"):
    @vaex.delayed
    def compute(tip_sum, passengers):
        return tip_sum/passengers

    tip_per_passenger_promise = compute(df.tip_amount.sum(delay=True),
                                        df.passenger_count.sum(delay=True))
    await df.execute_async()
    tip_per_passenger = await tip_per_passenger_promise
    print(f"tip_per_passenger = {tip_per_passenger}")

tip_per_passenger = 0.5774000691888607


<div class="alert alert-info">

**Note:** In the Jupyter notebook, an asyncio event loop is already running. In a script you may need to use `asyncio.run(my_top_level_coroutine())` in order to use `await`.

</div>
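For a plain script, the pattern from the note might look like the sketch below. To keep it runnable without any data, `fake_execute_async` is a hypothetical stand-in for an awaitable computation, and `heartbeat` represents any other coroutine that should keep running in the meantime.

```python
import asyncio

events = []  # records the order in which the coroutines make progress


async def fake_execute_async(steps=3):
    """Hypothetical stand-in for awaitable work that yields control."""
    for step in range(steps):
        events.append(f"work {step}")
        await asyncio.sleep(0)  # give other coroutines a chance to run
    return 42


async def heartbeat(beats=3):
    """Another coroutine that keeps running while the work is awaited."""
    for beat in range(beats):
        events.append(f"beat {beat}")
        await asyncio.sleep(0)


async def main():
    other = asyncio.create_task(heartbeat())
    result = await fake_execute_async()
    await other
    return result


# asyncio.run creates an event loop, runs main() to completion and closes it.
result = asyncio.run(main())
print(result)
print(events)
```

Note that `asyncio.run()` raises a `RuntimeError` when an event loop is already running in the current thread, which is why it is needed in a script but not in a notebook cell that can `await` directly.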

### Async auto execute

In the previous example we had to call `df.execute_async()` manually. This enables Vaex to execute all tasks in as few passes over the data as possible.

To make life easier, and your code even more asyncio-like, we can use the `df.executor.auto_execute()` async context manager, which automatically calls `df.execute_async()` for you when a promise is awaited.

In [15]:
with vaex.progress.tree('rich', title="Single pass using auto_execute"):
    async with df.executor.auto_execute():
        @vaex.delayed
        def compute(tip_sum, passengers):
            return tip_sum/passengers

        tip_per_passenger = await compute(df.tip_amount.sum(delay=True),
                                          df.passenger_count.sum(delay=True))
        print(f"tip_per_passenger = {tip_per_passenger}")

tip_per_passenger = 0.5774000691888607
