Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@
"block": {
"number": true,
"hash": true,
"eventInboxRoot": true,
"consensusParametersVersion": true,
"stateTransitionBytecodeVersion": true,
"messageOutboxRoot": true
"consensusParametersVersion": true
}
},
"includeAllBlocks": false
Expand Down
Git LFS file not shown
Git LFS file not shown
19 changes: 8 additions & 11 deletions crates/query/src/scan/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,21 @@ use crate::primitives::Name;

#[derive(Debug)]
pub struct TableDoesNotExist {
pub table_name: Name
pub table_name: Name,
}


impl TableDoesNotExist {
pub fn new(table_name: Name) -> Self {
Self {
table_name
}
Self { table_name }
}
}


impl std::fmt::Display for TableDoesNotExist {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "table '{}' does not exist", self.table_name)
}
}


impl std::error::Error for TableDoesNotExist {}

#[derive(Debug)]
Expand All @@ -30,20 +25,22 @@ pub struct ColumnDoesNotExist {
pub table_name: String,
}


impl ColumnDoesNotExist {
pub fn new(table_name: String, column_name: Name) -> Self {
Self {
table_name,
column_name
column_name,
}
}
}


impl std::fmt::Display for ColumnDoesNotExist {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "column '{}' is not found in {}", self.column_name, self.table_name)
write!(
f,
"column '{}' is not found in '{}'",
self.column_name, self.table_name
)
}
}

Expand Down
19 changes: 12 additions & 7 deletions crates/query/src/scan/parquet/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,33 @@ use rayon::prelude::*;
use std::cmp::Ordering;
use std::collections::HashSet;
use std::ops::Not;
use std::path::PathBuf;
use std::sync::Arc;

#[derive(Clone)]
pub struct ParquetFile {
io: MmapIO,
metadata: Arc<ParquetMetadata>,
filename: Arc<String>,
table_name: String,
}

impl ParquetFile {
pub fn open<P: Into<String>>(file: P) -> anyhow::Result<Self> {
let filename = file.into();
pub fn open(file: impl Into<PathBuf>) -> anyhow::Result<Self> {
let path = file.into();

let io = MmapIO::open(&filename)?;
let table_name = path.file_stem()
.map(|s| s.to_string_lossy().into_owned())
.unwrap_or_default();

let io = MmapIO::open(&path)?;

let metadata =
ArrowReaderMetadata::load(&io, ArrowReaderOptions::new().with_page_index(true))?;

Ok(Self {
io,
metadata: Arc::new(ParquetMetadata::new(metadata)),
filename: Arc::new(filename),
table_name,
})
}
}
Expand Down Expand Up @@ -171,8 +176,8 @@ impl TableReader for ParquetFile {
if default_null_columns.map_or(false, |dnc| dnc.contains(name)) {
missing_null_columns.push(name);
} else {
tracing::error!("column '{}' is not found in {}", name, self.filename);
anyhow::bail!(ColumnDoesNotExist::new(self.filename.to_string(), name));
tracing::error!("column '{}' is not found in {}", name, self.table_name);
anyhow::bail!(ColumnDoesNotExist::new(self.table_name.to_string(), name));
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/query/src/scan/parquet/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct MmapIO {


impl MmapIO {
pub fn open(filename: &str) -> std::io::Result<Self> {
pub fn open(filename: impl AsRef<std::path::Path>) -> std::io::Result<Self> {
let file = std::fs::File::open(filename)?;
let mmap = unsafe {
MmapOptions::new().map(&file)
Expand Down
8 changes: 5 additions & 3 deletions crates/query/tests/fixtures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ fn test_fixture(chunk: &dyn Chunk, query_file: PathBuf) {
let case_dir = query_file.parent().unwrap();
let result_file = case_dir.join("result.json");

let actual_bytes = execute_query(chunk, &query_file).unwrap();
let actual: serde_json::Value = serde_json::from_slice(&actual_bytes).unwrap();
let actual: serde_json::Value = match execute_query(chunk, &query_file) {
Ok(bytes) => serde_json::from_slice(&bytes).unwrap(),
Err(err) => serde_json::Value::String(err.to_string()),
};

let expected: serde_json::Value = match std::fs::read(&result_file) {
Ok(expected_bytes) => serde_json::from_slice(&expected_bytes).unwrap(),
Expand Down Expand Up @@ -182,4 +184,4 @@ mod storage {

Ok(())
}
}
}
Loading