#database view

In [62]:
import calendar
import os

import pandas as pd
import psycopg2
import psycopg2.extras
from psycopg2 import Error, sql

# Connection settings shared by every cell in this notebook.
# Each value can be overridden through the standard libpq environment
# variables so real credentials do not have to live in the notebook; the
# literals are only the local-development fallbacks.
# NOTE(review): the fallback password is still embedded here — drop it once
# PGPASSWORD is provided in every environment that runs this notebook.
db_params = {
    'dbname': os.environ.get('PGDATABASE', 'postgres'),
    'user': os.environ.get('PGUSER', 'postgres'),
    'password': os.environ.get('PGPASSWORD', '123456'),
    'host': os.environ.get('PGHOST', 'localhost'),
    'port': os.environ.get('PGPORT', '5432')
}

conn = None
try:
    conn = psycopg2.connect(**db_params)
    # `with conn` only scopes the transaction (commit on success, rollback on
    # error); it does NOT close the connection, hence the close() in `finally`.
    with conn, conn.cursor() as cursor:
        # List every table that lives outside the PostgreSQL system schemas.
        cursor.execute("SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';")
        tables = cursor.fetchall()
        print("Tables in the database:")
        for table in tables:
            print(table[0])

        # Preview a few rows of one table. The identifier is composed with
        # psycopg2.sql so it is quoted correctly instead of being pasted into
        # the statement with string formatting.
        table_name = 'toxic_release_inventory'
        cursor.execute(
            sql.SQL("SELECT * FROM {} LIMIT 5").format(sql.Identifier(table_name))
        )
        rows = cursor.fetchall()
        print(f"\nFirst five rows of the table '{table_name}':")
        for row in rows:
            print(row)
except psycopg2.Error as e:
    print(f"An error occurred: {e}")
finally:
    # Release the server-side session even when a query failed.
    if conn is not None:
        conn.close()


Tables in the database:
counties
climate
biodiversity
air_quality
pollutant_emissions
toxic_release_inventory

First five rows of the table 'toxic_release_inventory':
('82716NLSMP13151', 'Copper', Decimal('44.28536600'), Decimal('-105.38249000'), 2022, 'N', Decimal('2.00'), 'N', 'N', 'N', 56005, 'to air')
('82716NLSMP13151', 'Manganese', Decimal('44.28536600'), Decimal('-105.38249000'), 2022, 'N', Decimal('2.00'), 'N', 'N', 'N', 56005, 'to air')
('82716NLSMP13151', 'Ammonia', Decimal('44.28536600'), Decimal('-105.38249000'), 2022, 'N', Decimal('31201.00'), 'N', 'N', 'N', 56005, 'to air')
('82716NLSMP13151', 'Vanadium (except when contained in an alloy)', Decimal('44.28536600'), Decimal('-105.38249000'), 2022, 'N', Decimal('61.00'), 'N', 'N', 'N', 56005, 'to air')
('82716NLSMP13151', 'Nickel', Decimal('44.28536600'), Decimal('-105.38249000'), 2022, 'N', Decimal('0.10'), 'Y', 'N', 'N', 56005, 'to air')


In [2]:
# Print the column names of the `climate` table as reported by
# information_schema, in their declared (ordinal) order.
climate_table = 'climate'

conn = None
try:
    conn = psycopg2.connect(**db_params)
    # `with conn` only ends the transaction; the connection itself is closed
    # explicitly in `finally` so it is not leaked.
    with conn, conn.cursor() as cursor:
        # The table name is passed as a bound parameter rather than being
        # embedded in the SQL text; ORDER BY makes the listing deterministic.
        cursor.execute("""
            SELECT column_name
            FROM information_schema.columns
            WHERE table_name = %s
            ORDER BY ordinal_position
        """, (climate_table,))
        columns = cursor.fetchall()
        print(f"Columns in the '{climate_table}' table:")
        for column in columns:
            print(column[0])
except psycopg2.Error as e:
    print(f"An error occurred: {e}")
finally:
    if conn is not None:
        conn.close()


Columns in the 'climate' table:
climate_id
year
month
day
value
county_code
facility_id
sflag
mflag
qflag
element


#county insertion

