Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ActorFailure Clarification #84

Closed
bryang-spindance opened this issue Jan 24, 2022 · 6 comments
Closed

ActorFailure Clarification #84

bryang-spindance opened this issue Jan 24, 2022 · 6 comments

Comments

@bryang-spindance
Copy link

bryang-spindance commented Jan 24, 2022

From the Using.pdf (page 13 section 2.2.5) documentation's Actor Failure section:

If the Actor code handling a message fails, the ActorSystem will automatically handle that failure and restart the Actor. For this reason, Actor-based code is typically much simpler than conventional code because it does not have the overhead of lots of exception checking and error condition handling.

When writing Actor code, the general philosophy is "fail early"; the ActorSystem itself will restart the Actor in the event of a failure and possibly retry the message or abort that specific message. See PoisonMessage Returns for more details.

According to this, the actor will be restarted following a failure. However I am unsure of what is meant by restarting the Actor in this context.

I set up some test code with a parent and a child actor.

The parent actor code is below:

from thespian.actors import ActorTypeDispatcher
import child
import random

class Parent(ActorTypeDispatcher):
    failed = False
    def __init__(self, *args, **kwargs):
        super(Parent, self).__init__(*args, **kwargs)

    def receiveMsg_Initialize(self, msg, sender):
        self.child_actor = self.createActor(child.Child)
        x = random.randint(0, 100)
        self.send(self.child_actor, child.Initialize(x=x))
        self.wakeupAfter(timePeriod=1)

    def receiveMsg_WakeupMessage(self, msg, sender):
        if self.failed is False:
            self.send(self.child_actor, child.FailMessage())
            self.failed = True
            self.wakeupAfter(timePeriod=1)
        else:
            self.send(self.child_actor, child.PrintValues())

    def receiveMsg_PoisonMessage(self, msg, sender):
        self.send(self.child_actor, child.PrintValues())

...and the child code:

from thespian.actors import ActorTypeDispatcher
import random

class FailMessage:
    def __init__(self):
        pass

class PrintValues:
    def __init__(self):
        pass

class Initialize:
    def __init__(self, x=None) -> None:
        self.x = x

class Child(ActorTypeDispatcher):
    x1 = None
    x2 = None
    def __init__(self, *args, **kwargs):
        super(Child, self).__init__(*args, **kwargs)
        x1 = random.randint(0, 100)
        self.x1 = x1
        print(f'__init__: addr={hex(id(self))}, x1={self.x1}, x2={self.x2}')

    def receiveMsg_Initialize(self, msg, sender):
        self.x2 = msg.x
        print(f'Initialize: addr={hex(id(self))}, x1={self.x1}, x2={self.x2}')

    def receiveMsg_FailMessage(self, msg, sender):
        print('Forcing exception in child!')
        raise ValueError

    def receiveMsg_PrintValues(self, msg, sender):
        print(f'PrintValues: addr={hex(id(self))}, x1={self.x1}, x2={self.x2}')

When running this code, the child actor will print out its class' address in memory and the two random values; x1 being set in the class __init__ and x2 being set after receiving the Initialize message.

In this example, all three of class address, x1, and x2 have the same values after the actor has failed and restarted. From the documentation, I had assumed that when an actor restarts, it would essentially run the createActor function while retaining the previous ActorAddress however that doesn't appear to be the case since the __init__ function isn't being run again.

Here is my current understanding of the Actor Failure process:

  1. Message is sent to an actor.
  2. receiveMsg function raises exception in called actor.
  3. PoisonMessage returned to sender and message is sent a second time.
  4. receiveMsg function raises exception again in the called actor.
  5. PoisonMessage returned to sender again.
  6. Failing actor is restarted by unknown means.
  7. Failing actor's ActorAddress is retained after restart. Sort of like a rollback of the actor's state to just before the exception was raised, maybe?

So what I'm looking for is a more in depth explanation of HOW actors restart upon an Actor Failure since the documentation is pretty vague.

@kquick
Copy link
Owner

kquick commented Jan 24, 2022

You are correct: the documentation is vague and needs some cleanup and clarifications.

In regards "failure and retry", the failure is any uncaught Exception, and the "ActorSystem will automatically restart" is not quite correct; the latter should have said "a failure in one or more Actors is handled elsewhere in the actor system" and was primarily intended to encourage a different perspective on writing Actors but ended up over-promising. I will correct the documentation in these areas, but let me provide some more details and describe some situations where the above is true:

In regards to your understanding, 1 and 2 are correct, but 3 does not send a PoisonMessage, simply sends the message a second time (or more precisely, the Thespian internal wrapper for an Actor calls the receiveMessage again with the message). Steps 4 and 5 are correct (although 5 is the first and only PoisonMessage send). At that point, there are no more attempts to deliver that message, but subsequent messages are still delivered.

