In [1]:
""" istall the driver if does not exist """
# !pip install cassandra-driver

' istall the driver if does not exist '

In [2]:
""" Create the keyspace and created_users table for data loading if it does not exist """

from cassandra.cluster import Cluster

cluster = Cluster(['cassandra_db'])  # Update with your Cassandra host
session = cluster.connect()

# Create the keyspace
keyspace_query = """
CREATE KEYSPACE IF NOT EXISTS spark_streams 
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
"""
session.execute(keyspace_query)
print("Keyspace 'spark_streams' created (if not exists).")

# Use the keyspace
session.set_keyspace('spark_streams')

# Create the table
table_query = """
CREATE TABLE IF NOT EXISTS created_users (
    first_name text,
    last_name text,
    gender text,
    address text,
    postcode text,
    email text,
    username text,
    dob text,
    registered text,
    phone text,
    picture text,
    PRIMARY KEY ((first_name, last_name), email)
);
"""
session.execute(table_query)
print("Table 'created_users' created (if not exists).")

# Close the connection
cluster.shutdown()

Keyspace 'spark_streams' created (if not exists).
Table 'created_users' created (if not exists).


In [3]:
from pyspark.sql import SparkSession

spark = (
    SparkSession 
    .builder 
    .appName("Writing to Multiple Sinks") 
    .config("spark.streaming.stopGracefullyOnShutdown", True) 
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,com.datastax.spark:spark-cassandra-connector_2.12:3.3.0,org.postgresql:postgresql:42.2.20,com.microsoft.sqlserver:mssql-jdbc:9.4.0.jre8")
    .config("spark.cassandra.connection.host", "cassandra_db")  # Docker hostname
    .config("spark.cassandra.connection.port", "9042")       # Default port
    .config("spark.cassandra.auth.username", "cassandra")    # Credentials from your Docker setup
    .config("spark.cassandra.auth.password", "cassandra")
    .config("spark.sql.shuffle.partitions", 8)
    .master("local[*]") 
    .getOrCreate()
)
spark

In [4]:
# Create the kafka_df to read from kafka

kafka_df = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ed-kafka:29092")
    .option("subscribe", "users_created")
    .option("startingOffsets", "earliest")
    .load()
)

In [5]:
# View schema for raw kafka_df
kafka_df.printSchema()
#kafka_df.show()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
# Parse value from binay to string into kafka_json_df
from pyspark.sql.functions import expr

kafka_json_df = kafka_df.withColumn("value", expr("cast(value as string)"))

In [7]:
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType

json_schema = StructType([
        StructField("first_name", StringType(), True),
        StructField("last_name", StringType(), True),
        StructField("gender", StringType(), True),
        StructField("address", StringType(), True),
        StructField("postcode", StringType(), True),
        StructField("email", StringType(), True),
        StructField("username", StringType(), True),
        StructField("dob", StringType(), True),
        StructField("registered", StringType(), True),
        StructField("phone", StringType(), True),
        StructField("picture", StringType(), True)
    ])

In [8]:
# Apply the schema to payload to read the data
from pyspark.sql.functions import from_json,col

streaming_df = kafka_json_df.withColumn("values_json", from_json(col("value"), json_schema)).selectExpr("values_json.*")

In [9]:
# To the schema of the data, place a sample json file and change readStream to read 
from pyspark.sql.functions import to_timestamp

# Convert the 'dob' column to a timestamp
streaming_df = streaming_df.withColumn("dob", to_timestamp("dob"))
# Cast the 'registered' column to timestamp
streaming_df = streaming_df.withColumn("registered", to_timestamp("registered"))

streaming_df.printSchema()
#streaming_df.show(truncate=False)

root
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- address: string (nullable = true)
 |-- postcode: string (nullable = true)
 |-- email: string (nullable = true)
 |-- username: string (nullable = true)
 |-- dob: timestamp (nullable = true)
 |-- registered: timestamp (nullable = true)
 |-- phone: string (nullable = true)
 |-- picture: string (nullable = true)



