# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


In [2]:
%timeout 20

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Current timeout is None minutes.
timeout has been set to 20 minutes.


In [4]:
%%configure
{
"--job-bookmark-option":"job-bookmark-enable"
}

The following configurations have been updated: {'--job-bookmark-option': 'job-bookmark-enable'}


In [1]:
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job
import pytz
import re
import unicodedata


# Initialize all the variables needed
source_bucket = "data-engineering-project-8433-3658-8863"
folder_name = "bronze_data"
processed_folder_name = "silver_data"
table_name = "electricity_data"

# Set up catalog parameters
glue_database = "data-engineering-project-glue-database"
glue_table_name = "raw_data_electricity_data"

# Set up the spark contexts, glue contexts and initialize job
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# Alternative für Notebook - ohne JOB_NAME Parameter
try:
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    JOB_NAME = args['JOB_NAME']
except:
    JOB_NAME = "notebook-job-electricity-transform"

job.init(JOB_NAME, args if 'args' in locals() else {})

Trying to create a Glue session for the kernel.
Session Type: glueetl
Timeout: 20
Session ID: c3dce2e0-b2a6-4fde-8cee-3fc1e851b308
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
--job-bookmark-option job-bookmark-enable
Waiting for session c3dce2e0-b2a6-4fde-8cee-3fc1e851b308 to get into ready status...
Session c3dce2e0-b2a6-4fde-8cee-3fc1e851b308 has been created.



In [2]:
# ============================================
# Helper Functions for Data Transformation
# ============================================

def strip_accents(s):
    return ''.join(c for c in unicodedata.normalize('NFKD', str(s)) 
                  if not unicodedata.combining(c))

def canonicalize(txt):
    if txt is None or str(txt).lower() == 'nan':
        return ""
    s = strip_accents(str(txt).lower().strip())
    for ch in ["'", "’", "-", "_", ".", ",", "(", ")", "/", "\\"]:
        s = s.replace(ch, " ")
    return " ".join(s.split())

def normalize_city(s):
    if s is None:
        return ""
    s = str(s).lower()
    s = strip_accents(s)
    s = re.sub(r"[^a-z\s\-]", " ", s)
    s = " ".join(s.split())
    s = s.replace(" ste ", " sainte ").replace(" st ", " saint ")
    if s.startswith("ste "): 
        s = "sainte " + s[4:]
    if s.startswith("st "):  
        s = " saint " + s[3:]
    return s

# Region mapping dictionary
OLD_TO_NEW = {
    "aquitaine": "nouvelle aquitaine", 
    "poitou charentes": "nouvelle aquitaine", 
    "limousin": "nouvelle aquitaine",
    "midi pyrenees": "occitanie", 
    "languedoc roussillon": "occitanie",
    "burgundy": "bourgogne franche comte", 
    "franche comte": "bourgogne franche comte",
    "alsace": "grand est", 
    "lorraine": "grand est", 
    "champagne ardenne": "grand est",
    "haute normandie": "normandie", 
    "basse normandie": "normandie",
    "nord pas de calais": "hauts de france", 
    "picardy": "hauts de france", 
    "picardie": "hauts de france",
    "centre": "centre val de loire", 
    "brittany": "bretagne",
    "rhone alpes": "auvergne rhone alpes", 
    "auvergne": "auvergne rhone alpes",
    "paca": "provence alpes cote d azur", 
    "provence alpes": "provence alpes cote d azur",
    "corsica": "corse"
}

NEW_REGIONS = [
    "auvergne rhone alpes", "bourgogne franche comte", "bretagne", "centre val de loire",
    "grand est", "hauts de france", "ile de france", "normandie", "nouvelle aquitaine",
    "occitanie", "pays de la loire", "provence alpes cote d azur", "corse"
]

def map_region_to_new(name):
    if name is None:
        return None
    c = canonicalize(name)
    if c in OLD_TO_NEW:
        return OLD_TO_NEW[c]
    if c in NEW_REGIONS:
        return c
    if "rhone alpe" in c: 
        return "auvergne rhone alpes"
    return c

def force_display_name_for_elec(raw):
    if raw is None:
        return None
    if canonicalize(raw) in ("paca", "provence alpes cote d azur"):
        return "Provence-Alpes-Cote d'Azur"
    return str(raw)

# Timezone handling
TZ_PARIS = pytz.timezone("Europe/Paris")

