Adds update state indexing to MutableState #4224

Merged
merged 15 commits into from May 3, 2023

mmcshane
Contributor

What changed?
Adds update state indexing to MutableState and recovery of the update registry on page-in from data stored in MS.

Why?
Part of update feature for outcome polling. Also, long-running updates are likely to encounter shard reloads and/or workflow cache evictions so it is important to keep and restore in-flight accepted updates into the update registry.

How did you test it?
Unit tests

Potential risks

Is hotfix candidate?
No

@mmcshane mmcshane marked this pull request as ready for review April 27, 2023 15:17
@mmcshane mmcshane requested a review from a team as a code owner April 27, 2023 15:17
Contributor

@MichaelSnowden MichaelSnowden left a comment

Can you please split this into separate PRs?

workflowCtx = workflow.NewContext(c.shard, key, c.logger)
elem, err := c.PutIfNotExist(key, workflowCtx)
newCtx := workflow.NewContext(c.shardCtx, key, c.logger)
if err := c.tryRestore(ctx, c.shardCtx, newCtx); err != nil {
Member

can we do lazy restore only when UpdateRegistry is retrieved from workflow context?
implicitly doing a DB load when locking the workflow concerns me a little bit.

Contributor Author

@mmcshane mmcshane Apr 28, 2023

This would be only when we fault the workflow.Context into cache, not every time we lock it. The reason I was ok with the load here is that it didn't seem like there would be many (any?) cases where we loaded the context object into cache but then didn't use MutableState in some way. Does that happen?

On the other hand, a lazy restore does allow me to remove that other trick to avoid breaking all the unit tests. I'll see how it looks.

Contributor Author

So the thing about lazy load is that it makes the Workflow.UpdateRegistry function pretty ugly. If we may need to restore the registry from mutable state at that point, we'll need to take a context.Context as a parameter (not too bad) but we'll also need to return an error in case either the mutable state load fails or the incomplete update events can't be loaded. It's weird to have a function that should just be an accessor that needs to take a context and return an error.

Only way around that I see would be a second implementation of update.Registry that returned an error from every call to indicate a broken state.

Contributor Author

I've removed update restoration from this PR pursuant to @MichaelSnowden's request above that the PR be limited in scope. I will follow up with the same thing later. FWIW, it looks like restoring the UpdateRegistry might be best done in the ConsistencyChecker which is also where we load MutableState for a given workflow.Context.

Member

@yycptt yycptt May 1, 2023

> It's weird to have a function that should just be an accessor that needs to take a context and return an error.

I don't feel strongly about it actually, especially when comparing to other approaches I can think of.
re. consistency checker, I am not sure that's the only place workflow.Context is created.

(My original concern for adding the logic inside workflow cache is that it changes the rest of the system's view on this component & how the error from it should be handled. Previously it's only for in memory locking, so timeout simply means workflow is busy, and not persistence timeout. But I guess that doesn't matter any more. :) )

@mmcshane
Contributor Author

> Can you please split this into separate PRs?

This PR now includes only the code required to add the updateRecords field to MutableState and to durably CRUD the same.

Contributor

@MichaelSnowden MichaelSnowden left a comment

Can you please move the resetBlobMap and error-handling changes to a separate PR?

alexshtin
alexshtin previously approved these changes Apr 28, 2023
service/history/workflow/mutable_state.go (outdated, resolved)
}
compPtr := rec.GetCompletedPointer()
if compPtr == nil {
// not an error because update was found, but update has not completed
Member

👍

service/history/workflow/test_util.go (outdated, resolved)
@alexshtin alexshtin dismissed their stale review April 28, 2023 22:03

Events need to be lazy loaded

return event, nil
}

func (ms *MutableStateImpl) GetAcceptedWorkflowExecutionUpdates(
Member

I know there's no caller for this method, so this is more for the next PR.

Do we have any limit on the number of accepted updates for a workflow? If not, or if the limit is large, then I don't think this can work, because there can be one history branch read request for each accepted update.
Even with only 10 accepted updates, that means 10 more reads when loading mutable state.

I am also curious to know why, for an accepted update, we have to load its accepted event; the update request is not used from what I can tell. But maybe I missed something.

Contributor Author

@mmcshane mmcshane May 1, 2023

I've also been discussing the restoration strategy with Alex. It turns out that there's no single point or place in the code where it's optimal to restore the full UpdateRegistry state for a given workflow. The next thing I intend to explore is lazily fetching individual update state from MutableState on a per-update basis as messages are processed (i.e., if there is no existing update for a given ID, peek into MS to see if something can be restored before proceeding).

To answer your first question directly: there is a limit to the total number of entries in this index but it's high (2000 has been suggested as the default).

For the last question - are you suggesting then that rather than storing a pointer into history for updates in the Accepted state, we "inline" that data into the UpdateInfo value so that no history fetch is required? Because I think that could work.

Contributor Author

Actually we could store the pointer "just in case" but not dereference it here.
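
The per-update lazy restore floated in this thread could be sketched roughly as follows. This is an illustration under stated assumptions, not the eventual implementation: `Registry`, `MutableState`, `UpdateInfo`, `EventPointer`, and `Find` are all hypothetical stand-ins. The sketch keeps the history pointer in the record but never dereferences it, so restoring an update costs a map lookup rather than a history branch read.

```go
package main

// EventPointer locates an event in workflow history. In this sketch it is
// stored "just in case" but never dereferenced during restore.
type EventPointer struct {
	EventID int64
}

// UpdateInfo is a hypothetical per-update record kept in mutable state.
type UpdateInfo struct {
	Accepted  *EventPointer // set once the update is accepted
	Completed *EventPointer // nil until the update completes
}

// MutableState is a hypothetical stand-in exposing only the update index.
type MutableState struct {
	updateInfos map[string]*UpdateInfo
}

// Update is the in-memory state tracked by the registry.
type Update struct {
	ID        string
	Completed bool
}

// Registry holds in-memory updates and restores missing ones on demand.
type Registry struct {
	updates map[string]*Update
	ms      *MutableState
}

// NewRegistry returns an empty registry backed by ms.
func NewRegistry(ms *MutableState) *Registry {
	return &Registry{updates: make(map[string]*Update), ms: ms}
}

// Find returns the update for id. If the registry has no in-memory entry,
// it peeks into mutable state and restores one from the stored record.
func (r *Registry) Find(id string) (*Update, bool) {
	if u, ok := r.updates[id]; ok {
		return u, true
	}
	info, ok := r.ms.updateInfos[id]
	if !ok {
		return nil, false
	}
	u := &Update{ID: id, Completed: info.Completed != nil}
	r.updates[id] = u
	return u, true
}
```

With a high cap on index entries (2000 was suggested above), restoring only the updates that incoming messages actually reference avoids paying for the whole index on every mutable state load.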

@mmcshane mmcshane closed this May 1, 2023
@mmcshane mmcshane reopened this May 1, 2023
Matt McShane added 9 commits May 2, 2023 12:10
A mechanism for storing pointers into the workflow history representing
update accepted or completed events. Storage added for Cassandra,
PostgreSQL, MySQL, and SQLite.
Describes what string to use as the key in a map
Note that update accepted and completed events are cached.
Contributor

@MichaelSnowden MichaelSnowden left a comment

The behavior of the code itself looks good to me, but I have a few other concerns:

  1. We have no test coverage for a lot of the queries here
  2. I think we need to more clearly distinguish the database record update and workflow update concepts in our comments
  3. There's also just some other nits I have

}

if _, err := tx.ReplaceIntoUpdateInfoMaps(ctx, rows); err != nil {
return serviceerror.NewUnavailable(fmt.Sprintf("Failed to update update records. Failed to execute update query. Error: %v", err))
Contributor

Can you specify what updates are being updated to prevent stuttering here?

Contributor Author

I think it's important to maintain the format shared with all the other map types (activity info, signal info, &c) so that when this line is eventually grepped out of a log it matches the pattern.

-const Version = "1.9"
+const Version = "1.10"
Contributor

This is the schema version, not the Postgres version, right?

Contributor Author

Schema version, correct.

@@ -1105,6 +1192,18 @@ func resetSignalInfoMap(
return sMap, encoding, nil
}

func resetBlobMap[K comparable](
Contributor

Is this a reset function? It looks more like it's just mapping all the values to the .Data field

Contributor Author

Named this way because it serves the same purpose as resetTimerInfoMap, resetSignalInfoMap, and resetActivityInfoMap that already exist. But this one is generic over the map key type.

out := make(map[K][]byte, len(m))
var encoding enumspb.EncodingType
for k, blob := range m {
encoding = blob.EncodingType
Contributor

I'm not sure this part belongs in this function. It's probably up to the caller to interpret whether values with different encoding types are acceptable

Contributor Author

Please see the 5 other functions that this one is a clone of. This is how it's done and we want the same behavior.
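
For readers without the diff open, the shape of the helper under discussion can be approximated from the fragments quoted above. The following is a self-contained sketch, not the merged code: `EncodingType` and `DataBlob` are local stand-ins for the protobuf types, and the two-value return is an assumption (the sibling `resetSignalInfoMap` shown in the diff context also returns an error).

```go
package main

// EncodingType and DataBlob are local stand-ins for the protobuf types
// used by the real code (enumspb.EncodingType and a data blob message).
type EncodingType int

const (
	EncodingUnspecified EncodingType = iota
	EncodingProto3
)

type DataBlob struct {
	EncodingType EncodingType
	Data         []byte
}

// resetBlobMap maps every blob to its Data field and reports the encoding
// seen while iterating. It assumes all blobs share one encoding; with mixed
// encodings the reported value depends on map iteration order.
func resetBlobMap[K comparable](m map[K]*DataBlob) (map[K][]byte, EncodingType) {
	out := make(map[K][]byte, len(m))
	var encoding EncodingType
	for k, blob := range m {
		encoding = blob.EncodingType
		out[k] = blob.Data
	}
	return out, encoding
}
```

The reviewer's point about mixed encodings is visible here: the function silently keeps whichever encoding it saw last, so uniformity is a precondition the caller has to guarantee.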

@@ -933,6 +952,7 @@ func (m *executionManagerImpl) toWorkflowMutableState(internState *InternalWorkf
SignalRequestedIds: internState.SignalRequestedIDs,
NextEventId: internState.NextEventID,
BufferedEvents: make([]*historypb.HistoryEvent, len(internState.BufferedEvents)),
Contributor

I don't think there's a point in pre-allocating this because we just assign a different slice to it

Contributor Author

Okay but this isn't new? After previous comments I've been keeping the changeset to the minimum.

Contributor

This was meant to contrast with the one below. Not an actionable comment

common/persistence/cassandra/util.go (resolved)
common/persistence/sql/execution_state_map.go (resolved)
@mmcshane
Contributor Author

mmcshane commented May 3, 2023

> The behavior of the code itself looks good to me, but I have a few other concerns:

I want to point out that this is in large part a copy/paste exercise and the difficulty is in just finding the right places to make changes. I've tried to address your comments where possible but in many cases the less-than-ideal code is preferred as it conforms to the pre-existing expectation.

Contributor

@MichaelSnowden MichaelSnowden left a comment

I can understand the desire to maintain conventions with some of these changes, but it would be better to do some judicious preparatory refactoring (fixing the previous issues in an upstream PR), so that we don't perpetuate mistakes.

I also don't buy the argument of continuing to string-substitute hollow comments in order to follow suit. Since there's no risk of behavioral effects, we should add informative documentation, regardless of how heterogeneous it makes the code, since it's likely that the original author, you, will have the best understanding for a long time.

If we add better documentation to the schemas and protos, then this change looks good to me.

Comment on lines +197 to +200
shard_id INT NOT NULL,
namespace_id BINARY(16) NOT NULL,
workflow_id VARCHAR(255) NOT NULL,
run_id BINARY(16) NOT NULL,
Contributor

It looks like there are tabs here, while spaces are used elsewhere in this file

@mmcshane mmcshane merged commit 5783e78 into temporalio:master May 3, 2023
10 checks passed
@mmcshane mmcshane deleted the mpm/update-durability branch May 3, 2023 16:24
samanbarghi pushed a commit to samanbarghi/temporal that referenced this pull request May 5, 2023
Add UpdateInfo records to MutableState

A mechanism for storing pointers into the workflow history representing
update accepted or completed events. Storage added for Cassandra,
PostgreSQL, MySQL, and SQLite.
wxing1292 added a commit that referenced this pull request May 11, 2023
mmcshane pushed a commit that referenced this pull request May 31, 2023
* Revert "Capture UpdateInfos in MutableState mutation (#4372)"

This reverts commit c719eac.

* Revert "Adds update state indexing to MutableState (#4224)"

This reverts commit 5783e78.

* Add UpdateInfo map to WorkflowExecutionInfo

This inlines the UpdateInfo index into the pre-existing execution blob
so that downstream persistence implementations won't see any change.
4 participants