Skip to content

Commit

Permalink
Consolidate backends with their invokers
Browse files Browse the repository at this point in the history
  • Loading branch information
nyurik committed Feb 3, 2024
1 parent 8c32043 commit ee05cc4
Show file tree
Hide file tree
Showing 11 changed files with 207 additions and 208 deletions.
20 changes: 14 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,21 @@ categories = ["science::geo"]

[features]
default = []
http-async = ["dep:tokio", "dep:reqwest"]
s3-async-native = ["dep:tokio", "dep:rust-s3", "rust-s3?/tokio-native-tls"]
s3-async-rustls = ["dep:tokio", "dep:rust-s3", "rust-s3?/tokio-rustls-tls"]
mmap-async-tokio = ["dep:tokio", "dep:fmmap", "fmmap?/tokio-async", "async-compression?/tokio"]
http-async = ["__async", "dep:reqwest"]
mmap-async-tokio = ["__async", "dep:fmmap", "fmmap?/tokio-async"]
s3-async-native = ["__async-s3"]
s3-async-rustls = ["__async-s3"]
tilejson = ["dep:tilejson", "dep:serde", "dep:serde_json"]

# TODO: support other async libraries
# Forward some of the common features to reqwest dependency
reqwest-native-tls = ["reqwest?/native-tls"]
reqwest-rustls-tls = ["reqwest?/rustls-tls"]
reqwest-rustls-tls-webpki-roots = ["reqwest?/rustls-tls-webpki-roots"]
reqwest-rustls-tls-native-roots = ["reqwest?/rustls-tls-native-roots"]

# Internal features, do not use
__async = ["dep:tokio", "async-compression/tokio"]
__async-s3 = ["__async", "dep:rust-s3", "rust-s3?/tokio-native-tls"]

[dependencies]
# TODO: determine how we want to handle compression in async & sync environments
Expand All @@ -29,13 +37,13 @@ bytes = "1"
fmmap = { version = "0.3", default-features = false, optional = true }
hilbert_2d = "1"
reqwest = { version = "0.11", default-features = false, optional = true }
rust-s3 = { version = "0.33.0", optional = true, default-features = false, features = ["fail-on-err"] }
serde = { version = "1", optional = true }
serde_json = { version = "1", optional = true }
thiserror = "1"
tilejson = { version = "0.4", optional = true }
tokio = { version = "1", default-features = false, features = ["io-util"], optional = true }
varint-rs = "2"
rust-s3 = { version = "0.33.0", optional = true, default-features = false, features = ["fail-on-err"] }

[dev-dependencies]
flate2 = "1"
Expand Down
101 changes: 3 additions & 98 deletions src/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,14 @@
// so any file larger than 4GB, or an untrusted file with bad data may crash.
#![allow(clippy::cast_possible_truncation)]

#[cfg(feature = "mmap-async-tokio")]
use std::path::Path;

use async_recursion::async_recursion;
use async_trait::async_trait;
use bytes::Bytes;
#[cfg(feature = "http-async")]
use reqwest::{Client, IntoUrl};
#[cfg(any(
feature = "http-async",
feature = "mmap-async-tokio",
feature = "s3-async-rustls",
feature = "s3-async-native"
))]
#[cfg(feature = "__async")]
use tokio::io::AsyncReadExt;

#[cfg(feature = "http-async")]
use crate::backend::HttpBackend;
#[cfg(feature = "mmap-async-tokio")]
use crate::backend::MmapBackend;
#[cfg(any(feature = "s3-async-rustls", feature = "s3-async-native"))]
use crate::backend::S3Backend;
use crate::cache::DirCacheResult;
#[cfg(any(
feature = "http-async",
feature = "mmap-async-tokio",
feature = "s3-async-native",
feature = "s3-async-rustls"
))]
#[cfg(feature = "__async")]
use crate::cache::{DirectoryCache, NoCache};
use crate::directory::{DirEntry, Directory};
use crate::error::{PmtError, PmtResult};
Expand Down Expand Up @@ -227,80 +206,6 @@ impl<B: AsyncBackend + Sync + Send, C: DirectoryCache + Sync + Send> AsyncPmTile
}
}

