Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Fully upgrade to tokio 1.x #5872

Merged
merged 120 commits into from
Apr 2, 2021
Merged
Show file tree
Hide file tree
Changes from 82 commits
Commits
Show all changes
120 commits
Select commit Hold shift + click to select a range
dcbd76d
chore: begin upgrade to tokio 1.0.0
lukesteensen Jan 5, 2021
03000da
pin sleeps
lukesteensen Jan 11, 2021
3825dc5
fix AsyncRead usage
lukesteensen Jan 11, 2021
28d64ba
remove hyper patch and stop using Resolver for http client
lukesteensen Jan 11, 2021
0ccb1ef
bump more deps
lukesteensen Jan 13, 2021
939d953
replace some usages of incoming
lukesteensen Jan 19, 2021
8caea51
update tokio-openssl api usage
lukesteensen Jan 19, 2021
8ac1d19
try experimental sink alternative for socket source
lukesteensen Jan 19, 2021
d52a8ee
temporarily comment out socket shutdown and keepalive
lukesteensen Jan 19, 2021
e9e9eb6
more crate upgrades
lukesteensen Jan 20, 2021
f814f70
more crate upgrades
lukesteensen Jan 25, 2021
fa0ad7b
Merge remote-tracking branch 'origin/master' into tokio-1.0
pablosichert Mar 2, 2021
f7b4ca7
Upgrade to rdkafka 0.25.0
pablosichert Mar 11, 2021
07b19c1
Fix more compiler errors
pablosichert Mar 4, 2021
f6fee83
Remove bytes 0.5
pablosichert Mar 12, 2021
750d747
Fix more compiler errors
pablosichert Mar 12, 2021
4de48b1
Merge remote-tracking branch 'origin/master' into tokio-1.0
pablosichert Mar 12, 2021
6296f2a
Fix futures not being Send
pablosichert Mar 12, 2021
ba6c205
Fix incorrect future type
pablosichert Mar 12, 2021
e47e0b1
Resolve Send/Sync issues in futures
pablosichert Mar 12, 2021
5a3d0e9
Fix usage of rdkafka error code
pablosichert Mar 12, 2021
4e95a98
Upgrade to bollard 0.10.1
pablosichert Mar 12, 2021
2bbbc0c
Upgrade to prost 0.7.0 in libraries
pablosichert Mar 12, 2021
b256879
Add missing unwrap
pablosichert Mar 12, 2021
8ec9907
Remove unneeded mut
pablosichert Mar 12, 2021
49dd1f0
Remove unused imports
pablosichert Mar 12, 2021
df0b50b
cargo fmt
pablosichert Mar 12, 2021
51ddbaa
Fix moved value in future
pablosichert Mar 12, 2021
8cdcd5b
Fix set_receive_buffer_size on TCP stream
pablosichert Mar 12, 2021
0b2f0a0
Fix tokio delay_for -> sleep
pablosichert Mar 12, 2021
f392acb
Fix core_threads -> worker_threads
pablosichert Mar 12, 2021
c266677
Wrap into UnboundedReceiverStream
pablosichert Mar 12, 2021
c1429d7
Wrap into ReceiverStream
pablosichert Mar 12, 2021
e51d629
Fix is_empty / migrate from deprecated try_recv
pablosichert Mar 12, 2021
f282f11
Use .next().await instead of .try_recv()
pablosichert Mar 12, 2021
5d1ad1e
Wrap into ReceiverStream (continued)
pablosichert Mar 13, 2021
2e1deba
Wrap into UnboundedReceiverStream (continued)
pablosichert Mar 13, 2021
1f9a807
Wrap into IntervalStream
pablosichert Mar 13, 2021
bfd428b
Migrate usage of .into_future() -> .recv()
pablosichert Mar 13, 2021
82f5f02
Fix MockSourceConfig not being Send + Sync
pablosichert Mar 13, 2021
8f1201d
Migrate to AsyncWriteExt::shutdown
pablosichert Mar 13, 2021
ce7ec6e
Migrate poll_next -> poll_recv
pablosichert Mar 13, 2021
7571379
Wrap into TcpListenerStream
pablosichert Mar 13, 2021
0ead572
Fix buffer related compiler errors
pablosichert Mar 13, 2021
27ef6d8
Hack lifetime issue by blocking on thread
pablosichert Mar 13, 2021
433ba9d
Fix mutability issues
pablosichert Mar 13, 2021
8b4b140
Relax lifetime requirement in wait_with_timeout
pablosichert Mar 13, 2021
50e65aa
Upgrade to tokio 1.3.0
pablosichert Mar 13, 2021
aeb3db0
Implement poll_ready/try_flush in Pipeline using Sender::try_reserve
pablosichert Mar 13, 2021
07ecfa2
Implement poll_ready in Sink for BoundedSink using Sender::try_reserve
pablosichert Mar 14, 2021
e21d0c6
Only signal close when 0 bytes have been read
pablosichert Mar 14, 2021
c74a65b
Poll stream shutdown without blocking the thread
pablosichert Mar 14, 2021
29236a3
Try fixing stream shutdown
pablosichert Mar 14, 2021
c2c9f57
Try making polled shutdown of stream work in tcp source
pablosichert Mar 14, 2021
e0478bc
Fix unused import warnings
pablosichert Mar 14, 2021
42338dc
Fix unused variable warnings
pablosichert Mar 14, 2021
4c912a4
Fix unused mut warnings
pablosichert Mar 14, 2021
a995768
Fix unused Result warnings
pablosichert Mar 14, 2021
94d18cf
Fix dead code warnings
pablosichert Mar 14, 2021
40c722a
Fix usage of async child process API
pablosichert Mar 14, 2021
1e9c817
Remove feature gate for udp module
pablosichert Mar 14, 2021
43bd67e
Merge remote-tracking branch 'origin/master' into tokio-1.0
pablosichert Mar 14, 2021
21cf023
Remove faulty feature gate
pablosichert Mar 14, 2021
bf1fa34
Remove unused import
pablosichert Mar 14, 2021
b7d27a8
Add missing imports in test
pablosichert Mar 14, 2021
aad950a
Fix compilation errors/warnings in benches
pablosichert Mar 14, 2021
ee89a43
Fix compilation errors/warnings in integration tests
pablosichert Mar 14, 2021
69ebfe8
Temporarily make clippy happy
pablosichert Mar 14, 2021
c762c9d
Fix stream shutdown in async functions
pablosichert Mar 14, 2021
42026ee
Fix unused import warning on Windows
pablosichert Mar 14, 2021
fa53330
Fix setting TCP keepalive
pablosichert Mar 14, 2021
a97070d
Pin rdkafka dependency to git revision with aarch64 fix
pablosichert Mar 14, 2021
5f84579
Fix import on Windows
pablosichert Mar 15, 2021
e3b1a4d
Fix feature gates on Windows
pablosichert Mar 15, 2021
60c727a
Rework setting socket options with socket2 upgrade / allow setting op…
pablosichert Mar 15, 2021
c65db09
Fix feature gates on Windows
pablosichert Mar 15, 2021
009fc05
Fix unused mut warning on Windows
pablosichert Mar 15, 2021
189caa5
Shutdown TCP stream synchronously
pablosichert Mar 15, 2021
f041ae2
Fix dead code warning
pablosichert Mar 15, 2021
0cc432d
Fix dead code warning
pablosichert Mar 15, 2021
d5174f3
Use futures::channel::mpsc instead of sink wrappers around tokio::syn…
pablosichert Mar 15, 2021
d2fc17b
Merge remote-tracking branch 'origin/master' into tokio-1.0
pablosichert Mar 15, 2021
873a081
Fix control channels being dropped and closed prematurely
pablosichert Mar 16, 2021
c2408ba
Merge remote-tracking branch 'origin/master' into tokio-1.0
pablosichert Mar 16, 2021
dee00de
Fix control channel being prematurely closed by being moved and dropp…
pablosichert Mar 16, 2021
8f92641
Merge remote-tracking branch 'origin/master' into tokio-1.0
pablosichert Mar 16, 2021
468a087
Fix buffer capacities after switching from tokio::sync::mpsc to futur…
pablosichert Mar 16, 2021
b6ac2b7
Fix not simultaneously polling all channels
pablosichert Mar 16, 2021
f482f95
Fix poll_next being called in poll_close
pablosichert Mar 16, 2021
2746ba6
Fix tests not being run in an async runtime
pablosichert Mar 17, 2021
70f2e03
Fix shutdown by not calling poll_next in poll_flush
pablosichert Mar 17, 2021
a44aa5b
Fix shutdown of TCP stream / TCP socket source
pablosichert Mar 17, 2021
1508eb4
Reintroduce processing of control messages in flush/close, but don't …
pablosichert Mar 17, 2021
09e9151
Protect control channel from being over-polled by using futures::stre…
pablosichert Mar 17, 2021
9b43bec
Fix stream not being flushed with shutdown signal
pablosichert Mar 17, 2021
a7304cc
Flush stream before shutdown
pablosichert Mar 17, 2021
04f7714
Fix tests not fully collecting receiver after shutdown
pablosichert Mar 17, 2021
149fb79
Synchronously assert on generator output
pablosichert Mar 18, 2021
c0666ef
Move pipeline to channel with Sink API / tokio::sync::mpsc -> futures…
pablosichert Mar 18, 2021
d918ecd
Component features cleanup
pablosichert Mar 18, 2021
f7fd87f
Fix compilation errors in integration tests
pablosichert Mar 18, 2021
a7d1f54
Update the adaptive concurrency tests for new tokio timings
bruceg Mar 18, 2021
16d285b
Fix error return path on TLS handshake failure
bruceg Mar 18, 2021
f439d90
Move back to collect_ready as the receiver doesn't seem to have seman…
pablosichert Mar 19, 2021
7b76a82
Move locking of mutex out of poll function
pablosichert Mar 19, 2021
50951ca
Continue polling stream after shutting down TCP socket for correct fl…
pablosichert Mar 19, 2021
93b859b
Expand a few more adaptive concurrency test parameters
bruceg Mar 19, 2021
c116340
Upgrade rdkafka to 0.26.0
pablosichert Mar 19, 2021
d93da35
Work around named pipes not being available, connect to Docker via HT…
pablosichert Mar 19, 2021
ef34698
Constrain runtime worker threads when spawning from a forked process
pablosichert Mar 20, 2021
1e6e0ba
Additionally constrain number of blocking threads
pablosichert Mar 20, 2021
1ee64e6
Remove unnecessary namespace qualifiers
pablosichert Mar 26, 2021
4465235
Remove intermediate variables
pablosichert Mar 26, 2021
fbf1860
Remove explicitly passing number of threads (done by default)
pablosichert Mar 26, 2021
efac5c3
Don't explicitly poll flush before shutdown (done implicitly)
pablosichert Mar 26, 2021
2cf9032
Merge remote-tracking branch 'origin/master' into tokio-1.0
pablosichert Mar 26, 2021
ddfbbe7
Enable "warp" dependency with "api" feature
pablosichert Mar 26, 2021
fbb5a32
Replace stream! invocations with appropriate stream wrappers
pablosichert Mar 26, 2021
673e30a
Disable API tests due to issues with rusty_fork
pablosichert Apr 1, 2021
f52e32f
Merge remote-tracking branch 'origin/master' into tokio-1.0
pablosichert Apr 1, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
680 changes: 272 additions & 408 deletions Cargo.lock

