In [1]:
import sys
import datetime, time
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType,TimestampType, DateType


def get_null_perc(spark, df, null_cols):
    """Get the null/empty percentage for each requested column.

    A value counts as "null/empty" when it is NULL or equal to the empty
    string ''. All columns (and the total row count) are computed in a single
    aggregation over ``df`` instead of one Spark job per column.

    Args:
        spark (SparkSession): SparkSession used to build the result DataFrame.
        df (DataFrame): dataframe to perform null/empty analysis on.
        null_cols (List): names of the columns to analyse.

    Returns:
        DataFrame: one row per entry of ``null_cols`` (same order) with string
        columns ``Column`` and ``NullPercentage`` (e.g. ``'12.5%'``). When ``df``
        has no rows the percentage is ``'N/A'`` rather than dividing by zero.
    """
    schema = StructType([
        StructField("Column", StringType(), True),
        StructField("NullPercentage", StringType(), True),
    ])

    rows = []
    if null_cols:
        # One pass over df: total row count plus a null/empty count per column.
        # Aliases are index-based so arbitrary (or duplicate) column names
        # cannot clash with each other.
        aggregates = [F.count(F.lit(1)).alias("__total")]
        for idx, col_name in enumerate(null_cols):
            is_null_or_empty = F.col(col_name).isNull() | (F.col(col_name) == '')
            # A NULL condition (e.g. '' compared to a non-string column) falls
            # through to 0, matching the previous filter() semantics.
            aggregates.append(
                F.sum(F.when(is_null_or_empty, 1).otherwise(0)).alias("__null_{}".format(idx))
            )
        stats = df.agg(*aggregates).collect()[0]
        total = stats["__total"]
        for idx, col_name in enumerate(null_cols):
            if total == 0:
                # Empty input: a percentage is undefined, avoid ZeroDivisionError.
                perc = 'N/A'
            else:
                perc = str(stats["__null_{}".format(idx)] * 100.0 / total) + '%'
            rows.append([col_name, perc])

    return spark.createDataFrame(rows, schema=schema)

def get_summary_numeric(df, numeric_cols):
    """Compute Spark's descriptive summary for the given columns.

    Args:
        df (DataFrame): dataframe to summarise.
        numeric_cols (List): names of the columns to include in the summary.

    Returns:
        DataFrame: the result of ``DataFrame.summary()`` evaluated on a
        projection of ``df`` restricted to ``numeric_cols``.
    """
    projected = df.select(numeric_cols)
    summary_df = projected.summary()
    return summary_df

def get_distinct_counts(spark, df, aggregate_cols):
    """Get the number of distinct values for each requested column.

    Args:
        spark (SparkSession): SparkSession used to build the result DataFrame.
        df (DataFrame): dataframe to perform distinct count analysis on.
        aggregate_cols (List): names of the columns to analyse.

    Returns:
        DataFrame: one row per entry of ``aggregate_cols`` (same order) with
        string columns ``Column`` and ``DistinctCount``.
    """
    schema = StructType([
        StructField("Column", StringType(), True),
        StructField("DistinctCount", StringType(), True),
    ])

    # distinct().count() treats NULL as one more distinct value (unlike
    # F.countDistinct, which ignores NULLs), so the per-column job is kept to
    # preserve that result. Rows are collected in Python and turned into a
    # DataFrame once instead of growing a union chain per column.
    rows = [
        [col_name, str(df.select(F.col(col_name)).distinct().count())]
        for col_name in aggregate_cols
    ]
    return spark.createDataFrame(rows, schema=schema)

def get_distribution_counts(spark, df, aggregate_cols):
    """Build a value-frequency DataFrame for each requested column.

    Args:
        spark (SparkSession): not used by this function; kept so the signature
            matches the other analysis helpers.
        df (DataFrame): dataframe to compute value distributions on.
        aggregate_cols (List): names of the columns to build distributions for.

    Returns:
        list: one DataFrame per entry of ``aggregate_cols`` (same order). Each
        holds the grouped column followed by a ``count`` column, sorted by
        ``count`` descending. The DataFrames are not evaluated here.
    """
    return [
        df.groupby(F.col(col_name)).count().sort(F.col("count").desc())
        for col_name in aggregate_cols
    ]

def get_mismatch_perc(spark, df, data_quality_cols_regex):
    """Get the percentage of values that do not match a regex, per column.

    Each column is tested with Spark's ``rlike`` (Java regex syntax; the
    pattern is searched for anywhere in the value, so anchor it with ``^...$``
    for a full match). NULL values are not counted as mismatches, but they are
    included in the row total used as the denominator. All checks and the
    total row count are computed in a single aggregation over ``df``.

    Args:
        spark (SparkSession): SparkSession used to build the result DataFrame.
        df (DataFrame): dataframe to perform the regex data quality analysis on.
        data_quality_cols_regex (Dictionary): mapping of column name -> regex.

    Returns:
        DataFrame: one row per dictionary entry (insertion order) with string
        columns ``Column`` and ``MismatchPercentage`` (e.g. ``'3.0%'``). When
        ``df`` has no rows the percentage is ``'N/A'`` rather than dividing by
        zero.
    """
    schema = StructType([
        StructField("Column", StringType(), True),
        StructField("MismatchPercentage", StringType(), True),
    ])

    checks = list(data_quality_cols_regex.items())
    rows = []
    if checks:
        aggregates = [F.count(F.lit(1)).alias("__total")]
        for idx, (col_name, pattern) in enumerate(checks):
            # ~rlike is NULL for NULL input; when() then falls through to 0,
            # so NULLs are not mismatches (same as the previous filter()).
            is_mismatch = ~F.col(col_name).rlike(pattern)
            aggregates.append(
                F.sum(F.when(is_mismatch, 1).otherwise(0)).alias("__mismatch_{}".format(idx))
            )
        stats = df.agg(*aggregates).collect()[0]
        total = stats["__total"]
        for idx, (col_name, _pattern) in enumerate(checks):
            if total == 0:
                # Empty input: a percentage is undefined, avoid ZeroDivisionError.
                perc = 'N/A'
            else:
                perc = str(stats["__mismatch_{}".format(idx)] * 100.0 / total) + '%'
            rows.append([col_name, perc])

    return spark.createDataFrame(rows, schema=schema)

