# PyCaret Fugue Integration

[Fugue](https://github.com/fugue-project/fugue) is a low-code unified interface for different computing frameworks such as Spark, Dask and Pandas. PyCaret is using Fugue to support distributed computing scenarios.

## Hello World

Let's start with the most standard example, the code is exactly the same as the local version, there is no magic.

In [1]:
from pycaret.datasets import get_data
from pycaret.classification import *

setup(data=get_data("juice"), target = 'Purchase', n_jobs=1)

test_models = models().index.tolist()[:5]

Unnamed: 0,Description,Value
0,session_id,637
1,Target,Purchase
2,Target Type,Binary
3,Label Encoded,"CH: 0, MM: 1"
4,Original Data,"(1070, 19)"
5,Missing Values,False
6,Numeric Features,13
7,Categorical Features,5
8,Ordinal Features,False
9,High Cardinality Features,False


`compare_model` is also exactly the same if you don't want to use a distributed system

In [2]:
compare_models(include=test_models, n_select=2)

Unnamed: 0,Model,Accuracy,AUC,Recall,Prec.,F1,Kappa,MCC,TT (Sec)
lr,Logistic Regression,0.821,0.8919,0.7289,0.799,0.7612,0.6189,0.6215,0.183
dt,Decision Tree Classifier,0.7687,0.7656,0.7251,0.702,0.711,0.5189,0.5214,0.01
nb,Naive Bayes,0.7621,0.8343,0.7757,0.6737,0.7194,0.5149,0.521,0.008
knn,K Neighbors Classifier,0.7273,0.7639,0.6391,0.6603,0.6476,0.4256,0.4273,0.012
svm,SVM - Linear Kernel,0.5976,0.0,0.1724,0.1018,0.1238,0.0428,0.0432,0.01


[LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
                    intercept_scaling=1, l1_ratio=None, max_iter=1000,
                    multi_class='auto', n_jobs=None, penalty='l2',
                    random_state=637, solver='lbfgs', tol=0.0001, verbose=0,
                    warm_start=False),
 DecisionTreeClassifier(ccp_alpha=0.0, class_weight=None, criterion='gini',
                        max_depth=None, max_features=None, max_leaf_nodes=None,
                        min_impurity_decrease=0.0, min_impurity_split=None,
                        min_samples_leaf=1, min_samples_split=2,
                        min_weight_fraction_leaf=0.0, presort='deprecated',
                        random_state=637, splitter='best')]

Now let's make it distributed, as a toy case, on dask. The only thing changed is an additional parameter `fugue_engine`

In [3]:
compare_models(include=test_models, n_select=2, fugue_engine="dask")

[LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
                    intercept_scaling=1, l1_ratio=None, max_iter=1000,
                    multi_class='auto', n_jobs=None, penalty='l2',
                    random_state=637, solver='lbfgs', tol=0.0001, verbose=0,
                    warm_start=False),
 DecisionTreeClassifier(ccp_alpha=0.0, class_weight=None, criterion='gini',
                        max_depth=None, max_features=None, max_leaf_nodes=None,
                        min_impurity_decrease=0.0, min_impurity_split=None,
                        min_samples_leaf=1, min_samples_split=2,
                        min_weight_fraction_leaf=0.0, presort='deprecated',
                        random_state=637, splitter='best')]

In order to use Spark as the execution engine, you must have access to a Spark cluster, and you must have a `SparkSession`, let's initialize a local Spark session

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

Now just change `fugue_engine` to this session object, you make it run on Spark. You must understand this is a toy case. In the real situation, you need to have a SparkSession pointing to a real Spark cluster to enjoy the power of Spark

In [11]:
compare_models(include=test_models, n_select=2, fugue_engine=spark)

                                                                                

[LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
                    intercept_scaling=1, l1_ratio=None, max_iter=1000,
                    multi_class='auto', n_jobs=None, penalty='l2',
                    random_state=0, solver='lbfgs', tol=0.0001, verbose=0,
                    warm_start=False),
 DecisionTreeClassifier(ccp_alpha=0.0, class_weight=None, criterion='gini',
                        max_depth=None, max_features=None, max_leaf_nodes=None,
                        min_impurity_decrease=0.0, min_impurity_split=None,
                        min_samples_leaf=1, min_samples_split=2,
                        min_weight_fraction_leaf=0.0, presort='deprecated',
                        random_state=0, splitter='best')]

In the end, you can `pull` to get the metrics table

In [14]:
pull()

