In [0]:
import os
import smtplib
from datetime import datetime
from email.message import EmailMessage
from email.mime.text import MIMEText

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Obtain the notebook's Spark session: getOrCreate() returns the active
# session when one already exists and only builds a new one otherwise.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

In [0]:
# Read the energy summary table registered in the metastore.
source_table_name = "energy_summary_table"
energy_df = spark.table(source_table_name)

# Render the loaded rows so the input can be inspected before processing.
display(energy_df)

In [0]:
from pyspark.sql.functions import when, col

# Replace the average price of every "Solar" row with 1200.0 and leave all
# other rows untouched.
# NOTE(review): presumably a synthetic value injected to exercise the alert
# path downstream -- confirm before running this against production data.
price_column = "Avg_Price_USD_MWh"
is_solar = col("Energy_Type") == "Solar"
overridden_price = when(is_solar, 1200.0).otherwise(col(price_column))

energy_df_test = energy_df.withColumn(price_column, overridden_price)

display(energy_df_test)

In [0]:
# Collect the Spark DataFrame to the driver as a pandas DataFrame so the
# threshold check below can use plain pandas indexing.
energy_pdf = energy_df_test.toPandas()

# Preview the first five rows of the converted frame.
energy_pdf.head(5)

In [0]:
# Bounds on Avg_Price_USD_MWh; values strictly outside them are flagged.
lower_limit = 10
upper_limit = 100

# Build one boolean mask per bound, then keep rows matching either mask.
avg_prices = energy_pdf["Avg_Price_USD_MWh"]
above_upper = avg_prices.gt(upper_limit)
below_lower = avg_prices.lt(lower_limit)
abnormal = energy_pdf.loc[above_upper | below_lower]

abnormal

In [0]:
# Build the notification text: an all-clear line when no rows were flagged,
# otherwise a multi-line alert that embeds the flagged rows as a plain table.
if abnormal.empty:
    alert_message = "✅ All prices are within the normal range."
else:
    flagged_rows_text = abnormal.to_string(index=False)
    alert_message = f"""
     ALERT: Abnormal Energy Prices Detected

    The following energy types have prices outside the normal range:

    {flagged_rows_text}

    Please review these entries in the ETRM dashboard.
    """

print(alert_message)

In [0]:
# Append one log row per flagged entry to the `etrm_alert_logs` Delta table.
# Nothing is written when no rows were flagged.
#
# `datetime` comes from `from datetime import datetime` in the notebook's
# import cell; without that import this cell raises NameError.
#
# NOTE(review): rows are flagged on Avg_Price_USD_MWh, yet the logged message
# and value refer to Total_Revenue_USD ("High revenue alert") -- confirm that
# this is the intended content of the log.
if not abnormal.empty:
    # Take a single timestamp so every row written by this run shares it.
    logged_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    alert_entries = [
        (
            logged_at,
            f"High revenue alert for {energy_type}: ${float(revenue):.2f}",
            energy_type,
            float(revenue),  # plain Python float rather than a numpy scalar
        )
        for energy_type, revenue in zip(
            abnormal["Energy_Type"], abnormal["Total_Revenue_USD"]
        )
    ]

    alert_df = spark.createDataFrame(
        alert_entries,
        ["timestamp", "alert_message", "energy_type", "total_revenue_usd"],
    )

    alert_df.write.mode("append").format("delta").saveAsTable("etrm_alert_logs")

In [0]:
# Email the alert text through Gmail's SMTP-over-SSL endpoint.
#
# `os`, `smtplib` and `EmailMessage` are imported in the notebook's import cell.
#
# SECURITY: the Gmail app password used to be hard-coded in this cell. It is
# now read from the ETRM_ALERT_APP_PASSWORD environment variable; the value
# that was previously committed must be treated as compromised and revoked.

# Sender and receiver details (overridable through the environment).
sender_email = os.environ.get("ETRM_ALERT_SENDER", "pritisahani4444@gmail.com")
receiver_email = os.environ.get("ETRM_ALERT_RECEIVER", "preetusahani30@gmail.com")
app_password = os.environ.get("ETRM_ALERT_APP_PASSWORD")

if abnormal.empty:
    # The subject line announces abnormal prices, so do not send it when
    # nothing was flagged.
    print("No abnormal prices detected; alert email not sent.")
elif not app_password:
    print("Error sending email: ETRM_ALERT_APP_PASSWORD is not set")
else:
    # Create the email
    msg = EmailMessage()
    msg["Subject"] = "⚠️ ETRM Alert: Abnormal Energy Prices Detected"
    msg["From"] = sender_email
    msg["To"] = receiver_email
    msg.set_content(alert_message)

    # Send via Gmail SMTP. Delivery is best-effort: a failure is reported but
    # does not stop the rest of the notebook from running.
    try:
        with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
            smtp.login(sender_email, app_password)
            smtp.send_message(msg)
        print("Alert email sent successfully!")
    except Exception as e:
        print("Error sending email:", e)


In [0]:
# Create the alert log Delta table when it does not exist yet; the statement
# is a no-op if the table is already present.
# NOTE(review): the cell that appends to etrm_alert_logs appears earlier in
# this notebook -- consider moving this DDL ahead of it.
create_alert_logs_ddl = """
CREATE TABLE IF NOT EXISTS etrm_alert_logs (
    timestamp STRING,
    alert_message STRING,
    energy_type STRING,
    total_revenue_usd DOUBLE
) USING DELTA
"""
spark.sql(create_alert_logs_ddl)

In [0]:
# Show the logged alerts, newest timestamp first.
alert_logs = spark.table("etrm_alert_logs")
alert_logs_newest_first = alert_logs.orderBy("timestamp", ascending=False)
display(alert_logs_newest_first)