In [3]:
# Load the county reference data and shape it to the `counties` table layout:
# county_code, county_name, latitude, longitude, state_code (in that order,
# because rows are bound to the INSERT positionally).
# Built as one chain so no column is assigned on a slice of another frame
# (which would raise SettingWithCopyWarning).
df = (
    pd.read_csv('../final_data/county_info.csv')
    [['geoid', 'county', 'latitude', 'longitude', 'state']]
    .rename(columns={
        'geoid': 'county_code',
        'county': 'county_name',
        'state': 'state_code',
    })
    .assign(
        # Integer state codes are zero-padded to 2 characters; any other
        # value is passed through untouched.
        state_code=lambda d: d['state_code'].apply(
            lambda x: '{:02}'.format(x) if isinstance(x, int) else x
        ),
        # County codes are zero-padded to 5 characters.
        county_code=lambda d: d['county_code'].astype(str).str.zfill(5),
    )
)

# Re-running the cell is safe: rows whose county_code already exists are skipped.
insert_query = """
INSERT INTO counties (county_code, county_name, latitude, longitude, state_code)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (county_code) DO NOTHING
"""

# `conn` is bound before the try so the `finally` clause cannot hit a
# NameError (and hide the real error) when connect() itself fails.
conn = None
try:
    conn = psycopg2.connect(**db_params)
    # `with conn` commits on success and rolls back on error.
    with conn, conn.cursor() as cursor:
        # Send the rows in pages instead of one network round trip per row.
        psycopg2.extras.execute_batch(
            cursor, insert_query, df.itertuples(index=False, name=None)
        )
    print("Data inserted successfully")
except psycopg2.Error as e:
    print(f"An error occurred: {e}")
finally:
    if conn is not None:
        conn.close()

Data inserted successfully


#biodiversity table

In [4]:
# Load the biodiversity occurrences, clean them, and insert them into the
# `biodiversity` table.
biodiversity_df = pd.read_csv('../final_data/biodb_with_county.csv')

# County identifiers are zero-padded to 5 characters.
biodiversity_df['geoid'] = biodiversity_df['geoid'].astype(str).str.zfill(5)

biodiversity_df = biodiversity_df.drop(columns=['county', 'state'])

# NOTE(review): this replaces missing values with 0 in every column, including
# text columns — confirm that is really wanted rather than NULL.
biodiversity_df = biodiversity_df.fillna(0)

# Keep a single row per GBIF record.
biodiversity_df = biodiversity_df.drop_duplicates(subset='gbifID', keep='first')

# '00000' marks rows without a usable county; they are not loaded.
biodiversity_df = biodiversity_df[biodiversity_df['geoid'] != '00000']

# Map the source column names onto the database column names. Every entry
# goes source-name -> table-name; names that are absent from the frame are
# ignored by rename(), so already-converted columns are left alone.
biodiversity_df = biodiversity_df.rename(columns={
    'gbifID': 'gbif_id',
    'datasetKey': 'dataset_key',
    'occurrenceID': 'occurrence_id',
    'order': 'orders',
    'infraspecificEpithet': 'infraspecific_epithet',
    'taxonRank': 'taxon_rank',
    'scientificName': 'scientific_name',
    'verbatimScientificName': 'verbatim_scientific_name',
    'occurrenceStatus': 'occurrence_status',
    'individualCount': 'individual_count',
    'publishingOrgKey': 'publishing_org_key',
    'decimalLatitude': 'decimal_latitude',
    'decimalLongitude': 'decimal_longitude',
    'coordinateUncertaintyInMeters': 'coordinate_uncertainty_in_meters',
    'eventDate': 'event_date',
    'taxonKey': 'taxon_key',
    'speciesKey': 'species_key',
    'basisOfRecord': 'basis_of_record',
    'collectionCode': 'collection_code',
    'catalogNumber': 'catalog_number',
    'lastInterpreted': 'last_interpreted',
    'mediaType': 'media_type',
    'geoid': 'county_code'
})

