In [None]:
%pip install pandas
%pip install tqdm
%pip install psycopg2-binary

In [None]:
import pandas as pd
from tqdm import tqdm
from uuid import uuid5, UUID
import psycopg2

In [None]:
UUID_NAMESPACE = UUID("c87c53d6-4464-4018-b4c9-15718d354ec8")
UUID_NAMESPACE

# ARIA

In [None]:
df = pd.read_csv("./accidents-tous-req10905.csv", encoding="cp1252", sep=";", skiprows=7)
print(df.info())

In [None]:
print(df.head(1))

In [None]:
def convert_to_db(df : pd.DataFrame, trunc = None):    

    if trunc is not None :
        df = df.head(trunc)

    def create_line(line : pd.Series):
        address = " ".join([str(line["Départment"]), str(line["Commune"])])
        site_id = str(uuid5(UUID_NAMESPACE, address))
        sites = {
            "site_id" : site_id,
            "plant_name": "",
            "address": address,
            "latitude": None,               # to fill later
            "longitude": None,              # to fill later
            "country": line["Pays"],
            "industrial_activity": line["Code NAF"],
        }

        accident_key = " ".join([str(line["Titre"]), str(line["Date"])])
        accident_id = str(uuid5(UUID_NAMESPACE, accident_key))
        accidents = {
            "accident_id": accident_id,
            "site_id": site_id,
            "title": line["Titre"],
            "source": "ARIA",
            "source_id": str(line["Numéro ARIA"]),
            "accident_date": line["Date"],
            "severity_scale": line["Echelle"],
            "raw_data": "", #line,
            "created_at": "date.now()",
            "updated_at": "",
        }

        causes = {
            "accident_id": accident_id,
            "event_category": line["Causes profondes"],
            "equipment_failure": line["Causes premières"],
            "description": line["Contenu"], # could also reuse Contenu
        }

        substances = {
            "accident_id": accident_id,
            "name": line["Matières"],
            "cas_number": "",
            "quantity": "",
            "clp_class" : line["Classe de danger CLP"]
        }

        consequences_human = {
            "accident_id": accident_id,
            "fatalities": None,
            "injuries": None,
            "evacuated": None,
            "hospitalized": None,
        }

        consequences = {
            "ENVIRONNEMENTALES" : "",
            "ÉCONOMIQUES" : ""
        }

        try :
            for consequence in line["Conséquences"].split("CONSÉQUENCES "):
                if len(consequence) < 2 : continue
                s = consequence.split(',')
                key = s[0]
                content = (','.join(s[1:])).removesuffix(',')
                consequences[key] = content
        except : pass

        consequences_other = {
            "accident_id": accident_id,
            "environmental_impact": consequences["ENVIRONNEMENTALES"],
            "economic_cost": consequences["ÉCONOMIQUES"],
            "disruption_duration": line["Type évènement"]
        }

        tables = {
            "sites": sites,
            "accidents": accidents,
            "causes": causes,
            "substances": substances,
            "consequences_human": consequences_human,
            "consequences_other": consequences_other
        }

        # taken_cols = ["Titre", "Pays", "Code NAF", "Numéro ARIA", "Date", "Echelle", "Causes profondes", "Causes premières", "Contenu", "Matières", "Conséquences", "Départment", "Commune", "Classe de danger CLP", "Type évènement"]
        # print(line.drop(labels=taken_cols, errors='ignore'))
        
        return tables
    
    db_lines = []

    for x in tqdm(iter(df.iloc), total=trunc, ncols=200):
        db_lines.append(create_line(x))

    return db_lines

ARIA_db_jsons = convert_to_db(df, trunc=10)

In [None]:
from IPython.display import JSON, display

def print_db_jsons(json):
    for i, db_line in enumerate(json):
        print(i, "=" * 200)
        for key in db_line :
            print(key, flush=True, end='')
            display(JSON(db_line[key], expanded=True))
        if i == 0 : break

print_db_jsons(ARIA_db_jsons)

# OSHA - Injuries (ITA)

In [None]:
df = pd.read_csv("./OSHA ITA.csv", sep=",")
print(df.info())

In [None]:
print(df.head(1))