Large diffs are not rendered by default.

92 changes: 44 additions & 48 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,13 @@ vector-api-client = { path = "lib/vector-api-client", optional = true }
vrl-cli = { path = "lib/vrl/cli", optional = true }

# Tokio / Futures
async-trait = "0.1"
async-trait = "0.1.42"
futures = { version = "0.3", default-features = false, features = ["compat", "io-compat"] }
futures01 = { package = "futures", version = "0.1.25" }
tokio = { version = "0.2.13", features = ["blocking", "fs", "io-std", "macros", "process", "rt-core", "rt-threaded", "signal", "stream", "sync", "time", "udp", "uds"] }
tokio-openssl = "0.4.0"
tokio-util = { version = "0.3.1", features = ["codec"] }
tokio = { version = "1.3.0", features = ["full"] }
pablosichert marked this conversation as resolved.
Show resolved Hide resolved
tokio-openssl = "0.6.1"
tokio-stream = { version = "0.1.2", features = ["net"] }
tokio-util = { version = "0.6.2", features = ["codec", "time"] }

# Tracing
tracing = "0.1.15"
Expand All @@ -101,37 +102,38 @@ metrics-util = "=0.4.0-alpha.10"
metrics-macros = "=0.1.0-alpha.9"

# Aws
rusoto_cloudwatch = { version = "0.45.0", optional = true }
rusoto_core = { version = "0.45.0", features = ["encoding"], optional = true }
rusoto_credential = { version = "0.45.0", optional = true }
rusoto_es = { version = "0.45.0", optional = true }
rusoto_firehose = { version = "0.45.0", optional = true }
rusoto_kinesis = { version = "0.45.0", optional = true }
rusoto_logs = { version = "0.45.0", optional = true }
rusoto_s3 = { version = "0.45.0", optional = true }
rusoto_signature = { version = "0.45.0", optional = true }
rusoto_sqs = { version = "0.45.0", optional = true }
rusoto_sts = { version = "0.45.0", optional = true }
rusoto_cloudwatch = { version = "0.46.0", optional = true }
rusoto_core = { version = "0.46.0", features = ["encoding"], optional = true }
rusoto_credential = { version = "0.46.0", optional = true }
rusoto_es = { version = "0.46.0", optional = true }
rusoto_firehose = { version = "0.46.0", optional = true }
rusoto_kinesis = { version = "0.46.0", optional = true }
rusoto_logs = { version = "0.46.0", optional = true }
rusoto_s3 = { version = "0.46.0", optional = true }
rusoto_signature = { version = "0.46.0", optional = true }
rusoto_sqs = { version = "0.46.0", optional = true }
rusoto_sts = { version = "0.46.0", optional = true }

