/
read_write.go
540 lines (492 loc) · 13.6 KB
/
read_write.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
package vfs
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"runtime"
"sync"
"github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fs/accounting"
"github.com/ncw/rclone/fs/log"
"github.com/ncw/rclone/fs/operations"
"github.com/ncw/rclone/lib/file"
"github.com/pkg/errors"
)
// RWFileHandle is a handle that can be open for read and write.
//
// It will be open to a temporary file which, when closed, will be
// transferred to the remote.
//
// The embedded *os.File is the open cache file. It is only assigned
// by openPending, so it must not be used until opened is true.
type RWFileHandle struct {
	*os.File               // open cache file - assigned by openPending
	mu          sync.Mutex // locked by the exported methods while they use the handle
	closed      bool       // set if handle has been closed
	remote      string     // remote path - this is what is passed to the cache and to copyObj
	file        *File      // the File this handle was opened for
	d           *Dir       // the Dir passed in when the handle was created
	opened      bool       // set by openPending once the cache file has been opened
	flags       int        // open flags
	osPath      string     // path to the file in the cache
	writeCalled bool       // if any Write() methods have been called
	changed     bool       // file contents was changed in any other way
}
// Check interfaces
//
// These are compile time assertions that *RWFileHandle implements
// each of the io interfaces listed. They have no effect at run time.
var (
	_ io.Reader   = (*RWFileHandle)(nil)
	_ io.ReaderAt = (*RWFileHandle)(nil)
	_ io.Writer   = (*RWFileHandle)(nil)
	_ io.WriterAt = (*RWFileHandle)(nil)
	_ io.Seeker   = (*RWFileHandle)(nil)
	_ io.Closer   = (*RWFileHandle)(nil)
)
// newRWFileHandle creates a read/write handle for the File f in the
// directory d which is backed by a file in the VFS cache.
//
// remote is the path used to identify the file to the cache and flags
// are the os.O_* open flags.
//
// It returns EEXIST if both O_CREATE and O_EXCL are set and f already
// exists. The cache file is only opened straight away when truncating
// (O_TRUNC) or when creating (O_CREATE) a file which doesn't exist
// yet; otherwise the open is left pending until the handle is used.
//
// If an error is returned then no handle is returned and the cache
// open mark taken here has been released again.
func newRWFileHandle(d *Dir, f *File, remote string, flags int) (fh *RWFileHandle, err error) {
	// if O_CREATE and O_EXCL are set and if path already exists, then return EEXIST
	if flags&(os.O_CREATE|os.O_EXCL) == os.O_CREATE|os.O_EXCL && f.exists() {
		return nil, EEXIST
	}

	fh = &RWFileHandle{
		file:   f,
		d:      d,
		remote: remote,
		flags:  flags,
	}

	// mark the file as open in the cache - must be done before the mkdir
	fh.d.vfs.cache.open(fh.remote)

	// Make a place for the file
	fh.osPath, err = d.vfs.cache.mkdir(remote)
	if err != nil {
		fh.d.vfs.cache.close(fh.remote)
		return nil, errors.Wrap(err, "open RW handle failed to make cache directory")
	}

	rdwrMode := fh.flags & accessModeMask
	if rdwrMode != os.O_RDONLY {
		fh.file.addWriter(fh)
	}

	// truncate or create files immediately to prepare the cache
	if fh.flags&os.O_TRUNC != 0 || (fh.flags&os.O_CREATE != 0 && !f.exists()) {
		if err := fh.openPending(false); err != nil {
			fh.file.delWriter(fh, false)
			// The handle is never returned to the caller so close()
			// will never run for it - release the cache open mark
			// here, as is done for the mkdir failure above.
			fh.d.vfs.cache.close(fh.remote)
			return nil, err
		}
	}

	return fh, nil
}
// copyObj copies src to remote on f, registering the transfer with
// the accounting stats while it is in progress.
//
// If operations.NeedTransfer reports that dst doesn't need updating
// from src then nothing is copied and dst is returned as it is.
func copyObj(f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) {
	if !operations.NeedTransfer(context.TODO(), dst, src) {
		// nothing to do - hand back the existing destination
		return dst, nil
	}
	accounting.Stats.Transferring(src.Remote())
	newDst, err = operations.Copy(context.TODO(), f, dst, remote, src)
	ok := err == nil
	accounting.Stats.DoneTransferring(src.Remote(), ok)
	return newDst, err
}
// openPending opens the file if there is a pending open
//
// If the handle has already been opened this does nothing. Otherwise
// it gets the cache file at fh.osPath into the right state (updated
// or fetched from the remote object, created, or truncated), opens
// it, stores it in fh.File and marks the handle as opened.
//
// If truncate is true, or O_TRUNC is set in the open flags, then the
// existing contents are not fetched and the file size is set to 0.
//
// fh.file.muRW is held for the duration of the call.
//
// call with the lock held
func (fh *RWFileHandle) openPending(truncate bool) (err error) {
	if fh.opened {
		return nil
	}

	fh.file.muRW.Lock()
	defer fh.file.muRW.Unlock()

	// o is the remote object for the file - nil if there isn't one
	o := fh.file.getObject()

	var fd *os.File
	cacheFileOpenFlags := fh.flags
	// if not truncating the file, need to read it first
	if fh.flags&os.O_TRUNC == 0 && !truncate {
		// If the remote object exists AND its cached file exists locally AND there are no
		// other RW handles with it open, then attempt to update it.
		if o != nil && fh.file.rwOpens() == 0 {
			// err is local to this block - a failure to find the cache
			// object is deliberately not returned, only a failed copy is
			cacheObj, err := fh.d.vfs.cache.f.NewObject(context.TODO(), fh.remote)
			if err == nil && cacheObj != nil {
				_, err = copyObj(fh.d.vfs.cache.f, cacheObj, fh.remote, o)
				if err != nil {
					return errors.Wrap(err, "open RW handle failed to update cached file")
				}
			}
		}

		// try to open an existing cache file - O_CREATE is masked out
		// so that a missing file is reported rather than created
		fd, err = file.OpenFile(fh.osPath, cacheFileOpenFlags&^os.O_CREATE, 0600)
		if os.IsNotExist(err) {
			// cache file does not exist, so need to fetch it if we have an object to fetch
			// it from
			if o != nil {
				_, err = copyObj(fh.d.vfs.cache.f, nil, fh.remote, o)
				if err != nil {
					cause := errors.Cause(err)
					if cause != fs.ErrorObjectNotFound && cause != fs.ErrorDirNotFound {
						// return any non NotFound errors
						return errors.Wrap(err, "open RW handle failed to cache file")
					}
					// continue here with err=fs.Error{Object,Dir}NotFound
				}
			}
			// if err == nil, then we have cached the file successfully, otherwise err is
			// indicating some kind of non existent file/directory either
			// os.IsNotExist(err) or fs.Error{Object,Dir}NotFound
			if err != nil {
				if fh.flags&os.O_CREATE != 0 {
					// if the object wasn't found AND O_CREATE is set then
					// ignore error as we are about to create the file
					fh.file.setSize(0)
					fh.changed = true
				} else {
					return errors.Wrap(err, "open RW handle failed to cache file")
				}
			}
		} else if err != nil {
			return errors.Wrap(err, "cache open file failed")
		} else {
			fs.Debugf(fh.logPrefix(), "Opened existing cached copy with flags=%s", decodeOpenFlags(fh.flags))
		}
	} else {
		// Set the size to 0 since we are truncating and flag we need to write it back
		fh.file.setSize(0)
		fh.changed = true
		if fh.flags&os.O_CREATE == 0 && fh.file.exists() {
			// create an empty file if it exists on the source
			err = ioutil.WriteFile(fh.osPath, []byte{}, 0600)
			if err != nil {
				return errors.Wrap(err, "cache open failed to create zero length file")
			}
		}
		// Windows doesn't seem to deal well with O_TRUNC and
		// certain access modes so truncate the file if it
		// exists in these cases.
		if runtime.GOOS == "windows" && fh.flags&os.O_APPEND != 0 {
			cacheFileOpenFlags &^= os.O_TRUNC
			_, err = os.Stat(fh.osPath)
			if err == nil {
				err = os.Truncate(fh.osPath, 0)
				if err != nil {
					return errors.Wrap(err, "cache open failed to truncate")
				}
			}
		}
	}

	// fd is only set if the existing cache file was opened above - in
	// every other case open (or create) it now with the full flags
	if fd == nil {
		fs.Debugf(fh.logPrefix(), "Opening cached copy with flags=%s", decodeOpenFlags(fh.flags))
		fd, err = file.OpenFile(fh.osPath, cacheFileOpenFlags, 0600)
		if err != nil {
			return errors.Wrap(err, "cache open file failed")
		}
	}
	fh.File = fd
	fh.opened = true
	fh.file.addRWOpen()
	fh.d.addObject(fh.file) // make sure the directory has this object in it now
	return nil
}
// String returns a printable description of the handle.
//
// It copes with being called on a nil handle and with a handle which
// has no file.
func (fh *RWFileHandle) String() string {
	if fh == nil {
		return "<nil *RWFileHandle>"
	}
	fh.mu.Lock()
	defer fh.mu.Unlock()
	if f := fh.file; f != nil {
		return f.String() + " (rw)"
	}
	return "<nil *RWFileHandle.file>"
}
// Node returns the Node associated with this handle, which is the
// file it was opened for - satisfies Noder interface
func (fh *RWFileHandle) Node() Node {
	fh.mu.Lock()
	defer fh.mu.Unlock()
	var node Node = fh.file
	return node
}
// modified reports whether the file needs to be written back.
//
// The file counts as modified if any of the Write methods have been
// called or if its contents were changed in any other way. If neither
// is the case there is nothing to transfer, which is noted in the
// debug log.
//
// Must be called with fh.mu held
func (fh *RWFileHandle) modified() bool {
	if fh.writeCalled || fh.changed {
		return true
	}
	fs.Debugf(fh.logPrefix(), "not modified so not transferring")
	return false
}
// close the file handle returning ECLOSED if it has been
// closed already.
//
// It closes the underlying cache file and, if delWriter reports that
// the file should be copied, transfers the cache file to the remote
// and updates the object stored in the file.
//
// Must be called with fh.mu held
//
// Note that we leave the file around in the cache on error conditions
// to give the user a chance to recover it.
//
// NOTE(review): this holds fh.file.muRW and may then call
// openPending, which locks fh.file.muRW itself when the handle hasn't
// been opened yet - confirm that this path can't self-deadlock.
func (fh *RWFileHandle) close() (err error) {
	defer log.Trace(fh.logPrefix(), "")("err=%v", &err)
	fh.file.muRW.Lock()
	defer fh.file.muRW.Unlock()

	if fh.closed {
		return ECLOSED
	}
	fh.closed = true
	// Whatever happens below, drop the RW open count (if we opened
	// the cache file) and release the cache open mark on the way out.
	defer func() {
		if fh.opened {
			fh.file.delRWOpen()
		}
		fh.d.vfs.cache.close(fh.remote)
	}()
	rdwrMode := fh.flags & accessModeMask
	writer := rdwrMode != os.O_RDONLY

	// If read only and the cache file was never opened then there is
	// nothing to close or transfer so return
	if !fh.opened && rdwrMode == os.O_RDONLY {
		return nil
	}

	// isCopied is set if delWriter says the file should be
	// transferred to the remote
	isCopied := false
	if writer {
		isCopied = fh.file.delWriter(fh, fh.modified())
		defer fh.file.finishWriterClose()
	}

	// If we are creating or truncating the file then make sure any
	// pending open has happened, even if the handle was never read
	// from or written to. This is a no-op if it is already open.
	if fh.flags&(os.O_CREATE|os.O_TRUNC) != 0 {
		if err := fh.openPending(false); err != nil {
			return err
		}
	}

	// Record the final size of the cache file. A failure to stat is
	// logged but doesn't stop the close.
	if writer && fh.opened {
		fi, err := fh.File.Stat()
		if err != nil {
			fs.Errorf(fh.logPrefix(), "Failed to stat cache file: %v", err)
		} else {
			fh.file.setSize(fi.Size())
		}
	}

	// Close the underlying file
	if fh.opened {
		err = fh.File.Close()
		if err != nil {
			err = errors.Wrap(err, "failed to close cache file")
			return err
		}
	}

	if isCopied {
		// Transfer the temp file to the remote
		cacheObj, err := fh.d.vfs.cache.f.NewObject(context.TODO(), fh.remote)
		if err != nil {
			err = errors.Wrap(err, "failed to find cache file")
			fs.Errorf(fh.logPrefix(), "%v", err)
			return err
		}

		o, err := copyObj(fh.d.vfs.f, fh.file.getObject(), fh.remote, cacheObj)
		if err != nil {
			err = errors.Wrap(err, "failed to transfer file from cache to remote")
			fs.Errorf(fh.logPrefix(), "%v", err)
			return err
		}
		fh.file.setObject(o)
		fs.Debugf(o, "transferred to remote")
	}

	return nil
}
// Close closes the file.
//
// It takes the handle lock and then does the work in close.
func (fh *RWFileHandle) Close() error {
	fh.mu.Lock()
	defer fh.mu.Unlock()
	err := fh.close()
	return err
}
// Flush is called each time the file or directory is closed.
// Because there can be multiple file descriptors referring to a
// single opened file, Flush can be called multiple times.
//
// It does nothing if the cache file hasn't been opened, if the handle
// is already closed, or if none of the Write methods have been called
// (Release will pick that up). Otherwise it closes the handle, which
// writes the file back, and returns any error from doing so.
func (fh *RWFileHandle) Flush() error {
	fh.mu.Lock()
	defer fh.mu.Unlock()
	if !fh.opened {
		// the cache file was never opened so there is nothing to flush
		return nil
	}
	if fh.closed {
		fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush nothing to do")
		return nil
	}
	// If Write hasn't been called then ignore the Flush - Release
	// will pick it up
	if !fh.writeCalled {
		fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush ignoring flush on unwritten handle")
		return nil
	}
	err := fh.close()
	if err != nil {
		fs.Errorf(fh.logPrefix(), "RWFileHandle.Flush error: %v", err)
	}
	return err
}
// Release is called when we are finished with the file handle
//
// It isn't called directly from userspace so the error is ignored by
// the kernel
//
// If the handle has already been closed this does nothing, otherwise
// it closes it, logging and returning any error.
func (fh *RWFileHandle) Release() error {
	fh.mu.Lock()
	defer fh.mu.Unlock()
	prefix := fh.logPrefix()
	if fh.closed {
		fs.Debugf(prefix, "RWFileHandle.Release nothing to do")
		return nil
	}
	fs.Debugf(prefix, "RWFileHandle.Release closing")
	err := fh.close()
	if err != nil {
		fs.Errorf(prefix, "RWFileHandle.Release error: %v", err)
	}
	return err
}
// Size returns the size of the underlying file
//
// Once the cache file has been opened its size on disk is used (or 0
// if it can't be read). Before then the size comes from the file.
func (fh *RWFileHandle) Size() int64 {
	fh.mu.Lock()
	defer fh.mu.Unlock()
	if fh.opened {
		if info, err := fh.File.Stat(); err == nil {
			return info.Size()
		}
		return 0
	}
	return fh.file.Size()
}
// Stat returns info about the file
//
// The info returned is the file this handle was opened for and the
// error is always nil.
func (fh *RWFileHandle) Stat() (os.FileInfo, error) {
	fh.mu.Lock()
	defer fh.mu.Unlock()
	var info os.FileInfo = fh.file
	return info, nil
}
// readFn is the common code for the read methods - the actual read is
// done by the closure passed in.
//
// It returns ECLOSED if the handle is closed and EBADF if it was
// opened write only. Otherwise it makes sure any pending open has
// been done and then calls read with the handle lock held.
func (fh *RWFileHandle) readFn(read func() (int, error)) (n int, err error) {
	fh.mu.Lock()
	defer fh.mu.Unlock()
	switch {
	case fh.closed:
		return 0, ECLOSED
	case fh.flags&accessModeMask == os.O_WRONLY:
		return 0, EBADF
	}
	err = fh.openPending(false)
	if err != nil {
		return 0, err
	}
	return read()
}
// Read reads bytes from the file into b at the current position
func (fh *RWFileHandle) Read(b []byte) (n int, err error) {
	doRead := func() (int, error) {
		return fh.File.Read(b)
	}
	return fh.readFn(doRead)
}
// ReadAt reads bytes from the file into b starting at offset off
func (fh *RWFileHandle) ReadAt(b []byte, off int64) (n int, err error) {
	doReadAt := func() (int, error) {
		return fh.File.ReadAt(b, off)
	}
	return fh.readFn(doReadAt)
}
// Seek moves to a new file position
//
// It returns ECLOSED if the handle has been closed. A seek with a
// zero offset which isn't relative to the end of the file returns
// position 0 without opening the cache file if it isn't open yet.
func (fh *RWFileHandle) Seek(offset int64, whence int) (ret int64, err error) {
	fh.mu.Lock()
	defer fh.mu.Unlock()
	if fh.closed {
		return 0, ECLOSED
	}
	zeroSeek := offset == 0 && whence != io.SeekEnd
	if zeroSeek && !fh.opened {
		// no need to do the pending open just for this
		return 0, nil
	}
	err = fh.openPending(false)
	if err != nil {
		return 0, err
	}
	return fh.File.Seek(offset, whence)
}
// writeFn is the common code for the write methods
//
// Pass a closure to do the actual write
//
// It returns ECLOSED if the handle is closed and EBADF if it was
// opened read only. Otherwise it makes sure any pending open has been
// done, notes that a write was called, calls write and then, if that
// succeeded, updates the size of the file from the cache file.
func (fh *RWFileHandle) writeFn(write func() error) (err error) {
	fh.mu.Lock()
	defer fh.mu.Unlock()
	switch {
	case fh.closed:
		return ECLOSED
	case fh.flags&accessModeMask == os.O_RDONLY:
		return EBADF
	}
	err = fh.openPending(false)
	if err != nil {
		return err
	}
	// set this before the write so that it is set even if the write fails
	fh.writeCalled = true
	if err = write(); err != nil {
		return err
	}
	info, statErr := fh.File.Stat()
	if statErr != nil {
		return errors.Wrap(statErr, "failed to stat cache file")
	}
	fh.file.setSize(info.Size())
	return nil
}
// Write writes the bytes in b to the file at the current position
//
// n is the number of bytes the underlying write reported, which may
// be non zero even when an error is returned.
func (fh *RWFileHandle) Write(b []byte) (n int, err error) {
	doWrite := func() (werr error) {
		n, werr = fh.File.Write(b)
		return werr
	}
	err = fh.writeFn(doWrite)
	return n, err
}
// WriteAt writes the bytes in b to the file starting at offset off
//
// n is the number of bytes the underlying write reported, which may
// be non zero even when an error is returned.
func (fh *RWFileHandle) WriteAt(b []byte, off int64) (n int, err error) {
	doWriteAt := func() (werr error) {
		n, werr = fh.File.WriteAt(b, off)
		return werr
	}
	err = fh.writeFn(doWriteAt)
	return n, err
}
// WriteString writes the string s to the file at the current position
//
// n is the number of bytes the underlying write reported, which may
// be non zero even when an error is returned.
func (fh *RWFileHandle) WriteString(s string) (n int, err error) {
	doWriteString := func() (werr error) {
		n, werr = fh.File.WriteString(s)
		return werr
	}
	err = fh.writeFn(doWriteString)
	return n, err
}
// Truncate changes the size of the file to size
//
// It returns ECLOSED if the handle has been closed. Any pending open
// is done first - when truncating to zero length this is done as a
// truncating open. The file is then marked as changed and its size
// updated before the cache file itself is truncated.
func (fh *RWFileHandle) Truncate(size int64) (err error) {
	fh.mu.Lock()
	defer fh.mu.Unlock()
	if fh.closed {
		return ECLOSED
	}
	toZero := size == 0
	err = fh.openPending(toZero)
	if err != nil {
		return err
	}
	fh.changed = true
	fh.file.setSize(size)
	err = fh.File.Truncate(size)
	return err
}
// Sync commits the current contents of the file to stable storage. Typically,
// this means flushing the file system's in-memory copy of recently written
// data to disk.
//
// It returns ECLOSED if the handle has been closed. If the cache file
// hasn't been opened, or the handle is read only, there is nothing to
// sync and nil is returned.
func (fh *RWFileHandle) Sync() error {
	fh.mu.Lock()
	defer fh.mu.Unlock()
	switch {
	case fh.closed:
		return ECLOSED
	case !fh.opened, fh.flags&accessModeMask == os.O_RDONLY:
		return nil
	}
	return fh.File.Sync()
}
// logPrefix returns the prefix used for log messages about this
// handle - the remote path followed by the address of the handle in
// brackets so that different handles on the same file can be told
// apart.
func (fh *RWFileHandle) logPrefix() string {
	const layout = "%s(%p)"
	return fmt.Sprintf(layout, fh.remote, fh)
}