How to Handle Bad Data in a PySpark DataFrame

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Obtain the session via the builder's getOrCreate(), using default settings.
spark = SparkSession.builder.getOrCreate()

# Handle to the SparkContext behind this session.
sc = spark.sparkContext

In [3]:
# Explicit schema for the channels CSV, built from (column name, Spark type)
# pairs. Every column is declared nullable.
schema_channels = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ('CHANNEL_ID', IntegerType()),
        ('CHANNEL_DESC', StringType()),
        ('CHANNEL_CLASS', StringType()),
        ('CHANNEL_CLASS_ID', IntegerType()),
        ('CHANNEL_TOTAL', StringType()),
        ('CHANNEL_TOTAL_ID', IntegerType()),
    ]
])

Permissive Mode (default)

In [4]:
# PERMISSIVE parse mode with no explicit schema: the first line is used as the
# header and the column types are inferred from the data.
df_channels = (
    spark.read
    .option("mode", "PERMISSIVE")
    .csv(
        "/home/phillipefs/spark_dev/pyspark-real-time-scenarios/data/bad_records.csv",
        header=True,
        inferSchema=True,
    )
)

df_channels.show(truncate=False)

+-------------------------------------+------------+-------------+----------------+-------------+----------------+
|CHANNEL_ID                           |CHANNEL_DESC|CHANNEL_CLASS|CHANNEL_CLASS_ID|CHANNEL_TOTAL|CHANNEL_TOTAL_ID|
+-------------------------------------+------------+-------------+----------------+-------------+----------------+
|3                                    |Direct Sales|Direct       |12              |Channel total|1               |
|9                                    |Tele Sales  |Direct       |12              |Channel total|1               |
|5                                    |Catalog     |Indirect     |13              |Channel total|1               |
|4                                    |Internet    |Indirect     |13              |Channel total|1               |
|2                                    |Partners    |Others       |14              |Channel total|1               |
|12                                   |Partners    |Others       |14            

Permissive With Schema

In [5]:
# PERMISSIVE parse mode with an explicit schema: values that cannot be parsed
# into the declared column type come back as null instead of failing the read.
# `inferSchema` is intentionally not passed -- the schema is supplied
# explicitly, so asking Spark to infer one as well is redundant and misleading.
df_channels = (
    spark.read
    .schema(schema_channels)
    .option("mode", "PERMISSIVE")
    .csv(
        "/home/phillipefs/spark_dev/pyspark-real-time-scenarios/data/bad_records.csv",
        header=True,
    )
)

df_channels.show(truncate=False)

+----------+------------+-------------+----------------+-------------+----------------+
|CHANNEL_ID|CHANNEL_DESC|CHANNEL_CLASS|CHANNEL_CLASS_ID|CHANNEL_TOTAL|CHANNEL_TOTAL_ID|
+----------+------------+-------------+----------------+-------------+----------------+
|3         |Direct Sales|Direct       |12              |Channel total|1               |
|9         |Tele Sales  |Direct       |12              |Channel total|1               |
|5         |Catalog     |Indirect     |13              |Channel total|1               |
|4         |Internet    |Indirect     |13              |Channel total|1               |
|2         |Partners    |Others       |14              |Channel total|1               |
|12        |Partners    |Others       |14              |Channel total|1               |
|null      |Partners    |Others       |14              |Channel total|1               |
|null      |null        |null         |null            |null         |null            |
|null      |null        |null   

Fail Fast Mode

In [6]:
# FAILFAST parse mode: Spark raises as soon as it meets a malformed record.
# `inferSchema` is intentionally not passed -- the schema is supplied
# explicitly, so asking Spark to infer one as well is redundant and misleading.
df_channels = (
    spark.read
    .schema(schema_channels)
    .option("mode", "FAILFAST")
    .csv(
        "/home/phillipefs/spark_dev/pyspark-real-time-scenarios/data/bad_records.csv",
        header=True,
    )
)

