<div align="center">
    <img src = "../assets/dask-logo.svg" alt="Dask logo" width="20%">
</div>

---

# Example: Using Dask, Dask Gateway, and adaptive scaling

In this notebook, we will explore some [Indian stock market data](https://www.kaggle.com/datasets/debashis74017/stock-market-data-nifty-50-stocks-1-min-data?select=ASIANPAINT_minute_data_with_indicators.csv).

In [1]:
# Location of Indian stock market data on Google Storage
data_uri = "gs://nebari-public/nifty_stock_market_data/stock-market-data-india"

import warnings
warnings.filterwarnings("ignore")

## Explore a small dataset with Dask DataFrame

First, let's see how large this dataset is.

In [2]:
# Determine the size of the dataset using gcsfs
from gcsfs import GCSFileSystem

gs = GCSFileSystem()
print(f"{gs.du(data_uri)/1e9} GB")

69614913022
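
To put a raw byte count such as the one above into perspective, it can help to convert it to decimal gigabytes. The sketch below uses a hypothetical helper, `format_size`, which is not part of gcsfs or Dask.

```python
def format_size(num_bytes: int) -> str:
    """Format a byte count as decimal gigabytes (1 GB = 1e9 bytes)."""
    return f"{num_bytes / 1e9:.2f} GB"


# Byte count reported for the dataset in this notebook
dataset_bytes = 69_614_913_022
print(format_size(dataset_bytes))  # 69.61 GB
```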

In the following notebook cells you will load the stocks dataset into a Dask DataFrame and view the first few elements.

In [3]:
# Import Dask's Dask DataFrame API
import dask.dataframe as dd

In [4]:
# Read CSV files using a glob-pattern into a Dask DataFrame
ddf = dd.read_csv(data_uri + "/*.csv")

In [5]:
# View the lazy Dask DataFrame
ddf

,date,close,high,low,open,volume,sma5,sma10,sma15,sma20,ema5,ema10,ema15,ema20,upperband,middleband,lowerband,HT_TRENDLINE,KAMA10,KAMA20,KAMA30,SAR,TRIMA5,TRIMA10,TRIMA20,ADX5,ADX10,ADX20,APO,CCI5,CCI10,CCI15,macd510,macd520,macd1020,macd1520,macd1226,MFI,MOM10,MOM15,MOM20,ROC5,ROC10,ROC20,PPO,RSI14,RSI8,slowk,slowd,fastk,fastd,fastksr,fastdsr,ULTOSC,WILLR,ATR,Trange,TYPPRICE,HT_DCPERIOD,BETA
npartitions=955,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,object,float64,float64,float64,float64,int64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [6]:
# Inspect the first few values of the DataFrame (`head` calls `compute` internally)
ddf.head()

,date,close,high,low,open,volume,sma5,sma10,sma15,sma20,...,fastd,fastksr,fastdsr,ULTOSC,WILLR,ATR,Trange,TYPPRICE,HT_DCPERIOD,BETA
0,2015-02-02 10:18:00+05:30,1544.0,1545.0,1543.1,1545.0,220,1544.94,1545.89,1546.756667,1546.995,...,31.307634,0.0,8.760123,43.728004,-86.466165,2.056306,2.1,1544.033333,27.840336,0.138547
1,2015-02-02 10:19:00+05:30,1543.7,1544.0,1543.0,1544.0,29,1544.63,1545.545,1546.446667,1546.865,...,23.381708,0.0,4.380062,47.390635,-89.62963,1.980855,1.0,1543.566667,27.115144,0.094738
2,2015-02-02 10:20:00+05:30,1543.0,1543.7,1542.5,1543.7,28,1544.22,1545.205,1546.013333,1546.58,...,15.620034,0.0,1.184238e-15,43.139523,-92.857143,1.92508,1.2,1543.066667,26.735549,0.336124
3,2015-02-02 10:21:00+05:30,1542.3,1543.0,1542.3,1543.0,125,1543.64,1544.635,1545.533333,1546.395,...,9.497585,0.0,1.184238e-15,44.657886,-100.0,1.837574,0.7,1542.533333,26.601907,0.608042
4,2015-02-02 10:22:00+05:30,1542.1,1542.15,1541.1,1541.1,113,1543.02,1544.235,1545.143333,1546.0725,...,13.377926,0.0,1.184238e-15,54.066603,-86.111111,1.792033,1.2,1541.783333,26.691996,0.253168


**Converting your Dask DataFrame to a pandas DataFrame loads the entire dataset into local memory.**

> ⚠️ Warning! This will crash your kernel because of insufficient local memory! You'll need to restart the kernel and read the dataset in again.

In [None]:
# Convert a Dask DataFrame to pandas DataFrame
# Uncomment the next line to run. This will crash your kernel!

# df = ddf.compute()

As we mentioned earlier, Dask computations look very similar to pandas with an extra `compute()` at the end.

---

## Scale to a large dataset with Dask Gateway

You can now scale your computation to all the ~100 files in the dataset using a Dask cluster with Dask Gateway.

### Create a Dask Gateway instance

As the first step, import and instantiate Dask Gateway.

In [7]:
from dask_gateway import Gateway

gateway = Gateway()
gateway

Gateway<http://nebari-dask-gateway-gateway-api.dev:8000>

Open the `Cluster Options` widget where you can view and update cluster configurations like the conda environment, instance type, and any environment variables.

In [8]:
options = gateway.cluster_options()
options

VBox(children=(HTML(value='<h2>Cluster Options</h2>'), GridBox(children=(HTML(value="<p style='font-weight: bo…

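This is a visual example, but all of this can also be done programmatically (here, `conda_env` and `worker_type` are placeholders for the names of your environment and worker profile):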

```python
options.conda_environment = conda_env
options.profile = worker_type
options.environment_vars = {"MYENV": "aNeNvVaR"}
```

> ⚠️ Warning: The environment used for your notebook (that is, the IPython kernel) must match the Dask worker environment (that is, `options.conda_environment`).
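
One way to spot such a mismatch is to compare package versions. The sketch below only collects the versions installed in the notebook kernel; the helper name `kernel_versions` and the package list are illustrative assumptions.

```python
from importlib.metadata import PackageNotFoundError, version


def kernel_versions(packages):
    """Return {package: installed version or None} for the current kernel."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None  # not installed in this environment
    return found


# Packages whose versions commonly need to agree between kernel and workers
print(kernel_versions(["dask", "distributed", "pandas", "numpy"]))
```

Once a client is connected to the cluster, `client.get_versions(check=True)` from `distributed` performs a comparable check across the scheduler, the workers, and the client.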

### Create a new Dask cluster and connect to a Client

In [9]:
# Create a new cluster with the above options
cluster = gateway.new_cluster(options)

In [11]:
# View the cluster widget
cluster

VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    …

The cluster starts with zero workers, so you need to set the number of workers manually or set up **adaptive scaling**. With adaptive scaling, your cluster automatically resizes itself within the minimum and maximum bounds based on the workload. Learn more in Dask's [adaptive deployments documentation](https://docs.dask.org/en/stable/how-to/adaptive.html).

**In the above UI, set up adaptive scaling with a minimum of 1 and a maximum of 10 workers.**

Image source: [Dask documentation](https://docs.dask.org/en/stable/how-to/adaptive.html)

<img src="../assets/dask-adaptive.svg" alt="Dask adaptive scaling" width="30%">



In [12]:
# Enable adaptive scaling
cluster.adapt(minimum=1, maximum=10)

The cell above is the programmatic equivalent of the UI setting:

```python
cluster.adapt(minimum=1, maximum=10)
```
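
To illustrate what the bounds mean, here is a deliberately simplified sketch. It is not Dask's actual scaling algorithm; the function `bounded_workers` and the idea of a single "desired" worker count are assumptions made for illustration.

```python
def bounded_workers(desired: int, minimum: int = 1, maximum: int = 10) -> int:
    """Clamp a desired worker count to the adaptive bounds."""
    return max(minimum, min(desired, maximum))


# Idle cluster: never drops below the minimum
print(bounded_workers(0))   # 1
# Moderate load: the desired count is within bounds
print(bounded_workers(4))   # 4
# Heavy load: capped at the maximum
print(bounded_workers(25))  # 10
```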

In [13]:
# Connect a new client to the Gateway cluster
client = cluster.get_client()

In [14]:
# View the client widget
client

0,1
Connection method: Cluster object,Cluster type: dask_gateway.GatewayCluster
Dashboard: https://demo.nebari.dev/gateway/clusters/dev.5a7968993db24e7eaed2cbabe80a3ce5/status,


The `Dask Client` interface gives us a brief summary of everything we've set up so far. 

### Dask's diagnostic dashboard

Open the Dask dashboard by clicking on the link in the Client UI.

Alternatively (recommended), use the JupyterLab extension in the left sidebar to open:

* Cluster map
* Task stream
* Progress bar
* Worker memory plots (Optional)
* Task groups plot (Optional)

## Computation on the large dataset

### Stock data compute

With the Dask cluster running, we have the resources to do some computation!

Let's compute the highest `high` and lowest `low`. Make sure to look at the dashboard plots!

In [15]:
# Compute highest-high
ddf.high.max().compute()

41834.55

In [None]:
# Compute lowest-low
ddf.low.min().compute()

### Standalone example with Dask Array

The previous example reads data from cloud storage, which can take time. Here is an example with Dask Array that you can execute immediately!

In [None]:
import dask.array as da

In [None]:
x = da.random.random((100000, 100000), chunks=(1000, 1000))
x
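
It can be useful to estimate the size of such an array before computing on it. The arithmetic below assumes 8-byte `float64` elements, which is what `da.random.random` produces by default.

```python
# Array and chunk shapes from the cell above
shape = (100_000, 100_000)
chunks = (1_000, 1_000)
bytes_per_element = 8  # float64

total_gb = shape[0] * shape[1] * bytes_per_element / 1e9
chunk_mb = chunks[0] * chunks[1] * bytes_per_element / 1e6
n_chunks = (shape[0] // chunks[0]) * (shape[1] // chunks[1])

print(f"total: {total_gb:.0f} GB, per chunk: {chunk_mb:.0f} MB, chunks: {n_chunks}")
# total: 80 GB, per chunk: 8 MB, chunks: 10000
```

At roughly 80 GB in total, the full array would not fit in the memory of a typical single machine, while each 8 MB chunk is small enough for a worker to process independently.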

In [None]:
y = x * x
z = y.mean(axis=1)

In [None]:
z.compute()
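
To try the same pattern without a cluster, a scaled-down version runs comfortably on a laptop. This sketch assumes `dask[array]` and NumPy are installed; the smaller shape and the `small_` names are chosen here for illustration.

```python
import dask.array as da

# 2,000 x 2,000 array (about 32 MB) split into 16 chunks
small_x = da.random.random((2_000, 2_000), chunks=(500, 500))

# Same lazy operations as above: elementwise square, then row means
small_z = (small_x * small_x).mean(axis=1)

result = small_z.compute()  # returns a NumPy array
print(result.shape)  # (2000,)
```

Because the values are drawn uniformly from [0, 1), each row mean of the squared values should be close to 1/3.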

## Shut down the cluster

**ALWAYS** remember to shut down your cluster with the following commands.

> ⚠️ Warning: As with JupyterLab servers, Dask workers run on cloud compute instances and cost actual money.

In [16]:
# Close the client first, then shut down the cluster
client.close()
cluster.close()
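
To make sure cleanup happens even when a cell raises an error, one option is to wrap resources in a context manager. The sketch below uses a hypothetical stand-in class, `Recorder`, instead of a real client and cluster, so it runs anywhere; `contextlib.closing` works with any object that has a `close()` method.

```python
from contextlib import ExitStack, closing


class Recorder:
    """Hypothetical stand-in for a client or cluster; records when it is closed."""

    def __init__(self, name, closed):
        self.name = name
        self.closed = closed

    def close(self):
        self.closed.append(self.name)


closed = []
try:
    with ExitStack() as stack:
        # Resources are closed in reverse order of registration
        fake_cluster = stack.enter_context(closing(Recorder("cluster", closed)))
        fake_client = stack.enter_context(closing(Recorder("client", closed)))
        raise RuntimeError("simulated failure during computation")
except RuntimeError:
    pass

print(closed)  # ['client', 'cluster']
```

Registering the cluster before the client means the client is closed first and the cluster last, even though the block exits with an exception.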

---
## 👏 Next:
* [03_managing_environments](../03_managing_environments.ipynb)
---