In [0]:
%run "./Util/Config"

In [0]:
%run "./Clean"

Config Schema:
```json
{
  "sourceFilePath": "xxx",
  "sourceFileType": "xxx",
  "targetFilePath": "xxx",
  "targetFileType": "xxx",
  "targetErrorFilePath": "xxx",
  "targetErrorFileType": "xxx",
  "cleanseObject": [
    {
      "type": "Clean or Validate",
      "task": "functionName",
      "inputs":
        {
          "columns": ["list", "of", "column", "names"],
          "error_action": "xxx",
          "e.t.c": 0
        }
    },
    {
      "type": "Clean or Validate",
      "task": "functionName",
      "inputs":
        {
          "columns": ["list", "of", "column", "names"],
          "error_action": "xxx",
          "e.t.c": 0
        }
    }
  ]
}
```

In [0]:
# Session-level settings used to keep job marker files (_SUCCESS, _COMMITTED)
# out of the folders this notebook writes to.
spark.conf.set(
  "mapreduce.fileoutputcommitter.marksuccessfuljobs",
  "false",
)
spark.conf.set(
  "parquet.enable.summary-metadata",
  "false",
)
spark.conf.set(
  "spark.sql.sources.commitProtocolClass",
  "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol",
)

In [0]:
from pyspark.sql.types import *

def _lake_uri(relative_path: str) -> str:
  """Return the abfss:// URI of `relative_path` inside the `datalakestore` container.

  The storage account name is read from the Spark conf key `storageAccountDataLake`.
  """
  storage_account = spark.conf.get('storageAccountDataLake')
  return f"abfss://datalakestore@{storage_account}.dfs.core.windows.net/{relative_path}"


def wrapper(config: dict):
  """Read a dataset, run the configured cleanse tasks on it and write the results.

  Args:
    config: Parsed script configuration. Required top-level keys are
      `sourceFilePath`, `sourceFileType`, `targetFilePath`, `targetFileType`,
      `targetErrorFilePath`, `targetErrorFileType` and `cleanseObject`.
      Every `cleanseObject` entry needs `type`, `task` and `inputs`; the keys
      of `inputs` are lower-cased and passed to the task as keyword arguments.

  Returns:
    Tuple `(df, df_errors)`: the dataframe after all clean tasks, and the
    error dataframe (source columns plus `__DataProcessingError__`,
    `__Function__` and `__CleanseTimestamp__`) as returned by the last task.

  Raises:
    KeyError: a required top-level key is missing. This is checked before any
      data is read or written, so a bad config can no longer overwrite the
      target dataset and only then fail on the error-output settings.
    AttributeError: `task` (lower-cased) is not an attribute of `Clean`.
  """
  # Fail fast on an incomplete config instead of after the (overwriting) target write.
  required_keys = (
    "sourceFilePath", "sourceFileType",
    "targetFilePath", "targetFileType",
    "targetErrorFilePath", "targetErrorFileType",
    "cleanseObject",
  )
  missing_keys = [key for key in required_keys if key not in config]
  if missing_keys:
    raise KeyError(f"ScriptConfig is missing required key(s): {missing_keys}")

  clean = Clean()

  log4jLogger = sc._jvm.org.apache.log4j
  LOGGER = log4jLogger.LogManager.getLogger(__name__)
  LOGGER.info("pyspark script logger initialized")

  sourceFilePath = _lake_uri(config['sourceFilePath'])
  LOGGER.info(f'Reading file from source....File Path: {sourceFilePath}')
  df = spark.read.format(config["sourceFileType"]).load(sourceFilePath)
  LOGGER.info(f'Successfully read from {sourceFilePath}')

  # The error dataframe mirrors the source schema and adds three bookkeeping columns.
  error_fields = [
    StructField("__DataProcessingError__", StringType(), True),
    StructField("__Function__", StringType(), True),
    StructField("__CleanseTimestamp__", TimestampType(), True),
  ]
  df_errors_schema = StructType(df.schema.fields + error_fields)
  df_errors = spark.createDataFrame([], df_errors_schema)

  for cleanseObject in config["cleanseObject"]:
    # Input keys are lower-cased so the config is case-insensitive w.r.t. keyword names.
    inputs = {key.lower(): value for key, value in cleanseObject["inputs"].items()}
    LOGGER.info(f'Running {cleanseObject["task"]} with inputs: {inputs}')

    task_type = cleanseObject["type"].lower()
    if task_type != "clean":
      # TODO: Validate functions are not implemented yet; make the skip visible in the logs.
      LOGGER.warn(f'Skipping {cleanseObject["task"]}: unsupported cleanseObject type "{cleanseObject["type"]}"')
      continue

    func = getattr(clean, cleanseObject["task"].lower())
    df, df_errors = func(df, df_errors, **inputs)

  # Write dataframes to lake
  targetFileType = config["targetFileType"]
  targetFilePath = _lake_uri(config['targetFilePath'])
  LOGGER.info(f'Writing file (replace) as type({targetFileType}) to....File Path: {targetFilePath}')
  df.write.mode("overwrite").format(targetFileType).save(targetFilePath) # replace dataset
  LOGGER.info(f'Successfully written (replace) as type({targetFileType}) to {targetFilePath}')

  targetErrorFileType = config["targetErrorFileType"]
  targetErrorFilePath = _lake_uri(config['targetErrorFilePath'])
  LOGGER.info(f'Writing file (append) as type({targetErrorFileType}) to....File Path: {targetErrorFilePath}')
  df_errors.write.mode("append").format(targetErrorFileType).save(targetErrorFilePath) # append dataset
  LOGGER.info(f'Successfully written (append) as type({targetErrorFileType}) to {targetErrorFilePath}')

  return df, df_errors

