In [0]:
spark

In [0]:
# PERMISSIVE example

employee_df =spark.read.format("csv")\
    .option("header","true")\
    .option("inferschema","true")\
    .option("mode","PERMISSIVE")\
    .load("/FileStore/tables/emlpoyee.csv")

employee_df.show()

+---+--------+---+------+------------+--------+
| id|    name|age|salary|     address| nominee|
+---+--------+---+------+------------+--------+
|  1|  Manish| 26| 75000|       bihar|nominee1|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|
|  3|  Pritam| 22|150000|   Bangalore|   India|
|  4|Prantosh| 17|200000|     Kolkata|   India|
|  5|  Vikash| 31|300000|        null|nominee5|
+---+--------+---+------+------------+--------+



In [0]:
# DROPMALFORMED example

employee_df =spark.read.format("csv")\
    .option("header","true")\
    .option("inferschema","true")\
    .option("mode","DROPMALFORMED")\
    .load("/FileStore/tables/emlpoyee.csv")

employee_df.show()

+---+------+---+------+------------+--------+
| id|  name|age|salary|     address| nominee|
+---+------+---+------+------------+--------+
|  1|Manish| 26| 75000|       bihar|nominee1|
|  2|Nikita| 23|100000|uttarpradesh|nominee2|
|  5|Vikash| 31|300000|        null|nominee5|
+---+------+---+------+------------+--------+



In [0]:
# FAILFAST: example <in this throws error because of bad record present in file

employee_df =spark.read.format("csv")\
    .option("header","true")\
    .option("inferschema","true")\
    .option("mode","FAILFAST")\
    .load("/FileStore/tables/emlpoyee.csv")

employee_df.show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-4498642543346243>:9[0m
[1;32m      1[0m [38;5;66;03m# DROPMALFORMED example[39;00m
[1;32m      3[0m employee_df [38;5;241m=[39mspark[38;5;241m.[39mread[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mcsv[39m[38;5;124m"[39m)\
[1;32m      4[0m     [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mheader[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124mtrue[39m[38;5;124m"[39m)\
[1;32m      5[0m     [38;5;241m.[39moption([38;5;124m"[39m[38;5;124minferschema[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124mtrue[39m[38;5;124m"[39m)\
[1;32m      6[0m     [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mmode[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124mFAILFAST[39m[38;5;124m"[39m)\
[1;32m      7[0m     [38;5;241m.[39mload([38;5;124m"[39m[38;5;124m

In [0]:
from pyspark.sql.types import StringType,StructType,StructField,IntegerType

In [0]:

#id,name,age,salary,address,nominee

emp_schema =StructType([
    StructField("id",IntegerType(),True),
    StructField("name",StringType(),True),
    StructField("age",StringType(),True,),
    StructField("salary",IntegerType(),True),
    StructField("address",StringType(),True,),
    StructField("nominee",StringType(),True),
    StructField("_corrupt_record",StringType(),True)

])



In [0]:

# PERMISSIVE example

employee_df =spark.read.format("csv")\
    .option("header","true")\
    .option("inferschema","true")\
    .schema(emp_schema)\
    .option("badRecordsPath","/FileStore/tables/bad_records")\
    .load("/FileStore/tables/emlpoyee.csv")

employee_df.show(truncate=False)


+---+------+---+------+------------+--------+---------------+
|id |name  |age|salary|address     |nominee |_corrupt_record|
+---+------+---+------+------------+--------+---------------+
|1  |Manish|26 |75000 |bihar       |nominee1|null           |
|2  |Nikita|23 |100000|uttarpradesh|nominee2|null           |
|5  |Vikash|31 |300000|null        |nominee5|null           |
+---+------+---+------+------------+--------+---------------+



In [0]:
%fs
ls /FileStore/tables/

path,name,size,modificationTime
dbfs:/FileStore/tables/2010_summary.csv,2010_summary.csv,7121,1721349228000
dbfs:/FileStore/tables/bad_records/,bad_records/,0,0
dbfs:/FileStore/tables/emlpoyee.csv,emlpoyee.csv,230,1721434558000
dbfs:/FileStore/tables/flight_data-1.csv,flight_data-1.csv,7121,1721349620000
dbfs:/FileStore/tables/flight_data-2.csv,flight_data-2.csv,7121,1721354248000
dbfs:/FileStore/tables/flight_data.csv,flight_data.csv,7121,1721349519000
dbfs:/FileStore/tables/flight_data_1-1.csv,flight_data_1-1.csv,7069,1721431466000
dbfs:/FileStore/tables/flight_data_1-2.csv,flight_data_1-2.csv,7069,1721431556000
dbfs:/FileStore/tables/flight_data_1.csv,flight_data_1.csv,6729,1721431142000


In [0]:
%fs
ls /FileStore/tables/bad_records/20240720T005354/bad_records

path,name,size,modificationTime
dbfs:/FileStore/tables/bad_records/20240720T005354/bad_records/part-00000-fab1037b-35f6-4bf5-a5fa-88997affed05,part-00000-fab1037b-35f6-4bf5-a5fa-88997affed05,484,1721436836000


In [0]:
bad_data_df = spark.read.format("json").load("/FileStore/tables/bad_records/20240720T005354/bad_records")
bad_data_df.show(truncate=False)

+-----------------------------------+--------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------+
|path                               |reason                                                                                                                          |record                                     |
+-----------------------------------+--------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------+
|dbfs:/FileStore/tables/emlpoyee.csv|org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: 3,Pritam,22,150000,Bangalore,India,nominee3|3,Pritam,22,150000,Bangalore,India,nominee3|
|dbfs:/FileStore/tables/emlpoyee.csv|org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: 4,Prantosh,17,200000,Kolkata,India

In [0]:
bad_data_df.show()

+--------------------+--------------------+--------------------+
|                path|              reason|              record|
+--------------------+--------------------+--------------------+
|dbfs:/FileStore/t...|org.apache.spark....|3,Pritam,22,15000...|
|dbfs:/FileStore/t...|org.apache.spark....|4,Prantosh,17,200...|
+--------------------+--------------------+--------------------+

