<a href="https://colab.research.google.com/github/tbnorth/LOads_colab/blob/master/LOads.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Install the PyDrive wrapper & import libraries.
# This only needs to be done once per notebook.
!pip install -U -q PyDrive
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client.
# This only needs to be done once per notebook.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)



In [121]:
import os
from pathlib import Path
from google.colab import drive

drive.mount("/content/gdrive")
ROOT = Path("/content/gdrive/My Drive/Colab Notebooks")
BASE = ROOT.joinpath('LOads')
BASE.mkdir(exist_ok=True)
os.chdir(BASE)


Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [0]:
import numpy as np
import pandas as pd
from collections import namedtuple
from dateutil.parser import parse

In [0]:
# uploaded get_wqdata.py to Colab Notebooks folder, now add to import path
import sys
if BASE not in sys.path:
    sys.path.append(str(BASE))
from get_wqdata import GetWQData

wq = GetWQData()

Want to work out which CharacteristicName values to request.  The WQ portal web interface says they come [from here](http://iaspub.epa.gov/sor_internet/registry/substreg/home/overview/home.do) but that's hard to search, so request all obs. in our bounding box and search that list instead.

In [124]:
query = dict(
    _desc="All sample types",
    bBox="-80.0402,43.0646,-75.9697,44.355",
    siteType=["Aggregate surface-water-use", "Stream"],
    sampleMedia=["Water", "water"],
    # siteid=["USGS-04231600"],
    # startDateLo="01-01-2018",
    # startDateHi="02-01-2018",
)
print("Reading data")
all_data = wq.get_data(query)  # gets path to .csv file
d = pd.read_csv(all_data)  # , nrows=10)
print("Data read")
print(d.columns)
po4 = set(d["CharacteristicName"])
keep = set()
for term in 'po4', 'phosphor', 'srp', 'phosphate', 'flow', 'guage', 'cfs':
    for cname in po4:
        if term in cname.lower():
            keep.add(cname)
print(sorted(keep))

Reading data


  interactivity=interactivity, compiler=compiler, result=result)


