# Migrate data from ToxRefDB to HAWC

Resource: User Guide (https://nepis.epa.gov/Exe/ZyPDF.cgi/P1015KWT.PDF?Dockey=P1015KWT.PDF)

In [None]:
import os

import django
import pandas as pd
import psycopg2
from asgiref.sync import sync_to_async  # access the ORM safely within the notebook

from hawc.apps.animalv2 import constants
from hawc.apps.animalv2.models import (
    AnimalGroup,
    Chemical,
    DataExtraction,
    DoseGroup,
    DoseResponseGroupLevelData,
    Endpoint,
    Experiment,
    ObservationTime,
    Treatment,
)
from hawc.apps.assessment.models import DoseUnits, DSSTox, Species, Strain
from hawc.apps.study.models import Study
from hawc.apps.vocab.models import Observation

# Point Django at the local settings module unless the environment already
# names one (setdefault never overrides an existing value).
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "hawc.main.settings.local")
# NOTE(review): the hawc model imports above execute before this call. That
# presumably only works when the kernel starts with Django already configured;
# confirm, or move these two lines above the hawc imports.
django.setup()

### Set up database functions

In [763]:
def connect_to_db(db_name, user, password, host, port):
    """Open a psycopg2 connection, report it on stdout, and return it.

    Args:
        db_name: Name of the database to connect to.
        user: Database user name.
        password: Password for ``user``.
        host: Database server host.
        port: Database server port.

    Returns:
        The connection object returned by ``psycopg2.connect``.
    """
    connection_settings = {
        "dbname": db_name,
        "user": user,
        "password": password,
        "host": host,
        "port": port,
    }
    connection = psycopg2.connect(**connection_settings)
    print(f"Connected to {db_name} successfully!")
    return connection


def fetch_data_from_source(query, conn):
    """Execute ``query`` on ``conn`` and return every row of the result.

    Args:
        query: SQL text handed to the cursor's ``execute``.
        conn: Connection whose ``cursor()`` result can be used as a context
            manager.

    Returns:
        Whatever the cursor's ``fetchall()`` returns.
    """
    with conn.cursor() as db_cursor:
        db_cursor.execute(query)
        rows = db_cursor.fetchall()
    return rows


async def create_model_objects(model, data):
    """Make sure a ``model`` row exists for every mapping in ``data``.

    Each item of ``data`` is expanded into keyword arguments for
    ``model.objects.get_or_create``; the call is wrapped with
    ``sync_to_async`` and awaited one item at a time.

    Args:
        model: Class exposing ``objects.get_or_create``.
        data: Iterable of keyword-argument mappings.
    """
    for field_values in data:
        get_or_create = sync_to_async(model.objects.get_or_create)
        await get_or_create(**field_values)


# Open the source connection that the ToxRefDB queries below run against
# (local server, user "hawc", empty password).
# NOTE(review): the database name is "hawc-fixture"; confirm it is the database
# that holds the prod_toxrefdb_2_1 schema queried below.
source_conn = connect_to_db("hawc-fixture", "hawc", "", "localhost", "5432")

Connected to hawc-fixture successfully!


### Stream ToxRefDB data to CSV

Done for convenience when manipulating the data. The export is split by study type to keep each file from growing too large; this may be unnecessary.

In [764]:
# Study types are found in the User Guide (https://nepis.epa.gov/Exe/ZyPDF.cgi/P1015KWT.PDF?Dockey=P1015KWT.PDF)
# Data is split by study type so that no single CSV (or the DataFrame later
# read from it) has to hold everything at once.
STUDY_TYPES = ["CHR", "SUB", "SAC", "DEV", "MGR", "REP", "DNT", "ACU", "OTH"]

