diff --git a/api/persistence/v1/queues.pb.go b/api/persistence/v1/queues.pb.go index 80be8351257..d4812fb0cf0 100644 --- a/api/persistence/v1/queues.pb.go +++ b/api/persistence/v1/queues.pb.go @@ -37,6 +37,7 @@ import ( proto "github.com/gogo/protobuf/proto" github_com_gogo_protobuf_sortkeys "github.com/gogo/protobuf/sortkeys" + v1 "go.temporal.io/api/common/v1" ) // Reference imports to suppress errors if they are not otherwise used. @@ -289,6 +290,64 @@ func (m *ReadQueueMessagesNextPageToken) GetLastReadMessageId() int64 { return 0 } +// HistoryTask represents an internal history service task for a particular shard. We use a blob because there is no +// common proto for all task proto types. +type HistoryTask struct { + // shard_id that this task belonged to when it was created. Technically, you can derive this from the task data + // blob, but it's useful to have it here for quick access and to avoid deserializing the blob. + ShardId int32 `protobuf:"varint,1,opt,name=shard_id,json=shardId,proto3" json:"shard_id,omitempty"` + // blob that contains the history task proto. There is a GoLang-specific generic deserializer for this blob, but + // there is no common proto for all task proto types, so deserializing in other languages will require a custom + // switch on the task category, which should be available from the metadata for the queue that this task came from. + Blob *v1.DataBlob `protobuf:"bytes,2,opt,name=blob,proto3" json:"blob,omitempty"` +} + +func (m *HistoryTask) Reset() { *m = HistoryTask{} } +func (*HistoryTask) ProtoMessage() {} +func (*HistoryTask) Descriptor() ([]byte, []int) { + return fileDescriptor_b7fa5f143ac80378, []int{5} +} +func (m *HistoryTask) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *HistoryTask) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_HistoryTask.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *HistoryTask) XXX_Merge(src proto.Message) { + xxx_messageInfo_HistoryTask.Merge(m, src) +} +func (m *HistoryTask) XXX_Size() int { + return m.Size() +} +func (m *HistoryTask) XXX_DiscardUnknown() { + xxx_messageInfo_HistoryTask.DiscardUnknown(m) +} + +var xxx_messageInfo_HistoryTask proto.InternalMessageInfo + +func (m *HistoryTask) GetShardId() int32 { + if m != nil { + return m.ShardId + } + return 0 +} + +func (m *HistoryTask) GetBlob() *v1.DataBlob { + if m != nil { + return m.Blob + } + return nil +} + func init() { proto.RegisterType((*QueueState)(nil), "temporal.server.api.persistence.v1.QueueState") proto.RegisterMapType((map[int64]*QueueReaderState)(nil), "temporal.server.api.persistence.v1.QueueState.ReaderStatesEntry") @@ -296,6 +355,7 @@ func init() { proto.RegisterType((*QueueSliceScope)(nil), "temporal.server.api.persistence.v1.QueueSliceScope") proto.RegisterType((*QueueSliceRange)(nil), "temporal.server.api.persistence.v1.QueueSliceRange") proto.RegisterType((*ReadQueueMessagesNextPageToken)(nil), "temporal.server.api.persistence.v1.ReadQueueMessagesNextPageToken") + proto.RegisterType((*HistoryTask)(nil), "temporal.server.api.persistence.v1.HistoryTask") } func init() { @@ -303,39 +363,44 @@ func init() { } var fileDescriptor_b7fa5f143ac80378 = []byte{ - // 512 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x93, 0x4f, 0x6b, 0x13, 0x41, - 0x18, 0xc6, 0x77, 0x12, 0x5a, 0x70, 0x9a, 
0x62, 0x3b, 0x78, 0x08, 0x41, 0xc6, 0xb0, 0xa7, 0x80, - 0xb8, 0x4b, 0x9b, 0x1e, 0x44, 0x2f, 0x22, 0x08, 0xd6, 0x10, 0x49, 0xb7, 0x05, 0xc1, 0xcb, 0x32, - 0x26, 0x2f, 0x9b, 0x31, 0xfb, 0xcf, 0x99, 0xd9, 0x35, 0xb9, 0xf9, 0x11, 0xfc, 0x18, 0xfa, 0x01, - 0xfc, 0x0e, 0x1e, 0x73, 0xec, 0x49, 0xcc, 0xe6, 0xe2, 0xb1, 0x1f, 0x41, 0xf6, 0x6f, 0x62, 0x45, - 0xdc, 0xf6, 0xb6, 0xb3, 0x33, 0xcf, 0xef, 0x7d, 0xde, 0x77, 0x9e, 0xc1, 0xa6, 0x02, 0x2f, 0x0c, - 0x04, 0x73, 0x4d, 0x09, 0x22, 0x06, 0x61, 0xb2, 0x90, 0x9b, 0x21, 0x08, 0xc9, 0xa5, 0x02, 0x7f, - 0x0c, 0x66, 0x7c, 0x64, 0x7e, 0x88, 0x20, 0x02, 0x69, 0x84, 0x22, 0x50, 0x01, 0xd1, 0x4b, 0x81, - 0x91, 0x0b, 0x0c, 0x16, 0x72, 0x63, 0x4b, 0x60, 0xc4, 0x47, 0x9d, 0x7e, 0x0d, 0x68, 0x28, 0x60, - 0xc2, 0xc7, 0x4c, 0x95, 0xe0, 0x8e, 0x51, 0x43, 0xa4, 0x98, 0x9c, 0x15, 0xe7, 0xf5, 0x1f, 0x0d, - 0x8c, 0xcf, 0x52, 0x67, 0xe7, 0x8a, 0x29, 0x20, 0x80, 0xf7, 0x05, 0xb0, 0x09, 0x08, 0x5b, 0xa6, - 0x6b, 0xd9, 0x46, 0xdd, 0x66, 0x6f, 0xef, 0xf8, 0x99, 0xf1, 0x7f, 0xbf, 0xc6, 0x06, 0x63, 0x58, - 0x19, 0x23, 0xfb, 0x96, 0x2f, 0x7c, 0x25, 0x16, 0x56, 0x4b, 0x6c, 0xfd, 0x22, 0x02, 0x3f, 0x80, - 0xf9, 0xd8, 0x8d, 0x24, 0x8f, 0xc1, 0x2e, 0x0a, 0x4e, 0xb9, 0x33, 0xb5, 0x3f, 0x32, 0x05, 0xc2, - 0x63, 0x62, 0xd6, 0x6e, 0x74, 0x51, 0x6f, 0xef, 0xf8, 0x61, 0x9d, 0xc2, 0x17, 0x4c, 0xce, 0x06, - 0xb0, 0xb0, 0xee, 0x57, 0xcc, 0xbc, 0xfe, 0x4b, 0xee, 0x4c, 0xdf, 0x94, 0xc0, 0x4e, 0x84, 0x0f, - 0xff, 0xb2, 0x45, 0x0e, 0x70, 0x73, 0x06, 0x8b, 0x36, 0xea, 0xa2, 0x5e, 0xd3, 0x4a, 0x3f, 0xc9, - 0x2b, 0xbc, 0x13, 0x33, 0x37, 0x82, 0xc2, 0xc0, 0x49, 0xed, 0xce, 0xb7, 0xe0, 0x56, 0x8e, 0x78, - 0xd2, 0x78, 0x8c, 0x74, 0x1b, 0x1f, 0x5c, 0xdf, 0x26, 0x03, 0xbc, 0x2b, 0xc7, 0x41, 0x58, 0x8d, - 0xb7, 0x5f, 0x7f, 0xbc, 0x2e, 0x1f, 0xc3, 0x79, 0xaa, 0xb5, 0x0a, 0x84, 0xfe, 0x15, 0xe1, 0xbb, - 0xd7, 0xf6, 0xc8, 0x29, 0xde, 0x11, 0xcc, 0x77, 0x20, 0x6b, 0xec, 0xc6, 0x7c, 0x2b, 0x95, 0x5a, - 0x39, 0x81, 0x0c, 0xf0, 0x9d, 0x2a, 0x64, 0xc5, 0x4c, 0x1e, 0xd5, 0xc1, 0x8d, 0x4a, 0x91, 0xb5, - 0xd1, 0xeb, 0xdf, 0xfe, 0xf0, 0x9a, 0xd5, 0x21, 0x23, 0xbc, 0xcf, 0xfd, 0x32, 0x0b, 0x1e, 0xf7, - 0x0b, 0xcf, 0x37, 0xba, 0xf9, 0x56, 0x45, 0x18, 0x72, 0x3f, 0x25, 0x6e, 0xd2, 0xe5, 0xb1, 0xf9, - 0x6d, 0xb2, 0xd4, 0xaa, 0x08, 0x43, 0x36, 0xd7, 0xcf, 0x30, 0x4d, 0xef, 0x2f, 0xb3, 0x3e, 0x04, - 0x29, 0x99, 0x03, 0xf2, 0x35, 0xcc, 0xd5, 0x88, 0x39, 0x70, 0x11, 0xcc, 0xc0, 0x27, 0x26, 0xbe, - 0xe7, 0x32, 0xa9, 0xb2, 0x30, 0xdb, 0x5e, 0x7e, 0xc4, 0xe6, 0x93, 0x22, 0x59, 0x87, 0xe9, 0x5e, - 0x4a, 0x28, 0xc4, 0xa7, 0x93, 0xe7, 0xef, 0x97, 0x2b, 0xaa, 0x5d, 0xae, 0xa8, 0x76, 0xb5, 0xa2, - 0xe8, 0x53, 0x42, 0xd1, 0x97, 0x84, 0xa2, 0xef, 0x09, 0x45, 0xcb, 0x84, 0xa2, 0x9f, 0x09, 0x45, - 0xbf, 0x12, 0xaa, 0x5d, 0x25, 0x14, 0x7d, 0x5e, 0x53, 0x6d, 0xb9, 0xa6, 0xda, 0xe5, 0x9a, 0x6a, - 0x6f, 0x4f, 0x9c, 0x60, 0xd3, 0x05, 0x0f, 0xfe, 0xfd, 0xc8, 0x9f, 0x6e, 0x2d, 0xdf, 0xed, 0x66, - 0x6f, 0xbd, 0xff, 0x3b, 0x00, 0x00, 0xff, 0xff, 0x48, 0x67, 0x83, 0x03, 0xa7, 0x04, 0x00, 0x00, + // 586 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x94, 0xcd, 0x6e, 0xd4, 0x3e, + 0x14, 0xc5, 0xc7, 0xed, 0xbf, 0xfd, 0x83, 0xdb, 0x8a, 0xd6, 0x62, 0x51, 0x46, 0xc8, 0x8c, 0x22, + 0x16, 0x95, 0x10, 0x89, 0xfa, 0xb1, 0x40, 0xb0, 0x41, 0x15, 0x48, 0x2d, 0x55, 0x51, 0x9b, 0x56, + 0x42, 0x62, 0x41, 0xe4, 0x49, 0xae, 0x32, 0x66, 0x92, 0x38, 0xd8, 0x9e, 0x61, 0x66, 0xc7, 0x23, + 0xf0, 0x18, 0xf0, 0x00, 0xbc, 0x03, 0xcb, 0x2e, 0xbb, 0x42, 0x34, 0xb3, 0x61, 0xd9, 0x47, 0x40, + 0xce, 0xd7, 0x0c, 
0x45, 0x88, 0x94, 0x5d, 0x1c, 0xfb, 0xfc, 0xce, 0xb9, 0xf6, 0xb5, 0xb1, 0xa3, + 0x21, 0x4e, 0x85, 0x64, 0x91, 0xa3, 0x40, 0x0e, 0x41, 0x3a, 0x2c, 0xe5, 0x4e, 0x0a, 0x52, 0x71, + 0xa5, 0x21, 0xf1, 0xc1, 0x19, 0x6e, 0x3a, 0xef, 0x06, 0x30, 0x00, 0x65, 0xa7, 0x52, 0x68, 0x41, + 0xac, 0x4a, 0x60, 0x17, 0x02, 0x9b, 0xa5, 0xdc, 0x9e, 0x11, 0xd8, 0xc3, 0xcd, 0xf6, 0xfd, 0x1a, + 0x6a, 0x68, 0xbe, 0x88, 0x63, 0x91, 0x18, 0x50, 0x0c, 0x4a, 0xb1, 0x10, 0x0a, 0x52, 0x7b, 0xbb, + 0x81, 0x75, 0x2a, 0x21, 0xe0, 0x3e, 0xd3, 0x95, 0x7d, 0xdb, 0x6e, 0x20, 0xd2, 0x4c, 0xf5, 0xcb, + 0xf5, 0xd6, 0xb7, 0x39, 0x8c, 0x8f, 0x4d, 0xfe, 0x13, 0xcd, 0x34, 0x10, 0xc0, 0x2b, 0x12, 0x58, + 0x00, 0xd2, 0x53, 0x66, 0xac, 0xd6, 0x51, 0x67, 0x7e, 0x63, 0x69, 0xeb, 0xa9, 0xfd, 0xf7, 0xaa, + 0xec, 0x29, 0xc6, 0x76, 0x73, 0x46, 0xfe, 0xad, 0x9e, 0x27, 0x5a, 0x8e, 0xdd, 0x65, 0x39, 0xf3, + 0x8b, 0x48, 0x7c, 0x0f, 0x46, 0x7e, 0x34, 0x50, 0x7c, 0x08, 0x5e, 0x69, 0xd8, 0xe3, 0x61, 0xcf, + 0x7b, 0xcf, 0x34, 0xc8, 0x98, 0xc9, 0xfe, 0xfa, 0x5c, 0x07, 0x6d, 0x2c, 0x6d, 0x3d, 0x68, 0x62, + 0x7c, 0xca, 0x54, 0xff, 0x00, 0xc6, 0xee, 0xdd, 0x9a, 0x59, 0xf8, 0xef, 0xf1, 0xb0, 0xf7, 0xaa, + 0x02, 0xb6, 0x07, 0x78, 0xed, 0xb7, 0x58, 0x64, 0x15, 0xcf, 0xf7, 0x61, 0xbc, 0x8e, 0x3a, 0x68, + 0x63, 0xde, 0x35, 0x9f, 0xe4, 0x05, 0x5e, 0x18, 0xb2, 0x68, 0x00, 0x65, 0x80, 0x9d, 0xc6, 0x95, + 0xcf, 0xc0, 0xdd, 0x02, 0xf1, 0x78, 0xee, 0x11, 0xb2, 0x3c, 0xbc, 0x7a, 0x75, 0x9a, 0x1c, 0xe0, + 0x45, 0xe5, 0x8b, 0xb4, 0xde, 0xde, 0xed, 0xe6, 0xdb, 0x1b, 0x71, 0x1f, 0x4e, 0x8c, 0xd6, 0x2d, + 0x11, 0xd6, 0x67, 0x84, 0x6f, 0x5d, 0x99, 0x23, 0xfb, 0x78, 0x41, 0xb2, 0x24, 0x84, 0xbc, 0xb0, + 0x6b, 0xf3, 0x5d, 0x23, 0x75, 0x0b, 0x02, 0x39, 0xc0, 0x37, 0xeb, 0x26, 0x2b, 0xf7, 0xe4, 0x61, + 0x13, 0xdc, 0x51, 0x25, 0x72, 0xa7, 0x7a, 0xeb, 0xcb, 0x2f, 0x59, 0x73, 0x1f, 0x72, 0x84, 0x57, + 0x78, 0x52, 0xf5, 0x42, 0xcc, 0x93, 0x32, 0xf3, 0xb5, 0x4e, 0x7e, 0xb9, 0x26, 0x1c, 0xf2, 0xc4, + 0x10, 0xa7, 0xdd, 0x15, 0xb3, 0xd1, 0xbf, 0xf4, 0xd2, 0x72, 0x4d, 0x38, 0x64, 0x23, 0xeb, 0x18, + 0x53, 0x73, 0x7e, 0x79, 0xf4, 0xc3, 0xe2, 0x92, 0xaa, 0x97, 0x30, 0xd2, 0x47, 0x2c, 0x84, 0x53, + 0xd1, 0x87, 0x84, 0x38, 0xf8, 0x76, 0xc4, 0x94, 0xce, 0x9b, 0xd9, 0x2b, 0xef, 0xb1, 0xc7, 0x83, + 0xb2, 0xb3, 0xd6, 0xcc, 0x9c, 0x21, 0x94, 0xe2, 0xfd, 0xc0, 0x7a, 0x83, 0x97, 0xf6, 0xb8, 0xd2, + 0x42, 0x8e, 0x8d, 0x25, 0xb9, 0x83, 0x6f, 0xa8, 0x1e, 0x93, 0x41, 0xa5, 0x59, 0x70, 0xff, 0xcf, + 0xc7, 0xfb, 0x01, 0xd9, 0xc1, 0xff, 0x75, 0x23, 0xd1, 0x2d, 0xab, 0xe8, 0x4c, 0xab, 0x30, 0xf1, + 0x8b, 0xc7, 0xc3, 0x24, 0x7f, 0xc6, 0x34, 0xdb, 0x8d, 0x44, 0xd7, 0xcd, 0x57, 0xef, 0xbe, 0x3d, + 0xbb, 0xa0, 0xad, 0xf3, 0x0b, 0xda, 0xba, 0xbc, 0xa0, 0xe8, 0x43, 0x46, 0xd1, 0xa7, 0x8c, 0xa2, + 0xaf, 0x19, 0x45, 0x67, 0x19, 0x45, 0xdf, 0x33, 0x8a, 0x7e, 0x64, 0xb4, 0x75, 0x99, 0x51, 0xf4, + 0x71, 0x42, 0x5b, 0x67, 0x13, 0xda, 0x3a, 0x9f, 0xd0, 0xd6, 0xeb, 0x9d, 0x50, 0x4c, 0xf9, 0x5c, + 0xfc, 0xf9, 0x11, 0x79, 0x32, 0x33, 0xec, 0x2e, 0xe6, 0x6f, 0xc9, 0xf6, 0xcf, 0x00, 0x00, 0x00, + 0xff, 0xff, 0x04, 0x62, 0x8d, 0xd4, 0x2d, 0x05, 0x00, 0x00, } func (this *QueueState) Equal(that interface{}) bool { @@ -477,6 +542,33 @@ func (this *ReadQueueMessagesNextPageToken) Equal(that interface{}) bool { } return true } +func (this *HistoryTask) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*HistoryTask) + if !ok { + that2, ok := that.(HistoryTask) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == 
nil { + return false + } + if this.ShardId != that1.ShardId { + return false + } + if !this.Blob.Equal(that1.Blob) { + return false + } + return true +} func (this *QueueState) GoString() string { if this == nil { return "nil" @@ -554,6 +646,19 @@ func (this *ReadQueueMessagesNextPageToken) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *HistoryTask) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&persistence.HistoryTask{") + s = append(s, "ShardId: "+fmt.Sprintf("%#v", this.ShardId)+",\n") + if this.Blob != nil { + s = append(s, "Blob: "+fmt.Sprintf("%#v", this.Blob)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} func valueToGoStringQueues(v interface{}, typ string) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -780,6 +885,46 @@ func (m *ReadQueueMessagesNextPageToken) MarshalToSizedBuffer(dAtA []byte) (int, return len(dAtA) - i, nil } +func (m *HistoryTask) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *HistoryTask) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *HistoryTask) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Blob != nil { + { + size, err := m.Blob.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintQueues(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if m.ShardId != 0 { + i = encodeVarintQueues(dAtA, i, uint64(m.ShardId)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func encodeVarintQueues(dAtA []byte, offset int, v uint64) int { offset -= sovQueues(v) base := offset @@ -878,6 +1023,22 @@ func (m *ReadQueueMessagesNextPageToken) Size() (n int) { return n } +func (m *HistoryTask) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.ShardId != 0 { + n += 1 + sovQueues(uint64(m.ShardId)) + } + if m.Blob != nil { + l = m.Blob.Size() + n += 1 + l + sovQueues(uint64(l)) + } + return n +} + func sovQueues(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -952,6 +1113,17 @@ func (this *ReadQueueMessagesNextPageToken) String() string { }, "") return s } +func (this *HistoryTask) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&HistoryTask{`, + `ShardId:` + fmt.Sprintf("%v", this.ShardId) + `,`, + `Blob:` + strings.Replace(fmt.Sprintf("%v", this.Blob), "DataBlob", "v1.DataBlob", 1) + `,`, + `}`, + }, "") + return s +} func valueToStringQueues(v interface{}) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -1573,6 +1745,114 @@ func (m *ReadQueueMessagesNextPageToken) Unmarshal(dAtA []byte) error { } return nil } +func (m *HistoryTask) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueues + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: HistoryTask: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: HistoryTask: illegal tag %d (wire type %d)", fieldNum, 
wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ShardId", wireType) + } + m.ShardId = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueues + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ShardId |= int32(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Blob", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueues + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQueues + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQueues + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Blob == nil { + m.Blob = &v1.DataBlob{} + } + if err := m.Blob.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipQueues(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthQueues + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthQueues + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipQueues(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/common/persistence/data_interfaces.go b/common/persistence/data_interfaces.go index 489dafa07df..bcdad547985 100644 --- a/common/persistence/data_interfaces.go +++ b/common/persistence/data_interfaces.go @@ -37,6 +37,7 @@ import ( commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" + "go.temporal.io/server/common/persistence/serialization" enumsspb "go.temporal.io/server/api/enums/v1" persistencespb "go.temporal.io/server/api/persistence/v1" @@ -1204,6 +1205,53 @@ type ( SaveClusterMetadata(ctx context.Context, request *SaveClusterMetadataRequest) (bool, error) DeleteClusterMetadata(ctx context.Context, request *DeleteClusterMetadataRequest) error } + + // HistoryTaskQueueManager is responsible for managing a queue of internal history tasks. It is currently unused, + // but we plan on using this to implement a DLQ for history tasks. This is called a history task queue manager, but + // the actual history task queues are not managed by this object. Instead, this object is responsible for managing + // a generic queue of history tasks (which is what the history task DLQ will be). + // TODO: make this an interface and add wrapper classes like retryable client, metrics client, etc. + HistoryTaskQueueManager struct { + queue QueueV2 + serializer *serialization.TaskSerializer + numHistoryShards int + } + + // QueueKey identifies a history task queue. It is converted to a queue name using the GetQueueName method. + QueueKey struct { + QueueType QueueV2Type + Category tasks.Category + SourceCluster string + // TargetCluster is only used for cross-cluster replication tasks. 
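+		// For all other task categories this is left empty. Note that GetQueueName still incorporates both cluster
+		// names (plus a hash of them) when forming the queue name, so an empty TargetCluster is well-defined.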
+ TargetCluster string + } + + EnqueueTaskRequest struct { + QueueKey QueueKey + Task tasks.Task + } + + EnqueueTaskResponse struct { + Metadata MessageMetadata + } + + ReadTasksRequest struct { + QueueKey QueueKey + PageSize int + NextPageToken []byte + } + + ReadTasksResponse struct { + Tasks []tasks.Task + NextPageToken []byte + } + + ReadRawTasksRequest = ReadTasksRequest + + ReadRawTasksResponse struct { + Tasks []persistencespb.HistoryTask + NextPageToken []byte + } ) func (e *InvalidPersistenceRequestError) Error() string { diff --git a/common/persistence/history_task_queue_manager.go b/common/persistence/history_task_queue_manager.go new file mode 100644 index 00000000000..10af12b270b --- /dev/null +++ b/common/persistence/history_task_queue_manager.go @@ -0,0 +1,187 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package persistence + +import ( + "context" + "crypto/sha256" + "encoding/base64" + "errors" + "fmt" + + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/api/enums/v1" + persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/persistence/serialization" + "go.temporal.io/server/service/history/tasks" +) + +const ( + // clusterNamesHashSuffixLength is the number of characters to use from the hash of the cluster names when forming + // the queue name. This is used to avoid name collisions when a cluster name contains the separator character. + clusterNamesHashSuffixLength = 8 + + ErrMsgSerializeTaskToEnqueue = "failed to serialize history task for task queue" + // ErrMsgDeserializeRawHistoryTask is returned when the raw task cannot be deserialized from the task queue. This error + // is returned when this whole top-level proto cannot be deserialized. + // Raw Task (a proto): <-- when this cannot be deserialized + // - ShardID + // - Blob (a serialized task) + ErrMsgDeserializeRawHistoryTask = "failed to deserialize raw history task from task queue" + // ErrMsgDeserializeHistoryTask is returned when the history task cannot be deserialized from the task queue. This + // error is returned when the blob inside the raw task cannot be deserialized. 
+	// Raw Task (a proto):
+	// - ShardID
+	// - Blob (a serialized task) <-- when this cannot be deserialized
+	ErrMsgDeserializeHistoryTask = "failed to deserialize history task blob"
+)
+
+var (
+	ErrReadTasksNonPositivePageSize = errors.New("page size to read history tasks must be positive")
+	ErrHistoryTaskBlobIsNil         = errors.New("history task from queue has nil blob")
+)
+
+func NewTaskQueueManager(queue QueueV2, numHistoryShards int) *HistoryTaskQueueManager {
+	return &HistoryTaskQueueManager{
+		queue:            queue,
+		serializer:       serialization.NewTaskSerializer(),
+		numHistoryShards: numHistoryShards,
+	}
+}
+
+func (m *HistoryTaskQueueManager) EnqueueTask(ctx context.Context, request *EnqueueTaskRequest) (*EnqueueTaskResponse, error) {
+	blob, err := m.serializer.SerializeTask(request.Task)
+	if err != nil {
+		return nil, fmt.Errorf("%v: %w", ErrMsgSerializeTaskToEnqueue, err)
+	}
+
+	shardID := tasks.GetShardIDForTask(request.Task, m.numHistoryShards)
+	task := persistencespb.HistoryTask{
+		ShardId: int32(shardID),
+		Blob:    &blob,
+	}
+	taskBytes, _ := task.Marshal()
+	blob = commonpb.DataBlob{
+		EncodingType: enums.ENCODING_TYPE_PROTO3,
+		Data:         taskBytes,
+	}
+
+	message, err := m.queue.EnqueueMessage(ctx, &InternalEnqueueMessageRequest{
+		QueueType: request.QueueKey.QueueType,
+		QueueName: request.QueueKey.GetQueueName(),
+		Blob:      blob,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return &EnqueueTaskResponse{
+		Metadata: message.Metadata,
+	}, nil
+}
+
+// ReadRawTasks returns a page of "raw" tasks from the queue. Here's a quick disambiguation of the different types of
+// tasks:
+//
+//   - [go.temporal.io/server/api/persistence/v1.HistoryTask]: the proto that is serialized and stored in the database
+//     which contains a shard ID and a blob of the serialized history task. This is also called a "raw" task.
+//   - [go.temporal.io/server/service/history/tasks.Task]: the interface that is implemented by all history tasks.
+//     This is the primary type used in code to represent a history task since it is the most structured.
+func (m *HistoryTaskQueueManager) ReadRawTasks(
+	ctx context.Context,
+	request *ReadRawTasksRequest,
+) (*ReadRawTasksResponse, error) {
+	if request.PageSize <= 0 {
+		return nil, fmt.Errorf("%w: %v", ErrReadTasksNonPositivePageSize, request.PageSize)
+	}
+
+	response, err := m.queue.ReadMessages(ctx, &InternalReadMessagesRequest{
+		QueueType:     request.QueueKey.QueueType,
+		QueueName:     request.QueueKey.GetQueueName(),
+		PageSize:      request.PageSize,
+		NextPageToken: request.NextPageToken,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	responseTasks := make([]persistencespb.HistoryTask, len(response.Messages))
+	for i, message := range response.Messages {
+		err := serialization.Proto3Decode(message.Data.Data, message.Data.EncodingType, &responseTasks[i])
+		if err != nil {
+			return nil, fmt.Errorf("%v: %w", ErrMsgDeserializeRawHistoryTask, err)
+		}
+	}
+
+	return &ReadRawTasksResponse{
+		Tasks:         responseTasks,
+		NextPageToken: response.NextPageToken,
+	}, nil
+}
+
+// ReadTasks is a convenience method on top of ReadRawTasks that deserializes the tasks into the [tasks.Task] type.
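+// It propagates any error from ReadRawTasks, and additionally fails if a task's blob is nil or cannot be deserialized
+// using the category from the request's queue key. A minimal caller sketch (the variable and helper names here are
+// illustrative, not part of this package):
+//
+//	res, err := manager.ReadTasks(ctx, &ReadTasksRequest{QueueKey: key, PageSize: 100})
+//	if err != nil {
+//		return err
+//	}
+//	for _, task := range res.Tasks {
+//		process(task) // each element is a fully-deserialized tasks.Task
+//	}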
+func (m *HistoryTaskQueueManager) ReadTasks(ctx context.Context, request *ReadTasksRequest) (*ReadTasksResponse, error) {
+	response, err := m.ReadRawTasks(ctx, request)
+	if err != nil {
+		return nil, err
+	}
+
+	resTasks := make([]tasks.Task, len(response.Tasks))
+
+	for i, rawTask := range response.Tasks {
+		blob := rawTask.Blob
+		if blob == nil {
+			return nil, serialization.NewDeserializationError(enums.ENCODING_TYPE_PROTO3, ErrHistoryTaskBlobIsNil)
+		}
+
+		task, err := m.serializer.DeserializeTask(request.QueueKey.Category, *blob)
+		if err != nil {
+			return nil, fmt.Errorf("%v: %w", ErrMsgDeserializeHistoryTask, err)
+		}
+
+		resTasks[i] = task
+	}
+
+	return &ReadTasksResponse{
+		Tasks:         resTasks,
+		NextPageToken: response.NextPageToken,
+	}, nil
+}
+
+// combineUnique combines the given strings into a single string by concatenating the SHA-256 hash of each string.
+// Because every hash has the same fixed length, distinct tuples of strings cannot collide by shifting characters
+// between adjacent strings. This is used to generate a unique suffix for the queue name.
+func combineUnique(strs ...string) string {
+	h := sha256.New()
+	for _, str := range strs {
+		b := sha256.Sum256([]byte(str))
+		_, _ = h.Write(b[:])
+	}
+	return base64.StdEncoding.EncodeToString(h.Sum(nil))
+}
+
+func (k QueueKey) GetQueueName() string {
+	hash := combineUnique(k.SourceCluster, k.TargetCluster)[:clusterNamesHashSuffixLength]
+	return fmt.Sprintf("%d_%s_%s_%s", k.Category.ID(), k.SourceCluster, k.TargetCluster, hash)
+}
diff --git a/common/persistence/history_task_queue_manager_test.go b/common/persistence/history_task_queue_manager_test.go
new file mode 100644
index 00000000000..0b34b3d8749
--- /dev/null
+++ b/common/persistence/history_task_queue_manager_test.go
@@ -0,0 +1,214 @@
+// The MIT License
+//
+// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
+//
+// Copyright (c) 2020 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package persistence_test
+
+import (
+	"context"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	commonpb "go.temporal.io/api/common/v1"
+	enumspb "go.temporal.io/api/enums/v1"
+	"go.temporal.io/server/common/definition"
+	"go.temporal.io/server/common/persistence"
+	"go.temporal.io/server/service/history/tasks"
+)
+
+// TestQueueKey_Determinism tests that the queue name generated from a QueueKey is deterministic. This is important to
+// test for because we don't want to accidentally change the queue name generation algorithm and break the mapping of
+// queue keys to queue names.
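+//
+// The hard-coded name below pins the current output of GetQueueName (including the combineUnique hash suffix); if
+// this assertion starts failing, previously created queues would no longer be found under their old names.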
+func TestQueueKey_Determinism(t *testing.T) {
+	name := persistence.QueueKey{
+		Category:      tasks.CategoryTransfer,
+		SourceCluster: "a",
+		TargetCluster: "b",
+	}.GetQueueName()
+	assert.Equal(t, "1_a_b_5aAf7hTg", name)
+}
+
+// TestQueueKey_Conflicts tests that distinct tuples of cluster names containing the delimiter character do not
+// produce conflicting queue names.
+func TestQueueKey_Conflicts(t *testing.T) {
+	t.Parallel()
+
+	for _, tc := range []struct {
+		name        string
+		explanation string
+		source1     string
+		target1     string
+		source2     string
+		target2     string
+	}{
+		{
+			name: "(a,b_c) and (a_b,c)",
+			explanation: "If we just concatenate the cluster names with the queue name delimiter, both of these would" +
+				" produce the same queue name: 1_a_b_c",
+			source1: "a",
+			target1: "b_c",
+			source2: "a_b",
+			target2: "c",
+		},
+		{
+			name: "(x_,x) and (x,_x)",
+			explanation: "If we concatenate the cluster names with the queue name delimiter and a hash of the" +
+				" concatenated cluster names, both of these would produce the same queue name: 1_x__x_",
+			source1: "x_",
+			target1: "x",
+			source2: "x",
+			target2: "_x",
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			k1 := persistence.QueueKey{
+				Category:      tasks.CategoryTransfer,
+				SourceCluster: tc.source1,
+				TargetCluster: tc.target1,
+			}.GetQueueName()
+			k2 := persistence.QueueKey{
+				Category:      tasks.CategoryTransfer,
+				SourceCluster: tc.source2,
+				TargetCluster: tc.target2,
+			}.GetQueueName()
+
+			// This test would fail if we did something naive to form the queue key like <category>_<source>_<target>.
+			assert.NotEqual(t, k1, k2,
+				"Two pairs of cluster names which are the same when concatenated with the queue name "+
+					"delimiter should not have the same queue name: %s", tc.explanation)
+		})
+	}
+}
+
+func TestHistoryTaskQueueManager_ErrSerializeTaskToEnqueue(t *testing.T) {
+	t.Parallel()
+
+	task := tasks.NewFakeTask(definition.WorkflowKey{}, tasks.Category{}, time.Time{})
+	m := persistence.NewTaskQueueManager(nil, 1)
+	_, err := m.EnqueueTask(context.Background(), &persistence.EnqueueTaskRequest{
+		QueueKey: persistence.QueueKey{
+			Category: tasks.Category{}, // invalid category causes serialization error
+		},
+		Task: task,
+	})
+	assert.ErrorContains(t, err, persistence.ErrMsgSerializeTaskToEnqueue, "EnqueueTask should return "+
+		"ErrMsgSerializeTaskToEnqueue when the task cannot be serialized due to an invalid task category")
+}
+
+// corruptQueue is a QueueV2 implementation that returns a single message that cannot be deserialized into a task.
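+// It embeds the persistence.QueueV2 interface so that only ReadMessages needs an implementation here; the embedded
+// interface value is nil, so calling any other QueueV2 method panics.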
+type corruptQueue struct { + persistence.QueueV2 +} + +func (f corruptQueue) ReadMessages( + context.Context, + *persistence.InternalReadMessagesRequest, +) (*persistence.InternalReadMessagesResponse, error) { + return &persistence.InternalReadMessagesResponse{ + Messages: []persistence.QueueV2Message{ + { + Data: commonpb.DataBlob{ + EncodingType: enumspb.ENCODING_TYPE_PROTO3, + Data: []byte("some bytes that cannot be deserialized into a task"), + }, + }, + }, + NextPageToken: nil, + }, nil +} + +func TestHistoryTaskQueueManager_ReadTasks_ErrDeserializeRawHistoryTask(t *testing.T) { + t.Parallel() + + m := persistence.NewTaskQueueManager(corruptQueue{}, 1) + _, err := m.ReadTasks(context.Background(), &persistence.ReadTasksRequest{ + QueueKey: persistence.QueueKey{ + Category: tasks.CategoryTransfer, + }, + PageSize: 1, + }) + assert.ErrorContains(t, err, persistence.ErrMsgDeserializeRawHistoryTask, + "ReadTasks should return ErrMsgDeserializeRawHistoryTask when the raw task cannot be deserialized"+ + " due to an error in the persistence layer") +} + +func TestHistoryTaskQueueManager_ReadTasks_NonPositivePageSize(t *testing.T) { + t.Parallel() + + m := persistence.NewTaskQueueManager(corruptQueue{}, 1) + for _, pageSize := range []int{0, -1} { + _, err := m.ReadTasks(context.Background(), &persistence.ReadTasksRequest{ + QueueKey: persistence.QueueKey{ + Category: tasks.Category{}, + }, + PageSize: pageSize, + }) + assert.ErrorIs(t, err, persistence.ErrReadTasksNonPositivePageSize, "ReadTasks should return "+ + "ErrReadTasksNonPositivePageSize when the request's page size is: "+strconv.Itoa(pageSize)) + } +} + +// failingQueue is a QueueV2 implementation that always returns an error. +type failingQueue struct{} + +func (q failingQueue) EnqueueMessage( + context.Context, + *persistence.InternalEnqueueMessageRequest, +) (*persistence.InternalEnqueueMessageResponse, error) { + return nil, assert.AnError +} + +func (q failingQueue) ReadMessages( + context.Context, + *persistence.InternalReadMessagesRequest, +) (*persistence.InternalReadMessagesResponse, error) { + return nil, assert.AnError +} + +func TestHistoryTaskQueueManager_ReadTasks_ErrReadQueueMessages(t *testing.T) { + t.Parallel() + + m := persistence.NewTaskQueueManager(failingQueue{}, 1) + _, err := m.ReadTasks(context.Background(), &persistence.ReadTasksRequest{ + QueueKey: persistence.QueueKey{ + Category: tasks.CategoryTransfer, + }, + PageSize: 1, + }) + assert.ErrorIs(t, err, assert.AnError, "ReadTasks should propagate errors from ReadMessages") +} + +func TestHistoryTaskQueueManager_ReadTasks_ErrEnqueueMessage(t *testing.T) { + t.Parallel() + + m := persistence.NewTaskQueueManager(failingQueue{}, 1) + _, err := m.EnqueueTask(context.Background(), &persistence.EnqueueTaskRequest{ + QueueKey: persistence.QueueKey{ + Category: tasks.CategoryTransfer, + }, + Task: &tasks.WorkflowTask{}, + }) + assert.ErrorIs(t, err, assert.AnError, "EnqueueTask should propagate errors from EnqueueMessage") +} diff --git a/common/persistence/serialization/blob.go b/common/persistence/serialization/blob.go index bda8ca092a3..b77b2d7183a 100644 --- a/common/persistence/serialization/blob.go +++ b/common/persistence/serialization/blob.go @@ -178,10 +178,17 @@ func proto3Encode(m proto.Message) (commonpb.DataBlob, error) { } func proto3Decode(blob []byte, encoding string, result proto.Message) error { - if e, ok := enumspb.EncodingType_value[encoding]; !ok || enumspb.EncodingType(e) != enumspb.ENCODING_TYPE_PROTO3 { + e, ok := 
enumspb.EncodingType_value[encoding]; !ok || enumspb.EncodingType(e) != enumspb.ENCODING_TYPE_PROTO3 {
+	e, ok := enumspb.EncodingType_value[encoding]
+	if !ok {
 		return NewUnknownEncodingTypeError(encoding, enumspb.ENCODING_TYPE_PROTO3)
 	}
+	return Proto3Decode(blob, enumspb.EncodingType(e), result)
+}
+
+func Proto3Decode(blob []byte, e enumspb.EncodingType, result proto.Message) error {
+	if e != enumspb.ENCODING_TYPE_PROTO3 {
+		return NewUnknownEncodingTypeError(enumspb.EncodingType_name[int32(e)], enumspb.ENCODING_TYPE_PROTO3)
+	}
 	if err := proto.Unmarshal(blob, result); err != nil {
 		return NewDeserializationError(enumspb.ENCODING_TYPE_PROTO3, err)
 	}
diff --git a/common/persistence/tests/cassandra_test.go b/common/persistence/tests/cassandra_test.go
index 6fababa6ec9..f886ed97e72 100644
--- a/common/persistence/tests/cassandra_test.go
+++ b/common/persistence/tests/cassandra_test.go
@@ -232,7 +232,7 @@ func TestCassandraQueuePersistence(t *testing.T) {
 	suite.Run(t, s)
 }
 
-func TestCassandraQueueV2(t *testing.T) {
+func TestCassandraQueueV2Persistence(t *testing.T) {
 	t.Parallel()
 
 	cluster := persistencetests.NewTestClusterForCassandra(&persistencetests.TestBaseOptions{}, log.NewNoopLogger())
@@ -261,6 +261,15 @@ func TestCassandraQueueV2(t *testing.T) {
 	})
 }
 
+func TestCassandraHistoryTaskQueueManager(t *testing.T) {
+	t.Parallel()
+
+	cluster := persistencetests.NewTestClusterForCassandra(&persistencetests.TestBaseOptions{}, log.NewNoopLogger())
+	cluster.SetupTestDatabase()
+	t.Cleanup(cluster.TearDownTestDatabase)
+	RunHistoryTaskQueueManagerTestSuite(t, cassandra.NewQueueV2Store(cluster.GetSession()))
+}
+
 func testCassandraQueueV2Interface(t *testing.T, cluster *cassandra.TestCluster) {
 	q := cassandra.NewQueueV2Store(cluster.GetSession())
 	RunQueueV2TestSuite(t, q)
diff --git a/common/persistence/tests/history_task_queue_manager_test_suite.go b/common/persistence/tests/history_task_queue_manager_test_suite.go
new file mode 100644
index 00000000000..f8557f332e8
--- /dev/null
+++ b/common/persistence/tests/history_task_queue_manager_test_suite.go
@@ -0,0 +1,156 @@
+// The MIT License
+//
+// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
+//
+// Copyright (c) 2020 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+ +package tests + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/api/enums/v1" + persistence2 "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common" + "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/persistence" + "go.temporal.io/server/service/history/tasks" +) + +// RunHistoryTaskQueueManagerTestSuite runs all tests for the history task queue manager against a given queue provided by a +// particular database. This test suite should be re-used to test all queue implementations. +func RunHistoryTaskQueueManagerTestSuite(t *testing.T, queue persistence.QueueV2) { + t.Run("TestHistoryTaskQueueManagerHappyPath", func(t *testing.T) { + t.Parallel() + testHistoryTaskQueueManagerHappyPath(t, queue) + }) + t.Run("TestHistoryTaskQueueManagerErrDeserializeTask", func(t *testing.T) { + t.Parallel() + testHistoryTaskQueueManagerErrDeserializeHistoryTask(t, queue) + }) +} + +func testHistoryTaskQueueManagerHappyPath(t *testing.T, queue persistence.QueueV2) { + numHistoryShards := 5 + manager := persistence.NewTaskQueueManager(queue, numHistoryShards) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + t.Cleanup(cancel) + + namespaceID := "test-namespace" + workflowID := "test-workflow-id" + workflowKey := definition.NewWorkflowKey(namespaceID, workflowID, "test-run-id") + shardID := 2 + assert.Equal(t, int32(shardID), common.WorkflowIDToHistoryShard(namespaceID, workflowID, int32(numHistoryShards))) + + category := tasks.CategoryTransfer + queueKey := persistence.QueueKey{ + QueueType: persistence.QueueTypeHistoryNormal, + Category: category, + SourceCluster: "test-source-cluster-" + t.Name(), + } + + for i := 0; i < 2; i++ { + task := &tasks.WorkflowTask{ + WorkflowKey: workflowKey, + TaskID: int64(i + 1), + } + res, err := manager.EnqueueTask(ctx, &persistence.EnqueueTaskRequest{ + QueueKey: queueKey, + Task: task, + }) + require.NoError(t, err) + assert.Equal(t, int64(persistence.FirstQueueMessageID+i), res.Metadata.ID) + } + + var nextPageToken []byte + for i := 0; i < 3; i++ { + readRes, err := manager.ReadTasks(ctx, &persistence.ReadTasksRequest{ + QueueKey: queueKey, + PageSize: 1, + NextPageToken: nextPageToken, + }) + require.NoError(t, err) + + if i < 2 { + require.Len(t, readRes.Tasks, 1) + assert.Equal(t, shardID, tasks.GetShardIDForTask(readRes.Tasks[0], numHistoryShards)) + assert.Equal(t, int64(i+1), readRes.Tasks[0].GetTaskID()) + nextPageToken = readRes.NextPageToken + } else { + assert.Empty(t, readRes.Tasks) + assert.Empty(t, readRes.NextPageToken) + } + } +} + +func testHistoryTaskQueueManagerErrDeserializeHistoryTask(t *testing.T, queue persistence.QueueV2) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + t.Cleanup(cancel) + + t.Run("nil blob", func(t *testing.T) { + t.Parallel() + + err := enqueueAndDeserializeBlob(ctx, t, queue, nil) + assert.ErrorContains(t, err, persistence.ErrHistoryTaskBlobIsNil.Error()) + }) + t.Run("empty blob", func(t *testing.T) { + t.Parallel() + + err := enqueueAndDeserializeBlob(ctx, t, queue, &commonpb.DataBlob{}) + assert.ErrorContains(t, err, persistence.ErrMsgDeserializeHistoryTask) + }) +} + +func enqueueAndDeserializeBlob(ctx context.Context, t *testing.T, queue persistence.QueueV2, blob *commonpb.DataBlob) error { + queueType := persistence.QueueTypeHistoryNormal + queueKey := persistence.QueueKey{ + 
QueueType: queueType, + Category: tasks.CategoryTransfer, + SourceCluster: "test-source-cluster-" + t.Name(), + } + queueName := queueKey.GetQueueName() + historyTask := persistence2.HistoryTask{ + ShardId: 1, + Blob: blob, + } + historyTaskBytes, _ := historyTask.Marshal() + _, err := queue.EnqueueMessage(ctx, &persistence.InternalEnqueueMessageRequest{ + QueueType: queueType, + QueueName: queueName, + Blob: commonpb.DataBlob{ + EncodingType: enums.ENCODING_TYPE_PROTO3, + Data: historyTaskBytes, + }, + }) + require.NoError(t, err) + + manager := persistence.NewTaskQueueManager(queue, 1) + _, err = manager.ReadTasks(ctx, &persistence.ReadTasksRequest{ + QueueKey: queueKey, + PageSize: 1, + }) + return err +} diff --git a/proto/internal/temporal/server/api/persistence/v1/queues.proto b/proto/internal/temporal/server/api/persistence/v1/queues.proto index 4ca6c8495cf..daa2758b7eb 100644 --- a/proto/internal/temporal/server/api/persistence/v1/queues.proto +++ b/proto/internal/temporal/server/api/persistence/v1/queues.proto @@ -23,6 +23,7 @@ syntax = "proto3"; package temporal.server.api.persistence.v1; option go_package = "go.temporal.io/server/api/persistence/v1;persistence"; +import "temporal/api/common/v1/message.proto"; import "temporal/server/api/persistence/v1/predicates.proto"; import "temporal/server/api/persistence/v1/tasks.proto"; @@ -48,3 +49,15 @@ message QueueSliceRange { message ReadQueueMessagesNextPageToken { int64 last_read_message_id = 1; } + +// HistoryTask represents an internal history service task for a particular shard. We use a blob because there is no +// common proto for all task proto types. +message HistoryTask { + // shard_id that this task belonged to when it was created. Technically, you can derive this from the task data + // blob, but it's useful to have it here for quick access and to avoid deserializing the blob. + int32 shard_id = 1; + // blob that contains the history task proto. There is a GoLang-specific generic deserializer for this blob, but + // there is no common proto for all task proto types, so deserializing in other languages will require a custom + // switch on the task category, which should be available from the metadata for the queue that this task came from. + temporal.api.common.v1.DataBlob blob = 2; +} diff --git a/service/history/tasks/task.go b/service/history/tasks/task.go index d4d63f4bddd..c63f37e9b31 100644 --- a/service/history/tasks/task.go +++ b/service/history/tasks/task.go @@ -30,6 +30,7 @@ import ( "time" enumsspb "go.temporal.io/server/api/enums/v1" + "go.temporal.io/server/common" ) type ( @@ -49,3 +50,9 @@ type ( SetVisibilityTime(timestamp time.Time) } ) + +// GetShardIDForTask computes the shardID for a given task using the task's namespace, workflow ID and the number of +// history shards in the cluster. +func GetShardIDForTask(task Task, numShards int) int { + return int(common.WorkflowIDToHistoryShard(task.GetNamespaceID(), task.GetWorkflowID(), int32(numShards))) +}