In [0]:
# Install the library
!pip install databricks-labs-dqx

In [0]:
# Restart the notebook's Python process (not the cluster itself) so the library
# installed in the previous cell can be imported by the cells below.
# NOTE(review): Python state defined before this call is presumably discarded.
dbutils.library.restartPython()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when
import random
import string

# Reuse the active Spark session if there is one; otherwise create a session
# with the application name below.
spark = (
    SparkSession.builder
    .appName("Bad Quality DataFrame")
    .getOrCreate()
)

# Helper functions that generate random field values (some may be None)
def generate_random_name():
    """Return a random name made of 5 uppercase ASCII letters.

    Letters are drawn independently (with replacement) from
    ``string.ascii_uppercase``.
    """
    letters = random.choices(string.ascii_uppercase, k=5)
    return ''.join(letters)

def generate_random_age():
    """Return ``None`` or a random integer age in the range 18..80 inclusive.

    A candidate age is always drawn first; a second draw then chooses between
    ``None`` and that candidate with equal probability.
    """
    candidate_age = random.randint(18, 80)
    return random.choice([None, candidate_age])

def generate_random_gender():
    """Return one of ``None``, 'Male', 'Female' or 'Other', chosen uniformly."""
    gender_options = [None, 'Male', 'Female', 'Other']
    return random.choice(gender_options)

# Build 100 synthetic rows with deliberate quality problems (null names, ages
# and genders, plus duplicated rows) to feed the DQX profiler further down.

# Fixed seed so that re-running the notebook regenerates the same rows, and
# therefore the same profiles and generated checks.
random.seed(42)

data = []
for i in range(100):
    row_id = i + 1  # named row_id so the built-in `id` is not shadowed
    name = generate_random_name() if random.random() > 0.1 else None  # 10% chance of null name
    age = generate_random_age()
    gender = generate_random_gender()

    # About 10% of the time, append a copy of an already-generated row instead
    # of the new one (the fresh values are discarded, so this id is skipped).
    # Any non-empty `data` is enough to pick a row to duplicate.
    if random.random() > 0.9 and data:
        data.append(random.choice(data))
    else:
        data.append((row_id, name, age, gender))

# Create a DataFrame; column types are inferred from the row values
columns = ['id', 'name', 'age', 'gender']
raw_df = spark.createDataFrame(data, columns)

# Replace null ages with the sentinel value -1 so that "missing age" shows up
# as an out-of-range value. `raw_df` is kept so this step can be re-run safely.
df = raw_df.withColumn('age', when(col('age').isNull(), -1).otherwise(col('age')))

# Show the DataFrame
df.show(100, truncate=False)


In [0]:
from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.labs.dqx.profiler.generator import DQGenerator
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.col_functions import *
from databricks.sdk import WorkspaceClient



In [0]:
# Create the Databricks SDK workspace client shared by the DQX helpers below.
# No arguments are passed; presumably credentials are resolved from the
# notebook's default authentication context — TODO confirm.

ws_client = WorkspaceClient()

In [0]:
# Profile a DataFrame with DQX

def profile_data(input_df):
    """Profile ``input_df`` with DQX's ``DQProfiler``.

    Args:
        input_df: The DataFrame to profile.

    Returns:
        The ``(summary_stats, profiles)`` pair produced by
        ``DQProfiler.profile``.

    Raises:
        Any exception raised by the profiler propagates to the caller
        unchanged.
    """
    # Relies on the notebook-level `ws_client` created in an earlier cell.
    profiler = DQProfiler(ws_client)
    summary_stats, profiles = profiler.profile(input_df)
    return summary_stats, profiles

In [0]:
import json
# Profile the demo DataFrame. `summary_stats` is kept for inspection but is not
# printed by default.
summary_stats, profiles = profile_data(df)
# Uncomment to also dump the summary statistics as JSON
# (assumes they are JSON-serializable — TODO confirm):
# print(f"Summary Stats: {json.dumps(summary_stats)}")
print(f"Profile: {profiles}")

In [0]:
# Print each generated profile on its own line for easier reading
for profile in profiles:
    print(profile)

In [0]:
def generate_dq_checks(profiles):
    """Turn profiler output into data quality checks using ``DQGenerator``.

    Args:
        profiles: The profiles returned by the profiling step.

    Returns:
        The checks produced by ``DQGenerator.generate_dq_rules``.

    Raises:
        Any exception raised by the generator propagates to the caller
        unchanged.
    """
    # Relies on the notebook-level `ws_client` created in an earlier cell.
    generator = DQGenerator(ws_client)
    checks = generator.generate_dq_rules(profiles)
    return checks

def apply_quality_checks(input_df, checks):
    """Apply metadata-defined quality checks to ``input_df`` with ``DQEngine``.

    Args:
        input_df: The DataFrame to validate.
        checks: The checks to apply, passed straight through to
            ``DQEngine.apply_checks_by_metadata``.

    Returns:
        The DataFrame returned by ``DQEngine.apply_checks_by_metadata``.

    Raises:
        Any exception raised by the engine propagates to the caller unchanged.
    """
    # Relies on the notebook-level `ws_client` created in an earlier cell.
    dq_engine = DQEngine(ws_client)
    result_df = dq_engine.apply_checks_by_metadata(input_df, checks)
    return result_df

In [0]:

# Derive candidate checks from the profiles and list them one per line
checks = generate_dq_checks(profiles)
for check in checks:
    print(check)

# Run the generated checks against the demo DataFrame and render the result
validated_df = apply_quality_checks(df, checks)
display(validated_df)