In [1]:
import os
import polars as pl
import plotly.express as px
import plotly.graph_objs as go
from hampel import hampel
from datetime import datetime, timezone, timedelta
from typing import Literal

from utils import hermes_download_client
from utils import ambient_parameter_conversion as apc
from utils import calibration_processing as cp

DATA_DIRECTORY = os.environ.get("DATA_DIRECTORY")

sensor_id = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]

# customize pipeline
download_files = False
outlier_removal = False

# Download to local db

In [2]:
# download from hermes database
# Use Download/download_from_hermes notebook
if download_files:
    
    while(True):
        try:
            component = hermes_download_client.Extract()
            df_raw = component.execute()
            break
        except Exception as e:
            print(e)

# Import data files

In [3]:
# load calibration bottle concentrations (preprocessed)
df_gas = pl.read_csv(os.path.join(DATA_DIRECTORY,"input", "averaged_gases.csv"))
# load local db: acropolis.parquet
if not download_files:
    df_raw = pl.read_parquet(os.path.join(DATA_DIRECTORY, "download", "acropolis.parquet")) 
    
df_raw = df_raw.filter(pl.col("system_name") != "test-sensor") \
    .with_columns(pl.col("system_name").str.extract(r'(\d+)',1).str.to_integer().alias("system_id"))

In [4]:
#extract wind data from df_raw
df_wind = df_raw.select(pl.col("creation_timestamp", "system_id", "^(wxt532_.*)$")) \
    .filter(pl.col('wxt532_direction_avg') > 0)
    
df_enclosure = df_raw.select(pl.col("creation_timestamp", "system_id", "^(enclosure_.*)$")) \
    .filter(pl.col('enclosure_bme280_temperature') > 0)

# extract measurement data from df_raw and aggregate to 1m 
df_1_m = df_raw.sort("creation_timestamp") \
    .select(pl.all().exclude('^wxt532_.*$', '^cal_.*$', '^enclosure_.*$', '^raspi_.*$', '^ups_.*$')) \
    .filter(pl.col('gmp343_filtered') > 0) \
    .filter(pl.col('gmp343_temperature') > 0) \
    .filter(pl.col('sht45_humidity') > 0) \
    .filter(pl.col('bme280_pressure') > 0) \
    .group_by_dynamic("creation_timestamp", every='1m', by= "system_id") \
    .agg(pl.all().exclude(["creation_timestamp","system_id"]).mean())
    
# extract calibration data from df_raw
df_dry_calibration = df_raw.filter(pl.col("cal_gmp343_filtered") > 0) \
    .filter(pl.col("cal_gmp343_temperature") > 0) \
    .filter(pl.col("cal_bme280_pressure") > 0) \
    .with_columns(pl.col("cal_sht45_humidity").fill_null(0.0)) \
    .with_columns(pl.struct(['cal_gmp343_temperature','cal_sht45_humidity','cal_bme280_pressure'])
    .map_elements(lambda x: apc.rh_to_molar_mixing(x['cal_sht45_humidity'],apc.absolute_temperature(x['cal_gmp343_temperature']),x['cal_bme280_pressure']*100)) \
    .alias("cal_h2o_v%")) \
    .with_columns(pl.struct(['cal_gmp343_filtered','cal_gmp343_temperature','cal_sht45_humidity','cal_bme280_pressure']) \
    .map_elements(lambda x: apc.calculate_co2dry(x['cal_gmp343_filtered'],x['cal_gmp343_temperature'],x['cal_sht45_humidity'],x['cal_bme280_pressure']*100))
    .alias("cal_gmp343_dry")) \
    .select("creation_timestamp","system_id", '^cal_.*$') \
    .filter((pl.col("cal_bottle_id") > 0) & (pl.col("cal_bottle_id") <= df_gas["cal_bottle_id"].max()))
    
df_raw = None

In [5]:
print(len(df_1_m))

9927284


# Perform Dry-Wet Conversion

### Measurement Data