# The exception is the demonstration here: it is raised by this action, when
# the rows are actually parsed, with "Malformed records are detected in record
# parsing. Parse Mode: FAILFAST."
df_channels.show(truncate=False)

23/05/22 18:41:19 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4)
org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1417)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:68)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:421)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catal

Py4JJavaError: An error occurred while calling o60.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4) (172.25.124.89 executor driver): org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1417)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:68)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:421)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.lang.RuntimeException: Malformed CSV record
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:330)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$parse$2(UnivocityParser.scala:275)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:417)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
	... 23 more
Caused by: java.lang.RuntimeException: Malformed CSV record
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedCSVRecordError(QueryExecutionErrors.scala:1222)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:298)
	... 26 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:506)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1417)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:68)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:421)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.lang.RuntimeException: Malformed CSV record
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:330)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$parse$2(UnivocityParser.scala:275)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:417)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
	... 23 more
Caused by: java.lang.RuntimeException: Malformed CSV record
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedCSVRecordError(QueryExecutionErrors.scala:1222)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:298)
	... 26 more


DROPMALFORMED Mode

In [7]:
# DROPMALFORMED parse mode: rows that do not conform to the schema are dropped
# from the result, so only the well-formed rows are shown.
# `inferSchema` is intentionally not passed -- the schema is supplied
# explicitly, so asking Spark to infer one as well is redundant and misleading.
df_channels = (
    spark.read
    .schema(schema_channels)
    .option("mode", "DROPMALFORMED")
    .csv(
        "/home/phillipefs/spark_dev/pyspark-real-time-scenarios/data/bad_records.csv",
        header=True,
    )
)

df_channels.show(truncate=False)

+----------+------------+-------------+----------------+-------------+----------------+
|CHANNEL_ID|CHANNEL_DESC|CHANNEL_CLASS|CHANNEL_CLASS_ID|CHANNEL_TOTAL|CHANNEL_TOTAL_ID|
+----------+------------+-------------+----------------+-------------+----------------+
|3         |Direct Sales|Direct       |12              |Channel total|1               |
|9         |Tele Sales  |Direct       |12              |Channel total|1               |
|5         |Catalog     |Indirect     |13              |Channel total|1               |
|4         |Internet    |Indirect     |13              |Channel total|1               |
|2         |Partners    |Others       |14              |Channel total|1               |
+----------+------------+-------------+----------------+-------------+----------------+



badRecordsPath

In [39]:
# Read with an explicit schema and a `badRecordsPath` option pointing at a
# directory intended to receive the rejected records.
# NOTE(review): `badRecordsPath` is documented as a Databricks feature. The
# output recorded for this cell still lists the malformed rows as nulls, which
# suggests the option is not applied in this runtime -- confirm before relying
# on it.
df_channels = (
    spark.read
    .schema(schema_channels)
    .option("badRecordsPath", "/home/phillipefs/spark_dev/pyspark-real-time-scenarios/data/bad/")
    .csv(
        "/home/phillipefs/spark_dev/pyspark-real-time-scenarios/data/bad_records.csv",
        header=True,
    )
)

df_channels.show(truncate=False)

+----------+------------+-------------+----------------+-------------+----------------+
|CHANNEL_ID|CHANNEL_DESC|CHANNEL_CLASS|CHANNEL_CLASS_ID|CHANNEL_TOTAL|CHANNEL_TOTAL_ID|
+----------+------------+-------------+----------------+-------------+----------------+
|3         |Direct Sales|Direct       |12              |Channel total|1               |
|9         |Tele Sales  |Direct       |12              |Channel total|1               |
|5         |Catalog     |Indirect     |13              |Channel total|1               |
|4         |Internet    |Indirect     |13              |Channel total|1               |
|2         |Partners    |Others       |14              |Channel total|1               |
|12        |Partners    |Others       |14              |Channel total|1               |
|null      |Partners    |Others       |14              |Channel total|1               |
|null      |null        |null         |null            |null         |null            |
|null      |null        |null   