# Query for study dose-response data, filtered to one study type per run.
# NOTE(review): the LEFT JOINs are presumably there to keep studies that have no
# tg or dose rows -- confirm that is the intent.
query = """SELECT * FROM prod_toxrefdb_2_1.chemical
LEFT JOIN prod_toxrefdb_2_1.study ON study.chemical_id=chemical.chemical_id
LEFT JOIN prod_toxrefdb_2_1.tg ON tg.study_id=study.study_id
LEFT JOIN prod_toxrefdb_2_1.dose ON dose.study_id=study.study_id
LEFT JOIN prod_toxrefdb_2_1.dtg ON dtg.tg_id=tg.tg_id AND dose.dose_id=dtg.dose_id
LEFT JOIN prod_toxrefdb_2_1.tg_effect ON tg.tg_id=tg_effect.tg_id
LEFT JOIN prod_toxrefdb_2_1.effect ON effect.effect_id=tg_effect.effect_id
LEFT JOIN prod_toxrefdb_2_1.endpoint ON endpoint.endpoint_id=effect.endpoint_id
LEFT JOIN prod_toxrefdb_2_1.dtg_effect ON tg_effect.tg_effect_id=dtg_effect.tg_effect_id AND dtg.dtg_id=dtg_effect.dtg_id
WHERE study.study_type = %s"""

# fetch data from toxref
print("### fetching toxref data ###")

# Kept at notebook scope on purpose: the last cell of the notebook closes it.
cur = source_conn.cursor()

for study_type in STUDY_TYPES:
    # One output CSV per study type
    output_file = f"{study_type}_data.csv"

    # mogrify() binds the parameter and returns the final SQL without running
    # it, so the result set is produced only once, by COPY, and streamed
    # straight to the file instead of first being fetched by execute().
    bound_query = cur.mogrify(query, (study_type,)).decode("utf-8")

    # newline="" keeps line endings inside quoted CSV fields untouched;
    # utf-8 matches the default encoding pandas uses when reading the file back.
    with open(output_file, "w", encoding="utf-8", newline="") as f_output:
        cur.copy_expert(f"COPY ({bound_query}) TO STDOUT WITH CSV HEADER", f_output)

### fetching toxref data ###


### Create Experiment and Chemical objects
Uses info from ToxRefDB study and chemical tables

In [765]:
dsstox_array = []

# Read in all chemical dsstox_substance_ids and create HAWC objects
for type in STUDY_TYPES:
    data_path = f"{type}_data.csv"
    df = pd.read_csv(data_path)
    dsstox = df["dsstox_substance_id"].unique()

    # add dtxsids to assessment
    dsstox_array.extend([{"dtxsid": value} for value in dsstox])

print("### creating dsstox items ###")
await create_model_objects(DSSTox, dsstox_array)
print(dsstox_array)

### creating dsstox items ###
[{'dtxsid': 'DTXSID9034496'}, {'dtxsid': 'DTXSID7034676'}, {'dtxsid': 'DTXSID9034496'}, {'dtxsid': 'DTXSID1037806'}, {'dtxsid': 'DTXSID7034676'}, {'dtxsid': 'DTXSID9034496'}, {'dtxsid': 'DTXSID1024174'}, {'dtxsid': 'DTXSID1032640'}, {'dtxsid': 'DTXSID6034928'}, {'dtxsid': 'DTXSID8040222'}, {'dtxsid': 'DTXSID9034496'}, {'dtxsid': 'DTXSID0032572'}, {'dtxsid': 'DTXSID5047299'}, {'dtxsid': 'DTXSID8024151'}, {'dtxsid': 'DTXSID5047320'}, {'dtxsid': 'DTXSID4020791'}, {'dtxsid': 'DTXSID9032610'}, {'dtxsid': 'DTXSID6032354'}, {'dtxsid': 'DTXSID6032358'}]


In [None]:
print("### Create a new HAWC Experiment and Chemical for each ToxRef study item ###")

# Read the CSV file into a DataFrame
study_to_exp_map = {}
toxref_to_hawc_chem_map = {}
studies = Study.objects.all()

study_map = await sync_to_async(lambda: [{study.study_identifier: study} for study in studies])()

