In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, from_json, lit
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, IntegerType, FloatType, ArrayType
from datetime import datetime

In [2]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.4.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1,org.postgresql:postgresql:42.6.0 pyspark-shell'

In [3]:
spark = SparkSession \
    .builder \
    .appName("pyspark kafka_2") \
    .getOrCreate()


23/07/21 16:34:55 WARN Utils: Your hostname, KHs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.81 instead (on interface en0)
23/07/21 16:34:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/liamchoi/Desktop/demo_project/.venv/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/liamchoi/.ivy2/cache
The jars for the packages stored in: /Users/liamchoi/.ivy2/jars
org.apache.spark#spark-streaming-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e9bf8af3-2f99-4c53-849c-94fbc2c6e645;1.0
	confs: [default]
	found org.apache.spark#spark-streaming-kafka-0-10_2.12;3.4.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.1 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.1 in central
	found org.slf4j#slf4j-api;2.0.6 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 

In [4]:
df_schema = StructType([
    StructField("id", StringType(), False),
    StructField("title", StringType(), False),
    StructField("score", IntegerType(), False),
    StructField("upvote_ratio", FloatType(), False),
    StructField("content", StringType(), True),
    StructField("url", StringType(), True),
    StructField("no_of_comments", IntegerType(), False),
    StructField("locked", BooleanType(), False)]
    )


In [5]:
# Subscribe to 1 topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "reddit") \
  .option("spark.sql.streaming.schemaInference", "true") \
  .load()


df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(timestamp AS INTEGER) updated") \
        .withColumn("value", from_json("value",schema=df_schema)) \
        .selectExpr("key", 
                    "value.id",
                    "value.title",
                    "value.score",
                    "value.upvote_ratio",
                    "value.content",
                    "value.url",
                    "value.no_of_comments",
                    "value.locked",
                    "updated"
                    )


In [6]:
postgresql_properties = {
    "driver": "org.postgresql.Driver",
    "url": "jdbc:postgresql://localhost:5432/demo_project?currentSchema=bronze",
    "user": "airflow",
    "password": "airflow",
    "dbtable": "kafka"
}

In [7]:
def write_stream_to_postgresql(batch_df, batch_id):

    batch_df \
        .write \
        .mode("append") \
        .jdbc(url=postgresql_properties["url"],
              table=postgresql_properties["dbtable"],
              properties=postgresql_properties)
    print("record inserted")


In [8]:
stream_writer = df \
    .writeStream \
    .option("checkpointLocation", "./checkpoint/") \
    .outputMode("update") \
    .foreachBatch(write_stream_to_postgresql) \
    .start() \
    .awaitTermination()

23/07/21 16:35:06 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/07/21 16:35:07 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

record inserted


                                                                                

record inserted


                                                                                

record inserted


### read from postgres

In [9]:
df = spark \
  .read \
  .jdbc (url = postgresql_properties["url"], 
    table = postgresql_properties["dbtable"],
    properties = postgresql_properties)

df.show(truncate=False)

+-------+-------+-----------------------------------------------------------------------------------------------------------------------------------+-----+------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [10]:
def upsert_to_postgres(df, batch_id):
    # Perform the merge (upsert) operation
    df.alias("source") \
        .mergeInto("bronze.kafka", "bronze.kafka.id = source.id") \
        .whenMatchedUpdate(set={
            "title":"source.title",
            "score":"source.score",
            "upvote_ratio":"source.upvote_ratio",
            "conten":"source.content",
            "url":"source.url",
            "no_of_comments":"source.no_of_comments",
            "locked":"source.locked",
            "updated":"source.updated"
        }) \
        .execute()


Column<'key'>
Column<'id'>
Column<'title'>
Column<'score'>
Column<'upvote_ratio'>
Column<'content'>
Column<'url'>
Column<'no_of_comments'>
Column<'locked'>
Column<'updated'>
