# 1. Imports

In [None]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import json
import logging
import pickle
import time
import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("consumer")


config = json.load(open("config.json"))
print(json.dumps(config, indent=2))

# 2. Initialize Spark with Kafak Consumer

In [None]:
# Create Spark Configuration and set application name
conf = SparkConf().setAppName("KafkaExp")

# Default pyspark installation lacks kafka consumer libraries. Install kafka-client libs manually
kafka_packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{"2.12"}:{"3.3.0"}',
    'org.apache.kafka:kafka-clients:2.8.0',
    "org.apache.hadoop:hadoop-aws:3.3.0",
    "com.google.guava:guava:21.0",
    "org.apache.httpcomponents:httpcore:4.4.8"
]

# Provide kafka jar paths to driver and executors
kafka_jar_paths = '/mnt/home/prathyush/.ivy2/jars/'.join([
    "org.apache.hadoop_hadoop-client-runtime-3.3.2.jar",
    "org.apache.kafka_kafka-clients-2.8.1.jar",
    "hadoop-aws-2.7.5.jar",
    "aws-java-sdk-core-1.12.268.jar"
])

# Connect to Spark cluster (Cluster mode instead of local mode)
conf = (conf.setMaster('spark://spark:7077')
        .set('spark.jars.packages', ','.join(kafka_packages))
        .set('spark.driver.extraClassPath', '/mnt/home/prathyush/.ivy2/jars/*')
        .set('spark.executor.extraClassPath', '/mnt/home/prathyush/.ivy2/jars/*')
        )

# Create spark context
sc = SparkContext(conf=conf)

logger.info(f"Spark Driver memory: {sc._conf.get('spark.driver.memory')}")
logger.info(f"Spark Executor memory: {sc._conf.get('spark.executor.memory')}")
logger.info(
    f'Loaded jars:\n{json.dumps((sc._jsc.sc().listJars().toList().toString().replace("List(", "").replace(")", "").split(", ")), indent=2)}')
sc.setLogLevel("ERROR")

# Create spark session
spark = SparkSession(sc)
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
spark.conf.set("parquet.enable.summary-metadata", "false")

# 3. Test Kafka topic and connection

In [None]:
from confluent_kafka.admin import AdminClient

def test_kafka_connection(broker_conf:dict) -> None:
    """
    Function to test kafka connection
    :param broker_conf: Broker configuration
    :returns: None
    """
    client = AdminClient(broker_conf)
    topics = client.list_topics().topics
    if not topics:
        raise RuntimeError()
    print("Kafka Connection successful!")


broker_conf = {
    'bootstrap.servers': config["global"]["kafka_bootstrap_servers"]
}

# Test kafka connection
test_kafka_connection(broker_conf)


# 4. Load Schema

In [None]:
# Load schema 
schema = pickle.load(open("schema.pkl", 'rb'))

# 5. Configure Spark-Kafka consumer options and Subscribe to Kafka Topic

In [None]:
spark.sparkContext.setLogLevel("ERROR")

# Configure spark kafka client options
spark_kafka_options = {
    "kafka.bootstrap.servers": config["global"]["kafka_bootstrap_servers"],
    "subscribe": config["global"]["kafka_topic"],
    "kafka.group.id": config["global"]["kafka_consumer_group"],
    "maxOffsetsPerTrigger": config["global"]["max_records_per_batch"],
    "startingOffsets": "earliest",
}

# Enable spark read stream
df = ...

# 6. Start spark structred streaming job

In [None]:
# Lambda Function for processing each batch of record
def process_batch(batch_df, batch_idx):
    print(f"{batch_idx} | {batch_df.count()}")

    # Process data and calculate
    # a. age
    # b. num_contribs
    # c. min_max_years

    ...

    # Select required columns -  "name", "age", "num_contribs", "min_max"
    ...

    # Save to parquet file - result.parquet 
    ...
    return batch_df

# Structred streaming query
query = ...

# 7. Monitor structred streaming job progress

In [None]:
# Add startup delay
time.sleep(5)
# Update Job Status

print(query.status)
while query.status['isDataAvailable'] or query.status['isTriggerActive']:
    print(query.status)
    time.sleep(5)

# Stop query
query.stop()

logger.info("Structred streaming job completed successfully")