Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ coveralls = "*"
flake8 = "*"
pymongo = "*"
semver = "*"
ipython = "*"

[packages]
python-arango = "==5.0.0"
Expand Down
88 changes: 87 additions & 1 deletion Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion deploy.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,11 @@ schema-collection = {{ schema_collection }}

# A URL pointing to a configuration file for any metadata validators to be installed on startup.
# See the readme file for a description of the file contents.
metadata-validator-config-url = {{ metadata_validator_config_url }}
metadata-validator-config-url = {{ metadata_validator_config_url }}

# bootstrap.servers parameter for Kafka notifications. Leave blank to disable the Kafka
# notifier.
kafka-bootstrap-servers = {{ kafka_bootstrap_servers }}
# The topic to which the Kafka notifier should write. Required if kafka-bootstrap-servers is
# provided.
kafka-topic = {{ kafka_topic }}
21 changes: 16 additions & 5 deletions lib/SampleService/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from SampleService.core.storage.arango_sample_storage import ArangoSampleStorage \
as _ArangoSampleStorage
from SampleService.core.arg_checkers import check_string as _check_string
from SampleService.core.notification import KafkaNotifier as _KafkaNotifer
from SampleService.core.user_lookup import KBaseUserLookup
from SampleService.core.workspace import WS as _WS

Expand Down Expand Up @@ -67,13 +68,17 @@ def build_samples(config: Dict[str, str]) -> Tuple[Samples, KBaseUserLookup]:
ws_token = _check_string_req(config.get('workspace-read-admin-token'),
'config param workspace-read-admin-token')

kafka_servers = _check_string(config.get('kafka-bootstrap-servers'),
'config param kafka-bootstrap-servers',
optional=True)
kafka_topic = None
if kafka_servers: # have to start the server twice to test no kafka scenario
kafka_topic = _check_string(config.get('kafka-topic'), 'config param kafka-topic')

metaval_url = _check_string(config.get('metadata-validator-config-url'),
'config param metadata-validator-config-url',
optional=True)

# build the validators before trying to connect to arango
metaval = get_validators(metaval_url) if metaval_url else MetadataValidatorSet()

# meta params may have info that shouldn't be logged so don't log any for now.
# Add code to deal with this later if needed
print(f'''
Expand All @@ -95,10 +100,15 @@ def build_samples(config: Dict[str, str]) -> Tuple[Samples, KBaseUserLookup]:
auth-full-admin-roles: {', '.join(full_roles)}
auth-read-admin-roles: {', '.join(read_roles)}
workspace-url: {ws_url}
workspace-read-admin-token: [REDACTED FOR YOUR PLEASURE]
workspace-read-admin-token: [REDACTED FOR YOUR ULTIMATE PLEASURE]
kafka-bootstrap-servers: {kafka_servers}
kafka-topic: {kafka_topic}
metadata-validators-config-url: {metaval_url}
''')

# build the validators before trying to connect to arango
metaval = get_validators(metaval_url) if metaval_url else MetadataValidatorSet()

arangoclient = _arango.ArangoClient(hosts=arango_url)
arango_db = arangoclient.db(
arango_db, username=arango_user, password=arango_pwd, verify=True)
Expand All @@ -114,9 +124,10 @@ def build_samples(config: Dict[str, str]) -> Tuple[Samples, KBaseUserLookup]:
col_schema,
)
storage.start_consistency_checker()
kafka = _KafkaNotifer(kafka_servers, _cast(str, kafka_topic)) if kafka_servers else None
user_lookup = KBaseUserLookup(auth_root_url, auth_token, full_roles, read_roles)
ws = _WS(_Workspace(ws_url, token=ws_token))
return Samples(storage, user_lookup, metaval, ws), user_lookup
return Samples(storage, user_lookup, metaval, ws, kafka), user_lookup


def split_value(d: Dict[str, str], key: str):
Expand Down
12 changes: 11 additions & 1 deletion lib/SampleService/core/notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def __init__(self, bootstrap_servers: str, topic: str):
# presumably this can be removed once idempotence is supported
max_in_flight_requests_per_connection=1,
)
self._closed = False

def notify_new_sample_version(self, sample_id: UUID, sample_ver: int):
if sample_ver < 1:
Expand All @@ -104,11 +105,20 @@ def notify_new_sample_version(self, sample_id: UUID, sample_ver: int):
self._send_message(msg)

def _send_message(self, message):
    """
    Serialize the given message as JSON and publish it to the configured Kafka topic.

    :param message: a JSON-serializable object to send.
    :raises ValueError: if the notifier has been closed.
    """
    if self._closed:
        raise ValueError('client is closed')
    future = self._prod.send(self._topic, _json.dumps(message).encode('utf-8'))
    # ensure the message was sent correctly, or if not throw an exception in the correct thread
    future.get(timeout=35)  # this is very difficult to test

# TODO KAFKA notify on sample save
def close(self):
    """
    Shut down the notifier.

    Closes the underlying Kafka producer and flags this notifier as closed so
    that subsequent send attempts are rejected.
    """
    # TODO: support use as a context manager at some point
    self._prod.close()
    self._closed = True

# TODO KAFKA notify on ACL change
# TODO KAFKA notify on new link
# TODO KAFKA notify on expired link via update
Expand Down
Loading