In [1]:
from pyspark.sql import SparkSession
import os
import json

In [2]:
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 pyspark-shell'

In [1]:
!export | grep -i spark

declare -x APACHE_SPARK_VERSION="3.2.1"
declare -x HOSTNAME="jupyter_spark"
declare -x PATH="/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/spark/bin"
declare -x PYSPARK_PYTHONPATH_SET="1"
declare -x PYTHONPATH="/usr/local/spark/python/lib/py4j-0.10.9.3-src.zip:/usr/local/spark/python:"
declare -x SPARK_CONF_DIR="/usr/local/spark/conf"
declare -x SPARK_HOME="/usr/local/spark"
declare -x SPARK_LOCAL_HOSTNAME="localhost"
declare -x SPARK_MASTER="local"
declare -x SPARK_OPTS="--driver-java-options=-Xms1024M --driver-java-options=-Xmx4096M --driver-java-options=-Dlog4j.logLevel=info"


In [2]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1") \
    .appName("LabCDC") \
    .getOrCreate()

In [3]:
spark._sc

In [11]:
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka-1:9092") \
  .option("subscribe", "lab_cdc.inventory.customers") \
  .load()

In [14]:
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [15]:
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

DataFrame[key: string, value: string]

In [22]:
# coletandos os dados em lista de Row objects
kafka_data = df.collect()
len(kafka_data), type(kafka_data), type(kafka_data[0])

(4, list, pyspark.sql.types.Row)

In [23]:
# visalizando os valores das chanves topic, partition, offset, timestamp, timestampType
print(' Topic:          {}\n'.format(kafka_data[0]['topic']),\
      'Partition:      {}\n'.format(kafka_data[0]['partition']),\
      'Offset:         {}\n'.format( kafka_data[0]['offset']),\
      'Timestamp:      {}\n'.format(kafka_data[0]['timestamp']),\
      'Timestamp Type: {}'.format(kafka_data[0]['timestampType']))

 Topic:          lab_cdc.inventory.customers
 Partition:      2
 Offset:         0
 Timestamp:      2022-04-18 21:54:57.071000
 Timestamp Type: 0


In [24]:
# visualizando os valores das chaves Key e Value
print('Key\n', json.dumps(json.loads(kafka_data[0]['key']), indent=4, sort_keys=True), \
      '\n\nValue\n',  json.dumps(json.loads(kafka_data[0]['value']), indent=4, sort_keys=True))

Key
 {
    "payload": {
        "id": 2
    },
    "schema": {
        "fields": [
            {
                "default": 0,
                "field": "id",
                "optional": false,
                "type": "int32"
            }
        ],
        "name": "lab_cdc.inventory.customers.Key",
        "optional": false,
        "type": "struct"
    }
} 

