fix runtime tests after paging
Signed-off-by: clux <sszynrae@gmail.com>
clux committed May 21, 2024
1 parent f7de17b commit b1bb452
Showing 6 changed files with 68 additions and 85 deletions.
2 changes: 1 addition & 1 deletion kube-runtime/Cargo.toml
@@ -30,7 +30,7 @@ rust.unsafe_code = "forbid"
#rust.missing_docs = "warn"

[dependencies]
futures.workspace = true
futures = { workspace = true, features = ["async-await"] }
kube-client = { path = "../kube-client", version = "=0.91.0", default-features = false, features = ["jsonpatch", "client"] }
derivative.workspace = true
serde.workspace = true
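The explicit async-await feature is what gates futures' select! and join! macros when default features are disabled on the workspace dependency (implied by this change being needed at all). A hedged illustration of the gated macros, not taken from the kube-runtime tests:

use futures::{future, select, FutureExt};

// Illustration only: select! is unavailable under
// default-features = false without the "async-await" feature.
async fn first_of_two() -> u32 {
    // select! wants fused, Unpin futures when polled by reference.
    let mut fast = future::ready(1_u32).fuse();
    let mut slow = future::pending::<u32>().fuse();
    select! {
        v = fast => v, // resolves immediately
        v = slow => v, // never resolves
    }
}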
3 changes: 2 additions & 1 deletion kube-runtime/src/controller/mod.rs
@@ -1753,7 +1753,7 @@ mod tests {
|obj, _| {
Box::pin(async move {
// Try to flood the rescheduling buffer by just putting it back in the queue immediately
println!("reconciling {:?}", obj.metadata.name);
//println!("reconciling {:?}", obj.metadata.name);
Ok(Action::requeue(Duration::ZERO))
})
},
@@ -1763,6 +1763,7 @@
queue_rx.map(Result::<_, Infallible>::Ok),
Config::default(),
));
store_tx.apply_watcher_event(&watcher::Event::Restart);
for i in 0..items {
let obj = ConfigMap {
metadata: ObjectMeta {
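The new store_tx.apply_watcher_event(&watcher::Event::Restart) line is needed because, after the paging rework, a reflector store only counts as initialized once it has seen a re-list cycle terminate; the test replays a Restart up front so the flood loop below it runs against a ready store. A minimal standalone sketch of the same call, hedged against the in-tree Event shape at this commit (not the test's exact setup):

use k8s_openapi::api::core::v1::ConfigMap;
use kube::runtime::{reflector, watcher};

fn init_empty_store() {
    let (reader, mut writer) = reflector::store::<ConfigMap>();
    // Replaying a terminating Restart marks the (still empty) store as
    // having completed one full list cycle.
    writer.apply_watcher_event(&watcher::Event::Restart);
    assert!(reader.state().is_empty()); // initialized, but holds no objects
}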
37 changes: 22 additions & 15 deletions kube-runtime/src/reflector/dispatcher.rs
@@ -270,8 +270,9 @@ pub(crate) mod test {
Poll::Ready(Some(Ok(Event::Restart)))
));

assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone())));
// these don't come back in order at the moment:
assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));

// When main channel is closed, it is propagated to subscribers
assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
@@ -292,7 +293,7 @@
]);

let foo = Arc::new(foo);
let bar = Arc::new(bar);
let _bar = Arc::new(bar);

let (_, writer) = reflector::store_shared(10);
let subscriber = writer.subscribe().unwrap();
@@ -315,20 +316,23 @@
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::RestartInit)))
));
assert_eq!(poll!(subscriber.next()), Poll::Pending);

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::RestartPage(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Pending);

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Restart)))
));
drop(reflect);

assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone())));
// we will get foo and bar here, but we don't have a guaranteed ordering on page events
assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
assert_eq!(poll!(subscriber.next()), Poll::Ready(None));
}

@@ -343,9 +347,8 @@
let bar = testpod("bar");
let st = stream::iter([
Ok(Event::Apply(foo.clone())),
Ok(Event::RestartInit),
Ok(Event::RestartPage(vec![foo.clone(), bar.clone()])),
Ok(Event::Restart),
Ok(Event::Apply(bar.clone())),
Ok(Event::Apply(foo.clone())),
]);

let foo = Arc::new(foo);
@@ -364,13 +367,14 @@

// Poll first subscriber, but not the second.
//
// The buffer can hold one value, so even if we have a slow subscriber,
// The buffer can hold one object value, so even if we have a slow subscriber,
// we will still get an event from the root.
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Apply(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));

// One subscriber is not reading, so we need to apply backpressure until
// the channel has capacity.
//
@@ -387,18 +391,21 @@

// We now have room for only one more item. In total, the previous event
// had two. We repeat the same pattern.
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Apply(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone())));
assert!(matches!(poll!(reflect.next()), Poll::Pending));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));
assert!(matches!(poll!(reflect.next()), Poll::Pending));
assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(foo.clone())));
assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(bar.clone())));
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::RestartPage(_))))
Poll::Ready(Some(Ok(Event::Apply(_))))
));
// Poll again to drain the queue.
assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone())));
assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(bar.clone())));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));
assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(foo.clone())));

assert_eq!(poll!(subscriber.next()), Poll::Ready(None));
assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(None));
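The relaxed matches! assertions above reflect two properties of the paged dispatch path: objects fanned out from a RestartPage reach subscribers in no guaranteed order, and subscribers stay Pending until the terminating Restart flushes the cycle; meanwhile reflect itself goes Pending (backpressure) until slow subscribers drain their channel copies. For orientation, a hedged reconstruction of the in-tree watcher::Event shape these tests poll, with variant payloads inferred from usage in the hunks above rather than copied from the source:

enum Event<K> {
    Apply(K),            // incremental upsert while watching
    Delete(K),           // incremental deletion while watching
    RestartInit,         // a (re-)list cycle has begun
    RestartPage(Vec<K>), // one page of listed objects
    RestartApply(K),     // per-object upsert during a streaming re-list
    RestartDelete(K),    // per-object deletion during a streaming re-list
    Restart,             // cycle complete; stores and subscribers flush
}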
3 changes: 1 addition & 2 deletions kube-runtime/src/reflector/mod.rs
@@ -12,8 +12,7 @@ use crate::watcher;
use async_stream::stream;
use futures::{Stream, StreamExt};
use std::hash::Hash;
#[cfg(feature = "unstable-runtime-subscribe")]
pub use store::store_shared;
#[cfg(feature = "unstable-runtime-subscribe")] pub use store::store_shared;
pub use store::{store, Store};

/// Cache objects from a [`watcher()`] stream into a local [`Store`]
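The #[cfg] attribute now sits on the same line as the gated re-export; behaviour is unchanged, and store_shared still only exists behind the unstable subscribe feature. A hedged Cargo.toml sketch for consumers of the kube facade crate (the umbrella feature name is an assumption from the 0.9x series, not shown in this diff):

[dependencies]
kube = { version = "0.91", features = ["runtime", "unstable-runtime"] }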
3 changes: 1 addition & 2 deletions kube-runtime/src/utils/mod.rs
@@ -4,8 +4,7 @@ mod backoff_reset_timer;
pub(crate) mod delayed_init;
mod event_flatten;
mod event_modify;
#[cfg(feature = "unstable-runtime-predicates")]
mod predicate;
#[cfg(feature = "unstable-runtime-predicates")] mod predicate;
mod reflect;
mod stream_backoff;
mod watch_ext;
105 changes: 41 additions & 64 deletions kube-runtime/src/watcher.rs
@@ -487,10 +487,9 @@
{
match state {
State::Empty => match wc.initial_list_strategy {
InitialListStrategy::ListWatch => (
Some(Ok(Event::RestartInit)),
State::InitPage { continue_token: None },
),
InitialListStrategy::ListWatch => (Some(Ok(Event::RestartInit)), State::InitPage {
continue_token: None,
}),
InitialListStrategy::StreamingList => match api.watch(&wc.to_watch_params(), "0").await {
Ok(stream) => (None, State::InitialWatch { stream }),

Err(err) => {
@@ -509,19 +508,15 @@
match api.list(&lp).await {
Ok(list) => {
if let Some(continue_token) = list.metadata.continue_.filter(|s| !s.is_empty()) {
(
Some(Ok(Event::RestartPage(list.items))),
State::InitPage {
continue_token: Some(continue_token),
},
)
(Some(Ok(Event::RestartPage(list.items))), State::InitPage {
continue_token: Some(continue_token),
})
} else if let Some(resource_version) =
list.metadata.resource_version.filter(|s| !s.is_empty())

{
(
Some(Ok(Event::RestartPage(list.items))),
State::InitPageDone { resource_version },
)
(Some(Ok(Event::RestartPage(list.items))), State::InitPageDone {
resource_version,

})
} else {
(Some(Err(Error::NoResourceVersion)), State::Empty)

}
@@ -544,20 +539,18 @@
Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => {
(Some(Ok(Event::RestartApply(obj))), State::InitialWatch { stream })

}
Some(Ok(WatchEvent::Deleted(obj))) => (
Some(Ok(Event::RestartDelete(obj))),
State::InitialWatch { stream },
),
Some(Ok(WatchEvent::Deleted(obj))) => {
(Some(Ok(Event::RestartDelete(obj))), State::InitialWatch {
stream,

})
}
Some(Ok(WatchEvent::Bookmark(bm))) => {
let marks_initial_end = bm.metadata.annotations.contains_key("k8s.io/initial-events-end");
if marks_initial_end {
(
Some(Ok(Event::Restart)),
State::Watching {
resource_version: bm.metadata.resource_version,
stream,
},
)
(Some(Ok(Event::Restart)), State::Watching {

resource_version: bm.metadata.resource_version,
stream,
})
} else {
(None, State::InitialWatch { stream })

}
@@ -589,23 +582,19 @@
}
State::InitListed { resource_version } => {
match api.watch(&wc.to_watch_params(), &resource_version).await {
Ok(stream) => (
None,
State::Watching {
resource_version,
stream,
},
),
Ok(stream) => (None, State::Watching {
resource_version,
stream,
}),
Err(err) => {
if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
warn!("watch initlist error with 403: {err:?}");
} else {
debug!("watch initlist error: {err:?}");
}
(
Some(Err(Error::WatchStartFailed(err))),
State::InitListed { resource_version },
)
(Some(Err(Error::WatchStartFailed(err))), State::InitListed {
resource_version,
})
}
}
}
@@ -618,36 +607,27 @@
if resource_version.is_empty() {
(Some(Err(Error::NoResourceVersion)), State::default())
} else {
(
Some(Ok(Event::Apply(obj))),
State::Watching {
resource_version,
stream,
},
)
(Some(Ok(Event::Apply(obj))), State::Watching {
resource_version,
stream,
})
}
}
Some(Ok(WatchEvent::Deleted(obj))) => {
let resource_version = obj.resource_version().unwrap_or_default();
if resource_version.is_empty() {
(Some(Err(Error::NoResourceVersion)), State::default())
} else {
(
Some(Ok(Event::Delete(obj))),
State::Watching {
resource_version,
stream,
},
)
(Some(Ok(Event::Delete(obj))), State::Watching {
resource_version,
stream,
})
}
}
Some(Ok(WatchEvent::Bookmark(bm))) => (
None,
State::Watching {
resource_version: bm.metadata.resource_version,
stream,
},
),
Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching {
resource_version: bm.metadata.resource_version,
stream,
}),
Some(Ok(WatchEvent::Error(err))) => {
// HTTP GONE, means we have desynced and need to start over and re-list :(
let new_state = if err.code == 410 {
@@ -671,13 +651,10 @@
} else {
debug!("watcher error: {err:?}");
}
(
Some(Err(Error::WatchFailed(err))),
State::Watching {
resource_version,
stream,
},
)
(Some(Err(Error::WatchFailed(err))), State::Watching {
resource_version,
stream,
})
}
None => (None, State::InitListed { resource_version }),
},
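Most of the churn in this file is formatting (multi-line tuple returns collapsed onto the opening line), but the hunks also trace the watcher's paged state machine end to end. A hedged sketch of the states involved, with field names taken from the hunks above and S standing in for the boxed watch stream type the diff does not show:

enum State<S> {
    Empty,                                       // nothing listed yet
    InitPage { continue_token: Option<String> }, // paged LIST in progress
    InitPageDone { resource_version: String },   // pages exhausted; rv in hand
    InitialWatch { stream: S },                  // streaming-list bootstrap
    InitListed { resource_version: String },     // list done; (re)open the watch
    Watching { resource_version: String, stream: S }, // steady-state WATCH loop
}

In the ListWatch path, Empty emits RestartInit and loops through InitPage (one RestartPage per page) until the continue token runs out, then carries the final resourceVersion into the watch phase; a 410 Gone while Watching falls back to a fresh re-list.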
