In [1]:
# Installing required packages
try:

  dbutils.library.installPyPI("koalas")
  dbutils.library.installPyPI("dateparser")
  dbutils.library.installPyPI("datefinder")

except:
  pass

# Importing required libraries
import dateparser, datefinder, databricks.koalas as ks, pandas as pd, numpy as np
from pyspark.sql.types import TimestampType,IntegerType,DoubleType,StringType
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, min, max, sum, stddev_pop, mean, count, countDistinct, variance, when, isnan, isnull

# Initializing Variables

file_name = "dbfs:/FileStore/tables/Sample_Data.csv"
dp = lambda x: dateparser.parse(x)
dfp = lambda x: datefinder.parser.parse(x)
inf = lambda x: int(x)
fl = lambda x: float(x)
str1 = lambda x: str(x)
metrics=[]

# Creating Spark Session
def spark_session():
    
    conf = SparkConf().set('spark.driver.host', '127.0.0.1')
    session = SparkSession.builder.config(conf=conf).master("local").appName("cloud-dq-tests").getOrCreate()
    return session

# Reading Data
def read_data(session):
    
    dataframe = session.read.csv(file_name,header="true")
    return dataframe

# Inferring DataTypes of columns on a random population sample -> Return inferred datatypes for corresponding columns
def infer_types(dataframe):
  
  date_cols=[]
  dataframe = dataframe.sample(False,0.1)
  
  # Using Koalas for inferring datatypes
  ks_dataframe = ks.DataFrame(dataframe)
  
  for i in list(ks_dataframe.columns):
    
    # Checking if column has integer values
    try:
      ks_dataframe[i].apply(inf)
      date_cols.append("Integer")
      continue
      
    except:

      # Checking if column has double or floating point values
      try:
        ks_dataframe[i].apply(fl)
        date_cols.append("Double")
        continue
        
      except:
        pass

      try:
        
        # Checking if column has date or timestamp values
        try:
          # Parsing datetime values using dateparser package
          ks_dataframe[i]=ks_dataframe[i].apply(dp)
          
        except:
          # Parsing datetime values using datefinder package
          ks_dataframe[i]=ks_dataframe[i].apply(dfp)
          
        date_cols.append("Date")
        continue

      # By Default: Column has string values
      except:
        
        date_cols.append("String")
        continue
      
  return date_cols

# Converting Datatypes of columns to inferred datatypes -> Returns new dataframe with inferred datatypes
def convert_datatypes(dataframe,date_cols):
  
  for i,column in enumerate(dataframe.columns):
    
    # If inferred data type is string, convert column to String type
    if(date_cols[i]=="String"):
      dataframe = dataframe.withColumn(column, col(column).cast(StringType()))
      
    # If inferred data type is Integer, convert column to Integer type
    elif(date_cols[i]=="Integer"):
      dataframe = dataframe.withColumn(column, col(column).cast(IntegerType()))
      
    # If inferred data type is Double, convert column to Double type
    elif(date_cols[i]=="Double"):
      dataframe = dataframe.withColumn(column, col(column).cast(DoubleType()))
      
    # If inferred data type is Datetime/Timestamp, convert column to Datetime/Timestamp type
    else:
      dataframe = dataframe.withColumn(column, col(column).cast(TimestampType()))
      
  return dataframe

# Returns of sum of Numeric Column
def sum_num(dataframe,column):
  return dataframe.select(sum(column)).first()[0]

# Returns of mean of Numeric Column
def mean_num(dataframe,column):
  return dataframe.select(mean(column)).first()[0]
  
# Returns of minimum date of Timestamp Column
def minimum_date(dataframe, column):
  return dataframe.select(min(column)).first()[0].isoformat()

# Returns of minimum length of String Column
def minimum_string(dataframe,column):
  return len(dataframe.select(min(column)).first()[0])

# Returns of minimum value of Numeric Column
def minimum_num(dataframe,column):
  return dataframe.select(min(column)).first()[0]

# Returns of maximum value of Numeric Column
def maximum_num(dataframe,column):
  return dataframe.select(max(column)).first()[0]

# Returns of Maximum date of Timestamp Column
def maximum_date(dataframe, column):
  return dataframe.select(max(column)).first()[0].isoformat()

# Returns of Maximum length of String Column
def maximum_string(dataframe,column):
  return len(dataframe.select(max(column)).first()[0])

# Returns total number of records in a column
def count_records(dataframe,column):
  return dataframe.select(column).count()

# Returns distinct number of records in a column
def countdistinct(dataframe,column):
  return dataframe.select(countDistinct(column)).first()[0]

# Returns number of records having NULL / NAN values in a column
def countnull(dataframe,column):
  return dataframe.select(count(when(isnull(column) | isnan(column),column))).first()[0]
 
