- This notebook uses a runid from the 'Experiments' tab or 'Models' label data.
- This notebook assumes a model (selected from Experiments or Registered Models) have been chosen.
- Change relevant parameters in main_param.py
- Important: every time you change variables in main_param.py you need to detach and reattach the cluster by clicking in 'single node' in the right upper corner of the browser window (next to 'Run all') and selecting Dettach-reattach. Then click run again.

#### Installing required libraries/modules and running required notebooks

In [None]:
# Install packages that are not already part of the Databricks environment

%pip install pyhydroqc==0.0.4
%pip install detecta==0.0.5

In [None]:
%run ./sensor_utilities_inference

In [None]:
# Import libraries

from parameters_inference import site_params,senid, calib_params, logged_model_arima
import os
import pandas as pd
import pyhydroqc
import mlflow
import pandas as pd
import matplotlib.pyplot as plt
import math
from pyhydroqc import anomaly_utilities
from pyhydroqc import calibration
from pyhydroqc import model_workflow
from pyhydroqc import rules_detect
from pyhydroqc import modeling_utilities
from pyhydroqc.model_workflow import ModelType
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from detecta import detect_cusum
import plotly.express as px
from datetime import datetime

In [None]:
#  Gather and clean data from a specified environment and sensor ID. The function doesn't take any arguments and returns two pandas DataFrames: `cleaned_df` and `filtered_df`.

# Get the environment name from the environment variable
env = os.getenv('tfenvironmentnameshort')

# Call the 'call' function to upload the etdl file
etdl_path = call('userupload', 'etdl', env, '/sensor_drift/measurements')

# Clean the data using the 'clean' function
cleaned_df = clean(etdl_path, f"{senid.senid}", f"{senid.start_date}", f"{senid.end_date}")

# Filter the data using the 'filter_dataset' function
filtered_df = filter_dataset(etdl_path, f"{senid.senid}", f"{senid.start_date}", f"{senid.end_date}")

# Set the 'datetime' column as the index of the cleaned DataFrame
cleaned_df = cleaned_df.set_index('datetime')

### Loading water quality measurement data and preprocessing

This section also runs the arima model, including calibration parameters, ranges and persistence

In [None]:
# Call data from blob 

site = 'East Trinity'
sensors = senid.prm
name = f"{senid.senid}_{senid.start_date}_{senid.end_date}"
sensor_array = get_data(sensors= sensors, df = df_2)
for snsr in sensor_array:
    print(snsr + str(sensor_array[snsr]))
    
n_wqp =sensors[0]

In [None]:
# Print selected parameters and check that ranges, windows and parameters make sense

for snsr in sensors:
    print(snsr + str(site_params[site][snsr]))
print('calib' + str(calib_params))

In [None]:
# Apply ranges and persistence preprocessing

range_count = dict()
persist_count = dict()
rules_metrics = dict()
for snsr in sensor_array:
    sensor_array[snsr], range_count[snsr] = pyhydroqc.range_check(df=sensor_array[snsr],
                                                                     maximum=site_params[site][snsr].max_range,
                                                                     minimum=site_params[site][snsr].min_range)
    sensor_array[snsr], persist_count[snsr] = pyhydroqc.persistence(df=sensor_array[snsr],
                                                                       length=120, 
                                                                       output_grp=True)
    sensor_array[snsr] = pyhydroqc.interpolate(df=sensor_array[snsr])
print('Rules based detection complete.\n')

### Model detections

In [None]:
# Get arima detections

arima = dict()
for snsr in sensors:
    arima[snsr] = pyhydroqc.arima_detect(df=sensor_array[snsr], sensor=snsr, params= site_params[site][snsr],
                                              rules=False, plots=False, summary=False, compare=False)

In [None]:
# Aggregate model detections

results_all = dict()
for snsr in sensors:
    models = dict()
    models['arima'] = arima[snsr].df
    results_all[snsr] = pyhydroqc.aggregate_results(df=sensor_array[snsr], 
                                                            models=models, 
                                                            verbose=True)
    
results_all[snsr].reset_index()

In [None]:
# This cell is used to preprocess the data for CUSUM (Cumulative Sum) analyses.

# Get the first sensor's data from the sensor_array dictionary
first_sensor = list(sensor_array.values())[0]

