# PySpark Data Engineering Portfolio: Energy Consumption Analysis

This notebook demonstrates core PySpark operations (loading, cleaning, and aggregation) on a publicly available hourly energy time series from Open Power System Data.

## Objectives:
- Load energy consumption data from Open Power System Data.
- Clean and transform data using PySpark.
- Perform time series analysis by aggregating hourly readings into daily totals.
- Visualize energy consumption trends.
- (Optional) Build a predictive model using PySpark MLlib.


In [None]:
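# Install the dependencies if they are missing (uncomment below)
# !pip install pyspark pandas plotly requests

from pyspark.sql import SparkSession
# Note: importing `sum` by name shadows Python's built-in sum() for the rest of the notebook.
from pyspark.sql.functions import col, avg, sum, count, to_timestamp, date_trunc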
import pandas as pd
import plotly.express as px


In [None]:
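# Initialize PySpark session.
# Pin the session time zone to UTC so that day boundaries match the dataset's utc_timestamp column.
spark = (
    SparkSession.builder
    .appName("EnergyConsumptionAnalysis")
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()
)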

## Fetch Energy Data from Open Power System Data
We download the hourly single-index time series CSV published by Open Power System Data and save it locally.

In [None]:
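import requests

def fetch_energy_data(url, save_path):
    """Download a CSV file from the Open Power System Data repository to save_path."""
    # Stream the response so the whole file is never held in memory at once.
    with requests.get(url, stream=True, timeout=60) as response:
        # Raise on HTTP errors instead of continuing without a file.
        response.raise_for_status()
        with open(save_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=1 << 20):
                f.write(chunk)
    print(f"Data saved to {save_path}")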

# Example dataset (change URL as needed)
dataset_url = "https://data.open-power-system-data.org/time_series/2020-07-16/time_series_60min_singleindex.csv"
fetch_energy_data(dataset_url, "energy_data.csv")

In [None]:
# Load dataset into PySpark DataFrame
df = spark.read.csv("energy_data.csv", header=True, inferSchema=True)
df.printSchema()
df.show(5)

## Data Cleaning
We fill missing consumption values with the column mean and parse the timestamp column into a proper timestamp type.

In [None]:
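# The OPSD file has no column literally named "energy_consumption"; its load columns are named per country.
# ASSUMPTION: the source column name below is illustrative. Confirm it against df.printSchema().
df = df.withColumnRenamed("DE_load_actual_entsoe_transparency", "energy_consumption")

# Handle missing values by filling with the column mean (avg ignores nulls)
mean_consumption = df.select(avg("energy_consumption")).first()[0]
df = df.fillna({"energy_consumption": mean_consumption})

# Convert timestamp column
df = df.withColumn("timestamp_column", to_timestamp(col("utc_timestamp")))
df.select("timestamp_column", "energy_consumption").show(5)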

## Time Series Analysis
We truncate each timestamp to its calendar day and sum energy consumption per day.

In [None]:
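# Truncate each timestamp to midnight of its day, then sum consumption per day
daily_consumption = (
    df.withColumn("date", date_trunc("day", col("timestamp_column")))
    .groupBy("date")
    .agg(sum("energy_consumption").alias("total_daily_consumption"))
    .orderBy("date")  # groupBy output is unordered; sort so show() lists the earliest days
)
daily_consumption.show(5)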

## Visualization
We use Plotly to visualize daily energy consumption trends.

In [None]:
# Plotly connects points in row order, so sort by date before drawing the line
daily_pd = daily_consumption.toPandas().sort_values("date")
fig = px.line(daily_pd, x="date", y="total_daily_consumption", title="Daily Energy Consumption")
fig.show()
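
Daily totals are often noisy, and a rolling mean can make the underlying trend easier to see. The sketch below is one possible smoothing step in pandas, not part of the original analysis. The `toy_daily` frame is a hypothetical stand-in for `daily_pd`, and the 7-day window is an assumption.

```python
import pandas as pd

# Hypothetical stand-in for daily_pd: 14 days alternating between two levels.
toy_daily = pd.DataFrame(
    {
        "date": pd.date_range("2020-01-01", periods=14, freq="D"),
        "total_daily_consumption": [100.0, 120.0] * 7,
    }
)

# Rolling windows follow row order, so sort by date first.
toy_daily = toy_daily.sort_values("date")

# The first six rows have fewer than seven observations, so their rolling mean is NaN.
toy_daily["rolling_7d_mean"] = (
    toy_daily["total_daily_consumption"].rolling(window=7).mean()
)
```

Both series can then be drawn on one chart by passing a list of columns, for example `px.line(toy_daily, x="date", y=["total_daily_consumption", "rolling_7d_mean"])`.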

## Conclusion
- We successfully loaded and processed energy consumption data using PySpark.
- We aggregated data by day to analyze trends.
- Interactive visualizations were generated using Plotly.

**Next Steps:**
- Implement a predictive model using PySpark MLlib.
- Perform anomaly detection in energy consumption patterns.
