# ASCOR database creation 

This notebook creates the SQL database for the ASCOR data.

# Imports and database connection

In [42]:
import sys
import os
import re
import pandas as pd 
import pandas as pd
from sqlalchemy import create_engine, text
from dotenv import load_dotenv
import pandas as pd
import os
import re
from sqlalchemy import create_engine, text
from dotenv import load_dotenv
import os
# Put the parent of the current working directory on the import path so the
# project-local `utils` package can be found.
project_root = os.path.abspath(os.path.join(os.getcwd(), '..'))
sys.path.append(project_root)

# Project-local helpers; this import relies on the sys.path entry added above.
from utils.database_creation_utils import get_db_connection, get_engine

# Both handles are opened against the same database name.
DB_NAME = "ascor_api"
engine = get_engine(db_name=DB_NAME)
session = get_db_connection(db_name=DB_NAME)



# Creating Country Entity

In [5]:

# Load the country workbook. Header names are trimmed of surrounding
# whitespace so the exact-match column selection below works.
df = pd.read_excel("../data/TPI_ASCOR_data_13012025/ASCOR_countries.xlsx")
df.columns = df.columns.str.strip()

# Drop and create the country table.
# CASCADE keeps this cell re-runnable: the benchmarks, assessment_results and
# assessment_trends tables created further down hold foreign keys to
# country(country_name), and a plain DROP TABLE is rejected while they exist.
# CASCADE removes those foreign-key constraints (not the referencing tables),
# so re-run the downstream cells afterwards to rebuild them.
# NOTE(review): assumes the target database is PostgreSQL - confirm.
create_country_sql = """
DROP TABLE IF EXISTS country CASCADE;

CREATE TABLE country (
  country_name VARCHAR NOT NULL,
  iso VARCHAR,
  region VARCHAR,
  bank_lending_group VARCHAR,
  imf_category VARCHAR,
  un_party_type VARCHAR,
  PRIMARY KEY (country_name)
);
"""

with engine.connect() as conn:
    conn.execute(text(create_country_sql))
    conn.commit()

# Select the source columns and rename them (positionally) to the database
# column names; the two lists must stay in the same order.
country_df = df[[
    'Name',
    'Country ISO code',
    'Region',
    'World Bank lending group',
    'International Monetary Fund fiscal monitor category',
    'Type of Party to the United Nations Framework Convention on Climate Change'
]].copy()

country_df.columns = [
    'country_name', 'iso', 'region',
    'bank_lending_group', 'imf_category', 'un_party_type'
]

# Append into the table created above so the explicit schema is kept
# (to_sql does not create or replace the table here).
country_df.to_sql("country", engine, if_exists="append", index=False)

print("Country table created and populated successfully.")


Country table created and populated successfully.


# Creating Benchmark Entities 

In [7]:
import pandas as pd
from sqlalchemy import text

# Read the benchmark workbook and normalise its headers: trimmed, lower-cased,
# spaces replaced by underscores.
df = pd.read_excel("../data/TPI_ASCOR_data_13012025/ASCOR_benchmarks.xlsx")
df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")

# DDL for both benchmark tables. benchmark_values is dropped first because it
# holds a foreign key to benchmarks.
create_benchmark_sql = """
DROP TABLE IF EXISTS benchmark_values;
DROP TABLE IF EXISTS benchmarks;

CREATE TABLE benchmarks (
  benchmark_id INT NOT NULL,
  publication_date DATE NOT NULL,
  emissions_metric VARCHAR NOT NULL,
  emissions_boundary VARCHAR NOT NULL,
  units VARCHAR NOT NULL,
  benchmark_type VARCHAR NOT NULL,
  country_name VARCHAR NOT NULL,
  PRIMARY KEY (benchmark_id),
  FOREIGN KEY (country_name) REFERENCES country(country_name)
);

CREATE TABLE benchmark_values (
  value FLOAT NOT NULL,
  year INT NOT NULL,
  benchmark_id INT NOT NULL,
  PRIMARY KEY (benchmark_id, year),
  FOREIGN KEY (benchmark_id) REFERENCES benchmarks(benchmark_id)
);
"""

with engine.connect() as connection:
    connection.execute(text(create_benchmark_sql))
    connection.commit()

