Skip to content

Commit

Permalink
Fix race condition when check for existing view ids was too early (#420)
Browse files Browse the repository at this point in the history
* Move check closer to where we store it

* Add entry to CHANGELOG.md
  • Loading branch information
adzialocha committed Jun 22, 2023
1 parent 7cb0732 commit 3407076
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Bind untrusted user filter arguments in SQL query [#358](https://github.com/p2panda/aquadoggo/pull/358)
- Fix insertion of view before document is materialized [#413](https://github.com/p2panda/aquadoggo/pull/413)
- Handle duplicate document view insertions in reduce task [#410](https://github.com/p2panda/aquadoggo/pull/410)
- Fix race condition when check for existing view ids was too early [#420](https://github.com/p2panda/aquadoggo/pull/420)

### Open Sauce

Expand Down
61 changes: 29 additions & 32 deletions aquadoggo/src/materializer/tasks/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,6 @@ use crate::materializer::TaskInput;
pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult<TaskInput> {
debug!("Working on {}", input);

// If this task is concerned with a document view then we can first check if it has actually
// already been materialized. If so, we exit this task immediately and return no new tasks.
if let Some(document_view_id) = &input.document_view_id {
let document_view_exists = context
.store
.get_document_by_view_id(document_view_id)
.await
.map_err(|err| TaskError::Critical(err.to_string()))?
.is_some();

if document_view_exists {
return Ok(None);
}
}

// Find out which document we are handling
let document_id = match resolve_document_id(&context, &input).await? {
Some(document_id) => Ok(document_id),
Expand Down Expand Up @@ -174,27 +159,39 @@ async fn reduce_document_view<O: AsOperation + WithId<OperationId> + WithPublicK
)]));
};

// Insert the new document view into the database
context
// Make sure to not store document view twice
let document_view_exists = context
.store
.insert_document_view(
&document.view().unwrap(),
document.id(),
document.schema_id(),
)
.get_document_by_view_id(document_view_id)
.await
.map_err(|err| TaskError::Critical(err.to_string()))?;
.map_err(|err| TaskError::Critical(err.to_string()))?
.is_some();

info!("Stored {} document view {}", document, document.view_id());
if !document_view_exists {
context
.store
.insert_document_view(
&document.view().unwrap(),
document.id(),
document.schema_id(),
)
.await
.map_err(|err| TaskError::Critical(err.to_string()))?;

info!("Stored {} document view {}", document, document.view_id());

debug!(
"Dispatch dependency task for view with id: {}",
document.view_id()
);
Ok(Some(vec![Task::new(
"dependency",
TaskInput::new(None, Some(document.view_id().to_owned())),
)]))
debug!(
"Dispatch dependency task for view with id: {}",
document.view_id()
);

Ok(Some(vec![Task::new(
"dependency",
TaskInput::new(None, Some(document.view_id().to_owned())),
)]))
} else {
Ok(None)
}
}

/// Helper method to reduce an operation graph to the latest document view, returning the
Expand Down

0 comments on commit 3407076

Please sign in to comment.