forked from xitongsys/parquet-go-source
-
Notifications
You must be signed in to change notification settings - Fork 0
/
minio.go
180 lines (156 loc) · 3.96 KB
/
minio.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package minio
import (
"context"
"errors"
"io"
"github.com/minio/minio-go/v7"
"github.com/zyreio/parquet-go/source"
)
// MinioFile is the source.ParquetFile implementation backed by the MinIO S3
// client. A value is used either for reading (after Open) or for writing
// (after Create).
type MinioFile struct {
	// BucketName and Key identify the object passed to GetObject/PutObject.
	BucketName string
	Key        string

	// ctx and client are handed to every MinIO call made by this file.
	ctx    context.Context
	client *minio.Client

	// offset and whence are recorded by Seek; Read reads at offset and
	// advances it by the number of bytes read.
	offset int64
	whence int

	// Read side: the object handle obtained from GetObject and the size
	// reported by its Stat call, both set by Open.
	downloader *minio.Object
	fileSize   int64

	// Write side: the two ends of the io.Pipe set up by Create.
	pipeWriter *io.PipeWriter
	pipeReader *io.PipeReader

	// err holds the error of a failed pipe write (set by Write).
	err error
}
// Package-level sentinel errors.
var (
	// errInvalidOffset is returned by Seek when the requested offset falls
	// outside the known object size.
	errInvalidOffset = errors.New("Seek: invalid offset")
	// errWhence is returned by Seek when whence is not one of io.SeekStart,
	// io.SeekCurrent or io.SeekEnd.
	errWhence = errors.New("Seek: invalid whence")
	// errFailedUpload describes a failed upload during Write.
	// NOTE(review): not referenced by any code visible in this file.
	errFailedUpload = errors.New("Write: failed upload")
)
// NewS3FileWriterWithClient builds a MinioFile around the caller-supplied
// MinIO client, bucket and key, and returns whatever Create(key) yields for it.
func NewS3FileWriterWithClient(ctx context.Context, s3Client *minio.Client, bucket string, key string) (source.ParquetFile, error) {
	return (&MinioFile{
		Key:        key,
		BucketName: bucket,
		client:     s3Client,
		ctx:        ctx,
	}).Create(key)
}
// NewS3FileReaderWithClient builds a MinioFile around the caller-supplied
// MinIO client, bucket and key, and returns whatever Open(key) yields for it.
func NewS3FileReaderWithClient(
	ctx context.Context,
	s3Client *minio.Client,
	bucket string,
	key string,
) (source.ParquetFile, error) {
	var base MinioFile
	base.ctx = ctx
	base.client = s3Client
	base.BucketName = bucket
	base.Key = key
	return base.Open(key)
}
// Seek sets the position used by the next Read and returns that position as
// an absolute offset from the start of the object, as io.Seeker requires.
// It has no effect on Write.
//
// whence must be io.SeekStart, io.SeekCurrent or io.SeekEnd; any other value
// yields errWhence. io.SeekCurrent is relative to the current offset and
// io.SeekEnd is relative to the known object size, so Seek(0, io.SeekEnd)
// returns the size. errInvalidOffset is returned when the resulting position
// is negative, or lies past the end of an object whose size is known
// (fileSize > 0).
func (s *MinioFile) Seek(offset int64, whence int) (int64, error) {
	var abs int64
	switch whence {
	case io.SeekStart:
		abs = offset
	case io.SeekCurrent:
		abs = s.offset + offset
	case io.SeekEnd:
		abs = s.fileSize + offset
	default:
		return 0, errWhence
	}

	if abs < 0 || (s.fileSize > 0 && abs > s.fileSize) {
		return 0, errInvalidOffset
	}

	// Store the resolved position: Read hands s.offset directly to ReadAt,
	// so it must always be measured from the start of the object.
	s.offset = abs
	s.whence = io.SeekStart
	return abs, nil
}
// Read reads up to len(p) bytes into p, starting at the position recorded by
// Seek, and returns the number of bytes read. The position advances by that
// many bytes.
//
// It returns io.EOF once the position reaches the known object size. When the
// underlying ReadAt delivers the final bytes together with io.EOF, those
// bytes are returned with a nil error and io.EOF is reported by the next
// call, so no trailing data is lost.
func (s *MinioFile) Read(p []byte) (n int, err error) {
	// Resolve an end-relative position left behind by Seek(…, io.SeekEnd).
	pos := s.offset
	if s.whence == io.SeekEnd {
		pos += s.fileSize
	}
	if pos < 0 {
		return 0, errInvalidOffset
	}
	if s.fileSize > 0 && pos >= s.fileSize {
		return 0, io.EOF
	}
	if len(p) == 0 {
		return 0, nil
	}

	n, err = s.downloader.ReadAt(p, pos)

	// Always account for delivered bytes, even when an error accompanies them.
	s.offset = pos + int64(n)
	s.whence = io.SeekStart

	if n > 0 && errors.Is(err, io.EOF) {
		return n, nil
	}
	return n, err
}
// Write sends len(p) bytes from p into the pipe that feeds the MinIO upload
// and returns the number of bytes the pipe accepted.
//
// After a failed write the error is remembered in s.err and every later call
// returns it without touching the pipe. Calling Write on a file that has no
// pipe writer (one not produced by Create) returns an error.
func (s *MinioFile) Write(p []byte) (n int, err error) {
	// Prevent further writes once a previous write has failed.
	if s.err != nil {
		return 0, s.err
	}
	if s.pipeWriter == nil {
		return 0, errors.New("Write: file not opened for writing")
	}

	n, err = s.pipeWriter.Write(p)
	if err != nil {
		s.err = err
		// Close the pipe with the real failure so the reading side sees an
		// error rather than a clean EOF. CloseWithError always returns nil.
		_ = s.pipeWriter.CloseWithError(err)
		return n, err
	}
	return n, nil
}
// Close releases the streams held by the file. For a writer it closes the
// write half of the pipe, which signals end of data to the reading side. For
// a reader it closes the MinIO object handle. The first error encountered is
// returned. Close itself does not wait for anything reading from the pipe.
//
// The object handle is cleared after closing, so calling Close again does not
// close it a second time.
func (s *MinioFile) Close() error {
	var err error

	if s.pipeWriter != nil {
		err = s.pipeWriter.Close()
	}

	if s.downloader != nil {
		if cerr := s.downloader.Close(); cerr != nil && err == nil {
			err = cerr
		}
		s.downloader = nil
	}

	return err
}
// Open returns a new MinioFile for reading the object called name in the
// receiver's bucket, sharing the receiver's context and client. An empty name
// means the receiver's own Key. The receiver itself is not modified, so Open
// can be called repeatedly to obtain independent readers.
//
// The object is fetched with GetObject and its size taken from Stat; both are
// stored on the returned file. On failure the error is returned together
// with a file that has no object handle attached.
func (s *MinioFile) Open(name string) (source.ParquetFile, error) {
	if name == "" {
		name = s.Key
	}

	// new instance
	pf := &MinioFile{
		ctx:        s.ctx,
		client:     s.client,
		BucketName: s.BucketName,
		Key:        name,
		offset:     0,
	}

	// init object info
	downloader, err := pf.client.GetObject(pf.ctx, pf.BucketName, pf.Key, minio.GetObjectOptions{})
	if err != nil {
		return pf, err
	}
	info, err := downloader.Stat()
	if err != nil {
		// Do not leak the handle when the object cannot be inspected.
		_ = downloader.Close()
		return pf, err
	}

	// Attach the handle to the file being returned, not to the receiver.
	pf.downloader = downloader
	pf.fileSize = info.Size
	return pf, nil
}
// Create returns a writer that uploads to the object called key in the
// receiver's bucket, sharing the receiver's context and client. An empty key
// means the receiver's own Key.
//
// Data passed to Write travels through an io.Pipe to a PutObject call running
// in a background goroutine. If the upload fails, the read half of the pipe is
// closed with that error so a blocked or later Write returns it. The returned
// value wraps the new MinioFile so that its Close waits for the upload to
// finish and reports the upload error. Callers must call Close (or cancel the
// context) to let the goroutine end.
func (s *MinioFile) Create(key string) (source.ParquetFile, error) {
	if key == "" {
		key = s.Key
	}

	pf := &MinioFile{
		ctx:        s.ctx,
		client:     s.client,
		BucketName: s.BucketName,
		Key:        key,
	}

	pr, pw := io.Pipe()
	pf.pipeReader = pr
	pf.pipeWriter = pw

	up := &minioUploadFile{MinioFile: pf, done: make(chan struct{})}

	// PutObject reads the pipe until EOF, so it has to run concurrently with
	// the caller's Write calls.
	go func() {
		defer close(up.done)
		_, up.uploadErr = pf.client.PutObject(pf.ctx, pf.BucketName, pf.Key, pr, -1, minio.PutObjectOptions{})
		// Unblock any writer still feeding the pipe. CloseWithError always
		// returns nil.
		_ = pr.CloseWithError(up.uploadErr)
	}()

	return up, nil
}

// minioUploadFile is the writer handed out by Create. It behaves like the
// embedded MinioFile except that Close waits for the background upload.
type minioUploadFile struct {
	*MinioFile

	// done is closed when the PutObject call has returned.
	done chan struct{}
	// uploadErr is the PutObject result; it is written before done is closed
	// and only read after done is closed.
	uploadErr error
}

// Close closes the underlying file, blocks until the upload goroutine has
// finished, and returns the upload error if there was one, otherwise the
// error from closing the file. It is safe to call more than once.
func (u *minioUploadFile) Close() error {
	err := u.MinioFile.Close()
	<-u.done
	if u.uploadErr != nil {
		return u.uploadErr
	}
	return err
}