# Lakehouse monitoring example notebook: TimeSeries analysis

**User requirements:**
- You must have access to run commands on a cluster with access to Unity Catalog.
- You must have the `USE CATALOG` privilege on at least one catalog, and the `USE SCHEMA` and `CREATE TABLE` privileges on at least one schema in it. This notebook creates tables in the `main.default` schema. If you do not have the required privileges on `main.default`, edit the notebook to use a catalog and schema on which you do.

**System requirements:**
- Your workspace must be enabled for Unity Catalog.
- Databricks Runtime 12.2 LTS or above.
- A Single user or Assigned cluster.

This notebook illustrates how to create a time series monitor.

For more information about Lakehouse monitoring, see the documentation ([AWS](https://docs.databricks.com/lakehouse-monitoring/index.html)|[Azure](https://learn.microsoft.com/azure/databricks/lakehouse-monitoring/index)).

## Setup
* Verify cluster configuration
* Install Python client
* Define catalog, schema and table names

In [0]:
# Check the cluster configuration. If this cell fails, use the cluster selector at the top right of the notebook to select or configure a cluster running Databricks Runtime 12.2 LTS or above.
import os

assert float(os.environ.get("DATABRICKS_RUNTIME_VERSION", 0)) >= 12.2, "Please configure your cluster to use Databricks Runtime 12.2 LTS or above."
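Comparing version strings as floating-point numbers can misorder them. For example, `float("12.10")` is `12.1`, which is less than `12.2`. The following is a minimal sketch that compares `(major, minor)` integer tuples instead. It assumes `DATABRICKS_RUNTIME_VERSION` starts with `<major>.<minor>`, and the helper name `runtime_at_least` is hypothetical.

```python
import os
import re


def runtime_at_least(version: str, minimum: tuple) -> bool:
    """Return True if a '<major>.<minor>...' version string is >= minimum.

    Hypothetical helper: assumes the string starts with two dot-separated integers.
    """
    match = re.match(r"(\d+)\.(\d+)", version)
    if match is None:
        return False  # Missing or unrecognized version: treat it as too old.
    return (int(match.group(1)), int(match.group(2))) >= minimum


# On a Databricks cluster the variable is set; elsewhere this falls back to "" (False).
print(runtime_at_least(os.environ.get("DATABRICKS_RUNTIME_VERSION", ""), (12, 2)))
```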

In [0]:
%pip install "https://ml-team-public-read.s3.amazonaws.com/wheels/data-monitoring/a4050ef7-b183-47a1-a145-e614628e3146/databricks_lakehouse_monitoring-0.4.14-py3-none-any.whl"

In [0]:
# Restart Python so that the newly installed wheel is available in this notebook.
dbutils.library.restartPython()

In [0]:
# You must have `USE CATALOG` on the catalog, and `USE SCHEMA` and `CREATE TABLE` on the schema.
# If necessary, change the catalog and schema name here.

CATALOG = "main"
SCHEMA = "default"

In [0]:
username = spark.sql("SELECT current_user()").first()["current_user()"]
username_prefixes = username.split("@")[0].split(".")

In [0]:
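import re

# Per-user suffix, e.g. "first.last@example.com" -> "first_la"; also works if the name has no ".".
unique_suffix = "_".join(username_prefixes[:1] + [p[:2] for p in username_prefixes[1:2]])
unique_suffix = re.sub(r"[^A-Za-z0-9_]", "_", unique_suffix)  # keep the table name a valid unquoted identifier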
TABLE_NAME = f"{CATALOG}.{SCHEMA}.wine_ts_{unique_suffix}"
BASELINE_TABLE = f"{CATALOG}.{SCHEMA}.wine_ts_baseline_{unique_suffix}"
TIMESTAMP_COL = "timestamp"
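The table names above are interpolated into SQL without quoting. That works when the catalog, schema, and suffix contain only letters, digits, and underscores. If a part could contain other characters, such as a hyphen, you can wrap each part of the three-level name in backticks and double any embedded backtick, which is how Spark SQL escapes quoted identifiers. The following is a minimal sketch; the helper name `quote_name` is hypothetical.

```python
def quote_name(*parts: str) -> str:
    """Join catalog, schema, and table into a backtick-quoted multi-part name."""
    # In Spark SQL, a literal backtick inside a quoted identifier is written as two backticks.
    return ".".join("`" + part.replace("`", "``") + "`" for part in parts)


print(quote_name("main", "default", "wine_ts_jane-doe"))
# `main`.`default`.`wine_ts_jane-doe`
```

For example, `spark.sql(f"DROP TABLE IF EXISTS {quote_name(CATALOG, SCHEMA, 'wine_ts_' + unique_suffix)}")`. This applies to SQL statements. Whether the monitoring client accepts quoted names is not covered here.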

In [0]:
spark.sql(f"DROP TABLE IF EXISTS {TABLE_NAME}")
spark.sql(f"DROP TABLE IF EXISTS {BASELINE_TABLE}")

## User Journey
1. Create tables: Read raw data and create the primary table (the table to be monitored) and the baseline table (which contains data known to meet expected quality standards).
2. Create a monitor on the primary table.
3. Inspect the metrics tables.
4. Add new data to the table and refresh the metrics. Inspect the metrics tables again.
5. [Optional] Delete the monitor.

## 1. Create the primary and (optional) baseline tables in Unity Catalog
* The tables must be Delta tables registered in Unity Catalog and owned by the user running the notebook.  
* The table to be monitored is also called the "primary table".  
* The baseline table must have the same schema as the monitored table.
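One way to check that the baseline and primary tables share a schema is to compare their column names and types. The sketch below compares two lists of `(column, type)` pairs, which is the shape PySpark's `DataFrame.dtypes` returns. On Databricks you could call `schema_diff(ts1_df.dtypes, baseline_df.dtypes)`. The helper name `schema_diff` is hypothetical, the example schemas are toy values, and the check ignores column order.

```python
def schema_diff(primary, baseline):
    """Compare [(name, type), ...] lists; return a dict of differences (empty if identical)."""
    p, b = dict(primary), dict(baseline)
    diff = {}
    if missing := sorted(p.keys() - b.keys()):
        diff["missing_in_baseline"] = missing
    if extra := sorted(b.keys() - p.keys()):
        diff["extra_in_baseline"] = extra
    if mismatched := sorted(c for c in p.keys() & b.keys() if p[c] != b[c]):
        diff["type_mismatch"] = mismatched
    return diff


# Toy schemas for illustration only.
primary = [("alcohol", "double"), ("type", "string"), ("timestamp", "timestamp")]
baseline = [("alcohol", "double"), ("type", "string"), ("timestamp", "string")]
print(schema_diff(primary, baseline))  # {'type_mismatch': ['timestamp']}
```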

This example uses the `winequality` datasets.

In [0]:
import pandas as pd

white_wine = pd.read_csv("/dbfs/databricks-datasets/wine-quality/winequality-white.csv", sep=";")
red_wine = pd.read_csv("/dbfs/databricks-datasets/wine-quality/winequality-red.csv", sep=";")

# Add a categorical column that records the wine type
white_wine["type"] = "white"
red_wine["type"] = "red"
data_pdf = pd.concat([white_wine, red_wine], axis=0)

# Clean columns
data_pdf.columns = data_pdf.columns.str.replace(" ", "_")

#### Split the data

`baseline_df` is the baseline table, `ts1_df` is the primary table, and `ts2_df` is used to simulate future data.

In [0]:
data_df = spark.createDataFrame(data_pdf)
baseline_df, ts1_df, ts2_df = data_df.randomSplit(weights=[0.20, 0.40, 0.40], seed=42)
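`randomSplit` assigns rows to splits at random, so the split sizes only approximate the 20/40/40 weights. It also normalizes weights that do not sum to 1. The pure-Python sketch below shows the idea with per-row draws against cumulative normalized weights. It illustrates the concept and is not Spark's actual implementation. The helper name `random_split` is hypothetical.

```python
import random
from bisect import bisect_right
from itertools import accumulate


def random_split(rows, weights, seed=42):
    """Assign each row to one bucket with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative normalized boundaries, e.g. [0.2, 0.6, 1.0] for weights [0.2, 0.4, 0.4].
    bounds = list(accumulate(w / total for w in weights))
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        # bisect_right returns the first bucket whose upper bound exceeds the draw;
        # min() guards against floating-point rounding in the last boundary.
        index = bisect_right(bounds, rng.random())
        splits[min(index, len(weights) - 1)].append(row)
    return splits


baseline, ts1, ts2 = random_split(range(1000), [0.20, 0.40, 0.40])
print(len(baseline), len(ts1), len(ts2))  # roughly 200, 400, 400
```

Passing the same `seed` gives the same assignment, which is also why the notebook passes `seed=42`.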

#### Create different timestamps to simulate time series data

In [0]:
from datetime import timedelta, datetime
from pyspark.sql import functions as F

In [0]:
# Simulate data with different timestamps. Compute "now" once so that the offsets are exact.
now = datetime.now()
timestamp_0 = now                      # Baseline data
timestamp_1 = now + timedelta(days=1)  # 1 day later
timestamp_2 = now + timedelta(days=2)  # 2 days later

baseline_df = baseline_df.withColumn(TIMESTAMP_COL, F.lit(timestamp_0).cast("timestamp"))
ts1_df = ts1_df.withColumn(TIMESTAMP_COL, F.lit(timestamp_1).cast("timestamp"))
# Build ts2_df from its own split (not ts1_df) so that it contains new rows
ts2_df = ts2_df.withColumn(TIMESTAMP_COL, F.lit(timestamp_2).cast("timestamp"))

In [0]:
baseline_df.display()

#### Create the baseline and primary Delta tables in Unity Catalog

In [0]:
# Write the baseline table to Unity Catalog

(baseline_df
  .write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema",True)
  .option("delta.enableChangeDataFeed", "true")
  .saveAsTable(f"{BASELINE_TABLE}")
)

In [0]:
# Write the primary table to Unity Catalog. This is the table to be monitored. Later in this notebook, we will add data for future timestamps to this table. 

(ts1_df
  .write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema",True)
  .option("delta.enableChangeDataFeed", "true")
  .saveAsTable(f"{TABLE_NAME}")
)

In [0]:
spark.sql(f"SELECT COUNT(*) FROM {TABLE_NAME}").display()

## 2. Create the monitor
This notebook illustrates `TimeSeries` type analysis. For other types of analysis, see the Lakehouse Monitoring documentation ([AWS](https://docs.databricks.com/lakehouse-monitoring/index.html)|[Azure](https://learn.microsoft.com/azure/databricks/lakehouse-monitoring/index)).

**Before you create the monitor, drop any columns that should not be monitored for business or use-case reasons.**

In [0]:
import databricks.lakehouse_monitoring as lm

In [0]:
# Window sizes to analyze data over
GRANULARITIES = ["1 day"]                       

# Expressions to slice data with. The `type` column holds lowercase values ("red", "white").
SLICING_EXPRS = ["type='red'"]
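`TimeSeries` analysis groups rows into windows of each granularity based on `TIMESTAMP_COL`. The sketch below buckets timestamps into 1-day windows. It assumes, for illustration only, that windows start at midnight UTC; the monitor's actual window alignment may differ. With any fixed daily alignment, rows stamped exactly one day apart fall into adjacent windows. The helper name `day_window` is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def day_window(ts: datetime) -> tuple:
    """Return the [start, end) 1-day window containing ts, aligned to midnight UTC (assumed)."""
    ts = ts.astimezone(timezone.utc)
    start = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    return start, start + timedelta(days=1)


now = datetime.now(timezone.utc)
w1 = day_window(now + timedelta(days=1))  # window of the first simulated batch
w2 = day_window(now + timedelta(days=2))  # window of the second simulated batch
print(w2[0] - w1[0])  # 1 day, 0:00:00 -> the two batches fall in consecutive windows
```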

In [0]:
print(f"Creating monitor for {TABLE_NAME}")

info = lm.create_monitor(
  table_name=TABLE_NAME,
  profile_type=lm.TimeSeries(
    timestamp_col=TIMESTAMP_COL,
    granularities=GRANULARITIES
  ),
  slicing_exprs=SLICING_EXPRS,
  baseline_table_name=BASELINE_TABLE,
  output_schema_name=f"{CATALOG}.{SCHEMA}"
)

In [0]:
import time


# Wait for monitor to be created
while info.status == lm.MonitorStatus.PENDING:
  info = lm.get_monitor(table_name=TABLE_NAME)
  time.sleep(10)

assert info.status == lm.MonitorStatus.ACTIVE, f"Unexpected monitor status: {info.status}"

In [0]:
# Creating the monitor automatically triggers a metric refresh. Wait for it to finish.
refreshes = lm.list_refreshes(table_name=TABLE_NAME)
assert(len(refreshes) > 0)

run_info = refreshes[0]
while run_info.state in (lm.RefreshState.PENDING, lm.RefreshState.RUNNING):
  run_info = lm.get_refresh(table_name=TABLE_NAME, refresh_id=run_info.refresh_id)
  time.sleep(30)

assert run_info.state == lm.RefreshState.SUCCESS, f"Refresh ended in state {run_info.state}"
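The polling loops above wait for a status change but never give up. The following is a minimal sketch of a reusable polling helper with a timeout. The name `wait_while` is hypothetical. `fetch` and `is_pending` are callables you would bind to `lm.get_monitor` or `lm.get_refresh` and to the pending states used above.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def wait_while(fetch: Callable[[], T], is_pending: Callable[[T], bool],
               interval: float = 10.0, timeout: float = 1800.0) -> T:
    """Call fetch() every `interval` seconds until is_pending(result) is False."""
    deadline = time.monotonic() + timeout
    result = fetch()
    while is_pending(result):
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still pending after {timeout} seconds: {result!r}")
        time.sleep(interval)
        result = fetch()
    return result


# Example use on Databricks (not executed here):
# refresh_id = refreshes[0].refresh_id
# run_info = wait_while(
#     lambda: lm.get_refresh(table_name=TABLE_NAME, refresh_id=refresh_id),
#     lambda r: r.state in (lm.RefreshState.PENDING, lm.RefreshState.RUNNING),
#     interval=30,
# )
```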

Click the highlighted Dashboard link in the cell output to open the dashboard. You can also navigate to the dashboard from the Catalog Explorer UI.

In [0]:
lm.get_monitor(table_name=TABLE_NAME)

## 3. Inspect the metrics tables

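The metrics tables are saved in the schema passed to `create_monitor` as `output_schema_name`. In this notebook, that is the schema defined by `CATALOG` and `SCHEMA` (`main.default` unless you changed it).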

The `create_monitor` call created two new tables: the profile metrics table and the drift metrics table. 

These two tables record the outputs of analysis jobs. The tables use the same name as the primary table to be monitored, with the suffixes `_profile_metrics` and `_drift_metrics`.
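In this notebook, the output schema is the same as the primary table's schema, so appending the suffix to `TABLE_NAME` yields the metrics table names. When `output_schema_name` differs, the names can be derived from the bare table name instead. The following is a minimal sketch that assumes the naming convention described above and unquoted names without embedded dots. The helper name `metric_table_names` is hypothetical.

```python
def metric_table_names(table_name: str, output_schema_name: str) -> tuple:
    """Return (profile, drift) metrics table names for a three-level table name."""
    bare_name = table_name.split(".")[-1]  # assumes no dots inside the table name itself
    prefix = f"{output_schema_name}.{bare_name}"
    return f"{prefix}_profile_metrics", f"{prefix}_drift_metrics"


print(metric_table_names("main.default.wine_ts_jane_do", "main.default"))
# ('main.default.wine_ts_jane_do_profile_metrics', 'main.default.wine_ts_jane_do_drift_metrics')
```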

### Orientation to the profile metrics table

The profile metrics table has the suffix `_profile_metrics`. For a list of statistics that are shown in the table, see the documentation ([AWS](https://docs.databricks.com/lakehouse-monitoring/monitor-output.html#profile-metrics-table)|[Azure](https://learn.microsoft.com/azure/databricks/lakehouse-monitoring/monitor-output#profile-metrics-table)). 

- For every column in the primary table, the profile table shows summary statistics for the baseline table and for the primary table. The column `log_type` shows `INPUT` to indicate statistics for the primary table, and `BASELINE` to indicate statistics for the baseline table. The column from the primary table is identified in the column `column_name`.
- For `TimeSeries` type analysis, the `granularity` column shows the granularity corresponding to the row. For baseline table statistics, the `granularity` column shows `null`.
- The table shows statistics for each value of each slice key in each time window, and for the table as a whole. Statistics for the table as a whole are indicated by `slice_key` = `slice_value` = `null`.
- In the primary table, the `window` column shows the time window corresponding to that row. For baseline table statistics, the `window` column shows `null`.  
- Some statistics are calculated based on the table as a whole, not on a single column. In the column `column_name`, these statistics are identified by `:table`.
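To make the slice layout concrete, the following pure-Python sketch computes a row count and the mean of one column for each value of a slicing expression and for the whole table. It uses `None` for `slice_key` and `slice_value` in the way the profile table uses `null`. The rows and `alcohol` values are hypothetical toy data for a single time window. The sketch is not how the monitor computes its metrics. For a boolean expression such as `type='red'`, each distinct result of the expression forms its own slice.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical toy rows for one time window: (wine type, alcohol).
rows = [("red", 9.4), ("white", 8.8), ("red", 9.8), ("white", 10.1)]

SLICE_KEY = "type='red'"


def profile(rows):
    """Return {(slice_key, slice_value): (count, mean_alcohol)}."""
    groups = defaultdict(list)
    for wine_type, alcohol in rows:
        groups[(None, None)].append(alcohol)  # whole table: slice_key = slice_value = None
        groups[(SLICE_KEY, wine_type == "red")].append(alcohol)  # one group per expression value
    return {key: (len(values), mean(values)) for key, values in groups.items()}


for key, stats in profile(rows).items():
    print(key, stats)
```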

In [0]:
# Display profile metrics table
profile_table = f"{TABLE_NAME}_profile_metrics"
display(spark.sql(f"SELECT * FROM {profile_table}"))

### Orientation to the drift metrics table

The drift metrics table has the suffix `_drift_metrics`. For a list of statistics that are shown in the table, see the documentation ([AWS](https://docs.databricks.com/lakehouse-monitoring/monitor-output.html#drift-metrics-table)|[Azure](https://learn.microsoft.com/azure/databricks/lakehouse-monitoring/monitor-output#drift-metrics-table)). 

- For every column in the primary table, the drift table shows a set of metrics that compare the values in each time window to the values in the previous time window and to the baseline table. The column `drift_type` shows `BASELINE` to indicate drift relative to the baseline table, and `CONSECUTIVE` to indicate drift relative to the previous time window. As in the profile table, the column from the primary table is identified in the column `column_name`.
  - At this point, the primary table contains data for only one time window. There is no previous window to compare to, so there are no rows where `drift_type` is `CONSECUTIVE`.
- For `TimeSeries` type analysis, the `granularity` column shows the granularity corresponding to that row.
- The table shows statistics for each value of each slice key in each time window, and for the table as a whole. Statistics for the table as a whole are indicated by `slice_key` = `slice_value` = `null`.
- The `window` column shows the time window corresponding to that row. The `window_cmp` column shows the comparison window. If the comparison is to the baseline table, `window_cmp` is `null`.  
- Some statistics are calculated based on the table as a whole, not on a single column. In the column `column_name`, these statistics are identified by `:table`.
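For intuition about what a drift statistic measures, the sketch below computes the total variation distance between two categorical distributions, which is half the sum of the absolute differences in category frequencies. It pairs windows the way the two drift types are described above: each window against the baseline, and each window against the window before it. The function names and data are hypothetical, and the monitor computes its own set of drift statistics, which are listed in the documentation linked above.

```python
from collections import Counter


def tv_distance(a, b):
    """Total variation distance between the empirical distributions of two samples."""
    pa, pb = Counter(a), Counter(b)
    na, nb = len(a), len(b)
    return 0.5 * sum(abs(pa[k] / na - pb[k] / nb) for k in pa.keys() | pb.keys())


def drift_pairs(windows, baseline):
    """Yield (drift_type, window, window_cmp, distance) for windows in sorted order."""
    names = sorted(windows)
    for i, name in enumerate(names):
        yield "BASELINE", name, None, tv_distance(windows[name], baseline)
        if i > 0:  # the first window has no previous window to compare to
            prev = names[i - 1]
            yield "CONSECUTIVE", name, prev, tv_distance(windows[name], windows[prev])


# Hypothetical toy data: wine types per window and in the baseline.
baseline = ["white"] * 3 + ["red"]
windows = {"day1": ["white", "white", "red", "red"], "day2": ["white"] * 4}
for row in drift_pairs(windows, baseline):
    print(row)
```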

In [0]:
# Display the drift metrics table
drift_table = f"{TABLE_NAME}_drift_metrics"
display(spark.sql(f"SELECT * FROM {drift_table}"))

## 4. Add new data to the table and refresh metrics

### Add new data to the table
The following cell appends the simulated future data from `ts2_df` to the primary table.


In [0]:
(ts2_df
  .write.format("delta").mode("append") 
  .option("mergeSchema",True) 
  .option("delta.enableChangeDataFeed", "true") 
  .saveAsTable(f"{TABLE_NAME}")
)

In [0]:
spark.sql(f"SELECT COUNT(*) FROM {TABLE_NAME}").display()

### Refresh metrics

In [0]:
run_info = lm.run_refresh(table_name=TABLE_NAME)
while run_info.state in (lm.RefreshState.PENDING, lm.RefreshState.RUNNING):
  run_info = lm.get_refresh(table_name=TABLE_NAME, refresh_id=run_info.refresh_id)
  time.sleep(30)

assert run_info.state == lm.RefreshState.SUCCESS, f"Refresh ended in state {run_info.state}"

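Open the monitoring dashboard to see the changes. The primary table now contains data for two consecutive 1-day windows, so the drift metrics table also includes rows where `drift_type` is `CONSECUTIVE`.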

In [0]:
lm.get_monitor(table_name=TABLE_NAME)

## 5. [Optional] Delete the monitor
Uncomment the following line of code to delete the monitor. Only a single monitor can exist for a table, so delete the existing monitor before you create a new one on the same table.

In [0]:
# lm.delete_monitor(table_name=TABLE_NAME)
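Depending on your client version, deleting the monitor may leave the profile and drift metrics tables in place. Check the Lakehouse Monitoring documentation for your version. The following is a minimal sketch that builds `DROP TABLE` statements for the tables this notebook created. It assumes the metrics tables use the default suffixes in the primary table's schema, as they do here. Run the statements with `spark.sql` only after deleting the monitor, and only if you want to remove the data. The helper name `cleanup_statements` is hypothetical.

```python
def cleanup_statements(table_name: str, baseline_table: str) -> list:
    """Build DROP TABLE statements for the tables created in this notebook."""
    tables = [
        f"{table_name}_profile_metrics",  # assumes metrics tables live next to the primary table
        f"{table_name}_drift_metrics",
        baseline_table,
        table_name,
    ]
    return [f"DROP TABLE IF EXISTS {t}" for t in tables]


for stmt in cleanup_statements("main.default.wine_ts_jane_do", "main.default.wine_ts_baseline_jane_do"):
    print(stmt)
    # spark.sql(stmt)  # uncomment on Databricks to actually drop each table
```

In the notebook itself, you would pass `TABLE_NAME` and `BASELINE_TABLE` instead of the example names.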