In [0]:
from pyspark.sql.functions import *
from pyspark.sql.functions import col, sum
from pyspark.sql.functions import from_json

# Connecting Kafka Topic To Databricks

Real-time data streaming of the `clickstream` topic from Confluent Kafka

In [0]:
# --- Kafka connection settings (fill in before running) ---
kafka_bootstrap_servers = ""  # Confluent Cloud bootstrap server, host:port
kafka_topic = "clickstream"

# Confluent Cloud API key / secret, used below as the SASL username / password.
# NOTE(review): prefer loading these from a secret store instead of pasting them into the notebook.
api_key = ""
api_secret = ""

# JAAS login entry for SASL/PLAIN authentication.
# NOTE(review): Databricks runtimes may ship a shaded Kafka client; if the stream fails with
# "No LoginModule found", the class name may need a `kafkashaded.` prefix - confirm on the cluster.
jaas_config = f"org.apache.kafka.common.security.plain.PlainLoginModule required username='{api_key}' password='{api_secret}';"

# Options handed to the Spark Kafka source.
kafka_options = {
    "kafka.bootstrap.servers": kafka_bootstrap_servers,
    "subscribe": kafka_topic,
    "kafka.security.protocol": "SASL_SSL",  # SASL authentication over an encrypted channel
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": jaas_config,
    "startingOffsets": "latest",  # start from the latest offsets rather than the beginning of the topic
}

# Open a streaming reader on the topic.
kafka_stream_df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_options)
    .load()
)

# Keep only the record payload, cast to a string column named `message`.
streaming_records_df = kafka_stream_df.selectExpr("CAST(value AS STRING) as message")

# DDL schema used to parse the JSON payload; every field is read as a string.
schema = """
    event STRING,
    user_id STRING,
    timestamp STRING,
    page STRING,
    referrer STRING,
    device STRING
"""

# Parse the JSON into a struct column, then flatten the struct into top-level columns.
parsed_df = (
    streaming_records_df
    .select(from_json("message", schema).alias("data"))
    .select("data.*")
)

### Test real-time data analysis on `clickstream` topic data

In [0]:
# Render the parsed streaming DataFrame to check that the JSON fields
# were extracted into separate columns (debugging aid).
display(parsed_df)

# Mounting ADLS Gen 2 To Azure Databricks

In [0]:
%python
# --- Storage settings (fill in before running) ---
container_name = ""  # Data Lake container name
storage_account_name = ""  # Storage Account name
# NOTE(review): prefer loading the key from a secret store instead of pasting it into the notebook.
storage_account_key = ""  # Storage Account access key

# DBFS path under which the container becomes accessible.
mount_point = "/mnt/mydatalake"

# The container is addressed through the account's Blob endpoint (wasbs://),
# authenticated with the account key set for that same endpoint.
source_uri = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/"
account_key_conf = {
    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key
}

# Drop any existing mount at this path so the mount call below does not collide with it.
existing_mount_points = [m.mountPoint for m in dbutils.fs.mounts()]
if mount_point in existing_mount_points:
    dbutils.fs.unmount(mount_point)

# Mount the container; kept as the last expression so the cell echoes the call's result.
dbutils.fs.mount(
    source=source_uri,
    mount_point=mount_point,
    extra_configs=account_key_conf,
)

/mnt/mydatalake has been unmounted.


True

Previewing data from the mounted storage

In [0]:
%python
# Read everything under the mounted topics directory as JSON into a batch DataFrame.
# (Assumes the files there are JSON.)
topics_path = "/mnt/mydatalake/topics/"
df = spark.read.format("json").load(topics_path)

# Preview only the first five rows.
preview_df = df.limit(5)
display(preview_df)

Total Rows

In [0]:
# Total number of rows loaded from the mount, before any cleaning.
# df.count() counts every row, so the label states that null-containing rows are
# included rather than implying this is a count of rows that contain nulls.
display(f"Total records (including rows with null values): {df.count()}")

'Total records with null values: 453'

