# Data Ingestion Notebook

Source
* https://trade.cites.org/
* Download the full database by clicking the button in the orange banner at the top of the page

CITES Trade Database User Guide
* https://trade.cites.org/cites_trade_guidelines/en-CITES_Trade_Database_Guide.pdf

Guidance for how to filter to elephants
* https://cites.org/eng/prog/terrestrial_fauna/elephants#:~:text=African%20elephant%20(Loxodonta%20africana),Washington%20(Smithsonian%20Institution%20Press).

## Import libraries

In [1]:
import os
import glob
from zipfile import ZipFile
import pandas as pd
import numpy as np

## Set directories

In [2]:
dir_code = os.getcwd()
dir_data = os.path.join(os.path.dirname(dir_code), "data")

raw_zip_filename = "Trade_database_download_v2025.1"
raw_zip_filepath = os.path.join(dir_data, f"{raw_zip_filename}.zip")

dir_data_raw = os.path.join(dir_data, raw_zip_filename)

raw_stacked_filename = "cites_epehant_ivory_trades_raw_stacked.csv"
raw_stacked_filepath = os.path.join(dir_data, raw_stacked_filename)
raw_stacked_exists = os.path.exists(raw_stacked_filepath)

processed_filename = "cites_elephant_ivory_trades_clean.csv"
processed_filepath = os.path.join(dir_data, processed_filename)

## Read in raw elephant trade data

This step looks for a stacked CSV that contains only elephant-related trade data from the CITES Trade Database. If the stacked file does not exist, nothing is loaded in this step and the notebook proceeds to read in the partitioned raw datasets.

This logic is here so we don't have to read in the partitioned raw data every single time. Reading the partitions is slower and creates a lot of output in the notebook.

In [3]:
if raw_stacked_exists:
    df_elephant_raw = pd.read_csv(raw_stacked_filepath, dtype=str)
    df_elephant_raw.info()
else:
    print("Raw, stacked data with only elephant trades does not exist yet -- proceeding to run full ingestion process on the partitioned files.")

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 376965 entries, 0 to 376964
Data columns (total 20 columns):
 #   Column                  Non-Null Count   Dtype 
---  ------                  --------------   ----- 
 0   Id                      376965 non-null  object
 1   Year                    376965 non-null  object
 2   Appendix                376965 non-null  object
 3   Taxon                   376965 non-null  object
 4   Class                   376965 non-null  object
 5   Order                   376965 non-null  object
 6   Family                  376965 non-null  object
 7   Genus                   345608 non-null  object
 8   Term                    376965 non-null  object
 9   Quantity                376965 non-null  object
 10  Unit                    74391 non-null   object
 11  Importer                376410 non-null  object
 12  Exporter                372347 non-null  object
 13  Origin                  211240 non-null  object
 14  Purpose                 314074 non-n
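
The check above is one instance of a load-or-build cache. A minimal, generic sketch of that pattern follows; `load_or_build`, `build_toy`, and the toy file name are hypothetical names used only for illustration and are not part of the notebook.

```python
import os
import tempfile

import pandas as pd


def load_or_build(cache_path, build_fn):
    """Return the cached DataFrame if cache_path exists; otherwise build, save, and return it."""
    if os.path.exists(cache_path):
        # Read everything as strings, matching the notebook's dtype=str convention
        return pd.read_csv(cache_path, dtype=str)
    df = build_fn()
    df.to_csv(cache_path, index=False)
    return df


def build_toy():
    """Toy stand-in for the expensive partition-reading step (made-up rows)."""
    return pd.DataFrame({"Id": ["1", "2"], "Family": ["Elephantidae", "Elephantidae"]})


with tempfile.TemporaryDirectory() as tmp:
    cache = os.path.join(tmp, "toy_stacked.csv")
    first = load_or_build(cache, build_toy)   # cache missing: builds and writes it
    second = load_or_build(cache, build_toy)  # cache present: reads it back
    cache_was_written = os.path.exists(cache)
```

Wrapping the logic in one function would remove the need to repeat the `if not raw_stacked_exists` guard in every cell, at the cost of less step-by-step visibility in the notebook.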

## Read in raw partitioned data (if ingestion process is being run for the first time)

### a. Unzip folder containing the CITES Trade Database

This assumes that if a folder with the same name as the zip file already exists, you have already unzipped the data and all of the unzipped CSV files are in that folder. This check cannot tell whether the unzip failed or only some of the files are in the folder.

In [4]:
if not raw_stacked_exists:
    if not os.path.exists(dir_data_raw):
        os.makedirs(dir_data_raw)
        with ZipFile(raw_zip_filepath, 'r') as raw_zip:
            raw_zip.extractall(path = dir_data_raw)
        print(f"Data unzipped in {dir_data_raw}")
    else:
        print("Data is already unzipped!")
else:
    print(f"Ingestion on the partitioned data has already been run -- skipping this step and reading from {raw_stacked_filename}")

Ingestion on the partitioned data has already been run -- skipping this step and reading from cites_epehant_ivory_trades_raw_stacked.csv
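
As noted above, the folder-exists check cannot detect a partial extraction. One possible way to close that gap is to compare the archive's member list against what is on disk. This is only a sketch: `missing_from_extraction` is a hypothetical helper, and the toy archive below stands in for the real download.

```python
import os
import tempfile
from zipfile import ZipFile


def missing_from_extraction(zip_path, extract_dir):
    """Return the archive members (files only) that are absent from extract_dir."""
    with ZipFile(zip_path, "r") as zf:
        # Directory entries end with "/"; only files need to exist on disk
        members = [name for name in zf.namelist() if not name.endswith("/")]
    return [name for name in members if not os.path.exists(os.path.join(extract_dir, name))]


with tempfile.TemporaryDirectory() as tmp:
    # Build a toy archive with two made-up partitions
    zip_path = os.path.join(tmp, "toy.zip")
    with ZipFile(zip_path, "w") as zf:
        zf.writestr("trade_db_1.csv", "Id,Family\n1,Elephantidae\n")
        zf.writestr("trade_db_2.csv", "Id,Family\n2,Felidae\n")

    # Simulate a partial unzip by extracting only one member
    out_dir = os.path.join(tmp, "toy")
    with ZipFile(zip_path, "r") as zf:
        zf.extract("trade_db_1.csv", path=out_dir)

    missing = missing_from_extraction(zip_path, out_dir)
```

An empty list would mean every file in the archive is present; it does not verify that each file's contents are complete.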


### b. Identify CSV files that need to be read in

These follow the pattern "trade_db_[#].csv"

In [5]:
if not raw_stacked_exists:
    csv_files = glob.glob(os.path.join(dir_data_raw, "trade_db_*.csv"))
    print(f"There are {len(csv_files)} CSV files to read in")
else:
    print(f"Ingestion on the partitioned data has already been run -- skipping this step and reading from {raw_stacked_filename}")

Ingestion on the partitioned data has already been run -- skipping this step and reading from cites_epehant_ivory_trades_raw_stacked.csv
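
`glob.glob` does not guarantee any particular order, so the row order of the stacked file can differ between runs. If a stable order matters, the file list could be sorted by the number in each name. The sketch below assumes the "trade_db_[#].csv" pattern described above; `partition_number` and the toy paths are hypothetical.

```python
import re


def partition_number(path):
    """Extract the integer from a name like 'trade_db_12.csv'; unmatched names sort last."""
    match = re.search(r"trade_db_(\d+)\.csv$", path)
    return int(match.group(1)) if match else float("inf")


# Toy paths; a plain string sort would place trade_db_10 before trade_db_2
toy_files = ["raw/trade_db_10.csv", "raw/trade_db_2.csv", "raw/trade_db_1.csv"]
sorted_files = sorted(toy_files, key=partition_number)
```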


### c. Read in and store only elephant-related trade records from each csv

This process loops through each of the CSVs and does the following:

* Read in CSV
* Filter only to elephant-related trade records
    * Family = "Elephantidae"
* Store filtered data partition in a list (this list will be concatenated into one master dataset after all files are read in)

In [6]:
if not raw_stacked_exists:
    # Initialize empty list -- this is what we'll store the elephant-related trade records in from each csv
    # -- Each element in this list is a dataframe corresponding to one of the CSVs in the zipped folder
    elephant_partitions = []

    # Loop through each CSV
    for file in csv_files:
        # Read in CSV file
        print(f"Reading in {file}...")
        df = pd.read_csv(file, dtype=str)

        # Filter to elephant-related trade records
        print("-- Filtering to only trades involving elephants")
        df_elephants = df[df["Family"] == "Elephantidae"]

        # Add filtered df to elephant_partitions
        elephant_partitions.append(df_elephants)
        print("-- Done with this file!\n")

        # Delete stored partition dfs to clear memory
        del df, df_elephants
else:
    print(f"Ingestion on the partitioned data has already been run -- skipping this step and reading from {raw_stacked_filename}")

Ingestion on the partitioned data has already been run -- skipping this step and reading from cites_epehant_ivory_trades_raw_stacked.csv
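
The loop above loads each partition fully before filtering. If memory is a concern, a possible alternative is to read each file in chunks and filter as you go, using the `chunksize` parameter of `pd.read_csv`. This is a sketch rather than the notebook's method: `read_elephant_rows` is a hypothetical helper, the default chunk size is an arbitrary assumption, and the toy CSV is made up.

```python
import os
import tempfile

import pandas as pd


def read_elephant_rows(csv_path, chunksize=100_000):
    """Read csv_path in chunks and keep only rows where Family is 'Elephantidae'."""
    kept = []
    for chunk in pd.read_csv(csv_path, dtype=str, chunksize=chunksize):
        kept.append(chunk[chunk["Family"] == "Elephantidae"])
    return pd.concat(kept, ignore_index=True)


with tempfile.TemporaryDirectory() as tmp:
    # Toy partition with three made-up rows
    toy_path = os.path.join(tmp, "trade_db_1.csv")
    pd.DataFrame({
        "Id": ["1", "2", "3"],
        "Family": ["Elephantidae", "Felidae", "Elephantidae"],
    }).to_csv(toy_path, index=False)

    # chunksize=2 forces the file to be read in two chunks
    result = read_elephant_rows(toy_path, chunksize=2)
```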


### d. Stack all partitions into one dataframe

In [7]:
if not raw_stacked_exists:
    df_elephant_raw = pd.concat(elephant_partitions, ignore_index=True)
    df_elephant_raw.info()
else:
    print(f"Ingestion on the partitioned data has already been run -- skipping this step and reading from {raw_stacked_filename}")

Ingestion on the partitioned data has already been run -- skipping this step and reading from cites_epehant_ivory_trades_raw_stacked.csv


### e. Export this file so we don't have to rerun the part with CSV partitions every single time

In [8]:
if not raw_stacked_exists:
    df_elephant_raw.to_csv(raw_stacked_filepath, index=False)
else:
    print(f"Ingestion on the partitioned data has already been run -- skipping this step and reading from {raw_stacked_filename}")

Ingestion on the partitioned data has already been run -- skipping this step and reading from cites_epehant_ivory_trades_raw_stacked.csv


## Pre-processing

### a. Explore trade terms (which ones relate to ivory?)

In [9]:
df_elephant_raw["Term"].unique()

array(['live', 'ivory carvings', 'tusks', 'ivory pieces', 'unspecified',
       'leather products (small)', 'bodies', 'carvings', 'skin pieces',
       'specimens', 'derivatives', 'bone products', 'trophies',
       'leather items', 'feet', 'skulls', 'bone carvings', 'garments',
       'teeth', 'shoes', 'bones', 'bone pieces', 'sets of piano keys',
       'skeletons', 'leather products (large)', 'meat', 'skins', 'hair',
       'hair products', 'tails', 'genitalia', 'ears', 'leather',
       'skin scraps', 'ivory scraps', 'shells', 'furniture', 'roots',
       'powder', 'sides', 'jewellery - ivory ', 'caviar', 'medicine',
       'plates', 'piano keys', 'jewellery', 'trunk', 'extract',
       'horn pieces', 'cloth', 'claws', 'fur products (large)',
       'fur product (small)'], dtype=object)

### b. Flag trades containing ivory

Any terms containing "ivory", "tusk", or "piano" will be treated as ivory trades. We cannot be certain about any of the other terms being ivory.

In [10]:
df = df_elephant_raw.copy()

# Flag ivory trades
ivory_terms = ["ivory", "tusk", "piano"]
df["ivory_trade"] = df["Term"].apply(lambda x: any(term in x.lower() for term in ivory_terms))

# Confirm this worked
df.loc[df["ivory_trade"], "Term"].unique()

array(['ivory carvings', 'tusks', 'ivory pieces', 'sets of piano keys',
       'ivory scraps', 'jewellery - ivory ', 'piano keys'], dtype=object)
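
The same flag can also be built with pandas' vectorized string methods instead of `apply`. The sketch below uses `Series.str.contains` with a regular expression built from the three substrings; the toy `terms` series is made up. One difference to be aware of: with `na=False`, a missing term is flagged as `False`, whereas the `apply` version would raise on a missing value (not an issue here, since "Term" has no nulls).

```python
import pandas as pd

# Toy terms, including mixed case and a missing value
terms = pd.Series(["tusks", "Ivory carvings", "piano keys", "skins", None])

# Match any of the three substrings, ignoring case; missing values become False
ivory_pattern = "ivory|tusk|piano"
is_ivory = terms.str.contains(ivory_pattern, case=False, na=False)
```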

### c. Filter to ivory trades only

In [11]:
df = df[df["ivory_trade"]].copy()  # copy so later column assignments act on an independent frame
print(f"We have {len(df)} trades that contain elephant ivory!")

We have 259902 trades that contain elephant ivory!


### d. Fill in blank "Unit" values with "Number of specimens"

The data usage guide explains that a blank "Unit" value on a trade record means the unit is "Number of specimens".

In [12]:
# Explore unique values to see exactly which one represents number of specimens
# -- Copy/paste that value as what to fill blanks with
df["Unit"].unique()

array([nan, 'sets', 'kg', 'g', 'pairs', 'shipments', 'boxes', 'bags',
       'cases', 'pieces', 'cartons', 'm', 'oz', 'cm', 'ml', 'm3',
       'Number of specimens', 'cm3', 'm2'], dtype=object)

In [13]:
df["unit_clean"] = df["Unit"].fillna("Number of specimens")
df["unit_clean"].value_counts()

Number of specimens    199561
kg                      41257
g                       12527
sets                     4910
pairs                    1214
oz                        341
shipments                  57
boxes                      11
cm                          5
cases                       4
m3                          3
pieces                      3
m                           3
bags                        2
cartons                     1
ml                          1
cm3                         1
m2                          1
Name: unit_clean, dtype: int64

### e. Filter to "Number of specimens" records only

Trades where the unit is "Number of specimens" account for most of the data (199,561 of 259,902 ivory records, about 77%). There is no clear way to aggregate the other units with them while preserving the true unit of each trade (e.g. there is no clear conversion between "kg" or "g" and "Number of specimens"). We therefore drop every record whose unit is not "Number of specimens" to maintain reasonable data integrity.

In [14]:
df = df[df["unit_clean"] == "Number of specimens"].copy()  # copy so later column assignments act on an independent frame
print(f"We have {len(df)} elephant ivory trades that are reported in units of 'Number of specimens'!")

We have 199561 elephant ivory trades that are reported in units of 'Number of specimens'!


### f. Replace unknown ISO country codes with NULL

* According to the data usage guide, country code "XX" represents an unknown ISO code. We replace these with NULL so that this value does not get loaded as nodes, attributes, etc.

In [15]:
# List of columns that are populated with ISO codes
cols_country_iso = ["Importer", "Exporter", "Origin"]

# Replace "XX" with NULL
for col in cols_country_iso:
    df[f"{col.lower()}_clean"] = df[col].replace("XX", np.nan)

### g. Drop trade records that have an unknown or missing importing OR exporting country

We can only use records that have a known importer AND exporter in order to create graph relationships.
* We will drop a trade if we do not know the importing country or the exporting country

In [16]:
# Flag records where the importer OR the exporter (or both) is unknown
df["bad_records_iso"] = df[["importer_clean", "exporter_clean"]].isnull().any(axis=1)
print(f"There are {df['bad_records_iso'].sum()} records that have missing importer OR exporter information. Dropping these!")

# Keep only records with a full, known exporter -> importer path
df = df[~df["bad_records_iso"]].copy()
print(f"We have {len(df)} elephant ivory trades with a known exporter -> importer path!")

There are 4969 records that have missing importer OR exporter information. Dropping these!
We have 194592 elephant ivory trades with a known exporter -> importer path!
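
The difference between "missing either" and "missing both" comes down to `any(axis=1)` versus `all(axis=1)` on the null mask. A small sketch with made-up rows shows how the two flags diverge; the cell above uses the `any` form.

```python
import numpy as np
import pandas as pd

# Toy rows: complete, importer missing, exporter missing, both missing
toy = pd.DataFrame({
    "importer_clean": ["US", np.nan, "DE", np.nan],
    "exporter_clean": ["GB", "IN", np.nan, np.nan],
})

missing = toy[["importer_clean", "exporter_clean"]].isnull()
missing_either = missing.any(axis=1)  # True when at least one side is unknown
missing_both = missing.all(axis=1)    # True only when both sides are unknown
```

Only the first toy row has a usable exporter -> importer path, which is why the `any` form is the right filter for building graph relationships.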


### h. Create numeric versions of "Year" and "Quantity" fields

In [17]:
df["year_num"] = pd.to_numeric(df["Year"], errors="coerce")
df["quantity_num"] = pd.to_numeric(df["Quantity"], errors="coerce")
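
With `errors="coerce"`, any value that cannot be parsed becomes `NaN` instead of raising, and a column containing `NaN` comes back as float. If integer years are wanted even when some values fail to parse, pandas' nullable `Int64` dtype is one option. The sketch below uses made-up values and is an optional variation, not what the cell above does.

```python
import pandas as pd

# Toy year strings, including an unparseable value and a missing one
years = pd.Series(["1976", "2024", "n/a", None])

year_float = pd.to_numeric(years, errors="coerce")  # unparseable values become NaN
year_int = year_float.astype("Int64")               # nullable integer keeps the gaps as <NA>
```

The cast to `Int64` only succeeds when every non-missing value is a whole number, so it suits "Year" better than a column that may hold fractional quantities.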

### i. Missing record check

Make sure that all variables that must be populated for a given trade are populated.

* Id
* year_num
* Taxon
* Family
* Term
* quantity_num
* unit_clean
* importer_clean
* exporter_clean

In [18]:
cols_mandatory = ["Id", "year_num", "Taxon", "Family", "Term", "quantity_num", "unit_clean", "importer_clean", "exporter_clean"]

n_missing_mandatory = df[cols_mandatory].isnull().any(axis=1).sum()

if n_missing_mandatory > 0:
    raise ValueError("THERE ARE MISSING VALUES IN SOME MANDATORY COLUMNS REQUIRED FOR GRAPH CREATION. INVESTIGATE!")
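
If the check above ever fails, a per-column count of missing values is a quick way to start investigating. A minimal sketch on made-up data; `cols_required` and the toy frame are hypothetical stand-ins for `cols_mandatory` and `df`.

```python
import numpy as np
import pandas as pd

# Toy frame with one gap in year_num and one in importer_clean
toy = pd.DataFrame({
    "Id": ["1", "2", "3"],
    "year_num": [1976, np.nan, 2024],
    "importer_clean": ["US", "DE", np.nan],
})
cols_required = ["Id", "year_num", "importer_clean"]

# Count missing values per column, then list the columns that have any
missing_per_col = toy[cols_required].isnull().sum()
cols_with_gaps = missing_per_col[missing_per_col > 0].index.tolist()
```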

### j. Duplicate record checks

In [19]:
# By all records
df[df.duplicated(keep=False)]

Unnamed: 0,Id,Year,Appendix,Taxon,Class,Order,Family,Genus,Term,Quantity,...,Export.permit.RandomID,Origin.permit.RandomID,ivory_trade,unit_clean,importer_clean,exporter_clean,origin_clean,bad_records_iso,year_num,quantity_num


In [20]:
# By ID column
df[df.duplicated(subset=["Id"], keep=False)]

Unnamed: 0,Id,Year,Appendix,Taxon,Class,Order,Family,Genus,Term,Quantity,...,Export.permit.RandomID,Origin.permit.RandomID,ivory_trade,unit_clean,importer_clean,exporter_clean,origin_clean,bad_records_iso,year_num,quantity_num


### k. Create final dataset to use for graph

* Only keep relevant columns that will be used in graph database
* Convert all column names to lowercase
* Rename columns if necessary
* Sort data by year and taxon

In [21]:
# Keep relevant columns
cols_keep = ["Id", "year_num", "Taxon", "Family", "Term", "quantity_num", "unit_clean", "importer_clean", "exporter_clean", "origin_clean"]
df_final = df[cols_keep]

# Convert all column names to lowercase
df_final.columns = df_final.columns.str.lower()

# Rename columns
df_final = df_final.rename(columns = {
    "year_num": "year",
    "quantity_num": "quantity",
    "unit_clean": "unit",
    "importer_clean": "importer",
    "exporter_clean": "exporter",
    "origin_clean": "origin"
})

# Sort
df_final = df_final.sort_values(by=["year", "taxon"])

## Preview and export cleaned dataset

### a. Preview

In [22]:
df_final

Unnamed: 0,id,year,taxon,family,term,quantity,unit,importer,exporter,origin
105,761424121,1976,Elephas maximus,Elephantidae,ivory carvings,100.0,Number of specimens,DE,IN,
106,301000281,1976,Elephas maximus,Elephantidae,ivory carvings,42.0,Number of specimens,DE,IN,
14893,2437221511,1977,Elephantidae spp.,Elephantidae,tusks,2.0,Number of specimens,DK,NG,
158,103564221,1977,Elephas maximus,Elephantidae,ivory carvings,1.0,Number of specimens,CA,US,
21376,5571162511,1977,Loxodonta africana,Elephantidae,ivory pieces,1.0,Number of specimens,US,GB,
...,...,...,...,...,...,...,...,...,...,...
376960,6430275356,2023,Loxodonta africana,Elephantidae,ivory carvings,1.0,Number of specimens,ZA,GB,
375578,5472110955,2024,Loxodonta africana,Elephantidae,ivory pieces,200.0,Number of specimens,US,MZ,
375720,2335572955,2024,Loxodonta africana,Elephantidae,tusks,2.0,Number of specimens,PA,CM,
376701,3767382656,2024,Loxodonta africana,Elephantidae,ivory carvings,30.0,Number of specimens,MY,MW,


In [23]:
df_final.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 194592 entries, 105 to 376704
Data columns (total 10 columns):
 #   Column    Non-Null Count   Dtype  
---  ------    --------------   -----  
 0   id        194592 non-null  object 
 1   year      194592 non-null  int64  
 2   taxon     194592 non-null  object 
 3   family    194592 non-null  object 
 4   term      194592 non-null  object 
 5   quantity  194592 non-null  float64
 6   unit      194592 non-null  object 
 7   importer  194592 non-null  object 
 8   exporter  194592 non-null  object 
 9   origin    36958 non-null   object 
dtypes: float64(1), int64(1), object(8)
memory usage: 16.3+ MB


### b. Export

In [24]:
df_final.to_csv(processed_filepath, index=False)
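
One thing to keep in mind when reading the exported file back: `id` is stored as text in `df_final`, but `pd.read_csv` infers an integer type for all-digit columns unless told otherwise. The sketch below shows the difference on a made-up frame; the leading-zero id is a hypothetical value chosen to make the effect visible.

```python
import os
import tempfile

import pandas as pd

# Toy export; the first id has a leading zero purely for illustration
toy = pd.DataFrame({
    "id": ["0761424121", "301000281"],
    "year": [1976, 1976],
    "origin": [None, "IN"],
})

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "toy_clean.csv")
    toy.to_csv(path, index=False)

    inferred = pd.read_csv(path)                    # id is parsed as an integer
    as_text = pd.read_csv(path, dtype={"id": str})  # id stays text
```

Passing `dtype={"id": str}` on read keeps identifiers exactly as written, which may matter if they are used as node keys downstream.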