
Process system

Paul Louth edited this page Mar 25, 2017 · 8 revisions

What's the Actor model?

The Echo Process system uses a theory of computation called the Actor Model.

  • An actor is a single threaded process
  • It has its own blob of state that only it can see and update
  • It has a message queue (inbox)
  • It processes the messages one at a time (single threaded remember)
  • When a message comes in, it can change its state; when the next message arrives it gets that modified state.
  • It has a parent Actor
  • It can spawn child Actors
  • It can tell messages to other Actors
  • It can ask for replies from other Actors
  • Actors are very lightweight; you can create tens of thousands of them without a problem

So you have a little bundle of self contained computation, attached to a blob of private state, that can get messages telling it to do stuff with its private state. Sounds like OO right? Well, it is, but as Alan Kay envisioned it. The slight difference with this is that it enforces execution order, and therefore there's no shared state access, and no race conditions (within the actor).
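
To make the list above concrete, here is a toy sketch in plain C# (no Echo involved) of the core idea: private state, an inbox, and messages handled strictly one at a time. MiniActor, Tell and Drain are hypothetical names invented for this illustration. It is single-threaded and drains the inbox on demand, whereas a real actor system schedules the inbox itself, so treat it as a picture of the concept rather than of the implementation.

```csharp
using System;
using System.Collections.Generic;

// Toy illustration only: this is NOT how Echo implements processes.
public sealed class MiniActor<TState, TMsg>
{
    readonly Queue<TMsg> inbox = new Queue<TMsg>();
    readonly Func<TState, TMsg, TState> handler;
    TState state;   // private: only the handler ever sees or replaces it

    public MiniActor(TState initial, Func<TState, TMsg, TState> handler)
    {
        state = initial;
        this.handler = handler;
    }

    // Queue a message; nothing is processed yet.
    public void Tell(TMsg msg) => inbox.Enqueue(msg);

    // Process the queued messages in arrival order, one at a time.
    // Each message sees the state left behind by the previous one.
    public TState Drain()
    {
        while (inbox.Count > 0)
        {
            state = handler(state, inbox.Dequeue());
        }
        return state;
    }
}

public static class MiniActorDemo
{
    public static int Run()
    {
        // State is a running total of the characters received so far.
        var counter = new MiniActor<int, string>(0, (total, msg) => total + msg.Length);
        counter.Tell("ping");
        counter.Tell("pong");
        return counter.Drain();
    }
}
```

Because only Drain touches the state, and it handles one message at a time, there is nothing for two callers to race over inside the handler.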

Distributed

The messages being sent to actors can also travel between machines, so now we have distributed processes too. This is how to send a message from one process to another on the same machine using Echo.Process:

    tell(processId, "Hello, World!");

Now this is how to send a message from one process to another on a different machine:

    tell(processId, "Hello, World!");

It's the same. Decoupled, thread-safe, without race conditions. What's not to like?

How?

Sometimes this stuff is easier to show by example. The following spawns three processes: a logging process, one that sends a 'ping' message and one that sends a 'pong' message. Ping and pong schedule the delivery of their messages 100 ms in the future, and the logger is simply Console.WriteLine:

    // Log process
    var logger = spawn<string>("logger", Console.WriteLine);

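    // Ping and pong refer to each other, so declare both ids up front
    ProcessId ping = ProcessId.None;
    ProcessId pong = ProcessId.None;

    // Ping process
    ping = spawn<string>("ping", msg =>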
    {
        tell(logger, msg);
        tell(pong, "ping", TimeSpan.FromMilliseconds(100));
    });

    // Pong process
    pong = spawn<string>("pong", msg =>
    {
        tell(logger, msg);
        tell(ping, "pong", TimeSpan.FromMilliseconds(100));
    });

    // Trigger
    tell(pong, "start");

Purely functional programming without the actor model at some point needs to deal with the world, and therefore needs statefulness. So you end up with imperative semantics in your functional expressions (unless you use Haskell).

Now you could go the Haskell route, but I think there's something quite perfect about having a bag of state that you run expressions on as messages come in. Essentially it's a fold over a stream.

There are lots of Actor systems out there, so what makes this one any different? Obviously I wanted to create some familiarity, so the differences aren't huge, but they exist. What I felt was missing from other Actor systems was any acknowledgement of the world outside of their system. Now I know that the Actor model is supposed to be a self-contained thing, and that's where its power lies, but in the real world you often need to get information out of it and you need to interact with existing code. Declaring another class just to receive a message was also getting a little tedious. So what I've done is:

  • Removed the need to declare new classes for processes (actors)
  • Added a publish system to the processes
  • Made process discovery simple
  • Made a 'functional first' API

Functions if you want them

If your process is stateless then you just provide an Action<TMsg> to handle the messages. If your process is stateful then you provide a Func<TState> setup function and a Func<TState, TMsg, TState> to handle the messages (any seasoned functional programmer will notice that this is the signature of a fold). This makes it easy to create new processes and reduces the cognitive overhead of having loads of classes for what should be small packets of computation.
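
The fold comparison can be seen with nothing more than LINQ. In this sketch, which uses only the standard library and not Echo, Enumerable.Aggregate plays the role of the process system: the seed stands in for the setup function and the Inbox method has the same (state, message) to state shape as a stateful inbox. FoldDemo and its "reset" message are made up for the example.

```csharp
using System;
using System.Linq;

public static class FoldDemo
{
    // Same shape as a stateful inbox: takes the current state and a
    // message, and returns the state for the next message.
    public static int Inbox(int count, string msg) =>
        msg == "reset" ? 0 : count + 1;

    public static int Run()
    {
        var messages = new[] { "a", "b", "reset", "c", "d" };

        // Aggregate is LINQ's fold: it threads the state through every
        // message in order, starting from the seed value 0.
        return messages.Aggregate(0, Inbox);
    }
}
```

The difference in a process system is that the stream of messages never has to end: the state simply waits in the process until the next message arrives.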

You still need to create classes for messages and the like, that's unavoidable (Use F# to create a 'messages' project, it's much quicker and easier). But also, it's desirable, because it's the messages that define the interface and the interaction, not the processing class.

Creating something to log string messages to the console is as easy as:

    ProcessId log = spawn<string>("logger", Console.WriteLine);

    tell(log, "Hello, World");

Or how about a stateful, thread-safe cache?

Let's set up some includes first:

using System;
using LanguageExt;
using static LanguageExt.Prelude;
using static Echo.Process;
using Echo;

using CacheState = LanguageExt.Map<string, (string value, System.DateTime time)>;

Notice CacheState at the end. This isn't required, but it will make the examples below more concise and easier to parse.

Next we create some message types to represent some of the actions:

public class Add    : NewType<Add, (string, string)> { public Add((string Key, string Value) pair)   : base(pair) { } }
public class Get    : NewType<Get, (string, string)> { public Get((string Key, string Default) pair) : base(pair) { } }
public class Remove : NewType<Remove, string>        { public Remove(string key)                     : base(key) { } }
public class Show   : NewType<Show, string>          { public Show(string key)                       : base(key) { } }
public class GetCount { }

This is using the language-ext NewType system to make it easy to define types with one value (it's based on a feature in Haskell of the same name). The Add and the Get use tuples for their value.

Next we define the setup and inbox functions that handle the messages:

    public static class CacheProcess
    {
        public static CacheState Setup() => default(CacheState);

        public static CacheState Inbox(CacheState state, object msg) =>
            msg is Add a      ? Add(state, ((string, string))a)
          : msg is Remove r   ? Remove(state, r)
          : msg is Show s     ? Show(state, s)
          : msg is Get g      ? Get(state, ((string, string))g)
          : msg is int i      ? GetIndex(state, i)
          : msg is Unit _     ? GetCount(state)
          : msg is DateTime d ? Flush(state, d)
          : state;
    }

Notice how the msg is of type object. Normally it's better to use a base class for your messages, especially if you're sending over the wire, but for now this is a good example of the flexibility of the system, as we can send an int to get an item at the integer index, or Unit to get the current number of items in the cache, or a DateTime as a cut-off time for a cache flush operation.

So back to our CacheProcess class, let's add the functions to handle the messages:

static CacheState Add(CacheState state, (string key, string value) pair) =>
    state.AddOrUpdate(pair.key, (pair.value, DateTime.UtcNow));

static CacheState Remove(CacheState state, Remove msg) =>
    state.Remove((string)msg);

static CacheState GetIndex(CacheState state, int index) =>
    state.Skip(index)
         .HeadOrNone()
         .Match(
             Some: item =>
             {
                 reply(item.Value.value);
                 return state.SetItem(item.Key, (item.Value.value, DateTime.UtcNow));
             },
             None: () => state);

static CacheState GetCount(CacheState state)
{
    reply(state.Count);
    return state;
}

static CacheState Get(CacheState state, (string Key, string DefaultValue) pair) =>
    state.Find(pair.Key)
         .Match(
            Some: item =>
            {
                reply(item.value);
                return state.SetItem(pair.Key, (item.value, DateTime.UtcNow));
            },
            None: () =>
            {
                reply(pair.DefaultValue);
                return state;
            });

static CacheState Show(CacheState state, Show msg)
{
    Console.WriteLine(state.Find((string)msg)
                           .Map(toString)
                           .IfNone($"Item doesn't exist '{msg}'"));
    return state;
}

// Keep only the items accessed at or after the cut-off; older items are flushed
static CacheState Flush(CacheState state, DateTime cutOff) =>
    state.Filter(item => item.time >= cutOff);

They should hopefully all be relatively self-explanatory. The main thing to look out for is the reply function, which sends replies back to any process that asks. Note how the Inbox manages a state of CacheState, which we defined as Map<string, (string, DateTime)>. This state is maintained and passed back in for subsequent messages. The (string, DateTime) is a tuple of the cached value and its last-accessed time. Notice how Get and GetIndex update the time-stamp as well as replying with the value.

We then need to start up the system and spawn our cache:

// Once only call to initialise the Process system
ProcessConfig.initialise();

// Spawn the Cache process with an empty Map as its initial state
var pid = spawn<CacheState, object>("cache", CacheProcess.Setup, CacheProcess.Inbox);

The "cache" process is now spawned at /root/user/cache. The pid variable is a ProcessId, which is just a wrapped string path, so you can serialise it and pass it around, and then anything can find and communicate with your cache:

    // Add a new item to the cache
    tell(pid, Add.New(("hello", "world")));

    // Get an item from the cache
    var thing = ask<string>(pid, Get.New(("hello", "")));

    // Find the number of items
    var count = ask<int>(pid, unit);

    // Remove an item from the cache
    tell(pid, Remove.New("hello"));

    // Add several items
    tell(pid, Add.New(("a", "1")));
    tell(pid, Add.New(("b", "2")));
    tell(pid, Add.New(("c", "3")));

    // Get the item at index 0
    var item0 = ask<string>(pid, 0);

    // Get the item at index 1
    var item1 = ask<string>(pid, 1);

    // Get the item at index 2
    var item2 = ask<string>(pid, 2);

Obviously that's a little cluttered because of all the instantiation of messages. So I find it's good to wrap them in a light static class wrapper:

    public static class Cache
    {
        public static Unit Add(ProcessId pid, string key, string value) =>
            tell(pid, Caching.Add.New((key, value)));

        public static string Get(ProcessId pid, string key, string defaultValue) =>
            ask<string>(pid, Caching.Get.New((key, defaultValue)));

        public static Unit Remove(ProcessId pid, string key) =>
            tell(pid, Caching.Remove.New(key));

        public static Unit Show(ProcessId pid, string key) =>
            tell(pid, Caching.Show.New(key));

        public static string ItemAt(ProcessId pid, int index) =>
            ask<string>(pid, index);

        public static int Count(ProcessId pid) =>
            ask<int>(pid, unit);

        public static Unit Flush(ProcessId pid, DateTime cutOff) =>
            tell(pid, cutOff);
    }

And now everything starts to make sense:

    // Add a new item to the cache
    Cache.Add(pid, "hello", "world");

    // Get an item from the cache
    var thing = Cache.Get(pid, "hello", "");

    // Find the number of items
    var count = Cache.Count(pid);

    // Remove an item from the cache
    Cache.Remove(pid, "hello");

    // Add several items
    Cache.Add(pid, "a", "1");
    Cache.Add(pid, "b", "2");
    Cache.Add(pid, "c", "3");

    // Get items at a specific index
    var item0 = Cache.ItemAt(pid, 0);
    var item1 = Cache.ItemAt(pid, 1);
    var item2 = Cache.ItemAt(pid, 2);

Periodically you will probably want to flush the cache contents. Just fire up another process, they're basically free (and by using functions rather than classes, very easy to put into little worker modules):

    public void SpawnCacheFlush(ProcessId cache)
    {
        // Spawns a process that tells the cache process to flush, and then sends
        // itself a message in 10 minutes which causes it to run again.
        
        var flush = spawn<Unit>(
            "cache-flush", _ =>
            {
                Cache.Flush(cache, DateTime.UtcNow.AddMinutes(-10));
                tellSelf(unit, TimeSpan.FromMinutes(10));
            });

        // Start the process running
        tell(flush, unit); 
    }

Classes if you want them

For those that actually prefer the class-based approach, or would at least prefer it for the larger, more complex processes with a large API surface (and therefore more message types that need defining), there is an interface proxy system. The previous Cache is a good example of where there's quite a bit of boilerplate because of C#'s lack of discriminated unions.

So to implement the same system with proxies we must first create an interface. This is shared throughout the app and/or cluster, and this is what the proxy system builds its proxy from.

    public interface ICache
    {
        void Add(string key, string value);
        void Remove(string key);
        void Show(string key);
        string Get(string key);
        string ItemAt(int index);
        int Count();
        void Flush(DateTime cutOff);
    }

We then need an implementation, so let's do that:

    public class Cache : ICache
    {
        CacheState state;

        public void Add(string key, string value) =>
            state = state.Add(key, (value, DateTime.UtcNow));

        public int Count() =>
            state.Count;

        // Keep only the items accessed at or after the cut-off
        public void Flush(DateTime cutOff) =>
            state = state.Filter(item => item.time >= cutOff);

        public string Get(string key) =>
            (from pair in state.Find(key)
             let _ = state = Touch(state, key)
             select pair.value)
            .IfNone("");

        public string ItemAt(int index) =>
            (from pair in state.Skip(index).HeadOrNone()
             select Get(pair.Key))
            .IfNone("");

        public void Remove(string key) =>
            state = state.Remove(key);

        public void Show(string key) =>
            Console.WriteLine(state.Find(key).Map(toString).IfNone(""));

        public CacheState Touch(CacheState state, string key) =>
            state.TrySetItem(key, pair => (pair.value, DateTime.UtcNow));
    }

Next we need to spawn the cache Process:

    var pid = spawn<Cache>("cache");

Pass the concrete type, not the interface. There is a variant where you can pass the interface, but you must provide a setup function that instantiates the concrete object.

From anywhere in your ecosystem you can call the following:

    var proxy = proxy<ICache>(pid);

And it will build a proxy for ICache that sends messages to pid. So as long as you know the interface for a type and the ProcessId you can communicate with it regardless of where it is running.

It is then trivial to invoke:

    // Add a new item to the cache
    proxy.Add("hello", "world");

    // Get an item from the cache
    var thing = proxy.Get("hello");

    // Find the number of items
    var count = proxy.Count();

    // Remove an item from the cache
    proxy.Remove("hello");

    // Add several items to the cache
    proxy.Add("a", "1");
    proxy.Add("b", "2");
    proxy.Add("c", "3");

    // Get items at a specific index
    var item0 = proxy.ItemAt(0);
    var item1 = proxy.ItemAt(1);
    var item2 = proxy.ItemAt(2);

    // Flush the cache
    proxy.Flush(DateTime.UtcNow);

You could continue to use a stand-alone flush process, but it would need to use the proxy to communicate:

    var flush = spawn<Unit>(
        "cache-flush", _ =>
        {
            proxy<ICache>(pid).Flush(DateTime.UtcNow.AddMinutes(-10));
            tellSelf(unit, TimeSpan.FromMinutes(10));
        });

It looks a lot more like classic OO programming, which is fine as well: ultimately each method is performing the role of a message type in the function-based system. The arguments are the fields, and the object itself is the state. This is a pretty natural system for an OO programmer. I think it's worth using if you want a large API surface, but if you're keeping your processes light then I personally much prefer the function-based approach.

The proxy can be built from anywhere: the Process system will auto-generate a concrete implementation of the interface that dispatches to the Process specified. It also type-checks your interface against what the actual Process is running, adding an extra bit of type safety to the procedure.
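
For a feel of how an interface call can become a message, here is a rough sketch using System.Reflection.DispatchProxy from the standard .NET library. This page does not say how Echo generates its proxies, so this is not a description of its internals. IGreeter and RecordingProxy are hypothetical names, and the "message" is just a string appended to a list rather than anything sent to a process.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Reflection;

public interface IGreeter
{
    void Greet(string name);
}

// Every call made through the interface lands in Invoke, where it is
// recorded as a (method name, arguments) pair instead of being executed.
public class RecordingProxy : DispatchProxy
{
    public List<string> Sent { get; } = new List<string>();

    protected override object? Invoke(MethodInfo? targetMethod, object?[]? args)
    {
        var argList = string.Join(", ", args ?? Array.Empty<object?>());
        Sent.Add($"{targetMethod?.Name}({argList})");

        // Good enough for void methods. A method that returns a value
        // would have to wait for a reply here instead.
        return null;
    }
}

public static class ProxyDemo
{
    public static List<string> Run()
    {
        // Create returns an object that implements IGreeter and
        // derives from RecordingProxy.
        IGreeter greeter = DispatchProxy.Create<IGreeter, RecordingProxy>();
        greeter.Greet("world");
        return ((RecordingProxy)greeter).Sent;
    }
}
```

The caller only ever sees IGreeter, which is why knowing the interface and the ProcessId is enough to talk to a process.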

If you only need to work with the Process locally, then you can short-cut and go straight to the proxy:

    ICache cache = spawn<ICache>("cache", () => new Cache());

With the proxy approach we are back in the imperative world. But in some circumstances it is more valuable. If you imagine that each method on ICache is actually an inbox handler, you'll realise we still have the protection of single-threaded access and so the mutable nature of the internal state isn't the concern it would be if it was a regular class.

As you can see that's a pretty powerful technique. Remember the process could be running on another machine, and as long as the messages serialise you can talk to them by process ID or via proxy.

Publish system

Most other actor systems expect you to tell all messages directly to other actors. If you want a pub-sub model then you're required to create a publisher actor that can take subscription messages from other actors; the publisher actor then manages a 'registry' of subscribers to deliver messages to. It's all a bit bulky and unnecessary.

So with Echo.Process each process manages its own internal subscriber list. If a process needs to announce something it calls:

    // Publish a message for anyone listening
    publish(msg);

Another process can subscribe to that by calling:

    subscribe(processId);

(The subscriber can do this in its setup phase, and the process system will auto-unsub when the process dies, and auto-resub when it restarts)

This means the messages that are published by one process can be consumed by any number of others (via their inbox in the normal way).
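
As a plain C# picture of "each process manages its own subscriber list", the toy class below keeps the list inside the publisher, so no separate registry is needed. Publisher<T> and PublishDemo are hypothetical names for this sketch; it says nothing about how Echo stores subscriptions or delivers messages between processes.

```csharp
using System;
using System.Collections.Generic;

// Toy illustration: a publisher that owns its own subscriber list.
public sealed class Publisher<T>
{
    readonly List<Action<T>> subscribers = new List<Action<T>>();

    // Returns a handle that removes the subscription when disposed.
    public IDisposable Subscribe(Action<T> inbox)
    {
        subscribers.Add(inbox);
        return new Unsubscriber(() => subscribers.Remove(inbox));
    }

    public void Publish(T msg)
    {
        // Iterate over a copy so a subscriber may unsubscribe while
        // it is being notified.
        foreach (var inbox in subscribers.ToArray())
        {
            inbox(msg);
        }
    }

    sealed class Unsubscriber : IDisposable
    {
        readonly Action undo;
        public Unsubscriber(Action undo) => this.undo = undo;
        public void Dispose() => undo();
    }
}

public static class PublishDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var pub = new Publisher<string>();

        var a = pub.Subscribe(m => log.Add("a:" + m));
        pub.Subscribe(m => log.Add("b:" + m));

        pub.Publish("one");   // delivered to both a and b
        a.Dispose();          // a stops listening
        pub.Publish("two");   // delivered to b only

        return log;
    }
}
```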

However, sometimes you want to jump outside of that system. For example, if your code is outside of the process system, it can get an IObservable stream instead:

    var sub = observe<Thing>(processId).Subscribe(msg => ...);

A good example of this is the 'Dead Letters' process. It gets all the messages that failed for one reason or another (serialisation problems, the process doesn't exist, the process crashed, etc.). All it does is call publish(msg), which allows you to subscribe to it for logging purposes. This is how it's defined:

    var deadLetters = spawn<DeadLetter>("dead-letters",publish);

That's it! For a key piece of infrastructure. So it's then possible to easily listen and log issues, or hook it up to a process that persists the dead letter messages.

'Discoverability'

Being able to find other Processes in a cluster (or within the same AppDomain) and dispatch or route to them is essential. There's a supervision hierarchy, where you have a root process, then a child user process under which you create your processes, and in turn they create child processes, forming a tree structure that you can use to route messages locally.

There's also a system process under root that handles stuff like dead-letters and various other housekeeping tasks.

    /root/user/...
    /root/system/dead-letters
    etc.

When you initialise the Process system with a Redis cluster connection, the second argument is the role of the node in the cluster (see Role.Broadcast, Role.LeastBusy, Role.Random, Role.RoundRobin, Role.First for the role-based dispatch methods). The third argument is the name of the node itself (i.e. the name of this instance of the app/service/website, whatever it is). There is a static property Process.ClusterNodes that allows you to interrogate which nodes are online and what their roles are.

    RedisCluster.register();
    ProcessConfig.initialise("sys", "web-front-end", "web-front-end-1", "localhost", "0");

  • "sys" is the 'system name', but easier to think of it as the name of the cluster as a whole. That means you can call it with a different value and point it at another Redis db for multiple clusters. But for now it's easier to call it sys and leave it.
  • "web-front-end" is the role, you can have multiple nodes in a role and the role based dispatchers allow you to implement high-availability and load balancing strategies.
  • "web-front-end-1" is the name of this node, and should be unique in the cluster
  • "localhost" is the Redis connection (can be comma separated for multiple Redis nodes)
  • "0" is the Redis catalogue to use ("0" - "15")

Then instead of having root as the top-level Process in your hierarchy, you have the node name, web-front-end-1:

    /web-front-end-1/user/...
    /web-front-end-1/system/dead-letters

Therefore you know where things are, and what they're called, and they're easily addressable. You can just 'tell' the address:

    tell("/web-front-end-1/user/hello", "Hello!");

Or you can use the ProcessId API to build the path:

   ProcessId a = "/web-front-end-1/user/hello";
   ProcessId b = ProcessId.Top["web-front-end-1"]["user"]["hello"];
   // a == b
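
Since a ProcessId is described on this page as a wrapped string path, the indexer syntax can be pictured with a small stand-in type. PathId below is hypothetical and written only for this illustration; it is not Echo's ProcessId and does no validation. It just shows how a string conversion and a chain of indexers can arrive at the same path.

```csharp
using System;

// Hypothetical stand-in for a path-like id; NOT Echo's ProcessId.
public readonly struct PathId
{
    readonly string path;

    PathId(string path) => this.path = path;

    // The empty path that every absolute path grows from.
    public static readonly PathId Top = new PathId("");

    // Indexing appends one more segment and returns a new id.
    public PathId this[string child] => new PathId(path + "/" + child);

    // Lets a plain string be used wherever a PathId is expected.
    public static implicit operator PathId(string path) => new PathId(path);

    public override string ToString() => path ?? "";
}

public static class PathDemo
{
    public static bool Run()
    {
        PathId a = "/web-front-end-1/user/hello";
        PathId b = PathId.Top["web-front-end-1"]["user"]["hello"];

        // Both routes produce the same path text.
        return a.ToString() == b.ToString();
    }
}
```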

Even that isn't great if you don't know the name of the 'app' that is running a Process. So processes can register by a single name that goes into a 'shared namespace'. It's a kind of DNS for processes:

    /disp/reg/<name>

To register:

    register(myProcessId, "hello-world");

This goes in:

    /disp/reg/hello-world

Your process now has two addresses: the /web-front-end-1/user/hello-world address and the /disp/reg/hello-world address that anyone can find by calling find("hello-world"). This makes it very simple to bootstrap processes and get messages to them even if you don't know what system is actually dealing with them:

    tell(find("hello-world"), "Hi!");

Together with routers, dispatchers and roles, this makes it trivial to find, route and dispatch to other nodes in the cluster. For a full discussion on routing, roles and dispatchers see here

Persistence

There is an ICluster interface that you can use to implement your own persistence layer. However, out of the box there is persistence to Redis (using LanguageExt.Process.Redis).

You can optionally persist:

  • Inbox messages
  • Process state

Here's an example of persisting the inbox:

    var pid = spawn<string>("redis-inbox-sample", Inbox, ProcessFlags.PersistInbox);

Here's an example of persisting the state:

    var pid = spawn<string>("redis-inbox-sample", Inbox, ProcessFlags.PersistState);

Here's an example of persisting both:

    var pid = spawn<string>("redis-inbox-sample", Inbox, ProcessFlags.PersistAll);

Process system documentation
