# AutoML with the FLAML Library for SynapseML Models and Spark DataFrames


## 1. Introduction

FLAML (https://github.com/microsoft/FLAML) is a Python library designed to automatically produce accurate machine learning models 
with low computational cost. It is fast and economical, and its simple, lightweight design makes it easy 
to use and to extend, for example by adding new learners. FLAML can
- serve as an economical AutoML engine,
- be used as a fast hyperparameter tuning tool, or
- be embedded in self-tuning software that requires low latency and low resource usage in repetitive
   tuning tasks.

In this notebook, we demonstrate how to use the FLAML library to do AutoML for SynapseML models and Spark DataFrames, and we compare the results of FLAML AutoML with those of a default SynapseML model.
In this example, we use LightGBM to build a classification model that predicts bankruptcy.

Since the dataset is imbalanced, `AUC` is a better metric than `Accuracy`. With about one minute of training, FLAML achieved a test AUC of **0.79**, whereas the default SynapseML model reached only **0.64**.
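To see why accuracy is misleading on an imbalanced dataset, consider a degenerate model that always predicts "not bankrupt". The minimal pure-Python sketch below uses synthetic labels (97 negatives and 3 positives, not the notebook's data) and computes ROC AUC as the probability that a randomly chosen positive is scored above a randomly chosen negative, with ties counting one half.

```python
# Illustrative only: synthetic labels, not the bankruptcy dataset.
def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true labels."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


def roc_auc(y_true, scores):
    """AUC as P(score of random positive > score of random negative); ties count 1/2."""
    pos = [s for t, s in zip(y_true, scores) if t == 1]
    neg = [s for t, s in zip(y_true, scores) if t == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# 97 healthy companies (label 0) and 3 bankrupt ones (label 1)
y_true = [0] * 97 + [1] * 3

# A degenerate model: always predicts "not bankrupt" and gives every row the same score
always_negative_labels = [0] * len(y_true)
always_negative_scores = [0.0] * len(y_true)

print(accuracy(y_true, always_negative_labels))  # 0.97
print(roc_auc(y_true, always_negative_scores))   # 0.5
```

The degenerate model looks excellent by accuracy yet is no better than chance by AUC, which is why this notebook compares models on AUC.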

FLAML requires `Python>=3.7`. To run this notebook example, install FLAML with the `synapse` option:
```bash
pip install "flaml[synapse]"
```
 

In [1]:
# %pip install "flaml[synapse]"

## 2. Load data and preprocess

In [2]:
import pyspark

spark = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config(
        "spark.jars.packages",
        f"com.microsoft.azure:synapseml_2.12:0.10.2,org.apache.hadoop:hadoop-azure:{pyspark.__version__},com.microsoft.azure:azure-storage:8.6.6",
    )
    .config("spark.sql.debug.maxToStringFields", "100")
    .getOrCreate()
)

In [3]:
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv"
    )
)
# print dataset size
print("records read: " + str(df.count()))
print("Schema: ")
df.printSchema()

records read: 6819
Schema: 
root
 |-- Bankrupt?: integer (nullable = true)
 |--  ROA(C) before interest and depreciation before interest: double (nullable = true)
 |--  ROA(A) before interest and % after tax: double (nullable = true)
 |--  ROA(B) before interest and depreciation after tax: double (nullable = true)
 |--  Operating Gross Margin: double (nullable = true)
 |--  Realized Sales Gross Margin: double (nullable = true)
 |--  Operating Profit Rate: double (nullable = true)
 |--  Pre-tax net Interest Rate: double (nullable = true)
 |--  After-tax net Interest Rate: double (nullable = true)
 |--  Non-industry income and expenditure/revenue: double (nullable = true)
 |--  Continuous interest rate (after tax): double (nullable = true)
 |--  Operating Expense Rate: double (nullable = true)
 |--  Research and development expense rate: double 

Split the dataset into training and test sets.

In [4]:
train, test = df.randomSplit([0.8, 0.2], seed=41)

Add a featurizer that assembles the feature columns into a single vector column.

In [5]:
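from pyspark.ml.feature import VectorAssembler

# The first column is the label ("Bankrupt?"); all remaining columns are features
feature_cols = df.columns[1:]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Keep only the label and the assembled feature vector
train_data = featurizer.transform(train).select("Bankrupt?", "features")
test_data = featurizer.transform(test).select("Bankrupt?", "features")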
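Conceptually, `VectorAssembler` concatenates the selected columns of each row into one vector. The pure-Python sketch below shows that per-row step on a hypothetical row whose column names `f1` and `f2` stand in for the real feature columns; Spark performs the same operation on every row in parallel and may store the result as a sparse or dense vector.

```python
# Conceptual sketch only, not Spark's implementation.
def assemble(row, input_cols):
    """Concatenate the values of input_cols (in order) into one feature vector."""
    return [float(row[c]) for c in input_cols]


# Hypothetical row: a label column followed by two feature columns
row = {"Bankrupt?": 0, "f1": 0.0828, "f2": 0.0693}
columns = list(row)                   # ["Bankrupt?", "f1", "f2"]
example_feature_cols = columns[1:]    # every column except the leading label
print(assemble(row, example_feature_cols))  # [0.0828, 0.0693]
```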

### Default SynapseML LightGBM

In [6]:
from synapse.ml.lightgbm import LightGBMClassifier

model = LightGBMClassifier(
    objective="binary", featuresCol="features", labelCol="Bankrupt?", isUnbalance=True
)

model = model.fit(train_data)


#### Model Prediction

In [7]:
def predict(model):
    """Score test_data with a fitted model and compute classification metrics."""
    from IPython.display import display
    from synapse.ml.train import ComputeModelStatistics

    predictions = model.transform(test_data)
    # predictions.limit(10).show()  # uncomment to inspect a few predictions

    metrics = ComputeModelStatistics(
        evaluationMetric="classification",
        labelCol="Bankrupt?",
        scoredLabelsCol="prediction",
    ).transform(predictions)
    display(metrics)
    return metrics


default_metrics = predict(model)
default_metrics.show()

DataFrame[evaluation_type: string, confusion_matrix: matrix, accuracy: double, precision: double, recall: double, AUC: double]


+---------------+--------------------+-----------------+------------------+-------------------+------------------+
|evaluation_type|    confusion_matrix|         accuracy|         precision|             recall|               AUC|
+---------------+--------------------+-----------------+------------------+-------------------+------------------+
| Classification|1250.0  23.0  \n3...|0.958997722095672|0.3611111111111111|0.29545454545454547|0.6386934942512319|
+---------------+--------------------+-----------------+------------------+-------------------+------------------+


### Run FLAML
In the FLAML AutoML run configuration, users can specify the task type, time budget, error metric, learner list, whether to subsample, the resampling strategy, and so on. All of these arguments have default values that are used when they are not provided.

In [8]:
# Import the AutoML class and a Spark conversion helper from flaml
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

automl = AutoML()

In [9]:
settings = {
    "time_budget": 30,  # total running time in seconds
    "metric": "roc_auc",  # optimize the area under the ROC curve
    "estimator_list": ["lgbm_spark"],  # list of ML learners; we tune SynapseML LightGBM in this example
    "task": "classification",  # task type
    "log_file_name": "flaml_experiment.log",  # flaml log file
    "seed": 41,  # random seed
    "force_cancel": True,  # force stop training once time_budget is used up
}
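The subsampling and resampling options mentioned above correspond to the `sample`, `eval_method`, `n_splits` and `split_type` arguments of `AutoML.fit`. The hedged sketch below spells them out, with values mirroring what the fit log reports FLAML choosing on its own ("Data split method: stratified", "Evaluation method: cv"); the fold count of 5 is an assumption about FLAML's default, not something the log shows.

```python
# Sketch only: the notebook's settings with subsampling/resampling made explicit.
settings_explicit = {
    "time_budget": 30,
    "metric": "roc_auc",
    "estimator_list": ["lgbm_spark"],
    "task": "classification",
    "log_file_name": "flaml_experiment.log",
    "seed": 41,
    "force_cancel": True,
    "sample": True,              # allow trials to start on subsamples of the training data
    "eval_method": "cv",         # resampling strategy: "auto", "cv" or "holdout"
    "n_splits": 5,               # number of CV folds (assumed to match FLAML's default)
    "split_type": "stratified",  # preserve the class ratio in each fold
}
```

It could be passed exactly like `settings`, e.g. `automl.fit(dataframe=df, label="Bankrupt?", labelCol="Bankrupt?", isUnbalance=True, **settings_explicit)`; this variant was not run in this notebook.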

Disable Arrow optimization to suppress the following warning, which is raised because Arrow cannot convert the `VectorUDT` features column:
```
/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py:87: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below:
  Unsupported type in conversion to Arrow: VectorUDT
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warnings.warn(msg)
```

In [10]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")

In [11]:
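# Convert the Spark DataFrame to pandas-on-Spark, back to Spark with the row index
# stored in an explicit "index" column, and then to pandas-on-Spark again for FLAML
df = to_pandas_on_spark(to_pandas_on_spark(train_data).to_spark(index_col="index"))

df.head()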

|   | index | Bankrupt? | features |
|---|-------|-----------|----------|
| 0 | 0 | 0 | [0.0828, 0.0693, 0.0884, 0.6468, 0.6468, 0.997... |
| 1 | 1 | 0 | [0.1606, 0.1788, 0.1832, 0.5897, 0.5897, 0.998... |
| 2 | 2 | 0 | [0.204, 0.2638, 0.2598, 0.4483, 0.4483, 0.9959... |
| 3 | 3 | 0 | [0.217, 0.1881, 0.2451, 0.5992, 0.5992, 0.9962... |
| 4 | 4 | 0 | [0.2314, 0.1628, 0.2068, 0.6001, 0.6001, 0.998... |


In [12]:
# The main FLAML AutoML API; the extra keyword arguments (labelCol, isUnbalance)
# are forwarded to the lgbm_spark learner
automl.fit(dataframe=df, label="Bankrupt?", labelCol="Bankrupt?", isUnbalance=True, **settings)

[flaml.automl.automl: 02-28 02:12:59] {2922} INFO - task = classification
[flaml.automl.automl: 02-28 02:13:00] {2924} INFO - Data split method: stratified
[flaml.automl.automl: 02-28 02:13:00] {2927} INFO - Evaluation method: cv
[flaml.automl.automl: 02-28 02:13:01] {3054} INFO - Minimizing error metric: 1-roc_auc
[flaml.automl.automl: 02-28 02:13:01] {3209} INFO - List of ML learners in AutoML Run: ['lgbm_spark']
[flaml.automl.automl: 02-28 02:13:01] {3539} INFO - iteration 0, current learner lgbm_spark
[flaml.automl.automl: 02-28 02:13:48] {3677} INFO - Estimated sufficient time budget=464999s. Estimated necessary time budget=465s.
[flaml.automl.automl: 02-28 02:13:48] {3724} INFO -  at 48.5s,	estimator lgbm_spark's best error=0.0871,	best estimator lgbm_spark's best error=0.0871
[flaml.automl.automl: 02-28 02:13:54] {3988} INFO - retrain lgbm_spark for 6.2s
[flaml.automl.automl: 02-28 02:13:54] {3995} INFO - retrained model: LightGBMClassifier_a2177c5be001
[flaml.automl.automl: 02-28 02:13:54] {3239} INFO - fit succeeded
[flaml.automl.automl: 02-28 02:13:54] {3240} INFO - Time taken to find the best model: 48.4579541683197


### Best model and metric

In [13]:
# Retrieve the best configuration found by FLAML
print('Best hyperparameter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))

Best hyperparameter config: {'numIterations': 4, 'numLeaves': 4, 'minDataInLeaf': 20, 'learningRate': 0.09999999999999995, 'log_max_bin': 8, 'featureFraction': 1.0, 'lambdaL1': 0.0009765625, 'lambdaL2': 1.0}
Best roc_auc on validation data: 0.9129
Training duration of best run: 6.237 s
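Some entries of the best configuration are easier to read after a small transformation. The sketch below decodes them under a stated assumption: that `log_max_bin` is a log2-scale search dimension decoded as `2**log_max_bin - 1` bins, as FLAML does for its LightGBM learners. Because the log shows only iteration 0 before the budget ran out (and estimates a necessary budget of 465 s), these values appear to be FLAML's starting point for LightGBM rather than the outcome of a longer search, so a larger `time_budget` may well find a better model.

```python
# Best config copied from the output above
best_config = {
    "numIterations": 4,
    "numLeaves": 4,
    "minDataInLeaf": 20,
    "learningRate": 0.09999999999999995,
    "log_max_bin": 8,
    "featureFraction": 1.0,
    "lambdaL1": 0.0009765625,
    "lambdaL2": 1.0,
}

# Assumption: log_max_bin is decoded as 2**log_max_bin - 1 histogram bins
max_bin = (1 << best_config["log_max_bin"]) - 1
print("max bins:", max_bin)

# lambdaL1 lands exactly on 2**-10, consistent with a log-scale search dimension
print("lambdaL1 == 2**-10:", best_config["lambdaL1"] == 2 ** -10)

# FLAML minimizes 1 - roc_auc, so the logged best error of 0.0871 maps back to the AUC
print("validation roc_auc:", round(1 - 0.0871, 4))
```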


In [14]:
flaml_metrics = predict(automl.model.estimator)
flaml_metrics.show()

DataFrame[evaluation_type: string, confusion_matrix: matrix, accuracy: double, precision: double, recall: double, AUC: double]

+---------------+--------------------+------------------+-------------------+------------------+------------------+
|evaluation_type|    confusion_matrix|          accuracy|          precision|            recall|               AUC|
+---------------+--------------------+------------------+-------------------+------------------+------------------+
| Classification|1218.0  55.0  \n1...|0.9453302961275627|0.32926829268292684|0.6136363636363636|0.7852156680711276|
+---------------+--------------------+------------------+-------------------+------------------+------------------+
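The `show()` output truncates both confusion matrices, but their first rows (TN, FP) are visible and precision and recall are reported, so the remaining cells can be recovered with a little arithmetic. The sketch below assumes the matrix is laid out as [[TN, FP], [FN, TP]] with label 0 first, which agrees with the visible first digits of the second rows. It also checks that each reported `AUC` equals (TPR + TNR) / 2, which suggests the AUC was computed from the hard 0/1 predictions (only `scoredLabelsCol` is passed to `ComputeModelStatistics`) rather than from predicted probabilities; this may explain part of the gap to the validation roc_auc of 0.9129.

```python
def recover_counts(tn, fp, precision, recall):
    """Recover (TP, FN) from a known (TN, FP) row plus reported precision and recall.

    precision = TP / (TP + FP)  =>  TP = precision * FP / (1 - precision)
    recall    = TP / (TP + FN)  =>  FN = TP * (1 - recall) / recall
    """
    tp = round(precision * fp / (1 - precision))
    fn = round(tp * (1 - recall) / recall)
    return tp, fn


# (TN, FP, precision, recall, reported AUC) copied from the two metric tables
reported = {
    "default": (1250, 23, 0.3611111111111111, 0.29545454545454547, 0.6386934942512319),
    "flaml": (1218, 55, 0.32926829268292684, 0.6136363636363636, 0.7852156680711276),
}

results = {}
for name, (tn, fp, precision, recall, auc) in reported.items():
    tp, fn = recover_counts(tn, fp, precision, recall)
    total = tn + fp + fn + tp
    accuracy = (tn + tp) / total
    # With hard 0/1 predictions the ROC curve has one interior point, so AUC = (TPR + TNR) / 2
    hard_label_auc = (tp / (tp + fn) + tn / (tn + fp)) / 2
    results[name] = (tp, fn, accuracy, hard_label_auc)
    print(f"{name}: TP={tp} FN={fn} accuracy={accuracy:.4f} "
          f"hard-label AUC={hard_label_auc:.4f} (reported {auc:.4f})")

# Accuracy of a model that always predicts "not bankrupt" on the same test split
negatives = 1250 + 23
positives = results["default"][0] + results["default"][1]
print(f"always-negative accuracy={negatives / (negatives + positives):.4f}")
```

With these numbers, the test split holds 44 bankrupt companies out of 1,317; the FLAML model finds 27 of them versus 13 for the default model, while an always-negative baseline would reach an accuracy of about 0.9666, higher than either model.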

