#### 🟤 Bronze Layer – Raw CO₂ Emissions Data Ingestion

##### Purpose
The Bronze layer serves as the raw ingestion layer of the ETL pipeline.  
It captures data exactly as received from the source without applying any transformations.

##### Key Responsibilities
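- Load the raw CO₂ emissions dataset from CSV
- Preserve the original schema and values
- Validate expected columns, numeric types, and year range, then persist the data as a Delta table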


In [0]:
# Load the raw CO2 emissions CSV into a Spark DataFrame.
# The first row is treated as the header and column types are inferred by Spark;
# later cells rely on those inferred types.
raw_path = "/Volumes/co2_emissions/default/vol1/co2_emissions.csv"
csv_reader = spark.read.options(header="true", inferSchema="true")
df_raw = csv_reader.csv(raw_path)
display(df_raw)


country,region,income_level,year,co2_emissions_mt,population,gdp_usd,energy_consumption_twh,renewable_energy_pct,industry_emissions_pct,transport_emissions_pct,urban_population_pct,co2_per_capita,gdp_per_capita
China,Asia,Low,1990,1399.68,595620971.0,4935590000000.0,2242.32,44.38,32.59,21.95,75.53,2.349950838,8286.465258
India,Asia,Low,2012,1559.02,912557512.0,6838000000000.0,1626.76,11.87,22.36,30.72,51.7,1.708407393,7493.22395
Germany,Europe,Lower-Middle,2012,2282.84,154368553.0,964196000000.0,2654.16,41.86,40.51,30.87,44.69,14.78824512,6246.067837
United Kingdom,Europe,Low,2002,6401.8,584708537.0,19791000000000.0,7752.47,53.2,39.66,37.86,45.89,10.94870281,33847.61739
Germany,Europe,Upper-Middle,2013,7471.4,1210528893.0,10802600000000.0,7823.43,39.51,32.66,24.54,61.75,6.172012947,8923.865421
France,Europe,Upper-Middle,2017,1149.94,29655304.0,13621700000000.0,2157.13,32.53,47.38,22.46,66.54,38.77687445,459332.8072
India,Asia,,2001,7787.37,,10810600000000.0,14263.86,8.19,48.41,27.15,75.64,,
India,Asia,Lower-Middle,1994,1655.44,1025661135.0,18939600000000.0,2315.95,25.54,33.54,16.6,,1.614022354,18465.77833
France,Europe,Low,1997,714.58,1268665529.0,4443340000000.0,867.66,56.45,34.18,35.19,35.82,0.563253264,3502.370576
Australia,Oceania,Low,2001,8639.05,914628168.0,8251490000000.0,10294.2,12.71,33.88,16.88,81.36,9.445423072,9021.688481


In [0]:
# Schema validation: every column listed below must be present in df_raw.
# Extra columns in df_raw are not checked and do not cause a failure.
expected_columns = [
    "country", "region", "income_level", "year",
    "co2_emissions_mt", "population", "gdp_usd",
    "co2_per_capita", "gdp_per_capita",
    "energy_consumption_twh", "renewable_energy_pct",
    "industry_emissions_pct", "transport_emissions_pct",
    "urban_population_pct",
]

# Gather the expected columns that are absent, preserving the order above.
present_columns = set(df_raw.columns)
missing_columns = []
for expected_name in expected_columns:
    if expected_name not in present_columns:
        missing_columns.append(expected_name)

# Abort the notebook run if anything is missing.
if len(missing_columns) > 0:
    raise Exception(f"Schema validation failed. Missing columns: {missing_columns}")
print("Schema validation passed")



Schema validation passed


In [0]:
# Numeric field validation: the columns below must have a Spark numeric type.
from pyspark.sql.types import NumericType

numeric_columns = [
    "year",
    "co2_emissions_mt",
    "population",
    "gdp_usd",
    "energy_consumption_twh",
]

# Walk the schema in column order and keep the names of the listed columns
# whose data type is not a NumericType subclass.
non_numeric = [
    schema_field.name
    for schema_field in df_raw.schema.fields
    if schema_field.name in numeric_columns
    and not isinstance(schema_field.dataType, NumericType)
]

# Report the first offending column (in schema order) and stop the run.
if non_numeric:
    raise Exception(f"column {non_numeric[0]} must be numeric")

print("Numeric field validation passed")
print(numeric_columns)


Numeric field validation passed
['year', 'co2_emissions_mt', 'population', 'gdp_usd', 'energy_consumption_twh']


In [0]:
# Time-series validation: the smallest and largest `year` in df_raw must fall
# inside [MIN_VALID_YEAR, MAX_VALID_YEAR]; otherwise the run is aborted.
# Inclusive bounds enforced on the year column.
MIN_VALID_YEAR = 1990
MAX_VALID_YEAR = 2020

# Single aggregation pass; collect() returns one Row with min_year / max_year.
year_stats = df_raw.selectExpr(
    "min(year) as min_year",
    "max(year) as max_year"
).collect()[0]

# min()/max() yield NULL (None) when the frame is empty or every year is NULL.
# Comparing None with an int would raise an opaque TypeError, so fail explicitly.
if year_stats.min_year is None or year_stats.max_year is None:
    raise Exception("Year column contains no non-null values; cannot validate year range")

# NOTE(review): NULL years in individual rows are ignored by min()/max() and
# therefore pass this check — confirm that is acceptable for the Bronze layer.
if year_stats.min_year < MIN_VALID_YEAR or year_stats.max_year > MAX_VALID_YEAR:
    raise Exception("Year values are outside valid range")

print("Time-series validation passed")


Time-series validation passed


In [0]:
# Persist df_raw as a Delta table named bronze_co2_emissions.
# The session's current catalog and schema are switched first, so the
# unqualified table name resolves to co2_emissions.default.
for use_statement in ("USE CATALOG co2_emissions", "USE SCHEMA default"):
    spark.sql(use_statement)

# "overwrite" replaces the table contents on each run.
bronze_writer = df_raw.write.format("delta").mode("overwrite")
bronze_writer.saveAsTable("bronze_co2_emissions")
print("Bronze table created successfully")


Bronze table created successfully


In [0]:

%sql
-- Sanity check after the write: count the rows in the Bronze table,
-- addressed by its fully qualified name (catalog.schema.table).
SELECT COUNT(*) 
FROM co2_emissions.default.bronze_co2_emissions;


COUNT(*)
100000
