Skip to content

Commit

Permalink
Move event emit code to common module. (#220)
Browse files Browse the repository at this point in the history
* Move event emit code to common module.

This moves the event emit code to a central module so it can be
used elsewhere in the code.

Signed-off-by: David Brown <dmlb2000@gmail.com>

* Add orm eventing

Signed-off-by: David Brown <dmlb2000@gmail.com>

* try to fix python 2 unicode issues

Signed-off-by: David Brown <dmlb2000@gmail.com>

* Add doi update events

Signed-off-by: David Brown <dmlb2000@gmail.com>
  • Loading branch information
dmlb2000 committed May 11, 2019
1 parent c72dfe4 commit 3dc4d71
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 50 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
repos:
- repo: https://github.com/pre-commit/mirrors-autopep8
rev: v1.4.3
rev: v1.4.4
hooks:
- id: autopep8
- repo: git://github.com/pre-commit/pre-commit-hooks
rev: v2.1.0
rev: v2.2.1
hooks:
- id: fix-encoding-pragma
- id: trailing-whitespace
Expand Down
24 changes: 24 additions & 0 deletions pacifica/metadata/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,30 @@ def get_config():
'NOTIFICATIONS_DISABLED', 'False'))
configparser.set('notifications', 'url', getenv(
'NOTIFICATIONS_URL', 'http://127.0.0.1:8070/receive'))
configparser.set('notifications', 'ingest_eventtype', getenv(
'NOTIFICATIONS_INGEST_EVENT_TYPE', 'org.pacifica.metadata.ingest'))
configparser.set('notifications', 'ingest_source', getenv(
'NOTIFICATIONS_INGEST_SOURCE', 'http://metadata.pacifica.org/transactions?_id={_id}'))
configparser.set('notifications', 'ingest_eventid', getenv(
'NOTIFICATIONS_INGEST_EVENTID', 'metadata.ingest.{_id}'))
configparser.set('notifications', 'orm_eventtype', getenv(
'NOTIFICATIONS_ORM_EVENT_TYPE', 'org.pacifica.metadata.orm'))
configparser.set('notifications', 'orm_source', getenv(
'NOTIFICATIONS_ORM_SOURCE', 'http://metadata.pacifica.org/{object_name}'))
configparser.set('notifications', 'orm_eventid', getenv(
'NOTIFICATIONS_ORM_EVENTID', 'metadata.orm.{object_name}.{uuid}'))
configparser.set('notifications', 'doientry_eventtype', getenv(
'NOTIFICATIONS_DOIENTRY_EVENT_TYPE', 'org.pacifica.metadata.doientry'))
configparser.set('notifications', 'doientry_source', getenv(
'NOTIFICATIONS_DOIENTRY_SOURCE', 'http://metadata.pacifica.org/doientry'))
configparser.set('notifications', 'doientry_eventid', getenv(
'NOTIFICATIONS_DOIENTRY_EVENTID', 'metadata.doi.{uuid}'))
configparser.set('notifications', 'doiupdate_eventtype', getenv(
'NOTIFICATIONS_DOIUPDATE_EVENT_TYPE', 'org.pacifica.metadata.doiupdate'))
configparser.set('notifications', 'doiupdate_source', getenv(
'NOTIFICATIONS_DOIUPDATE_SOURCE', 'http://metadata.pacifica.org/doientry?doi={doi}'))
configparser.set('notifications', 'doiupdate_eventid', getenv(
'NOTIFICATIONS_DOIUPDATE_EVENTID', 'metadata.doi.{doi}.{uuid}'))
configparser.read(CONFIG_FILE)
return configparser

Expand Down
17 changes: 13 additions & 4 deletions pacifica/metadata/rest/doi_queries/doi_registration_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
# -*- coding: utf-8 -*-
"""CherryPy DOI Registration Updater object class."""
from __future__ import print_function
from uuid import uuid4
from cherrypy import tools, request, HTTPError
from pacifica.metadata.rest.user_queries.user_search import UserSearch
from pacifica.metadata.rest.doi_queries.doi_registration_base import DOIRegistrationBase
from pacifica.metadata.orm import DOITransaction, TransactionUser, DOIInfo, Relationships
from pacifica.metadata.orm.base import DB
from ...config import get_config
from ...orm.base import DB
from ...orm import DOITransaction, TransactionUser, DOIInfo, Relationships
from ..user_queries.user_search import UserSearch
from ..events import emit_event
from .doi_registration_base import DOIRegistrationBase
# pylint: disable=too-few-public-methods


