In [1]:
import pandas as pd
import requests
from bs4 import BeautifulSoup

https://www.epa.gov/dwreginfo/public-notification-rule

In [14]:
pn_violation_assoc_df = pd.read_csv('../data/ECHO_data_csv/SDWA_PN_VIOLATION_ASSOC.csv', dtype=str)

In [15]:
pn_violation_assoc_df.head()

Unnamed: 0,SUBMISSIONYEARQUARTER,PWSID,PN_VIOLATION_ID,RELATED_VIOLATION_ID,COMPL_PER_BEGIN_DATE,COMPL_PER_END_DATE,VIOLATION_CODE,CONTAMINANT_CODE,FIRST_REPORTED_DATE,LAST_REPORTED_DATE
0,2021Q2,NC0118129,4707272,4706607,01/01/2006,12/31/2006,3,2274,08/22/2008,06/28/2019
1,2021Q2,NC0118129,4707272,4706707,01/01/2006,12/31/2006,3,2306,08/22/2008,06/28/2019
2,2021Q2,NC0118129,4707272,4706807,01/01/2006,12/31/2006,3,2326,08/22/2008,06/28/2019
3,2021Q2,NC0118129,4707272,4706907,01/01/2006,12/31/2006,3,2383,08/22/2008,06/28/2019
4,2021Q2,NC0118129,4707272,4707007,01/01/2006,12/31/2006,3,2931,08/22/2008,06/28/2019


In [48]:
print(f'Total number of records in pn violation assoc table is {pn_violation_assoc_df.shape[0]}')

Total number of records in pn violation assoc table is 298518


In [56]:
sum(pn_violation_assoc_df.groupby(['PWSID', 'PN_VIOLATION_ID', 'RELATED_VIOLATION_ID']).size().tolist())

298518

The column set {PWSID, PN_VIOLATION_ID, RELATED_VIOLATION_ID} uniquely identifies each record

In [19]:
pn_violation_assoc_df.isnull().sum()

SUBMISSIONYEARQUARTER        0
PWSID                        0
PN_VIOLATION_ID              0
RELATED_VIOLATION_ID         0
COMPL_PER_BEGIN_DATE         0
COMPL_PER_END_DATE       19781
VIOLATION_CODE               0
CONTAMINANT_CODE           175
FIRST_REPORTED_DATE          0
LAST_REPORTED_DATE        2916
dtype: int64

In [20]:
## Seeing date formats
date_columns = ['COMPL_PER_BEGIN_DATE', 'COMPL_PER_END_DATE', 'FIRST_REPORTED_DATE', 'LAST_REPORTED_DATE']
pn_violation_assoc_df[date_columns].head()

Unnamed: 0,COMPL_PER_BEGIN_DATE,COMPL_PER_END_DATE,FIRST_REPORTED_DATE,LAST_REPORTED_DATE
0,01/01/2006,12/31/2006,08/22/2008,06/28/2019
1,01/01/2006,12/31/2006,08/22/2008,06/28/2019
2,01/01/2006,12/31/2006,08/22/2008,06/28/2019
3,01/01/2006,12/31/2006,08/22/2008,06/28/2019
4,01/01/2006,12/31/2006,08/22/2008,06/28/2019


In [21]:
for datecol in date_columns:
    pn_violation_assoc_df[datecol] = pd.to_datetime(pn_violation_assoc_df[datecol], format='%m/%d/%Y')

In [22]:
pn_violation_assoc_df[date_columns].head()

Unnamed: 0,COMPL_PER_BEGIN_DATE,COMPL_PER_END_DATE,FIRST_REPORTED_DATE,LAST_REPORTED_DATE
0,2006-01-01,2006-12-31,2008-08-22,2019-06-28
1,2006-01-01,2006-12-31,2008-08-22,2019-06-28
2,2006-01-01,2006-12-31,2008-08-22,2019-06-28
3,2006-01-01,2006-12-31,2008-08-22,2019-06-28
4,2006-01-01,2006-12-31,2008-08-22,2019-06-28


In [23]:
for datacol in date_columns:
    print('Range of {} is between {} and {}'.format(datacol, pn_violation_assoc_df[datacol].min(), pn_violation_assoc_df[datacol].max()))

Range of COMPL_PER_BEGIN_DATE is between 1992-06-30 00:00:00 and 2021-05-04 00:00:00
Range of COMPL_PER_END_DATE is between 1992-09-30 00:00:00 and 2021-12-31 00:00:00
Range of FIRST_REPORTED_DATE is between 2002-02-16 00:00:00 and 2021-07-01 00:00:00
Range of LAST_REPORTED_DATE is between 2002-02-16 00:00:00 and 2021-07-01 00:00:00