In [10]:
# Python function to write to multiple sinks
def device_data_output(df, batch_id):
    print("Batch id: "+ str(batch_id))
    
    # Write to parquet
    df.write.format("parquet").mode("append").save("data/output/device_data.parquet/")
    
    
    # Write to JDBC Postgres
    (
        df.write
        .mode("append")
        .format("jdbc")
        .option("driver", "org.postgresql.Driver")
        .option("url", "jdbc:postgresql://postgres:5432/airflow")
        .option("dbtable", "loaded_users_data")
        .option("user", "airflow")
        .option("password", "airflow")
        .save()
    
    )
     # Write to SQL Server
    (
        df.write
        .mode("append")
        .format("jdbc")
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
        .option("url", "jdbc:sqlserver://host.docker.internal:1433;databaseName=data_eng_for_user_younes")  # Update with your database name
        .option("dbtable", "loaded_users_data")  # Update with your target table name
        .option("user", "younes")  # Update with your SQL Server username
        .option("password", "Younes921722")  # Update with your SQL Server password
        .save()
    )
   
    # Write to Cassandra
    (
        df.write
        .format("org.apache.spark.sql.cassandra")
        .mode("append")
        .option("table", "created_users")
        .option("keyspace", "spark_streams")
        .option("spark.cassandra.connection.host", "cassandra_db")
        .option("spark.cassandra.connection.port", "9042")
        .save()
    )
    
    
    # Diplay
    df.show()

In [11]:
# Running foreachBatch
# Write the output to Multiple Sinks
(streaming_df
 .writeStream
 .foreachBatch(device_data_output)
 .trigger(processingTime='10 seconds')
 .option("checkpointLocation", "checkpoint_dir_kafka")
 .start()
 .awaitTermination())


Batch id: 0


StreamingQueryException: Query [id = 6e8d2e6b-f463-4761-abca-942705d36fbe, runId = 04308dd9-fa6c-48bb-a780-f37f998b5265] terminated with exception: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 272, in call
    raise e
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 269, in call
    self.func(DataFrame(jdf, self.session), batch_id)
  File "/tmp/ipykernel_159/529085115.py", line 32, in device_data_output
    .save()
  File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 966, in save
    self._jwrite.save()
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o95.save.
: com.microsoft.sqlserver.jdbc.SQLServerException: The TCP/IP connection to the host host.docker.internal, port 1433 has failed. Error: "Network is unreachable. Verify the connection properties. Make sure that an instance of SQL Server is running on the host and accepting TCP/IP connections at the port. Make sure that TCP connections to the port are not blocked by a firewall.".
	at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:237)
	at com.microsoft.sqlserver.jdbc.SQLServerException.ConvertConnectExceptionToSQLServerException(SQLServerException.java:288)
	at com.microsoft.sqlserver.jdbc.SocketFinder.findSocket(IOBuffer.java:2466)
	at com.microsoft.sqlserver.jdbc.TDSChannel.open(IOBuffer.java:676)
	at com.microsoft.sqlserver.jdbc.SQLServerConnection.connectHelper(SQLServerConnection.java:2957)
	at com.microsoft.sqlserver.jdbc.SQLServerConnection.login(SQLServerConnection.java:2628)
	at com.microsoft.sqlserver.jdbc.SQLServerConnection.connectInternal(SQLServerConnection.java:2471)
	at com.microsoft.sqlserver.jdbc.SQLServerConnection.connect(SQLServerConnection.java:1470)
	at com.microsoft.sqlserver.jdbc.SQLServerDriver.connect(SQLServerDriver.java:915)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection(BasicConnectionProvider.scala:49)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProviderBase.create(ConnectionProvider.scala:102)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:122)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:118)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:50)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:244)
	at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
	at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
	at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
	at jdk.proxy3/jdk.proxy3.$Proxy36.call(Unknown Source)
	at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:51)
	at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:51)
	at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:32)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:660)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:658)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:658)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:255)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:218)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:212)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)

