In [None]:
import os, sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *


# Spark release; also used to pick the matching spark-sql-kafka package version.
SPARK_VERSION = "3.1.1"

# Host and port used to build the spark:// master URL (both kept as strings).
SPARK_MASTER_HOST = "spark-master"
SPARK_MASTER_PORT = "7077"

# Host and port used to build the thrift:// Hive metastore URI.
HIVE_METASTORE_HOST = "192.168.10.14"
HIVE_METASTORE_PORT = 9083


"""
spark.cores.max: Maximum total number of cores a Spark application (session) may use across the whole cluster; it is not a per-worker limit.
spark.executor.cores: Number of cores allocated to each executor. It can be less than the number of cores a worker has.
spark.executor.memory: Amount of memory allocated to each executor.

E.g., 2 cores * 5 workers

    case-1:
        .config("spark.cores.max", 3)
        .config("spark.executor.cores", "1")
        .config("spark.executor.memory", "1g")

        >> 3 executors are created and each executor has 1 core and 1g memory.
        
    case-2:
        .config("spark.cores.max", 3)
        .config("spark.executor.cores", "3")
        .config("spark.executor.memory", "1g")

        >> Failed.
        >> Each worker has only 2 cores, so spark.executor.cores cannot be larger than 2.
        >> An executor is created inside a single worker; it cannot combine cores from several workers.
    
"""

# Endpoints assembled from the host/port constants defined earlier in this cell.
master_url = f"spark://{SPARK_MASTER_HOST}:{SPARK_MASTER_PORT}"
metastore_uri = f"thrift://{HIVE_METASTORE_HOST}:{HIVE_METASTORE_PORT}"

# Every setting handed to the session builder, keyed by its Spark property name.
session_options = {
    # Cluster and metastore endpoints.
    "spark.master": master_url,
    "spark.hadoop.hive.metastore.uris": metastore_uri,
    # Kafka connector for Structured Streaming, matched to the Spark version.
    "spark.jars.packages": f"org.apache.spark:spark-sql-kafka-0-10_2.12:{SPARK_VERSION}",
    "spark.sql.warehouse.dir": "/user/hive/warehouse",
    "spark.streaming.stopGracefullyOnShutdown": "true",
    # Resource limits: total cores for this session, then cores/memory per executor.
    "spark.cores.max": 1,
    "spark.executor.cores": "1",
    "spark.executor.memory": "1g",
    "spark.debug.maxToStringFields": "100",
}

session_builder = SparkSession.builder.appName("SparkCassandraApp")
for option_name, option_value in session_options.items():
    session_builder = session_builder.config(option_name, option_value)

spark = session_builder.getOrCreate()

# Reduce log output to warnings and above.
spark.sparkContext.setLogLevel("WARN")

In [None]:
# Kafka connection settings: the comma-separated broker list, the topic to
# read from, and the topic the processed records are written to.
kafka_bootstrap_servers = (
    "192.168.10.4:9081,"
    "192.168.10.4:9082,"
    "192.168.10.4:9083"
)
kafka_topic = "crypto.candles_minute"
output_kafka_topic = "spark.test_1"

# Options for the Kafka source: brokers to contact, topic to subscribe to,
# and the offset position from which the stream starts reading.
kafka_source_params = {
    "kafka.bootstrap.servers": kafka_bootstrap_servers,
    "subscribe": kafka_topic,
    "startingOffsets": "latest",
}

# Open a streaming DataFrame over the Kafka topic.
kafka_stream_df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_source_params)
    .load()
)

In [None]:
# Keep only the Kafka key and value columns, each cast to STRING.
string_columns = ("CAST(key AS STRING)", "CAST(value AS STRING)")
processed_stream_df = kafka_stream_df.selectExpr(*string_columns)

In [None]:
# from pyspark.sql.functions import lit
# from pyspark.sql.functions import rand, round

# processed_stream_df=processed_stream_df.withColumn("partition",lit(5))
# # processed_stream_df=processed_stream_df.withColumn("partition",(round(rand(seed=23)*10, 0)).cast("int"))

In [None]:
# Options for the Kafka sink: target brokers and destination topic.
kafka_sink_params = {
    "kafka.bootstrap.servers": kafka_bootstrap_servers,
    "topic": output_kafka_topic,
}

# Start the streaming query that publishes the processed rows to Kafka.
# checkpointLocation is the directory Spark uses for the query's checkpoint data.
write_stream = (
    processed_stream_df.writeStream
    .format("kafka")
    .options(**kafka_sink_params)
    .option("checkpointLocation", "/home/tmp/checkpoints")
    .start()
)

In [None]:
# spark.stop()