# Values are bound positionally, so the DataFrame column ORDER must match the
# column list below.
# NOTE(review): the order is inherited from the CSV and is not enforced here —
# verify it against the file.
insert_query = """
INSERT INTO biodiversity (
    gbif_id, dataset_key, occurrence_id, kingdom, phylum, class, orders, family, genus, species,
    infraspecific_epithet, taxon_rank, scientific_name, verbatim_scientific_name, locality, occurrence_status,
    individual_count, publishing_org_key, decimal_latitude, decimal_longitude, coordinate_uncertainty_in_meters,
    event_date, taxon_key, species_key, basis_of_record, collection_code, catalog_number, license,
    last_interpreted, media_type, issue, county_code
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""

print(f"Number of columns in DataFrame: {len(biodiversity_df.columns)}")
placeholder_count = insert_query.count('%s')
print(f"Number of placeholders in SQL query: {placeholder_count}")

# Fail fast, before touching the database, if the frame and the statement disagree.
assert len(biodiversity_df.columns) == placeholder_count, "The number of placeholders does not match the number of DataFrame columns."

# `conn` is bound before the try so the `finally` clause cannot hit a
# NameError (and hide the real error) when connect() itself fails.
conn = None
try:
    conn = psycopg2.connect(**db_params)
    # `with conn` commits on success and rolls back on error.
    with conn, conn.cursor() as cursor:
        # Send the rows in pages instead of one network round trip per row.
        psycopg2.extras.execute_batch(
            cursor, insert_query, biodiversity_df.itertuples(index=False, name=None)
        )
    print("Data inserted successfully")
except psycopg2.Error as e:
    print(f"An error occurred: {e}")
finally:
    if conn is not None:
        conn.close()


Number of columns in DataFrame: 32
Number of placeholders in SQL query: 32
Data inserted successfully


#climate table

In [5]:
# Load the wide weather file (one row per station/element/month with 31
# VALUE/MFLAG/QFLAG/SFLAG slots) and insert it into `climate` as one row per day.
climate_df = pd.read_csv('../final_data/final_weather_data.csv', low_memory=False)

# Ensure geoid is 5 digits
climate_df['geoid'] = climate_df['geoid'].astype(str).str.zfill(5)

climate_df = climate_df.drop(columns=['county', 'state'])

# '00000' marks rows without a usable county; copy() so the column
# assignments below operate on an independent frame.
climate_df = climate_df[climate_df['geoid'] != '00000'].copy()

# Data cleaning: ensure numeric columns contain only numeric values or NaN
numeric_columns = [f"VALUE{i}" for i in range(1, 32)]
for col in numeric_columns:
    climate_df[col] = pd.to_numeric(climate_df[col], errors='coerce')  # Convert non-numeric to NaN


def _first_char(flag):
    """Return the first character of a flag cell, or None if it is missing or empty."""
    if pd.isnull(flag):
        return None
    text = str(flag)
    return text[0] if text else None


# Unpivot: one tuple per (row, day), in the same order as the INSERT column list.
data_tuples = []
for _, row in climate_df.iterrows():
    year = int(row['YEAR'])
    month = int(row['MONTH'])
    # The wide layout always has 31 day slots; only emit the days that
    # actually exist in this month so that e.g. 30 February is never stored.
    days_in_month = calendar.monthrange(year, month)[1]
    for day in range(1, days_in_month + 1):
        data_tuples.append((
            row['ID'],                         # facility_id
            year,
            month,
            day,
            row['ELEMENT'],                    # element
            # NOTE(review): a NaN here is sent to the database as NaN, not
            # NULL — confirm which one the `value` column should hold.
            row.get(f'VALUE{day}', None),      # value
            _first_char(row.get(f'MFLAG{day}')),
            _first_char(row.get(f'QFLAG{day}')),
            _first_char(row.get(f'SFLAG{day}')),
            row['geoid'],                      # county_code
        ))

insert_query = """
INSERT INTO climate (facility_id, year, month, day, element,
                     value, mflag, qflag, sflag, county_code)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""

conn = None
try:
    conn = psycopg2.connect(**db_params)
    # `with conn` commits on success and rolls back on error.
    with conn, conn.cursor() as cursor:
        # Send the rows in pages instead of one network round trip per row.
        psycopg2.extras.execute_batch(cursor, insert_query, data_tuples)
    print("Data inserted successfully")
except psycopg2.Error as e:
    print(f"An error occurred: {e}")
finally:
    if conn is not None:
        conn.close()

Data inserted successfully


#air_quality table

In [13]:
# Load the daily ozone and carbon-monoxide records, combine and clean them,
# and insert them into the `air_quality` table.
ozone_df = pd.read_csv('../final_data/Ozone_Levels_Daily_Records_2022.csv')
carbon_mono_df = pd.read_csv('../final_data/Carbon_Monoxide_Levels_Daily_Records_2022.csv')

# ignore_index gives the combined frame a unique row index instead of two
# overlapping 0..n ranges.
air_quality_df = pd.concat([ozone_df, carbon_mono_df], ignore_index=True)

