Skip to content

Commit

Permalink
Merge branch 'master' into vector-dot-dev-tailwind
Browse files Browse the repository at this point in the history
  • Loading branch information
Luc Perkins committed May 11, 2021
2 parents fa4eea9 + 848d922 commit 364ef6c
Show file tree
Hide file tree
Showing 10 changed files with 186 additions and 31 deletions.
6 changes: 4 additions & 2 deletions distribution/helm/vector-agent/templates/daemonset.yaml
Expand Up @@ -33,6 +33,9 @@ spec:
serviceAccountName: {{ include "libvector.serviceAccountName" . }}
securityContext:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
{{- if .Values.podPriorityClassName }}
priorityClassName: {{ .Values.podPriorityClassName }}
{{- end }}
containers:
- name: vector
securityContext:
Expand Down Expand Up @@ -126,8 +129,7 @@ spec:
path: /var/lib/
# Vector will store its data here.
- name: data-dir
hostPath:
path: /var/lib/vector/
{{- toYaml .Values.dataVolume | nindent 10 }}
# Vector config dir.
- name: config-dir
projected:
Expand Down
Expand Up @@ -12,6 +12,7 @@ spec:
volumes:
- 'hostPath'
- 'configMap'
- 'emptyDir'
- 'secret'
- 'projected'
allowedHostPaths:
Expand All @@ -21,6 +22,10 @@ spec:
readOnly: true
- pathPrefix: "/var/lib/vector"
readOnly: false
- pathPrefix: "/sys"
readOnly: true
- pathPrefix: "/proc"
readOnly: true
{{- range .Values.extraVolumes }}
{{- if .hostPath }}
- pathPrefix: {{ .hostPath.path }}
Expand Down
8 changes: 8 additions & 0 deletions distribution/helm/vector-agent/values.yaml
Expand Up @@ -52,6 +52,9 @@ podAnnotations: {}
# Labels to add to the `Pod`s managed by `DaemonSet`.
podLabels: {}

# Priority class name to add to the `Pod`s managed by `DaemonSet`.
podPriorityClassName: ""

# PodSecurityContext to set at the `Pod`s managed by `DaemonSet`.
podSecurityContext: {}
# fsGroup: 2000
Expand Down Expand Up @@ -96,6 +99,11 @@ extraVolumes: []
# managed by `DaemonSet`.
extraVolumeMounts: []

# Vector will store its data here.
dataVolume:
hostPath:
path: /var/lib/vector/

rbac:
# Whether to create rbac resources or not. Disable for non-rbac clusters.
enabled: true
Expand Down
Expand Up @@ -33,6 +33,9 @@ spec:
serviceAccountName: {{ include "libvector.serviceAccountName" . }}
securityContext:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
{{- if .Values.podPriorityClassName }}
priorityClassName: {{ .Values.podPriorityClassName }}
{{- end }}
containers:
- name: vector
securityContext:
Expand Down
3 changes: 3 additions & 0 deletions distribution/helm/vector-aggregator/values.yaml
Expand Up @@ -54,6 +54,9 @@ podAnnotations: {}
# Labels to add to the `Pod`s managed by `StatefulSet`.
podLabels: {}

# Priority class name to add to the `Pod`s managed by `StatefulSet`.
podPriorityClassName: ""

# PodSecurityContext to set at the `Pod`s managed by `StatefulSet`.
podSecurityContext: {}
# fsGroup: 2000
Expand Down
129 changes: 111 additions & 18 deletions lib/vector-core/src/event/finalization.rs
Expand Up @@ -146,11 +146,16 @@ impl BatchNotifier {
}

