In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

# Obtain the Spark session for this notebook (created on first use).
spark = (
    SparkSession.builder
    .appName("KafkaReadTest")
    .getOrCreate()
)

# Configure a batch (non-streaming) read of the topic: begin at the earliest
# available offset and stop at offset 10 of partition 0.
kafka_reader = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "52.27.188.150:9092")
    .option("subscribe", "bigdata-project2")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", '{"bigdata-project2":{"0":10}}')
)
df = kafka_reader.load()

# Kafka exposes key and value as binary columns; cast both to strings.
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Print up to 10 messages without truncating long values.
df.show(10, truncate=False)



+----+--------------------------------------------------------------------+
|key |value                                                               |
+----+--------------------------------------------------------------------+
|null|2.0,30.0,Female,39.0,14.0,5.0,18.0,Standard,Annual,932.0,17.0,1.0   |
|null|3.0,65.0,Female,49.0,1.0,10.0,8.0,Basic,Monthly,557.0,6.0,1.0       |
|null|4.0,55.0,Female,14.0,4.0,6.0,18.0,Basic,Quarterly,185.0,3.0,1.0     |
|null|5.0,58.0,Male,38.0,21.0,7.0,7.0,Standard,Monthly,396.0,29.0,1.0     |
|null|6.0,23.0,Male,32.0,20.0,5.0,8.0,Basic,Monthly,617.0,20.0,1.0        |
|null|8.0,51.0,Male,33.0,25.0,9.0,26.0,Premium,Annual,129.0,8.0,1.0       |
|null|9.0,58.0,Female,49.0,12.0,3.0,16.0,Standard,Quarterly,821.0,24.0,1.0|
|null|10.0,55.0,Female,37.0,8.0,4.0,15.0,Premium,Annual,445.0,30.0,1.0    |
|null|11.0,39.0,Male,12.0,5.0,7.0,4.0,Standard,Quarterly,969.0,13.0,1.0   |
|null|12.0,64.0,Female,3.0,25.0,2.0,11.0,Standard,Quarterly,415.0,29.0,1.0|
+----+------

In [0]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook (created on first use).
spark = (
    SparkSession.builder
    .appName("KafkaReadTest")
    .getOrCreate()
)

# Configure a batch read that spans the topic from its earliest to its
# latest offset.
kafka_reader = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "52.27.188.150:9092")
    .option("subscribe", "bigdata-project2")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
)
df = kafka_reader.load()

# One row per Kafka message, so the row count is the message count.
message_count = df.count()

print(f"Number of messages in the topic 'bigdata-project2': {message_count}")




Number of messages in the topic 'bigdata-project2': 505207


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Obtain the Spark session for this notebook (created on first use).
spark = (
    SparkSession.builder
    .appName("KafkaReadTest")
    .getOrCreate()
)

# Batch read of the whole topic, from its earliest to its latest offset.
df = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "52.27.188.150:9092")
    .option("subscribe", "bigdata-project2")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

