Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
members = [
"bench",
"cli",
"lib/api",
"lib/model",
"lib/common",
"lib/encoding",
Expand All @@ -12,7 +13,8 @@ members = [
"lib/storage",
"lib/rdf-fusion",
"lib/web",
"testsuite", "lib/functions"]
"testsuite",
]
resolver = "2"

[workspace.package]
Expand Down Expand Up @@ -79,6 +81,7 @@ sparesults = "0.2.5"
spargebra = "0.3.5"

# Internal dependencies
rdf-fusion-api = { version = "=0.1.0", path = "lib/api" }
rdf-fusion-model = { version = "=0.1.0", path = "lib/model" }
rdf-fusion-common = { version = "=0.1.0", path = "lib/common" }
rdf-fusion-encoding = { version = "=0.1.0", path = "lib/encoding" }
Expand Down
21 changes: 7 additions & 14 deletions bench/src/benchmarks/bsbm/business_intelligence/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ impl BsbmBusinessIntelligenceBenchmark {
)?;
let result = match self.max_query_count {
None => list_raw_operations(&queries_path)?
.filter_map(parse_query)
.map(parse_query)
.collect(),
Some(max_query_count) => list_raw_operations(&queries_path)?
.filter_map(parse_query)
.map(parse_query)
.take(usize::try_from(max_query_count)?)
.collect(),
};
Expand Down Expand Up @@ -145,20 +145,13 @@ impl Benchmark for BsbmBusinessIntelligenceBenchmark {
}
}