for type in STUDY_TYPES:
    data_path = f"{type}_data.csv"
    df = pd.read_csv(data_path)
    unique_studies = df["study_id"].unique()

    for study_id in unique_studies:
        # get the first row for each study
        row = df[df["study_id"] == study_id].iloc[0]

        # Create a new experiment object

        # assumption: study_source_id will map to HAWC Study study_identifier
        experiment = {
            "study_id": study_map[row["study_source_id"]].id,
            # Experiment name; duration, admin_route, study_type, chemical. Ex: 30 day oral CHR Isazofos
            "name": f'{row["dose_end"]} {row["dose_end_unit"]} {row["admin_route"]} {row["study_type"]} {row["preferred_name"]}',
            "design": "",  # TODO: possibly s['study_type'], unsure.  predefined constants are 'todo' rn
            "has_multiple_generations": True if row["study_type"] == "MGR" else False,
            "guideline_compliance": row["study_source"],  # ask abt this text field
            "guideline": row["study_type_guideline"],
            "comments": row["study_comment"] or "",
        }

        exp = await sync_to_async(Experiment.objects.create)(**experiment)

        # map ToxRefDB study to HAWC experiment
        study_to_exp_map[study_id] = exp.id

        # Create a new HAWC Chemical for each ToxRef study (many-to-one study->chemical to one-to-one)
        # Each toxref study is linked to only one chemical

        vehicle = row["vehicle"]
        if vehicle is None:
            inhalation_study = row["admin_route"].lower() == "inhalation"
            vehicle = "not reported, assumed clean air." if inhalation_study else "not reported"

        chemical = {
            "name": row["preferred_name"],
            "dtxsid_id": row["dsstox_substance_id"],
            "cas": row["casrn"],
            "experiment_id": exp.id,
            "source": row["substance_source_name"] or "",
            "purity": row["substance_purity"] or "",
            "comments": row["substance_comment"] or "",
            # TODO: add "assumed clean air" for unreported inhalation studies
            "vehicle": vehicle,
        }

        # TODO: bulk create
        chem = await sync_to_async(Chemical.objects.create)(**chemical)
        toxref_to_hawc_chem_map[row["chemical_id"]] = chem.id

### Create a new HAWC Experiment and Chemical for each ToxRef study item ###


IndexError: list index out of range

### Create HAWC AnimalGroup objects

Uses info from ToxRefDB tg and study tables

In [None]:
# map ToxRef field values to HAWC constants


def map_to_const(value, const):
    """Translate a ToxRefDB field value into a member of a HAWC choices class.

    ToxRefDB-specific spellings are first rewritten to the label HAWC uses
    (lifestage and observation-time units only); the result is then compared,
    case-insensitively, with the ``label`` of every member of ``const``.

    Args:
        value: Raw ToxRefDB value. Anything that is not a string (for example
            the NaN pandas produces for an empty CSV cell, or None) matches
            nothing.
        const: Iterable choices class from ``constants`` whose members expose
            a ``label`` attribute.

    Returns:
        The matching member of ``const``, or None when there is no match.
    """
    # Missing values cannot match a label; bail out before any string operation.
    if not isinstance(value, str):
        return None

    if const == constants.Lifestage:
        if "pregnancy" in value:
            value = constants.Lifestage.AG.label
        elif "fetal" in value:
            value = constants.Lifestage.DEV.label  # TODO: is this a valid map

    elif const == constants.ObservationTimeUnits:
        if value in ("week", "month", "day"):
            # ToxRefDB uses the singular form; append "s" to match the plural label
            value = f"{value}s"
        elif value == "GD":
            value = constants.ObservationTimeUnits.GD.label
        elif value == "PND":
            value = constants.ObservationTimeUnits.PND.label
        elif value == "LD":
            value = constants.ObservationTimeUnits.DAY.label  # does LD map to day?
        elif value == "lactation week":
            value = constants.ObservationTimeUnits.WK.label  # confirm

    wanted = value.lower()
    for member in const:
        if member.label.lower() == wanted:
            return member
    return None

Create a new HAWC Treatment for each ToxRef treatment group (tg), plus an AnimalGroup, Species, and Strain for each tg that has an effect

In [None]:
animalgroup_objects = []
toxref_to_hawc_tg_map = {}