Value
 {
    "payload": {
        "after": {
            "email": "gbailey@foobar.com",
            "first_name": "George",
            "id": 2,
            "last_name": "Bailey"
        },
        "before": null,
        "op": "r",
        "source": {
            "connector": "postgresql",
            "db": "postgres",
            "lsn": 24278656,
            "name": "lab_cdc",
            "schema": "inventory",
            "sequence": "[null,\"24278656\"]",
            "snapshot": "true",
            "table": "customers",
            "ts_ms": 1650318888394,
            "txId": 737,
            "version": "1.8.1.Final",
        

In [32]:
json.loads(kafka_data[0]['value'])['payload']

{'before': None,
 'after': {'id': 2,
  'first_name': 'George',
  'last_name': 'Bailey',
  'email': 'gbailey@foobar.com'},
 'source': {'version': '1.8.1.Final',
  'connector': 'postgresql',
  'name': 'lab_cdc',
  'ts_ms': 1650318888394,
  'snapshot': 'true',
  'db': 'postgres',
  'sequence': '[null,"24278656"]',
  'schema': 'inventory',
  'table': 'customers',
  'txId': 737,
  'lsn': 24278656,
  'xmin': None},
 'op': 'r',
 'ts_ms': 1650318888394,
 'transaction': None}

In [34]:
json.loads(kafka_data[0]['value'])['payload'].keys()

dict_keys(['before', 'after', 'source', 'op', 'ts_ms', 'transaction'])

In [35]:
json.loads(kafka_data[0]['value'])['payload']['op']

'r'

In [36]:
json.loads(kafka_data[0]['value'])['payload']['after']

{'id': 2,
 'first_name': 'George',
 'last_name': 'Bailey',
 'email': 'gbailey@foobar.com'}

In [38]:
json.loads(kafka_data[0]['value'])['payload']['source']

{'version': '1.8.1.Final',
 'connector': 'postgresql',
 'name': 'lab_cdc',
 'ts_ms': 1650318888394,
 'snapshot': 'true',
 'db': 'postgres',
 'sequence': '[null,"24278656"]',
 'schema': 'inventory',
 'table': 'customers',
 'txId': 737,
 'lsn': 24278656,
 'xmin': None}

In [45]:
for row in kafka_data:
    # print(str(row[0]) + "," + str(row[1]))
    json_value = json.loads(row[1])
    # print(json_value['payload'])
    op         = json_value['payload']['op']
    user_id    = json_value['payload']['after']['id']
    first_name = json_value['payload']['after']['first_name']
    last_name  = json_value['payload']['after']['last_name']
    email      = json_value['payload']['after']['email']
    print(op, user_id, first_name, last_name, email)
    # print(json_value['payload']['op'] + "," + str(json_value['payload']['id']) + "," + json_value['payload']['first_name'] + "," + json_value['payload']['last_name'] + "," + json_value['payload']['email'])

r 2 George Bailey gbailey@foobar.com
r 3 Edward Walker ed@walker.com
r 4 Jonh Kretchmar annek@noanswer.org
r 1 Sally Thomas sally.thomas@acme.com


In [7]:
output_table_basedir = '/spark_files'
output_table = "%s/delta-table" % output_table_basedir

# CREATE TABLE IF NOT EXISTS customers
# CREATE OR REPLACE TABLE customers
sql_customers_df = spark.sql("""
CREATE TABLE IF NOT EXISTS customers (
  id LONG NOT NULL,
  first_name STRING NOT NULL,
  last_name STRING NOT NULL,
  email STRING NOT NULL)
USING DELTA
LOCATION '%s/poc1/customers'
""" % output_table_basedir)

# sql_customers_df.show()
!ls /spark_files/poc1/customers

_delta_log


In [9]:
# kafka postgresql dbservermysql_poc1 readStream poc1_inventory_mysql db_mysql_poc_1
sql_kafka_customers_df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka_raiox:9092") \
  .option("subscribe", "db_pg_poc1.inventory.customers") \
  .load()

In [10]:
sql_kafka_customers_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show()

+--------------------+--------------------+
|                 key|               value|
+--------------------+--------------------+
|{"schema":{"type"...|{"schema":{"type"...|
|{"schema":{"type"...|{"schema":{"type"...|
|{"schema":{"type"...|{"schema":{"type"...|
|{"schema":{"type"...|{"schema":{"type"...|
+--------------------+--------------------+



In [11]:
sql_kafka_customers_df.selectExpr("*").show()

+--------------------+--------------------+--------------------+---------+------+--------------------+-------------+
|                 key|               value|               topic|partition|offset|           timestamp|timestampType|
+--------------------+--------------------+--------------------+---------+------+--------------------+-------------+
|[7B 22 73 63 68 6...|[7B 22 73 63 68 6...|db_pg_poc1.invent...|        0|     0|2022-03-17 17:11:...|            0|
|[7B 22 73 63 68 6...|[7B 22 73 63 68 6...|db_pg_poc1.invent...|        0|     1|2022-03-17 17:11:...|            0|
|[7B 22 73 63 68 6...|[7B 22 73 63 68 6...|db_pg_poc1.invent...|        0|     2|2022-03-17 17:11:...|            0|
|[7B 22 73 63 68 6...|[7B 22 73 63 68 6...|db_pg_poc1.invent...|        0|     3|2022-03-17 17:11:...|            0|
+--------------------+--------------------+--------------------+---------+------+--------------------+-------------+



In [12]:
sql_kafka_customers_df.selectExpr("count(*)").show()

+--------+
|count(1)|
+--------+
|       4|
+--------+



In [13]:
import json

result = sql_kafka_customers_df.selectExpr("key", "value")
data_collect = result.collect()

for row in data_collect:
    # print(str(row[0]) + "," + str(row[1]))
    json_value = json.loads(row[1])
    # print(json_value['payload'])
    print(json_value['payload']['__op'] + "," + str(json_value['payload']['id']) + "," + json_value['payload']['first_name'] + "," + json_value['payload']['last_name'] + "," + json_value['payload']['email'])

# print(result[0]["payload"])
# print(result[1]["payload"])
# result.show()

r,1001,Sally,Thomas,sally.thomas@acme.com
r,1002,George,Bailey,gbailey@foobar.com
r,1003,Edward,Walker,ed@walker.com
r,1004,Anne,Kretchmar,annek@noanswer.org


In [21]:
json.loads(data_collect[0][1])

{'schema': {'type': 'struct',
  'fields': [{'type': 'int32', 'optional': False, 'field': 'id'},
   {'type': 'string', 'optional': False, 'field': 'first_name'},
   {'type': 'string', 'optional': False, 'field': 'last_name'},
   {'type': 'string', 'optional': False, 'field': 'email'},
   {'type': 'string', 'optional': True, 'field': '__op'},
   {'type': 'string', 'optional': True, 'field': '__table'},
   {'type': 'int64', 'optional': True, 'field': '__lsn'},
   {'type': 'int64', 'optional': True, 'field': '__source_ts_ms'},
   {'type': 'string', 'optional': True, 'field': '__deleted'}],
  'optional': False,
  'name': 'db_pg_poc1.inventory.customers.Value'},
 'payload': {'id': 1001,
  'first_name': 'Sally',
  'last_name': 'Thomas',
  'email': 'sally.thomas@acme.com',
  '__op': 'r',
  '__table': 'customers',
  '__lsn': 34393800,
  '__source_ts_ms': 1647537117788,
  '__deleted': 'false'}}

In [17]:
data_collect[0][0]

bytearray(b'{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"db_pg_poc1.inventory.customers.Key"},"payload":{"id":1001}}')

In [16]:
# Extracao de schema dinamica
def infer_schema_json(kafka_df):
    df_json = (
        # filter out empty values
        sql_kafka_customers_df.withColumn("value", pyspark.sql.functions.expr("string(value)"))
        .filter(pyspark.sql.functions.col("value").isNotNull())
        # get latestecord
        .select("key", pyspark.sql.functions.expr("struct(offset, value) r"))
        .groupBy("key").agg(pyspark.sql.functions.expr("max(r) r")) 
        .select("r.value"))

    # decode the json values
    df_read = spark.read.json(df_json.rdd.map(lambda x: x.value), multiLine=True)

    # drop corrupt records
    if "_corrupt_record" in df_read.columns:
        df_read = (df_read.filter(pyspark.sql.functions.col("_corrupt_record").isNotNull()).drop("_corrupt_record"))

    # schema
    return df_read.schema.json()

In [17]:
# monta schema a partir de json
topic_schema_txt = infer_schema_json(sql_kafka_customers_df)
topic_schema = pyspark.sql.types.StructType.fromJson(json.loads(topic_schema_txt))
print(topic_schema)

StructType(List(StructField(payload,StructType(List(StructField(__deleted,StringType,true),StructField(__lsn,LongType,true),StructField(__op,StringType,true),StructField(__source_ts_ms,LongType,true),StructField(__table,StringType,true),StructField(email,StringType,true),StructField(first_name,StringType,true),StructField(id,LongType,true),StructField(last_name,StringType,true))),true),StructField(schema,StructType(List(StructField(fields,ArrayType(StructType(List(StructField(field,StringType,true),StructField(optional,BooleanType,true),StructField(type,StringType,true))),true),true),StructField(name,StringType,true),StructField(optional,BooleanType,true),StructField(type,StringType,true))),true)))


In [18]:
# cria dataframe intermediario (filter out empty values + get latest version of each record + convert to JSON with schema)
sql_kafka_customers_df_new = sql_kafka_customers_df\
    .withColumn("value", pyspark.sql.functions.expr("string(value)"))\
    .filter(pyspark.sql.functions.col("value").isNotNull())\
    .select(\
        pyspark.sql.functions.expr("offset as kafka_offset"),\
        pyspark.sql.functions.expr("timestamp as kafka_ts"),\
        pyspark.sql.functions.expr("string(key) as kafka_key"),\
        "value")\
    .select("kafka_key", pyspark.sql.functions.expr("struct(*) as r"))\
    .groupBy("kafka_key")\
    .agg(pyspark.sql.functions.expr("max(r) r"))\
    .withColumn('value', pyspark.sql.functions.from_json(pyspark.sql.functions.col("r.value"), topic_schema))\
    .select('r.kafka_key', 'r.kafka_offset', 'r.kafka_ts', 'value.*')
sql_kafka_customers_df_new.show()

+--------------------+------------+--------------------+--------------------+--------------------+
|           kafka_key|kafka_offset|            kafka_ts|             payload|              schema|
+--------------------+------------+--------------------+--------------------+--------------------+
|{"schema":{"type"...|           1|2021-06-11 21:03:...|{false, 34072456,...|{[{id, false, int...|
|{"schema":{"type"...|           2|2021-06-11 21:03:...|{false, 34072456,...|{[{id, false, int...|
|{"schema":{"type"...|           3|2021-06-11 21:03:...|{false, 34072456,...|{[{id, false, int...|
|{"schema":{"type"...|           0|2021-06-11 21:03:...|{false, 34072456,...|{[{id, false, int...|
+--------------------+------------+--------------------+--------------------+--------------------+



In [19]:
# cria tabela se nao existir se baseando no schema do dataframe intermediario
delta_location = '%s/poc1/kafka_customers' % output_table_basedir
spark.createDataFrame([], sql_kafka_customers_df_new.schema)\
 .write\
 .option("mergeSchema", "true")\
 .format("delta")\
 .mode("append")\
 .save(delta_location)

In [20]:
# merge de tabela
DeltaTable.forPath(spark, delta_location)\
   .alias("t")\
   .merge(sql_kafka_customers_df_new.alias("s"), "s.kafka_key = t.kafka_key")\
   .whenMatchedUpdateAll()\
   .whenNotMatchedInsertAll()\
   .execute()

In [21]:
persisted_df = spark.read.format("delta").load(delta_location)
persisted_df.select("*").show()
persisted_df.select("kafka_offset", "payload.*").show()

+--------------------+------------+--------------------+--------------------+--------------------+
|           kafka_key|kafka_offset|            kafka_ts|             payload|              schema|
+--------------------+------------+--------------------+--------------------+--------------------+
|{"schema":{"type"...|           0|2021-06-11 21:03:...|{false, 34072456,...|{[{id, false, int...|
|{"schema":{"type"...|           3|2021-06-11 21:03:...|{false, 34072456,...|{[{id, false, int...|
|{"schema":{"type"...|           1|2021-06-11 21:03:...|{false, 34072456,...|{[{id, false, int...|
|{"schema":{"type"...|           2|2021-06-11 21:03:...|{false, 34072456,...|{[{id, false, int...|
+--------------------+------------+--------------------+--------------------+--------------------+

+------------+---------+--------+----+--------------+---------+--------------------+----------+----+---------+
|kafka_offset|__deleted|   __lsn|__op|__source_ts_ms|  __table|               email|first_name| 

In [22]:
%%sql postgres@postgres_data_source
-- DROP SCHEMA inventory_poc1;
CREATE SCHEMA IF NOT EXISTS inventory_poc1 AUTHORIZATION postgres;

Done.


[]

In [23]:
%%sql postgres@postgres_data_source
CREATE TABLE IF NOT EXISTS inventory_poc1.customers (
	id serial NOT NULL,
	first_name varchar(255) NOT NULL,
	last_name varchar(255) NOT NULL,
	email varchar(255) NOT NULL,
	CONSTRAINT customers_email_key UNIQUE (email),
	CONSTRAINT customers_pkey PRIMARY KEY (id)
);

Done.


[]

In [25]:
# replica os dados para a tabela final
mode = "overwrite"
url = "jdbc:postgresql://postgres_data_source/postgres"
properties = {"user": "postgres","password": "postgres","driver": "org.postgresql.Driver"}
df = persisted_df.select("payload.id", "payload.first_name", "payload.last_name", "payload.email")\
    .write \
    .format("jdbc") \
    .option("url", url) \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "inventory_poc1.customers") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .option("truncate", "true") \
    .mode(mode) \
    .save()

In [26]:
%%sql postgres@postgres_data_source
SELECT id, first_name, last_name, email FROM inventory_poc1.customers;

4 rows affected.


id,first_name,last_name,email
1001,Sally,Thomas,sally.thomas@acme.com
1004,Anne,Kretchmar,annek@noanswer.org
1002,George,Bailey,gbailey@foobar.com
1003,Edward,Walker,ed@walker.com


Referências:
https://docs.delta.io/latest/delta-batch.html#-ddlcreatetable
https://docs.delta.io/latest/delta-constraints.html
https://spark.apache.org/docs/3.1.1/sql-ref.html
https://spark.apache.org/docs/3.1.1/sql-ref-syntax.html
https://docs.delta.io/latest/best-practices.html
https://debezium.io/documentation/reference/1.6/connectors/postgresql.html
https://partners-intl.aliyun.com/help/doc-detail/141203.htm
https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html#content
https://debezium.io/documentation/online-resources/
https://github.com/suchitgupta01/spark-streaming-with-debezium
https://suchit-g.medium.com/spark-streaming-with-kafka-connect-debezium-connector-ab9163808667
https://stackoverflow.com/questions/62296734/how-to-transform-a-debezium-message-in-json-format-such-that-it-can-be-loaded-in
https://github.com/kimaina/openmrs-elt
https://sandeepkattepogu.medium.com/python-spark-transformations-on-kafka-data-8a19b498b32c
https://spark.apache.org/docs/2.1.2/api/python/_modules/pyspark/sql/readwriter.html
https://docs.delta.io/latest/quick-start.html#create-a-table&language-python
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.collect.html
https://sparkbyexamples.com/pyspark/pyspark-collect/
https://keestalkstech.com/2019/11/streaming-a-kafka-topic-to-a-delta-table-on-s3-with-spark-structured-streaming/ *****
https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html