In [None]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, from_json, lit, avg, sum as _sum
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType, DoubleType

In [None]:

# Create the SparkSession used by every cell below.
# getOrCreate() hands back an already-running session when there is one,
# so re-running this cell does not start a second session.
spark = (
    SparkSession.builder
    .appName("Charger Data Analysis")
    .getOrCreate()
)



In [None]:
# Read the raw CSV export into a DataFrame.
# Replace the "path_to_your_dataset.csv" placeholder with the real file path.
# The first line of the file is used as the column names; no schema inference
# is requested, so every column arrives as a string.
# NOTE(review): the JSON columns parsed later contain quotes and commas; confirm
# the file's quoting/escaping matches Spark's CSV reader defaults.
data_path = "path_to_your_dataset.csv"
data = spark.read.csv(data_path, header=True)

In [None]:

# Schema used to decode the JSON text in the "charging_activity" column.
# Shape: an array of {"M": {"startTime": {"S": ...}, "endTime": {"S": ...},
# "status": {"S": ...}}} objects, where every leaf is a string.
# This is a sample layout -- adjust it to the exact format of your dataset.
# NOTE(review): the "M"/"S" wrappers look like DynamoDB attribute-value JSON; confirm.
charging_activity_schema = ArrayType(
    StructType([
        StructField(
            "M",
            StructType([
                # Each attribute is wrapped in a single-key {"S": <string>} struct.
                StructField(attribute, StructType([StructField("S", StringType())]))
                for attribute in ("startTime", "endTime", "status")
            ]),
        )
    ])
)



In [None]:
# Schema used to decode the JSON text in the "energy_report" column:
#   {"amount": {"N": "<number as text>"}, "currency": {"S": "<text>"}}
# Both leaves are declared as strings; "amount.N" is cast to a double later on.
# This is a sample layout -- adjust it to the exact format of your dataset.
energy_report_schema = (
    StructType()
    .add("amount", StructType().add("N", StringType()))
    .add("currency", StructType().add("S", StringType()))
)

In [None]:

# Replace the raw JSON text in "charging_activity" with its parsed form
# (an array of structs, as described by charging_activity_schema).
data = data.withColumn(
    "charging_activity",
    from_json(data["charging_activity"], charging_activity_schema),
)


In [None]:

# Turn each element of the charging_activity array into its own row, keeping
# the PK/SK columns alongside it so every activity stays tied to its source record.
charging_data = data.select(
    "PK",
    "SK",
    explode("charging_activity").alias("activity"),
)

# Flatten the nested activity struct: pull the string value out of each
# {"S": ...} wrapper and expose it under the plain attribute name.
charging_data = charging_data.select(
    "PK",
    "SK",
    *[
        col(f"activity.M.{attribute}.S").alias(attribute)
        for attribute in ("startTime", "endTime", "status")
    ],
)


In [None]:
# Keep only the activities whose status is "Charging" and work out how long
# each one lasted.
from pyspark.sql.functions import unix_timestamp

# startTime and endTime are overwritten with epoch seconds, then the difference
# is divided by 60 to express the duration in minutes.
# NOTE(review): unix_timestamp() is called without a format, so it expects
# 'yyyy-MM-dd HH:mm:ss' strings -- confirm the dataset's timestamps match.
charging_data = (
    charging_data.where(col("status") == "Charging")
    .withColumn("startTime", unix_timestamp(col("startTime")))
    .withColumn("endTime", unix_timestamp(col("endTime")))
    .withColumn("charging_duration", (col("endTime") - col("startTime")) / 60)
)


In [None]:

# Mean of charging_duration (minutes) over all remaining rows, pulled back to
# the driver as a plain Python value.
avg_charging_time = charging_data.agg(
    avg("charging_duration").alias("average_charging_time")
).first()["average_charging_time"]


In [None]:
# Decode the "energy_report" JSON text, then derive a numeric "revenue" column
# from its amount.N field (stored as text, hence the cast to double).
data = (
    data
    .withColumn("energy_report", from_json(col("energy_report"), energy_report_schema))
    .withColumn("revenue", col("energy_report.amount.N").cast("double"))
)

In [None]:

# Sum the revenue column across the whole dataset and bring the single
# resulting value back to the driver.
total_revenue = data.agg(
    _sum("revenue").alias("total_revenue")
).first()["total_revenue"]

In [None]:

# Display results
# Spark's avg()/sum() return null (None in Python) when there is no non-null
# value to aggregate -- e.g. no "Charging" rows, or JSON/timestamps that failed
# to parse. Formatting None with ":.2f" raises TypeError, so report "N/A" instead.
if avg_charging_time is None:
    print("Average Charging Time: N/A (no charging durations available)")
else:
    print(f"Average Charging Time: {avg_charging_time:.2f} minutes")

# NOTE(review): the "$" prefix is hard-coded although energy_report carries a
# currency field; confirm all amounts share one currency before trusting the total.
if total_revenue is None:
    print("Total Revenue Earned: N/A (no revenue values available)")
else:
    print(f"Total Revenue Earned: ${total_revenue:.2f}")