
2_http_sink.md


Most Vector sinks involve some form of network connectivity. Connecting to a network requires more involved functionality than we have covered so far in our basic sink. This tutorial will modify the sink created in the previous tutorial to send the events to an HTTP endpoint. We will cover a number of Vector framework components that make adding this functionality easy.

Imports

To start, update our imports to the following:

use std::task::Poll;

use crate::{
    sinks::prelude::*,
    http::HttpClient,
    internal_events::SinkRequestBuildError,
};
use vector_core::config::telemetry;
use bytes::Bytes;

Configuration

First we want to update our config to allow an endpoint to be specified. Add this field to the BasicConfig struct:

    /// The endpoint to send HTTP traffic to.
    ///
    /// This should include the protocol and host, but can also include the port, path, and any other valid part of a URI.
    #[configurable(metadata(
        docs::examples = "http://localhost:3000/",
        docs::examples = "http://example.com/endpoint/",
    ))]
    pub endpoint: String,

Every field in the configuration struct must have a doc comment (///). These comments are used to generate documentation for the sink, and the metadata attribute added here is used to generate examples for that documentation. (This is possible because the config struct is annotated with #[configurable_component(sink("basic"))].) Since these comments become user-facing documentation, they should be grammatically correct and properly capitalized and punctuated.

Next we update our sink to take the endpoint from the config. At the same time, let's create an HttpClient to handle sending the data. HttpClient is Vector's wrapper around hyper for sending data over HTTP.

Update the BasicSink struct to look like:

#[derive(Debug, Clone)]
struct BasicSink {
    endpoint: String,
    client: HttpClient,
}

impl BasicSink {
    pub fn new(config: &BasicConfig) -> Self {
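        // Default TLS settings: no custom certificates or keys are configured.
        let tls = TlsSettings::from_options(&None).unwrap();
        // Build the client with the default proxy configuration.
        let client = HttpClient::new(tls, &Default::default()).unwrap();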
        let endpoint = config.endpoint.clone();

        Self { client, endpoint }
    }
}
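BasicSink::new copies the endpoint as-is, so a malformed value only surfaces later, when the service builds a request. The following std-only sketch shows a hypothetical up-front check that the endpoint has a protocol and a host, as its doc comment requires. `validate_endpoint` is not a Vector API. A real sink would more likely parse the value with the `http` crate's `Uri` type.

```rust
/// Hypothetical check (not part of Vector): the endpoint must have an
/// http/https scheme and a non-empty host, as the `endpoint` doc comment asks.
fn validate_endpoint(endpoint: &str) -> Result<(), String> {
    // Split off the scheme, e.g. "http" in "http://localhost:3000/".
    let (scheme, rest) = endpoint
        .split_once("://")
        .ok_or_else(|| format!("missing scheme in {endpoint:?}"))?;
    if scheme != "http" && scheme != "https" {
        return Err(format!("unsupported scheme {scheme:?}"));
    }
    // The authority (host plus optional port) runs up to the first '/', '?' or '#'.
    let authority = rest
        .split(|c: char| c == '/' || c == '?' || c == '#')
        .next()
        .unwrap_or("");
    // Strip an optional ":port" suffix to get the host.
    let host = authority.rsplit_once(':').map_or(authority, |(h, _)| h);
    if host.is_empty() {
        return Err(format!("missing host in {endpoint:?}"));
    }
    Ok(())
}
```

Calling a check like this from the sink's construction path would make a bad endpoint fail at startup rather than on the first event.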

Encoder

Now we want to create an encoder that will take our event and convert it to raw bytes.

#[derive(Clone)]
struct BasicEncoder;

Our encoder must implement the Encoder trait:

impl encoding::Encoder<Event> for BasicEncoder {
    fn encode_input(
        &self,
        input: Event,
        writer: &mut dyn std::io::Write,
    ) -> std::io::Result<(usize, GroupedCountByteSize)> {
        todo!("implemented in the next step")
    }
}

The Encoder trait is generic over the type of input we expect. In our case that is Event, since we encode a single event at a time. Other sinks may encode a Vec<Event> if they send batches of events, or a completely different type if each event is transformed in some way before encoding.
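Here is a std-only sketch of what being generic over the input type allows: one encoder implements the same trait once for a single message and once for a batch. `SimpleEncoder` and `LineEncoder` are hypothetical and are not Vector's `Encoder`. Messages are plain `String`s here rather than Vector events.

```rust
use std::io::{self, Write};

/// Hypothetical, simplified encoder trait: generic over the input type like
/// Vector's `Encoder<T>`, returning the number of bytes written.
trait SimpleEncoder<Input> {
    fn encode_input(&self, input: Input, writer: &mut dyn Write) -> io::Result<usize>;
}

/// Encodes messages as newline-terminated lines.
struct LineEncoder;

// One message at a time, as BasicEncoder does with a single Event.
impl SimpleEncoder<String> for LineEncoder {
    fn encode_input(&self, input: String, writer: &mut dyn Write) -> io::Result<usize> {
        writer.write_all(input.as_bytes())?;
        writer.write_all(b"\n")?;
        Ok(input.len() + 1)
    }
}

// A whole batch, as a batching sink might use; reuses the single-message impl.
impl SimpleEncoder<Vec<String>> for LineEncoder {
    fn encode_input(&self, input: Vec<String>, writer: &mut dyn Write) -> io::Result<usize> {
        let mut written = 0;
        for message in input {
            written += <Self as SimpleEncoder<String>>::encode_input(self, message, &mut *writer)?;
        }
        Ok(written)
    }
}
```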

encode_input serializes the event to a JSON string and writes its bytes to the writer. It also creates a GroupedCountByteSize object, which tracks the size of the events sent by the sink, optionally grouped by the source and service that originated each event if Vector has been configured to do so. The sizes must be calculated in this function because the encoding step sometimes drops fields from the event, and the size has to reflect the event after those fields have been dropped.

    fn encode_input(
        &self,
        input: Event,
        writer: &mut dyn std::io::Write,
    ) -> std::io::Result<(usize, GroupedCountByteSize)> {
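        // Record the event's estimated JSON size, grouped by source/service if configured.
        let mut byte_size = telemetry().create_request_count_byte_size();
        byte_size.add_event(&input, input.estimated_json_encoded_size_of());

        // Serialize to JSON; serde_json errors convert into std::io::Error instead of panicking.
        let event = serde_json::to_string(&input)?;
        // Write the bytes for this single event and return the length alongside the sizes.
        write_all(writer, 1, event.as_bytes()).map(|()| (event.len(), byte_size))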
    }
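The rule about measuring after fields are dropped is easier to see in a small example. This is a std-only sketch, not Vector's code. `SimpleEvent`, `GroupedSizes`, `json_escape`, and the `drop_fields` parameter are hypothetical stand-ins for `Event`, `GroupedCountByteSize`, and the encoding configuration. The hand-rolled JSON writer only handles flat string fields.

```rust
use std::collections::{BTreeMap, HashMap};
use std::io::{self, Write};

/// Hypothetical stand-in for a log event: flat string fields plus the name of
/// the source it came from (used only to group the telemetry).
struct SimpleEvent {
    source: String,
    fields: BTreeMap<String, String>,
}

/// Hypothetical analogue of GroupedCountByteSize: (event count, bytes) per source.
type GroupedSizes = HashMap<String, (usize, usize)>;

/// Escapes a string for use inside a JSON string literal.
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Drops excluded fields, serializes the rest as a JSON object, records the
/// size of what is actually written, and returns the number of bytes written.
fn encode_input(
    mut event: SimpleEvent,
    drop_fields: &[&str],
    sizes: &mut GroupedSizes,
    writer: &mut dyn Write,
) -> io::Result<usize> {
    // Remove fields *before* measuring, so the recorded size matches the payload.
    for field in drop_fields {
        event.fields.remove(*field);
    }
    let pairs: Vec<String> = event
        .fields
        .iter()
        .map(|(k, v)| format!("\"{}\":\"{}\"", json_escape(k), json_escape(v)))
        .collect();
    let json = format!("{{{}}}", pairs.join(","));

    // Group the event count and byte size by originating source.
    let entry = sizes.entry(event.source).or_insert((0, 0));
    entry.0 += 1;
    entry.1 += json.len();

    writer.write_all(json.as_bytes())?;
    Ok(json.len())
}
```

Measuring the serialized bytes keeps the sketch simple. Vector instead adds an *estimated* JSON size per event, as the code above shows.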

Request Builder

Next we create a request builder that turns each event into a request. The request we build here is a struct containing all the data needed by the Tower service that actually sends the data to the sink's final destination outside Vector, in this case the HTTP endpoint. We will build this service shortly.

The request looks like:

#[derive(Clone)]
struct BasicRequest {
    payload: Bytes,
    finalizers: EventFinalizers,
    metadata: RequestMetadata,
}

Fields

The fields in the request are:

payload - the payload is the actual bytes that we will be sending out. These are the bytes generated by our BasicEncoder.

finalizers - EventFinalizers is a collection of EventFinalizer values. An EventFinalizer tracks the delivery status of a given event and is used to support [end-to-end acknowledgements](https://vector.dev/docs/about/under-the-hood/guarantees/#acknowledgement-guarantees).

metadata - the metadata contains additional data that is used to emit various metrics when a request is successfully sent.

Traits

We need to implement a number of traits for the request to access these fields:

impl MetaDescriptive for BasicRequest {
    fn get_metadata(&self) -> &RequestMetadata {
        &self.metadata
    }

    fn metadata_mut(&mut self) -> &mut RequestMetadata {
        &mut self.metadata
    }
}

impl Finalizable for BasicRequest {
    fn take_finalizers(&mut self) -> EventFinalizers {
        self.finalizers.take_finalizers()
    }
}
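Here is a std-only sketch of the bookkeeping behind EventFinalizers and take_finalizers. Everything in it is hypothetical: `Status`, `Finalizer`, `Finalizers`, and the channel used to report back to the source. Vector's real types are more involved. The point is that taking the finalizers moves them out and leaves an empty collection, so each event's outcome is reported exactly once.

```rust
use std::sync::mpsc::{channel, Sender};

/// Hypothetical delivery outcomes.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Delivered,
    Errored,
}

/// Hypothetical finalizer: reports one event's outcome back to its source.
struct Finalizer {
    event_id: u64,
    notify: Sender<(u64, Status)>,
}

/// Hypothetical analogue of EventFinalizers: a collection of finalizers.
struct Finalizers(Vec<Finalizer>);

impl Finalizers {
    /// Moves the finalizers out, leaving an empty collection behind
    /// (the same idea as `take_finalizers`).
    fn take_finalizers(&mut self) -> Finalizers {
        Finalizers(std::mem::take(&mut self.0))
    }

    /// Reports the same status for every event in the collection.
    fn update_status(self, status: Status) {
        for f in self.0 {
            // Ignore send errors: the source may no longer be listening.
            let _ = f.notify.send((f.event_id, status));
        }
    }
}
```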

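The request builder holds the encoder it uses, and it must implement the RequestBuilder trait:

#[derive(Clone)]
struct BasicRequestBuilder {
    encoder: BasicEncoder,
}

impl RequestBuilder<Event> for BasicRequestBuilder {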

There are a number of stages in the request builder process:

  1. The input is split out into metadata and actual event data.
  2. This event data is encoded using the encoder we created earlier.
  3. The results from the encoding are passed along with the metadata to create the final request that is passed to the Tower service.
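The three stages can be sketched with a simplified, hypothetical trait. `SimpleRequestBuilder`, `SketchEvent`, `SketchRequest`, and `SketchBuilder` are not Vector APIs. The real RequestBuilder also threads a RequestMetadataBuilder through and supports compression. The sketch only shows how split, encode, and build chain together, with an encoding error aborting the request.

```rust
use std::io;

/// Hypothetical, simplified request-building trait mirroring the three stages.
trait SimpleRequestBuilder<Input> {
    type Metadata;
    type Events;
    type Request;

    /// Stage 1: separate metadata from the data to encode.
    fn split_input(&self, input: Input) -> (Self::Metadata, Self::Events);
    /// Stage 2: encode the event data into a payload.
    fn encode_events(&self, events: Self::Events) -> io::Result<Vec<u8>>;
    /// Stage 3: combine metadata and payload into the final request.
    fn build_request(&self, metadata: Self::Metadata, payload: Vec<u8>) -> Self::Request;

    /// Runs the stages in order; an encoding error aborts the request.
    fn build(&self, input: Input) -> io::Result<Self::Request> {
        let (metadata, events) = self.split_input(input);
        let payload = self.encode_events(events)?;
        Ok(self.build_request(metadata, payload))
    }
}

/// Hypothetical event: an id (standing in for its finalizers) and a message.
struct SketchEvent {
    id: u64,
    message: String,
}

#[derive(Debug, PartialEq)]
struct SketchRequest {
    event_id: u64,
    payload: Vec<u8>,
}

struct SketchBuilder;

impl SimpleRequestBuilder<SketchEvent> for SketchBuilder {
    type Metadata = u64;
    type Events = String;
    type Request = SketchRequest;

    fn split_input(&self, input: SketchEvent) -> (u64, String) {
        (input.id, input.message)
    }

    fn encode_events(&self, events: String) -> io::Result<Vec<u8>> {
        Ok(events.into_bytes())
    }

    fn build_request(&self, metadata: u64, payload: Vec<u8>) -> SketchRequest {
        SketchRequest { event_id: metadata, payload }
    }
}
```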

Here, the trait is generic over Event, which is the input type passed in to start the request-building process.

Associated types

There are a number of associated types:

    type Metadata = EventFinalizers;
    type Events = Event;
    type Encoder = BasicEncoder;
    type Payload = Bytes;
    type Request = BasicRequest;
    type Error = std::io::Error;

Metadata - any information, other than the event itself, that must be carried through to the final request. In this case we only need the EventFinalizers.

Events - the event type passed to the Encoder.

Encoder - the type that is used to encode the event to create the final payload. We are using the BasicEncoder described earlier.

Payload - the type of the encoded data that will be sent. Here that is Bytes.

Request - the type that is sent to the final service. This is the BasicRequest we described earlier.

Error - the type of any error that occurs while encoding the event.

Functions

The following functions for the RequestBuilder trait need implementing:

compression - The payload for the built request can be compressed. Here we return Compression::None to indicate that we will not be compressing.

    fn compression(&self) -> Compression {
        Compression::None
    }

encoder - We return the encoder to use. This is the BasicEncoder defined earlier.

    fn encoder(&self) -> &Self::Encoder {
        &self.encoder
    }

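split_input - takes the input and separates the metadata from the event data. Here we take the event's finalizers out as our metadata and create a RequestMetadataBuilder from the event. The event itself is then passed on for encoding without any further processing.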

Other sinks do more work here. For example, the AMQP sink first processes each event to extract the fields used to compute the AMQP exchange the message should be sent to. That exchange is passed to split_input bundled with the event. split_input then separates them into the event to be encoded and metadata containing the exchange, which is later used to route the message when it is sent to the AMQP server.

    fn split_input(
        &self,
        mut input: Event,
    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
        let finalizers = input.take_finalizers();
        let metadata_builder = RequestMetadataBuilder::from_event(&input);
        (finalizers, metadata_builder, input)
    }
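The AMQP-style split described above can be sketched without any Vector types. `RoutedEvent`, `RoutingMetadata`, and this `split_input` function are hypothetical and much simpler than the real AMQP sink's code. The routing data is set aside as metadata, and only the event goes on to the encoder.

```rust
/// Hypothetical input: an event bundled with the exchange computed for it
/// earlier in the pipeline.
struct RoutedEvent {
    exchange: String,
    payload: String,
}

/// Hypothetical metadata kept aside while the event is encoded.
#[derive(Debug, PartialEq)]
struct RoutingMetadata {
    exchange: String,
}

/// Splits the bundle: routing data becomes metadata, the rest is encoded.
fn split_input(input: RoutedEvent) -> (RoutingMetadata, String) {
    let RoutedEvent { exchange, payload } = input;
    (RoutingMetadata { exchange }, payload)
}
```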

build_request - builds the final request containing the encoded payload and the metadata. The BasicRequest we return here is passed to our Tower service, where the data is actually sent.

    fn build_request(
        &self,
        metadata: Self::Metadata,
        request_metadata: RequestMetadata,
        payload: EncodeResult<Self::Payload>,
    ) -> Self::Request {
        BasicRequest {
            finalizers: metadata,
            payload: payload.into_payload(),
            metadata: request_metadata,
        }
    }

Service

⚠ NOTE! This section implements an HTTP Tower Service from scratch, for demonstration purposes only. Many sinks do need to implement Service this way, but any new HTTP-based sink should ideally use the HttpService structure, which abstracts away most of the logic shared among HTTP-based sinks.

We need to create a Tower service that is responsible for actually sending our final encoded data.

struct BasicService {
    endpoint: String,
    client: HttpClient,
}

The service contains two fields, endpoint and client, which are passed in from the BasicSink described earlier.

BasicService implements the tower::Service trait:

impl tower::Service<BasicRequest> for BasicService {
}

Associated types

A number of associated types need defining:

    type Response = BasicResponse;
    type Error = &'static str;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

Functions

poll_ready - called to determine whether the service is ready to accept a request. This service has no reason to wait, so it always returns Poll::Ready.

    fn poll_ready(
        &mut self,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
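For contrast, here is a sketch of a service that does have a reason to wait. It is not Vector or tower code. `LimitedService` and `CountingWaker` are hypothetical, and the signatures are simplified (no error type, no returned future). The sketch shows the contract poll_ready follows: it returns Pending only after storing the task's waker, so that the caller is woken once capacity frees up. In practice, tower's ConcurrencyLimit middleware provides this kind of limiting.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical service that accepts at most `limit` in-flight requests.
struct LimitedService {
    in_flight: usize,
    limit: usize,
    parked: Option<Waker>,
}

impl LimitedService {
    fn new(limit: usize) -> Self {
        Self { in_flight: 0, limit, parked: None }
    }

    /// Ready while there is spare capacity; otherwise stores the waker and
    /// returns Pending so the caller waits instead of sending.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        if self.in_flight < self.limit {
            Poll::Ready(())
        } else {
            self.parked = Some(cx.waker().clone());
            Poll::Pending
        }
    }

    /// Starts a request (the actual sending is omitted in this sketch).
    fn call(&mut self) {
        self.in_flight += 1;
    }

    /// Marks one request as finished and wakes a caller waiting in poll_ready.
    fn complete_one(&mut self) {
        self.in_flight = self.in_flight.saturating_sub(1);
        if let Some(waker) = self.parked.take() {
            waker.wake();
        }
    }
}

/// Test helper: a waker that counts how many times it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```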

call - where the data is actually sent over HTTP. It returns a future that, when polled, sends the request and resolves to the result.

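    fn call(&mut self, request: BasicRequest) -> Self::Future {
        // Take the payload and metadata out of the request up front, so the
        // async block below only owns plain values rather than a partially moved struct.
        let BasicRequest {
            payload, metadata, ..
        } = request;
        let byte_size = payload.len();
        let json_size = metadata.into_events_estimated_json_encoded_byte_size();

        let body = hyper::Body::from(payload);
        // Panics if the configured endpoint is not a valid URI.
        let req = http::Request::post(&self.endpoint)
            .header("Content-Type", "application/json")
            .body(body)
            .unwrap();

        // Clone the client so the returned future owns its own handle.
        let mut client = self.client.clone();

        Box::pin(async move {
            match client.call(req).await {
                // Only 2xx responses count as a successful delivery.
                Ok(response) if response.status().is_success() => Ok(BasicResponse {
                    byte_size,
                    json_size,
                }),
                Ok(_) => Err("received error response"),
                Err(_error) => Err("failed to send request"),
            }
        })
    }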

When the endpoint responds with a success status, that future resolves to a BasicResponse.
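For a rough idea of what `call` puts on the wire, the sketch below renders an HTTP/1.1 POST by hand. It is illustrative only. `render_post` is hypothetical, and the real client adds headers of its own (the sample output at the end of this tutorial also shows user-agent and accept-encoding, for example).

```rust
/// Hypothetical helper that renders the kind of HTTP/1.1 request `call` sends:
/// a POST with a JSON content type and a body of `body.len()` bytes.
fn render_post(host: &str, path: &str, body: &[u8]) -> Vec<u8> {
    let head = format!(
        "POST {} HTTP/1.1\r\n\
         Host: {}\r\n\
         Content-Type: application/json\r\n\
         Content-Length: {}\r\n\
         \r\n",
        path,
        host,
        body.len()
    );
    // The body follows the blank line that ends the headers.
    let mut request = head.into_bytes();
    request.extend_from_slice(body);
    request
}
```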

BasicResponse

The response type returned by our service must implement the DriverResponse trait:

struct BasicResponse {
    byte_size: usize,
    json_size: GroupedCountByteSize,
}

impl DriverResponse for BasicResponse {
    fn event_status(&self) -> EventStatus {
        EventStatus::Delivered
    }

    fn events_sent(&self) -> &GroupedCountByteSize {
        &self.json_size
    }

    fn bytes_sent(&self) -> Option<usize> {
        Some(self.byte_size)
    }
}

Vector calls the methods in this trait to determine whether the events were delivered successfully and how much data was sent. This information is used to emit internal metrics and to support end-to-end acknowledgements.
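BasicResponse always reports Delivered, because our service only produces a response for 2xx status codes and returns an error otherwise. A sink that wanted finer-grained statuses could classify the HTTP status itself. The sketch below shows one possible policy. `Outcome` and `outcome_for_status` are not Vector APIs, and treating 429 and 5xx as retryable is a design assumption, not something this tutorial prescribes.

```rust
/// Hypothetical delivery outcomes for a single HTTP response.
#[derive(Debug, PartialEq)]
enum Outcome {
    Delivered,
    /// The server refused the data; resending the same payload will not help.
    Rejected,
    /// A transient failure that might succeed on retry.
    Errored,
}

/// Hypothetical mapping from an HTTP status code to a delivery outcome.
fn outcome_for_status(status: u16) -> Outcome {
    match status {
        200..=299 => Outcome::Delivered,
        429 | 500..=599 => Outcome::Errored,
        _ => Outcome::Rejected,
    }
}
```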

Sink

Finally, we need to update the run_inner method of our BasicSink.

    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        let service = tower::ServiceBuilder::new().service(BasicService {
            client: self.client.clone(),
            endpoint: self.endpoint.clone(),
        });

        let sink = input
            .request_builder(
                None,
                BasicRequestBuilder {
                    encoder: BasicEncoder,
                },
            )
            .filter_map(|request| async move {
                match request {
                    Err(error) => {
                        emit!(SinkRequestBuildError { error });
                        None
                    }
                    Ok(req) => Some(req),
                }
            })
            .into_driver(service);

        sink.run().await
    }

After creating our service, we call several of Vector's extension methods on the BoxStream to process the stream of events:

request_builder - specifies the request builder used to build our requests. We pass in the BasicRequestBuilder described earlier. The first parameter limits the number of request builders that can run concurrently; we pass None, which means no limit is applied.

filter_map - if building a request fails, we emit a SinkRequestBuildError internal event and drop the request so it is not processed further.

into_driver - The Driver is the final stage of the process that drives the interaction between the stream of incoming events and the BasicService we created above.
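The stream stages behave much like ordinary iterator adapters. The sketch below is a synchronous, std-only analogue, not Vector code. `run_pipeline` and its rule that empty events fail to build are hypothetical. The real pipeline is asynchronous, can build requests concurrently, and lets the driver await each service call and update the finalizers.

```rust
/// Hypothetical, synchronous analogue of the run_inner pipeline: build a
/// request per event, drop (and record) failures, then "send" the rest.
/// Returns the build errors that were filtered out.
fn run_pipeline<F>(events: Vec<String>, mut send: F) -> Vec<String>
where
    F: FnMut(&[u8]),
{
    let mut build_errors = Vec::new();

    events
        .into_iter()
        // request_builder: turn each event into a payload or an error.
        .map(|event| {
            if event.is_empty() {
                Err("empty event".to_string())
            } else {
                Ok(event.into_bytes())
            }
        })
        // filter_map: record the error (Vector emits SinkRequestBuildError) and skip it.
        .filter_map(|result| match result {
            Ok(request) => Some(request),
            Err(error) => {
                build_errors.push(error);
                None
            }
        })
        // into_driver: hand every remaining request to the "service".
        .for_each(|request| send(request.as_slice()));

    build_errors
}
```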

Running our sink

We can now run our new sink.

Here is a simple Python server, simple_http_server.py, that accepts the requests our sink sends and prints them:

import http.server
import socketserver


class RequestHandler(http.server.BaseHTTPRequestHandler):
   def do_GET(self):
      # Print the request line and headers
      print(f"Request Line: {self.requestline}")
      print("Headers:")
      for header, value in self.headers.items():
         print(f"{header}: {value}")

      # Send a response
      self.send_response(200)
      self.send_header('Content-type', 'text/html')
      self.end_headers()
      self.wfile.write(b'GET request received')

   def do_POST(self):
      print(f"Request Line: {self.requestline}")
      print("Headers:")
      for header, value in self.headers.items():
         print(f"{header}: {value}")

      content_length = int(self.headers.get('Content-Length', 0))
      post_data = self.rfile.read(content_length)
      print(f"Body: {post_data.decode('utf-8')}")

      self.send_response(200)
      self.send_header('Content-type', 'text/html')
      self.end_headers()
      self.wfile.write(b'POST request received')


PORT = 3000
Handler = RequestHandler

with socketserver.TCPServer(("", PORT), Handler) as httpd:
   print(f"Serving HTTP on port {PORT}...")
   httpd.serve_forever()

Run the server:

python3 simple_http_server.py

Our sink now has a configuration field for the endpoint. Update the sink section of basic.yml to look like:

sinks:
  basic:
    type: basic
    endpoint: http://localhost:3000
    inputs:
      - stdin

Then run Vector:

cargo vdev run ./basic.yml

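If we type something into the console, it should now be sent to our HTTP server, which prints the request. The output will look similar to the following (the exact layout depends on the server used to receive the request):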

METHOD:  POST
URI:     /

HEADERS:
content-type      application/json
user-agent        Vector/0.26.0 (x86_64-unknown-linux-gnu debug=full)
accept-encoding   identity
host              localhost:3000
content-length    131

BODY:
{"log":{"host":"computer","message":"zork","source_type":"stdin","timestamp":"2023-01-23T10:21:57.215019942Z"}}