In [6]:
# perform dry conversion for measurement data                
df_1_m = df_1_m.with_columns(pl.struct(['gmp343_temperature','sht45_humidity']) \
    .map_elements(lambda x: apc.rh_to_ah(x['sht45_humidity'],apc.absolute_temperature(x['gmp343_temperature'])))
    .alias("h2o_ah")) \
    .with_columns(pl.struct(['gmp343_temperature','sht45_humidity','bme280_pressure'])
    .map_elements(lambda x: (apc.rh_to_molar_mixing(x['sht45_humidity'],apc.absolute_temperature(x['gmp343_temperature']),x['bme280_pressure']*100))*100) \
    .alias("h2o_v%")) \
    .with_columns(pl.struct(['gmp343_filtered','gmp343_temperature','sht45_humidity','bme280_pressure']) \
    .map_elements(lambda x: apc.calculate_co2dry(x['gmp343_filtered'],x['gmp343_temperature'],x['sht45_humidity'],x['bme280_pressure']*100))
    .alias("gmp343_dry"))

In [7]:
df_1_m.tail(3).select("creation_timestamp","system_id","gmp343_filtered", "h2o_ah", "h2o_v%" ,"gmp343_dry")

creation_timestamp,system_id,gmp343_filtered,h2o_ah,h2o_v%,gmp343_dry
"datetime[μs, UTC]",i64,f64,f64,f64,f64
2024-09-05 07:51:00 UTC,17,446.533333,20.466908,3.14928,461.05319
2024-09-05 07:52:00 UTC,17,447.0,20.449803,3.142459,461.502529
2024-09-05 07:53:00 UTC,17,448.9,20.428115,3.141724,463.460656


In [8]:
#df_1_m.write_parquet(os.path.join(DATA_DIRECTORY, "processed", "1m_acropolis_dry.parquet"))

### Calibration Data

# Calculate Calibration Correction

In [9]:
df_slope_intercept = df_dry_calibration.join(df_gas.cast({"cal_bottle_id": pl.Float64}), on = ["cal_bottle_id"], how= "left") \
    .with_columns((pl.col("creation_timestamp").dt.date()).alias("date")) \
    .sort("date") \
    .group_by([pl.col("date"), pl.col("system_id"), pl.col("cal_bottle_id")]) \
    .agg([
        pl.col("cal_gmp343_dry"),
        pl.col("cal_bottle_CO2").last(),
        pl.col("creation_timestamp").last(),
        ]) \
    .with_columns([pl.col("cal_gmp343_dry").map_elements(lambda x: cp.process_bottle(x))]) \
    .filter(pl.col("cal_gmp343_dry") > 0) \
    .sort(pl.col("cal_gmp343_dry")) \
    .group_by(["date", "system_id"]) \
    .agg([
        pl.col("cal_gmp343_dry"),
        pl.col("cal_bottle_CO2"),
        pl.col("creation_timestamp").last()
        ]) \
    .filter(pl.col("cal_gmp343_dry").list.len() == 2) \
    .with_columns(pl.struct(['cal_gmp343_dry','cal_bottle_CO2']) \
    .map_elements(lambda x: cp.two_point_calibration(x['cal_gmp343_dry'],x['cal_bottle_CO2'])) \
    .alias('slope, intercept')) \
    .with_columns([(pl.col("slope, intercept").list.first()).alias("slope"),
                   (pl.col("slope, intercept").list.last()).alias("intercept")
                   ]) \
    .select("creation_timestamp", "system_id", "slope", "intercept") \
    .filter(pl.col("slope") > 0)

In [10]:
df_slope_intercept.head(3)

creation_timestamp,system_id,slope,intercept
"datetime[μs, UTC]",i64,f64,f64
2023-08-04 11:30:14.370 UTC,6,1.042031,11.410671
2024-08-29 01:20:21.460 UTC,5,1.028262,10.131539
2024-07-21 01:21:59.350 UTC,5,1.024228,10.459554


In [11]:
# safe results to parquet
df_slope_intercept.write_parquet(os.path.join(DATA_DIRECTORY, "processed", "slope_intercept_acropolis.parquet"))

