In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the local SparkSession used by every cell below.
spark = (
    SparkSession
    .builder
    .appName("OpenSEA Pipeline")
    # Spark config keys are case-sensitive; the documented key is
    # "stopGracefullyOnShutdown" (capital "O"). The previous spelling
    # ("...onShutdown") did not match it, so the setting was never applied.
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    # Kafka source/sink connector for Structured Streaming.
    # NOTE(review): the artifact's Scala (2.12) and Spark (3.3.0) versions
    # presumably have to match the installed Spark build - confirm.
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0")
    .config("spark.sql.shuffle.partitions", 4)
    # Run locally, using all available cores.
    .master("local[*]")
    .getOrCreate()
)

In [None]:
# Bare expression: the notebook renders the SparkSession created above as the cell output.
spark

In [None]:
# Open a streaming reader on the Kafka topic that carries the raw events.
# All source settings are passed in a single options() call:
#   - kafka.bootstrap.servers: broker to connect to
#   - subscribe:               topic to consume
#   - startingOffsets:         "earliest" = start from the oldest available offsets
kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "os-kafka:9092",
        "subscribe": "test-os_event",
        "startingOffsets": "earliest",
    })
    .load()
)

In [None]:
# Print the schema of the Kafka source DataFrame.

kafka_df.printSchema()
# NOTE(review): kafka_df comes from `spark.readStream`, so the call below presumably
# only works after switching the reader to batch mode (`spark.read`) - confirm.
# kafka_df.show()

In [None]:
# Decode the Kafka `value` column from binary to a string, in place
# (same column name), so it can be parsed as JSON further down.
from pyspark.sql.functions import expr, schema_of_json

kafka_json_df = kafka_df.withColumn(
    colName="value",
    col=expr("cast(value as string)"),
)

In [None]:
# kafka_json_df.show()

In [None]:
# Code block to infer the schema from a sample JSON message.
# This needs a batch DataFrame: load the topic with `spark.read` instead of `spark.readStream` first.

# json_schema = schema_of_json(kafka_json_df.select('value').head()[0])
# json_schema

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, ArrayType

def _string_field(name):
    """Return a nullable string field called `name`."""
    return StructField(name, StringType(), True)


def _long_field(name):
    """Return a nullable long (64-bit integer) field called `name`."""
    return StructField(name, LongType(), True)


def _struct_field(name, fields):
    """Return a nullable nested-struct field called `name` made of `fields`."""
    return StructField(name, StructType(fields), True)


def _struct_array_field(name, fields):
    """Return a nullable field called `name` holding an array of structs made of `fields`."""
    return StructField(name, ArrayType(StructType(fields)), True)


# Explicit schema of the JSON event carried in the Kafka `value` column.
# Every field is nullable. Field order mirrors the JSON layout:
# a `payload` object followed by a top-level `sent_at` string.
json_schema = StructType([
    _struct_field("payload", [
        _struct_field("collection", [
            _string_field("slug"),
        ]),
        _string_field("event_timestamp"),
        _struct_field("item", [
            _struct_field("chain", [
                _string_field("name"),
            ]),
            _struct_field("metadata", [
                _string_field("image_url"),
                _string_field("metadata_url"),
                _string_field("name"),
            ]),
            _string_field("nft_id"),
        ]),
        _struct_field("maker", [
            _string_field("address"),
        ]),
        _string_field("order_hash"),
        _struct_field("payment_token", [
            _string_field("address"),
            _long_field("decimals"),
            _string_field("eth_price"),
            _string_field("name"),
            _string_field("symbol"),
            _string_field("usd_price"),
        ]),
        _string_field("protocol_address"),
        _struct_field("protocol_data", [
            _struct_field("parameters", [
                _string_field("conduitKey"),
                _struct_array_field("consideration", [
                    _string_field("endAmount"),
                    _string_field("identifierOrCriteria"),
                    _long_field("itemType"),
                    _string_field("recipient"),
                    _string_field("startAmount"),
                    _string_field("token"),
                ]),
                _string_field("counter"),
                _string_field("endTime"),
                # Same shape as `consideration` items, minus `recipient`.
                _struct_array_field("offer", [
                    _string_field("endAmount"),
                    _string_field("identifierOrCriteria"),
                    _long_field("itemType"),
                    _string_field("startAmount"),
                    _string_field("token"),
                ]),
                _string_field("offerer"),
                _long_field("orderType"),
                _string_field("salt"),
                _string_field("startTime"),
                _long_field("totalOriginalConsiderationItems"),
                _string_field("zone"),
                _string_field("zoneHash"),
            ]),
            _string_field("signature"),
        ]),
        _long_field("quantity"),
        _string_field("sale_price"),
        _struct_field("taker", [
            _string_field("address"),
        ]),
        _struct_field("transaction", [
            _string_field("hash"),
            _string_field("timestamp"),
        ]),
    ]),
    _string_field("sent_at"),
])

