Refactor to clean up and prepare for directory caching (#23)
* there is no point in returning the tile type and compression with every `get_tile` call -- PMTiles keeps this data in the header, so it is constant for the whole archive, and users who need it can read it from the header directly. `get_tile` now returns plain `Bytes` (see the usage sketch after this list).
* add `Entry::is_leaf` helper
* add `Header::get_bounds` and `Header::get_center` (tilejson structs)
* no need for the `B: ...` trait bound on the `AsyncPmTilesReader<B>` struct declaration -- the bound has to be specified on the impl anyway
* no need for `AsyncBackend::read_initial_bytes` -- it is used only once and only has a default implementation anyway. Inlined it into `try_from_source`.
* inlined `read_directory_with_backend` -- it was used once and was tiny
* split `find_tile_entry` into two functions -- I will need this later to add caching: the root directory is permanently kept as part of the main struct, but leaf directories are not, so they need a different code path (see the cache sketch after the `src/async_reader.rs` diff)
* added `cargo test` for default features
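For downstream users, here is a minimal sketch of what the resulting 0.4.0 API looks like. The archive path and tile coordinates are hypothetical; it assumes the `mmap-async-tokio` and `tilejson` features are enabled, a `tokio` runtime with the `macros` and `rt` features, and that the `Header` fields such as `tile_type` stay public:

```rust
use pmtiles::async_reader::AsyncPmTilesReader;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical path -- any valid PMTiles archive works here.
    let reader = AsyncPmTilesReader::new_with_path("tiles.pmtiles").await?;

    // Tile type and compression are constant per archive, so they are read
    // once from the header instead of being attached to every tile.
    let header = reader.get_header();
    println!("type: {:?}, compression: {:?}", header.tile_type, header.tile_compression);
    println!("bounds: {:?}, center: {:?}", header.get_bounds(), header.get_center());

    // `get_tile` now returns the raw tile bytes directly.
    if let Some(bytes) = reader.get_tile(0, 0, 0).await {
        println!("tile 0/0/0 is {} bytes", bytes.len());
    }
    Ok(())
}
```

Keeping type and compression on the header means a tile server can compute its `Content-Type` and `Content-Encoding` once at startup rather than per request.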
nyurik committed Nov 10, 2023
1 parent c7dc1c4 commit 2de5837
Showing 8 changed files with 91 additions and 119 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yml
@@ -29,3 +29,4 @@ jobs:
       - run: cargo test --features http-async
       - run: cargo test --features mmap-async-tokio
       - run: cargo test --features tilejson
+      - run: cargo test
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "pmtiles"
version = "0.3.1"
version = "0.4.0"
edition = "2021"
authors = ["Luke Seelenbinder <luke.seelenbinder@stadiamaps.com>"]
license = "MIT OR Apache-2.0"
@@ -36,10 +36,10 @@ tokio = { version = "1", default-features = false, features = ["io-util"], optio
varint-rs = "2"

[dev-dependencies]
flate2 = "1"
fmmap = { version = "0.3", features = ["tokio-async"] }
reqwest = { version = "0.11", features = ["rustls-tls-webpki-roots"] }
tokio = { version = "1", features = ["test-util", "macros", "rt"] }
flate2 = "1"

[package.metadata.docs.rs]
all-features = true
3 changes: 2 additions & 1 deletion justfile
@@ -14,6 +14,7 @@ test:
     cargo test --features http-async
     cargo test --features mmap-async-tokio
     cargo test --features tilejson
+    cargo test
     RUSTDOCFLAGS="-D warnings" cargo doc --no-deps

# Run cargo fmt and cargo clippy
@@ -25,7 +26,7 @@ fmt:

# Run cargo clippy
clippy:
-    cargo clippy --workspace --all-targets --bins --tests --lib --benches -- -D warnings
+    cargo clippy --workspace --all-targets --all-features --bins --tests --lib --benches -- -D warnings

# Build and open code documentation
docs:
151 changes: 58 additions & 93 deletions src/async_reader.rs
@@ -16,12 +16,12 @@ use crate::header::{HEADER_SIZE, MAX_INITIAL_BYTES};
use crate::http::HttpBackend;
#[cfg(feature = "mmap-async-tokio")]
use crate::mmap::MmapBackend;
-use crate::tile::{tile_id, Tile};
+use crate::tile::tile_id;
use crate::{Compression, Header};

-pub struct AsyncPmTilesReader<B: AsyncBackend> {
-    pub header: Header,
+pub struct AsyncPmTilesReader<B> {
     backend: B,
+    header: Header,
     root_directory: Directory,
 }

@@ -30,11 +30,13 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
///
/// Note: Prefer using new_with_* methods.
pub async fn try_from_source(backend: B) -> Result<Self, Error> {
-        let mut initial_bytes = backend.read_initial_bytes().await?;
-
-        let header_bytes = initial_bytes.split_to(HEADER_SIZE);
+        // Read the first 127 and up to 16,384 bytes to ensure we can initialize the header and root directory.
+        let mut initial_bytes = backend.read(0, MAX_INITIAL_BYTES).await?;
+        if initial_bytes.len() < HEADER_SIZE {
+            return Err(Error::InvalidHeader);
+        }
 
-        let header = Header::try_from_bytes(header_bytes)?;
+        let header = Header::try_from_bytes(initial_bytes.split_to(HEADER_SIZE))?;

let directory_bytes = initial_bytes
.split_off((header.root_offset as usize) - HEADER_SIZE)
@@ -44,45 +46,37 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
Self::read_compressed_directory(header.internal_compression, directory_bytes).await?;

         Ok(Self {
-            header,
             backend,
+            header,
             root_directory,
         })
}

-    /// Fetches a [Tile] from the archive.
-    pub async fn get_tile(&self, z: u8, x: u64, y: u64) -> Option<Tile> {
+    /// Fetches tile bytes from the archive.
+    pub async fn get_tile(&self, z: u8, x: u64, y: u64) -> Option<Bytes> {
         let tile_id = tile_id(z, x, y);
-        let entry = self.find_tile_entry(tile_id, None, 0).await?;
+        let entry = self.find_tile_entry(tile_id).await?;
 
-        let data = self
-            .backend
-            .read_exact(
-                (self.header.data_offset + entry.offset) as _,
-                entry.length as _,
-            )
-            .await
-            .ok()?;
+        let offset = (self.header.data_offset + entry.offset) as _;
+        let length = entry.length as _;
+        let data = self.backend.read_exact(offset, length).await.ok()?;
 
-        Some(Tile {
-            data,
-            tile_type: self.header.tile_type,
-            tile_compression: self.header.tile_compression,
-        })
+        Some(data)
     }

+    /// Access header information.
+    pub fn get_header(&self) -> &Header {
+        &self.header
+    }

/// Gets metadata from the archive.
///
/// Note: by spec, this should be valid JSON. This method currently returns a [String].
/// This may change in the future.
pub async fn get_metadata(&self) -> Result<String, Error> {
-        let metadata = self
-            .backend
-            .read_exact(
-                self.header.metadata_offset as _,
-                self.header.metadata_length as _,
-            )
-            .await?;
+        let offset = self.header.metadata_offset as _;
+        let length = self.header.metadata_length as _;
+        let metadata = self.backend.read_exact(offset, length).await?;

let decompressed_metadata =
Self::decompress(self.header.internal_compression, metadata).await?;
@@ -132,71 +126,52 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
Ok(tj)
}

-    #[async_recursion]
-    async fn find_tile_entry(
-        &self,
-        tile_id: u64,
-        next_dir: Option<Directory>,
-        depth: u8,
-    ) -> Option<Entry> {
-        // Max recursion...
-        if depth >= 4 {
-            return None;
-        }
-
-        let next_dir = next_dir.as_ref().unwrap_or(&self.root_directory);
-
-        match next_dir.find_tile_id(tile_id) {
-            None => None,
-            Some(needle) => {
-                if needle.run_length == 0 {
-                    // Leaf directory
-                    let next_dir = self
-                        .read_directory(
-                            (self.header.leaf_offset + needle.offset) as _,
-                            needle.length as _,
-                        )
-                        .await
-                        .ok()?;
-                    self.find_tile_entry(tile_id, Some(next_dir), depth + 1)
-                        .await
-                } else {
-                    Some(needle.clone())
-                }
-            }
-        }
-    }
+    /// Recursively locates a tile in the archive.
+    async fn find_tile_entry(&self, tile_id: u64) -> Option<Entry> {
+        let entry = self.root_directory.find_tile_id(tile_id);
+        if let Some(entry) = entry {
+            if entry.is_leaf() {
+                return self.find_entry_rec(tile_id, entry, 0).await;
+            }
+        }
+        entry.cloned()
+    }
+
+    #[async_recursion]
+    async fn find_entry_rec(&self, tile_id: u64, entry: &Entry, depth: u8) -> Option<Entry> {
+        // the recursion is done as two functions because it is a bit cleaner,
+        // and it allows directory to be cached later without cloning it first.
+        let offset = (self.header.leaf_offset + entry.offset) as _;
+        let length = entry.length as _;
+        let dir = self.read_directory(offset, length).await.ok()?;
+        let entry = dir.find_tile_id(tile_id);
+
+        if let Some(entry) = entry {
+            if entry.is_leaf() {
+                return if depth <= 4 {
+                    self.find_entry_rec(tile_id, entry, depth + 1).await
+                } else {
+                    None
+                };
+            }
+        }
+
+        entry.cloned()
+    }

async fn read_directory(&self, offset: usize, length: usize) -> Result<Directory, Error> {
-        Self::read_directory_with_backend(
-            &self.backend,
-            self.header.internal_compression,
-            offset,
-            length,
-        )
-        .await
+        let data = self.backend.read_exact(offset, length).await?;
+        Self::read_compressed_directory(self.header.internal_compression, data).await
}

async fn read_compressed_directory(
compression: Compression,
bytes: Bytes,
) -> Result<Directory, Error> {
let decompressed_bytes = Self::decompress(compression, bytes).await?;

Directory::try_from(decompressed_bytes)
}

-    async fn read_directory_with_backend(
-        backend: &B,
-        compression: Compression,
-        offset: usize,
-        length: usize,
-    ) -> Result<Directory, Error> {
-        let directory_bytes = backend.read_exact(offset, length).await?;
-
-        Self::read_compressed_directory(compression, directory_bytes).await
-    }

async fn decompress(compression: Compression, bytes: Bytes) -> Result<Bytes, Error> {
let mut decompressed_bytes = Vec::with_capacity(bytes.len() * 2);
match compression {
@@ -229,8 +204,8 @@ impl AsyncPmTilesReader<MmapBackend> {
/// Creates a new PMTiles reader from a file path using the async mmap backend.
///
/// Fails if [p] does not exist or is an invalid archive.
-    pub async fn new_with_path<P: AsRef<Path>>(p: P) -> Result<Self, Error> {
-        let backend = MmapBackend::try_from(p).await?;
+    pub async fn new_with_path<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
+        let backend = MmapBackend::try_from(path).await?;

Self::try_from_source(backend).await
}
@@ -243,16 +218,6 @@ pub trait AsyncBackend {

/// Reads up to `length` bytes starting at `offset`.
async fn read(&self, offset: usize, length: usize) -> Result<Bytes, Error>;

-    /// Read the first 127 and up to 16,384 bytes to ensure we can initialize the header and root directory.
-    async fn read_initial_bytes(&self) -> Result<Bytes, Error> {
-        let bytes = self.read(0, MAX_INITIAL_BYTES).await?;
-        if bytes.len() < HEADER_SIZE {
-            return Err(Error::InvalidHeader);
-        }
-
-        Ok(bytes)
-    }
}

#[cfg(test)]
@@ -274,11 +239,11 @@ mod tests {
let tile = tiles.get_tile(z, x, y).await.unwrap();

assert_eq!(
-            tile.data.len(),
+            tile.len(),
fixture_bytes.len(),
"Expected tile length to match."
);
-        assert_eq!(tile.data, fixture_bytes, "Expected tile to match fixture.");
+        assert_eq!(tile, fixture_bytes, "Expected tile to match fixture.");
}

#[tokio::test]
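The `find_tile_entry`/`find_entry_rec` split above is what makes the planned directory cache straightforward: the root directory already lives permanently in the reader struct, while `find_entry_rec` is the single place that fetches leaf directories and could consult a cache first. Purely as an illustration of that plan -- none of these names exist in the crate -- such a cache could take this shape:

```rust
use std::collections::HashMap;
use tokio::sync::RwLock;

// Illustrative stand-ins: `Dir` is any decoded directory type and `E` any small,
// cloneable entry type; this is not the crate's actual internals.
struct LeafDirCache<Dir> {
    // Directories keyed by their byte offset within the leaf directory section.
    dirs: RwLock<HashMap<u64, Dir>>,
}

impl<Dir> LeafDirCache<Dir> {
    // Look up an entry inside an already-cached directory. Only the small
    // entry is cloned out; the directory itself is never cloned per lookup.
    async fn find_cached<E: Clone>(
        &self,
        offset: u64,
        find: impl Fn(&Dir) -> Option<&E>,
    ) -> Option<E> {
        let dirs = self.dirs.read().await;
        dirs.get(&offset).and_then(|dir| find(dir).cloned())
    }

    // Store a freshly fetched directory for future lookups.
    async fn insert(&self, offset: u64, dir: Dir) {
        self.dirs.write().await.insert(offset, dir);
    }
}
```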
9 changes: 8 additions & 1 deletion src/directory.rs
@@ -25,7 +25,7 @@ impl Directory {
// https://github.com/protomaps/PMTiles/blob/9c7f298fb42290354b8ed0a9b2f50e5c0d270c40/js/index.ts#L210
if next_id > 0 {
let previous_tile = self.entries.get(next_id - 1)?;
-            if previous_tile.run_length == 0
+            if previous_tile.is_leaf()
|| tile_id - previous_tile.tile_id < previous_tile.run_length as u64
{
return Some(previous_tile);
@@ -88,6 +88,13 @@ pub(crate) struct Entry {
pub(crate) run_length: u32,
}

+#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
+impl Entry {
+    pub fn is_leaf(&self) -> bool {
+        self.run_length == 0
+    }
+}

#[cfg(test)]
mod tests {
use std::io::{BufReader, Read, Write};
30 changes: 19 additions & 11 deletions src/header.rs
@@ -80,19 +80,27 @@ impl Header {
tiles: sources,
minzoom: self.min_zoom,
maxzoom: self.max_zoom,
-            bounds: tilejson::Bounds::new(
-                self.min_longitude as f64,
-                self.min_latitude as f64,
-                self.max_longitude as f64,
-                self.max_latitude as f64,
-            ),
-            center: tilejson::Center::new(
-                self.center_longitude as f64,
-                self.center_latitude as f64,
-                self.center_zoom,
-            ),
+            bounds: self.get_bounds(),
+            center: self.get_center(),
}
}

+    pub fn get_bounds(&self) -> tilejson::Bounds {
+        tilejson::Bounds::new(
+            self.min_longitude as f64,
+            self.min_latitude as f64,
+            self.max_longitude as f64,
+            self.max_latitude as f64,
+        )
+    }
+
+    pub fn get_center(&self) -> tilejson::Center {
+        tilejson::Center::new(
+            self.center_longitude as f64,
+            self.center_latitude as f64,
+            self.center_zoom,
+        )
+    }
}

#[derive(Debug, Eq, PartialEq, Copy, Clone)]
2 changes: 1 addition & 1 deletion src/http.rs
@@ -67,6 +67,6 @@ mod tests {
let client = reqwest::Client::builder().use_rustls_tls().build().unwrap();
let backend = HttpBackend::try_from(client, TEST_URL).unwrap();

-        let _tiles = AsyncPmTilesReader::try_from_source(backend).await.unwrap();
+        AsyncPmTilesReader::try_from_source(backend).await.unwrap();
}
}
10 changes: 0 additions & 10 deletions src/tile.rs
@@ -1,7 +1,3 @@
-use bytes::Bytes;
-
-use crate::{Compression, TileType};
-
#[cfg(any(feature = "http-async", feature = "mmap-async-tokio", test))]
pub(crate) fn tile_id(z: u8, x: u64, y: u64) -> u64 {
if z == 0 {
@@ -15,12 +11,6 @@ pub(crate) fn tile_id(z: u8, x: u64, y: u64) -> u64 {
base_id + tile_id
}

-pub struct Tile {
-    pub data: Bytes,
-    pub tile_type: TileType,
-    pub tile_compression: Compression,
-}

#[cfg(test)]
mod test {
use super::tile_id;