#[cfg(feature = "http-async")]
impl AsyncPmTilesReader<HttpBackend, NoCache> {
/// Creates a new `PMTiles` reader from a URL using the Reqwest backend.
///
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
pub async fn new_with_url<U: IntoUrl>(client: Client, url: U) -> PmtResult<Self> {
Self::new_with_cached_url(NoCache, client, url).await
}
}

#[cfg(feature = "http-async")]
impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<HttpBackend, C> {
/// Creates a new `PMTiles` reader with cache from a URL using the Reqwest backend.
///
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
pub async fn new_with_cached_url<U: IntoUrl>(
cache: C,
client: Client,
url: U,
) -> PmtResult<Self> {
let backend = HttpBackend::try_from(client, url)?;

Self::try_from_cached_source(backend, cache).await
}
}

#[cfg(feature = "mmap-async-tokio")]
impl AsyncPmTilesReader<MmapBackend, NoCache> {
/// Creates a new `PMTiles` reader from a file path using the async mmap backend.
///
/// Fails if [p] does not exist or is an invalid archive.
pub async fn new_with_path<P: AsRef<Path>>(path: P) -> PmtResult<Self> {
Self::new_with_cached_path(NoCache, path).await
}
}

#[cfg(feature = "mmap-async-tokio")]
impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<MmapBackend, C> {
/// Creates a new cached `PMTiles` reader from a file path using the async mmap backend.
///
/// Fails if [p] does not exist or is an invalid archive.
pub async fn new_with_cached_path<P: AsRef<Path>>(cache: C, path: P) -> PmtResult<Self> {
let backend = MmapBackend::try_from(path).await?;

Self::try_from_cached_source(backend, cache).await
}
}

#[cfg(any(feature = "s3-async-native", feature = "s3-async-rustls"))]
impl AsyncPmTilesReader<S3Backend, NoCache> {
/// Creates a new `PMTiles` reader from a URL using the Reqwest backend.
///
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
pub async fn new_with_bucket_path(bucket: s3::Bucket, path: String) -> PmtResult<Self> {
Self::new_with_cached_bucket_path(NoCache, bucket, path).await
}
}

#[cfg(any(feature = "s3-async-native", feature = "s3-async-rustls"))]
impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<S3Backend, C> {
/// Creates a new `PMTiles` reader with cache from a URL using the Reqwest backend.
///
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
pub async fn new_with_cached_bucket_path(
cache: C,
bucket: s3::Bucket,
path: String,
) -> PmtResult<Self> {
let backend = S3Backend::from(bucket, path);

Self::try_from_cached_source(backend, cache).await
}
}