# Reset the index of the 'observed' column and store it in a new DataFrame gg
first_sensor_reset = first_sensor['observed'].reset_index()

# Keep only the 'observed' column in the DataFrame
first_sensor_reset = first_sensor_reset[['observed']]

# Initialize a StandardScaler, which will standardize features by removing the mean and scaling to unit variance
std_scaler = StandardScaler()

# Initialize a MinMaxScaler, which scales and translates each feature individually such that it is in the given range on the training set, i.e between zero and one.
mmscaler = MinMaxScaler()

# Apply the StandardScaler to the 'observed' column and convert the result to a numpy array
df_scaled = std_scaler.fit_transform(first_sensor_reset.to_numpy())

# Convert the scaled data back to a DataFrame and name the column 'observed'
df_scaled = pd.DataFrame(df_scaled, columns=['observed'])

In [None]:
# Run cusum analyses

if f"{n_wqp}" == 'ph':
    ta, tai, taf, amp = detect_cusum(df_scaled, 1, 0.4, True, True)
    
elif f"{n_wqp}" == 'cond':
    ta, tai, taf, amp = detect_cusum(df_scaled, 1, 0.1, True, True)

elif f"{n_wqp}" == 'temp':
    ta, tai, taf, amp = detect_cusum(df_scaled, 1, 0.001, True, True)
    
start_df = pd.DataFrame(tai, columns= ['start'])
end_df = pd.DataFrame(taf,  columns= ['finish'])

end = start_df.join(end_df)
sensor_df = sensor_array[snsr].copy()
test_df = pd.DataFrame(results_all[snsr]).reset_index()

start = test_df[['datetime']].rename(columns = {'datetime':'start'}).reset_index()

final = test_df[['datetime']].rename(columns = {'datetime':'final'}).reset_index()


# Write results back to lake

intermediate = pd.concat([start, final], axis=1)

intermediate_df = intermediate[['start', 'final']]
spark.createDataFrame(intermediate_df).write.mode("overwrite").parquet(f'abfss://lake-userupload@etdllake{env}.dfs.core.windows.net/sensor_drift/training/{name}_timestamp')

results_df = sensor_array[snsr].reset_index()
results_df = results_df.rename(columns={"raw": f"{n_wqp}"})
spark.createDataFrame(results_df).write.mode("overwrite").parquet(f'abfss://lake-userupload@etdllake{env}.dfs.core.windows.net/sensor_drift/training/{name}_timestamp')

### R section

In [None]:
%r

library(sparklyr)
library(glue)
library(dplyr)
library(lubridate)
install.packages(c("mltest", "mlflow", "reticulate"))
library(mlflow)
library(mltest)
library(caret)
library(tidyr)
library(reticulate)
library(ggplot2)

#### Read files produced with Python (above), variables defined by user, and files containing calibration, tides and rain data

In [None]:
%r

# Call files produced with Python. These were defined in main_param.py

env = Sys.getenv('tfenvironmentnameshort')
sc <- spark_connect(method = "databricks")
file_path <-  glue('abfss://lake-userupload@etdllake{env}.dfs.core.windows.net/sensor_drift/rain/') 
file_path3 <-  glue('abfss://lake-userupload@etdllake{env}.dfs.core.windows.net/sensor_drift/cal_tps/')



rainr_spark <- spark_read_parquet(sc, 
                            path = file_path,
                            header = TRUE,
                            infer_schema = TRUE)

cal_spark <- spark_read_parquet(sc, 
                            path = file_path3,
                            header = TRUE,
                            infer_schema = TRUE)
rainr<-collect(rainr_spark)

calr<-collect(cal_spark)


file_path5 <- glue('abfss://lake-userupload@etdllake{env}.dfs.core.windows.net/sensor_drift/tides_r/')


tides_r_spark <- spark_read_parquet(sc, 
                            path = file_path5,
                            header = TRUE,
                            infer_schema = TRUE)

tides_r<- collect(tides_r_spark)

In [None]:
%r

# Import a Python module. # Call parameters defined by user (location, sensor, model). These were defined in parameters_inference.py

py2 <- import("main_param")  
senid <- py2$senid

