In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import *

In [16]:
# Spark session configured to reach Cassandra at host "cassandra", port 9042.
# The Spark Cassandra Connector jar is not requested here; it is presumably
# already on Spark's classpath in this environment (otherwise add e.g.
# .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.5.0")
# before .getOrCreate()).
spark = (
    SparkSession.builder
    .appName("CSV to Cassandra")
    .config("spark.cassandra.connection.host", "cassandra")
    .config("spark.cassandra.connection.port", "9042")
    .getOrCreate()
)

In [17]:
# Explicit schema for tracking.csv. Field order must match the CSV's column
# order, because the schema is applied to the file's columns by position.
# Every field is nullable; time-like columns (create_time, ts) are kept as text.
_tracking_columns = [
    ("create_time", StringType),
    ("bid", IntegerType),
    ("bn", StringType),
    ("campaign_id", IntegerType),
    ("cd", IntegerType),
    ("custom_track", StringType),
    ("de", StringType),
    ("dl", StringType),
    ("dt", StringType),
    ("ed", StringType),
    ("ev", IntegerType),
    ("group_id", IntegerType),
    ("id", StringType),
    ("job_id", IntegerType),
    ("md", StringType),
    ("publisher_id", IntegerType),
    ("rl", StringType),
    ("sr", StringType),
    ("ts", StringType),
    ("tz", IntegerType),
    ("ua", StringType),
    ("uid", StringType),
    ("utm_campaign", StringType),
    ("utm_content", StringType),
    ("utm_medium", StringType),
    ("utm_source", StringType),
    ("utm_term", StringType),
    ("v", IntegerType),
    ("vp", StringType),
]
schema = StructType(
    [StructField(field_name, field_type(), True) for field_name, field_type in _tracking_columns]
)

In [18]:
# Load the tracking CSV using the explicit schema defined above.
# escape='"' means a doubled quote ("") inside a quoted field is read as a
# literal quote character.
# NOTE(review): the schema has no TimestampType columns, so timestampFormat
# appears to have no effect here.
csv_file = "tracking.csv"
df = (
    spark.read
    .options(
        header="true",
        delimiter=",",
        quote='"',
        escape='"',
        timestampFormat="yyyy-MM-dd HH:mm:ss.SSS",
    )
    .schema(schema)
    .csv(csv_file)
)


In [19]:
# Number of rows parsed from the CSV (a Spark action: this triggers a full read).
source_row_count = df.count()
source_row_count

6662

In [22]:
# Cassandra destination shared by the DDL cell and the Spark write/read cells.
keyspace, table = "realtime_data", "tracking"

In [23]:
# Ensure the Cassandra keyspace and table exist before Spark writes to them.
# The driver connection is only needed for this DDL. It is shut down in
# `finally` so its connections and background threads are released even if a
# statement fails (the original cell never closed the Cluster).
from cassandra.cluster import Cluster

cluster = Cluster(["cassandra"])
try:
    session = cluster.connect()
    # replication_factor 1 keeps a single copy of each row.
    # NOTE(review): fine for a single-node setup; revisit for production.
    session.execute(f"CREATE KEYSPACE IF NOT EXISTS {keyspace} WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}};")
    session.set_keyspace(keyspace)
    # Columns mirror the Spark `schema` (IntegerType -> int, StringType -> text).
    # NOTE(review): create_time is the sole primary key, so rows sharing a
    # create_time overwrite each other on write; confirm it is unique per event.
    # (The literal's indentation is kept as-is so the CQL text is unchanged.)
    session.execute(f"""
    CREATE TABLE IF NOT EXISTS {table} (
        create_time text PRIMARY KEY,
        bid int,
        bn text,
        campaign_id int,
        cd int,
        custom_track text,
        de text,
        dl text,
        dt text,
        ed text,
        ev int,
        group_id int,
        id text,
        job_id int,
        md text,
        publisher_id int,
        rl text,
        sr text,
        ts text,
        tz int,
        ua text,
        uid text,
        utm_campaign text,
        utm_content text,
        utm_medium text,
        utm_source text,
        utm_term text,
        v int,
        vp text
    );
""")
finally:
    cluster.shutdown()



<cassandra.cluster.ResultSet at 0x7bd7d6b8add0>

In [26]:
# Replace the Cassandra table's contents with the CSV rows. In "overwrite" mode
# the Cassandra connector truncates the table first, and it only does so when
# confirm.truncate is set to "true".
cassandra_write_options = {
    "table": table,
    "keyspace": keyspace,
    "confirm.truncate": "true",
}
(
    df.write
    .format("org.apache.spark.sql.cassandra")
    .mode("overwrite")
    .options(**cassandra_write_options)
    .save()
)

print("Data successfully imported into Cassandra!")


Data successfully imported into Cassandra!


In [27]:
# Read the table back through the Spark connector and preview a few rows.
test = (
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .option("table", table)
    .option("keyspace", keyspace)
    .load()
)

test.show()


+--------------------+----+----------+-----------+---+------------+-----+--------------------+---------------+--------------------+---+--------+----+------+----+------------+--------------------+---------+--------------------+----+--------------------+-------------------+------------+-----------+----------+----------+--------+---+--------+
|         create_time| bid|        bn|campaign_id| cd|custom_track|   de|                  dl|             dt|                  ed| ev|group_id|  id|job_id|  md|publisher_id|                  rl|       sr|                  ts|  tz|                  ua|                uid|utm_campaign|utm_content|utm_medium|utm_source|utm_term|  v|      vp|
+--------------------+----+----------+-----------+---+------------+-----+--------------------+---------------+--------------------+---+--------+----+------+----+------------+--------------------+---------+--------------------+----+--------------------+-------------------+------------+-----------+----------+--------

In [28]:
# Compare the number of rows read back from Cassandra with the CSV row count.
# create_time is the table's only primary key, so duplicate create_time values
# in the CSV would silently collapse into one row. Fail loudly if counts differ
# instead of relying on someone eyeballing two numbers.
readback_row_count = test.count()
expected_row_count = df.count()
assert readback_row_count == expected_row_count, (
    f"Cassandra has {readback_row_count} rows but the CSV has {expected_row_count}"
)
readback_row_count

6662

In [14]:
# Stop Spark session
# Run this last: after stopping, `df` and `test` from the cells above can no
# longer be queried without recreating the session.
spark.stop()