feat(rust): support glob in parquet object_storage (#5928)
Co-authored-by: Marius <lov2cod@gmail.com>
winding-lines and winding-lines committed Dec 29, 2022
1 parent 30da2b1 commit 30613db
Showing 5 changed files with 323 additions and 25 deletions.
2 changes: 1 addition & 1 deletion examples/read_parquet_cloud/src/main.rs
@@ -5,7 +5,7 @@ use polars::prelude::*;

// Login to your aws account and then copy the ../datasets/foods1.parquet file to your own bucket.
// Adjust the link below.
const TEST_S3: &str = "s3://lov2test/polars/datasets/foods1.parquet";
const TEST_S3: &str = "s3://lov2test/polars/datasets/*.parquet";

fn main() -> PolarsResult<()> {
let cred = Credentials::default().unwrap();
2 changes: 2 additions & 0 deletions polars/polars-io/src/lib.rs
@@ -20,6 +20,8 @@ pub mod json;
pub mod ndjson_core;
#[cfg(feature = "object_store")]
mod object_store;
#[cfg(feature = "object_store")]
pub use crate::object_store::glob as async_glob;

#[cfg(any(
feature = "csv-file",
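The lib.rs hunk above re-exports the new glob helper as `polars_io::async_glob`. Below is a minimal sketch, not part of this commit, of how a downstream crate might call it; it assumes a dependency on polars-io with the `object_store` feature enabled, AWS credentials in the environment, and a placeholder bucket. Because `glob` carries `#[tokio::main(flavor = "current_thread")]` (see the object_store.rs diff below), the call is blocking and returns the expanded URLs directly.

use polars_core::prelude::PolarsResult;
use polars_io::async_glob;

fn main() -> PolarsResult<()> {
    // Expands the pattern against S3 and returns full URLs such as
    // "s3://my-bucket/datasets/part-0.parquet" (bucket and key are placeholders).
    let files: Vec<String> = async_glob("s3://my-bucket/datasets/*.parquet")?;
    for url in &files {
        println!("{url}");
    }
    Ok(())
}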
300 changes: 284 additions & 16 deletions polars/polars-io/src/object_store.rs
@@ -7,17 +7,19 @@ use std::sync::Arc;
use std::task::Poll;

use futures::executor::block_on;
-use futures::future::BoxFuture;
+use futures::future::{ready, BoxFuture};
use futures::lock::Mutex;
-use futures::{AsyncRead, AsyncSeek, Future, TryFutureExt};
+use futures::{AsyncRead, AsyncSeek, Future, StreamExt, TryFutureExt, TryStreamExt};
use object_store::aws::AmazonS3Builder;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::ObjectStore;
use polars_core::prelude::{PolarsError, PolarsResult};
use regex::Regex;
use url::Url;

type OptionalFuture = Arc<Mutex<Option<BoxFuture<'static, std::io::Result<Vec<u8>>>>>>;
const DELIMITER: char = '/';

/// Adaptor to translate from AsyncSeek and AsyncRead to the object_store get_range API.
pub struct CloudReader {
@@ -135,29 +137,295 @@ impl AsyncSeek for CloudReader {
}
}

/// Split the URL into:
/// 1. the prefix part (all path components up to the first one containing '*'),
/// 2. a regular expression representation of the rest.
fn extract_prefix_expansion(url: &str) -> PolarsResult<(String, Option<String>)> {
let splits = url.split(DELIMITER);
let mut prefix = String::new();
let mut expansion = String::new();
let mut last_split_was_wildcard = false;
for split in splits {
let has_star = split.contains('*');
if expansion.is_empty() && !has_star {
// We are still gathering splits in the prefix.
if !prefix.is_empty() {
prefix.push(DELIMITER);
}
prefix.push_str(split);
continue;
}
// We are gathering splits for the expansion.
//
// Handle '**', we expect them to be by themselves in a split.
if split == "**" {
last_split_was_wildcard = true;
expansion.push_str(".*");
continue;
}
if split.contains("**") {
return PolarsResult::Err(PolarsError::ComputeError(
format!("Expected '**' by itself in path component, got {url}.").into(),
));
}
if !last_split_was_wildcard && !expansion.is_empty() {
expansion.push(DELIMITER);
}
// Handle '.' inside a split.
if split.contains('.') || split.contains('*') {
let processed = split.replace('.', "\\.");
expansion.push_str(&processed.replace('*', "([^/]*)"));
continue;
}
last_split_was_wildcard = false;
expansion.push_str(split);
}
// Prefix post-processing: when present, prefix should end with '/' in order to simplify matching.
if !prefix.is_empty() && !expansion.is_empty() {
prefix.push(DELIMITER);
}
// Expansion post-processing: when present, expansion should cover the whole input.
if !expansion.is_empty() {
expansion.insert(0, '^');
expansion.push('$');
}
Ok((
prefix,
if !expansion.is_empty() {
Some(expansion)
} else {
None
},
))
}
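For orientation, a small sketch, not part of this commit, written as if it were added to the test module at the bottom of this file (so `extract_prefix_expansion` and `PolarsResult` are in scope). It shows the split for a representative glob: the fixed path components become the listable prefix, `**` becomes `.*`, and `*` within a component becomes `([^/]*)`, all anchored with `^` and `$`.

#[test]
fn sketch_extract_prefix_expansion() -> PolarsResult<()> {
    // "folder" is the fixed prefix; the rest is compiled into a regex.
    let (prefix, expansion) = extract_prefix_expansion("folder/**/*.parquet")?;
    assert_eq!(prefix, "folder/");
    assert_eq!(expansion.as_deref(), Some("^.*([^/]*)\\.parquet$"));
    Ok(())
}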

/// A location on cloud storage; it may contain wildcards.
#[derive(PartialEq, Debug)]
pub struct CloudLocation {
/// The scheme (s3, ...).
pub scheme: String,
/// The bucket name.
pub bucket: String,
/// The prefix inside the bucket; this is the full key when wildcards are not used.
pub prefix: String,
/// The path components that need to be expanded.
pub expansion: Option<String>,
}

impl CloudLocation {
/// Parse a CloudLocation from a URL.
fn new(url: &str) -> PolarsResult<CloudLocation> {
let parsed = Url::parse(url).map_err(anyhow::Error::from)?;
let is_local = parsed.scheme() == "file";
let (bucket, key) = if is_local {
("".into(), url[7..].into())
} else {
let key = parsed.path();
let bucket = parsed
.host()
.ok_or(PolarsError::ComputeError(
format!("Cannot parse bucket (ie host) from {url}").into(),
))?
.to_string();
(bucket, key)
};
let (mut prefix, expansion) = extract_prefix_expansion(key)?;
if is_local && key.starts_with(DELIMITER) {
prefix.insert(0, DELIMITER);
}
Ok(CloudLocation {
scheme: parsed.scheme().into(),
bucket,
prefix,
expansion,
})
}
}

/// Return the full URL for a key within the given scheme and bucket.
fn full_url(scheme: &str, bucket: &str, key: Path) -> String {
format!("{scheme}://{bucket}/{key}")
}

/// Build an ObjectStore based on the URL and information from the environment. Return an object store and the path relative to the store.
-pub fn build(uri: &str) -> PolarsResult<(Path, Box<dyn ObjectStore>)> {
-let parsed = Url::parse(uri).map_err(anyhow::Error::from)?;
-let path = Path::from(parsed.path());
-match parsed.scheme() {
+pub fn build(url: &str) -> PolarsResult<(CloudLocation, Box<dyn ObjectStore>)> {
+let cloud_location = CloudLocation::new(url)?;
+let store = match cloud_location.scheme.as_str() {
"s3" => {
let s3 = AmazonS3Builder::from_env()
-.with_bucket_name(
-parsed
-.host()
-.ok_or(PolarsError::ComputeError(
-format!("Cannot parse host from {path}").into(),
-))?
-.to_string(),
-)
+.with_bucket_name(&cloud_location.bucket)
.build()
.map_err(anyhow::Error::from)?;
-Ok((path, Box::new(s3)))
+Ok::<_, PolarsError>(Box::new(s3) as Box<dyn ObjectStore>)
}
"file" => {
let local = LocalFileSystem::new();
-Ok((path, Box::new(local)))
+Ok::<_, PolarsError>(Box::new(local) as Box<dyn ObjectStore>)
}
_ => unimplemented!(),
+}?;
+Ok((cloud_location, store))
}
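As a sketch, not part of this commit and written as if inside this module, this is the shape of a call site for the reworked `build`. `AmazonS3Builder::from_env` pulls its configuration (credentials, region) from the usual AWS_* environment variables, so those must be set for the s3 scheme; the bucket and key below are placeholders.

fn sketch_build() -> PolarsResult<()> {
    let (location, _store) = build("s3://my-bucket/folder/*.parquet")?;
    // The key is split into a listable prefix plus a regex for the remainder.
    assert_eq!(location.prefix, "folder/");
    assert_eq!(location.expansion.as_deref(), Some("^([^/]*)\\.parquet$"));
    Ok(())
}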

/// A simple matcher; if more functionality is required, consider depending on https://crates.io/crates/globset.
/// The cloud list API returns all file names under a prefix, so there is no additional `readdir` cost.
struct Matcher {
prefix: String,
re: Option<Regex>,
}

impl Matcher {
/// Build a Matcher for the given prefix and expansion.
fn new(prefix: String, expansion: Option<&str>) -> PolarsResult<Matcher> {
// Cloud APIs accept a prefix without any expansion; extract it.
let re = expansion
.map(|ex| Regex::new(ex).map_err(anyhow::Error::from))
.transpose()?;
Ok(Matcher { prefix, re })
}

fn is_matching(&self, key: &Path) -> bool {
let key: &str = key.as_ref();
if !key.starts_with(&self.prefix) {
// The prefix does not match; this should not happen.
return false;
}
if self.re.is_none() {
return true;
}
let last = &key[self.prefix.len()..];
return self.re.as_ref().unwrap().is_match(last.as_ref());
}
}

#[tokio::main(flavor = "current_thread")]
/// List files with a prefix derived from the pattern.
pub async fn glob(url: &str) -> PolarsResult<Vec<String>> {
// Find the fixed prefix, up to the first '*'.

let (
CloudLocation {
scheme,
bucket,
prefix,
expansion,
},
store,
) = build(url)?;
let matcher = Matcher::new(prefix.clone(), expansion.as_deref())?;

let list_stream = store
.list(Some(&Path::from(prefix)))
.await
.map_err(anyhow::Error::from)?;
let locations: Vec<Path> = list_stream
.then(|entry| async {
let entry = entry.map_err(|e| PolarsError::ComputeError(e.to_string().into()))?;
Ok::<_, PolarsError>(entry.location)
})
.filter(|name| match name {
PolarsResult::Ok(name) => ready(matcher.is_matching(name)),
_ => ready(true),
})
.try_collect()
.await?;
Ok(locations
.into_iter()
.map(|l| full_url(&scheme, &bucket, l))
.collect::<Vec<_>>())
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_cloud_location() {
assert_eq!(
CloudLocation::new("s3://a/b").unwrap(),
CloudLocation {
scheme: "s3".into(),
bucket: "a".into(),
prefix: "b".into(),
expansion: None,
}
);
assert_eq!(
CloudLocation::new("s3://a/b/*.c").unwrap(),
CloudLocation {
scheme: "s3".into(),
bucket: "a".into(),
prefix: "b/".into(),
expansion: Some("^([^/]*)\\.c$".into()),
}
);
assert_eq!(
CloudLocation::new("file:///a/b").unwrap(),
CloudLocation {
scheme: "file".into(),
bucket: "".into(),
prefix: "/a/b".into(),
expansion: None,
}
);
}

#[test]
fn test_extract_prefix_expansion() {
assert!(extract_prefix_expansion("**url").is_err());
assert_eq!(
extract_prefix_expansion("a/b.c").unwrap(),
("a/b.c".into(), None)
);
assert_eq!(
extract_prefix_expansion("a/**").unwrap(),
("a/".into(), Some("^.*$".into()))
);
assert_eq!(
extract_prefix_expansion("a/**/b").unwrap(),
("a/".into(), Some("^.*b$".into()))
);
assert_eq!(
extract_prefix_expansion("a/**/*b").unwrap(),
("a/".into(), Some("^.*([^/]*)b$".into()))
);
assert_eq!(
extract_prefix_expansion("a/**/data/*b").unwrap(),
("a/".into(), Some("^.*data/([^/]*)b$".into()))
);
assert_eq!(
extract_prefix_expansion("a/*b").unwrap(),
("a/".into(), Some("^([^/]*)b$".into()))
);
}

#[test]
fn test_matcher_file_name() {
let cloud_location = CloudLocation::new("s3://bucket/folder/*.parquet").unwrap();
let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();
// Regular match.
assert!(a.is_matching(&Path::from("folder/1.parquet")));
// Require a '.' in the file name.
assert!(!a.is_matching(&Path::from("folder/1parquet")));
// Intermediary folders are not allowed.
assert!(!a.is_matching(&Path::from("folder/other/1.parquet")));
}

#[test]
fn test_matcher_folders() {
let cloud_location = CloudLocation::new("s3://bucket/folder/**/*.parquet").unwrap();
let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();
// Intermediary folders are optional.
assert!(a.is_matching(&Path::from("folder/1.parquet")));
// Intermediary folders are allowed.
assert!(a.is_matching(&Path::from("folder/other/1.parquet")));
let cloud_location = CloudLocation::new("s3://bucket/folder/**/data/*.parquet").unwrap();
let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();
// Required folder `data` is missing.
assert!(!a.is_matching(&Path::from("folder/1.parquet")));
// Required folder is present.
assert!(a.is_matching(&Path::from("folder/data/1.parquet")));
// Required folder is present and additional folders are allowed.
assert!(a.is_matching(&Path::from("folder/other/data/1.parquet")));
}
}
6 changes: 3 additions & 3 deletions polars/polars-io/src/parquet/async_impl.rs
@@ -19,7 +19,7 @@ use polars_core::schema::Schema;
use super::mmap;
use super::mmap::ColumnStore;
use super::read_impl::FetchRowGroups;
use crate::object_store::{build, CloudReader};
use crate::object_store::{build, CloudLocation, CloudReader};

pub struct ParquetObjectStore {
store: Arc<Mutex<Box<dyn ObjectStore>>>,
@@ -30,12 +30,12 @@ pub struct ParquetObjectStore

impl ParquetObjectStore {
pub fn from_uri(uri: &str) -> PolarsResult<Self> {
-let (path, store) = build(uri)?;
+let (CloudLocation { prefix, .. }, store) = build(uri)?;
let store = Arc::new(Mutex::from(store));

Ok(ParquetObjectStore {
store,
-path,
+path: prefix.into(),
length: None,
metadata: None,
})
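Taken together with the object_store.rs changes, `from_uri` now uses `CloudLocation::prefix` as the object key. Below is a sketch, not part of this commit, of how the glob expansion and this constructor could be combined inside the crate; the module path `crate::parquet::async_impl` is an assumption based on this diff's file layout.

use polars_core::prelude::PolarsResult;

use crate::object_store::glob;
use crate::parquet::async_impl::ParquetObjectStore;

// Expand a glob and open one ParquetObjectStore per matching object.
// `glob` blocks internally (it carries #[tokio::main]) and yields full URLs
// such as "s3://bucket/folder/1.parquet".
fn open_all(pattern: &str) -> PolarsResult<Vec<ParquetObjectStore>> {
    glob(pattern)?
        .iter()
        .map(|url| ParquetObjectStore::from_uri(url))
        .collect()
}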
