forked from pantsbuild/pants
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.rs
461 lines (437 loc) · 15.5 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
// Copyright 2017 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).
#![deny(warnings)]
// Enable all clippy lints except for many of the pedantic ones. It's a shame this needs to be copied and pasted across crates, but there doesn't appear to be a way to include inner attributes from a common source.
#![deny(
clippy::all,
clippy::default_trait_access,
clippy::expl_impl_clone_on_copy,
clippy::if_not_else,
clippy::needless_continue,
clippy::unseparated_literal_suffix,
// TODO: Falsely triggers for async/await:
// see https://github.com/rust-lang/rust-clippy/issues/5360
// clippy::used_underscore_binding
)]
// It is often more clear to show that nothing is being moved.
#![allow(clippy::match_ref_pats)]
// Subjective style.
#![allow(
clippy::len_without_is_empty,
clippy::redundant_field_names,
clippy::too_many_arguments
)]
// Default isn't as big a deal as people seem to think it is.
#![allow(clippy::new_without_default, clippy::new_ret_no_self)]
// Arc<Mutex> can be more clear than needing to grok Orderings:
#![allow(clippy::mutex_atomic)]
#![type_length_limit = "1076739"]
use std::collections::{BTreeMap, BTreeSet};
use std::convert::TryFrom;
use std::iter::{FromIterator, Iterator};
use std::path::PathBuf;
use std::process::exit;
use std::time::Duration;
use clap::{value_t, App, AppSettings, Arg};
use futures::compat::Future01CompatExt;
use hashing::{Digest, Fingerprint};
use process_execution::{
Context, NamedCaches, Platform, PlatformConstraint, ProcessMetadata, RelativePath,
};
use store::{BackoffConfig, Store};
use tokio::runtime::Handle;
/// A binary which takes args of format:
/// process_executor --env=FOO=bar --env=SOME=value --input-digest=abc123 --input-digest-length=80
/// -- /path/to/binary --flag --otherflag
/// and runs /path/to/binary --flag --otherflag with FOO and SOME set.
/// It outputs its output/err to stdout/err, and exits with its exit code.
///
/// It does not perform $PATH lookup or shell expansion.
#[tokio::main]
async fn main() {
env_logger::init();
let args = App::new("process_executor")
.arg(
Arg::with_name("work-dir")
.long("work-dir")
.takes_value(true)
.help("Path to workdir"),
)
.arg(
Arg::with_name("local-store-path")
.long("local-store-path")
.takes_value(true)
.help("Path to lmdb directory used for local file storage"),
)
.arg(
Arg::with_name("named-cache-path")
.long("named-cache-path")
.takes_value(true)
.help("Path to a directory to be used for named caches")
)
.arg(
Arg::with_name("input-digest")
.long("input-digest")
.takes_value(true)
.required(true)
.help("Fingerprint (hex string) of the digest to use as the input file tree."),
)
.arg(
Arg::with_name("input-digest-length")
.long("input-digest-length")
.takes_value(true)
.required(true)
.help("Length of the proto-bytes whose digest to use as the input file tree."),
)
.arg(
Arg::with_name("working-directory")
.long("working-directory")
.takes_value(true)
.required(false)
.help("Path to execute the binary at relative to its input digest root.")
)
.arg(
Arg::with_name("server")
.long("server")
.takes_value(true)
.help(
"The host:port of the gRPC server to connect to. Forces remote execution. \
If unspecified, local execution will be performed.",
),
)
.arg(
Arg::with_name("execution-root-ca-cert-file")
.help("Path to file containing root certificate authority certificates for the execution server. If not set, TLS will not be used when connecting to the execution server.")
.takes_value(true)
.long("execution-root-ca-cert-file")
.required(false)
)
.arg(
Arg::with_name("execution-oauth-bearer-token-path")
.help("Path to file containing oauth bearer token for communication with the execution server. If not set, no authorization will be provided to remote servers.")
.takes_value(true)
.long("execution-oauth-bearer-token-path")
.required(false)
)
.arg(
Arg::with_name("cas-server")
.long("cas-server")
.takes_value(true)
.help("The host:port of the gRPC CAS server to connect to."),
)
.arg(
Arg::with_name("cas-root-ca-cert-file")
.help("Path to file containing root certificate authority certificates for the CAS server. If not set, TLS will not be used when connecting to the CAS server.")
.takes_value(true)
.long("cas-root-ca-cert-file")
.required(false)
)
.arg(
Arg::with_name("cas-oauth-bearer-token-path")
.help("Path to file containing oauth bearer token for communication with the CAS server. If not set, no authorization will be provided to remote servers.")
.takes_value(true)
.long("cas-oauth-bearer-token-path")
.required(false)
)
.arg(Arg::with_name("remote-instance-name")
.takes_value(true)
.long("remote-instance-name")
.required(false))
.arg(Arg::with_name("cache-key-gen-version")
.takes_value(true)
.long("cache-key-gen-version")
.required(false))
.arg(
Arg::with_name("upload-chunk-bytes")
.help("Number of bytes to include per-chunk when uploading bytes. grpc imposes a hard message-size limit of around 4MB.")
.takes_value(true)
.long("chunk-bytes")
.required(false)
.default_value("3145728") // 3MB
)
.arg(
Arg::with_name("extra-platform-property")
.long("extra-platform-property")
.takes_value(true)
.multiple(true)
.help("Extra platform properties to set on the execution request."),
)
.arg(
Arg::with_name("header")
.long("header")
.takes_value(true)
.multiple(true)
.help("Extra header to pass on remote execution request."),
)
.arg(
Arg::with_name("env")
.long("env")
.takes_value(true)
.multiple(true)
.help("Environment variables with which the process should be run."),
)
.arg(
Arg::with_name("jdk")
.long("jdk")
.takes_value(true)
.required(false)
.help("Symlink a JDK from .jdk in the working directory. For local execution, symlinks to the value of this flag. For remote execution, just requests that some JDK is symlinked if this flag has any value. https://github.com/pantsbuild/pants/issues/6416 will make this less weird in the future.")
)
.arg(
Arg::with_name("target-platform")
.long("target-platform")
.takes_value(true)
.required(true)
.help("The name of the platform that this request's output is compatible with. Options are 'linux', 'darwin', or 'none' (which indicates either)")
)
.arg(
Arg::with_name("use-nailgun")
.long("use-nailgun")
.takes_value(true)
.required(false)
.default_value("false")
.help("Whether or not to enable running the process through a Nailgun server.\
This will likely start a new Nailgun server as a side effect.")
)
.arg(
Arg::with_name("overall-deadline-secs")
.long("overall-deadline-secs")
.takes_value(true)
.required(false)
.default_value("600")
.help("Overall timeout in seconds for each request from time of submission")
)
.setting(AppSettings::TrailingVarArg)
.arg(
Arg::with_name("argv")
.multiple(true)
.last(true)
.required(true),
)
.arg(
Arg::with_name("output-file-path")
.long("output-file-path")
.takes_value(true)
.multiple(true)
.required(false)
.help("Path to file that is considered to be output."),
)
.arg(
Arg::with_name("output-directory-path")
.long("output-directory-path")
.takes_value(true)
.multiple(true)
.required(false)
.help("Path to directory that is considered to be output."),
)
.arg(
Arg::with_name("materialize-output-to")
.long("materialize-output-to")
.takes_value(true)
.required(false)
.help("The name of a directory (which may or may not exist), where the output tree will be materialized.")
)
.arg(
Arg::with_name("store-connection-limit")
.help("Number of concurrent servers to allow connections to.")
.takes_value(true)
.long("store-connection-limit")
.required(false)
.default_value("3")
)
.get_matches();
let argv: Vec<String> = args
.values_of("argv")
.unwrap()
.map(str::to_string)
.collect();
let env = args
.values_of("env")
.map(collection_from_keyvalues::<_, BTreeMap<_, _>>)
.unwrap_or_default();
let platform_properties = args
.values_of("extra-platform-property")
.map(collection_from_keyvalues::<_, Vec<_>>)
.unwrap_or_default();
let work_dir_base = args
.value_of("work-dir")
.map(PathBuf::from)
.unwrap_or_else(std::env::temp_dir);
let local_store_path = args
.value_of("local-store-path")
.map(PathBuf::from)
.unwrap_or_else(Store::default_path);
let named_cache_path = args
.value_of("named-cache-path")
.map(PathBuf::from)
.unwrap_or_else(NamedCaches::default_path);
let server_arg = args.value_of("server");
let remote_instance_arg = args.value_of("remote-instance-name").map(str::to_owned);
let output_files = if let Some(values) = args.values_of("output-file-path") {
values.map(PathBuf::from).collect()
} else {
BTreeSet::new()
};
let output_directories = if let Some(values) = args.values_of("output-directory-path") {
values.map(PathBuf::from).collect()
} else {
BTreeSet::new()
};
let headers = args
.values_of("headers")
.map(collection_from_keyvalues::<_, BTreeMap<_, _>>)
.unwrap_or_default();
let overall_deadline_secs = value_t!(args.value_of("overall-deadline-secs"), u64).unwrap_or(3600);
let executor = task_executor::Executor::new(Handle::current());
let store = match (server_arg, args.value_of("cas-server")) {
(Some(_server), Some(cas_server)) => {
let chunk_size =
value_t!(args.value_of("upload-chunk-bytes"), usize).expect("Bad upload-chunk-bytes flag");
let root_ca_certs = if let Some(path) = args.value_of("cas-root-ca-cert-file") {
Some(std::fs::read(path).expect("Error reading root CA certs file"))
} else {
None
};
let oauth_bearer_token = if let Some(path) = args.value_of("cas-oauth-bearer-token-path") {
Some(std::fs::read_to_string(path).expect("Error reading oauth bearer token file"))
} else {
None
};
Store::with_remote(
executor.clone(),
local_store_path,
vec![cas_server.to_owned()],
remote_instance_arg.clone(),
root_ca_certs,
oauth_bearer_token,
1,
chunk_size,
Duration::from_secs(30),
// TODO: Take a command line arg.
BackoffConfig::new(Duration::from_secs(1), 1.2, Duration::from_secs(20)).unwrap(),
3,
value_t!(args.value_of("store-connection-limit"), usize)
.expect("Bad store-connection-limit flag"),
)
}
(None, None) => Store::local_only(executor.clone(), local_store_path),
_ => panic!("Must specify either both --server and --cas-server or neither."),
}
.expect("Error making store");
let input_files = {
let fingerprint = Fingerprint::from_hex_string(args.value_of("input-digest").unwrap())
.expect("Bad input-digest");
let length = args
.value_of("input-digest-length")
.unwrap()
.parse::<usize>()
.expect("input-digest-length must be a non-negative number");
Digest(fingerprint, length)
};
let working_directory = args
.value_of("working-directory")
.map(|path| RelativePath::new(path).expect("working-directory must be a relative path"));
let is_nailgunnable: bool = args.value_of("use-nailgun").unwrap().parse().unwrap();
let request = process_execution::Process {
argv,
env,
working_directory,
input_files,
output_files,
output_directories,
timeout: Some(Duration::new(15 * 60, 0)),
description: "process_executor".to_string(),
append_only_caches: BTreeMap::new(),
jdk_home: args.value_of("jdk").map(PathBuf::from),
target_platform: PlatformConstraint::try_from(
&args.value_of("target-platform").unwrap().to_string(),
)
.expect("invalid value for `target-platform"),
is_nailgunnable,
execution_slot_variable: None,
};
let runner: Box<dyn process_execution::CommandRunner> = match server_arg {
Some(address) => {
let root_ca_certs = if let Some(path) = args.value_of("execution-root-ca-cert-file") {
Some(std::fs::read(path).expect("Error reading root CA certs file"))
} else {
None
};
let oauth_bearer_token =
if let Some(path) = args.value_of("execution-oauth-bearer-token-path") {
Some(std::fs::read_to_string(path).expect("Error reading oauth bearer token file"))
} else {
None
};
let command_runner_box: Box<dyn process_execution::CommandRunner> = {
Box::new(
process_execution::remote::CommandRunner::new(
address,
vec![address.to_owned()],
ProcessMetadata {
instance_name: remote_instance_arg,
cache_key_gen_version: args.value_of("cache-key-gen-version").map(str::to_owned),
platform_properties,
},
root_ca_certs,
oauth_bearer_token,
headers,
store.clone(),
Platform::Linux,
Duration::from_secs(overall_deadline_secs),
)
.expect("Failed to make command runner"),
)
};
command_runner_box
}
None => Box::new(process_execution::local::CommandRunner::new(
store.clone(),
executor,
work_dir_base,
NamedCaches::new(named_cache_path),
true,
)) as Box<dyn process_execution::CommandRunner>,
};
let result = runner
.run(request.into(), Context::default())
.await
.expect("Error executing");
if let Some(output) = args.value_of("materialize-output-to").map(PathBuf::from) {
store
.materialize_directory(output, result.output_directory)
.compat()
.await
.unwrap();
}
let stdout: Vec<u8> = store
.load_file_bytes_with(result.stdout_digest, |bytes| bytes.to_vec())
.await
.unwrap()
.unwrap()
.0;
let stderr: Vec<u8> = store
.load_file_bytes_with(result.stderr_digest, |bytes| bytes.to_vec())
.await
.unwrap()
.unwrap()
.0;
print!("{}", String::from_utf8(stdout).unwrap());
eprint!("{}", String::from_utf8(stderr).unwrap());
exit(result.exit_code);
}
/// Builds any `(String, String)`-pair collection from `key=value` strings.
///
/// Each entry is split on its first `'='`; any further `'='` characters stay in the
/// value. An entry with no `'='` maps the whole string to an empty value.
fn collection_from_keyvalues<'a, It, Col>(keyvalues: It) -> Col
where
  It: Iterator<Item = &'a str>,
  Col: FromIterator<(String, String)>,
{
  keyvalues
    .map(|entry| match entry.split_once('=') {
      Some((key, value)) => (key.to_owned(), value.to_owned()),
      None => (entry.to_owned(), String::new()),
    })
    .collect()
}