Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sources): Add http source using Warp #1650

Open
wants to merge 12 commits into
base: master
from

Conversation

@gedkins
Copy link

gedkins commented Jan 31, 2020

This addresses #328.

This is based off the logplex source. Code that may be reused by other http source plugins has been separated out into a util/http.rs.

Supports text, json and ndjson body.

Signed-off-by: Giles Edkins gedkins@bluecatnetworks.com

This is based off the logplex source. Code that may be reused by other http source plugins has been separated out into a util/http.rs.

Supports text, json and ndjson body.

Signed-off-by: Giles Edkins <gedkins@bluecatnetworks.com>
@leshow

This comment has been minimized.

Copy link
Contributor

leshow commented Jan 31, 2020

Hey! I got a notification for this, I hope you don't mind if I nitpick a few random things.

src/sources/http.rs Outdated Show resolved Hide resolved
src/sources/http.rs Outdated Show resolved Hide resolved
fn json_parse_array_of_object(value: JsonValue) -> Result<Vec<Event>, ErrorMessage> {
match value {
JsonValue::Array(v) => {
let mut out_vec = vec![];

This comment has been minimized.

Copy link
@leshow

leshow Jan 31, 2020

Contributor
        JsonValue::Array(v) => Ok(v
            .into_iter()
            .map(json_parse_object)
            .collect::<Result<Vec<Event>, _>>()?),

There's a FromIterator<Result<A, E>> that makes this collect() call work. Kinda like traverse from Haskell. I didn't run any tests but I think this says the same thing.

@binarylogic

This comment has been minimized.

Copy link
Member

binarylogic commented Jan 31, 2020

Thanks! @lukesteensen is the best person to review this since he built the logplex source.

Signed-off-by: Giles Edkins <gedkins@bluecatnetworks.com>
@binarylogic binarylogic requested review from Hoverbear and removed request for lukesteensen Feb 3, 2020
@binarylogic binarylogic assigned Hoverbear and unassigned lukesteensen Feb 3, 2020
@binarylogic binarylogic added this to In progress in Weekly Sprint (Feb 3, 2020) via automation Feb 3, 2020
}

fn json_error() -> ErrorMessage {
ErrorMessage::new(StatusCode::BAD_REQUEST, "Bad JSON".to_string())

This comment has been minimized.

Copy link
@Hoverbear

Hoverbear Feb 3, 2020

Contributor

Maybe this should support some error context? Eg "Expected Array or Object, got Number"

}

fn json_parse(json_str: String) -> Result<JsonValue, ErrorMessage> {
serde_json::from_str(&json_str).map_err(|_| json_error())

This comment has been minimized.

Copy link
@Hoverbear

Hoverbear Feb 3, 2020

Contributor

I don't know if this function's purpose warrants it's existence. If all it does is throw away error context from from_str maybe we can just remove it entirely?

src/sources/http.rs Outdated Show resolved Hide resolved
)
.and_then(|events| {
out.send_all(futures::stream::iter_ok(events)).map_err(
move |_: mpsc::SendError<Event>| {

This comment has been minimized.

Copy link
@Hoverbear

Hoverbear Feb 3, 2020

Contributor

Is there any context we can get from this error? It'd be sad to throw it away.

This comment has been minimized.

Copy link
@bill-bateman

bill-bateman Feb 4, 2020

We sort of can get more context.
We can log the event it was trying to send when it failed (which is a good idea), but the only reason it would fail is if the receiver is dropped, and the error doesn't give any more insight than that.

This comment has been minimized.

Copy link
@Hoverbear

Hoverbear Feb 13, 2020

Contributor

OK, fair!

@Hoverbear

This comment has been minimized.

Copy link
Contributor

Hoverbear commented Feb 3, 2020

Once you're feeling this is ready to go code-wise we should also add documentation! You can see what that would look like here: https://github.com/timberio/vector/pull/1264/files#diff-d97eef340a70cc48bfd4c26e66a12780

bill-bateman added 2 commits Feb 4, 2020
Signed-off-by: Bill Bateman <bbateman@bluecatnetworks.com>
Signed-off-by: Bill Bateman <bbateman@bluecatnetworks.com>
@gedkins gedkins requested a review from binarylogic as a code owner Feb 4, 2020
Signed-off-by: Bill Bateman <bbateman@bluecatnetworks.com>
@gedkins gedkins changed the title feat(http source): add http source using Warp feat(http source): Add http source using Warp Feb 5, 2020
gedkins added 2 commits Feb 5, 2020
Signed-off-by: Giles Edkins <gedkins@bluecatnetworks.com>
@gedkins gedkins changed the title feat(http source): Add http source using Warp feat(sources): Add http source using Warp Feb 5, 2020
@binarylogic binarylogic added this to In progress in Weekly Sprint (Feb 10, 2020) via automation Feb 10, 2020
@binarylogic binarylogic moved this from In progress to To do in Weekly Sprint (Feb 10, 2020) Feb 10, 2020
@bill-bateman

This comment has been minimized.

Copy link

bill-bateman commented Feb 13, 2020

Just checking in if you are waiting on anything from us.

Is the documentation that we added enough?

@Hoverbear

This comment has been minimized.

Copy link
Contributor

Hoverbear commented Feb 13, 2020

@bill-bateman Somehow this notification got lost, I'm sorry about the delay. :(

@bill-bateman

This comment has been minimized.

Copy link

bill-bateman commented Feb 13, 2020

@Hoverbear no worries!

@Hoverbear Hoverbear requested a review from LucioFranco Feb 13, 2020
@Hoverbear

This comment has been minimized.

Copy link
Contributor

Hoverbear commented Feb 13, 2020

@LucioFranco I'm asking you to give a second set of eyes to this since I feel like it has some big picture ideas involved I might be yet familiar with.

.meta/sources/http.toml Outdated Show resolved Hide resolved
Weekly Sprint (Feb 10, 2020) automation moved this from To do to Review in progress Feb 13, 2020

fn string_to_static_str(s: String) -> &'static str {
//necessary because warp 0.1.18 needs a &'static str for the path
Box::leak(s.into_boxed_str())

This comment has been minimized.

Copy link
@Hoverbear

Hoverbear Feb 13, 2020

Contributor

I don't think this is safe, and it definitely looks like a memory leak. https://doc.rust-lang.org/std/boxed/struct.Box.html#method.leak

This comment has been minimized.

Copy link
@bill-bateman

bill-bateman Feb 13, 2020

Yeah it's kinda gross. Couldn't see a way around this if we wanted the path to come from the config, since the version of warp needs &'static str.

If there's a way around this that I don't know then I'd love to know it.

Or if it's more important to not have the leak, we could just pass in a &'static str and have it be hard-coded.

This comment has been minimized.

Copy link
@Hoverbear

Hoverbear Feb 13, 2020

Contributor

I'd prefer to see it hard coded until we upgrade warp to 0.2.x, since this problem only exists for us in 0.1.x. :)

If we're listening on a socket, I suggest just listen to everything on that socket. The path could be part of the event maybe? @binarylogic 's spec in #328 didn't mention anything about being able to set paths.

This comment has been minimized.

Copy link
@bill-bateman

bill-bateman Feb 13, 2020

OK! - I think I'll just make it hard-coded for now, since it doesn't seem to be needed.

You're right, there is to be a way to get the full-path after the fact (see here) but they say

should probably not be used for request matching/routing.

so not sure about that, better I think to have the hard-coded path and worry about this when someone wants a configurable path (by which point we'll hopefully be on warp 0.2.x!)

This comment has been minimized.

Copy link
@LucioFranco

LucioFranco Feb 14, 2020

Member

Yeah, for now I think removing path support is fine 👍

@Hoverbear

This comment has been minimized.

Copy link
Contributor

Hoverbear commented Feb 13, 2020

@gedkins Can you ammend that commit with a DCO sign? git commit --amend -s

gedkins and others added 2 commits Feb 13, 2020
Co-Authored-By: Ana Hobden <operator@hoverbear.org>
Signed-off-by: Giles Edkins <gedkins@bluecatnetworks.com>
Signed-off-by: Giles Edkins <gedkins@bluecatnetworks.com>
@gedkins gedkins force-pushed the gedkins:http_source_warp branch from 876e9f3 to 9384507 Feb 13, 2020
Signed-off-by: Bill Bateman <bbateman@bluecatnetworks.com>
let trigger = trigger.clone();
info!("Handling http request: {:?}", headers);

futures::future::result(

This comment has been minimized.

Copy link
@LucioFranco

LucioFranco Feb 14, 2020

Member

btw if you bring in the IntoFuture trait then you can do Result::into_future.

This comment has been minimized.

Copy link
@bill-bateman

bill-bateman Feb 14, 2020

oh nice didn't know about that

pub trait HttpSource: Clone + Send + Sync + 'static {
fn build_event(
&self,
body: impl Buf,

This comment has been minimized.

Copy link
@LucioFranco

LucioFranco Feb 14, 2020

Member

Is there a reason we don't just make this FullBody?

This comment has been minimized.

Copy link
@bill-bateman

bill-bateman Feb 14, 2020

FullBody doesn't seem to exist in 0.2.x versions of warp so I just didn't use it.

0.1.20 (our version) vs 0.2.1

This comment has been minimized.

Copy link
@LucioFranco

LucioFranco Feb 14, 2020

Member

Right it should exist since we are using 0.1, right?

}
let svc = filter
.and(warp::path::end())
.boxed()

This comment has been minimized.

Copy link
@LucioFranco

LucioFranco Feb 14, 2020

Member

why does this need to be boxed here?

This comment has been minimized.

Copy link
@bill-bateman

bill-bateman Feb 14, 2020

doesn't need to be


fn string_to_static_str(s: String) -> &'static str {
//necessary because warp 0.1.18 needs a &'static str for the path
Box::leak(s.into_boxed_str())

This comment has been minimized.

Copy link
@LucioFranco

LucioFranco Feb 14, 2020

Member

Yeah, for now I think removing path support is fine 👍

Err(r)
}
};
futures::future::result(err)

This comment has been minimized.

Copy link
@LucioFranco

LucioFranco Feb 14, 2020

Member

should be able to use into_future here too

.filter(|s| !s.is_empty())
}

fn decode_body(body: impl Buf, enc: Encoding) -> Result<Vec<Event>, ErrorMessage> {

This comment has been minimized.

Copy link
@LucioFranco

LucioFranco Feb 14, 2020

Member

So I think it may be a bit better here to do something like:

let body = buf.collect::<BytesMut>();

then for the encodings that need them in lines format we can take advantage of https://github.com/timberio/vector/blob/master/lib/codec/src/lib.rs#L18

then for the json encoding we can do something like:

serde_json::from_slice(&body[..])

This would avoid some additional copying

This comment has been minimized.

Copy link
@bill-bateman

This comment has been minimized.

Copy link
@bill-bateman

bill-bateman Feb 14, 2020

new commit is mostly for this. lots of changes to the body_to_lines function, which is now not the prettiest... let me know what you think of it, if there are better ways to do what I'm doing.

one issue I had is that the codec won't give the last line if it doesn't end in a newline, so I have a special case for that.

This comment has been minimized.

Copy link
@LucioFranco

LucioFranco Feb 14, 2020

Member

Our codec that we wrote should handle that, you may want to call decode_eof though instead which should handle the last line.

This comment has been minimized.

Copy link
@bill-bateman

bill-bateman Feb 14, 2020

oh perfect! Thanks :)

Signed-off-by: Bill Bateman <bbateman@bluecatnetworks.com>
Signed-off-by: Bill Bateman <bbateman@bluecatnetworks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
Weekly Sprint (Feb 10, 2020)
  
Review in progress
Linked issues

Successfully merging this pull request may close these issues.

None yet

7 participants
You can’t perform that action at this time.