In [None]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import from_json, col, expr, get_json_object, to_json, when
from pyspark.sql.types import TimestampType, StructType

def deserialize_cdc_msg(df: DataFrame, cdc_schema: StructType):
    """Decode binary CDC messages and flatten the post-change row image.

    The ``payload`` column is passed through ``spark_avro_deserializer``
    (defined outside this view). Its result is parsed with ``from_json``,
    so it is presumably a JSON string column; confirm against the helper.

    Args:
        df: DataFrame with a ``payload`` column; all other columns are dropped.
        cdc_schema: schema used to parse the decoded value; it must define the
            ``op``, ``after`` and ``ts_ms`` fields referenced below.

    Returns:
        DataFrame with ``op``, one column per field of ``after``, and ``ts``.
        ``ts`` is ``ts_ms / 1000`` cast to a timestamp. Spark treats a numeric
        to timestamp cast as seconds since the epoch, so ``ts_ms`` is
        presumably epoch milliseconds.
    """
    decoded = df.select(spark_avro_deserializer(col('payload')).alias('value'))
    parsed = decoded.withColumn('value', from_json('value', cdc_schema))
    ts_seconds = col('value.ts_ms') / 1000
    return parsed.select(
        'value.op',
        'value.after.*',
        ts_seconds.cast(TimestampType()).alias('ts'),
    )
                
def deserialize_cdc_msg_json(df: DataFrame, cdc_schema: StructType):
    """Flatten JSON CDC events into one column per field of the ``after`` image.

    The ``payload`` struct is serialized with ``to_json`` and re-parsed with
    ``from_json``, so its fields are coerced to ``cdc_schema``.

    Args:
        df: DataFrame with a ``payload`` struct column; other columns are dropped.
        cdc_schema: schema for the payload; it must define the ``op``,
            ``after`` and ``ts_ms`` fields referenced below.

    Returns:
        DataFrame with ``op``, the fields of ``after``, and ``ts``.
        ``ts`` is ``ts_ms / 1000`` cast to a timestamp. Rows whose ``after``
        is null yield null field columns.
    """
    payload_only = df.select('payload')
    parsed = payload_only.withColumn('value', from_json(to_json('payload'), cdc_schema))
    return parsed.select(
        'value.op',
        'value.after.*',
        (col('value.ts_ms') / 1000).cast(TimestampType()).alias('ts'),
    )
        

def deserialize_cdc_msg_json_new(df: DataFrame, cdc_schema: StructType):
    """Flatten JSON CDC events, using the pre-change image for deletes.

    The ``payload`` struct is serialized with ``to_json`` and re-parsed with
    ``from_json``, so its fields are coerced to ``cdc_schema``. The row image
    is chosen by ``op``:
    - rows whose ``op`` equals ``"d"`` expand the ``before`` image;
    - every other row, including one with a null ``op``, expands ``after``.

    Args:
        df: DataFrame with a ``payload`` struct column; other columns are dropped.
        cdc_schema: schema for the payload; it must define the ``op``,
            ``before``, ``after`` and ``ts_ms`` fields referenced below.

    Returns:
        DataFrame with ``op``, one column per field of the chosen row image,
        and ``ts``. ``ts`` is ``ts_ms / 1000`` cast to a timestamp.
    """
    parsed = df.select('payload').withColumn(
        'value', from_json(to_json('payload'), cdc_schema)
    )
    # when()/otherwise() treat a bare string as a literal value, not a column
    # path. Without col(), `result` was the string "value.after" and the
    # `result.*` star-expansion below failed.
    # NOTE(review): CASE WHEN needs `before` and `after` to share a struct
    # type; presumably true for cdc_schema, so confirm.
    row_image = when(col('value.op') == "d", col('value.before')).otherwise(
        col('value.after')
    )
    return parsed.withColumn('result', row_image).select(
        'value.op',
        'result.*',
        (col('value.ts_ms') / 1000).cast(TimestampType()).alias('ts'),
    )

: 

In [81]:
# Start (or reuse) a local Spark session and load the sample CDC events.
# The "multiline" option lets one JSON record span several lines.
spark = (
    SparkSession.builder
    .appName("test")
    .getOrCreate()
)
df = (
    spark.read
    .option("multiline", "true")
    .json("sample_stock_event.json")
)

In [82]:
# Inspect the schema Spark inferred from the sample JSON file.
df.printSchema()

root
 |-- payload: struct (nullable = true)
 |    |-- after: struct (nullable = true)
 |    |    |-- BackorderOrderID: string (nullable = true)
 |    |    |-- Comments: string (nullable = true)
 |    |    |-- ContactPersonID: long (nullable = true)
 |    |    |-- CustomerID: long (nullable = true)
 |    |    |-- CustomerPurchaseOrderNumber: string (nullable = true)
 |    |    |-- DeliveryInstructions: string (nullable = true)
 |    |    |-- ExpectedDeliveryDate: long (nullable = true)
 |    |    |-- InternalComments: string (nullable = true)
 |    |    |-- IsUndersupplyBackordered: boolean (nullable = true)
 |    |    |-- LastEditedBy: long (nullable = true)
 |    |    |-- LastEditedWhen: long (nullable = true)
 |    |    |-- OrderDate: long (nullable = true)
 |    |    |-- OrderID: long (nullable = true)
 |    |    |-- PickedByPersonID: string (nullable = true)
 |    |    |-- PickingCompletedWhen: string (nullable = true)
 |    |    |-- SalespersonPersonID: long (nullable = true)
 |  

In [83]:
cdc_schema = get_cdc_schema("orders")

# Flatten the JSON sample with the after-image-only deserializer.
deserialized_df = deserialize_cdc_msg_json(df=df, cdc_schema=cdc_schema)

In [86]:
# Preview the first 20 rows with long values truncated (Spark's defaults).
deserialized_df.show(n=20, truncate=True)

+---+-------+----------+-------------------+----------------+---------------+----------------+---------+--------------------+---------------------------+------------------------+--------------+--------------------+----------------+--------------------+------------+-------------------+--------------------+
| op|OrderID|CustomerID|SalespersonPersonID|PickedByPersonID|ContactPersonID|BackorderOrderID|OrderDate|ExpectedDeliveryDate|CustomerPurchaseOrderNumber|IsUndersupplyBackordered|      Comments|DeliveryInstructions|InternalComments|PickingCompletedWhen|LastEditedBy|     LastEditedWhen|                  ts|
+---+-------+----------+-------------------+----------------+---------------+----------------+---------+--------------------+---------------------------+------------------------+--------------+--------------------+----------------+--------------------+------------+-------------------+--------------------+
|  c|  73622|       594|                 14|            null|           2387|  

In [None]:
cdc_schema = get_cdc_schema("orders")

# Flatten the same sample with the delete-aware deserializer
# (before-image when op == "d", after-image otherwise).
deserialized_df_new = deserialize_cdc_msg_json_new(df=df, cdc_schema=cdc_schema)

In [None]:
# Preview the first 20 rows with long values truncated (Spark's defaults).
deserialized_df_new.show(n=20, truncate=True)

In [None]:
# Inspect the intermediate columns:
# - `value` is the payload re-parsed against cdc_schema;
# - `result` is the row image chosen by op (before for "d", after otherwise).
df_new = (
    df.withColumn('value', from_json(to_json('payload'), cdc_schema))
    .withColumn(
        'result',
        # col() is required: a bare string passed to when()/otherwise() is a
        # literal value, which made `result` the text "value.after".
        when(col('value.op') == "d", col('value.before')).otherwise(col('value.after')),
    )
)

df_new.show()

: 