11from __future__ import annotations
22
33import asyncio
4+ import json
45import logging
56from collections import deque
67from datetime import timedelta , datetime
7- from typing import MutableSequence , Optional , List
8+ from typing import MutableSequence , Optional , List , Set
89
910from aiohttp import ClientSession
1011from posthog .client import Client
@@ -56,6 +57,7 @@ def __init__(
5657 self .lock = asyncio .Lock ()
5758 self .last_fetched : Optional [datetime ] = None
5859 self .session : Optional [ClientSession ] = None
60+ self .white_listed_events : Set [str ] = set ()
5961
6062 async def capture (self , event : List [AnalyticsEvent ]) -> None :
6163 """
@@ -64,24 +66,34 @@ async def capture(self, event: List[AnalyticsEvent]) -> None:
6466 Only in the rare case when the queue size reached its maximum the queue will be flushed directly.
6567 """
6668 async with self .lock :
67- self .queue .extend (event )
69+ for e in event :
70+ if e .kind not in self .white_listed_events :
71+ log .debug (f"Event { e .kind } is not whitelisted and will be ignored." )
72+ continue
73+ self .queue .append (e )
6874
6975 if len (self .queue ) >= self .flush_at :
7076 await self .flush ()
7177
72- async def refresh_public_api_key (self ) -> None :
78+ async def refresh_from_cdn (self ) -> None :
7379 """
7480 The API key is public but not static, so we need to refresh it periodically.
7581 """
7682 try :
7783 if not self .session :
7884 self .session = ClientSession ()
79- async with self .session .get ("https://cdn.some.engineering/posthog/public_api_key" ) as resp :
80- api_key = (await resp .text ()).strip ()
85+ async with self .session .get ("https://cdn.some.engineering/posthog/posthog.json" ) as resp :
86+ ph = json .loads (await resp .text ())
87+ # update the api key
88+ api_key = ph ["api_key" ]
8189 self .client .api_key = api_key
8290 for consumer in self .client .consumers :
8391 consumer .api_key = api_key
92+ # update the events to report
93+ self .white_listed_events = set (ph ["events" ])
94+ # update the last fetched time
8495 self .last_fetched = utc ()
96+ log .debug ("Fetched latest posthog data from CDN." )
8597 except Exception as ex :
8698 log .debug (f"Could not fetch latest api key. Will use the current one. { ex } " )
8799
@@ -91,11 +103,11 @@ async def flush(self) -> None:
91103 """
92104 # check, if we need to fetch or refresh the public api key
93105 if not self .last_fetched :
94- await self .refresh_public_api_key ()
106+ await self .refresh_from_cdn ()
95107 sd = self .system_data
96108 self .client .identify (sd .system_id , {"run_id" : self .run_id , "created_at" : sd .created_at }) # type: ignore
97109 elif (utc () - self .last_fetched ) > timedelta (hours = 1 ):
98- await self .refresh_public_api_key ()
110+ await self .refresh_from_cdn ()
99111
100112 # acquire the lock, send all events to the client and clear the queue
101113 async with self .lock :
@@ -114,6 +126,7 @@ async def flush(self) -> None:
114126 self .queue .clear ()
115127
116128 async def start (self ) -> PostHogEventSender :
129+ await self .flush () # flush will make sure to load initial data from CDN
117130 await self .flusher .start ()
118131 return self
119132
0 commit comments