/
worker.go
293 lines (238 loc) · 7.8 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
package mirror
import (
"context"
"fmt"
"github.com/gabriel-vasile/mimetype"
"github.com/rss3-network/node/config"
"github.com/rss3-network/node/internal/database"
"github.com/rss3-network/node/internal/engine"
source "github.com/rss3-network/node/internal/engine/source/arweave"
"github.com/rss3-network/node/internal/engine/worker/contract/mirror/model"
"github.com/rss3-network/node/provider/arweave"
"github.com/rss3-network/node/provider/arweave/contract/mirror"
"github.com/rss3-network/node/provider/ipfs"
workerx "github.com/rss3-network/node/schema/worker"
"github.com/rss3-network/protocol-go/schema"
activityx "github.com/rss3-network/protocol-go/schema/activity"
"github.com/rss3-network/protocol-go/schema/metadata"
"github.com/rss3-network/protocol-go/schema/network"
"github.com/rss3-network/protocol-go/schema/tag"
"github.com/rss3-network/protocol-go/schema/typex"
"github.com/samber/lo"
"github.com/tidwall/gjson"
)
// Compile-time check that *worker satisfies the engine.Worker interface.
var _ engine.Worker = (*worker)(nil)
// worker transforms Mirror transactions stored on Arweave into social
// post/revise activities.
type worker struct {
// config is the module configuration passed to NewWorker.
config *config.Module
// arweaveClient is created in NewWorker; it is not referenced by the
// methods visible in this file.
arweaveClient arweave.Client
// ipfsClient fetches the wnft image referenced by a Mirror post so that
// its MIME type can be detected.
ipfsClient ipfs.HTTPClient
// databaseClient loads and saves dataset Mirror post records, which are
// used to tell a first post from a later revision.
databaseClient database.Client
}
// Name returns the string form of the workerx.Mirror identifier.
func (w *worker) Name() string {
	name := workerx.Mirror

	return name.String()
}
// Platform returns the string form of the workerx.PlatformMirror identifier.
func (w *worker) Platform() string {
	platform := workerx.PlatformMirror

	return platform.String()
}
// Network lists the networks this worker handles: Arweave only.
func (w *worker) Network() []network.Network {
	supported := []network.Network{network.Arweave}

	return supported
}
// Tags lists the tags this worker produces: the social tag only.
func (w *worker) Tags() []tag.Tag {
	produced := []tag.Tag{tag.Social}

	return produced
}
// Types lists the activity types this worker produces: social post and
// social revise, in that order.
func (w *worker) Types() []schema.Type {
	produced := []schema.Type{typex.SocialPost, typex.SocialRevise}

	return produced
}
// Filter returns the source filter for this worker. Its OwnerAddresses list
// holds a single entry, the Mirror address.
func (w *worker) Filter() engine.SourceFilter {
	owners := []string{mirror.AddressMirror}

	return &source.Filter{OwnerAddresses: owners}
}
// Match reports whether the task is an Arweave transaction whose owner
// resolves to the Mirror address.
//
// Tasks from any other network source yield (false, nil). An error is
// returned when a task reports an Arweave source but is not a *source.Task,
// or when the transaction owner cannot be converted to an address.
func (w *worker) Match(_ context.Context, task engine.Task) (bool, error) {
	if task.GetNetwork().Source() != network.ArweaveSource {
		return false, nil
	}

	// Use a checked assertion so that an unexpected task implementation is
	// reported as an error instead of panicking.
	arweaveTask, ok := task.(*source.Task)
	if !ok {
		return false, fmt.Errorf("invalid task type: %T", task)
	}

	// Check if the transaction belongs to the mirror contract.
	owner, err := arweave.PublicKeyToAddress(arweaveTask.Transaction.Owner)
	if err != nil {
		return false, fmt.Errorf("parse transaction owner: %w", err)
	}

	return owner == mirror.AddressMirror, nil
}
// Transform converts an Arweave task into an activity carrying the Mirror
// post or revise action.
//
// It returns an error when the task is not a *source.Task, when the base
// activity cannot be built, or when the Mirror action cannot be derived from
// the transaction.
func (w *worker) Transform(ctx context.Context, task engine.Task) (*activityx.Activity, error) {
	// Cast the task to an Arweave task.
	arweaveTask, ok := task.(*source.Task)
	if !ok {
		return nil, fmt.Errorf("invalid task type: %T", task)
	}

	// Build the activity.
	activity, err := task.BuildActivity(activityx.WithActivityPlatform(w.Platform()))
	if err != nil {
		return nil, fmt.Errorf("build activity: %w", err)
	}

	// Get actions from the transaction.
	actions, err := w.transformMirrorAction(ctx, arweaveTask)
	if err != nil {
		return nil, fmt.Errorf("handle arweave mirror transaction: %w", err)
	}

	activity.To = mirror.AddressMirror

	// The activity type is taken from the first action (post or revise).
	// Checking the length rather than nil-ness keeps an empty, non-nil slice
	// from causing an index-out-of-range panic.
	if len(actions) > 0 {
		activity.Type = actions[0].Type
		activity.Actions = append(activity.Actions, actions...)
	}

	return activity, nil
}
// transformMirrorAction builds the post or revise action for a Mirror
// transaction.
//
// It reads the Content-Digest and Original-Content-Digest tags, parses the
// transaction data as JSON, optionally fetches the wnft image from IPFS to
// detect its MIME type, builds the action, and finally records the
// transaction in the Mirror post dataset. On success the returned slice holds
// exactly that one action.
func (w *worker) transformMirrorAction(ctx context.Context, task *source.Task) ([]*activityx.Action, error) {
	var (
		contentDigest       string
		originContentDigest string
		emptyOriginDigest   bool
	)

	// The loop variable is named txTag so the imported tag package is not
	// shadowed.
	for _, txTag := range task.Transaction.Tags {
		tagName, err := arweave.Base64Decode(txTag.Name)
		if err != nil {
			return nil, fmt.Errorf("base64 decode tag name: %w", err)
		}

		tagValue, err := arweave.Base64Decode(txTag.Value)
		if err != nil {
			return nil, fmt.Errorf("base64 decode tag value: %w", err)
		}

		switch string(tagName) {
		case "Content-Digest":
			contentDigest = string(tagValue)
		case "Original-Content-Digest":
			originContentDigest = string(tagValue)

			// A present-but-empty origin digest is tracked separately; it
			// makes buildMirrorAction classify the action as a revise.
			if len(tagValue) == 0 {
				emptyOriginDigest = true
			}
		}
	}

	// Construct content URI from tx id
	contentURI := fmt.Sprintf("ar://%s", task.Transaction.ID)

	// Get detailed post info from transaction data
	transactionData, err := arweave.Base64Decode(task.Transaction.Data)
	if err != nil {
		return nil, fmt.Errorf("invalid format of transaction data: %w", err)
	}

	mirrorData := gjson.ParseBytes(transactionData)
	author := mirrorData.Get("authorship.contributor").String()

	var media []metadata.Media

	// Get mirror nft as media
	address := mirrorData.Get("wnft.imageURI").String()
	if address != "" {
		file, err := w.ipfsClient.Fetch(ctx, fmt.Sprintf("/ipfs/%s", address), ipfs.FetchModeQuick)
		if err != nil {
			return nil, fmt.Errorf("fetch ipfs: %w", err)
		}

		// Close the fetched file when this function returns; lo.Try runs the
		// close and its result is discarded.
		defer lo.Try(file.Close)

		// Get nft mimetype
		result, err := mimetype.DetectReader(file)
		if err != nil {
			return nil, fmt.Errorf("detect mimetype: %w", err)
		}

		if result == nil {
			return nil, fmt.Errorf("empty result")
		}

		media = append(media, metadata.Media{
			Address:  fmt.Sprintf("ipfs://%s", address),
			MimeType: result.String(),
		})
	}

	// Publication ID precedence: the origin digest tag, then the content
	// digest tag, then the "digest" field of the transaction data.
	publicationID := contentDigest
	if publicationID == "" {
		publicationID = mirrorData.Get("digest").String()
	}

	if originContentDigest != "" {
		publicationID = originContentDigest
	}

	// Construct mirror Metadata
	mirrorMetadata := &metadata.SocialPost{
		Title:         mirrorData.Get("content.title").String(),
		Body:          mirrorData.Get("content.body").String(),
		ContentURI:    contentURI,
		PublicationID: publicationID,
		Media:         media,
		Timestamp:     mirrorData.Get("content.timestamp").Uint(),
	}

	// Build the post or revise action. This must happen before the dataset
	// record is saved below, because buildMirrorAction looks up the existing
	// record for the origin digest.
	action, err := w.buildMirrorAction(ctx, task.Transaction.ID, author, mirror.AddressMirror, mirrorMetadata, emptyOriginDigest, originContentDigest)
	if err != nil {
		return nil, fmt.Errorf("build post action: %w", err)
	}

	// Save Dataset Mirror Post, using the caller's context so cancellation
	// and deadlines are honored.
	post := &model.DatasetMirrorPost{
		TransactionID:       task.Transaction.ID,
		OriginContentDigest: originContentDigest,
	}

	if err := w.databaseClient.SaveDatasetMirrorPost(ctx, post); err != nil {
		return nil, fmt.Errorf("save dataset mirror post: %w", err)
	}

	return []*activityx.Action{action}, nil
}
// buildMirrorAction assembles the social action for a Mirror transaction and
// decides whether it is a post or a revise.
//
// The action is a post by default. It becomes a revise when emptyOriginDigest
// is set, or when originContentDigest is non-empty and the dataset already
// holds a record for that digest under a different transaction ID. An error
// is returned only when the dataset lookup fails.
func (w *worker) buildMirrorAction(ctx context.Context, txID, from, to string, mirrorMetadata *metadata.SocialPost, emptyOriginDigest bool, originContentDigest string) (*activityx.Action, error) {
	// Start from the default type and upgrade to revise where needed.
	actionType := typex.SocialPost

	if emptyOriginDigest {
		actionType = typex.SocialRevise
	}

	// With an origin digest, the action is a revise unless this transaction
	// is the one recorded for that digest.
	if originContentDigest != "" {
		recorded, err := w.databaseClient.LoadDatasetMirrorPost(ctx, originContentDigest)
		if err != nil {
			return nil, fmt.Errorf("load dataset mirror post: %w", err)
		}

		if recorded != nil && recorded.TransactionID != txID {
			actionType = typex.SocialRevise
		}
	}

	return &activityx.Action{
		Type:     actionType,
		Tag:      tag.Social,
		Platform: w.Platform(),
		From:     from,
		To:       to,
		Metadata: mirrorMetadata,
	}, nil
}
// NewWorker constructs the Mirror worker.
//
// It creates an Arweave client and an IPFS HTTP client configured with the
// gateways from config, then stores them with the supplied database client.
// An error is returned when either client cannot be created.
func NewWorker(config *config.Module, databaseClient database.Client) (engine.Worker, error) {
	arweaveClient, err := arweave.NewClient()
	if err != nil {
		return nil, fmt.Errorf("new arweave client: %w", err)
	}

	// Initialize ipfs client.
	ipfsClient, err := ipfs.NewHTTPClient(ipfs.WithGateways(config.IPFSGateways))
	if err != nil {
		return nil, fmt.Errorf("new ipfs client: %w", err)
	}

	return &worker{
		config:         config,
		arweaveClient:  arweaveClient,
		ipfsClient:     ipfsClient,
		databaseClient: databaseClient,
	}, nil
}