# npri-db
This notebook describes the process of compiling several datasets relevant to addressing environmental racism in Canada and loading them into a Postgresql database. These include the National Pollutant Release Inventory (NPRI) Microsoft Access database of all records since 1993, tables derived from Statistics Canada's 2021 Census, and provincial/territorial level enforcement and compliance records (TBD).

## Imports

Some of these packages may need to be installed if you don't already have them locally.

In [1]:
import pandas
from datetime import datetime # For timestamping the database...TBD
from datetime import date
import requests
import zipfile
import io
import psycopg2
from sqlalchemy import create_engine
import subprocess

  from pandas.core import (


In [51]:
## Import credentials for a local Postgresql database
credentials = {"dbname": "", "user": "", "password": "", "port": ""}

engine = create_engine('postgresql+psycopg2://'+credentials["user"]+':'+credentials["password"]+'@localhost:'+credentials["port"]+'/'+credentials["dbname"]+'', 
                       isolation_level="AUTOCOMMIT")
conn = engine.raw_connection()

## Data

### NPRI

General overview of NPRI data: https://www.canada.ca/en/environment-climate-change/services/national-pollutant-release-inventory/tools-resources-data/access.html

How to interpret NPRI data: https://www.canada.ca/en/environment-climate-change/services/national-pollutant-release-inventory/using-interpreting-data.html

NPRI Microsoft Access database - all data, all years: https://open.canada.ca/data/en/dataset/06022cc0-a31e-4b4c-850d-d4dccda5f3ac

Chemicals information: https://www.canada.ca/en/health-canada/services/chemical-substances/fact-sheets/chemicals-glance.html

#### NPRI Access Database

The following script is based on https://stackoverflow.com/questions/66614826/how-to-use-pyodbc-to-migrate-tables-from-ms-access-to-postgres

It opens the NPRI Microsoft Access database (NPRI-INRP_DatabaseBaseDeDonnées_1993-present.accdb) and transfers it to Postgresql. It has to be done on a Windows machine, unfortunately.

In [None]:
import win32com.client
import pyodbc
from pathlib import Path
import os

conn_str = r'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};DBQ=original path...;'
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()

a = win32com.client.Dispatch("Access.Application")
a.OpenCurrentDatabase(r'original path...')

table_list = []
for table_info in cursor.tables(tableType='TABLE'):
	table_list.append(table_info.table_name)

for table in table_list:
	print(f"Exporting: {table}")
	acExport = 1
	acTable = 0
	db_name = Path(r'new path...').stem.lower()
	postgres = credentials["user"]
	password = credentials["password"]
	port = credentials["port"]
	a.DoCmd.TransferDatabase(acExport, "ODBC Database", "ODBC;DRIVER={PostgreSQL Unicode};"f"DATABASE={postgres};"f"UID={postgres};"f"PWD={password};""SERVER=localhost;"f"PORT={port};", acTable, f"{table}", f"{table.lower()}")
	print(f"Finished Export of Table: {table}")

cursor.close()
conn.close()

a.DoCmd.CloseDatabase
a.Quit()
a=None
os.system("taskkill /im MSACCESS.exe") # Kill process in windows

If you're setting things up on a non-Windows machine, we first dump the Postgresql database to a sql file using the psql command line.

The specifics of this will vary depending on what version of Postgresql you've installed on Windows and your folder structure.

`psql: C:\Program Files\PostgreSQL\16\bin>pg_dump -U <user> -d <database> -p <port> -W -f <path>`

(based on: https://www.prisma.io/dataguide/postgresql/inserting-and-modifying-data/importing-and-exporting-data-in-postgresql)

Then, transfer the .sql file to the non-Windows machine. If you're setting up the Postgresql database on a Windows machine, this can be skipped because you've already loaded the database above.

The following code will drop the existing database, if present in Postgresql, and then create a new one.

In [None]:
# From https://stackoverflow.com/questions/34484066/create-a-postgres-database-using-python
# You may need to restart Postgres before being able to drop the database (if it already exists and if you already have connections)
cur = conn.cursor() # Connection should already be established above

cur.execute("DROP DATABASE IF EXISTS {};".format(credentials["dbname"]))
cur.execute("CREATE DATABASE {};".format(credentials["dbname"]))

cur.close()
conn.close()

Now we load the data and its schema:

In [None]:
# Due to the size of the NPRI data, this is best done using psql - see: https://stackoverflow.com/questions/17261061/execute-sql-schema-in-psycopg2-in-python
# May have to adjust postgres path inside .zsrch file cd /Applications/Postgres.app/Contents/Versions/15/bin
subprocess.call('psql -d '+credentials["dbname"]+' -U '+credentials["user"]+' -f NPRI/npri.sql', shell=True)

### Census Data - Attribute and Spatial

#### Attribute Data
The following downloads the 2016 and 2021 Canadian Index of Multiple Deprivation for Canada (regional/provincial indices TBD): https://www150.statcan.gc.ca/pub/45-20-0001/2023001/csv/can_scores_quintiles_csv-eng.zip

See the user guide here: https://www150.statcan.gc.ca/n1/pub/45-20-0001/452000012023002-eng.htm

First, change the connection to the new `npri` database:

In [4]:
engine = create_engine('postgresql+psycopg2://'+credentials["user"]+':'+credentials["password"]+'@localhost:'+credentials["port"]+'/'+credentials["dbname"]+'', 
                       isolation_level="AUTOCOMMIT")
conn = engine.raw_connection()

Next, create the schema for the CIMD table

In [7]:
#### I THINK THIS CAN BE DELETED AS THE TABLES ARE REPLACED ANYWAY IN LOADING IN NEXT CELL
for year in ["2016","2021"]:
    cur = conn.cursor()
    cur.execute("DROP TABLE IF EXISTS cimd_"+year+" CASCADE;") # Will delete existing table and anything that relies on it (e.g. joins)
    cur.execute("""
      CREATE TABLE cimd_"""+year+""" (
        PRCDDA integer,
        Province varchar(50),
        "Dissemination area (DA) Population" smallint,
        "Residential instability Quintiles" smallint,
        "Residential instability Scores" real,
        "Economic dependency Quintiles" smallint,
        "Economic dependency Scores" real,
        "Ethno-cultural composition Quintiles" smallint,
        "Ethno-cultural composition Scores" real,
        "Situational vulnerability Quintiles" smallint,
        "Situational vulnerability Scores" real
      );
    """) # See the user guide for more about these field names
    cur.close()

Load csv into database, based on: https://www.linkedin.com/pulse/two-simple-ways-upload-csv-files-directly-postgresql-database-mishra/

In [8]:
cimd = {"2016":"https://www150.statcan.gc.ca/pub/45-20-0001/2019001/csv/can_scores_quintiles-eng.zip", 
 "2021":"https://www150.statcan.gc.ca/pub/45-20-0001/2023001/csv/can_scores_quintiles_csv-eng.zip"}

for c in cimd.keys():
    r = requests.get(cimd[c]) 
    z = zipfile.ZipFile(io.BytesIO(r.content))
    z.extractall(c) # Currently only loading dissemination areas

    # Based on: https://stackoverflow.com/questions/23103962/how-to-write-dataframe-to-postgres-table
    df = pandas.read_csv(c+'/can_scores_quintiles_EN.csv', dtype={"Economic dependency Quintiles": int, 
                                                                                             "Ethno-cultural composition Quintiles": int, 
                                                                                             "Situational vulnerability Quintiles": int})
    display(df)
    
    engine = create_engine('postgresql+psycopg2://'+credentials["user"]+':'+credentials["password"]+'@localhost:'+credentials["port"]+'/'+credentials["dbname"]+'', 
                           isolation_level="AUTOCOMMIT")

    # Drop old table and create new empty table
    df.head(0).to_sql('cimd_'+c, engine, if_exists="replace", index=False)

    conn = engine.raw_connection()
    cur = conn.cursor()
    output = io.StringIO()
    df.to_csv(output, sep='\t', header=False, index=False)
    output.seek(0)
    contents = output.getvalue()
    cur.copy_from(output, 'cimd_'+c, null="") # null values become ''
    conn.commit()
    cur.close()

Unnamed: 0,PRCDDA,Province,Dissemination area (DA) Population,Residential instability Quintiles,Residential instability Scores,Economic dependency Quintiles,Economic dependency Scores,Ethno-cultural composition Quintiles,Ethno-cultural composition Scores,Situational vulnerability Quintiles,Situational vulnerability Scores
0,10010165,Newfoundland and Labrador,535,2,-0.523257,2,-0.345613,1,-0.760391,2,-0.361087
1,10010166,Newfoundland and Labrador,260,4,-0.045157,5,0.765725,1,-0.939756,1,-0.847012
2,10010167,Newfoundland and Labrador,340,4,-0.010904,4,0.184428,1,-0.814614,3,-0.083478
3,10010168,Newfoundland and Labrador,550,4,0.079384,4,0.364669,2,-0.669320,4,0.098873
4,10010169,Newfoundland and Labrador,300,3,-0.103398,4,0.298208,1,-0.744341,1,-0.630813
...,...,...,...,...,...,...,...,...,...,...,...
54770,59590023,British Columbia,440,3,-0.105219,2,-0.700334,2,-0.633294,5,0.882807
54771,59590024,British Columbia,320,4,0.527562,1,-1.334655,4,0.029889,5,0.430830
54772,59590025,British Columbia,685,2,-0.516546,1,-1.332887,1,-0.816583,4,0.097369
54773,59590026,British Columbia,580,3,-0.245186,1,-1.421167,3,-0.283526,4,0.265393


Unnamed: 0,PRCDDA,Province,Dissemination area (DA) Population,Residential instability Quintiles,Residential instability Scores,Economic dependency Quintiles,Economic dependency Scores,Ethno-cultural composition Quintiles,Ethno-cultural composition Scores,Situational vulnerability Quintiles,Situational vulnerability Scores
0,10010165,Newfoundland & Labrador,500,4,0.021479,2,-0.762730,2,-0.435369,2,-0.409896
1,10010166,Newfoundland & Labrador,325,3,-0.085015,4,0.130320,4,0.310954,3,-0.277458
2,10010167,Newfoundland & Labrador,435,1,-0.855457,1,-0.810557,5,1.482347,2,-0.451052
3,10010168,Newfoundland & Labrador,590,3,-0.261049,2,-0.767123,4,0.508313,3,-0.216957
4,10010169,Newfoundland & Labrador,300,3,-0.279099,1,-0.790291,4,0.330309,3,-0.019124
...,...,...,...,...,...,...,...,...,...,...,...
55822,59590026,British Columbia,510,1,-0.916828,3,-0.422681,2,-0.764244,5,0.478033
55823,59590027,British Columbia,625,2,-0.756908,2,-0.618850,1,-1.295480,4,0.167450
55824,59590032,British Columbia,105,4,0.174450,1,-0.815664,2,-0.362390,5,3.293610
55825,59590038,British Columbia,420,3,-0.323990,2,-0.708597,4,0.360693,5,4.019216


#### Get and Make Spatial Data

First, download spatial data - dissemination areas:

In [None]:
dissemination_area = "https://www12.statcan.gc.ca/census-recensement/2021/geo/sip-pis/boundary-limites/files-fichiers/lda_000b21a_e.zip"

r = requests.get(dissemination_area) 
z = zipfile.ZipFile(io.BytesIO(r.content))
z.extractall("lda_000b21a_e")
#sd = geopandas.read_file("lda_000b21a_e/") # theoretically, can load zipfile directly. Problem with Geopandas dependencies?

Create schema for the dissemination area shapefile.

In [None]:
cur = conn.cursor()
cur.execute("DROP TABLE IF EXISTS lda_000b21a_e CASCADE;") # Will delete existing table and anything that relies on it (e.g. joins)
cur.execute("CREATE TABLE lda_000b21a_e (DAUID varchar(10), DGUID varchar(30), LANDAREA float, PRUID varchar(2));") # Currently, dissemination area shp only
cur.execute("CREATE EXTENSION IF NOT EXISTS postgis;") # Add the postgis extension
cur.close()

Load dissemination area shapefiles into Postgresl. This will take some time and produce a lot of output unless quieted.

In [None]:
# Need to install shp2pgsql if haven't already - comes with Postgresql
path = 'lda_000b21a_e/lda_000b21a_e'
cmd = 'shp2pgsql -d -I -s 3347 {} public.{} | psql -h localhost -d {} -U {}'.format(path, 'lda_000b21a_e', credentials["dbname"], credentials["user"]) #Inserts data. Must already be in schema (which is what the previous cell does)
#print(cmd) # Debugging
subprocess.call(cmd, shell=True, stdout=subprocess.DEVNULL)

Update the dissemination area data - set the `dauid` as an integer and simplify geometries for mapping purposes:

In [None]:
cur = conn.cursor()
cur.execute('UPDATE lda_000b21a_e SET "geom" = ST_Multi(ST_SimplifyPreserveTopology("geom",75));') # Will delete existing table and anything that relies on it (e.g. joins)
cur.execute('ALTER TABLE lda_000b21a_e ALTER COLUMN "dauid" type int USING dauid::integer;') 
cur.close()

Turn NPRI facilities into point data following https://gis.stackexchange.com/questions/145007/creating-geometry-from-lat-lon-in-table-using-postgis

In [None]:
cur = conn.cursor()
cur.execute("SELECT AddGeometryColumn ('geographiclocation','geom',4269,'POINT',2);")
cur.execute('UPDATE geographiclocation SET geom = ST_SetSRID(ST_MakePoint("Longitude", "Latitude"), 4269);')
cur.execute("ALTER TABLE geographiclocation ALTER COLUMN geom TYPE Geometry(Point, 3347) USING ST_Transform(geom, 3347);") # Project to 3347 NAD83 Stats Can Lambert to make buffer calculations easier than on GCS
cur.close()

### Create and populate various NPRI views of the data in Postgresql

Use preset commands defined in the `npri_statements.sql` file to compile base tables/views

In [None]:
subprocess.call('psql -d '+credentials["dbname"]+' -U '+credentials["user"]+' -f NPRI/npri_statements.sql', shell=True)

Use Pandas to unstack and manipulate the data. Pandas offers some advantages against Postgresql.

Create materialized views (Exporter, Screen, Companies, Industries, Substances, Times)

Close the database connection:

In [52]:
for table in ["npri_reports_table",
              "npri_gaps_table",
              "npri_exporter_table",
              "npri_screen_table",
              "npri_companies_table",
              "npri_industry_table",
              "npri_tri_table",
              "history_substance_table",
              "history_company_table",
              "history_da_table"
             ]:
    cur = conn.cursor()
    cur.execute('CREATE TABLE IF NOT EXISTS '+table+' AS SELECT * FROM ' + table[0:len(table)-6]+';')
    subprocess.call('pg_dump -d npri -t '+table+' > NPRI/tables/'+table+'.sql', shell=True)
    cur.execute('DROP TABLE '+table+';')
    cur.close()

In [50]:
conn.close()

### Google Cloud actions

* delete "ALTER TABLE public.history_da_table OWNER TO postgres;" from .sql files
* upload to GC buckets (Cloud Storage)
* delete old db (!) and create new one (or drop schema - see below)
* SQL - sign into sql studio and run
  * create extension postgis
* import new tables 1x1
* set proper permssions: SQL - sign into sql studio and run
  * GRANT SELECT ON ALL TABLES IN SCHEMA public TO user;
   

DROP SCHEMA public CASCADE; \
CREATE SCHEMA public;  \
GRANT ALL ON SCHEMA public TO postgres; \
GRANT ALL ON SCHEMA public TO public; \
CREATE EXTENSION postgis; \
/* https://stackoverflow.com/questions/3327312/how-can-i-drop-all-the-tables-in-a-postgresql-database */

### Enforcement and Compliance Data 
#### Ontario Data Catalogue

In [None]:
# Preprocessing
## Get all ON Compliance and Penalties URLs
ON_compliance_urls = {}
r = requests.get("https://data.ontario.ca/en/dataset/environmental-compliance-reports")
soup = BeautifulSoup(r.text, 'html.parser')
for link in soup.find_all('a'):
  l = link.get('href').lower()
  if '.csv' in l and '_fr' not in l: # Skip links that aren't CSVs or are in French
    year = l.split("_")[1][-4:]
    media = l.split("_")[2]
    if '.csv' in media:
      media = media[:-4]
    if year in ON_compliance_urls:
      ON_compliance_urls[year][media] = l
    else:
      ON_compliance_urls[year] = {media: l}    

ON_penalties_urls = {}
r = requests.get("https://data.ontario.ca/en/dataset/environmental-penalty-annual-report")
soup = BeautifulSoup(r.text, 'html.parser')
for link in soup.find_all('a'):
  l = link.get('href').lower()
  if '.xlsx' in l and '_fr' not in l: # Skip links that aren't CSVs or are in French
    year = l.split("_")[1][-4:]
    ON_penalties_urls[year] = l   

In [None]:
# Environmental Penalty Reports https://data.ontario.ca/en/dataset/environmental-penalty-annual-report
## https://data.ontario.ca/feeds/custom.atom?q=name:environmental-penalty-annual-report
## metadata: https://files.ontario.ca/moe_mapping/downloads/metadata/opendata/ep_metadata_new.pdf
## how to read: https://files.ontario.ca/moe_mapping/downloads/4Other/How_to_Read_EP_Report_EN.pdf
## Goes back to 2011 in separate files, mostly but not completely standardized

# Environmental Compliance Reports https://data.ontario.ca/en/dataset/environmental-compliance-reports
## Broken down into air, industrial sewage, and municipal/private sewage
## metadata: https://files.ontario.ca/moe_mapping/downloads/metadata/opendata/Environmental_Compliance_Report_metadata_EN.pdf
## RSS: https://data.ontario.ca/feeds/custom.atom?q=name:environmental-compliance-reports
## Goes back to 2012 in separate files, not all of which are named in a standard format...

# Occurences and Spills https://data.ontario.ca/en/dataset/environmental-occurrences-and-spills
## RSS: https://data.ontario.ca/feeds/custom.atom?q=name:environmental-occurrences-and-spills
## metadata: https://files.ontario.ca/moe_mapping/downloads/metadata/opendata/Spills_metadata_EN.pdf
ON_spills_url = 'https://files.ontario.ca/moe_mapping/downloads/4Other/SAC/spills_occurrences_2003-2021.csv'

# GHG https://data.ontario.ca/dataset/greenhouse-gas-emissions-reporting-by-facility
## RSS: https://data.ontario.ca/feeds/custom.atom?q=name:greenhouse-gas-emissions-reporting-by-facility
## 2010-2020 data is for 'specified activities'. There's also a 2016-2018 all activities file: https://files.ontario.ca/moe_mapping/downloads/1Air/GHG_by_year/GHG_Data_2016_2018_data_Jan232020.xls
## metadata: https://files.ontario.ca/moe_mapping/downloads/metadata/opendata/GHG_facility_metadata_EN.pdf
ON_ghg_url = 'https://files.ontario.ca/moe_mapping/downloads/1Air/GHG_by_year/GHG_Data_2010_2020_data_Dec162021.csv'

# Air pollutant stats https://data.ontario.ca/dataset/annual-air-pollutant-statistics
# Petroleum wells https://data.ontario.ca/dataset/petroleum-wells

# Hazardous waste https://data.ontario.ca/dataset/hazardous-waste-public-information
## RSS: https://data.ontario.ca/feeds/custom.atom?q=name:hazardous-waste-public-information
## metadata: https://files.ontario.ca/moe_mapping/downloads/metadata/opendata/hwin_metadata_en.pdf
## Goes back to 2002
## Returns a zipped .accdb file that can only be opened by Windows machines it seems?
#waste_url = 'https://files.ontario.ca/moe_mapping/downloads/3Land/HWIN_by_year/PIDS2021.zip'

# None of these are linked with an NPRI id....
# None have relations - e.g. no relation between spills/compliance and penalties
# None have lat/lng, only addresses. Spills only has municipality.

headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36'}

urls = {"ON_spills": ON_spills_url, "ON_ghg": ON_ghg_url} #"ON_penalties": ON_penalties_urls, "ON_compliance": ON_compliance_urls, 

for file, url in urls.items():
  results[file] = {}
  if type(url) == dict: # Penalties case
    for year, link in url.items():
      results[file][year] = {}
      if type(link) == dict: # Compliance case
        results[file][year][media] = ""
        for media, path in link.items():
          print(file, year, media, path)
          results[file][year][media] = get_data(path)
      else:
        print(file, year, link)
        results[file][year] = get_data(link)
  else:
    print(file, url)
    results[file] = get_data(url)  

In [None]:
# Alternative local load for ON Penalties and ON Compliance
import os
if "ON_penalties" not in results:
  results["ON_penalties"] = {}
if "ON_compliance" not in results:
  results["ON_compliance"] = {}
  
for filename in os.listdir("penalties"):
  f = os.path.join("penalties", filename)
  #print(f)
  year = filename.split("_")[0]
  #print(year)
  data = pandas.read_excel(f)
  results["ON_penalties"][year] = data

for filename in os.listdir("compliance"):
  f = os.path.join("compliance", filename)
  #print(f)
  year = filename.split("_")[0]
  #print(year)
  media = filename.split("_")[1].lower().strip(".csv")
  #print(media)
  data = pandas.read_csv(f, encoding = 'ISO-8859-1')
  if year not in results["ON_compliance"]:
    results["ON_compliance"][year] = {}
  results["ON_compliance"][year][media] = data

In [None]:
# Create lookups out of penalties and compliance (all) datasets to facilitate merge with NPRI
results["ON_penalties"]["sites"] = None
for year in results["ON_penalties"].keys():
  results["ON_penalties"]["sites"] = pandas.concat([results["ON_penalties"]["sites"],results["ON_penalties"][year]])
results["ON_penalties"]["sites"] = results["ON_penalties"]["sites"].drop_duplicates(subset=["Company Name", "Site Address", "Municipality"])
results["ON_penalties"]["sites"] = results["ON_penalties"]["sites"][["Company Name", "Site Address", "Municipality"]]
results["ON_penalties"]["sites"]

results["ON_compliance"]["sites"] = None
for year in results["ON_compliance"].keys():
  if year != "sites":
    for media in results["ON_compliance"][year].keys():
      #print(year, media)
      results["ON_compliance"]["sites"] = pandas.concat([results["ON_compliance"]["sites"],results["ON_compliance"][year][media]])
results["ON_compliance"]["sites"] = results["ON_compliance"]["sites"].drop_duplicates(subset=["Facility Owner", "Site Address", "Site Municipality"]) # Site Name not consistent, Facility Owner is
results["ON_compliance"]["sites"] = results["ON_compliance"]["sites"][["Facility Owner", "Site Address", "Site Municipality"]]
results["ON_compliance"]["sites"]

#### Match NPRI and ON datasets

May need to convert to doing this via postgresql. Or, copy here as dataframe through pandas, manipulate, drop table, add back to the database.

In [None]:
# First, connect data with geographic information 
presets = {"ON_penalties": {"site_address": "Site Address", 
                            "city": "Municipality", 
                            "full_address": "full_address"}, 
           "ON_compliance": {"site_address": "Site Address", 
                             "city": "Site Municipality", 
                             "full_address": "full_address"}, 
           "NPRI_releases": {"site_address": "Address line 1 / Première ligne dadresse", 
                             "id": "NPRI_ID / No_INRP",
                            "year": "Reporting_Year / Année"}, 
           "NPRI_locations": {"id": "NPRI ID / ID INRP",
                             "year": "Year of last filed report / Année de déclaration la plus récente",
                             "site_address": "Address line 1 / Première ligne dadresse"}
} # keep building this out

"""
NPRI full = 910182 records
De-duplicate NPRI_locations. They are duplicated on "last reported year". Ideally we could use that to connect with a specific NPRI release year, because info could change over time
but that would probably leave a lot of gaps. For instance, NPRI ID 500352 has reports for 2020 and 2018 but only location info for 2020. Keep latest info, which should be first.
NPRI left outer join with locations de-duplicated = 910182 records
Also de-duplicate ON_penalties sites and ON_compliance sites on addresses. These are duplicated with only minor changes to facility name spelling it seems.
"""

results["NPRI_locations"] = results["NPRI_locations"].drop_duplicates(subset=presets["NPRI_locations"]["id"], keep = "first")
results["ON_penalties"]["sites"] = results["ON_penalties"]["sites"].drop_duplicates(subset=presets["ON_penalties"]["site_address"], keep = "first") # Assuming site address alone is unique, i.e. no two different facilities would both have 123 A St as their address
results["ON_compliance"]["sites"] = results["ON_compliance"]["sites"].drop_duplicates(subset=presets["ON_compliance"]["site_address"], keep = "first") # Assuming site address alone is unique, i.e. no two different facilities would both have 123 A St as their address

#### Match based on address/name and indicate where this wasn't possible (TBD) 

#### Try geocoding to more fully match datasets

In [None]:
# Construct a full address from the site address and the municipality
results["ON_compliance"]["sites"][presets["ON_compliance"]["full_address"]] = results["ON_compliance"]["sites"][presets["ON_compliance"]["site_address"]] + ", " + \
  results["ON_compliance"]["sites"][presets["ON_compliance"]["city"]] + ", Ontario, Canada"
results["ON_penalties"]["sites"][presets["ON_penalties"]["full_address"]] = results["ON_penalties"]["sites"][presets["ON_penalties"]["site_address"]] + ", " + \
  results["ON_penalties"]["sites"][presets["ON_penalties"]["city"]] + ", Ontario, Canada"
results["ON_penalties"]["sites"]

In [None]:
# Get unique full addressses from across all ON sites
unique_addresses = pandas.concat([
  results["ON_compliance"]["sites"][[presets["ON_compliance"]["full_address"], presets["ON_compliance"]["site_address"]]], 
  results["ON_penalties"]["sites"][[presets["ON_penalties"]["full_address"], presets["ON_penalties"]["site_address"]]],
])
unique_addresses = unique_addresses.drop_duplicates(subset="full_address")
unique_addresses

In [None]:
# Geocode addresses
# https://towardsdatascience.com/geocode-with-python-161ec1e62b89
#!pip install geopy
import geopy
from geopy.extra.rate_limiter import RateLimiter
from geopy.geocoders import GoogleV3 #Nominatim # AIzaSyBHhxIX9n4GANIf5Q-I6_nhrpzx6E0l_BM
locator = GoogleV3(api_key = 'AIzaSyBHhxIX9n4GANIf5Q-I6_nhrpzx6E0l_BM', user_agent="Geocoding Ontario Ministry of Environment Data")

# 1 - conveneint function to delay between geocoding calls
geocode = RateLimiter(locator.geocode, min_delay_seconds=1)
# 2- - create location column - geocode
unique_addresses['location'] = unique_addresses['full_address'].apply(geocode)
# 3 - create separate columns
unique_addresses['lat'] = unique_addresses['location'].apply(lambda loc: loc.latitude if loc else None)
unique_addresses['long'] = unique_addresses['location'].apply(lambda loc: loc.longitude if loc else None)
unique_addresses['geocoded_address']  = unique_addresses['location'].apply(lambda loc: loc.address if loc else None)
unique_addresses

In [None]:
# Alternative local load of locations
unique_addresses = pandas.read_csv("ontario_unique_addresses.csv").drop(columns = "Unnamed: 0")
unique_addresses

In [None]:
# Create ON id on ON sites so that they can be added in the buffering and address match process to NPRI locations (and then accessible from NPRI releases)
## Create id based on lat/long so that it should be consistent over time (assmuning geocoding is consistent)
unique_addresses["ON_ID"] = unique_addresses["lat"].astype(str).str[-5:] + \
  unique_addresses["location"].str.replace(" ", "").str[0::5].str.lower() #+ \
  #unique_addresses.index.astype(str)
  # should probably strip punctuation....
## Geodataframe and do search - create small buffers around each Ontario MOE facility and see if they intersect with any NPRI facilities (if not, they may not be NPRI reporters...)
unique_addresses = geopandas.GeoDataFrame(unique_addresses, geometry = geopandas.points_from_xy(unique_addresses["long"],unique_addresses["lat"], crs = 4326))
results["NPRI_locations"] = geopandas.GeoDataFrame(results["NPRI_locations"], geometry = geopandas.points_from_xy(results["NPRI_locations"]["Longitude / Longitude"],results["NPRI_locations"]["Latitude / Latitude"], crs = 4326))
## Project data
proj = 'PROJCS["Canada_Albers_Equal_Area_Conic",GEOGCS["GCS_North_American_1983",DATUM["North_American_Datum_1983",SPHEROID["GRS_1980",6378137,298.257222101]],PRIMEM["Greenwich",0],UNIT["Degree",0.017453292519943295]],PROJECTION["Albers_Conic_Equal_Area"],PARAMETER["False_Easting",0],PARAMETER["False_Northing",0],PARAMETER["longitude_of_center",-96],PARAMETER["Standard_Parallel_1",50],PARAMETER["Standard_Parallel_2",70],PARAMETER["latitude_of_center",40],UNIT["Meter",1],AUTHORITY["EPSG","102001"]]'
unique_addresses.to_crs(proj, inplace=True)
results["NPRI_locations"].to_crs(proj, inplace=True)

In [None]:
# Diagnostic: check for misisng geographic data
unique_addresses[unique_addresses.is_empty] # 2
results["NPRI_locations"][results["NPRI_locations"].is_empty] # 9 

In [None]:
# Try matching datasets on addresses
## 9 addresses join on 22612
results["NPRI_locations"] = results["NPRI_locations"].merge(unique_addresses, how = 'left', 
                               left_on = presets["NPRI_locations"]["site_address"], 
                               right_on = unique_addresses["Site Address"],
                               suffixes = ("", "_ON"))
results["NPRI_locations"] = results["NPRI_locations"][(~results["NPRI_locations"].duplicated("ON_ID")) | (results["NPRI_locations"]['ON_ID'].isnull())] # Eliminate double joined records (1)
results["NPRI_locations"].loc[~results["NPRI_locations"]["full_address"].isna(), "notes"] = "match on address"
results["NPRI_locations"].loc[~results["NPRI_locations"]["full_address"].isna()]

In [None]:
# Buffer ON penalties and ON compliances and find NPRI matches
## 145 matches at 100m
## within 1 = unique addresses that have NPRI facilities nearby
## within 0 = those NPRI facilities
## several ON facilities "share" the same NPRI facility
unique_addresses["buffer"] = unique_addresses.buffer(100) # x meter search
within = unique_addresses["buffer"].sindex.query_bulk(results["NPRI_locations"]["geometry"], predicate="within") # The spatial query
matches = {}
for pos, fac in enumerate(within[0]):
  if fac not in matches:
    matches[fac] = [within[1][pos]]
  else:
    matches[fac].append(within[1][pos])
for key in matches.keys():
  on_ids = unique_addresses[unique_addresses.index.isin(matches[key])]
  on_ids = list(on_ids["ON_ID"].unique())
  on_ids = ";".join(on_ids)
  results["NPRI_locations"].loc[results["NPRI_locations"].index == key, "ON_ID"] = on_ids
  results["NPRI_locations"].loc[results["NPRI_locations"].index == key, "notes"] = "match on geocoding"
results["NPRI_locations"][~results["NPRI_locations"]["ON_ID"].isna()]

In [None]:
# Do the reverse - to unique addresses, add the NPRI ids found within x meters
## Should also do on addressses....
matches = {}
for pos, fac in enumerate(within[1]):
  if fac not in matches:
    matches[fac] = [within[0][pos]]
  else:
    matches[fac].append(within[0][pos])
for key in matches.keys():
  npri_ids = results["NPRI_locations"][results["NPRI_locations"].index.isin(matches[key])]
  npri_ids = list(npri_ids["NPRI ID / ID INRP"].astype(str).unique()) 
  npri_ids = ";".join(npri_ids)
  unique_addresses.loc[unique_addresses.index == key, "NPRI_ID"] = npri_ids
  unique_addresses.loc[unique_addresses.index == key, "notes"] = "match on geocoding"
unique_addresses[~unique_addresses["NPRI_ID"].isna()]

In [None]:
## DELETE OR SAVE FOR LATER
## Concat compliance years/media
results["ON_compliance_full"] = None
for year in results["ON_compliance"].keys():
  if year != "sites":
    for media in results["ON_compliance"][year].keys():
      if 'ï»¿Facility Owner' in results["ON_compliance"][year][media]:
        results["ON_compliance"][year][media]["Facility Owner"] = results["ON_compliance"][year][media]['ï»¿Facility Owner']
      results["ON_compliance_full"] = pandas.concat([results["ON_compliance_full"], results["ON_compliance"][year][media]])
      results["ON_compliance_full"]["year"] = year
      results["ON_compliance_full"]["media"] = media
results["ON_compliance_full"].reset_index(inplace=True)
results["ON_compliance_full"].drop(columns=["index", "Unnamed: 17", "Unnamed: 19", "Unnamed: 20", "Unnamed: 21", 'ï»¿Facility Owner'], inplace=True) # Drop extra columns
results["ON_compliance_full"] = results["ON_compliance_full"][~results["ON_compliance_full"]["Facility Owner"].isna()] # Drop extra rows from 2013 municipal data
results["ON_compliance_full"] = results["ON_compliance_full"][~results["ON_compliance_full"]["Facility Owner"].str.contains("This data is provided for information purposes only")]
results["ON_compliance_full"] # 7219
## Join sites info into concated data based on site address
results["ON_compliance_full"]["full_address"] = results["ON_compliance_full"]["Site Address"] + ", " + results["ON_compliance_full"]["Site Municipality"]  + ", Ontario, Canada" # Create "full address" string for matching
results["ON_compliance_full"] = results["ON_compliance_full"].merge(unique_addresses, how='left', left_on="full_address", right_on="full_address", suffixes=("", "_SiteInfo"))
results["ON_compliance_full"]
## delete results["ON_compliance"]["sites"]

## Concat compliance years/media
results["ON_penalties_full"] = None
for year in results["ON_penalties"].keys():
  if year != "sites":
    results["ON_penalties_full"] = pandas.concat([results["ON_penalties_full"], results["ON_penalties"][year]])
    results["ON_penalties_full"]["year"] = year
results["ON_penalties_full"].reset_index(inplace=True)
results["ON_penalties_full"].drop(columns=["index"], inplace=True) # Drop extra columns
results["ON_penalties_full"]["full_address"] = results["ON_penalties_full"]["Site Address"] + ", " + results["ON_penalties_full"]["Municipality"]  + ", Ontario, Canada" # Create "full address" string for matching
## Join sites info into concated data based on site address
results["ON_penalties_full"] = results["ON_penalties_full"].merge(unique_addresses, how='left', left_on="full_address", right_on="full_address", suffixes=("", "_SiteInfo"))
results["ON_penalties_full"]
## delete results["ON_penalties"]["sites"]
results["NPRI_releases"] = results["NPRI_releases"].merge(results["NPRI_locations"], how = 'left', 
                               left_on = presets["NPRI_releases"]["id"],
                               right_on = presets["NPRI_locations"]["id"])
results["NPRI_releases"]

In [None]:
## Get ON penalties and compliance for a NPRI id
### Make these more general get-records with an item type option
def get_penalties(npri_id, year = None):
  """
  id : string of NPRI ID
  year : string of a year (optional)
  """
  data = results["NPRI_releases"].loc[results["NPRI_releases"][presets["NPRI_releases"]["id"]].astype(str) == npri_id]
  # Could just look up NPRI_ID in ON_penalties_full??
  local_ids = list(data["ON_ID"].unique())
  for local_id in local_ids:
    print(type(local_id))
    if type(local_id) == float: # lazy check for nan
      print("We have no records for this facility. Either they have no penalties or we are unable to match its local records with NPRI")
    else:
      # try to split at ; to get at multiple ids
      multiples = local_id.split(";")
      print(multiples)
      r = None
      if len(multiples) == 1:
        r = results["ON_penalties_full"].loc[results["ON_penalties_full"]["ON_ID"]==multiples[0]]
      else:
        for i in multiples:
          temp = results["ON_penalties_full"].loc[results["ON_penalties_full"]["ON_ID"]==multiples[i]]
          r = pandas.concat([r, temp])
      if r.empty:
        print("We have a match for this facility but there are no records of penalties")
      else:
        return r

def get_compliance(npri_id, year = None, media = None):
  """
  npri_id : string of NPRI ID
  year : string of a year (optional)
  media: string of a media type (optional)
  """
  data = results["NPRI_releases"].loc[results["NPRI_releases"][presets["NPRI_releases"]["id"]].astype(str) == npri_id]
  # Could just look up NPRI_ID in ON_compliance_full??
  local_ids = list(data["ON_ID"].unique())
  for local_id in local_ids:
    #print(type(local_id))
    if type(local_id) == float: # lazy check for nan
      print("We have no records for this facility. Either they have no violations or we are unable to match its local records with NPRI")
    else:
      # try to split at ; to get at multiple ids
      multiples = local_id.split(";")
      print(multiples)
      r = None
      if len(multiples) == 1:
        r = results["ON_compliance_full"].loc[results["ON_compliance_full"]["ON_ID"]==multiples[0]]
      else:
        for i in multiples:
          temp = results["ON_compliance_full"].loc[results["ON_compliance_full"]["ON_ID"]==multiples[i]]
          r = pandas.concat([r, temp])
      if r.empty:
        print("We have a match for this facility but there are no records of violations")
      else:
        return r

## Aggregate penalties and violations ("exceedences")
def aggregate(npri_id, item, year = None):
  """
  npri_id : list of NPRI ID(s)
  item : string of a violations or penalties dataset e.g. ON_penalties_full
  year : string of a specific year to aggregate results for (optional)
  """
  r = {}
  for i in npri_id:
    print(i)
    r[i] = None
    if item == "ON_penalties_full":
      data = get_penalties(str(i))
      if data is not None:
        r[i] = data.shape[0] # lazy count of penalties
      else:
        r[i] = 0 # 0 recorded penalties...
    else:
      data = get_compliance(str(i))
      if data is not None:
        r[i] = data.shape[0] # lazy count of penalties
      else:
        r[i] = 0 # 0 recorded penalties...
    results["NPRI_releases"].loc[results["NPRI_releases"][presets["NPRI_releases"]["id"]] == i, item] = r[i]
  return results["NPRI_releases"]

ids = list(results["NPRI_releases"][~results["NPRI_releases"]["ON_ID"].isna()][presets["NPRI_releases"]["id"]].unique()) # NPRI ids with matching local locations
aggregate(ids[0:5], "ON_compliance_full")
results["NPRI_releases"].loc[~results["NPRI_releases"]["ON_compliance_full"].isna()]
#get_compliance("201")

In [None]:
print("ON NPRI facilities since 2012: ", str(len(list(results["NPRI_releases"].loc[
  ((results["NPRI_releases"]["PROVINCE"] == "ON") & 
  (results["NPRI_releases"]["Reporting_Year / Année"] >= 2012))
][presets["NPRI_releases"]["id"]].unique()
))))
print("ON facilities with violations since 2012: ",str(len(list(results["ON_compliance_full"].loc[
  (results["ON_compliance_full"]["year"].astype(int) >= 2012)
]["full_address"].unique()))))
print("ON facilities with penalties since 2012: ",str(len(list(results["ON_penalties_full"].loc[
  (results["ON_penalties_full"]["year"].astype(int) >= 2012)
]["full_address"].unique()))))