-
Notifications
You must be signed in to change notification settings - Fork 515
/
pipe-main.go
216 lines (186 loc) · 6.61 KB
/
pipe-main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"io"
"os"
"runtime/debug"
"syscall"
"github.com/dustin/go-humanize"
"github.com/minio/cli"
"github.com/minio/mc/pkg/probe"
"github.com/minio/minio-go/v7"
)
func defaultPartSize() string {
_, partSize, _, _ := minio.OptimalPartInfo(-1, 0)
return humanize.IBytes(uint64(partSize))
}
var pipeFlags = []cli.Flag{
cli.StringFlag{
Name: "storage-class, sc",
Usage: "set storage class for new object(s) on target",
},
cli.StringFlag{
Name: "attr",
Usage: "add custom metadata for the object",
},
cli.StringFlag{
Name: "tags",
Usage: "apply one or more tags to the uploaded objects",
},
cli.IntFlag{
Name: "concurrent",
Value: 1,
Usage: "allow N concurrent uploads [WARNING: will use more memory use it with caution]",
},
cli.StringFlag{
Name: "part-size",
Value: defaultPartSize(),
Usage: "customize chunk size for each concurrent upload",
},
cli.IntFlag{
Name: "pipe-max-size",
Usage: "increase the pipe buffer size to a custom value",
Hidden: true,
},
}
// Display contents of a file.
var pipeCmd = cli.Command{
Name: "pipe",
Usage: "stream STDIN to an object",
Action: mainPipe,
OnUsageError: onUsageError,
Before: setGlobalsFromContext,
Flags: append(append(pipeFlags, encFlags...), globalFlags...),
CustomHelpTemplate: `NAME:
{{.HelpName}} - {{.Usage}}
USAGE:
{{.HelpName}} [FLAGS] [TARGET]
{{if .VisibleFlags}}
FLAGS:
{{range .VisibleFlags}}{{.}}
{{end}}{{end}}
ENVIRONMENT VARIABLES:
MC_ENC_KMS: KMS encryption key in the form of (alias/prefix=key).
MC_ENC_S3: S3 encryption key in the form of (alias/prefix=key).
EXAMPLES:
1. Write contents of stdin to a file on local filesystem.
{{.Prompt}} {{.HelpName}} /tmp/hello-world.go
2. Write contents of stdin to an object on Amazon S3 cloud storage.
{{.Prompt}} {{.HelpName}} s3/personalbuck/meeting-notes.txt
3. Copy an ISO image to an object on Amazon S3 cloud storage.
{{.Prompt}} cat debian-8.2.iso | {{.HelpName}} s3/opensource-isos/gnuos.iso
4. Copy an ISO image to an object on minio storage using KMS encryption.
{{.Prompt}} cat debian-8.2.iso | {{.HelpName}} --enc-kms="minio/opensource-isos=my-key-name" minio/opensource-isos/gnuos.iso
5. Stream MySQL database dump to Amazon S3 directly.
{{.Prompt}} mysqldump -u root -p ******* accountsdb | {{.HelpName}} s3/sql-backups/backups/accountsdb-oct-9-2015.sql
6. Write contents of stdin to an object on Amazon S3 cloud storage and assign REDUCED_REDUNDANCY storage-class to the uploaded object.
{{.Prompt}} {{.HelpName}} --storage-class REDUCED_REDUNDANCY s3/personalbuck/meeting-notes.txt
7. Copy to MinIO cloud storage with specified metadata, separated by ";"
{{.Prompt}} cat music.mp3 | {{.HelpName}} --attr "Cache-Control=max-age=90000,min-fresh=9000;Artist=Unknown" play/mybucket/music.mp3
8. Set tags to the uploaded objects
{{.Prompt}} tar cvf - . | {{.HelpName}} --tags "category=prod&type=backup" play/mybucket/backup.tar
`,
}
func pipe(ctx *cli.Context, targetURL string, encKeyDB map[string][]prefixSSEPair, meta map[string]string, quiet bool) *probe.Error {
// If possible increase the pipe buffer size
if e := increasePipeBufferSize(os.Stdin, ctx.Int("pipe-max-size")); e != nil {
fatalIf(probe.NewError(e), "Unable to increase custom pipe-max-size")
}
if targetURL == "" {
// When no target is specified, pipe cat's stdin to stdout.
return catOut(os.Stdin, -1).Trace()
}
storageClass := ctx.String("storage-class")
alias, _ := url2Alias(targetURL)
sseKey := getSSE(targetURL, encKeyDB[alias])
multipartThreads := ctx.Int("concurrent")
if multipartThreads > 1 {
// We will be allocating large buffers, reduce default GC overhead
debug.SetGCPercent(20)
}
var multipartSize uint64
var e error
if partSizeStr := ctx.String("part-size"); partSizeStr != "" {
multipartSize, e = humanize.ParseBytes(partSizeStr)
if e != nil {
return probe.NewError(e)
}
}
// Stream from stdin to multiple objects until EOF.
// Ignore size, since os.Stat() would not return proper size all the time
// for local filesystem for example /proc files.
opts := PutOptions{
sse: sseKey,
storageClass: storageClass,
metadata: meta,
multipartSize: multipartSize,
multipartThreads: uint(multipartThreads),
concurrentStream: ctx.IsSet("concurrent"),
}
var reader io.Reader
if !quiet {
pg := newProgressBar(0)
reader = io.TeeReader(os.Stdin, pg)
} else {
reader = os.Stdin
}
_, err := putTargetStreamWithURL(targetURL, reader, -1, opts)
// TODO: See if this check is necessary.
switch e := err.ToGoError().(type) {
case *os.PathError:
if e.Err == syscall.EPIPE {
// stdin closed by the user. Gracefully exit.
return nil
}
}
return err.Trace(targetURL)
}
// checkPipeSyntax - validate arguments passed by user
func checkPipeSyntax(ctx *cli.Context) {
if len(ctx.Args()) != 1 {
showCommandHelpAndExit(ctx, 1) // last argument is exit code.
}
}
// mainPipe is the main entry point for pipe command.
func mainPipe(ctx *cli.Context) error {
// validate pipe input arguments.
checkPipeSyntax(ctx)
encKeyDB, err := validateAndCreateEncryptionKeys(ctx)
fatalIf(err, "Unable to parse encryption keys.")
// globalQuiet is true for no window size to get. We just need --quiet here.
quiet := ctx.IsSet("quiet")
meta := map[string]string{}
if attr := ctx.String("attr"); attr != "" {
meta, err = getMetaDataEntry(attr)
fatalIf(err.Trace(attr), "Unable to parse --attr value")
}
if tags := ctx.String("tags"); tags != "" {
meta["X-Amz-Tagging"] = tags
}
if len(ctx.Args()) == 0 {
err = pipe(ctx, "", nil, meta, quiet)
fatalIf(err.Trace("stdout"), "Unable to write to one or more targets.")
} else {
// extract URLs.
URLs := ctx.Args()
err = pipe(ctx, URLs[0], encKeyDB, meta, quiet)
fatalIf(err.Trace(URLs[0]), "Unable to write to one or more targets.")
}
// Done.
return nil
}