#[async_trait]
pub trait AsyncBackend {
/// Reads exactly `length` bytes starting at `offset`
Expand All @@ -314,8 +219,8 @@ pub trait AsyncBackend {
#[cfg(feature = "mmap-async-tokio")]
mod tests {
use super::AsyncPmTilesReader;
use crate::backend::MmapBackend;
use crate::tests::{RASTER_FILE, VECTOR_FILE};
use crate::MmapBackend;

#[tokio::test]
async fn open_sanity_check() {
Expand Down
17 changes: 0 additions & 17 deletions src/backend/mod.rs

This file was deleted.

50 changes: 0 additions & 50 deletions src/backend/s3.rs

This file was deleted.

27 changes: 26 additions & 1 deletion src/backend/http.rs → src/backend_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,35 @@ use bytes::Bytes;
use reqwest::header::{HeaderValue, RANGE};
use reqwest::{Client, IntoUrl, Method, Request, StatusCode, Url};

use crate::async_reader::AsyncBackend;
use crate::async_reader::{AsyncBackend, AsyncPmTilesReader};
use crate::cache::{DirectoryCache, NoCache};
use crate::error::PmtResult;
use crate::PmtError;

impl AsyncPmTilesReader<HttpBackend, NoCache> {
/// Creates a new `PMTiles` reader from a URL using the Reqwest backend.
///
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
pub async fn new_with_url<U: IntoUrl>(client: Client, url: U) -> PmtResult<Self> {
Self::new_with_cached_url(NoCache, client, url).await
}
}

impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<HttpBackend, C> {
/// Creates a new `PMTiles` reader with cache from a URL using the Reqwest backend.
///
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
pub async fn new_with_cached_url<U: IntoUrl>(
cache: C,
client: Client,
url: U,
) -> PmtResult<Self> {
let backend = HttpBackend::try_from(client, url)?;

Self::try_from_cached_source(backend, cache).await
}
}

pub struct HttpBackend {
client: Client,
url: Url,
Expand Down
23 changes: 22 additions & 1 deletion src/backend/mmap.rs → src/backend_mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,30 @@ use async_trait::async_trait;
use bytes::{Buf, Bytes};
use fmmap::tokio::{AsyncMmapFile, AsyncMmapFileExt as _, AsyncOptions};

use crate::async_reader::AsyncBackend;
use crate::async_reader::{AsyncBackend, AsyncPmTilesReader};
use crate::cache::{DirectoryCache, NoCache};
use crate::error::{PmtError, PmtResult};

impl AsyncPmTilesReader<MmapBackend, NoCache> {
/// Creates a new `PMTiles` reader from a file path using the async mmap backend.
///
/// Fails if [p] does not exist or is an invalid archive.
pub async fn new_with_path<P: AsRef<Path>>(path: P) -> PmtResult<Self> {
Self::new_with_cached_path(NoCache, path).await
}
}

impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<MmapBackend, C> {
/// Creates a new cached `PMTiles` reader from a file path using the async mmap backend.
///
/// Fails if [p] does not exist or is an invalid archive.
pub async fn new_with_cached_path<P: AsRef<Path>>(cache: C, path: P) -> PmtResult<Self> {
let backend = MmapBackend::try_from(path).await?;

Self::try_from_cached_source(backend, cache).await
}
}

pub struct MmapBackend {
file: AsyncMmapFile,
}
Expand Down
76 changes: 76 additions & 0 deletions src/backend_s3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use async_trait::async_trait;
use bytes::Bytes;
use s3::Bucket;

use crate::async_reader::{AsyncBackend, AsyncPmTilesReader};
use crate::cache::{DirectoryCache, NoCache};
use crate::error::PmtError::{ResponseBodyTooLong, UnexpectedNumberOfBytesReturned};
use crate::PmtResult;

impl AsyncPmTilesReader<S3Backend, NoCache> {
/// Creates a new `PMTiles` reader from a URL using the Reqwest backend.
///
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
pub async fn new_with_bucket_path(bucket: Bucket, path: String) -> PmtResult<Self> {
Self::new_with_cached_bucket_path(NoCache, bucket, path).await
}
}

impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<S3Backend, C> {
/// Creates a new `PMTiles` reader with cache from a URL using the Reqwest backend.
///
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
pub async fn new_with_cached_bucket_path(
cache: C,
bucket: Bucket,
path: String,
) -> PmtResult<Self> {
let backend = S3Backend::from(bucket, path);

Self::try_from_cached_source(backend, cache).await
}
}

pub struct S3Backend {
bucket: Bucket,
path: String,
}

impl S3Backend {
#[must_use]
pub fn from(bucket: Bucket, path: String) -> S3Backend {
Self { bucket, path }
}
}

#[async_trait]
impl AsyncBackend for S3Backend {
async fn read_exact(&self, offset: usize, length: usize) -> PmtResult<Bytes> {
let data = self.read(offset, length).await?;

if data.len() == length {
Ok(data)
} else {
Err(UnexpectedNumberOfBytesReturned(length, data.len()))
}
}

async fn read(&self, offset: usize, length: usize) -> PmtResult<Bytes> {
let response = self
.bucket
.get_object_range(
self.path.as_str(),
offset as _,
Some((offset + length - 1) as _),
)
.await?;

let response_bytes = response.bytes();

if response_bytes.len() > length {
Err(ResponseBodyTooLong(response_bytes.len(), length))
} else {
Ok(response_bytes.clone())
}
}
}

0 comments on commit ee05cc4

Please sign in to comment.