Data quality architecture
execution layer (bronze, silver, gold): runs the checks
monitoring layer: visualizes the results of those checks

# Define DQX checks in YAML
- bronze: schema conformity, null checks, data type validation
- silver: uniqueness, referential integrity, deduplication
- gold: business rules, thresholds, aggregations
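One way to keep the per-layer split above in a single place is a mapping from layer to a list of check dicts, using the same metadata format (function, arguments, name, criticality) that DQX consumes later in this notebook. Each list could then be serialized to its own YAML file. This is a hypothetical layout: `CHECKS_BY_LAYER` and `checks_for_layer` are illustrative names, and the silver and gold lists are left empty rather than guessing check functions.

```python
# Hypothetical per-layer catalogue of DQX-style check dicts; each list could be
# serialized to its own YAML file (one file per medallion layer).
CHECKS_BY_LAYER = {
    "bronze": [
        {
            "check": {
                "function": "is_not_null_and_not_empty",
                "arguments": {"column": "show_id", "trim_strings": True},
            },
            "name": "show_id_is_null_or_empty",
            "criticality": "error",
        },
        {
            "check": {
                "function": "regex_match",
                "arguments": {"column": "show_id", "regex": r"^s\d+$", "negate": False},
            },
            "name": "invalid_show_id_format",
            "criticality": "error",
        },
    ],
    # Uniqueness, referential-integrity, and business-rule checks would go here;
    # left empty instead of guessing DQX function names.
    "silver": [],
    "gold": [],
}


def checks_for_layer(layer: str) -> list[dict]:
    """Return the checks configured for one layer, failing loudly on unknown layers."""
    if layer not in CHECKS_BY_LAYER:
        raise KeyError(f"unknown layer {layer!r}; expected one of {sorted(CHECKS_BY_LAYER)}")
    return CHECKS_BY_LAYER[layer]
```

Keeping every layer in one structure makes it easy to review which rules apply where before committing them to YAML.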

Persist dq_results, including metadata about each run.
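A minimal sketch of what one persisted result row could carry, assuming a hypothetical `dq_results` schema: the table checked, the layer, row counts, and run metadata (a run ID and a UTC timestamp). `DQRunResult` and its field names are illustrative, not part of DQX; in the notebook, rows like this would be written to a Delta table.

```python
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone


@dataclass
class DQRunResult:
    """One row of a hypothetical dq_results table: counts plus run metadata."""
    table_name: str
    layer: str
    total_rows: int
    invalid_rows: int
    # Metadata filled automatically for every run
    run_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    run_timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())

    @property
    def failure_rate(self) -> float:
        """Share of rows that failed at least one check (0.0 for an empty table)."""
        return self.invalid_rows / self.total_rows if self.total_rows else 0.0

    def to_row(self) -> dict:
        """Flatten to a dict ready to be appended to a results table."""
        row = asdict(self)
        row["failure_rate"] = self.failure_rate
        return row
```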

# Monitoring
- dashboard
- alerts
- logging
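Alerts and logging can both hang off the same metric. The sketch below assumes a simple policy, logging every run and raising an alert when the share of invalid rows exceeds a threshold; the function name `check_failure_rate` and the 5% default are hypothetical choices, and in practice the warning would feed a Databricks alert or notification channel.

```python
import logging

logger = logging.getLogger("dq_monitoring")


def check_failure_rate(table_name: str, total_rows: int, invalid_rows: int,
                       threshold: float = 0.05) -> bool:
    """Log the run and return True when the invalid-row share exceeds the threshold."""
    rate = invalid_rows / total_rows if total_rows else 0.0
    # Every run is logged so trends can be charted later
    logger.info("%s: %d of %d rows failed checks (%.2f%%)",
                table_name, invalid_rows, total_rows, rate * 100)
    if rate > threshold:
        # Above threshold: emit a warning that an alerting hook could pick up
        logger.warning("ALERT %s: failure rate %.2f%% exceeds %.2f%%",
                       table_name, rate * 100, threshold * 100)
        return True
    return False
```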

First, we need to install DQX in the current notebook session.

In [0]:
%pip install databricks-labs-dqx

In [0]:
dbutils.library.restartPython()

# Bronze Ingestion

Here I simulate raw data arriving in the bronze layer. The data comes from the Kaggle dataset [Netflix Data: Cleaning, Analysis and Visualization](https://www.kaggle.com/datasets/ariyoomotade/netflix-data-cleaning-analysis-and-visualization). The goal is to explore the DQX framework's data quality checks and monitoring capabilities through a data engineering project on Databricks.

In [0]:
# Libraries
from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.labs.dqx.profiler.generator import DQGenerator
from databricks.labs.dqx.profiler.dlt_generator import DQDltGenerator
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
import yaml

In [0]:
# Read the raw data from a CSV file. Here the file sits in the default volume, but data can also come from other sources, such as an SFTP server that drops files into our container, an S3 bucket, or a Kafka stream.

raw_data = spark.read.options(header=True, inferSchema=True).csv("/Volumes/workspace/default/files/DQX_demo_data/netflix_low_quality.csv")

# Displaying the data
raw_data.display()

In [0]:
# Initialize a client to interact with a workspace
ws = WorkspaceClient()

# Create an instance of DQProfiler 
profiler = DQProfiler(ws)
# Profile the input data for basic statistics
summary_stats, profiles = profiler.profile(raw_data)

# Print the summary statistics and profiles
print(yaml.safe_dump(summary_stats))
print(profiles)

# Generate the rules 

The function generate_dq_rules() creates a set of data quality rules from the column profiles. These rules are recommendations: we can use them in our quality checks as they are, adapt them, or write our own.
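To make the idea of profile-driven rules concrete, here is a toy version in plain Python. It is not DQX's generator logic: the `column_stats` input shape and `suggest_rules` are hypothetical, and the check function names `is_not_null` and `is_in_range` (with `min_limit`/`max_limit`) are assumed to match DQX built-in checks, so verify them against the installed version.

```python
def suggest_rules(column_stats: dict[str, dict]) -> list[dict]:
    """Turn simple per-column stats into candidate DQX-style check dicts.

    column_stats maps column name -> {"null_count": int, "min": ..., "max": ...}
    (a hypothetical shape, not DQX's profile objects).
    """
    rules = []
    for column, stats in column_stats.items():
        # No nulls observed: propose a not-null rule
        if stats.get("null_count") == 0:
            rules.append({
                "check": {"function": "is_not_null", "arguments": {"column": column}},
                "name": f"{column}_is_null",
                "criticality": "error",
            })
        # Observed min and max: propose a range rule as a warning, since
        # future data may legitimately fall outside the profiled range
        if stats.get("min") is not None and stats.get("max") is not None:
            rules.append({
                "check": {
                    "function": "is_in_range",
                    "arguments": {"column": column,
                                  "min_limit": stats["min"],
                                  "max_limit": stats["max"]},
                },
                "name": f"{column}_out_of_range",
                "criticality": "warn",
            })
    return rules
```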


In [0]:
generator = DQGenerator(ws)
checks = generator.generate_dq_rules(profiles)
print(yaml.safe_dump(checks))

# Bronze quality checks

It is common practice to keep data in the bronze layer as close to raw as possible. Here, the only checks we implement are that show_id is neither null nor empty and that it matches the expected format: the letter s followed by one or more digits.
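The logic of these two checks can be emulated in plain Python for a single record, which helps when reasoning about edge cases such as whitespace-only IDs. This is an illustrative emulation, not how DQX evaluates checks (DQX works on Spark DataFrames); `bronze_errors` is a hypothetical helper, and skipping the format check for null values is an assumption of this sketch.

```python
import re

# Same pattern as the regex_match check: "s" followed by one or more digits
SHOW_ID_PATTERN = re.compile(r"^s\d+$")


def bronze_errors(record: dict) -> list[str]:
    """Return the names of the bronze checks that a single record fails."""
    errors = []
    value = record.get("show_id")
    # Emulates is_not_null_and_not_empty with trim_strings=True
    if value is None or str(value).strip() == "":
        errors.append("show_id_is_null_or_empty")
    # Emulates regex_match; skipped for nulls in this sketch
    if value is not None and not SHOW_ID_PATTERN.match(str(value)):
        errors.append("invalid_show_id_format")
    return errors
```

A whitespace-only ID counts as empty because of the trim, and in this sketch it also fails the format check because the regex is applied to the untrimmed value.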

In [0]:
bronze_checks = [
    {
        "check": {
            "function": "is_not_null_and_not_empty",
            "arguments": {"column": "show_id", "trim_strings": True},
        },
        "name": "show_id_is_null_or_empty",
        "criticality": "error",
    },
    {
        "check": {
            "function": "regex_match",
            "arguments": {"column": "show_id", "regex": r"^s\d+$", "negate": False},
        },
        "name": "invalid_show_id_format",
        "criticality": "error",
    },
]

# Validate the structure of the checks before using them
status = DQEngine.validate_checks(bronze_checks)
print(status)

# Save the checks to a workspace file so a larger team can version and reuse them
checks_file = "/Workspace/Users/nelsongilvargas@gmail.com/dqx_databricks_demo/utils/dqx_bronze_checks.yml"
dq_engine = DQEngine(ws)
dq_engine.save_checks_in_workspace_file(bronze_checks, workspace_path=checks_file)

# Dealing with invalid records
## Option 1 
One option DQX offers is splitting the records into two sets: valid and invalid. This keeps low-quality data out of our persisted tables while still keeping track of the invalid rows in a quarantine table.
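The split can be pictured as routing each record by whether any error-level check fails. The plain-Python sketch below assumes checks are given as hypothetical named predicates; `split_records` is an illustrative name, only error-level checks are modeled, and DQX itself performs this split on Spark DataFrames.

```python
from typing import Callable, Iterable

# A check returns True when the record passes
Check = Callable[[dict], bool]


def split_records(records: Iterable[dict],
                  checks: dict[str, Check]) -> tuple[list[dict], list[dict]]:
    """Route each record to valid or invalid; invalid ones carry the failed check names."""
    valid, invalid = [], []
    for record in records:
        failed = [name for name, passes in checks.items() if not passes(record)]
        if failed:
            # Quarantine candidate: keep the original fields plus the reasons
            invalid.append({**record, "_errors": failed})
        else:
            valid.append(record)
    return valid, invalid
```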

In [0]:
# The checks are already in memory, but let's simulate loading them from the saved file
dq_engine = DQEngine(WorkspaceClient())
checks_file = "/Workspace/Users/nelsongilvargas@gmail.com/dqx_databricks_demo/utils/dqx_bronze_checks.yml"
bronze_checks = dq_engine.load_checks_from_workspace_file(checks_file)

In [0]:
dq_engine = DQEngine(ws)

valid_df, invalid_df = dq_engine.apply_checks_by_metadata_and_split(raw_data, bronze_checks)
display(valid_df)

Here we can see the quarantined records: those where show_id is null or empty, or does not match the expected format.

In [0]:
display(invalid_df)

## Option 2 
DQX also lets us keep all the records together. Instead of splitting them, it adds two columns, '_errors' and '_warnings', that indicate which error-level and warning-level checks each record failed.
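The annotate-instead-of-split approach can be sketched in plain Python as follows. Assumptions: checks are hypothetical `(name, criticality, predicate)` tuples, `annotate` is an illustrative name, and the sketch keeps only check names and uses `None` when nothing fails, whereas DQX stores more detailed information per failed check in these columns.

```python
from typing import Callable

# (check name, criticality "error" or "warn", predicate returning True on pass)
CheckSpec = tuple[str, str, Callable[[dict], bool]]


def annotate(records: list[dict], checks: list[CheckSpec]) -> list[dict]:
    """Keep every record and attach _errors/_warnings lists (None when empty)."""
    annotated = []
    for record in records:
        failed = [(name, crit) for name, crit, passes in checks if not passes(record)]
        errors = [name for name, crit in failed if crit == "error"]
        warnings = [name for name, crit in failed if crit == "warn"]
        annotated.append({**record, "_errors": errors or None, "_warnings": warnings or None})
    return annotated
```

Filtering the annotated rows on `_errors is None` recovers the valid set, so this layout keeps the option of splitting later.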

In [0]:
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(raw_data, bronze_checks)
display(valid_and_quarantined_df)

# Storing the output

In [0]:
from pyspark.sql.functions import current_timestamp
# Stamp quarantined rows with the write time so repeated runs can be told apart
invalid_df = invalid_df.withColumn("write_timestamp", current_timestamp())
# Valid rows replace the bronze table on each run; quarantined rows accumulate across runs
valid_df.write.mode("overwrite").format("delta").saveAsTable("dqx_demo_bronze_netflix_shows")
invalid_df.write.mode("append").format("delta").saveAsTable("quarantine_dqx_demo_bronze_netflix_shows")