/
main.go
75 lines (63 loc) · 1.63 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package main
import (
"context"
"errors"
"fmt"
"io"
"log"
"os"
"os/signal"
"strings"
"sync"
"sync/atomic"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
)
func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()
f, err := os.Create(outputFile)
if err != nil {
log.Fatalf("os.Open() failed, %v", err)
}
defer f.Close()
cfg, err := config.LoadDefaultConfig(ctx,
config.WithRegion(awsRegion),
)
if err != nil {
log.Fatalf("config.LoadDefaultConfig() failed, %v", err)
}
client := dynamodb.NewFromConfig(cfg)
var gc uint64
wg := &sync.WaitGroup{}
for i := 0; i < threadCount; i++ {
wg.Add(1)
go func(input dynamodb.ScanInput, segment int) {
defer wg.Done()
input.Segment = aws.Int32(int32(segment))
scanWorker(ctx, client, input, f, &gc)
}(scanInput, i)
}
wg.Wait()
}
func scanWorker(ctx context.Context, client *dynamodb.Client, input dynamodb.ScanInput, f io.Writer, gc *uint64) {
scanner := dynamodb.NewScanPaginator(client, &input)
for scanner.HasMorePages() {
resp, err := scanner.NextPage(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
log.Printf("scanner.NextPage() failed, %v", err)
continue
}
atomic.AddUint64(gc, uint64(resp.ScannedCount))
rows := extractAttributes(resp.Items)
for _, row := range rows {
fmt.Fprintf(f, "%s\n", strings.Join(row, ","))
}
fmt.Printf("\033[2K\rTotalScanned=%d; Segment=%d; ScannedCount=%d; Count=%d; Rows=%d ",
atomic.LoadUint64(gc), *input.Segment, resp.ScannedCount, resp.Count, len(rows))
}
}