In [2]:
import logging

from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType


In [3]:
def create_spark_connection():
    s_conn = None

    try:
        s_conn = SparkSession.builder \
            .appName('SparkDataStreaming') \
            .config('spark.jars.packages', "com.datastax.spark:spark-cassandra-connector_2.12:3.4.1,"
                                           "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1") \
            .config('spark.cassandra.connection.host', 'localhost') \
            .getOrCreate()
        
        s_conn.sparkContext.setLogLevel("ERROR")
        logging.info("Spark connection created successfully!")
    except Exception as e:
        logging.error(f"Couldn't create the spark session due to exception {e}")

    return s_conn

spark_conn = create_spark_connection()

In [4]:
spark_conn

In [5]:
def connect_to_kafka(spark_conn):
    spark_df = None
    try:
        spark_df = spark_conn.readStream \
            .format('kafka') \
            .option('kafka.bootstrap.servers', 'localhost:9092') \
            .option('subscribe', 'users_created') \
            .load()
        logging.info("kafka dataframe created successfully")
    except Exception as e:
        logging.warning(f"kafka dataframe could not be created because: {e}")

    return spark_df
spark_df = connect_to_kafka(spark_conn)

In [6]:
spark_df

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [8]:
def create_selection_df_from_kafka(spark_df):
    schema = StructType([
        StructField("id", StringType(), False),
        StructField("first_name", StringType(), False),
        StructField("last_name", StringType(), False),
        StructField("gender", StringType(), False),
        StructField("address", StringType(), False),
        StructField("post_code", StringType(), False),
        StructField("email", StringType(), False),
        StructField("username", StringType(), False),
        StructField("registered_date", StringType(), False),
        StructField("phone", StringType(), False),
        StructField("picture", StringType(), False)
    ])

    sel = spark_df.selectExpr("CAST(value AS STRING)") \
        .select(from_json(col('value'), schema).alias('data')).select("data.*")

    return sel
selection_df = create_selection_df_from_kafka(spark_df)
selection_df

DataFrame[id: string, first_name: string, last_name: string, gender: string, address: string, post_code: string, email: string, username: string, registered_date: string, phone: string, picture: string]

In [11]:
def create_cassandra_connection():
    try:
        # connecting to the cassandra cluster
        cluster = Cluster(['localhost'])

        cas_session = cluster.connect()

        return cas_session
    except Exception as e:
        logging.error(f"Could not create cassandra connection due to {e}")
        return None

session = create_cassandra_connection()
session

Traceback (most recent call last):
  File "c:\Users\letan\anaconda3\envs\envE2Eprj\Lib\site-packages\cassandra\cluster.py", line 3577, in _reconnect_internal
    return self._try_connect(host)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\letan\anaconda3\envs\envE2Eprj\Lib\site-packages\cassandra\cluster.py", line 3599, in _try_connect
    connection = self._cluster.connection_factory(host.endpoint, is_control_connection=True)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\letan\anaconda3\envs\envE2Eprj\Lib\site-packages\cassandra\cluster.py", line 1670, in connection_factory
    return self.connection_class.factory(endpoint, self.connect_timeout, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\letan\anaconda3\envs\envE2Eprj\Lib\site-packages\cassandra\connection.py", line 846, in factory
    conn = cls(endpoint, *args, **kwargs)
           ^^