In [None]:
import logging

from confluent_kafka.schema_registry import SchemaRegistryClient
from libs.configuration import configure
from pyspark.sql import SparkSession
from pyspark.sql import functions as F, types as T
from pyspark.sql.avro import functions as AF

from shared.spark_config import create_spark_config

# Identifier of this processor; also used as the Spark application name below.
__MODULE = "M2_Processors.airline.tier1"

logger = logging.getLogger("notebook")

# Project environment: provides the Kafka / Schema Registry settings read by later cells.
env = configure()

# Spark configuration from shared.spark_config, tagged with this notebook's app name.
conf = create_spark_config()
conf = conf.setAppName(__MODULE)

In [None]:
# Create (or reuse) the Spark session using the configuration built in the setup cell.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .getOrCreate()
)

In [None]:
# Fetch the latest registered value schemas: `in_schema` is used to decode the
# incoming Kafka payload, `out_schema` to encode what this notebook publishes.
sr_client = SchemaRegistryClient({"url": env.KAFKA_SCHEMA_REGISTRY_URL})

in_subject = f"{env.KAFKA_TOPIC_T1_AIRLINE}-value"
in_schema = sr_client.get_latest_version(in_subject)
print(in_schema)

out_subject = f"{env.KAFKA_TOPIC_T2_AIRLINE}-value"
out_schema = sr_client.get_latest_version(out_subject)
print(out_schema)

In [None]:
# Streaming source: the raw airline Kafka topic. The Kafka source exposes each
# message payload as the binary `value` column, which is decoded further below.
# `startingOffsets` only applies when the query starts without an existing
# checkpoint; a query that resumes from a checkpoint continues where it left off.
# Rate limiting (e.g. maxOffsetsPerTrigger) is intentionally left unset.
kafka_source_options = {
    "kafka.bootstrap.servers": env.KAFKA_BOOTSTRAP_SERVERS,
    "subscribe": env.KAFKA_TOPIC_RAW_AIRLINE,
    "startingOffsets": "latest",
}
df = spark.readStream.format("kafka").options(**kafka_source_options).load()

In [None]:
# Decode the binary Kafka `value` column with the input Avro schema from the
# Schema Registry, then flatten the decoded struct into top-level columns.
# NOTE(review): the stream subscribes to KAFKA_TOPIC_RAW_AIRLINE, but the schema
#   is looked up under the KAFKA_TOPIC_T1_AIRLINE subject — confirm they match.
# TODO(review): if producers use the Confluent Avro serializer, every message is
#   prefixed with a 5-byte header (magic byte + schema id) that `from_avro` does
#   not strip; confirm the wire format of this topic.
df = df.select(AF.from_avro("value", in_schema.schema.schema_str).alias("unflattened"))
df = df.select("unflattened.*")
# `df` is a streaming DataFrame, so `.show()` would raise an AnalysisException
# ("Queries with streaming sources must be executed with writeStream.start()").
# Inspect the decoded structure instead.
df.printSchema()

In [None]:
# Per-column null counts of the decoded data (data-quality profile).
# Materialising an aggregation of a streaming DataFrame with `.show()` raises an
# AnalysisException, so the profile only runs when `df` is a static (batch)
# DataFrame, e.g. when `spark.readStream` is temporarily swapped for `spark.read`.
null_counts = df.select([F.count(F.when(F.isnull(c), 1)).alias(c) for c in df.columns])
if df.isStreaming:
    print("Skipping null-count profile: df is a streaming DataFrame.")
else:
    null_counts.show()

In [None]:
# Drop the `id` column and turn placeholder strings into real nulls.
# A value equal to one of NULL_SENTINELS is replaced with null. Every other
# value, including values that are already null, is kept unchanged.
NULL_SENTINELS = ["\\N", "N/A"]
NULLABLE_TEXT_COLUMNS = ["alias", "icao", "callsign", "country"]

df = df.drop("id")
for column_name in NULLABLE_TEXT_COLUMNS:
    df = df.withColumn(
        column_name,
        F.when(F.col(column_name).isin(NULL_SENTINELS), None).otherwise(F.col(column_name)),
    )
# Streaming DataFrame: `.show()` is not supported, so print the schema instead.
df.printSchema()

Drop rows whose `icao` is null (this includes values that were `\N` or `N/A` before the cleaning step above).

In [None]:
# Keep only rows that have an ICAO code. `Column.isNotNull()` is equivalent to
# `F.isnotnull` but also works on PySpark versions before 3.5.
df = df.where(F.col("icao").isNotNull())
# Streaming DataFrame: `.show()` is not supported, so print the schema instead.
df.printSchema()

In [None]:
# Re-encode each row as one Avro record (output schema from the Schema Registry)
# and publish it to the T2 airline topic.
#
# - queryName: this query sets no explicit checkpointLocation. Spark therefore
#   places the checkpoint under `<spark.sql.streaming.checkpointLocation>/<queryName>`.
#   Without a name it uses a random UUID, so every run would start as a
#   brand-new query. With startingOffsets="latest", such a run skips everything
#   produced before it started.
# - availableNow: processes all currently available data and then stops. It
#   replaces the deprecated trigger(once=True).
# NOTE(review): assumes create_spark_config() sets spark.sql.streaming.checkpointLocation;
#   otherwise add .option("checkpointLocation", ...) here (the Kafka sink requires one).
# TODO(review): to_avro emits plain Avro without the Confluent 5-byte header;
#   confirm that downstream consumers of this topic expect that format.
kafka_write_stream = (
    df.select(AF.to_avro(F.struct("*"), out_schema.schema.schema_str).alias("value"))
    .writeStream.format("kafka")
    .queryName("airline_tier1_kafka")
    .trigger(availableNow=True)
    .option("kafka.bootstrap.servers", env.KAFKA_BOOTSTRAP_SERVERS)
    .option("topic", env.KAFKA_TOPIC_T2_AIRLINE)
    .start()
)
# Block until the query finishes. A failure is then raised in this cell, and
# the kernel cannot shut down in the middle of a write.
kafka_write_stream.awaitTermination()

In [None]:
# Append the cleaned rows to the Iceberg table.
#
# - queryName: gives this query a stable checkpoint directory under the
#   session-wide checkpoint root, so successive runs resume instead of
#   restarting from startingOffsets="latest" (a random UUID is used otherwise).
# - availableNow: processes all currently available data and then stops. It
#   replaces the deprecated trigger(once=True).
# NOTE(review): assumes create_spark_config() sets spark.sql.streaming.checkpointLocation;
#   otherwise add .option("checkpointLocation", ...) here.
# NOTE(review): this writes to a `tier1` table while the Kafka output goes to the
#   T2 topic — confirm the intended tier for each sink.
iceberg_write_stream = (
    df.writeStream.format("iceberg")
    .queryName("airline_tier1_iceberg")
    .outputMode("append")
    .trigger(availableNow=True)
    .option("fanout-enabled", "true")
    .toTable("dev.tier1.airlines")
)
# Block until the query finishes so failures surface in this cell.
iceberg_write_stream.awaitTermination()

In [None]:
# Per-column null counts of the cleaned and filtered data.
# Materialising an aggregation of a streaming DataFrame with `.show()` raises an
# AnalysisException, so the profile only runs when `df` is a static (batch) DataFrame.
null_counts_filtered = df.select([F.count(F.when(F.isnull(c), 1)).alias(c) for c in df.columns])
if df.isStreaming:
    print("Skipping null-count profile: df is a streaming DataFrame.")
else:
    null_counts_filtered.show()