Data read
Index(['OrganizationIdentifier', 'OrganizationFormalName',
       'ActivityIdentifier', 'ActivityTypeCode', 'ActivityMediaName',
       'ActivityMediaSubdivisionName', 'ActivityStartDate',
       'ActivityStartTime/Time', 'ActivityStartTime/TimeZoneCode',
       'ActivityEndDate', 'ActivityEndTime/Time',
       'ActivityEndTime/TimeZoneCode',
       'ActivityDepthHeightMeasure/MeasureValue',
       'ActivityDepthHeightMeasure/MeasureUnitCode',
       'ActivityDepthAltitudeReferencePointText',
       'ActivityTopDepthHeightMeasure/MeasureValue',
       'ActivityTopDepthHeightMeasure/MeasureUnitCode',
       'ActivityBottomDepthHeightMeasure/MeasureValue',
       'ActivityBottomDepthHeightMeasure/MeasureUnitCode', 'ProjectIdentifier',
       'ActivityConductingOrganizationText', 'MonitoringLocationIdentifier',
       'ActivityCommentText', 'SampleAquifer', 'HydrologicCondition',
       'HydrologicEvent', 'SampleCollectionMethod/MethodIdentifier',
       'SampleCollectionMethod/

Now make a sensible list from the values we found.

In [0]:
use_cache = {
    'characteric_names': [
        # 'O-Ethyl O-methyl S-propyl phosphorothioate',
        # 'O-Ethyl S-methyl S-propyl phosphorodithioate',
        # 'O-Ethyl S-propyl phosphorothioate',
        # 'Organic phosphorus',
        'Orthophosphate',
        'Orthophosphate as P',
        'Phosphate-phosphorus',
        'Phosphate-phosphorus as P',
        'Phosphorus',
        'Stream flow, instantaneous',
        'Stream flow, mean. daily',
        # 'Tributyl phosphate',
        # 'Triphenyl phosphate',
        # 'Tris(1,3-dichloro-2-propyl)phosphate',
        # 'Tris(2-butoxyethyl) phosphate',
        # 'Tris(2-chloroethyl) phosphate',
    ]
}

names = use_cache.get('characteric_names')

Now grab all the data we're interested in

In [126]:
query = dict(
    _desc="Target sample types",
    bBox="-80.0402,43.0646,-75.9697,44.355",
    siteType=["Aggregate surface-water-use", "Stream"],
    sampleMedia=["Water", "water"],
    characteristicName=names,
)
print("Reading data")
# first get the Site info. flavored response
data = wq.get_data(query, type_='site')  # gets path to .csv file
site = pd.read_csv(data)
# print('\n'.join(sorted(site.columns)))
# then get the Result info.
data = wq.get_data(query, type_='result')
d = pd.read_csv(data, low_memory=False)
d.columns = [i.replace('/', '_') for i in d.columns]
# get rid of "mg/l<space><space><space>" units
for col in ['ResultMeasure_MeasureUnitCode']:
    d[col] = [str(i).strip() for i in d[col]]
had = len(d)
d = d.loc[d['ResultMeasure_MeasureUnitCode'] != 'nan', :]
print("Lost %d for missing units" % (had - len(d)))


Reading data
Lost 2251 for missing units


In [127]:
# find common columns
common = set(site.columns).intersection(set(d.columns))
print('\n'.join(sorted(common)))
# use this common column
common = 'MonitoringLocationIdentifier'

MonitoringLocationIdentifier
OrganizationFormalName
OrganizationIdentifier
ProviderName


In [128]:
# create a list of unique sites
locs = site.loc[:, [common, 'LatitudeMeasure', 'LongitudeMeasure']]
locs.drop_duplicates(inplace=True)
length = len(locs)
length

230

In [129]:
# copy lat / lon to main data
old_len = len(d)
d = d.join(locs, rsuffix='CULL')
assert len(d) == old_len
d.drop(columns=[i for i in d.columns if 'CULL' in i], inplace=True)
d.head()

Unnamed: 0,OrganizationIdentifier,OrganizationFormalName,ActivityIdentifier,ActivityTypeCode,ActivityMediaName,ActivityMediaSubdivisionName,ActivityStartDate,ActivityStartTime_Time,ActivityStartTime_TimeZoneCode,ActivityEndDate,ActivityEndTime_Time,ActivityEndTime_TimeZoneCode,ActivityDepthHeightMeasure_MeasureValue,ActivityDepthHeightMeasure_MeasureUnitCode,ActivityDepthAltitudeReferencePointText,ActivityTopDepthHeightMeasure_MeasureValue,ActivityTopDepthHeightMeasure_MeasureUnitCode,ActivityBottomDepthHeightMeasure_MeasureValue,ActivityBottomDepthHeightMeasure_MeasureUnitCode,ProjectIdentifier,ActivityConductingOrganizationText,MonitoringLocationIdentifier,ActivityCommentText,SampleAquifer,HydrologicCondition,HydrologicEvent,SampleCollectionMethod_MethodIdentifier,SampleCollectionMethod_MethodIdentifierContext,SampleCollectionMethod_MethodName,SampleCollectionEquipmentName,ResultDetectionConditionText,CharacteristicName,ResultSampleFractionText,ResultMeasureValue,ResultMeasure_MeasureUnitCode,MeasureQualifierCode,ResultStatusIdentifier,StatisticalBaseCode,ResultValueTypeName,ResultWeightBasisText,ResultTimeBasisText,ResultTemperatureBasisText,ResultParticleSizeBasisText,PrecisionValue,ResultCommentText,USGSPCode,ResultDepthHeightMeasure_MeasureValue,ResultDepthHeightMeasure_MeasureUnitCode,ResultDepthAltitudeReferencePointText,SubjectTaxonomicName,SampleTissueAnatomyName,ResultAnalyticalMethod_MethodIdentifier,ResultAnalyticalMethod_MethodIdentifierContext,ResultAnalyticalMethod_MethodName,MethodDescriptionText,LaboratoryName,AnalysisStartDate,ResultLaboratoryCommentText,DetectionQuantitationLimitTypeName,DetectionQuantitationLimitMeasure_MeasureValue,DetectionQuantitationLimitMeasure_MeasureUnitCode,PreparationStartDate,ProviderName,LatitudeMeasure,LongitudeMeasure
0,USGS-NY,USGS New York Water Science Center,nwisny.01.96300165,Sample-Routine,Water,Surface Water,1963-05-08,,,,,,,,,,,,,,,USGS-04217500,,,Not determined,Routine sample,USGS,USGS,USGS,Unknown,,"Stream flow, mean. daily",,99.0,ft3/s,,Historical,Mean,Actual,,1 Day,,,,,60.0,,,,,,,,,,,,,,,,,NWIS,43.091111,-78.453889
1,USGS-NY,USGS New York Water Science Center,nwisny.01.96300166,Sample-Routine,Water,Surface Water,1963-07-02,,,,,,,,,,,,,,,USGS-04217500,,,Not determined,Routine sample,USGS,USGS,USGS,Unknown,,"Stream flow, mean. daily",,35.0,ft3/s,,Historical,Mean,Actual,,1 Day,,,,,60.0,,,,,,,,,,,,,,,,,NWIS,43.093056,-78.636111
2,USGS-NY,USGS New York Water Science Center,nwisny.01.96400190,Sample-Routine,Water,Surface Water,1964-03-05,04:50:00,EST,,,,,,,,,,,,,USGS-04217500,,,Not determined,Routine sample,USGS,USGS,USGS,Unknown,,"Stream flow, mean. daily",,2200.0,ft3/s,,Historical,Mean,Actual,,1 Day,,,,,60.0,,,,,,,,,,,,,,,,,NWIS,43.086169,-78.696976
3,USGS-NY,USGS New York Water Science Center,nwisny.01.96400191,Sample-Routine,Water,Surface Water,1964-04-16,,,,,,,,,,,,,,,USGS-04217500,,,Not determined,Routine sample,USGS,USGS,USGS,Unknown,,"Stream flow, mean. daily",,261.0,ft3/s,,Historical,Mean,Actual,,1 Day,,,,,60.0,,,,,,,,,,,,,,,,,NWIS,43.120336,-78.518081
4,USGS-NY,USGS New York Water Science Center,nwisny.01.96300169,Sample-Routine,Water,Surface Water,1963-05-08,,,,,,,,,,,,,,,USGS-04218000,,,Not determined,Routine sample,USGS,USGS,USGS,Unknown,,"Stream flow, mean. daily",,165.0,ft3/s,,Historical,Mean,Actual,,1 Day,,,,,60.0,,,,,,,,,,,,,,,,,NWIS,43.086169,-78.727532


Data has different `MonitoringLocationIdentifier`s for the same coords.

In [0]:
# generate a location based site field
dd = d.copy(deep=False)
# round lat lon to snap nearby points together
dd['_lat'] = np.round(dd['LatitudeMeasure'], 3)
dd['_lon'] = np.round(dd['LongitudeMeasure'], 3)
dd.set_index(['_lat', '_lon'], inplace=True)
# use mean of snapped points, same as original for
# points that were all together to start with
mean = dd.groupby(by=['_lat', '_lon']).mean()
dd = dd.join(mean, on=['_lat', '_lon'], rsuffix="MEAN")
d['lat'] = dd['LatitudeMeasureMEAN'].values
d['lon'] = dd['LongitudeMeasureMEAN'].values
d['site'] = np.round(d['lat'],4).astype(str) + np.round(d['lon'], 4).astype(str)
assert len(d) == old_len

Now copy site info. into locs table

In [131]:
d.set_index(['LatitudeMeasure', 'LongitudeMeasure'], inplace=True, drop=False)
locs = locs.join(d, on=['LatitudeMeasure', 'LongitudeMeasure'], rsuffix='CULL')
for col in [i for i in locs.columns if 'CULL' in i]:
    locs.drop(columns=col, inplace=True)
locs.drop_duplicates(inplace=True, subset=['site', 'MonitoringLocationIdentifier'])
locs.to_csv("locs.csv", index=False)
offset = max((locs['lat']-locs['LatitudeMeasure']).abs())
assert offset < 0.001, offset
assert len(locs) == length, (length, len(locs))
locs.to_csv("locs.csv")
offset

0.00013884999999902448

In [132]:
print("%d locations" % len(locs))
distinct = locs.set_index(["LatitudeMeasure", "LongitudeMeasure"]).index
print("%d distinct" % len(distinct.unique()))
distinct = locs.set_index(["lat", "lon"]).index
print("%d really distinct" % len(distinct.unique()))

230 locations
192 distinct
187 really distinct


In [133]:
# tabulate units used for results
d['ResultSampleFractionText'] = [
    i if i != 'nan' else 'NA'
    for i in d['ResultSampleFractionText'].astype(str)
]

res_types = (
    d.groupby(
        [
            'CharacteristicName',
            'ResultMeasure_MeasureUnitCode',
            'ResultSampleFractionText',
        ]
    )
    .count()
    .iloc[:, 0]
)
print(res_types)

CharacteristicName          ResultMeasure_MeasureUnitCode  ResultSampleFractionText
Orthophosphate              mg/l                           Dissolved                     181
                                                           NA                             36
                                                           Total                         119
                            mg/l as P                      Dissolved                   18392
                                                           Total                         380
                            mg/l asPO4                     Dissolved                   18393
Orthophosphate as P         mg/l                           Dissolved                     122
                                                           Total                           1
Phosphate-phosphorus        mg/l                           Total                         417
Phosphate-phosphorus as P   mg/l                           NA                  

In [0]:
duration = wq.db.get('duration', [])  # cache expensive calc. in wq.db
if not duration:
    for row in d.itertuples():
        try:
            t0 = parse(
                "%s %s"
                % (row.ActivityStartDate, row.ActivityStartTime_Time)
            )
            t1 = parse(
                "%s %s" % (row.ActivityEndDate, row.ActivityEndTime_Time)
            )
            duration.append((t1 - t0).seconds)
        except ValueError:
            duration.append(0)
    wq.db['duration'] = duration
d['duration'] = duration

Create an ID field for each sampling event

In [0]:
d['sampling'] = (
    d['ActivityStartTime_Time'].astype(str)
    + d['ActivityEndTime_Time'].astype(str)
    + ' '
    + d['ActivityStartDate'].astype(str)
    + ' '
    + d['ActivityEndDate'].astype(str)
    + ' '
    + d['LatitudeMeasure'].astype(str)
    + ' '
    + d['LongitudeMeasure'].astype(str)
)

## Save output data

In [0]:
d.to_csv('data.csv')

Now we want to match P conc. data and flow data for each sampling event

(start to refer to `d` as `data` from here on)

In [137]:
data = pd.read_csv("data.csv", low_memory=False)
pdata = data.loc[:, ['sampling', 'site']]
pdata.drop_duplicates(subset=['sampling'], inplace=True)
pdata_len = len(pdata)
print("%d into %d" % (len(data), pdata_len))
cname = 'CharacteristicName'
cunit = 'ResultMeasure_MeasureUnitCode'
vname = 'ResultMeasureValue'
TP = namedtuple("ToProcess", "to cname unit trans")
po4_to_p = (30.97 + 4 * 16) / 30.97
to_process = [
    TP('flow', 'Stream flow, instantaneous', 'ft3/s', None),
    TP(
        'flow',
        'Stream flow, instantaneous',
        'm3/sec',
        lambda x: x * 35.3147,
    ),
    TP('flow', 'Stream flow, mean. daily', 'ft3/s', None),
    TP('conc', 'Phosphorus', 'mg/l as P', None),
    TP('conc', 'Phosphorus', 'mg/l', None),
    TP('conc', 'Phosphorus', 'ug/l', lambda x: 1000 * x),
    TP('conc', 'Phosphorus', 'mg/l PO4', lambda x: x / po4_to_p),
    TP('conc', 'Phosphate-phosphorus as P', 'mg/l', None),
    TP('conc', 'Phosphate-phosphorus', 'mg/l', None),
    TP('conc', 'Orthophosphate as P', 'mg/l', None),
    TP('conc', 'Orthophosphate', 'mg/l as P', None),
    TP('conc', 'Orthophosphate', 'mg/l asPO4', lambda x: x / po4_to_p),
    TP('conc', 'Orthophosphate', 'mg/l', lambda x: x / po4_to_p),
    # exclude as it's in bed sediment
    # TP('conc', 'Phosphorus', 'mg/kg as P', None),
]

82386 into 21877


We're merging 82386 observations into 21877 flow + conc. records.  In a perfect world we'd have one flow and one conc. record for each sampling, but we have both redundant and mismatched (flow without conc. and visa versa) records, roughly 4:1 instead of 2:1.

`to_process` is an **ordered** list of items where each item reocords the field we're trying to fill (`to`), the `CharacteristicName` (`cname`) of our prefered source field, our prefered `unit`, and any conversion (`trans`) necessary to use this source field / unit combination.

Now we're going to fill the `to` fields (`flow`, `conc`) with the values selected using the list items, filling as many records with the first set of selected data, and only filling missing records with the next (less desireable) list item.

In [138]:
selected = 0
data.set_index('sampling', drop=False, inplace=True)
pdata.set_index('sampling', drop=False, inplace=True)
for tp_i, tp in enumerate(to_process):
    select = np.logical_and(
        data[cname] == tp.cname, data[cunit] == tp.unit
    )
    selected += sum(select)
    print(
        "%d Selected %d %s %s (%d)"
        % (
            tp_i, sum(select),
            tp.cname,
            tp.unit,
            len(data['sampling'][select].unique()),
        )
    )
    mean = data.loc[select, :]
    mean = mean.groupby(level=0).mean()
    pdata = pdata.join(mean.loc[:, vname])
    assert len(pdata) == pdata_len, (pdata_len, len(pdata))
    if tp.trans is not None:
        pdata[vname] = pdata[vname].map(tp.trans)

    def missing(x, f):
        if f not in x.columns:
            return np.ones(len(x)).astype(bool)
        x = x[f].astype(str)
        ans = np.logical_or(np.equal(x, 'None'), np.equal(x, ''))
        ans = np.logical_or(ans, np.equal(x, 'nan'))
        ans = np.logical_or(ans, np.equal(x, 'NaN'))
        return ans

    missing0 = missing(pdata, tp.to)
    if tp.to not in pdata.columns:
        pdata.rename(columns={vname: tp.to}, inplace=True)
    else:
        # print(len(missing), sum(missing), missing.shape)
        #X pdata[tp.to][missing0] = pdata[vname][missing0]
        pdata.loc[missing0, tp.to] = pdata[vname][missing0]
        pdata.drop(columns=vname, inplace=True)
    missing1 = missing(pdata, tp.to)
    print(
        "%d, Needed %d, used %d, still need %d"
        % (tp_i, sum(missing0), sum(missing0) - sum(missing1), sum(missing1))
    )
    assert len(pdata) == pdata_len, (pdata_len, len(pdata))
print("Selected %s total" % selected)
pdata = pdata[~np.isnan(pdata['conc'])]
pdata = pdata[~np.isnan(pdata['flow'])]
print("Lost %d to blank data" % (pdata_len-len(pdata)))
pdata.to_csv("pdata.csv")

0 Selected 3244 Stream flow, instantaneous ft3/s (3210)
0, Needed 21877, used 3210, still need 18667
1 Selected 3244 Stream flow, instantaneous m3/sec (3210)
1, Needed 18667, used 32, still need 18635
2 Selected 15321 Stream flow, mean. daily ft3/s (15191)
2, Needed 18635, used 15082, still need 3553
3 Selected 21072 Phosphorus mg/l as P (19628)
3, Needed 21877, used 19628, still need 2249
4 Selected 359 Phosphorus mg/l (299)
4, Needed 2249, used 292, still need 1957
5 Selected 3 Phosphorus ug/l (3)
5, Needed 1957, used 3, still need 1954
6 Selected 736 Phosphorus mg/l PO4 (680)
6, Needed 1954, used 110, still need 1844
7 Selected 358 Phosphate-phosphorus as P mg/l (335)
7, Needed 1844, used 334, still need 1510
8 Selected 417 Phosphate-phosphorus mg/l (404)
8, Needed 1510, used 7, still need 1503
9 Selected 123 Orthophosphate as P mg/l (115)
9, Needed 1503, used 0, still need 1503
10 Selected 18772 Orthophosphate mg/l as P (18192)
10, Needed 1503, used 374, still need 1129
11 Selected

In [139]:
# update locs with the number of obs. for each site
count = pdata.groupby(by=['site']).count().iloc[:, 0]
count.columns = ['count']
locs = locs.join(count, on='site')
locs.to_csv("locs.csv")

Unnamed: 0_level_0,sampling,site,flow,conc
sampling,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
08:00:00nan 1987-04-15 nan 43.0745076 -77.436104,08:00:00nan 1987-04-15 nan 43.0745076 -77.436104,43.0745-77.4361,1000.0,0.019892
08:00:00nan 1987-04-28 nan 43.090618400000004 -77.53083079999999,08:00:00nan 1987-04-28 nan 43.090618400000004 ...,43.0906-77.5308,320.0,0.010000
12:00:00nan 1988-03-22 nan 43.135062899999994 -77.4986078,12:00:00nan 1988-03-22 nan 43.135062899999994 ...,43.1351-77.4986,320.0,0.030001
12:45:00nan 1988-04-06 nan 43.147285 -77.5124973,12:45:00nan 1988-04-06 nan 43.147285 -77.5124973,43.1473-77.5125,2000.0,0.000000
11:45:00nan 1988-05-03 nan 43.1761737 -77.52666479999999,11:45:00nan 1988-05-03 nan 43.1761737 -77.5266...,43.1762-77.5267,640.0,0.000000
14:30:00nan 2015-04-08 nan nan nan,14:30:00nan 2015-04-08 nan nan nan,nannan,239000.0,0.013000
10:40:00nan 2015-06-11 nan nan nan,10:40:00nan 2015-06-11 nan nan nan,nannan,240000.0,0.007000
10:00:00nan 2015-07-09 nan nan nan,10:00:00nan 2015-07-09 nan nan nan,nannan,273000.0,0.009000
14:45:00nan 2015-07-30 nan nan nan,14:45:00nan 2015-07-30 nan nan nan,nannan,242000.0,0.010000
12:15:00nan 2015-11-03 nan nan nan,12:15:00nan 2015-11-03 nan nan nan,nannan,223000.0,0.026000


Now we need flow data for places without conc. data.  Doesn't seem to be in the WQ portal, not strictly speaking a WQ parameter (well, it's Water Quantity, not Water Quality).

Ran //waterservices.usgs.gov/nwis/site/?format=rdb&bBox=-80.040200,43.064600,-75.969700,44.355000&seriesCatalogOutput=true&siteType=ST&siteStatus=all and created `waterservices.usgs.gov.txt`


In [54]:
d = pd.read_csv(
    "waterservices.usgs.gov.txt", sep=r'\t', comment='#', engine='python'
)
print(len(d))
d.set_index(['dec_lat_va', 'dec_long_va'], inplace=True, drop=False)
print(len(d.index.unique()))
d.groupby(level=[0, 1]).min().to_csv("locs2.csv")

28902
286
