Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
Signed-off-by: clux <sszynrae@gmail.com>
  • Loading branch information
clux committed May 23, 2024
1 parent 1786d17 commit 59efd8d
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 58 deletions.
3 changes: 1 addition & 2 deletions kube-runtime/src/reflector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use crate::watcher;
use async_stream::stream;
use futures::{Stream, StreamExt};
use std::hash::Hash;
#[cfg(feature = "unstable-runtime-subscribe")]
pub use store::store_shared;
#[cfg(feature = "unstable-runtime-subscribe")] pub use store::store_shared;
pub use store::{store, Store};

/// Cache objects from a [`watcher()`] stream into a local [`Store`]
Expand Down
89 changes: 33 additions & 56 deletions kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,19 +496,15 @@ where
match api.list(&lp).await {
Ok(list) => {
if let Some(continue_token) = list.metadata.continue_.filter(|s| !s.is_empty()) {
(
Some(Ok(Event::InitPage(list.items))),
State::InitPage {
continue_token: Some(continue_token),
},
)
(Some(Ok(Event::InitPage(list.items))), State::InitPage {
continue_token: Some(continue_token),
})
} else if let Some(resource_version) =
list.metadata.resource_version.filter(|s| !s.is_empty())
{
(
Some(Ok(Event::InitPage(list.items))),
State::InitPageDone { resource_version },
)
(Some(Ok(Event::InitPage(list.items))), State::InitPageDone {
resource_version,
})
} else {
(Some(Err(Error::NoResourceVersion)), State::Empty)
}
Expand Down Expand Up @@ -540,13 +536,10 @@ where
Some(Ok(WatchEvent::Bookmark(bm))) => {
let marks_initial_end = bm.metadata.annotations.contains_key("k8s.io/initial-events-end");
if marks_initial_end {
(
Some(Ok(Event::Ready)),
State::Watching {
resource_version: bm.metadata.resource_version,
stream,
},
)
(Some(Ok(Event::Ready)), State::Watching {

Check warning on line 539 in kube-runtime/src/watcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/watcher.rs#L539

Added line #L539 was not covered by tests
resource_version: bm.metadata.resource_version,
stream,
})
} else {
(None, State::InitialWatch { stream })
}
Expand Down Expand Up @@ -578,23 +571,19 @@ where
}
State::InitListed { resource_version } => {
match api.watch(&wc.to_watch_params(), &resource_version).await {
Ok(stream) => (
None,
State::Watching {
resource_version,
stream,
},
),
Ok(stream) => (None, State::Watching {
resource_version,
stream,
}),
Err(err) => {
if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
warn!("watch initlist error with 403: {err:?}");
} else {
debug!("watch initlist error: {err:?}");
}
(
Some(Err(Error::WatchStartFailed(err))),
State::InitListed { resource_version },
)
(Some(Err(Error::WatchStartFailed(err))), State::InitListed {
resource_version,
})
}
}
}
Expand All @@ -607,36 +596,27 @@ where
if resource_version.is_empty() {
(Some(Err(Error::NoResourceVersion)), State::default())
} else {
(
Some(Ok(Event::Apply(obj))),
State::Watching {
resource_version,
stream,
},
)
(Some(Ok(Event::Apply(obj))), State::Watching {
resource_version,
stream,
})
}
}
Some(Ok(WatchEvent::Deleted(obj))) => {
let resource_version = obj.resource_version().unwrap_or_default();
if resource_version.is_empty() {
(Some(Err(Error::NoResourceVersion)), State::default())
} else {
(
Some(Ok(Event::Delete(obj))),
State::Watching {
resource_version,
stream,
},
)
(Some(Ok(Event::Delete(obj))), State::Watching {
resource_version,
stream,
})
}
}
Some(Ok(WatchEvent::Bookmark(bm))) => (
None,
State::Watching {
resource_version: bm.metadata.resource_version,
stream,
},
),
Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching {
resource_version: bm.metadata.resource_version,
stream,
}),
Some(Ok(WatchEvent::Error(err))) => {
// HTTP GONE, means we have desynced and need to start over and re-list :(
let new_state = if err.code == 410 {
Expand All @@ -660,13 +640,10 @@ where
} else {
debug!("watcher error: {err:?}");
}
(
Some(Err(Error::WatchFailed(err))),
State::Watching {
resource_version,
stream,
},
)
(Some(Err(Error::WatchFailed(err))), State::Watching {
resource_version,
stream,
})
}
None => (None, State::InitListed { resource_version }),
},
Expand Down

0 comments on commit 59efd8d

Please sign in to comment.