# County identifiers are zero-padded to 5 characters.
air_quality_df['geoid'] = air_quality_df['geoid'].astype(str).str.zfill(5)

air_quality_df = air_quality_df.drop(columns=['county', 'state'])

# Missing measurements are stored as 0.
numeric_cols = ['Observation Count', 'Observation Percent', 'Arithmetic Mean', '1st Max Value', '1st Max Hour', 'AQI']
air_quality_df[numeric_cols] = air_quality_df[numeric_cols].fillna(0)

# '00000' marks rows without a usable county; they are not loaded.
air_quality_df = air_quality_df[air_quality_df['geoid'] != '00000']

# Map the source column names onto the database column names.
air_quality_df = air_quality_df.rename(columns={
    'Site Num': 'site_num',
    'Latitude': 'latitude',
    'Longitude': 'longitude',
    'Datum': 'datum',
    'Parameter Name': 'parameter_name',
    'Sample Duration': 'sample_duration',
    'Pollutant Standard': 'pollutant_standard',
    'Date Local': 'date_local',
    'Units of Measure': 'units_of_measure',
    'Event Type': 'event_type',
    'Observation Count': 'observation_count',
    'Observation Percent': 'observation_percent',
    'Arithmetic Mean': 'arithmetic_mean',
    '1st Max Value': 'max_value',
    '1st Max Hour': 'max_hour',
    'AQI': 'aqi',
    'Local Site Name': 'local_site_name',
    'Address': 'address',
    'City Name': 'city_name'
})

# NOTE(review): the de-duplication key does not include the county, so
# identical site numbers in different counties collapse into one row —
# confirm this matches the table's uniqueness rules.
air_quality_df = air_quality_df.drop_duplicates(
    subset=['site_num', 'date_local', 'parameter_name'], keep='first'
)