# Tower
tower = { version = "0.3.1", git = "https://github.com/tower-rs/tower", rev = "43168944220ed32dab83cb4f11f7b97abc5818d5", features = ["buffer", "limit", "retry", "timeout", "util"] }
ktff marked this conversation as resolved.
Show resolved Hide resolved
tower = { version = "0.4.0", features = ["buffer", "limit", "retry", "timeout", "util"] }
tower-layer = "0.3.1"

# Serde
serde = { version = "1.0.117", features = ["derive"] }
serde_json = { version = "1.0.33", features = ["raw_value"] }
serde_yaml = { version ="0.8.13" }

# Prost
prost = "0.6.1"
prost-types = "0.6.1"
prost = "0.7.0"
prost-types = "0.7.0"

# GCP
goauth = { version = "0.9.0", optional = true }
smpl_jwt = { version = "0.6.1", optional = true }

# API
async-graphql = { version = "=2.5.0", optional = true }
async-graphql-warp = { version = "=2.5.0", optional = true }
async-graphql = { version = "2.5.9", optional = true }
async-graphql-warp = { version = "2.5.9", optional = true }
itertools = { version = "0.10.0", optional = true }

# API client
Expand All @@ -146,12 +148,12 @@ vrl-stdlib = { path = "lib/vrl/stdlib" }

