# PySpark Data Loading and Preliminary Cleaning (Complete Code)

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, to_timestamp
from pyspark.sql.types import DoubleType

In [2]:
# Data-cleaning cell: load the raw semicolon-separated CSV, turn the '?'
# missing-value marker into nulls, cast the measurement columns to double,
# build a single Datetime column, and write the result to Parquet.

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Electric Power Project - Data Cleaning") \
    .getOrCreate()

# File path
file_path = "../data/raw/household_power.csv"

# Load data (semicolon-separated, '?' as missing).
# inferSchema=False keeps every column as a string so the '?' marker can be
# replaced explicitly before any numeric cast happens.
df_raw = spark.read.csv(file_path, header=True, sep=";", inferSchema=False)

# Replace '?' with null and drop rows with nulls.
# This first pass runs BEFORE the conversions so rows with missing fields
# never reach the cast / timestamp parser.
df = df_raw.replace("?", None).dropna()

# Cast numeric columns to DoubleType
columns_to_cast = [
    "Global_active_power", "Global_reactive_power", "Voltage",
    "Global_intensity", "Sub_metering_1", "Sub_metering_2", "Sub_metering_3"
]
for col_name in columns_to_cast:
    df = df.withColumn(col_name, col(col_name).cast(DoubleType()))

# Create timestamp column from Date and Time
df = df.withColumn("Datetime", to_timestamp(concat_ws(" ", col("Date"), col("Time")), "d/M/yyyy H:mm:ss"))

# Drop original Date and Time columns
df = df.drop("Date", "Time")

# Second null pass AFTER the conversions: when ANSI mode is off, a value that
# cannot be cast or parsed becomes null instead of raising, and the first
# dropna() above cannot see it. Without this, such rows would be written to
# the "cleaned" output. It is a no-op when every value converted cleanly.
df = df.dropna()

# Reorder columns (Datetime first)
cols = [name for name in df.columns if name != "Datetime"]
df = df.select(["Datetime"] + cols)

# Show schema and sample data
df.printSchema()
df.show(5, truncate=False)

# Save cleaned data to Parquet (overwrites any previous run's output)
output_path = "../data/processed/cleaned_power.parquet"
df.write.mode("overwrite").parquet(output_path)

print("Data cleaning completed and saved to:", output_path)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/07/16 22:28:11 WARN Utils: Your hostname, MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 192.168.2.100 instead (on interface en0)
25/07/16 22:28:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/16 22:28:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


root
 |-- Datetime: timestamp (nullable = false)
 |-- Global_active_power: double (nullable = true)
 |-- Global_reactive_power: double (nullable = true)
 |-- Voltage: double (nullable = true)
 |-- Global_intensity: double (nullable = true)
 |-- Sub_metering_1: double (nullable = true)
 |-- Sub_metering_2: double (nullable = true)
 |-- Sub_metering_3: double (nullable = true)

+-------------------+-------------------+---------------------+-------+----------------+--------------+--------------+--------------+
|Datetime           |Global_active_power|Global_reactive_power|Voltage|Global_intensity|Sub_metering_1|Sub_metering_2|Sub_metering_3|
+-------------------+-------------------+---------------------+-------+----------------+--------------+--------------+--------------+
|2006-12-16 17:24:00|4.216              |0.418                |234.84 |18.4            |0.0           |1.0           |17.0          |
|2006-12-16 17:25:00|5.36               |0.436                |233.63 |23.0          

25/07/16 22:28:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/07/16 22:28:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/07/16 22:28:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
[Stage 2:>                                                        (0 + 10) / 10]

Data cleaning completed and saved to: ../data/processed/cleaned_power.parquet


25/07/16 22:28:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/07/16 22:28:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

# Aggregate data to daily level using PySpark

Convert the cleaned minute-level electricity data into daily-level aggregated data to be used in time series forecasting and modeling.

In [3]:
# Daily-aggregation cell: read the cleaned Parquet data, derive a calendar
# date from each timestamp, aggregate per day, and write the daily table.

from pyspark.sql import SparkSession
# Spark's max/min/sum are imported under aliases so they do not replace the
# Python builtins of the same name for every later cell in this kernel.
from pyspark.sql.functions import (
    to_date,
    avg,
    max as spark_max,
    min as spark_min,
    sum as spark_sum,
)

# Initialize Spark session.
# getOrCreate() returns the already-running session when one exists, so this
# cell also works on its own after a kernel restart.
spark = SparkSession.builder \
    .appName("Electric Power Project - Daily Aggregation") \
    .getOrCreate()

# Load the cleaned Parquet data
input_path = "../data/processed/cleaned_power.parquet"
df = spark.read.parquet(input_path)

# Extract date from timestamp
df = df.withColumn("Date", to_date("Datetime"))