# Values are bound positionally, so the DataFrame column ORDER must match the
# column list below.
insert_query = """
INSERT INTO air_quality (
    site_num, latitude, longitude, datum, parameter_name,
    sample_duration, pollutant_standard, date_local, units_of_measure,
    event_type, observation_count, observation_percent, arithmetic_mean,
    max_value, max_hour, aqi, local_site_name, address, city_name,
    geoid
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""

print(f"Number of columns in DataFrame: {len(air_quality_df.columns)}")

placeholder_count = insert_query.count('%s')
print(f"Number of placeholders in SQL query: {placeholder_count}")

# Fail fast, before touching the database, if the frame and the statement disagree.
assert len(air_quality_df.columns) == placeholder_count, "The number of placeholders does not match the number of DataFrame columns."

# `conn` is bound before the try so the `finally` clause cannot hit a
# NameError (and hide the real error) when connect() itself fails.
conn = None
try:
    conn = psycopg2.connect(**db_params)
    # `with conn` commits on success and rolls back on error.
    with conn, conn.cursor() as cursor:
        # Send the rows in pages instead of one network round trip per row.
        psycopg2.extras.execute_batch(
            cursor, insert_query, air_quality_df.itertuples(index=False, name=None)
        )
    print("Data inserted successfully")
except psycopg2.Error as e:
    print(f"An error occurred: {e}")
finally:
    if conn is not None:
        conn.close()


Number of columns in DataFrame: 20
Number of placeholders in SQL query: 20
Data inserted successfully


#pollutant_emissions table

In [36]:
# Load the emissions sample, clean it, and insert it into the
# `pollutant_emissions` table.
pollutant_emissions_df = pd.read_csv('../final_data/nei_faculty_sample10000.csv')

# County identifiers are zero-padded to 5 characters.
pollutant_emissions_df['geoid'] = pollutant_emissions_df['geoid'].astype(str).str.zfill(5)

# '00000' marks rows without a usable county; they are not loaded.
pollutant_emissions_df = pollutant_emissions_df[pollutant_emissions_df['geoid'] != '00000']

# Map the source column names onto the database column names.
pollutant_emissions_df = pollutant_emissions_df.rename(columns={
    'SITE  NAME': 'site_name',
    'Latitude': 'latitude',
    'Longitude': 'longitude',
    'Pollutant': 'pollutant',
    'Pollutant Type': 'pollutant_type',
    'Facility Type': 'facility_type',
    'NAICS': 'naics',
    'Emissions (Tons)': 'emissions_tons',
    'geoid': 'county_code'
})

# Keep one row per (site, pollutant) pair.
pollutant_emissions_df = pollutant_emissions_df.drop_duplicates(
    subset=['site_name', 'pollutant'], keep='first'
)

# Select the columns explicitly, in exactly the order the INSERT binds them.
pollutant_emissions_df = pollutant_emissions_df[[
    'latitude', 'longitude', 'site_name', 'pollutant', 'pollutant_type',
    'facility_type', 'naics', 'emissions_tons', 'county_code'
]]

insert_query = """
INSERT INTO pollutant_emissions (
    latitude, longitude, site_name, pollutant, pollutant_type,
    facility_type, naics, emissions_tons, county_code
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""

print(f"Number of columns in DataFrame: {len(pollutant_emissions_df.columns)}")

placeholder_count = insert_query.count('%s')
print(f"Number of placeholders in SQL query: {placeholder_count}")

# Fail fast, before touching the database, if the frame and the statement disagree.
assert len(pollutant_emissions_df.columns) == placeholder_count, "The number of placeholders does not match the number of DataFrame columns."

# `conn` is bound before the try so the `finally` clause cannot hit a
# NameError (and hide the real error) when connect() itself fails.
conn = None
try:
    conn = psycopg2.connect(**db_params)
    # `with conn` commits on success and rolls back on error.
    with conn, conn.cursor() as cursor:
        # Send the rows in pages instead of one network round trip per row.
        psycopg2.extras.execute_batch(
            cursor, insert_query, pollutant_emissions_df.itertuples(index=False, name=None)
        )
    print("Data inserted successfully")
except psycopg2.Error as e:
    print(f"An error occurred: {e}")
finally:
    if conn is not None:
        conn.close()

Number of columns in DataFrame: 9
Number of placeholders in SQL query: 9
Data inserted successfully


#toxic_release_inventory table

In [61]:
# Load the toxic release inventory, shape it to the table layout, and insert
# it into `toxic_release_inventory`.
# Built as one chain so no column is assigned on a slice of another frame
# (which would raise SettingWithCopyWarning).
toxic_release_inventory_df = (
    pd.read_csv('../final_data/toxic_release_inventory.csv')
    .rename(str.lower, axis='columns')
    # Column order here is the order the INSERT binds its values in.
    [[
        'tri_facility_id', 'latitude', 'longitude',
        'reporting_year', 'chem_name', 'elemental_metal_included',
        'rel_est_amt', 'carcinogen', 'pbt_ind', 'pfas_ind',
        'discharge_to', 'geoid'
    ]]
    .rename(columns={'geoid': 'county_code'})
    # County identifiers are zero-padded to 5 characters.
    .assign(county_code=lambda d: d['county_code'].astype(str).str.zfill(5))
    # '00000' marks rows without a usable county; they are not loaded.
    .loc[lambda d: d['county_code'] != '00000']
)

# Re-running the cell is safe: rows that collide on
# (tri_facility_id, chem_name, reporting_year) are skipped.
insert_query = """
INSERT INTO toxic_release_inventory (
    tri_facility_id, latitude, longitude,
    reporting_year, chem_name, elemental_metal_included, rel_est_amt,
    carcinogen, pbt_ind, pfas_ind,
    discharge_to, county_code
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (tri_facility_id, chem_name, reporting_year) DO NOTHING
"""

print(f"Number of columns in DataFrame: {len(toxic_release_inventory_df.columns)}")
placeholder_count = insert_query.count('%s')
print(f"Number of placeholders in SQL query: {placeholder_count}")

# Fail fast, before touching the database, if the frame and the statement disagree.
assert len(toxic_release_inventory_df.columns) == placeholder_count, "The number of placeholders does not match the number of DataFrame columns."

# `conn` is bound before the try so the `finally` clause cannot hit a
# NameError (and hide the real error) when connect() itself fails.
conn = None
try:
    conn = psycopg2.connect(**db_params)
    # `with conn` commits on success and rolls back on error.
    with conn, conn.cursor() as cursor:
        # Send the rows in pages instead of one network round trip per row.
        psycopg2.extras.execute_batch(
            cursor, insert_query, toxic_release_inventory_df.itertuples(index=False, name=None)
        )
    print("Data inserted successfully")
except psycopg2.Error as e:
    print(f"An error occurred: {e}")
finally:
    if conn is not None:
        conn.close()


Number of columns in DataFrame: 12
Number of placeholders in SQL query: 12
Data inserted successfully
