In [0]:
# Import required Spark SQL types for schema definition
from pyspark.sql.types import StructType, StructField, StringType
import re
import json
from pyspark.sql import functions as F

# Define the schema for the profiling results DataFrame
schema = StructType([
    StructField('TABLE_NAME', StringType(), True),         # Name of the table
    StructField('COLUMN_NAME', StringType(), True),        # Name of the column
    StructField('COLUMN_ORDER', StringType(), True),       # Order of the column in the table
    StructField('DATA_TYPE', StringType(), True),          # Data type of the column
    StructField('TOTAL_COUNT', StringType(), True),        # Total number of rows sampled
    StructField('NULL_COUNT', StringType(), True),         # Number of null values
    StructField('NON_NULL_COUNT', StringType(), True),     # Number of non-null values
    StructField('DISTINCT_COUNT', StringType(), True),     # Number of distinct values
    StructField('BLANK_COUNT', StringType(), True),        # Number of blank string values
    StructField('TOP_N_VALUES', StringType(), True),       # Top N most frequent values
    StructField('MIN_VALUE', StringType(), True),          # Minimum value (numeric, date, or string length)
    StructField('MAX_VALUE', StringType(), True),          # Maximum value (numeric, date, or string length)
    StructField('AVG_VALUE', StringType(), True),          # Average value (numeric or string length)
    StructField('STDDEV_VALUE', StringType(), True),       # Standard deviation (numeric)
    StructField('ZERO_VALUE', StringType(), True),         # Count of zero values (numeric)
    StructField('POSITIVE_VALUE', StringType(), True),     # Count of positive values (numeric)
    StructField('NEGATIVE_VALUE', StringType(), True),     # Count of negative values (numeric)
    StructField('PERCENTILE_25', StringType(), True),      # 25th percentile (numeric)
    StructField('PERCENTILE_50', StringType(), True),      # 50th percentile (numeric)
    StructField('PERCENTILE_75', StringType(), True),      # 75th percentile (numeric)
    StructField('BUCKET_VALUE', StringType(), True),       # Bucketed value counts (numeric)
    StructField('FUTURE_DATE_COUNT', StringType(), True),  # Count of future dates (date/timestamp)
    StructField('TRUE_COUNT', StringType(), True),         # Count of True values (boolean)
    StructField('FALSE_COUNT', StringType(), True)         # Count of False values (boolean)
])

# List of tables to profile
table_list = [
    'sample_superstore.superstore_bronze.orders',
    'sample_superstore.superstore_bronze.returns',
    'sample_superstore.superstore_bronze.people'
]