for type in STUDY_TYPES:
    data_path = f"{type}_data.csv"
    df = pd.read_csv(data_path)
    df["tg_comment"] = df["tg_comment"].fillna("")  # clean this field
    df["life_stage"] = df["life_stage"].fillna("")

    unique_tgs = df["tg_id"].unique()

    for tg_id in unique_tgs:
        # get the first row for each treatment group
        row = df[df["tg_id"] == tg_id].iloc[0]

        # not all treatment groups will have an effect
        if not pd.isna(row["tg_effect_id"]):
            # create new species and strain if necessary
            species, created = await sync_to_async(Species.objects.get_or_create)(
                name=row["species"]
            )
            strain, created = await sync_to_async(Strain.objects.get_or_create)(
                name=row["strain"], species_id=species.id
            )

            # figure out generation question
            if len(row["generation"]) > 2:
                gen = constants.Generation.F1
            else:
                gen = row["generation"]

            sex = constants.Sex.COMBINED if row["sex"] == "MF" else row["sex"]
            experiment_id = study_to_exp_map[row["study_id"]]

            await sync_to_async(AnimalGroup.objects.create)(
                experiment_id=experiment_id,
                name=f"{sex} {strain.name} {species.name}",  # ex: female wistar rat
                sex=sex,
                comments=row["tg_comment"] or "",  # TODO: confirm if this mapping is correct
                generation=gen,
                lifestage_at_assessment=map_to_const(
                    row["life_stage"], constants.Lifestage
                ),  # TODO: currently getting life_stage from tg_effect: figure out which one to use
                lifestage_at_exposure=map_to_const(row["life_stage"], constants.Lifestage),
                species_id=species.id,
                strain_id=strain.id,
            )

        exposure_duration_description = f'{row["dose_duration"]} {row["dose_duration_unit"]}'

        treatment = await sync_to_async(
            Treatment.objects.create
        )(
            experiment_id=experiment_id,
            name=f'{exposure_duration_description} {row["admin_route"]} {row["preferred_name"]}',  # TODO: ask preferred name. Ex: 13 Day Oral Tebupirimfos
            chemical_id=toxref_to_hawc_chem_map[row["chemical_id"]],
            route_of_exposure=map_to_const(row["admin_route"], constants.RouteExposure),
            exposure_duration=row["dose_duration"],
            exposure_duration_description=exposure_duration_description,
            comments=row["tg_comment"] or "",
        )
        toxref_to_hawc_tg_map[tg_id] = treatment.id

Create HAWC DoseGroup, Endpoint, ObservationTime, DataExtraction, and DoseResponseGroupLevelData objects from ToxRef dose, dtg, dtg_effect, tg, and tg_effect

In [None]:
# ToxRefDB endpoint_id -> HAWC Endpoint id. Filled while endpoints are created
# in the next cell and read again by the observation migration.
toxref_to_hawc_endpoint_map: dict = {}

In [None]:
# arbitrary, unsure of the difference btw id and dose_id
dose_group_id = 12000

