# Timeseries Example
This notebook demonstrates a multi-step Dask pipeline using a synthetic time series dataset. 
We use a local Dask client to manage our computations and view a dashboard for monitoring tasks.

In [1]:
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.datasets import timeseries
from dask.distributed import Client

## Step 1: Creating a Dask Client
We create a local Dask `Client` from `dask.distributed`. This automatically spins up a scheduler and worker processes/threads on your local machine. 

- **What it does**:  
  - Provides a **dashboard** (usually at `http://127.0.0.1:8787/status`) where you can see active tasks, CPU usage, memory usage, etc.  
  - Allows you to scale to multiple workers or even a remote cluster with minimal code changes.

In [2]:
client = Client()
client

Client
- Connection method: Cluster object
- Cluster type: distributed.LocalCluster
- Dashboard: http://127.0.0.1:8787/status

Cluster
- Workers: 4
- Total threads: 12
- Total memory: 15.35 GiB
- Status: running
- Using processes: True

Scheduler
- Comm: tcp://127.0.0.1:42181
- Dashboard: http://127.0.0.1:8787/status
- Started: Just now

Workers

| Comm | Dashboard | Nanny | Total threads | Memory | Local directory |
|---|---|---|---|---|---|
| tcp://127.0.0.1:36359 | http://127.0.0.1:42273/status | tcp://127.0.0.1:38657 | 3 | 3.84 GiB | /tmp/dask-scratch-space/worker-ulbegq_7 |
| tcp://127.0.0.1:36871 | http://127.0.0.1:46035/status | tcp://127.0.0.1:36823 | 3 | 3.84 GiB | /tmp/dask-scratch-space/worker-38il6n4n |
| tcp://127.0.0.1:35821 | http://127.0.0.1:44383/status | tcp://127.0.0.1:44181 | 3 | 3.84 GiB | /tmp/dask-scratch-space/worker-5jmn6dju |
| tcp://127.0.0.1:42073 | http://127.0.0.1:34513/status | tcp://127.0.0.1:42917 | 3 | 3.84 GiB | /tmp/dask-scratch-space/worker-d3yjm8ie |
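
`Client()` with no arguments picks the number of workers and threads for you. If you prefer to size the local cluster yourself, a minimal sketch could look like the following. The specific values (one worker, two threads, threads-only mode, dashboard disabled) are assumptions chosen to keep the example lightweight, not recommendations.

```python
from dask.distributed import Client

# Assumed sizing for illustration; tune these values for your own machine.
with Client(
    n_workers=1,
    threads_per_worker=2,
    processes=False,         # run the worker as threads inside this process
    dashboard_address=None,  # skip the dashboard for this short-lived client
) as client:
    # Ask the scheduler how many workers it knows about.
    worker_count = len(client.scheduler_info()["workers"])
    # Submit a trivial task to confirm the cluster is usable.
    total = client.submit(sum, [1, 2, 3]).result()

print(worker_count, total)
```

Using the client as a context manager closes it automatically, which plays the same role as the explicit `client.close()` at the end of this notebook.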


## Step 2: Generating Synthetic Time Series Data
We use the built-in `timeseries` function from `dask.datasets`. This function generates a random time-series Dask DataFrame with columns like `x`, `y`, `id`, and `name`.

**Parameters**:  
- `start="2021-01-01"` and `end="2021-01-31"` define the date range.  
- `freq="1min"` means each row is one minute apart.  
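- `partition_freq="1H"` splits the data into one partition per hour. For this 30-day range that yields 720 small partitions of 60 rows each, which gives the scheduler many independent tasks to run in parallel. Newer pandas releases deprecate the uppercase `H` alias in favor of `h`, so `"1h"` may be needed there.  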

In [3]:
ddf = timeseries(
    start="2021-01-01",
    end="2021-01-31",
    freq="1min",
    partition_freq="1H",
    seed=42
)

## Step 3: Adding Derived Columns
We use `DataFrame.assign(...)` to add new columns:

- **`x_squared`**: The square of `x` (i.e., `x**2`).  
- **`x_y_product`**: The product of `x` and `y`.

This step demonstrates basic arithmetic transformations and further expands our computation graph.

In [4]:
ddf = ddf.assign(
    x_squared = ddf.x ** 2,
    x_y_product = ddf.x * ddf.y,
)

## Step 4: Custom Transform with `map_partitions`
We define a function `custom_transform(df)` to create two additional columns:

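1. **`x_shifted`**: Shifts the `x` column down by one row and fills the resulting gap with the mean of `x`. Because the function runs on each partition separately, the first row of every partition receives that partition's mean rather than the last value of the previous partition.  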
2. **`ratio`**: Computes `(x + 1) / (y + 1)` for each row, a simple example of a custom ratio metric.

We then apply this function to each partition using `ddf.map_partitions(custom_transform)`, which allows partition-level transformations and can handle arbitrary Python logic.

In [5]:
def custom_transform(df):
    """Example transform: shifted 'x' (gap filled with the partition mean) plus a ratio."""
    # Use assign so the input partition is not modified in place.
    return df.assign(
        x_shifted=df["x"].shift(1).fillna(df["x"].mean()),
        ratio=(df["x"] + 1) / (df["y"] + 1),
    )

ddf = ddf.map_partitions(custom_transform)
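
Since each Dask partition is a pandas DataFrame, the partition-level behavior of the shift can be illustrated with pandas alone. In this sketch the two slices stand in for partitions; the helper name `shift_with_mean_fill` and the toy values are hypothetical.

```python
import pandas as pd

def shift_with_mean_fill(df):
    # Same shift-and-fill logic as the x_shifted column above.
    return df.assign(x_shifted=df["x"].shift(1).fillna(df["x"].mean()))

full = pd.DataFrame({"x": [1.0, 2.0, 3.0, 4.0]})

# Emulate two partitions by slicing the frame in half.
parts = [full.iloc[:2], full.iloc[2:]]
per_partition = pd.concat([shift_with_mean_fill(p) for p in parts])

# Applying the function to the whole frame gives a different result.
whole = shift_with_mean_fill(full)

print(per_partition["x_shifted"].tolist())
print(whole["x_shifted"].tolist())
```

The first row of each slice is filled with that slice's own mean, so the result depends on where the partition boundaries fall. If the shift needs to cross boundaries, a different approach than a plain per-partition function is required.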

## Step 5: Creating a Separate Random DataFrame and Merging
We create a small **pandas** DataFrame `pdf_extra` with 1,000 rows: a random integer `id` between 0 and 19, a categorical flag (`A`, `B`, or `C`), and a normally distributed `value_extra`.  
Then we convert it to a **Dask** DataFrame `ddf_extra` with five partitions via `dd.from_pandas(...)`.  

**Merging**:  
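- We left-join `ddf_extra` onto the main time series DataFrame on the column `id`.  
- The merge adds tasks to the graph because every partition of `ddf` must be joined against `ddf_extra` on `id`.  
- Note that the keys barely overlap here: `pdf_extra.id` lies in 0 to 19, while the `id` values visible in the Step 9 output are around 930. For such rows the left join finds no match and fills `category_flag` and `value_extra` with missing values.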

In [6]:
np.random.seed(0)
pdf_extra = pd.DataFrame({
    "id": np.random.randint(0, 20, size=1000),
    "category_flag": np.random.choice(["A", "B", "C"], size=1000),
    "value_extra": np.random.randn(1000)
})
ddf_extra = dd.from_pandas(pdf_extra, npartitions=5)

ddf_merged = ddf.merge(ddf_extra, on="id", how="left")
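
Before a merge like this, it can be worth checking how well the join keys overlap. The sketch below does so in pandas on toy data (all values hypothetical), using the `indicator=True` option of `merge` to label unmatched rows. It is assumed, not verified here, that the same check carries over to the Dask frames.

```python
import pandas as pd

# Hypothetical toy frames: only id == 1 exists on both sides.
left = pd.DataFrame({"id": [1, 2, 3], "x": [0.1, 0.2, 0.3]})
right = pd.DataFrame({"id": [1, 1, 7], "category_flag": ["A", "B", "C"]})

# Quick overlap check on the key column.
shared_ids = set(left["id"]) & set(right["id"])

# indicator=True adds a "_merge" column: "both" or "left_only" for a left join.
merged = left.merge(right, on="id", how="left", indicator=True)
unmatched = int((merged["_merge"] == "left_only").sum())

print(shared_ids, len(merged), unmatched)
```

The row for `id == 1` appears twice because it matches two rows on the right, and the two unmatched rows carry missing values in `category_flag`.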

## Step 6: Grouping, Filtering, and Aggregating
We demonstrate grouping by multiple columns (`name` and `category_flag`) and performing multiple aggregations at once. Specifically:  
- **mean** of `x`  
- **sum** of `y`  
- **max** of `x_squared`  
- **mean** of `x_y_product`  
- **std** of `value_extra`  
- **mean** of `ratio`

We then filter the aggregated DataFrame to keep only rows where `sum_y > 200.0`. 

In [7]:
grouped = ddf_merged.groupby(["name", "category_flag"])

aggregated = grouped.agg({
    "x": "mean",
    "y": "sum",
    "x_squared": "max",
    "x_y_product": "mean",
    "value_extra": "std",
    "ratio": "mean"
}).rename(columns={
    "x": "mean_x",
    "y": "sum_y",
    "x_squared": "max_x_squared",
    "x_y_product": "mean_x_y_product",
    "value_extra": "std_extra",
    "ratio": "mean_ratio"
})


aggregated_filtered = aggregated[aggregated["sum_y"] > 200.0]
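
One detail that matters for this pipeline is how a group-by treats missing keys. The pandas sketch below (toy data, hypothetical values) shows that rows whose group key is missing are dropped by default and kept only with `dropna=False`. This is offered as a likely explanation for missing aggregates after a sparse join, under the assumption that Dask follows the pandas default.

```python
import pandas as pd

# Hypothetical frame where two rows have no category_flag, as after an unmatched left join.
df = pd.DataFrame({
    "name": ["a", "a", "b"],
    "category_flag": ["A", None, None],
    "y": [1.0, 2.0, 3.0],
})

# Default behavior: rows with a missing key are excluded from every group.
default = (
    df.groupby(["name", "category_flag"])
    .agg({"y": "sum"})
    .rename(columns={"y": "sum_y"})
)

# dropna=False keeps missing keys as their own groups.
keep_na = (
    df.groupby(["name", "category_flag"], dropna=False)
    .agg({"y": "sum"})
    .rename(columns={"y": "sum_y"})
)

print(len(default), len(keep_na))
```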

## Step 7: Re-Merging Aggregated Results
To illustrate how data can flow in a complex pipeline, we merge the aggregated statistics back onto the original data. We do a left-join on `["name", "category_flag"]`. 

**New column**:  
- `relative_x = ddf_final.x / ddf_final.mean_x`, which calculates how each row's `x` compares to the mean `x` from the aggregated results.

In [8]:
ddf_final = ddf_merged.merge(aggregated_filtered, on=["name", "category_flag"], how="left")

ddf_final = ddf_final.assign(
    relative_x = ddf_final.x / ddf_final.mean_x
)
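
The aggregate-then-merge-back pattern is easier to follow on a few rows. This pandas sketch uses a single grouping column and hypothetical values; `reset_index()` is added here so the group key is an ordinary column before merging.

```python
import pandas as pd

# Hypothetical data: group "a" has mean 2.0, group "b" has mean 5.0.
df = pd.DataFrame({"name": ["a", "a", "b"], "x": [1.0, 3.0, 5.0]})

# Per-group statistic, renamed as in the notebook.
stats = (
    df.groupby("name")
    .agg({"x": "mean"})
    .rename(columns={"x": "mean_x"})
    .reset_index()
)

# Attach the group mean to every row, then express x relative to it.
out = df.merge(stats, on="name", how="left")
out = out.assign(relative_x=out.x / out.mean_x)

print(out)
```

Rows whose group was filtered out or never matched would get a missing `mean_x`, and therefore a missing `relative_x`.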

## Step 8: (Optional) Visualize the Computation Graph
Dask can render a graph of all the tasks in this pipeline via:

```python
ddf_final.visualize(filename="dask_large_graph.png", rankdir="TB")
```

In [9]:
# ddf_final.visualize(filename="dask_large_graph.png", rankdir="TB")
# - This generates a large PNG file showing the task graph (DAG); it requires Graphviz.
# - Use a filename ending in ".svg" to write an SVG instead.
# - rankdir="TB" lays the graph out top to bottom; "LR" lays it out left to right.

## Step 9: Compute
Up to this point we have only built the task graph. Dask evaluates lazily: nothing is actually computed until we call `compute()`, which runs the graph and returns the result as a pandas DataFrame in local memory.
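
Lazy evaluation can be observed directly with `dask.delayed`, which is used here only as a small stand-in for the DataFrame graph. The function `add` and the `calls` list are hypothetical helpers that record when work actually happens.

```python
import dask

calls = []  # records every real invocation of add

@dask.delayed
def add(a, b):
    calls.append((a, b))
    return a + b

# Building the expression only creates graph nodes; add has not run yet.
lazy = add(add(1, 2), 3)
calls_before = len(calls)

# compute() executes the graph; the synchronous scheduler runs it in this thread.
value = lazy.compute(scheduler="synchronous")
calls_after = len(calls)

print(calls_before, value, calls_after)
```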

In [23]:
result = ddf_final.compute()
print("Final DataFrame shape:", result.shape)
print(result.head())

Final DataFrame shape: (43200, 17)
     name   id         x         y  x_squared  x_y_product  x_shifted  \
0  George  930 -0.536696  0.278684   0.288043    -0.149569  -0.661580   
1     Ray  930 -0.063989  0.052323   0.004095    -0.003348  -0.945922   
2  Oliver  930  0.875461 -0.007715   0.766431    -0.006754   0.318963   
3   Jerry  930 -0.640279  0.196177   0.409957    -0.125608  -0.718947   
4  Ingrid  930  0.311617 -0.446125   0.097105    -0.139020   0.638554   

      ratio category_flag  value_extra  mean_x  sum_y  max_x_squared  \
0  0.362328          <NA>          NaN     NaN    NaN            NaN   
1  0.889471          <NA>          NaN     NaN    NaN            NaN   
2  1.890042          <NA>          NaN     NaN    NaN            NaN   
3  0.300726          <NA>          NaN     NaN    NaN            NaN   
4  2.368076          <NA>          NaN     NaN    NaN            NaN   

   mean_x_y_product  std_extra  mean_ratio  relative_x  
0               NaN        NaN      

In [12]:
client.close()