/
cpc.go
169 lines (151 loc) · 4.16 KB
/
cpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
// Package cpc provides a copy function optimized for files like SQLite that
// are only written at 4K page granularity. It writes the dest file in-place
// and tries to not write a page that's identical.
package cpc
import (
"bytes"
"context"
"fmt"
"os"
"runtime"
"sync/atomic"
"time"
"golang.org/x/sync/errgroup"
)
// Stats contains the stats of a copy operation.
type Stats struct {
	Duration        time.Duration // wall-clock time the whole copy took
	PageSize        int64         // page granularity used, in bytes (pgSize)
	PagesWritten    int64         // pages that differed and were rewritten in dst
	PagesUnmodified int64         // pages already identical in dst and skipped
}
const pgSize = 4 << 10
// Page is a 4K page of a file.
type Page struct {
	Off int64 // byte offset into the file; always 4K aligned
	Len int   // usually 4K (pgSize), except shorter at the file's tail
}
// Logf is a logger that takes a format string and arguments.
// It uses interface{} rather than any because this package targets
// pretty ancient Go (see atomicInt64 below).
type Logf func(format string, args ...interface{})
// Copy provides a concurrent blockwise copy of srcName to dstName.
//
// The destination is written in place: it is truncated (or extended) to
// the source's size, then only the 4K pages whose contents differ are
// rewritten, minimizing writes for files (like SQLite databases) that
// change at page granularity. One worker goroutine runs per CPU.
//
// Cancelling ctx stops the copy; workers check ctx between pages.
// On success Copy returns stats about the operation.
func Copy(ctx context.Context, logf Logf, srcName, dstName string) (*Stats, error) {
	numCPU := runtime.NumCPU()
	t0 := time.Now()
	srcF, err := os.Open(srcName)
	if err != nil {
		return nil, err
	}
	// src is read-only, so its Close error is uninteresting.
	defer srcF.Close()
	fi, err := srcF.Stat()
	if err != nil {
		return nil, err
	}
	if !fi.Mode().IsRegular() {
		return nil, fmt.Errorf("only copies regular files; src %v is %v", srcName, fi.Mode())
	}
	size := fi.Size()
	dstF, err := os.OpenFile(dstName, os.O_CREATE|os.O_RDWR, fi.Mode().Perm())
	if err != nil {
		return nil, err
	}
	// This defer covers error paths; the success path Closes explicitly
	// below to surface write errors. Double Close on *os.File is harmless.
	defer dstF.Close()
	if err := dstF.Truncate(size); err != nil {
		return nil, err
	}

	// Enqueue every page up front. The channel is sized to hold them all,
	// so this loop never blocks and workers can just drain until close.
	pages := 0
	workc := make(chan Page, size/pgSize+1)
	remainSize := size
	off := int64(0)
	for remainSize > 0 {
		chunkSize := remainSize
		if chunkSize > pgSize {
			chunkSize = pgSize // full page; the tail may be shorter
		}
		p := Page{Off: off, Len: int(chunkSize)}
		remainSize -= chunkSize
		off += chunkSize
		pages++
		workc <- p
	}
	close(workc)
	logf("file %v is %v bytes, %v pages", srcName, size, pages)
	logf("over %v CPUs, %v pages per CPU", numCPU, pages/numCPU)

	var pagesUnmodified atomicInt64
	var pagesWritten atomicInt64
	var pagesTotal atomicInt64

	// copyPage copies a single page, skipping the write when src and dst
	// already match. bufSrc and bufDst are pgSize scratch buffers owned
	// by the calling goroutine.
	copyPage := func(p Page, bufSrc, bufDst []byte) error {
		bufSrc = bufSrc[:p.Len]
		bufDst = bufDst[:p.Len]
		// Note: ReadAt doesn't do short reads like io.Reader. Also, these two
		// ReadAt calls could be in theory be concurrent but we're already
		// running NumCPUs goroutines, so it wouldn't really help.
		if _, err := srcF.ReadAt(bufSrc, p.Off); err != nil {
			return err
		}
		if _, err := dstF.ReadAt(bufDst, p.Off); err != nil {
			return err
		}
		if bytes.Equal(bufSrc, bufDst) {
			pagesUnmodified.Add(1)
			return nil
		}
		if _, err := dstF.WriteAt(bufSrc, p.Off); err != nil {
			return err
		}
		pagesWritten.Add(1)
		return nil
	}

	var lastPrint atomic.Value // of time.Time
	printProgress := func() {
		logf("%0.2f%% done; %v pages written, %v unchanged",
			float64(pagesTotal.Load())*100/float64(pages),
			pagesWritten.Load(),
			pagesUnmodified.Load())
	}

	grp, ctx := errgroup.WithContext(ctx)
	for i := 0; i < numCPU; i++ {
		grp.Go(func() error {
			// Per-goroutine scratch buffers, reused across pages.
			bufSrc := make([]byte, pgSize)
			bufDst := make([]byte, pgSize)
			for {
				select {
				case <-ctx.Done():
					return ctx.Err()
				case p, ok := <-workc:
					if !ok {
						return nil // queue drained; all pages done
					}
					if err := copyPage(p, bufSrc, bufDst); err != nil {
						return err
					}
					done := pagesTotal.Add(1)
					// Rate-limit progress output to roughly once a second.
					lastPrintTime, _ := lastPrint.Load().(time.Time)
					if done%100 == 0 && time.Since(lastPrintTime) > time.Second {
						printProgress()
						lastPrint.Store(time.Now())
					}
				}
			}
		})
	}
	if err := grp.Wait(); err != nil {
		return nil, err
	}
	printProgress()
	d := time.Since(t0)
	logf("Done in %v", d.Round(time.Millisecond))
	if pagesWritten.Load()+pagesUnmodified.Load() != int64(pages) {
		return nil, fmt.Errorf("not consistent; expected %v pages total", pages)
	}
	// Close the destination explicitly so errors flushing writes are
	// reported rather than dropped by the deferred Close.
	if err := dstF.Close(); err != nil {
		return nil, err
	}
	return &Stats{
		Duration:        d,
		PageSize:        pgSize,
		PagesWritten:    pagesWritten.Load(),
		PagesUnmodified: pagesUnmodified.Load(),
	}, nil
}
// atomicInt64 is sync/atomic.Int64, but this package is targeting
// pretty ancient Go.
type atomicInt64 int64

// Load atomically returns the current value.
func (v *atomicInt64) Load() int64 { return atomic.LoadInt64((*int64)(v)) }

// Add atomically adds delta to the value and returns the new value.
func (v *atomicInt64) Add(delta int64) int64 {
	return atomic.AddInt64((*int64)(v), delta)
}