Skip to content

Commit

Permalink
Update interfaces and structs for admin use (#2533)
Browse files Browse the repository at this point in the history
* Update interface and struct for admin use
  • Loading branch information
yux0 committed Feb 23, 2022
1 parent a5a91ff commit c479ea3
Show file tree
Hide file tree
Showing 12 changed files with 176 additions and 193 deletions.
68 changes: 34 additions & 34 deletions common/persistence/serialization/serializer.go
Expand Up @@ -282,12 +282,12 @@ func (e *DeserializationError) Error() string {
}

func (t *serializerImpl) ShardInfoToBlob(info *persistencespb.ShardInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) ShardInfoFromBlob(data *commonpb.DataBlob, clusterName string) (*persistencespb.ShardInfo, error) {
shardInfo := &persistencespb.ShardInfo{}
err := proto3DecodeBlob(data, shardInfo)
err := ProtoDecodeBlob(data, shardInfo)

if err != nil {
return nil, err
Expand Down Expand Up @@ -317,124 +317,124 @@ func (t *serializerImpl) ShardInfoFromBlob(data *commonpb.DataBlob, clusterName
}

func (t *serializerImpl) NamespaceDetailToBlob(info *persistencespb.NamespaceDetail, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) NamespaceDetailFromBlob(data *commonpb.DataBlob) (*persistencespb.NamespaceDetail, error) {
result := &persistencespb.NamespaceDetail{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) HistoryTreeInfoToBlob(info *persistencespb.HistoryTreeInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) HistoryTreeInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.HistoryTreeInfo, error) {
result := &persistencespb.HistoryTreeInfo{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) HistoryBranchToBlob(info *persistencespb.HistoryBranch, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) HistoryBranchFromBlob(data *commonpb.DataBlob) (*persistencespb.HistoryBranch, error) {
result := &persistencespb.HistoryBranch{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) WorkflowExecutionInfoToBlob(info *persistencespb.WorkflowExecutionInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) WorkflowExecutionInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.WorkflowExecutionInfo, error) {
result := &persistencespb.WorkflowExecutionInfo{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) WorkflowExecutionStateToBlob(info *persistencespb.WorkflowExecutionState, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) WorkflowExecutionStateFromBlob(data *commonpb.DataBlob) (*persistencespb.WorkflowExecutionState, error) {
result := &persistencespb.WorkflowExecutionState{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) ActivityInfoToBlob(info *persistencespb.ActivityInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) ActivityInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.ActivityInfo, error) {
result := &persistencespb.ActivityInfo{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) ChildExecutionInfoToBlob(info *persistencespb.ChildExecutionInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) ChildExecutionInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.ChildExecutionInfo, error) {
result := &persistencespb.ChildExecutionInfo{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) SignalInfoToBlob(info *persistencespb.SignalInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) SignalInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.SignalInfo, error) {
result := &persistencespb.SignalInfo{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) RequestCancelInfoToBlob(info *persistencespb.RequestCancelInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) RequestCancelInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.RequestCancelInfo, error) {
result := &persistencespb.RequestCancelInfo{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) TimerInfoToBlob(info *persistencespb.TimerInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) TimerInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.TimerInfo, error) {
result := &persistencespb.TimerInfo{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) TaskInfoToBlob(info *persistencespb.AllocatedTaskInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) TaskInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.AllocatedTaskInfo, error) {
result := &persistencespb.AllocatedTaskInfo{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) TaskQueueInfoToBlob(info *persistencespb.TaskQueueInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
return ProtoEncodeBlob(info, encodingType)
}

func (t *serializerImpl) TaskQueueInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.TaskQueueInfo, error) {
result := &persistencespb.TaskQueueInfo{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func (t *serializerImpl) ChecksumToBlob(checksum *persistencespb.Checksum, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
// nil is replaced with empty object because it is not supported for "checksum" field in DB.
if checksum == nil {
checksum = &persistencespb.Checksum{}
}
return proto3EncodeBlob(checksum, encodingType)
return ProtoEncodeBlob(checksum, encodingType)
}

func (t *serializerImpl) ChecksumFromBlob(data *commonpb.DataBlob) (*persistencespb.Checksum, error) {
result := &persistencespb.Checksum{}
err := proto3DecodeBlob(data, result)
err := ProtoDecodeBlob(data, result)
if err != nil || result.GetFlavor() == enumsspb.CHECKSUM_FLAVOR_UNSPECIFIED {
// If result is an empty struct (Flavor is unspecified), replace it with nil, because everywhere in the code checksum is pointer type.
return nil, err
Expand All @@ -452,15 +452,15 @@ func (t *serializerImpl) QueueMetadataFromBlob(data *commonpb.DataBlob) (*persis
}

func (t *serializerImpl) ReplicationTaskToBlob(replicationTask *replicationspb.ReplicationTask, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(replicationTask, encodingType)
return ProtoEncodeBlob(replicationTask, encodingType)
}

func (t *serializerImpl) ReplicationTaskFromBlob(data *commonpb.DataBlob) (*replicationspb.ReplicationTask, error) {
result := &replicationspb.ReplicationTask{}
return result, proto3DecodeBlob(data, result)
return result, ProtoDecodeBlob(data, result)
}

func proto3DecodeBlob(data *commonpb.DataBlob, result proto.Message) error {
func ProtoDecodeBlob(data *commonpb.DataBlob, result proto.Message) error {
if data == nil {
// TODO: should we return nil or error?
return NewDeserializationError("cannot decode nil")
Expand Down Expand Up @@ -490,7 +490,7 @@ func decodeBlob(data *commonpb.DataBlob, result proto.Message) error {
case enumspb.ENCODING_TYPE_JSON:
return codec.NewJSONPBEncoder().Decode(data.Data, result)
case enumspb.ENCODING_TYPE_PROTO3:
return proto3DecodeBlob(data, result)
return ProtoDecodeBlob(data, result)
default:
return NewUnknownEncodingTypeError(data.EncodingType)
}
Expand All @@ -515,13 +515,13 @@ func encodeBlob(o proto.Message, encoding enumspb.EncodingType) (*commonpb.DataB
EncodingType: enumspb.ENCODING_TYPE_JSON,
}, nil
case enumspb.ENCODING_TYPE_PROTO3:
return proto3EncodeBlob(o, enumspb.ENCODING_TYPE_PROTO3)
return ProtoEncodeBlob(o, enumspb.ENCODING_TYPE_PROTO3)
default:
return nil, NewUnknownEncodingTypeError(encoding)
}
}

func proto3EncodeBlob(m proto.Message, encoding enumspb.EncodingType) (*commonpb.DataBlob, error) {
func ProtoEncodeBlob(m proto.Message, encoding enumspb.EncodingType) (*commonpb.DataBlob, error) {
if encoding != enumspb.ENCODING_TYPE_PROTO3 {
return nil, NewUnknownEncodingTypeError(encoding)
}
Expand Down
4 changes: 2 additions & 2 deletions service/history/historyEngineInterfaces.go
Expand Up @@ -92,8 +92,8 @@ type (
getFinishedChan() <-chan struct{}
readTimerTasks() ([]tasks.Task, tasks.Task, bool, error)
completeTimerTask(time.Time, int64)
getAckLevel() timerKey
getReadLevel() timerKey
getAckLevel() tasks.Key
getReadLevel() tasks.Key
updateAckLevel() error
}
)
8 changes: 4 additions & 4 deletions service/history/historyEngineInterfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions service/history/queueProcessor.go
Expand Up @@ -103,11 +103,11 @@ func newQueueProcessorBase(

var taskProcessor *taskProcessor
if !options.EnablePriorityTaskProcessor() {
taskProcessorOptions := taskProcessorOptions{
queueSize: options.BatchSize(),
workerCount: options.WorkerCount(),
taskProcessorOptions := TaskProcessorOptions{
QueueSize: options.BatchSize(),
WorkerCount: options.WorkerCount(),
}
taskProcessor = newTaskProcessor(taskProcessorOptions, shard, historyCache, logger)
taskProcessor = NewTaskProcessor(taskProcessorOptions, shard, historyCache, logger)
}

p := &queueProcessorBase{
Expand Down Expand Up @@ -141,7 +141,7 @@ func (p *queueProcessorBase) Start() {
defer p.logger.Info("", tag.LifeCycleStarted, tag.ComponentTransferQueue)

if p.taskProcessor != nil {
p.taskProcessor.start()
p.taskProcessor.Start()
}
p.shutdownWG.Add(1)
p.notifyNewTask()
Expand All @@ -164,7 +164,7 @@ func (p *queueProcessorBase) Stop() {
}

if p.taskProcessor != nil {
p.taskProcessor.stop()
p.taskProcessor.Stop()
}
}

Expand Down Expand Up @@ -280,7 +280,7 @@ func (p *queueProcessorBase) submitTask(
) bool {

return p.taskProcessor.addTask(
newTaskInfo(
NewTaskInfo(
p.processor,
taskInfo,
initializeLoggerForTask(p.shard.GetShardID(), taskInfo, p.logger),
Expand Down
22 changes: 11 additions & 11 deletions service/history/taskProcessor.go
Expand Up @@ -51,9 +51,9 @@ const (
)

type (
taskProcessorOptions struct {
queueSize int
workerCount int
TaskProcessorOptions struct {
QueueSize int
WorkerCount int
}

taskInfo struct {
Expand Down Expand Up @@ -92,7 +92,7 @@ type (
}
)

func newTaskInfo(
func NewTaskInfo(
processor taskExecutor,
task tasks.Task,
logger log.Logger,
Expand All @@ -109,36 +109,36 @@ func newTaskInfo(
}
}

func newTaskProcessor(
options taskProcessorOptions,
func NewTaskProcessor(
options TaskProcessorOptions,
shard shard.Context,
historyCache workflow.Cache,
logger log.Logger,
) *taskProcessor {

var workerNotificationChs []chan struct{}
for index := 0; index < options.workerCount; index++ {
for index := 0; index < options.WorkerCount; index++ {
workerNotificationChs = append(workerNotificationChs, make(chan struct{}, 1))
}

base := &taskProcessor{
shard: shard,
cache: historyCache,
shutdownCh: make(chan struct{}),
tasksCh: make(chan *taskInfo, options.queueSize),
tasksCh: make(chan *taskInfo, options.QueueSize),
config: shard.GetConfig(),
logger: logger,
metricsClient: shard.GetMetricsClient(),
timeSource: shard.GetTimeSource(),
workerNotificationChans: workerNotificationChs,
retryPolicy: common.CreatePersistenceRetryPolicy(),
numOfWorker: options.workerCount,
numOfWorker: options.WorkerCount,
}

return base
}

func (t *taskProcessor) start() {
func (t *taskProcessor) Start() {
for i := 0; i < t.numOfWorker; i++ {
t.workerWG.Add(1)
notificationChan := t.workerNotificationChans[i]
Expand All @@ -147,7 +147,7 @@ func (t *taskProcessor) start() {
t.logger.Info("Task processor started.")
}

func (t *taskProcessor) stop() {
func (t *taskProcessor) Stop() {
close(t.shutdownCh)
if success := common.AwaitWaitGroup(&t.workerWG, time.Minute); !success {
t.logger.Warn("Task processor timed out on shutdown.")
Expand Down

0 comments on commit c479ea3

Please sign in to comment.