diff --git a/core/app/feeds.py b/core/app/feeds.py index 41d4767a..736c0147 100644 --- a/core/app/feeds.py +++ b/core/app/feeds.py @@ -21,6 +21,7 @@ metric_timer, ) from .utils import ( + json_dumps, json_loads, sub_dict_lower, ) @@ -34,7 +35,8 @@ def parse_feed_config(feed_config): by_feed_type = { 'activity_stream': ActivityStreamFeed, 'zendesk': ZendeskFeed, - 'maxemail': MaxemailFeed, + 'maxemail': EventFeed, + 'aventri': EventFeed, } return by_feed_type[feed_config['TYPE']].parse_config(feed_config) @@ -252,6 +254,169 @@ def company_numbers(description): ] +class EventFeed(Feed): + + down_grace_period = 60 * 60 * 4 + + full_ingest_page_interval = 3 + updates_page_interval = 60 * 60 * 24 * 30 + exception_intervals = [120, 180, 240, 300] + + @classmethod + def parse_config(cls, config): + return cls(**sub_dict_lower( + config, ['UNIQUE_ID', 'SEED', 'ACCOUNT_ID', 'API_KEY', 'AUTH_URL', 'EVENT_URL', + 'ATTENDEES_LIST_URL', 'ATTENDEE_URL' + ])) + + def __init__(self, unique_id, seed, account_id, api_key, auth_url, + event_url, attendees_list_url, attendee_url): + self.lock = asyncio.Lock() + self.unique_id = unique_id + self.seed = seed + self.account_id = account_id + self.api_key = api_key + self.auth_url = auth_url + self.event_url = event_url + self.attendees_list_url = attendees_list_url + self.attendee_url = attendee_url + self.accesstoken = None + + @staticmethod + def next_href(_): + """ aventri API does not support pagination + returns (None) + """ + return None + + async def auth_headers(self, context, __): + result = await http_make_request( + context.session, context.metrics, 'POST', self.auth_url, data={ + 'accountid': self.account_id, 'key': self.api_key, + }, headers={} + ) + result.raise_for_status() + self.access_token = json_loads(result._body)['accesstoken'] + return { + 'accesstoken': self.access_token, + } + + async def get_activities(self, context, feed): + async def get_attendees(event_id): + url = self.attendees_list_url.format(event_id=event_id) + + with logged(context.logger.debug, context.logger.warning, + 'Fetching attendee list (%s)', [url]): + result = await http_make_request( + context.session, context.metrics, 'GET', url, data=b'', + headers={'accesstoken': self.access_token}) + result.raise_for_status() + result_body = json_loads(result._body) + + if 'error' in result_body: + return [] + + attendee_ids = [a['attendeeid'] for a in result_body] + attendees = [] + for attendee_id in attendee_ids: + attendee_lookup = await context.redis_client.execute('GET', f'attendee-{attendee_id}') + if attendee_lookup: + try: + attendees.append(json_loads(attendee_lookup.decode('utf-8'))) + except UnicodeDecodeError: + await context.redis_client.execute('DEL', f'attendee-{attendee_id}') + attendees.append(await fetch_attendee(event_id, attendee_id)) + + else: + attendees.append(await fetch_attendee(event_id, attendee_id)) + return [a for a in attendees if a is not None] + + async def fetch_attendee(event_id, attendee_id): + url = self.attendee_url.format(event_id=event_id, attendee_id=attendee_id) + with logged(context.logger.debug, context.logger.warning, + 'Fetching attendee (%s)', [url]): + result = await http_make_request( + context.session, context.metrics, 'GET', url, data=b'', + headers={'accesstoken': self.access_token}) + result.raise_for_status() + result_body = json_loads(result._body) + + if 'error' in result_body: + return None + + attendee_object = { + 'type': ['Attendee', 'dit:aventri:Attendee'], + 'id': 'dit:aventri:Attendee:' + result_body['attendeeid'], + 'registration_status': result_body['registrationstatus'], + 'individual_cost': result_body['individualcost'], + 'cost': result_body['cost'], + 'total_cost': result_body['totalcost'], + 'created': result_body['created'] + } + await context.redis_client.execute( + 'SETEX', f'attendee-{attendee_id}', 60*60*24*7, json_dumps(attendee_object)) + return attendee_object + + async def get_event(event_id): + event_lookup = await context.redis_client.execute('GET', f'event-{event_id}') + if event_lookup: + try: + return json_loads(event_lookup.decode('utf-8')) + except UnicodeDecodeError: + await context.redis_client.execute('DEL', f'event-{event_id}') + return await fetch_event(event_id) + else: + return await fetch_event(event_id) + + async def fetch_event(event_id): + url = self.event_url.format(event_id=event_id) + + with logged(context.logger.debug, context.logger.warning, + 'Fetching event (%s)', [url]): + result = await http_make_request( + context.session, context.metrics, 'GET', url, data=b'', + headers={'accesstoken': self.access_token}) + result.raise_for_status() + result_body = json_loads(result._body) + if 'error' in result_body: + return None + + await context.redis_client.execute( + 'SETEX', f'event-{event_id}', 60*60*24*7, json_dumps(result_body)) + return result_body + + now = datetime.datetime.now().isoformat() + return [ + { + 'type': 'Create', + 'id': 'dit:aventri:Event:' + str(event['eventid']) + ':Create', + 'published': now, + 'eventid': event['eventid'], + 'dit:application': 'aventri', + 'object': { + 'type': ['Event', 'dit:aventri:Event'], + 'id': 'dit:aventri:Event:' + event['eventid'], + 'name': event['name'], + 'url': event['url'], + 'content': event['description'], + 'startdate': event['startdate'], + 'enddate': event['enddate'], + 'foldername': event['foldername'], + 'location': event['location'], + 'language': event['defaultlanguage'], + 'timezone': event['timezone'], + 'currency': event['standardcurrency'], + 'price_type': event['price_type'], + 'price': event['pricepoints'], + 'attributedTo': await get_attendees(event['eventid']) + } + } + for page_event in feed + for event in [await get_event(page_event['eventid'])] + if event is not None + ] + + class MaxemailFeed(Feed): down_grace_period = 60 * 60 * 4 diff --git a/core/app/utils.py b/core/app/utils.py index ce176879..2c430734 100644 --- a/core/app/utils.py +++ b/core/app/utils.py @@ -202,7 +202,7 @@ def json_loads(data): def main(run_application_coroutine): stdout_handler = logging.StreamHandler(sys.stdout) app_logger = logging.getLogger('activity-stream') - app_logger.setLevel(logging.INFO) + app_logger.setLevel(logging.DEBUG) app_logger.addHandler(stdout_handler) loop = asyncio.get_event_loop()