# Project 2 - Lab 5 - Filter and aggregate the water quality data

Recall that one of the files (its name starts with `mces`) contains water quality measurements for lakes in the Twin Cities.  In this lab, we will narrow the list down to the lakes that have at least one measurement of each type (phosphorus and secchi depth) for every year between 2004 and 2015.

**Important note.** Recall that we fixed an issue with the water quality data in our terminal exploration of the data. Be sure to use the corrected version of the water quality data located in the `data/MinneMUDAC_raw_files_fixed/mces_lakes_1999_2014_v2.txt` file.

## Problem 1 - Inspect the data

We will be focusing on two of the water quality measurements: phosphorus and secchi depth.  Before we start trimming the data set, we should explore these metrics.

1. Each of the measures has a `QUALIFIER` column.  Group and aggregate by each of these columns and note any problematic values.  **Hint.** This search should indicate that some of the phosphorus measurements should be dropped.  Make sure you include this action as part of your primary query.
2. Each measure also includes a `Units` column.  Check that all measurements are in the same units, and convert as needed.

In [20]:
# Your code here

from glob import glob

glob('./data/MinneMUDAC_raw_files/*.txt')

['./data/MinneMUDAC_raw_files/2003_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/2015_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/2009_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/mces_lakes_1999_2014.txt',
 './data/MinneMUDAC_raw_files/mces_lakes_1999_2014_v2.txt',
 './data/MinneMUDAC_raw_files/2007_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/2011_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/2005_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/2013_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/Parcel_Lake_Monitoring_Site_Xref.txt',
 './data/MinneMUDAC_raw_files/2014_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/2002_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/2008_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/2010_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/2006_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/2012_metro_tax_parcels.txt',
 './data/MinneMUDAC_raw_files/2004_metro_tax_pa

In [21]:
(water_quality_columns_to_keep :=
 ['DNR_ID_Site_Number',
  'Secchi_Depth_RESULT_SIGN',
  'Secchi_Depth_RESULT',
  'Secchi_Depth_QUALIFIER',
  'Secchi_Depth_Units',
  'Total_Phosphorus_RESULT_SIGN',
  'Total_Phosphorus_RESULT',
  'Total_Phosphorus_QUALIFIER',
  'Total_Phosphorus_Units',
  'longitude',
  'latitude',
  'END_DATE',
  'LAKE_NAME'
 ])

['DNR_ID_Site_Number',
 'Secchi_Depth_RESULT_SIGN',
 'Secchi_Depth_RESULT',
 'Secchi_Depth_QUALIFIER',
 'Secchi_Depth_Units',
 'Total_Phosphorus_RESULT_SIGN',
 'Total_Phosphorus_RESULT',
 'Total_Phosphorus_QUALIFIER',
 'Total_Phosphorus_Units',
 'longitude',
 'latitude',
 'END_DATE',
 'LAKE_NAME']

In [22]:
import polars as pl

(water_quality := (pl.read_csv('./data/MinneMUDAC_raw_files/mces_lakes_1999_2014_v2.txt',
                               separator = '\t',
                               infer_schema_length = 10000,
                               columns = water_quality_columns_to_keep
                              )
                  )
)

LAKE_NAME,DNR_ID_Site_Number,END_DATE,Secchi_Depth_RESULT_SIGN,Secchi_Depth_RESULT,Secchi_Depth_QUALIFIER,Secchi_Depth_Units,Total_Phosphorus_RESULT_SIGN,Total_Phosphorus_RESULT,Total_Phosphorus_QUALIFIER,Total_Phosphorus_Units,longitude,latitude
str,str,str,str,f64,str,str,str,f64,str,str,f64,f64
"""Acorn Lake""","""82010200-01""","""2006-04-16""",,1.0,"""Approved""","""m""",,0.156,"""Approved""","""mg/L""",-92.971711,45.016556
"""Acorn Lake""","""82010200-01""","""2006-09-30""",,,,"""m""",,,,"""mg/L""",-92.971711,45.016556
"""Acorn Lake""","""82010200-01""","""2006-05-02""",,0.66,"""Approved""","""m""",,0.107,"""Approved""","""mg/L""",-92.971711,45.016556
"""Acorn Lake""","""82010200-01""","""2006-05-16""",,0.66,"""Approved""","""m""",,0.141,"""Approved""","""mg/L""",-92.971711,45.016556
"""Acorn Lake""","""82010200-01""","""2006-05-30""",,0.5,"""Approved""","""m""",,0.029,"""Approved""","""mg/L""",-92.971711,45.016556
…,…,…,…,…,…,…,…,…,…,…,…,…
"""Zumbra Lake""","""10004100-01""","""2002-09-16""",,,,"""m""",,0.224,"""Approved""","""mg/L""",-93.667689,44.883817
"""Zumbra Lake""","""10004100-01""","""2002-10-01""",,2.7,"""Approved""","""m""","""~""",0.026,"""Approved""","""mg/L""",-93.667689,44.883817
"""Zumbra Lake""","""10004100-01""","""2002-10-01""",,,,"""m""","""~""",0.015,"""Approved""","""mg/L""",-93.667689,44.883817
"""Zumbra Lake""","""10004100-01""","""2002-10-14""",,3.0,"""Approved""","""m""","""~""",0.011,"""Approved""","""mg/L""",-93.667689,44.883817


## Problem 2 - Filter and aggregate.

#### Tasks

Remember that our goal is to narrow the data to one row per lake per year.  Build a query that groups and aggregates to find the yearly average values for both phosphorus and secchi depth.  To do this, you will want to

1. Filter based on what you learned in **Problem 1.**
2. Make sure that the `END_DATE` has the correct type and extract the year.  
3. Filter to the correct range of years.
4. Now you should group and aggregate to compute the yearly means.  We want to keep both the `LAKE_NAME` and lake ID to allow us to join these data to the parcel features we will construct in the next lab.

In [23]:
# Your code here

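(water_quality_filtered := (water_quality
                            # Keep only rows where both measurements are marked 'Approved'
                            # (rows with a null qualifier are dropped as well).
                            .filter((pl.col('Secchi_Depth_QUALIFIER') == 'Approved') &
                                    (pl.col('Total_Phosphorus_QUALIFIER') == 'Approved')
                                   )
                            # END_DATE is a 'YYYY-MM-DD' string; take the leading year as an integer.
                            .with_columns(pl.col('END_DATE').str.split('-').list.get(0).cast(pl.Int32).alias('Year'))
                            # Restrict to the years of interest.
                            .filter((pl.col('Year') >= 2004) & (pl.col('Year') <= 2015))
                           )
)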

LAKE_NAME,DNR_ID_Site_Number,END_DATE,Secchi_Depth_RESULT_SIGN,Secchi_Depth_RESULT,Secchi_Depth_QUALIFIER,Secchi_Depth_Units,Total_Phosphorus_RESULT_SIGN,Total_Phosphorus_RESULT,Total_Phosphorus_QUALIFIER,Total_Phosphorus_Units,longitude,latitude,Year
str,str,str,str,f64,str,str,str,f64,str,str,f64,f64,i32
"""Acorn Lake""","""82010200-01""","""2006-04-16""",,1.0,"""Approved""","""m""",,0.156,"""Approved""","""mg/L""",-92.971711,45.016556,2006
"""Acorn Lake""","""82010200-01""","""2006-05-02""",,0.66,"""Approved""","""m""",,0.107,"""Approved""","""mg/L""",-92.971711,45.016556,2006
"""Acorn Lake""","""82010200-01""","""2006-05-16""",,0.66,"""Approved""","""m""",,0.141,"""Approved""","""mg/L""",-92.971711,45.016556,2006
"""Acorn Lake""","""82010200-01""","""2006-05-30""",,0.5,"""Approved""","""m""",,0.029,"""Approved""","""mg/L""",-92.971711,45.016556,2006
"""Acorn Lake""","""82010200-01""","""2006-06-11""",,0.5,"""Approved""","""m""",,0.058,"""Approved""","""mg/L""",-92.971711,45.016556,2006
…,…,…,…,…,…,…,…,…,…,…,…,…,…
"""Woodpile Lake""","""82013200-01""","""2014-08-13""",,3.05,"""Approved""","""m""",,0.023,"""Approved""","""mg/L""",-92.903328,45.068687,2014
"""Woodpile Lake""","""82013200-01""","""2014-08-25""",,2.74,"""Approved""","""m""",,0.026,"""Approved""","""mg/L""",-92.903328,45.068687,2014
"""Woodpile Lake""","""82013200-01""","""2014-09-09""",,2.13,"""Approved""","""m""",,0.03,"""Approved""","""mg/L""",-92.903328,45.068687,2014
"""Woodpile Lake""","""82013200-01""","""2014-09-22""",,3.66,"""Approved""","""m""",,0.04,"""Approved""","""mg/L""",-92.903328,45.068687,2014


In [24]:
(water_quality_summaries := (
    water_quality_filtered
    .group_by(['DNR_ID_Site_Number',
               'Year',
               'LAKE_NAME',
              ])
    .agg([pl.col('Secchi_Depth_RESULT').mean().alias('average_secchi_depth'),
          pl.col('Total_Phosphorus_RESULT').mean().alias('average_total_phosphorus')
         ])
))

DNR_ID_Site_Number,Year,LAKE_NAME,average_secchi_depth,average_total_phosphorus
str,i32,str,f64,f64
"""82013700-01""",2012,"""Fish Lake""",1.506333,0.108897
"""02000900-01""",2006,"""Reshanau Lake""",0.616667,0.1055
"""82013500-01""",2006,"""Echo Lake""",0.741,0.071
"""82009700-01""",2009,"""La Lake""",0.9625,0.133875
"""10009500-01""",2009,"""Swede Lake""",0.410714,0.387929
…,…,…,…,…
"""27003501-01""",2011,"""Sweeney Lake""",1.166667,0.041389
"""27005300-01""",2012,"""Cobblecrest Lake""",0.55,0.1185
"""27004201-01""",2005,"""Twin Lake""",1.434054,0.082378
"""19019800-01""",2013,"""Scout Lake""",0.939286,0.075143


## Problem 3 - Find lakes with complete yearly averages.

We want to make sure that we don't have any missing data in the target vectors, so we need to build a query that leads to a list of lake names and codes that fit the following criteria.

1. Only contains years after 2003.
2. Has a non-missing value for both means.
3. Contains both the lake name and the lake code.

You should save this list of lake IDs in a variable named `lakes_w_complete_info` in a file named `lake.py`.  Restart your kernel and confirm that you can import this data.
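
A minimal sketch of the save-and-import step is shown below. It writes the module to a temporary directory so that it has no side effects; in the lab you would write `lake.py` next to the notebook and use a plain `import lake` after restarting the kernel. The two IDs are taken from this lab's output and stand in for the full list.

```python
import importlib.util
import tempfile
from pathlib import Path

# Two IDs from this lab's data; the real list is longer.
lake_ids = ["82013700-01", "82009700-01"]

# Write a one-line Python module that defines the variable.
# A temporary directory is used here only to keep the sketch side-effect free.
module_path = Path(tempfile.mkdtemp()) / "lake.py"
module_path.write_text(f"lakes_w_complete_info = {lake_ids!r}\n")

# Load the file as a module, mimicking `import lake` in a fresh kernel.
spec = importlib.util.spec_from_file_location("lake", module_path)
lake = importlib.util.module_from_spec(spec)
spec.loader.exec_module(lake)

print(lake.lakes_w_complete_info)
```

Using `repr` (via `!r`) to write the list produces valid Python source, so the file can be imported without any parsing code.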

In [25]:
# Your code here

import polars.selectors as cs
from operator import mul

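(water_quality_complete_information := (water_quality_summaries
                                        # Flag each lake-year as 1 when both yearly means are present, else 0.
                                        .with_columns([
                                            ((pl.col('average_secchi_depth')
                                                .is_not_null() & pl.col('average_total_phosphorus').is_not_null())
                                                .cast(pl.Int8)).alias('summary_complete')
                                        ])
                                        # One row per lake and one column per year; a year with no rows becomes null.
                                        .pivot(
                                            values = 'summary_complete',
                                            index = ['DNR_ID_Site_Number', 'LAKE_NAME'],
                                            on = 'Year',
                                            aggregate_function = 'sum'
                                        )
                                        # Multiply the yearly flags: the product is 1 only when every year is complete
                                        # (a 0 gives 0, and a null year gives null, which the filter also drops).
                                        .with_columns(all_complete = pl.reduce(mul, cs.integer()))
                                        .filter(pl.col('all_complete') == 1)
                                       )
)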

DNR_ID_Site_Number,LAKE_NAME,2012,2006,2009,2008,2007,2014,2010,2011,2004,2013,2005,all_complete
str,str,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64
"""82013700-01""","""Fish Lake""",1,1,1,1,1,1,1,1,1,1,1,1
"""82009700-01""","""La Lake""",1,1,1,1,1,1,1,1,1,1,1,1
"""10009500-01""","""Swede Lake""",1,1,1,1,1,1,1,1,1,1,1,1
"""82010300-01""","""Olson Lake""",1,1,1,1,1,1,1,1,1,1,1,1
"""82009400-01""","""Colby Lake""",1,1,1,1,1,1,1,1,1,1,1,1
…,…,…,…,…,…,…,…,…,…,…,…,…,…
"""82015300-01""","""Sunset Lake""",1,1,1,1,1,1,1,1,1,1,1,1
"""82009200-01""","""Powers Lake""",1,1,1,1,1,1,1,1,1,1,1,1
"""27062700-01""","""Northwood Lake""",1,1,1,1,1,1,1,1,1,1,1,1
"""19034800-01""","""Valley Lake""",1,1,1,1,1,1,1,1,1,1,1,1


In [26]:
(dnr_id_site_numbers :=
 water_quality_complete_information
 .get_column('DNR_ID_Site_Number')
 .to_list()
)

['82013700-01',
 '82009700-01',
 '10009500-01',
 '82010300-01',
 '82009400-01',
 '10001100-01',
 '10012100-01',
 '82009002-01',
 '82011301-01',
 '19044600-01',
 '27004201-01',
 '13005300-01',
 '10000200-01',
 '27071100-01',
 '19003300-01',
 '82015900-01',
 '82012200-01',
 '82008700-01',
 '10005200-01',
 '19002500-01',
 '82010400-01',
 '27005300-01',
 '82003400-01',
 '19002900-01',
 '82036800-01',
 '82010100-01',
 '19002601-01',
 '19003100-01',
 '82012300-01',
 '19002100-01',
 '19002300-01',
 '19002700-01',
 '19002400-01',
 '27003501-01',
 '70002600-01',
 '27007000-01',
 '19002200-01',
 '82007700-01',
 '82011602-01',
 '82008900-01',
 '82002000-01',
 '82005400-01',
 '10001900-01',
 '02000500-01',
 '82015300-01',
 '82009200-01',
 '27062700-01',
 '19034800-01',
 '82033400-01']

## Problem 4 - Create and write the final water quality table.

Finally, you should filter the table from **Problem 2.** to the lakes with complete information, then write this table to a parquet file named `water_quality_by_year.parquet`.

In [30]:
# Your code here

(water_quality_summaries_complete := (water_quality_summaries
                                      .filter(pl.col('DNR_ID_Site_Number').is_in(dnr_id_site_numbers))
                                     )
)

DNR_ID_Site_Number,Year,LAKE_NAME,average_secchi_depth,average_total_phosphorus
str,i32,str,f64,f64
"""82013700-01""",2012,"""Fish Lake""",1.506333,0.108897
"""82009700-01""",2009,"""La Lake""",0.9625,0.133875
"""10009500-01""",2009,"""Swede Lake""",0.410714,0.387929
"""82010300-01""",2006,"""Olson Lake""",2.926,0.0464
"""82009400-01""",2006,"""Colby Lake""",0.642857,0.254143
…,…,…,…,…
"""82010400-01""",2006,"""Jane Lake""",5.122222,0.014
"""82009002-01""",2004,"""Wilmes Lake""",1.408333,0.084917
"""27003501-01""",2011,"""Sweeney Lake""",1.166667,0.041389
"""27005300-01""",2012,"""Cobblecrest Lake""",0.55,0.1185


In [28]:
water_quality_summaries_complete.write_parquet('./data/water_quality_by_year.parquet')