# `benchmarks`: one row per benchmark. Only the key columns need renaming;
# the other headers already match the table's column names.
benchmark_source_columns = [
    "id", "publication_date", "emissions_metric", "emissions_boundary",
    "units", "benchmark_type", "country"
]
benchmarks_df = df[benchmark_source_columns].rename(
    columns={"id": "benchmark_id", "country": "country_name"}
)

# `benchmark_values`: columns whose header consists only of digits are treated
# as years; they are unpivoted to one row per (benchmark, year) and rows with
# any missing field are discarded.
value_columns = [name for name in df.columns if name.isdigit()]
wide_values = df[['id'] + value_columns]
long_values = wide_values.melt(id_vars='id', var_name='year', value_name='value')
benchmark_values_df = long_values.dropna().rename(columns={"id": "benchmark_id"})
benchmark_values_df['year'] = benchmark_values_df['year'].astype(int)

# Parent table first so the foreign key on benchmark_values is satisfied.
benchmarks_df.to_sql("benchmarks", engine, if_exists="append", index=False)
benchmark_values_df.to_sql("benchmark_values", engine, if_exists="append", index=False)

print("Benchmark tables created and populated successfully.")


Benchmark tables created and populated successfully.


# Assessment Elements 

In [None]:
import pandas as pd
from sqlalchemy import text

# Load the indicator workbook and normalise its headers: trimmed, lower-cased,
# spaces replaced by underscores.
df = pd.read_excel("../data/TPI_ASCOR_data_13012025/ASCOR_indicators.xlsx")
df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")

# Select and rename columns to match database schema
assessment_elements_df = df[[
    'code', 'text', 'units_or_response_type', 'type'
]].copy()

assessment_elements_df.columns = ['code', 'text', 'response_type', 'type']
# response_type is NOT NULL in the table, so blank cells get a placeholder.
assessment_elements_df['response_type'] = assessment_elements_df['response_type'].fillna("Not specified")

# Drop and create the assessment_elements table.
# Fixes: the `text` column constraint previously read "NOTL NUL", which is not
# valid SQL and made the CREATE TABLE statement fail.
# CASCADE keeps the cell re-runnable once assessment_results (which holds a
# foreign key to assessment_elements(code)) exists; it removes that constraint
# only, so re-run the assessment_results cell afterwards to rebuild it.
# NOTE(review): assumes the target database is PostgreSQL - confirm.
create_assessment_elements_sql = """
DROP TABLE IF EXISTS assessment_elements CASCADE;

CREATE TABLE assessment_elements (
  code VARCHAR NOT NULL,
  text VARCHAR NOT NULL,
  response_type VARCHAR NOT NULL,
  type VARCHAR NOT NULL,
  PRIMARY KEY (code)
);
"""

with engine.connect() as conn:
    conn.execute(text(create_assessment_elements_sql))
    conn.commit()

# Append into the table created above so the explicit schema is kept.
assessment_elements_df.to_sql("assessment_elements", engine, if_exists="append", index=False)

print("Assessment elements table created and populated successfully.")


Assessment elements table created and populated successfully.


# Assessment Results Entity

In [11]:
import pandas as pd
from sqlalchemy import text

# Read the assessment results workbook. Headers are only trimmed (case and
# inner spaces are kept) because the column names below are matched verbatim.
df = pd.read_excel("../data/TPI_ASCOR_data_13012025/ASCOR_assessments_results.xlsx")
df.columns = df.columns.str.strip()

# DDL: one row per (assessment, element code).
create_assessment_results_sql = """
DROP TABLE IF EXISTS assessment_results;

CREATE TABLE assessment_results (
  assessment_id INT NOT NULL,
  response VARCHAR,
  assessment_date DATE,
  publication_date DATE,
  source VARCHAR,
  year VARCHAR,
  code VARCHAR NOT NULL,
  country_name VARCHAR NOT NULL,
  PRIMARY KEY (assessment_id, code),
  FOREIGN KEY (code) REFERENCES assessment_elements(code),
  FOREIGN KEY (country_name) REFERENCES country(country_name)
);
"""

with engine.connect() as connection:
    connection.execute(text(create_assessment_results_sql))
    connection.commit()

# Response columns are headed "<kind> <code>" with kind being indicator,
# metric or area; any other header (e.g. pillar columns) is ignored.
RESPONSE_PREFIXES = ("indicator ", "metric ", "area ")
response_cols = [name for name in df.columns if name.startswith(RESPONSE_PREFIXES)]


def _cell_or_none(record, column, columns):
    """Return record[column], or None if the column is absent or the cell is missing."""
    if column not in columns:
        return None
    cell = record[column]
    return cell if pd.notna(cell) else None


