Skip to content

Commit

Permalink
Remove unused functions in shard interface (#4255)
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 committed May 1, 2023
1 parent 2aa5481 commit 80687d7
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 378 deletions.
503 changes: 231 additions & 272 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

Expand Up @@ -54,7 +54,7 @@ message ShardInfo {
int32 stolen_since_renew = 6;
google.protobuf.Timestamp update_time = 7 [(gogoproto.stdtime) = true];
reserved 8;
int64 namespace_notification_version = 9;
reserved 9;
reserved 10;
reserved 11;
reserved 12;
Expand Down
3 changes: 0 additions & 3 deletions service/history/historyEngine.go
Expand Up @@ -324,9 +324,6 @@ func (e *historyEngineImpl) registerNamespaceStateChangeCallback() {
queueProcessor.FailoverNamespace(ns.ID().String())
}
}

// for backward compatibility
_ = e.shard.UpdateNamespaceNotificationVersion(ns.NotificationVersion() + 1)
})
}

Expand Down
22 changes: 8 additions & 14 deletions service/history/shard/context.go
Expand Up @@ -66,6 +66,14 @@ type (
GetMetricsHandler() metrics.Handler
GetTimeSource() clock.TimeSource

GetRemoteAdminClient(string) (adminservice.AdminServiceClient, error)
GetHistoryClient() historyservice.HistoryServiceClient
GetPayloadSerializer() serialization.Serializer

GetSearchAttributesProvider() searchattribute.Provider
GetSearchAttributesMapperProvider() searchattribute.MapperProvider
GetArchivalMetadata() archiver.ArchivalMetadata

GetEngine(ctx context.Context) (Engine, error)

AssertOwnership(ctx context.Context) error
Expand All @@ -86,17 +94,11 @@ type (

UpdateRemoteClusterInfo(cluster string, ackTaskID int64, ackTimestamp time.Time)

GetMaxTaskIDForCurrentRangeID() int64

SetCurrentTime(cluster string, currentTime time.Time)
GetCurrentTime(cluster string) time.Time
GetLastUpdatedTime() time.Time

GetReplicationStatus(cluster []string) (map[string]*historyservice.ShardReplicationStatusPerCluster, map[string]*historyservice.HandoverNamespaceInfo, error)

// TODO: deprecate UpdateNamespaceNotificationVersion in v1.21 and remove
// NamespaceNotificationVersion from shardInfo proto blob
UpdateNamespaceNotificationVersion(namespaceNotificationVersion int64) error
UpdateHandoverNamespace(ns *namespace.Namespace, deletedFromDb bool)

AppendHistoryEvents(ctx context.Context, request *persistence.AppendHistoryNodesRequest, namespaceID namespace.ID, execution commonpb.WorkflowExecution) (int, error)
Expand All @@ -112,14 +114,6 @@ type (
// If branchToken != nil, then delete history also, otherwise leave history.
DeleteWorkflowExecution(ctx context.Context, workflowKey definition.WorkflowKey, branchToken []byte, startTime *time.Time, closeTime *time.Time, closeExecutionVisibilityTaskID int64, stage *tasks.DeleteWorkflowExecutionStage) error

GetRemoteAdminClient(cluster string) (adminservice.AdminServiceClient, error)
GetHistoryClient() historyservice.HistoryServiceClient
GetPayloadSerializer() serialization.Serializer

GetSearchAttributesProvider() searchattribute.Provider
GetSearchAttributesMapperProvider() searchattribute.MapperProvider
GetArchivalMetadata() archiver.ArchivalMetadata

Unload()
}
)
41 changes: 10 additions & 31 deletions service/history/shard/context_impl.go
Expand Up @@ -228,13 +228,6 @@ func (s *ContextImpl) GetEngine(
return s.engineFuture.Get(ctx)
}

func (s *ContextImpl) GetMaxTaskIDForCurrentRangeID() int64 {
s.rLock()
defer s.rUnlock()
// maxTaskSequenceNumber is the exclusive upper bound of task ID for current range.
return s.maxTaskSequenceNumber - 1
}

func (s *ContextImpl) AssertOwnership(
ctx context.Context,
) error {
Expand Down Expand Up @@ -424,19 +417,6 @@ func (s *ContextImpl) UpdateReplicatorDLQAckLevel(
return nil
}

func (s *ContextImpl) UpdateNamespaceNotificationVersion(namespaceNotificationVersion int64) error {
s.wLock()
defer s.wUnlock()

// update namespace notification version.
if s.shardInfo.NamespaceNotificationVersion < namespaceNotificationVersion {
s.shardInfo.NamespaceNotificationVersion = namespaceNotificationVersion
return s.updateShardInfoLocked()
}

return nil
}

func (s *ContextImpl) UpdateHandoverNamespace(ns *namespace.Namespace, deletedFromDb bool) {
nsName := ns.Name()
// NOTE: replication state field won't be replicated and currently we only update a namespace
Expand Down Expand Up @@ -1230,7 +1210,7 @@ func (s *ContextImpl) GetCurrentTime(cluster string) time.Time {
return s.timeSource.Now().UTC()
}

func (s *ContextImpl) GetLastUpdatedTime() time.Time {
func (s *ContextImpl) getLastUpdatedTime() time.Time {
s.rLock()
defer s.rUnlock()
return s.lastUpdated
Expand Down Expand Up @@ -1297,7 +1277,7 @@ func (s *ContextImpl) handleWriteErrorAndUpdateMaxReadLevelLocked(err error, new
func (s *ContextImpl) maybeRecordShardAcquisitionLatency(ownershipChanged bool) {
if ownershipChanged {
s.GetMetricsHandler().Timer(metrics.ShardContextAcquisitionLatency.GetMetricName()).
Record(s.GetCurrentTime(s.GetClusterMetadata().GetCurrentClusterName()).Sub(s.GetLastUpdatedTime()),
Record(s.GetCurrentTime(s.GetClusterMetadata().GetCurrentClusterName()).Sub(s.getLastUpdatedTime()),
metrics.OperationTag(metrics.ShardInfoScope),
)
}
Expand Down Expand Up @@ -1896,15 +1876,14 @@ func copyShardInfo(shardInfo *persistencespb.ShardInfo) *persistencespb.ShardInf
}

return &persistencespb.ShardInfo{
ShardId: shardInfo.ShardId,
Owner: shardInfo.Owner,
RangeId: shardInfo.RangeId,
StolenSinceRenew: shardInfo.StolenSinceRenew,
NamespaceNotificationVersion: shardInfo.NamespaceNotificationVersion,
ReplicationDlqAckLevel: maps.Clone(shardInfo.ReplicationDlqAckLevel),
UpdateTime: shardInfo.UpdateTime,
QueueAckLevels: queueAckLevels,
QueueStates: queueStates,
ShardId: shardInfo.ShardId,
Owner: shardInfo.Owner,
RangeId: shardInfo.RangeId,
StolenSinceRenew: shardInfo.StolenSinceRenew,
ReplicationDlqAckLevel: maps.Clone(shardInfo.ReplicationDlqAckLevel),
UpdateTime: shardInfo.UpdateTime,
QueueAckLevels: queueAckLevels,
QueueStates: queueStates,
}
}

Expand Down
50 changes: 4 additions & 46 deletions service/history/shard/context_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 0 additions & 11 deletions service/history/timerQueueStandbyTaskExecutor.go
Expand Up @@ -420,17 +420,6 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowTimeoutTask(
)
}

func (t *timerQueueStandbyTaskExecutor) getStandbyClusterTime() time.Time {
// time of remote cluster in the shard is delayed by "StandbyClusterDelay"
// so to get the current accurate remote cluster time, need to add it back
currentTime := t.shard.GetCurrentTime(t.clusterName)
if t.clusterName != t.shard.GetClusterMetadata().GetCurrentClusterName() {
currentTime.Add(t.shard.GetConfig().StandbyClusterDelay())
}

return currentTime
}

func (t *timerQueueStandbyTaskExecutor) getTimerSequence(
mutableState workflow.MutableState,
) workflow.TimerSequence {
Expand Down

0 comments on commit 80687d7

Please sign in to comment.