/
exportCSV.go
193 lines (163 loc) · 4.64 KB
/
exportCSV.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
package preprocessor
import (
"bufio"
"context"
"encoding/csv"
"fmt"
"math"
"os"
"path/filepath"
"strings"
"time"
"github.com/gocarina/gocsv"
"github.com/negapedia/wikibrief"
"github.com/pkg/errors"
)
// exportCSV drains the stream of evolving pages and materializes three CSV
// files under p.CSVDir:
//   - revisions.csv   one row per article revision
//   - pages.csv       one row per topic and per article
//   - socialjumps.csv one row per article with its similarity links, as
//     computed by p.bi2Similgraph from per-user co-edit weights
//
// fail is called on unrecoverable errors; cancelling ctx aborts the export.
// NOTE(review): the revisions.csv writer goroutine reports errors via p.Fail
// while the rest of this function uses the fail parameter — confirm this
// asymmetry is intentional.
func (p preprocessor) exportCSV(ctx context.Context, fail func(error) error, articles <-chan wikibrief.EvolvingPage) {
	csvArticleRevisionChan := make(chan interface{}, 10000)
	//pages: topics and articles
	csvPageChan := make(chan interface{}, 10000)
	//social jumps input
	articleMultiEdgeChan := make(chan multiEdge, 10000)
	//social jumps output
	articleSocialJumpsChan := p.bi2Similgraph(ctx, articleMultiEdgeChan)
	// Producer goroutine: turns topics and incoming articles into CSV rows and
	// social-jumps edges. It owns (and therefore closes) all three send channels.
	go func() {
		defer close(csvArticleRevisionChan)
		defer close(csvPageChan)
		defer close(articleMultiEdgeChan)
		for _, t := range p.Topics { //dump topics
			select {
			case csvPageChan <- &csvPage{ID: t.ID, Title: t.Title}:
				//proceed
			case <-ctx.Done():
				return
			}
		}
		for a := range articles {
			// users2weight accumulates, per non-bot non-anonymous user, a weight
			// describing their contribution to this article (see switch below);
			// it becomes the multiEdge payload for the social-jumps graph.
			users2weight := make(map[uint32]float64, len(a.Revisions))
			serialRevisionID := uint32(0)
			oldWeight := float64(0)
			for r := range a.Revisions {
				serialRevisionID++
				//User data
				// userID stays nil for anonymous edits so the CSV cell is empty.
				var userID *uint32
				if uID := r.UserID; uID != wikibrief.AnonimousUserID {
					userID = &uID
				}
				//Revision metric data
				// weight is the revision's text length; diff is the signed size
				// change with respect to the previous revision of this article.
				weight := float64(len(r.Text))
				diff := weight - oldWeight
				oldWeight = weight
				//Export to csv
				// NOTE(review): unlike every other send in this goroutine, this one
				// has no ctx.Done() guard — if the revisions.csv writer stops early
				// the goroutine could block here; presumably relies on the writer
				// draining the channel until close. Confirm.
				csvArticleRevisionChan <- &csvRevision{a.PageID, serialRevisionID, userID, r.IsBot, weight, diff, r.IsRevert, false, r.Timestamp.Format(time.RFC3339Nano)}
				//Convert data for socialjumps
				if r.IsBot || r.UserID == wikibrief.AnonimousUserID {
					continue //do not use for social jumps calculations
				}
				// Weighting scheme (weights saturate; Max/Min keep the best seen):
				//   revert            -> at least 1
				//   small edit (<=100)-> at least 10
				//   large edit        -> 10 + diff/10, capped at 100 (a previous
				//                        small-edit weight <=10 is reset first)
				userWeight := users2weight[r.UserID]
				switch {
				case r.IsRevert > 0:
					users2weight[r.UserID] = math.Max(userWeight, 1.0)
				case diff <= 100.0: //&& isPositive
					users2weight[r.UserID] = math.Max(userWeight, 10.0)
				case userWeight <= 10:
					userWeight = 0 //Resetting weight for different scheme.
					fallthrough
				default:
					users2weight[r.UserID] = math.Min(userWeight+diff/10, 100)
				}
			}
			// Send the article's page row and its multi-edge exactly once each, in
			// whichever order the receivers are ready: a successful send disables
			// its case by nilling the shadowing local (send on nil blocks forever).
			csvPageChan := csvPageChan
			articleMultiEdgeChan := articleMultiEdgeChan
			for i := 0; i < 2; i++ {
				select {
				case csvPageChan <- &csvPage{a.PageID, a.Title, a.Abstract, a.TopicID}:
					csvPageChan = nil
				case articleMultiEdgeChan <- multiEdge{a.PageID, users2weight}:
					articleMultiEdgeChan = nil
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	// Write revisions.csv concurrently with pages.csv; doneArticleRevisionWriting
	// signals completion so we can join before returning.
	doneArticleRevisionWriting := make(chan interface{})
	go func() {
		defer close(doneArticleRevisionWriting)
		if err := chan2csv(csvArticleRevisionChan, filepath.Join(p.CSVDir, "revisions.csv")); err != nil {
			p.Fail(err)
		}
	}()
	// pages.csv is written on this goroutine; it finishes when the producer
	// closes csvPageChan (or errors out).
	if err := chan2csv(csvPageChan, filepath.Join(p.CSVDir, "pages.csv")); err != nil {
		fail(err)
		return
	}
	//pages social jumps
	// Bridge the similarity-graph output into CSV rows, honouring cancellation.
	csvSocialJumpsChan := make(chan interface{}, 1000)
	go func() {
		defer close(csvSocialJumpsChan)
		for sj := range articleSocialJumpsChan {
			select {
			case csvSocialJumpsChan <- &csvSocialJumps{sj.From, uint32s(sj.To)}:
				//proceed
			case <-ctx.Done():
				return
			}
		}
	}()
	if err := chan2csv(csvSocialJumpsChan, filepath.Join(p.CSVDir, "socialjumps.csv")); err != nil {
		fail(err)
		return
	}
	// Join the revisions.csv writer before returning.
	<-doneArticleRevisionWriting
	return
}
// chan2csv drains channel c, marshaling each received value as one CSV row
// into a newly created file at filePath. It returns only after c is closed
// (or marshaling fails). Any error — creating, writing, flushing, or closing
// the file — is wrapped with the file path and returned.
//
// Fix: the previous version flushed the csv and bufio writers via bare defers,
// silently discarding flush errors, so a write failure (e.g. disk full) could
// produce a truncated file and a nil error.
func chan2csv(c <-chan interface{}, filePath string) (err error) {
	var csvFile *os.File
	if csvFile, err = os.Create(filePath); err != nil {
		err = errors.Wrapf(err, "Error while creating file at %v", filePath)
		return
	}
	// Close always runs; its error is surfaced only if nothing failed earlier.
	defer func() {
		if e := csvFile.Close(); e != nil && err == nil {
			err = errors.Wrapf(e, "Error while closing file %v", filePath)
		}
	}()
	bw := bufio.NewWriter(csvFile)
	csvw := csv.NewWriter(bw)
	if err = gocsv.MarshalChan(c, gocsv.NewSafeCSVWriter(csvw)); err != nil {
		err = errors.Wrapf(err, "Error while marshaling to file %v", filePath)
		return
	}
	// Flush the csv writer into bw, then bw into the file, checking both:
	// csv.Writer records its first write error, retrievable via Error().
	csvw.Flush()
	if e := csvw.Error(); e != nil {
		err = errors.Wrapf(e, "Error while flushing csv data to file %v", filePath)
		return
	}
	if e := bw.Flush(); e != nil {
		err = errors.Wrapf(e, "Error while flushing buffer to file %v", filePath)
	}
	return
}
// csvRevision is one row of revisions.csv: a single revision of an article.
// Field order defines the CSV column order; do not reorder.
type csvRevision struct {
	PageID     uint32  `csv:"pageid"`     // wiki page the revision belongs to
	ID         uint32  `csv:"ID"`         // 1-based serial number of the revision within its page; NOTE(review): tag casing differs from the other lowercase tags — confirm downstream consumers expect "ID"
	UserID     *uint32 `csv:"userid"`     // author; nil (empty cell) for anonymous edits
	IsBot      bool    `csv:"isbot"`      // author is flagged as a bot
	Weight     float64 `csv:"weight"`     // revision text length, in bytes
	Diff       float64 `csv:"diff"`       // signed size change vs. the previous revision
	IsRevert   uint32  `csv:"isrevert"`   // non-zero marks a revert; presumably a revert count/depth from wikibrief — confirm
	IsReverted bool    `csv:"isreverted"` // always written as false by exportCSV; presumably filled in downstream — confirm
	Timestamp  string  `csv:"timestamp"`  // revision time in RFC3339Nano format
}
// csvPage is one row of pages.csv; it represents either a topic (only ID and
// Title set) or an article (all fields set). Field order defines the CSV
// column order; do not reorder.
type csvPage struct {
	ID       uint32 `csv:"id"`       // page (or topic) identifier
	Title    string `csv:"title"`    // page title
	Abstract string `csv:"abstract"` // article abstract; empty for topic rows
	TopicID  uint32 `csv:"topicid"`  // owning topic; zero for topic rows
}
// csvSocialJumps is one row of socialjumps.csv: an article and the articles
// most similar to it (its "social jumps"), serialized via uint32s.String.
type csvSocialJumps struct {
	ID          uint32  `csv:"id"`          // source article page ID
	SocialJumps uint32s `csv:"socialjumps"` // related article page IDs, rendered as {a, b, c}
}
// uint32s is a slice of page IDs with a custom textual form for CSV export.
type uint32s []uint32

// String implements fmt.Stringer, rendering the slice as a brace-wrapped,
// comma-separated list, e.g. "{1, 2, 3}" (or "{}" when empty).
func (s uint32s) String() string {
	var b strings.Builder
	b.WriteByte('{')
	for i, id := range s {
		if i > 0 {
			b.WriteString(", ")
		}
		fmt.Fprint(&b, id)
	}
	b.WriteByte('}')
	return b.String()
}