In [12]:
fig = px.scatter(df_slope_intercept.filter((pl.col("slope") > 0.7) & (pl.col("slope") < 1.1)), x="creation_timestamp", y = "slope", color = "system_id")
fig.show()
fig = px.histogram(df_slope_intercept.filter((pl.col("slope") > 0.7) & (pl.col("slope") < 1.1)), x="slope", color = "system_id")
fig.show()
fig = px.histogram(df_slope_intercept.filter((pl.col("intercept") < 100) & (pl.col("intercept") > -100)), x="intercept", color = "system_id")
fig.show()

# Perform Calibration Correction

## 1m aggregated data

In [13]:
df_systems = []

for id in sensor_id:
    df_slope_intercept_id = df_slope_intercept.filter(pl.col("system_id") == id) \
        .sort("creation_timestamp") \
        .drop("system_id")
        
    df_wind_id = df_wind.filter(pl.col("system_id") == id) \
        .sort("creation_timestamp") \
        .drop("system_id", "system_name", "date")
        
    df_enclosure_id = df_enclosure.filter(pl.col("system_id") == id) \
        .sort("creation_timestamp") \
        .drop("system_id", "system_name", "date")
    
    df_system = df_1_m.filter(pl.col("system_id") == id) \
        .sort("creation_timestamp") \
        .join_asof(df_slope_intercept_id, on="creation_timestamp", strategy="nearest", tolerance="10m") \
        .join_asof(df_wind_id, on="creation_timestamp", strategy="nearest", tolerance="10m") \
        .join_asof(df_enclosure_id, on="creation_timestamp", strategy="nearest", tolerance="10m") \
        .with_columns([
            pl.col("slope").interpolate(),
            pl.col("intercept").interpolate()
            ]) \
        .with_columns([
            pl.col("slope").forward_fill(),
            pl.col("intercept").forward_fill()
            ]) \
        .with_columns(((pl.col("gmp343_dry")) * pl.col("slope") + pl.col("intercept")).alias("gmp343_corrected")) \
        .with_columns((pl.col("creation_timestamp").dt.date()).alias("date"))
            
    
    df_systems.append(df_system)
        

df_1_m = pl.concat(df_systems, how="vertical") \
    .with_columns(pl.struct(["system_id"]) \
    .map_elements(lambda x: f"acropolis-{x['system_id']}") \
    .alias("sys_name_short"))

In [14]:
df_1_m.write_parquet(os.path.join(DATA_DIRECTORY, "processed", "1m_cal_corr_acropolis.parquet"))

## 10m aggregated data

In [15]:
df_10_m = df_1_m.sort("creation_timestamp") \
        .group_by_dynamic("creation_timestamp", every='10m', by=["system_id", "sys_name_short"]) \
        .agg(pl.all().exclude(["creation_timestamp","sys_name_short"]).mean(),
             pl.col("gmp343_corrected").std().alias("std"),
             pl.col("gmp343_corrected").var().alias("var"))
        
df_10_m.write_parquet(os.path.join(DATA_DIRECTORY, "processed", "10m_cal_corr_acropolis.parquet"))

## 1h aggregated data

In [16]:
df_1_h = df_1_m.sort("creation_timestamp") \
        .group_by_dynamic("creation_timestamp", every='1h', by=["system_id", "sys_name_short"]) \
        .agg(pl.all().exclude(["creation_timestamp","sys_name_short"]).mean(),
             pl.col("gmp343_corrected").std().alias("std"),
             (pl.col("gmp343_temperature").max() - pl.col("gmp343_temperature").min()).alias("gmp343_temperature_change"))
        
df_1_h.write_parquet(os.path.join(DATA_DIRECTORY, "processed", "1h_cal_corr_acropolis.parquet"))

In [17]:
df_1_h.head()

