In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=86b895961a9c75dfe9f4db11b817db296bace6e6c9d3391bc191b4ed1a3dce12
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [10]:
!pip install fpdf

Collecting fpdf
  Downloading fpdf-1.7.2.tar.gz (39 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: fpdf
  Building wheel for fpdf (setup.py) ... [?25l[?25hdone
  Created wheel for fpdf: filename=fpdf-1.7.2-py2.py3-none-any.whl size=40702 sha256=0ea61ca62fe0d3dd8c0610d0951749dcea29063f4adbee4026defa8f1fe3302c
  Stored in directory: /root/.cache/pip/wheels/f9/95/ba/f418094659025eb9611f17cbcaf2334236bf39a0c3453ea455
Successfully built fpdf
Installing collected packages: fpdf
Successfully installed fpdf-1.7.2


In [54]:
import pandas as pd

# Source workbook and the CSV copy written next to it.
xlsx_file_path = '/content/Pakistan Services PC.xlsx'
csv_file_path = '/content/Pakistan Services PC.csv'

# Read the workbook with pandas, then re-save it as CSV without the index column.
df = pd.read_excel(io=xlsx_file_path)
df.to_csv(path_or_buf=csv_file_path, index=False)

print(f"File converted and saved to {csv_file_path}")

File converted and saved to /content/Pakistan Services PC.csv


In [55]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, dayofweek, month
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import matplotlib.pyplot as plt
import pandas as pd
from fpdf import FPDF
import seaborn as sns
import plotly.express as px
from matplotlib_venn import venn2
from datetime import datetime

In [57]:
# Start (or reuse) the Spark session used by every Spark cell in this notebook.
spark = (
    SparkSession.builder
    .appName("EnergyDataAnalysisReport")
    .getOrCreate()
)

In [58]:
# Location of the CSV file to analyse.
data_path = "/content/Pakistan Services PC.csv"

# Treat the first row as the header and let Spark infer each column's type.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(data_path)
)

In [73]:
# Bare expression: the notebook displays the Spark DataFrame's repr (column names and types).
# NOTE(review): the recorded output lists columns such as "Surplus Energy (kWh)" and "Hour"
# that are only created in later cells, so it was captured after those cells had run.
# Re-run the notebook top to bottom to refresh it.
df

DataFrame[Statistical Period: timestamp, Global Irradiation (kWh/㎡): double, Average Temperature(°C): double, Theoretical Yield (kWh): double, PV Yield (kWh): double, Inverter Yield (kWh): double, Export (kWh): double, Consumed (kWh): double, Loss Due to Export Limitation (kWh): int, Loss Due to Export Limitation(PKR): int, Revenue (PKR): double, Device Name: string, Surplus Energy (kWh): double, Revenue from Sales: double, Cost of Purchases: double, Net Revenue: double, Hour: int, DayOfWeek: int, Month: int]

In [59]:
# Print the inferred schema. Note the leading space in " Inverter Yield (kWh)" in the
# printed column list.
df.printSchema()

root
 |-- Statistical Period: timestamp (nullable = true)
 |-- Global Irradiation (kWh/㎡): double (nullable = true)
 |-- Average Temperature(°C): double (nullable = true)
 |-- Theoretical Yield (kWh): double (nullable = true)
 |-- PV Yield (kWh): double (nullable = true)
 |--  Inverter Yield (kWh): double (nullable = true)
 |-- Export (kWh): double (nullable = true)
 |-- Consumed (kWh): double (nullable = true)
 |-- Loss Due to Export Limitation (kWh): integer (nullable = true)
 |-- Loss Due to Export Limitation(PKR): integer (nullable = true)
 |-- Revenue (PKR): double (nullable = true)
 |-- Device Name: string (nullable = true)



In [60]:
# Columns carried forward into the analysis, in their original order.
analysis_columns = [
    "Statistical Period",
    "Global Irradiation (kWh/㎡)",
    "Average Temperature(°C)",
    "Theoretical Yield (kWh)",
    "PV Yield (kWh)",
    "Inverter Yield (kWh)",
    "Export (kWh)",
    "Consumed (kWh)",
    "Loss Due to Export Limitation (kWh)",
    "Loss Due to Export Limitation(PKR)",
    "Revenue (PKR)",
    "Device Name",
]

# The source header carries a leading space in " Inverter Yield (kWh)"; rename it
# first so the column can be selected by its clean name.
df = (
    df
    .withColumnRenamed(" Inverter Yield (kWh)", "Inverter Yield (kWh)")
    .select(*analysis_columns)
)


In [61]:
# Derived energy/financial columns. Each step may read columns added by the
# previous one, so the order of the chain matters.
df = (
    df
    # Generation left over after on-site consumption.
    .withColumn("Surplus Energy (kWh)", col("PV Yield (kWh)") - col("Consumed (kWh)"))
    # NOTE(review): "Revenue (PKR)" is used as a multiplier for both energy columns;
    # confirm that this is the intended pricing logic.
    .withColumn("Revenue from Sales", col("Export (kWh)") * col("Revenue (PKR)"))
    .withColumn("Cost of Purchases", col("Consumed (kWh)") * col("Revenue (PKR)"))
    .withColumn("Net Revenue", col("Revenue from Sales") - col("Cost of Purchases"))
)

# Collect to a pandas DataFrame for the plotting cells.
pdf = df.toPandas()

In [62]:
# Distribution of PV yield, saved to disk for the PDF report (not shown inline).
fig, ax = plt.subplots(figsize=(10, 6))
ax.hist(pdf["PV Yield (kWh)"], bins=50, color='skyblue', edgecolor='black')
ax.set_title("Histogram of PV Yield")
ax.set_xlabel("PV Yield (kWh)")
ax.set_ylabel("Frequency")
fig.savefig("histogram_pv_yield.png")
plt.close(fig)

In [63]:
# Add a calendar-month column to the pandas frame (also used by later plots).
pdf['Month'] = pd.to_datetime(pdf['Statistical Period']).dt.month

# Mean PV yield for each month, drawn as a bar chart and saved for the report.
avg_pv_yield = pdf.groupby('Month')['PV Yield (kWh)'].mean()
ax = avg_pv_yield.plot.bar(color='skyblue', edgecolor='black', figsize=(10, 6))
ax.set_title("Average PV Yield per Month")
ax.set_xlabel("Month")
ax.set_ylabel("Average PV Yield (kWh)")
ax.figure.savefig("bar_avg_pv_yield.png")
plt.close(ax.figure)

In [64]:
# Row labels where PV yield / consumption exceed 50 kWh; the Venn diagram shows
# how many rows satisfy one condition, the other, or both.
set1 = set(pdf.index[pdf['PV Yield (kWh)'] > 50])
set2 = set(pdf.index[pdf['Consumed (kWh)'] > 50])

fig = plt.figure(figsize=(10, 6))
venn2(subsets=[set1, set2], set_labels=('High PV Yield', 'High Consumption'))
plt.title("Venn Diagram of High PV Yield and High Consumption")
fig.savefig("venn_diagram.png")
plt.close(fig)

In [65]:
# PV yield against the statistical period, saved to disk for the report.
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(pdf['Statistical Period'], pdf['PV Yield (kWh)'], color='skyblue', marker='o')
ax.set_title("Line Plot of PV Yield over Time")
ax.set_xlabel("Date")
ax.set_ylabel("PV Yield (kWh)")
fig.savefig("line_plot.png")
plt.close(fig)


In [67]:
# Distribution of revenue within each month, saved to disk for the report.
plt.figure(figsize=(10, 6))
# seaborn deprecated passing `palette` without `hue` (to be removed in v0.14.0).
# Mapping hue to the x variable and hiding the legend keeps the same per-month
# colouring without the deprecation warning.
sns.boxplot(
    data=pdf,
    x='Month',
    y='Revenue (PKR)',
    hue='Month',
    palette='Set3',
    legend=False,
)
plt.title("Box Plot of Revenue per Month")
plt.xlabel("Month")
plt.ylabel("Revenue (PKR)")
plt.savefig("box_plot.png")
plt.close()




Passing `palette` without assigning `hue` is deprecated and will be removed in v0.14.0. Assign the `x` variable to `hue` and set `legend=False` for the same effect.




In [68]:
# Calendar features extracted from the timestamp column; used as model inputs.
df = (
    df
    .withColumn("Hour", hour("Statistical Period"))
    .withColumn("DayOfWeek", dayofweek("Statistical Period"))
    .withColumn("Month", month("Statistical Period"))
)

In [69]:
# Model inputs and target.
feature_columns = ["Hour", "DayOfWeek", "Month", "PV Yield (kWh)"]
label_column = "Consumed (kWh)"

# Pack the inputs into a single vector column, then split 80/20 with a fixed
# seed so the split is repeatable.
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(df)
train_data, test_data = data.randomSplit([0.8, 0.2], seed=1234)

# Fit a linear regression of consumption on the assembled features.
lr = LinearRegression(featuresCol="features", labelCol=label_column)
lr_model = lr.fit(train_data)

# Fitted parameters, kept for the written report.
coefficients = lr_model.coefficients.toArray()
intercept = lr_model.intercept

# Score the held-out rows and report RMSE.
predictions = lr_model.transform(test_data)
evaluator = RegressionEvaluator(
    labelCol=label_column,
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

Root Mean Squared Error (RMSE) on test data = 11.700273346927577


In [70]:
class PDFReport(FPDF):
    """FPDF subclass with small helpers for building a titled, sectioned report."""

    def _bold_line(self, text, align):
        """Write ``text`` on a full-width, 10-unit-high line in Arial bold 12.

        ``align`` is passed through to ``cell`` ('C' centred, 'L' left).
        """
        self.set_font('Arial', 'B', 12)
        self.cell(0, 10, text, border=0, ln=1, align=align)

    def header(self):
        """Page header: the report title, centred."""
        self._bold_line('Energy Data Analysis Report', 'C')

    def chapter_title(self, title):
        """Write a left-aligned bold section title followed by a 5-unit gap."""
        self._bold_line(title, 'L')
        self.ln(5)

    def chapter_body(self, body):
        """Write ``body`` as wrapped regular text, then a line break."""
        self.set_font('Arial', '', 12)
        self.multi_cell(0, 10, body)
        self.ln()

    def add_image(self, image_path, title):
        """Start a new page, write ``title``, and place the image below it.

        The image is drawn 180 wide by 100 high at the current position.
        """
        self.add_page()
        self.chapter_title(title)
        self.image(image_path, w=180, h=100)

In [71]:
# Create the empty report document; pages are added by the cells that follow.
pdf_report = PDFReport()

In [72]:
# First page: introduction.
pdf_report.add_page()
pdf_report.chapter_title("Introduction")

# Adjacent string literals are joined into one paragraph.
intro_text = (
    "This report provides an analysis of energy production, consumption, and financial metrics. "
    "Visualizations and predictive analysis have been performed using PySpark and the data is presented in the following sections."
)
pdf_report.chapter_body(intro_text)


In [41]:
# Every chart saved by the plotting cells, in the order it was produced, paired
# with its page title. Previously only the first two were added, so the Venn
# diagram, line plot and box plot were written to disk but never appeared in
# the report.
report_figures = [
    ("histogram_pv_yield.png", "Histogram of PV Yield"),
    ("bar_avg_pv_yield.png", "Average PV Yield per Month"),
    ("venn_diagram.png", "Venn Diagram of High PV Yield and High Consumption"),
    ("line_plot.png", "Line Plot of PV Yield over Time"),
    ("box_plot.png", "Box Plot of Revenue per Month"),
]

# One page per figure.
for image_path, title in report_figures:
    pdf_report.add_image(image_path, title)

In [42]:
# New page summarising the regression results.
pdf_report.add_page()
pdf_report.chapter_title("Predictive Analysis")

# RMSE and intercept are rounded to two decimals; the coefficient array is
# inserted using its default string form.
predictive_text = (
    f"The predictive model was built using Linear Regression. The model's Root Mean Squared Error (RMSE) on the test data is {rmse:.2f}. "
    f"The coefficients of the model are: {coefficients} and the intercept is {intercept:.2f}."
)
pdf_report.chapter_body(predictive_text)

In [43]:
# Write the assembled report to a PDF file at this relative path.
pdf_report.output("EnergyDataAnalysisReport.pdf")

# Stop the Spark session; no Spark work follows in the visible cells.
spark.stop()