- Optionally, develop a shell script that will initialize the environment for the consumer (i.e., that will install any requirements that cannot be listed in a Python3 requirements file).
  - The filename of the shell script is ./init.sh.
  - If the shell script file is not present, then the system assumes that it is empty.
- Optionally, develop a Python3 requirements file for the consumer.
  - The filename for the requirements file is ./requirements.txt.
  - If the requirements file is not present, then the system assumes that it is empty.
- Develop a JSONPath to be tested against the JSON object for a given CloudEvents notification (corresponding to a given Pacifica transaction).
  - The filename for the JSONPath is jsonpath2.txt.
  - The usage of the JSONPath is as follows:
    - If the test returns a non-empty result set, then the consumer will be notified.
    - If the test returns an empty result set, then the consumer will not be notified.
- Develop a top-level Python3 script (viz., the entry-point) that will be executed by the consumer within a virtual environment. The behavior of the entry-point is to act upon a given CloudEvents notification for a given Pacifica transaction (and its associated input data files and key-value pairs) and then to generate a new Pacifica transaction (and its associated output data files and key-value pairs).
  - The filename of the entry-point is ./__main__.py.
  - The usage of the entry-point is ./__main__.py SRC DST, where:
    - SRC is the path to the temporary directory for the incoming Pacifica transaction.
    - DST is the path to the temporary directory for the outgoing Pacifica transaction.
  - The input data files (downloaded from Pacifica) are located in the SRC/downloads/ subdirectory.
  - The output data files (uploaded to Pacifica) are located in the DST/uploads/ subdirectory.
  - The JSON object for the CloudEvents notification is SRC/notification.json.
  - The execution of the entry-point terminates with the following exit status codes:
    - 0 = Terminated successfully.
    - >0 = Terminated unsuccessfully.
- Compress the entry-point, the requirements file, the JSONPath file, and any additional files that are necessary for execution of the entry-point into a zip archive called consumer.zip.
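The entry-point contract described above (the SRC and DST arguments, the directory layout, and the exit status codes) can be sketched with the standard library alone. This is a minimal skeleton, not the package's own code: the function names main and process are hypothetical, and process is a placeholder for consumer-specific work.

```python
import json
import os
import sys


def process(notification, downloads_path, uploads_path):
    """Hypothetical placeholder for the consumer-specific work."""
    return None


def main(argv):
    """Run the entry-point and return its exit status code."""
    # usage: ./__main__.py SRC DST
    if len(argv) != 3:
        print('usage: {} SRC DST'.format(argv[0]), file=sys.stderr)
        return 2  # any value >0 signals unsuccessful termination

    src, dst = argv[1], argv[2]
    try:
        # JSON object for the CloudEvents notification
        with open(os.path.join(src, 'notification.json'), 'r') as notification_file:
            notification = json.load(notification_file)
        # input data files (downloaded from Pacifica)
        downloads_path = os.path.join(src, 'downloads')
        # output data files (to be uploaded to Pacifica)
        uploads_path = os.path.join(dst, 'uploads')
        os.makedirs(uploads_path, exist_ok=True)
        process(notification, downloads_path, uploads_path)
    except Exception as error:
        print('error: {}'.format(error), file=sys.stderr)
        return 1
    return 0

# a real ./__main__.py would finish with: sys.exit(main(sys.argv))
```

Returning the status code from main, rather than calling sys.exit deep inside the work, keeps the success and failure paths easy to test.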
In this example, we develop a consumer that copies the input data files with all cased characters converted to uppercase.
The directory-tree listing for consumer.zip is as follows:
/
  init.sh
  jsonpath2.txt
  requirements.txt
  __main__.py
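An archive with this layout could be produced with the standard zipfile module. The sketch below is one possible approach; the function name build_consumer_zip is hypothetical, and the split into required and optional files is an assumption drawn from the steps above (only the shell script and the requirements file are described as optional).

```python
import os
import zipfile

# assumption: the entry-point and the JSONPath file must be present,
# whereas the shell script and the requirements file may be absent
REQUIRED_FILES = ('__main__.py', 'jsonpath2.txt')
OPTIONAL_FILES = ('init.sh', 'requirements.txt')


def build_consumer_zip(source_dir, zip_path='consumer.zip'):
    """Archive the consumer files found in source_dir; return the archived names."""
    # fail before writing anything if a required file is missing
    for name in REQUIRED_FILES:
        if not os.path.isfile(os.path.join(source_dir, name)):
            raise FileNotFoundError(os.path.join(source_dir, name))

    names = []
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as archive:
        for name in REQUIRED_FILES + OPTIONAL_FILES:
            file_path = os.path.join(source_dir, name)
            if os.path.isfile(file_path):
                # store each file at the root of the archive
                archive.write(file_path, arcname=name)
                names.append(name)
    return names
```

Any additional files that the entry-point needs would be added to the archive in the same way.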
The shell script does nothing.
#!/usr/bin/env sh
exit 0
The JSONPath returns the set of IDs for input data files whose MIME type is text/plain.
$[?(@["eventID"] and (@["eventType"] = "org.pacifica.metadata.ingest") and (@["source"] = "/pacifica/metadata/ingest"))]["data"][*][?((@["destinationTable"] = "Files") and (@["mimetype"] = "text/plain"))]["_id"]
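For readers unfamiliar with JSONPath, the expression above can be read as the following plain-Python function. This is an illustrative equivalent only, written under the assumption that a filter subscript tests the node it is applied to; the function name matching_file_ids is hypothetical and the consumer itself evaluates the JSONPath, not this code.

```python
def matching_file_ids(notification):
    """Return the IDs of text/plain files, mirroring the JSONPath above."""
    # outer filter: the notification must be a Pacifica metadata ingest event
    is_ingest_event = (
        'eventID' in notification
        and notification.get('eventType') == 'org.pacifica.metadata.ingest'
        and notification.get('source') == '/pacifica/metadata/ingest'
    )
    if not is_ingest_event:
        return []

    # inner filter: keep "Files" entries whose MIME type is text/plain
    return [
        item['_id']
        for item in notification.get('data', [])
        if isinstance(item, dict)
        and item.get('destinationTable') == 'Files'
        and item.get('mimetype') == 'text/plain'
        and '_id' in item
    ]


def should_notify(notification):
    """The consumer is notified only when the result set is non-empty."""
    return bool(matching_file_ids(notification))
```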
The requirements file specifies the JSONPath, Pacifica notification service consumer and Promise packages.
jsonpath2
pacifica-notifications-consumer
promise
The entry-point is as follows:
#!/usr/bin/env python3
import json
import os
import shutil
import sys

from jsonpath2 import Path
from pacifica_notifications_consumer import download, upload
from promise import Promise

# execute only if run as a top-level script
if __name__ == '__main__':
    # path to directory for input CloudEvents notification
    orig_event_path = sys.argv[1]

    # path to directory for output CloudEvents notification
    new_event_path = sys.argv[2]

    def upload_did_fulfill(d):
        """Callback for Pacifica uploader promise's eventual value.

        Args:
            d (dict): CloudEvents notification.

        Returns:
            Promise[bool]: True for success, False otherwise.

        Raises:
            BaseException: If an error occurs.
        """
        # delete entire directory tree for input CloudEvents notification
        shutil.rmtree(orig_event_path, ignore_errors=True)

        # delete entire directory tree for output CloudEvents notification
        shutil.rmtree(new_event_path, ignore_errors=True)

        # return True for success
        return Promise(lambda resolve, reject: resolve(True))

    def download_did_fulfill(d):
        """Callback for Pacifica downloader promise's eventual value.

        Args:
            d (dict): CloudEvents notification.

        Returns:
            Promise[bool]: True for success, False otherwise.

        Raises:
            BaseException: If an error occurs.
        """
        # iterate over plain-text files
        for file_d in [ match_data.current_value for match_data in Path.parse_str('$["data"][*][?(@["destinationTable"] = "Files" and @["mimetype"] = "text/plain")]').match(d) ]:
            # make sure that the output subdirectory exists before writing to it
            os.makedirs(os.path.join(new_event_path, 'uploads', file_d['subdir']), exist_ok=True)

            # open input data file
            with open(os.path.join(orig_event_path, 'downloads', file_d['subdir'], file_d['name']), 'r') as orig_file:
                # open output data file
                with open(os.path.join(new_event_path, 'uploads', file_d['subdir'], file_d['name']), 'w') as new_file:
                    # read input data file, convert cased characters to uppercase, and then write output data file
                    new_file.write(orig_file.read().upper())

        # invoke Pacifica uploader with specified transaction attributes and key-value pairs
        return upload(new_event_path, {
            'Transactions.instrument': [ match_data.current_value for match_data in Path.parse_str('$["data"][*][?(@["destinationTable"] = "Transactions.instrument")]["value"]').match(d) ][0],
            'Transactions.proposal': [ match_data.current_value for match_data in Path.parse_str('$["data"][*][?(@["destinationTable"] = "Transactions.proposal")]["value"]').match(d) ][0],
            'Transactions.submitter': [ match_data.current_value for match_data in Path.parse_str('$["data"][*][?(@["destinationTable"] = "Transactions.submitter")]["value"]').match(d) ][0],
        }, {
            'Transactions._id': [ match_data.current_value for match_data in Path.parse_str('$["data"][*][?(@["destinationTable"] = "Transactions._id")]["value"]').match(d) ][0],
        }).then(upload_did_fulfill)

    # invoke Pacifica downloader
    download(orig_event_path).then(download_did_fulfill)
- Download and install the pacifica-notifications-consumer package.
  - When the pacifica-notifications-consumer package is successfully installed, the start-pacifica-notifications-consumer command is available on the PATH.
  - When started, the behavior of the start-pacifica-notifications-consumer command is to: (i) extract the contents of consumer.zip to a temporary location, (ii) create a new virtual environment, (iii) install the contents of the requirements file within said virtual environment, (iv) start a new asynchronous job processing queue, (v) start a new web service end-point for a new consumer, and (vi) register said web service end-point for said consumer with the given producer.
  - When stopped, the behavior of the start-pacifica-notifications-consumer command is to reverse the side-effects of the start-up behavior (i.e., to clean up after itself).
- Execute the start-pacifica-notifications-consumer command, specifying as command-line arguments:
  - The location of consumer.zip;
  - The URL and authentication credentials for the producer; and
  - The configuration for the asynchronous job processing queue, web service end-point, etc.
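To make the start-up and clean-up behavior more concrete, the sketch below illustrates step (i) and its reversal using the standard library. It is not the implementation of the start-pacifica-notifications-consumer command; the names extracted_consumer and read_or_empty are hypothetical, and steps (ii) through (vi) are omitted.

```python
import contextlib
import os
import shutil
import tempfile
import zipfile


def read_or_empty(path):
    """Return the text of the file at path, or '' if it is not present."""
    try:
        with open(path, 'r') as text_file:
            return text_file.read()
    except FileNotFoundError:
        # an absent init.sh or requirements.txt is treated as empty
        return ''


@contextlib.contextmanager
def extracted_consumer(zip_path):
    """Extract zip_path to a temporary location and remove it on exit."""
    consumer_dir = tempfile.mkdtemp(prefix='consumer-')
    try:
        # step (i): extract the contents of the archive
        with zipfile.ZipFile(zip_path) as archive:
            archive.extractall(consumer_dir)
        yield consumer_dir
    finally:
        # on stop: reverse the side-effect of the extraction
        shutil.rmtree(consumer_dir, ignore_errors=True)
```

Using a context manager ties the clean-up to the start-up, so the temporary location is removed even if a later step raises an exception.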
Yes. Downloaded files may be deleted by the entry-point (e.g., using the shutil.rmtree function; see __main__.py in the example).
Yes. Locally-generated files may be deleted by the entry-point (e.g., using the shutil.rmtree function; see __main__.py in the example).
Yes. However, this behavior must be implemented by the entry-points themselves and is not provided by the default behavior of the start-pacifica-notifications-consumer command.
For example, to wait for two CloudEvents notifications:
- Develop two consumers with two entry-points.
- The entry-point for the first consumer receives and stores the first CloudEvents notification.
- The entry-point for the second consumer receives the second CloudEvents notification, retrieves the first CloudEvents notification, and then does its work.
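The two-consumer arrangement above could be sketched as follows. This is one possible design under stated assumptions: both consumers can reach a shared directory (store_dir), the caller supplies a correlation key that links the two notifications, and do_work is a callable for the consumer-specific work. All three names, and the function names, are hypothetical.

```python
import json
import os


def notification_path(store_dir, key):
    """Return the path at which the notification for key is stored."""
    return os.path.join(store_dir, '{}.json'.format(key))


def first_entry_point(src, store_dir, key):
    """First consumer: store the incoming notification for later retrieval."""
    with open(os.path.join(src, 'notification.json'), 'r') as notification_file:
        notification = json.load(notification_file)
    os.makedirs(store_dir, exist_ok=True)
    with open(notification_path(store_dir, key), 'w') as stored_file:
        json.dump(notification, stored_file)
    return 0


def second_entry_point(src, store_dir, key, do_work):
    """Second consumer: retrieve the first notification, then do the work."""
    with open(os.path.join(src, 'notification.json'), 'r') as notification_file:
        second = json.load(notification_file)
    try:
        with open(notification_path(store_dir, key), 'r') as stored_file:
            first = json.load(stored_file)
    except FileNotFoundError:
        # the first notification has not been stored yet
        return 1
    do_work(first, second)
    return 0
```

In this sketch the second entry-point reports failure when the first notification is missing; whether to fail, retry, or defer in that case is a design choice left to the consumer.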
No. Only Python3 is supported as the programming language for the entry-point file (viz., __main__.py).
Non-Python3 executables can be called from within the entry-point using the subprocess module.
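As a minimal sketch of this approach, the helper below runs an external command with subprocess.run and returns its exit status code, so that the entry-point can pass a failure on through its own exit status. The function name run_external is hypothetical.

```python
import subprocess
import sys


def run_external(command, cwd=None):
    """Run an external executable and return its exit status code."""
    completed = subprocess.run(
        command,
        cwd=cwd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if completed.returncode != 0:
        # surface the error output of the external executable
        sys.stderr.write(completed.stderr.decode(errors='replace'))
    return completed.returncode
```

An entry-point could then finish with, for example, sys.exit(run_external(['./tool', src, dst])), where ./tool is a placeholder for an executable bundled in consumer.zip.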
Other programming languages can be called from within the entry-point using the appropriate interface, e.g., the jpy package for the Java programming language, and the rpy2 package for the R programming language.
Authentication credentials are specified in the configuration for the pacifica-notifications package (see https://pacifica-notifications.readthedocs.io/en/latest/configuration.html for more information).
Authentication credentials are included in all HTTP requests that are issued by the consumer, e.g., the username is specified via the Http-Remote-User header (see https://pacifica-notifications.readthedocs.io/en/latest/exampleusage.html for more information).
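To illustrate what such a request looks like, the sketch below attaches the Http-Remote-User header using the standard urllib.request module. It only builds the request and does not send it; the function name build_request is hypothetical, and the URL and username used with it are placeholders rather than values taken from the package.

```python
import urllib.request


def build_request(url, username):
    """Build an HTTP request that identifies the user to the producer."""
    request = urllib.request.Request(url)
    # the username travels in the Http-Remote-User header
    request.add_header('Http-Remote-User', username)
    return request


# placeholder URL and username, for illustration only
example_request = build_request('http://producer.example/', 'example_user')
```

HTTP header names are case-insensitive, so the server sees the same header even though urllib.request normalizes the capitalization of the name internally.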
- Consumer: A software system that receives CloudEvents notifications (corresponding to Pacifica transactions) from producers. CloudEvents notifications are filtered by testing against a JSONPath. If the test for a given CloudEvents notification is successful, then the consumer routes said CloudEvents notification to a processor.
- Processor: A software system that downloads the input data files and metadata for a given Pacifica transaction, processes said input data files and associated metadata, generates output data files and associated metadata, and then creates a new Pacifica transaction.
- Producer: A software system that sends CloudEvents notifications (corresponding to Pacifica transactions) to consumers.