Skip to content

Commit

Permalink
feature: re-add Aventri event feed, now with attendee information
Browse files Browse the repository at this point in the history
  • Loading branch information
abbas123456 committed Feb 2, 2021
1 parent 23c5974 commit 84fbf1d
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 2 deletions.
167 changes: 166 additions & 1 deletion core/app/feeds.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
metric_timer,
)
from .utils import (
json_dumps,
json_loads,
sub_dict_lower,
)
Expand All @@ -34,7 +35,8 @@ def parse_feed_config(feed_config):
by_feed_type = {
'activity_stream': ActivityStreamFeed,
'zendesk': ZendeskFeed,
'maxemail': MaxemailFeed,
'maxemail': EventFeed,
'aventri': EventFeed,
}
return by_feed_type[feed_config['TYPE']].parse_config(feed_config)

Expand Down Expand Up @@ -252,6 +254,169 @@ def company_numbers(description):
]


class EventFeed(Feed):

down_grace_period = 60 * 60 * 4

full_ingest_page_interval = 3
updates_page_interval = 60 * 60 * 24 * 30
exception_intervals = [120, 180, 240, 300]

@classmethod
def parse_config(cls, config):
return cls(**sub_dict_lower(
config, ['UNIQUE_ID', 'SEED', 'ACCOUNT_ID', 'API_KEY', 'AUTH_URL', 'EVENT_URL',
'ATTENDEES_LIST_URL', 'ATTENDEE_URL'
]))

def __init__(self, unique_id, seed, account_id, api_key, auth_url,
event_url, attendees_list_url, attendee_url):
self.lock = asyncio.Lock()
self.unique_id = unique_id
self.seed = seed
self.account_id = account_id
self.api_key = api_key
self.auth_url = auth_url
self.event_url = event_url
self.attendees_list_url = attendees_list_url
self.attendee_url = attendee_url
self.accesstoken = None

@staticmethod
def next_href(_):
""" aventri API does not support pagination
returns (None)
"""
return None

async def auth_headers(self, context, __):
result = await http_make_request(
context.session, context.metrics, 'POST', self.auth_url, data={
'accountid': self.account_id, 'key': self.api_key,
}, headers={}
)
result.raise_for_status()
self.access_token = json_loads(result._body)['accesstoken']
return {
'accesstoken': self.access_token,
}

async def get_activities(self, context, feed):
async def get_attendees(event_id):
url = self.attendees_list_url.format(event_id=event_id)

with logged(context.logger.debug, context.logger.warning,
'Fetching attendee list (%s)', [url]):
result = await http_make_request(
context.session, context.metrics, 'GET', url, data=b'',
headers={'accesstoken': self.access_token})
result.raise_for_status()
result_body = json_loads(result._body)

if 'error' in result_body:
return []

attendee_ids = [a['attendeeid'] for a in result_body]
attendees = []
for attendee_id in attendee_ids:
attendee_lookup = await context.redis_client.execute('GET', f'attendee-{attendee_id}')
if attendee_lookup:
try:
attendees.append(json_loads(attendee_lookup.decode('utf-8')))
except UnicodeDecodeError:
await context.redis_client.execute('DEL', f'attendee-{attendee_id}')
attendees.append(await fetch_attendee(event_id, attendee_id))

else:
attendees.append(await fetch_attendee(event_id, attendee_id))
return [a for a in attendees if a is not None]

async def fetch_attendee(event_id, attendee_id):
url = self.attendee_url.format(event_id=event_id, attendee_id=attendee_id)
with logged(context.logger.debug, context.logger.warning,
'Fetching attendee (%s)', [url]):
result = await http_make_request(
context.session, context.metrics, 'GET', url, data=b'',
headers={'accesstoken': self.access_token})
result.raise_for_status()
result_body = json_loads(result._body)

if 'error' in result_body:
return None

attendee_object = {
'type': ['Attendee', 'dit:aventri:Attendee'],
'id': 'dit:aventri:Attendee:' + result_body['attendeeid'],
'registration_status': result_body['registrationstatus'],
'individual_cost': result_body['individualcost'],
'cost': result_body['cost'],
'total_cost': result_body['totalcost'],
'created': result_body['created']
}
await context.redis_client.execute(
'SETEX', f'attendee-{attendee_id}', 60*60*24*7, json_dumps(attendee_object))
return attendee_object

async def get_event(event_id):
event_lookup = await context.redis_client.execute('GET', f'event-{event_id}')
if event_lookup:
try:
return json_loads(event_lookup.decode('utf-8'))
except UnicodeDecodeError:
await context.redis_client.execute('DEL', f'event-{event_id}')
return await fetch_event(event_id)
else:
return await fetch_event(event_id)

async def fetch_event(event_id):
url = self.event_url.format(event_id=event_id)

with logged(context.logger.debug, context.logger.warning,
'Fetching event (%s)', [url]):
result = await http_make_request(
context.session, context.metrics, 'GET', url, data=b'',
headers={'accesstoken': self.access_token})
result.raise_for_status()
result_body = json_loads(result._body)
if 'error' in result_body:
return None

await context.redis_client.execute(
'SETEX', f'event-{event_id}', 60*60*24*7, json_dumps(result_body))
return result_body

now = datetime.datetime.now().isoformat()
return [
{
'type': 'Create',
'id': 'dit:aventri:Event:' + str(event['eventid']) + ':Create',
'published': now,
'eventid': event['eventid'],
'dit:application': 'aventri',
'object': {
'type': ['Event', 'dit:aventri:Event'],
'id': 'dit:aventri:Event:' + event['eventid'],
'name': event['name'],
'url': event['url'],
'content': event['description'],
'startdate': event['startdate'],
'enddate': event['enddate'],
'foldername': event['foldername'],
'location': event['location'],
'language': event['defaultlanguage'],
'timezone': event['timezone'],
'currency': event['standardcurrency'],
'price_type': event['price_type'],
'price': event['pricepoints'],
'attributedTo': await get_attendees(event['eventid'])
}
}
for page_event in feed
for event in [await get_event(page_event['eventid'])]
if event is not None
]


class MaxemailFeed(Feed):

down_grace_period = 60 * 60 * 4
Expand Down
2 changes: 1 addition & 1 deletion core/app/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def json_loads(data):
def main(run_application_coroutine):
stdout_handler = logging.StreamHandler(sys.stdout)
app_logger = logging.getLogger('activity-stream')
app_logger.setLevel(logging.INFO)
app_logger.setLevel(logging.DEBUG)
app_logger.addHandler(stdout_handler)

loop = asyncio.get_event_loop()
Expand Down

0 comments on commit 84fbf1d

Please sign in to comment.