<center><img src="http://localhost/images/interacting-with-cassandra-from-spark.jpg" alt="Interacting with Cassandra from Spark" style="text-align: center; width:960px; height:540px;"/></center>


In [1]:
%cassandra
# Create the tutorial keyspace (no-op when it already exists).
# SimpleStrategy with replication_factor 1 keeps a single copy of every row.
CREATE KEYSPACE IF NOT EXISTS tutorial
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

In [2]:
%cassandra
# Customers table: one row per customer, keyed by the text id.
# Skipped when the table already exists.
CREATE TABLE IF NOT EXISTS tutorial.customers (
    id     text,
    county text,
    name   text,
    PRIMARY KEY (id)
);

In [3]:
%cassandra
# Seed ten sample customers (ids '1'..'10').
# Every row is keyed by id, so re-running these statements rewrites the same rows.
INSERT INTO tutorial.customers (id, county, name) VALUES ('1',  'Orange County',        'Alice Johnson');
INSERT INTO tutorial.customers (id, county, name) VALUES ('2',  'Los Angeles County',   'Bob Smith');
INSERT INTO tutorial.customers (id, county, name) VALUES ('3',  'San Diego County',     'Charlie Brown');
INSERT INTO tutorial.customers (id, county, name) VALUES ('4',  'Riverside County',     'Diana Prince');
INSERT INTO tutorial.customers (id, county, name) VALUES ('5',  'Santa Clara County',   'Eve Adams');
INSERT INTO tutorial.customers (id, county, name) VALUES ('6',  'San Francisco County', 'Frank Castle');
INSERT INTO tutorial.customers (id, county, name) VALUES ('7',  'Alameda County',       'Grace Lee');
INSERT INTO tutorial.customers (id, county, name) VALUES ('8',  'Sacramento County',    'Henry Ford');
INSERT INTO tutorial.customers (id, county, name) VALUES ('9',  'Fresno County',        'Ivy Taylor');
INSERT INTO tutorial.customers (id, county, name) VALUES ('10', 'Ventura County',       'Jack Sparrow');

In [4]:
%cassandra
# Peek at up to five customer rows to confirm the load
SELECT id, county, name
FROM tutorial.customers
LIMIT 5;

In [5]:
%cassandra
# Transactions table (skipped when it already exists).
# Partition key (customerid, year, month): one partition per customer per month.
# Clustering column id (timeuuid) identifies each transaction inside that partition.
# NOTE(review): amount is an int; presumably whole currency units, so confirm the unit.
CREATE TABLE IF NOT EXISTS tutorial.transactions (
    customerid text,
    year       int,
    month      int,
    id         timeuuid,
    amount     int,
    card       text,
    status     text,
    PRIMARY KEY ((customerid, year, month), id)
);

In [6]:
%cassandra
# Load ten sample transactions for customers '1'..'5' across 2023.
# Each id comes from now(), so every execution generates brand-new timeuuids.
# Re-running plain INSERTs would therefore add a second copy of every
# transaction and double all totals computed by the Spark paragraphs.
# Truncate first so this paragraph always leaves exactly these ten rows.
TRUNCATE tutorial.transactions;

INSERT INTO tutorial.transactions (customerid, year, month, id, amount, card, status) VALUES ('1', 2023, 1, now(), 500, 'VISA', 'APPROVED');
INSERT INTO tutorial.transactions (customerid, year, month, id, amount, card, status) VALUES ('2', 2023, 2, now(), 200, 'MASTERCARD', 'DECLINED');
INSERT INTO tutorial.transactions (customerid, year, month, id, amount, card, status) VALUES ('3', 2023, 3, now(), 750, 'AMEX', 'PENDING');
INSERT INTO tutorial.transactions (customerid, year, month, id, amount, card, status) VALUES ('4', 2023, 4, now(), 400, 'DISCOVER', 'APPROVED');
INSERT INTO tutorial.transactions (customerid, year, month, id, amount, card, status) VALUES ('5', 2023, 5, now(), 300, 'VISA', 'DECLINED');
INSERT INTO tutorial.transactions (customerid, year, month, id, amount, card, status) VALUES ('1', 2023, 6, now(), 900, 'MASTERCARD', 'APPROVED');
INSERT INTO tutorial.transactions (customerid, year, month, id, amount, card, status) VALUES ('2', 2023, 7, now(), 100, 'AMEX', 'PENDING');
INSERT INTO tutorial.transactions (customerid, year, month, id, amount, card, status) VALUES ('3', 2023, 8, now(), 650, 'DISCOVER', 'DECLINED');
INSERT INTO tutorial.transactions (customerid, year, month, id, amount, card, status) VALUES ('4', 2023, 9, now(), 850, 'VISA', 'APPROVED');
INSERT INTO tutorial.transactions (customerid, year, month, id, amount, card, status) VALUES ('5', 2023, 10, now(), 700, 'MASTERCARD', 'PENDING');


In [7]:
%cassandra
# Check the transactions table: show up to five rows.
# Columns are listed in the same order SELECT * returns them:
# partition key, then clustering column, then the regular columns.
SELECT customerid, year, month, id, amount, card, status
FROM tutorial.transactions
LIMIT 5;

In [8]:
%spark
// Display the version of Spark backing this interpreter
spark.sparkContext.version

In [9]:
%spark
// Expose tutorial.transactions as a DataFrame through the Cassandra data source.
// Keyspace and table are supplied together as one options map.
val transactionsDF = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "tutorial", "table" -> "transactions"))
  .load()

