In [1]:
from pathlib import Path
from typing import Tuple
import pandas as pd

In [2]:
pd.set_option('future.no_silent_downcasting', True)
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

In [3]:
# Load the pre-cleaned readings from the parquet file; the path is relative to
# the notebook's working directory.
data_path = Path("../data/preprocessed_data/water_consumption_cleaned_0.parquet")
df = pd.read_parquet(data_path)
# Preview the first rows to confirm the expected columns are present.
df.head()

Unnamed: 0,timestamp,flow_in_(l/s),reservoir_level_(%),pressure_(mca),gmb_1_is_on,gmb_2_is_on
0,2023-03-17 11:27:06,68.59,29.86,38.2,0,1
1,2023-03-17 12:28:56,66.05,35.86,38.2,0,1
2,2023-03-17 12:31:26,65.64,36.16,38.06,0,1
3,2023-03-17 12:33:56,65.64,36.5,38.03,0,1
4,2023-03-17 12:36:26,65.64,36.8,38.17,0,1


In [4]:
def create_necessary_columns(original_df: pd.DataFrame, reservoir_total_capacity: int = 1_000_000) -> Tuple[pd.DataFrame, pd.Series]:
    """Derive the volume and elapsed-time columns used by the later repair step.

    Readings whose ``reservoir_level_(%)`` is exactly ``0.0`` are replaced by
    the previous reading (forward fill). The rows that were replaced are
    reported through the returned mask so a later step can repair them.

    Args:
        original_df: Frame with at least ``timestamp`` (datetime),
            ``flow_in_(l/s)`` and ``reservoir_level_(%)`` columns. It is not
            modified; the function works on a copy.
        reservoir_total_capacity: Volume that corresponds to a 100 % level,
            used to convert the percentage into ``reservoir_level_liters``.

    Returns:
        A tuple ``(df, empty_reservoir_mask)`` where ``df`` is the copy with
        the added columns ``reservoir_level_liters``, ``time_passed_seconds``,
        ``liters_should_have_entered`` and ``liters_entered``, and
        ``empty_reservoir_mask`` is a boolean Series marking the rows whose
        original level was ``0.0``.
    """
    df = original_df.copy()
    empty_reservoir_mask = df['reservoir_level_(%)'] == 0.0

    # Blank the zero readings and carry the previous level forward.
    # ``mask`` writes NaN, so the column stays numeric: no round-trip through
    # object dtype, no dependence on the global
    # ``future.no_silent_downcasting`` option, and a leading zero (which has
    # nothing to fill from) ends up as a plain float NaN.
    df['reservoir_level_(%)'] = df['reservoir_level_(%)'].mask(empty_reservoir_mask).ffill()
    df['reservoir_level_liters'] = df['reservoir_level_(%)'] * reservoir_total_capacity / 100

    # Seconds elapsed since the previous row (NaN for the first row).
    df['time_passed_seconds'] = df['timestamp'].diff().dt.total_seconds()
    # Volume the measured inflow would have delivered over that interval.
    df['liters_should_have_entered'] = df['time_passed_seconds'] * df['flow_in_(l/s)']
    # Change in stored volume since the previous row.
    df['liters_entered'] = df['reservoir_level_liters'].diff()

    return df, empty_reservoir_mask


