In [41]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import  *
from pyspark.sql.types import *

In [42]:
# Create (or reuse) the local Spark session used by the rest of the notebook.
# The Cassandra host/port and the Kafka/Cassandra connector packages are
# supplied as session configuration; the packages are passed as one
# comma-separated string through "spark.jars.packages".
spark = (
    SparkSession.builder
    .appName("kafka_to_cassandra")
    .master('local[*]')
    .config("spark.cassandra.connection.host", "localhost")
    .config("spark.cassandra.connection.port", "9042")
    .config(
        "spark.jars.packages",
        ",".join([
            "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.3",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3",
            "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1",
        ]),
    )
    .getOrCreate()
)

# Show the session object as the cell output.
spark

In [43]:
# Open a streaming reader on the "chess_games" Kafka topic, starting from the
# earliest available offsets on the broker at localhost:29092.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:29092")
    .option("subscribe", "chess_games")
    .option("startingOffsets", "earliest")
    .load()
)

# Print the raw Kafka record schema (key, value, topic, partition, ...).
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [44]:
# Schema of the JSON payload carried in each Kafka message value.
# Every field is declared as a nullable string; the order below is the
# column order of the parsed DataFrame.
chess_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        "Event",
        "Site",
        "Date",
        "Round",
        "White",
        "Black",
        "Result",
        "UTCDate",
        "UTCTime",
        "WhiteElo",
        "BlackElo",
        "WhiteRatingDiff",
        "BlackRatingDiff",
        "TimeControl",
        "Termination",
        "Variant",
        "Moves",
    )
])

In [45]:
# Keep only the Kafka message value, cast from binary to string.
df = df.selectExpr("CAST(value AS STRING)")

# Parse the JSON text into a single struct column named "data" ...
chess_stream = df.select(
    from_json("value", chess_schema).alias("data")
)

# ... and flatten that struct so each JSON field becomes its own column.
chess_df = chess_stream.select(col("data.*"))

chess_df.printSchema()

root
 |-- Event: string (nullable = true)
 |-- Site: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Round: string (nullable = true)
 |-- White: string (nullable = true)
 |-- Black: string (nullable = true)
 |-- Result: string (nullable = true)
 |-- UTCDate: string (nullable = true)
 |-- UTCTime: string (nullable = true)
 |-- WhiteElo: string (nullable = true)
 |-- BlackElo: string (nullable = true)
 |-- WhiteRatingDiff: string (nullable = true)
 |-- BlackRatingDiff: string (nullable = true)
 |-- TimeControl: string (nullable = true)
 |-- Termination: string (nullable = true)
 |-- Variant: string (nullable = true)
 |-- Moves: string (nullable = true)



In [46]:
import uuid

# Lower-case every column name in a single projection instead of chaining one
# withColumnRenamed call per column.
chess_df = chess_df.toDF(*[column.lower() for column in chess_df.columns])

# UDF producing a random UUID string for each row. Python UDFs are treated as
# deterministic by default, which lets Spark eliminate or repeat invocations;
# because this function returns a different value on every call it must be
# flagged as non-deterministic so each row is assigned its id exactly once.
uuid_udf = udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()

# Add the "gameid" column to the DataFrame.
chess_df = chess_df.withColumn("gameid", uuid_udf())

chess_df.printSchema()

root
 |-- event: string (nullable = true)
 |-- site: string (nullable = true)
 |-- date: string (nullable = true)
 |-- round: string (nullable = true)
 |-- white: string (nullable = true)
 |-- black: string (nullable = true)
 |-- result: string (nullable = true)
 |-- utcdate: string (nullable = true)
 |-- utctime: string (nullable = true)
 |-- whiteelo: string (nullable = true)
 |-- blackelo: string (nullable = true)
 |-- whiteratingdiff: string (nullable = true)
 |-- blackratingdiff: string (nullable = true)
 |-- timecontrol: string (nullable = true)
 |-- termination: string (nullable = true)
 |-- variant: string (nullable = true)
 |-- moves: string (nullable = true)
 |-- gameid: string (nullable = true)



In [47]:
from cassandra.cluster import Cluster

# Open a driver session against the Cassandra node on localhost. The session
# is used by the following cells to execute the keyspace and table statements.
try:
    cluster = Cluster(['localhost'])
    cassandra_session = cluster.connect()
except Exception as e:
    print(f"Could not create cassandra connection due to {e}")
    # Re-raise: without a session the line below (and every later cell) would
    # either fail with an unrelated NameError or silently reuse a stale
    # session left in the kernel by an earlier run.
    raise

# Show the session object as the cell output.
cassandra_session

<cassandra.cluster.Session at 0x14981e5bb90>

In [48]:
# CQL statement that creates the "lichess" keyspace if it does not already
# exist, using SimpleStrategy with a replication factor of 1.
keyspace_query = """
           CREATE KEYSPACE IF NOT EXISTS lichess WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};
    """
# Run the statement through the driver session; the returned ResultSet is
# displayed as the cell output.
cassandra_session.execute(keyspace_query)

<cassandra.cluster.ResultSet at 0x1498879fe10>

In [49]:
# CQL statement that creates the "lichess.games" table if it does not already
# exist. Every column is declared as text and GameID is the primary key.
# NOTE(review): the identifiers are unquoted and the DataFrame columns are
# lower-cased before the write; presumably Cassandra folds unquoted names to
# lower case so the two line up — confirm.
table_query = """
        CREATE TABLE IF NOT EXISTS lichess.games (
        Event text,
        Site text,
        Date text,
        Round text,
        White text,
        Black text,
        Result text,
        UTCDate text,
        UTCTime text,
        WhiteElo text,
        BlackElo text,
        WhiteRatingDiff text,
        BlackRatingDiff text,
        TimeControl text,
        Termination text,
        Variant text,
        Moves text,
        GameID text,
        PRIMARY KEY (gameId)
    );
    """
# Run the statement through the driver session; the returned ResultSet is
# displayed as the cell output.
cassandra_session.execute(table_query)

<cassandra.cluster.ResultSet at 0x1498879db50>

In [50]:
# Start the streaming write of the parsed games into the Cassandra table
# lichess.games in append mode, checkpointing under /tmp/kafka-checkpoint.
query = (
    chess_df.writeStream
    .outputMode("append")
    .format("org.apache.spark.sql.cassandra")
    .option("keyspace", "lichess")
    .option("table", "games")
    .option("checkpointLocation", "/tmp/kafka-checkpoint")
    .start()
)

# NOTE(review): this message is printed immediately after start() returns;
# confirm whether the data has actually been written at this point.
print("Da luu xong du lieu vao Cassandra")

# Waiting on the query is left disabled.
# NOTE(review): presumably so the cell does not block the notebook — confirm.
# query.awaitTermination()

Da luu xong du lieu vao Cassandra
