# Unit 4 - Cloud-Based Model Training
In this notebook we will cover:
   1. A ***very*** brief intro to Dask
   2. Hosting the Numerai data in S3
   3. Training a model in the cloud with [Coiled](https://coiled.io/product/) and Dask!

<img src="images/dawg.jpg" />

<img src="https://kubedex.com/wp-content/uploads/2018/09/dask-logo.png"
     width="25%"
     alt="Dask logo" />
     
## Types of scaling problems in machine learning

There are two main types of scaling challenges you can run into in a machine learning workflow: scaling the **size of your model** and scaling the **size of your data**. That is:

1. **CPU-bound problems**: Data fits in RAM, but training takes too long. Many hyperparameter combinations, a large ensemble of many models, etc.
2. **Memory-bound problems**: Data is larger than RAM, and sampling isn't an option.
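
To make the memory-bound case concrete, here is a minimal sketch of the underlying idea: process the data one chunk at a time and keep only a small running state, so the full dataset never has to sit in RAM at once. This is plain Python for illustration, not Dask's API; `chunked` and `out_of_core_mean` are hypothetical helper names, and the generator stands in for a file that is too large to load. Dask applies the same chunk-at-a-time idea to partitioned arrays and dataframes.

```python
def chunked(iterable, size):
    """Yield lists of at most `size` items from any iterable."""
    chunk = []
    for item in iterable:
        chunk.append(item)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk


def out_of_core_mean(values, chunk_size):
    """Compute a mean while holding only one chunk in memory at a time."""
    total, count = 0.0, 0
    for chunk in chunked(values, chunk_size):
        total += sum(chunk)   # reduce the chunk to a single number
        count += len(chunk)   # keep just the running state
    return total / count


# Stand-in for a dataset that is too big to load: values are produced lazily
stream = (x for x in range(1, 1_000_001))
mean = out_of_core_mean(stream, chunk_size=10_000)
print(mean)
```

Only the running `total` and `count` survive between chunks, which is why the memory footprint stays flat no matter how long the stream is.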

 <img src="https://raw.githubusercontent.com/coiled/pydata-global-dask/master/images/grid_search_schedule.gif"
     width="50%"
     alt="Grid search schedule" />


## When to use Dask?


<img src="images/MemoryError.png" 
     width="55%"
     alt="Dask overview" />
     
     
     
Before reaching for Dask, ask a few questions to decide whether it suits your problem.
- Does your data fit in memory?
    - Yes: Use pandas or NumPy.
    - No: Dask might be able to help.
- Do your computations take forever?
    - Yes: Dask might be able to help.
    - No: Awesome.
- Do you have embarrassingly parallelizable code?
    - Yes: Dask might be able to help.
    - Not sure: Here are some [examples](https://examples.dask.org/applications/embarrassingly-parallel.html).
    - No: I'm sorry, although Dask might still have some hope for you.
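
As a rough illustration of what "embarrassingly parallel" means, the sketch below evaluates several independent candidates at once using only the standard library. `score_params` and the candidate learning rates are made up for this example; the point is that no task needs the result of any other. Dask's futures interface (`Client.map`, `Client.submit`) follows the same submit-and-gather pattern, but spreads the tasks across a cluster.

```python
from concurrent.futures import ThreadPoolExecutor


def score_params(learning_rate):
    """Hypothetical stand-in for an expensive, independent job (e.g. fitting one model)."""
    return round((learning_rate - 0.1) ** 2, 6)


# Each candidate can be scored without knowing anything about the others
candidates = [0.01, 0.05, 0.1, 0.2, 0.5]

with ThreadPoolExecutor(max_workers=4) as pool:
    # map() preserves input order, so losses[i] belongs to candidates[i]
    losses = list(pool.map(score_params, candidates))

best = candidates[losses.index(min(losses))]
print(best)
```

Threads are used here only to keep the sketch short; for real CPU-bound Python work you would want processes or a Dask cluster.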


**Bottom left of the chart below:** You don't need Dask.    
**Elsewhere:** Dask is fair game.


<img src="https://raw.githubusercontent.com/dask/dask-ml/main/docs/source/images/dimensions_of_scale.svg"
     width="50%"
     alt="Dask zones">
     

<img src="https://raw.githubusercontent.com/dask/dask/main/docs/source/images/dask-overview.svg" 
     width="75%"
     alt="Dask overview" />

##  Multi-machine parallelism in the cloud with Coiled

<br>
<img src="https://raw.githubusercontent.com/coiled/data-science-at-scale/master/images/Coiled-Logo_Horizontal_RGB_Black.png"
     alt="Coiled logo" 
     width="15%" />
<br>

Coiled, [among other things](https://coiled.io/product/), provides hosted and scalable Dask clusters.

<img src="images/dask-gcp-bad-guy.png"
     alt="Coiled logo" 
     width="55%" />


## Alternatives

There are **a lot** of alternatives to the setup used in this video, but ultimately I chose Dask/Coiled as it was the first I could get working.

Some of the alternative technologies/providers:

<img src="images/options.png"
     width="75%"
     alt="Grid search schedule" />





In [None]:
import coiled


In [None]:
import dask
from dask.distributed import Client


In [None]:
# Spin up a Coiled cluster
cluster = coiled.Cluster(software="peterling7710/pling_numerai",
                         backend_options={"spot": True},
                         #worker_memory="16 GiB",
                        )

# Let the cluster autoscale between 2 and 5 workers
cluster.adapt(minimum=2, maximum=5)


In [None]:
# Instantiate a Client and print dashboard link
client = Client(cluster)
client


In [None]:
# Check that package versions match across the client, scheduler, and workers
client.get_versions(check=True)


In [None]:
# Load AWS credentials from a local .env file into environment variables
from dotenv import load_dotenv
import os

load_dotenv()

ACCESS_KEY = os.getenv('ACCESS_KEY')
SECRET_KEY = os.getenv('SECRET_KEY')


In [None]:
import s3fs

# Load S3 Bucket with AWS Credentials
fs = s3fs.S3FileSystem(key=ACCESS_KEY, secret=SECRET_KEY)
fs.ls('s3://numerai-data')


In [None]:
import dask.dataframe as dd

df = dd.read_csv(
    "s3://numerai-data/numerai_training_data_int8.csv",
    storage_options = {'key': ACCESS_KEY, 'secret': SECRET_KEY})
   

In [None]:
df.head()


In [None]:
# Column names: features, features plus the numeric era, and targets
features = [c for c in df if c.startswith("feature")]
features_erano = features + ["erano"]

targets = [c for c in df if c.startswith("target")]

# Numeric era column, used for sampling and for grouping the CV folds
df["erano"] = df.era.astype(int)
eras = df.erano


In [None]:
import numpy as np

# Era numbers to keep: every 4th era from 1 to 301
filt = np.arange(1, 304, 4)


In [None]:
# Sample the data: keep only the rows whose era is in filt
tdf = df.loc[df.erano.isin(filt)]


In [None]:
# Separate the feature data from the target data
X, y = tdf[features_erano], tdf["target"]

y = y.to_frame()

eras = X.erano


In [None]:
# Convert to Dask arrays and persist the data on the workers
X_arr, y_arr = dask.persist(X.to_dask_array(lengths=True), y.to_dask_array(lengths=True))


In [None]:
from sklearn.model_selection._split import _BaseKFold, indexable, _num_samples
from sklearn import model_selection, metrics
from scipy.stats import spearmanr

class TimeSeriesSplitGroups(_BaseKFold):
    """Expanding-window time-series split that keeps each group (era) intact."""

    def __init__(self, n_splits=5):
        super().__init__(n_splits, shuffle=False, random_state=None)

    def split(self, X, y=None, groups=None):
        X, y, groups = indexable(X, y, groups)
        n_samples = _num_samples(X)
        n_splits = self.n_splits
        n_folds = n_splits + 1
        # Materialize the group labels once so the masks below are plain NumPy
        groups = np.asarray(groups)
        group_list = np.unique(groups)
        n_groups = len(group_list)
        if n_folds > n_groups:
            raise ValueError(
                ("Cannot have number of folds ={0} greater"
                 " than the number of groups: {1}.").format(n_folds,
                                                            n_groups))
        indices = np.arange(n_samples)
        test_size = (n_groups // n_folds)
        test_starts = range(test_size + n_groups % n_folds,
                            n_groups, test_size)
        #test_starts = list(test_starts)[::-1]
        for test_start in test_starts:
            # Train on every group before test_start, test on the next test_size groups
            yield (indices[np.isin(groups, group_list[:test_start])],
                   indices[np.isin(groups, group_list[test_start:test_start + test_size])])

def spearman(y_true, y_pred):
    """Spearman rank correlation between targets and predictions."""
    return spearmanr(y_pred, y_true).correlation
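
To see which eras end up in each fold, the fold arithmetic from `TimeSeriesSplitGroups.split` can be reproduced on group counts alone. This is a sketch under an assumption: that the sampled data contains 76 distinct eras, which is what `np.arange(1, 304, 4)` selects if every one of those eras is present. `fold_boundaries` is a hypothetical helper, not part of scikit-learn.

```python
def fold_boundaries(n_groups, n_splits):
    """Return (train, test) ranges of group positions for each fold."""
    n_folds = n_splits + 1
    if n_folds > n_groups:
        raise ValueError("need at least n_splits + 1 groups")
    test_size = n_groups // n_folds
    # Leftover groups are absorbed by the first training window
    first_test_start = test_size + n_groups % n_folds
    folds = []
    for test_start in range(first_test_start, n_groups, test_size):
        train = range(0, test_start)
        test = range(test_start, min(test_start + test_size, n_groups))
        folds.append((train, test))
    return folds


# Assumed: 76 sampled eras and the 5 splits used in this notebook
folds = fold_boundaries(n_groups=76, n_splits=5)
for train, test in folds:
    print(f"train on groups [0, {train.stop}), test on groups [{test.start}, {test.stop})")
```

Under that assumption, the first fold trains on the 16 earliest sampled eras and tests on the next 12; each later fold adds 12 more eras to the training window, so a model is never tested on eras that precede its training data.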


# The Meta(verse?)

- Gradient Boosting Decision Trees (GBDT) are a great starting point and a very well-rounded algorithm overall
- There are several popular implementations of GBDT (LightGBM, XGBoost, and CatBoost)
- We will be using LightGBM (LGBM) for this series as it is very memory efficient
    
<img src="images/EZ_LGBM.jpg" width="500" />

<img src="images/SkleanJoblibDaskflow.png"
     width="50%"
     alt="Dask logo" />

In [None]:
import lightgbm as lgb
import joblib

# Create a Joblib context with the Dask backend to evaluate our model's performance - quickly!

with joblib.parallel_backend('dask'):

    fold_scores = []

    cvGen = TimeSeriesSplitGroups(n_splits=5)  # era-grouped, expanding-window CV

    for i, (train, test) in enumerate(cvGen.split(X=X_arr, y=y_arr, groups=eras)):
        lgbm_model = lgb.DaskLGBMRegressor()
        lgbm_model.fit(X_arr[train], y_arr[train])

        preds = lgbm_model.predict(X_arr[test])
        score = spearman(y_arr[test], preds)
        fold_scores.append(score)

    # Per-fold Spearman correlations, then their mean
    print(fold_scores)

    print(np.mean(fold_scores))
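
Each fold score above is a Spearman rank correlation: the Pearson correlation of the ranks of the predictions and the targets, which rewards getting the ordering right rather than the exact values. The pure-Python sketch below shows that computation with average ranks for ties; the helper names and toy values are invented for illustration, and the notebook itself relies on `scipy.stats.spearmanr`.

```python
def average_ranks(values):
    """1-based ranks, with tied values sharing the average of their positions."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    ranks = [0.0] * len(values)
    i = 0
    while i < len(order):
        j = i
        # Extend j to cover the whole run of tied values
        while j + 1 < len(order) and values[order[j + 1]] == values[order[i]]:
            j += 1
        avg = (i + j) / 2 + 1
        for k in range(i, j + 1):
            ranks[order[k]] = avg
        i = j + 1
    return ranks


def pearson(a, b):
    """Plain Pearson correlation of two equal-length sequences."""
    n = len(a)
    mean_a, mean_b = sum(a) / n, sum(b) / n
    cov = sum((x - mean_a) * (y - mean_b) for x, y in zip(a, b))
    var_a = sum((x - mean_a) ** 2 for x in a)
    var_b = sum((y - mean_b) ** 2 for y in b)
    return cov / (var_a * var_b) ** 0.5


def spearman_sketch(y_true, y_pred):
    """Spearman correlation as Pearson correlation of the ranks."""
    return pearson(average_ranks(y_true), average_ranks(y_pred))


# Toy values: only the ordering of the predictions matters
targets = [0.0, 0.25, 0.5, 0.75, 1.0]
perfect = spearman_sketch(targets, [0.1, 0.2, 0.3, 0.4, 0.5])
reversed_score = spearman_sketch(targets, [0.5, 0.4, 0.3, 0.2, 0.1])
print(perfect, reversed_score)
```

Predictions that preserve the target ordering score at the top of the range even when their values are far from the targets, while a fully reversed ordering scores at the bottom.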
    

# Thank You and Good Luck!
- Like & Subscribe for more!
- [GitHub](https://github.com/peterling7710/NumeraiStarterPack) with the notebooks for this series
- Find my socials [here](https://linktr.ee/peterling) for more Numerai-related content

<img src="images/TAF.jpg"/>