StatementMeta(SparkPool01, 12, 1, Finished, Available)

In [None]:
# Load the table to run the data quality checks against.
# Placeholders: replace '<ADLG2>' with the source path (presumably an ADLS Gen2
# location) and 'format' with the source file format.
# header=True only matters for sources that carry a header row; drop it otherwise.
df = spark.read.load(
    '<ADLG2>',
    format='format',
    header=True,
)
display(df.limit(10))  # preview the first 10 rows

In [None]:
# Master schema (column name, data type, nullable) for the example Sales.Orders table.
# NOTE(review): this schema is not passed to spark.read.load in the load cell;
# pass schema=schema there if the loaded data should use it.
schema = StructType([
    StructField(col_name, col_type, col_nullable)
    for col_name, col_type, col_nullable in (
        ('OrderID', IntegerType(), False),
        ('CustomerID', IntegerType(), False),
        ('SalespersonPersonID', IntegerType(), True),
        ('PickedByPersonID', IntegerType(), True),
        ('ContactPersonID', IntegerType(), True),
        ('BackorderOrderID', IntegerType(), True),
        ('OrderDate', DateType(), False),
        ('ExpectedDeliveryDate', DateType(), True),
        ('CustomerPurchaseOrderNumber', StringType(), True),
        ('IsUndersupplyBackordered', StringType(), True),
        ('Comments', StringType(), True),
        ('DeliveryInstructions', StringType(), True),
        ('InternalComments', StringType(), True),
        ('PickingCompletedWhen', TimestampType(), True),
        ('LastEditedBy', IntegerType(), True),
        ('LastEditedWhen', TimestampType(), True),
    )
])

In [11]:
# Example analysis configuration.
# Regexes are evaluated with Spark's rlike (Java regex syntax). Write them as raw
# strings and WITHOUT JavaScript-style /.../ delimiters: Java would treat those
# slashes as literal characters and every value would be reported as a mismatch.
null_cols = ['Email']
numeric_cols = ['Phone']
aggregate_cols = ['Email']
data_quality_cols_regex = {
    'age': r'^[0-9]{1,2}$',  # one or two digits (0-99)
    'first_name': r'^[a-zA-Z]*$',  # ASCII letters only; empty string also passes
    'gender': r'^M(ale)?$|^F(emale)?$',
    # The (...)* group also accepts an empty string.
    'Email': r'^([a-zA-Z0-9._%-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6})*$',
}
result_limit = 10

StatementMeta(SparkPool01, 12, 11, Finished, Available)

In [15]:
# Analysis configuration for the Sales.Orders example schema.
null_cols = ['OrderID', 'CustomerID', 'OrderDate']
numeric_cols = [
    'OrderID', 'CustomerID', 'SalespersonPersonID',
    'PickedByPersonID', 'ContactPersonID', 'BackorderOrderID',
]
aggregate_cols = ['OrderID']
# NOTE(review): 'Gender' and 'Email' are not columns of the Sales.Orders schema;
# if df is that table, get_mismatch_perc cannot resolve them. Replace with regex
# checks for columns that exist in the loaded table.
# Regexes are Java-syntax patterns for rlike: raw strings, no /.../ delimiters.
data_quality_cols_regex = {
    'Gender': r'^M(ale)?$|^F(emale)?$',
    'Email': r'^([a-zA-Z0-9._%-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6})*$',
}
result_limit = 10

StatementMeta(SparkPool01, 12, 15, Finished, Available)

In [None]:
### 1. NULL Checks
# Percentage of NULL or empty-string values for each column in null_cols.
resultdf = get_null_perc(spark, df, null_cols)
print("NULL/Empty Percentage for Columns")
resultdf.show(n=result_limit, truncate=False)

In [None]:
### 2. Summary, Average, Standard Deviation, Percentiles for Numeric Columns
# DataFrame.summary() over the columns listed in numeric_cols.
resultdf = get_summary_numeric(df, numeric_cols)
print("Summary for Numeric Columns")
resultdf.show(n=result_limit, truncate=False)

In [None]:
### 3. Distinct Count
# Number of distinct values for each column in aggregate_cols.
print("Distinct Counts for Aggregate Columns")
resultdf = get_distinct_counts(spark, df, aggregate_cols)
resultdf.show(n=result_limit, truncate=False)

In [None]:

### 4. Distribution Count
print("Distribution Count for Aggregate Columns")
result = get_distribution_counts(spark, df, aggregate_cols)
for dist_df in result:
    # columns[0] is the grouped column; the second column is "count".
    print("======== Distribution for - " + dist_df.columns[0] + " ========")
    dist_df.show(n=result_limit, truncate=False)


In [None]:
### 5. Data Quality
# Percentage of values failing the regex configured per column in data_quality_cols_regex.
print("Data Quality Issue Percentage for Columns")
resultdf = get_mismatch_perc(spark, df, data_quality_cols_regex)
resultdf.show(n=result_limit, truncate=False)