fn parse_query(
query: BsbmBusinessIntelligenceRawOperation,
) -> Option<BsbmBusinessIntelligenceOperation> {
fn parse_query(query: BsbmBusinessIntelligenceRawOperation) -> BsbmBusinessIntelligenceOperation {
match query {
BsbmBusinessIntelligenceRawOperation::Query(name, query) => {
// TODO remove once describe is supported
if query.contains("DESCRIBE") {
None
} else {
Some(BsbmBusinessIntelligenceOperation::Query(
name,
Query::parse(&query.replace('#', ""), None).unwrap(),
))
}
BsbmBusinessIntelligenceOperation::Query(
name,
Query::parse(&query.replace(" #", ""), None).unwrap(),
)
}
}
}
Expand Down
16 changes: 4 additions & 12 deletions bench/src/benchmarks/bsbm/explore/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ impl BsbmExploreBenchmark {
.join_data_dir(PathBuf::from(format!("explore-{}.csv", self.dataset_size)).as_path())?;
let result = match self.max_query_count {
None => list_raw_operations(&queries_path)?
.filter_map(parse_query)
.map(parse_query)
.collect(),
Some(max_query_count) => list_raw_operations(&queries_path)?
.filter_map(parse_query)
.map(parse_query)
.take(usize::try_from(max_query_count)?)
.collect(),
};
Expand Down Expand Up @@ -140,18 +140,10 @@ impl Benchmark for BsbmExploreBenchmark {
}
}

fn parse_query(query: BsbmExploreRawOperation) -> Option<BsbmExploreOperation> {
fn parse_query(query: BsbmExploreRawOperation) -> BsbmExploreOperation {
match query {
BsbmExploreRawOperation::Query(name, query) => {
// TODO remove once describe is supported
if query.contains("DESCRIBE") {
None
} else {
Some(BsbmExploreOperation::Query(
name,
Query::parse(&query, None).unwrap(),
))
}
BsbmExploreOperation::Query(name, Query::parse(&query, None).unwrap())
}
}
}
Expand Down
17 changes: 17 additions & 0 deletions lib/api/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "rdf-fusion-api"
version.workspace = true
authors.workspace = true
license.workspace = true
edition.workspace = true
rust-version.workspace = true

[dependencies]
async-trait.workspace = true
datafusion.workspace = true
rdf-fusion-common.workspace = true
rdf-fusion-encoding.workspace = true
rdf-fusion-model.workspace = true

[lints]
workspace = true
1 change: 1 addition & 0 deletions lib/api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod storage;
3 changes: 3 additions & 0 deletions lib/api/src/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod quad_storage;

pub use quad_storage::*;
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::error::StorageError;
use crate::{BlankNodeMatchingMode, DFResult};
use async_trait::async_trait;
use datafusion::datasource::TableProvider;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_planner::ExtensionPlanner;
use rdf_fusion_common::error::StorageError;
use rdf_fusion_common::{BlankNodeMatchingMode, DFResult};
use rdf_fusion_encoding::QuadStorageEncoding;
use rdf_fusion_model::{
GraphName, GraphNameRef, NamedOrBlankNode, NamedOrBlankNodeRef, Quad, QuadRef, TriplePattern,
Variable,
Expand All @@ -14,13 +14,12 @@ use std::sync::Arc;
#[async_trait]
#[allow(clippy::len_without_is_empty)]
pub trait QuadStorage: Send + Sync {
/// Returns the table name of this [QuadStorage]. This name is used to register a table in the
/// DataFusion engine.
fn table_name(&self) -> &str;
/// Returns the quad storage encoding.
fn encoding(&self) -> QuadStorageEncoding;

/// Returns the [TableProvider] for this [QuadStorage]. This provider is registered in the
/// DataFusion session and used for planning the execution of queries.
fn table_provider(&self) -> Arc<dyn TableProvider>;
/// Returns a list of planners that support planning logical nodes requiring access to the
/// storage layer.
fn planners(&self) -> Vec<Arc<dyn ExtensionPlanner + Send + Sync>>;

/// Loads the given quads into the storage.
async fn extend(&self, quads: Vec<Quad>) -> Result<usize, StorageError>;
Expand Down Expand Up @@ -55,9 +54,8 @@ pub trait QuadStorage: Send + Sync {
/// Removes the given quad from the storage.
async fn remove(&self, quad: QuadRef<'_>) -> Result<bool, StorageError>;

/// Returns a list of planners that support planning logical nodes requiring access to the
/// storage layer.
fn planners(&self) -> Vec<Arc<dyn ExtensionPlanner + Send + Sync>>;
/// Returns the number of quads in the storage.
async fn len(&self) -> Result<usize, StorageError>;
}

/// The quad pattern evaluator is responsible for accessing the storage and returning a stream of
Expand All @@ -70,6 +68,9 @@ pub trait QuadStorage: Send + Sync {
/// snapshot of the storage layer.
#[async_trait]
pub trait QuadPatternEvaluator: Debug + Send + Sync {
/// Returns the [QuadStorageEncoding] of the storage layer.
fn storage_encoding(&self) -> QuadStorageEncoding;

/// Returns a stream of quads that match the given pattern.
///
/// The resulting stream must have a schema that projects to the variables provided in the
Expand Down
7 changes: 4 additions & 3 deletions lib/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ extern crate core;

mod blank_node_mode;
pub mod error;
mod quad_storage;
pub mod quads;

pub use blank_node_mode::BlankNodeMatchingMode;
pub use quad_storage::QuadPatternEvaluator;
pub use quad_storage::QuadStorage;

use datafusion::arrow::error::ArrowError;

pub type AResult<T> = Result<T, ArrowError>;
pub type DFResult<T> = datafusion::error::Result<T>;
4 changes: 4 additions & 0 deletions lib/common/src/quads.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub const COL_GRAPH: &str = "graph";
pub const COL_SUBJECT: &str = "subject";
pub const COL_PREDICATE: &str = "predicate";
pub const COL_OBJECT: &str = "object";
1 change: 1 addition & 0 deletions lib/encoding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ rust-version.workspace = true
rdf-fusion-common.workspace = true
rdf-fusion-model.workspace = true
datafusion.workspace = true
async-trait.workspace = true

[dev-dependencies]
codspeed-criterion-compat = { workspace = true, features = ["async_tokio"] }
Expand Down
Loading
Loading