# Data ingestion and formatting

This notebook explains how to convert the Climate TRACE dataset to a format that is more appropriate for data science. 

```{note}
This section is relevant for data engineers, or data scientists who want to understand how the data 
has been prepared. Skip if you just want to access the final, prepared data.
```

The original data from Climate TRACE is offered as a series of CSV files bundled in ZIP archives. That format is universally understood, but it is not the most effective for analysis with data science tools. In particular, it is large: the source data, uncompressed, is about 20GB. This is the size at which most people would consider this project to be "big data" or at least "medium data". With the proper choice of data storage, we will bring it down to a breezy "small data" without losing information along the way.

Instead, we are going to use the Parquet format. This format has a number of advantages:
- it is _column-based_ : data systems can process big chunks of data at once, rather than line by line. Also, depending on the information requested, systems will read only the relevant columns and skip the rest very effectively
- it is _universal_ : most modern data systems will be able to read it
- it is _structured_ : basic information about numbers, categories, ... is preserved


Looking at the code, we are performing a few tricks:

_Compacting the data_ We minimize the size of the files by taking advantage of their structure. In particular, we know in many cases that values are part of known enumerations (sectors, ...). We replace all of these with `polars.Enum` types. Not only does this make the files smaller, it also allows data systems to make clever optimizations for complex operations such as joins.

_Lazy reading_ If we were to read all the source data using a traditional system such as Excel or Pandas, we would require a serious amount of memory. The files themselves are more than 5GB. Polars is capable of reading straight from the zip file in a streaming fashion. This is what Polars calls a Lazy dataframe, or LazyFrame. Even when doing complicated operations such as joining the source files with the confidence information, Polars only uses 3GB of memory on my machine. In fact, this way of working is so fast that the `ctrace` package directly reads all the country emissions data from the zip files in less than a second.
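
The general idea of reading from a zip archive without extracting it can be sketched with the Python standard library alone. This is not how Polars or `ctrace` do it internally; the archive, the member name and the rows below are made up for illustration.

```python
import csv
import io
import zipfile

# Build a tiny zip archive in memory; the member name and rows are invented.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive:
    archive.writestr(
        "DATA/toy_emissions_sources.csv",
        "source_id,gas,emissions_quantity\n1,co2,10.0\n2,ch4,0.5\n3,co2,2.5\n",
    )


def stream_rows(zip_bytes, member):
    """Yield CSV rows one at a time, decompressing the member on the fly."""
    with zipfile.ZipFile(zip_bytes) as archive:
        with archive.open(member) as raw:
            text = io.TextIOWrapper(raw, encoding="utf-8", newline="")
            yield from csv.DictReader(text)


# Aggregate while streaming: the full list of rows is never held in memory.
co2_total = sum(
    float(row["emissions_quantity"])
    for row in stream_rows(buffer, "DATA/toy_emissions_sources.csv")
    if row["gas"] == "co2"
)
print(co2_total)
```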

_Using known enumerations_ You will see in the source code that nearly all the variables such as column names, names of gases and sectors, etc. are replaced by CONSTANT_NAMES such as `CH4`. You can use them for autocompletion.
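
Beyond autocompletion, named constants also catch typos early. The sketch below uses hypothetical constants in the spirit of `ctrace.constants`; the actual names and values in the package may differ.

```python
# Hypothetical constants, for illustration only.
GAS = "gas"
CO2 = "co2"
CH4 = "ch4"

records = [
    {GAS: CO2, "emissions_quantity": 10.0},
    {GAS: CH4, "emissions_quantity": 0.5},
]

ch4_records = [r for r in records if r[GAS] == CH4]

# A misspelled constant fails immediately with a NameError...
try:
    [r for r in records if r[GAS] == CH_4]  # deliberate typo
except NameError as e:
    typo_error = str(e)

# ...whereas a misspelled string literal silently matches nothing.
silent_miss = [r for r in records if r[GAS] == "ch_4"]
print(len(ch4_records), typo_error, silent_miss)
```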



In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import logging
logging.basicConfig(level=logging.DEBUG)

In [3]:
import os
import polars as pl
from ctrace.constants import *
import ctrace as ct
import pyarrow
import pyarrow.dataset
import pyarrow.parquet
from dds import data_function
import shutil
import dds
import huggingface_hub
logging.getLogger("dds").setLevel(logging.WARNING)
dds.accept_module(ct)

## Creating optimized parquet files

This first section creates files that are the most effective for reading and querying. The general approach is as follows:

1. Join the source and source confidence CSV files and write them as parquet files for each subsector
2. Aggregate by year into a yearly parquet file
3. Optimize this parquet file for reading

This first command creates parquet files that join the source and source confidences for each subsector, and returns a list of all the created files.

In this notebook, another trick is to define the transformations as _data functions_. In short, the code of a data function only runs again if its source code has changed. This makes rerunning the notebook very fast: results are only recomputed when something has changed in the source code.
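
The principle can be sketched in a few lines of plain Python. This toy decorator is an assumption made for illustration: it only fingerprints the function's own bytecode and keeps results in a dictionary, and it is not how the `dds` package is implemented.

```python
import hashlib

_store = {}  # path -> (code fingerprint, cached result)
calls = []   # records every real execution of the function body


def cached_by_code(path):
    """Re-run the function only when its compiled code changes (toy version)."""

    def decorator(fn):
        fingerprint = hashlib.sha256(fn.__code__.co_code).hexdigest()

        def wrapper():
            cached = _store.get(path)
            if cached is not None and cached[0] == fingerprint:
                return cached[1]  # same code as before: reuse the result
            result = fn()
            _store[path] = (fingerprint, result)
            return result

        return wrapper

    return decorator


@cached_by_code("/toy_sources")
def load_toy_sources():
    calls.append("ran")
    return ["a.parquet", "b.parquet"]


first = load_toy_sources()
second = load_toy_sources()
print(len(calls))
```

The second call returns the stored result without executing the function body again.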

In [4]:
@data_function("/data_sources")
def load_sources():
    (_, files) = ct.data.load_source_compact()
    return files

load_sources()

DEBUG:ctrace.data:Opening path agriculture.zip co2


DEBUG:ctrace.data:sources:co2: agriculture.zip -> ['DATA/cropland-fires_emissions_sources.csv', 'DATA/enteric-fermentation-cattle-operation_emissions_sources.csv', 'DATA/enteric-fermentation-cattle-pasture_emissions_sources.csv', 'DATA/manure-left-on-pasture-cattle_emissions_sources.csv', 'DATA/manure-management-cattle-operation_emissions_sources.csv', 'DATA/rice-cultivation_emissions_sources.csv', 'DATA/synthetic-fertilizer-application_emissions_sources.csv']


DEBUG:ctrace.data:opening agriculture.zip / DATA/cropland-fires_emissions_sources.csv


DEBUG:ctrace.data:opening agriculture.zip / DATA/cropland-fires_emissions_sources.csv and DATA/cropland-fires_emissions_sources_confidence.csv


DEBUG:ctrace.data:loading source <zipfile.ZipExtFile name='DATA/cropland-fires_emissions_sources.csv' mode='r' compress_type=deflate>


DEBUG:ctrace.data:loaded str


DEBUG:ctrace.data:recast


DEBUG:ctrace.data:loading source conf <zipfile.ZipExtFile name='DATA/cropland-fires_emissions_sources_confidence.csv' mode='r' compress_type=deflate>


DEBUG:ctrace.data:columns: ['source_id', 'source_name', 'iso3_country', 'sector', 'subsector', 'start_time', 'end_time', 'source_type', 'capacity', 'capacity_factor', 'activity', 'gas', 'emissions_factor', 'emissions_quantity', 'created_date', 'modified_date']


DEBUG:ctrace.data:source conf: 


KeyboardInterrupt: 

To help with the loading, the data is partitioned by year. This is the most relevant split for most users: most people are expected to look at specific years and sectors (especially the latest year). This reduces the amount of data to load.

Let us have a quick peek at the data in one of these files. It already looks pretty good: a lot of the redundant data such as the enumerations has been deduplicated. All the enumeration data is now converted to integers: this is what `dictionary<values=string, indices=int32, ordered=0>` means. It is not quite ready for high performance, however.

In [5]:
from pyarrow.parquet import read_table
fname = load_sources()[0]
print(fname)
read_table(fname)

/tmp/co2/cropland-fires_emissions_sources.parquet


pyarrow.Table
source_id: uint64
iso3_country: dictionary<values=string, indices=int32, ordered=0>
sector: dictionary<values=string, indices=int32, ordered=0>
subsector: dictionary<values=string, indices=int32, ordered=0>
original_inventory_sector: dictionary<values=string, indices=int32, ordered=0>
start_time: timestamp[ms, tz=UTC]
end_time: timestamp[ms, tz=UTC]
temporal_granularity: dictionary<values=string, indices=int32, ordered=0>
gas: dictionary<values=string, indices=int32, ordered=0>
emissions_quantity: double
emissions_factor: double
emissions_factor_units: large_string
capacity: double
capacity_units: large_string
capacity_factor: double
activity: double
activity_units: large_string
created_date: timestamp[ms, tz=UTC]
modified_date: timestamp[ms, tz=UTC]
source_name: large_string
source_type: large_string
lat: double
lon: double
other1: large_string
other2: large_string
other3: large_string
other4: large_string
other5: large_string
other6: large_string
other7: large_string
ot

## Aggregating by year and optimizing the output

The following block takes all the sector files and aggregates them by year. This is based on the expectation that most users will work on the latest year, and that some users will want to look into the trends across the years.

Since these files will be read many times (every time we want to draw a graph), it pays off to optimize them. The Parquet format is designed for fast reads of the relevant data. We will do three main optimizations: compressing the data, optimizing the row groups and adding statistics.



_Compression_ Parquet allows data to be compressed column by column. The first intuition is that, looking at each column of data separately, there will be more patterns and thus more opportunities to compress the data. The second intuition is that, in data-intensive applications, reading the data is the bottleneck. It is then faster to read smaller compressed data into memory and then decompress it (losing a bit of time in compute), rather than reading larger, uncompressed data. Modern compression algorithms such as Zstandard or LZ4 are designed to be very effective at using a processor. Using them is essentially a pure gain in terms of processing speed.


```{admonition} CTODO
The year of a data record is defined by its start time. This may be different from the convention used by Climate TRACE. To check.
```


In [6]:
write_directory = "/tmp"
years = ct.data.years
version = ct.data.version
gases = ct.constants.GAS_LIST

@data_function("/write_data")
def write_data():
    data_files = load_sources()
    dfs = []
    for tmp_name in data_files:
        print(tmp_name)
        df = pl.scan_parquet(tmp_name)
        df = df.pipe(ct.data.recast_parquet, conf=True)
        dfs.append(df)
    ldf = pl.concat(dfs)
    fnames = []
    for gas in gases:
        for year in years:
            fname1 = f"{write_directory}/pre_climate_trace-sources_{version}_{year}_{gas}.parquet"
            (
                ldf.filter(c_start_time.dt.year() == int(year))
                   .filter(c_gas == gas)
                   .sort(by=[GAS, SECTOR, SUBSECTOR, ISO3_COUNTRY, SOURCE_ID])
                   .sink_parquet(
                    fname1,
                    compression="zstd",
                    maintain_order=True,
                    statistics=True,
                )
            )
            fname = f"{write_directory}/climate_trace-sources_{version}_{year}_{gas}.parquet"
            print(fname)
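            # Rewrite the file produced by Polars with larger, regular row groups.
            ds = pyarrow.dataset.dataset(fname1)
            pyarrow.dataset.write_dataset(
                ds,
                base_dir=write_directory,
                basename_template="ds_{i}.parquet",
                format="parquet",
                partitioning=None,
                min_rows_per_group=300_000,
                max_rows_per_group=1_000_000,
            )
            # Without partitioning, a single file ds_0.parquet is written.
            shutil.copyfile(f"{write_directory}/ds_0.parquet", fname)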
            fnames.append((fname1, fname))
    return fnames

write_data()

/tmp/co2/cropland-fires_emissions_sources.parquet
/tmp/co2/enteric-fermentation-cattle-operation_emissions_sources.parquet
/tmp/co2/enteric-fermentation-cattle-pasture_emissions_sources.parquet
/tmp/co2/manure-left-on-pasture-cattle_emissions_sources.parquet
/tmp/co2/manure-management-cattle-operation_emissions_sources.parquet
/tmp/co2/rice-cultivation_emissions_sources.parquet
/tmp/co2/synthetic-fertilizer-application_emissions_sources.parquet
/tmp/co2/non-residential-onsite-fuel-usage_emissions_sources.parquet
/tmp/co2/residential-onsite-fuel-usage_emissions_sources.parquet
/tmp/co2/forest-land-clearing_emissions_sources.parquet
/tmp/co2/forest-land-degradation_emissions_sources.parquet
/tmp/co2/forest-land-fires_emissions_sources.parquet
/tmp/co2/net-forest-land_emissions_sources.parquet
/tmp/co2/net-shrubgrass_emissions_sources.parquet
/tmp/co2/net-wetland_emissions_sources.parquet
/tmp/co2/removals_emissions_sources.parquet
/tmp/co2/shrubgrass-fires_emissions_sources.parquet
/tmp/

[('/tmp/pre_climate_trace-sources_v3-2024-ct4_2021_co2.parquet',
  '/tmp/climate_trace-sources_v3-2024-ct4_2021_co2.parquet'),
 ('/tmp/pre_climate_trace-sources_v3-2024-ct4_2022_co2.parquet',
  '/tmp/climate_trace-sources_v3-2024-ct4_2022_co2.parquet'),
 ('/tmp/pre_climate_trace-sources_v3-2024-ct4_2023_co2.parquet',
  '/tmp/climate_trace-sources_v3-2024-ct4_2023_co2.parquet'),
 ('/tmp/pre_climate_trace-sources_v3-2024-ct4_2024_co2.parquet',
  '/tmp/climate_trace-sources_v3-2024-ct4_2024_co2.parquet'),
 ('/tmp/pre_climate_trace-sources_v3-2024-ct4_2021_co2e_100yr.parquet',
  '/tmp/climate_trace-sources_v3-2024-ct4_2021_co2e_100yr.parquet'),
 ('/tmp/pre_climate_trace-sources_v3-2024-ct4_2022_co2e_100yr.parquet',
  '/tmp/climate_trace-sources_v3-2024-ct4_2022_co2e_100yr.parquet'),
 ('/tmp/pre_climate_trace-sources_v3-2024-ct4_2023_co2e_100yr.parquet',
  '/tmp/climate_trace-sources_v3-2024-ct4_2023_co2e_100yr.parquet'),
 ('/tmp/pre_climate_trace-sources_v3-2024-ct4_2024_co2e_100yr.parquet

_Optimizing row groups_ A parquet file is a collection of groups of rows, and these rows are organized column-wise along with some statistics. We can choose how many groups to create: the minimum is one group (all the data into a single group), which is the most standard. This is not optimal however: reading can only be done by one processor core at a time. If we have more, they will sit idle. This is why it is better to choose the number of groups to be close to the expected number of processor cores (10-100). When reading, each core will process a different chunk of the file in parallel.

Polars cannot do this yet, so the code below directly calls the `pyarrow` package to restructure the final file, calling the function `pyarrow.dataset.write_dataset`. 

Here is the Parquet file produced directly by Polars. It is the result of joining datasets which themselves are the result of reading many files (one per subsector). It is very fragmented (see the `num_row_groups` statistic below).


In [7]:
(fname_pre, fname_post) = write_data()[0]
print(fname_pre)
print(fname_post)
parquet_file = pyarrow.parquet.ParquetFile(fname_pre)
# print(parquet_file.metadata.row_group(0).column(2).statistics)
parquet_file.metadata

/tmp/pre_climate_trace-sources_v3-2024-ct4_2021_co2.parquet
/tmp/climate_trace-sources_v3-2024-ct4_2021_co2.parquet


<pyarrow._parquet.FileMetaData object at 0x7d8356541b20>
  created_by: Polars
  num_columns: 54
  num_rows: 15184030
  num_row_groups: 32
  format_version: 1.0
  serialized_size: 173153

The final file is more compact: only 48 row groups. It will be much faster to read (up to 50 times faster on my computer) because the readers do not need to gather information from each of the row groups.

In [8]:
parquet_file = pyarrow.parquet.ParquetFile(fname_post)
parquet_file.metadata

<pyarrow._parquet.FileMetaData object at 0x7d832cd93d30>
  created_by: parquet-cpp-arrow version 15.0.2
  num_columns: 54
  num_rows: 15184030
  num_row_groups: 48
  format_version: 2.6
  serialized_size: 273069

_Statistics_ Each row group in a parquet file has statistics. These statistics contain, for each column, basic information such as minimum, maximum, etc. as you can see below. During a query, a data system first reads these statistics to check which blocks of data it should read.

For example, the first row group only contains agriculture data (which you can infer from `min: agriculture` and `max: agriculture`). As a result, if a query is looking for waste data, it can safely skip this full block.

Grouping the rows and creating statistics can dramatically reduce the amount of data being read and processed. Finding the right number of groups is a tradeoff between using more cores to read the data in parallel, and not having to read too many statistics descriptions. In the extreme case of the file created by Polars (5000 row groups), the statistics make up 40% of the file and can take up to 90% of the processing time! If your parquet file reads slowly, it is probably due to its internal layout.

In [13]:
parquet_file = pyarrow.parquet.ParquetFile(fname_post)
parquet_file.metadata.row_group(0).column(12).statistics

<pyarrow._parquet.Statistics object at 0x7d8356541350>
  has_min_max: True
  min: 0.0905689899739461
  max: 9955196.16948596
  null_count: 0
  distinct_count: None
  num_values: 327680
  physical_type: DOUBLE
  logical_type: None
  converted_type (legacy): NONE

## Initial checks

We now check that it works correctly. Let's load the newly created data for the year 2024, instead of the default version stored on the internet.

In [30]:
sdf = ct.read_source_emissions(gas=CO2, year=2024, p="/tmp")
sdf

About 15M records for this year. These are spread across multiple gases and, in the case of boats or airplanes, multiple trips.

In [31]:
sdf.select(pl.len()).collect()

len
u32
15183967


Check the number of distinct source IDs

In [32]:
by_sec = (sdf
.group_by(SOURCE_ID, SECTOR)
.agg(pl.len())
.collect())

The number of sources outside forestry and land use (FLU):

```{admonition} CTODO
This number does not match the official number on the Climate TRACE website (395075 for 2022). Investigate.
```

In [33]:
by_sec.filter(c_sector != FORESTRY_AND_LAND_USE).select(pl.len())

len
u32
748730


Check: no source is associated with multiple sectors.

In [34]:
by_sec.group_by(SOURCE_ID).agg(c_sector.n_unique()).filter(pl.col(SECTOR) > 1)

source_id,sector
u64,u32


Check: no annual source should be duplicated by gas. This used to be the case with the V2 release.

In [23]:
(sdf
.filter(c_temporal_granularity == "annual")
.group_by(SOURCE_ID, GAS)
.agg(pl.len())
.filter(pl.col("len") > 1)
.sort(by="len")
.collect())

source_id,gas,len
u64,enum,u32


Check: emissions should always be defined. V2 used to have empty values.

In [24]:
sdf = ct.read_source_emissions(CO2E_100YR, 2022, "/tmp")
(sdf
 .select(c_emissions_quantity.is_null().alias("null_emissions"), c_subsector, c_iso3_country)
 .group_by(c_subsector, "null_emissions")
 .agg(pl.len())
 .collect()
 .pivot(index=SUBSECTOR, on="null_emissions", values="len")
)

subsector,false
enum,u32
"""rice-cultivation""",684408
"""electricity-generation""",106464
"""net-shrubgrass""",682260
"""solid-waste-disposal""",115500
"""domestic-shipping""",99132
…,…
"""water-reservoirs""",84408
"""net-forest-land""",679632
"""international-aviation""",58920
"""cement""",26892


## Upload the data to the Hugging Face Hub

As a final step, we make the datasets available on Hugging Face as a downloadable dataset.

This step will only work if you have the credentials to upload the dataset.

In [None]:
import huggingface_hub.utils
try:
    api = huggingface_hub.HfApi()
    for (_, fpath) in write_data():
        fname = os.path.basename(fpath)
        print(fname, fpath)
        api.upload_file(
            path_or_fileobj=fpath,
            path_in_repo=fname,
            repo_id="tjhunter/climate-trace",
            repo_type="dataset-REMOVE_ME",
        )
except huggingface_hub.utils.HfHubHTTPError as e:
    print("error")
    print(e)