In [0]:
import json
# Notebook entry point: read the ScriptConfig widget, parse it as JSON and run
# the cleanse wrapper. No data lake connection is opened in this cell itself;
# all reads/writes happen inside wrapper().

# Declare the ScriptConfig text widget. The second argument is a sample
# configuration that is used as the widget's default value.
dbutils.widgets.text("ScriptConfig", '{"sourceFilePath":"Delta/WideWorldImportersOLTP/Sales/Customers","sourceFileType":"delta","targetFilePath":"Silver/WideWorldImportersOLTP/Sales/Customers","targetFileType":"parquet","targetErrorFilePath":"SilverError/WideWorldImportersOLTP/Sales/Customers","targetErrorFileType":"parquet","cleanseObject":[{"type":"clean","task":"cln_title_case","inputs":{"columns":["CustomerName", "AccountOpenedDate", "BillToCustomerID"],"error_action":"continue_and_null_value"}},{"type":"clean","task":"cln_regex_replace","inputs":{"columns":["PhoneNumber","FaxNumber"],"regex_exp":"\\((.*?)\\)","replace_string":"","error_action":"continue_and_null_value"}},{"type":"clean","task":"cln_upper_case","inputs":{"columns":["DeliveryAddressLine2","PaymentDays"],"error_action":"continue_and_null_value"}}]}',"")
config = getArgument("ScriptConfig")
# unicode_escape returns bytes in which every backslash is doubled, so a value
# such as the sample regex_exp (a single backslash before each parenthesis)
# becomes a valid JSON escape; json.loads accepts the resulting bytes directly.
# Presumably this is why the encode step exists -- confirm with callers.
# NOTE(review): the same codec rewrites non-ASCII characters as \xNN, which is
# not a valid JSON escape, and doubles backslashes of already-escaped JSON.
rawConfig = config.encode('unicode_escape')
jsonConfig = json.loads(rawConfig)

# Run the pipeline; both resulting dataframes stay available as notebook globals.
df_cleaned, df_errors = wrapper(jsonConfig)