# External libs
anyhow = "1.0.37"
async-compression = { version = "0.3.7", features = ["tokio-02", "gzip", "zstd"] }
async-compression = { version = "0.3.7", features = ["tokio", "gzip", "zstd"] }
avro-rs = { version = "0.13.0", optional = true }
base64 = { version = "0.13.0", optional = true }
bloom = { version = "0.3.2", optional = true }
bollard = { version = "0.9.1", features = ["ssl"], optional = true }
bytes = { version = "0.5.6", features = ["serde"] }
bollard = { version = "0.10.1", features = ["ssl"], optional = true }
bytes = { version = "1.0.0", features = ["serde"] }
bytesize = { version = "1.0.0", optional = true }
chrono = { version = "0.4.19", features = ["serde"] }
cidr-utils = "0.5.0"
Expand All @@ -162,7 +164,7 @@ derivative = "2.1.1"
dirs-next = { version = "2.0.0", optional = true }
dyn-clone = "1.0.3"
encoding_rs = { version = "0.8", features = ["serde"] }
evmap = { version = "10.0.2", features = ["bytes"], optional = true }
evmap = { git = "https://github.com/lukesteensen/evmap.git", rev = "45ba973c22715a68c5e99efad4b072421f7ad40b", features = ["bytes"], optional = true }
Copy link
Member

