Importación de las librerías necesarias

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, monotonically_increasing_id
import config

Crear la sesión de Spark

In [2]:
# Reuse the active Spark session if one exists; otherwise start a new one
# registered under the application name "load_eia_dwh".
spark = (
    SparkSession.builder
    .appName("load_eia_dwh")
    .getOrCreate()
)

Carga del archivo del datalake

In [3]:
# Location of the data-lake extract, taken from the project configuration.
file_path = os.path.join(config.FOLDER_DL, config.FILE_DL)

# Read the CSV treating the first row as the header and letting Spark infer
# the column types from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(file_path)
)

# Display the column names as the cell output.
df.columns


['area-name',
 'duoarea',
 'period',
 'process',
 'process-name',
 'product',
 'product-name',
 'series',
 'series-description',
 'units',
 'value']

Obteniendo relación con la dimensión de Area

In [4]:
# Load the area dimension (semicolon-separated CSV with a header row).
# No schema inference is requested, so every column is read as a string.
path_dim_area = os.path.join(config.FOLDER_DWH, config.FILE_DIM_AREA)
dim_area = spark.read.csv(path_dim_area, sep=";", header=True).alias("area")

# Left join keeps every source row; rows whose duoarea has no match in the
# dimension get a null AREA_ID.
df_joined = df.alias("f").join(
    dim_area,
    on=col("f.duoarea") == col("area.DUO_AREA"),
    how="left",
)
df_joined = df_joined.select("f.*", col("area.ID").alias("AREA_ID"))

df_joined.show(truncate=False)

+---------+-------+------+-------+------------------------------------------------+-------+------------+---------------+----------------------------------------------------------------------------------------------------------+-----+-----+-------+
|area-name|duoarea|period|process|process-name                                    |product|product-name|series         |series-description                                                                                        |units|value|AREA_ID|
+---------+-------+------+-------+------------------------------------------------+-------+------------+---------------+----------------------------------------------------------------------------------------------------------+-----+-----+-------+
|NA       |RCASF  |2017  |R01    |Proved Reserves                                 |EPC0   |Crude Oil   |RCRR01RCASF_1  |California State Offshore Crude Oil Proved Reserves (Million Barrels)                                     |MMBBL|147  |9      |
|USA-MI 

Cruce con la dimensión de procesos

In [5]:
# Load the process dimension (semicolon-separated CSV with a header row).
# No schema inference is requested, so every column is read as a string.
path_dim_process = os.path.join(config.FOLDER_DWH, config.FILE_DIM_PROCESS)
dim_process = spark.read.csv(path_dim_process, sep=";", header=True).alias("process")

# Left join keeps every row accumulated so far; rows whose process code has
# no match in the dimension get a null PROCESS_ID.
df_joined = df_joined.alias("f").join(
    dim_process,
    on=col("f.process") == col("process.PROCESS"),
    how="left",
)
df_joined = df_joined.select("f.*", col("process.ID").alias("PROCESS_ID"))

df_joined.show(truncate=False)


+---------+-------+------+-------+------------------------------------------------+-------+------------+---------------+----------------------------------------------------------------------------------------------------------+-----+-----+-------+----------+
|area-name|duoarea|period|process|process-name                                    |product|product-name|series         |series-description                                                                                        |units|value|AREA_ID|PROCESS_ID|
+---------+-------+------+-------+------------------------------------------------+-------+------------+---------------+----------------------------------------------------------------------------------------------------------+-----+-----+-------+----------+
|NA       |RCASF  |2017  |R01    |Proved Reserves                                 |EPC0   |Crude Oil   |RCRR01RCASF_1  |California State Offshore Crude Oil Proved Reserves (Million Barrels)                                  

Cruce con la dimensión de productos

In [6]:
# Load the product dimension (semicolon-separated CSV with a header row).
# No schema inference is requested, so every column is read as a string.
path_dim_product = os.path.join(config.FOLDER_DWH, config.FILE_DIM_PRODUCT)
# Stored under its own name so it does not overwrite the process dimension
# that was loaded into `dim_process` earlier in the notebook.
dim_product = (spark.read
               .option("header", "true")
               .option("sep", ";")
               .csv(path_dim_product)
               .alias("product"))

# Left join keeps every row accumulated so far; rows whose product code has
# no match in the dimension get a null PRODUCT_ID.
df_joined = (df_joined.alias("f")
             .join(dim_product, col("f.product") == col("product.PRODUCT"), "left")
             .select("f.*", col("product.ID").alias("PRODUCT_ID")))

df_joined.show(truncate=False)

+---------+-------+------+-------+------------------------------------------------+-------+------------+---------------+----------------------------------------------------------------------------------------------------------+-----+-----+-------+----------+----------+
|area-name|duoarea|period|process|process-name                                    |product|product-name|series         |series-description                                                                                        |units|value|AREA_ID|PROCESS_ID|PRODUCT_ID|
+---------+-------+------+-------+------------------------------------------------+-------+------------+---------------+----------------------------------------------------------------------------------------------------------+-----+-----+-------+----------+----------+
|NA       |RCASF  |2017  |R01    |Proved Reserves                                 |EPC0   |Crude Oil   |RCRR01RCASF_1  |California State Offshore Crude Oil Proved Reserves (Million Barrels) 

