
Fix duplicate data frame writes in Aggregator agent #366

Merged

merged 7 commits into main from koopman/duplicate-data-patch-unsub
Jan 6, 2024

Conversation

@BrianJKoopman commented Jan 5, 2024

Description

This PR adds an unsubscribe step to the OCSAgent class' onJoin method. To do so, we must cache the Subscription objects returned when subscribing. This avoids any duplicate subscriptions that might be left over after a disconnect and reconnection to the crossbar server.
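The pattern described above can be sketched as follows. This is a minimal illustration using a stand-in subscription class; the names here (`StubSubscription`, `Agent`, `on_join`) are illustrative, not the actual OCSAgent API.

```python
# Sketch of the unsubscribe-on-rejoin pattern: cache Subscription
# objects at subscribe time, and drop them all before re-subscribing
# when the session rejoins.

class StubSubscription:
    """Stand-in for an autobahn Subscription object."""
    def __init__(self, topic):
        self.topic = topic
        self.is_active = True

    def unsubscribe(self):
        self.is_active = False


class Agent:
    def __init__(self):
        self.topics = set()        # topics this agent wants to receive
        self._subscriptions = []   # cached Subscription objects

    def _subscribe(self, topic):
        sub = StubSubscription(topic)
        self._subscriptions.append(sub)
        return sub

    def on_join(self):
        # Unsubscribe anything left over from a previous session before
        # re-subscribing, so a reconnect never leaves a duplicate
        # subscription behind.
        for sub in self._subscriptions:
            sub.unsubscribe()
        self._subscriptions = []
        for topic in self.topics:
            self._subscribe(topic)
```

However many times `on_join` runs (i.e. however many reconnects occur), the agent ends up with exactly one active subscription per topic.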

In my testing I've noticed some strange behavior of the subscriptions depending on the nature of the crossbar disconnection. The two possible outcomes of resubscribing without this patch are:

  1. A duplicate subscription is added, with a subscription_id identical to the existing subscription's.
  2. A second subscription is gained, with a new, unique subscription_id.

Case 1 is where we end up getting duplicated data frames as observed in #365. Case 2, while it appears similar, does not result in duplicate data frames.

Case 1 seems to occur only when the reason for the disconnection is network related (i.e. the crossbar server stays online) and the crossbar server (container) was freshly created and has never been restarted. Once it has been restarted at least once, case 1 no longer occurs on subsequent network interruptions; instead you get repeated instances of case 2. I don't have an explanation for this.

The one thing I'm still seeing, and would like to resolve, is an "Unhandled error in Deferred:" message whose source I haven't been able to track down.

Here's an example of the log output after this patch in context:

2024-01-04T21:39:13-0500 Using OCS version 0.10.3+7.gebf895b
2024-01-04T21:39:13-0500 ocs: starting <class 'ocs.ocs_agent.OCSAgent'> @ observatory.aggregator
2024-01-04T21:39:13-0500 log_file is apparently None
2024-01-04T21:39:13-0500 transport connected
2024-01-04T21:39:13-0500 session joined: {'authextra': {'x_cb_node': '7483fd66f3d0-8',
               'x_cb_peer': 'tcp4:192.168.96.1:34476',
               'x_cb_pid': 19,
               'x_cb_worker': 'worker001'},
 'authid': 'WEKA-RUH9-U5PV-KR9N-PKU4-GVUM',
 'authmethod': 'anonymous',
 'authprovider': 'static',
 'authrole': 'iocs_agent',
 'realm': 'test_realm',
 'resumable': False,
 'resume_token': None,
 'resumed': False,
 'serializer': 'cbor.batched',
 'session': 3554759306789730,
 'transport': {'channel_framing': 'websocket',
               'channel_id': {},
               'channel_serializer': None,
               'channel_type': 'tcp',
               'http_cbtid': None,
               'http_headers_received': None,
               'http_headers_sent': None,
               'is_secure': False,
               'is_server': False,
               'own': None,
               'own_fd': -1,
               'own_pid': 253584,
               'own_tid': 253584,
               'peer': 'tcp4:127.0.0.1:8001',
               'peer_cert': None,
               'websocket_extensions_in_use': None,
               'websocket_protocol': None}}
2024-01-04T21:39:13-0500 startup-op: launching record
2024-01-04T21:39:13-0500 start called for record
2024-01-04T21:39:13-0500 record:0 Status is now "starting".
2024-01-04T21:39:13-0500 Creating file: ./data-local/hk/17044/1704422353.g3
2024-01-04T21:39:13-0500 record:0 Status is now "starting".
2024-01-04T21:39:13-0500 record:0 Status is now "running".
2024-01-04T21:39:14-0500 Adding provider observatory.registry.feeds.agent_operations
2024-01-04T21:39:14-0500 Adding provider observatory.fake-data1.feeds.false_temperatures
2024-01-04T21:39:42-0500 session left: CloseDetails(reason=<wamp.close.transport_lost>, message='WAMP transport was lost without closing the session 3554759306789730 before')
2024-01-04T21:39:42-0500 transport disconnected
2024-01-04T21:39:42-0500 waiting for reconnection
2024-01-04T21:39:42-0500 Scheduling retry 1 to connect <twisted.internet.endpoints.TCP4ClientEndpoint object at 0x7f2230b55810> in 1.6787950440816615 seconds.
2024-01-04T21:39:44-0500 Scheduling retry 1 to connect <twisted.internet.endpoints.TCP4ClientEndpoint object at 0x7f2230b55810> in 1.9319902540492433 seconds.
2024-01-04T21:39:46-0500 transport connected
2024-01-04T21:39:46-0500 session joined: {'authextra': {'x_cb_node': '7483fd66f3d0-8',
               'x_cb_peer': 'tcp4:192.168.96.1:35912',
               'x_cb_pid': 19,
               'x_cb_worker': 'worker001'},
 'authid': 'SVXM-XVCH-QU3A-WVR9-FCWQ-6EK6',
 'authmethod': 'anonymous',
 'authprovider': 'static',
 'authrole': 'iocs_agent',
 'realm': 'test_realm',
 'resumable': False,
 'resume_token': None,
 'resumed': False,
 'serializer': 'cbor.batched',
 'session': 2917271506007808,
 'transport': {'channel_framing': 'websocket',
               'channel_id': {},
               'channel_serializer': None,
               'channel_type': 'tcp',
               'http_cbtid': None,
               'http_headers_received': None,
               'http_headers_sent': None,
               'is_secure': False,
               'is_server': False,
               'own': None,
               'own_fd': -1,
               'own_pid': 253584,
               'own_tid': 253584,
               'peer': 'tcp4:127.0.0.1:8001',
               'peer_cert': None,
               'websocket_extensions_in_use': None,
               'websocket_protocol': None}}
2024-01-04T21:39:46-0500 Unhandled error in Deferred:
2024-01-04T21:39:46-0500
2024-01-04T21:41:11-0500 Creating file: ./data-local/hk/17044/1704422471.g3
^C2024-01-04T21:41:21-0500 caught SIGINT!
2024-01-04T21:41:21-0500 Stopping all running sessions
2024-01-04T21:41:21-0500 Stopping session record
2024-01-04T21:41:21-0500 stop called for record
2024-01-04T21:41:21-0500 record:0 Status is now "stopping".
2024-01-04T21:41:21-0500 Stopper for "record" terminated with ok=True and message ((True, 'Stopping aggregation'),)
2024-01-04T21:41:22-0500 record:0 Aggregation has ended
2024-01-04T21:41:22-0500 record:0 Status is now "done".
2024-01-04T21:41:24-0500 stopping reactor
2024-01-04T21:41:24-0500 session left: CloseDetails(reason=<wamp.close.transport_lost>, message='WAMP transport was lost without closing the session 2917271506007808 before')
2024-01-04T21:41:24-0500 transport disconnected
2024-01-04T21:41:24-0500 waiting for reconnection
2024-01-04T21:41:24-0500 Scheduling retry 1 to connect <twisted.internet.endpoints.TCP4ClientEndpoint object at 0x7f2230b55810> in 1.6039927404781809 seconds.
2024-01-04T21:41:24-0500 Main loop terminated.

Motivation and Context

Resolves #365.

How Has This Been Tested?

My test setup consists of a crossbar server container and two agents being run on my host system -- the aggregator agent and the fake data agent. It's important to run them on the host for the network connection interruption test, which I'll describe shortly.

Steps to run are:

  1. Remove any existing crossbar container instance, then start a fresh one by running docker-compose up -d.
  2. Start up both agents with ocs-agent-cli.
  3. Disconnect the crossbar server with docker network disconnect <network name> <container name>. This breaks the connection to the agents.
  4. Reconnect the crossbar server with docker network connect <network name> <container name>.
  5. The agents should reconnect.
  6. Wait for the aggregator's file to rotate (I set the file length to 70 seconds, slightly over a minute, since frame times default to 60 seconds).
  7. Plot the output to inspect for duplicate data.
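As an alternative to eyeballing the plot in the final step, duplicated frames can be checked for programmatically by looking for repeated sample timestamps. This is a generic sketch: how you extract the timestamps from the .g3 file (e.g. with your usual HK tooling) is up to you; the helper just inspects the resulting list.

```python
from collections import Counter

def find_duplicate_timestamps(timestamps):
    """Return the timestamps that appear more than once, sorted.

    An empty result means no sample was written twice, i.e. no
    duplicated data frames made it into the file.
    """
    counts = Counter(timestamps)
    return sorted(t for t, n in counts.items() if n > 1)
```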

The same process has been tested with a docker stop <crossbar container> followed by a docker start <crossbar container>, to the same effect (though that scenario does not duplicate the data).

Here's an example of performing a disconnect/reconnect, getting two subscriptions with identical subscription_ids and thus writing duplicate data:
[plot: times_1704397881]

And here is the same example, with this patch:
[plot: times_1704422191]

Another example with this patch, but with a docker restart of the crossbar container:
[plot: times_1704422353]

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Checklist:

  • My code follows the code style of this project.
  • My change requires a change to the documentation.
  • I have updated the documentation accordingly.

This reverts commit a18485c.

This didn't actually work. For some reason the agent doesn't pick back up
receiving data after a crossbar connection drop when doing this. Makes me
wonder how we end up oversubscribing...
I think this does solve the problem (though haven't checked output data for
that), because it forces a new subscription ID and causes the old subscription
to become inactive (is_active=False). However, we are generating an "Unhandled
error in Deferred" that I can't seem to find and that has no traceback.
@BrianJKoopman BrianJKoopman marked this pull request as ready for review January 5, 2024 15:36
@BrianJKoopman commented:
Alright, ready for review. Thanks again to @mhasself for pointing out the need for the Errback on the unsubscribe call.
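The errback fix mentioned above can be sketched as follows. To keep the example self-contained it uses a tiny stand-in for twisted's Deferred rather than importing twisted; in the agent, the Deferred comes from Subscription.unsubscribe(), and a failure on it with no errback attached is exactly what produces twisted's "Unhandled error in Deferred:" log line.

```python
# Stand-in mimicking just enough of twisted's Deferred API to show
# the pattern: attach an errback so an unsubscribe failure is logged
# rather than left unhandled.

class StandInDeferred:
    def __init__(self):
        self._errback = None

    def addErrback(self, fn, *args):
        self._errback = (fn, args)
        return self

    def fire_failure(self, failure):
        if self._errback is None:
            # Without an errback, twisted would log
            # "Unhandled error in Deferred:" here.
            raise failure
        fn, args = self._errback
        fn(failure, *args)


unsub_errors = []

def log_unsub_failure(failure, topic):
    # Handle the failure so it is recorded, not left unhandled.
    unsub_errors.append((topic, str(failure)))

# Imagine: d = subscription.unsubscribe()
d = StandInDeferred()
d.addErrback(log_unsub_failure, "observatory.fake-data1.feeds.false_temperatures")
d.fire_failure(RuntimeError("session gone"))
```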

@mhasself left a comment:

Great!

@BrianJKoopman BrianJKoopman merged commit 078bbcb into main Jan 6, 2024
4 checks passed
@BrianJKoopman BrianJKoopman deleted the koopman/duplicate-data-patch-unsub branch January 6, 2024 18:01
Labels: bug

Linked issue: Aggregator writes duplicate data