-
Notifications
You must be signed in to change notification settings - Fork 800
/
dataStoreInterfaces.go
1017 lines (911 loc) · 40.1 KB
/
dataStoreInterfaces.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright (c) 2017-2020 Uber Technologies, Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package persistence
import (
"context"
"fmt"
"time"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/checksum"
"github.com/uber/cadence/common/types"
)
type (
//////////////////////////////////////////////////////////////////////
// Persistence interface is a lower layer of dataInterface.
// The intention is to let different persistence implementations (SQL, Cassandra, etc.) share some common logic.
// Right now the only common part is serialization/deserialization, and only ExecutionManager/HistoryManager need it.
// TaskManager are the same.
//////////////////////////////////////////////////////////////////////
// ShardStore is the lower level of ShardManager.
// It embeds Closeable and exposes create/get/update operations for shards,
// each of which takes an Internal* request type.
ShardStore interface {
Closeable
// GetName returns a string name for this store.
// NOTE(review): presumably identifies the backing implementation — confirm against implementers.
GetName() string
CreateShard(ctx context.Context, request *InternalCreateShardRequest) error
GetShard(ctx context.Context, request *InternalGetShardRequest) (*InternalGetShardResponse, error)
UpdateShard(ctx context.Context, request *InternalUpdateShardRequest) error
}
// TaskStore is a lower level of TaskManager.
// Most methods reuse the manager-level request/response types directly; only
// CreateTasks (request) and GetTasks (response) use Internal* types.
TaskStore interface {
Closeable
// GetName returns a string name for this store.
GetName() string
// Task list related methods
LeaseTaskList(ctx context.Context, request *LeaseTaskListRequest) (*LeaseTaskListResponse, error)
UpdateTaskList(ctx context.Context, request *UpdateTaskListRequest) (*UpdateTaskListResponse, error)
ListTaskList(ctx context.Context, request *ListTaskListRequest) (*ListTaskListResponse, error)
DeleteTaskList(ctx context.Context, request *DeleteTaskListRequest) error
GetTaskListSize(ctx context.Context, request *GetTaskListSizeRequest) (*GetTaskListSizeResponse, error)
// Task related methods
CreateTasks(ctx context.Context, request *InternalCreateTasksRequest) (*CreateTasksResponse, error)
GetTasks(ctx context.Context, request *GetTasksRequest) (*InternalGetTasksResponse, error)
CompleteTask(ctx context.Context, request *CompleteTaskRequest) error
// CompleteTasksLessThan completes tasks less than or equal to the given task id.
// This API takes a limit parameter which specifies the maximum number of rows
// that can be deleted. The underlying storage may ignore this parameter, but
// it is mandatory to specify it. If the underlying storage doesn't support
// "limit", all rows less than or equal to taskID will be deleted.
// On success, this method returns:
// - number of rows actually deleted, if limit is honored
// - UnknownNumRowsDeleted, when all rows below value are deleted
CompleteTasksLessThan(ctx context.Context, request *CompleteTasksLessThanRequest) (*CompleteTasksLessThanResponse, error)
// GetOrphanTasks returns tasks that exist as records in the database but are part of task lists which
// _do not_ exist in the database. They are therefore unreachable and no longer represent valid items
// that can be legitimately acted upon.
GetOrphanTasks(ctx context.Context, request *GetOrphanTasksRequest) (*GetOrphanTasksResponse, error)
}
// DomainStore is a lower level of DomainManager.
// Create, Get, Update and List use Internal* request or response types; the
// delete methods and GetMetadata use the manager-level types.
DomainStore interface {
Closeable
// GetName returns a string name for this store.
GetName() string
CreateDomain(ctx context.Context, request *InternalCreateDomainRequest) (*CreateDomainResponse, error)
GetDomain(ctx context.Context, request *GetDomainRequest) (*InternalGetDomainResponse, error)
UpdateDomain(ctx context.Context, request *InternalUpdateDomainRequest) error
// DeleteDomain and DeleteDomainByName take different request types.
// NOTE(review): presumably one deletes by ID and the other by name — confirm against the request definitions.
DeleteDomain(ctx context.Context, request *DeleteDomainRequest) error
DeleteDomainByName(ctx context.Context, request *DeleteDomainByNameRequest) error
ListDomains(ctx context.Context, request *ListDomainsRequest) (*InternalListDomainsResponse, error)
// GetMetadata is the only method here that takes no request object.
GetMetadata(ctx context.Context) (*GetMetadataResponse, error)
}
// ExecutionStore is used to manage workflow executions for the Persistence layer.
// Besides workflow execution records it also exposes the transfer, cross-cluster,
// replication (including DLQ) and timer task methods, plus scan methods.
ExecutionStore interface {
Closeable
// GetName returns a string name for this store.
GetName() string
// GetShardID returns an int shard ID.
// NOTE(review): presumably the shard this store instance is bound to — confirm against implementers.
GetShardID() int
// The below three APIs are related to serialization/deserialization
GetWorkflowExecution(ctx context.Context, request *InternalGetWorkflowExecutionRequest) (*InternalGetWorkflowExecutionResponse, error)
UpdateWorkflowExecution(ctx context.Context, request *InternalUpdateWorkflowExecutionRequest) error
ConflictResolveWorkflowExecution(ctx context.Context, request *InternalConflictResolveWorkflowExecutionRequest) error
// Remaining workflow execution methods
CreateWorkflowExecution(ctx context.Context, request *InternalCreateWorkflowExecutionRequest) (*CreateWorkflowExecutionResponse, error)
DeleteWorkflowExecution(ctx context.Context, request *DeleteWorkflowExecutionRequest) error
DeleteCurrentWorkflowExecution(ctx context.Context, request *DeleteCurrentWorkflowExecutionRequest) error
GetCurrentExecution(ctx context.Context, request *GetCurrentExecutionRequest) (*GetCurrentExecutionResponse, error)
IsWorkflowExecutionExists(ctx context.Context, request *IsWorkflowExecutionExistsRequest) (*IsWorkflowExecutionExistsResponse, error)
// Transfer task related methods
GetTransferTasks(ctx context.Context, request *GetTransferTasksRequest) (*GetTransferTasksResponse, error)
CompleteTransferTask(ctx context.Context, request *CompleteTransferTaskRequest) error
RangeCompleteTransferTask(ctx context.Context, request *RangeCompleteTransferTaskRequest) (*RangeCompleteTransferTaskResponse, error)
// Cross-cluster task related methods
GetCrossClusterTasks(ctx context.Context, request *GetCrossClusterTasksRequest) (*GetCrossClusterTasksResponse, error)
CompleteCrossClusterTask(ctx context.Context, request *CompleteCrossClusterTaskRequest) error
RangeCompleteCrossClusterTask(ctx context.Context, request *RangeCompleteCrossClusterTaskRequest) (*RangeCompleteCrossClusterTaskResponse, error)
// Replication task related methods
GetReplicationTasks(ctx context.Context, request *GetReplicationTasksRequest) (*InternalGetReplicationTasksResponse, error)
CompleteReplicationTask(ctx context.Context, request *CompleteReplicationTaskRequest) error
RangeCompleteReplicationTask(ctx context.Context, request *RangeCompleteReplicationTaskRequest) (*RangeCompleteReplicationTaskResponse, error)
// Replication DLQ related methods
PutReplicationTaskToDLQ(ctx context.Context, request *InternalPutReplicationTaskToDLQRequest) error
GetReplicationTasksFromDLQ(ctx context.Context, request *GetReplicationTasksFromDLQRequest) (*InternalGetReplicationTasksFromDLQResponse, error)
GetReplicationDLQSize(ctx context.Context, request *GetReplicationDLQSizeRequest) (*GetReplicationDLQSizeResponse, error)
DeleteReplicationTaskFromDLQ(ctx context.Context, request *DeleteReplicationTaskFromDLQRequest) error
RangeDeleteReplicationTaskFromDLQ(ctx context.Context, request *RangeDeleteReplicationTaskFromDLQRequest) (*RangeDeleteReplicationTaskFromDLQResponse, error)
CreateFailoverMarkerTasks(ctx context.Context, request *CreateFailoverMarkersRequest) error
// Timer related methods.
GetTimerIndexTasks(ctx context.Context, request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error)
CompleteTimerTask(ctx context.Context, request *CompleteTimerTaskRequest) error
RangeCompleteTimerTask(ctx context.Context, request *RangeCompleteTimerTaskRequest) (*RangeCompleteTimerTaskResponse, error)
// Scan related methods
ListConcreteExecutions(ctx context.Context, request *ListConcreteExecutionsRequest) (*InternalListConcreteExecutionsResponse, error)
ListCurrentExecutions(ctx context.Context, request *ListCurrentExecutionsRequest) (*ListCurrentExecutionsResponse, error)
}
// HistoryStore manages workflow history events.
HistoryStore interface {
Closeable
// GetName returns a string name for this store.
GetName() string
// The below are history V2 APIs
// V2 regards history events growing as a tree, decoupled from workflow concepts
// AppendHistoryNodes adds (or overrides) a node on a history branch
AppendHistoryNodes(ctx context.Context, request *InternalAppendHistoryNodesRequest) error
// ReadHistoryBranch returns history node data for a branch
ReadHistoryBranch(ctx context.Context, request *InternalReadHistoryBranchRequest) (*InternalReadHistoryBranchResponse, error)
// ForkHistoryBranch forks a new branch from an old branch
ForkHistoryBranch(ctx context.Context, request *InternalForkHistoryBranchRequest) (*InternalForkHistoryBranchResponse, error)
// DeleteHistoryBranch removes a branch
DeleteHistoryBranch(ctx context.Context, request *InternalDeleteHistoryBranchRequest) error
// GetHistoryTree returns all branch information of a tree
GetHistoryTree(ctx context.Context, request *InternalGetHistoryTreeRequest) (*InternalGetHistoryTreeResponse, error)
// GetAllHistoryTreeBranches returns all branches of all trees
GetAllHistoryTreeBranches(ctx context.Context, request *GetAllHistoryTreeBranchesRequest) (*GetAllHistoryTreeBranchesResponse, error)
}
// VisibilityStore is the store interface for visibility.
// It groups methods that record execution state changes, list/get/scan/count
// executions, and delete visibility records. Every list-style method returns
// an InternalListWorkflowExecutionsResponse.
VisibilityStore interface {
Closeable
// GetName returns a string name for this store.
GetName() string
// Record/upsert methods
RecordWorkflowExecutionStarted(ctx context.Context, request *InternalRecordWorkflowExecutionStartedRequest) error
RecordWorkflowExecutionClosed(ctx context.Context, request *InternalRecordWorkflowExecutionClosedRequest) error
RecordWorkflowExecutionUninitialized(ctx context.Context, request *InternalRecordWorkflowExecutionUninitializedRequest) error
UpsertWorkflowExecution(ctx context.Context, request *InternalUpsertWorkflowExecutionRequest) error
// List methods; the By* variants take a request that embeds InternalListWorkflowExecutionsRequest
ListOpenWorkflowExecutions(ctx context.Context, request *InternalListWorkflowExecutionsRequest) (*InternalListWorkflowExecutionsResponse, error)
ListClosedWorkflowExecutions(ctx context.Context, request *InternalListWorkflowExecutionsRequest) (*InternalListWorkflowExecutionsResponse, error)
ListOpenWorkflowExecutionsByType(ctx context.Context, request *InternalListWorkflowExecutionsByTypeRequest) (*InternalListWorkflowExecutionsResponse, error)
ListClosedWorkflowExecutionsByType(ctx context.Context, request *InternalListWorkflowExecutionsByTypeRequest) (*InternalListWorkflowExecutionsResponse, error)
ListOpenWorkflowExecutionsByWorkflowID(ctx context.Context, request *InternalListWorkflowExecutionsByWorkflowIDRequest) (*InternalListWorkflowExecutionsResponse, error)
ListClosedWorkflowExecutionsByWorkflowID(ctx context.Context, request *InternalListWorkflowExecutionsByWorkflowIDRequest) (*InternalListWorkflowExecutionsResponse, error)
ListClosedWorkflowExecutionsByStatus(ctx context.Context, request *InternalListClosedWorkflowExecutionsByStatusRequest) (*InternalListWorkflowExecutionsResponse, error)
GetClosedWorkflowExecution(ctx context.Context, request *InternalGetClosedWorkflowExecutionRequest) (*InternalGetClosedWorkflowExecutionResponse, error)
DeleteWorkflowExecution(ctx context.Context, request *VisibilityDeleteWorkflowExecutionRequest) error
// ListWorkflowExecutions and ScanWorkflowExecutions share the same request and response types.
// NOTE(review): the difference between list and scan is not visible here — confirm against implementers.
ListWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsByQueryRequest) (*InternalListWorkflowExecutionsResponse, error)
ScanWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsByQueryRequest) (*InternalListWorkflowExecutionsResponse, error)
CountWorkflowExecutions(ctx context.Context, request *CountWorkflowExecutionsRequest) (*CountWorkflowExecutionsResponse, error)
// DeleteUninitializedWorkflowExecution takes the same request type as DeleteWorkflowExecution.
DeleteUninitializedWorkflowExecution(ctx context.Context, request *VisibilityDeleteWorkflowExecutionRequest) error
}
// ConfigStore is the store interface for fetching and updating config entries.
ConfigStore interface {
Closeable
// FetchConfig returns the InternalConfigStoreEntry for the given ConfigType.
FetchConfig(ctx context.Context, configType ConfigType) (*InternalConfigStoreEntry, error)
// UpdateConfig takes the InternalConfigStoreEntry to write.
UpdateConfig(ctx context.Context, value *InternalConfigStoreEntry) error
}
// InternalConfigStoreEntry is the entry type read and written through ConfigStore.
InternalConfigStoreEntry struct {
// NOTE(review): RowType is a plain int while FetchConfig is keyed by ConfigType;
// presumably they correspond — confirm against the ConfigStore implementations.
RowType int
Version int64
Timestamp time.Time
// Values holds the entry payload as an encoded blob.
Values *DataBlob
}
// Queue is a store to enqueue and get messages.
// It has a parallel set of methods for the main queue and for its DLQ.
Queue interface {
Closeable
// Main queue methods
EnqueueMessage(ctx context.Context, messagePayload []byte) error
ReadMessages(ctx context.Context, lastMessageID int64, maxCount int) ([]*InternalQueueMessage, error)
// NOTE(review): whether messageID itself is deleted (inclusive vs exclusive) is not visible here — confirm.
DeleteMessagesBefore(ctx context.Context, messageID int64) error
UpdateAckLevel(ctx context.Context, messageID int64, clusterName string) error
// GetAckLevels returns a map of string to int64.
// NOTE(review): presumably cluster name -> acked message ID, mirroring UpdateAckLevel's parameters — confirm.
GetAckLevels(ctx context.Context) (map[string]int64, error)
// DLQ methods
EnqueueMessageToDLQ(ctx context.Context, messagePayload []byte) error
// ReadMessagesFromDLQ is paginated: it accepts a pageSize and pageToken and returns a []byte
// alongside the messages.
// NOTE(review): presumably the returned []byte is the next page token — confirm.
ReadMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64, pageSize int, pageToken []byte) ([]*InternalQueueMessage, []byte, error)
DeleteMessageFromDLQ(ctx context.Context, messageID int64) error
// NOTE(review): inclusivity of the firstMessageID/lastMessageID bounds is not visible here — confirm.
RangeDeleteMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64) error
UpdateDLQAckLevel(ctx context.Context, messageID int64, clusterName string) error
GetDLQAckLevels(ctx context.Context) (map[string]int64, error)
GetDLQSize(ctx context.Context) (int64, error)
}
// InternalQueueMessage is the message stored in the queue.
// Its fields carry JSON tags (message_id, queue_type, message_payload).
InternalQueueMessage struct {
ID int64 `json:"message_id"`
QueueType QueueType `json:"queue_type"`
Payload []byte `json:"message_payload"`
}
// DataBlob represents a blob for any binary data.
// It contains the raw data plus metadata (right now only the encoding) in a separate field.
// Note that it should only be used in the Persistence layer, below dataInterface and the application (historyEngine, etc.).
DataBlob struct {
// Encoding is the metadata describing how Data is encoded.
Encoding common.EncodingType
// Data is the raw bytes.
Data []byte
}
// InternalCreateWorkflowExecutionRequest is used to write a new workflow execution.
// It is the request type of ExecutionStore.CreateWorkflowExecution.
InternalCreateWorkflowExecutionRequest struct {
RangeID int64
Mode CreateWorkflowMode
// NOTE(review): PreviousRunID / PreviousLastWriteVersion presumably only matter for
// some values of Mode — confirm against the implementations.
PreviousRunID string
PreviousLastWriteVersion int64
// NewWorkflowSnapshot is held by value (not a pointer), so it is always present.
NewWorkflowSnapshot InternalWorkflowSnapshot
}
// InternalGetReplicationTasksResponse is the response to GetReplicationTasks.
InternalGetReplicationTasksResponse struct {
Tasks []*InternalReplicationTaskInfo
// NextPageToken is an opaque pagination token.
// NOTE(review): presumably empty when there are no more pages — confirm.
NextPageToken []byte
}
// InternalPutReplicationTaskToDLQRequest is used to put a replication task to the DLQ.
// It is the request type of ExecutionStore.PutReplicationTaskToDLQ.
InternalPutReplicationTaskToDLQRequest struct {
SourceClusterName string
TaskInfo *InternalReplicationTaskInfo
}
// InternalGetReplicationTasksFromDLQResponse is the response for GetReplicationTasksFromDLQ.
// It is a type alias of InternalGetReplicationTasksResponse, not a distinct type.
InternalGetReplicationTasksFromDLQResponse = InternalGetReplicationTasksResponse
// InternalReplicationTaskInfo describes the replication task created for replication of history events.
InternalReplicationTaskInfo struct {
// Identity of the workflow run the task belongs to.
DomainID string
WorkflowID string
RunID string
TaskID int64
TaskType int
// NOTE(review): presumably the event range is [FirstEventID, NextEventID) — confirm.
FirstEventID int64
NextEventID int64
Version int64
ScheduledID int64
BranchToken []byte
NewRunBranchToken []byte
CreationTime time.Time
}
// InternalWorkflowExecutionInfo describes a workflow execution for the Persistence Interface.
// Timeouts and intervals are carried as time.Duration and timestamps as time.Time;
// CompletionEvent and AutoResetPoints are carried as encoded DataBlobs.
InternalWorkflowExecutionInfo struct {
// Identity of this execution
DomainID string
WorkflowID string
RunID string
FirstExecutionRunID string
// Parent execution
ParentDomainID string
ParentWorkflowID string
ParentRunID string
InitiatedID int64
CompletionEventBatchID int64
CompletionEvent *DataBlob
TaskList string
WorkflowTypeName string
WorkflowTimeout time.Duration
DecisionStartToCloseTimeout time.Duration
ExecutionContext []byte
State int
CloseStatus int
LastFirstEventID int64
LastEventTaskID int64
NextEventID int64
LastProcessedEvent int64
StartTimestamp time.Time
LastUpdatedTimestamp time.Time
CreateRequestID string
SignalCount int32
// Decision fields
DecisionVersion int64
DecisionScheduleID int64
DecisionStartedID int64
DecisionRequestID string
DecisionTimeout time.Duration
DecisionAttempt int64
DecisionStartedTimestamp time.Time
DecisionScheduledTimestamp time.Time
DecisionOriginalScheduledTimestamp time.Time
CancelRequested bool
CancelRequestID string
// Sticky task list
StickyTaskList string
StickyScheduleToStartTimeout time.Duration
// Client information
ClientLibraryVersion string
ClientFeatureVersion string
ClientImpl string
AutoResetPoints *DataBlob
// for retry
Attempt int32
HasRetryPolicy bool
InitialInterval time.Duration
BackoffCoefficient float64
MaximumInterval time.Duration
ExpirationTime time.Time
MaximumAttempts int32
NonRetriableErrors []string
BranchToken []byte
CronSchedule string
ExpirationInterval time.Duration
// Memo and SearchAttributes map a key to raw bytes.
Memo map[string][]byte
SearchAttributes map[string][]byte
PartitionConfig map[string]string
// attributes which are not related to mutable state at all
HistorySize int64
IsCron bool
}
// InternalWorkflowMutableState indicates workflow related state for the Persistence Interface.
// It is the payload of InternalGetWorkflowExecutionResponse.
InternalWorkflowMutableState struct {
ExecutionInfo *InternalWorkflowExecutionInfo
VersionHistories *DataBlob
ReplicationState *ReplicationState // TODO: remove this after all 2DC workflows complete
// Pending items, held in maps.
// NOTE(review): the map keys are presumably the corresponding schedule/initiated IDs
// (int64) or timer IDs (string) — confirm against the code that populates them.
ActivityInfos map[int64]*InternalActivityInfo
TimerInfos map[string]*TimerInfo
ChildExecutionInfos map[int64]*InternalChildExecutionInfo
RequestCancelInfos map[int64]*RequestCancelInfo
SignalInfos map[int64]*SignalInfo
// SignalRequestedIDs is a set (map with empty-struct values).
SignalRequestedIDs map[string]struct{}
BufferedEvents []*DataBlob
// Checksum field is used by Cassandra storage
// ChecksumData is used by all SQL storage
Checksum checksum.Checksum
ChecksumData *DataBlob
}
// InternalActivityInfo holds activity details for the Persistence Interface.
// The scheduled and started events are carried as encoded DataBlobs.
InternalActivityInfo struct {
Version int64
ScheduleID int64
ScheduledEventBatchID int64
ScheduledEvent *DataBlob
ScheduledTime time.Time
StartedID int64
StartedEvent *DataBlob
StartedTime time.Time
ActivityID string
RequestID string
Details []byte
// Activity timeouts
ScheduleToStartTimeout time.Duration
ScheduleToCloseTimeout time.Duration
StartToCloseTimeout time.Duration
HeartbeatTimeout time.Duration
CancelRequested bool
// CancelRequestID is an int64 here, unlike the string CancelRequestID on InternalWorkflowExecutionInfo.
CancelRequestID int64
LastHeartBeatUpdatedTime time.Time
TimerTaskStatus int32
// For retry
Attempt int32
DomainID string
StartedIdentity string
TaskList string
HasRetryPolicy bool
InitialInterval time.Duration
BackoffCoefficient float64
MaximumInterval time.Duration
ExpirationTime time.Time
MaximumAttempts int32
NonRetriableErrors []string
LastFailureReason string
LastWorkerIdentity string
LastFailureDetails []byte
// Not written to database - This is used only for deduping heartbeat timer creation
LastHeartbeatTimeoutVisibilityInSeconds int64
}
// InternalChildExecutionInfo has details for pending child executions for the Persistence Interface.
// The initiated and started events are carried as encoded DataBlobs.
InternalChildExecutionInfo struct {
Version int64
InitiatedID int64
InitiatedEventBatchID int64
InitiatedEvent *DataBlob
StartedID int64
StartedWorkflowID string
StartedRunID string
StartedEvent *DataBlob
CreateRequestID string
DomainID string
DomainNameDEPRECATED string // Deprecated: use the DomainID field instead.
WorkflowTypeName string
ParentClosePolicy types.ParentClosePolicy
}
// InternalUpdateWorkflowExecutionRequest is used to update a workflow execution for the Persistence Interface.
// It is the request type of ExecutionStore.UpdateWorkflowExecution.
InternalUpdateWorkflowExecutionRequest struct {
RangeID int64
Mode UpdateWorkflowMode
// UpdateWorkflowMutation is held by value, so it is always present.
UpdateWorkflowMutation InternalWorkflowMutation
// NewWorkflowSnapshot is a pointer and may therefore be nil.
// NOTE(review): presumably nil when the update does not create a new run — confirm.
NewWorkflowSnapshot *InternalWorkflowSnapshot
}
// InternalConflictResolveWorkflowExecutionRequest is used to reset workflow execution state for the Persistence Interface.
// It is the request type of ExecutionStore.ConflictResolveWorkflowExecution.
InternalConflictResolveWorkflowExecutionRequest struct {
RangeID int64
Mode ConflictResolveWorkflowMode
// workflow to be reset (held by value, always present)
ResetWorkflowSnapshot InternalWorkflowSnapshot
// maybe new workflow (pointer, may be nil)
NewWorkflowSnapshot *InternalWorkflowSnapshot
// current workflow (pointer, may be nil)
CurrentWorkflowMutation *InternalWorkflowMutation
}
// InternalWorkflowMutation is used as a generic workflow execution state mutation for the Persistence Interface.
// Unlike InternalWorkflowSnapshot, which carries full lists, it expresses changes
// to pending items as paired Upsert*/Delete* lists.
InternalWorkflowMutation struct {
ExecutionInfo *InternalWorkflowExecutionInfo
VersionHistories *DataBlob
StartVersion int64
LastWriteVersion int64
// Upsert*/Delete* pairs: items to write and identifiers of items to remove.
UpsertActivityInfos []*InternalActivityInfo
DeleteActivityInfos []int64
UpsertTimerInfos []*TimerInfo
DeleteTimerInfos []string
UpsertChildExecutionInfos []*InternalChildExecutionInfo
DeleteChildExecutionInfos []int64
UpsertRequestCancelInfos []*RequestCancelInfo
DeleteRequestCancelInfos []int64
UpsertSignalInfos []*SignalInfo
DeleteSignalInfos []int64
UpsertSignalRequestedIDs []string
DeleteSignalRequestedIDs []string
// Buffered events: a single blob to add, and a flag to clear.
NewBufferedEvents *DataBlob
ClearBufferedEvents bool
// Tasks carried with the mutation, one list per task category.
TransferTasks []Task
CrossClusterTasks []Task
TimerTasks []Task
ReplicationTasks []Task
// NOTE(review): Condition is presumably the expected value for a conditional write — confirm.
Condition int64
// NOTE(review): InternalWorkflowMutableState documents Checksum as used by Cassandra
// and ChecksumData by SQL; presumably the same applies here — confirm.
Checksum checksum.Checksum
ChecksumData *DataBlob
}
// InternalWorkflowSnapshot is used as a generic workflow execution state snapshot for the Persistence Interface.
// It carries the complete lists of pending items, in contrast to the
// Upsert*/Delete* pairs on InternalWorkflowMutation.
InternalWorkflowSnapshot struct {
ExecutionInfo *InternalWorkflowExecutionInfo
VersionHistories *DataBlob
StartVersion int64
LastWriteVersion int64
// Pending items
ActivityInfos []*InternalActivityInfo
TimerInfos []*TimerInfo
ChildExecutionInfos []*InternalChildExecutionInfo
RequestCancelInfos []*RequestCancelInfo
SignalInfos []*SignalInfo
SignalRequestedIDs []string
// Tasks carried with the snapshot, one list per task category.
TransferTasks []Task
CrossClusterTasks []Task
TimerTasks []Task
ReplicationTasks []Task
// NOTE(review): Condition is presumably the expected value for a conditional write — confirm.
Condition int64
// NOTE(review): InternalWorkflowMutableState documents Checksum as used by Cassandra
// and ChecksumData by SQL; presumably the same applies here — confirm.
Checksum checksum.Checksum
ChecksumData *DataBlob
}
// InternalAppendHistoryEventsRequest is used to append new events to workflow execution history for the Persistence Interface.
// NOTE(review): none of the store interfaces visible in this part of the file has a
// method that takes this request (HistoryStore only lists the V2 node/branch APIs), and
// it still uses the workflow (.gen/go/shared) WorkflowExecution type rather than
// types.WorkflowExecution — confirm whether it is still in use.
InternalAppendHistoryEventsRequest struct {
DomainID string
Execution workflow.WorkflowExecution
FirstEventID int64
EventBatchVersion int64
RangeID int64
TransactionID int64
// Events is the encoded batch of events to append.
Events *DataBlob
Overwrite bool
}
// InternalAppendHistoryNodesRequest is used to append a batch of history nodes.
// It is the request type of HistoryStore.AppendHistoryNodes.
InternalAppendHistoryNodesRequest struct {
// True if it is the first append request to the branch
IsNewBranch bool
// The info for cleaning up data in the background
Info string
// The branch to be appended
BranchInfo types.HistoryBranch
// The first eventID becomes the nodeID to be appended
NodeID int64
// The events to be appended
Events *DataBlob
// Requested TransactionID for conditional update
TransactionID int64
// Used in sharded data stores to identify which shard to use
ShardID int
}
// InternalGetWorkflowExecutionRequest is used to retrieve the info of a workflow execution.
// It is the request type of ExecutionStore.GetWorkflowExecution.
InternalGetWorkflowExecutionRequest struct {
DomainID string
Execution types.WorkflowExecution
}
// InternalGetWorkflowExecutionResponse is the response to GetWorkflowExecution for the Persistence Interface.
InternalGetWorkflowExecutionResponse struct {
// State is the mutable state of the requested workflow execution.
State *InternalWorkflowMutableState
}
// InternalListConcreteExecutionsResponse is the response to ListConcreteExecutions for the Persistence Interface.
InternalListConcreteExecutionsResponse struct {
Executions []*InternalListConcreteExecutionsEntity
// NextPageToken is an opaque pagination token.
// NOTE(review): presumably empty when there are no more pages — confirm.
NextPageToken []byte
}
// InternalListConcreteExecutionsEntity is a single entity in InternalListConcreteExecutionsResponse.
InternalListConcreteExecutionsEntity struct {
ExecutionInfo *InternalWorkflowExecutionInfo
// VersionHistories is carried as an encoded blob.
VersionHistories *DataBlob
}
// InternalForkHistoryBranchRequest is used to fork a history branch.
// It is the request type of HistoryStore.ForkHistoryBranch.
InternalForkHistoryBranchRequest struct {
// The base branch to fork from
ForkBranchInfo types.HistoryBranch
// The nodeID to fork from: the new branch starts at this node (inclusive), the base branch stops at it (exclusive)
ForkNodeID int64
// branchID of the new branch
NewBranchID string
// the info for cleaning up data in the background
Info string
// Used in sharded data stores to identify which shard to use
ShardID int
}
// InternalForkHistoryBranchResponse is the response to ForkHistoryBranch.
InternalForkHistoryBranchResponse struct {
// branchInfo representing the new branch
NewBranchInfo types.HistoryBranch
}
// InternalDeleteHistoryBranchRequest is used to remove a history branch.
// It is the request type of HistoryStore.DeleteHistoryBranch.
InternalDeleteHistoryBranchRequest struct {
// branch to be deleted
BranchInfo types.HistoryBranch
// Used in sharded data stores to identify which shard to use
ShardID int
}
// InternalReadHistoryBranchRequest is used to read a history branch.
// It is the request type of HistoryStore.ReadHistoryBranch; the node range read
// is [MinNodeID, MaxNodeID).
InternalReadHistoryBranchRequest struct {
// The tree of branch range to be read
TreeID string
// The branch range to be read
BranchID string
// Get the history nodes from MinNodeID. Inclusive.
MinNodeID int64
// Get the history nodes up to MaxNodeID. Exclusive.
MaxNodeID int64
// Passed through for pagination
PageSize int
// Pagination token
NextPageToken []byte
// LastNodeID is the last known node ID attached to a history node
LastNodeID int64
// LastTransactionID is the last known transaction ID attached to a history node
LastTransactionID int64
// Used in sharded data stores to identify which shard to use
ShardID int
}
// InternalCompleteForkBranchRequest is used to update some tree/branch metadata for forking.
// NOTE(review): no method on the HistoryStore interface visible in this file takes this
// request, and it uses the workflow (.gen/go/shared) HistoryBranch type where the other
// history requests use types.HistoryBranch — confirm whether it is still in use.
InternalCompleteForkBranchRequest struct {
// branch to be updated
BranchInfo workflow.HistoryBranch
// whether fork is successful
Success bool
// Used in sharded data stores to identify which shard to use
ShardID int
}
// InternalReadHistoryBranchResponse is the response to ReadHistoryBranch.
InternalReadHistoryBranchResponse struct {
// History events, each carried as an encoded blob
History []*DataBlob
// Pagination token
NextPageToken []byte
// LastNodeID is the last known node ID attached to a history node
LastNodeID int64
// LastTransactionID is the last known transaction ID attached to a history node
LastTransactionID int64
}
// InternalGetHistoryTreeRequest is used to get a history tree.
// It is the request type of HistoryStore.GetHistoryTree.
InternalGetHistoryTreeRequest struct {
// A UUID of a tree
TreeID string
// Get data from this shard. Unlike the other history requests, ShardID is a pointer here and may be nil.
ShardID *int
// optional: can provide treeID via branchToken if treeID is empty
BranchToken []byte
}
// InternalGetHistoryTreeResponse is the response to GetHistoryTree.
InternalGetHistoryTreeResponse struct {
// all branches of a tree
Branches []*types.HistoryBranch
}
// InternalVisibilityWorkflowExecutionInfo is the visibility info used in internal responses.
// It is the element type of InternalListWorkflowExecutionsResponse.Executions and the
// payload of InternalGetClosedWorkflowExecutionResponse.
InternalVisibilityWorkflowExecutionInfo struct {
DomainID string
// NOTE(review): both WorkflowType and TypeName are declared as strings on this struct;
// which one stores populate (or whether both are) is not visible here — confirm.
WorkflowType string
WorkflowID string
RunID string
TypeName string
StartTime time.Time
ExecutionTime time.Time
CloseTime time.Time
// Status is a pointer and may be nil.
// NOTE(review): presumably nil for executions that have not closed — confirm.
Status *types.WorkflowExecutionCloseStatus
HistoryLength int64
Memo *DataBlob
TaskList string
IsCron bool
NumClusters int16
UpdateTime time.Time
// SearchAttributes values are untyped here, unlike the map[string][]byte used by the record/upsert requests.
SearchAttributes map[string]interface{}
ShardID int16
}
// InternalListWorkflowExecutionsResponse is the response from ListWorkflowExecutions.
// It is also the response type of the other list/scan methods on VisibilityStore.
InternalListWorkflowExecutionsResponse struct {
Executions []*InternalVisibilityWorkflowExecutionInfo
// Token to read next page if there are more workflow executions beyond page size.
// Use this to set NextPageToken on ListWorkflowExecutionsRequest to read the next page.
NextPageToken []byte
}
// InternalGetClosedWorkflowExecutionRequest is used to retrieve the record for a specific execution.
// It is the request type of VisibilityStore.GetClosedWorkflowExecution.
InternalGetClosedWorkflowExecutionRequest struct {
DomainUUID string
Domain string // domain name is not persisted, but used as config filter key
Execution types.WorkflowExecution
}
// InternalListClosedWorkflowExecutionsByStatusRequest is used to list executions that have a specific close status.
// It embeds InternalListWorkflowExecutionsRequest and adds the status to filter on.
InternalListClosedWorkflowExecutionsByStatusRequest struct {
InternalListWorkflowExecutionsRequest
Status types.WorkflowExecutionCloseStatus
}
// InternalListWorkflowExecutionsByWorkflowIDRequest is used to list executions that have a specific WorkflowID in a domain.
// It embeds InternalListWorkflowExecutionsRequest and adds the WorkflowID to filter on.
InternalListWorkflowExecutionsByWorkflowIDRequest struct {
InternalListWorkflowExecutionsRequest
WorkflowID string
}
// InternalListWorkflowExecutionsByTypeRequest is used to list executions of a specific type in a domain.
// It embeds InternalListWorkflowExecutionsRequest and adds the workflow type name to filter on.
InternalListWorkflowExecutionsByTypeRequest struct {
InternalListWorkflowExecutionsRequest
WorkflowTypeName string
}
// InternalGetClosedWorkflowExecutionResponse is the response from GetClosedWorkflowExecution.
InternalGetClosedWorkflowExecutionResponse struct {
Execution *InternalVisibilityWorkflowExecutionInfo
}
// InternalRecordWorkflowExecutionStartedRequest is the request to RecordWorkflowExecutionStarted.
// It has the same field set as InternalUpsertWorkflowExecutionRequest apart from
// the width of ShardID.
InternalRecordWorkflowExecutionStartedRequest struct {
DomainUUID string
WorkflowID string
RunID string
WorkflowTypeName string
StartTimestamp time.Time
ExecutionTimestamp time.Time
WorkflowTimeout time.Duration
TaskID int64
Memo *DataBlob
TaskList string
IsCron bool
NumClusters int16
UpdateTimestamp time.Time
// SearchAttributes values are raw bytes; their encoding is not visible in this file.
SearchAttributes map[string][]byte
// NOTE(review): ShardID is int16 here but int64 in
// InternalUpsertWorkflowExecutionRequest and
// InternalRecordWorkflowExecutionUninitializedRequest.
ShardID int16
}
// InternalRecordWorkflowExecutionClosedRequest is the request to RecordWorkflowExecutionClosed.
// Compared with InternalRecordWorkflowExecutionStartedRequest it has no
// WorkflowTimeout and adds the close-related fields CloseTimestamp, Status,
// HistoryLength and RetentionPeriod.
InternalRecordWorkflowExecutionClosedRequest struct {
DomainUUID string
WorkflowID string
RunID string
WorkflowTypeName string
StartTimestamp time.Time
ExecutionTimestamp time.Time
TaskID int64
Memo *DataBlob
TaskList string
// SearchAttributes values are raw bytes; their encoding is not visible in this file.
SearchAttributes map[string][]byte
CloseTimestamp time.Time
Status types.WorkflowExecutionCloseStatus
HistoryLength int64
RetentionPeriod time.Duration
IsCron bool
NumClusters int16
UpdateTimestamp time.Time
// NOTE(review): ShardID is int16 here but int64 in
// InternalUpsertWorkflowExecutionRequest and
// InternalRecordWorkflowExecutionUninitializedRequest.
ShardID int16
}
// InternalRecordWorkflowExecutionUninitializedRequest is used to add a record for an
// execution that is still uninitialized. It carries only the identifying fields,
// the workflow type name, an update timestamp and the shard ID.
InternalRecordWorkflowExecutionUninitializedRequest struct {
DomainUUID string
WorkflowID string
RunID string
WorkflowTypeName string
UpdateTimestamp time.Time
// NOTE(review): ShardID is int64 here but int16 in the Started/Closed record
// requests and in InternalVisibilityWorkflowExecutionInfo.
ShardID int64
}
// InternalUpsertWorkflowExecutionRequest is the request to UpsertWorkflowExecution.
// It has the same field set as InternalRecordWorkflowExecutionStartedRequest
// apart from the width of ShardID.
InternalUpsertWorkflowExecutionRequest struct {
DomainUUID string
WorkflowID string
RunID string
WorkflowTypeName string
StartTimestamp time.Time
ExecutionTimestamp time.Time
WorkflowTimeout time.Duration
TaskID int64
Memo *DataBlob
TaskList string
IsCron bool
NumClusters int16
UpdateTimestamp time.Time
// SearchAttributes values are raw bytes; their encoding is not visible in this file.
SearchAttributes map[string][]byte
// NOTE(review): ShardID is int64 here but int16 in the Started/Closed record
// requests and in InternalVisibilityWorkflowExecutionInfo.
ShardID int64
}
// InternalListWorkflowExecutionsRequest is used to list executions in a domain.
// It is also embedded by the ByStatus, ByWorkflowID and ByType list requests,
// which add one extra filter field each.
InternalListWorkflowExecutionsRequest struct {
DomainUUID string
Domain string // domain name is not persisted, but used as config filter key
// The earliest end of the time range
EarliestTime time.Time
// The latest end of the time range
LatestTime time.Time
// Maximum number of workflow executions per page
PageSize int
// Token to continue reading next page of workflow executions.
// Pass in empty slice for first page.
NextPageToken []byte
}
// InternalDomainConfig describes the domain configuration.
// BadBinaries, IsolationGroups and AsyncWorkflowsConfig are held as *DataBlob,
// i.e. as already-serialized payloads rather than typed structures.
InternalDomainConfig struct {
Retention time.Duration
EmitMetric bool // deprecated
ArchivalBucket string // deprecated
ArchivalStatus types.ArchivalStatus // deprecated
HistoryArchivalStatus types.ArchivalStatus
HistoryArchivalURI string
VisibilityArchivalStatus types.ArchivalStatus
VisibilityArchivalURI string
BadBinaries *DataBlob
IsolationGroups *DataBlob
AsyncWorkflowsConfig *DataBlob
}
// InternalCreateDomainRequest is used to create the domain.
// It bundles the domain info, its configuration and replication configuration
// with the initial config/failover versions and the last-updated time.
InternalCreateDomainRequest struct {
Info *DomainInfo
Config *InternalDomainConfig
ReplicationConfig *DomainReplicationConfig
IsGlobalDomain bool
ConfigVersion int64
FailoverVersion int64
LastUpdatedTime time.Time
}
// InternalGetDomainResponse is the response for GetDomain.
// It is also the element type of InternalListDomainsResponse.Domains.
InternalGetDomainResponse struct {
Info *DomainInfo
Config *InternalDomainConfig
ReplicationConfig *DomainReplicationConfig
IsGlobalDomain bool
ConfigVersion int64
FailoverVersion int64
FailoverNotificationVersion int64
PreviousFailoverVersion int64
// FailoverEndTime is a pointer and can therefore be nil; presumably nil means no
// failover end time is set — TODO confirm.
FailoverEndTime *time.Time
LastUpdatedTime time.Time
NotificationVersion int64
}
// InternalUpdateDomainRequest is used to update a domain.
// It carries the same fields as InternalGetDomainResponse except IsGlobalDomain,
// which is absent here.
InternalUpdateDomainRequest struct {
Info *DomainInfo
Config *InternalDomainConfig
ReplicationConfig *DomainReplicationConfig
ConfigVersion int64
FailoverVersion int64
FailoverNotificationVersion int64
PreviousFailoverVersion int64
// FailoverEndTime is a pointer and can therefore be nil; presumably nil means no
// failover end time is set — TODO confirm.
FailoverEndTime *time.Time
LastUpdatedTime time.Time
NotificationVersion int64
}
// InternalListDomainsResponse is the response for ListDomains.
// It holds one page of domains, each in the same shape GetDomain returns, and a
// token for fetching the next page.
InternalListDomainsResponse struct {
Domains []*InternalGetDomainResponse
NextPageToken []byte
}
// InternalShardInfo describes a shard.
// Every field carries a snake_case JSON tag, so the struct can be marshalled
// with encoding/json. The processing-queue states and pending failover markers
// are held as *DataBlob, i.e. as already-serialized payloads.
InternalShardInfo struct {
ShardID int `json:"shard_id"`
Owner string `json:"owner"`
RangeID int64 `json:"range_id"`
StolenSinceRenew int `json:"stolen_since_renew"`
UpdatedAt time.Time `json:"updated_at"`
ReplicationAckLevel int64 `json:"replication_ack_level"`
ReplicationDLQAckLevel map[string]int64 `json:"replication_dlq_ack_level"`
TransferAckLevel int64 `json:"transfer_ack_level"`
TimerAckLevel time.Time `json:"timer_ack_level"`
// The Cluster* maps are keyed by string; presumably the key is a cluster name —
// TODO confirm.
ClusterTransferAckLevel map[string]int64 `json:"cluster_transfer_ack_level"`
ClusterTimerAckLevel map[string]time.Time `json:"cluster_timer_ack_level"`
TransferProcessingQueueStates *DataBlob `json:"transfer_processing_queue_states"`
CrossClusterProcessingQueueStates *DataBlob `json:"cross_cluster_processing_queue_states"`
TimerProcessingQueueStates *DataBlob `json:"timer_processing_queue_states"`
ClusterReplicationLevel map[string]int64 `json:"cluster_replication_level"`
DomainNotificationVersion int64 `json:"domain_notification_version"`
PendingFailoverMarkers *DataBlob `json:"pending_failover_markers"`
}
// InternalCreateShardRequest is the request to CreateShard.
// ShardInfo is the shard record to create.
InternalCreateShardRequest struct {
ShardInfo *InternalShardInfo
}
// InternalGetShardRequest is used to get shard information.
// ShardID identifies the shard to read.
InternalGetShardRequest struct {
ShardID int
}
// InternalUpdateShardRequest is used to update shard information.
InternalUpdateShardRequest struct {
ShardInfo *InternalShardInfo
// NOTE(review): presumably the RangeID the caller expects to be currently stored,
// used to make the update conditional — confirm against the store implementations.
PreviousRangeID int64
}
// InternalGetShardResponse is the response to GetShard.
// ShardInfo is the shard record that was read.
InternalGetShardResponse struct {
ShardInfo *InternalShardInfo
}
// InternalTaskInfo describes a Task.
// It is the element type of InternalGetTasksResponse.Tasks and the Data payload
// of InternalCreateTasksInfo.
InternalTaskInfo struct {
DomainID string
WorkflowID string
RunID string
TaskID int64
ScheduleID int64
ScheduleToStartTimeout time.Duration
Expiry time.Time
CreatedTime time.Time
PartitionConfig map[string]string
}
// InternalCreateTasksInfo describes a task to be created in InternalCreateTasksRequest.
// NOTE(review): Execution and TaskID overlap with Data.WorkflowID/Data.RunID and
// Data.TaskID on InternalTaskInfo; confirm which copy the stores treat as
// authoritative.
InternalCreateTasksInfo struct {
Execution types.WorkflowExecution
Data *InternalTaskInfo
TaskID int64
}
// InternalCreateTasksRequest is the request to CreateTasks.
// It pairs a task list's info with the batch of tasks to create.
InternalCreateTasksRequest struct {
TaskListInfo *TaskListInfo
Tasks []*InternalCreateTasksInfo
}
// InternalGetTasksResponse is the response from GetTasks.
// Tasks holds the task records that were read.
InternalGetTasksResponse struct {
Tasks []*InternalTaskInfo
}
)
// NewDataBlob returns a new DataBlob wrapping data and its encoding type.
//
// It returns nil when data is empty. The data slice is stored as-is, not copied.
//
// It panics when data starts with the byte 'Y' while encodingType is anything
// other than "thriftrw".
// NOTE(review): presumably 'Y' is the leading byte of thriftrw-encoded payloads,
// which would make this a guard against mislabelled blobs — confirm against the
// encoder.
func NewDataBlob(data []byte, encodingType common.EncodingType) *DataBlob {
	if len(data) == 0 {
		return nil
	}
	if encodingType != "thriftrw" && data[0] == 'Y' {
		panic(fmt.Sprintf("Invalid encoding: \"%v\"", encodingType))
	}
	return &DataBlob{
		Data:     data,
		Encoding: encodingType,
	}
}
// FromDataBlob splits a DataBlob into its payload bytes and its encoding type
// rendered as a string. A nil blob, or one with no data, yields (nil, "").
func FromDataBlob(blob *DataBlob) ([]byte, string) {
	if blob != nil && len(blob.Data) > 0 {
		return blob.Data, string(blob.Encoding)
	}
	return nil, ""
}
// ToNilSafeDataBlob returns the receiver unchanged when it is non-nil, and a
// pointer to a fresh zero-value DataBlob otherwise, so that callers can access
// fields on the result without a nil check.
func (d *DataBlob) ToNilSafeDataBlob() *DataBlob {
	if d == nil {
		return &DataBlob{}
	}
	return d
}
// GetEncodingString returns the blob's encoding as a plain string.
// It is safe to call on a nil receiver, in which case it returns "".
func (d *DataBlob) GetEncodingString() string {
	if d != nil {
		return string(d.Encoding)
	}
	return ""
}
// GetData is a nil-safe accessor for the blob's bytes. When the receiver or its
// Data is nil it returns an empty, non-nil slice; otherwise it returns Data
// itself (not a copy).
func (d *DataBlob) GetData() []byte {
	if d != nil && d.Data != nil {
		return d.Data
	}
	return []byte{}
}
// GetEncoding returns the blob's encoding type when it is one of the recognised
// values (gob, JSON, thriftrw or empty) and common.EncodingTypeUnknown for
// anything else. It goes through GetEncodingString, so a nil receiver is
// handled as the empty string.
func (d *DataBlob) GetEncoding() common.EncodingType {
	switch enc := common.EncodingType(d.GetEncodingString()); enc {
	case common.EncodingTypeGob,
		common.EncodingTypeJSON,
		common.EncodingTypeThriftRW,
		common.EncodingTypeEmpty:
		// Recognised encodings are passed through unchanged.
		return enc
	default:
		return common.EncodingTypeUnknown
	}
}
// ToInternal converts the data blob to its internal types.DataBlob
// representation.
//
// A nil receiver yields nil, matching the nil tolerance of the other DataBlob
// helpers (GetData, GetEncodingString, ToNilSafeDataBlob). JSON and thriftrw
// encodings are mapped to their types counterparts; the Data slice is shared
// with the result, not copied.
//
// It panics for any other encoding.
func (d *DataBlob) ToInternal() *types.DataBlob {
	if d == nil {
		return nil
	}
	switch d.Encoding {
	case common.EncodingTypeJSON:
		return &types.DataBlob{
			EncodingType: types.EncodingTypeJSON.Ptr(),
			Data:         d.Data,
		}
	case common.EncodingTypeThriftRW:
		return &types.DataBlob{
			EncodingType: types.EncodingTypeThriftRW.Ptr(),
			Data:         d.Data,
		}
	default:
		panic(fmt.Sprintf("DataBlob seeing unsupported encoding type: %v", d.Encoding))
	}
}