# Accumulate one dict per (assessment, response column), then build the frame once.
rows = []

for _, row in df.iterrows():
    # Fields shared by every response of this assessment.
    # NOTE(review): the saved output of this cell shows a warning pointing at
    # the to_datetime call below; if the dates are day-first text, an explicit
    # format may be needed - confirm against the workbook.
    assessment_id = row["Id"]
    country_name = row["Country"]
    assessment_date = pd.to_datetime(row["Assessment date"]).date()
    publication_date = pd.to_datetime(row["Publication date"]).date()

    for response_col in response_cols:
        # Everything after the first space is the element code, e.g. "EP.1.a".
        _, code = response_col.split(" ", 1)

        # Optional companions are headed "year <response column>" and
        # "source <response column>"; not every response column has them.
        raw_year = _cell_or_none(row, f"year {response_col}", df.columns)
        year = None if raw_year is None else str(int(raw_year))

        rows.append({
            "assessment_id": assessment_id,
            "response": _cell_or_none(row, response_col, df.columns),
            "assessment_date": assessment_date,
            "publication_date": publication_date,
            "source": _cell_or_none(row, f"source {response_col}", df.columns),
            "year": year,
            "code": code,
            "country_name": country_name
        })

# Convert to DataFrame
assessment_results_df = pd.DataFrame(rows)

# Append into the table created above so the explicit schema is kept.
assessment_results_df.to_sql("assessment_results", engine, if_exists="append", index=False)

print("Assessment results table created and populated successfully.")


  assessment_date = pd.to_datetime(row["Assessment date"]).date()


Assessment results table created and populated successfully.


# Assessment Trends 

In [37]:
import pandas as pd
from sqlalchemy import text

# Load the trends/pathways workbook and normalise its headers: trimmed,
# lower-cased, spaces replaced by underscores.
df = pd.read_excel("../data/TPI_ASCOR_data_13012025/ASCOR_assessments_results_trends_pathways.xlsx")
df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")

# Select the descriptive columns and rename them (positionally) to the
# database column names; the two lists must stay in the same order.
assessment_trends_df = df[[
    'id', 'country', 'emissions_metric', 'emissions_boundary',
    'units', 'assessment_date', 'publication_date', 'last_historical_year'
]].copy()

assessment_trends_df.columns = [
    'trend_id', 'country_name', 'emissions_metric', 'emissions_boundary',
    'units', 'assessment_date', 'publication_date', 'last_historical_year'
]

# Convert date and year fields to appropriate types.
# "Int64" is pandas' nullable integer dtype, so missing years stay missing.
assessment_trends_df["assessment_date"] = pd.to_datetime(assessment_trends_df["assessment_date"]).dt.date
assessment_trends_df["publication_date"] = pd.to_datetime(assessment_trends_df["publication_date"]).dt.date
assessment_trends_df["last_historical_year"] = assessment_trends_df["last_historical_year"].astype("Int64")

# SQL schema with composite primary key.
# CASCADE keeps the cell re-runnable once trend_values (which holds a foreign
# key to assessment_trends(trend_id, country_name)) exists; a plain DROP TABLE
# is rejected while that constraint is in place. CASCADE removes the
# constraint only, so re-run the trend_values cell afterwards to rebuild it.
# NOTE(review): assumes the target database is PostgreSQL - confirm.
create_assessment_trends_sql = """
DROP TABLE IF EXISTS assessment_trends CASCADE;

CREATE TABLE assessment_trends (
  trend_id INT NOT NULL,
  emissions_metric VARCHAR,
  emissions_boundary VARCHAR,
  units VARCHAR,
  assessment_date DATE,
  publication_date DATE,
  last_historical_year INT,
  country_name VARCHAR NOT NULL,
  PRIMARY KEY (trend_id, country_name),
  FOREIGN KEY (country_name) REFERENCES country(country_name)
);
"""

# Execute SQL and insert the data
with engine.connect() as conn:
    conn.execute(text(create_assessment_trends_sql))
    conn.commit()

# Append into the table created above so the explicit schema is kept.
assessment_trends_df.to_sql("assessment_trends", engine, if_exists="append", index=False)

print("Assessment trends table created and populated successfully with composite primary key.")


Assessment trends table created and populated successfully with composite primary key.


  assessment_trends_df["assessment_date"] = pd.to_datetime(assessment_trends_df["assessment_date"]).dt.date


