-
Notifications
You must be signed in to change notification settings - Fork 0
/
remotefilez.go
134 lines (117 loc) · 3.71 KB
/
remotefilez.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package remotefilez
import (
"context"
"errors"
"fmt"
"io"
"net/url"
"os"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
)
type ReaderAtSeekCloser interface {
io.ReaderAt
io.ReadSeekCloser
Size() (int64, error)
}
const (
schemeFile = "file"
schemeAzure = "abs"
)
var (
ErrRelativePath = errors.New("relative path")
ErrUnsupportedScheme = errors.New("unsupported scheme")
ErrNotImplemented = errors.New("not implemented")
)
// Opener provides a unified interface for resolving io.ReadSeekClosers from
// URLs.
type Opener struct {
azcreds azcore.TokenCredential
azOpenTimeout time.Duration
azDoAccounting bool
}
// WithAzureResolver returns a copy of the Opener with the provided Azure
// Resolver.
func (ro Opener) WithAzureResolver(
creds azcore.TokenCredential,
timeout time.Duration,
doAccounting bool,
) *Opener {
ro.azcreds = creds
ro.azOpenTimeout = timeout
ro.azDoAccounting = doAccounting
return &ro
}
// Open returns an io.ReadSeekCloser handle from the provided file URL.
//
// Depecated: Use OpenReader instead.
func (ro *Opener) Open(fileURL string) (ReaderAtSeekCloser, error) {
return ro.OpenReader(fileURL)
}
// OpenCtx returns an io.ReadSeekCloser handle from the provided file URL.
// Errors if a resolver for the provided schema is not registered.
//
// Depecated: Use OpenReaderCtx instead.
func (ro *Opener) OpenCtx(ctx context.Context, fileURL string) (ReaderAtSeekCloser, error) {
return ro.OpenReaderCtx(ctx, fileURL)
}
// OpenReader returns an io.ReadSeekCloser handle from the provided file URL.
func (ro *Opener) OpenReader(fileURL string) (ReaderAtSeekCloser, error) {
ctx := context.Background()
return ro.OpenCtx(ctx, fileURL)
}
// OpenReaderCtx returns an io.ReadSeekCloser handle from the provided file URL.
// Errors if a resolver for the provided schema is not registered.
func (ro *Opener) OpenReaderCtx(ctx context.Context, fileURL string) (ReaderAtSeekCloser, error) {
u, err := url.Parse(fileURL)
if err != nil {
return nil, fmt.Errorf("parse URL failed, %w", err)
}
// Best-effort to detect absolute paths
if len(fileURL) >= len(u.Scheme)+4 && fileURL[len(u.Scheme)+3] == '.' {
return nil, fmt.Errorf("%w not supported", ErrRelativePath)
}
switch u.Scheme {
case schemeFile:
f, err := os.Open(u.Path)
if err != nil {
return nil, err
}
return &sizedFile{File: f}, nil
case schemeAzure:
if ro.azcreds == nil {
return nil, errors.New("missing credentials please add AzureResolver")
}
return NewAzureBlobReader(ctx, fileURL, ro.azcreds, ro.azOpenTimeout, ro.azDoAccounting)
default:
return nil, fmt.Errorf("%w %q", ErrUnsupportedScheme, u.Scheme)
}
}
// Open returns an io.ReadSeekCloser handle from the provided file URL.
func (ro *Opener) OpenWriter(fileURL string) (io.WriteCloser, error) {
ctx := context.Background()
return ro.OpenWriterCtx(ctx, fileURL)
}
// OpenCtx returns an io.ReadSeekCloser handle from the provided file URL.
// Errors if a resolver for the provided schema is not registered.
func (ro *Opener) OpenWriterCtx(ctx context.Context, fileURL string) (io.WriteCloser, error) {
u, err := url.Parse(fileURL)
if err != nil {
return nil, fmt.Errorf("parse URL failed, %w", err)
}
// Best-effort to detect absolute paths
if len(fileURL) >= len(u.Scheme)+4 && fileURL[len(u.Scheme)+3] == '.' {
return nil, fmt.Errorf("%w not supported", ErrRelativePath)
}
switch u.Scheme {
case schemeFile:
return os.OpenFile(u.Path, os.O_WRONLY|os.O_CREATE, 0666)
case schemeAzure:
if ro.azcreds == nil {
return nil, errors.New("missing credentials please add AzureResolver")
}
return NewAzureBlobWriteCloser(fileURL, ro.azcreds, ro.azOpenTimeout, ctx)
default:
return nil, fmt.Errorf("%w %q", ErrUnsupportedScheme, u.Scheme)
}
}