-
Notifications
You must be signed in to change notification settings - Fork 1
/
plugin.go
113 lines (98 loc) · 2.76 KB
/
plugin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package feeditemgroup
import (
"context"
"encoding/json"
"github.com/hibiken/asynq"
"github.com/rumorsflow/mongo-ext"
"github.com/rumorsflow/rumors/internal/consts"
"github.com/rumorsflow/rumors/internal/models"
rumorscast "github.com/rumorsflow/rumors/internal/pkg/cast"
"github.com/rumorsflow/rumors/internal/storage"
"github.com/spf13/cast"
"go.uber.org/zap"
"net/url"
"strings"
)
const PluginName = consts.TaskFeedItemGroup
type Plugin struct {
log *zap.Logger
client *asynq.Client
roomStorage storage.RoomStorage
}
func (p *Plugin) Init(log *zap.Logger, client *asynq.Client, roomStorage storage.RoomStorage) error {
p.log = log
p.client = client
p.roomStorage = roomStorage
return nil
}
// Name returns user-friendly plugin name
func (p *Plugin) Name() string {
return PluginName
}
func (p *Plugin) ProcessTask(ctx context.Context, task *asynq.Task) error {
var items []models.FeedItem
if err := json.Unmarshal(task.Payload(), &items); err != nil {
p.log.Error("error due to unmarshal task payload", zap.Error(err))
return nil
}
if len(items) == 0 {
p.log.Debug("items group are empty")
return nil
}
var b strings.Builder
b.WriteString(items[0].FeedId)
group := map[string][]models.FeedItem{
items[0].FeedId: {items[0]},
}
for _, item := range items[1:] {
if _, ok := group[item.FeedId]; ok {
group[item.FeedId] = append(group[item.FeedId], item)
} else {
group[item.FeedId] = []models.FeedItem{item}
b.WriteString(",")
b.WriteString(item.FeedId)
}
}
size := 20
query := make(url.Values)
query.Set(mongoext.QuerySize, cast.ToString(size))
query.Set("f[0][0][field]", "broadcast")
query.Set("f[0][0][condition]", "in")
query.Set("f[0][0][value]", b.String())
query.Set("f[1][0][field]", "deleted")
query.Set("f[1][0][value]", "false")
for index := 0; ; index += size {
query.Set(mongoext.QueryIndex, cast.ToString(index))
rooms, err := p.roomStorage.Find(ctx, mongoext.C(query, "f"))
if err != nil {
p.log.Error("error due to find rooms", zap.Error(err))
return nil
}
for _, room := range rooms {
var roomItems []models.FeedItem
for _, id := range *room.Broadcast {
if data, ok := group[id]; ok {
roomItems = append(roomItems, data...)
}
}
if len(roomItems) > 0 {
payload, _ := json.Marshal(roomItems)
payload = append(payload, rumorscast.Int64ToBytes(room.Id)...)
t := asynq.NewTask(consts.TaskRoomBroadcast, payload)
q := asynq.Queue(consts.QueueFeedItems)
if _, err = p.client.EnqueueContext(ctx, t, q); err != nil {
p.log.Error(
"error due to enqueue room broadcast",
zap.Error(err),
zap.String("task", t.Type()),
zap.ByteString("payload", payload),
)
}
}
}
if len(rooms) < size {
break
}
}
return nil
}