In [0]:
# Write a small channels CSV that deliberately mixes clean rows with malformed ones, used below to
# compare Spark's CSV parse modes:
#   - 'sample' in CHANNEL_ID cannot be parsed as an integer;
#   - the last two lines are space-delimited, so they do not split into six comma-separated fields.
# overwrite=True keeps the cell re-runnable: dbutils.fs.put refuses to replace an existing file by default.
dbutils.fs.put('dbfs:/FileStore/spark_RealTimeScenarioData/channels.csv', """CHANNEL_ID,CHANNEL_DESC,CHANNEL_CLASS,CHANNEL_CLASS_ID,CHANNEL_TOTAL,CHANNEL_TOTAL_ID
3,Direct Sales,Direct,12,Channel total,1
9,Tele Sales,Direct,12,Channel total,1
5,Catalog,Indirect,13,Channel total,1
4,Internet,Indirect,13,Channel total,1
2,Partners,Others,14,Channel total,1
sample,Partners,Others,14,Channel total,1
10 Partners Others 14 Channel total 1
11 Partners Others 14 Channel total 1""", overwrite=True)

Wrote 397 bytes.
Out[1]: True

In [0]:
from pyspark.sql.types import *

In [0]:
# Explicit six-column schema for channels.csv (ID columns as integers, the rest as strings, all
# nullable). It is defined here for the parse-mode cells that follow; this cell's own read
# deliberately uses inferSchema instead, to show what Spark guesses when the file contains bad
# rows (CHANNEL_ID comes back as a string because of the non-numeric values).
schema_channels = StructType([
    StructField(col_name, col_type, True)
    for col_name, col_type in (
        ('CHANNEL_ID', IntegerType()),
        ('CHANNEL_DESC', StringType()),
        ('CHANNEL_CLASS', StringType()),
        ('CHANNEL_CLASS_ID', IntegerType()),
        ('CHANNEL_TOTAL', StringType()),
        ('CHANNEL_TOTAL_ID', IntegerType()),
    )
])

df_channels = spark.read.csv(
    path='dbfs:/FileStore/spark_RealTimeScenarioData/channels.csv',
    inferSchema=True,
    header=True,
)
df_channels.schema

Out[5]: StructType([StructField('CHANNEL_ID', StringType(), True), StructField('CHANNEL_DESC', StringType(), True), StructField('CHANNEL_CLASS', StringType(), True), StructField('CHANNEL_CLASS_ID', IntegerType(), True), StructField('CHANNEL_TOTAL', StringType(), True), StructField('CHANNEL_TOTAL_ID', IntegerType(), True)])

In [0]:
# Show the rows read with the inferred schema: malformed rows are kept as-is. The space-delimited
# lines land whole in CHANNEL_ID with the remaining columns left empty.
display(df_channels)

CHANNEL_ID,CHANNEL_DESC,CHANNEL_CLASS,CHANNEL_CLASS_ID,CHANNEL_TOTAL,CHANNEL_TOTAL_ID
3,Direct Sales,Direct,12.0,Channel total,1.0
9,Tele Sales,Direct,12.0,Channel total,1.0
5,Catalog,Indirect,13.0,Channel total,1.0
4,Internet,Indirect,13.0,Channel total,1.0
2,Partners,Others,14.0,Channel total,1.0
sample,Partners,Others,14.0,Channel total,1.0
10 Partners Others 14 Channel total 1,,,,,
11 Partners Others 14 Channel total 1,,,,,


In [0]:
# PERMISSIVE keeps every row: any value that cannot be parsed against schema_channels becomes null
# instead of failing the read (CHANNEL_ID for the 'sample' row, and every column of the two
# space-delimited rows).
df_channels = spark.read.csv(
    'dbfs:/FileStore/spark_RealTimeScenarioData/channels.csv',
    mode='PERMISSIVE',
    header=True,
    schema=schema_channels,
)
display(df_channels)

