# Project 2 - Lab 5 - Filter and aggregate the water quality data

Recall that one of the files (its name starts with `mces`) contains water quality measurements for lakes in the Twin Cities.  In this lab, we will narrow the data down to the lakes that have at least one measurement of each type (phosphorus and secchi depth) in every year between 2004 and 2015.

**Important note.** Recall that we fixed an issue with the water quality data in our terminal exploration of the data. Be sure to use the corrected version of the water quality data located in the `data/MinneMUDAC_raw_files_fixed/mces_lakes_1999_2014_v2.txt` file.

## Problem 1 - Inspect the data

We will be focusing on two of the water quality measurements: phosphorus and secchi depth.  Before we start trimming the data set, we should explore these metrics.

1. Each of the measures has a `QUALIFIER` column.  Group and aggregate by each of these columns and note any problematic values.  **Hint.** This search should indicate that some of the phosphorus measurements should be dropped.  Make sure you include this action as part of your primary query.
2. Each measure also includes a `Units` column.  Check that all measurements are in the same units, and convert as needed.

In [116]:
import polars as pl
import polars.selectors as cs
from glob import glob
import re
from functools import reduce
import requests, zipfile, os, shutil, io
from humanize import naturalsize
from pathlib import Path
from toolz import pipe
from toolz.curried import map
from more_itertools import consume

In [120]:
mces = (
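    pl.scan_csv(
        # corrected file from the terminal exploration (see the note above)
        "data/MinneMUDAC_raw_files_fixed/mces_lakes_1999_2014_v2.txt",
        has_header=True,
        separator="\t",
        infer_schema_length=0  # read every column as a string; cast later
    )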
    .filter(pl.col("Total_Phosphorus_QUALIFIER") == "Approved")
    .filter(pl.col("Secchi_Depth_QUALIFIER") == "Approved")
    .with_columns(
        pl.col("START_DATE").str.strptime(pl.Date, "%Y-%m-%d").dt.year().alias("Year")
    )
)

In [121]:
phosphorus_clean = (
    mces
    # cast the results to float and convert mg/L to µg/L
    .with_columns([
        pl.col("Total_Phosphorus_RESULT").cast(pl.Float64).alias("Total_Phosphorus_RESULT_num"),
        pl.when(pl.col("Total_Phosphorus_Units") == "mg/L")
          .then(pl.col("Total_Phosphorus_RESULT").cast(pl.Float64) * 1000)
          .otherwise(pl.col("Total_Phosphorus_RESULT").cast(pl.Float64))
          .alias("Phosphorus_value_standard"),
        pl.lit("µg/L").alias("Phosphorus_units_standard"),
        ])
)
secchi_clean = (
    mces
    .with_columns([
        pl.col("Secchi_Depth_RESULT").cast(pl.Float64).alias("Secchi_Depth_RESULT_num"),
        # convert feet to meters so every depth is in the same units
        pl.when(pl.col("Secchi_Depth_Units") == "ft")
          .then(pl.col("Secchi_Depth_RESULT").cast(pl.Float64) * 0.3048)
          .otherwise(pl.col("Secchi_Depth_RESULT").cast(pl.Float64))
          .alias("Secchi_value_standard"),
        pl.lit("m").alias("Secchi_units_standard"),
        ])
)

phos_coverage = (
    phosphorus_clean
    # group by ID and year and count the phosphorus rows
    .group_by(["DNR_ID_Site_Number", "Year"])
    # pl.len() replaces pl.count(), which was deprecated in version 0.20.5
    .agg(pl.len().alias("phos_count"))
)

secchi_coverage = (
    secchi_clean
    # group by ID and year and count the secchi rows
    .group_by(["DNR_ID_Site_Number", "Year"])
    .agg(pl.len().alias("secchi_count"))
)

coverage = (
    phos_coverage
    # join these tables on ID and year
    .join(secchi_coverage, on=["DNR_ID_Site_Number", "Year"], how="inner")
    # the counts have to be greater than 0 to satisfy
    .filter((pl.col("phos_count") > 0) & (pl.col("secchi_count") > 0))
)

# keep lakes that have both measurements in exactly 12 distinct years
# (note: the years are counted over the whole file, not only 2004-2015)
valid_lakes = (
    coverage
    .group_by("DNR_ID_Site_Number")
    .agg(pl.col("Year").n_unique().alias("years_with_data"))
    .filter(pl.col("years_with_data") == 12) 
    .select("DNR_ID_Site_Number")
)

# keep only the phosphorus rows for the valid lakes
phosphorus_final = (
    phosphorus_clean
    .join(valid_lakes, on="DNR_ID_Site_Number", how="inner")
)

# join the valid lakes with secchi
secchi_final = (
    secchi_clean
    .join(valid_lakes, on="DNR_ID_Site_Number", how="inner")
)

# combine the lake measurements: ID, year, phosphorus values, and secchi values
lake_measurements = (
    phosphorus_final
    .select(["DNR_ID_Site_Number", "Year", "Phosphorus_value_standard"])
    .join(
        secchi_final.select(["DNR_ID_Site_Number", "Year", "Secchi_value_standard"]),
        on=["DNR_ID_Site_Number", "Year"],
        how="inner"
    )
)


In [122]:
print("Phosphorus sample:\n", phosphorus_final.limit(10).collect())
print("Secchi sample:\n", secchi_final.limit(10).collect())
print("Combined sample:\n", lake_measurements.limit(10).collect())

Phosphorus sample:
 shape: (10, 37)
┌────────────┬─────────────┬────────────┬────────┬───┬──────┬────────────┬────────────┬────────────┐
│ PROJECT_ID ┆ DATA_SET_TI ┆ LAKE_NAME  ┆ CITY   ┆ … ┆ Year ┆ Total_Phos ┆ Phosphorus ┆ Phosphorus │
│ ---        ┆ TLE         ┆ ---        ┆ ---    ┆   ┆ ---  ┆ phorus_RES ┆ _value_sta ┆ _units_sta │
│ str        ┆ ---         ┆ str        ┆ str    ┆   ┆ i32  ┆ ULT_num    ┆ ndard      ┆ ndard      │
│            ┆ str         ┆            ┆        ┆   ┆      ┆ ---        ┆ ---        ┆ ---        │
│            ┆             ┆            ┆        ┆   ┆      ┆ f64        ┆ f64        ┆ str        │
╞════════════╪═════════════╪════════════╪════════╪═══╪══════╪════════════╪════════════╪════════════╡
│ 7108       ┆ Citizen     ┆ Brickyard  ┆ Chaska ┆ … ┆ 2002 ┆ 0.012      ┆ 12.0       ┆ µg/L       │
│            ┆ Assisted    ┆ Clayhole   ┆        ┆   ┆      ┆            ┆            ┆            │
│            ┆ Monitoring  ┆ Lake       ┆        ┆   ┆ 

## Problem 2 - Filter and aggregate.

#### Tasks

Remember that our goal is to narrow the data to one row per lake per year.  Build a query that groups and aggregates to find the yearly average values for both phosphorus and secchi depth.  To do this, you will want to

1. Filter based on what you learned in **Problem 1.**
2. Make sure that the `END_DATE` has the correct type and extract the year.  
3. Filter to the correct range of years.
4. Now you should group and aggregate to compute the yearly means.  We want to keep both the `LAKE_NAME` and lake ID to allow us to join these data to the parcel features we will construct in the next lab.

In [123]:
phosphorus_yearly = (
    phosphorus_clean
    # keep only the years of interest (2004 through 2015)
    .filter((pl.col("Year") >= 2004) & (pl.col("Year") <= 2015))
    .group_by(["DNR_ID_Site_Number", "LAKE_NAME", "Year"])
    .agg([
        pl.col("Phosphorus_value_standard").mean().alias("Phosphorus_mean")
    ])
)

secchi_yearly = (
    secchi_clean
    .filter((pl.col("Year") >= 2004) & (pl.col("Year") <= 2015))
    .group_by(["DNR_ID_Site_Number", "LAKE_NAME", "Year"])
    .agg([
        pl.col("Secchi_value_standard").mean().alias("Secchi_mean")
    ])
)

lake_yearly = (
    phosphorus_yearly
    .join(secchi_yearly, on=["DNR_ID_Site_Number", "LAKE_NAME", "Year"], how="inner")
)

print(lake_yearly.limit(10).collect())

shape: (10, 5)
┌────────────────────┬─────────────────────┬──────┬─────────────────┬─────────────┐
│ DNR_ID_Site_Number ┆ LAKE_NAME           ┆ Year ┆ Phosphorus_mean ┆ Secchi_mean │
│ ---                ┆ ---                 ┆ ---  ┆ ---             ┆ ---         │
│ str                ┆ str                 ┆ i32  ┆ f64             ┆ f64         │
╞════════════════════╪═════════════════════╪══════╪═════════════════╪═════════════╡
│ 82010400-01        ┆ Jane Lake           ┆ 2008 ┆ 18.947368       ┆ 4.706263    │
│ 19002400-01        ┆ Wood Lake           ┆ 2007 ┆ 60.769231       ┆ 1.816923    │
│ 82012500-01        ┆ Pat Lake            ┆ 2013 ┆ 48.583333       ┆ 2.2175      │
│ 82011700-01        ┆ Kramer Pond         ┆ 2009 ┆ 661.0           ┆ 0.359143    │
│ 19003000-01        ┆ Kingsley Lake       ┆ 2007 ┆ 13.642857       ┆ 2.621429    │
│ 82030100-01        ┆ Schroeder Pond      ┆ 2004 ┆ 48.5            ┆ 2.100929    │
│ 27010200-01        ┆ Schmidt Lake        ┆ 2004 ┆ 38.428571

## Problem 3 - Find lakes with complete yearly averages.

We want to make sure that we don't have any missing data in the target vectors, so we need to build a query that leads to a list of lake names and codes that fit the following criteria.

1. Only contains years after 2003.
2. Has a non-missing value for both means.
3. Contains both the lake name and the lake code.

You should save this list of lake IDs in a variable named `lakes_w_complete_info` in a file named `lake.py`.  Restart your kernel and confirm that you can import this data.
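
Saving the list to a module and importing it back might look like the sketch below. It assumes the IDs are already in a Python list (the two IDs are taken from the printed output in this lab) and writes `lake.py` to a temporary folder for the demonstration; in the lab the file would sit next to the notebook.

```python
import importlib.util
import tempfile
from pathlib import Path

# Two lake IDs used only for illustration.
lake_ids = ["19002200-01", "19034800-01"]

# Write the module into a temporary folder for this demonstration.
module_dir = Path(tempfile.mkdtemp())
module_path = module_dir / "lake.py"
module_path.write_text(f"lakes_w_complete_info = {lake_ids!r}\n")

# Load the module from its path, which mimics running
# `from lake import lakes_w_complete_info` after a kernel restart.
spec = importlib.util.spec_from_file_location("lake", module_path)
lake = importlib.util.module_from_spec(spec)
spec.loader.exec_module(lake)

print(lake.lakes_w_complete_info)
```

Writing the list with `repr` keeps `lake.py` a plain Python file that does not depend on the data being available at import time.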

In [133]:
lakes_w_complete_info = (
    lake_yearly
    .filter(pl.col("Year") > 2003)
    .filter(pl.col("Phosphorus_mean").is_not_null() & pl.col("Secchi_mean").is_not_null())
    .unique()
    .collect()
)

print(lakes_w_complete_info)

shape: (1_649, 5)
┌────────────────────┬─────────────────┬──────┬─────────────────┬─────────────┐
│ DNR_ID_Site_Number ┆ LAKE_NAME       ┆ Year ┆ Phosphorus_mean ┆ Secchi_mean │
│ ---                ┆ ---             ┆ ---  ┆ ---             ┆ ---         │
│ str                ┆ str             ┆ i32  ┆ f64             ┆ f64         │
╞════════════════════╪═════════════════╪══════╪═════════════════╪═════════════╡
│ 82000900-01        ┆ Cloverdale Lake ┆ 2012 ┆ 46.555556       ┆ 2.566667    │
│ 10008400-01        ┆ Burandt Lake    ┆ 2013 ┆ 27.2            ┆ 1.92        │
│ 13005700-01        ┆ School Lake     ┆ 2006 ┆ 69.714286       ┆ 1.284571    │
│ 19002200-01        ┆ Long Lake       ┆ 2010 ┆ 76.305085       ┆ 1.804576    │
│ 19034800-01        ┆ Valley Lake     ┆ 2009 ┆ 54.142857       ┆ 2.285714    │
│ …                  ┆ …               ┆ …    ┆ …               ┆ …           │
│ 27067500-01        ┆ Pamela Pond     ┆ 2004 ┆ 84.5            ┆ 1.245       │
│ 10000500-01        ┆

In [138]:
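from operator import mul
import polars.selectors as cs

wq_complete_information = (
    lakes_w_complete_info
    # flag each lake-year that has both means
    .with_columns(
        (pl.col('Secchi_mean').is_not_null() & pl.col('Phosphorus_mean').is_not_null())
        .cast(pl.Int8)
        .fill_null(0)
        .alias('summary_complete')
    )
    # one row per lake, one column per year
    # (`on` replaces the deprecated `columns` argument)
    .pivot(
        on='Year',
        index=['DNR_ID_Site_Number', 'LAKE_NAME'],
        values='summary_complete',
        aggregate_function='sum'
    )
    # the product is 1 only when every year column equals 1;
    # a missing year is null, so the product is null and the lake is dropped
    .with_columns(all_complete = pl.reduce(mul, cs.integer()))
    .filter(pl.col('all_complete') == 1)
)
print(wq_complete_information)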
print(wq_complete_information)

shape: (49, 14)
┌────────────────────┬──────────────────┬──────┬──────┬───┬──────┬──────┬──────┬──────────────┐
│ DNR_ID_Site_Number ┆ LAKE_NAME        ┆ 2012 ┆ 2013 ┆ … ┆ 2004 ┆ 2008 ┆ 2014 ┆ all_complete │
│ ---                ┆ ---              ┆ ---  ┆ ---  ┆   ┆ ---  ┆ ---  ┆ ---  ┆ ---          │
│ str                ┆ str              ┆ i64  ┆ i64  ┆   ┆ i64  ┆ i64  ┆ i64  ┆ i64          │
╞════════════════════╪══════════════════╪══════╪══════╪═══╪══════╪══════╪══════╪══════════════╡
│ 19002200-01        ┆ Long Lake        ┆ 1    ┆ 1    ┆ … ┆ 1    ┆ 1    ┆ 1    ┆ 1            │
│ 19034800-01        ┆ Valley Lake      ┆ 1    ┆ 1    ┆ … ┆ 1    ┆ 1    ┆ 1    ┆ 1            │
│ 82011301-01        ┆ Goose Lake       ┆ 1    ┆ 1    ┆ … ┆ 1    ┆ 1    ┆ 1    ┆ 1            │
│ 82013700-01        ┆ Fish Lake        ┆ 1    ┆ 1    ┆ … ┆ 1    ┆ 1    ┆ 1    ┆ 1            │
│ 82003400-01        ┆ East Boot Lake   ┆ 1    ┆ 1    ┆ … ┆ 1    ┆ 1    ┆ 1    ┆ 1            │
│ …                  ┆ …

In [139]:
# the parentheses let the method chain span several lines
dnr_id_site_numbers = (
    wq_complete_information
    .get_column('DNR_ID_Site_Number')
    .to_list()
)

## Problem 4 - Create and write the final water quality table.

Finally, you should filter the table from **Problem 2.** to the lakes with complete information, then write this table to a parquet file named `water_quality_by_year.parquet`.

In [142]:
water_quality_summaries_complete = (
    lakes_w_complete_info
    .filter(pl.col('DNR_ID_Site_Number').is_in(dnr_id_site_numbers))
)
print(water_quality_summaries_complete)

shape: (539, 5)
┌────────────────────┬────────────────┬──────┬─────────────────┬─────────────┐
│ DNR_ID_Site_Number ┆ LAKE_NAME      ┆ Year ┆ Phosphorus_mean ┆ Secchi_mean │
│ ---                ┆ ---            ┆ ---  ┆ ---             ┆ ---         │
│ str                ┆ str            ┆ i32  ┆ f64             ┆ f64         │
╞════════════════════╪════════════════╪══════╪═════════════════╪═════════════╡
│ 19002200-01        ┆ Long Lake      ┆ 2010 ┆ 76.305085       ┆ 1.804576    │
│ 19034800-01        ┆ Valley Lake    ┆ 2009 ┆ 54.142857       ┆ 2.285714    │
│ 82011301-01        ┆ Goose Lake     ┆ 2011 ┆ 93.6            ┆ 0.909       │
│ 19002200-01        ┆ Long Lake      ┆ 2004 ┆ 87.5625         ┆ 1.364328    │
│ 82013700-01        ┆ Fish Lake      ┆ 2008 ┆ 58.5            ┆ 1.502722    │
│ …                  ┆ …              ┆ …    ┆ …               ┆ …           │
│ 10000200-01        ┆ Riley Lake     ┆ 2005 ┆ 51.642857       ┆ 2.05        │
│ 82011602-01        ┆ Armstrong Lak

In [143]:
# Keep only the yearly rows for lakes with complete information.
# A semi join filters `lake_yearly` without duplicating rows or adding columns.
lakes_lazy = water_quality_summaries_complete.lazy().select("DNR_ID_Site_Number").unique()

water_quality_by_year = (
    lake_yearly
    .join(lakes_lazy, on="DNR_ID_Site_Number", how="semi")
)
#print(water_quality_by_year.limit(10).collect())

In [151]:
# Write the final table to the parquet file requested above.
# A relative path keeps the notebook portable across machines.
out_path = Path("data") / "water_quality_by_year.parquet"
water_quality_by_year.sink_parquet(out_path)


In [146]:
water_quality_summaries_complete.write_csv("data/MinneMUDAC_raw_files/water_quality_summaries_complete.csv")