This repository has been archived by the owner on Feb 14, 2023. It is now read-only.
forked from xitongsys/parquet-go
/
GCSFile.go
109 lines (98 loc) · 2.26 KB
/
GCSFile.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package ParquetFile
import (
"context"
"cloud.google.com/go/storage"
)
// GcsFile is a file handle backed by a Google Cloud Storage object. A handle
// produced by Create carries a FileWriter; a handle produced by Open carries a
// FileReader.
type GcsFile struct {
// ProjectId is recorded by the constructors; no code in this file reads it.
ProjectId string
// BucketName names the bucket that Create and Open look objects up in.
BucketName string
// Ctx is passed to storage.NewClient and to the object's NewReader/NewWriter.
Ctx context.Context
// Client is the storage client created by Create/Open and released by Close.
Client *storage.Client
// Bucket is the handle obtained from Client for BucketName.
Bucket *storage.BucketHandle
// FilePath is the object name this handle was created or opened with.
FilePath string
// FileReader is set by Open; Read delegates to it.
FileReader *storage.Reader
// FileWriter is set by Create; Write delegates to it.
FileWriter *storage.Writer
}
// NewGcsFileWriter returns a ParquetFile that writes to the object called name
// in bucket bucketName.
//
// It builds a template GcsFile from the arguments and delegates to Create, so
// the returned error is whatever Create reports.
func NewGcsFileWriter(ctx context.Context, projectId string, bucketName string, name string) (ParquetFile, error) {
	template := GcsFile{
		Ctx:        ctx,
		ProjectId:  projectId,
		BucketName: bucketName,
		FilePath:   name,
	}
	return template.Create(name)
}
// NewGcsFileReader returns a ParquetFile that reads from the object called
// name in bucket bucketName.
//
// It builds a template GcsFile from the arguments and delegates to Open, so
// the returned error is whatever Open reports.
func NewGcsFileReader(ctx context.Context, projectId string, bucketName string, name string) (ParquetFile, error) {
	template := GcsFile{
		Ctx:        ctx,
		ProjectId:  projectId,
		BucketName: bucketName,
		FilePath:   name,
	}
	return template.Open(name)
}
// Create opens the object called name in the receiver's bucket for writing and
// returns a new GcsFile wrapping the resulting writer.
//
// The returned handle inherits the receiver's ProjectId, BucketName and Ctx so
// that it can itself be used for further Create/Open calls; previously those
// fields were left at their zero values. The bucket is not created here and
// must already exist.
//
// If the storage client cannot be created, the error is returned together
// with a handle that holds no open resources (Close on it is a no-op).
func (self *GcsFile) Create(name string) (ParquetFile, error) {
	gcs := &GcsFile{
		ProjectId:  self.ProjectId,
		BucketName: self.BucketName,
		Ctx:        self.Ctx,
		FilePath:   name,
	}

	client, err := storage.NewClient(self.Ctx)
	if err != nil {
		return gcs, err
	}

	gcs.Client = client
	// must use existing bucket
	gcs.Bucket = client.Bucket(self.BucketName)
	gcs.FileWriter = gcs.Bucket.Object(name).NewWriter(self.Ctx)
	return gcs, nil
}
// Open opens the object called name in the receiver's bucket for reading and
// returns a new GcsFile wrapping the resulting reader.
//
// An empty name reopens the receiver's own FilePath, which lets callers obtain
// an independent reader for the same object. The returned handle inherits the
// receiver's ProjectId, BucketName and Ctx so that it can itself be used for
// further Create/Open calls; previously those fields were left at their zero
// values. The bucket must already exist.
//
// On failure the error is returned together with a handle that holds no open
// resources (Close on it is a no-op). In particular, the storage client
// created for this call is closed if the object reader cannot be opened,
// rather than being leaked.
func (self *GcsFile) Open(name string) (ParquetFile, error) {
	if name == "" {
		name = self.FilePath
	}
	gcs := &GcsFile{
		ProjectId:  self.ProjectId,
		BucketName: self.BucketName,
		Ctx:        self.Ctx,
		FilePath:   name,
	}

	client, err := storage.NewClient(self.Ctx)
	if err != nil {
		return gcs, err
	}

	// must use existing bucket
	bucket := client.Bucket(self.BucketName)
	reader, err := bucket.Object(name).NewReader(self.Ctx)
	if err != nil {
		// The open error is the one worth reporting; a secondary failure
		// while releasing the client would only obscure it.
		_ = client.Close()
		return gcs, err
	}

	gcs.Client = client
	gcs.Bucket = bucket
	gcs.FileReader = reader
	return gcs, nil
}
// Seek is a stub: it ignores offset and pos, does not move the read position
// of FileReader, and always returns (0, nil).
//
// NOTE(review): because a nil error is returned, callers cannot tell that no
// seek happened; any caller that relies on repositioning (for example seeking
// relative to the end of the object) will keep reading from the current
// position. Confirm whether callers depend on this before using GcsFile for
// reads that need random access.
func (self *GcsFile) Seek(offset int64, pos int) (int64, error) {
// Not implemented.
return 0, nil
}
// Read keeps reading from FileReader until b is completely filled or the
// underlying reader reports an error.
//
// It returns the number of bytes placed in b and, if the loop stopped early,
// the error from the underlying reader unchanged (so a short final read
// surfaces the reader's own end-of-stream error). A zero-length b yields
// (0, nil) without touching the reader.
func (self *GcsFile) Read(b []byte) (cnt int, err error) {
	for want := len(b); cnt < want; {
		var got int
		got, err = self.FileReader.Read(b[cnt:])
		cnt += got
		if err != nil {
			return cnt, err
		}
	}
	return cnt, err
}
// Write forwards b to FileWriter and returns its byte count and error
// unchanged.
func (self *GcsFile) Write(b []byte) (n int, err error) {
	n, err = self.FileWriter.Write(b)
	return n, err
}
// Close releases every resource held by the handle: the object reader, the
// object writer and the storage client, in that order. Fields that are nil
// are skipped.
//
// All three closes are always attempted, so a failure in an earlier step no
// longer leaves later resources (notably the client) open. The first error
// encountered is the one returned; nil means every close succeeded.
func (self *GcsFile) Close() error {
	var firstErr error

	if self.FileReader != nil {
		if err := self.FileReader.Close(); err != nil {
			firstErr = err
		}
	}
	if self.FileWriter != nil {
		if err := self.FileWriter.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	if self.Client != nil {
		if err := self.Client.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}