CHANNEL_ID,CHANNEL_DESC,CHANNEL_CLASS,CHANNEL_CLASS_ID,CHANNEL_TOTAL,CHANNEL_TOTAL_ID
3.0,Direct Sales,Direct,12.0,Channel total,1.0
9.0,Tele Sales,Direct,12.0,Channel total,1.0
5.0,Catalog,Indirect,13.0,Channel total,1.0
4.0,Internet,Indirect,13.0,Channel total,1.0
2.0,Partners,Others,14.0,Channel total,1.0
,Partners,Others,14.0,Channel total,1.0
,,,,,
,,,,,


In [0]:
# FAILFAST makes Spark raise an exception as soon as it meets a malformed record instead of
# returning rows. channels.csv contains malformed rows, so the failure is the expected outcome of
# this cell. It is caught and summarised so that "Run All" still reaches the cells below.
try:
    df_channels = spark.read.csv(path='dbfs:/FileStore/spark_RealTimeScenarioData/channels.csv', schema=schema_channels, header=True, mode='FAILFAST')
    display(df_channels)
except Exception as exc:  # the concrete exception class depends on the PySpark / Databricks runtime version
    print(f'FAILFAST aborted the read: {type(exc).__name__}: {str(exc)[:500]}')

In [0]:
# DROPMALFORMED discards every row that does not parse against schema_channels, so only the five
# clean rows come back. Nothing records which rows were removed.
df_channels = spark.read.csv(
    'dbfs:/FileStore/spark_RealTimeScenarioData/channels.csv',
    mode='DROPMALFORMED',
    header=True,
    schema=schema_channels,
)
display(df_channels)

CHANNEL_ID,CHANNEL_DESC,CHANNEL_CLASS,CHANNEL_CLASS_ID,CHANNEL_TOTAL,CHANNEL_TOTAL_ID
3,Direct Sales,Direct,12,Channel total,1
9,Tele Sales,Direct,12,Channel total,1
5,Catalog,Indirect,13,Channel total,1
4,Internet,Indirect,13,Channel total,1
2,Partners,Others,14,Channel total,1


In [0]:
# badRecordsPath (a Databricks reader option): rows that fail to parse are not loaded. Instead,
# each is written as a JSON record (source path, failure reason, raw line) under a timestamped
# sub-directory of /FileStore/badRecord, and only the clean rows are returned.
df_channels = (
    spark.read
    .option('badRecordsPath', '/FileStore/badRecord')
    .csv(
        path='dbfs:/FileStore/spark_RealTimeScenarioData/channels.csv',
        header=True,
        schema=schema_channels,
    )
)
display(df_channels)

CHANNEL_ID,CHANNEL_DESC,CHANNEL_CLASS,CHANNEL_CLASS_ID,CHANNEL_TOTAL,CHANNEL_TOTAL_ID
3,Direct Sales,Direct,12,Channel total,1
9,Tele Sales,Direct,12,Channel total,1
5,Catalog,Indirect,13,Channel total,1
4,Internet,Indirect,13,Channel total,1
2,Partners,Others,14,Channel total,1


In [0]:
# badRecordsPath writes each run into a new timestamp-named sub-directory (e.g. 20230116T224231),
# so a hard-coded timestamp only exists for the run that produced it. Pick the newest run instead;
# the fixed-width timestamp names sort chronologically as strings.
# NOTE(review): assumes /FileStore/badRecord/ only contains badRecordsPath run directories.
bad_record_runs = sorted(run.path.rstrip('/') for run in dbutils.fs.ls('dbfs:/FileStore/badRecord/'))
assert bad_record_runs, 'no badRecordsPath output under dbfs:/FileStore/badRecord/ - run the badRecordsPath cell first'
display(dbutils.fs.ls(bad_record_runs[-1] + '/bad_records/'))

path,name,size,modificationTime
dbfs:/FileStore/badRecord/20230116T224231/bad_records/part-00000-acaa83ed-f395-49ad-a661-780bfb6c4c9c,part-00000-acaa83ed-f395-49ad-a661-780bfb6c4c9c,603,1673908952000


In [0]:
# Read back the JSON bad-record files of the newest badRecordsPath run. Each record holds the
# source path, the failure reason and the raw line. The whole bad_records directory is read, so
# the cell does not depend on a run-specific timestamp or a generated part-file name.
# NOTE(review): assumes /FileStore/badRecord/ only contains badRecordsPath run directories.
bad_record_runs = sorted(run.path.rstrip('/') for run in dbutils.fs.ls('dbfs:/FileStore/badRecord/'))
assert bad_record_runs, 'no badRecordsPath output under dbfs:/FileStore/badRecord/ - run the badRecordsPath cell first'
display(spark.read.json(path=bad_record_runs[-1] + '/bad_records/'))

