# Project 2 - Lab 4 - Filter and aggregate the water quality data

Recall that one of the files (starts with `mces`) contains water quality measurements for lakes in the Twin Cities.  In this lab, we will narrow down the list of lakes to those for which we have at least one of each measurement type (phosphorus and secchi depth) for each year between 2004 and 2015.

**Important note.** Recall that we fixed an issue with the water quality data in our terminal exploration of the data. Be sure to use the corrected version of the water quality data located in the `data/MinneMUDAC_raw_files_fixed/mces_lakes_1999_2014_v2.txt` file.

## Problem 1 - Inspect the data

We will be focusing on two water quality measurements: phosphorus and secchi depth.  Before we start trimming the data set, we should explore these metrics.

1. Each of the measures has a `QUALIFIER` column.  Group and aggregate by each of these columns and note any problematic values.  **Hint.** This search should indicate that some of the phosphorus measurements should be dropped.  Make sure you include this action as part of your primary query.
2. Each measure also includes a `Units` column.  Check that all measurements are in the same units, and convert as needed.

In [1]:
# Your code here

To understand the reliability of the water-quality measurements, we begin by inspecting the QUALIFIER and Units fields for both Secchi depth and total phosphorus. The QUALIFIER fields reveal whether a measurement is estimated, censored, below detection limits, or formally approved. The Units fields show whether all measurements use consistent scales. Any QUALIFIER patterns suggesting unreliable phosphorus readings will be excluded in later filtering.

In [2]:
import polars as pl

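# Water quality file. NOTE: the lab instructions point to
# data/MinneMUDAC_raw_files_fixed/mces_lakes_1999_2014_v2.txt; confirm that
# the path below refers to the corrected version of the data.
wq_path = "./data/MinneMUDAC_raw_files/mces_lakes_1999_2014.v2.txt"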

# Columns we want
wq_cols = [
    "DNR_ID_Site_Number",
    "LAKE_NAME",
    "END_DATE",
    "Secchi_Depth_RESULT",
    "Secchi_Depth_QUALIFIER",
    "Secchi_Depth_Units",
    "Total_Phosphorus_RESULT",
    "Total_Phosphorus_QUALIFIER",
    "Total_Phosphorus_Units",
    "longitude",
    "latitude",
]

# Load lazily and select columns afterward
wq_lf = (
    pl.scan_csv(wq_path, separator="\t", infer_schema_length=2000)
      .select(wq_cols)
)

wq_lf.head().collect()


DNR_ID_Site_Number,LAKE_NAME,END_DATE,Secchi_Depth_RESULT,Secchi_Depth_QUALIFIER,Secchi_Depth_Units,Total_Phosphorus_RESULT,Total_Phosphorus_QUALIFIER,Total_Phosphorus_Units,longitude,latitude
str,str,str,f64,str,str,f64,str,str,f64,f64
"""82010200-01""","""Acorn Lake""","""2006-04-16""",1.0,"""Approved""","""m""",0.156,"""Approved""","""mg/L""",-92.971711,45.016556
"""82010200-01""","""Acorn Lake""","""2006-09-30""",,,"""m""",,,"""mg/L""",-92.971711,45.016556
"""82010200-01""","""Acorn Lake""","""2006-05-02""",0.66,"""Approved""","""m""",0.107,"""Approved""","""mg/L""",-92.971711,45.016556
"""82010200-01""","""Acorn Lake""","""2006-05-16""",0.66,"""Approved""","""m""",0.141,"""Approved""","""mg/L""",-92.971711,45.016556
"""82010200-01""","""Acorn Lake""","""2006-05-30""",0.5,"""Approved""","""m""",0.029,"""Approved""","""mg/L""",-92.971711,45.016556


We must check which Secchi depth and phosphorus measurements are “Approved” vs. flagged.

In [3]:
# Check Secchi Depth QUALIFIER values
wq_secchi_qual = (
    wq_lf
    .group_by("Secchi_Depth_QUALIFIER")
    .len()
    .collect()
)
wq_secchi_qual




Secchi_Depth_QUALIFIER,len
str,u32
"""Approved""",35104
,13153


In [4]:
# Check Phosphorus QUALIFIER values
wq_phos_qual = (
    wq_lf
    .group_by("Total_Phosphorus_QUALIFIER")
    .len()
    .collect()
)
wq_phos_qual


Total_Phosphorus_QUALIFIER,len
str,u32
"""Approved""",43639
"""Suspect""",35
,4583


Next, inspect the measurement units to ensure they are consistent.

In [5]:
# Secchi depth units check
wq_secchi_units = (
    wq_lf
    .group_by("Secchi_Depth_Units")
    .len()
    .collect()
)
wq_secchi_units


Secchi_Depth_Units,len
str,u32
"""m""",48257


In [6]:
# Phosphorus units check
wq_phos_units = (
    wq_lf
    .group_by("Total_Phosphorus_Units")
    .len()
    .collect()
)
wq_phos_units


Total_Phosphorus_Units,len
str,u32
"""mg/L""",48257


Problem 1 Summary

To understand data quality, I inspected the QUALIFIER and Units columns for Secchi depth and phosphorus. The QUALIFIER tables show that most measurements are marked Approved; the rest have a missing qualifier (13,153 rows for Secchi depth and 4,583 for phosphorus), and 35 phosphorus measurements are flagged Suspect. The units tables confirm that each metric is reported consistently (Secchi depth in m and phosphorus in mg/L), so no conversion is needed. Any non-approved measurements will be filtered out in the main query for Problem 2.

## Problem 2 - Filter and aggregate.

#### Tasks

Remember that our goal is to narrow the data to one row per lake per year.  Build a query that groups and aggregates to find the yearly average values for both phosphorus and secchi depth.  To do this, you will want to

1. Filter based on what you learned in **Problem 1.**
2. Make sure that the `END_DATE` has the correct type and extract the year.  
3. Filter to the correct range of years.
4. Now you should group and aggregate to compute the yearly means.  We want to keep both the `LAKE_NAME` and lake ID to allow us to join these data to the parcel features we will construct in the next lab.

In [7]:
# Your code here

Based on the inspection in Problem 1, I filtered the dataset to keep only measurements where both Secchi depth and phosphorus have “Approved” qualifiers. All suspect or missing values were removed. I also converted END_DATE into a proper year column and restricted the data to 2004–2015. Finally, I grouped by lake and year to compute average Secchi depth and phosphorus levels, producing one row per lake per year.

In [8]:
# Filter approved values and meaningful numeric results
wq_filtered = (
    wq_lf
    # Only "Approved"
    .filter(
        (pl.col("Secchi_Depth_QUALIFIER") == "Approved") &
        (pl.col("Total_Phosphorus_QUALIFIER") == "Approved")
    )
    # Remove unusable numeric rows (null, zero or negative)
    .filter(
        pl.col("Secchi_Depth_RESULT").is_not_null() &
        (pl.col("Secchi_Depth_RESULT") > 0) &
        pl.col("Total_Phosphorus_RESULT").is_not_null() &
        (pl.col("Total_Phosphorus_RESULT") > 0)
    )
    # Extract year from END_DATE
    .with_columns(
        pl.col("END_DATE")
          .str.slice(0, 4)
          .cast(pl.Int32)
          .alias("Year")
    )
    # Restrict year range
    .filter((pl.col("Year") >= 2004) & (pl.col("Year") <= 2015))
)


In [9]:
wq_summary = (
    wq_filtered
    .group_by([
        "DNR_ID_Site_Number",
        "LAKE_NAME",
        "Year",
        "latitude",
        "longitude"
    ])
    .agg([
        pl.col("Secchi_Depth_RESULT").mean().alias("avg_secchi"),
        pl.col("Total_Phosphorus_RESULT").mean().alias("avg_phosphorus")
    ])
)

wq_summary.head().collect()  # preview


DNR_ID_Site_Number,LAKE_NAME,Year,latitude,longitude,avg_secchi,avg_phosphorus
str,str,i32,f64,f64,f64,f64
"""19002100-01""","""Alimagnet Lake""",2004,44.748126,-93.248213,0.445,0.1645
"""70005000-01""","""McMahon Lake""",2012,44.633096,-93.413739,1.511111,0.061778
"""70001800-01""","""Cates Lake""",2005,44.727471,-93.390371,1.941667,0.021417
"""19003000-01""","""Kingsley Lake""",2014,44.707096,-93.299156,3.022222,0.010667
"""82004400-01""","""West Boot Lake""",2014,45.16316,-92.83822,3.754545,0.017364


In [10]:
(
    wq_summary
    .filter(
        pl.col("avg_secchi").is_null() |
        pl.col("avg_phosphorus").is_null()
    )
).collect()


DNR_ID_Site_Number,LAKE_NAME,Year,latitude,longitude,avg_secchi,avg_phosphorus
str,str,i32,f64,f64,f64,f64


## Problem 3 - Find lakes with complete yearly averages.

We want to make sure that we don't have any missing data in the target vectors, so we need to build a query that leads to a list of lake names and codes that fit the following criteria.

1. Only contains years after 2003.
2. Has a non-missing value for both means.
3. Contains both the lake name and the lake code.

You should save this list of lake IDs in a variable named `lakes_w_complete_info` in a file named `lake.py`.  Restart your kernel and confirm that you can import this data.

In [11]:
# Your code here

In [12]:
# Add completeness indicator
wq_complete_flag = (
    wq_summary
    .with_columns(
        (
            pl.col("avg_secchi").is_not_null() &
            pl.col("avg_phosphorus").is_not_null()
        ).alias("complete_year")
    )
)


In [13]:
# Count completeness per lake
lake_completeness = (
    wq_complete_flag
    .group_by(["DNR_ID_Site_Number", "LAKE_NAME"])
    .agg([
        pl.col("complete_year").min().alias("all_years_complete"),
        pl.col("Year").min().alias("min_year"),
        pl.col("Year").max().alias("max_year"),
        pl.col("Year").n_unique().alias("years_present")
    ])
)


In [14]:
# Allow lakes with full data for *their entire available span*
lakes_w_complete_info = (
    lake_completeness
    .filter(
        (pl.col("min_year") >= 2004) &
        pl.col("all_years_complete")
    )
    .select("DNR_ID_Site_Number")
    .collect()
    .get_column("DNR_ID_Site_Number")
    .to_list()
)

lakes_w_complete_info


['27009100-01',
 '10022600-01',
 '27104501-01',
 '13002200-01',
 '82039900-01',
 '82011200-01',
 '27004201-01',
 '62007200-01',
 '82010100-01',
 '82011301-01',
 '19008200-01',
 '19002500-01',
 '02000900-01',
 '82016200-01',
 '27016900-01',
 '13002300-01',
 '70007800-01',
 '19002900-01',
 '62003600-01',
 '82009400-01',
 '82010400-01',
 '82007700-01',
 '82015100-01',
 '82014000-01',
 '82000200-01',
 '13005700-01',
 '10022500-01',
 '19002400-01',
 '27014600-01',
 '10021600-01',
 '10000200-01',
 '02058500-01',
 '02000600-01',
 '10000200-02',
 '82013300-01',
 '10021700-01',
 '70001800-01',
 '27008500-01',
 '82012200-01',
 '82013700-01',
 '62005800-01',
 '10001200-01',
 '82036500-01',
 '82010600-01',
 '82005400-01',
 '02002200-01',
 '82048200-01',
 '62004901-01',
 '19045100-01',
 '10006600-01',
 '19004100-01',
 '19009500-01',
 '27012700-01',
 '82006200-01',
 '19005700-01',
 '82028700-01',
 '70009100-02',
 '02008100-01',
 '27019900-01',
 '19003000-01',
 '02009100-01',
 '82010300-01',
 '820109

Save the list to a new file named `lake.py`.

In [15]:
with open("lake.py", "w") as f:
    f.write("lakes_w_complete_info = " + repr(lakes_w_complete_info))
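
To confirm that the saved file can be imported, the round trip can be sketched with the standard library alone. The example writes a two-item list to a temporary `lake.py` (so it does not overwrite the real file) and loads it as a module, which is similar to what `import lake` does after a kernel restart.

```python
import importlib.util
import tempfile
from pathlib import Path

toy_ids = ["27009100-01", "10022600-01"]  # first two IDs from the list above

with tempfile.TemporaryDirectory() as tmp:
    module_path = Path(tmp) / "lake.py"
    module_path.write_text("lakes_w_complete_info = " + repr(toy_ids) + "\n")

    # Load the file as a module from its explicit path.
    spec = importlib.util.spec_from_file_location("lake", module_path)
    lake = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(lake)

# True when the imported list matches what was written.
round_trip_ok = lake.lakes_w_complete_info == toy_ids
```

In the notebook itself, the equivalent check after restarting the kernel is `from lake import lakes_w_complete_info`.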


## Problem 4 - Create and write the final water quality table.

Finally, you should filter the table from **Problem 2.** to the lakes with complete information, then write this table to a parquet file named `water_quality_by_year.parquet`.

In [16]:
# Your code here

In [17]:
# 1. Filter to lakes with full information
wq_final = (
    wq_summary
    .filter(pl.col("DNR_ID_Site_Number").is_in(lakes_w_complete_info))
)

# 2. Collect once, then save to Parquet
wq_final_df = wq_final.collect()
wq_final_df.write_parquet(
    "./data/water_quality_by_year.parquet",
)


Preview the final table to check that it does not show any nulls.


In [18]:
wq_final.collect().shape
wq_final.collect().head()


DNR_ID_Site_Number,LAKE_NAME,Year,latitude,longitude,avg_secchi,avg_phosphorus
str,str,i32,f64,f64,f64,f64
"""70007400-01""","""Dean Lake""",2011,44.774099,-93.44465,1.566667,0.070667
"""82030100-01""","""Schroeder Pond""",2007,45.143685,-92.83846,1.872429,0.081
"""82001700-01""","""Carol Lake""",2006,45.097839,-92.841013,1.193833,0.034833
"""82016200-01""","""Shields Lake""",2013,45.252874,-92.944859,1.213333,0.167
"""19002500-01""","""Keller Lake""",2006,44.726505,-93.252024,1.244231,0.080308


In [19]:
wq_final.columns




['DNR_ID_Site_Number',
 'LAKE_NAME',
 'Year',
 'latitude',
 'longitude',
 'avg_secchi',
 'avg_phosphorus']