### Example Exploratory Notebook

Use this notebook to explore the data generated by the pipeline in your preferred programming language.

**Note**: This notebook is not executed as part of the pipeline.

In [0]:
%sql
-- Set the session's default location so later unqualified table names
-- (profiles_bronze, profiles, profiles_scd2, ...) resolve against redpanda_dev.dev_matthew_giglia_profiles.
USE redpanda_dev.dev_matthew_giglia_profiles;

In [0]:
from typing import Any

# from pyspark import pipelines as dp
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType
    ,StructField
    ,StringType
    ,BinaryType
    ,IntegerType
    ,LongType
    ,TimestampType
    ,FloatType
)
from pyspark.sql.utils import AnalysisException
from pyspark.sql.functions import (
    col
    ,current_timestamp
    ,lit
    ,udf
    ,sha2
    ,concat_ws
    ,struct
    ,to_json
    ,unix_millis
)

In [0]:
%sql
-- Parse the raw JSON payload into a VARIANT, then project individual profile fields out of it.
-- NOTE(review): this cell reads the keys `firstname` / `lastname`, while the Sink cell below reads
-- `first_name` / `last_name`, and `registraion_date` looks misspelled. Confirm the JSON keys the
-- producer actually emits; a non-matching path presumably just yields NULL rather than an error.
FROM deleted_records_from_profiles_bronze
|> SELECT *, parse_json(value_str) AS variant_col
|> SELECT
     *,
     variant_col:email::string AS email,
     variant_col:firstname::string AS firstname,
     variant_col:lastname::string AS lastname,
     variant_col:last_login::timestamp AS last_login,
     variant_col:registraion_date::timestamp AS registration_date,
     variant_col:preferences::struct<language: string, notifications: string> AS preferences,
     variant_col:subscription_level::string AS subscription_level

In [0]:
class Sink:
    """Prepare rows of a table for publishing to a Kafka-compatible (Redpanda) topic.

    Note: despite its name, ``table_sink_to_kafka`` does not write to Kafka. It reads
    ``table_name`` and applies the configured transformation stages. When asked, it also
    projects the result into ``key`` / ``value`` JSON columns.

    Args:
        spark: Spark session used to read ``table_name``.
        topic: Target topic name (stored only; not used by this class yet).
        table_name: Table to read, e.g. ``catalog.schema.table``.
        key_columns: Column names packed into the JSON message key.
        value_columns: Column names packed into the JSON message value.
        redpanda_config: Connection options (stored only; not used by this class yet).
        transformations: Ordered projection stages, or ``None`` for no stages. Each entry is
            either one SQL expression string or a list of expression strings. Every stage is
            applied as ``selectExpr("*", *exprs)``, so a stage can reference columns created
            by an earlier stage but not by a sibling expression in the same stage.
    """

    def __init__(self, spark: "SparkSession", topic: str, table_name: str, key_columns: list, value_columns: list, redpanda_config: dict, transformations: list = None):
        self.spark = spark
        self.topic = topic
        self.table_name = table_name
        self.key_columns = key_columns
        self.value_columns = value_columns
        self.redpanda_config = redpanda_config
        self.transformations = transformations

    @staticmethod
    def _stage_exprs(stage) -> list:
        """Normalize one transformation stage (a string or a list of strings) to a list."""
        if isinstance(stage, str):
            return [stage]
        return list(stage)

    def _kafka_projection(self) -> list:
        """Return the SQL expressions that pack key/value columns into JSON ``key``/``value``."""
        key_expr = ",".join(self.key_columns)
        value_expr = ",".join(self.value_columns)
        return [
            f"to_json(struct({key_expr})) AS key",
            f"to_json(struct({value_expr})) AS value",
        ]

    def table_sink_to_kafka(self, kafka_format: bool = False):
        """Read ``table_name`` and apply every transformation stage in order.

        Args:
            kafka_format: When True, finish with a projection to exactly two columns,
                ``key`` and ``value``, each a JSON string built from ``key_columns`` /
                ``value_columns``. Defaults to False, which returns all columns.

        Returns:
            The resulting DataFrame. Nothing is published to Kafka by this method.
        """
        df = self.spark.table(self.table_name)
        # `None` means "no stages"; iterating None directly would raise TypeError.
        for stage in self.transformations or []:
            # Unpack list stages so selectExpr receives only strings, never a nested list.
            df = df.selectExpr("*", *self._stage_exprs(stage))
        if kafka_format:
            df = df.selectExpr(*self._kafka_projection())
        return df

In [0]:
# Catalog and schema holding the pipeline tables; the same location the `%sql USE` cell selects.
catalog_use, schema_use = "redpanda_dev", "dev_matthew_giglia_profiles"

In [0]:
# Sink over the deleted-records bronze table. Each `transformations` entry is applied as its own
# `selectExpr("*", <expr>)` stage, so `variant_col` is created in an earlier stage than the
# expressions that read from it. Every entry is a plain string, so no stage is a nested list.
# NOTE(review): the SQL exploration cell above reads `firstname`/`lastname` from the JSON payload,
# while this cell reads `first_name`/`last_name`. Confirm which keys the producer emits.
test = Sink(
  spark = spark
  ,topic = "deleted_profiles"
  ,table_name = f"{catalog_use}.{schema_use}.deleted_records_from_profiles_bronze"
  ,key_columns = ["user_id"]
  ,value_columns = ["email", "first_name", "last_name"]
  ,redpanda_config = {}
  ,transformations = [
    "parse_json(value_str) as variant_col",
    "variant_col:email::string as email",
    "variant_col:first_name::string as first_name",
    "variant_col:last_name::string as last_name",
    "variant_col:user_id::string as user_id",
  ]
)

