Skip to content

Commit f5be446

Browse files
author
Sebastian Molenda
authored
remove telemetry manager (#222)
* remove telemetry manager
1 parent b541bd7 commit f5be446

36 files changed

+23
-360
lines changed

pubnub/endpoints/endpoint.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,6 @@ def callback(params_to_merge):
204204
custom_params['pnsdk'] = self.pubnub.sdk_name
205205
custom_params['uuid'] = self.pubnub.uuid
206206

207-
for query_key, query_value in self.pubnub._telemetry_manager.operation_latencies().items():
208-
custom_params[query_key] = query_value
209-
210207
if self.is_auth_required():
211208
if self.pubnub._get_token():
212209
custom_params["auth"] = self.pubnub._get_token()

pubnub/enums.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ class PNOperationType(object):
127127
PNRemoveSpaceUsersOperation = 82
128128
PNFetchUserMembershipsOperation = 85
129129
PNFetchSpaceMembershipsOperation = 86
130-
# NOTE: remember to update PubNub.managers.TelemetryManager.endpoint_name_for_operation() when adding operations
131130

132131

133132
class PNHeartbeatNotificationOptions(object):

pubnub/managers.py

Lines changed: 9 additions & 176 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,20 @@
11
import logging
22
from abc import abstractmethod, ABCMeta
33

4-
import time
5-
import copy
64
import base64
75
import random
86

97
from cbor2 import loads
108

11-
from . import utils
12-
from .enums import PNStatusCategory, PNReconnectionPolicy, PNOperationType
13-
from .models.consumer.common import PNStatus
14-
from .models.server.subscribe import SubscribeEnvelope
15-
from .dtos import SubscribeOperation, UnsubscribeOperation
16-
from .callbacks import SubscribeCallback, ReconnectionCallback
17-
from .models.subscription_item import SubscriptionItem
18-
from .errors import PNERR_INVALID_ACCESS_TOKEN
19-
from .exceptions import PubNubException
9+
from pubnub import utils
10+
from pubnub.enums import PNStatusCategory, PNReconnectionPolicy
11+
from pubnub.models.consumer.common import PNStatus
12+
from pubnub.models.server.subscribe import SubscribeEnvelope
13+
from pubnub.dtos import SubscribeOperation, UnsubscribeOperation
14+
from pubnub.callbacks import SubscribeCallback, ReconnectionCallback
15+
from pubnub.models.subscription_item import SubscriptionItem
16+
from pubnub.errors import PNERR_INVALID_ACCESS_TOKEN
17+
from pubnub.exceptions import PubNubException
2018

2119
logger = logging.getLogger("pubnub")
2220

@@ -398,171 +396,6 @@ def get_custom_params(self):
398396
return {}
399397

400398

401-
class TelemetryManager:
402-
TIMESTAMP_DIVIDER = 1000
403-
MAXIMUM_LATENCY_DATA_AGE = 60
404-
CLEAN_UP_INTERVAL = 1
405-
CLEAN_UP_INTERVAL_MULTIPLIER = 1000
406-
407-
def __init__(self):
408-
self.latencies = {}
409-
410-
@abstractmethod
411-
def _start_clean_up_timer(self):
412-
pass
413-
414-
@abstractmethod
415-
def _stop_clean_up_timer(self):
416-
pass
417-
418-
def operation_latencies(self):
419-
operation_latencies = {}
420-
421-
for endpoint_name, endpoint_latencies in self.latencies.items():
422-
latency_key = 'l_' + endpoint_name
423-
424-
endpoint_average_latency = self.average_latency_from_data(endpoint_latencies)
425-
426-
if endpoint_average_latency > 0:
427-
operation_latencies[latency_key] = endpoint_average_latency
428-
429-
return operation_latencies
430-
431-
def clean_up_telemetry_data(self):
432-
current_timestamp = time.time()
433-
copy_latencies = copy.deepcopy(self.latencies)
434-
435-
for endpoint_name, endpoint_latencies in copy_latencies.items():
436-
for latency_information in endpoint_latencies:
437-
if current_timestamp - latency_information["timestamp"] > self.MAXIMUM_LATENCY_DATA_AGE:
438-
self.latencies[endpoint_name].remove(latency_information)
439-
440-
if len(self.latencies[endpoint_name]) == 0:
441-
del self.latencies[endpoint_name]
442-
443-
def store_latency(self, latency, operation_type):
444-
if operation_type != PNOperationType.PNSubscribeOperation and latency > 0:
445-
endpoint_name = self.endpoint_name_for_operation(operation_type)
446-
447-
store_timestamp = time.time()
448-
449-
if endpoint_name not in self.latencies:
450-
self.latencies[endpoint_name] = []
451-
452-
latency_entry = {
453-
"timestamp": store_timestamp,
454-
"latency": latency,
455-
}
456-
457-
self.latencies[endpoint_name].append(latency_entry)
458-
459-
@staticmethod
460-
def average_latency_from_data(endpoint_latencies):
461-
total_latency = 0
462-
463-
for latency_data in endpoint_latencies:
464-
total_latency += latency_data['latency']
465-
466-
return total_latency / len(endpoint_latencies)
467-
468-
@staticmethod
469-
def endpoint_name_for_operation(operation_type):
470-
endpoint = {
471-
PNOperationType.PNPublishOperation: 'pub',
472-
PNOperationType.PNFireOperation: 'pub',
473-
PNOperationType.PNSendFileNotification: "pub",
474-
475-
PNOperationType.PNHistoryOperation: 'hist',
476-
PNOperationType.PNHistoryDeleteOperation: 'hist',
477-
PNOperationType.PNMessageCountOperation: 'mc',
478-
479-
PNOperationType.PNUnsubscribeOperation: 'pres',
480-
PNOperationType.PNWhereNowOperation: 'pres',
481-
PNOperationType.PNHereNowOperation: 'pres',
482-
PNOperationType.PNGetState: 'pres',
483-
PNOperationType.PNSetStateOperation: 'pres',
484-
PNOperationType.PNHeartbeatOperation: 'pres',
485-
486-
PNOperationType.PNAddChannelsToGroupOperation: 'cg',
487-
PNOperationType.PNRemoveChannelsFromGroupOperation: 'cg',
488-
PNOperationType.PNChannelGroupsOperation: 'cg',
489-
PNOperationType.PNChannelsForGroupOperation: 'cg',
490-
PNOperationType.PNRemoveGroupOperation: 'cg',
491-
492-
PNOperationType.PNAddPushNotificationsOnChannelsOperation: 'push',
493-
PNOperationType.PNPushNotificationEnabledChannelsOperation: 'push',
494-
PNOperationType.PNRemoveAllPushNotificationsOperation: 'push',
495-
PNOperationType.PNRemovePushNotificationsFromChannelsOperation: 'push',
496-
497-
PNOperationType.PNAccessManagerAudit: 'pam',
498-
PNOperationType.PNAccessManagerGrant: 'pam',
499-
PNOperationType.PNAccessManagerRevoke: 'pam',
500-
PNOperationType.PNTimeOperation: 'pam',
501-
502-
PNOperationType.PNAccessManagerGrantToken: 'pamv3',
503-
PNOperationType.PNAccessManagerRevokeToken: 'pamv3',
504-
505-
PNOperationType.PNSignalOperation: 'sig',
506-
507-
PNOperationType.PNSetUuidMetadataOperation: 'obj',
508-
PNOperationType.PNGetUuidMetadataOperation: 'obj',
509-
PNOperationType.PNRemoveUuidMetadataOperation: 'obj',
510-
PNOperationType.PNGetAllUuidMetadataOperation: 'obj',
511-
512-
PNOperationType.PNSetChannelMetadataOperation: 'obj',
513-
PNOperationType.PNGetChannelMetadataOperation: 'obj',
514-
PNOperationType.PNRemoveChannelMetadataOperation: 'obj',
515-
PNOperationType.PNGetAllChannelMetadataOperation: 'obj',
516-
517-
PNOperationType.PNSetChannelMembersOperation: 'obj',
518-
PNOperationType.PNGetChannelMembersOperation: 'obj',
519-
PNOperationType.PNRemoveChannelMembersOperation: 'obj',
520-
PNOperationType.PNManageChannelMembersOperation: 'obj',
521-
522-
PNOperationType.PNSetMembershipsOperation: 'obj',
523-
PNOperationType.PNGetMembershipsOperation: 'obj',
524-
PNOperationType.PNRemoveMembershipsOperation: 'obj',
525-
PNOperationType.PNManageMembershipsOperation: 'obj',
526-
527-
PNOperationType.PNAddMessageAction: 'msga',
528-
PNOperationType.PNGetMessageActions: 'msga',
529-
PNOperationType.PNDeleteMessageAction: 'msga',
530-
531-
PNOperationType.PNGetFilesAction: 'file',
532-
PNOperationType.PNDeleteFileOperation: 'file',
533-
PNOperationType.PNGetFileDownloadURLAction: 'file',
534-
PNOperationType.PNFetchFileUploadS3DataAction: 'file',
535-
PNOperationType.PNDownloadFileAction: 'file',
536-
PNOperationType.PNSendFileAction: 'file',
537-
538-
539-
PNOperationType.PNFetchMessagesOperation: "hist",
540-
541-
PNOperationType.PNCreateSpaceOperation: "obj",
542-
PNOperationType.PNUpdateSpaceOperation: "obj",
543-
PNOperationType.PNFetchSpaceOperation: "obj",
544-
PNOperationType.PNFetchSpacesOperation: "obj",
545-
PNOperationType.PNRemoveSpaceOperation: "obj",
546-
547-
PNOperationType.PNCreateUserOperation: "obj",
548-
PNOperationType.PNUpdateUserOperation: "obj",
549-
PNOperationType.PNFetchUserOperation: "obj",
550-
PNOperationType.PNFetchUsersOperation: "obj",
551-
PNOperationType.PNRemoveUserOperation: "obj",
552-
553-
PNOperationType.PNAddUserSpacesOperation: "obj",
554-
PNOperationType.PNAddSpaceUsersOperation: "obj",
555-
PNOperationType.PNUpdateUserSpacesOperation: "obj",
556-
557-
PNOperationType.PNUpdateSpaceUsersOperation: "obj",
558-
PNOperationType.PNFetchUserMembershipsOperation: "obj",
559-
PNOperationType.PNFetchSpaceMembershipsOperation: "obj",
560-
561-
}[operation_type]
562-
563-
return endpoint
564-
565-
566399
class TokenManager:
567400
def __init__(self):
568401
self.token = None

pubnub/pubnub.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
- Message queueing and worker thread management
2020
- Automatic reconnection handling
2121
- Custom request handler support
22-
- Telemetry tracking
2322
2423
Usage Example:
2524
```python
@@ -71,7 +70,7 @@
7170
from pubnub.endpoints.presence.leave import Leave
7271
from pubnub.endpoints.pubsub.subscribe import Subscribe
7372
from pubnub.enums import PNStatusCategory, PNHeartbeatNotificationOptions, PNOperationType, PNReconnectionPolicy
74-
from pubnub.managers import SubscriptionManager, PublishSequenceManager, ReconnectionManager, TelemetryManager
73+
from pubnub.managers import SubscriptionManager, PublishSequenceManager, ReconnectionManager
7574
from pubnub.models.consumer.common import PNStatus
7675
from pubnub.pnconfiguration import PNConfiguration
7776
from pubnub.pubnub_core import PubNubCore
@@ -127,8 +126,6 @@ def __init__(self, config: PNConfiguration, *, custom_request_handler: Type[Base
127126

128127
self._publish_sequence_manager = PublishSequenceManager(PubNubCore.MAX_SEQUENCE)
129128

130-
self._telemetry_manager = NativeTelemetryManager()
131-
132129
def sdk_platform(self) -> str:
133130
"""Get the SDK platform identifier.
134131
@@ -716,9 +713,3 @@ def reset(self):
716713
self.result = None
717714
self.status = None
718715
self.done_event.clear()
719-
720-
721-
class NativeTelemetryManager(TelemetryManager):
722-
def store_latency(self, latency, operation_type):
723-
super(NativeTelemetryManager, self).store_latency(latency, operation_type)
724-
self.clean_up_telemetry_data()

pubnub/pubnub_asyncio.py

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ async def main():
7171
from pubnub.request_handlers.base import BaseRequestHandler
7272
from pubnub.request_handlers.async_httpx import AsyncHttpxRequestHandler
7373
from pubnub.workers import SubscribeMessageWorker
74-
from pubnub.managers import SubscriptionManager, PublishSequenceManager, ReconnectionManager, TelemetryManager
74+
from pubnub.managers import SubscriptionManager, PublishSequenceManager, ReconnectionManager
7575
from pubnub import utils
7676
from pubnub.enums import PNStatusCategory, PNHeartbeatNotificationOptions, PNOperationType, PNReconnectionPolicy
7777
from pubnub.callbacks import SubscribeCallback, ReconnectionCallback
@@ -153,7 +153,6 @@ def __init__(self, config, custom_event_loop=None, subscription_manager=None, *,
153153
self._subscription_manager = subscription_manager(self)
154154

155155
self._publish_sequence_manager = AsyncioPublishSequenceManager(self.event_loop, PubNubCore.MAX_SEQUENCE)
156-
self._telemetry_manager = AsyncioTelemetryManager()
157156

158157
@property
159158
def _connector(self):
@@ -835,23 +834,3 @@ async def wait_for_presence_on(self, *channel_names):
835834
continue
836835
finally:
837836
self.presence_queue.task_done()
838-
839-
840-
class AsyncioTelemetryManager(TelemetryManager):
841-
def __init__(self):
842-
TelemetryManager.__init__(self)
843-
self.loop = asyncio.get_event_loop()
844-
self._schedule_next_cleanup()
845-
846-
def _schedule_next_cleanup(self):
847-
self._timer = self.loop.call_later(
848-
self.CLEAN_UP_INTERVAL * self.CLEAN_UP_INTERVAL_MULTIPLIER / 1000,
849-
self._clean_up_schedule_next
850-
)
851-
852-
def _clean_up_schedule_next(self):
853-
self.clean_up_telemetry_data()
854-
self._schedule_next_cleanup()
855-
856-
def _stop_clean_up_timer(self):
857-
self._timer.cancel()

pubnub/pubnub_core.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,6 @@ def my_listener(message, event):
145145
from pubnub.endpoints.push.remove_channels_from_push import RemoveChannelsFromPush
146146
from pubnub.endpoints.push.remove_device import RemoveDeviceFromPush
147147
from pubnub.endpoints.push.list_push_provisions import ListPushProvisions
148-
from pubnub.managers import TelemetryManager
149148

150149
if TYPE_CHECKING:
151150
from pubnub.endpoints.file_operations.send_file_asyncio import AsyncioSendFile
@@ -192,7 +191,6 @@ def __init__(self, config: PNConfiguration) -> None:
192191

193192
self._subscription_manager = None
194193
self._publish_sequence_manager = None
195-
self._telemetry_manager = TelemetryManager()
196194
self._base_path_manager = BasePathManager(config)
197195
self._token_manager = TokenManager()
198196
self._subscription_registry = PNSubscriptionRegistry(self)

pubnub/request_handlers/async_aiohttp.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import aiohttp
22
import asyncio
33
import logging
4-
import time
54
import json # noqa # pylint: disable=W0611
65
import urllib
76

@@ -98,7 +97,6 @@ async def async_request(self, options_func, cancellation_event):
9897
try:
9998
if not self._session:
10099
await self.create_session()
101-
start_timestamp = time.time()
102100
response = await asyncio.wait_for(
103101
self._session.request(
104102
options.method_string,
@@ -205,8 +203,6 @@ async def async_request(self, options_func, cancellation_event):
205203
)
206204
)
207205
else:
208-
self.pubnub._telemetry_manager.store_latency(time.time() - start_timestamp, options.operation_type)
209-
210206
return AsyncioEnvelope(
211207
result=create_response(data) if not options.non_json_response else create_response(response, data),
212208
status=create_status(

pubnub/request_handlers/async_httpx.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from asyncio import Event
22
import asyncio
33
import logging
4-
import time
54
import httpx
65
import json # noqa # pylint: disable=W0611
76
import urllib
@@ -113,7 +112,6 @@ async def async_request(self, options_func, cancellation_event):
113112
try:
114113
if not self._session:
115114
await self.create_session()
116-
start_timestamp = time.time()
117115
response = await asyncio.wait_for(
118116
self._session.request(**request_arguments),
119117
options.request_timeout
@@ -215,8 +213,6 @@ async def async_request(self, options_func, cancellation_event):
215213
)
216214
)
217215
else:
218-
self.pubnub._telemetry_manager.store_latency(time.time() - start_timestamp, options.operation_type)
219-
220216
return AsyncioEnvelope(
221217
result=create_response(data) if not options.non_json_response else create_response(response, data),
222218
status=create_status(

requirements-dev.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ pytest-cov>=6.0.0
33
pycryptodomex>=3.21.0
44
flake8>=7.1.2
55
pytest>=8.3.5
6-
pytest-asyncio>=0.24.0,<1.0.0
6+
pytest-asyncio>=1.0.0
77
httpx>=0.28
88
h2>=4.1
99
requests>=2.32.2
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
# flake8: noqa
2+
import os
23
from examples.native_sync.file_handling import main as test_file_handling
4+
from examples.native_sync.message_reactions import main as test_message_reactions
35

4-
from examples.native_sync.message_reactions import main as test_message_reactions
6+
os.environ['CI'] = '1'

0 commit comments

Comments
 (0)