/
main.go
144 lines (96 loc) · 3.18 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// es-sfomuseum-index bulk indexes one or more whosonfirst/go-whosonfirst-iterate/v2 sources in an Elasticsearch database.
package main
import (
_ "github.com/whosonfirst/go-whosonfirst-iterate-git/v2"
)
import (
"context"
"encoding/json"
"fmt"
"github.com/sfomuseum/go-flags/flagset"
"github.com/sfomuseum/go-sfomuseum-instagram/media"
"github.com/sfomuseum/go-whosonfirst-elasticsearch/index"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
"log"
"strings"
"time"
)
// main builds the bulk indexer flagset and options, registers an SFO Museum
// specific document preparation function and then runs the bulk indexer.
// The stats returned by the indexer are JSON-encoded and printed to standard
// output. Any failure along the way is fatal.
func main() {

	ctx := context.Background()

	fs, err := index.NewBulkIndexerFlagSet(ctx)

	if err != nil {
		log.Fatalf("Failed to create new flagset, %v", err)
	}

	flagset.Parse(fs)

	opts, err := index.RunBulkIndexerOptionsFromFlagSet(ctx, fs)

	if err != nil {
		log.Fatalf("Failed to create options, %v", err)
	}

	// START OF sfom stuff - this should eventually be moved in to
	// a github.com/sfomuseum/go-elasticsearch-sfomuseum/document
	// package

	// sfom_f rewrites a document body before it is indexed. It returns the
	// updated body, or a nil body and an error if any property can not be
	// read or assigned.
	sfom_f := func(ctx context.Context, body []byte) ([]byte, error) {

		// Keep error state local to each invocation. Without this
		// declaration the assignments below would write to main's
		// captured 'err' variable, which is shared by every call to
		// this function.
		var err error

		// Record the number of images alongside the images themselves.

		im_rsp := gjson.GetBytes(body, "millsfield:images")

		if im_rsp.Exists() {

			count := len(im_rsp.Array())

			body, err = sjson.SetBytes(body, "millsfield:count_images", count)

			if err != nil {
				return nil, fmt.Errorf("Failed to assign millsfield:count_images, %w", err)
			}
		}

		// Re-assign millsfield:images_texts as a list of strings, using
		// the string form of each existing element.

		texts_rsp := gjson.GetBytes(body, "millsfield:images_texts")

		if texts_rsp.Exists() {

			texts_array := texts_rsp.Array()
			texts_count := len(texts_array)

			if texts_count > 0 {

				texts := make([]string, texts_count)

				for idx, t := range texts_array {
					texts[idx] = t.String()
				}

				body, err = sjson.SetBytes(body, "millsfield:images_texts", texts)

				if err != nil {
					return nil, fmt.Errorf("Failed to assign millsfield:images_texts, %w", err)
				}
			}
		}

		// Instagram stuff
		// tl;dr is "convert IG's goofy datetime strings in RFC3339 so that Elasticsearch isn't sad"
		// See also: sfomuseum/go-sfomuseum-instagram and sfomuseum/go-sfomuseum-instagram-publish

		ig_rsp := gjson.GetBytes(body, "instagram:post")

		if ig_rsp.Exists() {

			taken_rsp := gjson.GetBytes(body, "instagram:post.taken_at")
			str_taken := taken_rsp.String()

			// Note that a post whose taken_at value can not be parsed
			// (including a missing one) is treated as an error.
			taken_at, parse_err := time.Parse(media.TIME_FORMAT, str_taken)

			if parse_err != nil {
				return nil, fmt.Errorf("Failed to parse '%s', %w", str_taken, parse_err)
			}

			body, err = sjson.SetBytes(body, "instagram:post.taken_at", taken_at.Format(time.RFC3339))

			if err != nil {
				return nil, fmt.Errorf("Failed to assign instagram:post.taken_at, %w", err)
			}

			// Lowercase every hashtag.

			tags_rsp := gjson.GetBytes(body, "instagram:post.caption.hashtags")

			if tags_rsp.Exists() {

				tags_array := tags_rsp.Array()

				// Non-nil even when empty so that it is encoded as
				// an empty list rather than null.
				hashtags := make([]string, 0, len(tags_array))

				for _, tag := range tags_array {
					hashtags = append(hashtags, strings.ToLower(tag.String()))
				}

				body, err = sjson.SetBytes(body, "instagram:post.caption.hashtags", hashtags)

				if err != nil {
					return nil, fmt.Errorf("Failed to update IG hash tags, %w", err)
				}
			}
		}

		return body, nil
	}

	opts.PrepareFuncs = append(opts.PrepareFuncs, sfom_f)

	// END OF sfom stuff

	stats, err := index.RunBulkIndexer(ctx, opts)

	if err != nil {
		log.Fatalf("Failed to run bulk tool, %v", err)
	}

	enc_stats, err := json.Marshal(stats)

	if err != nil {
		log.Fatalf("Failed to marshal stats, %v", err)
	}

	fmt.Println(string(enc_stats))
}