# Aggregate data by date using common statistical functions.
# NOTE(review): the sums add up whatever rows exist for a day; rows dropped
# during cleaning are simply absent, so days with gaps sum fewer readings —
# confirm how downstream modeling should treat partial days.
df_daily = df.groupBy("Date").agg(
    avg("Global_active_power").alias("avg_active_power"),
    spark_sum("Global_active_power").alias("total_active_power"),
    avg("Voltage").alias("avg_voltage"),
    spark_max("Voltage").alias("max_voltage"),
    spark_min("Voltage").alias("min_voltage"),
    avg("Global_intensity").alias("avg_intensity"),
    spark_sum("Sub_metering_1").alias("sum_sub_1"),
    spark_sum("Sub_metering_2").alias("sum_sub_2"),
    spark_sum("Sub_metering_3").alias("sum_sub_3")
)

# Optional: sort by date
df_daily = df_daily.orderBy("Date")

# Show results
df_daily.show(5)

# Save to Parquet for modeling (overwrites any previous run's output)
output_path = "../data/processed/daily_power.parquet"
df_daily.write.mode("overwrite").parquet(output_path)

print("Daily-level data saved to:", output_path)


25/07/16 22:32:14 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+----------+------------------+------------------+------------------+-----------+-----------+------------------+---------+---------+---------+
|      Date|  avg_active_power|total_active_power|       avg_voltage|max_voltage|min_voltage|     avg_intensity|sum_sub_1|sum_sub_2|sum_sub_3|
+----------+------------------+------------------+------------------+-----------+-----------+------------------+---------+---------+---------+
|2006-12-16|3.0534747474747492|1209.1760000000006|236.24376262626276|     243.73|     230.98|13.082828282828302|      0.0|    546.0|   4926.0|
|2006-12-17| 2.354486111111111|           3390.46|240.08702777777793|     249.37|     229.57| 9.999027777777764|   2033.0|   4187.0|  13341.0|
|2006-12-18|1.5304347222222197|2203.8259999999964|241.23169444444474|     248.48|     229.08| 6.421666666666658|   1063.0|   2621.0|  14018.0|
|2006-12-19| 1.157079166666667|1666.1940000000006|241.99931250000026|     248.89|     231.24| 4.926388888888899|    839.0|   7602.0|   6197.0|

# Exploratory Data Analysis (EDA) on Daily Power Data

Use Pandas and Matplotlib/Seaborn to perform basic exploratory data analysis (EDA) on the daily-level electricity data.

In [8]:
!pip install pyarrow




In [9]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# EDA cell: load the daily-level table with pandas and draw four overview
# plots (two time series, one correlation heatmap, one histogram).

# Set style
sns.set(style="whitegrid")

# Load the daily-level Parquet file (converted by PySpark).
# The path is defined once and reused, so the read cannot drift from it.
file_path = "../data/processed/daily_power.parquet"
df = pd.read_parquet(file_path)

# Show basic info
print("📊 Data shape:", df.shape)
print("🧾 Columns:", df.columns.tolist())
print(df.head())

# Convert 'Date' column to datetime (if not already)
df['Date'] = pd.to_datetime(df['Date'])

# Sort chronologically: the line plots below connect points in row order, so
# an out-of-order read would draw lines jumping back and forth across time.
df = df.sort_values('Date').reset_index(drop=True)

# Plot total active power over time
# NOTE(review): total_active_power is a daily SUM of readings, yet the title
# labels it "(kW)" — confirm the intended unit before publishing this figure.
plt.figure(figsize=(14, 5))
plt.plot(df['Date'], df['total_active_power'], color='blue')
plt.title("Total Daily Active Power (kW)")
plt.xlabel("Date")
plt.ylabel("Total Active Power")
plt.tight_layout()
plt.show()

# Plot average voltage over time
plt.figure(figsize=(14, 4))
plt.plot(df['Date'], df['avg_voltage'], color='orange')
plt.title("Average Daily Voltage (V)")
plt.xlabel("Date")
plt.ylabel("Avg Voltage")
plt.tight_layout()
plt.show()

# Heatmap of the pairwise correlation between the three sub-metering sums
df_sub = df[['sum_sub_1', 'sum_sub_2', 'sum_sub_3']]
plt.figure(figsize=(8, 4))
sns.heatmap(df_sub.corr(), annot=True, cmap="Blues")
plt.title("Correlation: Sub-metering")
plt.tight_layout()
plt.show()

# Histogram of total daily active power
plt.figure(figsize=(8, 4))
sns.histplot(df['total_active_power'], bins=50, kde=True)
plt.title("Distribution of Total Daily Active Power")
plt.xlabel("Total Active Power (kW)")
plt.tight_layout()
plt.show()


ImportError: Unable to find a usable engine; tried using: 'pyarrow', 'fastparquet'.
A suitable version of pyarrow or fastparquet is required for parquet support.
Trying to import the above resulted in these errors:
 - Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.
 - Missing optional dependency 'fastparquet'. fastparquet is required for parquet support. Use pip or conda to install fastparquet.