# Project 2 - Lab 5 - Filter and aggregate the water quality data

Recall that one of the files (starts with `mces`) contains water quality measurements for lakes in the Twin Cities.  In this lab, we will narrow the data down to the lakes that have at least one of each measurement type (phosphorus and secchi depth) for each year between 2004 and 2015.

**Important note.** Recall that we fixed an issue with the water quality data in our terminal exploration of the data. Be sure to use the corrected version of the water quality data located in the `data/MinneMUDAC_raw_files_fixed/mces_lakes_1999_2014_v2.txt` file.

In [1]:
import polars as pl
import polars.selectors as cs
from glob import glob
import re
from functools import reduce
from operator import mul, add

## Problem 1 - Inspect the data

We will be focusing on two of the water quality measurements: phosphorus and secchi depth.  Before we start trimming the data set, we should explore these metrics.

1. Each of the measures has a `QUALIFIER` column.  Group and aggregate by each of these columns and note any problematic values.  **Hint.** This search should indicate that some of the phosphorus measurements should be dropped.  Make sure you include this action as part of your primary query.
2. Each measure also includes a `Units` column.  Check that all measurements are in the same units, and convert as needed.

In [2]:
(mces_cols_to_keep := [
 'DNR_ID_Site_Number',
 'END_DATE',
 'LAKE_NAME',
 'Secchi_Depth_RESULT_SIGN',
 'Secchi_Depth_RESULT',
 'Secchi_Depth_QUALIFIER',
 'Secchi_Depth_Units',
 'Total_Phosphorus_RESULT_SIGN',
 'Total_Phosphorus_RESULT',
 'Total_Phosphorus_QUALIFIER',
 'Total_Phosphorus_Units',
 'longitude',
 'latitude'
])

['DNR_ID_Site_Number',
 'END_DATE',
 'LAKE_NAME',
 'Secchi_Depth_RESULT_SIGN',
 'Secchi_Depth_RESULT',
 'Secchi_Depth_QUALIFIER',
 'Secchi_Depth_Units',
 'Total_Phosphorus_RESULT_SIGN',
 'Total_Phosphorus_RESULT',
 'Total_Phosphorus_QUALIFIER',
 'Total_Phosphorus_Units',
 'longitude',
 'latitude']

In [6]:
# Your code here
schema_override_lakes = {'longitude': pl.String(), 'latitude': pl.String()}

(
    mces :=
    pl.read_csv('./data/MinneMUDAC_raw_files_fixed/mces_lakes_1999_2014_v2.txt', 
                separator="\t",
                infer_schema_length=10000,
                schema_overrides=schema_override_lakes,
                columns=mces_cols_to_keep
                )
)

LAKE_NAME,DNR_ID_Site_Number,END_DATE,Secchi_Depth_RESULT_SIGN,Secchi_Depth_RESULT,Secchi_Depth_QUALIFIER,Secchi_Depth_Units,Total_Phosphorus_RESULT_SIGN,Total_Phosphorus_RESULT,Total_Phosphorus_QUALIFIER,Total_Phosphorus_Units,longitude,latitude
str,str,str,str,f64,str,str,str,f64,str,str,str,str
"""Acorn Lake""","""82010200-01""","""2006-04-16""",,1.0,"""Approved""","""m""",,0.156,"""Approved""","""mg/L""","""-92.97171054""","""45.01655642"""
"""Acorn Lake""","""82010200-01""","""2006-09-30""",,,,"""m""",,,,"""mg/L""","""-92.97171054""","""45.01655642"""
"""Acorn Lake""","""82010200-01""","""2006-05-02""",,0.66,"""Approved""","""m""",,0.107,"""Approved""","""mg/L""","""-92.97171054""","""45.01655642"""
"""Acorn Lake""","""82010200-01""","""2006-05-16""",,0.66,"""Approved""","""m""",,0.141,"""Approved""","""mg/L""","""-92.97171054""","""45.01655642"""
"""Acorn Lake""","""82010200-01""","""2006-05-30""",,0.5,"""Approved""","""m""",,0.029,"""Approved""","""mg/L""","""-92.97171054""","""45.01655642"""
…,…,…,…,…,…,…,…,…,…,…,…,…
"""Zumbra Lake""","""10004100-01""","""2002-09-16""",,,,"""m""",,0.224,"""Approved""","""mg/L""","""-93.66768906""","""44.88381717"""
"""Zumbra Lake""","""10004100-01""","""2002-10-01""",,2.7,"""Approved""","""m""","""~""",0.026,"""Approved""","""mg/L""","""-93.66768906""","""44.88381717"""
"""Zumbra Lake""","""10004100-01""","""2002-10-01""",,,,"""m""","""~""",0.015,"""Approved""","""mg/L""","""-93.66768906""","""44.88381717"""
"""Zumbra Lake""","""10004100-01""","""2002-10-14""",,3.0,"""Approved""","""m""","""~""",0.011,"""Approved""","""mg/L""","""-93.66768906""","""44.88381717"""


In [12]:
mces.group_by('Secchi_Depth_QUALIFIER').len()

Secchi_Depth_QUALIFIER,len
str,u32
"""Approved""",35104
,13153


In [13]:
mces.group_by('Total_Phosphorus_QUALIFIER').len()

Total_Phosphorus_QUALIFIER,len
str,u32
,4583
"""Suspect""",35
"""Approved""",43639


In [14]:
mces.group_by('Secchi_Depth_Units').len()

Secchi_Depth_Units,len
str,u32
"""m""",48257


In [15]:
mces.group_by('Total_Phosphorus_Units').len()

Total_Phosphorus_Units,len
str,u32
"""mg/L""",48257


## Problem 2 - Filter and aggregate.

#### Tasks

Remember that our goal is to narrow the data to one row per lake per year.  Build a query that groups and aggregates to find the yearly average values for both phosphorus and secchi depth.  To do this, you will want to

1. Filter based on what you learned in **Problem 1.**
2. Make sure that the `END_DATE` has the correct type and extract the year.  
3. Filter to the correct range of years.
4. Now you should group and aggregate to compute the yearly means.  We want to keep both the `LAKE_NAME` and lake ID to allow us to join these data to the parcel features we will construct in the next lab.

In [7]:
# Your code here
(
    mces :=
    pl.read_csv('./data/MinneMUDAC_raw_files_fixed/mces_lakes_1999_2014_v2.txt', 
                separator="\t",
                infer_schema_length=10000,
                schema_overrides=schema_override_lakes,
                columns=mces_cols_to_keep
                )
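    # Keep only rows where both measurements are 'Approved'; this drops the 'Suspect' phosphorus rows and rows with a missing qualifier
    .filter((pl.col('Secchi_Depth_QUALIFIER') == 'Approved') &
            (pl.col('Total_Phosphorus_QUALIFIER') == 'Approved'))
    # END_DATE is a 'YYYY-MM-DD' string, so the year is the first '-'-separated piece
    .with_columns(year = pl.col('END_DATE').str.split('-').list.get(0).cast(pl.Int64()))
    # Restrict to 2004 through 2015, inclusive
    .filter((pl.col('year') > 2003) & (pl.col('year') < 2016))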
)

LAKE_NAME,DNR_ID_Site_Number,END_DATE,Secchi_Depth_RESULT_SIGN,Secchi_Depth_RESULT,Secchi_Depth_QUALIFIER,Secchi_Depth_Units,Total_Phosphorus_RESULT_SIGN,Total_Phosphorus_RESULT,Total_Phosphorus_QUALIFIER,Total_Phosphorus_Units,longitude,latitude,year
str,str,str,str,f64,str,str,str,f64,str,str,str,str,i64
"""Acorn Lake""","""82010200-01""","""2006-04-16""",,1.0,"""Approved""","""m""",,0.156,"""Approved""","""mg/L""","""-92.97171054""","""45.01655642""",2006
"""Acorn Lake""","""82010200-01""","""2006-05-02""",,0.66,"""Approved""","""m""",,0.107,"""Approved""","""mg/L""","""-92.97171054""","""45.01655642""",2006
"""Acorn Lake""","""82010200-01""","""2006-05-16""",,0.66,"""Approved""","""m""",,0.141,"""Approved""","""mg/L""","""-92.97171054""","""45.01655642""",2006
"""Acorn Lake""","""82010200-01""","""2006-05-30""",,0.5,"""Approved""","""m""",,0.029,"""Approved""","""mg/L""","""-92.97171054""","""45.01655642""",2006
"""Acorn Lake""","""82010200-01""","""2006-06-11""",,0.5,"""Approved""","""m""",,0.058,"""Approved""","""mg/L""","""-92.97171054""","""45.01655642""",2006
…,…,…,…,…,…,…,…,…,…,…,…,…,…
"""Woodpile Lake""","""82013200-01""","""2014-08-13""",,3.05,"""Approved""","""m""",,0.023,"""Approved""","""mg/L""","""-92.90332792""","""45.06868739""",2014
"""Woodpile Lake""","""82013200-01""","""2014-08-25""",,2.74,"""Approved""","""m""",,0.026,"""Approved""","""mg/L""","""-92.90332792""","""45.06868739""",2014
"""Woodpile Lake""","""82013200-01""","""2014-09-09""",,2.13,"""Approved""","""m""",,0.03,"""Approved""","""mg/L""","""-92.90332792""","""45.06868739""",2014
"""Woodpile Lake""","""82013200-01""","""2014-09-22""",,3.66,"""Approved""","""m""",,0.04,"""Approved""","""mg/L""","""-92.90332792""","""45.06868739""",2014


In [8]:
(
    mces_summaries :=
    mces
    .group_by([
        'DNR_ID_Site_Number',
        'year',
        'LAKE_NAME'  
    ])
    .agg(cs.ends_with('RESULT').mean())
)

DNR_ID_Site_Number,year,LAKE_NAME,Secchi_Depth_RESULT,Total_Phosphorus_RESULT
str,i64,str,f64,f64
"""82012000-01""",2012,"""Benz Lake""",1.0425,0.12175
"""02065400-01""",2009,"""Cenaiko Lake""",1.954545,0.018909
"""10000200-02""",2010,"""Riley Lake""",2.1,0.077
"""82012300-01""",2006,"""Bass Lake""",2.452048,0.053476
"""62005800-01""",2010,"""Little Johanna Lake""",0.8125,0.06375
…,…,…,…,…
"""62004901-01""",2009,"""Langton Lake""",0.99,0.052
"""82051400-01""",2014,"""Rest Area Pond""",0.979714,0.0775
"""27005700-01""",2008,"""Meadow Lake""",0.617143,0.299071
"""10021700-01""",2010,"""Jonathan Lake""",0.532143,0.138143


## Problem 3 - Find lakes with complete yearly averages.

We want to make sure that we don't have any missing data in the target vectors, so we need to build a query that leads to a list of lake names and codes that fit the following criteria.

1. Only contains years after 2003.
2. Has a non-missing value for both means.
3. Contains both the lake name and the lake code.

You should save this list of lake IDs in a variable named `complete_lake_ids` in a file named `lakes.py`.  Restart your kernel and confirm that you can import this data.
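
Saving a list to a Python module can be as simple as writing its `repr` as an assignment statement. The sketch below does this in a temporary directory and loads the module back by path. The module name `demo_lakes` and variable `demo_lake_ids` are hypothetical stand-ins; in the lab you would write the file next to the notebook and use an ordinary `import`.

```python
import importlib.util
import tempfile
from pathlib import Path

# Two IDs taken from the output above, used purely for illustration.
demo_lake_ids = ["82003400-01", "10001100-01"]

# Write a one-line module: `demo_lake_ids = [...]`.
module_path = Path(tempfile.mkdtemp()) / "demo_lakes.py"
module_path.write_text(f"demo_lake_ids = {demo_lake_ids!r}\n")

# Load the module from its path (stands in for `from demo_lakes import demo_lake_ids`).
spec = importlib.util.spec_from_file_location("demo_lakes", module_path)
demo_lakes = importlib.util.module_from_spec(spec)
spec.loader.exec_module(demo_lakes)

reloaded_ids = demo_lakes.demo_lake_ids
```

Because the file holds plain Python, it can be imported from any later notebook without rerunning the query.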

In [43]:
# Your code here
(
    mces_summaries
    .select(pl.col('year'))
    .unique()
    .sort('year')
)

year
i64
2004
2005
2006
2007
2008
…
2010
2011
2012
2013


In [11]:
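(complete_ids :=
    mces_summaries
    .filter(
        (pl.col('DNR_ID_Site_Number').is_not_null()) &
        (pl.col('LAKE_NAME').is_not_null())
    )
    # Flag each lake-year: 1 if both yearly means are present, 0 otherwise
    .with_columns(summary = pl.when(pl.col('Secchi_Depth_RESULT').is_not_null() & pl.col('Total_Phosphorus_RESULT').is_not_null())
                              .then(pl.lit(1)).otherwise(0)
                  )
    # One row per lake and one column per year; a year with no data becomes null
    # (`on` replaces the deprecated `columns` argument)
    .pivot(
        on = 'year',
        index = ['DNR_ID_Site_Number', 'LAKE_NAME'],
        values = 'summary'
    )
    # The product is 1 only if every year is flagged 1; a 0 flag gives 0 and a missing year gives null
    .with_columns(in_all = pl.reduce(mul, cs.integer()))
    .filter(pl.col('in_all') == 1)
    .select(pl.col('DNR_ID_Site_Number'))
    .unique()
    .get_column('DNR_ID_Site_Number')
    .to_list()
)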


['82007700-01',
 '19002200-01',
 '82013700-01',
 '27003501-01',
 '82010400-01',
 '82011602-01',
 '82008700-01',
 '19003100-01',
 '19002300-01',
 '10001100-01',
 '19034800-01',
 '10001900-01',
 '19002601-01',
 '19002700-01',
 '10005200-01',
 '02000500-01',
 '27005300-01',
 '19003300-01',
 '70002600-01',
 '27062700-01',
 '82012200-01',
 '82010300-01',
 '82009002-01',
 '13005300-01',
 '27007000-01',
 '82012300-01',
 '82036800-01',
 '10000200-01',
 '82009400-01',
 '82009700-01',
 '82002000-01',
 '82015900-01',
 '10012100-01',
 '82011301-01',
 '19044600-01',
 '82003400-01',
 '82009200-01',
 '27071100-01',
 '82010100-01',
 '19002900-01',
 '19002500-01',
 '10009500-01',
 '19002400-01',
 '19002100-01',
 '82005400-01',
 '27004201-01',
 '82015300-01',
 '82033400-01',
 '82008900-01']

In [10]:
from lakes import complete_lake_ids

complete_lake_ids

['82003400-01',
 '82010100-01',
 '82005400-01',
 '10001100-01',
 '82009002-01',
 '27004201-01',
 '82002000-01',
 '19002100-01',
 '82008700-01',
 '82015900-01',
 '82011301-01',
 '13005300-01',
 '82008900-01',
 '19002700-01',
 '19002500-01',
 '10001900-01',
 '27007000-01',
 '82009400-01',
 '82009200-01',
 '82012300-01',
 '19002300-01',
 '19002900-01',
 '82015300-01',
 '19002601-01',
 '27005300-01',
 '27062700-01',
 '19034800-01',
 '10012100-01',
 '82033400-01',
 '82013700-01',
 '82036800-01',
 '82009700-01',
 '19044600-01',
 '02000500-01',
 '82012200-01',
 '19003100-01',
 '27003501-01',
 '19003300-01',
 '10005200-01',
 '10000200-01',
 '82010400-01',
 '10009500-01',
 '82010300-01',
 '82007700-01',
 '19002400-01',
 '82011602-01',
 '19002200-01',
 '27071100-01',
 '70002600-01']

## Problem 4 - Create and write the final water quality table.

Finally, you should filter the table from **Problem 2.** to the lakes with complete information, then write this table to a parquet file named `water_quality_by_year.parquet`.

In [12]:
# Your code here
(
    mces_summaries_final :=
    mces
    .filter(pl.col('DNR_ID_Site_Number').is_in(complete_ids))
    .group_by([
        'DNR_ID_Site_Number',
        'year',
        'LAKE_NAME'  
    ])
    .agg(cs.ends_with('RESULT').mean())
)

DNR_ID_Site_Number,year,LAKE_NAME,Secchi_Depth_RESULT,Total_Phosphorus_RESULT
str,i64,str,f64,f64
"""27007000-01""",2012,"""Mitchell Lake""",1.691667,0.0585
"""82011301-01""",2007,"""Goose Lake""",0.921833,0.080556
"""82010300-01""",2009,"""Olson Lake""",3.228571,0.018714
"""82009002-01""",2008,"""Wilmes Lake""",1.109091,0.078727
"""19002500-01""",2009,"""Keller Lake""",0.752222,0.097889
…,…,…,…,…
"""19002100-01""",2012,"""Alimagnet Lake""",1.404545,0.068909
"""82015900-01""",2013,"""Forest Lake""",1.610588,0.03125
"""82015300-01""",2012,"""Sunset Lake""",3.164286,0.025429
"""82005400-01""",2009,"""Bone Lake""",1.808333,0.039417


In [15]:
(
   mces_summaries_final.write_parquet('./data/water_quality_by_year.parquet')
)