
DM-17549: Schema compatibility fixes. #22

Merged: 13 commits into master on May 9, 2019

Conversation

jdswinbank
Contributor

No description provided.

Rather than trying to retrofit an external schema onto the data which has been
loaded.
This is *not* updating the event contents to the newer v2.0 schema: this is
exactly the same data, but rewriting the stored data so that the corresponding
schema is bit-for-bit identical with what we're currently calling v1.0.
(Exactly what generated these is lost in the mists of time...)
@JulienPeloton
Contributor

Hi @jdswinbank - switching the discussion from #21 here.

Thanks for the changes, it works like a charm! I had no problems:

  • reconstructing the container and pulling new data from git LFS
  • encoding and sending alerts
  • consuming alerts using the repo tools (monitorStream.py, and so on)
  • catching & decoding alerts using a third-party tool (see images below)

(Screenshots: alert_5sec_window and alert_total)

Just a few minor comments, though:

Schema hash vs alert data

I initially had some trouble decoding the alerts from the alert payload itself (i.e. not using the alert schemata delivered in the sample-avro-alert repo) because of the newly introduced hash. I quickly found the solution by reading the code, but I think it would be good to emphasise somewhere the split between the hash and the data:

import struct

alert = ...  # the raw bytes of an incoming alert message

# Schema hash (bytes 1-4 of the message, big-endian unsigned int)
schema_hash = struct.unpack("!I", alert[1:5])[0]

# The alert data, to be decoded using the schema
binary_data = alert[5:]
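
Decoding the remaining bytes might then look like this (a minimal sketch assuming fastavro, with schema being the parsed Avro schema from the sample-avro-alert repo that matches the hash):

import io
from fastavro import schemaless_reader

# schema is assumed to have been loaded separately (e.g. from the
# sample-avro-alert repository) and to match schema_hash above.
decoded_alert = schemaless_reader(io.BytesIO(binary_data), schema)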

Schema version in the alert?

Currently the decoded alert data itself does not contain the schema version number:

print(decoded_alert.keys())
dict_keys(['alertId', 'l1dbId', 'diaSource', 'prv_diaSources', 
  'diaObject', 'ssObject', 'diaObjectL2', 'diaSourcesL2', 
  'cutoutDifference', 'cutoutTemplate'])

I understand the hash at the beginning of the binary message should play this role, but would it be possible to add an entry in the alert data with schema_version=X.Y?

Thanks!

@jdswinbank
Contributor Author

Great! Glad to hear it's working for you.

Schema hash vs alert data

I agree with your comments here. The best reference for this is likely DMTN-093, but I will add a note to the readme file in this repository as well.

Schema version in the alert?

I don't see any problem with doing this. In fact, it was my original intention, until I decided to follow Confluent's approach with the wire format instead. I will add a new (v2.1) schema on lsst/alert_packet#5 for you and @ebellm to experiment with.

@jdswinbank
Contributor Author

Schema version in the alert?

I gave this some more thought, and actually I'm not sure why it's useful. In order to deserialize the alert you already have to have the schema which wrote it, so you presumably already know its version. What use case does having the schema version embedded in the alert satisfy?

(I'd previously thought you might get away with being able to read the alert using only a reader schema of a different version from the writer. However, that's not the case — you always need the writer schema, even if you also provide a different reader schema to help with schema migration.)
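
For example, with fastavro (a minimal sketch using made-up schemas), a schemaless read always takes the writer schema; a reader schema is only an optional extra used for migration:

import io
from fastavro import parse_schema, schemaless_reader, schemaless_writer

# Two made-up schema versions that differ only in an optional field.
writer_schema = parse_schema({
    "type": "record", "name": "alert",
    "fields": [{"name": "alertId", "type": "long"}],
})
reader_schema = parse_schema({
    "type": "record", "name": "alert",
    "fields": [{"name": "alertId", "type": "long"},
               {"name": "note", "type": "string", "default": ""}],
})

buf = io.BytesIO()
schemaless_writer(buf, writer_schema, {"alertId": 1})
buf.seek(0)

# The writer schema cannot be omitted; the reader schema only drives migration.
record = schemaless_reader(buf, writer_schema, reader_schema)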

@JulienPeloton
Contributor

Thanks @jdswinbank for the update of the README.

I gave this some more thought, and actually I'm not sure why it's useful. In order to deserialize the alert you already have to have the schema which wrote it, so you presumably already know its version. What use case does having the schema version embedded in the alert satisfy?

I agree that you must have the schema to deserialize the alert between the alert system and a broker. My initial idea was about redundancy, especially in the context of redistributing alerts: Avro might not be the format used for redistribution, and having some versioning information attached to the data would make that process less prone to mismatches.

Having said that, I also imagine that the alert size must be kept as small as possible, and adding 100% redundant information might not be wise after all... Brokers can handle this.

In any case I have no strong opinion about it - and the current state of the PR looks good to me!

@jdswinbank
Contributor Author

Ok, thanks @JulienPeloton.

I'm inclined to steer clear of adding a schema version for now, because I think it opens a whole can of semantic worms. Apart from making the packets bigger (which I suspect is not a big deal, though I've not run any numbers to check), I worry about what the schema version would actually mean. In an Avro packet that's clear: it would be the version of the schema used to write the packet. But what happens when I deserialize it to JSON? Is it the version of the schema used to write it, or to read it? And what if I go through multiple decode-encode cycles using different versions of the schema while preserving the same semantic content? Say schema version N has different optional fields from schema version M but is otherwise the same: if I serialize with N, deserialize, serialize with M, deserialize... what should the schema version attribute be in the resulting JSON data?

Just thinking about this makes my head hurt, so I'm going to duck the question until & unless we have a really concrete use case that can guide this thinking! :-)

@JulienPeloton
Contributor

@jdswinbank, I agree with postponing the discussion until a concrete use case emerges.
Defining a meaningful versioning system in this context is indeed the key. Thanks for all the input and changes!

@stefanv

stefanv commented Feb 22, 2019

This PR fixes several issues I ran into while trying to get a sample stream up—thank you! One remaining issue: I cannot locate lsst.alert.packet. @jdswinbank Can you confirm whether this should have been included?

@jdswinbank
Contributor Author

lsst.alert.packet comes as part of the lsst-dm/sample-avro-alert repository. You'll need the tickets/DM-17549 branch, which you can find here: https://github.com/lsst-dm/sample-avro-alert/tree/tickets/DM-17549

And, of course, you'll need to make sure that's on your PYTHONPATH, which I haven't yet properly handled.

I'm sorry that this work is sitting on obscure branches in non-obviously-named repositories at the moment: I hope to schedule some time to clean it up shortly.

@stefanv

stefanv commented Feb 22, 2019

Thanks for clarifying, John. Does that version of lsst replace the one in the current repo, or do these live together somehow as a namespace package? PYTHONPATH is no problem; it would be good to have both these changesets merged, though!

@ebellm
Contributor

ebellm commented Feb 22, 2019 via email

@jdswinbank
Contributor Author

They live together as a “native namespace package” (assuming modern enough Python) — sample-avro-alert provides lsst.alert.packet, and this provides lsst.alert.stream. Just stick 'em both on your PYTHONPATH and you should be all set.
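
For instance (a rough sketch: the paths are placeholders, and it assumes both repositories keep their Python code under a python/ subdirectory):

import sys

# Placeholder checkout locations; adjust to wherever you cloned the repos.
sys.path.insert(0, "/path/to/sample-avro-alert/python")
sys.path.insert(0, "/path/to/alert_stream/python")

# Both repositories then contribute to the shared lsst.alert namespace package.
import lsst.alert.packet
import lsst.alert.stream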

@JulienPeloton
Contributor

You might already know it, but if the two repos (lsst-dm/sample-avro-alert & lsst-dm/alert_stream) are intended to stay separate while users need one in order for the other to work, you could pull lsst-dm/sample-avro-alert in as a git submodule to make the link clearer. In my experience this only works well for a submodule that does not change often; otherwise it is not without hiccups and troubles...

@jdswinbank
Contributor Author

jdswinbank commented Feb 24, 2019

Thanks @JulienPeloton!

I agree that submodules could be useful here, although LSST tends to eschew them for software in favour of a system built on EUPS. I personally think submodules would be more appropriate here, though, and since this code is relatively independent of the rest of the LSST codebase, we might have more freedom to choose a different path. That'll take some negotiation with the rest of the project, though.

Anyway, the full story here is that, after Eric has finished his code review and (assuming he is happy!) this work is merged to master, I'd like to look at a reorganization of these repositories. I've not thought about this in detail yet, but I envision something like:

  • lsst-dm/alert_schema — which only contains alert schemata in JSON format;
  • lsst-dm/alert_library — which contains the lsst.alert.packet and lsst.alert.stream library code;
  • lsst-dm/alert_deploy — which contains scripts and configuration for Docker, Kubernetes, etc.

At that point, we'll be able to think about sensible ways to manage the dependencies between them.

Of course, I am not the only stakeholder here; Eric and others will also have opinions on how this material should be structured.

@JulienPeloton
Contributor

Thanks @jdswinbank for the detailed explanations! Good point about EUPS, I tend to forget about this.

I like the split into schema, library and deploy. In particular, having the deployment scripts live in a separate repo would be a must, as I can imagine it being useful for other code that depends on the same infrastructure (Kafka/Zookeeper) :-)

@ebellm ebellm self-requested a review February 26, 2019 00:40
Dockerfile Outdated

# Get schemas and template data.
WORKDIR /home
RUN git clone https://github.com/lsst-dm/sample-avro-alert.git
RUN git clone https://github.com/lsst-dm/sample-avro-alert.git && cd sample-avro-alert && git checkout tickets/DM-17549
Contributor

Once we merge the sample-avro-alert branch of this ticket let's remove the ticket branch here...

Contributor

Since the sample-avro-alert changes are approved I suggest you go ahead and merge it and remove the ticket branch.

README.rst Outdated
==================

The LSST Alert Distribution Service distributes alert packets, formatted using `Apache Avro`_ , using the `Apache Kafka`_ streaming platform.
Each alert is packaged as a separate Kafka message.
Contributor

I think "transmitted" is better than "packaged" here.

README.rst Outdated

The LSST Alert Distribution Service distributes alert packets, formatted using `Apache Avro`_ , using the `Apache Kafka`_ streaming platform.
Each alert is packaged as a separate Kafka message.
Schemata are not sent with the alerts: the consumer is assumed to receive a copy of the schema through some other mechanism (currently by cloning https://github.com/lsst-dm/sample-avro-alert).
Contributor

schemas.

README.rst Outdated
Schemata are not sent with the alerts: the consumer is assumed to receive a copy of the schema through some other mechanism (currently by cloning https://github.com/lsst-dm/sample-avro-alert).
Alerts are packaged using the `Confluent Wire Format`_.
This means that the first byte of the message received may be ignored, the next four constitute a “schema identifier” (which may be used to identify the schema used to write the packet) and the remainder constitute the alert data serialized in Avro format.
Interpreting the packet in (for example) Python may then be done as follows::
Contributor

... I'm concerned about the mismatch between the wire format coming through Kafka and the on-disk serialization being written by sample-avro-alert.

Instead of presenting the user the wire format here, I think we should direct them to tools in sample-avro-alert that abstract away those concerns.

Contributor Author

We discussed this (or, at least, things closely related) in the Jira ticket. I think that there we agreed (for now, at least) that it's appropriate for the on-disk format and the on-wire format to be different, so I won't address that here.

Beyond that, I certainly agree that we should make things as easy as possible for users to quickly access and play with the alert stream.

However, I do think we also need to clearly document the wire format. I would expect that many users, and likely all brokers, will want to write their own code to receive data from Kafka directly, rather than trying to interface with our AlertConsumer. To do that, they need to understand what they're actually going to be receiving. Given that, I'm reluctant to remove this text entirely.

How about if I move this section to the end of the README and write some extra text to make clear that it's the “expert user mode”?

else:
    print(msg)

streamWatcher.poll()
Contributor

... I don't understand why you eliminated the message printing, which provides a useful way of watching the stream function.

Contributor Author

I guess I'm confused about what this tool is supposed to do. If we want a tool for printing the messages, isn't that printStream.py?

That said, I've no wish to break existing workflows here, so I'll put the print back for now, and file another ticket to try to get a description of what monitorStream.py is actually for.

self.producer.produce(self.topic, data)
outgoing_bytes = BytesIO()
outgoing_bytes.write(struct.pack("!b", 0))
outgoing_bytes.write(struct.pack("!I", SchemaRegistry.calculate_hash(self.schema)))
Contributor

Might as well calculate the hash once and save it as a class attribute, unless we envision changing schemas on the fly.

Contributor Author

I think we do envision that, as it's a natural consequence of https://github.com/lsst-dm/alert_stream/pull/22/files#r260997692 (although I should go and check my thinking on that).

Contributor Author

Obsolete, given changes elsewhere.

# For now, we hard-code filter outputs to be in schema v1.0; ultimately,
# we should consider whether we want to preserve the received schema
# version.
outgoing_schema = SchemaRegistry.from_filesystem().get_version("1.0")
Contributor

The filtering service should simply be forwarding alerts with the same schema they were received in. I think this can be accomplished without too much trouble by waiting for the first message (or maybe every message) from the consumer and retrieving the appropriate schema.

Contributor Author

This is actually a little awkward due to the design of the code, but I think I have done something workable.
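
Roughly, the pass-through idea might look like this (a sketch only: raw_message and filtered_record are placeholders, and the id-based registry lookup is an assumed helper rather than an existing method; SchemaRegistry and streamProducer are from the surrounding code):

import struct

# Recover the schema identifier from the incoming Confluent-framed message.
schema_id = struct.unpack("!I", raw_message[1:5])[0]

# Assumed helper: map the identifier back to the schema that wrote the alert.
incoming_schema = SchemaRegistry.from_filesystem().get_by_id(schema_id)

# Re-serialize the filtered alert with the schema it arrived in,
# rather than hard-coding version 1.0.
streamProducer.send(incoming_schema, filtered_record)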

README.rst Outdated
-------------------------------

Sample data is included in the ``data`` directory.
You can also mount a local volume of data following the instructions below.
Contributor

Maybe we should say "You can also mount a local volume of data by replacing $PWD/data in the command below with an appropriate path to other alert data."

This (temporarily) breaks ``printStream.py``, ``filterStream.py`` and other
code which relies on AlertConsumer.

Note that this removes a bunch of unexplained special cases in poll() and
decode_message(). It's not clear if they were leftover experiments or working
around weird failure modes. Things seem to be working fine without them for
now...
In particular, mentioning the Confluent wire format and providing brief
instructions for unpacking.
This means we have to specify the schema to be used to serialize the data
every time we send a new message. That's arguably clunky, but it makes it
easier to “pass through” the schema from received message to filtered stream.
self.producer.produce(self.topic, data)
outgoing_bytes = BytesIO()
outgoing_bytes.write(struct.pack("!b", 0))
outgoing_bytes.write(struct.pack("!I", SchemaRegistry.calculate_id(schema)))
Contributor

This appears to be a bug--we need to instantiate a concrete SchemaRegistry in order to call calculate_id.

Otherwise I get:

  File "bin/sendAlertStream.py", line 47, in schedule_delays
    yield from asyncio.ensure_future(delay(wait_time, function, arg))
  File "bin/sendAlertStream.py", line 27, in delay
    return function(*args)
  File "bin/sendAlertStream.py", line 80, in send_visit
    streamProducer.send(schema, record)
  File "/home/alert_stream/python/lsst/alert/stream/alertProducer.py", line 48, in send
    outgoing_bytes.write(struct.pack("!I", SchemaRegistry.calculate_id(schema)))
AttributeError: type object 'SchemaRegistry' has no attribute 'calculate_id'

Contributor Author

@jdswinbank jdswinbank May 2, 2019

Hmm, weird, we shouldn't need to instantiate it, because it's a static method.

My guess is actually that you are running in Docker (following the instructions), but have an old version of sample-avro-alert cached which is being added to the container. Could you try running docker build --no-cache -t "alert_stream" . and see if that fixes the problem?

Contributor

I ultimately had to wipe out my entire Docker cache, but it does seem to work now. monitorStream.py doesn't seem to behave quite the way I remember, but I can't really make sense of it from the code, and it's not essential.

Contributor

@ebellm ebellm left a comment

Found a bug running sendAlertStream.py

@jdswinbank jdswinbank merged commit 1620c07 into master May 9, 2019