@jszwedko jszwedko Mar 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we open a PR to get this change upstream?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the record, this was only for this so the difference from a released version is minimal.

I believe the upstream crate has reorganized a bit for its next major version, but I'll take a look at getting us off of this soon.

exitcode = "1.1.2"
flate2 = "1.0.19"
getset = "0.1.1"
Expand All @@ -172,8 +174,8 @@ headers = "0.3"
heim = { version = "0.1.0-rc.1", features = ["full"], optional = true }
hostname = "0.3.1"
http = "0.2"
hyper = "0.13"
hyper-openssl = "0.8"
hyper = { version = "0.14", features = ["full"] }
pablosichert marked this conversation as resolved.
Show resolved Hide resolved
hyper-openssl = "0.9.1"
indexmap = {version = "1.6.2", features = ["serde"]}
indoc = "1.0.3"
inventory = "0.1.10"
Expand Down Expand Up @@ -201,16 +203,17 @@ postgres-openssl = { version = "0.3.0", optional = true }
pulsar = { version = "1.0.0", default-features = false, features = ["tokio-runtime"], optional = true }
rand = { version = "0.8.0", features = ["small_rng"] }
rand_distr = "0.4.0"
rdkafka = { version = "0.24.0", features = ["libz", "ssl", "zstd"], optional = true }
# Move to 0.25.x release after fix for aarch64 builds has been published: https://github.com/fede1024/rust-rdkafka/pull/346.
rdkafka = { git = "https://github.com/fede1024/rust-rdkafka", rev = "52bcef43b684f90294d8b4b92a5e6b1129aab468", features = ["libz", "ssl", "zstd"], optional = true }
regex = "1.3.9"
# make sure to update the external docs when the Lua version changes
rlua = { version = "0.17.0", optional = true }
seahash = { version = "4.0.1", optional = true }
semver = { version = "0.11.0", features = ["serde"], optional = true }
snafu = { version = "0.6.10", features = ["futures", "futures-01"] }
snap = { version = "1.0.3", optional = true }
socket2 = { version = "0.3.19", optional = true }
stream-cancel = "0.6.2"
socket2 = "0.4.0"
stream-cancel = "0.8.0"
strip-ansi-escapes = "0.1.0"
structopt = "0.3.21"
syslog = { version = "5", optional = true }
Expand All @@ -223,7 +226,7 @@ toml = "0.5.8"
typetag = "0.1.6"
url = "2.2.1"
uuid = { version = "0.8", features = ["serde", "v4"], optional = true }
warp = { version = "0.2.5", default-features = false, optional = true }
warp = { version = "0.3.0", default-features = false, optional = true }

# For WASM
async-stream = "0.3.0"
Expand All @@ -244,7 +247,7 @@ atty = "0.2"
nix = "0.19.0"

[build-dependencies]
prost-build = "0.6.1"
prost-build = "0.7.0"
built = { version = "0.4.4", features = ["chrono", "git2"] }