system_id,sys_name_short,creation_timestamp,system_name,gmp343_raw,gmp343_compensated,gmp343_filtered,gmp343_temperature,sht45_humidity,sht45_temperature,bme280_humidity,bme280_temperature,bme280_pressure,revision,receipt_timestamp,h2o_ah,h2o_v%,gmp343_dry,slope,intercept,wxt532_speed_avg,wxt532_speed_min,wxt532_speed_max,wxt532_direction_avg,wxt532_direction_min,wxt532_direction_max,wxt532_last_update_time,wxt532_temperature,wxt532_heating_voltage,wxt532_supply_voltage,wxt532_reference_voltage,enclosure_bme280_humidity,enclosure_bme280_pressure,enclosure_bme280_temperature,gmp343_corrected,date,std,gmp343_temperature_change
i64,str,"datetime[μs, UTC]",str,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,"datetime[ns, UTC]",f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,date,f64,f64
7,"""acropolis-7""",2023-06-26 13:00:00 UTC,,451.9875,499.741667,499.741667,28.766667,34.302917,27.8275,34.74375,27.675833,947.8975,,,9.73744,1.431375,506.998869,,,,,,,,,,,,,,22.71,956.75,32.52,,2023-06-26,,0.166667
7,"""acropolis-7""",2023-06-26 14:00:00 UTC,,458.494286,510.672143,510.672143,30.466905,30.349286,30.298333,30.580881,30.135595,947.677214,,,9.447762,1.396939,517.907103,,,,,,,,,,,,,,21.358,956.582,33.508,,2023-06-26,,0.167857
7,"""acropolis-7""",2023-06-27 08:00:00 UTC,,451.513333,491.048788,491.048788,26.469394,32.074242,25.773,32.926455,25.631758,952.857091,,,8.021782,1.164139,496.831439,,,,,,,,,,,,,,24.381818,961.297273,28.753636,,2023-06-27,,1.2
7,"""acropolis-7""",2023-06-27 15:00:00 UTC,,364.905833,413.294167,413.294167,31.696667,24.709333,30.867333,25.083,30.724083,937.707167,,,8.216871,1.232834,418.453082,,,,,,,,,,,,,,18.188,956.864,37.438,,2023-06-27,,0.166667
7,"""acropolis-7""",2023-06-27 16:00:00 UTC,,371.352976,413.486135,413.486135,31.734889,25.655277,30.495581,26.005487,30.356663,950.393405,,,8.547194,1.265258,418.784836,,,,,,,,,,,,,,18.580167,956.753333,36.830333,,2023-06-27,,0.45


# Renaming and column selection for ICOS cities portal

In [18]:
assert(outlier_removal)

AssertionError: 

In [None]:
#df_1_m = pl.read_parquet(os.path.join(DATA_DIRECTORY, "processed", "1m_cal_corr_acropolis.parquet"))

In [None]:
# Select the columns to be present in the ICOS cities portal product
selected_columns = ["creation_timestamp", "system_id", "sys_name_short", "gmp343_corrected","h2o_v%", "wxt532_speed_avg", "wxt532_direction_avg"]

df_1_m = df_1_m.select(selected_columns) \
    .rename({"gmp343_corrected": "co2", "h2o_v%":"h2o", "wxt532_speed_avg":"ws","wxt532_direction_avg":"wd"})

In [None]:
df_1_m.tail(1)

# Outlier Removal

In [None]:
df_systems = []

for id in sensor_id:
    df_filtered = df_1_m.filter(pl.col("system_id")==id) \
        .cast({"co2": pl.Float32}) \
        .filter(pl.col("co2") > 0)
    
    # Convert CO2 column to pandas series 
    data = df_filtered.get_column("co2").to_pandas()
      
    # Apply the Hampel filter  
    result = hampel(data, window_size=120, n_sigma=3.0)
    
    # Print share of detected spikes
    print(f"System ID: {id}, Detected spikes: {(len(result.outlier_indices) / len(data)):.4f}")
    
    # Create column "OriginalFlag" = 389 indicating local contamination
    df_system = df_filtered.with_columns((pl.from_pandas(result.filtered_data)).alias("co2_hampel_filtered")) \
        .with_columns(pl.when(pl.col("co2").ne(pl.col("co2_hampel_filtered"))).then(pl.lit(185)).otherwise(pl.lit(0)).alias("OriginalFlag")) \
        .drop("co2_hampel_filtered")
    
    df_systems.append(df_system)
    
df_1_m_spike_detected = pl.concat(df_systems, how="vertical")

# Option to add additional OriginalFlags

In [None]:
df_checkpoint = df_1_m_spike_detected