def elec_to_utc_hour(row):
    """Convert electricity data timestamp to UTC hour"""
    from pyspark.sql import Row
    from datetime import datetime
    import pytz
    
    if hasattr(row, 'ts_local') and row.ts_local is not None:
        try:
            # If ts_local exists and is timezone-aware, convert to UTC
            ts = row.ts_local
            if hasattr(ts, 'tz') and ts.tz is not None:
                return ts.astimezone(pytz.UTC).replace(minute=0, second=0, microsecond=0)
            else:
                # If no timezone, assume Paris time
                ts_paris = TZ_PARIS.localize(ts)
                return ts_paris.astimezone(pytz.UTC).replace(minute=0, second=0, microsecond=0)
        except:
            pass
    
    # Fallback: try to combine date and heure columns
    if hasattr(row, 'date') and hasattr(row, 'heure'):
        try:
            date_str = str(row.date)
            heure_str = str(row.heure).strip()
            if date_str and heure_str:
                dt_str = f"{date_str} {heure_str}"
                dt_naive = datetime.strptime(dt_str, "%Y-%m-%d %H:%M")
                dt_paris = TZ_PARIS.localize(dt_naive)
                return dt_paris.astimezone(pytz.UTC).replace(minute=0, second=0, microsecond=0)
        except:
            pass
    
    return None

# Register UDFs
strip_accents_udf = udf(strip_accents, StringType())
canonicalize_udf = udf(canonicalize, StringType())
normalize_city_udf = udf(normalize_city, StringType())
map_region_to_new_udf = udf(map_region_to_new, StringType())
force_display_name_udf = udf(force_display_name_for_elec, StringType())




In [3]:
# ============================================
# Read and Transform Electricity Data
# ============================================

# Read data from data catalog
try:
    electricity_df_from_catalog = glueContext.create_data_frame_from_catalog(
        glue_database,
        glue_table_name,
        additional_options={"useCatalogSchema": True, "useSparkDataSource": True, "header": True},
        transformation_ctx="electricity_df_from_catalog"
    )

    print("Original data schema:")
    electricity_df_from_catalog.printSchema()
    print(f"Original row count: {electricity_df_from_catalog.count()}")
    
    # Show sample of original data
    print("Sample of original data:")
    electricity_df_from_catalog.show(10)

except Exception as e:
    print(f"Error reading from catalog: {str(e)}")
    raise e

# Step 1: Region normalization and cleaning
electricity_df = electricity_df_from_catalog

# Add canonical region and clean display name
electricity_df = electricity_df.withColumn(
    "region_canon", 
    map_region_to_new_udf(col("region"))
)

electricity_df = electricity_df.withColumn(
    "region_clean", 
    force_display_name_udf(col("region"))
)

# Filter only the new regions
electricity_df = electricity_df.filter(col("region_canon").isin(NEW_REGIONS))

print(f"After region filtering: {electricity_df.count()} rows")

# Step 2: Timestamp conversion to UTC
# Create timestamp from date and heure columns
electricity_df = electricity_df.withColumn(
    "ts_local",
    when(col("date").isNotNull() & col("heure").isNotNull(),
         concat(col("date"), lit(" "), col("heure")))
    .otherwise(lit(None))
)

# Convert to timestamp
electricity_df = electricity_df.withColumn(
    "ts_local",
    to_timestamp(col("ts_local"), "yyyy-MM-dd HH:mm")
)

# Create UTC hour timestamp (assuming ts_local is in Paris time)
electricity_df = electricity_df.withColumn(
    "ts_hour_utc",
    to_utc_timestamp(col("ts_local"), "Europe/Paris")
)

# Step 3: Define energy columns and handle numeric conversion
ENERGY_COLS = [
    "solaire_mw", "eolien_mw", "hydraulique_mw", "nucleaire_mw", "thermique_mw",
    "pompage_mw", "bioenergies_mw", "stockage_batterie_mw", "destockage_batterie_mw", 
    "eolien_terrestre_mw", "eolien_offshore_mw", "ech_physiques_mw", "consommation_mw"
]

# Convert all energy columns to numeric, handling potential issues
for col_name in ENERGY_COLS:
    if col_name in electricity_df.columns:
        electricity_df = electricity_df.withColumn(
            col_name,
            when(col(col_name).isNull(), lit(0.0))
            .otherwise(
                when(col(col_name).cast("double").isNull(), lit(0.0))
                .otherwise(col(col_name).cast("double"))
            )
        )

# Step 4: Data quality filters
# Remove negative consumption and extreme outliers
electricity_df = electricity_df.filter(
    (col("consommation_mw").isNull()) | 
    (col("consommation_mw") >= 0)
)

print(f"After data quality filters: {electricity_df.count()} rows")

# Step 5: Aggregation to regional hourly level
agg_exprs = [avg("consommation_mw").alias("consommation_mw")]

# Add other energy columns if they exist
for col_name in ENERGY_COLS:
    if col_name != "consommation_mw" and col_name in electricity_df.columns:
        agg_exprs.append(avg(col_name).alias(col_name))

electricity_hourly = electricity_df.groupBy("region_clean", "ts_hour_utc").agg(*agg_exprs)

