# `Reaqtor.IoT`

Notebook equivalent of the Playground console application.

## Reference the library

We'll just import the entire console application to get the transitive closure of referenced assemblies.

In [None]:
#r "bin/Debug/net6.0/Reaqtor.IoT.dll"

## (Optional) Attach a debugger

If you'd like to step through the source code of the library while running samples, run the following cell and follow the instructions to attach a debugger (e.g. Visual Studio). Then navigate to the source code of the library to set breakpoints.

In [None]:
System.Diagnostics.Debugger.Launch();

## Import some namespaces

In [None]:
using System;
using System.Linq;
using System.Linq.CompilerServices.TypeSystem;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;

using Nuqleon.DataModel;

using Reaqtive;
using Reaqtive.Scheduler;

using Reaqtor;
using Reaqtor.IoT;

## Configure environment

**Query engines** host reactive artifacts, e.g. subscriptions, which can be stateful.

Query engines are a failover unit. State for all artifacts is persisted via checkpointing.

Query engines depend on services from the environment:

* A **scheduler** to process events on:
  * There's one physical scheduler per host. Think of it as a thread pool.
  * Each engine has a logical scheduler. Think of it as a collection of tasks. The engine suspends/resumes all work for checkpoint/recovery.
* A **key/value store** for state persistence, including:
  * A transaction log of create/delete operations for reactive artifacts.
  * Periodic checkpoint state, which includes:
    * State of reactive artifacts (e.g. sum and count for an Average operator).
    * Watermarks for ingress streams, enabling replay of events upon failover.

This sample also parameterizes query engines on an ingress/egress manager to receive/send events across the engine/environment boundary.

To run query engines in the notebook, we write a simple `WithEngine` helper. This part takes care of setting up the engine and creating the environment. The general lifecycle of an engine is as follows.

* Instantiate the object, passing the environment services.
* Recover the engine's state from the key/value store.
* Use the engine (through the `action` callback in the helper).
* Checkpoint the engine's state. This is typically done periodically, e.g. once per minute. The interval is a tradeoff between:
  * I/O frequency versus I/O size, e.g. due to state growth as events get processed.
  * Replay capacity for ingress events and duration of replay, e.g. having to replay up to 1 minute worth of events from a source.
* Unload the engine. This is optional but useful for graceful shutdown. In the Reactor service, this is used when a primary moves to another node in the cluster. It allows reactive artifacts to release resources (e.g. connections).
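
To make the checkpoint interval trade-off concrete, here is a back-of-the-envelope sketch that estimates the worst-case number of ingress events to replay after a failover. The helper name `WorstCaseReplayEvents` and the event rate are assumptions made up for illustration; they are not Reaqtor APIs or measurements.

```csharp
using System;

// Hypothetical helper: upper bound on the number of ingress events that must be
// replayed if the engine fails just before the next checkpoint completes.
long WorstCaseReplayEvents(TimeSpan checkpointInterval, double eventsPerSecond) =>
    (long)Math.Ceiling(checkpointInterval.TotalSeconds * eventsPerSecond);

// Assumed ingress rate (illustrative only).
var eventsPerSecond = 200.0;

foreach (var interval in new[] { TimeSpan.FromSeconds(10), TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(5) })
{
    Console.WriteLine($"{interval}: up to {WorstCaseReplayEvents(interval, eventsPerSecond)} events to replay");
}
```

A shorter interval bounds the replay more tightly but writes checkpoint state more often; a longer interval does the opposite.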

In [None]:
var store = new InMemoryKeyValueStore();
var iemgr = new IngressEgressManager();

async Task WithEngine(Func<MiniQueryEngine, Task> action)
{
    using var ps = PhysicalScheduler.Create();
    using var scheduler = new LogicalScheduler(ps);
    using var engine = new MiniQueryEngine(new Uri("iot://reactor/1"), scheduler, store, iemgr);

    using (var reader = store.GetReader())
        await engine.RecoverAsync(reader);

    await action(engine);

    using (var writer = store.GetWriter())
        await engine.CheckpointAsync(writer);

    await engine.UnloadAsync();
}

## Define artifacts

This cell populates the engine's registry of defined artifacts. The environment performs this one-time step when it creates a new engine.

* Artifact types that are defined include:
  * Observables, e.g. sources of events, or query operators.
  * Observers, e.g. sinks for events, or event handlers.
  * Stream factories, not shown here. Useful for creation of "subjects" local to the engine.
  * Subscription factories, not shown here. Useful for "templates" to create subscriptions with parameters.
* All Reactor artifacts use URIs for naming purposes.

The key take-away is that Reactor engines are empty by default and have no built-in artifacts whatsoever. The environment controls the registry, which includes standard query operators, specialized query operators, etc.

> **Note:** There's an alternative approach to having artifacts defined in and persisted by individual engine instances. The engine can also be parameterized on a queryable external catalog. This is useful for homogeneous environments.

In [None]:
await WithEngine(async engine =>
{
    var ctx = new ReactorContext(engine);

    await ctx.DefineObserverAsync(new Uri("iot://reactor/observers/cout"), ctx.Provider.CreateQbserver<T>(Expression.New(typeof(ConsoleObserver<T>))), null, CancellationToken.None);
    await ctx.DefineObservableAsync<TimeSpan, DateTimeOffset>(new Uri("iot://reactor/observables/timer"), t => new TimerObservable(t).AsAsyncQbservable(), null, CancellationToken.None);
});

## Define query operators

Next, we define query operators, in the same way as the artifacts defined above. A few remarks:

- No operators are built-in. Below, we define essential operators like `Where`, `Select`, and `Take`. The URIs for these are not prescribed either; the environment picks them.
- Implementations of the operators are provided in `Reaqtive`, similar to `System.Reactive` for classic Rx. The difference is mainly due to support for state persistence, which classic Rx lacks.
- Custom operators are as first-class as "standard query operators". That is, the query engine does not have an opinion about the operator surface provided.

Some ugly technicalities show up below, but those are entirely irrelevant to the user experience. The code below is part of the one-time setup provided by the environment. In particular:

- Define operations are done through `IReactiveProxy`, but could also be done straight on the engine (though it brings some additional complexity when doing so).
- There's some conversion friction to build expressions that fit through a "queryable" expression-tree based API but eventually bind to types in Reaqtive. That's what all the `As*` calls below are for.
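
For readers less familiar with expression-tree based APIs, the following minimal sketch shows the underlying mechanism using only `System.Linq.Expressions` from the .NET base class library. It involves no Reaqtor types; the lambda and variable names are chosen for illustration.

```csharp
using System;
using System.Linq.Expressions;

// Assigning a lambda to Expression<TDelegate> makes the compiler emit a data
// structure describing the code, rather than a compiled delegate.
Expression<Func<int, bool>> isEven = x => x % 2 == 0;

// The tree can be inspected, and therefore rewritten or serialized.
Console.WriteLine(isEven.Body.NodeType);       // Equal
Console.WriteLine(isEven.Parameters[0].Name);  // x

// It can be compiled to a delegate at the place where it eventually runs.
Func<int, bool> compiled = isEven.Compile();
Console.WriteLine(compiled(4));                // True
```

Because the operator definitions are captured as trees like this one, they can be stored in the engine's registry and bound to concrete implementations later.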

In [None]:
await WithEngine(async engine =>
{
    var ctx = new ReactorContext(engine);

    await ctx.DefineObservableAsync<IAsyncReactiveQbservable<T>, Func<T, bool>, T>(new Uri("iot://reactor/observables/filter"), (source, predicate) => source.AsSubscribable().Where(predicate).AsAsyncQbservable(), null, CancellationToken.None);
    await ctx.DefineObservableAsync<IAsyncReactiveQbservable<T>, Func<T, int, bool>, T>(new Uri("iot://reactor/observables/filter/indexed"), (source, predicate) => source.AsSubscribable().Where(predicate).AsAsyncQbservable(), null, CancellationToken.None);
    await ctx.DefineObservableAsync<IAsyncReactiveQbservable<T>, Func<T, R>, R>(new Uri("iot://reactor/observables/map"), (source, selector) => source.AsSubscribable().Select(selector).AsAsyncQbservable(), null, CancellationToken.None);
    await ctx.DefineObservableAsync<IAsyncReactiveQbservable<T>, Func<T, int, R>, R>(new Uri("iot://reactor/observables/map/indexed"), (source, selector) => source.AsSubscribable().Select(selector).AsAsyncQbservable(), null, CancellationToken.None);
    await ctx.DefineObservableAsync<IAsyncReactiveQbservable<T>, int, T>(new Uri("iot://reactor/observables/take"), (source, count) => source.AsSubscribable().Take(count).AsAsyncQbservable(), null, CancellationToken.None);
});

## Ingress and egress

This section defines ingress/egress proxies as observable/observer artifacts.

Also see the implementation of `IngressObservable<T>` and `EgressObserver<T>`, which use the ingress/egress manager to connect to the outside world. The essence is this:

- To the query running inside the engine, these look like ordinary Rx artifacts, implemented using interfaces and base classes provided by Reactor:
  - `ISubscribable<T>` rather than `IObservable<T>`, to support the richer lifecycle of artifacts in Reactor compared to Rx.
  - `Load`/`Save` state operations for checkpointing.
- The external world communicates with the engine using a variant of the observable/observer interfaces, namely `IReliable*<T>`:
  - Events received and produced have sequence numbers.
  - Subscription handles to receive events from the outside world have additional operations:
    - `Start(long)` to replay events from the given sequence number.
    - `AcknowledgeRange(long)` to allow the external service to (optionally) prune events that are no longer needed by the engine.
- Proxies in the engine use the sequence number to provide reliability:
  - `Save` persists the latest received sequence number. `Load` gets it back.
  - Upon restart of an ingress proxy, the restored sequence number is used to ask for replay of events.
  - Upon a successful checkpoint, the latest received sequence number is acknowledged to the source (allowing pruning).

The Reactor service implements such ingress/egress mechanisms using services like EventHub.
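
The interplay of sequence numbers, checkpoints, and acknowledgements can be sketched in plain C#. Everything below is a hypothetical stand-in (`Publish`, `ReplayFrom`, `Acknowledge`, and the `watermark` variable are invented for illustration and are not the `IReliable*<T>` interfaces). It also assumes that replay resumes at the sequence number following the saved one.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical external source: retains events by sequence number until acknowledged.
var retained = new SortedDictionary<long, string>();
long nextSequenceId = 0;

void Publish(string item) => retained[nextSequenceId++] = item;

// Replay all retained events from the given sequence number onward.
List<(long SequenceId, string Item)> ReplayFrom(long sequenceId) =>
    retained.Where(kv => kv.Key >= sequenceId)
            .Select(kv => (SequenceId: kv.Key, Item: kv.Value))
            .ToList();

// Allow the source to prune events up to and including the given sequence number.
void Acknowledge(long sequenceId)
{
    foreach (var key in retained.Keys.Where(k => k <= sequenceId).ToList())
        retained.Remove(key);
}

long watermark = -1;     // what a checkpoint would persist ("Save")
long lastReceived = -1;  // latest sequence number seen by the proxy

Publish("a"); Publish("b"); Publish("c");
foreach (var (id, _) in ReplayFrom(watermark + 1)) lastReceived = id;

// Checkpoint succeeds: persist the watermark, then acknowledge to allow pruning.
watermark = lastReceived;
Acknowledge(watermark);

Publish("d");            // arrives while the engine is down

// Recovery ("Load"): ask for replay starting after the restored watermark.
var replayed = ReplayFrom(watermark + 1).Select(e => e.Item).ToList();
Console.WriteLine(string.Join(", ", replayed)); // d
```

Only the event published after the last checkpoint is replayed, and the source no longer has to retain the acknowledged ones.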

In [None]:
await WithEngine(async engine =>
{
    var ctx = new ReactorContext(engine);

    await ctx.DefineObserverAsync<string, T>(new Uri("iot://reactor/observers/egress"), stream => new EgressObserver<T>(stream).AsAsyncQbserver(), null, CancellationToken.None);
    await ctx.DefineObservableAsync<string, T>(new Uri("iot://reactor/observables/ingress"), stream => new IngressObservable<T>(stream).AsAsyncQbservable(), null, CancellationToken.None);
});

## Define more query operators

This cell defines higher-order operators such as `SelectMany` and `GroupBy`. These operate on sequences of sequences (`IObservable<IObservable<T>>`), which is one of the most powerful aspects of Rx.
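
As a pull-based analogy for "sequences of sequences", the sketch below uses LINQ to Objects rather than Reaqtor operators. The sample data is made up. In Rx the inner sequences deliver events over time instead of being enumerated, but the shape of the query is the same.

```csharp
using System;
using System.Linq;

// Made-up sample data.
var samples = new[]
{
    (Room: "Kitchen", Temperature: 70.0),
    (Room: "Hallway", Temperature: 66.0),
    (Room: "Kitchen", Temperature: 74.0),
    (Room: "Hallway", Temperature: 68.0),
};

// GroupBy produces a sequence of sequences: one inner sequence per room.
var perRoom = samples.GroupBy(s => s.Room);

// SelectMany projects each inner sequence and flattens the results.
var averages = perRoom
    .SelectMany(g => new[] { (Room: g.Key, Average: g.Average(s => s.Temperature)) })
    .ToList();

foreach (var a in averages)
    Console.WriteLine($"{a.Room}: {a.Average}");
```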

In [None]:
await WithEngine(async engine =>
{
    var ctx = new ReactorContext(engine);

    // Average
    await ctx.DefineObservableAsync<IAsyncReactiveQbservable<int>, double>(new Uri("iot://reactor/observables/average/int32"), source => source.AsSubscribable().Average().AsAsyncQbservable(), null, CancellationToken.None);
    await ctx.DefineObservableAsync<IAsyncReactiveQbservable<long>, double>(new Uri("iot://reactor/observables/average/int64"), source => source.AsSubscribable().Average().AsAsyncQbservable(), null, CancellationToken.None);
    await ctx.DefineObservableAsync<IAsyncReactiveQbservable<double>, double>(new Uri("iot://reactor/observables/average/double"), source => source.AsSubscribable().Average().AsAsyncQbservable(), null, CancellationToken.None);
    await ctx.DefineObservableAsync<IAsyncReactiveQbservable<T>, Func<T, int>, double>(new Uri("iot://reactor/observables/average/selector/int32"), (source, selector) => source.AsSubscribable().Average(selector).AsAsyncQbservable(), null, CancellationToken.None);
    await ctx.DefineObservableAsync<IAsyncReactiveQbservable<T>, Func<T, long>, double>(new Uri("iot://reactor/observables/average/selector/int64"), (source, selector) => source.AsSubscribable().Average(selector).AsAsyncQbservable(), null, CancellationToken.None);
    await ctx.DefineObservableAsync<IAsyncReactiveQbservable<T>, Func<T, double>, double>(new Uri("iot://reactor/observables/average/selector/double"), (source, selector) => source.AsSubscribable().Average(selector).AsAsyncQbservable(), null, CancellationToken.None);

    // DistinctUntilChanged
    await ctx.DefineObservableAsync<IAsyncReactiveQbservable<T>, T>(new Uri("iot://reactor/observables/distinct"), source => source.AsSubscribable().DistinctUntilChanged().AsAsyncQbservable(), null, CancellationToken.None);

    // SelectMany
    await ctx.DefineObservableAsync<IAsyncReactiveQbservable<T>, Func<T, ISubscribable<R>>, R>(new Uri("iot://reactor/observables/bind"), (source, selector) => source.AsSubscribable().SelectMany(selector).AsAsyncQbservable(), null, CancellationToken.None);

    // Window
    await ctx.DefineObservableAsync<IAsyncReactiveQbservable<T>, TimeSpan, ISubscribable<T>>(new Uri("iot://reactor/observables/window/hopping/time"), (source, duration) => source.AsSubscribable().Window(duration).AsAsyncQbservable(), null, CancellationToken.None);
    await ctx.DefineObservableAsync<IAsyncReactiveQbservable<T>, int, ISubscribable<T>>(new Uri("iot://reactor/observables/window/hopping/count"), (source, count) => source.AsSubscribable().Window(count).AsAsyncQbservable(), null, CancellationToken.None);
    await ctx.DefineObservableAsync<IAsyncReactiveQbservable<T>, TimeSpan, TimeSpan, ISubscribable<T>>(new Uri("iot://reactor/observables/window/sliding/time"), (source, duration, shift) => source.AsSubscribable().Window(duration, shift).AsAsyncQbservable(), null, CancellationToken.None);
    await ctx.DefineObservableAsync<IAsyncReactiveQbservable<T>, int, int, ISubscribable<T>>(new Uri("iot://reactor/observables/window/sliding/count"), (source, count, skip) => source.AsSubscribable().Window(count, skip).AsAsyncQbservable(), null, CancellationToken.None);
    await ctx.DefineObservableAsync<IAsyncReactiveQbservable<T>, TimeSpan, int, ISubscribable<T>>(new Uri("iot://reactor/observables/window/ferry"), (source, duration, count) => source.AsSubscribable().Window(duration, count).AsAsyncQbservable(), null, CancellationToken.None);

    // GroupBy
    await ctx.DefineObservableAsync<IAsyncReactiveQbservable<T>, Func<T, R>, IGroupedSubscribable<R, T>>(new Uri("iot://reactor/observables/group"), (source, selector) => source.AsSubscribable().GroupBy(selector).AsAsyncQbservable(), null, CancellationToken.None);
});

## Entity types

Reactor Core is built to be flexible with regard to data models, but the default, well-supported data model originates from a graph database effort in Bing that predates Reactor. The `[Mapping]` attributes below annotate properties with names. These names are used to normalize entity types in the serialized expression representation, so the query does not depend on a concrete type in an assembly. As a result, the structure of data types (here, types representing events) can be serialized across machine boundaries without deploying binaries.

In [None]:
public class SensorReading
{
    [Mapping("iot://sensor/reading/room")]
    public string Room { get; set; }

    [Mapping("iot://sensor/reading/temperature")]
    public double Temperature { get; set; }
}

## A temperature simulator

We add two more streams that connect to the environment: one simulates temperature sensor readings, and the other is a feedback channel to control an A/C unit.

In [None]:
var readings = iemgr.CreateSubject<SensorReading>("bart://sensors/home/livingroom/temperature/readings");
var settings = iemgr.CreateSubject<double?>("bart://sensors/home/livingroom/temperature/settings");

Next, we define a few constants for the simulation.

In [None]:
var rand = new Random();

//
// Speed and granularity of simulation.
//
var timeStep = TimeSpan.FromMinutes(15);
var simulationDelay = TimeSpan.FromMilliseconds(250);

//
// Absolute value of temperature gain/loss per unit time of the house adjusting to the outside temperature.
//
var insulationTemperatureIncrement = 0.1;

//
// Absolute value of temperature gain/loss per unit time due to the A/C unit cooling down or heating up.
//
var acTemperatureIncrement = 0.2;

//
// Temperature sensitivity of the thermostat to trigger turning off the A/C unit, i.e. within this range from target.
//
var thermostatSensitivity = 0.5;

//
// Configuration of simulation: minimum and maximum temperature outside, and coldest time of day.
//
var outsideMin = 55;
var outsideMax = 85;
var coldestTime = new TimeSpan(5, 0, 0); // 5AM

//
// Scale for the temperature range, to multiply [0..1] by to obtain a temperature value that can be added to the minimum.
//
var scale = outsideMax - outsideMin;

//
// Offset to the midpoint of the temperature range. Outside temperature will vary as a sine wave around this value.
//
var offset = outsideMin + scale / 2;

#pragma warning disable CA5394 // Do not use insecure randomness. (Okay for simulation purposes.)

//
// Random initial value inside, within the range of temperatures.
//
var inside = outsideMin + rand.NextDouble() * scale;

#pragma warning restore CA5394

//
// null if A/C unit is off; otherwise, target temperature.
//
var target = default(double?);

//
// Clock driven by the simulation.
//
var time = DateTime.Today;

In [None]:
interactive.registerCommandHandler({commandType: 'SmartHomeCommand', handle: c => {
    let visualisation = window.frames.visualisation; 
    
    if (visualisation)
    {
        visualisation.postMessage(c.command, '*');
    }
}});

In [None]:
using Microsoft.DotNet.Interactive;
using Microsoft.DotNet.Interactive.Commands;

public class SmartHomeCommand : KernelCommand
{
    public SmartHomeCommand(): base("javascript") {}
    public DateTime? Timestamp {get; set; }
    public Thermostat Thermostat {get; set; }
    public Temperature Temperature {get; set; }
    public ReaqtorStatus Reaqtor {get; set; }
}

public class Temperature 
{
    public double Inside {get; set; }
    public double Outside {get; set; }
}

public class Thermostat
{
    public string State {get; set; }
    public string Mode {get; set; }
}

public class ReaqtorStatus
{
    public ReaqtorState State {get; set; } = ReaqtorState.Off;
}

public enum ReaqtorState
{
    Off = 0,
    Starting = 1,
    Crashing = 2,
    FailingOver = 3,
    Recovered = 4,
    ShuttingDownGracefully = 5,
    CreatingSubscription = 6,
    DisposingSubscription = 7,
    Running = 8,
}

In [None]:
var jsKernel = Kernel.Root.FindKernel("javascript");
jsKernel.RegisterCommandType<SmartHomeCommand>();

Now we can write a simulator routine that generates `readings`, and set up a subscription to the `settings` stream to show the results emitted by the Reaqtor query we'll construct later.
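
The generator models the outside temperature as a sine wave around the midpoint of the configured range. The standalone sketch below repeats that formula in a hypothetical helper, `OutsideTemperature`, so it can be evaluated at a few times of day. The six-hour shift is there because a sine wave reaches its minimum a quarter period before it crosses zero.

```csharp
using System;

// Hypothetical helper mirroring the formula used by the readings generator.
double OutsideTemperature(TimeSpan timeOfDay, double min, double max, TimeSpan coldest)
{
    var scale = max - min;         // width of the temperature range
    var offset = min + scale / 2;  // midpoint of the range

    // Shift the clock so the sine wave bottoms out at the coldest time of day.
    var now = (timeOfDay - coldest - TimeSpan.FromHours(6)).TotalSeconds;
    var secondsPerDay = TimeSpan.FromHours(24).TotalSeconds;

    return scale * Math.Sin(2 * Math.PI * now / secondsPerDay) / 2 + offset;
}

foreach (var hour in new[] { 5, 11, 17, 23 })
{
    var t = OutsideTemperature(TimeSpan.FromHours(hour), 55, 85, TimeSpan.FromHours(5));
    Console.WriteLine($"{hour:00}:00 -> {t:F1}");
}
```

With the constants defined earlier (55 to 85, coldest at 5 AM), this gives the minimum of 55 at 5 AM, the maximum of 85 at 5 PM, and the midpoint of 70 at 11 AM and 11 PM.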

In [None]:
IDisposable SubscribeToSettingsStream()
{
    //
    // Print commands arriving at thermostat.
    //
    return settings.Subscribe(Observer.Create<(long sequenceId, double? item)>(s =>
    {
        target = s.item;

        Console.WriteLine($"STSS: {time} thermostat> {(target == null ? "OFF" : "ON " + (target > inside ? "heating" : "cooling") + " to " + target)}");

        var task = jsKernel.SendAsync(
            new SmartHomeCommand
            {
                Timestamp = time,
                Thermostat = new Thermostat
                {
                    State = (target == null ? "OFF" : "ON"),
                    Mode = (target > inside ? "heating" : "cooling")
                }
            });
    }));
}

Task RunReadingsGenerator(CancellationToken token)
{
    //
    // Run simulation which adjusts both inside and outside temperature.
    //
    return Task.Run(async () =>
    {
        while (!token.IsCancellationRequested)
        {
            var now = (time.TimeOfDay - coldestTime - TimeSpan.FromHours(6)).TotalSeconds;
            var secondsPerDay = TimeSpan.FromHours(24).TotalSeconds;

            var outside = scale * Math.Sin(2 * Math.PI * now / secondsPerDay) / 2 + offset;

            var environmentEffect = outside < inside ? -insulationTemperatureIncrement : insulationTemperatureIncrement;
            var acUnitEffect = target != null ? (target < inside ? -acTemperatureIncrement : acTemperatureIncrement) : 0.0;

            inside += environmentEffect + acUnitEffect;

            if (target != null && Math.Abs(target.Value - inside) < thermostatSensitivity)
            {
                target = null;
            }

            await jsKernel.SendAsync(
                new SmartHomeCommand
                {
                    Timestamp = time,
                    Temperature = new Temperature
                    {
                        Inside = inside,
                        Outside = outside
                    }
                });

            // Console.WriteLine($"RRG: {time} temperature> inside = {inside} outside = {outside} target = {target}");
            readings.OnNext((Environment.TickCount, new SensorReading { Room = "Hallway", Temperature = inside }));

            await Task.Delay(simulationDelay);
            time += timeStep;
        }
    });
}

In [None]:
<!DOCTYPE html>
<html>
<head>
    <title></title>
    <meta charset="utf-8" />
</head>
<body>
    <iframe name="visualisation" src="https://reaqtor-house.netlify.app/?auto=false" width="100%" height="500"></iframe>
</body>
</html>

In the next cell, we'll write a higher-order query using `Window` and `SelectMany`, and run the simulator and logger while the query is running.
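
To preview what that query computes, here is the same logic applied to a plain in-memory array using LINQ to Objects (`Chunk` requires .NET 6 or later). The temperature values are made up, and the hand-written loop stands in for `DistinctUntilChanged`; the real query runs over events arriving in the engine.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Made-up inside temperatures, one per simulation step.
var temperatures = new[]
{
    68.0, 69.0, 69.5, 69.5,  // average 69.0:  too cold
    70.5, 71.0, 71.5, 72.0,  // average 71.25: comfortable
    72.5, 73.0, 73.5, 74.0,  // average 73.25: comfortable
};

var commands = temperatures
    .Chunk(4)                                               // like Window(4)
    .Select(w => w.Average())                               // like SelectMany(w => w.Average(...))
    .Select(t => t < 70 || t > 80 ? 75 : default(double?))  // target temperature, or null for "off"
    .ToList();

// Stand-in for DistinctUntilChanged: drop a value equal to the previous one.
var distinct = new List<double?>();
foreach (var c in commands)
{
    if (distinct.Count == 0 || !EqualityComparer<double?>.Default.Equals(distinct[^1], c))
        distinct.Add(c);
}

Console.WriteLine(string.Join(", ", distinct.Select(c => c?.ToString() ?? "off"))); // 75, off
```

The thermostat receives one command to reach 75 and one command to turn off, even though two consecutive windows were in the comfortable range.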

In [None]:
var stopEventProducer = new CancellationTokenSource();

Console.WriteLine("Starting simulator for temperature sensor readings...");

var logger = SubscribeToSettingsStream();
var producer = RunReadingsGenerator(stopEventProducer.Token);

var subUri = new Uri("iot://reactor/subscription/BD/livingroom/comfy");

Console.WriteLine("Setting up query engine...");

await jsKernel.SendAsync(new SmartHomeCommand { Reaqtor = new ReaqtorStatus { State = ReaqtorState.Starting }});

await WithEngine(async engine =>
{
    var ctx = new ReactorContext(engine);

    var input = ctx.GetObservable<string, SensorReading>(new Uri("iot://reactor/observables/ingress"));
    var output = ctx.GetObserver<string, double?>(new Uri("iot://reactor/observers/egress"));

    var readings = input("bart://sensors/home/livingroom/temperature/readings");
    var settings = output("bart://sensors/home/livingroom/temperature/settings");

    Console.WriteLine("Creating subscription...");
    await jsKernel.SendAsync(new SmartHomeCommand { Reaqtor = new ReaqtorStatus { State = ReaqtorState.CreatingSubscription }});

    await readings.Window(4).SelectMany(w => w.Average(r => r.Temperature)).Select(t => t < 70 || t > 80 ? 75 : default(double?)).DistinctUntilChanged().SubscribeAsync(settings, subUri, null, CancellationToken.None);

    await jsKernel.SendAsync(new SmartHomeCommand { Reaqtor = new ReaqtorStatus { State = ReaqtorState.Running }});

    // Run for a bit.
    await Task.Delay(TimeSpan.FromSeconds(30));

    await jsKernel.SendAsync(new SmartHomeCommand { Reaqtor = new ReaqtorStatus { State = ReaqtorState.FailingOver }});

    Console.WriteLine("Engine failing over... Note we'll continue to see the producer's `temperature>` traces, but no `thermostat>` outputs.");
});

await Task.Delay(TimeSpan.FromSeconds(2));

await WithEngine(async engine =>
{
    await jsKernel.SendAsync(new SmartHomeCommand { Reaqtor = new ReaqtorStatus { State = ReaqtorState.Recovered }});
    Console.WriteLine("Engine recovered!");

    var ctx = new ReactorContext(engine);

    await jsKernel.SendAsync(new SmartHomeCommand { Reaqtor = new ReaqtorStatus { State = ReaqtorState.Running }});
    
    // Run for a bit more.
    await Task.Delay(TimeSpan.FromSeconds(30));

    await jsKernel.SendAsync(new SmartHomeCommand { Reaqtor = new ReaqtorStatus { State = ReaqtorState.DisposingSubscription }});
    Console.WriteLine("Disposing subscription...");

    await ctx.GetSubscription(subUri).DisposeAsync();
});

await jsKernel.SendAsync(new SmartHomeCommand { Reaqtor = new ReaqtorStatus { State = ReaqtorState.ShuttingDownGracefully }});
Console.WriteLine("Stopping simulator...");

stopEventProducer.Cancel();
await producer; // Await rather than block on the producer task.

logger.Dispose();

await jsKernel.SendAsync(new SmartHomeCommand { Reaqtor = new ReaqtorStatus { State = ReaqtorState.Off }});

Console.WriteLine("Done!");