parameters<- glue("{senid}_{py2$start_date}_{py2$end_date}")
meas<- py2$prm
meas<-meas[1]
m2<-py2$logged_model_arima

path_inference <-  glue('abfss://lake-userupload@etdllake{env}.dfs.core.windows.net/sensor_drift/inference')

name<-parameters

sensor_b<-senid

r<-meas

model<-m2

In [None]:
%r

#detections from model
model_detections_csv<- spark_read_csv(sc, name = "test_table",  path = glue('{path_inference}/{name}_test.csv'), header=TRUE)
model_detections <- collect(model_detections_csv)

#detections from cusum
cusum_detections <- spark_read_csv(sc, name = "test_table",  path = glue('{path_inference}/{name}_timestamp.csv'), header=TRUE)
cusum_intervals <- collect(cusum_detections)

In [None]:
%r

# Loop over each date and interval to look for false positives

false_positives- split(cusum_intervals, factor(cusum_intervals$start))
fin <- vector("list", length(dataset_1))
for (i in seq_along(dataset_1)) {
  
  fin[[i]]<- interval((false_positives[[i]]$start), (false_positives[[i]]$final))
}



#### Merging detections from model with cusum analyses

In [None]:
%r
# Identify detections from the model that are within the cusum detections
selected_drift <- pickle_data$datetime %within% fin

# Convert to a dataframe
selected_drift_df <- as.data.frame(selected_drift)

# Filter for TRUE values
true_values <- selected_drift_df %>% filter(selected_drift_df == TRUE)

# Combine the original data with the selected drift data
combined_data <- cbind(pickle_data, selected_drift_df)
combined_data$d_event <- as.logical(combined_data$detected_event)

# Read the CSV file into a Spark dataframe
spark_df <- spark_read_csv(sc, name = "test_table",  path = glue('{path_inference}/{name}_qual.csv'), header=TRUE)
collected_data <- collect(spark_df)

# Select relevant columns and convert datetime to datetime format
quality_data <- collected_data %>% select(datetime, contains(glue('{r}_qual'))) %>% mutate(datetime = as_datetime(datetime))

# Join the combined data with the quality data
merged_data <- left_join(combined_data, quality_data)

# Create a new column to indicate if a drift was detected by either the model or cusum
model_cusum <- merged_data %>% mutate(drift_model_cusum = ifelse(d_event == 'TRUE' | sel_drift == 'TRUE', "TRUE", "FALSE"))
model_cusum <- model_cusum %>% mutate(date = as_date(datetime))

# Label the data according to requirements: '82' for doubtful and '83' for anomaly
model_cusum <- model_cusum %>% mutate(sensor_qual= ifelse(d_event == 'TRUE' & sel_drift == 'TRUE', '83',
                                                      ifelse((d_event == 'FALSE' & sel_drift == 'TRUE') | (d_event == 'TRUE' & sel_drift == 'FALSE') , '82',  model_cusum[,4])))
# Filter for TRUE values in the drift_model_cusum column
filtered_data <- model_cusum %>% filter(drift_model_cusum == 'TRUE')

# Group by sensor_qual and count the number of occurrences
grouped_data <- model_cusum %>% group_by(sensor_qual) %>% summarise(n=n())

In [None]:
%r


# This cell is used to filter the tide data to account for false positives.

# Add a 'date' column to 'tides_r', which is the 'time' column converted to date format
# Group by 'date' and filter to keep only the rows where 'tide2' is 'HH' or 'LL'
# Then, ungroup the data
tides_with_date_df <- tides_r %>%
  mutate(date = as_date(time)) %>%
  group_by(date) %>%
  filter(tide2 == 'HH' | tide2 == 'LL') %>%
  ungroup()

# Filter 'tides_with_date_df' to keep only the rows where 'level' is less than -1.2
low_tide_levels_df <- tides_with_date_df %>% filter(level < -1.2)

#max 2.14
#min -1.76
#median 0.132

In [None]:
%r

# Include rain to account for false positives

# Convert MeasurementDTTM to datetime format and extract the date
rain_data = rainr %>%
  mutate(data_timestamp = ymd_hms(MeasurementDTTM)) %>%
  mutate(date = floor_date(data_timestamp, unit = "day")) %>%
  mutate(date = as_date(date)) %>%
  select(-Location)