[dev-dependencies]
Expand All @@ -257,13 +260,13 @@ libc = "0.2.80"
libz-sys = "1.1.2"
matches = "0.1.8"
pretty_assertions = "0.7.1"
reqwest = { version = "0.10.9", features = ["json"] }
reqwest = { version = "0.11.0", features = ["json"] }
rusty-fork = "0.3.0"
tempfile = "3.0.6"
tokio = { version = "0.2", features = ["test-util"] }
tokio-test = "0.4"
tokio = { version = "1.3.0", features = ["test-util"] }
tokio-test = "0.4.0"
tokio01-test = "0.1.1"
tower-test = "0.3.0"
tower-test = "0.4.0"
walkdir = "2.2.7"

[features]
Expand Down Expand Up @@ -383,16 +386,16 @@ sources-mongodb_metrics = ["mongodb"]
sources-nginx_metrics = ["nom"]
sources-postgresql_metrics = ["postgres-openssl", "tokio-postgres"]
sources-prometheus = ["prometheus-parser", "sinks-prometheus", "sources-utils-http", "warp"]
sources-socket = ["bytesize", "listenfd", "tokio-util/udp", "sources-utils-udp", "sources-utils-tcp-keepalive", "sources-utils-tcp-socket", "sources-utils-tls", "sources-utils-unix"]
sources-socket = ["bytesize", "listenfd", "tokio-util/net", "sources-utils-udp", "sources-utils-tcp-keepalive", "sources-utils-tcp-socket", "sources-utils-tls", "sources-utils-unix"]
sources-splunk_hec = ["bytesize", "sources-utils-tls", "warp"]
sources-statsd = ["listenfd", "sources-utils-tcp-keepalive", "sources-utils-tcp-socket", "sources-utils-tls", "sources-utils-udp", "sources-utils-unix", "tokio-util/udp"]
sources-statsd = ["listenfd", "sources-utils-tcp-keepalive", "sources-utils-tcp-socket", "sources-utils-tls", "sources-utils-udp", "sources-utils-unix", "tokio-util/net"]
sources-stdin = ["bytesize"]
sources-syslog = ["bytesize", "listenfd", "tokio-util/udp", "sources-utils-udp", "sources-utils-tcp-keepalive", "sources-utils-tcp-socket", "sources-utils-tls", "sources-utils-unix", "syslog_loose"]
sources-syslog = ["bytesize", "listenfd", "tokio-util/net", "sources-utils-udp", "sources-utils-tcp-keepalive", "sources-utils-tcp-socket", "sources-utils-tls", "sources-utils-unix", "syslog_loose"]
sources-utils-http = ["snap", "sources-utils-tls", "warp"]
sources-utils-tcp-keepalive = []
sources-utils-tcp-socket = []
sources-utils-tls = []
sources-utils-udp = ["socket2"]
sources-utils-udp = []
sources-utils-unix = []
sources-vector = ["listenfd", "sources-utils-tcp-keepalive", "sources-utils-tcp-socket", "sources-utils-tls"]

Expand Down Expand Up @@ -545,8 +548,8 @@ sinks-pulsar = ["avro-rs", "pulsar"]
sinks-sematext = ["sinks-elasticsearch", "sinks-influxdb"]
sinks-socket = ["sinks-utils-udp"]
sinks-splunk_hec = ["bytesize"]
sinks-statsd = ["sinks-utils-udp", "tokio-util/udp"]
sinks-utils-udp = ["socket2"]
sinks-statsd = ["sinks-utils-udp", "tokio-util/net"]
sinks-utils-udp = []
sinks-vector = ["sinks-utils-udp"]

# Identifies that the build is a nightly build
Expand Down Expand Up @@ -664,10 +667,3 @@ required-features = ["metrics-benches"]
name = "distribution_statistic"
harness = false
required-features = ["statistic-benches"]

[patch.'https://github.com/tower-rs/tower']
tower-layer = "=0.3.0"