In [0]:
# Additional profile fields that could be projected out of `variant_col` in a later
# transformation stage (e.g. appended to a Sink's `transformations` list).
# This was previously a bare comma-separated line, which is a SyntaxError and broke Run All.
# NOTE(review): `registraion_date` matches the key used in the SQL exploration cell above;
# confirm whether the upstream JSON key is really spelled that way.
EXTRA_PROFILE_PROJECTIONS = [
  "variant_col:last_login::timestamp as last_login",
  "variant_col:registraion_date::timestamp as registration_date",
  "variant_col:preferences::struct<language: string, notifications: string> as preferences",
  "variant_col:subscription_level::string as subscription_level",
]

In [0]:
# Read the deleted-records table and apply the Sink's transformation stages.
# This only builds a DataFrame; nothing is published to Redpanda here.
df = test.table_sink_to_kafka()

In [0]:
# Preview the transformed rows.
# NOTE(review): output likely contains emails/names extracted from the payload; clear before sharing.
display(df)

In [0]:
%sql
-- Raw bronze records, newest first.
SELECT *
FROM profiles_bronze
ORDER BY timestamp DESC;

In [0]:
%sql
-- Current profiles table, newest first.
SELECT *
FROM profiles
ORDER BY timestamp DESC;

In [0]:
from pyspark.sql.functions import col, unix_millis

In [0]:
# Add a deterministic per-row SHA-256 hash over every column as `recordId`.
# The row is hashed as JSON (column names included) instead of via `concat_ws`. `concat_ws`
# silently skips NULL inputs and does not escape its separator, so distinct rows such as
# ("a", NULL, "b") and ("a", "b", NULL) would otherwise hash identically.
# `sha2`, `to_json` and `struct` come from the imports cell at the top of the notebook.
# NOTE(review): this reads from `mgiglia.dev_matthew_giglia_redpanda`, not the
# `redpanda_dev.dev_matthew_giglia_profiles` schema selected at the top. Confirm intended.
df = spark.table("mgiglia.dev_matthew_giglia_redpanda.profiles_bronze")
df = df.withColumn("recordId", sha2(to_json(struct(*df.columns)), 256))


In [0]:
# Preview bronze rows together with their `recordId` hash.
display(df)

In [0]:
# Sample the most recent bronze records and compute per-record latency in milliseconds as
# `ingestTime - timestamp`. This is a batch read (`spark.table`), not a streaming DataFrame.
# NOTE(review): same `mgiglia.dev_matthew_giglia_redpanda` source as the recordId cell. Confirm
# it should not be the `redpanda_dev.dev_matthew_giglia_profiles` schema selected at the top.
LATENCY_SAMPLE_SIZE = 1000  # number of most recent records to inspect

df = (
  spark.table("mgiglia.dev_matthew_giglia_redpanda.profiles_bronze")
    .select("topic", "partition", "offset", "value_str", "timestampType", "timestamp", "ingestTime")
    .withColumn(
      "latency_ms"
      ,unix_millis(col("ingestTime")) - unix_millis(col("timestamp"))
    )
    .orderBy(col("timestamp").desc())
    .limit(LATENCY_SAMPLE_SIZE)
  )

display(df)

In [0]:
# Average `latency_ms` over the sample built in the previous cell. This covers only the most
# recent rows kept by its `limit`, not the whole table.
# `df.agg({...})` is shorthand for `df.groupBy().agg({...})`; the result column is `avg(latency_ms)`.
average_latency_ms_df = df.agg({"latency_ms": "avg"})

display(average_latency_ms_df)

In [0]:
%sql
-- Total number of rows currently in the profiles table.
SELECT count(*)
FROM profiles;

In [0]:
%sql
-- Table history of profiles_bronze. Presumably used to pick the change-data-feed
-- startingVersion in the Python cell below. TODO confirm.
DESCRIBE HISTORY profiles_bronze;

In [0]:
# Read the change data feed of profiles_bronze and keep only deleted rows.
# The table name is fully qualified so this cell does not depend on the `%sql USE` cell having
# run. `catalog_use`/`schema_use` point at the same schema that cell selects.
# NOTE(review): the starting version is hard-coded, presumably taken from the DESCRIBE HISTORY
# output above. Confirm and update it when the table history changes.
CDF_STARTING_VERSION = 11

bronze_deletes_df = (
  spark.read.option("readChangeFeed", "true")
  .option("startingVersion", CDF_STARTING_VERSION)
  .table(f"{catalog_use}.{schema_use}.profiles_bronze")
  .filter("_change_type = 'delete'")
)
display(bronze_deletes_df)

In [0]:
%sql
-- Extended metadata for the profiles table, returned as a JSON document.
DESCRIBE EXTENDED profiles AS JSON;

In [0]:
%sql
-- Row count per __START_AT._commit_version in the SCD2 table.
FROM profiles_scd2
|> AGGREGATE COUNT(*) AS rcrd_cnt
   GROUP BY __START_AT._commit_version