columnNameOfCorruptRecord

In [48]:
# Channels schema extended with a trailing string column, 'BadData', that
# receives the raw text of any record Spark could not parse.
# 'BadData' must be nullable: it is null for every well-formed row, so
# declaring it non-nullable would misdescribe the data.
schema_channels_bad_data = StructType([StructField('CHANNEL_ID', IntegerType(), True), 
                              StructField('CHANNEL_DESC', StringType(), True), 
                              StructField('CHANNEL_CLASS', StringType(), True), 
                              StructField('CHANNEL_CLASS_ID', IntegerType(), True), 
                              StructField('CHANNEL_TOTAL', StringType(), True), 
                              StructField('CHANNEL_TOTAL_ID', IntegerType(), True),
                              StructField('BadData', StringType(), True)])

In [49]:

# Read with the extended schema and tell Spark to place the raw text of each
# malformed record in the 'BadData' column.
df_channels = (
    spark.read
    .schema(schema_channels_bad_data)
    .csv(
        "/home/phillipefs/spark_dev/pyspark-real-time-scenarios/data/bad_records.csv",
        header=True,
        columnNameOfCorruptRecord="BadData",
    )
)

df_channels.show(truncate=False)

+----------+------------+-------------+----------------+-------------+----------------+-----------------------------------------------------+
|CHANNEL_ID|CHANNEL_DESC|CHANNEL_CLASS|CHANNEL_CLASS_ID|CHANNEL_TOTAL|CHANNEL_TOTAL_ID|BadData                                              |
+----------+------------+-------------+----------------+-------------+----------------+-----------------------------------------------------+
|3         |Direct Sales|Direct       |12              |Channel total|1               |null                                                 |
|9         |Tele Sales  |Direct       |12              |Channel total|1               |null                                                 |
|5         |Catalog     |Indirect     |13              |Channel total|1               |null                                                 |
|4         |Internet    |Indirect     |13              |Channel total|1               |null                                                 |
|2    

In [73]:
# Cache the parsed frame before splitting it.
# NOTE(review): the Spark docs describe caching as the workaround for queries
# that reference only the corrupt-record column of a raw CSV read, which is
# what `df_bad` does -- confirm this is still required on the Spark version
# in use.
df_channels.cache()

# Well-formed rows: nothing was captured in BadData, so drop that column.
df_good = df_channels.where("BadData is null").drop("BadData")

# Malformed rows: keep only the raw record text.
df_bad = df_channels.where("BadData is not null").select("BadData")

In [75]:
# Display the rows whose BadData column was null, i.e. the records that parsed cleanly.
df_good.show()

+----------+------------+-------------+----------------+-------------+----------------+
|CHANNEL_ID|CHANNEL_DESC|CHANNEL_CLASS|CHANNEL_CLASS_ID|CHANNEL_TOTAL|CHANNEL_TOTAL_ID|
+----------+------------+-------------+----------------+-------------+----------------+
|         3|Direct Sales|       Direct|              12|Channel total|               1|
|         9|  Tele Sales|       Direct|              12|Channel total|               1|
|         5|     Catalog|     Indirect|              13|Channel total|               1|
|         4|    Internet|     Indirect|              13|Channel total|               1|
|         2|    Partners|       Others|              14|Channel total|               1|
+----------+------------+-------------+----------------+-------------+----------------+



In [74]:
# Display the raw text of the rejected records; truncate=False keeps each line whole.
df_bad.show(truncate=False)

+-----------------------------------------------------+
|BadData                                              |
+-----------------------------------------------------+
|12,Partners,Others,14,Channel total,1,45,ram,3434    |
|sample,Partners,Others,14,Channel total,1,45,ram,3434|
|10 Partners Others 14 Channel total 1                |
|11 Partners Others 14 Channel total 1                |
+-----------------------------------------------------+