In [0]:
def def_data_profiling(fqdn_table_name, sample_size=100, top_n=5, bucket_n=5):
    """
    Profiles a Spark table by computing column-level statistics on a sample of its data.

    Parameters:
        fqdn_table_name (str): Fully qualified table name to profile.
        sample_size (int): Number of rows to sample from the table for profiling.
        top_n (int): Number of top frequent values to report for each column.
        bucket_n (int): Number of buckets for numeric column value distribution.

    Returns:
        pyspark.sql.DataFrame: DataFrame containing profiling statistics for each column,
        conforming to the predefined schema.
    """
    
    # Load a sample of the table into a Spark DataFrame
    profile_df_raw = spark.table(fqdn_table_name).limit(sample_size)
    print(f"Starting data profiling for table: '{fqdn_table_name}' with sample size: {sample_size}.....")

    columns = profile_df_raw.columns
    profile_rows = []

    for idx, col in enumerate(columns, 1):
        # Initialize all statistics variables
        _min = _max = _avg = _stddev = _positive_count = _negative_count = _zero_count = _p25 = _p50 = _p75 = _blank_count = _top_n_str = _bucket_str = _total_count = _null_count = _non_null_count = _distinct_count = _future_date_count = _true_count = _false_count = None

        col_dtype = dict(profile_df_raw.dtypes)[col]
        # Count nulls, non-nulls, total, and distinct values
        _null_count = profile_df_raw.filter(F.col(col).isNull()).count()
        _non_null_count = profile_df_raw.filter(F.col(col).isNotNull()).count()
        _total_count = _null_count + _non_null_count
        _distinct_count = profile_df_raw.select(col).distinct().count()

        # Get top N most frequent values
        top_n_list = (
            profile_df_raw.filter(F.col(col).isNotNull())
            .groupBy(col)
            .count()
            .orderBy(F.desc("count"))
            .limit(top_n)
            .select(col, "count")
            .collect()
        )
        top_n_str = json.dumps([
            {str(row[0]): row[1]} for row in top_n_list
        ], default=str)

        # String column profiling: length stats and blank count
        if col_dtype.lower() == 'string':
            lengths = profile_df_raw.select(F.length(F.col(col)).alias("len")).filter(F.col(col).isNotNull())
            _min = lengths.agg(F.min("len")).first()[0]
            _max = lengths.agg(F.max("len")).first()[0]
            _avg = lengths.agg(F.avg("len")).first()[0]
            _blank_count = profile_df_raw.filter(F.col(col) == "").count()

        # Numeric column profiling: min, max, avg, stdev, 25%/50%/75% percentile, zero/+ve/-ve counts, buckets
        if col_dtype.lower().startswith(('int', 'smallint', 'tinyint', 'bigint', 'double', 'float', 'decimal')):
            stats = profile_df_raw.select(
                F.min(col).alias("min"),
                F.max(col).alias("max"),
                F.avg(col).alias("avg"),
                F.stddev(col).alias("stddev"),
                F.expr(f'percentile({col}, array(0.25))')[0].alias("percentile_25"),
                F.expr(f'percentile({col}, array(0.5))')[0].alias("percentile_50"),
                F.expr(f'percentile({col}, array(0.75))')[0].alias("percentile_75"), 
                F.sum(F.when(F.col(col) == 0, 1).otherwise(0)).alias("zero_count"),
                F.sum(F.when(F.col(col) > 0, 1).otherwise(0)).alias("positive_count"),
                F.sum(F.when(F.col(col) < 0, 1).otherwise(0)).alias("negative_count")
            ).first()
            _stddev = float(stats["stddev"])
            _min = float(stats["min"])
            _max = float(stats["max"])
            _avg = float(stats["avg"])
            _p25 = float(stats["percentile_25"])
            _p50 = float(stats["percentile_50"])
            _p75 = float(stats["percentile_75"])
            _zero_count = int(stats["zero_count"])
            _positive_count = int(stats["positive_count"])
            _negative_count = int(stats["negative_count"])

            # Create buckets and count values in each bucket
            if _min is not None and _max is not None:
                bucket_edges = [_min + i*(_max - _min)/(bucket_n) for i in range(bucket_n+1)]
                bucket_counts = []
                for i in range(bucket_n):
                    lower = bucket_edges[i]
                    upper = bucket_edges[i+1]
                    count = profile_df_raw.filter((F.col(col) >= lower) & (F.col(col) < upper)).count() if i < (bucket_n-1) else profile_df_raw.filter((F.col(col) >= lower) & (F.col(col) <= upper)).count()
                    bucket_counts.append({f">= {round(lower, 2)} < {round(upper, 2)}": count})
                _bucket_str = json.dumps(bucket_counts)        

        # Date/timestamp column profiling: min, max, and future date count
        if col_dtype.lower().startswith(('date', 'timestamp', 'timestamp_ntz')):
            stats = profile_df_raw.select(
                F.min(col).alias("min"),
                F.max(col).alias("max")
            ).first()
            _min = str(stats["min"]) 
            _max = str(stats["max"])
            _future_date_count = int(profile_df_raw.filter(F.to_date(F.col(col)) > F.current_date()).count())

        # Boolean column profiling: count True/False values
        if col_dtype.lower() == 'boolean':
            _true_count = profile_df_raw.filter(F.col(col) == True).count()
            _false_count = profile_df_raw.filter(F.col(col) == False).count()

        # Append all computed statistics for the column
        profile_rows.append({
            'TABLE_NAME': str(fqdn_table_name),
            'COLUMN_NAME': str(col) if col is not None else None,
            'COLUMN_ORDER': str(idx) if idx is not None else None,
            'DATA_TYPE': str(col_dtype) if col_dtype is not None else None,
            'TOTAL_COUNT': str(_total_count) if _total_count is not None else None,
            'NULL_COUNT': str(_null_count) if _null_count is not None else None,
            'NON_NULL_COUNT': str(_non_null_count) if _non_null_count is not None else None,
            'DISTINCT_COUNT': str(_distinct_count) if _distinct_count is not None else None,
            'BLANK_COUNT': str(_blank_count) if _blank_count is not None else None,
            'TOP_N_VALUES': str(top_n_str) if top_n_str is not None else None,
            'MIN_VALUE': str(_min) if _min is not None else None,
            'MAX_VALUE': str(_max) if _max is not None else None,
            'AVG_VALUE': str(round(_avg, 2)) if _avg is not None else None,
            'STDDEV_VALUE': str(round(_stddev, 2)) if _stddev is not None else None,
            'ZERO_VALUE': str(round(_zero_count, 2)) if _zero_count is not None else None,
            'POSITIVE_VALUE': str(round(_positive_count, 2)) if _stddev is not None else None,
            'NEGATIVE_VALUE': str(round(_negative_count, 2)) if _stddev is not None else None,
            'PERCENTILE_25': str(_p25) if _p25 is not None else None,
            'PERCENTILE_50': str(_p50) if _p50 is not None else None,
            'PERCENTILE_75': str(_p75) if _p75 is not None else None,
            'BUCKET_VALUE': str(_bucket_str) if _bucket_str is not None else None,
            'FUTURE_DATE_COUNT': str(_future_date_count) if _future_date_count is not None else None,
            'TRUE_COUNT': str(_true_count) if _true_count is not None else None,
            'FALSE_COUNT': str(_false_count) if _false_count is not None else None
        })

    # Create Spark DataFrame from the profiling results
    profile_sdf = spark.createDataFrame(profile_rows, schema)
    return profile_sdf

