In [0]:

# Azure storage account targeted by the abfss:// paths used in this notebook.
storage_account_name = "stgreenenergyfm"

# NOTE(review): the key is blank in the committed notebook — presumably redacted.
# Avoid pasting the real value here; load it from a secret store instead.
storage_account_key = ""

# Name of the Spark configuration entry that carries the account key for this
# storage account's dfs endpoint.
account_key_setting = f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net"

spark.conf.set(account_key_setting, storage_account_key)

In [0]:

# Load the global power plant CSV from the raw-data container, keep the columns
# used downstream, and persist the cleaned result as Parquet.
#
# Both paths are derived from `storage_account_name` so they always point at
# the same account whose key was registered with Spark, instead of repeating
# the account name as a literal.
raw_path = (
    f"abfss://raw-data@{storage_account_name}.dfs.core.windows.net"
    "/power_plants/global_power_plant_database.csv"
)

# header=true: the first CSV row supplies the column names.
# inferSchema=true: Spark derives the column types from the data.
df_plants = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(raw_path)
)

# Keep seven columns, expose `name` as `plant_name`, and drop rows that have no
# country or no primary fuel. Other columns (e.g. commissioning_year) may still
# be null after this step.
df_cleaned = (
    df_plants.select(
        "country_long", "name", "capacity_mw", "primary_fuel",
        "latitude", "longitude", "commissioning_year",
    )
    .withColumnRenamed("name", "plant_name")
    .dropna(subset=["country_long", "primary_fuel"])
)

processed_path = (
    f"abfss://processed-data@{storage_account_name}.dfs.core.windows.net"
    "/power_plants_cleaned"
)
# mode("overwrite") replaces whatever a previous run wrote to this path.
df_cleaned.write.mode("overwrite").parquet(processed_path)

# Preview the first ten cleaned rows.
display(df_cleaned.limit(10))

country_long,plant_name,capacity_mw,primary_fuel,latitude,longitude,commissioning_year
Afghanistan,Kajaki Hydroelectric Power Plant Afghanistan,33.0,Hydro,32.322,65.119,
Afghanistan,Kandahar DOG,10.0,Solar,31.67,65.795,
Afghanistan,Kandahar JOL,10.0,Solar,31.623,65.792,
Afghanistan,Mahipar Hydroelectric Power Plant Afghanistan,66.0,Hydro,34.556,69.4787,
Afghanistan,Naghlu Dam Hydroelectric Power Plant Afghanistan,100.0,Hydro,34.641,69.717,
Afghanistan,Nangarhar (Darunta) Hydroelectric Power Plant Afghanistan,11.55,Hydro,34.4847,70.3633,
Afghanistan,Northwest Kabul Power Plant Afghanistan,42.0,Gas,34.5638,69.1134,
Afghanistan,Pul-e-Khumri Hydroelectric Power Plant Afghanistan,6.0,Hydro,35.9416,68.71,
Afghanistan,Sarobi Dam Hydroelectric Power Plant Afghanistan,22.0,Hydro,34.5865,69.7757,
Albania,Bistrica 1,27.0,Hydro,39.9116,20.1047,1965.0


In [0]:

# Load the OWID energy CSV from the raw-data container, keep rows from the year
# 2000 onwards with a fixed set of columns, and persist the result as Parquet.
#
# Both paths are derived from `storage_account_name` so they always point at
# the same account whose key was registered with Spark, instead of repeating
# the account name as a literal.
raw_owid_path = (
    f"abfss://raw-data@{storage_account_name}.dfs.core.windows.net"
    "/owid_energy/owid-energy-data.csv"
)

# header=true: the first CSV row supplies the column names.
# inferSchema=true: Spark derives the column types from the data.
df_owid = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(raw_owid_path)
)

# NOTE(review): fillna(0) replaces every missing numeric value with 0, and that
# includes `population` and `gdp`, not only the electricity columns. A filled 0
# cannot be told apart from a real 0 afterwards — confirm that this is what the
# downstream table expects rather than NULL.
df_owid_cleaned = (
    df_owid.filter("year >= 2000")
    .select(
        "country", "year", "population", "gdp",
        "renewables_electricity", "solar_electricity", "wind_electricity", "hydro_electricity",
        "fossil_electricity", "greenhouse_gas_emissions",
    )
    .fillna(0)
)

processed_owid_path = (
    f"abfss://processed-data@{storage_account_name}.dfs.core.windows.net"
    "/owid_energy_cleaned"
)
# mode("overwrite") replaces whatever a previous run wrote to this path.
df_owid_cleaned.write.mode("overwrite").parquet(processed_owid_path)

# Preview the first ten cleaned rows.
display(df_owid_cleaned.limit(10))

country,year,population,gdp,renewables_electricity,solar_electricity,wind_electricity,hydro_electricity,fossil_electricity,greenhouse_gas_emissions
ASEAN (Ember),2000,0,0,73.25,0.0,0.0,50.45,305.36,216.89
ASEAN (Ember),2001,0,0,77.19,0.0,0.0,54.33,327.66,231.01
ASEAN (Ember),2002,0,0,76.52,0.0,0.0,53.29,356.67,248.23
ASEAN (Ember),2003,0,0,76.4,0.0,0.0,53.21,381.84,264.56
ASEAN (Ember),2004,0,0,77.96,0.0,0.0,52.99,418.6,289.95
ASEAN (Ember),2005,0,0,79.2,0.0,0.02,53.88,443.48,306.06
ASEAN (Ember),2006,0,0,86.27,0.04,0.05,60.52,460.92,319.99
ASEAN (Ember),2007,0,0,90.84,0.04,0.06,63.25,488.2,340.76
ASEAN (Ember),2008,0,0,99.62,0.04,0.06,69.94,504.23,349.48
ASEAN (Ember),2009,0,0,104.29,0.04,0.07,73.68,520.17,362.09


In [0]:
# Target SQL Server host and database for the JDBC writes.
server_name = "sqlserver-greenenergy-fm.database.windows.net"
database_name = "greenenergy-db"

# JDBC connection string: host and port first, then `key=value;` connection
# properties that are handed to the driver exactly as written here.
url = "".join(
    (
        f"jdbc:sqlserver://{server_name}:1433;",
        f"database={database_name};",
        "encrypt=true;",
        "trustServerCertificate=false;",
        "hostNameInCertificate=*.database.windows.net;",
        "loginTimeout=30;",
    )
)


# NOTE(review): both credentials are blank in the committed notebook —
# presumably redacted. Avoid pasting real values here; load them from a secret
# store instead.
user = ""
password = ""


def _append_to_sql(df, table_name):
    """Append the rows of `df` to the SQL table `table_name` over JDBC.

    The connection uses the notebook-level `url`, `user` and `password`.
    """
    # NOTE(review): mode("append") adds rows on every execution, so running the
    # notebook a second time inserts the same data again — confirm that this is
    # intended or clear the table before loading.
    (
        df.write
        .format("jdbc")
        .option("url", url)
        .option("dbtable", table_name)
        .option("user", user)
        .option("password", password)
        .mode("append")
        .save()
    )


# Push the cleaned power plant data to SQL.
_append_to_sql(df_cleaned, "dbo.Fact_PowerPlants")

# Push the cleaned energy transition data to SQL.
_append_to_sql(df_owid_cleaned, "dbo.Fact_EnergyTransition")

print("finished")

finished