In [None]:
df_1_m_spike_detected = df_1_m_spike_detected.with_columns(pl.when(pl.col("OriginalFlag") > 0).then(pl.lit('K')).otherwise(pl.lit('O')).alias("Flag"))

# save a 1m product for ICOS cities portal
df_1_m_spike_detected.write_parquet(os.path.join(DATA_DIRECTORY, "processed", "1m_level_1_cities_portal.parquet"))

In [None]:
df_1_m_spike_detected.tail(1)

In [None]:
# save a 1h product for ICOS cities portal
df_1_h_despiked = df_1_m_spike_detected.sort("creation_timestamp") \
        .filter(pl.col("Flag") == 'O') \
        .drop("Flag", "OriginalFlag") \
        .group_by_dynamic("creation_timestamp", every='1h', by=["system_id", "sys_name_short"]) \
        .agg(pl.all().exclude(["creation_timestamp","sys_name_short"]).mean(),
             pl.col("co2").std().alias("Stdev"),
             pl.col("co2").count().alias("NbPoints")) \
        .with_columns(pl.when(pl.col("NbPoints") < 40).then(pl.lit(389)).otherwise(pl.lit(0)).alias("OriginalFlag")) \
        .with_columns(
                pl.when(pl.col("OriginalFlag") > 0).then(pl.lit('K')).otherwise(pl.lit('O')).alias("Flag"),
                (pl.col("creation_timestamp") + timedelta(minutes=30)))
             
df_1_h_despiked.write_parquet(os.path.join(DATA_DIRECTORY, "processed", "1h_level_1_cities_portal.parquet"))

In [None]:
df_1_h_despiked.tail(1)

# Plot Despiked Data with Continous Error Bars

In [None]:
#df_1_h_despiked= pl.read_parquet(os.path.join(DATA_DIRECTORY, "processed", "1h_level_1_cities_portal.parquet"))

In [None]:
start_date = datetime(2024, 7, 1, 0, 0, 0).replace(tzinfo=timezone.utc)
end_date = datetime(2024, 8, 30, 23, 59, 59).replace(tzinfo=timezone.utc)

def create_figure(df, system_name:str, start_date, end_date, color:Literal["red", "blue", "green"]):
    
    df_plot = df.filter(pl.col("creation_timestamp").is_between(start_date, end_date)) \
        .filter(pl.col("sys_name_short")==system_name) \
        .with_columns(upper = pl.col("co2") + pl.col("Stdev"),
                    lower = pl.col("co2") - pl.col("Stdev"))
    
    
    if color=='red':
        color_set = ('#b91c1c', 'rgba(239, 68, 68, 0.3)')
    if color=='blue':
        color_set = ('#1d4ed8','rgba(59, 131, 246, 0.3)')
    if color=='green':
        color_set = ('#15803d','rgba(34, 197, 94, 0.3)')
    
    return [
        go.Scatter(
            name=system_name,
            x=df_plot["creation_timestamp"],
            y=df_plot["co2"],
            mode='lines',
            line=dict(color=color_set[0]),
        ),
        go.Scatter(
            name='Upper Bound',
            x=df_plot["creation_timestamp"],
            y=df_plot["upper"],
            mode='lines',
            marker=dict(color=color_set[1]),
            line=dict(width=0),
            showlegend=False
        ),
        go.Scatter(
            name='Lower Bound',
            x=df_plot["creation_timestamp"],
            y=df_plot["lower"],
            marker=dict(color=color_set[1]),
            line=dict(width=0),
            mode='lines',
            fillcolor=color_set[1],
            fill='tonexty',
            showlegend=False
        )
    ]
  
figures = create_figure(df_1_h_despiked, "acropolis-14", start_date, end_date, color="red") \
    + create_figure(df_1_h_despiked, "acropolis-7", start_date, end_date, color="green") \
    #+ create_figure(df_1_h_despiked, "acropolis-6", start_date, end_date, color="blue")

fig = go.Figure(figures)
fig.update_layout(
    yaxis_title='CO2 (ppm)',
    xaxis_title='UTC Time (hourly aggregated)',
    title='Continuous, variable value error bars',
    hovermode="x"
)
fig.show()