In [0]:
# Profile all tables in table_list and consolidate results into a single DataFrame
consolidated_df = None

for table_name in table_list:

    profile_df = def_data_profiling(table_name, 10000, 6, 5)

    if consolidated_df is None:
        consolidated_df = profile_df
    else:
        consolidated_df = consolidated_df.unionByName(profile_df)

display(consolidated_df)

Starting data profiling for table: 'sample_superstore.superstore_bronze.orders' with sample size: 10000.....
Starting data profiling for table: 'sample_superstore.superstore_bronze.returns' with sample size: 10000.....
Starting data profiling for table: 'sample_superstore.superstore_bronze.people' with sample size: 10000.....


TABLE_NAME,COLUMN_NAME,COLUMN_ORDER,DATA_TYPE,TOTAL_COUNT,NULL_COUNT,NON_NULL_COUNT,DISTINCT_COUNT,BLANK_COUNT,TOP_N_VALUES,MIN_VALUE,MAX_VALUE,AVG_VALUE,STDDEV_VALUE,ZERO_VALUE,POSITIVE_VALUE,NEGATIVE_VALUE,PERCENTILE_25,PERCENTILE_50,PERCENTILE_75,BUCKET_VALUE,FUTURE_DATE_COUNT,TRUE_COUNT,FALSE_COUNT
sample_superstore.superstore_bronze.orders,ROW_ID,1,bigint,9994,0,9994,9994,,"[{""4"": 1}, {""5"": 1}, {""1"": 1}, {""3"": 1}, {""2"": 1}, {""6"": 1}]",1.0,9994.0,4997.5,2885.16,0.0,9994.0,0.0,2499.25,4997.5,7495.75,"[{"">= 1.0 < 1999.6"": 1999}, {"">= 1999.6 < 3998.2"": 1999}, {"">= 3998.2 < 5996.8"": 1998}, {"">= 5996.8 < 7995.4"": 1999}, {"">= 7995.4 < 9994.0"": 1999}]",,,
sample_superstore.superstore_bronze.orders,ORDER_ID,2,string,9994,0,9994,5009,0.0,"[{""CA-2017-100111"": 14}, {""CA-2017-157987"": 12}, {""CA-2016-165330"": 11}, {""US-2016-108504"": 11}, {""US-2015-126977"": 10}, {""CA-2015-131338"": 10}]",14,14,14.0,,,,,,,,,,,
sample_superstore.superstore_bronze.orders,ORDER_DATE,3,timestamp_ntz,9994,0,9994,1237,,"[{""2016-09-05 00:00:00"": 38}, {""2017-09-02 00:00:00"": 36}, {""2016-11-10 00:00:00"": 35}, {""2017-12-02 00:00:00"": 34}, {""2017-12-01 00:00:00"": 34}, {""2017-12-09 00:00:00"": 33}]",2014-01-03 00:00:00,2017-12-30 00:00:00,,,,,,,,,,0.0,,
sample_superstore.superstore_bronze.orders,SHIP_DATE,4,timestamp_ntz,9994,0,9994,1334,,"[{""2015-12-16 00:00:00"": 35}, {""2017-09-26 00:00:00"": 34}, {""2017-12-06 00:00:00"": 32}, {""2017-11-21 00:00:00"": 32}, {""2017-09-06 00:00:00"": 30}, {""2017-12-12 00:00:00"": 30}]",2014-01-07 00:00:00,2018-01-05 00:00:00,,,,,,,,,,0.0,,
sample_superstore.superstore_bronze.orders,SHIP_MODE,5,string,9994,0,9994,4,0.0,"[{""Standard Class"": 5968}, {""Second Class"": 1945}, {""First Class"": 1538}, {""Same Day"": 543}]",8,14,12.82,,,,,,,,,,,
sample_superstore.superstore_bronze.orders,CUSTOMER_ID,6,string,9994,0,9994,793,0.0,"[{""WB-21850"": 37}, {""PP-18955"": 34}, {""MA-17560"": 34}, {""JL-15835"": 34}, {""SV-20365"": 32}, {""JD-15895"": 32}]",8,8,8.0,,,,,,,,,,,
sample_superstore.superstore_bronze.orders,CUSTOMER_NAME,7,string,9994,0,9994,793,0.0,"[{""William Brown"": 37}, {""Paul Prost"": 34}, {""Matt Abelman"": 34}, {""John Lee"": 34}, {""Seth Vernon"": 32}, {""Jonathan Doherty"": 32}]",7,22,12.96,,,,,,,,,,,
sample_superstore.superstore_bronze.orders,SEGMENT,8,string,9994,0,9994,3,0.0,"[{""Consumer"": 5191}, {""Corporate"": 3020}, {""Home Office"": 1783}]",8,11,8.84,,,,,,,,,,,
sample_superstore.superstore_bronze.orders,COUNTRY,9,string,9994,0,9994,1,0.0,"[{""United States"": 9994}]",13,13,13.0,,,,,,,,,,,
sample_superstore.superstore_bronze.orders,CITY,10,string,9994,0,9994,531,0.0,"[{""New York City"": 915}, {""Los Angeles"": 747}, {""Philadelphia"": 537}, {""San Francisco"": 510}, {""Seattle"": 428}, {""Houston"": 377}]",4,17,9.33,,,,,,,,,,,


In [0]:
completeness_df = consolidated_df.groupBy("TABLE_NAME").agg(
    F.sum(F.col("NON_NULL_COUNT")).alias("NON_NULL_COUNT_SUM"),
    F.sum(F.col("TOTAL_COUNT")).alias("TOTAL_COUNT_SUM")
).withColumn(
    "DATA_COMPLETENESS",
    (F.col("NON_NULL_COUNT_SUM") / F.col("TOTAL_COUNT_SUM")) * 100
)

display(completeness_df)

TABLE_NAME,NON_NULL_COUNT_SUM,TOTAL_COUNT_SUM,DATA_COMPLETENESS
sample_superstore.superstore_bronze.orders,209874.0,209874.0,100.0
sample_superstore.superstore_bronze.returns,592.0,592.0,100.0
sample_superstore.superstore_bronze.people,8.0,8.0,100.0
