In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType

# Base directories of the bronze and silver layers.
BRONZE_BASE = "/Volumes/workspace/default/prueba/etl_project/bronze"
SILVER_BASE = "/Volumes/workspace/default/prueba/etl_project/silver"

# The landing folder sits under the bronze root; derive it from BRONZE_BASE
# so the two paths cannot drift apart.
LANDING_PATH = f"{BRONZE_BASE}/landing"
# No blank after the slash: it would become part of the file name.
RAW_POPULATION_PATH = f"{LANDING_PATH}/population.csv"
RAW_CO2_PATH = f"{LANDING_PATH}/co2.csv"

In [0]:
# Load the raw population CSV (first row is the header, column types are
# inferred) and rename its columns to snake_case.
population_df = (
    spark.read
    .csv(RAW_POPULATION_PATH, header=True, inferSchema=True)
    .withColumnRenamed("Country Name", "country")
    .withColumnRenamed("Country Code", "country_code")
    .withColumnRenamed("Year", "year")
    .withColumnRenamed("Value", "population")
)

display(population_df)

In [0]:
# Load the raw CO2 CSV (first row is the header, column types are inferred),
# cast the numeric columns to explicit types and keep only the four columns
# used from here on.
co2_df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(RAW_CO2_PATH)
    # The explicit casts below fix the final types regardless of what
    # schema inference picked for these columns.
    .withColumn("year", F.col("year").cast(IntegerType()))
    .withColumn("co2", F.col("co2").cast(FloatType()))
    .withColumn("co2_per_capita", F.col("co2_per_capita").cast(FloatType()))
    .select("country", "year", "co2", "co2_per_capita")
)
display(co2_df)

In [0]:
# Save both DataFrames to the Silver layer in Delta format.
# mode("overwrite") replaces whatever was previously stored at each path.

population_df.write.format("delta").mode("overwrite").save(f"{SILVER_BASE}/population")
co2_df.write.format("delta").mode("overwrite").save(f"{SILVER_BASE}/co2")

print("✅ Delta tables created in Silver layer")