cluster.go

package recurring

import (
	"math"
	"regexp"
	"strings"

	"github.com/monetr/monetr/server/internal/calc"
	"github.com/monetr/monetr/server/models"
)

const (
	// Epsilon is the maximum Euclidean distance between two normalized TF-IDF
	// vectors for them to be considered neighbors.
	Epsilon = 0.98
	// MinNeighbors is the minimum number of neighbors a point must have in
	// order to be treated as a core point of a cluster.
	MinNeighbors = 1
)

var (
	dbscanClusterDebug = false
)
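
// Because document vectors are normalized before clustering, the Euclidean
// distance between two of them maps directly onto cosine similarity:
// ||a-b||² = 2(1 - cos θ), so Epsilon = 0.98 admits pairs with cosine
// similarity of roughly 0.52 or higher. A minimal sanity-check sketch,
// assuming calc.NormalizeVector64 and calc.EuclideanDistance64 behave as
// their names suggest:
//
//	a := []float64{1, 0}
//	b := []float64{1, 1}
//	calc.NormalizeVector64(b)
//	// cos θ = 1/√2 ≈ 0.707, so the distance is √(2(1-0.707)) ≈ 0.765,
//	// comfortably within Epsilon.
//	distance := calc.EuclideanDistance64(a, b)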

var (
	clusterCleanStringRegex = regexp.MustCompile(`[a-zA-Z'\.\d]+`)
	numberOnly              = regexp.MustCompile(`^\d+$`)
	// specialWeights adjusts the TF-IDF weight of specific tokens. A weight
	// of zero removes the token from consideration entirely, while a weight
	// above one makes the token more significant than TF-IDF alone would.
	specialWeights = map[string]float64{
		"amazon":      10,
		"pwp":         0,
		"debit":       0,
		"pos":         0,
		"visa":        0,
		"ach":         0,
		"transaction": 0,
		"card":        0,
		"check":       0,
		"transfer":    0,
		"deposit":     0,
	}
	// synonyms maps common abbreviations onto a canonical token so that
	// variations of the same merchant name cluster together.
	synonyms = map[string]string{
		"amzn": "amazon",
	}
)

// Document is the intermediate representation of a single transaction as it
// moves through the TF-IDF pre-processing pipeline.
type Document struct {
	ID          uint64
	TF          map[string]float64
	TFIDF       map[string]float64
	Vector      []float64
	Transaction *models.Transaction
	String      string
	Valid       bool
}

// PreProcessor converts transactions into normalized TF-IDF vectors that can
// be fed into DBSCAN.
type PreProcessor struct {
	documents []Document
	// wc holds the number of times each word appears across all documents.
	wc map[string]float64
}

func NewPreProcessor() *PreProcessor {
	return &PreProcessor{
		documents: []Document{},
		wc:        map[string]float64{},
	}
}

// indexWords assigns a stable vector index to every word that appears more
// than once across all documents. A word seen only a single time cannot
// contribute to similarity between documents, so it is skipped.
func (p *PreProcessor) indexWords() map[string]int {
	index := 0
	result := make(map[string]int)
	for word, count := range p.wc {
		if count == 1 {
			continue
		}
		result[word] = index
		index++
	}
	return result
}

// AddTransaction tokenizes the transaction's name, applies synonyms and
// special weights, and records the term frequencies for the resulting
// document.
func (p *PreProcessor) AddTransaction(txn *models.Transaction) {
	words := CleanNameRegex(txn)
	name := make([]string, 0, len(words))
	wordCounts := make(map[string]float64, len(words))
	for _, word := range words {
		// If there is a synonym for the current word, use that instead.
		if synonym, ok := synonyms[word]; ok {
			word = synonym
		}
		// Tokens with a special weight of zero carry no signal; drop them.
		if multiplier, ok := specialWeights[word]; ok && multiplier == 0 {
			continue
		}
		wordCounts[word]++
		p.wc[word]++
		name = append(name, word)
	}
	// Term frequency is normalized by the original token count, including any
	// zero-weight tokens that were dropped above.
	tf := make(map[string]float64, len(wordCounts))
	for word, count := range wordCounts {
		tf[word] = count / float64(len(words))
	}
	p.documents = append(p.documents, Document{
		ID:          txn.TransactionId,
		String:      strings.Join(name, " "),
		Transaction: txn,
		TF:          tf,
		TFIDF:       map[string]float64{},
	})
}
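
// A small worked example of the bookkeeping above (hypothetical tokens; the
// actual output of CleanNameRegex depends on the transaction name): given the
// tokens ["amzn", "marketplace", "pos"], "amzn" becomes "amazon" via the
// synonyms table and "pos" is dropped for having a special weight of zero,
// but len(words) is still 3. The stored document therefore has
// String = "amazon marketplace" and TF = {"amazon": 1/3, "marketplace": 1/3}.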

// GetDatums finalizes the corpus: it computes the TF-IDF score for every word
// in every document, packs those scores into a normalized vector, and returns
// one Datum per document that still contains at least one meaningful word.
func (p *PreProcessor) GetDatums() []Datum {
	datums := make([]Datum, 0, len(p.documents))
	docCount := float64(len(p.documents))
	// Compute an IDF-style weight from each word's total count across the
	// corpus; words that appear almost everywhere end up with a weight near
	// (or below) zero.
	idf := make(map[string]float64, len(p.wc))
	for word, count := range p.wc {
		idf[word] = math.Log(docCount / (count + 1))
	}
	// Get a map of all the meaningful words and their index to use in the
	// vector.
	minified := p.indexWords()
	// Define the length of the vector and adjust it to be divisible by 8.
	// This will enable us to leverage SIMD in the future. By using 8 we are
	// compatible with both AVX and AVX512.
	vectorLength := len(minified) + (8 - (len(minified) % 8))
	for i := range p.documents {
		// Get the current document we are working with.
		document := p.documents[i]
		// Calculate the TF-IDF for that document.
		for word, tfValue := range document.TF {
			document.TFIDF[word] = tfValue * idf[word]
			// If this specific word is meant to be more meaningful than
			// TF-IDF alone would treat it, adjust it accordingly.
			if multiplier, ok := specialWeights[word]; ok {
				document.TFIDF[word] *= multiplier
			}
		}
		// Then create a vector of the words in the document name to use for
		// the DBSCAN clustering.
		document.Vector = make([]float64, vectorLength)
		words := 0
		for word, tfidfValue := range document.TFIDF {
			index, exists := minified[word]
			if !exists {
				continue
			}
			words++
			document.Vector[index] = tfidfValue
		}
		// A document whose words were all filtered out has nothing to cluster
		// on; mark it invalid and move on.
		if words == 0 {
			document.Valid = false
			p.documents[i] = document
			continue
		}
		document.Valid = true
		// Normalize the document's TF-IDF vector.
		calc.NormalizeVector64(document.Vector)
		// Then store the document back in.
		p.documents[i] = document
		datums = append(datums, Datum{
			ID:          document.ID,
			Transaction: document.Transaction,
			String:      document.String,
			Amount:      document.Transaction.Amount,
			Vector:      document.Vector,
		})
	}
	return datums
}
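
// A minimal end-to-end sketch of the pre-processing and clustering pipeline,
// assuming the caller has already loaded the relevant transactions:
//
//	processor := NewPreProcessor()
//	for i := range transactions {
//		processor.AddTransaction(&transactions[i])
//	}
//	datums := processor.GetDatums()
//	clusters := NewDBSCAN(datums, Epsilon, MinNeighbors).Calculate()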

// Datum is a single clusterable point: one transaction and its normalized
// TF-IDF vector.
type Datum struct {
	ID          uint64
	Transaction *models.Transaction
	String      string
	Amount      int64
	Vector      []float64
}

// Cluster is a group of related datums. Items maps indexes in the DBSCAN
// dataset to a (currently unused) flag byte.
type Cluster struct {
	ID    uint64
	Items map[int]uint8
}

// DBSCAN performs density-based clustering over the provided dataset. labels
// tracks which points have been visited; a value of true marks the point as
// noise.
type DBSCAN struct {
	labels    map[uint64]bool
	dataset   []Datum
	epsilon   float64
	minPoints int
	clusters  []Cluster
}

func NewDBSCAN(dataset []Datum, epsilon float64, minPoints int) *DBSCAN {
	return &DBSCAN{
		labels:    map[uint64]bool{},
		dataset:   dataset,
		epsilon:   epsilon,
		minPoints: minPoints,
		clusters:  nil,
	}
}

// GetDatumByIndex returns the datum at the given index in the dataset, or
// false if the index is out of bounds.
func (d *DBSCAN) GetDatumByIndex(index int) (*Datum, bool) {
	if index >= len(d.dataset) || index < 0 {
		return nil, false
	}
	return &d.dataset[index], true
}

// Calculate runs DBSCAN over the dataset and returns the resulting clusters.
func (d *DBSCAN) Calculate() []Cluster {
	// Initialize or reinitialize the clusters. We want to start with a clean
	// slate.
	d.clusters = make([]Cluster, 0)
	// From the top, take one point at a time.
	for index, point := range d.dataset {
		// If we have already visited this point then skip it.
		if _, visited := d.labels[point.ID]; visited {
			continue
		}
		// Find all the other points that are within the epsilon of this
		// point.
		neighbors := d.getNeighbors(index)
		// If there are not enough points then this is not a core point.
		if len(neighbors) < d.minPoints {
			// Mark it as noise and keep moving.
			d.labels[point.ID] = true
			continue
		}
		// Otherwise mark the point as visited so we don't do the same work
		// again.
		d.labels[point.ID] = false
		// Then start constructing a cluster around this point.
		newCluster := d.expandCluster(index, neighbors)
		// Set the cluster's unique ID to the lowest numeric ID in that
		// cluster.
		// HACK: I need a way to uniquely identify each cluster, generally by
		// using the contents of that cluster, and this relies on those
		// contents remaining consistent over time. While the order of the
		// clusters might change in the future, or they might expand as new
		// transactions show up, I need to know which cluster those
		// transactions get added to in order to tune things over time. This
		// has the potential to cause issues on its own: what if the cluster
		// algorithm changes enough that the "lowest ID" gets kicked out of
		// the cluster? What if we push a bad change and the clusters change
		// entirely? Or what if that "lowest ID" gets moved to a different
		// cluster? This needs improvement, but I think it should be fine for
		// the initial implementation of the clustering algorithm.
		for i := range newCluster.Items {
			item := d.dataset[i]
			if item.ID < newCluster.ID || newCluster.ID == 0 {
				newCluster.ID = item.ID
			}
		}
		d.clusters = append(d.clusters, newCluster)
	}
	return d.clusters
}
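
// A sketch of mapping a cluster's items back onto their transactions
// (hypothetical caller code; Items keys are dataset indexes, not transaction
// IDs):
//
//	for index := range cluster.Items {
//		if datum, ok := dbscan.GetDatumByIndex(index); ok {
//			fmt.Println(datum.Transaction.TransactionId, datum.String)
//		}
//	}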

// expandCluster grows a cluster outward from the core point at the given
// index, recursively absorbing any neighbors that are themselves core points.
func (d *DBSCAN) expandCluster(index int, neighbors []int) Cluster {
	// Bootstrap a cluster for the current point; this function might be
	// called recursively.
	cluster := Cluster{
		Items: map[int]uint8{},
	}
	// And add a pointer to the current item into the new cluster.
	cluster.Items[index] = 0
	for _, neighborIndex := range neighbors {
		// Retrieve the item from the dataset.
		neighbor := d.dataset[neighborIndex]
		// If Q (neighbor) is not visited then mark it as visited and check
		// for more neighbors.
		if _, visited := d.labels[neighbor.ID]; !visited {
			// Mark Q as visited but not as noise.
			d.labels[neighbor.ID] = false
			// Find more nearby neighbors.
			newNeighbors := d.getNeighbors(neighborIndex)
			// If we have enough neighbors then we can expand the cluster even
			// more.
			if len(newNeighbors) >= d.minPoints {
				// Recursively descend, then merge the items we get back into
				// the cluster we are currently building.
				recurResult := d.expandCluster(neighborIndex, newNeighbors)
				for k, v := range recurResult.Items {
					cluster.Items[k] = v
				}
			}
		}
		// If Q (neighbor) is not yet part of any existing cluster...
		var found bool
		for _, existing := range d.clusters {
			if _, ok := existing.Items[neighborIndex]; ok {
				found = true
				break
			}
		}
		// ...then add it to this cluster.
		if !found {
			cluster.Items[neighborIndex] = 0
		}
	}
	return cluster
}

// getNeighbors returns the indexes of every point in the dataset that is
// within epsilon of the point at the given index. This is a brute-force scan,
// so each call is O(n) and a full Calculate is O(n²).
func (d *DBSCAN) getNeighbors(index int) []int {
	// Pre-allocate an array of neighbors for us to work with.
	neighbors := make([]int, 0, len(d.dataset)/2)
	point := d.dataset[index]
	for i, counterpoint := range d.dataset {
		// Don't calculate against yourself.
		if i == index {
			continue
		}
		// Calculate the distance from our Q point to our P point.
		distance := calc.EuclideanDistance64(point.Vector, counterpoint.Vector)
		// If we are close enough then we could be part of a core cluster
		// point. Add it to the list.
		if distance <= d.epsilon {
			neighbors = append(neighbors, i)
		}
	}
	return neighbors
}
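
// A tiny worked example of the neighborhood query above (a sketch; indexes
// refer to positions in the dataset slice, not transaction IDs):
//
//	d := NewDBSCAN([]Datum{
//		{ID: 1, Vector: []float64{1, 0}},
//		{ID: 2, Vector: []float64{1, 0}},
//		{ID: 3, Vector: []float64{0, 1}},
//	}, Epsilon, MinNeighbors)
//	d.getNeighbors(0) // => []int{1}; index 2 is √2 ≈ 1.41 away, beyond Epsilon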