Skip to content

Message going AWOL #38

@andatt

Description

@andatt

Hi Kevin

Sorry I haven't gotten back to you on the previous logging ticket yet; I just haven't had time. Another issue has come up in the meantime. I am sending batches of 100 messages between actors. For some batches, the first message of the batch is sent by the first actor (as confirmed by logs) but is never received by the second actor (again confirmed by logs). All the rest of the messages in the batch are sent and received successfully.

I am pretty stumped right now: I have tried setting the THESPLOG_THRESHOLD environment variable to "Debug" and "Info", but nothing appears in the /tmp/ directory (i.e. thespian.log does not exist). I have written some simplified code to try to reproduce what happens in my main system, but so far the simplified code works fine.

Again, this is running inside a Docker container. My simplified code is below but, as I said, it runs fine without dropping any messages. Is there anything you can suggest to help me get more information, for example from the Thespian internal logging?

Thanks

Andrew

from thespian.troupe import troupe
from thespian.actors import ActorTypeDispatcher
from thespian.actors import ActorSystem
from thespian.actors import WakeupMessage
import logging


class ActorLogFilter(logging.Filter):
    """Let through only log records that carry an ``actorAddress`` attribute."""

    def filter(self, logrecord):
        """Return True when ``actorAddress`` is set directly on the record."""
        record_attributes = logrecord.__dict__
        has_actor_address = 'actorAddress' in record_attributes
        return has_actor_address


class NotActorLogFilter(logging.Filter):
    """Let through only log records that do NOT carry an ``actorAddress`` attribute."""

    def filter(self, logrecord):
        """Return True when ``actorAddress`` is absent from the record's own attributes."""
        record_attributes = logrecord.__dict__
        return not ('actorAddress' in record_attributes)


def log_config(log_file_path_1, log_file_path_2):
    """Build a ``logging`` dict-config that splits output across two files.

    Records without an ``actorAddress`` attribute go to ``log_file_path_1``
    using the 'normal' format; records with one go to ``log_file_path_2``
    using the 'actor' format, which includes the address. Both handlers are
    set to INFO and are attached to the root logger, which is set to DEBUG.

    Args:
        log_file_path_1: file name for the non-actor handler ('h1').
        log_file_path_2: file name for the actor handler ('h2').

    Returns:
        A dict in the ``logging.config`` dictionary schema (version 1).
    """

    def _file_handler(path, formatter_name, filter_name):
        # Both handlers share the same shape; only these three values differ.
        return {
            'class': 'logging.FileHandler',
            'filename': path,
            'formatter': formatter_name,
            'filters': [filter_name],
            'level': logging.INFO,
        }

    formatters = {
        'normal': {'format': '%(levelname)-8s %(message)s'},
        'actor': {'format': '%(levelname)-8s %(actorAddress)s => %(message)s'},
    }
    filters = {
        'isActorLog': {'()': ActorLogFilter},
        'notActorLog': {'()': NotActorLogFilter},
    }
    handlers = {
        'h1': _file_handler(log_file_path_1, 'normal', 'notActorLog'),
        'h2': _file_handler(log_file_path_2, 'actor', 'isActorLog'),
    }
    # The empty-string key configures the root logger.
    loggers = {'': {'handlers': ['h1', 'h2'], 'level': logging.DEBUG}}

    return {
        'version': 1,
        'formatters': formatters,
        'filters': filters,
        'handlers': handlers,
        'loggers': loggers,
    }


class PrimaryActor(ActorTypeDispatcher):
    """Sends batches of 100 dict messages to a pool of SecondaryActor instances.

    Each WakeupMessage triggers one batch. All messages of a batch go to a
    single pool member, and successive batches rotate through the pool.
    After the fifth batch no further wakeup is scheduled.
    """

    def receiveMsg_WakeupMessage(self, msg, sender):
        """Send one batch of 100 messages and schedule the next wakeup.

        Args:
            msg: the received WakeupMessage (not inspected).
            sender: address of the sender (not used).
        """
        # State is created lazily on the first wakeup rather than in __init__.
        if not hasattr(self, "batch_number"):
            self.submission_actor_pool_size = 2
            self.batch_number = 0
            self.last_secondary_actor_used = -1
            self.secondary_actors = self.create_secondary_actor_pool(
                SecondaryActor,
                self.submission_actor_pool_size
            )

        # Round-robin over the pool. The modulo keeps the index in range; the
        # earlier version stored the index under a different attribute name
        # than it read, so the rotation never advanced past the first actor.
        next_submission_actor_to_use = (
            (self.last_secondary_actor_used + 1) % self.submission_actor_pool_size
        )
        target_actor = self.secondary_actors[next_submission_actor_to_use]

        for x in range(0, 100):
            message = {"number": x, "batch": self.batch_number}
            logging.info("Sending message number {0} {1}".format(self.batch_number, x))
            self.send(target_actor, message)

        self.last_secondary_actor_used = next_submission_actor_to_use
        self.batch_number += 1

        # Stop after five batches: no further wakeup is requested.
        if self.batch_number >= 5:
            return

        # NOTE(review): Thespian documents the wakeupAfter period as a
        # datetime.timedelta; confirm that a bare int is accepted here.
        self.wakeupAfter(1)

    def create_secondary_actor_pool(self, actor_code, pool_size):
        """Create ``pool_size`` actors of class ``actor_code``.

        Args:
            actor_code: the actor class passed to ``createActor``.
            pool_size: number of actors to create.

        Returns:
            A list of the values returned by ``createActor``, in creation order.
        """
        return [self.createActor(actor_code) for _ in range(pool_size)]


@troupe(max_count=4000, idle_count=1)
class SecondaryActor(ActorTypeDispatcher):
    """Troupe actor that logs the batch and number of every dict it receives."""

    def receiveMsg_dict(self, msg, sender):
        """Log the 'batch' and 'number' entries of the received dict.

        Args:
            msg: dict with "batch" and "number" keys.
            sender: address of the sender (not used).
        """
        batch_id = msg["batch"]
        message_index = msg["number"]
        log_line = "Received message number {0} {1}".format(batch_id, message_index)
        logging.info(log_line)

# Start the actor system on the multiprocTCPBase with no extra capabilities,
# routing log output into the two bug_check files.
log_definitions = log_config("bug_check_1.log", "bug_check_2.log")
thespian_system = ActorSystem("multiprocTCPBase", {}, logDefs=log_definitions)

# Create the primary actor and send it a WakeupMessage directly so that it
# sends its first batch.
primary_actor = thespian_system.createActor(PrimaryActor)
kickoff_message = WakeupMessage(delayPeriod=1)
thespian_system.tell(primary_actor, kickoff_message)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions