forked from seaweedfs/seaweedfs
/
volume_grpc_copy.go
263 lines (221 loc) · 8.01 KB
/
volume_grpc_copy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
package weed_server
import (
"context"
"fmt"
"io"
"math"
"os"
"path"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
// BufferSizeLimit is the size in bytes (2 MiB) of the buffer CopyFile reads
// file data into before sending each chunk on the stream.
const BufferSizeLimit = 1024 * 1024 * 2
// VolumeCopy copies the .idx and .dat files of a volume from
// req.SourceDataNode into a free local location, checks the copied file sizes
// against the sizes reported by the source, and mounts the volume.
//
// It returns an error when the volume already exists in the local store, when
// no location has free space, when the remote status/copy calls fail, when the
// size check fails, or when mounting fails. If copying or the size check fails
// after the destination file name is known, the copied files are removed.
func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.VolumeCopyRequest) (*volume_server_pb.VolumeCopyResponse, error) {

	if vs.store.GetVolume(needle.VolumeId(req.VolumeId)) != nil {
		return nil, fmt.Errorf("volume %d already exists", req.VolumeId)
	}

	location := vs.store.FindFreeLocation()
	if location == nil {
		return nil, fmt.Errorf("no space left")
	}

	// the master will not start compaction for read-only volumes, so it is safe to just copy files directly
	// steps:
	//   read .idx .dat file size and timestamp
	//   copy .idx file
	//   copy .dat file
	//   confirm size
	var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse
	var volumeFileName string

	// removeCopiedFiles is best-effort cleanup; os.Remove errors (e.g. the file
	// was never created) are deliberately ignored.
	removeCopiedFiles := func() {
		if volumeFileName == "" {
			// nothing was created yet: the failure happened before the
			// destination file name was computed
			return
		}
		os.Remove(volumeFileName + ".idx")
		os.Remove(volumeFileName + ".dat")
	}

	err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
		var statusErr error
		volFileInfoResp, statusErr = client.ReadVolumeFileStatus(ctx,
			&volume_server_pb.ReadVolumeFileStatusRequest{
				VolumeId: req.VolumeId,
			})
		if statusErr != nil {
			return fmt.Errorf("read volume file status failed, %v", statusErr)
		}

		volumeFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId))

		// copy the .idx file, then the .dat file, each up to the size reported by the source
		if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false); err != nil {
			return err
		}

		if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false); err != nil {
			return err
		}

		return nil
	})
	if err != nil {
		// Always surface the copy error. Previously an error raised before
		// volumeFileName was set fell through to the size check, where it was
		// masked (and volFileInfoResp could be nil).
		removeCopiedFiles()
		return nil, err
	}

	idxFileName := volumeFileName + ".idx"
	datFileName := volumeFileName + ".dat"

	if err = checkCopyFiles(volFileInfoResp, idxFileName, datFileName); err != nil { // added by panyc16
		// do not leave files with unexpected sizes behind
		removeCopiedFiles()
		return nil, err
	}

	// mount the volume
	if err = vs.store.MountVolume(needle.VolumeId(req.VolumeId)); err != nil {
		return nil, fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err)
	}

	return &volume_server_pb.VolumeCopyResponse{
		LastAppendAtNs: volFileInfoResp.DatFileTimestampSeconds * uint64(time.Second),
	}, nil
}
// doCopyFile opens a CopyFile stream on client for the file with extension ext
// of volume vid and writes the received content to baseFileName+ext through
// writeToFile, using a write throttler built from vs.compactionBytePerSecond.
//
// compactRevision, stopOffset, collection and isEcVolume are forwarded
// unchanged in the CopyFileRequest; isAppend is forwarded to writeToFile.
func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid uint32,
	compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool) error {

	request := &volume_server_pb.CopyFileRequest{
		VolumeId:           vid,
		Ext:                ext,
		CompactionRevision: compactRevision,
		StopOffset:         stopOffset,
		Collection:         collection,
		IsEcVolume:         isEcVolume,
	}

	stream, startErr := client.CopyFile(ctx, request)
	if startErr != nil {
		return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, startErr)
	}

	targetPath := baseFileName + ext
	throttler := util.NewWriteThrottler(vs.compactionBytePerSecond)
	if copyErr := writeToFile(stream, targetPath, throttler, isAppend); copyErr != nil {
		return fmt.Errorf("failed to copy %s file: %v", targetPath, copyErr)
	}

	return nil
}
// checkCopyFiles compares the on-disk sizes of idxFileName and datFileName
// with the IdxFileSize and DatFileSize reported in originFileInf. Only the
// file sizes are compared; it returns an error if either file cannot be
// stat'ed or its size differs.
// todo: maybe should check the received count and deleted count of the volume
func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, idxFileName, datFileName string) error {
	idxInfo, idxErr := os.Stat(idxFileName)
	if idxErr != nil {
		return fmt.Errorf("stat idx file %s failed, %v", idxFileName, idxErr)
	}
	if idxSize := idxInfo.Size(); uint64(idxSize) != originFileInf.IdxFileSize {
		return fmt.Errorf("idx file %s size [%v] is not same as origin file size [%v]",
			idxFileName, idxSize, originFileInf.IdxFileSize)
	}

	datInfo, datErr := os.Stat(datFileName)
	if datErr != nil {
		return fmt.Errorf("get dat file info failed, %v", datErr)
	}
	if datSize := datInfo.Size(); uint64(datSize) != originFileInf.DatFileSize {
		return fmt.Errorf("the dat file size [%v] is not same as origin file size [%v]",
			datSize, originFileInf.DatFileSize)
	}

	return nil
}
// writeToFile receives chunks from the CopyFile stream until io.EOF and writes
// each chunk's FileContent to fileName, calling wt.MaybeSlowdown with the
// chunk size after every write.
//
// The file is created if it does not exist and is truncated first unless
// isAppend is set. An error is returned if the file cannot be opened, if
// receiving from the stream fails, or if a chunk cannot be written.
func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) error {
	glog.V(4).Infof("writing to %s", fileName)

	flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
	if isAppend {
		// NOTE(review): O_APPEND is not set here, so writes start at offset 0
		// of an existing file instead of extending it — confirm this is the
		// intended meaning of isAppend.
		flags = os.O_WRONLY | os.O_CREATE
	}

	dst, err := os.OpenFile(fileName, flags, 0644)
	if err != nil {
		// Report the failure; returning nil here would make the caller
		// believe the file was copied.
		return fmt.Errorf("open %s: %v", fileName, err)
	}
	defer dst.Close()

	for {
		resp, receiveErr := client.Recv()
		if receiveErr == io.EOF {
			return nil
		}
		if receiveErr != nil {
			return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
		}
		if _, writeErr := dst.Write(resp.FileContent); writeErr != nil {
			return fmt.Errorf("writing %s: %v", fileName, writeErr)
		}
		wt.MaybeSlowdown(int64(len(resp.FileContent)))
	}
}
// ReadVolumeFileStatus looks up the volume req.VolumeId in the local store and
// returns its .dat and .idx sizes, file count, compaction revision and
// collection. Both timestamp fields are filled from the single modification
// time returned by the volume's FileStat.
//
// It returns an error if the volume is not found in the store.
func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) {
	vol := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
	if vol == nil {
		return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
	}

	datBytes, idxBytes, modifiedAt := vol.FileStat()
	modifiedSeconds := uint64(modifiedAt.Unix())

	return &volume_server_pb.ReadVolumeFileStatusResponse{
		VolumeId:                req.VolumeId,
		DatFileSize:             datBytes,
		IdxFileSize:             idxBytes,
		DatFileTimestampSeconds: modifiedSeconds,
		IdxFileTimestampSeconds: modifiedSeconds,
		FileCount:               vol.FileCount(),
		CompactionRevision:      uint32(vol.CompactionRevision),
		Collection:              vol.Collection,
	}, nil
}
// CopyFile lets a client pull a volume-related file from this server: the file
// is read in chunks of up to BufferSizeLimit bytes and each chunk is sent on
// stream.
//
// For a normal volume (req.IsEcVolume == false) the file is the volume's file
// name plus req.Ext; if req.CompactionRevision != math.MaxUint32, the request
// fails unless the volume's compaction revision matches it. For an EC volume
// the file is searched for in every store location.
//
// Copying stops after req.StopOffset bytes or at end of file, whichever comes
// first. Set StopOffset to math.MaxUint64 to read all data.
func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream volume_server_pb.VolumeServer_CopyFileServer) error {

	var fileName string
	if !req.IsEcVolume {
		v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
		if v == nil {
			return fmt.Errorf("not found volume id %d", req.VolumeId)
		}

		if uint32(v.CompactionRevision) != req.CompactionRevision && req.CompactionRevision != math.MaxUint32 {
			return fmt.Errorf("volume %d is compacted", req.VolumeId)
		}
		fileName = v.FileName() + req.Ext
	} else {
		baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext
		// no early exit: if the file exists in several locations, the last one wins
		for _, location := range vs.store.Locations {
			tName := path.Join(location.Directory, baseFileName)
			if util.FileExists(tName) {
				fileName = tName
			}
		}
		if fileName == "" {
			return fmt.Errorf("CopyFile not found ec volume id %d", req.VolumeId)
		}
	}

	// StopOffset is unsigned. Converting a value above math.MaxInt64 (such as
	// the documented "read everything" value math.MaxUint64) directly to int64
	// yields a negative number, which would skip the loop and send nothing, so
	// clamp it instead.
	bytesToRead := int64(math.MaxInt64)
	if req.StopOffset < math.MaxInt64 {
		bytesToRead = int64(req.StopOffset)
	}

	file, err := os.Open(fileName)
	if err != nil {
		return err
	}
	defer file.Close()

	buffer := make([]byte, BufferSizeLimit)

	for bytesToRead > 0 {
		bytesread, readErr := file.Read(buffer)

		// never send past the requested stop offset
		if int64(bytesread) > bytesToRead {
			bytesread = int(bytesToRead)
		}

		// A reader may return data together with an error; send what was
		// read before looking at the error.
		if bytesread > 0 {
			sendErr := stream.Send(&volume_server_pb.CopyFileResponse{
				FileContent: buffer[:bytesread],
			})
			if sendErr != nil {
				return sendErr
			}
			bytesToRead -= int64(bytesread)
		}

		if readErr != nil {
			if readErr != io.EOF {
				return readErr
			}
			// end of file reached before the stop offset
			break
		}
	}

	return nil
}
// findVolumeOrEcVolumeLocation is an empty placeholder: it accepts a volume id
// but has no body and returns nothing.
func (vs *VolumeServer) findVolumeOrEcVolumeLocation(volumeId needle.VolumeId) {
}