# Removing NULL Values and Duplicates

Checking Null Values

In [0]:
# Build one aggregate per column: turn the isNull() flag into 0/1 and add it up,
# keeping the original column name as the alias.
# `sum` and `col` are the pyspark.sql.functions versions imported at the top of the notebook.
null_count_exprs = []
for column_name in df.columns:
    is_null_flag = col(column_name).isNull().cast("int")
    null_count_exprs.append(sum(is_null_flag).alias(column_name))

# Single-row result: number of nulls found in each column.
null_counts = df.select(null_count_exprs)

display(null_counts)

Removing Null Values

In [0]:
%python
# Keep only the rows in which every column is populated
# (a row is dropped as soon as any of its columns is null).
df_null_cleaned = df.na.drop(how="any")

# Render the null-free DataFrame.
display(df_null_cleaned)

Checking Duplicates

In [0]:
%python
# Group on every column of the cleaned batch DataFrame and count identical rows.
# The column list is taken from df_null_cleaned itself (not from the streaming
# parsed_df), so the check covers every column of this DataFrame, matches what a
# whole-row dropDuplicates() will remove, and does not depend on the Kafka cell
# having been run.
all_columns = df_null_cleaned.columns
duplicate_counts_df = df_null_cleaned.groupBy(all_columns).count()

# A group size above 1 means the exact same row occurs more than once.
duplicates_df = duplicate_counts_df.filter(col("count") > 1)

# Render the duplicated rows together with their occurrence count.
display(duplicates_df)

Removing Duplicates

In [0]:
# Collapse fully identical rows so each distinct row is kept once.
df_ready = df_null_cleaned.distinct()

# Render the cleaned, de-duplicated DataFrame used by the analysis cells.
display(df_ready)

# Transformation

### Filtering data for a specific device (e.g., Tablet)

In [0]:
# Predicate selecting rows whose device is exactly "tablet".
# NOTE(review): the comparison is case-sensitive - confirm the data uses lowercase device names.
is_tablet = col("device") == "tablet"
device_tablet_df = df_ready.where(is_tablet)

# Render the tablet-only rows.
display(device_tablet_df)

### Grouping columns and checking count

In [0]:
# Number of rows per device, most frequent first.
device_counts_df = (
    df_ready
    .groupBy("device")
    .agg(count("device").alias("count"))
    .orderBy(col("count").desc())
)
display(device_counts_df)

In [0]:
# Number of rows per event, most frequent first.
event_counts_df = (
    df_ready
    .groupBy("event")
    .agg(count("event").alias("count"))
    .orderBy(col("count").desc())
)
display(event_counts_df)

In [0]:
# Number of rows per page, most frequent first.
page_counts_df = (
    df_ready
    .groupBy("page")
    .agg(count("page").alias("count"))
    .orderBy(col("count").desc())
)
display(page_counts_df)

In [0]:
# Number of rows per referrer, most frequent first.
referrer_counts_df = (
    df_ready
    .groupBy("referrer")
    .agg(count("referrer").alias("count"))
    .orderBy(col("count").desc())
)
display(referrer_counts_df)

In [0]:
# Same count-per-value breakdown for the 'year', 'month' and 'day' columns,
# rendered one table at a time in that order, most frequent value first.
for date_part in ("year", "month", "day"):
    display(
        df_ready
        .groupBy(date_part)
        .agg(count(date_part).alias("count"))
        .orderBy(col("count").desc())
    )

### Saving our analysis into a new directory

In [0]:
# Destination directory on the mounted storage for the tablet-only rows.
device_tablet_df_analysis_path = "/mnt/mydatalake/analysis/device_tablet/"

# CSV writer with a header row; "overwrite" replaces whatever a previous run left at the path.
csv_writer = (
    device_tablet_df.write
    .format("csv")
    .mode("overwrite")
    .option("header", "true")
)
csv_writer.save(device_tablet_df_analysis_path)

print("Device tablet analysis saved!")

Device tablet analysis saved!