In [None]:
from pyspark.sql.functions import from_json,col

# Parse the JSON string in `value` with the explicit schema, then keep only the
# parsed struct's top-level fields (`payload`, `sent_at`) as columns.
raw_df = (
    kafka_json_df
    .withColumn("values_json", from_json(col("value"), json_schema))
    .selectExpr("values_json.*")
)

In [None]:
# Print the schema of the parsed events to check it matches `json_schema`.
raw_df.printSchema()
# NOTE(review): raw_df derives from a streaming source, so the call below presumably
# only works in batch mode (`spark.read`) - confirm.
# raw_df.show(truncate=True)

In [None]:
# Tiny one-row batch DataFrame with string `key` / `value` columns,
# kept as a scratch fixture for experiments.
df_test = spark.createDataFrame(
    data=[("name", "val")],
    schema="key string, value string",
)
# df_test.show()

In [None]:
# Experimental Kafka writer for a two-column projection (collection slug + image URL).
# Despite its name, `test_os_df` holds a DataStreamWriter, not a DataFrame: the query is
# only configured here and never started. To run it, add a dedicated
# `checkpointLocation` option and call `.start()`.
test_os_df = (
    kafka_json_df
    .withColumn("values_json", from_json(col("value"), json_schema))
    .selectExpr(
        "values_json.payload.collection.slug as collection",
        "values_json.payload.item.metadata.image_url",
    )
    # The Kafka sink requires a `value` column; the projection above has none,
    # so the selected fields are serialized into a single JSON string.
    .selectExpr("to_json(struct(collection, image_url)) AS value")
    .writeStream
    .format("kafka")
    # Use the same broker address as the reader and the live writer in this notebook
    # (the previous value pointed at port 9095, which is used nowhere else that runs).
    .option("kafka.bootstrap.servers", "os-kafka:9092")
    .option("topic", "test-topic")
)

### kafka_topic_text_data (silver)

In [None]:
# Silver "text data" projection: parse the JSON payload and keep only the
# descriptive fields of each event (collection, NFT name/id, transaction hash/timestamp).
# This cell only defines the streaming DataFrame; it does not write anywhere.
text_data_df = (
    kafka_json_df
    .withColumn("values_json", from_json(col("value"), json_schema))
    .selectExpr(
        "values_json.payload.collection.slug as collection",
        "values_json.payload.item.metadata.name as nft_name",
        "values_json.payload.item.nft_id",
        "values_json.payload.transaction.hash as transaction_hash",
        "values_json.payload.transaction.timestamp as transaction_timestamp",
    )
)

In [None]:
# Print the schema of the silver text-data projection.
# (This cell previously referenced `kafka_topic_3`, which is not defined anywhere in the
# notebook and raised NameError on a fresh kernel.)
text_data_df.printSchema()
# NOTE(review): text_data_df derives from a streaming source, so the call below presumably
# only works in batch mode (`spark.read`) - confirm.
# text_data_df.show(truncate=True)

In [None]:
from pyspark.sql.functions import struct, to_json, lit

# Shape the projection for the Kafka sink: a null string `key` (no message key is used)
# and a `value` holding the five projected columns serialized as one JSON object.
kafka_ready_df = (
    text_data_df
    .withColumn("key", lit(None).cast("string"))
    .withColumn(
        "value",
        to_json(struct(*[
            col(name)
            for name in (
                "collection",
                "nft_name",
                "nft_id",
                "transaction_hash",
                "transaction_timestamp",
            )
        ])),
    )
    .select("key", "value")
)

In [None]:
# Start the streaming query that writes the key/value rows to the Kafka topic.
# The result of .start() is the cell's last expression, so the notebook displays it.
(
    kafka_ready_df
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "os-kafka:9092")
    .option("topic", "test-topic")
    .option("checkpointLocation", "checkpoint_dir_kafka")
    .start()
)