/// Update this notifier's status from the status of a finalized event.
///
/// `Delivered` and `Dropped` can never change the batch status (the
/// batch starts as `Delivered`), so the atomic read-modify-write is
/// skipped entirely for them — the common case stays cheap.
#[allow(clippy::missing_panics_doc)] // Panic is unreachable
fn update_status(&self, status: EventStatus) {
    // The status starts as Delivered and can only change if the new
    // status is different than that.
    if status != EventStatus::Delivered && status != EventStatus::Dropped {
        self.status
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |old_status| {
                // Fold the event's status into the batch status.
                Some(old_status.update(status))
            })
            // fetch_update only returns Err when the closure returns
            // None, which it never does here.
            .unwrap_or_else(|_| unreachable!());
    }
}

Expand Down Expand Up @@ -179,10 +184,29 @@ pub enum BatchStatus {
/// All events in the batch were accepted (the default)
#[derivative(Default)]
Delivered,
/// At least one event in the batch failed delivery.
/// At least one event in the batch had a transient error in delivery.
Errored,
/// At least one event in the batch had a permanent failure.
Failed,
}

impl BatchStatus {
    /// Merge one finalized event's delivery status into this batch
    /// status and return the combined result. Severity only ever
    /// escalates: `Failed` beats `Errored`, which beats `Delivered`;
    /// `Dropped`, `Delivered`, and `Recorded` events leave the batch
    /// status untouched.
    fn update(self, status: EventStatus) -> Self {
        match status {
            // A permanent failure taints the whole batch.
            EventStatus::Failed => Self::Failed,
            // A transient error escalates `Delivered`, but never
            // downgrades an existing `Failed`.
            EventStatus::Errored => {
                if let Self::Failed = self {
                    Self::Failed
                } else {
                    Self::Errored
                }
            }
            // Everything else leaves the batch status as-is.
            _ => self,
        }
    }
}

