In [1]:
#task3 - data pipeline tasks
#Use AWS CLI or PySpark’s built-in S3 support to load the dataset directly.
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Verify HomeC Dataset") \
    .getOrCreate()

# Path to the dataset on EC2
file_path = "/home/ec2-user/HomeC.csv"

# Load the dataset
dataset = spark.read.csv(file_path, header=True, inferSchema=True)

# Show the schema of the dataset
print("Schema of the dataset:")
dataset.printSchema()

# Display the first 10 rows of the dataset
print("First 10 rows of the dataset:")
dataset.show(10, truncate=False)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/05 20:58:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

Schema of the dataset:
root
 |-- time: string (nullable = true)
 |-- use [kW]: double (nullable = true)
 |-- gen [kW]: double (nullable = true)
 |-- House overall [kW]: double (nullable = true)
 |-- Dishwasher [kW]: double (nullable = true)
 |-- Furnace 1 [kW]: double (nullable = true)
 |-- Furnace 2 [kW]: double (nullable = true)
 |-- Home office [kW]: double (nullable = true)
 |-- Fridge [kW]: double (nullable = true)
 |-- Wine cellar [kW]: double (nullable = true)
 |-- Garage door [kW]: double (nullable = true)
 |-- Kitchen 12 [kW]: double (nullable = true)
 |-- Kitchen 14 [kW]: double (nullable = true)
 |-- Kitchen 38 [kW]: double (nullable = true)
 |-- Barn [kW]: double (nullable = true)
 |-- Well [kW]: double (nullable = true)
 |-- Microwave [kW]: double (nullable = true)
 |-- Living room [kW]: double (nullable = true)
 |-- Solar [kW]: double (nullable = true)
 |-- temperature: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- visibility: double (nullable = tr

24/12/05 20:59:12 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----------+-----------+-----------+------------------+---------------+--------------+--------------+----------------+-----------+----------------+----------------+---------------+---------------+---------------+-----------+-----------+--------------+----------------+-----------+-----------+--------+----------+-------------------+--------+---------+-----------+---------------+--------+-----------------+
|time      |use [kW]   |gen [kW]   |House overall [kW]|Dishwasher [kW]|Furnace 1 [kW]|Furnace 2 [kW]|Home office [kW]|Fridge [kW]|Wine cellar [kW]|Garage door [kW]|Kitchen 12 [kW]|Kitchen 14 [kW]|Kitchen 38 [kW]|Barn [kW]  |Well [kW]  |Microwave [kW]|Living room [kW]|Solar [kW] |temperature|humidity|visibility|apparentTemperature|pressure|windSpeed|windBearing|precipIntensity|dewPoint|precipProbability|
+----------+-----------+-----------+------------------+---------------+--------------+--------------+----------------+-----------+----------------+----------------+---------------+------

In [2]:
#task3 - Data Transformation: Create at least 2 new columns (e.g., Year, Month).
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Initialize Spark session
spark = SparkSession.builder \
    .appName("HomeC Data Transformation") \
    .getOrCreate()

# Path to the dataset on EC2
file_path = "/home/ec2-user/HomeC.csv"

# Load the dataset
dataset = spark.read.csv(file_path, header=True, inferSchema=True)

# Transform 'time' column from Unix timestamp to readable timestamp
df_transformed = dataset.withColumn(
    "time", 
    F.to_timestamp(F.from_unixtime(F.col("time")))
)

# Create new columns 'Year' and 'Month'
df_transformed = df_transformed.withColumn("Year", F.year("time")) \
                               .withColumn("Month", F.month("time"))

# Display the transformed dataset with new columns
print("Transformed dataset:")
df_transformed.show(10, truncate=False)



24/12/05 21:00:35 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

Transformed dataset:
+-------------------+-----------+-----------+------------------+---------------+--------------+--------------+----------------+-----------+----------------+----------------+---------------+---------------+---------------+-----------+-----------+--------------+----------------+-----------+-----------+--------+----------+-------------------+--------+---------+-----------+---------------+--------+-----------------+----+-----+
|time               |use [kW]   |gen [kW]   |House overall [kW]|Dishwasher [kW]|Furnace 1 [kW]|Furnace 2 [kW]|Home office [kW]|Fridge [kW]|Wine cellar [kW]|Garage door [kW]|Kitchen 12 [kW]|Kitchen 14 [kW]|Kitchen 38 [kW]|Barn [kW]  |Well [kW]  |Microwave [kW]|Living room [kW]|Solar [kW] |temperature|humidity|visibility|apparentTemperature|pressure|windSpeed|windBearing|precipIntensity|dewPoint|precipProbability|Year|Month|
+-------------------+-----------+-----------+------------------+---------------+--------------+--------------+---------------

In [3]:
#Task 3 - Data Aggregation: Compute at least 5 key metrics
#total_usage, total_solar, Top 10 Months by Solar Energy Generation, Energy Generated vs. Consumed Per Year
from pyspark.sql import functions as F

# 1. Total Energy Usage by Year and Month
total_usage_by_month = df_transformed.groupBy("Year", "Month").agg(
    F.sum("use [kW]").alias("Total_Usage_kW")
)
print("Total Energy Usage by Month:")
total_usage_by_month.show(10, truncate=False)

# 2. Average Monthly Temperature
avg_temp_by_month = df_transformed.groupBy("Year", "Month").agg(
    F.avg("temperature").alias("Avg_Temperature")
)
print("Average Temperature by Month:")
avg_temp_by_month.show(10, truncate=False)

# 3. Top 10 Months by Solar Energy Generation
top_solar_months = df_transformed.groupBy("Year", "Month").agg(
    F.sum("Solar [kW]").alias("Total_Solar_kW")
).orderBy(F.desc("Total_Solar_kW")).limit(10)
print("Top 10 Months by Solar Energy Generation:")
top_solar_months.show(truncate=False)

# 4. Total Energy Generated vs. Consumed Per Year
energy_comparison = df_transformed.groupBy("Year").agg(
    F.sum("use [kW]").alias("Total_Consumed_kW"),
    F.sum("gen [kW]").alias("Total_Generated_kW")
)
print("Energy Generated vs. Consumed Per Year:")
energy_comparison.show(truncate=False)

# 5. Monthly Average Usage for Top Consumer Appliances
# Selecting some key appliances for demonstration: 'Dishwasher [kW]', 'Fridge [kW]', 'Microwave [kW]'
avg_appliance_usage = df_transformed.groupBy("Year", "Month").agg(
    F.avg("Dishwasher [kW]").alias("Avg_Dishwasher_kW"),
    F.avg("Fridge [kW]").alias("Avg_Fridge_kW"),
    F.avg("Microwave [kW]").alias("Avg_Microwave_kW")
)
print("Monthly Average Usage for Top Appliances:")
avg_appliance_usage.show(10, truncate=False)




Total Energy Usage by Month:


                                                                                

+----+-----+-----------------+
|Year|Month|Total_Usage_kW   |
+----+-----+-----------------+
|NULL|NULL |NULL             |
|2016|1    |432839.7404670082|
+----+-----+-----------------+

Average Temperature by Month:


[Stage 9:>                                                          (0 + 1) / 1]

+----+-----+-----------------+
|Year|Month|Avg_Temperature  |
+----+-----+-----------------+
|NULL|NULL |NULL             |
|2016|1    |50.74193461135845|
+----+-----+-----------------+

Top 10 Months by Solar Energy Generation:


                                                                                

+----+-----+-----------------+
|Year|Month|Total_Solar_kW   |
+----+-----+-----------------+
|2016|1    |38412.76483438449|
|NULL|NULL |NULL             |
+----+-----+-----------------+

Energy Generated vs. Consumed Per Year:


                                                                                

+----+-----------------+------------------+
|Year|Total_Consumed_kW|Total_Generated_kW|
+----+-----------------+------------------+
|NULL|NULL             |NULL              |
|2016|432839.7404670082|38412.76483438449 |
+----+-----------------+------------------+

Monthly Average Usage for Top Appliances:


[Stage 18:>                                                         (0 + 1) / 1]

+----+-----+-------------------+-------------------+--------------------+
|Year|Month|Avg_Dishwasher_kW  |Avg_Fridge_kW      |Avg_Microwave_kW    |
+----+-----+-------------------+-------------------+--------------------+
|NULL|NULL |NULL               |NULL               |NULL                |
|2016|1    |0.03136752497494595|0.06355641007374108|0.010982993427659423|
+----+-----+-------------------+-------------------+--------------------+



                                                                                

In [4]:
#
output_path = "/home/ec2-user/procesed_data/"

# Save each DataFrame to local storage
# Save the transformed DataFrame to local storage
df_transformed.write.csv(output_path + "df_transformed.csv", header=True, mode="overwrite")
total_usage_by_month.write.csv(output_path + "total_usage_by_month.csv", header=True, mode="overwrite")
avg_temp_by_month.write.csv(output_path + "avg_temp_by_month.csv", header=True, mode="overwrite")
top_solar_months.write.csv(output_path + "top_solar_months.csv", header=True, mode="overwrite")
energy_comparison.write.csv(output_path + "energy_comparison.csv", header=True, mode="overwrite")
avg_appliance_usage.write.csv(output_path + "avg_appliance_usage.csv", header=True, mode="overwrite")



                                                                                

In [12]:
!pip install boto3




Defaulting to user installation because normal site-packages is not writeable
Collecting boto3
  Downloading boto3-1.35.72-py3-none-any.whl (139 kB)
     |████████████████████████████████| 139 kB 3.9 MB/s            
Collecting s3transfer<0.11.0,>=0.10.0
  Downloading s3transfer-0.10.4-py3-none-any.whl (83 kB)
     |████████████████████████████████| 83 kB 3.2 MB/s             
[?25hCollecting botocore<1.36.0,>=1.35.72
  Downloading botocore-1.35.72-py3-none-any.whl (13.1 MB)
     |████████████████████████████████| 13.1 MB 32.5 MB/s            
Installing collected packages: botocore, s3transfer, boto3
Successfully installed boto3-1.35.72 botocore-1.35.72 s3transfer-0.10.4


In [7]:
#Task 3 - Store Processed Data Back to S3
import boto3
import os

# Initialize S3 client
s3 = boto3.client("s3")

# S3 bucket name and folder
s3_bucket_name = "big-data-pipeline-rimysore"
s3_folder = "proccesed_data/"

# Function to upload files to S3
def upload_to_s3(local_folder, s3_bucket, s3_prefix):
    for root, dirs, files in os.walk(local_folder):
        for file in files:
            local_file_path = os.path.join(root, file)
            s3_file_path = s3_prefix + file
            print(f"Uploading {local_file_path} to s3://{s3_bucket}/{s3_file_path}")
            s3.upload_file(local_file_path, s3_bucket, s3_file_path)

# Upload the processed data to S3
upload_to_s3(output_path, s3_bucket_name, s3_folder)


Uploading /home/ec2-user/procesed_data/df_transformed.csv/part-00000-377f067d-495f-4880-8e5d-49b979dcdb6a-c000.csv to s3://big-data-pipeline-rimysore/proccesed_data/part-00000-377f067d-495f-4880-8e5d-49b979dcdb6a-c000.csv
Uploading /home/ec2-user/procesed_data/df_transformed.csv/.part-00000-377f067d-495f-4880-8e5d-49b979dcdb6a-c000.csv.crc to s3://big-data-pipeline-rimysore/proccesed_data/.part-00000-377f067d-495f-4880-8e5d-49b979dcdb6a-c000.csv.crc
Uploading /home/ec2-user/procesed_data/df_transformed.csv/_SUCCESS to s3://big-data-pipeline-rimysore/proccesed_data/_SUCCESS
Uploading /home/ec2-user/procesed_data/df_transformed.csv/._SUCCESS.crc to s3://big-data-pipeline-rimysore/proccesed_data/._SUCCESS.crc
Uploading /home/ec2-user/procesed_data/total_usage_by_month.csv/part-00000-f737bda6-3fe8-432d-959c-2c0834a141c6-c000.csv to s3://big-data-pipeline-rimysore/proccesed_data/part-00000-f737bda6-3fe8-432d-959c-2c0834a141c6-c000.csv
Uploading /home/ec2-user/procesed_data/total_usage_by_mo

In [8]:
#Task 4 SQL

# Read the CSV file back into a DataFrame
total_usage_by_month_df = spark.read.csv(output_path + "total_usage_by_month.csv", header=True, inferSchema=True)
avg_temp_by_month = spark.read.csv(output_path + "avg_temp_by_month.csv", header=True, inferSchema=True)
top_solar_months = spark.read.csv(output_path + "top_solar_months.csv", header=True, inferSchema=True)
energy_comparison = spark.read.csv(output_path + "energy_comparison.csv", header=True, inferSchema=True)
avg_appliance_usage = spark.read.csv(output_path + "avg_appliance_usage.csv", header=True, inferSchema=True)

# Register the DataFrame as a temporary view
total_usage_by_month_df.createOrReplaceTempView("total_usage_by_month")
avg_temp_by_month.createOrReplaceTempView("avg_temp_by_month")
top_solar_months.createOrReplaceTempView("top_solar_months")
energy_comparison.createOrReplaceTempView("energy_comparison")
avg_appliance_usage.createOrReplaceTempView("avg_appliance_usage")

# Now, run your SQL query
# Query 1 - Avg temp month over month analysis
query_1 = """
SELECT Year, Month, SUM(Avg_Temperature) AS TotalUsageTemp
FROM avg_temp_by_month
GROUP BY Year, Month
ORDER BY TotalUsageTemp DESC
LIMIT 10
"""

result_1 = spark.sql(query_1)
result_1.show()

#query2 - Identify average seasonal trends
query_2 = """
SELECT Month, SUM(Total_Usage_kW) AS TotalUsage
FROM total_usage_by_month
GROUP BY Month
ORDER BY Month
"""
result_2 = spark.sql(query_2)
result_2.show()

#query3 - Identify average seasonal trends for solar power
query_3 = """
SELECT Month, SUM(Total_Solar_kW) AS TotalUsageSolar
FROM top_solar_months
GROUP BY Month
ORDER BY Month
"""
result_3 = spark.sql(query_3)
result_3.show()

#query4 - Identify average seasonal on generation
query_4 = """
SELECT Year, SUM(Total_Generated_kW) AS TotalGenerated
FROM energy_comparison
GROUP BY Year
ORDER BY Year
"""
result_4 = spark.sql(query_4)
result_4.show()

#query5 - Identify average seasonal trends for fridge
query_5 = """
SELECT Month, SUM(Avg_Fridge_kW) AS TotalFridgeUsage
FROM avg_appliance_usage
GROUP BY Month
ORDER BY Month
"""
result_5 = spark.sql(query_5)
result_5.show()












+----+-----+-----------------+
|Year|Month|   TotalUsageTemp|
+----+-----+-----------------+
|2016|    1|50.74193461135845|
|NULL| NULL|             NULL|
+----+-----+-----------------+

+-----+-----------------+
|Month|       TotalUsage|
+-----+-----------------+
| NULL|             NULL|
|    1|432839.7404670082|
+-----+-----------------+

+-----+-----------------+
|Month|  TotalUsageSolar|
+-----+-----------------+
| NULL|             NULL|
|    1|38412.76483438449|
+-----+-----------------+

+----+-----------------+
|Year|   TotalGenerated|
+----+-----------------+
|NULL|             NULL|
|2016|38412.76483438449|
+----+-----------------+

+-----+-------------------+
|Month|   TotalFridgeUsage|
+-----+-------------------+
| NULL|               NULL|
|    1|0.06355641007374108|
+-----+-------------------+