If the Actor failure is not just an exception but is fatal to the process then a multiproc_ base will detect that failure and send a ChildActorExitedmessage to the parent actor (this message is also sent when a child actor terminates by normal means, such as receiving an ActorExitRequest message). The parent actor can then make the decision about whether a new instance of the child should be created.

In general, it is usually not possible to guarantee that the same address can be set for the replacement Child for most transports, so there's no provision for retaining the same address. This is where the globalAddress can be useful, or an actor that manages address registrations.

One mechanism whereby Actors which fail by process failure can be restarted via Thespian itself is to use the thespian.troupe (https://thespianpy.com/doc/using.html#hH-c796bc0d-a150-4fb9-8918-58f470813175). Another way is to use the Thespian Director and the TLI configuration file (https://thespianpy.com/doc/director.html#h:fd910797-75e7-42c7-ae9d-1ae87a4040b8) to specify actors that must be present in the system. The Director will create new instances (via createActor) of a TLI-specified actor any time one is not running and update the GlobalName as needed to point to that running actor.

Please let me know if this helps answer your questions or if you would like additional information. Thank you for the clear report and accompanying code, and I will leave this issue open until I have the opportunity to update the documentation.

@bryang-spindance
Copy link
Author

bryang-spindance commented Jan 24, 2022

Thank you for the quick response!

I think this answers all my questions. I'll take a deeper look into the Director suggestion as that seems like it might be a good fit for my application.

P.S.
In the using.pdf documentation, the link to the Thespian Director documentation sends me to a 404 Error. The link in that document is https://thespianpy.com/doc/director.org instead of https://thespianpy.com/doc/director.pdf. The link you provided in your reply works, this is just something I noticed while reviewing using.pdf earlier.

Update: Also in the director.pdf documentation, the link to the Using Thespian documentation sends me to https://thespianpy.com/doc/using.org which is also a 404.

@bryang-spindance
Copy link
Author

bryang-spindance commented Jan 25, 2022

I suppose I do have one more quick question, and let me know if you want me to create a new issue with this question instead of putting it here.

In the example code above, if I were to kill the child actor's process ID using kill -9 12345, should I expect a ChildActorExited message in the parent actor immediately or will I only receive that message the next time a message is sent to the child?

In the thespian example code (multi-proc Act4), the ChildActorExited message is only sent to the parent on the next instance of a message being sent to the killed child actor. Is this the intended behavior or is something set up incorrectly in my environment?

The documentation doesn't mention this behavior anywhere so from reading it I had assumed that this message would be sent immediately following a child process exiting but that doesn't appear to be the case.

I need the ChildActorExited message as soon as a process fails since one of the child actors in my projects works based on messages from a subscribed MQTT topic. Thus if this child actor dies, the parent wouldn't ever be notified since the child isn't being sent any messages from other actors.

My naive approach to getting the ChildActorExited message when a process is killed would be to set up a "heartbeat" using a WakeupMessage for each child actor that simply sends a message to the child actor on an interval. However if there is something built in to the thespian system to handle this type of issue I'd much rather use that instead.

Thank you again for your time and assistance.

@bryang-spindance
Copy link
Author

bryang-spindance commented Jan 25, 2022

Here is some code I mocked up to explain my naive approach to the ChildActorExited issue.

  1. Child actor initialized and Parent actor registered as DeadLetter Handler.
  2. wakeupAfter is set up on Parent
  3. On WakeupMessage, Parent sends heartbeat message to child along with a unique identifier (UID) for the heartbeat.
    • This unique identifier is appended to a list of heartbeat UIDs
    • Another wakeupAfter is set up on Parent. (this continues for lifetime of Parent actor)
  4. Child returns heartbeat to sender along with the UID it received.
  5. Parent receives heartbeat response. UID is associated with a previously send heartbeat.
    • If UID exists in list of UIDs, remove from list. Otherwise ignore the message.
  6. Child's process if manually killed with kill -9 12345
  7. On WakeupMessage Parent sends another heartbeat to child along with a UID.
  8. Parent receives DeadEnvelope message.
    • If DeadEnvelope.deadMessage is <class child.Heartbeat> and number of UIDs in list is greater than threshold (5 in this example), the child actor is restarted.

Parent Code:

from thespian.actors import *
import child
import random
from datetime import timedelta
import logging as log
import uuid

class Initialize:
    def __init__(self):
        pass

class Parent(ActorTypeDispatcher):
    heartbeats = []

    def __init__(self, *args, **kwargs):
        super(Parent, self).__init__(*args, **kwargs)

    def receiveMsg_Initialize(self, msg, sender):
        # Register as dead letter handler
        self.handleDeadLetters()

        self.child_actor = self.createActor(child.Child)
        x = random.randint(0, 100)
        self.send(self.child_actor, child.Initialize(x=x))

        self.wakeupAfter(timePeriod=1)

    def receiveMsg_WakeupMessage(self, msg, sender):
        uid = str(uuid.uuid4())
        self.heartbeats.append(uid)
        self.send(self.child_actor, child.Heartbeat(uid=uid))
        log.debug(f'Sending heartbeat: {uid}')
        self.wakeupAfter(timePeriod=1)

    def receiveMsg_Heartbeat(self, msg, sender):
        log.debug(f'Received heartbeat: {msg.uid}')
        if msg.uid in self.heartbeats:
            idx = self.heartbeats.index(msg.uid)
            if idx == len(self.heartbeats) - 1:
                # If most recent heartbeat received, clear list
                self.heartbeats = []
            else:
                # Otherwise, remove the received heartbeat only.
                self.heartbeats.pop(idx)

    def receiveMsg_DeadEnvelope(self, msg, sender):
        if type(msg.deadMessage) == child.Heartbeat and msg.deadAddress == self.child_actor:
            if len(self.heartbeats) > 5: # fail after 5 missed hearbeats
                log.critical(f'Restarting child actor after 5 missed heartbeats.')
                old_address = self.child_actor
                self.send(old_address, ActorExitRequest()) # attempt clean shutdown
                self.child_actor = self.createActor(child.Child)
                self.heartbeats = []
                self.send(self.child_actor, child.Initialize(x=random.randint(0, 100)))

    def receiveMsg_str(self, msg, sender):
        self.send(self.child_actor, child.PrintValues())

    def receiveMsg_PoisonMessage(self, msg, sender):
        self.send(self.child_actor, child.PrintValues())
    
    def receiveMsg_ChildActorExited(self, msg, sender):
        log.critical(f'ChildActorExited: {msg.childAddress}')

    def receiveMsg_ActorExitRequest(self, msg, sender):
        # Unset as handler for dead letters
        self.handleDeadLetters(False)

Child Code:

from thespian.actors import *
import random
import os
import logging as log

class Heartbeat:
    def __init__(self, uid):
        self.uid = uid

class FailMessage:
    def __init__(self):
        pass

class PrintValues:
    def __init__(self):
        pass

class Initialize:
    def __init__(self, x=None) -> None:
        self.x = x

class Child(ActorTypeDispatcher):
    x1 = None
    x2 = None
    
    def __init__(self, *args, **kwargs):
        super(Child, self).__init__(*args, **kwargs)
        x1 = random.randint(0, 100)
        self.x1 = x1
        log.debug(f'__init__: addr={hex(id(self))}, x1={self.x1}, x2={self.x2}')

    def receiveMsg_Initialize(self, msg, sender):
        self.x2 = msg.x
        log.debug(f'Initialize: addr={hex(id(self))}, x1={self.x1}, x2={self.x2}')

    def receiveMsg_Heartbeat(self, msg, sender):
        self.send(sender, Heartbeat(uid=msg.uid))

    def receiveMsg_FailMessage(self, msg, sender):
        log.debug('Forcing exception in child!')
        raise ValueError

    def receiveMsg_PrintValues(self, msg, sender):
        log.debug(f'PrintValues: addr={hex(id(self))}, x1={self.x1}, x2={self.x2}')

@kquick
Copy link
Owner

kquick commented Feb 7, 2022

I've updated and published the documentation that should be a bit clearer in the areas you identified. Thanks for helping identify the ambiguities in the docs!

Thanks for the notes about the links in the PDF files. Unfortunately, it appears the org-mode processing doesn't do any adjustment of the file extension when exporting to LaTeX mode (which makes sense, since it cannot now what the ultimate format is, but unfortunately there's no way I can find to provide this information either). The hyperlinks should work well for the HTML form of the document and will look at filing an issue with org-mode-latex-export to handle this.

With regards to your updated code examples, I don't see anything particularly objectionable about them. I would suggest that perhaps you can also restart the child when you receive the ChildActorExited message as an alternative to waiting for 5 failed heartbeats.

However, I'm also not entirely sure what your scenario is where you don't see the ChildActorExited delivered immediately when the child exits. I do see this being delivered immediately, running with the multiprocTCPBase on a Linux system. I see the ChildActorExited delivered for both a SIGKILL/-9 signal or for a more customary SIGTERM/15 signal delivery.

@bryang-spindance
Copy link
Author

@kquick Yeah I realized afterward that those .org links work fine in the github previews.

I have also created a new issue for the ChildActorExited problem to avoid clutter.
It can be found here #85

Thank you again for your help.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants