/
local_activity.go
110 lines (91 loc) · 2.58 KB
/
local_activity.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package aggregatedpool
import (
"context"
"sync"
"sync/atomic"
"github.com/google/uuid"
"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/sdk/v3/payload"
"github.com/temporalio/roadrunner-temporal/v3/common"
"github.com/temporalio/roadrunner-temporal/v3/internal"
commonpb "go.temporal.io/api/common/v1"
tActivity "go.temporal.io/sdk/activity"
"go.temporal.io/sdk/temporal"
"go.uber.org/zap"
)
type LocalActivityFn struct {
header *commonpb.Header
codec common.Codec
pool common.Pool
log *zap.Logger
seqID uint64
}
func NewLocalActivityFn(header *commonpb.Header, codec common.Codec, pool common.Pool, log *zap.Logger) *LocalActivityFn {
return &LocalActivityFn{
header: header,
codec: codec,
pool: pool,
log: log,
}
}
func (la *LocalActivityFn) execute(ctx context.Context, args *commonpb.Payloads) (*commonpb.Payloads, error) {
const op = errors.Op("activity_pool_execute_activity")
var info = tActivity.GetInfo(ctx)
info.TaskToken = []byte(uuid.NewString())
mh := tActivity.GetMetricsHandler(ctx)
// if the mh is not nil, record the RR metric
if mh != nil {
mh.Gauge(RrMetricName).Update(float64(la.pool.QueueSize()))
defer mh.Gauge(RrMetricName).Update(float64(la.pool.QueueSize()))
}
var msg = &internal.Message{
ID: atomic.AddUint64(&la.seqID, 1),
Command: internal.InvokeLocalActivity{
Name: info.ActivityType.Name,
Info: info,
},
Payloads: args,
Header: la.header,
}
la.log.Debug("executing local activity fn", zap.Uint64("ID", msg.ID), zap.String("task-queue", info.TaskQueue), zap.String("la ID", info.ActivityID))
pld := getPld()
defer putPld(pld)
err := la.codec.Encode(&internal.Context{TaskQueue: info.TaskQueue}, pld, msg)
if err != nil {
return nil, err
}
result, err := la.pool.Exec(ctx, pld)
if err != nil {
return nil, errors.E(op, err)
}
out := make([]*internal.Message, 0, 2)
err = la.codec.Decode(result, &out)
if err != nil {
return nil, err
}
if len(out) != 1 {
return nil, errors.E(op, errors.Str("invalid local activity worker response"))
}
retPld := out[0]
if retPld.Failure != nil {
if retPld.Failure.Message == doNotCompleteOnReturn {
return nil, tActivity.ErrResultPending
}
return nil, temporal.GetDefaultFailureConverter().FailureToError(retPld.Failure)
}
return retPld.Payloads, nil
}
var pldP = sync.Pool{ //nolint:gochecknoglobals
New: func() any {
return &payload.Payload{}
},
}
func getPld() *payload.Payload {
return pldP.Get().(*payload.Payload)
}
func putPld(pld *payload.Payload) {
pld.Codec = 0
pld.Context = nil
pld.Body = nil
pldP.Put(pld)
}