/
uploader.go
192 lines (170 loc) · 6.58 KB
/
uploader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
// Copyright 2018 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gs
import (
"bytes"
"context"
"fmt"
"net/http"
"time"
"golang.org/x/net/context/ctxhttp"
"google.golang.org/api/googleapi"
"github.com/TriggerMail/luci-go/common/clock"
"github.com/TriggerMail/luci-go/common/logging"
)
// TODO(vadimsh): Use this code from the client too.
// RestartUploadError is what Uploader returns when, while resuming an
// interrupted upload, Google Storage asks to continue from an offset for which
// the current Write(...) call holds no data.
//
// Uploader cannot recover from this by itself: the caller is expected to
// restart the upload from the offset carried in the error.
//
// See https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload#resume-upload
type RestartUploadError struct {
	Offset int64 // the offset Google Storage wants the upload to resume from
}

// Error implements the error interface.
func (r *RestartUploadError) Error() string {
	return fmt.Sprintf("the upload should be restarted from offset %d", r.Offset)
}
// Uploader implements io.Writer for Google Storage Resumable Upload sessions.
//
// Does no buffering inside, thus efficiency of uploads directly depends on
// granularity of Write(...) calls. Additionally, Google Storage expects the
// length of each uploaded chunk to be a multiple of 256 KiB (262144 bytes),
// except for the final chunk, so callers of Write(...) should supply the
// appropriately-sized chunks.
//
// Retries transient errors internally, but it can potentially end up in a
// situation where it needs data not available in the current Write(...)
// operation. In this case Write returns *RestartUploadError error, which
// indicates an offset the upload should be restarted from.
//
// Each chunk is sent as a separate PUT request to UploadURL with a
// Content-Range header computed from Offset and FileSize. Each request is
// bounded by a 30 second timeout derived from Context.
type Uploader struct {
	Context   context.Context // the context for canceling retries and for logging
	Client    *http.Client    // the client to use for sending anonymous requests
	UploadURL string          // upload URL returned by GoogleStorage.StartUpload
	Offset    int64           // offset in the file to upload to, mutated by Write
	FileSize  int64           // total size of the file being uploaded, required

	// requestMock used from tests to mock ctxhttp.Do that is hostile to mocked
	// time. When set, it is called instead of ctxhttp.Do and Client is unused.
	requestMock func(*http.Request) (*http.Response, error)
}
// Write is part of io.Writer interface.
//
// It uploads p as the byte range [u.Offset, u.Offset+len(p)) of the file and
// advances u.Offset as Google Storage reports progress. A write that would go
// past u.FileSize is rejected before any request is made.
//
// Each attempt is run through withRetry. If an upload fails with a 5xx
// *googleapi.Error, the attempt immediately sends an empty PUT to learn the
// last offset Google Storage has persisted. If the attempt still fails, the
// next retry keeps querying instead of re-sending data. If Google Storage
// reports an offset outside of p, Write returns *RestartUploadError.
//
// The returned n is the net advance of u.Offset during this call.
func (u *Uploader) Write(p []byte) (n int, err error) {
	if len(p) == 0 {
		return 0, nil
	}

	start := u.Offset
	end := start + int64(len(p))
	if end > u.FileSize {
		return 0, fmt.Errorf("attempting to write past the declared file size (%d > %d)", end, u.FileSize)
	}

	for err == nil && u.Offset != end {
		// queryOnly is set after a transient failure: while it is true, attempts
		// send an empty PUT to learn the last offset Google Storage persisted,
		// instead of re-sending data.
		queryOnly := false

		attempt := func() error {
			var payload []byte
			if !queryOnly {
				payload = p[int(u.Offset-start):]
			}

			next, uploadErr := u.uploadChunk(payload)

			// Transient server-side error: ask right away (once) where to resume.
			if apiErr, _ := uploadErr.(*googleapi.Error); apiErr != nil && apiErr.Code >= 500 {
				logging.WithError(uploadErr).Warningf(u.Context, "Transient error, querying for last uploaded offset")
				queryOnly = true
				next, uploadErr = u.uploadChunk(nil)
			}

			if uploadErr != nil {
				// Fatal or transient failure; withRetry decides whether to retry.
				return uploadErr
			}
			if next < start || next > end {
				// The requested offset needs data outside of p: the caller must handle it.
				return &RestartUploadError{Offset: next}
			}

			u.Offset = next
			queryOnly = false
			return nil
		}

		err = withRetry(u.Context, attempt)
	}

	return int(u.Offset - start), err
}
// uploadChunk pushes the given chunk to Google Storage at u.Offset offset.
//
// Returns an offset to continue the upload from (usually u.Offset + len(p), but
// Google Storage docs are vague about that, so it may be different). A 2xx
// response means Google Storage has the whole file, so u.FileSize is returned.
//
// If len(p) is 0, makes an empty PUT request with "Content-Range: bytes */N".
// This is useful for querying for the last uploaded offset to resume upload
// from.
//
// The request is bounded by a 30 second timeout derived from u.Context.
// Responses other than 2xx and 308 are converted into an error by
// googleapi.CheckResponse; Write inspects that error's Code to spot transient
// failures. p is only read, never modified or retained.
func (u *Uploader) uploadChunk(p []byte) (int64, error) {
	ctx, cancel := clock.WithTimeout(u.Context, 30*time.Second)
	defer cancel()

	logging.Infof(ctx, "gs: UploadChunk(offset=%d, chunk_size=%d, length=%d)", u.Offset, len(p), u.FileSize)

	// bytes.NewReader (unlike bytes.NewBuffer) does not take ownership of p,
	// which the caller keeps using after this call.
	req, err := http.NewRequest(http.MethodPut, u.UploadURL, bytes.NewReader(p))
	if err != nil {
		return 0, err
	}
	req.ContentLength = int64(len(p))
	if len(p) > 0 {
		last := u.Offset + int64(len(p)) - 1 // Content-Range end is inclusive
		req.Header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", u.Offset, last, u.FileSize))
	} else {
		req.Header.Set("Content-Range", fmt.Sprintf("bytes */%d", u.FileSize))
	}

	var resp *http.Response
	if u.requestMock != nil {
		resp, err = u.requestMock(req)
	} else {
		resp, err = ctxhttp.Do(ctx, u.Client, req)
	}
	if err != nil {
		return 0, err
	}
	defer googleapi.CloseBody(resp)

	// Google Storage return 308 (http.StatusPermanentRedirect) on partial upload.
	// Since it is not really a redirect, we just use 308 below to avoid
	// confusion.
	switch {
	case resp.StatusCode >= 200 && resp.StatusCode <= 299:
		return u.FileSize, nil // finished uploading everything
	case resp.StatusCode != 308:
		// Note: we can't call CheckResponse earlier, since it treats 308 as
		// an error.
		if err := googleapi.CheckResponse(resp); err != nil {
			return 0, err
		}
		// CheckResponse accepts only 2xx, which is handled above, so this is not
		// expected to happen. Report it instead of crashing the process.
		return 0, fmt.Errorf("unexpected Google Storage response status %d", resp.StatusCode)
	}

	// Extract the last uploaded offset from Range header. No Range header means
	// there are no uploaded data yet (and so we need to restart from 0). Be
	// paranoid and check this happens only when we are really resuming. Any
	// successful data upload MUST have Range response header.
	hdr := resp.Header.Get("Range")
	if hdr == "" {
		if len(p) != 0 {
			return 0, fmt.Errorf("no Range header in Google Storage response")
		}
		return 0, nil
	}

	var offset int64
	if _, err = fmt.Sscanf(hdr, "bytes=0-%d", &offset); err != nil {
		return 0, fmt.Errorf("unexpected Range header value: %q", hdr)
	}
	// 'offset' is an offset of the last uploaded byte, need to resume uploading
	// from the next one.
	return offset + 1, nil
}