# Returns number of records having NULL values in a Datetime column
def countnulldate(dataframe,column):
  return dataframe.select(count(when(isnull(column),column))).first()[0]  
  
# Returns of Standard Deviation of Numeric Column
def std_dev(dataframe,column):
  return dataframe.select(stddev_pop(column)).first()[0]

# Returns of Variance of Numeric Column
def variance_num(dataframe,column):
  return dataframe.select(variance(column)).first()[0]
  
# Returns of Quantiles (5th, 25th, 50th, 75th, 95th) of Numeric Column
def quantiles(dataframe,column):
  return dataframe.approxQuantile(column,[0.05,0.25,0.5,0.75,0.95],0.0)    

# Creating Spark Session
session=spark_session()

# Reading data
df=read_data(session)

# Inferring Datatypes of Dataframe's columns
date_cols = infer_types(df)

# Converting Dataframe's columns into inferred datatypes
df = convert_datatypes(df,date_cols)

# Checking datatype of new column
for val in df.dtypes:
  
  # Adding new metrics for every column
  metric={}
  
  # If datatype is 'String'
  if(val[1]=='string'):
    
    # Computes the required metrics for a string datatype column
    record_count=count_records(df,val[0])
    distinct_record_count = countdistinct(df,val[0])
    record_null_count = countnull(df,val[0])
    completeness = round(((record_count - record_null_count)/record_count),3)
    min1=minimum_string(df,val[0])
    max1=maximum_string(df,val[0])
    
    # Storing computed metrics in a dictionary for that column
    metric['Column'] = val[0]
    metric['Number_of_Records'] = record_count
    metric['Distinct_Number_of_Records'] = distinct_record_count
    metric['Number_of_Null_Records'] = record_null_count
    metric['Completeness'] = completeness
    metric['Minimum_Length'] = min1
    metric['Maximum_Length'] = max1
    metrics.append(metric)
  
  
  # If datatype is 'TimeStamp'
  elif(val[1]=='timestamp'):
    
    # Computes the required metrics for a TimeStamp datatype column
    record_count=count_records(df,val[0])
    distinct_record_count = countdistinct(df,val[0])
    record_null_count = countnulldate(df,val[0])
    completeness = round(((record_count - record_null_count)/record_count), 3)
    min1=minimum_date(df,val[0])
    max1=maximum_date(df,val[0])
    
    # Storing computed metrics in a dictionary for that column
    metric['Column'] = val[0]
    metric['Number_of_Records'] = record_count
    metric['Distinct_Number_of_Records'] = distinct_record_count
    metric['Number_of_Null_Records'] = record_null_count
    metric['Completeness'] = completeness
    metric['Minimum_Date'] = min1
    metric['Maximum_Date'] = max1
    metrics.append(metric)
    
  
  # If datatype is 'Numeric', i.e. , Integer or Double
  elif(val[1]=='int' or val[1] == 'double'):
    
    # Computes the required metrics for a Numeric datatype column
    record_count=count_records(df,val[0])
    distinct_record_count = countdistinct(df,val[0])
    record_null_count = countnull(df,val[0])
    completeness = round(((record_count - record_null_count)/record_count) ,3)
    sum1=round(sum_num(df,val[0]),3)
    mean1=round(mean_num(df,val[0]),3)
    min1=round(minimum_num(df,val[0]),3)
    max1=round(maximum_num(df,val[0]),3)
    range1 = round(max1 - min1 , 3)
    stddev1=round(std_dev(df,val[0]),3)
    variance1=round(variance_num(df,val[0]),3)
    percentile_5, percentile_25, percentile_50, percentile_75, percentile_95 = [round(j,3) for j in quantiles(df,val[0])]
    
    # Storing computed metrics in a dictionary for that column
    metric['Column'] = val[0]
    metric['Number_of_Records'] = record_count
    metric['Distinct_Number_of_Records'] = distinct_record_count
    metric['Number_of_Null_Records'] = record_null_count
    metric['Completeness'] = completeness
    metric['Sum'] = sum1
    metric['Mean'] = mean1
    metric['Minimum'] = min1
    metric['Maximum'] = max1
    metric['Range'] = range1
    metric['Standard_Deviation'] = stddev1
    metric['Variance'] = variance1
    metric['5th_Quartile'] = percentile_5
    metric['25th_Quartile'] = percentile_25
    metric['50th_Quartile'] = percentile_50
    metric['75th_Quartile'] = percentile_75
    metric['95th_Quartile'] = percentile_95
    metric['95th_Quartile'] = percentile_95
    metrics.append(metric)
   
# Printing Computed Data Profiling Metrics
print("\nMetrics: \n")
for i in metrics:
  print(i)
  print("\n")