# Trend values table 

I think the commented-out cell below is redundant, but I am keeping it in case it is needed in the future.

In [None]:
# NOTE: inactive draft of the trend_values cell, kept for reference only.
# The live cell that follows builds the same table but stores metric_ep1_a_i
# as FLOAT and cleans the "No data" / "%" / "Not applicable" values first;
# this draft keeps metric_ep1_a_i as VARCHAR and only converts the year column.

# import pandas as pd
# from sqlalchemy import text

# # Load and clean the Excel file
# trend_values_data = pd.read_excel("../data/TPI_ASCOR_data_13012025/ASCOR_assessments_results_trends_pathways.xlsx")
# trend_values_data.columns = trend_values_data.columns.str.strip().str.lower().str.replace(" ", "_")

# # Extract and rename relevant columns
# trend_values_df = trend_values_data[[
#     "id", "country", "metric_ep1.a.i", "source_metric_ep1.a.i", "year_metric_ep1.a.i",
#     "metric_ep1.a.ii_1-year", "metric_ep1.a.ii_3-year", "metric_ep1.a.ii_5-year"
# ]].copy()

# # Rename to match lowercase column names in SQL
# trend_values_df.columns = [
#     "trend_id", "country_name", "metric_ep1_a_i", "source_metric_ep1_a_i", "year_metric_ep1_a_i",
#     "metric_ep1_a_ii_1_year", "metric_ep1_a_ii_3_year", "metric_ep1_a_ii_5_year"
# ]

# # Convert year column to nullable integer
# trend_values_df["year_metric_ep1_a_i"] = pd.to_numeric(trend_values_df["year_metric_ep1_a_i"], errors="coerce").astype("Int64")

# # Drop and create the table with lowercase columns
# create_trend_values_sql = """
# DROP TABLE IF EXISTS trend_values;

# CREATE TABLE trend_values (
#   metric_ep1_a_i VARCHAR,
#   source_metric_ep1_a_i VARCHAR,
#   year_metric_ep1_a_i INT,
#   metric_ep1_a_ii_1_year VARCHAR,
#   metric_ep1_a_ii_3_year VARCHAR,
#   metric_ep1_a_ii_5_year VARCHAR,
#   trend_id INT NOT NULL,
#   country_name VARCHAR NOT NULL,
#   PRIMARY KEY (trend_id, country_name),
#   FOREIGN KEY (trend_id, country_name) REFERENCES assessment_trends(trend_id, country_name)
# );
# """

# # Execute and populate the table
# with engine.connect() as conn:
#     conn.execute(text(create_trend_values_sql))
#     conn.commit()

# trend_values_df.to_sql("trend_values", engine, if_exists="append", index=False)

# print("✅ Trend values table created and populated successfully.")


✅ Trend values table created and populated successfully.


In [55]:
import pandas as pd
from sqlalchemy import text

# Load the trends/pathways workbook and normalise its headers: trimmed,
# lower-cased, spaces replaced by underscores.
trend_values_data = pd.read_excel("../data/TPI_ASCOR_data_13012025/ASCOR_assessments_results_trends_pathways.xlsx")
trend_values_data.columns = trend_values_data.columns.str.strip().str.lower().str.replace(" ", "_")

# Extract relevant columns
trend_values_df = trend_values_data[[
    "id", "country", "metric_ep1.a.i", "source_metric_ep1.a.i", "year_metric_ep1.a.i",
    "metric_ep1.a.ii_1-year", "metric_ep1.a.ii_3-year", "metric_ep1.a.ii_5-year"
]].copy()

# Rename (positionally) for SQL compatibility: dots and hyphens in the
# workbook headers are not usable in unquoted column names.
trend_values_df.columns = [
    "trend_id", "country_name", "metric_ep1_a_i", "source_metric_ep1_a_i", "year_metric_ep1_a_i",
    "metric_ep1_a_ii_1_year", "metric_ep1_a_ii_3_year", "metric_ep1_a_ii_5_year"
]

# Clean `metric_ep1_a_i`: set to NaN if "No data" or not numeric, then convert to float
trend_values_df["metric_ep1_a_i"] = pd.to_numeric(
    trend_values_df["metric_ep1_a_i"].replace("No data", pd.NA), errors="coerce"
)

