Skip to content
This repository was archived by the owner on Feb 6, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 68 additions & 66 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ directories = "5.0.1"
docker-api = "0.14.0"
dyn-clone = "1.0.17"
flate2 = "1.0.28"
futures = {version = "0.3.30", features = ["executor"]}
futures = "0.3.30"
futures-lite = "2.3.0"
hex = "0.4.3"
http = "0.2.12" # todo: upgrade this alongside hyper/axum/tokio-tungstenite/tower-http
Expand Down
6 changes: 3 additions & 3 deletions lib/dal-test/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ rust_library(
deps = [
"//lib/buck2-resources:buck2-resources",
"//lib/council-server:council-server",
"//lib/dal:dal-integration-test",
"//lib/dal:dal",
"//lib/module-index-client:module-index-client",
"//lib/pinga-server:pinga-server-integration-test",
"//lib/rebaser-server:rebaser-server-integration-test",
"//lib/pinga-server:pinga-server",
"//lib/rebaser-server:rebaser-server",
"//lib/si-crypto:si-crypto",
"//lib/si-data-nats:si-data-nats",
"//lib/si-data-pg:si-data-pg",
Expand Down
5 changes: 2 additions & 3 deletions lib/dal-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ impl Config {
config.pg.port = env::var(ENV_VAR_PG_PORT)
.unwrap_or_else(|_| DEFAULT_TEST_PG_PORT_STR.to_string())
.parse()?;

config.pg.pool_max_size = 4;
config.pg.pool_max_size *= 32;
config.pg.certificate_path = Some(config.postgres_key_path.clone().try_into()?);

if let Ok(value) = env::var(ENV_VAR_PG_HOSTNAME) {
Expand All @@ -152,7 +151,7 @@ impl Config {
config.layer_cache_pg_pool.port = env::var(ENV_VAR_PG_PORT)
.unwrap_or_else(|_| DEFAULT_TEST_PG_PORT_STR.to_string())
.parse()?;
config.layer_cache_pg_pool.pool_max_size = 4;
config.layer_cache_pg_pool.pool_max_size *= 32;
config.layer_cache_pg_pool.certificate_path =
Some(config.postgres_key_path.clone().try_into()?);

Expand Down
83 changes: 2 additions & 81 deletions lib/dal/BUCK
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
load(
"@prelude-si//:macros.bzl",
"rust_library",
"rust_library_integration_test",
"rust_test",
)

Expand Down Expand Up @@ -76,83 +75,6 @@ rust_library(
test_unit_deps = [
"//third-party/rust:tempfile",
],
)

rust_library_integration_test(
name = "dal-integration-test",
crate = "dal",
deps = [
"//lib/si-cbor:si-cbor",
"//lib/council-server:council-server",
"//lib/nats-subscriber:nats-subscriber",
"//lib/object-tree:object-tree",
"//lib/si-crypto:si-crypto",
"//lib/si-data-nats:si-data-nats",
"//lib/si-data-pg:si-data-pg",
"//lib/si-events-rs:si-events",
"//lib/si-hash:si-hash",
"//lib/si-layer-cache:si-layer-cache",
"//lib/si-pkg:si-pkg",
"//lib/si-std:si-std",
"//lib/telemetry-rs:telemetry",
"//lib/telemetry-nats-rs:telemetry-nats",
"//lib/veritech-client:veritech-client",
"//third-party/rust:async-recursion",
"//third-party/rust:async-trait",
"//third-party/rust:base64",
"//third-party/rust:blake3",
"//third-party/rust:chrono",
"//third-party/rust:ciborium",
"//third-party/rust:convert_case",
"//third-party/rust:derive_more",
"//third-party/rust:diff",
"//third-party/rust:dyn-clone",
"//third-party/rust:futures",
"//third-party/rust:hex",
"//third-party/rust:iftree",
"//third-party/rust:itertools",
"//third-party/rust:jwt-simple",
"//third-party/rust:lazy_static",
"//third-party/rust:once_cell",
"//third-party/rust:paste",
"//third-party/rust:petgraph",
"//third-party/rust:postcard",
"//third-party/rust:postgres-types",
"//third-party/rust:pretty_assertions_sorted",
"//third-party/rust:rand",
"//third-party/rust:refinery",
"//third-party/rust:regex",
"//third-party/rust:remain",
"//third-party/rust:serde",
"//third-party/rust:serde-aux",
"//third-party/rust:serde_json",
"//third-party/rust:serde_with",
"//third-party/rust:sled",
"//third-party/rust:sodiumoxide",
"//third-party/rust:strum",
"//third-party/rust:thiserror",
"//third-party/rust:tokio",
"//third-party/rust:tokio-stream",
"//third-party/rust:ulid",
"//third-party/rust:url",
],
rustc_flags = [
"--cfg=integration_test",
],
srcs = glob([
"src/**/*.rs",
"src/builtins/func/**",
"src/builtins/schema/data/**/*.json",
"src/builtins/schema/definitions/**/*.json",
"src/migrations/**/*.sql",
"src/queries/**/*.sql",
]),
env = {
"CARGO_MANIFEST_DIR": ".",
},
test_unit_deps = [
"//third-party/rust:tempfile",
],
extra_test_targets = [":test-integration"],
)

Expand All @@ -161,8 +83,7 @@ rust_test(
deps = [
"//lib/dal-test:dal-test",
"//lib/rebaser-core:rebaser-core",
"//lib/rebaser-server:rebaser-server-integration-test",
"//lib/si-events-rs:si-events",
"//lib/rebaser-server:rebaser-server",
"//lib/si-pkg:si-pkg",
"//lib/veritech-client:veritech-client",
"//third-party/rust:base64",
Expand All @@ -177,7 +98,7 @@ rust_test(
"//third-party/rust:tokio",
"//third-party/rust:tokio-util",
"//third-party/rust:ulid",
":dal-integration-test",
":dal",
],
crate_root = "tests/integration.rs",
srcs = glob([
Expand Down
1 change: 0 additions & 1 deletion lib/dal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,5 @@ veritech-client = { path = "../../lib/veritech-client" }

itertools = { workspace = true }
pretty_assertions_sorted = { workspace = true }
si-events = { path = "../../lib/si-events-rs" }
tempfile = { workspace = true }
tokio-util = { workspace = true }
2 changes: 1 addition & 1 deletion lib/dal/src/action/prototype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ impl ActionPrototype {
{
let node_weight = workspace_snapshot.get_node_weight(node_index).await?;
let id = node_weight.id();
if NodeWeightDiscriminants::Func == node_weight.as_ref().into() {
if NodeWeightDiscriminants::Func == node_weight.into() {
return Ok(id.into());
}
}
Expand Down
10 changes: 3 additions & 7 deletions lib/dal/src/attribute/prototype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl AttributePrototype {
{
let node_weight = workspace_snapshot.get_node_weight(node_index).await?;
let node_weight_id = node_weight.id();
if NodeWeightDiscriminants::Func == node_weight.as_ref().into() {
if NodeWeightDiscriminants::Func == node_weight.into() {
return Ok(node_weight_id.into());
}
}
Expand Down Expand Up @@ -356,7 +356,6 @@ impl AttributePrototype {
let (target_id, edge_weight_discrim) = match workspace_snapshot
.get_node_weight(prototype_edge_source)
.await?
.as_ref()
{
NodeWeight::Prop(prop_inner) => {
(prop_inner.id(), EdgeWeightKindDiscriminants::Prop)
Expand Down Expand Up @@ -391,7 +390,6 @@ impl AttributePrototype {
if let NodeWeight::AttributeValue(av_node_weight) = workspace_snapshot
.get_node_weight(attribute_value_target)
.await?
.as_ref()
{
attribute_value_ids.push(av_node_weight.id().into())
}
Expand Down Expand Up @@ -428,10 +426,8 @@ impl AttributePrototype {

Ok(match maybe_value_idxs.first().copied() {
Some(value_idx) => {
if let NodeWeight::AttributeValue(av_node_weight) = workspace_snapshot
.get_node_weight(value_idx)
.await?
.as_ref()
if let NodeWeight::AttributeValue(av_node_weight) =
workspace_snapshot.get_node_weight(value_idx).await?
{
Some(av_node_weight.id().into())
} else {
Expand Down
6 changes: 3 additions & 3 deletions lib/dal/src/attribute/prototype/argument.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl AttributePrototypeArgument {
)
.await?
{
match workspace_snapshot.get_node_weight(node_idx).await?.as_ref() {
match workspace_snapshot.get_node_weight(node_idx).await? {
NodeWeight::Content(inner) => {
let inner_addr_discrim: ContentAddressDiscriminants =
inner.content_address().into();
Expand Down Expand Up @@ -318,7 +318,7 @@ impl AttributePrototypeArgument {
.into_iter()
.next()
{
match workspace_snapshot.get_node_weight(target).await?.as_ref() {
match workspace_snapshot.get_node_weight(target).await? {
NodeWeight::Prop(inner) => {
return Ok(Some(ValueSource::Prop(inner.id().into())));
}
Expand Down Expand Up @@ -513,7 +513,7 @@ impl AttributePrototypeArgument {

for idx in apa_node_idxs {
let node_weight = workspace_snapshot.get_node_weight(idx).await?;
if let NodeWeight::AttributePrototypeArgument(apa_weight) = node_weight.as_ref() {
if let NodeWeight::AttributePrototypeArgument(apa_weight) = &node_weight {
if let Some(ArgumentTargets {
destination_component_id,
..
Expand Down
19 changes: 8 additions & 11 deletions lib/dal/src/attribute/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -946,10 +946,8 @@ impl AttributeValue {
let workspace_snapshot = ctx.workspace_snapshot()?;

let prop_node_index = workspace_snapshot.get_node_index_by_id(prop_id).await?;
if let NodeWeight::Prop(prop_inner) = workspace_snapshot
.get_node_weight(prop_node_index)
.await?
.as_ref()
if let NodeWeight::Prop(prop_inner) =
workspace_snapshot.get_node_weight(prop_node_index).await?
{
prop_inner.kind()
} else {
Expand Down Expand Up @@ -1277,7 +1275,6 @@ impl AttributeValue {
.workspace_snapshot()?
.get_node_weight(node_index)
.await?
.as_ref()
{
prop_map.insert(
prop_inner.name().to_string(),
Expand Down Expand Up @@ -1375,7 +1372,6 @@ impl AttributeValue {
match workspace_snapshot
.get_node_weight(element_prop_index)
.await?
.as_ref()
{
NodeWeight::Prop(prop_inner) => (prop_inner.id(), prop_inner.kind()),
_ => {
Expand Down Expand Up @@ -1620,7 +1616,6 @@ impl AttributeValue {
match workspace_snapshot
.get_node_weight(element_prop_index)
.await?
.as_ref()
{
NodeWeight::Prop(prop_inner) => (prop_inner.id(), prop_inner.kind()),
_ => {
Expand Down Expand Up @@ -1822,7 +1817,7 @@ impl AttributeValue {
view: Option<serde_json::Value>,
) -> AttributeValueResult<()> {
let workspace_snapshot = ctx.workspace_snapshot()?;
let (_, av_node_weight) = {
let (av_idx, av_node_weight) = {
let av_idx = workspace_snapshot
.get_node_index_by_id(attribute_value_id)
.await?;
Expand Down Expand Up @@ -1867,6 +1862,7 @@ impl AttributeValue {
workspace_snapshot
.add_node(NodeWeight::AttributeValue(new_av_node_weight))
.await?;
workspace_snapshot.replace_references(av_idx).await?;

Ok(())
}
Expand All @@ -1881,7 +1877,7 @@ impl AttributeValue {
func_execution_pk: FuncExecutionPk,
) -> AttributeValueResult<()> {
let workspace_snapshot = ctx.workspace_snapshot()?;
let (_av_idx, av_node_weight) = {
let (av_idx, av_node_weight) = {
let av_idx = workspace_snapshot
.get_node_index_by_id(attribute_value_id)
.await?;
Expand Down Expand Up @@ -1942,6 +1938,7 @@ impl AttributeValue {
workspace_snapshot
.add_node(NodeWeight::AttributeValue(new_av_node_weight))
.await?;
workspace_snapshot.replace_references(av_idx).await?;

Ok(())
}
Expand Down Expand Up @@ -1978,7 +1975,7 @@ impl AttributeValue {
.await?
{
let target_node_weight = workspace_snapshot.get_node_weight(target).await?;
if let NodeWeight::Prop(prop_node_weight) = target_node_weight.as_ref() {
if let NodeWeight::Prop(prop_node_weight) = &target_node_weight {
maybe_prop_id = match maybe_prop_id {
Some(already_found_prop_id) => {
return Err(AttributeValueError::MultiplePropsFound(
Expand Down Expand Up @@ -2097,7 +2094,7 @@ impl AttributeValue {
.pop()
{
let node_weight = workspace_snapshot.get_node_weight(ordering).await?;
if let NodeWeight::Ordering(ordering_weight) = node_weight.as_ref() {
if let NodeWeight::Ordering(ordering_weight) = node_weight {
Ok(ordering_weight
.order()
.clone()
Expand Down
6 changes: 4 additions & 2 deletions lib/dal/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,6 @@ impl Component {
if let NodeWeight::Content(content) = workspace_snapshot
.get_node_weight(maybe_schema_variant_index)
.await?
.as_ref()
{
let content_hash_discriminants: ContentAddressDiscriminants =
content.content_address().into();
Expand Down Expand Up @@ -833,7 +832,7 @@ impl Component {
.await?
{
let target_node_weight = workspace_snapshot.get_node_weight(target).await?;
if let NodeWeight::AttributeValue(_) = target_node_weight.as_ref() {
if let NodeWeight::AttributeValue(_) = target_node_weight {
maybe_root_attribute_value_id = match maybe_root_attribute_value_id {
Some(already_found_root_attribute_value_id) => {
return Err(ComponentError::MultipleRootAttributeValuesFound(
Expand Down Expand Up @@ -1243,6 +1242,9 @@ impl Component {
ctx.workspace_snapshot()?
.add_node(NodeWeight::Component(new_component_node_weight))
.await?;
ctx.workspace_snapshot()?
.replace_references(component_idx)
.await?;
}

let updated = ComponentContentV1::from(component.clone());
Expand Down
3 changes: 1 addition & 2 deletions lib/dal/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use tokio::time::Instant;
use veritech_client::{Client as VeritechClient, CycloneEncryptionKey};

use crate::layer_db_types::ContentTypes;
use crate::workspace_snapshot::node_weight::NodeWeight;
use crate::workspace_snapshot::{
conflict::Conflict, graph::WorkspaceSnapshotGraph, update::Update, vector_clock::VectorClockId,
};
Expand All @@ -37,7 +36,7 @@ use crate::{
};
use crate::{EncryptedSecret, Workspace};

pub type DalLayerDb = LayerDb<ContentTypes, EncryptedSecret, WorkspaceSnapshotGraph, NodeWeight>;
pub type DalLayerDb = LayerDb<ContentTypes, EncryptedSecret, WorkspaceSnapshotGraph>;

/// A context type which contains handles to common core service dependencies.
///
Expand Down
Loading