In [None]:
def convert_to_db(df : pd.DataFrame, trunc = None):    

    if trunc is not None :
        df = df.head(trunc)

    def create_line(line : pd.Series):
        address = f"{line['street_address']} {line['city']} {line['state']} {line['zip_code']}"
        site_key = line["establishment_name"] + " " + address
        site_id = str(uuid5(UUID_NAMESPACE, site_key))
        sites = {
            "site_id" : site_id,
            "plant_name": line["establishment_name"],  # or fallback to "company_name"
            "address": address,
            "latitude": None,  # Geocode later from address
            "longitude": None,
            "country": "USA",
            "industrial_activity": str(line["naics_code"]),  # Maps to NAF equivalent
        }

        accident_key = " ".join([line["job_description"], line["date_of_incident"]])
        accident_id = str(uuid5(UUID_NAMESPACE, accident_key))
        accidents = {
            "accident_id": accident_id,
            "site_id": site_id,
            "title": line["job_description"],  # Brief incident summary
            "source": "OSHA ITA",
            "source_id": line["case_number"],  # Unique OSHA case identifier
            "accident_date": line["date_of_incident"],
            "severity_scale": int(line["incident_outcome"]),  # 1 = Death / 2 = Days away from work / 3 = Job transfer or restriction / 4 = Other recordable case
            "raw_data": "",# line.to_dict(),  # Full line as JSON
            "created_at": "date.now()",
            "updated_at": "",  # Fill on save
        }

        causes = {
            "accident_id": accident_id, 
            "event_category": line["NEW_NAR_WHAT_HAPPENED"],  # Deep/root causes
            "equipment_failure": line["NEW_NAR_BEFORE_INCIDENT"],  # Initial triggers
            "description": line["NEW_INCIDENT_DESCRIPTION"],
        }

        substances = {
            "accident_id": accident_id,
            "name": line["NEW_NAR_OBJECT_SUBSTANCE"],  # Object/substance hit/contacted
            "cas_number": "",  # Not in ITA; research via name if needed
            "quantity": "",  # Derive from context if available
            "clp_class": line["NEW_NAR_INJURY_ILLNESS"],  # Injury type as hazard proxy
        }

        consequences_human = {
            "accident_id": accident_id,
            "fatalities": 1 if pd.notna(line["date_of_death"]) else 0,
            "injuries": 1,  # Each line is one recordable case
            "evacuated": None,  # Not directly available
            "hospitalized": 1 if line["dafw_num_away"] > 0 else 0,  # Days away implies severity
        }

        consequences_other = {
            "accident_id": accident_id,
            "environmental_impact": "",  # ITA focuses on worker injuries
            "economic_cost": "",  # Estimate from total_hours_worked if needed
            "disruption_duration": int(line["djtr_num_tr"]),  # Restriction days as proxy
        }

        tables = {
            "sites": sites,
            "accidents": accidents,
            "causes": causes,
            "substances": substances,
            "consequences_human": consequences_human,
            "consequences_other": consequences_other
        }

        # taken_cols = ["Titre", "Pays", "Code NAF", "Numéro ARIA", "Date", "Echelle", "Causes profondes", "Causes premières", "Contenu", "Matières", "Conséquences", "Départment", "Commune", "Classe de danger CLP", "Type évènement"]
        # print(line.drop(labels=taken_cols, errors='ignore'))
        
        return tables
    
    db_lines = []

    if trunc is None:
        itr = iter(df.iloc)
    else :
        itr = iter(df.head(trunc).iloc)

    for x in tqdm(itr, total=5, ncols=200):
        db_lines.append(create_line(x))

    return db_lines

OSHA_db_jsons = convert_to_db(df, trunc=500)

In [None]:
print_db_jsons(OSHA_db_jsons)

# Database Inserting

In [None]:
from psycopg2.extras import execute_values