Unnamed: 0,Model,Accuracy,AUC,Recall,Prec.,F1,Kappa,MCC,TT (Sec)
lr,Logistic Regression,0.8275,0.8964,0.7265,0.8105,0.7589,0.626,0.6344,0.377
dt,Decision Tree Classifier,0.7778,0.7646,0.7047,0.7098,0.7048,0.527,0.5294,0.01
nb,Naive Bayes,0.7674,0.834,0.7369,0.6776,0.7031,0.5129,0.5173,0.014
knn,K Neighbors Classifier,0.7073,0.7646,0.5447,0.6275,0.5792,0.3579,0.3627,0.044
svm,SVM - Linear Kernel,0.6403,0.0,0.1107,0.1439,0.1047,0.0688,0.082,0.019


## A more practical case

The above example is a pure toy, to make things work perfectly in a distributed system you must be careful about a few things

### Use a lambda instead of a dataframe in setup

If you directly provide a dataframe in `setup`, this dataset will need to be sent to all worker node. If the dataframe is 1G, you have 100 worker, then it is possible your dirver machine will need to send out up to 100G data (depending on specific framework's implementation), then this data transfer becomes a bottleneck itself. Instead, if you provide a lambda function, it doesn't change the local compute scenario, but the driver will only send the function reference to workers, and each worker will be responsible to load the data by themselves, so there is no heavy traffic on the driver side.

### Be deterministic

You should always use `session_id` to make the distributed compute deterministic, otherwise, for the exactly same logic you could get drastically different selection for each run.

### Set n_jobs

It is important to be explicit on n_jobs when you want to run something distributedly, so it will not overuse the local/remote resources. This can also avoid resrouce contention, and make the compute faster.

In [10]:
setup(data=get_data("juice", verbose=False, profile=False), target = 'Purchase', session_id=0, n_jobs=1)

Unnamed: 0,Description,Value
0,session_id,0
1,Target,Purchase
2,Target Type,Binary
3,Label Encoded,"CH: 0, MM: 1"
4,Original Data,"(1070, 19)"
5,Missing Values,False
6,Numeric Features,13
7,Categorical Features,5
8,Ordinal Features,False
9,High Cardinality Features,False


({'lr': <pycaret.containers.models.classification.LogisticRegressionClassifierContainer at 0x7f2604a04a30>,
  'knn': <pycaret.containers.models.classification.KNeighborsClassifierContainer at 0x7f2604a04550>,
  'nb': <pycaret.containers.models.classification.GaussianNBClassifierContainer at 0x7f2604a04520>,
  'dt': <pycaret.containers.models.classification.DecisionTreeClassifierContainer at 0x7f2604a34ee0>,
  'svm': <pycaret.containers.models.classification.SGDClassifierContainer at 0x7f2604a34e80>,
  'rbfsvm': <pycaret.containers.models.classification.SVCClassifierContainer at 0x7f2604a0d9d0>,
  'gpc': <pycaret.containers.models.classification.GaussianProcessClassifierContainer at 0x7f2604a0daf0>,
  'mlp': <pycaret.containers.models.classification.MLPClassifierContainer at 0x7f2604a0d5e0>,
  'ridge': <pycaret.containers.models.classification.RidgeClassifierContainer at 0x7f2604a0df10>,
  'rf': <pycaret.containers.models.classification.RandomForestClassifierContainer at 0x7f2604a2c3a0>

  _warn_prf(average, modifier, msg_start, len(result))
  mcc = cov_ytyp / np.sqrt(cov_ytyt * cov_ypyp)
  _warn_prf(average, modifier, msg_start, len(result))
  mcc = cov_ytyp / np.sqrt(cov_ytyt * cov_ypyp)
  _warn_prf(average, modifier, msg_start, len(result))
  mcc = cov_ytyp / np.sqrt(cov_ytyt * cov_ypyp)
  _warn_prf(average, modifier, msg_start, len(result))
  mcc = cov_ytyp / np.sqrt(cov_ytyt * cov_ypyp)
  _warn_prf(average, modifier, msg_start, len(result))
  mcc = cov_ytyp / np.sqrt(cov_ytyt * cov_ypyp)
  _warn_prf(average, modifier, msg_start, len(result))
  mcc = cov_ytyp / np.sqrt(cov_ytyt * cov_ypyp)
  mcc = cov_ytyp / np.sqrt(cov_ytyt * cov_ypyp)
  _warn_prf(average, modifier, msg_start, len(result))
  mcc = cov_ytyp / np.sqrt(cov_ytyt * cov_ypyp)
  _warn_prf(average, modifier, msg_start, len(result))
  mcc = cov_ytyp / np.sqrt(cov_ytyt * cov_ypyp)


  _warn_prf(average, modifier, msg_start, len(result))
  mcc = cov_ytyp / np.sqrt(cov_ytyt * cov_ypyp)
  _warn_prf(average, modifier, msg_start, len(result))
  mcc = cov_ytyp / np.sqrt(cov_ytyt * cov_ypyp)
  _warn_prf(average, modifier, msg_start, len(result))
  mcc = cov_ytyp / np.sqrt(cov_ytyt * cov_ypyp)
  _warn_prf(average, modifier, msg_start, len(result))
  mcc = cov_ytyp / np.sqrt(cov_ytyt * cov_ypyp)
  _warn_prf(average, modifier, msg_start, len(result))
  mcc = cov_ytyp / np.sqrt(cov_ytyt * cov_ypyp)
  _warn_prf(average, modifier, msg_start, len(result))
  mcc = cov_ytyp / np.sqrt(cov_ytyt * cov_ypyp)


### Set the appropriate batch_size

`batch_size` parameter helps adjust between load balence and overhead. For each batch, setup will be called only once. So

| Choice |Load Balance|Overhead|Best Scenario|
|---|---|---|---|
|Smaller value|Better|Worse|`training time >> data loading time` or `models ~= workers`|
|Bigger values|Worse|Better|`training time << data loading time` or `models >> workers`|

The default value is set to `1`, meaning we want the best load balance.

### Display progress

In development, you can enable visual effect by `display_remote=True`, but meanwhile you must also enable [Fugue Callback](https://fugue-tutorials.readthedocs.io/tutorials/advanced/rpc.html) so that the driver can monitor worker progress. But it is recommended to turn off display in production.

In [9]:
fconf = {
    "fugue.rpc.server": "fugue.rpc.flask.FlaskRPCServer",  # keep this value
    "fugue.rpc.flask_server.host": "0.0.0.0",  # the driver ip address workers can access
    "fugue.rpc.flask_server.port": "3333",  # the open port on the dirver
    "fugue.rpc.flask_server.timeout": "2 sec",  # the timeout for worker to talk to driver
}

compare_models(n_select=2, fugue_engine="dask", fugue_conf=fconf, display_remote=True, batch_size=3)

IntProgress(value=0, description='Processing: ', max=17)

Unnamed: 0,Model,Accuracy,AUC,Recall,Prec.,F1,Kappa,MCC,TT (Sec)
lda,Linear Discriminant Analysis,0.8328,0.8949,0.7585,0.7985,0.7735,0.6416,0.6464,0.006
ridge,Ridge Classifier,0.8275,0.0,0.7479,0.7971,0.7654,0.6299,0.6366,0.005
lr,Logistic Regression,0.8275,0.8963,0.7265,0.8105,0.7589,0.626,0.6344,0.044
gbc,Gradient Boosting Classifier,0.8195,0.8855,0.751,0.776,0.7594,0.6154,0.6193,0.054
rf,Random Forest Classifier,0.8048,0.8792,0.7408,0.7483,0.7397,0.5843,0.5889,0.134
ada,Ada Boost Classifier,0.8021,0.8668,0.7014,0.7639,0.7275,0.5729,0.5776,0.043
lightgbm,Light Gradient Boosting Machine,0.7994,0.8775,0.7299,0.7444,0.7331,0.573,0.5768,0.379
mlp,MLP Classifier,0.7915,0.8836,0.6626,0.8038,0.6961,0.5439,0.5728,0.234
et,Extra Trees Classifier,0.782,0.8509,0.7122,0.7214,0.7101,0.5365,0.5428,0.121
dt,Decision Tree Classifier,0.7778,0.7646,0.7047,0.7098,0.7048,0.527,0.5294,0.006


[LinearDiscriminantAnalysis(n_components=None, priors=None, shrinkage=None,
                            solver='svd', store_covariance=False, tol=0.0001),
 RidgeClassifier(alpha=1.0, class_weight=None, copy_X=True, fit_intercept=True,
                 max_iter=None, normalize=False, random_state=0, solver='auto',
                 tol=0.001)]

### A note on Spark settings

It is highly recommended to have only 1 worker on each Spark executor, so the worker can fully utilize all cpus (set `spark.task.cpus`). Also when you do this you should explicitly set `n_jobs` in `setup` to the number of cpus of each executor.

```python
executor_cores = 4

spark = SparkSession.builder.config("spark.task.cpus", executor_cores).config("spark.executor.cores", executor_cores).getOrCreate()

setup(data=get_data("juice", verbose=False, profile=False), target = 'Purchase', session_id=0, n_jobs=executor_cores)

compare_models(n_select=2, fugue_engine=spark)
```

### A note on Dask

Dask has fake distributed modes such as the default (multi-thread) and multi-process modes. The default mode will just work fine (but they are actually running sequentially), and multi-process doesn't work for PyCaret for now because it messes up with PyCaret's global variables. On the other hand, any Spark execution mode will just work fine.