# Kafka exposes key and value as binary columns; cast both to strings.
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Name and target type of every field in the comma-separated payload, listed
# in the order the fields appear in a message. This is the single source of
# truth for the projection below.
schema = StructType([
    StructField("CustomerID", IntegerType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Gender", StringType(), True),
    StructField("Tenure", IntegerType(), True),
    StructField("Usage Frequency", IntegerType(), True),
    StructField("Support Calls", IntegerType(), True),
    StructField("Payment Delay", IntegerType(), True),
    StructField("Subscription Type", StringType(), True),
    StructField("Contract Length", StringType(), True),
    StructField("Total Spend", FloatType(), True),
    StructField("Last Interaction", IntegerType(), True),
    StructField("Churn", IntegerType(), True)
])

# Split each message into an array of string fields.
df = df.selectExpr("split(value, ',') as data")

# Build every typed, named column in ONE projection driven by `schema`.
# This replaces a loop of 12 withColumn() calls (each of which adds another
# projection to the query plan) followed by a hand-written select that
# repeated the names and types already declared in `schema`.
df = df.select(*[
    col("data")[position].cast(field.dataType).alias(field.name)
    for position, field in enumerate(schema.fields)
])

# Show the first 10 rows of the DataFrame with labels
df.show(10, truncate=False)


+----------+---+------+------+---------------+-------------+-------------+-----------------+---------------+-----------+----------------+-----+
|CustomerID|Age|Gender|Tenure|Usage Frequency|Support Calls|Payment Delay|Subscription Type|Contract Length|Total Spend|Last Interaction|Churn|
+----------+---+------+------+---------------+-------------+-------------+-----------------+---------------+-----------+----------------+-----+
|2         |30 |Female|39    |14             |5            |18           |Standard         |Annual         |932.0      |17              |1    |
|3         |65 |Female|49    |1              |10           |8            |Basic            |Monthly        |557.0      |6               |1    |
|4         |55 |Female|14    |4              |6            |18           |Basic            |Quarterly      |185.0      |3               |1    |
|5         |58 |Male  |38    |21             |7            |7            |Standard         |Monthly        |396.0      |29              

In [0]:
import boto3
from io import StringIO
import pandas as pd

# Create the boto3 session WITHOUT embedding credentials in the notebook.
# With no explicit keys, boto3 resolves credentials from its standard
# provider chain (e.g. the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
# environment variables, the shared ~/.aws/credentials file, or an attached
# IAM role).
# SECURITY: an access key pair used to be hard-coded in this cell. Anyone
# with a copy of the notebook has seen it, so it must be deactivated and
# rotated in IAM.
session = boto3.Session(
    region_name='us-east-2'  # The region your S3 bucket is in
)

# Convert the DataFrame to CSV format as a string.
# toPandas() collects the whole Spark DataFrame onto the driver, so this
# only works while the dataset fits in driver memory.
csv_buffer = StringIO()
df.toPandas().to_csv(csv_buffer, index=False)

# Specify your S3 bucket and object key
bucket_name = 'bigdata-team9-finalproject'
object_key = 'processed_tables/dataset_with_labels.csv'

# Upload the CSV data to S3
s3 = session.client('s3')
s3.put_object(Bucket=bucket_name, Key=object_key, Body=csv_buffer.getvalue())

# Print a success message
print(f"DataFrame 'df' has been written to S3 bucket at 's3://{bucket_name}/{object_key}'.")


DataFrame 'df' has been written to S3 bucket at 's3://bigdata-team9-finalproject/processed_tables/dataset_with_labels.csv'.


In [0]:
from pyspark.sql.functions import monotonically_increasing_id

# Add a surrogate key column 'subscriptionid'.
# monotonically_increasing_id() yields unique, increasing 64-bit ids, but
# they are only consecutive within a partition (the partition id is encoded
# in the upper bits), so do not expect a gap-free 1000000, 1000001, ... run
# across the whole dataset.
#
# The function is also non-deterministic and `df` is lazy: without caching,
# every later action would re-read Kafka (up to whatever "latest" is at that
# moment) and regenerate the ids, so tables exported separately from this
# frame could disagree on which id belongs to which row. cache() keeps the
# rows and their ids fixed once materialized, so all downstream selects
# share the same keys.
df = df.withColumn("subscriptionid", monotonically_increasing_id() + 1000000).cache()

# Show the first 10 rows of the DataFrame with the new 'subscriptionid' column
df.show(10, truncate=False)


+----------+---+------+------+---------------+-------------+-------------+-----------------+---------------+-----------+----------------+-----+--------------+
|CustomerID|Age|Gender|Tenure|Usage Frequency|Support Calls|Payment Delay|Subscription Type|Contract Length|Total Spend|Last Interaction|Churn|subscriptionid|
+----------+---+------+------+---------------+-------------+-------------+-----------------+---------------+-----------+----------------+-----+--------------+
|2         |30 |Female|39    |14             |5            |18           |Standard         |Annual         |932.0      |17              |1    |1000000       |
|3         |65 |Female|49    |1              |10           |8            |Basic            |Monthly        |557.0      |6               |1    |1000001       |
|4         |55 |Female|14    |4              |6            |18           |Basic            |Quarterly      |185.0      |3               |1    |1000002       |
|5         |58 |Male  |38    |21             |

In [0]:
import boto3
from io import StringIO

def upload_table_as_csv(table, s3_client, bucket, key):
    """Serialize a Spark DataFrame to CSV and store it as one S3 object.

    Args:
        table: Spark DataFrame to export. It is collected to the driver via
            ``toPandas()``, so it must fit in driver memory.
        s3_client: boto3 S3 client used for the upload.
        bucket: Name of the destination S3 bucket.
        key: Object key (path inside the bucket) to write.
    """
    csv_buffer = StringIO()
    table.toPandas().to_csv(csv_buffer, index=False)
    s3_client.put_object(Bucket=bucket, Key=key, Body=csv_buffer.getvalue())


# Create the boto3 session WITHOUT embedding credentials in the notebook.
# With no explicit keys, boto3 resolves credentials from its standard
# provider chain (e.g. the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
# environment variables, the shared ~/.aws/credentials file, or an attached
# IAM role).
# SECURITY: an access key pair used to be hard-coded in this cell. Anyone
# with a copy of the notebook has seen it, so it must be deactivated and
# rotated in IAM.
session = boto3.Session(
    region_name='us-east-2'  # The region your S3 bucket is in
)

# Customer dimension: one row per source record with the customer attributes.
customerdim = df.select("CustomerID", "Age", "Gender")

# Fact table: churn flag and usage/billing measures, keyed by customer and
# subscription.
churnfacttable = df.select(
    "CustomerID", "subscriptionid", "Churn", "Total Spend", "Usage Frequency", "Support Calls",
    "Payment Delay"
)

# Subscription dimension: contract attributes keyed by subscriptionid.
subscriptiondim = df.select(
    "subscriptionid", "Contract Length", "Tenure"
)

# Destination bucket and one object key per exported table.
bucket_name = 'bigdata-team9-finalproject'
object_key_customerdim = 'processed_tables/customerdim_original.csv'
object_key_churnfacttable = 'processed_tables/churnfacttable_original.csv'
object_key_subscriptiondim = 'processed_tables/subscriptiondim_original.csv'

# A single S3 client is enough for all three uploads.
s3_client = session.client('s3')

# Export the tables in the same order as before, reporting each success.
for table_name, table, table_key in (
    ("customerdim", customerdim, object_key_customerdim),
    ("churnfacttable", churnfacttable, object_key_churnfacttable),
    ("subscriptiondim", subscriptiondim, object_key_subscriptiondim),
):
    upload_table_as_csv(table, s3_client, bucket_name, table_key)
    print(f"{table_name} DataFrame has been written to S3 bucket at 's3://{bucket_name}/{table_key}'.")


customerdim DataFrame has been written to S3 bucket at 's3://bigdata-team9-finalproject/processed_tables/customerdim_original.csv'.
churnfacttable DataFrame has been written to S3 bucket at 's3://bigdata-team9-finalproject/processed_tables/churnfacttable_original.csv'.
subscriptiondim DataFrame has been written to S3 bucket at 's3://bigdata-team9-finalproject/processed_tables/subscriptiondim_original.csv'.
