# Project 2 - Lab 4 - Filter and aggregate the water quality data

Recall that one of the files (its name starts with `mces`) contains water quality measurements for lakes in the Twin Cities.  In this lab, we will narrow the list down to the lakes that have at least one of each measurement type (phosphorus and Secchi depth) for each year between 2004 and 2015.

**Important note.** Recall that we fixed an issue with the water quality data in our terminal exploration of the data. Be sure to use the corrected version of the water quality data located in the `data/MinneMUDAC_raw_files_fixed/mces_lakes_1999_2014_v2.txt` file.

## Problem 1 - Inspect the data

We will be focusing on two water quality measurements: phosphorus and Secchi depth.  Before we start trimming the data set, we should explore these metrics.

1. Each of the measures has a `QUALIFIER` column.  Group and aggregate by each of these columns and note any problematic values.  **Hint.** This search should indicate that some of the phosphorus measurements should be dropped.  Make sure you include this action as part of your primary query.
2. Each measure also includes a `Units` column.  Check that all measurements are in the same units, and convert as needed.

In [2]:
import functools
import os
import re
from glob import glob
from operator import mul

import polars as pl
import polars.selectors as cs

# Self-made module that stores the column names and schema found during our exploration of the data.
from columns import cols_to_keep, column_schema

# Goal of this notebook: filter the data and drop any columns that won't be needed for our analysis.
# The output is a parquet file in the final water quality format (one lake-year per row),
# which is used in the next step of our analysis.

In [3]:
# List all .txt files under the data/MinneMUDAC_raw_files folder using glob (no saving, no filtering)

glob('data/MinneMUDAC_raw_files/**/*.txt', recursive=True)

['data/MinneMUDAC_raw_files/2002_metro_tax_parcels.txt',
 'data/MinneMUDAC_raw_files/2003_metro_tax_parcels.txt',
 'data/MinneMUDAC_raw_files/2004_metro_tax_parcels.txt',
 'data/MinneMUDAC_raw_files/2005_metro_tax_parcels.txt',
 'data/MinneMUDAC_raw_files/2006_metro_tax_parcels.txt',
 'data/MinneMUDAC_raw_files/2007_metro_tax_parcels.txt',
 'data/MinneMUDAC_raw_files/2008_metro_tax_parcels.txt',
 'data/MinneMUDAC_raw_files/2009_metro_tax_parcels.txt',
 'data/MinneMUDAC_raw_files/2010_metro_tax_parcels.txt',
 'data/MinneMUDAC_raw_files/2011_metro_tax_parcels.txt',
 'data/MinneMUDAC_raw_files/2012_metro_tax_parcels.txt',
 'data/MinneMUDAC_raw_files/2013_metro_tax_parcels.txt',
 'data/MinneMUDAC_raw_files/2014_metro_tax_parcels.txt',
 'data/MinneMUDAC_raw_files/2015_metro_tax_parcels.txt',
 'data/MinneMUDAC_raw_files/mces_lakes_1999_2014.txt',
 'data/MinneMUDAC_raw_files/mces_lakes_1999_2014_v2.txt',
 'data/MinneMUDAC_raw_files/Parcel_Lake_Monitoring_Site_Xref.txt']

In [4]:
# Keep only the columns we need from the lake water quality data

(wq_cols_to_keep := ['DNR_ID_Site_Number',
'END_DATE',
'LAKE_NAME',
'Secchi_Depth_RESULT_SIGN',
 'Secchi_Depth_RESULT',
 'Secchi_Depth_QUALIFIER',
 'Secchi_Depth_Units',
 'Total_Phosphorus_RESULT_SIGN',
 'Total_Phosphorus_RESULT',
 'Total_Phosphorus_QUALIFIER',
 'Total_Phosphorus_Units',
 'longitude',
 'latitude'])

['DNR_ID_Site_Number',
 'END_DATE',
 'LAKE_NAME',
 'Secchi_Depth_RESULT_SIGN',
 'Secchi_Depth_RESULT',
 'Secchi_Depth_QUALIFIER',
 'Secchi_Depth_Units',
 'Total_Phosphorus_RESULT_SIGN',
 'Total_Phosphorus_RESULT',
 'Total_Phosphorus_QUALIFIER',
 'Total_Phosphorus_Units',
 'longitude',
 'latitude']

In [5]:
# Load the MCES water quality v2 data using polars and show result 
(
    water_quality := pl.read_csv(
        'data/MinneMUDAC_raw_files/mces_lakes_1999_2014_v2.txt',
        separator='\t',
        infer_schema_length=10000,
        columns=wq_cols_to_keep
    )
)

LAKE_NAME,DNR_ID_Site_Number,END_DATE,Secchi_Depth_RESULT_SIGN,Secchi_Depth_RESULT,Secchi_Depth_QUALIFIER,Secchi_Depth_Units,Total_Phosphorus_RESULT_SIGN,Total_Phosphorus_RESULT,Total_Phosphorus_QUALIFIER,Total_Phosphorus_Units,longitude,latitude
str,str,str,str,f64,str,str,str,f64,str,str,f64,f64
"""Acorn Lake""","""82010200-01""","""2006-04-16""",,1.0,"""Approved""","""m""",,0.156,"""Approved""","""mg/L""",-92.971711,45.016556
"""Acorn Lake""","""82010200-01""","""2006-09-30""",,,,"""m""",,,,"""mg/L""",-92.971711,45.016556
"""Acorn Lake""","""82010200-01""","""2006-05-02""",,0.66,"""Approved""","""m""",,0.107,"""Approved""","""mg/L""",-92.971711,45.016556
"""Acorn Lake""","""82010200-01""","""2006-05-16""",,0.66,"""Approved""","""m""",,0.141,"""Approved""","""mg/L""",-92.971711,45.016556
"""Acorn Lake""","""82010200-01""","""2006-05-30""",,0.5,"""Approved""","""m""",,0.029,"""Approved""","""mg/L""",-92.971711,45.016556
…,…,…,…,…,…,…,…,…,…,…,…,…
"""Zumbra Lake""","""10004100-01""","""2002-09-16""",,,,"""m""",,0.224,"""Approved""","""mg/L""",-93.667689,44.883817
"""Zumbra Lake""","""10004100-01""","""2002-10-01""",,2.7,"""Approved""","""m""","""~""",0.026,"""Approved""","""mg/L""",-93.667689,44.883817
"""Zumbra Lake""","""10004100-01""","""2002-10-01""",,,,"""m""","""~""",0.015,"""Approved""","""mg/L""",-93.667689,44.883817
"""Zumbra Lake""","""10004100-01""","""2002-10-14""",,3.0,"""Approved""","""m""","""~""",0.011,"""Approved""","""mg/L""",-93.667689,44.883817


In [6]:
# Show the columns of the water_quality DataFrame
water_quality.columns

['LAKE_NAME',
 'DNR_ID_Site_Number',
 'END_DATE',
 'Secchi_Depth_RESULT_SIGN',
 'Secchi_Depth_RESULT',
 'Secchi_Depth_QUALIFIER',
 'Secchi_Depth_Units',
 'Total_Phosphorus_RESULT_SIGN',
 'Total_Phosphorus_RESULT',
 'Total_Phosphorus_QUALIFIER',
 'Total_Phosphorus_Units',
 'longitude',
 'latitude']

In [7]:
# How many of the Secchi depth measurements are approved vs. not?
water_quality.group_by('Secchi_Depth_QUALIFIER').len()

Secchi_Depth_QUALIFIER,len
str,u32
"""Approved""",35104
,13153


In [8]:
# Check that all Secchi depth measurements are recorded in the same units.
water_quality.group_by('Secchi_Depth_Units').len()

Secchi_Depth_Units,len
str,u32
"""m""",48257


## Problem 2 - Filter and aggregate.

#### Tasks

Remember that our goal is to narrow the data to one row per lake per year.  Build a query that groups and aggregates to find the yearly average values for both phosphorus and Secchi depth.  To do this you will want to

1. Filter based on what you learned in **Problem 1.**
2. Make sure that the `END_DATE` has the correct type and extract the year.  
3. Filter to the correct range of years.
4. Now you should group and aggregate to compute the yearly means.  We want to keep both the `LAKE_NAME` and lake ID to allow us to join these data to the parcel features we will construct in the next lab.

In [9]:
# Keep only the rows where both the phosphorus and Secchi depth qualifiers are 'Approved'.
# Extract the year from END_DATE by splitting the string on '-'.
# Keep only the years 2004 through 2015.

(
    water_quality_filtered := pl.read_csv(
        'data/MinneMUDAC_raw_files/mces_lakes_1999_2014_v2.txt',
        separator='\t',
        infer_schema_length=10000,
        columns=wq_cols_to_keep
    ).filter(
        (pl.col('Secchi_Depth_QUALIFIER') == 'Approved') &
        (pl.col('Total_Phosphorus_QUALIFIER') == 'Approved')
    ).with_columns([
        pl.col('END_DATE').str.split('-').list.get(0).cast(pl.Int32).alias('Year')
    ]).filter(
        (pl.col('Year') >= 2004) & (pl.col('Year') <= 2015)
    )

)

LAKE_NAME,DNR_ID_Site_Number,END_DATE,Secchi_Depth_RESULT_SIGN,Secchi_Depth_RESULT,Secchi_Depth_QUALIFIER,Secchi_Depth_Units,Total_Phosphorus_RESULT_SIGN,Total_Phosphorus_RESULT,Total_Phosphorus_QUALIFIER,Total_Phosphorus_Units,longitude,latitude,Year
str,str,str,str,f64,str,str,str,f64,str,str,f64,f64,i32
"""Acorn Lake""","""82010200-01""","""2006-04-16""",,1.0,"""Approved""","""m""",,0.156,"""Approved""","""mg/L""",-92.971711,45.016556,2006
"""Acorn Lake""","""82010200-01""","""2006-05-02""",,0.66,"""Approved""","""m""",,0.107,"""Approved""","""mg/L""",-92.971711,45.016556,2006
"""Acorn Lake""","""82010200-01""","""2006-05-16""",,0.66,"""Approved""","""m""",,0.141,"""Approved""","""mg/L""",-92.971711,45.016556,2006
"""Acorn Lake""","""82010200-01""","""2006-05-30""",,0.5,"""Approved""","""m""",,0.029,"""Approved""","""mg/L""",-92.971711,45.016556,2006
"""Acorn Lake""","""82010200-01""","""2006-06-11""",,0.5,"""Approved""","""m""",,0.058,"""Approved""","""mg/L""",-92.971711,45.016556,2006
…,…,…,…,…,…,…,…,…,…,…,…,…,…
"""Woodpile Lake""","""82013200-01""","""2014-08-13""",,3.05,"""Approved""","""m""",,0.023,"""Approved""","""mg/L""",-92.903328,45.068687,2014
"""Woodpile Lake""","""82013200-01""","""2014-08-25""",,2.74,"""Approved""","""m""",,0.026,"""Approved""","""mg/L""",-92.903328,45.068687,2014
"""Woodpile Lake""","""82013200-01""","""2014-09-09""",,2.13,"""Approved""","""m""",,0.03,"""Approved""","""mg/L""",-92.903328,45.068687,2014
"""Woodpile Lake""","""82013200-01""","""2014-09-22""",,3.66,"""Approved""","""m""",,0.04,"""Approved""","""mg/L""",-92.903328,45.068687,2014


In [10]:
# Aggregate yearly averages for Secchi depth and total phosphorus, grouped by lake and year
(
    water_quality_summaries := water_quality_filtered.group_by([
        'DNR_ID_Site_Number',
        'Year',
        'LAKE_NAME',
        'latitude',
        'longitude'
    ]).agg([
        pl.col('Secchi_Depth_RESULT').mean().alias('avg_secchi_depth'),
        pl.col('Total_Phosphorus_RESULT').mean().alias('avg_total_phosphorus')
    ])
)

DNR_ID_Site_Number,Year,LAKE_NAME,latitude,longitude,avg_secchi_depth,avg_total_phosphorus
str,i32,str,f64,f64,f64,f64
"""19002300-01""",2013,"""Farquar Lake""",44.758637,-93.16464,0.72,0.112538
"""27071100-01""",2013,"""Westwood Lake""",44.970826,-93.387652,1.4,0.0575
"""27017901-01""",2009,"""Little Long Lake""",44.947739,-93.708212,4.273077,0.014846
"""27071100-01""",2009,"""Westwood Lake""",44.970826,-93.387652,1.033333,0.0405
"""82001000-01""",2004,"""McDonald Lake""",45.016963,-92.843923,2.541667,0.038917
…,…,…,…,…,…,…
"""82010900-01""",2006,"""Eagle Point Lake""",44.977095,-92.911558,0.15,0.7206
"""70007200-01""",2008,"""Upper Prior Lake""",44.715362,-93.442084,1.292857,0.061857
"""82001502-01""",2010,"""Loon Lake""",45.114143,-92.8372,0.463636,0.083182
"""19002601-01""",2006,"""Marion Lake""",44.658257,-93.27557,2.016667,0.046467


## Problem 3 - Find lakes with complete yearly averages.

We want to make sure that we don't have any missing data in the target vectors, so we need to build a query that leads to a list of lake names and codes that fit the following criteria.

1. Only contains years after 2003.
2. Has a non-missing value for both means.
3. Contains both the lake name and the lake code.

You should save this list of lake IDs in a variable named `lakes_w_complete_info` in a file named `lake.py`.  Restart your kernel and confirm that you can import this data.
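
Saving the list to a module could look like the following minimal sketch. The helper name `write_lake_module` is hypothetical, and the sketch writes to a temporary directory so it can run anywhere; in the lab the target path would simply be `lake.py`.

```python
import runpy
import tempfile
from pathlib import Path

# Two example IDs used only for illustration; in the lab this is the full list.
example_ids = ["19002300-01", "27071100-01"]

def write_lake_module(ids, path):
    """Write the IDs as a Python list literal assigned to lakes_w_complete_info."""
    Path(path).write_text(f"lakes_w_complete_info = {ids!r}\n")

# Temporary location for the sketch; use "lake.py" in the project folder instead.
module_path = Path(tempfile.mkdtemp()) / "lake.py"
write_lake_module(example_ids, module_path)

# Stand-in for `from lake import lakes_w_complete_info` after a kernel restart.
reloaded = runpy.run_path(str(module_path))["lakes_w_complete_info"]
print(reloaded)
```

Writing the list with `repr` keeps the file a plain Python module, so a normal `import` works once the file sits next to the notebook.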

In [11]:
# Build the wq_complete_information table of lakes with complete yearly averages.
# It is used to filter the resulting data set to only those lakes that have data across all years.

(
    wq_complete_information := (
        water_quality_summaries
        .with_columns([
            (
                pl.col('avg_secchi_depth').is_not_null() &
                pl.col('avg_total_phosphorus').is_not_null()
                
            ).cast(pl.Int8).alias('summary_complete')
        ])
        .pivot(
            values='summary_complete',
            index=['DNR_ID_Site_Number','LAKE_NAME'],
            columns='Year',
            aggregate_function='sum'
        )
        .with_columns(
            all_complete = pl.reduce(mul, cs.integer())
        )
        .filter(pl.col('all_complete') == 1)
    )
)


DNR_ID_Site_Number,LAKE_NAME,2013,2009,2004,2011,2010,2006,2012,2008,2005,2007,2014,all_complete
str,str,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64,i64
"""19002300-01""","""Farquar Lake""",1,1,1,1,1,1,1,1,1,1,1,1
"""27071100-01""","""Westwood Lake""",1,1,1,1,1,1,1,1,1,1,1,1
"""82007700-01""","""Goggins Lake""",1,1,1,1,1,1,1,1,1,1,1,1
"""19002900-01""","""Lee Lake""",1,1,1,1,1,1,1,1,1,1,1,1
"""27003501-01""","""Sweeney Lake""",1,1,1,1,1,1,1,1,1,1,1,1
…,…,…,…,…,…,…,…,…,…,…,…,…,…
"""82010400-01""","""Jane Lake""",1,1,1,1,1,1,1,1,1,1,1,1
"""19044600-01""","""Lac Lavon Lake""",1,1,1,1,1,1,1,1,1,1,1,1
"""82033400-01""","""Kismet Lake""",1,1,1,1,1,1,1,1,1,1,1,1
"""10012100-01""","""Eagle Lake""",1,1,1,1,1,1,1,1,1,1,1,1


In [12]:
# Extract the DNR_ID_Site_Number column from wq_complete_information as a Python list (using an assignment expression).
# It is used to filter the resulting data set to only those lakes that have data across all years.
(
    dnr_ids_complete := wq_complete_information
        .get_column('DNR_ID_Site_Number')
        .to_list()
)

['19002300-01',
 '27071100-01',
 '82007700-01',
 '19002900-01',
 '27003501-01',
 '82008900-01',
 '02000500-01',
 '82002000-01',
 '82036800-01',
 '10005200-01',
 '82010300-01',
 '82012300-01',
 '13005300-01',
 '82013700-01',
 '27007000-01',
 '19034800-01',
 '27004201-01',
 '19002700-01',
 '82009002-01',
 '82005400-01',
 '27062700-01',
 '10001900-01',
 '82009400-01',
 '27005300-01',
 '19002601-01',
 '19003300-01',
 '19002100-01',
 '10000200-01',
 '19002200-01',
 '10009500-01',
 '82015900-01',
 '82003400-01',
 '82009200-01',
 '82009700-01',
 '70002600-01',
 '82015300-01',
 '82008700-01',
 '19002400-01',
 '82010100-01',
 '10001100-01',
 '82012200-01',
 '19003100-01',
 '82011602-01',
 '82010400-01',
 '19044600-01',
 '82033400-01',
 '10012100-01',
 '19002500-01']

## Problem 4 - Create and write the final water quality table.

Finally, you should filter the table from **Problem 2.** to the lakes with complete information, then write this table to a parquet file named `water_quality_by_year.parquet`.

In [13]:
# Filter water_quality_summaries to only include rows with DNR_ID_Site_Number in dnr_ids_complete
(
    water_quality_summaries_complete := water_quality_summaries.filter(
        pl.col('DNR_ID_Site_Number').is_in(dnr_ids_complete)
    ).sort('LAKE_NAME')
)

DNR_ID_Site_Number,Year,LAKE_NAME,latitude,longitude,avg_secchi_depth,avg_total_phosphorus
str,i32,str,f64,f64,f64,f64
"""19002100-01""",2004,"""Alimagnet Lake""",44.748126,-93.248213,0.445,0.1645
"""19002100-01""",2014,"""Alimagnet Lake""",44.748126,-93.248213,0.966667,0.0825
"""19002100-01""",2010,"""Alimagnet Lake""",44.748126,-93.248213,0.980769,0.076462
"""19002100-01""",2009,"""Alimagnet Lake""",44.748126,-93.248213,0.705,0.0972
"""19002100-01""",2012,"""Alimagnet Lake""",44.748126,-93.248213,1.404545,0.068909
…,…,…,…,…,…,…
"""19002400-01""",2004,"""Wood Lake""",44.741118,-93.26586,1.437857,0.048143
"""19002400-01""",2012,"""Wood Lake""",44.741118,-93.26586,1.276923,0.076692
"""19002400-01""",2009,"""Wood Lake""",44.741118,-93.26586,1.864444,0.037556
"""19002400-01""",2010,"""Wood Lake""",44.741118,-93.26586,2.293846,0.039923


In [14]:
# Write water_quality_summaries_complete to parquet in the data folder, partitioned by Year and DNR_ID_Site_Number.
# With partition_by, Polars writes a partitioned dataset (a directory of files) rather than a single file.

water_quality_summaries_complete.write_parquet(
    'data/water_quality_by_year.parquet',
    partition_by=['Year', 'DNR_ID_Site_Number']
)