In [1]:
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error
import numpy as np
import time
import json
import sys

In [1]:
import dagstermill

# Execution context obtained from dagstermill; it carries the op config and resources.
context = dagstermill.get_context()

# Op configuration values driving this run.
op_config = context.op_config
date = op_config["date"]
metric = op_config["metric"][0]  # only the first entry of the "metric" config is used
notebook_name = op_config["notebook_name"]
tag = op_config["tag"]

# Resources exposed on the context.
resources = context.resources
checkpoint = resources.checkpoint
skyminer_api = resources.skyminer_api

# Value recorded by the checkpoint resource for this notebook / metric / tag
# (presumably the last processed timestamp -- TODO confirm against the resource).
start_absolute = checkpoint.get(notebook_name, metric, tag=tag)

In [None]:
def generate_query(metrics, start_absolute, end_absolute="", tag=None):
    """Build the JSON query string for a list of metrics.

    Args:
        metrics: Iterable of metric names. One ``{"name": ...}`` entry is emitted
            per name, in iteration order.
        start_absolute: Value stored under ``"start_absolute"`` (serialized as given).
        end_absolute: Optional value stored under ``"end_absolute"``. The key is
            omitted when this is falsy (the default empty string).
        tag: Optional mapping of tag name to tag value. Every value is wrapped in a
            one-element list and the resulting ``"tags"`` filter is attached to each
            metric. ``None`` or an empty mapping means no ``"tags"`` key is emitted.

    Returns:
        str: The query serialized as JSON with a 4-space indent.
    """
    # A None default avoids sharing one mutable dict between calls and lets
    # callers pass tag=None explicitly to mean "no tag filter".
    tag = tag or {}

    metrics_part = []
    for metric in metrics:
        metric_dict = {"name": metric}
        if tag:
            # Build a fresh dict per metric so entries never share state.
            metric_dict["tags"] = {key: [value] for key, value in tag.items()}
        metrics_part.append(metric_dict)

    query_dict = {
        "metrics": metrics_part,
        "time_zone": "UCT",
        "start_absolute": start_absolute
    }
    # Only bound the query on the right when an end was actually supplied.
    if end_absolute:
        query_dict["end_absolute"] = end_absolute

    return json.dumps(query_dict, indent=4)

In [None]:
# Query the observed metric together with its prediction and the lower/upper
# bound series derived from it.
metrics = [
    metric,
    metric + ".prediction_ML_dagster",
    metric + ".prediction_ML_LB_dagster",
    metric + ".prediction_ML_UB_dagster",
]

# The query window starts 24 hours before now. The timestamp is in epoch
# milliseconds; truncate it to an integer so the serialized query carries a
# whole number rather than a fractional float.
SECONDS_PER_DAY = 24 * 3600
one_day_ago_ms = int((time.time() - SECONDS_PER_DAY) * 1000)
skyminer_query = generate_query(metrics, start_absolute=one_day_ago_ms, tag=tag)


In [None]:
# Fetch the queried series as a single frame.
df = skyminer_api.get_data(skyminer_query, "ONE_ROW_PER_TIMESTAMP")

# Every column read below must be present; checking only the prediction column
# would let a missing metric or bound column surface later as a KeyError.
lower_col = metric + ".prediction_ML_LB_dagster"
upper_col = metric + ".prediction_ML_UB_dagster"
required_cols = [metric, metric + ".prediction_ML_dagster", lower_col, upper_col]
if any(col not in df.columns for col in required_cols):
    sys.exit('No data available for the previous day')

# Put all series on a regular 10-second grid; gaps are filled backward first,
# then forward for any trailing holes.
data = df.resample("10s").first().bfill().ffill()

# 'anomaly' holds the metric value where it falls outside [lower, upper] and
# NaN everywhere else.
outside_band = (data[metric] < data[lower_col]) | (data[metric] > data[upper_col])
data['anomaly'] = np.where(outside_band, data[metric], np.nan)

In [None]:
# Plot the observed metric, its prediction, the confidence band and the
# flagged anomalies on one figure.
fig, ax = plt.subplots(figsize=(10, 6))

upper_band = data[metric + '.prediction_ML_UB_dagster']
lower_band = data[metric + '.prediction_ML_LB_dagster']

ax.plot(data.index, data[metric], label='metric', color='blue')
ax.plot(data.index, data[metric + '.prediction_ML_dagster'], label='prediction', color='red')

# Shaded band between the bounds, with dashed outlines on both edges.
ax.fill_between(data.index, upper_band, lower_band,
                color='green', alpha=0.2, label='Confidence Interval')
ax.plot(data.index, upper_band, color='green', linestyle='--')
ax.plot(data.index, lower_band, color='green', linestyle='--')

# 'anomaly' stores the metric value at anomalous samples and NaN elsewhere, so
# anomalies are the non-null entries. Comparing against 1 would only match
# samples whose metric value happens to equal 1.
anomaly_mask = data['anomaly'].notna()
ax.scatter(data.index[anomaly_mask], data[metric][anomaly_mask],
           color='red', label='Anomalies', s=80)

ax.set_title('Predictions and confidence interval with Anomalies')
ax.set_xlabel('Index')
ax.set_ylabel('Valeurs')
ax.legend(title='Légende')

plt.tight_layout(pad=3.0)

plt.show()

In [None]:
# count() tallies the non-null entries of the 'anomaly' column.
flagged = data['anomaly']
num_anomalies = flagged.count()
print(f"Number of anomalies on the last day : {num_anomalies}")
def calculate_smape(y_true, y_pred):
    """Return the symmetric mean absolute percentage error, in percent.

    Each term is ``|y_true - y_pred| / ((|y_true| + |y_pred|) / 2)``; the result
    is the mean of those terms multiplied by 100.

    Args:
        y_true: Observed values (NumPy array or pandas Series).
        y_pred: Predicted values, same shape as ``y_true``.

    Returns:
        The SMAPE score as a percentage.
    """
    numerator = np.abs(y_true - y_pred)
    denominator = (np.abs(y_true) + np.abs(y_pred)) / 2
    # The denominator is zero only when both values are zero, i.e. a perfect
    # prediction. Dividing by 1 there makes the term 0 instead of the NaN that
    # 0/0 would produce (which would poison or silently drop the sample).
    safe_denominator = np.where(denominator == 0, 1.0, denominator)
    smape = np.mean(numerator / safe_denominator) * 100
    return smape

def calculate_rmse(y_true, y_pred):
    """Return the square root of ``mean_squared_error(y_true, y_pred)``."""
    return np.sqrt(mean_squared_error(y_true, y_pred))

# Score the prediction column against the observed metric.
y_true, y_pred = data[metric], data[metric + ".prediction_ML_dagster"]

rmse_score = calculate_rmse(y_true, y_pred)
smape_score = calculate_smape(y_true, y_pred)

print(f"Average percentage error (SMAPE) : {smape_score}%")
print(f"RMSE score : {round(rmse_score,3)}")