# Clean `year_metric_ep1_a_i` to a nullable integer (unparseable -> missing)
trend_values_df["year_metric_ep1_a_i"] = pd.to_numeric(
    trend_values_df["year_metric_ep1_a_i"], errors="coerce"
).astype("Int64")

# Strip "%" from the change columns; they stay text (the table stores VARCHAR).
for col in ["metric_ep1_a_ii_1_year", "metric_ep1_a_ii_3_year", "metric_ep1_a_ii_5_year"]:
    raw = trend_values_df[col]
    cleaned = raw.astype(str).str.replace("%", "", regex=False).str.strip()
    cleaned = cleaned.replace("Not applicable", pd.NA)
    # astype(str) turns missing cells into the literal string "nan", which
    # would be stored as text instead of NULL; restore missing cells here.
    trend_values_df[col] = cleaned.where(raw.notna(), pd.NA)

# Create the SQL table.
# CASCADE keeps the cell re-runnable once value_per_year (which holds a
# foreign key to trend_values(trend_id, country_name)) exists; a plain
# DROP TABLE is rejected while that constraint is in place. CASCADE removes
# the constraint only, so re-run the value_per_year cell afterwards.
# NOTE(review): assumes the target database is PostgreSQL - confirm.
create_trend_values_sql = """
DROP TABLE IF EXISTS trend_values CASCADE;

CREATE TABLE trend_values (
  metric_ep1_a_i FLOAT,
  source_metric_ep1_a_i VARCHAR,
  year_metric_ep1_a_i INT,
  metric_ep1_a_ii_1_year VARCHAR,
  metric_ep1_a_ii_3_year VARCHAR,
  metric_ep1_a_ii_5_year VARCHAR,
  trend_id INT NOT NULL,
  country_name VARCHAR NOT NULL,
  PRIMARY KEY (trend_id, country_name),
  FOREIGN KEY (trend_id, country_name) REFERENCES assessment_trends(trend_id, country_name)
);
"""

# Execute table creation
with engine.connect() as conn:
    conn.execute(text(create_trend_values_sql))
    conn.commit()

# Append the cleaned data into the table created above so the explicit
# schema is kept.
trend_values_df.to_sql("trend_values", engine, if_exists="append", index=False)

print("✅ Cleaned trend_values table created and populated successfully.")


✅ Cleaned trend_values table created and populated successfully.


# values per year table 

In [56]:
import pandas as pd
from sqlalchemy import text

# Read the trends/pathways workbook and normalise its headers: trimmed,
# lower-cased, spaces replaced by underscores.
trends_data = pd.read_excel("../data/TPI_ASCOR_data_13012025/ASCOR_assessments_results_trends_pathways.xlsx")
trends_data.columns = trends_data.columns.str.strip().str.lower().str.replace(" ", "_")

# Year columns are the all-digit headers that fall inside this inclusive window.
FIRST_YEAR, LAST_YEAR = 2021, 2030
year_cols = [
    name for name in trends_data.columns
    if name.isdigit() and FIRST_YEAR <= int(name) <= LAST_YEAR
]

# Unpivot to one row per (trend, country, year), rename to the database
# column names, coerce types (non-numeric values become missing) and finally
# discard rows without a usable value.
id_cols = ["id", "country"]
value_per_year_df = (
    trends_data[id_cols + year_cols]
    .melt(
        id_vars=id_cols,
        value_vars=year_cols,
        var_name="year",
        value_name="value"
    )
    .rename(columns={"id": "trend_id", "country": "country_name"})
    .assign(
        year=lambda frame: frame["year"].astype(int),
        value=lambda frame: pd.to_numeric(frame["value"], errors="coerce"),
    )
    .dropna(subset=["value"])
)

# DDL for the per-year values; each row points back at its trend_values row.
create_value_per_year_sql = """
DROP TABLE IF EXISTS value_per_year;

CREATE TABLE value_per_year (
  year INT NOT NULL,
  value FLOAT NOT NULL,
  trend_id INT NOT NULL,
  country_name VARCHAR NOT NULL,
  FOREIGN KEY (trend_id, country_name) REFERENCES trend_values(trend_id, country_name)
);
"""

# Create the table, then append the reshaped rows into it.
with engine.connect() as connection:
    connection.execute(text(create_value_per_year_sql))
    connection.commit()

value_per_year_df.to_sql("value_per_year", engine, if_exists="append", index=False)

print("✅ value_per_year table created and populated successfully.")


✅ value_per_year table created and populated successfully.
