-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Save source msg to dlq #74
Save source msg to dlq #74
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎
|
@DrewMcArthur is attempting to deploy a commit to the Scribe Team on Vercel. A member of the Team first needs to authorize it. |
packages-rs/drainpipe/src/db.rs
Outdated
new_msg: &str, | ||
new_seq: i64, | ||
message: &str, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this type instead be a Vec<u8>
, and then do the String::from_utf8
here? I'm thinking about matching the interface with the kind
argument, which has an internal type that gets converted to a db type. I think either they should both be db types (i32
and &str
), or both be internal types (ProcessErrorKind
and Vec<u8>
) and get converted here.
Only other thing to consider is the possibility of String::from_utf8
returning an Err, but I'm not sure what would cause that. There's also a from_utf8_unchecked
that might make sense to use here? not sure!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think in general it's ok for the contents to be best effort. The practical use case for the dlq involves re-consuming those sequences from the firehose. This message is more for quick historical debugging.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And yeah I think performing the conversion inside of this function, ideally with a Serialize
trait from diesel if that exists (see other comment), is the way to go here.
For that we can wrap the msg Vec in a newtype if it doesn't already implement the trait.
packages-rs/drainpipe/src/main.rs
Outdated
seq: commit.sequence, | ||
error: e, | ||
inner: e, | ||
msg: message.clone(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
used clone
here since the borrow checker yelled, but i'd think that due to the ?
, borrowing (or even moving entirely) here would be okay since the function essentially ends, but I guess the compiler doesn't know that.
packages-rs/drainpipe/src/main.rs
Outdated
&String::from_utf8(error.msg) | ||
.unwrap_or("Unable to decode message".to_string()), | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will transform the Vec<u8>
to a String
for the DB, and replace it with this placeholder string if from_utf8
throws an error. might want to do something else instead though? convert the bytes to a hex string or something?
Thanks for this! I'll take a look some time next week! |
packages-rs/drainpipe/src/schema.rs
Outdated
@@ -4,7 +4,9 @@ diesel::table! { | |||
dead_letter_queue (rowid) { | |||
rowid -> Integer, | |||
seq -> BigInt, | |||
msg -> Text, | |||
err_kind -> Integer, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this file is autogenerated. should we be worried that it specifies this column as an Integer
rather than a SmallInt
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
running the diesel setup
and other commands regenerated this differently, so I updated the rest of the code to use an Integer
instead of SmallInt
.
@DrewMcArthur Thanks so much for this! I just pushed a few commits, the main thing I changed is storing err_msg as a blob because it's not valid utf8 (it's a cbor msg/frame). |
Just deployed this to prod and it's working well, thanks again! |
I noticed a TODO on line 149 of
main.rs
:So I thought I'd give it a try :) my rust skills are fairly rudimentary also, but this currently compiles, and I'm going to poke around and do some testing now.
Changes
DeadLetter
tableerr_kind: Integer
0 for aDecodeError
, 1 for aProcessError
, since we weren't saving any of the former to the db.source: Text
, which is a UTF-8 encoded string from theVec<u8>
message that caused the error.ProcessError
s were being saved, so I adjusted that to saveDecodeErrors
as well (using-1
for theseq
)ProcessError
struct - since they were pretty similar enums, I adjusted it to be astruct
with aProcessErrorKind
enum to differentiate between the two kinds. this way, we're able to save the one struct to the dead letter, rather than needing to handle both enum types.This was just an attempt to tackle that TODO I noticed in the code, so please call out anything that could be adjusted, as you see fit!