# Filter out NA values and select relevant columns
filtered_rain_data = rainr %>%
  filter(ramount_24hrtot != 'NA') %>%
  mutate(date = as_date(MeasurementDTTM)) %>%
  select(date, ramount_24hrtot)

# Join the original and filtered rain data
joined_rain_data = full_join(filtered_rain_data, rain_data)

# Convert datetime to date format in the model_cusum dataframe
model_cusum = model_cusum %>% mutate(date = as_date(datetime))

# Join the model_cusum and joined_rain_data dataframes
rain_event_data = left_join(model_cusum, joined_rain_data, by = 'date')

# Create a new column to indicate if a drift was detected by either the model or cusum and there was more than 10mm of rain
rain_event_data = rain_event_data %>%
  mutate(rain = ifelse((sensor_qual == '82' | sensor_qual == '83') & ramount_24hrtot > 10, "FP", rain_event_data$drift_model_cusum))

In [None]:
%r

# Accounting for calibration

# Filter the calibration data for the current sensor
sensor_calibration_data <- calr %>% filter(series == glue('{sensor_b}'))

# Split the data by calibration start time
split_calibration_data <- split(sensor_calibration_data, factor(sensor_calibration_data$calibration_starttime))

# Initialize a list to store the calibration intervals
calibration_intervals <- vector("list", length(split_calibration_data))

# Loop through the split data and create intervals for each calibration period
for (i in seq_along(split_calibration_data)) {
  calibration_intervals[[i]] <- interval((split_calibration_data[[i]]$calibration_starttime), (split_calibration_data[[i]]$calibration_endtime))
}

# Check if the datetime in the pickle data is within the calibration intervals
calibration_status <- pickle_data$datetime %within% calibration_intervals

# Convert to a dataframe
calibration_status_df <- as.data.frame(calibration_status)

# Filter for TRUE values
calibration_true_values <- calibration_status_df %>% filter(calibration_status_df == TRUE)

# Combine the model_cusum data with the calibration status data
combined_calibration_data <- cbind(model_cusum, calibration_status_df)

# Convert detected_event to logical
combined_calibration_data$d_event <- as.logical(combined_calibration_data$detected_event)

# Filter for TRUE values in the d_event column
detected_event_true_values <- combined_calibration_data %>% filter(d_event == TRUE)

# Create a new column to indicate if a drift was detected by either the model or cusum and there was a calibration
calibration_drift_data <- combined_calibration_data %>%
  mutate(drift_cal = ifelse((sensor_qual == '82' | sensor_qual == '83') & calibration_status_df == "TRUE", "FP", combined_calibration_data$d_event))

# Add a date column
calibration_drift_data <- calibration_drift_data %>%
  mutate(date = as_date(datetime))

In [None]:
%r

# Accounting for tidal changes

# Select date and level columns
tidal_data <- tides_with_date_df %>% select(date, level)

# Filter for extreme tide levels and select relevant columns
filtered_tidal_data <- tides_with_date_df %>% filter(level < -1.3 | level > 1.5) %>% select(date, level, tide)

# Select relevant columns from model_cusum
model_data <- tides_with_date_df %>% select(date, labeled_anomaly, drift_model_cusum, sensor_qual)

# Join model_data and filtered_tidal_data
joined_tidal_data <- left_join(model_data, filtered_tidal_data)

# Filter for non-NA tide values and true drift_model_cusum values
filtered_joined_tidal_data <- joined_tidal_data %>% filter(tide != 'NA' & drift_model_cusum == 'TRUE')

# Create a new column to indicate if a drift was detected by either the model or cusum and there was an extreme tide
tidal_drift_data <- joined_tidal_data %>% mutate(tide_drift = ifelse((sensor_qual == '82' | sensor_qual == '83') & (level < -1.3 | level > 1.5), 'FALSE', drift_model_cusum))

# Filter for true drift_model_cusum values and non-NA tide_drift values
filtered_tidal_drift_data <- tidal_drift_data %>% filter(drift_model_cusum == 'TRUE' & tide_drift != 'NA')

# Get unique dates
unique_dates <- filtered_tidal_drift_data %>% group_by(date) %>% unique()