def insert_jsons_in_db(db_jsons, conn):
    cur = conn.cursor()

    # 1. Insert sites
    # Generate an array of tuples without duplicates
    constraint_set = set()
    sites_tuples = []
    for db_json in db_jsons:
        constraint_key = db_json["sites"]["plant_name"] + db_json["sites"]["address"]
        if constraint_key in constraint_set : continue
        constraint_set.add(constraint_key)

        sites_tuples.append((
            db_json["sites"]["site_id"],
            db_json["sites"]["plant_name"], 
            db_json["sites"]["address"], 
            db_json["sites"]["latitude"], 
            db_json["sites"]["longitude"], 
            db_json["sites"]["country"], 
            db_json["sites"]["industrial_activity"]
        ))

    print("Inserting sites")
    execute_values(cur, """INSERT INTO sites (site_id, plant_name, address, latitude, longitude, country, industrial_activity) VALUES %s ON CONFLICT (plant_name, address) DO NOTHING""", sites_tuples)

    cur.execute("""
        SELECT site_id, plant_name, address 
        FROM sites
    """)
    all_sites = cur.fetchall()
    site_mapping = {(row[1], row[2]): row[0] for row in all_sites}

    # Insert accidents  
    accidents_tuples = [
        (
            db_json["accidents"]["accident_id"], 
            site_mapping[(db_json["sites"]["plant_name"], db_json["sites"]["address"])], 
            db_json["accidents"]["title"], 
            db_json["accidents"]["source"], 
            db_json["accidents"]["source_id"], 
            db_json["accidents"]["accident_date"], 
            db_json["accidents"]["severity_scale"]
        ) 
        for db_json in db_jsons
    ]
    print("Inserting accidents")
    execute_values(cur, """INSERT INTO accidents (accident_id, site_id, title, source, source_id, accident_date, severity_scale) VALUES %s ON CONFLICT DO NOTHING""", accidents_tuples)

    # Insert causes
    causes_tuples = [
        (
            db_json["causes"]["accident_id"], 
            db_json["causes"]["event_category"], 
            db_json["causes"]["equipment_failure"], 
            db_json["causes"]["description"]
        ) 
        for db_json in db_jsons
    ]
    print("Inserting causes")
    execute_values(cur, """INSERT INTO causes (accident_id, event_category, equipment_failure, description) VALUES %s ON CONFLICT (accident_id) DO NOTHING""", causes_tuples)

    # Insert substances
    substances_tuples = [
        (
            db_json["substances"]["accident_id"], 
            db_json["substances"]["name"], 
            db_json["substances"]["cas_number"], 
            db_json["substances"]["quantity"], 
            db_json["substances"]["clp_class"]
        ) 
        for db_json in db_jsons
    ]
    print("Inserting substances")
    execute_values(cur, """INSERT INTO substances (accident_id, name, cas_number, quantity, clp_class) VALUES %s ON CONFLICT (accident_id) DO NOTHING""", substances_tuples)

    # Insert human consequences
    human_tuples = [
        (
            db_json["consequences_human"]["accident_id"], 
            db_json["consequences_human"]["fatalities"], 
            db_json["consequences_human"]["injuries"], 
            db_json["consequences_human"]["evacuated"], 
            db_json["consequences_human"]["hospitalized"]
        ) 
        for db_json in db_jsons
    ]
    print("Inserting consequences_human")
    execute_values(cur, """INSERT INTO consequences_human (accident_id, fatalities, injuries, evacuated, hospitalized) VALUES %s ON CONFLICT (accident_id) DO NOTHING""", human_tuples)

    # Insert other consequences
    other_tuples = [
        (
            db_json["consequences_other"]["accident_id"], 
            db_json["consequences_other"]["environmental_impact"], 
            db_json["consequences_other"]["economic_cost"], 
            db_json["consequences_other"]["disruption_duration"]
        ) 
        for db_json in db_jsons
    ]
    print("Inserting consequences_other")
    execute_values(cur, """INSERT INTO consequences_other (accident_id, environmental_impact, economic_cost, disruption_duration) VALUES %s ON CONFLICT (accident_id) DO NOTHING""", other_tuples)

    # Commit all inserts
    conn.commit()
    cur.close()

print(len(ARIA_db_jsons))

with psycopg2.connect(host="localhost", database="postgres", user="postgres", password="7833", port=5432) as conn : 
    insert_jsons_in_db(ARIA_db_jsons, conn)