* I have an Azure Event Hub that streams data in JSON format. I read it as a Spark DataFrame and parse the incoming "body" with from_json(col("body"), schema), where schema is predefined. In code, it looks like:

In [None]:
# Source - https://stackoverflow.com/q
# Posted by mLC, modified by community. See post 'Timeline' for change history
# Retrieved 2025-11-16, License - CC BY-SA 4.0

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *

schema = StructType().add(...) # define the incoming JSON schema 

df_stream_input = (spark
.readStream
.format("eventhubs")
.options(**ehConfInput)
.load()
.select(from_json(col("body").cast("string"), schema))
)


Now, if there is some inconsistency between the incoming JSON's schema and the defined schema (e.g. the source Event Hub starts sending data in a new format without notice), the from_json() function will not throw an error. Instead, it will put NULL into the fields that are present in my schema definition but missing from the JSON the Event Hub sends.

I want to capture this information and log it somewhere (Spark's log4j, Azure Monitor, warning email, ...).

**My question is: what is the best way to achieve this?**

Some of my thoughts:

* The first thing I can think of is a UDF that checks for NULLs and raises an exception if there is a problem. I believe it is not possible to send logs to log4j from there via PySpark, because the Spark context cannot be initiated within the UDF (on the workers), and one would want to use the default:

    log4jLogger = sc._jvm.org.apache.log4j
    logger = log4jLogger.LogManager.getLogger('PySpark Logger')

* The second thing I can think of is to use the "foreach/foreachBatch" function and put the check logic there.

* But both of these approaches feel too custom. I was hoping that Spark has something built in for this purpose.

In [None]:
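It turns out I was mistaken in thinking that the columnNameOfCorruptRecord option could be an answer. It will not work, firstly because of this line (quoted from Spark's source), which turns a bad record into null: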

In [None]:
# Source - https://stackoverflow.com/a
# Posted by Jacek Laskowski, modified by community. See post 'Timeline' for change history
# Retrieved 2025-11-16, License - CC BY-SA 4.0

case _: BadRecordException => null


* And secondly because of this line, which forces the FAILFAST mode and so disables any other parsing mode (including PERMISSIVE, which seems to be the one used alongside the columnNameOfCorruptRecord option):

In [None]:
# Source - https://stackoverflow.com/a
# Posted by Jacek Laskowski, modified by community. See post 'Timeline' for change history
# Retrieved 2025-11-16, License - CC BY-SA 4.0

new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))


* In other words, your only option is to use the 2nd item in your list, i.e. foreach or foreachBatch and handle corrupted records yourself.

* A solution could use from_json while keeping the initial body column. Any record with incorrect JSON would end up with a null result column, and foreach* would catch it, e.g.:

In [None]:
# Source - https://stackoverflow.com/a
# Posted by Jacek Laskowski, modified by community. See post 'Timeline' for change history
# Retrieved 2025-11-16, License - CC BY-SA 4.0

def handleCorruptRecords(row):
    # if row.json is None, the body was corrupt
    if row.json is None:
        pass  # handle it

query = (spark
  .readStream
  .format("eventhubs")
  .options(**ehConfInput)
  .load()
  .select("body", from_json(col("body").cast("string"), schema).alias("json"))
  .writeStream  # foreach is defined on the stream writer, not the DataFrame
  .foreach(handleCorruptRecords)
  .start()
)


In [None]:
from pyspark.sql.functions import col, from_json

def handleCorruptRecords(df, batch_id):
    """
    This function runs for every micro-batch in the stream.
    df        : dataframe for that batch
    batch_id  : micro-batch ID
    """

    print(f"Processing batch: {batch_id}")

    # rows where JSON parsed correctly
    good_records = df.filter(col("json").isNotNull())

    # malformed rows - JSON failed to parse
    corrupt_records = df.filter(col("json").isNull())

    # Show counts (or write to Delta)
    print(f"Good Records: {good_records.count()}")
    print(f"Corrupt Records: {corrupt_records.count()}")

    # Example: write good records to Silver
    good_records.write.format("delta") \
        .mode("append") \
        .save("/mnt/silver/good_records")

    # Example: write bad records to a quarantine location
    corrupt_records.write.format("delta") \
        .mode("append") \
        .save("/mnt/bronze/corrupt_records")

    # You can also log, alert, etc.


# EVENT HUB READER
df_stream_input = (
    spark.readStream
        .format("eventhubs")
        .options(**ehConfInput)
        .load()
        .select(
            "body",
            from_json(col("body").cast("string"), schema).alias("json")
        )
)

# APPLY foreachBatch
query = (
    df_stream_input.writeStream
        .foreachBatch(handleCorruptRecords)
        .option("checkpointLocation", "/mnt/checkpoints/eh_json_stream")
        .start()
)
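
The "log, alert, etc." step in handleCorruptRecords can be sketched with Python's standard logging module, since a foreachBatch function runs on the driver. This is a minimal sketch and not part of the original answer: report_corrupt, the logger name, and the 5% threshold are hypothetical names and values chosen for illustration.

```python
import logging

logger = logging.getLogger("eh_json_stream")  # hypothetical logger name


def report_corrupt(batch_id, corrupt_count, total_count, threshold=0.05):
    """Log a summary of corrupt records for one micro-batch.

    Returns True when the corrupt ratio exceeds `threshold` (an assumed
    alerting cut-off), so the caller can decide whether to send an alert.
    """
    if total_count == 0 or corrupt_count == 0:
        logger.info("batch %s: no corrupt records", batch_id)
        return False
    ratio = corrupt_count / total_count
    logger.warning(
        "batch %s: %d of %d records failed JSON parsing (%.1f%%)",
        batch_id, corrupt_count, total_count, ratio * 100,
    )
    return ratio > threshold
```

Inside handleCorruptRecords, a call such as report_corrupt(batch_id, corrupt_records.count(), df.count()) would emit the warning through whatever handlers are configured for Python logging on the driver.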


In [None]:
**Why does json == null mean corrupt?**
If from_json() cannot parse the JSON:

from_json(col("body").cast("string"), schema) = null

So:

* json.isNotNull() → valid JSON

* json.isNull() → malformed/corrupt JSON

* This is a common pattern for quarantining bad records in Databricks.
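
The null check above only flags bodies that fail to parse. As far as I know, a body that is valid JSON but lacks some of the schema's fields usually parses to a non-null struct whose missing fields are null, so it would pass that check, and this is the schema-drift case the question asks about. A minimal plain-Python sketch of a field-level check follows; missing_fields and the field names are hypothetical, and the helper only looks at top-level keys.

```python
import json


def missing_fields(body, expected_fields):
    """Return the expected top-level fields absent from a JSON body.

    Returns None when the body is not a JSON object at all, which
    corresponds to the corrupt case described above.
    """
    try:
        parsed = json.loads(body)
    except (TypeError, ValueError):
        # TypeError: body is None; ValueError: body is not valid JSON
        return None
    if not isinstance(parsed, dict):
        return None
    return [name for name in expected_fields if name not in parsed]


expected = ["id", "temperature"]  # hypothetical field names
print(missing_fields('{"id": 1, "temperature": 20.5}', expected))
print(missing_fields('{"id": 1, "temp": 20.5}', expected))
print(missing_fields("not json", expected))
```

A helper like this could be called on the raw body string from a per-row foreach handler, or wrapped in a UDF, to tell "unparseable" apart from "parsed, but fields are missing".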

In [None]:
# PySpark has no ForeachWriter base class to import; foreach() accepts any
# object that defines process() and, optionally, open() and close().
class CorruptRecordWriter:

    def open(self, partition_id, epoch_id):
        # Called once for every partition and epoch
        # Return True to continue
        print(f"Opening writer for partition: {partition_id}, epoch: {epoch_id}")
        return True      

    def process(self, row):
        # row.json will be None if JSON parsing failed
        if row.json is None:
            print(f"CORRUPT RECORD FOUND: {row.body_string}")
            # Here you can send the record to a quarantine store.
            # Note: process() runs on the executors, where the `spark`
            # session is not available, so use a client for the sink instead.
        else:
            print(f"VALID RECORD: {row.json}")
            # Send valid JSON to your clean sink with the same kind of client.

    def close(self, error):
        if error:
            print(f"Error occurred: {error}")
        else:
            print("Completed processing partition.")


In [None]:
from pyspark.sql.functions import col, from_json

schema = ...  # your JSON schema

df_stream_input = (
    spark.readStream
        .format("eventhubs")
        .options(**ehConfInput)
        .load()
        .select(
            col("body").cast("string").alias("body_string"),
            from_json(col("body").cast("string"), schema).alias("json")
        )
)

# Apply ForeachWriter
query = (
    df_stream_input.writeStream
        .foreach(CorruptRecordWriter())
        .outputMode("append")
        .start()
)


In [None]:
How It Works
✔ Valid JSON

from_json() parses the JSON → struct is created → row.json is not null

✔ Corrupt JSON

Parsing fails → returns None (NULL) → handled inside process(row)