In [10]:
%spark

// Q1: current balance per customer = total of every transaction amount
// recorded for that customer, whatever its status.
// NOTE(review): DECLINED and PENDING amounts are included in the sum; confirm
// that matches the intended meaning of "balance".
val balanceDF = transactionsDF
  .groupBy("customerid")
  .agg(sum("amount").alias("current_balance"))
  // customerid is a text column, so this ordering is lexicographic ("10" < "2")
  .orderBy("customerid")

In [11]:
%spark
// Render the per-customer balances with Zeppelin's display helper
z.show(balanceDF)

In [12]:
%spark
// Render the per-customer balances a second time.
// NOTE(review): this duplicates the previous paragraph, presumably so the same
// data can be shown with a different Zeppelin visualization (table vs chart).
// If not, this paragraph is redundant.
z.show(balanceDF)

In [13]:
%spark

// Q2: for each (customer, card type) pair, the number of transactions,
// the mean amount and the total amount. id is the table's clustering column,
// so count("id") counts every transaction row.
val aggregatedDF = {
  val txCount   = count("id").as("transaction_count")
  val txAverage = avg("amount").as("average_transaction")
  val txTotal   = sum("amount").as("total_transaction")

  transactionsDF
    .groupBy("customerid", "card")
    .agg(txCount, txAverage, txTotal)
    .orderBy("customerid", "card")
}

In [14]:
%spark
// Render the Q2 per-customer / per-card aggregates
z.show(aggregatedDF)

In [15]:
%spark
// Render the Q2 aggregates a second time.
// NOTE(review): duplicate of the previous paragraph, presumably kept for a
// different Zeppelin visualization. Confirm, or drop it.
z.show(aggregatedDF)

In [16]:
%spark
// Expose tutorial.customers as a DataFrame through the Cassandra data source
val customersDF = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "tutorial", "table" -> "customers"))
  .load()

In [17]:
%spark
// Inner-join transactions with customers on the customer key.
// Transactions whose customerid has no customer row are dropped.
// Both inputs carry an `id` column: transactions.id is the timeuuid and
// customers.id is the customer key. Keeping both would make any later reference
// to "id" ambiguous. The customer copy equals customerid, so drop it and keep
// `id` meaning the transaction id.
val joinedDF = transactionsDF
  .join(customersDF, transactionsDF("customerid") === customersDF("id"), "inner")
  .drop(customersDF("id"))


In [18]:
%spark
// Q3: total amount spent per county for each (year, month).
// The same column list drives both the grouping and the result ordering.
val spendingPerCountyDF = {
  val bucket = Seq("county", "year", "month")
  joinedDF
    .groupBy(bucket.head, bucket.tail: _*)
    .agg(sum("amount").alias("total_spent"))
    .orderBy(bucket.head, bucket.tail: _*)
}

In [19]:
%spark
// Render the Q3 per-county monthly spending
z.show(spendingPerCountyDF)

In [20]:
%spark
// Render the Q3 results a second time.
// NOTE(review): duplicate of the previous paragraph, presumably kept for a
// different Zeppelin visualization. Confirm, or drop it.
z.show(spendingPerCountyDF)

In [21]:
%spark
// Register both DataFrames as session-scoped temp views so Spark SQL can query
// them by name. Any existing view with the same name is replaced.
Seq("customers" -> customersDF, "transactions" -> transactionsDF).foreach {
  case (viewName, frame) => frame.createOrReplaceTempView(viewName)
}

In [22]:
%spark
// Q4: Spark SQL text listing the customers (id, name) that have at least one
// APPROVED transaction. It reads the `customers` and `transactions` temp views.
// EXISTS states "has at least one matching transaction" directly as a
// semi-join. Each customer row appears once, so no DISTINCT is needed to
// collapse the rows a plain JOIN would multiply.
// customers.id is the table's primary key, so the result set is unchanged.
val query = """
  SELECT c.id, c.name
  FROM customers AS c
  WHERE EXISTS (
      SELECT 1
      FROM transactions AS t
      WHERE t.customerid = c.id
        AND t.status = 'APPROVED'
  )
"""

In [23]:
%spark
// Run the Q4 SQL text; the result is a lazily-evaluated DataFrame
val resultDF = spark.sqlContext.sql(query)

In [24]:
%spark
// Print up to 20 approved users without truncating long cell values
resultDF.show(20, false)

In [25]:
%cassandra
# Remove any previous output table so the Q4 results start from an empty table
DROP TABLE IF EXISTS tutorial.approved_users;

In [26]:
%cassandra
# Create the table holding the names of users with approved transactions.
# IF NOT EXISTS keeps this paragraph re-runnable on its own, matching the other
# DDL in this notebook. After the preceding DROP it simply creates the table.
CREATE TABLE IF NOT EXISTS tutorial.approved_users (
    id   text PRIMARY KEY,
    name text
);

In [27]:
%spark
// Persist the Q4 result (id, name) into tutorial.approved_users via the
// Cassandra data source, using append mode.
resultDF.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "tutorial", "table" -> "approved_users"))
  .mode("append")
  .save()

In [28]:
%cassandra
# Read back everything Spark wrote to confirm the export
SELECT id, name
FROM tutorial.approved_users;

In [29]:
%spark
// Shut down the Spark session and its underlying SparkContext.
// NOTE(review): in Zeppelin, later %spark paragraphs will likely fail until the
// interpreter is restarted. Confirm this final cleanup is intended.
spark.stop()


In [30]:
%spark