// Can be dropped when this issue is closed:
// https://github.com/LukasKalbertodt/atomig/issues/3
impl AtomInteger for BatchStatus {}
Expand All @@ -197,7 +221,9 @@ pub enum EventStatus {
Dropped,
/// All copies of this event were delivered successfully.
Delivered,
/// At least one copy of this event failed to be delivered.
/// At least one copy of this event encountered a retriable error.
Errored,
/// At least one copy of this event encountered a permanent failure or rejection.
Failed,
/// This status has been recorded and should not be updated.
Recorded,
Expand All @@ -209,20 +235,29 @@ impl AtomInteger for EventStatus {}

impl EventStatus {
    /// Update this status with another event's finalization status and return the result.
    ///
    /// # Panics
    ///
    /// Passing a new status of `Dropped` is a programming error and
    /// will panic in debug/test builds.
    pub fn update(self, status: Self) -> Self {
        // `Recorded` is terminal: once either side reaches it, it wins
        // and is never updated again.
        if matches!(self, Self::Recorded) || matches!(status, Self::Recorded) {
            return Self::Recorded;
        }
        // `Dropped` is the initial state and yields to any new status.
        if matches!(self, Self::Dropped) {
            return status;
        }
        // Moving *back* to `Dropped` is nonsense; keep the old status.
        if matches!(status, Self::Dropped) {
            debug_assert!(false, "Updating EventStatus to Dropped is nonsense");
            return self;
        }
        // The remaining statuses escalate: Failed > Errored > Delivered.
        if matches!(self, Self::Failed) || matches!(status, Self::Failed) {
            Self::Failed
        } else if matches!(self, Self::Errored) || matches!(status, Self::Errored) {
            Self::Errored
        } else {
            Self::Delivered
        }
    }
}
Expand Down Expand Up @@ -340,4 +375,62 @@ mod tests {
assert_eq!(finalizer.count_finalizers(), 1);
(finalizer, receiver)
}

#[test]
fn event_status_updates() {
    use EventStatus::{Delivered, Dropped, Errored, Failed, Recorded};

    // Each case reads as (old, new, expected).
    let check = |old: EventStatus, new: EventStatus, expected: EventStatus| {
        assert_eq!(old.update(new), expected);
    };

    // `Dropped` yields to whatever status comes in.
    check(Dropped, Dropped, Dropped);
    check(Dropped, Delivered, Delivered);
    check(Dropped, Errored, Errored);
    check(Dropped, Failed, Failed);
    check(Dropped, Recorded, Recorded);

    // `Delivered` escalates to anything more severe.
    // (Updating a non-Dropped status to `Dropped` would debug-assert,
    // so those cases are not exercised here.)
    check(Delivered, Delivered, Delivered);
    check(Delivered, Errored, Errored);
    check(Delivered, Failed, Failed);
    check(Delivered, Recorded, Recorded);

    // `Errored` only escalates to `Failed` or `Recorded`.
    check(Errored, Delivered, Errored);
    check(Errored, Errored, Errored);
    check(Errored, Failed, Failed);
    check(Errored, Recorded, Recorded);

    // `Failed` is only ever superseded by `Recorded`.
    check(Failed, Delivered, Failed);
    check(Failed, Errored, Failed);
    check(Failed, Failed, Failed);
    check(Failed, Recorded, Recorded);

    // `Recorded` is terminal.
    check(Recorded, Delivered, Recorded);
    check(Recorded, Errored, Recorded);
    check(Recorded, Failed, Recorded);
    check(Recorded, Recorded, Recorded);
}

#[test]
fn batch_status_update() {
    use BatchStatus::{Delivered, Errored, Failed};

    // Each case reads as (old batch status, event status, expected).
    let check = |old: BatchStatus, event: EventStatus, expected: BatchStatus| {
        assert_eq!(old.update(event), expected);
    };

    // `Delivered` escalates on `Errored`/`Failed`, otherwise holds.
    check(Delivered, EventStatus::Dropped, Delivered);
    check(Delivered, EventStatus::Delivered, Delivered);
    check(Delivered, EventStatus::Errored, Errored);
    check(Delivered, EventStatus::Failed, Failed);
    check(Delivered, EventStatus::Recorded, Delivered);

    // `Errored` only escalates to `Failed`.
    check(Errored, EventStatus::Dropped, Errored);
    check(Errored, EventStatus::Delivered, Errored);
    check(Errored, EventStatus::Errored, Errored);
    check(Errored, EventStatus::Failed, Failed);
    check(Errored, EventStatus::Recorded, Errored);

    // `Failed` is sticky.
    check(Failed, EventStatus::Dropped, Failed);
    check(Failed, EventStatus::Delivered, Failed);
    check(Failed, EventStatus::Errored, Failed);
    check(Failed, EventStatus::Failed, Failed);
    check(Failed, EventStatus::Recorded, Failed);
}
}
6 changes: 4 additions & 2 deletions rfcs/2021-03-26-6517-end-to-end-acknowledgement.md
Expand Up @@ -167,8 +167,8 @@ Event finalization is a three step process:
1. When a sink completes delivery of an event, the delivery status is
recorded in the finalizer status that is shared across all clones of
the event. This may change that status from `Dropped` (the
initialization state) to either `Delivered` or `Failed`, or from
`Delivered` to `Failed`. The `Recorded` state is never changed.
initialization state) to either `Delivered`, `Errored`, or `Failed`.
The `Recorded` state is never changed.
2. If one of those sinks is configured to be authoritative, it will
immediately update the status of all its source batches and update
the event status to `Recorded` so that no extraneous updates happen.
Expand Down Expand Up @@ -221,12 +221,14 @@ struct BatchNotifier {

enum BatchStatus {
Delivered,
Errored,
Failed,
}

enum EventStatus {
Dropped, // default status
Delivered,
Errored,
Failed,
Recorded,
}
Expand Down
30 changes: 28 additions & 2 deletions src/buffers/disk/leveldb_buffer.rs
Expand Up @@ -14,7 +14,7 @@ use std::{
collections::VecDeque,
convert::TryInto,
mem::size_of,
path::PathBuf,
path::{Path, PathBuf},
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Expand Down Expand Up @@ -327,6 +327,31 @@ impl Reader {

pub struct Buffer;

/// Read the byte size of the database.
///
/// There is a mismatch between LevelDB's mechanism and vector's. While
/// vector would prefer to keep as little in-memory as possible, LevelDB,
/// being a database, has the opposite consideration. As such it may mmap
/// 1000 of its LDB files into vector's address space at a time with no
/// ability for us to change this number. See
/// https://github.com/google/leveldb/issues/866. Because we do need to
/// know the byte size of our store, we are forced to iterate through all
/// the LDB files on disk, imposing a large memory burden right when the
/// disk buffer has filled up — which can OOM vector, a catch-22.
///
/// This function does not solve the problem -- LevelDB will still map
/// 1000 files if it wants -- but we at least avoid forcing this to
/// happen at the start of vector.
fn db_initial_size(path: &Path) -> Result<usize, Error> {
    // Open (creating if absent) just long enough to sum the stored values.
    let mut options = Options::new();
    options.create_if_missing = true;
    let database: Database<Key> =
        Database::open(&path, options).with_context(|| DataDirOpenError {
            data_dir: path.parent().expect("always a parent"),
        })?;
    let total_bytes = database
        .value_iter(ReadOptions::new())
        .map(|value| value.len())
        .sum();
    Ok(total_bytes)
}

impl super::DiskBuffer for Buffer {
type Writer = Writer;
type Reader = Reader;
Expand All @@ -337,6 +362,8 @@ impl super::DiskBuffer for Buffer {
let max_uncompacted_size = (max_size as f64 * MAX_UNCOMPACTED) as usize;
let max_size = max_size - max_uncompacted_size;

let initial_size = db_initial_size(&path)?;

let mut options = Options::new();
options.create_if_missing = true;

Expand All @@ -355,7 +382,6 @@ impl super::DiskBuffer for Buffer {
tail = if iter.valid() { iter.key().0 + 1 } else { 0 };
}

let initial_size = db.value_iter(ReadOptions::new()).map(|v| v.len()).sum();
let current_size = Arc::new(AtomicUsize::new(initial_size));

let write_notifier = Arc::new(AtomicWaker::new());
Expand Down
4 changes: 4 additions & 0 deletions src/sinks/util/http.rs
Expand Up @@ -377,6 +377,10 @@ impl<T: fmt::Debug> sink::Response for http::Response<T> {
// A response succeeded when its HTTP status code is in the 2xx range.
fn is_successful(&self) -> bool {
    self.status().is_success()
}

// Server errors (5xx) are treated as transient (retriable); other
// non-success codes (e.g. 4xx) are considered permanent failures.
fn is_transient(&self) -> bool {
    self.status().is_server_error()
}
}

#[derive(Debug, Default, Clone)]
Expand Down
23 changes: 16 additions & 7 deletions src/sinks/util/sink.rs
Expand Up @@ -427,17 +427,21 @@ where
.err_into()
.map(move |result| {
let status = match result {
Ok(response) if response.is_successful() => {
trace!(message = "Response successful.", ?response);
EventStatus::Delivered
}
Ok(response) => {
error!(message = "Response wasn't successful.", ?response);
EventStatus::Failed
if response.is_successful() {
trace!(message = "Response successful.", ?response);
EventStatus::Delivered
} else if response.is_transient() {
error!(message = "Response wasn't successful.", ?response);
EventStatus::Errored
} else {
error!(message = "Response failed.", ?response);
EventStatus::Failed
}
}
Err(error) => {
error!(message = "Request failed.", %error);
EventStatus::Failed
EventStatus::Errored
}
};
for metadata in metadata {
Expand Down Expand Up @@ -497,9 +501,14 @@ pub trait Response: fmt::Debug {
// Default: treat every response as successful; implementors override
// to inspect their actual response type.
fn is_successful(&self) -> bool {
    true
}

// Default: treat any non-success as transient (retriable); implementors
// override to distinguish permanent failures.
fn is_transient(&self) -> bool {
    true
}
}

impl Response for () {}

impl<'a> Response for &'a str {}

#[cfg(test)]
Expand Down

0 comments on commit 364ef6c

Please sign in to comment.