Skip to content

Commit

Permalink
HTTP registry implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
arlosi committed Mar 21, 2022
1 parent 109bfbd commit 412b633
Show file tree
Hide file tree
Showing 18 changed files with 1,807 additions and 323 deletions.
163 changes: 162 additions & 1 deletion crates/cargo-test-support/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use std::collections::BTreeMap;
use std::fmt::Write as _;
use std::fs::{self, File};
use std::io::{BufRead, BufReader, Write};
use std::net::TcpListener;
use std::net::{SocketAddr, TcpListener};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use tar::{Builder, Header};
use url::Url;
Expand Down Expand Up @@ -368,6 +370,165 @@ pub fn alt_init() {
RegistryBuilder::new().alternative(true).build();
}

pub struct RegistryServer {
done: Arc<AtomicBool>,
server: Option<thread::JoinHandle<()>>,
addr: SocketAddr,
}

impl RegistryServer {
pub fn addr(&self) -> SocketAddr {
self.addr
}
}

impl Drop for RegistryServer {
fn drop(&mut self) {
self.done.store(true, Ordering::SeqCst);
// NOTE: we can't actually await the server since it's blocked in accept()
let _ = self.server.take();
}
}

#[must_use]
pub fn serve_registry(registry_path: PathBuf) -> RegistryServer {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let done = Arc::new(AtomicBool::new(false));
let done2 = done.clone();

let t = thread::spawn(move || {
let mut line = String::new();
'server: while !done2.load(Ordering::SeqCst) {
let (socket, _) = listener.accept().unwrap();
// Let's implement a very naive static file HTTP server.
let mut buf = BufReader::new(socket);

// First, the request line:
// GET /path HTTPVERSION
line.clear();
if buf.read_line(&mut line).unwrap() == 0 {
// Connection terminated.
continue;
}

assert!(line.starts_with("GET "), "got non-GET request: {}", line);
let path = PathBuf::from(
line.split_whitespace()
.skip(1)
.next()
.unwrap()
.trim_start_matches('/'),
);

let file = registry_path.join(path);
if file.exists() {
// Grab some other headers we may care about.
let mut if_modified_since = None;
let mut if_none_match = None;
loop {
line.clear();
if buf.read_line(&mut line).unwrap() == 0 {
continue 'server;
}

if line == "\r\n" {
// End of headers.
line.clear();
break;
}

let value = line
.splitn(2, ':')
.skip(1)
.next()
.map(|v| v.trim())
.unwrap();

if line.starts_with("If-Modified-Since:") {
if_modified_since = Some(value.to_owned());
} else if line.starts_with("If-None-Match:") {
if_none_match = Some(value.trim_matches('"').to_owned());
}
}

// Now grab info about the file.
let data = fs::read(&file).unwrap();
let etag = Sha256::new().update(&data).finish_hex();
let last_modified = format!("{:?}", file.metadata().unwrap().modified().unwrap());

// Start to construct our response:
let mut any_match = false;
let mut all_match = true;
if let Some(expected) = if_none_match {
if etag != expected {
all_match = false;
} else {
any_match = true;
}
}
if let Some(expected) = if_modified_since {
// NOTE: Equality comparison is good enough for tests.
if last_modified != expected {
all_match = false;
} else {
any_match = true;
}
}

// Write out the main response line.
if any_match && all_match {
buf.get_mut()
.write_all(b"HTTP/1.1 304 Not Modified\r\n")
.unwrap();
} else {
buf.get_mut().write_all(b"HTTP/1.1 200 OK\r\n").unwrap();
}
// TODO: Support 451 for crate index deletions.

// Write out other headers.
buf.get_mut()
.write_all(format!("Content-Length: {}\r\n", data.len()).as_bytes())
.unwrap();
buf.get_mut()
.write_all(format!("ETag: \"{}\"\r\n", etag).as_bytes())
.unwrap();
buf.get_mut()
.write_all(format!("Last-Modified: {}\r\n", last_modified).as_bytes())
.unwrap();

// And finally, write out the body.
buf.get_mut().write_all(b"\r\n").unwrap();
buf.get_mut().write_all(&data).unwrap();
} else {
loop {
line.clear();
if buf.read_line(&mut line).unwrap() == 0 {
// Connection terminated.
continue 'server;
}

if line == "\r\n" {
break;
}
}

buf.get_mut()
.write_all(b"HTTP/1.1 404 Not Found\r\n\r\n")
.unwrap();
buf.get_mut().write_all(b"\r\n").unwrap();
}
buf.get_mut().flush().unwrap();
}
});

RegistryServer {
addr,
server: Some(t),
done,
}
}

