In [17]:
SERVICE_NAME = "mycos"  # logical service name
ACCESS_KEY = "891d47bfa17346038b970fa46d63fd41"         # Copy from your new service credentials
SECRET_KEY = "25fb5c98b01a4359fc5a11fcd848ae60debd89c2ac7ebf66"  
ENDPOINT = "s3.eu-de.cloud-object-storage.appdomain.cloud"  # Frankfurt region endpoint
COS_BUCKET = "processed-data-bucket"
COS_FILE = "processed_customer_purchase_behavior.csv"

In [18]:
# Initialize Spark Session with basic Stocator config using the logical service name
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Real_Time_Analytics") \
    .config("spark.hadoop.fs.stocator.scheme.list", "cos") \
    .config("spark.hadoop.fs.cos.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem") \
    .config(f"spark.hadoop.fs.cos.{SERVICE_NAME}.access.key", ACCESS_KEY) \
    .config(f"spark.hadoop.fs.cos.{SERVICE_NAME}.secret.key", SECRET_KEY) \
    .config(f"spark.hadoop.fs.cos.{SERVICE_NAME}.endpoint", f"https://{ENDPOINT}") \
    .getOrCreate()

print("✅ Spark Session Initialized with COS Service Name:", SERVICE_NAME)

✅ Spark Session Initialized with COS Service Name: mycos


In [19]:
# Explicitly set the Hadoop configuration for COS credentials
hadoopConf = spark._jsc.hadoopConfiguration()
hadoopConf.set(f"fs.cos.{SERVICE_NAME}.access.key", ACCESS_KEY)
hadoopConf.set(f"fs.cos.{SERVICE_NAME}.secret.key", SECRET_KEY)
hadoopConf.set(f"fs.cos.{SERVICE_NAME}.endpoint", f"https://{ENDPOINT}")

In [20]:
# Check that the configuration is set (optional)
print("Access Key from Hadoop conf:", hadoopConf.get(f"fs.cos.{SERVICE_NAME}.access.key"))
print("Secret Key from Hadoop conf:", hadoopConf.get(f"fs.cos.{SERVICE_NAME}.secret.key"))
print("Endpoint from Hadoop conf:", hadoopConf.get(f"fs.cos.{SERVICE_NAME}.endpoint"))

Access Key from Hadoop conf: 891d47bfa17346038b970fa46d63fd41
Secret Key from Hadoop conf: 25fb5c98b01a4359fc5a11fcd848ae60debd89c2ac7ebf66
Endpoint from Hadoop conf: https://s3.eu-de.cloud-object-storage.appdomain.cloud


In [21]:
# Build the COS URL using the logical service name:
cos_url = f"cos://{COS_BUCKET}.{SERVICE_NAME}/{COS_FILE}"
print("Using COS URL:", cos_url)

Using COS URL: cos://processed-data-bucket.mycos/processed_customer_purchase_behavior.csv


In [22]:
spark.conf.set(f"spark.hadoop.fs.cos.{COS_BUCKET}.access.key", ACCESS_KEY)
spark.conf.set(f"spark.hadoop.fs.cos.{COS_BUCKET}.secret.key", SECRET_KEY)
spark.conf.set(f"spark.hadoop.fs.cos.{COS_BUCKET}.endpoint", f"https://{ENDPOINT}")

print("✅ Spark Configuration Updated with COS Credentials")


✅ Spark Configuration Updated with COS Credentials


In [23]:
# Define the schema for your dataset
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

