In [43]:

import requests
import pandas as pd
from datetime import timedelta
from pyspark.sql.types import *
from delta.tables import DeltaTable



StatementMeta(, e447fce1-baa7-4c27-9165-4cfadb72411d, 45, Finished, Available, Finished)

In [44]:
# --- Source ------------------------------------------------------------------
# Folder of raw CSV files on GitHub. A file URL is built as
# base_url + <YYYYMMDD> + suffix_filename.
base_url = (
    "https://raw.githubusercontent.com/"
    "nhayari/windEnergy/main/wind-power-dataset/"
)
# Date stamp of the first file; used as the starting point when the table
# holds no rows yet.
first_filename_prefix = "20240615"
suffix_filename = "_wind_power_data.csv"

# --- Target ------------------------------------------------------------------
# Location of the wind_power table inside the Bronze Lakehouse.
bronze_table_path = (
    "abfss://wind_turbine_energy@onelake.dfs.fabric.microsoft.com"
    "/LH_Bronze_WindEnergy.Lakehouse/Tables/dbo/wind_power"
)
           

StatementMeta(, e447fce1-baa7-4c27-9165-4cfadb72411d, 46, Finished, Available, Finished)

In [45]:
# Create the wind_power Delta table if it does not exist yet.
#
# Column layout of the table; every column is declared nullable.
schema = StructType([
    StructField('production_id',          LongType(),    True),
    StructField('date',                   DateType(),    True),
    StructField('time',                   StringType(),  True),
    StructField('turbine_name',           StringType(),  True),
    StructField('capacity',               LongType(),    True),
    StructField('location_name',          StringType(),  True),
    StructField('latitude',               DoubleType(),  True),
    StructField('longitude',              DoubleType(),  True),
    StructField('region',                 StringType(),  True),
    StructField('status',                 StringType(),  True),
    StructField('responsible_department', StringType(),  True),
    StructField('wind_speed',             DoubleType(),  True),
    StructField('wind_direction',         StringType(),  True),
    StructField('energy_produced',        DoubleType(),  True)
])

# Check whether a Delta table already exists at the target path.
# An explicit check replaces the previous bare `except:` around
# DeltaTable.forPath, which treated *any* exception raised in the `try`
# as "table missing" and then went on to write an empty DataFrame.
if DeltaTable.isDeltaTable(spark, bronze_table_path):
    print("La table existe déjà → rien à faire")
else:
    print("Table inexistante → création")
    empty_df = spark.createDataFrame([], schema)
    # "errorifexists" instead of "overwrite": if a table turns out to exist
    # after all (check was wrong, or another run created it in the meantime),
    # the write fails instead of replacing the stored data with zero rows.
    empty_df.write.format("delta").mode("errorifexists").save(bronze_table_path)
    print("Table créée")

StatementMeta(, e447fce1-baa7-4c27-9165-4cfadb72411d, 47, Finished, Available, Finished)

Table inexistante → création
Table créée


In [46]:
# Read the current contents of the wind_power Delta table, then pull them
# into a pandas DataFrame.
# NOTE(review): toPandas() collects the whole table; the next cell visible
# here only reads the 'date' column — consider narrowing if nothing else
# uses df_pandas.
df_spark = spark.read.load(bronze_table_path, format="delta")
df_pandas = df_spark.toPandas()

StatementMeta(, e447fce1-baa7-4c27-9165-4cfadb72411d, 48, Finished, Available, Finished)

In [49]:
# Decide which daily file to ingest next: the day after the latest date
# already stored in the table, or the very first file when the table has
# no rows yet.
#
# No `format=` is passed to to_datetime: 'date' is declared as DateType in
# this notebook's table schema, so the values read back are dates rather than
# 'YYYYMMDD' strings. Forcing '%Y%m%d' makes pandas versions that apply the
# format to str(value) (e.g. '2024-06-15') raise once the table is non-empty.
most_recent_date = pd.to_datetime(df_pandas['date']).max()

if pd.isna(most_recent_date):
    # Empty table (max() of no dates is NaT): start from the first file.
    next_date = first_filename_prefix
else:
    # File names carry the date as YYYYMMDD.
    next_date = (most_recent_date + timedelta(days=1)).strftime('%Y%m%d')

StatementMeta(, e447fce1-baa7-4c27-9165-4cfadb72411d, 51, Finished, Available, Finished)

In [50]:
# Build the URL of the next daily CSV and read it straight into pandas,
# converting the 'date' column to pandas datetimes on the way.
file_url = "{}{}{}".format(base_url, next_date, suffix_filename)
df_pandas_new = pd.read_csv(file_url).assign(
    date=lambda frame: pd.to_datetime(frame['date'])
)

StatementMeta(, e447fce1-baa7-4c27-9165-4cfadb72411d, 52, Finished, Available, Finished)

In [52]:

# Convert the downloaded rows to a Spark DataFrame using the schema read back
# from the existing table, then append them to the wind_power Delta table.
df_spark_new = spark.createDataFrame(df_pandas_new, schema=df_spark.schema)
df_spark_new.write.save(bronze_table_path, format="delta", mode="append")

StatementMeta(, e447fce1-baa7-4c27-9165-4cfadb72411d, 54, Finished, Available, Finished)