Most Vector sinks involve some form of network connectivity. Connecting to a network requires more involved functionality than we have covered so far in our basic sink. This tutorial will modify the sink created in the previous tutorial to send the events to an HTTP endpoint. We will cover a number of Vector framework components that make adding this functionality easy.
To start, update our imports to the following:
use std::task::Poll;
use crate::{
    sinks::prelude::*,
    http::HttpClient,
    internal_events::SinkRequestBuildError,
};
use vector_core::config::telemetry;
use bytes::Bytes;
First we want to update our config to allow an endpoint to be specified. Add this field to the BasicConfig struct:
/// The endpoint to send HTTP traffic to.
///
/// This should include the protocol and host, but can also include the port, path, and any other valid part of a URI.
#[configurable(metadata(
    docs::examples = "http://localhost:3000/",
    docs::examples = "http://example.com/endpoint/",
))]
pub endpoint: String,
Every field in the configuration struct must have a doc comment (///). These are used to generate documentation for the sink. The metadata attribute added here is used to generate examples for the documentation. (This is possible because the config struct is annotated with #[configurable_component(sink("basic"))].) Since these comments become user-facing documentation, they should use good grammar and be correctly capitalized and punctuated.
We then want to update our sink to take the endpoint from the config. At the same time, let's create an HttpClient that will handle sending the data. HttpClient is our wrapper over hyper, used to send data over HTTP.
Update the BasicSink struct to look like:
#[derive(Debug, Clone)]
struct BasicSink {
    endpoint: String,
    client: HttpClient,
}

impl BasicSink {
    pub fn new(config: &BasicConfig) -> Self {
        let tls = TlsSettings::from_options(&None).unwrap();
        let client = HttpClient::new(tls, &Default::default()).unwrap();
        let endpoint = config.endpoint.clone();

        Self { client, endpoint }
    }
}
Now we want to create an encoder that will take our event and convert it to raw bytes.
#[derive(Clone)]
struct BasicEncoder;
The encoder must implement the Encoder trait:
impl encoding::Encoder<Event> for BasicEncoder {
    fn encode_input(
        &self,
        input: Event,
        writer: &mut dyn std::io::Write,
    ) -> std::io::Result<(usize, GroupedCountByteSize)> {
    }
}
The Encoder trait is generic over the type of input that we are expecting. In our case it is Event, since we will be encoding a single event at a time. Other sinks may encode a Vec<Event> if they are sending batches of events, or they may send a completely different type if each event is processed in some way prior to encoding.
encode_input serializes the event to a String and writes these bytes. The function also creates a GroupedCountByteSize object. This object tracks the size of the event that is sent by the sink, optionally grouped by the source and service that originated the event if Vector has been configured to do so. It is necessary to calculate the sizes in this function since the encode function sometimes drops fields from the event prior to encoding. We need the size to be calculated after these fields have been dropped.
fn encode_input(
    &self,
    input: Event,
    writer: &mut dyn std::io::Write,
) -> std::io::Result<(usize, GroupedCountByteSize)> {
    let mut byte_size = telemetry().create_request_count_byte_size();
    byte_size.add_event(&input, input.estimated_json_encoded_size_of());

    let event = serde_json::to_string(&input).unwrap();
    write_all(writer, 1, event.as_bytes()).map(|()| (event.len(), byte_size))
}
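As a rough, standard-library-only sketch of the writer contract described above: the encoder receives a `&mut dyn std::io::Write`, writes the serialized bytes to it, and reports how many bytes it wrote. The `encode_line` function and its hand-built JSON string are hypothetical stand-ins for illustration; the real sink uses `serde_json` and the `write_all` helper shown above.

```rust
use std::io::Write;

/// Hypothetical stand-in for `encode_input`: writes a serialized
/// message to `writer` and returns the number of bytes written.
fn encode_line(message: &str, writer: &mut dyn Write) -> std::io::Result<usize> {
    // Hand-rolled JSON for illustration only; it does not escape `message`.
    let encoded = format!("{{\"message\":\"{}\"}}", message);
    writer.write_all(encoded.as_bytes())?;
    Ok(encoded.len())
}
```

Because the writer is a trait object, the same function can write into a `Vec<u8>` in a test or into any other `Write` implementation.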
Next we create a request builder that turns the event into a request. The request that we build here is a struct containing any data required by the Tower service, which is responsible for actually sending the data to the sink's final destination external to Vector, in this case the HTTP endpoint. We will build this service shortly.
The request looks like:
#[derive(Clone)]
struct BasicRequest {
    payload: Bytes,
    finalizers: EventFinalizers,
    metadata: RequestMetadata,
}
The fields in the request are:
- payload - the actual bytes that we will be sending out. These are the bytes generated by our BasicEncoder.
- finalizers - EventFinalizers is a collection of EventFinalizer values. An EventFinalizer is used to track the status of a given event and is used to support [end to end acknowledgements](https://vector.dev/docs/about/under-the-hood/guarantees/#acknowledgement-guarantees).
- metadata - additional data that is used to emit various metrics when a request is successfully sent.
We need to implement a number of traits for the request to access these fields:
impl MetaDescriptive for BasicRequest {
    fn get_metadata(&self) -> &RequestMetadata {
        &self.metadata
    }

    fn metadata_mut(&mut self) -> &mut RequestMetadata {
        &mut self.metadata
    }
}

impl Finalizable for BasicRequest {
    fn take_finalizers(&mut self) -> EventFinalizers {
        self.finalizers.take_finalizers()
    }
}
The request builder must implement the RequestBuilder<> trait:
impl RequestBuilder<Event> for BasicRequestBuilder {
There are a number of stages in the request builder process:
- The input is split out into metadata and actual event data.
- This event data is encoded using the encoder we created earlier.
- The results from the encoding are passed along with the metadata to create the final request that is passed to the Tower service.
Here, the trait is generic over Event, which is the input type that is passed in to start the request building process.
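The three stages listed above can be sketched with plain functions. This is a simplified illustration using only the standard library; `ToyInput`, `ToyRequest`, and the `split`, `encode`, and `build` functions are hypothetical names, not part of Vector's RequestBuilder trait.

```rust
/// Hypothetical input: an event bundled with a delivery-tracking token.
struct ToyInput {
    ack_token: u64,
    message: String,
}

/// Hypothetical request handed to the service at the end of the pipeline.
#[derive(Debug, PartialEq)]
struct ToyRequest {
    ack_token: u64,
    payload: Vec<u8>,
}

/// Stage 1: split the input into metadata and event data.
fn split(input: ToyInput) -> (u64, String) {
    (input.ack_token, input.message)
}

/// Stage 2: encode the event data into bytes.
fn encode(message: &str) -> Vec<u8> {
    message.as_bytes().to_vec()
}

/// Stage 3: combine the metadata and the encoded payload into a request.
fn build(ack_token: u64, payload: Vec<u8>) -> ToyRequest {
    ToyRequest { ack_token, payload }
}

/// Runs the three stages in order.
fn build_toy_request(input: ToyInput) -> ToyRequest {
    let (ack_token, message) = split(input);
    let payload = encode(&message);
    build(ack_token, payload)
}
```

Keeping the metadata out of the encoding stage means the encoder only ever sees the data that ends up in the payload.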
There are a number of associated types:
type Metadata = EventFinalizers;
type Events = Event;
type Encoder = BasicEncoder;
type Payload = Bytes;
type Request = BasicRequest;
type Error = std::io::Error;
- Metadata - any information to be passed while building the request that is additional to the actual event being used. In this case we just need the EventFinalizers.
- Events - the event type passed to the Encoder.
- Encoder - the type that is used to encode the event to create the final payload. We are using the BasicEncoder described earlier.
- Payload - the final data that is encoded.
- Request - the type that is sent to the final service. This is the BasicRequest we described earlier.
- Error - any errors that are created while encoding the event.
The following functions for the RequestBuilder trait need implementing:
compression - The payload for the built request can be compressed. Here we return Compression::None to indicate that we will not be compressing.
fn compression(&self) -> Compression {
    Compression::None
}
encoder - We return the encoder to use. This is the BasicEncoder defined earlier.
fn encoder(&self) -> &Self::Encoder {
    &self.encoder
}
split_input - takes the input and extracts the metadata from the events. In this case we take the finalizers from the event and otherwise return the input parameter unprocessed.
This may not always be the case. For example, the amqp sink will initially process the event to extract fields to be used to calculate the amqp exchange to send the message to. The exchange is bundled with the event and passed to split_input. split_input splits that out into the event for encoding and the metadata containing the exchange, which will be used to route the message when sending the event to an amqp server.
fn split_input(
    &self,
    mut input: Event,
) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
    let finalizers = input.take_finalizers();
    let metadata_builder = RequestMetadataBuilder::from_event(&input);
    (finalizers, metadata_builder, input)
}
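To make the amqp-style case mentioned above more concrete, here is a minimal sketch of splitting a pre-processed input into routing metadata and the event to encode. The `RoutedInput` and `RoutingMetadata` types and the `split_routed_input` function are hypothetical and only loosely modeled on that description; they are not the amqp sink's actual types.

```rust
/// Hypothetical pre-processed input: the event plus a routing value
/// computed from its fields before the request builder runs.
struct RoutedInput {
    exchange: String,
    event: String,
}

/// Hypothetical metadata carried alongside the encoded payload.
#[derive(Debug, PartialEq)]
struct RoutingMetadata {
    exchange: String,
}

/// Separates the routing value (metadata) from the event to be encoded.
fn split_routed_input(input: RoutedInput) -> (RoutingMetadata, String) {
    (
        RoutingMetadata {
            exchange: input.exchange,
        },
        input.event,
    )
}
```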
build_request - used to build the final request that will contain the encoded payload and the metadata. The BasicRequest object we return here is passed to our Tower service, where the data is actually sent.
fn build_request(
    &self,
    metadata: Self::Metadata,
    request_metadata: RequestMetadata,
    payload: EncodeResult<Self::Payload>,
) -> Self::Request {
    BasicRequest {
        finalizers: metadata,
        payload: payload.into_payload(),
        metadata: request_metadata,
    }
}
⚠ NOTE! This section implements an HTTP tower Service from scratch, for the purpose of demonstration only. Many sinks will require implementing Service in this way. Any new HTTP-based sink should ideally utilize the HttpService structure, which abstracts away most of the logic shared amongst HTTP-based sinks.
We need to create a Tower service that is responsible for actually sending our final encoded data.
struct BasicService {
    endpoint: String,
    client: HttpClient,
}
The two fields the service contains, endpoint and client, are the endpoint and client passed in from the BasicSink described earlier.
BasicService implements the tower::Service trait:
impl tower::Service<BasicRequest> for BasicService {
}
A number of associated types need defining:
type Response = BasicResponse;
type Error = &'static str;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
poll_ready - called to indicate when the service is ready to send data. This service has no reason to block, so we always return Poll::Ready.
fn poll_ready(
    &mut self,
    _cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
    Poll::Ready(Ok(()))
}
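The service above is always ready. For contrast, here is a minimal standard-library sketch of a service that applies backpressure by returning Poll::Pending while too many requests are in flight. `LimitedService` and its counter are hypothetical and not part of Vector or Tower; only the `std::task` types are real.

```rust
use std::task::{Context, Poll, Waker};

/// Hypothetical service that caps the number of in-flight requests.
struct LimitedService {
    in_flight: usize,
    max_in_flight: usize,
    parked: Option<Waker>,
}

impl LimitedService {
    fn new(max_in_flight: usize) -> Self {
        Self {
            in_flight: 0,
            max_in_flight,
            parked: None,
        }
    }

    /// Ready while there is spare capacity, pending otherwise.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), &'static str>> {
        if self.in_flight < self.max_in_flight {
            Poll::Ready(Ok(()))
        } else {
            // Keep the waker so the caller is polled again once capacity frees up.
            self.parked = Some(cx.waker().clone());
            Poll::Pending
        }
    }

    /// Records that a request has started.
    fn start_request(&mut self) {
        self.in_flight += 1;
    }

    /// Records that a request has finished and wakes any parked caller.
    fn finish_request(&mut self) {
        self.in_flight = self.in_flight.saturating_sub(1);
        if let Some(waker) = self.parked.take() {
            waker.wake();
        }
    }
}
```

Returning Poll::Pending without arranging for the waker to be called would leave the caller stalled, which is why the sketch stores the waker and wakes it in `finish_request`.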
call - where the data is actually sent over HTTP. It returns a future that, when awaited, sends the data.
fn call(&mut self, request: BasicRequest) -> Self::Future {
    let byte_size = request.payload.len();
    let body = hyper::Body::from(request.payload);
    let req = http::Request::post(&self.endpoint)
        .header("Content-Type", "application/json")
        .body(body)
        .unwrap();

    let mut client = self.client.clone();

    Box::pin(async move {
        match client.call(req).await {
            Ok(response) => {
                if response.status().is_success() {
                    Ok(BasicResponse {
                        byte_size,
                        json_size: request
                            .metadata
                            .into_events_estimated_json_encoded_byte_size(),
                    })
                } else {
                    Err("received error response")
                }
            }
            Err(_error) => Err("oops"),
        }
    })
}
That future returns BasicResponse.
The return from our service must be an object that implements the DriverResponse trait.
struct BasicResponse {
    byte_size: usize,
    json_size: GroupedCountByteSize,
}

impl DriverResponse for BasicResponse {
    fn event_status(&self) -> EventStatus {
        EventStatus::Delivered
    }

    fn events_sent(&self) -> &GroupedCountByteSize {
        &self.json_size
    }

    fn bytes_sent(&self) -> Option<usize> {
        Some(self.byte_size)
    }
}
Vector calls the methods in this trait to determine if the event was delivered successfully. This is used to emit internal metrics and satisfy end to end acknowledgements.
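BasicResponse always reports a delivered status because the service returns an error for any non-success HTTP response. As a hedged illustration of the alternative, a response type could remember the HTTP status code and derive its status from it. The `DeliveryStatus` enum, the `StatusResponse` struct, and the mapping of status-code ranges below are assumptions made for this sketch, not Vector's EventStatus type or its policy.

```rust
/// Hypothetical delivery status, for illustration only.
#[derive(Debug, PartialEq, Clone, Copy)]
enum DeliveryStatus {
    Delivered,
    Rejected,
    Errored,
}

/// Hypothetical response that remembers the HTTP status code it received.
struct StatusResponse {
    http_status: u16,
}

impl StatusResponse {
    /// Maps the HTTP status code onto a delivery status.
    fn event_status(&self) -> DeliveryStatus {
        match self.http_status {
            // Success: the endpoint accepted the payload.
            200..=299 => DeliveryStatus::Delivered,
            // Client error: resending the same payload is unlikely to help.
            400..=499 => DeliveryStatus::Rejected,
            // Anything else is treated as a failure on the receiving side.
            _ => DeliveryStatus::Errored,
        }
    }
}
```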
Finally, we need to update the run_inner method of our BasicSink struct.
async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
    let service = tower::ServiceBuilder::new().service(BasicService {
        client: self.client.clone(),
        endpoint: self.endpoint.clone(),
    });

    let sink = input
        .request_builder(
            None,
            BasicRequestBuilder {
                encoder: BasicEncoder,
            },
        )
        .filter_map(|request| async move {
            match request {
                Err(error) => {
                    emit!(SinkRequestBuildError { error });
                    None
                }
                Ok(req) => Some(req),
            }
        })
        .into_driver(service);

    sink.run().await
}
After creating our service, we run a number of custom extension methods on BoxStream that process the stream of events.
- request_builder - indicates which request builder to use to build our request. We pass in the BasicRequestBuilder described earlier. The first parameter is the limit to the number of concurrent request builders that should be in operation at any time. We pass None, which means no limit is applied.
- filter_map - if building the request errors, we emit an error and filter the event out so that it is not processed further.
- into_driver - the Driver is the final stage of the process that drives the interaction between the stream of incoming events and the BasicService we created above.
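The filtering step can be illustrated with a synchronous analogy. The sketch below uses standard-library iterators rather than the async stream from the sink, and `try_build` and `build_all` are hypothetical helpers; it only shows the shape of "report the failure, then drop it".

```rust
/// Hypothetical stand-in for a request-building step that can fail.
fn try_build(message: &str) -> Result<Vec<u8>, String> {
    if message.is_empty() {
        Err("empty message".to_string())
    } else {
        Ok(message.as_bytes().to_vec())
    }
}

/// Builds a request per message, reports failures, and keeps only successes.
fn build_all(messages: &[&str]) -> Vec<Vec<u8>> {
    messages
        .iter()
        .map(|message| try_build(message))
        .filter_map(|result| match result {
            Err(error) => {
                // Stand-in for emitting an internal error event.
                eprintln!("request build failed: {}", error);
                None
            }
            Ok(request) => Some(request),
        })
        .collect()
}
```

Only the successfully built requests reach the final stage, so one bad event does not stop the rest of the stream.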
We can now run our new sink.
Here is a potential simple_http_server.py server to accept the requests our sink sends:
import http.server
import socketserver


class RequestHandler(http.server.BaseHTTPRequestHandler):
    def do_GET(self):
        # Print the request line and headers
        print(f"Request Line: {self.requestline}")
        print("Headers:")
        for header, value in self.headers.items():
            print(f"{header}: {value}")

        # Send a response
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write(b'GET request received')

    def do_POST(self):
        print(f"Request Line: {self.requestline}")
        print("Headers:")
        for header, value in self.headers.items():
            print(f"{header}: {value}")

        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)
        print(f"Body: {post_data.decode('utf-8')}")

        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write(b'POST request received')


PORT = 3000
Handler = RequestHandler

with socketserver.TCPServer(("", PORT), Handler) as httpd:
    print(f"Serving HTTP on port {PORT}...")
    httpd.serve_forever()
Run the server:
python3 simple_http_server.py
Our sink has a new configuration field for the endpoint. Update it to look like:
sinks:
  basic:
    type: basic
    endpoint: http://localhost:3000
    inputs:
      - stdin
Then run Vector:
cargo vdev run ./basic.yml
If we type something into the console, this should now be sent to our HTTP server:
METHOD: POST
URI: /
HEADERS:
content-type application/json
user-agent Vector/0.26.0 (x86_64-unknown-linux-gnu debug=full)
accept-encoding identity
host localhost:3000
content-length 131
BODY:
{"log":{"host":"computer","message":"zork","source_type":"stdin","timestamp":"2023-01-23T10:21:57.215019942Z"}}