# Databricks Certified Machine Learning Professional Notes

## Experimentation

### Data Management

#### Delta Lake

##### Optimization

**OPTIMIZE Command**

`%sql OPTIMIZE table`

**VACUUM Command**

`spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false") # Disable the retention check so RETAIN 0 HOURS is allowed`

`%sql VACUUM table RETAIN 0 HOURS`
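
Delta's default retention threshold is 7 days (168 hours), which is why the retention check has to be switched off before `RETAIN 0 HOURS` is accepted. The sketch below encodes that rule in plain Python; `vacuum_statements` and `DEFAULT_RETENTION_HOURS` are hypothetical helper names, not part of any Databricks API.

```python
# Hypothetical helper: lists the statements needed for a VACUUM with a given retention.
DEFAULT_RETENTION_HOURS = 7 * 24  # Delta's default retention threshold (7 days)


def vacuum_statements(table: str, retain_hours: int) -> list:
    """Return the SQL statements to run, in order."""
    if retain_hours < 0:
        raise ValueError("retain_hours must be non-negative")
    statements = []
    if retain_hours < DEFAULT_RETENTION_HOURS:
        # Below the threshold, VACUUM is refused unless the safety check is disabled.
        statements.append(
            "SET spark.databricks.delta.retentionDurationCheck.enabled = false"
        )
    statements.append(f"VACUUM {table} RETAIN {retain_hours} HOURS")
    return statements


for stmt in vacuum_statements("table", 0):
    print(stmt)  # on Databricks, each statement could be passed to spark.sql(stmt)
```

Vacuuming with a retention of 0 hours removes the data files that time travel to older versions relies on, so it is best kept for demos and cleanup of scratch tables.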

**Z-Ordering Command**

`%sql OPTIMIZE table ZORDER BY (col)`

**Liquid Clustering: optimize for columns**

`%sql ALTER TABLE table CLUSTER BY (cola, colb)`

##### Operations
| Operation | Sample |
|----------|----------|
| Read  | `spark.read.format("delta").load(path)//.table(table).drop("a", "b")`   |
| Read - Streaming Data  | `spark.readStream.table(table)`|
| Write - Overwrite  | `df.drop("a").write.format("delta").option("overwriteSchema", "True").mode("overwrite").save(path)//.saveAsTable(table)`  |
| Write - Append  | `df.write.format("delta").mode("append").save(path)//.saveAsTable(table)`   |
| Extracting History  | `spark.sql("DESCRIBE HISTORY table")`   |
| Extracting Timestamp  | `spark.sql("DESCRIBE HISTORY table").orderBy("version").first().timestamp`   |
| Reading by Timestamp  | `spark.read.format("delta").option("timestampAsOf", timestamp).table(table)`   |
| Reading by Version  | `spark.read.format("delta").option("versionAsOf", 12).load(path)`   |
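
The two time-travel rows in the table are alternatives: a read pins either a version or a timestamp. The sketch below builds the reader options for one or the other; `time_travel_options` is a hypothetical helper, and rejecting both at once is an assumption about how Delta treats conflicting options.

```python
from typing import Optional


def time_travel_options(version: Optional[int] = None,
                        timestamp: Optional[str] = None) -> dict:
    """Build the Delta reader options for a time-travel read."""
    if version is not None and timestamp is not None:
        raise ValueError("choose either a version or a timestamp, not both")
    if version is not None:
        return {"versionAsOf": str(version)}
    if timestamp is not None:
        return {"timestampAsOf": timestamp}
    return {}  # no options: read the latest version


opts = time_travel_options(version=12)
print(opts)
# on Databricks: spark.read.format("delta").options(**opts).load(path)
```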

#### Feature Store

**imports**

```
from databricks.feature_store import FeatureStoreClient, FeatureLookup, FeatureFunction
from databricks.feature_engineering import FeatureEngineeringClient # Feature Engineering Client alternative
fs = FeatureStoreClient()
```

##### Create Table
**Create Table by Dataframe**

```
fs.create_table(
    name = name,
    df = df,
    schema = schema, # Optional
    description = desc,
    primary_keys = [],
    timestamp_keys = [],
    partition_columns = [])
```

**Create Table by Schema**

```
fs.create_table(
    name = name,
    schema = schema,
    description = desc,
    primary_keys = [],
    timestamp_keys = [],
    partition_columns = []) # results in an empty table
```
##### Read Table
**Read Table by Name**

`fs.read_table(name = name)`

**Read Table by Version**

`ts = spark.sql("DESCRIBE HISTORY name").filter("version = 2").first().timestamp # Version 2`

`fs.read_table(name = name, as_of_delta_timestamp = ts)`
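
`DESCRIBE HISTORY` lists the newest version first, so a positional index only matches the version number by coincidence. A minimal sketch of looking the timestamp up by the `version` column instead, using a toy list of dicts in place of the collected history rows (the data and `timestamp_for_version` are made up for illustration):

```python
from datetime import datetime

# Toy stand-in for the rows of `DESCRIBE HISTORY name` (newest version first).
history = [
    {"version": 3, "timestamp": datetime(2024, 1, 4, 9, 0)},
    {"version": 2, "timestamp": datetime(2024, 1, 3, 9, 0)},
    {"version": 1, "timestamp": datetime(2024, 1, 2, 9, 0)},
    {"version": 0, "timestamp": datetime(2024, 1, 1, 9, 0)},
]


def timestamp_for_version(rows, version):
    """Return the commit timestamp of the row whose version matches."""
    for row in rows:
        if row["version"] == version:
            return row["timestamp"]
    raise KeyError(f"version {version} not found in history")


ts = timestamp_for_version(history, 2)
print(ts)
```

In this toy history, index 2 holds version 1, which is the kind of mismatch a lookup by version avoids.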

##### Write Table
**Write Table - Merge**

`df_new = df.withColumn("score_group", when((df.score<=25), "low").otherwise("etc")) # create a new column`

`fs.write_table(name = name, df = df_new.select("id", "score_group"), mode = "merge")`

**Write Table - Overwrite**
```
from pyspark.sql.functions import expr, lit
cols = ["clean", "location", "communication"]  # average these columns, then drop them
new_df = df.withColumn("ave_cols", expr("+".join(cols)) / lit(len(cols))).drop(*cols)
fs.write_table(name = name, df = new_df, mode = "overwrite")
```
* This operation does not change the existing table schema: the dropped columns still exist, but they are populated with null values.
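
The note above can be pictured with a toy model in plain Python. This is not Feature Store code: `overwrite_keeping_schema` is a hypothetical function, the rows are invented, and the table is assumed to already contain an `ave_cols` column.

```python
# Toy model of an overwrite that keeps the existing table schema.
table_schema = ["id", "clean", "location", "communication", "ave_cols"]

# Rows of the new DataFrame, after the three source columns were dropped.
new_rows = [
    {"id": 1, "ave_cols": 4.5},
    {"id": 2, "ave_cols": 3.0},
]


def overwrite_keeping_schema(schema, rows):
    """Project each row onto the existing schema, filling missing columns with None."""
    return [{col: row.get(col) for col in schema} for row in rows]


result = overwrite_keeping_schema(table_schema, new_rows)
print(result[0])
```

The dropped columns are still present in every output row, and each of them holds `None`.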

**Get Table Metadata**

`fs.get_table(table_name).description // .path_data_sources // .features`

**Delete Table** 

`fs.drop_table(table_name)`

##### Machine Learning Operations

**Feature Function Definition**

```
%sql
CREATE OR REPLACE FUNCTION avg_func (a DOUBLE, b DOUBLE) RETURNS DOUBLE
LANGUAGE PYTHON AS
$$
avg = a / b
return avg
$$
```

**Feature LookUp Definition**
```
lookups = [
    FeatureLookup(
        table_name = table_name,
        lookup_key = "lookup_key",
        timestamp_lookup_key = "ts_key",
        feature_names = []),
    FeatureFunction(
        udf_name = "avg_func",
        output_name = "avg",
        input_bindings = {"a": "a", "b": "b"})]
```
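
In `input_bindings`, the keys are the function's parameter names and the values are the columns that feed them. A plain-Python sketch of that binding step, under the assumption that it works like keyword-argument passing; `apply_feature_function` and the column names `total` and `count` are hypothetical.

```python
def avg_func(a: float, b: float) -> float:
    # Same body as the SQL-registered function in these notes.
    return a / b


def apply_feature_function(row, func, input_bindings, output_name):
    """Bind row columns to function parameters and store the result under output_name."""
    kwargs = {param: row[column] for param, column in input_bindings.items()}
    return {**row, output_name: func(**kwargs)}


row = {"id": 7, "total": 9.0, "count": 3.0}
out = apply_feature_function(row, avg_func, {"a": "total", "b": "count"}, "avg")
print(out)
```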

**Feature Training Set**
```
training_set = fs.create_training_set(
    df = df,
    feature_lookups = lookups,
    label = "quality",
    exclude_columns = ["id_column"])
training_df = training_set.load_df()
```

**Logging a Model**
```
fs.log_model(   model = model, 
                registered_model_name = model_name, 
                training_set = training_set, 
                flavor = mlflow.sklearn, 
                artifact_path = artifact_path)
```

**Batch Scoring**
```
from mlflow.tracking import MlflowClient
client = MlflowClient()
latest_version = client.get_latest_versions(model_name, stages=["None"])[0].version
fs.score_batch(f"models:/{model_name}/{latest_version}", input_df, result_type="string")
```


### Experiment Tracking

**Basic Logging**
```
experiment = mlflow.get_experiment_by_name(experiment_name)
experiment_id = experiment.experiment_id
with mlflow.start_run(experiment_id = experiment_id, run_name = run_name) as run:
    mlflow.flavor.log_model(model, "artifact_path")  # flavor: sklearn, spark, ...
    mlflow.log_metric("rmse", rmse)
    mlflow.log_param("trees", tree_num)
    mlflow.log_metrics({"mse": mse})
    mlflow.log_params({})
    mlflow.log_artifact(csv_path, "feature-importance.csv")
    mlflow.log_figure(fig, "feature_importance.png")
    print(run.info.run_id)
```
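
`mlflow.log_metrics` takes a dict that maps metric names to numeric values. A small sketch of building such a dict without any ML library; `regression_metrics` is a hypothetical helper and the sample values are invented.

```python
import math


def regression_metrics(y_true, y_pred):
    """Compute MSE and RMSE and return them in the shape log_metrics expects."""
    if len(y_true) != len(y_pred) or len(y_true) == 0:
        raise ValueError("inputs must be non-empty and of equal length")
    mse = sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)
    return {"mse": mse, "rmse": math.sqrt(mse)}


metrics = regression_metrics([3.0, 5.0, 7.0], [2.0, 5.0, 9.0])
print(metrics)
# inside an active run: mlflow.log_metrics(metrics)
```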

**Accessing Metadata**
```
from mlflow.tracking import MlflowClient
client = MlflowClient()
```
| Operation | sample |
|----------|----------|
| List Runs in an Experiment   | `client.search_runs([experiment_id])` (MLflow 1.x: `client.list_run_infos(experiment_id)`)   |
| Load Experiment Metadata - Spark DataFrame   | `runs = spark.read.format("mlflow-experiment").load(experiment_id)`   |
| Get Latest Run ID   | `run_id = runs.orderBy("start_time", ascending = False).first().run_id`   |
| Pull Run Metrics   | `client.get_run(run_id).data.metrics`   |
| List Run Artifacts/Files   | `client.list_artifacts(run_id)`   |
| Load Model   | `mlflow.flavor.load_model(f"runs:/{run_id}/model")`   |

**Register a Model with Signature**
```
from mlflow.models import infer_signature
mlflow.register_model(model_uri, name)
# or register while logging the model:
mlflow.flavor.log_model(model,
    artifact_path = …,
    signature = infer_signature(.., ..),  # or: input_example = ..
    registered_model_name = …)
```

**Nest Runs**
```
with mlflow.start_run(run_name = …) as run:
	with mlflow.start_run(run_name = …, nested=True):
```

OR

`with mlflow.start_run(run_name = …, parent_run_id = run.info.run_id):`

**Autologging**

`mlflow.flavor.autolog()`

**SHAP Logging**

`mlflow.shap.log_explanation()`

## Model Lifecycle Management

### Preprocessing Logic

**PyFunc Model Definition**

`class sklearn_model(mlflow.pyfunc.PythonModel):`

**Model Metadata**
```
from mlflow.tracking.client import MlflowClient
client = MlflowClient()
```

`details = client.get_model_version(name=model_name, version=1)`

`details = client.get_model_version_by_alias(name=model_name, alias="Champion")`

`details.name//run_id//version//current_stage//tags//description//status//metrics//model_id//params//source`

`reg_model_details = client.get_registered_model("name")`

`reg_model_details.name//aliases//description//latest_versions//tags`

**Transition Model Versions**
```
client.transition_model_version_stage(
    name = details.name,
    version = details.version,
    stage = "Production")
```

**Verify Transition**
```
model_details = client.get_model_version(
    name = details.name,
    version = details.version)
model_details.current_stage  # 'Production'
```
**Add Metadata to a Registered Model**
```
client.update_registered_model(
    name = details.name,
    description = "this model….")
```

**Add Metadata to a Registered Model Version**
```
client.update_model_version(
    name = details.name,
    version = details.version,
    description = "this model….")
```

**Delete Model Versions**

`client.delete_model_version("name", version) # archive the model version first`

`client.delete_registered_model("name")`

### Model Lifecycle Automation 

**http_request**

```
from mlflow.utils.databricks_utils import get_databricks_host_creds
from mlflow.utils.rest_utils import http_request
# Create a Webhook
job_json = {"model_name": model,
            "events": [],
            "status": "",
            "description": "",
            "job_spec": {
                    "job_id": job_id,
                    "workspace_url": …,
                    "access_token": …}}
http_request(
    host_creds = get_databricks_host_creds("databricks"),
    endpoint = "/api/2.0/mlflow/registry-webhooks/create",
    method = "POST",
    json = job_json)
# List Webhooks
http_request(
    host_creds = get_databricks_host_creds("databricks"),
    endpoint = "/api/2.0/mlflow/registry-webhooks/list/?model_name=model",
    method = "GET")
# Delete Webhook
http_request(
    host_creds = get_databricks_host_creds("databricks"),
    endpoint = "/api/2.0/mlflow/registry-webhooks/delete",
    method = "DELETE",
    json = {"id": id})
```
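
The request body for a job webhook is easy to get wrong by hand, so it can help to build it in one place and validate it before sending. A minimal sketch: `build_job_webhook_payload` is a hypothetical helper, the field names follow the `job_json` block in these notes, and every value in the usage call is a placeholder.

```python
VALID_STATUSES = {"ACTIVE", "DISABLED", "TEST_MODE"}


def build_job_webhook_payload(model_name, events, job_id, workspace_url,
                              access_token, status="TEST_MODE", description=""):
    """Assemble the JSON body for a job-triggering registry webhook."""
    if status not in VALID_STATUSES:
        raise ValueError(f"unknown status: {status}")
    if not events:
        raise ValueError("at least one event is required")
    return {
        "model_name": model_name,
        "events": list(events),
        "status": status,
        "description": description,
        "job_spec": {
            "job_id": job_id,
            "workspace_url": workspace_url,
            "access_token": access_token,
        },
    }


payload = build_job_webhook_payload(
    model_name="my_model",                                  # placeholder
    events=["MODEL_VERSION_TRANSITIONED_STAGE"],
    job_id="123",                                           # placeholder
    workspace_url="https://example.cloud.databricks.com",   # placeholder
    access_token="<token>",                                 # placeholder, never hard-code a real token
)
print(payload["status"])
```

Defaulting to `TEST_MODE` is a design choice for this sketch: the webhook can be exercised before it is switched to `ACTIVE`.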

**RegistryWebhooksClient**

```
%pip install databricks-registry-webhooks
from databricks_registry_webhooks import HttpUrlSpec, RegistryWebhooksClient
# Create a Webhook
http_url_spec = HttpUrlSpec(
        url="https://hooks.slack.com/services/...",
        secret="secret_string",
        authorization="Bearer token")
RegistryWebhooksClient().create_webhook(
    model_name = name,
    description = "",
    events = [],
    status = "",
    http_url_spec = http_url_spec)
# List Webhooks
RegistryWebhooksClient().list_webhooks(model_name=name)
# Delete Webhook
RegistryWebhooksClient().delete_webhook(id=id)
# Update a Webhook Status
RegistryWebhooksClient().update_webhook(id=id, status=status)
```

**Events**
* REGISTERED_MODEL_CREATED
* MODEL_VERSION_CREATED
* COMMENT_CREATED
* MODEL_VERSION_TAG_SET
* MODEL_VERSION_TRANSITIONED_STAGE
* MODEL_VERSION_TRANSITIONED_TO_XXX
* TRANSITION_REQUEST_CREATED
* TRANSITION_REQUEST_TO_XXX_CREATED

XXX: STAGING/PRODUCTION/ARCHIVED
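
Expanding the `XXX` placeholder gives the full set of event names. A short sketch of that expansion using only the patterns and stages listed above (`expand_events` is a hypothetical helper):

```python
STAGES = ["STAGING", "PRODUCTION", "ARCHIVED"]

EVENT_PATTERNS = [
    "REGISTERED_MODEL_CREATED",
    "MODEL_VERSION_CREATED",
    "COMMENT_CREATED",
    "MODEL_VERSION_TAG_SET",
    "MODEL_VERSION_TRANSITIONED_STAGE",
    "MODEL_VERSION_TRANSITIONED_TO_XXX",
    "TRANSITION_REQUEST_CREATED",
    "TRANSITION_REQUEST_TO_XXX_CREATED",
]


def expand_events(patterns, stages):
    """Replace the XXX placeholder with each stage name; keep other events as they are."""
    expanded = []
    for pattern in patterns:
        if "XXX" in pattern:
            expanded.extend(pattern.replace("XXX", stage) for stage in stages)
        else:
            expanded.append(pattern)
    return expanded


events = expand_events(EVENT_PATTERNS, STAGES)
for event in events:
    print(event)
```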

**Statuses**
ACTIVE | DISABLED | TEST_MODE

## Model Deployment

#### Batch

**Load a Model**

`mlflow.set_registry_uri("databricks-uc")`

`mlflow.flavor.load_model(model_uri)`

**Load a Single-Node Model with spark_udf**

`mlflow.pyfunc.spark_udf(spark, model_uri, result_type = "")`
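
The `model_uri` passed to these loaders follows the `models:/` scheme, addressed by version number, by alias, or by stage. A small sketch of building it; `model_uri` here is a hypothetical helper function and the model name `m` is a placeholder.

```python
def model_uri(name, version=None, alias=None, stage=None):
    """Build a models:/ URI from exactly one of version, alias, or stage."""
    chosen = [x for x in (version, alias, stage) if x is not None]
    if len(chosen) != 1:
        raise ValueError("specify exactly one of version, alias, or stage")
    if alias is not None:
        return f"models:/{name}@{alias}"
    return f"models:/{name}/{version if version is not None else stage}"


print(model_uri("m", version=3))
print(model_uri("m", alias="Champion"))
print(model_uri("m", stage="Production"))
```

With the registry URI set to `databricks-uc`, stages are not available, so the version and alias forms are the ones to use there.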

**Convert a Batch Deployment Pipeline to Streaming**

```
import dlt
import mlflow
from pyspark.sql.functions import col, struct
mlflow.set_registry_uri("databricks-uc")
predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri, result_type = "")
# DLT Inference: create 3 DLT tables, each decorator wrapping its own function
@dlt.table(name="1", comment="", table_properties={})
def raw_inputs():
    return spark.read.csv(bronze_path, inferSchema=True, header=True, multiLine=True)
# @dlt.table(name="2", comment="", table_properties={})
# ...
# @dlt.table(name="3", comment="", table_properties={})
# ...
```

#### Real-time

**Create a Serving Endpoint**
```
from databricks.sdk import WorkspaceClient
from mlflow.tracking import MlflowClient
from databricks.sdk.service.serving import EndpointCoreConfigInput
w = WorkspaceClient()
client = MlflowClient()
fs_model_version = client.get_model_version_by_alias(model_name, alias = "Champion").version
fs_endpoint_config_dict = {"served_entities": [{
    "model_name": model_name,
    "model_version": fs_model_version,
    "scale_to_zero_enabled": True,
    "workload_size": "Small"}]}
fs_endpoint_config = EndpointCoreConfigInput.from_dict(fs_endpoint_config_dict)
w.serving_endpoints.create_and_wait(endpoint_name=endpoint_name, config=fs_endpoint_config)
```
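
The endpoint configuration is a plain dict before it reaches `EndpointCoreConfigInput.from_dict`, so it can be built and checked separately. A sketch that mirrors the keys used above: `build_endpoint_config` is a hypothetical helper, and the list of allowed workload sizes is an assumption.

```python
VALID_WORKLOAD_SIZES = {"Small", "Medium", "Large"}  # assumed set of sizes


def build_endpoint_config(model_name, model_version,
                          workload_size="Small", scale_to_zero=True):
    """Return a served-entities config dict for a single model version."""
    if workload_size not in VALID_WORKLOAD_SIZES:
        raise ValueError(f"unknown workload size: {workload_size}")
    return {
        "served_entities": [{
            "model_name": model_name,
            "model_version": str(model_version),
            "scale_to_zero_enabled": scale_to_zero,
            "workload_size": workload_size,
        }]
    }


config = build_endpoint_config("my_model", 2)  # placeholder name and version
print(config)
```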
**Query a Serving Endpoint**
```
# df is assumed to be a pandas DataFrame
query_response = w.serving_endpoints.query(name=endpoint_name, dataframe_records=df.to_dict(orient="records"))
df["prediction"] = query_response.predictions
df["model"] = query_response.served_model_name
```