path,reason,record
dbfs:/FileStore/spark_RealTimeScenarioData/channels.csv,"java.lang.NumberFormatException: For input string: ""sample""","sample,Partners,Others,14,Channel total,1"
dbfs:/FileStore/spark_RealTimeScenarioData/channels.csv,org.apache.spark.sql.catalyst.csv.MalformedCSVException: Malformed CSV record,10 Partners Others 14 Channel total 1
dbfs:/FileStore/spark_RealTimeScenarioData/channels.csv,org.apache.spark.sql.catalyst.csv.MalformedCSVException: Malformed CSV record,11 Partners Others 14 Channel total 1


In [0]:
# Other method: keep the raw text of each malformed row in an extra string column.
# With columnNameOfCorruptRecord, the PERMISSIVE parser stores the original line in BadData for
# rows that fail to parse, and leaves BadData null for clean rows.
# A separate schema name is used on purpose: rebinding schema_channels to this 7-column schema
# would silently change the schema used by the earlier parse-mode cells if they were re-run.
schema_channels_with_bad_data = StructType([StructField('CHANNEL_ID', IntegerType(), True),
                                            StructField('CHANNEL_DESC', StringType(), True),
                                            StructField('CHANNEL_CLASS', StringType(), True),
                                            StructField('CHANNEL_CLASS_ID', IntegerType(), True),
                                            StructField('CHANNEL_TOTAL', StringType(), True),
                                            StructField('CHANNEL_TOTAL_ID', IntegerType(), True),
                                            # must be a nullable string column to receive the raw line
                                            StructField('BadData', StringType(), True)])

df_channels = spark.read.csv(path='dbfs:/FileStore/spark_RealTimeScenarioData/channels.csv',
                             schema=schema_channels_with_bad_data,
                             header=True,
                             mode='PERMISSIVE',  # the corrupt-record column is only filled in PERMISSIVE mode
                             columnNameOfCorruptRecord='BadData')
display(df_channels)

CHANNEL_ID,CHANNEL_DESC,CHANNEL_CLASS,CHANNEL_CLASS_ID,CHANNEL_TOTAL,CHANNEL_TOTAL_ID,BadData
3.0,Direct Sales,Direct,12.0,Channel total,1.0,
9.0,Tele Sales,Direct,12.0,Channel total,1.0,
5.0,Catalog,Indirect,13.0,Channel total,1.0,
4.0,Internet,Indirect,13.0,Channel total,1.0,
2.0,Partners,Others,14.0,Channel total,1.0,
,Partners,Others,14.0,Channel total,1.0,"sample,Partners,Others,14,Channel total,1"
,,,,,,10 Partners Others 14 Channel total 1
,,,,,,11 Partners Others 14 Channel total 1


In [0]:
# Split the parsed rows on the corrupt-record column: clean rows (BadData null, column dropped)
# versus the raw text of the rows that failed to parse.
# Spark rejects a query on a raw CSV scan whose only referenced column is the corrupt-record
# column (which is exactly what bad_data does) with an AnalysisException. Caching the parsed
# DataFrame first is the documented workaround, and it also avoids parsing the file twice.
channels_parsed = df_channels.cache()
good_data = channels_parsed.filter('BadData is null').drop('BadData')
bad_data = channels_parsed.filter('BadData is not null').select('BadData')

In [0]:
# Only the rows whose BadData column is null remain, shown without the BadData column.
display(good_data)

CHANNEL_ID,CHANNEL_DESC,CHANNEL_CLASS,CHANNEL_CLASS_ID,CHANNEL_TOTAL,CHANNEL_TOTAL_ID
3,Direct Sales,Direct,12,Channel total,1
9,Tele Sales,Direct,12,Channel total,1
5,Catalog,Indirect,13,Channel total,1
4,Internet,Indirect,13,Channel total,1
2,Partners,Others,14,Channel total,1