In [25]:
len(pn_violation_assoc_df.columns)

10

### Web Scraping Column description and data type

In [26]:
def get_html():
    URL = "https://echo.epa.gov/tools/data-downloads/sdwa-download-summary"
    r = requests.get(URL)  
    html_soup = BeautifulSoup(r.content, 'html.parser') 
    return html_soup
html_soup = get_html()

In [40]:
cols = []
filename = 'SDWA_PN_VIOLATION_ASSOC.csv'
idx = [i for i, item in enumerate(list(html_soup.find_all("div", {"class": 'field-item even'})[0].find_all('h3'))) if filename in item.getText()][0]
for th in html_soup.find_all("div", {"class": 'field-item even'})[0].find_all('h3')[idx].find_next_sibling('table').find("thead").find_all("th"):
    cols.append(th.get_text().replace(u'\xa0', u''))
rows = []
for row in html_soup.find_all("div", {"class": 'field-item even'})[0].find_all('h3')[idx].find_next_sibling('table').find("tbody").find_all("tr"):
    r = []
    for td in row.find_all('td'):
        r.append(td.get_text().replace(u'\xa0', u'').replace('CONTAMINATION_CODE', 'CONTAMINANT_CODE'))
    rows.append(r)
rows.append(['COMPL_PER_BEGIN_DATE', 'Date', ''])
rows.append(['RELATED_VIOLATION_ID', 'Char', '40'])


column_datatype = pd.DataFrame(rows, columns=cols)
column_datatype = column_datatype[column_datatype.Element.isin(pn_violation_assoc_df.columns)]

column_datatype

Unnamed: 0,Element,Data Type,Length
0,PWSID,Char,9.0
1,SUBMISSIONYEARQUARTER,Char,6.0
2,PN_VIOLATION_ID,Num,
3,COMPL_PER_END_DATE,Date,
4,VIOLATION_CODE,Char,2.0
5,CONTAMINANT_CODE,Char,4.0
6,FIRST_REPORTED_DATE,Date,
7,LAST_REPORTED_DATE,Date,
8,COMPL_PER_BEGIN_DATE,Date,
9,RELATED_VIOLATION_ID,Char,40.0


In [57]:
pn_violation_assoc_df.PN_VIOLATION_ID.dropna().map(lambda x: len(x)).max()

20

In [38]:
data_dictionary = {
    p.find_all('strong')[0].getText().replace(u'\xa0', u'') : p.getText().replace(u'\xa0', u' ')
    for p in html_soup.find_all("div", {"class": 'field-item even'})[0].find_all('p') 
    if p.find_all('strong') and len(p.find_all('strong')) == 1 and p.find_all('strong')[0].getText().replace(u'\xa0', u'') in list(pn_violation_assoc_df.columns)
}
pd.set_option('display.max_colwidth', None)
col_desc = pd.DataFrame([data_dictionary[col] for col in data_dictionary], columns=['raw_desc'])

col_desc[['COLUMN', 'DESCRIPTION']] = col_desc['raw_desc'].str.split('-', 1, expand=True)
col_desc = col_desc[['COLUMN', 'DESCRIPTION']]
col_desc

Unnamed: 0,COLUMN,DESCRIPTION
0,CONTAMINANT_CODE,A code value that represents a contaminant for which a public water system has incurred a violation of a primary drinking water regulation. A full description of the codes can be accessed in the SDWA_REF_CODE_VALUES.csv.
1,COMPL_PER_END_DATE,"End of the compliance period. Note that for open-ended compliance periods, the COMPL_PER_END_DATE is listed as blank (MM/DD/YYYY format)."
2,FIRST_REPORTED_DATE,The first reported date for the milestone event. The date format is MM/DD/YYYY.
3,LAST_REPORTED_DATE,The last reported date for the milestone event. The date format is MM/DD/YYYY.
4,PN_VIOLATION_ID,Unique identifier identifying the public notification.
5,PWSID,"A unique identifying code for a public water system in SDWIS. The PWSID consists of a two-letter state or region code, followed by seven digits."
6,SUBMISSIONYEARQUARTER,The fiscal year and quarter when the event took place.
7,VIOLATION_CODE,A full description of violation codes can be accessed in the SDWA_REF_CODE_VALUES (CSV) table.


### Generating create SQL statement to create 'pn_violation_assoc' table to store pn violation details

In [41]:
temp_dict = {}

def space(n):
    return ' ' * (max([len(col) for col in pn_violation_assoc_df.columns]) + 5 - n)