/// Creates a new on-disk registry.
pub fn init_registry(registry_path: PathBuf, dl_url: String, api_url: Url, api_path: PathBuf) {
// Initialize a new registry.
Expand Down
2 changes: 2 additions & 0 deletions src/cargo/core/features.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ unstable_cli_options!(
no_index_update: bool = ("Do not update the registry index even if the cache is outdated"),
panic_abort_tests: bool = ("Enable support to run tests with -Cpanic=abort"),
host_config: bool = ("Enable the [host] section in the .cargo/config.toml file"),
http_registry: bool = ("Support HTTP-based crate registries"),
target_applies_to_host: bool = ("Enable the `target-applies-to-host` key in the .cargo/config.toml file"),
rustdoc_map: bool = ("Allow passing external documentation mappings to rustdoc"),
separate_nightlies: bool = (HIDDEN),
Expand Down Expand Up @@ -875,6 +876,7 @@ impl CliUnstable {
"multitarget" => self.multitarget = parse_empty(k, v)?,
"rustdoc-map" => self.rustdoc_map = parse_empty(k, v)?,
"terminal-width" => self.terminal_width = Some(parse_usize_opt(v)?),
"http-registry" => self.http_registry = parse_empty(k, v)?,
"namespaced-features" => stabilized_warn(k, "1.60", STABILISED_NAMESPACED_FEATURES),
"weak-dep-features" => stabilized_warn(k, "1.60", STABILIZED_WEAK_DEP_FEATURES),
"credential-process" => self.credential_process = parse_empty(k, v)?,
Expand Down
11 changes: 2 additions & 9 deletions src/cargo/core/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,13 +405,6 @@ impl<'cfg> PackageSet<'cfg> {
) -> CargoResult<PackageSet<'cfg>> {
// We've enabled the `http2` feature of `curl` in Cargo, so treat
// failures here as fatal as it would indicate a build-time problem.
//
// Note that the multiplexing support is pretty new so we're having it
// off-by-default temporarily.
//
// Also note that pipelining is disabled as curl authors have indicated
// that it's buggy, and we've empirically seen that it's buggy with HTTP
// proxies.
let mut multi = Multi::new();
let multiplexing = config.http_config()?.multiplexing.unwrap_or(true);
multi
Expand Down Expand Up @@ -700,7 +693,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
return Ok(Some(pkg));
}

// Ask the original source fo this `PackageId` for the corresponding
// Ask the original source for this `PackageId` for the corresponding
// package. That may immediately come back and tell us that the package
// is ready, or it could tell us that it needs to be downloaded.
let mut sources = self.set.sources.borrow_mut();
Expand Down Expand Up @@ -757,7 +750,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
// initiate dozens of connections to crates.io, but rather only one.
// Once the main one is opened we realized that pipelining is possible
// and multiplexing is possible with static.crates.io. All in all this
// reduces the number of connections done to a more manageable state.
// reduces the number of connections down to a more manageable state.
try_old_curl!(handle.pipewait(true), "pipewait");

handle.write_function(move |buf| {
Expand Down
7 changes: 6 additions & 1 deletion src/cargo/core/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ impl<'cfg> PackageRegistry<'cfg> {
}

self.load(namespace, kind)?;

// This isn't strictly necessary since it will be called later.
// However it improves error messages for sources that issue errors
// in `block_until_ready` because the callers here have context about
// which deps are being resolved.
self.block_until_ready()?;
Ok(())
}
Expand Down Expand Up @@ -273,7 +278,7 @@ impl<'cfg> PackageRegistry<'cfg> {
// First up we need to actually resolve each `deps` specification to
// precisely one summary. We're not using the `query` method below as it
// internally uses maps we're building up as part of this method
// (`patches_available` and `patches). Instead we're going straight to
// (`patches_available` and `patches`). Instead we're going straight to
// the source to load information from it.
//
// Remember that each dependency listed in `[patch]` has to resolve to
Expand Down
7 changes: 6 additions & 1 deletion src/cargo/core/source/source_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ impl SourceId {
Ok(SourceId::new(SourceKind::Registry, url, None)?
.with_precise(Some("locked".to_string())))
}
"sparse" => {
let url = string.into_url()?;
Ok(SourceId::new(SourceKind::Registry, url, None)?
.with_precise(Some("locked".to_string())))
}
"path" => {
let url = url.into_url()?;
SourceId::new(SourceKind::Path, url, None)
Expand Down Expand Up @@ -301,7 +306,7 @@ impl SourceId {
self,
yanked_whitelist,
config,
))),
)?)),
SourceKind::LocalRegistry => {
let path = match self.inner.url.to_file_path() {
Ok(p) => p,
Expand Down
9 changes: 6 additions & 3 deletions src/cargo/ops/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ fn registry(
}
let api_host = {
let _lock = config.acquire_package_cache_lock()?;
let mut src = RegistrySource::remote(sid, &HashSet::new(), config);
let mut src = RegistrySource::remote(sid, &HashSet::new(), config)?;
// Only update the index if the config is not available or `force` is set.
if force_update {
src.invalidate_cache()
Expand Down Expand Up @@ -528,8 +528,11 @@ pub fn http_handle_and_timeout(config: &Config) -> CargoResult<(Easy, HttpTimeou
specified"
)
}
if !config.network_allowed() {
bail!("can't make HTTP request in the offline mode")
if config.offline() {
bail!(
"attempting to make an HTTP request, but --offline was \
specified"
)
}

// The timeout option for libcurl by default times out the entire transfer,
Expand Down
10 changes: 3 additions & 7 deletions src/cargo/sources/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,9 +498,7 @@ impl<'cfg> Debug for PathSource<'cfg> {

impl<'cfg> Source for PathSource<'cfg> {
fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> Poll<CargoResult<()>> {
if !self.updated {
return Poll::Pending;
}
self.update()?;
for s in self.packages.iter().map(|p| p.summary()) {
if dep.matches(s) {
f(s.clone())
Expand All @@ -514,9 +512,7 @@ impl<'cfg> Source for PathSource<'cfg> {
_dep: &Dependency,
f: &mut dyn FnMut(Summary),
) -> Poll<CargoResult<()>> {
if !self.updated {
return Poll::Pending;
}
self.update()?;
for s in self.packages.iter().map(|p| p.summary()) {
f(s.clone())
}
Expand All @@ -537,7 +533,7 @@ impl<'cfg> Source for PathSource<'cfg> {

fn download(&mut self, id: PackageId) -> CargoResult<MaybePackage> {
trace!("getting packages; id={}", id);

self.update()?;
let pkg = self.packages.iter().find(|pkg| pkg.package_id() == id);
pkg.cloned()
.map(MaybePackage::Ready)
Expand Down
Loading

0 comments on commit 412b633

Please sign in to comment.