forked from chrislusf/gleam
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sort_benchmark.go
93 lines (73 loc) · 2.16 KB
/
sort_benchmark.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package main
import (
	"flag"
	"fmt"

	"github.com/chrislusf/gleam/distributed"
	"github.com/chrislusf/gleam/flow"
	"github.com/chrislusf/gleam/gio"
	"github.com/chrislusf/gleam/plugins/file"
)
// Command-line flags and the mapper registration shared by the flows in this
// file.
var (
// size selects which input data set main sorts (see the flag help text).
size = flag.Int("size", 0, "0 for small, 1 for 1GB, 2 for 10GB")
// isDistributed makes gleamSortDistributed run the flow with
// distributed.Option() instead of a plain Run().
isDistributed = flag.Bool("distributed", false, "distributed mode or not")
// isInMemory chooses between partitioning/sorting directly (true) and
// wrapping those steps in OnDisk (false).
isInMemory = flag.Bool("inMemory", true, "distributed mode but only through memory")
// isProfiling is forwarded to SetProfiling; it is only read when the flow
// runs in distributed mode.
isProfiling = flag.Bool("isProfiling", false, "profiling the flow")
// splitter is the handle returned by registering splitLine with gio; it is
// what the flows pass to Map("split", ...).
splitter = gio.RegisterMapper(splitLine)
)
// main parses the command-line flags, initializes gio, chooses the input
// file, partition count and total-size hint that correspond to the -size
// flag, and then runs the gleam sort flow.
func main() {
	flag.Parse()
	gio.Init()

	// Defaults describe the small 10K-record input; they are used for
	// -size=0 and for any value other than 1 or 2.
	inputPath := "/Users/chris/Desktop/record_10K_input.txt"
	shardCount := 2
	// totalSize feeds flow.TotalSize; presumably megabytes — TODO confirm.
	totalSize := int64(10)

	switch *size {
	case 1:
		inputPath = "/Users/chris/Desktop/record_1GB_input.txt"
		shardCount = 4
		totalSize = 1024
	case 2:
		inputPath = "/Users/chris/Desktop/record_10GB_input.txt"
		shardCount = 40
		totalSize = 10240
	}

	gleamSortDistributed(inputPath, totalSize, shardCount, *isDistributed, *isInMemory)
}
// linuxSortDistributed reads fileName as text in `partition` partitions,
// splits each line with the registered splitter, pipes every partition
// through the external `sort -k 1` command, merges the sorted partitions into
// one, prints the result, and runs the flow with distributed.Option().
func linuxSortDistributed(fileName string, partition int) {
	lines := flow.New("linuxSort").Read(file.Txt(fileName, partition))
	keyed := lines.Map("split", splitter)
	sortedParts := keyed.Pipe("linuxSort", `sort -k 1`)
	merged := sortedParts.MergeSortedTo("merge", 1)

	merged.Printlnf("%s %s").Run(distributed.Option())
}
func linuxSortStandalone(fileName string, partition int) {
flow.New("linuxSort").Read(file.Txt(fileName, partition)).
Map("split", splitter).
Pipe("linuxSort", `sort -k 1`).
MergeSortedTo("merge", 1).
Printlnf("%s %s").
Run()
}
// gleamSortDistributed sorts the records of fileName by key using gleam's own
// partition and sort steps, then prints them.
//
// size is passed to the flow as a TotalSize hint and partition is used both
// as the number of input partitions and as the PartitionByKey shard count.
// When isInMemory is false the partition/sort steps are wrapped in OnDisk.
// When isDistributed is true the flow runs with distributed.Option(), with
// profiling controlled by the -isProfiling flag; otherwise it runs with no
// options.
func gleamSortDistributed(fileName string, size int64, partition int, isDistributed, isInMemory bool) {
	// The partition-then-sort stage is the same for both modes; only the way
	// it is applied differs.
	partitionAndSort := func(d *flow.Dataset) *flow.Dataset {
		return d.PartitionByKey("partition", partition).SortByKey("sort")
	}

	records := flow.New("gleamSort").
		Read(file.Txt(fileName, partition)).
		Hint(flow.TotalSize(size)).
		Map("split", splitter)

	sorted := records
	if isInMemory {
		sorted = partitionAndSort(records)
	} else {
		sorted = records.OnDisk(partitionAndSort)
	}

	output := sorted.Printlnf("%s %s")

	if !isDistributed {
		output.Run()
		return
	}
	output.Run(distributed.Option().SetProfiling(*isProfiling))
}
// splitLine is the mapper registered as splitter. It takes one input row
// whose first column is a whole text line and emits a (key, value) pair:
// the first 10 bytes of the line are the key and everything from byte 12
// onward is the value. Bytes 10 and 11 are dropped (presumably a two-byte
// separator in the benchmark record format — TODO confirm).
//
// It returns an error, instead of panicking, when the row is empty, when the
// first column is not a string, or when the line is too short to contain a
// full key and separator.
func splitLine(row []interface{}) error {
	const (
		keyLen     = 10 // bytes taken as the sort key
		valueStart = 12 // first byte of the value, after key and separator
	)

	if len(row) == 0 {
		return fmt.Errorf("splitLine: empty row")
	}

	line, ok := row[0].(string)
	if !ok {
		return fmt.Errorf("splitLine: expected string in first column, got %T", row[0])
	}

	// A line of exactly valueStart bytes is still valid: it has an empty value.
	if len(line) < valueStart {
		return fmt.Errorf("splitLine: line too short (%d bytes, need at least %d): %q",
			len(line), valueStart, line)
	}

	gio.Emit(line[:keyLen], line[valueStart:])
	return nil
}