for ind in column_datatype.index:
    col_name = column_datatype['Element'][ind]
    data_type = column_datatype['Data Type'][ind]
    length = column_datatype['Length'][ind]
    data_type = 'VARCHAR' if data_type == 'Char' else 'DATE' if data_type == 'Date' else 'INT'
    data_type = data_type + '('+ length +')' if length != '' and data_type == 'VARCHAR' else data_type + '(255)' if data_type == 'VARCHAR' else data_type
    temp_dict[col_name] = data_type
print('CREATE TABLE PN_VIOLATIONS_ASSOC (')
for col in pn_violation_assoc_df.columns:
    print(f'\t{col}{space(len(col))}{temp_dict[col]}', end='')
    if col != pn_violation_assoc_df.columns[-1]:
        print(',')
    else:
        print()
print(') ENGINE = InnoDB;')

CREATE TABLE PN_VIOLATIONS_ASSOC (
	SUBMISSIONYEARQUARTER     VARCHAR(6),
	PWSID                     VARCHAR(9),
	PN_VIOLATION_ID           INT,
	RELATED_VIOLATION_ID      VARCHAR(40),
	COMPL_PER_BEGIN_DATE      DATE,
	COMPL_PER_END_DATE        DATE,
	VIOLATION_CODE            VARCHAR(2),
	CONTAMINANT_CODE          VARCHAR(4),
	FIRST_REPORTED_DATE       DATE,
	LAST_REPORTED_DATE        DATE
) ENGINE = InnoDB;


In [46]:
pn_violation_assoc_df.sort_values(['PWSID', 'PN_VIOLATION_ID'], ascending=[True, True]).to_csv('../data/processed_data/PN_VIOLATIONS_ASSOC.csv', index=False) # violations.groupby(['PWSID', 'VIOLATION_ID']).size()

In [45]:
columns = pn_violation_assoc_df.columns.to_list()

date_columns = ['COMPL_PER_BEGIN_DATE', 'COMPL_PER_END_DATE', 'FIRST_REPORTED_DATE', 'LAST_REPORTED_DATE']

print('''
LOAD DATA INFILE 'C:/ProgramData/MySQL/MySQL Server 8.0/Uploads/PN_VIOLATIONS_ASSOC.csv'
INTO TABLE PN_VIOLATIONS_ASSOC 
FIELDS TERMINATED BY ','
ESCAPED BY ''
OPTIONALLY ENCLOSED BY '"'
LINES TERMINATED BY '\\n'
IGNORE 1 ROWS
''', end='')
print('(', end='')
for col in columns:
    print(f'@{col}', end='')
    if col != columns[-1]:
        print(',', end='')
print(')')
print('SET')
for col in columns:
    if col in date_columns:
        print(f'{col} = IF(@{col} = \'\', NULL, STR_TO_DATE(@{col}, \'%Y-%m-%d\')),')
    else:
        print(f'{col} = IF(@{col} = \'\', NULL, @{col}),')


LOAD DATA INFILE 'C:/ProgramData/MySQL/MySQL Server 8.0/Uploads/PN_VIOLATIONS_ASSOC.csv'
INTO TABLE PN_VIOLATIONS_ASSOC 
FIELDS TERMINATED BY ','
ESCAPED BY ''
OPTIONALLY ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 ROWS
(@SUBMISSIONYEARQUARTER,@PWSID,@PN_VIOLATION_ID,@RELATED_VIOLATION_ID,@COMPL_PER_BEGIN_DATE,@COMPL_PER_END_DATE,@VIOLATION_CODE,@CONTAMINANT_CODE,@FIRST_REPORTED_DATE,@LAST_REPORTED_DATE)
SET
SUBMISSIONYEARQUARTER = IF(@SUBMISSIONYEARQUARTER = '', NULL, @SUBMISSIONYEARQUARTER),
PWSID = IF(@PWSID = '', NULL, @PWSID),
PN_VIOLATION_ID = IF(@PN_VIOLATION_ID = '', NULL, @PN_VIOLATION_ID),
RELATED_VIOLATION_ID = IF(@RELATED_VIOLATION_ID = '', NULL, @RELATED_VIOLATION_ID),
COMPL_PER_BEGIN_DATE = IF(@COMPL_PER_BEGIN_DATE = '', NULL, STR_TO_DATE(@COMPL_PER_BEGIN_DATE, '%Y-%m-%d')),
COMPL_PER_END_DATE = IF(@COMPL_PER_END_DATE = '', NULL, STR_TO_DATE(@COMPL_PER_END_DATE, '%Y-%m-%d')),
VIOLATION_CODE = IF(@VIOLATION_CODE = '', NULL, @VIOLATION_CODE),
CONTAMINANT_CODE = IF(@C