In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# DDL-style schema string applied to the employee CSV by the reads below.
emp_schema = (
    "id integer, name string, age integer, "
    "salary integer, address string, nominee string"
)

In [0]:

# FAILFAST aborts the whole read as soon as one row does not match emp_schema.
# Spark reads lazily, so the error surfaces when display() runs the job, not at
# load(). The failure is the demonstration, so it is caught and summarised here
# rather than left to stop a "Run All" before the later cells execute.
df = (
    spark.read.format("csv")
    .schema(emp_schema)
    .options(mode="FAILFAST", header="false")
    .load("/FileStore/tables/new_62.csv")
)

try:
    display(df)
except Exception as exc:  # broad on purpose: the wrapping exception type varies by runtime
    # Keep only the first few lines; the full message usually carries a long JVM stack trace.
    summary = "\n".join(str(exc).splitlines()[:5])
    print(f"FAILFAST read failed as expected -> {type(exc).__name__}:\n{summary}")

In [0]:

# DROPMALFORMED: rows that do not fit emp_schema are silently discarded,
# so only the well-formed records reach the DataFrame.
df1 = (
    spark.read.format("csv")
    .schema(emp_schema)
    .options(mode="DROPMALFORMED", header="false")
    .load("/FileStore/tables/new_62.csv")
)

display(df1)

id,name,age,salary,address,nominee
1,Manish,26,75000,bihar,nominee1
2,Nikita,23,100000,uttarpradesh,nominee2
5,Vikash,31,300000,,nominee5


In [0]:
# PERMISSIVE keeps every row, including malformed ones: fields that cannot be
# parsed come back as null, and tokens beyond the schema's columns are dropped.
# Without a corrupt-record column in the schema there is no trace of which
# rows were malformed.
df2 = (
    spark.read.format("csv")
    .schema(emp_schema)
    .options(mode="PERMISSIVE", header="false")
    .load("/FileStore/tables/new_62.csv")
)

display(df2)

id,name,age,salary,address,nominee
1,Manish,26,75000,bihar,nominee1
2,Nikita,23,100000,uttarpradesh,nominee2
3,Pritam,22,150000,Bangalore,India
4,Prantosh,17,200000,Kolkata,India
5,Vikash,31,300000,,nominee5


In [0]:
# Find out which rows are malformed: add a string column named _corrupt_record
# to the schema so that PERMISSIVE mode stores the raw text of each bad line
# there (it stays empty for rows that parsed cleanly).
emp_schema_corrupt = (
    "id integer, name string, age integer, salary integer, "
    "address string, nominee string, _corrupt_record string"
)

df2 = (
    spark.read.format("csv")
    .schema(emp_schema_corrupt)
    .options(mode="PERMISSIVE", header="false")
    .load("/FileStore/tables/new_62.csv")
)

display(df2)



id,name,age,salary,address,nominee,_corrupt_record
1,Manish,26,75000,bihar,nominee1,
2,Nikita,23,100000,uttarpradesh,nominee2,
3,Pritam,22,150000,Bangalore,India,"3,Pritam,22,150000,Bangalore,India,nominee3"
4,Prantosh,17,200000,Kolkata,India,"4,Prantosh,17,200000,Kolkata,India,nominee4"
5,Vikash,31,300000,,nominee5,


In [0]:
# Divert malformed rows to files under badRecordsPath instead of keeping them
# in the DataFrame. No explicit mode is set for this read.
# NOTE(review): the schema still declares _corrupt_record, but the result shown
# for this read has it null on every row - plain emp_schema would probably do.
df2 = (
    spark.read.format("csv")
    .schema(emp_schema_corrupt)
    .options(header="false", badRecordsPath="/FileStore/tables/emp_badrecord/")
    .load("/FileStore/tables/new_62.csv")
)

In [0]:
# Print the rows of df2 as a plain-text table.
df2.show()


+---+------+---+------+------------+--------+---------------+
| id|  name|age|salary|     address| nominee|_corrupt_record|
+---+------+---+------+------------+--------+---------------+
|  1|Manish| 26| 75000|       bihar|nominee1|           null|
|  2|Nikita| 23|100000|uttarpradesh|nominee2|           null|
|  5|Vikash| 31|300000|        null|nominee5|           null|
+---+------+---+------+------------+--------+---------------+



In [0]:
# List the bad-records part file under the badRecordsPath directory.
# NOTE(review): the 20240110T015623 and part-00000-... segments look specific
# to one run - presumably this path must be updated after the read is re-run.
dbutils.fs.ls("/FileStore/tables/emp_badrecord/20240110T015623/bad_records/part-00000-95621afe-31bb-4f53-ade0-75b51e687830/")

Out[26]: [FileInfo(path='dbfs:/FileStore/tables/emp_badrecord/20240110T015623/bad_records/part-00000-95621afe-31bb-4f53-ade0-75b51e687830', name='part-00000-95621afe-31bb-4f53-ade0-75b51e687830', size=480, modificationTime=1704851820000)]

In [0]:
# badRecordsPath writes each run under <root>/<timestamp>/bad_records/<part-file>,
# so a hard-coded timestamp and part-file name only ever shows one historical run.
# Resolve the newest run directory instead; the yyyyMMddTHHmmss directory names
# sort chronologically as plain strings.
# `sorted` is used deliberately: `max` is shadowed by the wildcard import of
# pyspark.sql.functions at the top of the notebook.
bad_records_root = "/FileStore/tables/emp_badrecord/"
run_dirs = sorted(info.path for info in dbutils.fs.ls(bad_records_root))
latest_run_dir = run_dirs[-1].rstrip("/")

# Each bad record is stored as JSON with the source path, the failure reason
# and the raw record text.
bad_records = spark.read.json(f"{latest_run_dir}/bad_records/")
bad_records.show(truncate=False)

+---------------------------------+--------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------+
|path                             |reason                                                                                                                          |record                                     |
+---------------------------------+--------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------+
|dbfs:/FileStore/tables/new_62.csv|org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: 3,Pritam,22,150000,Bangalore,India,nominee3|3,Pritam,22,150000,Bangalore,India,nominee3|
|dbfs:/FileStore/tables/new_62.csv|org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: 4,Prantosh,17,200000,Kolkata,India,nominee4|