def fix_data_capture_errors(original_df: pd.DataFrame, empty_reservoir_mask: pd.Series, reservoir_level_capacity: int = 1_000_000) -> pd.DataFrame:
    """Repair rows whose level reading was zero and derive the outflow columns.

    The function runs in three steps:

    1. For every row flagged in ``empty_reservoir_mask`` that has a following
       row, the volume the following row gained beyond what its inflow
       explains is moved onto the flagged row, and the flagged row's inflow
       and reservoir level (liters and percent) are recomputed from it.
    2. Wherever the stored volume still grew by more than the inflow accounts
       for, the inflow is raised so that both figures agree.
    3. ``liters_out`` and ``flow_out_(l/s)`` are computed, numeric columns are
       rounded to 2 decimals, and the index is exposed as an ``id`` column.

    Args:
        original_df: Frame carrying ``flow_in_(l/s)``, ``reservoir_level_(%)``,
            ``reservoir_level_liters``, ``time_passed_seconds``,
            ``liters_should_have_entered`` and ``liters_entered``. It is not
            modified. The index must be the default 0..n-1 range, because the
            following row is addressed by the label ``idx + 1``.
        empty_reservoir_mask: Boolean Series, aligned with ``original_df``,
            marking the rows to repair in step 1.
        reservoir_level_capacity: Volume that corresponds to a 100 % level.

    Returns:
        A new frame with the repaired values plus ``id``, ``liters_out`` and
        ``flow_out_(l/s)`` columns.
    """
    df = original_df.copy()

    # Step 1: sequential on purpose, since consecutive flagged rows feed each other.
    for idx in df[empty_reservoir_mask].index:
        if idx + 1 < len(df):
            # Gain on the next row that its own inflow cannot explain.
            level_difference = df.loc[idx + 1, 'liters_entered'] - df.loc[idx + 1, 'liters_should_have_entered']

            # Move that surplus from the next row onto the flagged row.
            df.loc[idx, 'liters_entered'] = level_difference
            df.loc[idx, 'liters_should_have_entered'] = level_difference
            df.loc[idx + 1, 'liters_entered'] -= level_difference

            # Inflow on the flagged row consistent with the moved volume.
            df.loc[idx, 'flow_in_(l/s)'] = df.loc[idx, 'liters_entered'] / df.loc[idx, 'time_passed_seconds']

            # Rebuild the flagged row's level in liters and then in percent.
            # The percentage belongs to this row's '(%)' column; it used to be
            # written into the NEXT row's liters column, which overwrote a
            # valid measurement with a value on the wrong scale.
            df.loc[idx, 'reservoir_level_liters'] += level_difference
            df.loc[idx, 'reservoir_level_(%)'] = df.loc[idx, 'reservoir_level_liters'] / reservoir_level_capacity * 100

    # Step 2: rows are independent here, so the correction is vectorised.
    gained_more_than_inflow = df['liters_should_have_entered'] < df['liters_entered']
    elapsed = df.loc[gained_more_than_inflow, 'time_passed_seconds']
    corrected_flow = df.loc[gained_more_than_inflow, 'liters_entered'] / elapsed
    df.loc[gained_more_than_inflow, 'liters_should_have_entered'] = elapsed * corrected_flow
    df.loc[gained_more_than_inflow, 'flow_in_(l/s)'] = corrected_flow

    # Step 3: what came in but was not stored must have left the reservoir.
    df['liters_out'] = df['liters_should_have_entered'] - df['liters_entered']
    df['flow_out_(l/s)'] = df['liters_out'] / df['time_passed_seconds']
    df = df.round(2).reset_index().rename(columns={'index': 'id'})

    return df

In [5]:
# Run the two processing steps: derive the helper columns, then repair the
# zero-level rows and compute the outflow columns.
# NOTE(review): this cell rebinds `df`, so it is not idempotent. Re-run the
# load cell before executing it a second time.
df, empty_reservoir_mask = create_necessary_columns(df)
df = fix_data_capture_errors(df, empty_reservoir_mask)
# Preview the first five processed rows.
df.head(5)

Unnamed: 0,id,timestamp,flow_in_(l/s),reservoir_level_(%),pressure_(mca),gmb_1_is_on,gmb_2_is_on,reservoir_level_liters,time_passed_seconds,liters_should_have_entered,liters_entered,liters_out,flow_out_(l/s)
0,0,2023-03-17 11:27:06,68.59,29.86,38.2,0,1,298600.0,,,,,
1,1,2023-03-17 12:28:56,66.05,35.86,38.2,0,1,358600.0,3710.0,245045.5,60000.0,185045.5,49.88
2,2,2023-03-17 12:31:26,65.64,36.16,38.06,0,1,361600.0,150.0,9846.0,3000.0,6846.0,45.64
3,3,2023-03-17 12:33:56,65.64,36.5,38.03,0,1,365000.0,150.0,9846.0,3400.0,6446.0,42.97
4,4,2023-03-17 12:36:26,65.64,36.8,38.17,0,1,368000.0,150.0,9846.0,3000.0,6846.0,45.64


In [None]:
# Persist the processed frame as parquet for later use.
# NOTE(review): assumes the ../data/cleaned_data directory already exists — confirm.
df.to_parquet('../data/cleaned_data/water_consumption_curated_1.parquet')