print(f"After hourly aggregation: {electricity_hourly.count()} rows")

# Step 6: Add calendar features
electricity_transformed = electricity_hourly.withColumn(
    "ts_paris", 
    from_utc_timestamp(col("ts_hour_utc"), "Europe/Paris")
)

# Extract time components
electricity_transformed = electricity_transformed.withColumn(
    "hour", 
    hour(col("ts_paris"))
).withColumn(
    "dow", 
    dayofweek(col("ts_paris"))
).withColumn(
    "week", 
    weekofyear(col("ts_paris"))
).withColumn(
    "month", 
    month(col("ts_paris"))
).withColumn(
    "doy", 
    dayofyear(col("ts_paris"))
).withColumn(
    "is_weekend", 
    when((col("dow") == 1) | (col("dow") == 7), 1).otherwise(0)
).withColumn(
    "hour_sin", 
    sin(2 * 3.14159 * col("hour") / 24.0)
).withColumn(
    "hour_cos", 
    cos(2 * 3.14159 * col("hour") / 24.0)
)

# Step 7: Add lag features for consumption (for future analytics)
window_spec = Window.partitionBy("region_clean").orderBy("ts_hour_utc")

electricity_transformed = electricity_transformed.withColumn(
    "consommation_lag24", 
    lag("consommation_mw", 24).over(window_spec)
).withColumn(
    "consommation_lag168", 
    lag("consommation_mw", 168).over(window_spec)
)

# Add rolling statistics (7-day rolling average)
electricity_transformed = electricity_transformed.withColumn(
    "consommation_roll168", 
    avg("consommation_mw").over(window_spec.rowsBetween(-167, 0))
)

print("Transformed data schema:")
electricity_transformed.printSchema()
print(f"Final transformed row count: {electricity_transformed.count()}")

# Show sample of transformed data
print("Sample of transformed data:")
electricity_transformed.orderBy("region_clean", "ts_hour_utc").show(20)

Original data schema:
root
 |-- region: string (nullable = true)
 |-- date: string (nullable = true)
 |-- heure: string (nullable = true)
 |-- ts_local: long (nullable = true)
 |-- nature: string (nullable = true)
 |-- perimetre: string (nullable = true)
 |-- consommation_mw: double (nullable = true)
 |-- thermique_mw: double (nullable = true)
 |-- nucleaire_mw: double (nullable = true)
 |-- eolien_mw: double (nullable = true)
 |-- solaire_mw: double (nullable = true)
 |-- hydraulique_mw: double (nullable = true)
 |-- pompage_mw: double (nullable = true)
 |-- bioenergies_mw: double (nullable = true)
 |-- stockage_batterie_mw: double (nullable = true)
 |-- destockage_batterie_mw: double (nullable = true)
 |-- eolien_terrestre_mw: double (nullable = true)
 |-- eolien_offshore_mw: double (nullable = true)
 |-- ech_physiques_mw: double (nullable = true)
 |-- flux physiques d'auvergne-rhne-alpes vers auvergne-rhne-alpes: string (nullable = true)
 |-- flux physiques de bourgogne-franche-comt

In [4]:
print(electricity_transformed.head())

Row(region_clean="Provence-Alpes-Cote d'Azur", ts_hour_utc=None, consommation_mw=0.0, solaire_mw=0.0, eolien_mw=0.0, hydraulique_mw=0.0, nucleaire_mw=0.0, thermique_mw=0.0, pompage_mw=0.0, bioenergies_mw=0.0, stockage_batterie_mw=0.0, destockage_batterie_mw=0.0, eolien_terrestre_mw=0.0, eolien_offshore_mw=0.0, ech_physiques_mw=0.0, ts_paris=None, hour=None, dow=None, week=None, month=None, doy=None, is_weekend=0, hour_sin=None, hour_cos=None, consommation_lag24=None, consommation_lag168=None, consommation_roll168=0.0)


In [5]:
# ============================================
# Write to Silver Bucket
# ============================================

# Convert back to DynamicFrame for writing
electricity_dynamic_frame = DynamicFrame.fromDF(
    electricity_transformed, 
    glueContext, 
    "electricity_transformed"
)

# Write to silver bucket as Parquet
sink = glueContext.getSink(
    path=f"s3://{source_bucket}/{processed_folder_name}/electricity_transformed/",
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[],
    compression="snappy",
    enableUpdateCatalog=True,
    transformation_ctx="sink",
)

sink.setCatalogInfo(
    catalogDatabase=glue_database,
    catalogTableName="silver_electricity_data"
)

sink.setFormat("glueparquet")
sink.writeFrame(electricity_dynamic_frame)

print(f"Successfully written transformed electricity data to s3://{source_bucket}/{processed_folder_name}/electricity_transformed/")

# Commit the job
job.commit()


