# In [None]:
import logging
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError, NoBrokersAvailable

# Create this module's logger and configure root logging at INFO level.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


def create_topic_if_not_exists(
    bootstrap_servers: str,
    topic_name: str,
    num_partitions: int = 1,
    replication_factor: int = 1,
) -> bool:
    """Create a Kafka topic unless it already exists.

    Errors are logged rather than raised, so callers only need to inspect
    the boolean result.

    Args:
        bootstrap_servers: Broker address(es) handed to ``KafkaAdminClient``.
        topic_name: Name of the topic to look up and, if absent, create.
        num_partitions: Partition count used when the topic is created.
        replication_factor: Replication factor used when the topic is created.

    Returns:
        ``True`` only when this call created the topic. ``False`` when the
        topic already existed or when the operation failed.
    """
    admin_client = None
    try:
        admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

        if topic_name in admin_client.list_topics():
            logger.info(
                "Topic '%s' already exists.",
                topic_name,
            )
            # Nothing was created: report False, consistent with the
            # TopicAlreadyExistsError path below (previously returned None).
            return False

        topic = NewTopic(
            name=topic_name,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
        )
        admin_client.create_topics(new_topics=[topic], validate_only=False)
        logger.info("Topic '%s' created successfully.", topic_name)
        return True

    except TopicAlreadyExistsError:
        # The topic can appear between the list_topics() check and the
        # create_topics() call; treat that the same as "already exists".
        logger.warning(
            "Topic '%s' already exists (caught exception).",
            topic_name,
        )
        return False

    except NoBrokersAvailable:
        logger.error(
            "No Kafka brokers available."
            " Check 'bootstrap_servers' configuration."
        )
        return False

    except Exception as e:  # pylint: disable=broad-except
        # Best-effort helper: log with traceback and signal failure instead
        # of propagating an unexpected client error to the caller.
        logger.exception(
            "Failed to create topic %s: %s",
            topic_name,
            e,
        )
        return False

    finally:
        # Always release the admin connection if it was opened.
        if admin_client is not None:
            admin_client.close()


# In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.functions import from_json, col, window, avg
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, FloatType, BooleanType
from constants import kafka_config

# Spark configuration: standalone master on localhost, with the Hadoop-AWS
# and Kafka source connector packages requested via spark.jars.packages.
conf = (
    SparkConf()
    .setAppName('SparkApp')
    .setMaster('spark://localhost:7077')
    .set(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:3.3.4,"
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3",
    )
    .set("spark.sql.shuffle.partitions", "10")
)

# SQLContext has been deprecated since Spark 3.0; SparkSession is the
# supported entry point and exposes the same readStream API.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

# Backward-compatible alias: `sql_context` used to be bound at module level,
# so the name is kept and now points at the session (readStream, read and
# sql are available on it).
sql_context = spark

# Schema applied to the JSON payload carried in each Kafka message value.
schema = StructType([
    StructField("sens", IntegerType(), True),
    StructField("terminus", StringType(), True),
    StructField("infotrafic", BooleanType(), True),
    StructField("temps", StringType(), True),
    StructField("tempsReel", StringType(), True),
    StructField("stop", StringType(), True),
    StructField("numLigne", StringType(), True),
])

# Read raw records from the "bus_position" Kafka topic, starting from the
# earliest available offsets.
raw_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_config["bootstrap_servers"])
    .option("subscribe", "bus_position")
    .option("startingOffsets", "earliest")
    .load()
)

# Decode the message value as a string, parse it as JSON with the schema
# above, then flatten the parsed struct into top-level columns.
parsed_stream = (
    raw_stream
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select(
        col("data.sens").alias("sens"),
        col("data.terminus").alias("terminus"),
        col("data.infotrafic").alias("infotrafic"),
        col("data.temps").alias("temps"),
        col("data.tempsReel").alias("tempsReel"),
        col("data.stop").alias("stop"),
        col("data.numLigne").alias("numLigne"),
    )
)

# Print every new row, untruncated, to the console sink.
query = (
    parsed_stream.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .start()
)

# Block until the streaming query stops.
query.awaitTermination()