In [1]:
from datetime import datetime
from typing import Mapping

import numpy as np

import milvus

ModuleNotFoundError: No module named 'milvus'

# Minimalist system description

In [3]:
class Sensors(milvus.Record):
    """One sensor reading: a timestamp plus temperature and fan speed.

    Used as the ``value_type`` of the ``sensor_readings`` stream and built by
    ``ingest_sensor_data`` from raw REST events.
    """

    # Time of the reading; ingest_sensor_data fills this via milvus.parse_time.
    timestamp: datetime
    # Measured temperature. Units are not stated in this notebook — TODO confirm.
    temperature: float
    # Measured fan speed, presumably in revolutions per minute — TODO confirm.
    fan_rpm: float


# Raw sensor events come from a REST endpoint created with poll=True.
sensor_api = milvus.data_sources.REST(
    "example.com/sensors",
    poll=True,
)

# Stream of Sensors records; written to by ingest_sensor_data.
sensor_readings = milvus.stream(
    "sensor_readings",
    value_type=Sensors,
)


@milvus.node(inputs=[sensor_api], outputs=[sensor_readings])
async def ingest_sensor_data(self, sensor_events):
    """Turn raw sensor events into ``Sensors`` records.

    For every event received from ``sensor_events``, the event's
    ``"strange_timetamp"`` entry is parsed with ``milvus.parse_time`` and a
    ``Sensors`` record is yielded, built from all of the event's entries plus
    the parsed value as ``timestamp``.

    NOTE(review): this node takes ``self`` as its first parameter while
    ``check_rpm`` takes only its input stream — confirm which signature
    ``milvus.node`` expects.
    NOTE(review): the key is spelled ``"strange_timetamp"``; verify against the
    actual API payload whether ``"strange_timestamp"`` was intended.
    NOTE(review): ``**raw_event`` forwards every key of the event (including
    the raw timestamp key) to ``Sensors``; an event that already contains a
    ``"timestamp"`` key would raise ``TypeError`` for a duplicate keyword.
    """
    async for raw_event in sensor_events:
        parsed_timestamp = milvus.parse_time(raw_event["strange_timetamp"])
        yield Sensors(**raw_event, timestamp=parsed_timestamp)

In [5]:
class Machine(milvus.Record):
    """Static properties of a monitored machine.

    Used as the ``value_type`` of the ``machine_properties`` context.
    """

    # Name of the machine.
    name: str
    # Presumably maps temperature -> target fan RPM (check_rpm evaluates it at
    # a reading's temperature) — TODO confirm key/value meaning and units.
    fan_control_curve: Mapping[float, float]
    # Temperature threshold for alerting; not read by any code visible in
    # this part of the notebook.
    alert_temperature: float


# Context carrying a Machine record; read by the check_rpm monitoring node.
machine_context = milvus.context(
    "machine_properties",
    value_type=Machine,
)

# Slotting in a predictive model

In [6]:
# Output stream of the model defined below; values are declared as floats.
predicted_temperatures = milvus.stream("predicted_temperatures", value_type=float)

# Contexts handed to the model as its `config` and `artifacts` arguments.
model_config = milvus.context("model_config", value_type=dict)
model_artifacts = milvus.context("model_artifacts")

# Named preprocessing stages. Individual stages can be looked up by key, e.g.
# preprocessing_pipeline["clean_data"] is used as the input of check_rpm.
preprocessing_pipeline = milvus.Sequential(
    {
        "add_history": milvus.Windowed(minutes=10),
        "clean_data": milvus.Filter(
            # Predicate is true for windows with more than 10 entries that
            # contain no NaN values.
            lambda window: len(window) > 10 and not np.any(np.isnan(window))
        ),
        "whiten": milvus.Whiten(),
    }
)

# Model fed by the preprocessed sensor readings, writing to
# predicted_temperatures.
# NOTE(review): the model is named as a classifier ("is ... bad") but its
# output stream is float-valued "predicted_temperatures" — confirm which is
# intended.
is_next_temperature_bad = milvus.Model(
    "is_next_temperature_bad",
    model=milvus.models.MagicTemperatureClassificationModel,
    inputs=preprocessing_pipeline(sensor_readings),
    outputs=predicted_temperatures,
    config=model_config,
    artifacts=model_artifacts,
)

# Monitoring

In [7]:
# Stream of human-readable alert messages; check_rpm writes to it.
system_alerts = milvus.stream(
    "system_alerts",
    value_type=str,
)

# Thresholds used by check_rpm:
#   acceptable_deviation_ratio       - ratio above which a single reading
#                                      counts as a deviation
#   acceptable_deviations_per_window - number of deviations per window that
#                                      must be exceeded before alerting
rpm_monitoring_config = milvus.context(
    "rpm_monitoring_config",
    default={
        "acceptable_deviation_ratio": 0.2,
        "acceptable_deviations_per_window": 2,
    },
)


@milvus.node(
    inputs=[preprocessing_pipeline["clean_data"]],
    contexts=[machine_context, rpm_monitoring_config],
    outputs=[system_alerts],
)
async def check_rpm(windows):
    """Alert when too many readings in a window have an off-curve fan speed.

    For each window received from ``windows``, counts the readings whose fan
    RPM deviates from the machine's fan control curve by more than
    ``rpm_monitoring_config["acceptable_deviation_ratio"]`` (relative to the
    expected RPM). When that count exceeds
    ``rpm_monitoring_config["acceptable_deviations_per_window"]``, yields the
    alert string ``"Too many deviations!"``.

    Args:
        windows: async iterable of windows; each window is iterated to obtain
            readings exposing ``temperature`` and ``fan_rpm`` (presumably
            ``Sensors`` records — TODO confirm what the "clean_data" stage
            emits).

    Yields:
        str: one alert message per window that has too many deviations.
    """

    def is_rpm_bad(sensor_state):
        """Return True when this reading's fan RPM is too far off the curve."""
        # NOTE(review): Machine.fan_control_curve is annotated as
        # Mapping[float, float] but is called like a function here; confirm
        # whether the context exposes a callable/interpolated curve or whether
        # a lookup is required instead.
        expected_rpm = machine_context.fan_control_curve(sensor_state.temperature)
        # The Sensors record names this field `fan_rpm` (there is no `rpm`).
        rpm_deviation = abs(sensor_state.fan_rpm - expected_rpm)
        if expected_rpm == 0:
            # No relative deviation can be computed; any spinning fan is off-curve.
            return rpm_deviation > 0
        # Deviation is measured relative to the expected (target) RPM.
        return (
            rpm_deviation / expected_rpm
            > rpm_monitoring_config["acceptable_deviation_ratio"]
        )

    async for window in windows:
        num_deviations = sum(is_rpm_bad(sensor_state) for sensor_state in window)
        if num_deviations > rpm_monitoring_config["acceptable_deviations_per_window"]:
            yield "Too many deviations!"

# Deployment