Expand Down Expand Up @@ -85,4 +88,10 @@ def POST():
'entry': new_entry,
'was_created': _created
})
emit_event(
eventType=get_config().get('notifications', 'doientry_eventtype'),
source=get_config().get('notifications', 'doientry_source'),
eventID=get_config().get('notifications', 'doientry_eventid').format(uuid=uuid4()),
data=new_entries_info
)
return new_entries_info
18 changes: 15 additions & 3 deletions pacifica/metadata/rest/doi_queries/doi_registration_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
# -*- coding: utf-8 -*-
"""CherryPy DOI Registration Updater object class."""
from __future__ import print_function
from uuid import uuid4
from xml.etree import ElementTree
from cherrypy import tools, request, HTTPError
from dateutil.parser import parse
from pacifica.metadata.rest.doi_queries.doi_registration_base import DOIRegistrationBase
from pacifica.metadata.orm import DOIAuthors, DOIAuthorMapping, DOIEntries
from pacifica.metadata.orm.base import DB
from ...orm import DOIAuthors, DOIAuthorMapping, DOIEntries
from ...orm.base import DB
from ...config import get_config
from ..events import emit_event
from .doi_registration_base import DOIRegistrationBase

# pylint: disable=too-few-public-methods

Expand Down Expand Up @@ -131,4 +134,13 @@ def POST():
doi_string = DOIRegistrationUpdate._process_updated_osti_info(
osti_xml_string)

emit_event(
eventType=get_config().get('notifications', 'doiupdate_eventtype'),
source=get_config().get('notifications', 'doiupdate_source'),
eventID=get_config().get('notifications', 'doiupdate_eventid').format(
doi=doi_string,
uuid=uuid4()
),
data=doi_string
)
return doi_string
32 changes: 32 additions & 0 deletions pacifica/metadata/rest/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Emit events for different things."""
from datetime import datetime
import logging
from json import dumps
import requests
from requests import RequestException
from ..config import get_config


def emit_event(**kwargs):
"""Emit a cloud event that the data is now accepted."""
try:
resp = requests.post(
get_config().get('notifications', 'url'),
data=dumps({
'cloudEventsVersion': '0.1',
'eventType': kwargs.get('eventType'),
'source': kwargs.get('source'),
'eventID': kwargs.get('eventID'),
'eventTime': datetime.now().replace(microsecond=0).isoformat(),
'extensions': kwargs.get('extensions', {}),
'contentType': 'application/json',
'data': kwargs.get('data', {})
}),
headers={'Content-Type': 'application/json'}
)
resp_major = int(int(resp.status_code)/100)
assert resp_major == 2
except (RequestException, AssertionError) as ex:
logging.warning('Unable to send notification: %s', ex)
51 changes: 13 additions & 38 deletions pacifica/metadata/rest/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,8 @@
]
"""
from __future__ import print_function
from os import getenv
from datetime import datetime
import logging
import hashlib
from json import dumps
from six import binary_type
import requests
from requests.exceptions import RequestException
from cherrypy import request, tools
from pacifica.metadata.config import get_config
from pacifica.metadata.orm.transsip import TransSIP
Expand All @@ -55,8 +49,11 @@
from pacifica.metadata.orm.files import Files
from pacifica.metadata.orm.base import db_connection_decorator

from .events import emit_event

# pylint: disable=too-few-public-methods


class IngestAPI(object):
"""Uploader ingest API."""

Expand Down Expand Up @@ -195,37 +192,6 @@ def extract_files(json):
files.append(file_hash)
return files

def emit_event(json):
"""Emit a cloud event that the data is now accepted."""
try:
resp = requests.post(
get_config().get('notifications', 'url'),
data=dumps({
'cloudEventsVersion': '0.1',
'eventType': getenv('CLOUDEVENT_TYPE', 'org.pacifica.metadata.ingest'),
'source': getenv(
'CLOUDEVENT_SOURCE_URL',
'http://metadata.pacifica.org/transactions?_id={}'.format(
pull_value_by_attr(
json, 'Transactions._id', 'value')
)
),
'eventID': 'metadata.ingest.{}'.format(
pull_value_by_attr(
json, 'Transactions._id', 'value')
),
'eventTime': datetime.now().replace(microsecond=0).isoformat(),
'extensions': {},
'contentType': 'application/json',
'data': json
}),
headers={'Content-Type': 'application/json'}
)
resp_major = int(int(resp.status_code)/100)
assert resp_major == 2
except (RequestException, AssertionError) as ex:
logging.warning('Unable to send notification: %s', ex)