for type in STUDY_TYPES:
    data_path = f"{type}_data.csv"
    df = pd.read_csv(data_path)
    unique_doses = df["dose_id"].unique()
    # get data for each unique dose
    for dose_id in unique_doses:
        # filter unique doses
        dose_df = df[df["dose_id"] == dose_id]
        dose_df["effect_var_type"] = dose_df["effect_var_type"].fillna(
            constants.VarianceType.NA.label
        )  # set NA to the default
        dose_df["dtg_effect_comment"] = dose_df["dtg_effect_comment"].fillna(
            ""
        )  # set this to the default
        dose_df["effect_val_unit"] = dose_df["effect_val_unit"].fillna(
            ""
        )  # set this to the default

        unique_dtg = dose_df["dtg_id"].unique()
        unique_tg_effect = dose_df["tg_effect_id"].unique()

        # filter dose treatment groups for each dose
        for dtg_id in unique_dtg:
            row = dose_df[dose_df["dtg_id"] == dtg_id].iloc[0]

            # create a hawc dosegroup
            # TODO: id vs dose_group_id?
            dose_unit = await sync_to_async(DoseUnits.objects.get)(name="mg/kg/d")

            await sync_to_async(DoseGroup.objects.create)(
                dose=row["dose_adjusted"],
                dose_group_id=dose_group_id,
                dose_units_id=dose_unit.id,  # TODO: see where to store conc ppm
                treatment_id=toxref_to_hawc_tg_map[row["tg_id"]],
            )
            # increment manual dose group id
            dose_group_id += 1

        for tg_effect_id in unique_tg_effect:
            # filter unique treatment groups
            dtg_effect_df = dose_df[dose_df["tg_effect_id"] == tg_effect_id]

            for dtg_effect_id in dtg_effect_df["dtg_effect_id"]:
                row = dtg_effect_df[dtg_effect_df["dtg_effect_id"] == dtg_effect_id].iloc[0]

                # get endpoint mapping
                # TODO: add term ids (or use some api)
                experiment = await sync_to_async(Experiment.objects.get)(
                    id=study_to_exp_map[row["study_id"]]
                )

                endpoint, created = await sync_to_async(Endpoint.objects.get_or_create)(
                    experiment=experiment,
                    name=row["effect_desc"],
                    system=row["endpoint_category"],
                    effect=row["endpoint_type"],
                    effect_subtype=row["endpoint_target"],
                )

                toxref_to_hawc_endpoint_map[row["endpoint_id"]] = endpoint.id

                # Create new observation time model if needed

                if not pd.isna(row["time_unit"]):
                    obs_time = await sync_to_async(ObservationTime.objects.create)(
                        observation_time_text=row["time"],
                        observation_time_units=map_to_const(
                            row["time_unit"], constants.ObservationTimeUnits
                        ),
                        endpoint_id=endpoint.id,
                    )

                    # if data extraction info is available, create model
                    dataextraction = await sync_to_async(
                        DataExtraction.objects.create
                    )(
                        experiment_id=study_to_exp_map[row["study_id"]],
                        treatment_id=toxref_to_hawc_tg_map[row["tg_id"]],
                        endpoint_id=endpoint.id,
                        method_to_control_for_litter_effects=constants.MethodToControlForLitterEffects.NR,  # default not reported
                        is_qualitative_only=False if row["no_quant_data_reported"] == "f" else True,
                        variance_type=map_to_const(row["effect_var_type"], constants.VarianceType),
                        # values_estimated = row["effect_val"],
                        response_units=str(row["effect_val_unit"])[
                            :32
                        ],  # this is limited to 32 characters, unsure if we standardise somewhere
                        observation_timepoint_id=obs_time.id,
                        result_details=row[
                            "dtg_effect_comment"
                        ],  # TODO: effect_comment or dtg_effect_comment here?
                    )

                    # calculate NOEL: lowest dose with a critical effect (dose_adjusted units are all mg/kg/day)
                    filtered_df = df[df["critical_effect"]]
                    LOEL = filtered_df["dose_adjusted"].min()

                    # calculate LOEL: highest dose with no critical effect
                    filtered_df = df[not df["critical_effect"]]
                    NOEL = filtered_df["dose_adjusted"].max()

                    statistically_significant = (
                        constants.StatisticallySignificant.YES
                        if row["treatment_related"] == "t"
                        else constants.StatisticallySignificant.NO
                    )

                    await sync_to_async(
                        DoseResponseGroupLevelData.objects.create
                    )(
                        data_extraction_id=dataextraction.id,
                        treatment_name=f'{row["dose_duration"]} {row["dose_duration_unit"]} {row["admin_route"]} {row["preferred_name"]}',  # from treatment model
                        dose=row["dose_adjusted"],  # TODO: check this out in terms mapping w dtg
                        n=None if pd.isna(row["n"]) else row["n"],
                        response=row["effect_val"],  # TODO verify
                        variance=row["effect_var"],
                        treatment_related_effect=statistically_significant,  # TODO: assumed this would be row['dtg_effect_comment'], but it's controlled
                        statistically_significant=statistically_significant,
                        p_value=0.05
                        if row["treatment_related"] == "t"
                        else "",  # TODO: ask about default p_value
                        NOEL=NOEL if not pd.isna(NOEL) else -999,
                        LOEL=LOEL if not pd.isna(LOEL) else -999,
                    )

Remove extra files

In [None]:
# Delete the intermediate per-study-type CSV files, skipping any that are
# already gone. The loop variable is named study_type so the builtin `type`
# is not rebound at notebook scope.
for study_type in STUDY_TYPES:
    output_file = f"{study_type}_data.csv"
    if os.path.exists(output_file):
        os.remove(output_file)

## Migrate observation Data

In [None]:
obs_query = (
    """SELECT study_id, endpoint_id, tested_status, reported_status FROM prod_toxrefdb_2_1.obs"""
)

obs_data = fetch_data_from_source(obs_query, source_conn)

obs_mapping = [
    {
        "study_id": study_to_exp_map[row[0]],
        "endpoint_id": toxref_to_hawc_endpoint_map[row[1]],
        "tested_status": row[2],
        "reported_status": row[3],
    }
    for row in obs_data
]
await Observation.objects.bulk_create(obs_mapping)
obs_mapping

In [None]:
# Release the database resources: the cursor first, then the connection.
for db_resource in (cur, source_conn):
    db_resource.close()