[patch.crates-io]
# TODO: update to the next 0.13.x (after 0.13.10, if any) or 0.14 (or higher)
hyper = { version = "0.13", git = "https://github.com/hyperium/hyper", rev = "d7495a75abca34646b1d6d047589c1b8110d0fa5" }
4 changes: 2 additions & 2 deletions benches/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn benchmark_batching(c: &mut Criterion) {
batch_sink,
)
},
|(mut rt, input, batch_sink)| rt.block_on(input.forward(batch_sink)).unwrap(),
|(rt, input, batch_sink)| rt.block_on(input.forward(batch_sink)).unwrap(),
criterion::BatchSize::LargeInput,
)
},
Expand All @@ -86,7 +86,7 @@ fn benchmark_batching(c: &mut Criterion) {

(rt, stream::iter(input.clone()).map(Ok), batch_sink)
},
|(mut rt, input, batch_sink)| rt.block_on(input.forward(batch_sink)).unwrap(),
|(rt, input, batch_sink)| rt.block_on(input.forward(batch_sink)).unwrap(),
criterion::BatchSize::LargeInput,
)
});
Expand Down
12 changes: 6 additions & 6 deletions benches/buffering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn benchmark_buffers(c: &mut Criterion) {
when_full: Default::default(),
};

let mut rt = runtime();
let rt = runtime();
let (output_lines, topology) = rt.block_on(async move {
let output_lines = CountReceiver::receive_lines(out_addr);
let (topology, _crash) = start_topology(config.build().unwrap(), false).await;
Expand All @@ -44,7 +44,7 @@ fn benchmark_buffers(c: &mut Criterion) {

(rt, topology, output_lines)
},
|(mut rt, topology, output_lines)| {
|(rt, topology, output_lines)| {
rt.block_on(async move {
let lines = random_lines(line_size).take(num_lines);
send_lines(in_addr, lines).await.unwrap();
Expand Down Expand Up @@ -82,7 +82,7 @@ fn benchmark_buffers(c: &mut Criterion) {
when_full: Default::default(),
};
config.global.data_dir = Some(data_dir.path().to_path_buf());
let mut rt = runtime();
let rt = runtime();
let (output_lines, topology) = rt.block_on(async move {
let output_lines = CountReceiver::receive_lines(out_addr);
let (topology, _crash) = start_topology(config.build().unwrap(), false).await;
Expand All @@ -91,7 +91,7 @@ fn benchmark_buffers(c: &mut Criterion) {
});
(rt, topology, output_lines)
},
|(mut rt, topology, output_lines)| {
|(rt, topology, output_lines)| {
rt.block_on(async move {
let lines = random_lines(line_size).take(num_lines);
send_lines(in_addr, lines).await.unwrap();
Expand Down Expand Up @@ -130,7 +130,7 @@ fn benchmark_buffers(c: &mut Criterion) {
//when_full: Default::default(),
//};
//config.global.data_dir = Some(data_dir.path().to_path_buf());
//let mut rt = runtime();
//let rt = runtime();
//let (output_lines, topology) = rt.block_on(async move {
//let output_lines = CountReceiver::receive_lines(out_addr);
//let (topology, _crash) = start_topology(config.build().unwrap(), false).await;
Expand All @@ -139,7 +139,7 @@ fn benchmark_buffers(c: &mut Criterion) {
//});
//(rt, topology, output_lines)
//},
//|(mut rt, topology, output_lines)| {
//|(rt, topology, output_lines)| {
//rt.block_on(async move {
//let lines = random_lines(line_size).take(num_lines);
//send_lines(in_addr, lines).await.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions benches/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ fn benchmark_files_without_partitions(c: &mut Criterion) {
},
);

let mut rt = runtime();
let rt = runtime();
let (topology, input) = rt.block_on(async move {
let (topology, _crash) = start_topology(config.build().unwrap(), false).await;

Expand All @@ -74,7 +74,7 @@ fn benchmark_files_without_partitions(c: &mut Criterion) {
});
(rt, topology, input)
},
|(mut rt, topology, input)| {
|(rt, topology, input)| {
rt.block_on(async move {
let lines = random_lines(line_size).take(num_lines).map(|mut line| {
line.push('\n');
Expand Down
4 changes: 2 additions & 2 deletions benches/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ fn benchmark_http(c: &mut Criterion) {
},
);

let mut rt = runtime();
let rt = runtime();
let topology = rt.block_on(async move {
let (topology, _crash) =
start_topology(config.build().unwrap(), false).await;
Expand All @@ -67,7 +67,7 @@ fn benchmark_http(c: &mut Criterion) {
});
(rt, topology)
},
|(mut rt, topology)| {
|(rt, topology)| {
rt.block_on(async move {
let lines = random_lines(line_size).take(num_lines);
send_lines(in_addr, lines).await.unwrap();
Expand Down
20 changes: 11 additions & 9 deletions benches/isolated_buffering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use futures::{
};
use futures01::{stream, Sink, Stream};
use tempfile::tempdir;
use tokio_stream::wrappers::ReceiverStream;
use vector::{
buffers::{
disk::{leveldb_buffer, DiskBuffer},
Expand Down Expand Up @@ -55,7 +56,7 @@ fn benchmark_buffers(c: &mut Criterion) {

(rt, writer, read_loop)
},
|(mut rt, writer, read_loop)| {
|(rt, writer, read_loop)| {
let send = writer.send_all(random_events(line_size).take(num_lines as u64));

let read_handle = rt.spawn(read_loop.compat());
Expand All @@ -81,7 +82,7 @@ fn benchmark_buffers(c: &mut Criterion) {

(rt, writer, read_handle)
},
|(mut rt, mut writer, read_handle)| {
|(rt, mut writer, read_handle)| {
let write_handle = rt.spawn(async move {
let mut stream = random_events(line_size).take(num_lines as u64).compat();
while let Some(e) = stream.next().await {
Expand All @@ -101,13 +102,14 @@ fn benchmark_buffers(c: &mut Criterion) {
|| {
let rt = runtime();

let (writer, mut reader) = tokio::sync::mpsc::channel(100);
let (writer, reader) = tokio::sync::mpsc::channel(100);
let mut stream = ReceiverStream::new(reader);

let read_handle = rt.spawn(async move { while reader.next().await.is_some() {} });
let read_handle = rt.spawn(async move { while stream.next().await.is_some() {} });

(rt, writer, read_handle)
},
|(mut rt, mut writer, read_handle)| {
|(rt, writer, read_handle)| {
let write_handle = rt.spawn(async move {
let mut stream = random_events(line_size).take(num_lines as u64).compat();
while let Some(e) = stream.next().await {
Expand Down Expand Up @@ -136,7 +138,7 @@ fn benchmark_buffers(c: &mut Criterion) {

(rt, writer)
},
|(mut rt, writer)| {
|(rt, writer)| {
let send = writer.send_all(random_events(line_size).take(num_lines as u64));
let write_handle = rt.spawn(send.compat());
let _ = rt.block_on(write_handle).unwrap().unwrap();
Expand All @@ -150,7 +152,7 @@ fn benchmark_buffers(c: &mut Criterion) {
|| {
let data_dir = tempdir().unwrap();

let mut rt = runtime();
let rt = runtime();

let plenty_of_room = num_lines * line_size * 2;
let (writer, reader, acker) =
Expand All @@ -170,7 +172,7 @@ fn benchmark_buffers(c: &mut Criterion) {

(rt, read_loop)
},
|(mut rt, read_loop)| {
|(rt, read_loop)| {
let read_handle = rt.spawn(read_loop);
rt.block_on(read_handle).unwrap().unwrap();
},
Expand Down Expand Up @@ -198,7 +200,7 @@ fn benchmark_buffers(c: &mut Criterion) {

(rt, writer, read_loop)
},
|(mut rt, writer, read_loop)| {
|(rt, writer, read_loop)| {
let send = writer.send_all(random_events(line_size).take(num_lines as u64));

let read_handle = rt.spawn(read_loop);
Expand Down