Cruce con la dimensión de la serie de tiempo

In [7]:
# Load the time dimension (semicolon-separated CSV with a header row).
# No schema inference is requested, so every column is read as a string.
path_dim_time = os.path.join(config.FOLDER_DWH, config.FILE_DIM_TIME)
dim_time = spark.read.csv(path_dim_time, sep=";", header=True).alias("y")

# Left join on the year; rows whose period has no match in the dimension get
# a null TIME_ID.
# NOTE(review): `period` comes from a schema-inferred read while `YEAR` is a
# string, so the comparison relies on Spark's implicit cast - confirm.
df_joined = df_joined.alias("f").join(
    dim_time,
    on=col("f.period") == col("y.YEAR"),
    how="left",
)
df_joined = df_joined.select("f.*", col("y.ID").alias("TIME_ID"))

df_joined.show(truncate=False)

+---------+-------+------+-------+------------------------------------------------+-------+------------+---------------+----------------------------------------------------------------------------------------------------------+-----+-----+-------+----------+----------+-------+
|area-name|duoarea|period|process|process-name                                    |product|product-name|series         |series-description                                                                                        |units|value|AREA_ID|PROCESS_ID|PRODUCT_ID|TIME_ID|
+---------+-------+------+-------+------------------------------------------------+-------+------------+---------------+----------------------------------------------------------------------------------------------------------+-----+-----+-------+----------+----------+-------+
|NA       |RCASF  |2017  |R01    |Proved Reserves                                 |EPC0   |Crude Oil   |RCRR01RCASF_1  |California State Offshore Crude Oil Proved Res

Añadir variables adicionales

In [8]:
# Add a surrogate key per row. monotonically_increasing_id() yields unique,
# increasing 64-bit integers, but they are not guaranteed to be consecutive.
df_joined = df_joined.withColumn(
    colName="ID",
    col=monotonically_increasing_id(),
)

df_joined.show(truncate=False)


+---------+-------+------+-------+------------------------------------------------+-------+------------+---------------+----------------------------------------------------------------------------------------------------------+-----+-----+-------+----------+----------+-------+---+
|area-name|duoarea|period|process|process-name                                    |product|product-name|series         |series-description                                                                                        |units|value|AREA_ID|PROCESS_ID|PRODUCT_ID|TIME_ID|ID |
+---------+-------+------+-------+------------------------------------------------+-------+------------+---------------+----------------------------------------------------------------------------------------------------------+-----+-----+-------+----------+----------+-------+---+
|NA       |RCASF  |2017  |R01    |Proved Reserves                                 |EPC0   |Crude Oil   |RCRR01RCASF_1  |California State Offshore Crude Oi

Tomar solo las columnas necesarias

In [9]:
# Keep only the fact-table columns: the surrogate key, the four dimension
# keys, and the measure columns renamed to upper case.
df_fact = df_joined.select(
    'ID',
    'AREA_ID',
    'PROCESS_ID',
    'PRODUCT_ID',
    'TIME_ID',
    col('units').alias('UNITS'),
    col('value').alias('VALUE'),
)

Complementar valores nulos

In [10]:
# Replace nulls in the key and measure columns with zero.
# fillna() with a numeric value silently ignores non-numeric columns, and the
# dimension IDs come from CSV reads without schema inference (string-typed),
# so a plain fillna(0, subset=...) would leave their nulls untouched.
# Choose the replacement per column from its actual dtype instead.
null_fill_columns = ['AREA_ID', 'PROCESS_ID', 'PRODUCT_ID', 'TIME_ID', 'VALUE']
fact_dtypes = dict(df_fact.dtypes)
null_fill_values = {
    name: "0" if fact_dtypes[name] == "string" else 0
    for name in null_fill_columns
}
df_fact = df_fact.fillna(null_fill_values)

Exportar archivo final

In [11]:
# Destination of the fact table inside the data warehouse folder.
# Spark's CSV writer creates a directory of part files at this path.
export_path = os.path.join(config.FOLDER_DWH, config.FILE_FACT)

try:
    # Validate and export the fact table (`df_fact`), not the raw data-lake
    # frame (`df`); skip the write when there are no rows.
    if not df_fact.rdd.isEmpty():
        df_fact.write.mode('overwrite').option("header", "true").csv(export_path)
        print(f"✅ Archivo guardado exitosamente en: {export_path}")
    else:
        print("⚠️ El DataFrame está vacío. No se generó el archivo.")
except Exception as e:
    # Best-effort export: report the failure in the cell output rather than
    # aborting the notebook.
    print(f"❌ Error al guardar el archivo: {e}")


✅ Archivo guardado exitosamente en: DWH/fact_crude_reserves_prod.csv


Cerrar sesión

In [12]:
# Shut down the Spark session now that the export step has finished.
spark.stop()