/
transfer.go
130 lines (121 loc) · 3.09 KB
/
transfer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package infra
import (
"github.com/google/uuid"
"github.com/zengzhuozhen/dataflow/core"
"github.com/zengzhuozhen/dataflow/infra/model"
"time"
)
func ToWindow(window *model.Window) core.Windows {
switch window.Type {
case core.WindowTypeFixedWindow:
t := time.Duration(window.Size) * time.Second
return core.NewFixedWindows(t)
case core.WindowTypeSlideWindow:
t := time.Duration(window.Size) * time.Second
t2 := time.Duration(window.Period) * time.Second
return core.NewSlideWindow(t, t2)
case core.WindowTypeSessionWindow:
t := time.Duration(window.Gap) * time.Second
return core.NewSessionWindow(t)
default:
return core.NewDefaultGlobalWindow()
}
}
func ToTrigger(trigger *model.Trigger) core.Trigger {
switch trigger.Type {
case core.TriggerTypeCounterTrigger:
return core.NewCounterTrigger(int(trigger.Count))
case core.TriggerTypeTimerTrigger:
t := time.Duration(trigger.Period) * time.Second
return core.NewTimerTrigger(t)
default:
return nil
}
}
func ToEvictor(evictor *model.Evictor) core.Evictor {
switch evictor.Type {
case core.EvictorTypeAccumulate:
return core.AccumulateEvictor{}
case core.EvictorTypeRecalculate:
return core.RecalculateEvictor{}
default:
return nil
}
}
func ToOperator(operator *model.Operator) core.Operator {
switch operator.Type {
case core.OperatorTypeSum:
return core.SumOperator{}
default:
return nil
}
}
func ToWindowModel(windows core.Windows) *model.Window {
var (
size, period, gap time.Duration
windowType core.WindowType
)
switch t := windows.(type) {
case *core.FixedWindow:
size = t.GetParams()
windowType = core.WindowTypeFixedWindow
case *core.SlideWindow:
size, period = t.GetParams()
windowType = core.WindowTypeSlideWindow
case *core.SessionWindow:
gap = t.GetParams()
windowType = core.WindowTypeSessionWindow
}
return &model.Window{
Id: uuid.New().String(),
Type: windowType,
Size: int32(size / time.Second),
Period: int32(period / time.Second),
Gap: int32(gap / time.Second),
}
}
func ToTriggerModel(trigger core.Trigger) *model.Trigger {
var (
count int
period time.Duration
triggerType int32
)
switch t := trigger.(type) {
case core.CounterTrigger:
count = t.GetParams()
triggerType = core.TriggerTypeCounterTrigger
case core.TimeTrigger:
period = t.GetParams()
triggerType = core.TriggerTypeTimerTrigger
}
return &model.Trigger{
Id: uuid.New().String(),
Type: triggerType,
Count: int32(count),
Period: int32(period / time.Second),
}
}
func ToOperatorModel(operator core.Operator) *model.Operator {
switch operator.(type) {
case core.SumOperator:
return &model.Operator{
Id: uuid.New().String(),
Type: core.OperatorTypeSum,
DataType: operator.GetDataType(),
}
}
return nil
}
func ToEvictorModel(evictor core.Evictor) *model.Evictor {
var evictorType int32
switch evictor.(type) {
case core.AccumulateEvictor:
evictorType = core.EvictorTypeAccumulate
case core.RecalculateEvictor:
evictorType = core.EvictorTypeRecalculate
}
return &model.Evictor{
Id: uuid.New().String(),
Type: evictorType,
}
}