# Check if the date in the model_cusum dataframe is within the unique_dates
date_status <- calibration_drift_data$date %in% unique_dates$date

# Convert to a dataframe
date_status_df <- as.data.frame(date_status)

# Combine the model_cusum data with the date_status data
combined_date_data <- cbind(calibration_drift_data, date_status_df)

# Create a new column to indicate if a drift was detected by either the model or cusum and there was an extreme tide
tide_drift_data <- combined_date_data %>% mutate(tides = ifelse((sensor_qual == '82' | sensor_qual == '83') & date_status_df == 'TRUE', 'FP', drift_model_cusum))

# Merging all together

# Select relevant columns
tide_data <- tide_drift_data %>% select(datetime, tides)
rain_data <- rain %>% select(datetime, rain)
calibration_data <- cali %>% select(datetime, drift_cal)

# Merge tide_data and rain_data
merged_data <- merge(tide_data, rain_data, by = 'datetime')

# Merge merged_data and calibration_data
merged_all_data <- merge(merged_data, calibration_data, by = 'datetime')

# Filter for FP values in the tides, rain, or drift_cal columns
filtered_merged_data <- merged_all_data %>% filter(tides == 'FP' | rain == 'FP' | drift_cal == 'FP') %>% group_by(datetime) %>% slice_head() %>% ungroup() %>% mutate(sensor_qual = 'FALSE') %>% select(datetime, sensor_qual)

#### Wrangling and writing back to Blob Storage

In [None]:
%r

# Select relevant columns and join with 'todo' dataframe
selected_calibration_data <- calibration_drift_data %>% select(2:5, 14,15, 16) %>% left_join(todo) %>% ungroup()

# Replace NA values in sensor_qual column with values from labeled_anomaly column
updated_calibration_data <- selected_calibration_data %>% mutate(sensor_qual = ifelse(is.na(sensor_qual), labeled_anomaly, sensor_qual))

# Filter for TRUE values in the drift_model_cusum column
filtered_calibration_data <- updated_calibration_data %>% filter(drift_model_cusum == 'TRUE')

# Rename columns starting with "cond", "ph", and "temp" to "EC", "pH", and "WaterTemp" respectively
renamed_calibration_data <- updated_calibration_data %>% 
                            rename_if(startsWith(names(.), "cond"), ~paste0("EC")) %>% 
                            rename_if(startsWith(names(.), "ph"), ~paste0("pH")) %>% 
                            rename_if(startsWith(names(.), "temp"), ~paste0("WaterTemp"))

In [None]:
%r

# Plotting the data
plot_data <- ggplot(df3, aes(x=datetime, y=df3[,2], col=sensor_qual)) + geom_point(size=1)

# Renaming the second column of df3
sensor_name <- names(df3[2])

# Renaming columns starting with "sensor" to "{sensor_name}_quality" and adding a new column "Location_code"
quality_data <- df3 %>% 
                rename_if(startsWith(names(.), "sensor"), ~paste0(glue('{sensor_name}_quality'))) %>%
                mutate(Location_code = sensor_b)

# Gathering the "{sensor_name}_quality" column into two columns: "measurement_quality" and "measurement_quality_value"
quality_gathered_data <- gather(quality_data, measurement_quality, measurement_quality_value, glue('{sensor_name}_quality'))

# Gathering the "{sensor_name}" column into two columns: "measurement" and "measurement_value"
measurement_gathered_data <- gather(quality_gathered_data, measurement, measurement_value, glue('{sensor_name}'))

# Adding a new column "model" with the value of "{model}"
final_data <- measurement_gathered_data %>% mutate(model= glue('{model}') )

In [None]:
%r

# Write to lake

filename = 'test'

if(nrow(df6) == 0){
  print("This data frame is empty")
}else{
  d3 <- copy_to(sc, df6, overwrite= TRUE)

  spark_write_parquet(
  d3,
  glue('abfss://lake-raw@etdllake{env}.dfs.core.windows.net/sensor_drift/inference'),
  header = TRUE,
  delimiter = ",",
  quote = "\"",
  escape = "\\",
  charset = "UTF-8",
  null_value = NULL,
  options = list(),
  partition_by = NULL, mode = "append"
)
}