schema = StructType([
    StructField("Customer ID", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Gender", StringType(), True),
    StructField("Item Purchased", StringType(), True),
    StructField("Category", StringType(), True),
    StructField("Purchase Amount (USD)", DoubleType(), True),
    StructField("Location", StringType(), True),
    StructField("Season", StringType(), True),
    StructField("High Value Customer", StringType(), True),
])

# Read the data from COS using Stocator
processed_df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv(cos_url)

print("✅ Processed Data Loaded Successfully!")
processed_df.show(5)

✅ Processed Data Loaded Successfully!
+-----------+---+------+--------------+--------+---------------------+-------------+------+-------------------+
|Customer ID|Age|Gender|Item Purchased|Category|Purchase Amount (USD)|     Location|Season|High Value Customer|
+-----------+---+------+--------------+--------+---------------------+-------------+------+-------------------+
|          1| 55|  Male|        Blouse|Clothing|                 53.0|     Kentucky|     L|               Gray|
|          2| 19|  Male|       Sweater|Clothing|                 64.0|        Maine|     L|             Maroon|
|          3| 50|  Male|         Jeans|Clothing|                 73.0|Massachusetts|     S|             Maroon|
|          4| 21|  Male|       Sandals|Footwear|                 90.0| Rhode Island|     M|             Maroon|
|          5| 45|  Male|        Blouse|Clothing|                 49.0|       Oregon|     M|          Turquoise|
+-----------+---+------+--------------+--------+------------------

## Write Batch Data to a Temporary Directory

In [25]:
import os

# Define a temporary directory to store CSV files (adjust path if necessary)
temp_dir = "/tmp/stream_data"
os.makedirs(temp_dir, exist_ok=True)

# Write the processed DataFrame to this directory as CSV files (overwrite existing)
processed_df.write.mode("overwrite").option("header", "true").csv(temp_dir)
print("✅ Processed data written to:", temp_dir)


✅ Processed data written to: /tmp/stream_data


## Read the Directory as a Streaming DataFrame

In [26]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Use the same schema as before
schema = StructType([
    StructField("Customer ID", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Gender", StringType(), True),
    StructField("Item Purchased", StringType(), True),
    StructField("Category", StringType(), True),
    StructField("Purchase Amount (USD)", DoubleType(), True),
    StructField("Location", StringType(), True),
    StructField("Season", StringType(), True),
    StructField("High Value Customer", StringType(), True),
])

# Read streaming data from the temporary directory
streaming_df = spark.readStream \
    .option("header", "true") \
    .option("maxFilesPerTrigger", 1) \
    .schema(schema) \
    .csv(temp_dir)

print("✅ Streaming DataFrame created from directory")


✅ Streaming DataFrame created from directory


## Run a Real-Time Aggregation on the Streaming DataFrame

In [34]:
# Run a streaming query that aggregates total sales per category and writes to the console.
stream_query = streaming_df.groupBy("Category").agg({"Purchase Amount (USD)": "sum"}) \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

print("✅ Streaming query started. Check the console output for streaming results.")

In [32]:
import time

# Run a streaming query to compute total sales per category in real time
query = streaming_df.groupBy("Category").agg({"Purchase Amount (USD)": "sum"}) \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Let the streaming query run for a short period (e.g., 20 seconds)
time.sleep(20)
query.stop()
print("✅ Streaming query stopped.")

✅ Streaming query stopped.


In [33]:
import time

# Let the streaming query run for a while (e.g., 20 seconds) to simulate real-time processing.
time.sleep(20)

# Check streaming query status and progress (optional)
print("Query Status:", stream_query.status)
print("Last Progress:", stream_query.lastProgress)

# Stop the streaming query when finished.
stream_query.stop()
print("✅ Streaming query stopped.")


Query Status: {'message': 'Waiting for data to arrive', 'isDataAvailable': False, 'isTriggerActive': False}
Last Progress: {'id': '48111222-4447-43da-97b2-20bc58538bd9', 'runId': 'aa831cd6-17c6-4c08-b38f-e39d41095744', 'name': None, 'timestamp': '2025-02-24T15:35:48.129Z', 'batchId': 0, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0, 'durationMs': {'latestOffset': 4, 'triggerExecution': 4}, 'stateOperators': [], 'sources': [{'description': 'FileStreamSource[file:/tmp/stream_data]', 'startOffset': None, 'endOffset': None, 'latestOffset': None, 'numInputRows': 0, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.0}], 'sink': {'description': 'org.apache.spark.sql.execution.streaming.ConsoleTable$@51a20257', 'numOutputRows': -1}}
✅ Streaming query stopped.
