
DM-40414: Write PipelineTask to transmit alerts to Kafka #191

Merged: 3 commits merged from tickets/DM-40414 into main on Mar 1, 2024

Conversation

bsmartradio (Contributor):
Add a producer to packageAlerts in ap_association. The producer will send alerts from the pipeline to the alert stream using several environment variables to set the server, user, password, and topic.

Jenkins run here: https://rubin-ci.slac.stanford.edu/blue/organizations/jenkins/stack-os-matrix/detail/stack-os-matrix/571/pipeline

@ebellm (Contributor) left a comment:

Looks good overall, a range of suggestions throughout.

    try:
        from confluent_kafka import KafkaException
        import confluent_kafka
    except ImportError as error:
        error.msg += ("Could not import confluent_kafka. Alerts will not be sent to the alert stream")
Contributor:

This message isn't logged anywhere...

Member:

I think it might be more appropriate to use error.add_note("Could not import...") rather than trying to modify the original error message.

Contributor Author:

Moved this so I could add logging.


    """Methods for packaging Apdb and Pipelines data into Avro alerts.
    """
    _ConfluentWireFormatHeader = struct.Struct(">bi")
    latest_schema = Schema.from_file().definition
Contributor:

Don't load this here--you want to use the schemaFile set up in PackageAlertsConfig.

Contributor Author:

Moved _ConfluentWireFormatHeader and swapped to schemaFile

    topic = self.kafka_topic

    for alert in alerts:
        alert_bytes = self._serialize_alert(alert, schema=latest_schema, schema_id=1)
Contributor:

Suggested change:
-    alert_bytes = self._serialize_alert(alert, schema=latest_schema, schema_id=1)
+    alert_bytes = self._serialize_alert(alert, schema=self.alertSchema.definition, schema_id=1)

Contributor Author:

Done

@@ -372,3 +451,76 @@ def streamCcdDataToBytes(self, cutout):
        cutout.write(streamer, format="fits")
        cutoutBytes = streamer.getvalue()
        return cutoutBytes

    def _serialize_alert(self, alert, schema=latest_schema, schema_id=0):
Contributor:

Suggested change:
-    def _serialize_alert(self, alert, schema=latest_schema, schema_id=0):
+    def _serialize_alert(self, alert, schema=self.alertSchema.definition, schema_id=0):

Contributor Author:

Done

@@ -63,6 +77,12 @@ class PackageAlertsConfig(pexConfig.Config):
        default=os.path.join(os.getcwd(), "alerts"),
    )

    produceAlertsConfig = pexConfig.Field(
Contributor:

Suggested change:
-    produceAlertsConfig = pexConfig.Field(
+    doProduceAlerts = pexConfig.Field(

Contributor Author:

Swapped to this and added doWriteAlerts

    if self.config.produceAlertsConfig and "confluent_kafka" in sys.modules:
        self.produceAlerts(alerts, ccdVisitId)

    else:
Contributor:

I'd prefer that we use separate configs for producing and writing alerts: doProduceAlerts and doWriteAlerts. Then we can raise an error if doProduceAlerts is set and confluent_kafka isn't loaded.

Contributor Author:

As mentioned above, we now have both, as well as a check that at least one is set, since neither being set would be a situation where nothing happens to the alerts.

    Format prefix.
    """
    buf = io.BytesIO()
    # TODO: Use a proper schema versioning system
Contributor:

Suggested change:
-    # TODO: Use a proper schema versioning system
+    # TODO: Use a proper schema versioning system (DM-42606)

Contributor Author:

Done

    try:
        import confluent_kafka
    except ImportError as error:
        error.msg += ("Could not import confluent_kafka. Alerts will not be sent to the alert stream")
Contributor:

Again, this error message isn't being used.

Contributor Author:

The import has been moved inside the __init__ to use logging.

Contributor Author:

The import now is back with the main imports and sets confluent_kafka = None when it fails. This then triggers an error message to be logged if doProduceAlerts was set.
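The final arrangement described here can be sketched as follows. `PackageAlertsSketch` is a hypothetical stand-in for the real task class, and the error type follows the reviewer's later suggestion of `RuntimeError`:

```python
# Fall back to None at import time; fail loudly only when alert
# production is actually requested.
try:
    import confluent_kafka
except ImportError:
    confluent_kafka = None

class PackageAlertsSketch:
    """Hypothetical stand-in for PackageAlertsTask's init logic."""

    def __init__(self, doProduceAlerts=False):
        self.doProduceAlerts = doProduceAlerts
        if doProduceAlerts and confluent_kafka is None:
            raise RuntimeError(
                "doProduceAlerts is set but confluent_kafka is not "
                "present in the environment.")
```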

Contributor:

We should test applying and interpreting the Confluent wire format header.

Contributor Author:

Test added as test_serialize_alert_round_trip. As per our discussion, there is some concern about the loss of float accuracy during the round trip, as it can vary between 5 and 7 decimal places.

Contributor Author:

The test is now fixed by setting variables explicitly to 32-bit floats.
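The float-precision issue discussed here can be reproduced without Avro at all: packing a Python double into 32 bits and back changes digits beyond roughly the seventh significant figure, while a value that is already a float32 round-trips exactly. A minimal sketch (not the project's actual test):

```python
import struct

# A float64 round-tripped through an f32 schema field loses precision
# beyond ~7 significant digits, so test data must be stored as 32-bit
# floats for exact-equality asserts to hold.
original = 0.123456789
as_f32 = struct.unpack("<f", struct.pack("<f", original))[0]

# Once the value is representable in float32, the round trip is exact.
round_trip = struct.unpack("<f", struct.pack("<f", as_f32))[0]
```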


    """Methods for packaging Apdb and Pipelines data into Avro alerts.
    """
    _ConfluentWireFormatHeader = struct.Struct(">bi")
Contributor:

Does this really need to be a global? Can it live in PackageAlertsTask?

Contributor Author:

This now lives in PackageAlertsTask as _serialize_confluent_wire_header and _deserialize, since it isn't needed elsewhere.
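For reference, a minimal sketch of how the Confluent wire-format header discussed in this thread can be applied and stripped with `struct.Struct(">bi")`. The free-function names here are illustrative, not the exact task methods:

```python
import struct

# Confluent wire format: one magic byte (0) followed by a big-endian
# 4-byte schema ID, giving a fixed 5-byte prefix on every message.
_ConfluentWireFormatHeader = struct.Struct(">bi")

def serialize_confluent_wire_header(schema_id):
    """Pack the 5-byte header that prefixes an alert packet."""
    return _ConfluentWireFormatHeader.pack(0, schema_id)

def deserialize_confluent_wire_header(raw_bytes):
    """Unpack (magic_byte, schema_id) from the first 5 bytes."""
    magic, schema_id = _ConfluentWireFormatHeader.unpack_from(raw_bytes)
    return magic, schema_id
```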

@bsmartradio force-pushed the tickets/DM-40414 branch 3 times, most recently from 65493e9 to e7a419f on February 13, 2024 20:07
@bsmartradio force-pushed the tickets/DM-40414 branch 2 times, most recently from 64a723f to ca76522 on February 15, 2024 20:35
@parejkoj (Contributor) left a comment:

I've made a lot of cleanup comments. I didn't look at all of the tests: make the skipIf decorator cleanups and I'll look at the tests more.

You have a mix of snake_case and camelCase in the changes here: please be consistent in this package. The existing code is camelCase, so please use that for all variables and functions.
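The skipIf decorator cleanup requested above presumably looks something like the following sketch (class and test names are illustrative, not the actual test suite):

```python
import unittest

# Import falls back to None so the module still loads without kafka.
try:
    import confluent_kafka
except ImportError:
    confluent_kafka = None

@unittest.skipIf(confluent_kafka is None,
                 "confluent_kafka is not available in this environment")
class TestAlertProduction(unittest.TestCase):
    def test_placeholder(self):
        # Only runs when confluent_kafka imported successfully.
        self.assertIsNotNone(confluent_kafka)
```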

python/lsst/ap/association/diaPipe.py (outdated, resolved)
python/lsst/ap/association/diaPipe.py (outdated, resolved)
python/lsst/ap/association/packageAlerts.py (outdated, resolved)
@@ -63,6 +69,18 @@ class PackageAlertsConfig(pexConfig.Config):
        default=os.path.join(os.getcwd(), "alerts"),
    )

    doProduceAlerts = pexConfig.Field(
        dtype=bool,
        doc="Turn on alert production to kafka if true. Set to false by default",
Contributor:

Don't repeat the default in the docstring, it will be displayed in the docs automatically. Same for the below.

Suggested change:
-        doc="Turn on alert production to kafka if true. Set to false by default",
+        doc="Turn on alert production to kafka if true.",


    doWriteAlerts = pexConfig.Field(
        dtype=bool,
        doc="Write alerts to disk if true. Set to true by default",
Contributor:

How does this config interact with doProduceAlerts above? This suggests we're just writing to disk, the other suggests we're sending to kafka: are they exclusive, or can you do both? That should go in these docs.

Contributor Author:

They are independent of one another: both can be set, or just one or the other. An error only needs to be raised if neither is set.


    def _delivery_callback(self, err, msg):
        if err:
            _log.debug('%% Message failed delivery: %s\n' % err)
Contributor:

If it failed, shouldn't it be log.error here or log.warning?

tests/test_packageAlerts.py (outdated, resolved)
tests/test_packageAlerts.py (outdated, resolved)
tests/test_packageAlerts.py (outdated, resolved)
tests/test_packageAlerts.py (outdated, resolved)
@parejkoj (Contributor) left a comment:

How do we run the tests that depend on kafka, if kafka isn't in rubin-env? Is it in rubin-env-developer?

bsmartradio (Contributor Author) commented Feb 15, 2024 via email.

@bsmartradio force-pushed the tickets/DM-40414 branch 3 times, most recently from 8378085 to 2b06795 on February 16, 2024 23:46
@ebellm (Contributor) left a comment:

Getting there! A couple more small items.

    doWriteAlerts = pexConfig.Field(
        dtype=bool,
        doc="Write alerts to disk if true.",
        default=True,
Contributor:

Suggested change:
-        default=True,
+        default=False,

This should default to false to match the existing behavior.

Also, I just realized that we need to formally deprecate doPackageAlerts rather than simply removing it: see https://developer.lsst.io/stack/deprecating-interfaces.html#config-deprecation

It also needs an RFC: https://developer.lsst.io/communications/rfc.html#standard-procedures-that-require-an-rfc

Contributor:

If we wanted to avoid the song and dance we could just continue to use doPackageAlerts in place of doWriteAlerts. It's less clear but less hassle.

Contributor Author:

I'll go this way for now.

Contributor Author:

Looking at the package again, I think we can actually leave doWriteAlerts, as doPackageAlerts is actually at the diaPipe level and not in PackageAlertsTask. doPackageAlerts gets set in diaPipe, then inside that doWriteAlerts or doProduceAlerts gets set. There was never a config to turn writing the alerts out on or off.

python/lsst/ap/association/packageAlerts.py (outdated, resolved)

    header_bytes = alert_bytes[:5]
    version = self._deserialize_confluent_wire_header(header_bytes)
    assert version == 0
Contributor:

self.assertEqual instead?

Contributor Author:

Missed changing that one, thanks for pointing it out. Swapped to self.assertEqual

Contributor:

I think the purpose of that bare assert was to check that the version coming out of the header is always 0. I don't know if that's actually the behavior we want here, but I don't think it's a unittest-related thing, but rather a "is the code sensible" test, hence a bare assert. I might be wrong, though?

Comment on lines 544 to 553

    try:
        self.alertPackager.run(associatedDiaSources,
                               diaCalResult.diaObjectCat,
                               loaderResult.diaSources,
                               diaForcedSources,
                               diffIm,
                               template)
    except ValueError as err:
        # Continue processing even if alert sending fails
        self.log.error(err)
@parejkoj (Contributor) commented Feb 21, 2024:

The task no longer raises ValueError in run: I think you should probably just call run without a try: block here (aka, remove your change), and we'll sort out what error conditions we might want to handle with a post RFC-958 custom error. As it stands, you're already catching and logging kafka errors, which I think is all that we should do for error handling here, until proven otherwise.

Contributor Author:

Alright, removed

python/lsst/ap/association/packageAlerts.py (resolved)
Comment on lines 139 to 140

    self.log.error("Produce alerts is set but confluent_kafka is not present in "
                   "the environment. Alerts will not be sent to the alert stream.")
Contributor:

Shouldn't this be a raise (probably RuntimeError for now?)? If someone sets that to True, they want to produce alerts.

Contributor Author:

I assume then it would be fine to remove the self.log.error since raise would be logged?

Contributor:

Yes, if we just raise we don't need to log.

python/lsst/ap/association/packageAlerts.py (outdated, resolved)
Comment on lines 100 to 111

    self.password = os.getenv("AP_KAFKA_PRODUCER_PASSWORD")
    self.username = os.getenv("AP_KAFKA_PRODUCER_USERNAME")
    self.server = os.getenv("AP_KAFKA_SERVER")
    self.kafkaTopic = os.getenv("AP_KAFKA_TOPIC")

    if not self.password:
        raise ValueError("Kafka password environment variable was not set.")
    if not self.username:
        raise ValueError("Kafka username environment variable was not set.")
    if not self.server:
        raise ValueError("Kafka server environment variable was not set.")
    if not self.kafkaTopic:
        raise ValueError("Kafka topic environment variable was not set.")
Contributor:

Since you can't do these with a walrus, at least put each if directly under the value that is being assigned to.

Contributor Author:

Done
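The arrangement the reviewer asked for — each check directly under the assignment it guards — can be sketched as a free function. The function name is illustrative; the env-var names and error messages are from the diff:

```python
import os

def readKafkaConnectionSettings():
    """Read Kafka connection settings, failing fast on each missing variable."""
    password = os.getenv("AP_KAFKA_PRODUCER_PASSWORD")
    if not password:
        raise ValueError("Kafka password environment variable was not set.")
    username = os.getenv("AP_KAFKA_PRODUCER_USERNAME")
    if not username:
        raise ValueError("Kafka username environment variable was not set.")
    server = os.getenv("AP_KAFKA_SERVER")
    if not server:
        raise ValueError("Kafka server environment variable was not set.")
    topic = os.getenv("AP_KAFKA_TOPIC")
    if not topic:
        raise ValueError("Kafka topic environment variable was not set.")
    return {"password": password, "username": username,
            "server": server, "topic": topic}
```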

Comment on lines 519 to 520

    self.log.debug('%% Message delivered to %s [%d] @ %d\n'
                   % (msg.topic(), msg.partition(), msg.offset()))
Contributor:

Use lazy formatting:

Suggested change:
-    self.log.debug('%% Message delivered to %s [%d] @ %d\n'
-                   % (msg.topic(), msg.partition(), msg.offset()))
+    self.log.debug('%% Message delivered to %s [%d] @ %d', msg.topic(), msg.partition(), msg.offset())

Comment on lines 139 to 157

    def __init__(self, *args, **kwargs):
        super(TestPackageAlerts, self).__init__(*args, **kwargs)
Contributor:

Unnecessary, please remove.


    header_bytes = alert_bytes[:5]
    version = self._deserialize_confluent_wire_header(header_bytes)
    self.assertEqual(version, 0)
Contributor:

What is the purpose of this assert?

Contributor Author:

It's a sanity check left over to make sure the version was correct. I can remove it since it isn't necessary for the unit tests.

@@ -153,6 +204,49 @@ def setUp(self):
        self.cutoutWcs.wcs.cd = self.exposure.getWcs().getCdMatrix()
        self.cutoutWcs.wcs.ctype = ["RA---TAN", "DEC--TAN"]

    def _deserialize_alert(self, alert_bytes):
Contributor:

I think this, and the _deserialize_confluent_wire_header should both be free functions here: they don't need to live in the test class at all.

Contributor Author:

I've removed _deserialize_confluent_wire_header because it's not needed if we aren't doing the header version sanity check.



@parejkoj (Contributor) left a comment:

More changes.

@parejkoj (Contributor):
There are some comments from the previous review that were apparently missed: I marked them with 👀 .

@ebellm self-requested a review February 27, 2024 20:51
@@ -63,6 +67,18 @@ class PackageAlertsConfig(pexConfig.Config):
        default=os.path.join(os.getcwd(), "alerts"),
Contributor:

I view this behavior as only meant for debugging, for which the cwd is a reasonable default. There's no sensible choice in production as the PP pod will get blown away. (And for that reason we should figure out a more useful scheme on a future ticket.)

bsmartradio and others added 2 commits February 28, 2024 10:50
Add confluent_kafka to ap_association so that alerts can be sent to the alert stream when doProduceAlerts is set. Additionally, update the write alerts function to only write when doWriteAlerts is set. Add unit tests to test the new functionality.
The schema is f32, so we have to use that in the test data to make it
round trip.
The assert in the test itself needs to be exact equality.
@parejkoj (Contributor) left a comment:

A couple remaining comments about docstrings, but otherwise this looks much better, thank you.

Comment on lines 267 to 268

    Parameters
    ----------
Contributor:

These aren't lined up.

    """Serialize an alert to a byte sequence for sending to Kafka.

    Parameters
    ----------`
Contributor:

Excess ` at the end

Member:

If people are interested, I have been experimenting with numpydoc validation in middleware packages and can point people at that. Also, the velin package can fix some problems automatically.

Contributor Author:

@timj Yes I definitely would be interested in that!

Member:

velin automatically fixes both of the problems that @parejkoj points out above. It also fixes some other indent issues and variable names and section headings. We don't install velin in the shared stack by default (it's not in rubin-develop-env, but maybe it could be; cc @ktlim).

numpydoc is also not in the rubin env but has many more extensive checks. It also needs to have a lot of configuration. The quickest way to see what you need to do is to look at lsst/daf_butler#922 and see the changes I made to pyproject.toml and the github workflows.

You can run the validation with:

    $ python -m numpydoc.hooks.validate_docstrings $(find python -name "*.py")

after you've installed numpydoc. This will definitely need some tuning in your config files to keep the checks you think are useful.

Moved functions in unit tests and cleaned up packageAlerts.
@bsmartradio merged commit 61a7c33 into main on Mar 1, 2024 (2 checks passed)
@bsmartradio deleted the tickets/DM-40414 branch March 1, 2024 04:53