transaction_hash = {
'_id': pull_value_by_attr(request.json, 'Transactions._id', 'value')
}
Expand All @@ -244,5 +210,14 @@ def emit_event(json):
FileKeyValue()._insert(generate_fkvs(request.json))
# pylint: enable=protected-access
if not get_config().getboolean('notifications', 'disabled'):
emit_event(request.json)
emit_event(
eventType=get_config().get('notifications', 'ingest_eventtype'),
source=get_config().get('notifications', 'ingest_source').format(
_id=pull_value_by_attr(request.json, 'Transactions._id', 'value')
),
eventID=get_config().get('notifications', 'ingest_eventid').format(
_id=pull_value_by_attr(request.json, 'Transactions._id', 'value')
),
data=request.json
)
return {'status': 'success'}
77 changes: 74 additions & 3 deletions pacifica/metadata/rest/orm.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Core interface for each ORM object to interface with CherryPy."""
from uuid import uuid4
import json
import cherrypy
from cherrypy import HTTPError
from peewee import DoesNotExist
from pacifica.metadata.orm.base import PacificaModel, db_connection_decorator
from pacifica.metadata.orm.utils import datetime_now_nomicrosecond, datetime_converts, UUIDEncoder
from .events import emit_event
from ..config import get_config


def json_handler(*args, **kwargs):
Expand All @@ -31,6 +35,36 @@ class CherryPyAPI(PacificaModel):
}
exposed = True

@staticmethod
def _primary_keys_as_dict(obj):
"""Turn the primary keys for self into get parameters for web access."""
ret = {}
for pkey in obj.get_primary_keys():
if pkey == 'id':
ret['_id'] = obj.to_hash()['_id']
else:
ret[pkey] = obj.to_hash()[pkey]
return ret

@staticmethod
def _send_orm_event(obj_cls, objs, method):
if not get_config().getboolean('notifications', 'disabled'):
emit_event(
eventType=get_config().get('notifications', 'orm_eventtype'),
source=get_config().get('notifications', 'orm_source').format(
object_name=obj_cls.get_object_info()['callable_name']
),
eventID=get_config().get('notifications', 'orm_eventid').format(
object_name=obj_cls.get_object_info()['callable_name'],
uuid=uuid4()
),
data={
'obj_type': obj_cls.get_object_info()['callable_name'],
'obj_primary_keys': objs,
'method': method
}
)

def _select(self, **kwargs):
"""Internal select method."""
primary_keys = []
Expand Down Expand Up @@ -64,12 +98,21 @@ def _update(self, update_hash, **kwargs):
update_hash['updated'] = update_hash.get(
'updated', datetime_now_nomicrosecond())
did_something = False
for obj in self.select().where(self.where_clause(kwargs)):
query = self.select().where(self.where_clause(kwargs))
for obj in query:
did_something = True
obj.from_hash(update_hash)
obj.save()
if not did_something:
raise HTTPError(500, "Get args didn't select any objects.")
self._send_orm_event(
self.__class__,
[
self._primary_keys_as_dict(obj)
for obj in query
],
'update'
)

def _set_or_create(self, objs):
"""Set or create the object if it doesn't already exist."""
Expand Down Expand Up @@ -107,9 +150,28 @@ def _insert(self, objs):
message += 'Remove or change the duplicated id values'
raise HTTPError(400, message)
clean_objs = self._insert_many_format(objs)
# this is a PostgreSQL specific option...
# pylint: disable=no-value-for-parameter
self.__class__.insert_many(clean_objs).execute()
id_list = self.__class__.insert_many(clean_objs).execute()
clean_id_list = []
for obj_ids in id_list:
new_objids = []
for obj_id in obj_ids:
try:
json.dumps(obj_id)
new_objids.append(obj_id)
except TypeError:
new_objids.append(str(obj_id))
clean_id_list.append(new_objids)
# pylint: enable=no-value-for-parameter
self._send_orm_event(
self.__class__,
[
dict(zip(self.get_primary_keys(), obj_ids))
for obj_ids in clean_id_list
],
'insert'
)

@classmethod
def check_for_key_existence(cls, object_list):
Expand Down Expand Up @@ -145,8 +207,17 @@ def _insert_many_format(cls, obj_hashes):
def _force_delete(self, **kwargs):
"""Force delete entries in the database."""
recursive = kwargs.pop('recursive', False)
for obj in self.select().where(self.where_clause(kwargs)):
query = self.select().where(self.where_clause(kwargs))
for obj in query:
obj.delete_instance(recursive)
self._send_orm_event(
self.__class__,
[
self._primary_keys_as_dict(obj)
for obj in query
],
'delete'
)

def _delete(self, **kwargs):
"""Internal delete object method."""
Expand Down

0 comments on commit 3dc4d71

Please sign in to comment.