## Working with Corrupt Data
* ETL pipelines need robust solutions to handle corrupt data. 
* Data corruption becomes more frequent as data volume and application complexity grow. Corrupt data includes:

* __`Missing information`__
* __`Incomplete information`__
* __`Schema mismatch`__
* __`Differing formats or data types`__
* __`User errors when writing data producers`__

* Since ETL pipelines are built to be automated, production-oriented solutions must ensure pipelines behave as expected. This means that data engineers must both expect and systematically handle corrupt records.

## How to Handle Bad Records in Spark: Read Modes

* Spark provides three modes for dealing with bad records when reading data into a DataFrame.
* This matters, for example, when reading CSV files with a specified schema, because the data in the files may not match that schema. The modes are:
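* __`PERMISSIVE (default)`__: nulls are inserted for fields that could not be parsed correctly; if the schema contains a string column named by `columnNameOfCorruptRecord`, the raw malformed line is stored in it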
* __`DROPMALFORMED`__: drops lines that contain fields that could not be parsed
* __`FAILFAST`__: aborts the reading if any malformed data is found
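To make the three modes concrete, here is a plain-Python model of the rules, run over four lines of the sample file used below. It is a simplified sketch, not Spark's parser: it assumes, as the notebook output suggests, that a row counts as malformed when its token count differs from the schema or a value cannot be converted. `SCHEMA`, `LINES` and `parse_lines` are hypothetical names. In PySpark itself the mode is chosen with the `mode` argument of `spark.read.csv` (or `.option("mode", "DROPMALFORMED")`).

```python
import csv
from typing import Callable, List, Optional, Tuple

# Hypothetical schema mirroring the channels example: (column name, converter)
SCHEMA: List[Tuple[str, Callable[[str], object]]] = [
    ("CHANNEL_ID", int), ("CHANNEL_DESC", str), ("CHANNEL_CLASS", str),
    ("CHANNEL_CLASS_ID", int), ("CHANNEL_TOTAL", str), ("CHANNEL_TOTAL_ID", int),
]

LINES = [
    "3,Direct Sales,Direct,12,Channel total,1",
    "12,Partners,Others,14,Channel total,1,45,ram,3434",
    "sample,Partners,Others,14,Channel total,1,45,ram,3434",
    "10 Partners Others 14 Channel total 1",
]

def parse_lines(lines, schema, mode="PERMISSIVE") -> List[List[Optional[object]]]:
    """Simplified model of Spark's CSV read modes (not Spark's implementation).
    Each output row holds the parsed values plus a trailing corrupt-record value."""
    if mode not in ("PERMISSIVE", "DROPMALFORMED", "FAILFAST"):
        raise ValueError(f"Unknown mode: {mode}")
    rows = []
    for line in lines:
        tokens = next(csv.reader([line]))
        # Assumption: a wrong number of tokens marks the row as malformed
        malformed = len(tokens) != len(schema)
        values = []
        for i, (_name, convert) in enumerate(schema):
            try:
                values.append(convert(tokens[i]))
            except (IndexError, ValueError):
                values.append(None)  # missing or unparsable field becomes null
                malformed = True
        if not malformed:
            rows.append(values + [None])
        elif mode == "PERMISSIVE":
            rows.append(values + [line])  # keep the raw line, like columnNameOfCorruptRecord
        elif mode == "DROPMALFORMED":
            continue  # skip the bad line
        else:  # FAILFAST
            raise ValueError(f"Malformed record: {line!r}")
    return rows

permissive = parse_lines(LINES, SCHEMA, "PERMISSIVE")
```

With `PERMISSIVE` all four lines survive (the last three carry their raw text in the trailing column), `DROPMALFORMED` keeps only the first line, and `FAILFAST` raises on the second line.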

In [0]:
# Write a small sample CSV; the last four data rows are deliberately malformed.
# The final True overwrites an existing file so the cell can be re-run.
dbutils.fs.put("/FileStore/tables/channels.csv","""CHANNEL_ID,CHANNEL_DESC,CHANNEL_CLASS,CHANNEL_CLASS_ID,CHANNEL_TOTAL,CHANNEL_TOTAL_ID
3,Direct Sales,Direct,12,Channel total,1
9,Tele Sales,Direct,12,Channel total,1
5,Catalog,Indirect,13,Channel total,1
4,Internet,Indirect,13,Channel total,1
2,Partners,Others,14,Channel total,1
12,Partners,Others,14,Channel total,1,45,ram,3434
sample,Partners,Others,14,Channel total,1,45,ram,3434
10 Partners Others 14 Channel total 1
11 Partners Others 14 Channel total 1""", True)

Wrote 459 bytes.
Out[1]: True

In [0]:
help(spark.read.csv)

Help on method csv in module pyspark.sql.readwriter:

csv(path: Union[str, List[str]], schema: Union[pyspark.sql.types.StructType, str, NoneType] = None, sep: Optional[str] = None, encoding: Optional[str] = None, quote: Optional[str] = None, escape: Optional[str] = None, comment: Optional[str] = None, header: Union[bool, str, NoneType] = None, inferSchema: Union[bool, str, NoneType] = None, ignoreLeadingWhiteSpace: Union[bool, str, NoneType] = None, ignoreTrailingWhiteSpace: Union[bool, str, NoneType] = None, nullValue: Optional[str] = None, nanValue: Optional[str] = None, positiveInf: Optional[str] = None, negativeInf: Optional[str] = None, dateFormat: Optional[str] = None, timestampFormat: Optional[str] = None, maxColumns: Union[str, int, NoneType] = None, maxCharsPerColumn: Union[str, int, NoneType] = None, maxMalformedLogPerPartition: Union[str, int, NoneType] = None, mode: Optional[str] = None, columnNameOfCorruptRecord: Optional[str] = None, multiLine: Union[bool, str, NoneType] 

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# The extra string column "BadData" receives the raw text of any malformed line
schema_channels = StructType([StructField('CHANNEL_ID', IntegerType(), True),
                              StructField('CHANNEL_DESC', StringType(), True),
                              StructField('CHANNEL_CLASS', StringType(), True),
                              StructField('CHANNEL_CLASS_ID', IntegerType(), True),
                              StructField('CHANNEL_TOTAL', StringType(), True),
                              StructField('CHANNEL_TOTAL_ID', IntegerType(), True),
                              StructField('BadData', StringType(), True)])
# mode defaults to PERMISSIVE: unparsable fields become null and the raw line goes to BadData
df_channels = (spark.read.schema(schema_channels)
               .csv("/FileStore/tables/channels.csv", header=True,
                    columnNameOfCorruptRecord="BadData"))
display(df_channels)

CHANNEL_ID,CHANNEL_DESC,CHANNEL_CLASS,CHANNEL_CLASS_ID,CHANNEL_TOTAL,CHANNEL_TOTAL_ID,BadData
3.0,Direct Sales,Direct,12.0,Channel total,1.0,
9.0,Tele Sales,Direct,12.0,Channel total,1.0,
5.0,Catalog,Indirect,13.0,Channel total,1.0,
4.0,Internet,Indirect,13.0,Channel total,1.0,
2.0,Partners,Others,14.0,Channel total,1.0,
12.0,Partners,Others,14.0,Channel total,1.0,"12,Partners,Others,14,Channel total,1,45,ram,3434"
,Partners,Others,14.0,Channel total,1.0,"sample,Partners,Others,14,Channel total,1,45,ram,3434"
,,,,,,10 Partners Others 14 Channel total 1
,,,,,,11 Partners Others 14 Channel total 1


In [0]:
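# Cache the parsed result: since Spark 2.3, queries on raw CSV/JSON files that reference
# only the corrupt-record column are disallowed unless the parsed data is cached or saved
df_channels.cache()
good_data = df_channels.filter("BadData is null").drop("BadData")       # clean rows, corrupt column removed
bad_data = df_channels.filter("BadData is not null").select("BadData")  # raw text of malformed rows only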
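The split above follows one rule: a row is good exactly when its corrupt-record column is null. A plain-Python sketch of the same rule, assuming each row is a tuple whose last element plays the role of `BadData` (a hypothetical layout, not Spark's internal row format; `split_good_bad` is a hypothetical helper):

```python
from typing import List, Optional, Sequence, Tuple

Row = Sequence[Optional[object]]

def split_good_bad(rows: List[Row]) -> Tuple[List[tuple], List[str]]:
    """Mimic filter("BadData is null").drop("BadData") and
    filter("BadData is not null").select("BadData").
    Assumes the corrupt-record value is the last element of each row."""
    good, bad = [], []
    for row in rows:
        *fields, corrupt = row
        if corrupt is None:
            good.append(tuple(fields))  # keep parsed fields, drop the corrupt column
        else:
            bad.append(corrupt)         # keep only the raw malformed text
    return good, bad

rows = [
    (3, "Direct Sales", "Direct", 12, "Channel total", 1, None),
    (None, "Partners", "Others", 14, "Channel total", 1,
     "sample,Partners,Others,14,Channel total,1,45,ram,3434"),
]
good, bad = split_good_bad(rows)
```

Keeping the bad rows as raw text, rather than as partially parsed fields, is what makes them easy to log or replay later.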

In [0]:
display(bad_data)

BadData
"12,Partners,Others,14,Channel total,1,45,ram,3434"
"sample,Partners,Others,14,Channel total,1,45,ram,3434"
10 Partners Others 14 Channel total 1
11 Partners Others 14 Channel total 1


##### `Handling bad records and files`

* Azure Databricks provides a unified interface for handling bad records and files without interrupting Spark jobs.
* You can obtain the exception records/files and their reasons from the exception logs by setting the data source option `badRecordsPath`.
* `badRecordsPath` specifies a path to store exception files that record information about bad records for `CSV and JSON` sources and bad files for all file-based built-in sources (for example, Parquet).
* When a record does not match the schema, Spark creates a timestamp-named folder (containing a `bad_records` subfolder) under `badRecordsPath` and writes JSON log files into it.
* Each log entry stores three fields: `path` (the source file with its path), `reason` (the exception message) and `record` (the raw bad record).
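Because the logs are ordinary JSON files laid out as `<badRecordsPath>/<timestamp>/bad_records/part-*`, they can also be inspected without Spark, for example through a local copy or the `/dbfs` mount where it is available. A minimal sketch, assuming each part file holds one JSON object per line with `path`, `reason` and `record` keys; `read_bad_records` and `count_by_reason` are hypothetical helpers:

```python
import json
from collections import Counter
from pathlib import Path
from typing import Dict, List

def read_bad_records(bad_records_path: str) -> List[Dict[str, str]]:
    """Collect entries from <bad_records_path>/<timestamp>/bad_records/part-* files.
    Assumes JSON Lines content (one object per line)."""
    entries = []
    for part in sorted(Path(bad_records_path).glob("*/bad_records/part-*")):
        with part.open(encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if line:  # skip blank lines
                    entries.append(json.loads(line))
    return entries

def count_by_reason(entries: List[Dict[str, str]]) -> Counter:
    """Summarise how many bad records share each exception message."""
    return Counter(entry["reason"] for entry in entries)
```

For example, `count_by_reason(read_bad_records("/dbfs/channels/badata"))` (a hypothetical local path) would group the logged records by exception message, which is a quick way to see whether one kind of error dominates.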

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType

# Same schema as before, without the BadData column; malformed rows are logged under badRecordsPath
schema_channels = StructType([StructField('CHANNEL_ID', IntegerType(), True),
                              StructField('CHANNEL_DESC', StringType(), True),
                              StructField('CHANNEL_CLASS', StringType(), True),
                              StructField('CHANNEL_CLASS_ID', IntegerType(), True),
                              StructField('CHANNEL_TOTAL', StringType(), True),
                              StructField('CHANNEL_TOTAL_ID', IntegerType(), True)])
df_channels = (spark.read.schema(schema_channels)
               .option("badRecordsPath", "/channels/badata/")
               .csv("/FileStore/tables/channels.csv", header=True))
display(df_channels)

In [0]:
# Each run writes its logs to <badRecordsPath>/<timestamp>/bad_records/, so glob both levels
df_baddata = spark.read.json("/channels/badata/*/*/")
display(df_baddata)

path,reason,record
dbfs:/FileStore/tables/channels.csv,org.apache.spark.sql.catalyst.csv.MalformedCSVException: Malformed CSV record,"12,Partners,Others,14,Channel total,1,45,ram,3434"
dbfs:/FileStore/tables/channels.csv,org.apache.spark.sql.catalyst.csv.MalformedCSVException: Malformed CSV record,"sample,Partners,Others,14,Channel total,1,45,ram,3434"
dbfs:/FileStore/tables/channels.csv,org.apache.spark.sql.catalyst.csv.MalformedCSVException: Malformed CSV record,10 Partners Others 14 Channel total 1
dbfs:/FileStore/tables/channels.csv,org.apache.spark.sql.catalyst.csv.MalformedCSVException: Malformed CSV record,11 Partners Others 14 Channel total 1


In [0]:
%fs head dbfs:/channels/badata/20220928T164614/bad_records/part-00000-946a209a-465a-46a3-acd3-e33adf9fd287

In [0]:
display(df_channels)

CHANNEL_ID,CHANNEL_DESC,CHANNEL_CLASS,CHANNEL_CLASS_ID,CHANNEL_TOTAL,CHANNEL_TOTAL_ID
3,Direct Sales,Direct,12.0,Channel total,1.0
9,Tele Sales,Direct,12.0,Channel total,1.0
5,Catalog,Indirect,13.0,Channel total,1.0
4,Internet,Indirect,13.0,Channel total,1.0
2,Partners,Others,14.0,Channel total,1.0
12,Partners,Others,14.0,Channel total,1.0
sample,Partners,Others,14.0,Channel total,1.0
10 Partners Others 14 Channel total 1,,,,,
11 Partners Others 14 Channel total 1,,,,,
