Add DockerHub implementation for send_tags #6580

Merged
merged 2 commits into from
Aug 9, 2023
145 changes: 134 additions & 11 deletions cmd/oci-catalog/src/providers/dockerhub.rs
@@ -12,6 +12,7 @@ use tonic::Status;
/// The default page size with which requests are sent to docker hub.
const DEFAULT_PAGE_SIZE: u8 = 100;
pub const PROVIDER_NAME: &str = "DockerHubAPI";
pub const DOCKERHUB_URI: &str = "https://hub.docker.com";

#[derive(Serialize, Deserialize)]
struct DockerHubV2Repository {
@@ -29,6 +30,21 @@ struct DockerHubV2RepositoriesResult {
results: Vec<DockerHubV2Repository>,
}

#[derive(Serialize, Deserialize)]
struct DockerHubV2Tag {
name: String,
repository_type: Option<String>,
content_type: String,
}

#[derive(Serialize, Deserialize)]
struct DockerHubV2TagsResult {
count: u16,
next: Option<String>,
previous: Option<String>,
results: Vec<DockerHubV2Tag>,
}

#[derive(Debug, Default)]
pub struct DockerHubAPI {}

@@ -44,7 +60,7 @@ impl OCICatalogSender for DockerHubAPI {
tx: mpsc::Sender<Result<Repository, Status>>,
request: &ListRepositoriesRequest,
) {
let mut url = url_for_request(request);
let mut url = url_for_request_repositories(request);

let client = reqwest::Client::builder().build().unwrap();
Contributor

Not now, but given the DockerHub rate limits, it may be worthwhile adding something like https://crates.io/crates/governor to handle them better on the client side.

Contributor Author

Yeah, that'd be great to add in the future if we find we're hitting rate limits (I'll be testing this using the bitnami catalog, which is probably on the larger side of OCI helm registries).
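
For illustration, client-side throttling of the kind suggested above could look roughly like the following. This is a std-only sketch, not the governor crate's API and not part of this PR: `MinIntervalLimiter` and the 50 ms interval are hypothetical, and it blocks the thread with `thread::sleep`, whereas the async code in this PR would need a non-blocking sleep instead.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper (not part of the PR): enforces a minimum interval
/// between consecutive outgoing requests.
struct MinIntervalLimiter {
    min_interval: Duration,
    last_request: Option<Instant>,
}

impl MinIntervalLimiter {
    fn new(min_interval: Duration) -> Self {
        Self {
            min_interval,
            last_request: None,
        }
    }

    /// How long a caller would have to wait at `now` before sending.
    fn delay_at(&self, now: Instant) -> Duration {
        match self.last_request {
            Some(last) => self
                .min_interval
                .saturating_sub(now.saturating_duration_since(last)),
            None => Duration::ZERO,
        }
    }

    /// Blocks until the next request is allowed, then records it.
    fn wait(&mut self) {
        let delay = self.delay_at(Instant::now());
        if !delay.is_zero() {
            thread::sleep(delay);
        }
        self.last_request = Some(Instant::now());
    }
}

fn main() {
    // Assumed interval; a real value would depend on the observed limits.
    let mut limiter = MinIntervalLimiter::new(Duration::from_millis(50));
    let start = Instant::now();
    for page in 1..=3 {
        limiter.wait();
        println!("requesting page {page} at {:?}", start.elapsed());
    }
}
```

Calling `wait()` before each page request in the fetch loop would space the requests out; a production limiter would more likely honour whatever rate-limit information the server returns.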


@@ -105,25 +121,79 @@ impl OCICatalogSender for DockerHubAPI {
}
}

async fn send_tags(&self, tx: mpsc::Sender<Result<Tag, Status>>, _request: &ListTagsRequest) {
for count in 0..10 {
tx.send(Ok(Tag {
name: format!("tag-{}", count),
}))
.await
.unwrap();
async fn send_tags(&self, tx: mpsc::Sender<Result<Tag, Status>>, request: &ListTagsRequest) {
Contributor

For what it's worth (I guess you already know it, but just in case), here is the "official" (albeit tagged as experimental) DockerHub client: https://github.com/docker/hub-tool/blob/main/pkg/hub/tags.go#L61

Contributor Author

Yeah, as you can see in their (Go) client, they need to collect all the page results (appending to the array) before returning the result. In our case, we want to stream the results back (via a gRPC stream) while they're being collected, which is why I'm using send_tags and send_repositories to send the results down a channel (which we can buffer as needed) and continue fetching.

BTW: interesting to see they use the same default page size :)

Contributor

Neat! Thanks for the explanation!
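
To make the difference concrete, here is a minimal sketch of the streaming approach described above. It assumes a standard-library bounded channel and a thread in place of the PR's async channel and gRPC stream; `fetch_page` and its hard-coded tags are hypothetical stand-ins for the paginated HTTP requests.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for one paginated HTTP request: returns the tags
/// on `page` and whether another page follows.
fn fetch_page(page: usize) -> (Vec<String>, bool) {
    let pages = [vec!["1.0.0", "1.0.1"], vec!["1.1.0"], vec!["2.0.0"]];
    let tags = pages[page].iter().map(|t| t.to_string()).collect();
    (tags, page + 1 < pages.len())
}

/// Sends each tag down the channel as soon as its page has been fetched,
/// instead of collecting every page before returning.
fn send_tags(tx: mpsc::SyncSender<String>) {
    let mut page = 0;
    loop {
        let (tags, has_next) = fetch_page(page);
        for tag in tags {
            // Stop fetching if the receiver has gone away.
            if tx.send(tag).is_err() {
                return;
            }
        }
        if !has_next {
            break;
        }
        page += 1;
    }
}

fn main() {
    // Bounded channel: the buffer size limits how far the producer can
    // run ahead of the consumer.
    let (tx, rx) = mpsc::sync_channel(2);
    let producer = thread::spawn(move || send_tags(tx));
    // The loop ends once the producer finishes and drops the sender.
    for tag in rx {
        println!("received {tag}");
    }
    producer.join().unwrap();
}
```

The consumer starts receiving tags from the first page while later pages are still being fetched, which is the property a collect-then-return client does not have.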

let mut url = match url_for_request_tags(request) {
Ok(u) => u,
Err(e) => {
tx.send(Err(e)).await.unwrap();
return;
}
};

let client = reqwest::Client::builder().build().unwrap();

loop {
log::debug!("requesting: {}", url);
let response = match client.get(url.clone()).send().await {
Ok(r) => r,
Err(e) => {
tx.send(Err(Status::failed_precondition(e.to_string())))
.await
.unwrap();
return;
}
};

if response.status() != StatusCode::OK {
tx.send(Err(Status::failed_precondition(format!(
"unexpected status code when requesting {}: {}",
url,
response.status()
))))
.await
.unwrap();
return;
}

let body = match response.text().await {
Ok(b) => b,
Err(e) => {
tx.send(Err(Status::failed_precondition(format!(
"unable to extract body from response: {}",
e.to_string()
))))
.await
.unwrap();
return;
}
};
log::trace!("response body: {}", body);

let response: DockerHubV2TagsResult = serde_json::from_str(&body).unwrap();

for tag in response.results {
tx.send(Ok(Tag { name: tag.name })).await.unwrap();
}

if response.next.is_some() {
url = reqwest::Url::parse(&response.next.unwrap()).unwrap();
Comment on lines +182 to +183
Contributor

Ah - reading it twice, perhaps this is the pagination logic? If so, I would add some comments so that our future selves can understand it 😅

Contributor Author

This is us using the pagination of the DockerHub API, yes - requesting the next page if response.next is defined. I'll add a comment to that effect.
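
The pagination discussed here can be sketched without `is_some()`/`unwrap()` by looping while a next URL exists. This is only an illustration under assumptions: `Page`, `for_each_tag`, the `fetch` closure and the example URLs are hypothetical, and pages come from an in-memory map rather than from DockerHub.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the paginated tags response: one page of
/// results plus the URL of the next page, if any.
#[derive(Clone)]
struct Page {
    results: Vec<String>,
    next: Option<String>,
}

/// Follows `next` links starting at `first_url` until a page has none,
/// passing every result to `emit` as soon as its page arrives.
/// `fetch` is a hypothetical stand-in for the HTTP request.
fn for_each_tag<F, E>(first_url: &str, fetch: F, mut emit: E) -> Result<(), String>
where
    F: Fn(&str) -> Result<Page, String>,
    E: FnMut(String),
{
    let mut next_url = Some(first_url.to_string());
    while let Some(url) = next_url {
        let page = fetch(&url)?;
        for name in page.results {
            emit(name);
        }
        // `None` here ends the loop: there is no further page to request.
        next_url = page.next;
    }
    Ok(())
}

fn main() {
    let mut pages = HashMap::new();
    pages.insert(
        "/tags?page=1",
        Page {
            results: vec!["a".to_string(), "b".to_string()],
            next: Some("/tags?page=2".to_string()),
        },
    );
    pages.insert(
        "/tags?page=2",
        Page {
            results: vec!["c".to_string()],
            next: None,
        },
    );
    let fetch = |url: &str| {
        pages
            .get(url)
            .cloned()
            .ok_or_else(|| format!("no such page: {url}"))
    };
    if let Err(e) = for_each_tag("/tags?page=1", fetch, |name| println!("tag: {name}")) {
        eprintln!("error: {e}");
    }
}
```

Because a failed fetch is returned as an `Err` rather than unwrapped, the caller can decide how to report it.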

} else {
break;
}
}
}
}

fn url_for_request(request: &ListRepositoriesRequest) -> Url {
let mut url = reqwest::Url::parse("https://hub.docker.com/v2/repositories/").unwrap();
fn url_for_request_repositories(request: &ListRepositoriesRequest) -> Url {
let mut url = reqwest::Url::parse(DOCKERHUB_URI).unwrap();

if !request.namespace.is_empty() {
url.set_path(&format!(
"/v2/namespaces/{}/repositories/",
request.namespace
));
} else {
url.set_path("/v2/repositories/");
}
// For now we use a default page size and default ordering.
url.query_pairs_mut()
@@ -137,6 +207,30 @@ fn url_for_request(request: &ListRepositoriesRequest) -> Url {
url
}

fn url_for_request_tags(request: &ListTagsRequest) -> Result<Url, Status> {
let mut url = reqwest::Url::parse(DOCKERHUB_URI).unwrap();

let repo = match request.repository.clone() {
Some(r) => r,
None => {
return Err(Status::invalid_argument(format!(
"repository not set in request"
)))
}
};

url.set_path(&format!(
"/v2/repositories/{}/{}/tags",
repo.namespace, repo.name
));

// For now we use a default page size.
url.query_pairs_mut()
.append_pair("page_size", &format!("{}", DEFAULT_PAGE_SIZE));
Comment on lines +231 to +233
Contributor

Just to confirm: we're only fetching DEFAULT_PAGE_SIZE (100) elements for now, aren't we? I don't see any pagination logic here. It seems Docker uses a Link header to let the client know about the next page (source), but we are not using it at the moment. No?

Contributor Author

That's just the current default page size for the requests we make to DockerHub (i.e. how big the batches we request are), so it only affects performance (the number of requests needed to exhaust the results). It doesn't limit how many results we get at all (i.e. if I set this to 10, the loop will fetch the results in batches of 10).

The gRPC API that we provide streams the results back, so no pagination is required.
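
As a rough worked example of the point above: the page size changes how many requests are made, not how many results come back. `requests_needed` is a hypothetical helper, and the count of 250 tags is made up for illustration.

```rust
/// Number of requests needed to exhaust `count` results when fetching
/// `page_size` results per request. At least one request is always made,
/// even when there are no results.
fn requests_needed(count: u32, page_size: u32) -> u32 {
    assert!(page_size > 0, "page_size must be positive");
    // Integer ceiling division without risking overflow.
    let full_pages = count / page_size + u32::from(count % page_size != 0);
    full_pages.max(1)
}

fn main() {
    for page_size in [10, 100] {
        println!(
            "250 tags, page_size {page_size}: {} requests",
            requests_needed(250, page_size)
        );
    }
}
```

With 250 tags, a page size of 10 takes 25 requests and a page size of 100 takes 3; in both cases all 250 tags are streamed back.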


Ok(url)
}

#[cfg(test)]
mod tests {
use super::*;
@@ -165,6 +259,35 @@ mod tests {
..Default::default()
}, "https://hub.docker.com/v2/namespaces/bitnamicharts/repositories/?page_size=100&ordering=name&content_types=helm&content_types=image")]
fn test_url_for_request(#[case] request: ListRepositoriesRequest, #[case] expected_url: Url) {
assert_eq!(url_for_request(&request), expected_url);
assert_eq!(url_for_request_repositories(&request), expected_url);
}

#[rstest]
#[case::without_repository(ListTagsRequest{
..Default::default()
}, Err(Status::invalid_argument("bang")))]
#[case::with_repository(ListTagsRequest{
repository: Some(Repository{
namespace: "bitnamicharts".to_string(),
name: "apache".to_string(),
..Default::default()
}),
..Default::default()
}, Ok(reqwest::Url::parse("https://hub.docker.com/v2/repositories/bitnamicharts/apache/tags?page_size=100").unwrap()))]
fn test_url_for_request_tags(
#[case] request: ListTagsRequest,
#[case] expected_result: Result<Url, Status>,
) {
match expected_result {
Ok(url) => {
assert_eq!(url_for_request_tags(&request).unwrap(), url);
}
Err(e) => {
assert_eq!(
url_for_request_tags(&request).err().unwrap().code(),
e.code()
)
}
}
}
}