---
suppress-bibliography: true
---
```{r console_start, include=FALSE}
console_start()
```
```{console setup_history, include=FALSE}
export CHAPTER="08"
export HISTFILE=/history/history_${CHAPTER}
rm -f $HISTFILE
```
# Parallel Pipelines {#chapter-8-parallel-pipelines}
<!-- #TODO: SHOULD: Discuss progress bar -->
In the previous chapters, we've been dealing with commands and pipelines that take care of an entire task at once.
In practice, however, you may find yourself facing a task which requires the same command or pipeline to run multiple times.
For example, you may need to:
- Scrape hundreds of web pages
- Make dozens of API calls and transform their output
- Train a classifier for a range of parameter values
- Generate scatter plots for every pair of features in your dataset
In any of the above examples, there's a certain form of repetition involved.
With your favorite scripting or programming language, you take care of this with a for loop or a while loop.
On the command line, the first thing you might be inclined to do is to press **`Up`** to bring back the previous command, modify it if necessary, and press **`Enter`** to run the command again.
This is fine for two or three times, but imagine doing this dozens of times.
Such an approach quickly becomes cumbersome, inefficient, and prone to errors.
The good news is that you can write such loops on the command line as well.
That's what this chapter is all about.
Sometimes, repeating a fast command one after the other (in a *serial* manner) is sufficient.
When you have multiple cores (and perhaps even multiple machines) it would be nice if you could make use of those, especially when you’re faced with a data-intensive task.
When using multiple cores or machines, the total running time may be reduced significantly.
In this chapter I will introduce a very powerful tool called `parallel`[@parallel] that can take care of exactly this. It enables you to apply a command or pipeline for a range of arguments such as numbers, lines, and files.
Plus, as the name implies, it allows you to run your commands in *parallel*.
## Overview
This intermezzo chapter discusses several approaches to speed up tasks that require commands and pipelines to be run many times.
My main goal is to demonstrate to you the flexibility and power of `parallel`.
Because this tool can be combined with any other tool discussed in this book, it will positively change the way you use the command line for data science.
In this chapter, you’ll learn about:
- Running commands in serial over a range of numbers, lines, and files
- Breaking a large task into several smaller tasks
- Running pipelines in parallel
- Distributing pipelines to multiple machines
This chapter starts with the following files:
```{console cd}
cd /data/ch08
l
```
The instructions to get these files are in [Chapter 2](#chapter-2-getting-started).
Any other files are either downloaded or generated using command-line tools.
## Serial Processing
Before I dive into parallelization, I'll briefly discuss looping in a serial fashion.
It’s worthwhile to know how to do this because this functionality is always available, the syntax closely resembles looping in other programming languages, and it will really make you appreciate `parallel`.
From the examples provided in the introduction of this chapter, we can distill three types of items to loop over: numbers, lines, and files.
These three types of items will be discussed in the next three subsections, respectively.
### Looping Over Numbers
Imagine that you need to compute the square of every even integer between 0 and 100. There’s a tool called `bc`[@bc], a *basic calculator* to which you can pipe an expression.
The command to compute the square of 4 looks as follows:
```{console bc}
echo "4^2" | bc
```
For a one-off calculation, this will do.
However, as mentioned in the introduction, you would need to be crazy to press **`Up`**, change the number, and press **`Enter`** 50 times!
In this case it's better to let the shell do the hard work for you by using a for loop:
```{console for_loop}
for i in {0..100..2} #<1>
do
echo "$i^2" | bc #<2>
done | trim
```
<1> The Z shell has a feature called brace expansion, which transforms *`{0..100..2}`* into a list separated by spaces: *`0 2 4 … 98 100`*. The variable *`i`* is assigned the value "0" in the first iteration, "2" in the second iteration, and so forth.
<2> The value of this variable can be used by prefixing it with a dollar sign (*`$`*). The shell will replace *`$i`* with its value before `echo` is executed. Note that there can be more than one command between *`do`* and *`done`*.
Although the syntax may appear a bit odd compared to your favorite programming language, it's worth remembering this because it's always available in the shell.
I'll introduce a better and more flexible way of repeating commands in a moment.
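As an aside, the same squares can also be computed without `bc` by relying on the shell's own integer arithmetic. The following is a minimal sketch, assuming a shell that supports C-style for loops and arithmetic expansion (such as Bash or the Z shell). The function name `square_evens` is made up for illustration.

```bash
# Print the square of every even integer from 0 to 100 (inclusive).
# square_evens is a hypothetical helper name, not a tool from this book.
square_evens() {
  local i
  for ((i = 0; i <= 100; i += 2)); do
    echo $((i * i))   # arithmetic expansion takes the place of bc
  done
}

square_evens | tail -n 3
```

Arithmetic expansion only handles integers, so `bc` remains the better choice as soon as decimals are involved.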
### Looping Over Lines
The second type of item you can loop over is lines.
These lines can come from either a file or from standard input.
This is a very generic approach because the lines can contain anything, including numbers, dates, and email addresses.
Imagine that you want to send an email to all your contacts.
Let’s first generate some fake users using the free [Random User Generator API](https://randomuser.me):
```{console emails}
curl -s "https://randomuser.me/api/1.2/?results=5&seed=dsatcl2e" > users.json
< users.json jq -r '.results[].email' > emails
bat emails
```
You can loop over the lines from *emails* with a while loop:
```{console while_loop}
while read line #<1>
do
echo "Sending invitation to ${line}." #<2>
done < emails #<3>
```
<1> In this case you need to use a while loop because the Z shell does not know beforehand how many lines the input consists of.
<2> Although the curly braces around the *line* variable are not necessary in this case (since variable names cannot contain periods), it’s still good practice.
<3> This redirection can also be placed before `while`.
You can also provide input to a while loop interactively by specifying the special file for standard input, */dev/stdin*. Press **`Ctrl-D`** when you are done.
```{console while_interactive}
while read line; do echo "You typed: ${line}."; done < /dev/stdin#! expect_prompt=FALSE
one#! expect_prompt=FALSE
two#! expect_prompt=FALSE
three#! expect_prompt=FALSE
C-D#! literal=FALSE, expect_prompt=TRUE
```
This method, however, has the disadvantage that, once you press **`Enter`**, the commands between _`do`_ and _`done`_ are run immediately for that line of input. There's no turning back.
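One detail worth knowing is that a plain `read` trims leading and trailing whitespace and treats backslashes as escape characters. The sketch below shows a commonly used, more defensive variant of the while loop. The helper name `invite_all` and the sample lines are invented for illustration.

```bash
# invite_all is a hypothetical helper that reads lines from standard input.
invite_all() {
  # IFS= keeps leading and trailing whitespace; -r keeps backslashes literal.
  while IFS= read -r line
  do
    printf 'Sending invitation to [%s].\n' "$line"
  done
}

# Made-up sample lines: one with leading spaces, one with a backslash.
printf '%s\n' '  padded@example.com' 'back\slash@example.com' | invite_all
```

The square brackets in the output are only there to make the preserved whitespace visible.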
### Looping Over Files
In this section I discuss the third type of item that we often need to loop over: files.
To handle special characters, use *globbing* (i.e., pathname expansion) instead of `ls`[@ls]:
```{console for}
for chapter in /data/*
do
echo "Processing Chapter ${chapter}."
done
```
Just as with brace expansion, the expression _`/data/*`_ is first expanded into a list by the Z shell before it's processed by the for loop.
A more elaborate alternative to listing files is `find`[@find], which:
- Can traverse down directories
- Allows for elaborate searching on properties such as size, access time, and permissions
- Handles special characters such as spaces and newlines
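For example, the following `find` invocation lists all files located under the directory */data* that have *csv* as extension and are smaller than 2 kilobytes (GNU `find` rounds file sizes up to whole kilobytes before comparing, so in practice this matches files of at most 1,024 bytes):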
```{console find_csv}
find /data -type f -name '*.csv' -size -2k
```
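To loop over the results of `find` while keeping spaces and newlines in filenames intact, a common pattern is to separate the names with NUL bytes. The following is a sketch under a few assumptions: `find` supports `-print0` (GNU and BSD versions do), the shell's `read` supports `-d ''` (Bash and the Z shell do), and the helper name `process_csvs` as well as the filenames are made up.

```bash
# process_csvs is a hypothetical helper: it visits every CSV file below the
# directory given as its first argument.
process_csvs() {
  # -print0 separates names with a NUL byte, the one character that cannot
  # occur in a path, so spaces and newlines in filenames survive intact.
  find "$1" -type f -name '*.csv' -print0 |
  while IFS= read -r -d '' file
  do
    printf 'Processing %s\n' "${file##*/}"
  done
}

# Try it on a throwaway directory with made-up filenames.
workdir=$(mktemp -d)
touch "$workdir/plain.csv" "$workdir/with space.csv" "$workdir/notes.txt"
process_csvs "$workdir"
rm -r "$workdir"
```

The order in which `find` reports files is not guaranteed, so the two lines may appear in either order.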
## Parallel Processing
Let's say that you have a very long-running tool, such as the one shown here:
```{console bat_slow, callouts=list("ts", "RANDOM", "sleep")}
bat slow.sh
```
<1> `ts`[@ts] adds a timestamp.
<2> The magic variable *`RANDOM`* calls an internal Bash function that returns a pseudorandom integer between 0 and 32767. Taking the remainder of the division of that integer by 5 and adding 1 ensures that *duration* is between 1 and 5.
<3> `sleep` pauses execution for a given number of seconds.
This process probably doesn't take up all the available resources.
And it so happens that you need to run this command a lot of times.
For example, you need to download a whole sequence of files.
A naive way to parallelize is to run the commands in the background.
Let's run `slow.sh` three times:
```{console subshell, keep_last_prompt=TRUE, callouts=list(2, 4)}
for i in {A..C}; do
./slow.sh $i &
done#! hold=7
```
<1> The ampersand (`&`) sends the command to the background, allowing the for loop to continue immediately with the next iteration.
<2> This line shows the job number given by the Z shell and the process ID, which can be used for more fine-grained job control. This topic, while powerful, is beyond the scope of this book.
```{block2, type="rmdnote"}
Keep in mind that not everything can be parallelized.
API calls may be limited to a certain number, or some commands can only have one instance.
```
Figure \@ref(fig:diagram-parallel-processing) illustrates, on a conceptual level, the difference between serial processing, naive parallel processing, and parallel processing with GNU Parallel in terms of the number of concurrent processes and the total amount of time it takes to run everything.
```{r diagram-parallel-processing, echo=FALSE, fig.cap="Serial processing, naive parallel processing, and parallel processing with GNU Parallel", fig.align="center", out.width="60%"}
knitr::include_graphics("images/dscl_0801.png")
```
There are two problems with this naive approach.
First, there’s no way to control how many processes you are running concurrently.
If you start too many jobs at once, they could be competing for the same resources such as CPU, memory, disk access, and network bandwidth.
This could lead to a longer time to run everything.
Second, it's difficult to tell which output belongs to which input.
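The second problem can be worked around by hand, as the following sketch illustrates: each background job writes to its own file, and the shell builtin `wait` blocks until all of them have finished. The function `slow_job` is a stand-in for `slow.sh`, and the output file names are invented for this example.

```bash
# slow_job is a hypothetical stand-in for ./slow.sh.
slow_job() {
  sleep 1
  echo "Finished job $1"
}

outdir=$(mktemp -d)   # scratch directory; remove it when you are done
for i in A B C; do
  slow_job "$i" > "$outdir/job-$i.out" &   # one output file per input
done
wait   # block until every background job has exited

cat "$outdir"/job-*.out
```

This keeps the outputs apart, but it requires extra bookkeeping and still does nothing to limit how many jobs run at the same time.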
Let's look at a better approach.
### Introducing GNU Parallel
Allow me to introduce `parallel`, a command-line tool that allows you to parallelize and distribute commands and pipelines.
The beauty of this tool is that existing tools can be used as they are; they do not need to be modified.
```{block2, type="rmdcaution"}
Be aware that there are two command-line tools with the name `parallel`.
If you're using the Docker image then you already have the correct one installed.
Otherwise, you can check that you have the correct one by running `parallel --version`.
It should say "GNU parallel".
```
Before I go into the details of `parallel`, here’s a little teaser to show you how easy it is to replace the for loop from earlier:
```{console teaser_2}
seq 0 2 100 | parallel "echo {}^2 | bc" | trim
```
This is `parallel` in its simplest form: the items to loop over are passed via standard input and there aren't any arguments other than the command that `parallel` needs to run.
See Figure \@ref(fig:diagram-parallel-output) for an illustration of how `parallel` concurrently distributes input among processes and collects their outputs.
```{r diagram-parallel-output, echo=FALSE, fig.cap="GNU Parallel concurrently distributes input among processes and collects their outputs", fig.align="center"}
knitr::include_graphics("images/dscl_0802.png")
```
As you can see, it basically acts as a for loop.
Here's another teaser, which replaces the for loop from the previous section.
```{console teaser_1}
parallel --jobs 2 ./slow.sh ::: {A..C}
```
Here, using the `--jobs` option, I specify that `parallel` can run at most two jobs concurrently. The arguments to `slow.sh` are specified on the command line (after `:::`) instead of via standard input.
With a whopping 159 different options, `parallel` offers a lot of functionality.
(Perhaps too much.)
Luckily you only need to know a handful in order to be effective.
The manual page is quite informative in case you need to use a less common option.
### Specifying Input
The most important argument to `parallel` is the command or pipeline that you'd like to run for every input.
The question is: where should the input item be inserted in the command line?
If you don't specify anything, then the input item will be appended to the end of the pipeline.
```{console, remove="keep"}
seq 3 | parallel cowsay#! enter=FALSE
C-C#! literal=FALSE
parallel --jobs 1 --keep-order cowsay ::: 1 2 3
```
The above is the same as running:
```{console}
cowsay 1 > /dev/null #<1>
cowsay 2 > /dev/null
cowsay 3 > /dev/null
```
<1> Because the output is the same as before, I redirect it to */dev/null* to suppress it.
Although this often works, I advise you to be explicit about where the input item should be inserted in the command by using placeholders.
In this case, because you want to use the entire input line (a number) at once, you only need one placeholder.
You specify the placeholder, in other words, where to put the input item, with a pair of curly braces (`{}`):
```{console}
seq 3 | parallel cowsay {} > /dev/null
```
```{block2, type="rmdnote"}
There are other ways to provide input to `parallel`.
I prefer piping the input (as I do throughout this chapter) because that's how most command-line tools are chained together into a pipeline.
The other ways involve syntax that's not seen anywhere else.
Having said that, they do enable additional functionality, such as iterating over all possible combinations of multiple lists, so be sure to read `parallel`'s manual page if you'd like to know more.
```
When the input items are filenames, there are a couple of modifiers that let you use only parts of the filename.
For example, with `{/}`, only the basename of the filename will be used:
```{console callouts=list(1)}
find /data/ch03 -type f | parallel echo '{#}\) \"{}\" has basename \"{/}\"'
```
<1> Characters such as parentheses (`)`) and quotes (`"`) have a special meaning in the shell. To use them literally you put a backslash `\` in front of them. This is called *escaping*.
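For intuition, the modifier `{/}` (basename) and its sibling `{.}` (input without the last extension) correspond roughly to two of the shell's parameter expansions. The sketch below uses plain shell parameter expansion rather than `parallel` itself, and the path is hypothetical.

```bash
path="/data/ch03/logs.tar.gz"   # hypothetical filename

basename_only="${path##*/}"     # like {/}: drop everything up to the last slash
without_ext="${path%.*}"        # like {.}: drop only the last extension

printf '%s\n' "$basename_only" "$without_ext"
```

Note that only the final extension is removed, so a name ending in *.tar.gz* keeps its *.tar* part.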
If the input line has multiple parts separated by a delimiter, you can add numbers to the placeholders. For example:
```{console remove=c(1, 2)}
touch input.csv
< input.csv parallel --colsep , "mv {2} {1}" > /dev/null#! enter=FALSE
C-C#! literal=FALSE
```
Here, you can apply the same placeholder modifiers.
It is also possible to reuse the same input item.
If the input to `parallel` is a CSV file with a header, then you can use the column names as placeholders:
```{console}
< input.csv parallel -C, --header : "invite {name} {email}"#! enter=FALSE
C-C#! literal=FALSE
```
```{block2, type="rmdtip"}
If you ever wonder whether your placeholders are set up correctly, you can add the `--dryrun` option.
Instead of actually executing the command, `parallel` will print out all the commands exactly as if they would have been executed.
```
### Controlling the Number of Concurrent Jobs
By default, `parallel` runs one job per CPU core.
You can control the number of jobs that will be run concurrently with the `--jobs` or `-j` option.
Specifying a number means that many jobs will be run concurrently.
In the following, *N* is the number of CPU cores and *M* is the number you specify.
If you put a plus sign in front of the number, then `parallel` will run *N+M* jobs. If you put a minus sign in front of the number, then `parallel` will run *N-M* jobs.
You can also specify a percentage, where the default is 100% of the number of CPU cores.
The optimal number of jobs to run concurrently depends on the actual commands you are running.
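To make the arithmetic concrete, the following sketch works out how many concurrent jobs each form of the option would amount to. The core count of 8 and the value 2 are assumptions chosen for illustration; on Linux, `nproc` reports the actual number of processing units.

```bash
cores=8   # assumed number of CPU cores, for illustration only
m=2       # the number written after -j

plus=$((cores + m))           # -j+2   adds to the number of cores
minus=$((cores - m))          # -j-2   subtracts from the number of cores
pct=$((cores * 200 / 100))    # -j200% is a percentage of the number of cores

printf '%-8s %s jobs\n' "-j${m}" "$m" "-j+${m}" "$plus" "-j-${m}" "$minus" "-j200%" "$pct"
```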
<!-- #TODO: Figure out where to type concurrently instead of parallel. And make sure all occurences of parallel are surrounded by backticks -->
```{console}
seq 5 | parallel -j0 "echo Hi {}"
```
```{console}
seq 5 | parallel -j200% "echo Hi {}"
```
If you specify `-j1`, then the commands will be run in serial. Even though this doesn’t do the name of the tool justice, it still has its uses, for example when you need to access an API that only allows one connection at a time. If you specify `-j0`, then `parallel` will run as many jobs concurrently as possible. This is comparable to the for loop with the ampersand, and it is not advised.
### Logging and Output
To save the output of each command, you might be tempted to do the following:
```{console}
seq 5 | parallel "echo \"Hi {}\" > hi-{}.txt"
```
This will save the output into individual files.
Or, if you want to save everything into one big file you could do the following:
```{console}
seq 5 | parallel "echo Hi {}" >> one-big-file.txt
```
However, `parallel` offers the `--results` option, which stores the output in separate files.
For each job, `parallel` creates three files: *seq*, which holds the job number, *stdout* which contains the output produced by the job, and *stderr* which contains any errors produced by the job.
These three files are placed in subdirectories based on the input values.
`parallel` still prints all the output, which is redundant in this case.
You can redirect both the standard error and standard output to */dev/null* as follows:
```{console}
seq 10 | parallel --results outdir "curl 'https://anapioficeandfire.com/api/characters/{}' | jq -r '.aliases[0]'" 2>/dev/null 1>&2
tree outdir | trim
```
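The redirection `2>/dev/null 1>&2` is processed from left to right: standard error is pointed at */dev/null* first, after which standard output is pointed at wherever standard error now goes. The following sketch demonstrates this with a made-up function called `noisy`, which is not part of the pipeline above.

```bash
# noisy is a hypothetical command that writes to both output streams.
noisy() {
  echo "to standard output"
  echo "to standard error" >&2
}

# 2>/dev/null discards standard error; 1>&2 then sends standard output
# to the same place, so nothing is printed at all.
noisy 2>/dev/null 1>&2

# Discarding only standard error leaves standard output visible.
noisy 2>/dev/null
```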
See Figure \@ref(fig:diagram-parallel-results) for a pictorial overview of how the `--results` option works.
```{r diagram-parallel-results, echo=FALSE, fig.cap="GNU Parallel stores output in separate files with the `--results` option", fig.align="center"}
knitr::include_graphics("images/dscl_0803.png")
```
When you're running multiple jobs in parallel, the order in which the jobs are run may not correspond to the order of the input.
The output of jobs is therefore also mixed up.
To keep the same order, specify the `--keep-order` option or `-k` option.
Sometimes it’s useful to record which input generated which output.
`parallel` allows you to *tag* the output with the `--tag` option, which prepends each line with the input item.
```{console tag}
seq 5 | parallel --tag "echo 'sqrt({})' | bc -l"
parallel --tag --keep-order "echo '{1}*{2}' | bc -l" ::: 3 4 ::: 5 6 7
```
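When two `:::` input sources are given, as in the second command above, `parallel` runs the command once for every combination of the two lists. Conceptually this resembles a nested for loop. The sketch below mimics that in plain shell, without `parallel`; because the inputs are integers, arithmetic expansion stands in for `bc -l`, and the function name `combos` is made up.

```bash
# combos is a hypothetical helper that prints every pairing of the two lists,
# followed by a tab and the product, in the order nested loops produce them.
combos() {
  local a b
  for a in 3 4; do
    for b in 5 6 7; do
      printf '%s %s\t%s\n' "$a" "$b" "$((a * b))"
    done
  done
}

combos
```

Unlike this serial loop, `parallel` may finish the combinations in any order, which is why the command above also specifies `--keep-order`.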
### Creating Parallel Tools
The `bc` tool, which I used in the beginning of the chapter, is not parallel by itself.
However, you can parallelize it using `parallel`.
The Docker image contains a tool called `pbc` [@pbc].
Its code is shown here:
```{console bat_pbc}
bat $(which pbc)
```
This tool allows us to simplify the code used in the beginning of the chapter too.
And it can process comma-separated values simultaneously:
```{console run_pbc}
seq 100 | pbc '{1}^2' | trim
paste -d, <(seq 4) <(seq 4) <(seq 4) | pbc 'sqrt({1}+{2})^{3}'
```
## Distributed Processing
Sometimes you need more power than your local machine, even with all its cores, can offer.
Luckily, `parallel` can also leverage the power of remote machines, which really allows you to speed up your pipeline.
What’s great is that `parallel` doesn't have to be installed on the remote machine.
All that’s required is that you can connect to the remote machine with the *Secure Shell* protocol (or SSH), which is also what `parallel` uses to distribute your pipeline.
(Having `parallel` installed is helpful because it can then determine how many cores to employ on each remote machine; more on this later.)
<!-- #TODO: Introduce AWS EC2 -->
<!-- #TODO: COULD: Talk about Google Cloud and MS Azure -->
First, I'm going to obtain a list of running AWS EC2 instances.
Don’t worry if you don’t have any remote machines: you can replace any occurrence of `--slf hostnames`, which tells `parallel` which remote machines to use, with `--sshlogin :`.
This way, you can still follow along with the examples in this section.
Once you know which remote machines to take over, we’re going to consider three flavors of distributed processing:
- Running ordinary commands on remote machines
- Distributing local data directly among remote machines
- Sending files to remote machines, processing them, and retrieving the results
### Get List of Running AWS EC2 Instances
<!-- #TODO: Add links to GCP And Azure -->
In this section we’re creating a file named *hostnames* that will contain one hostname of a remote machine per line.
I'm using Amazon Web Services (AWS) as an example.
I assume that you have an AWS account and that you know how to launch instances.
If you’re using a different cloud computing service (such as Google Cloud Platform or Microsoft Azure), or if you have your own servers, please make sure that you create a *hostnames* file yourself before continuing to the next section.
You can obtain a list of running AWS EC2 instances using `aws` [@aws], the command-line interface to the AWS API.
With `aws`, you can do almost everything you can do with the online AWS Management Console.
The command `aws ec2 describe-instances` returns a lot of information about all your EC2 instances in JSON format (see [the online documentation](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/ec2/describe-instances.html) for more information).
You can extract the relevant fields using `jq`:
```{console, remove=list(3, "echo")}
aws ec2 describe-instances | jq '.Reservations[].Instances[] | {public_dns: .PublicDnsName, state: .State.Name}'#! enter=FALSE
C-C#! literal=FALSE
echo '{' &&
echo '  "public_dns": "ec2-54-88-122-140.compute-1.amazonaws.com",' &&
echo '  "state": "running"' &&
echo '}' &&
echo '{' &&
echo '  "public_dns": null,' &&
echo '  "state": "stopped"' &&
echo '}'
```
The possible states of an EC2 instance are: *`pending`*, *`running`*, *`shutting-down`*, *`terminated`*, *`stopping`*, and *`stopped`*.
Because you can only distribute your pipeline to running instances, you filter out the non-running instances as follows:
```{console, remove=list(3, "echo")}
aws ec2 describe-instances | jq -r '.Reservations[].Instances[] | select(.State.Name=="running") | .PublicDnsName' | tee hostnames#! enter=FALSE
C-C#! literal=FALSE
echo 'ec2-54-88-122-140.compute-1.amazonaws.com' &&
echo 'ec2-54-88-89-208.compute-1.amazonaws.com'
```
(Without the `-r` or `--raw-output` option, the hostnames would have been surrounded by double quotes.)
The output is saved to *hostnames*, so that I can pass this to `parallel` later.
As mentioned, `parallel` employs `ssh`[@ssh] to connect to the remote machines.
If you want to connect to your EC2 instances without typing the credentials every time, you can add something like the following text to the file *\~/.ssh/config*.
```{console, include=FALSE}
mkdir -p ~/.ssh
echo "Host *.amazonaws.com\n\tIdentityFile ~/.ssh/MyKeyFile.pem\n\tUser ubuntu" > ~/.ssh/config
```
```{console}
bat ~/.ssh/config
```
Depending on which distribution you're running, your username may be different from *`ubuntu`*.
### Running Commands on Remote Machines
The first flavor of distributed processing is to run ordinary commands on remote machines.
Let’s first double check that `parallel` is working by running the tool `hostname`[@hostname] on each EC2 instance:
```{console, remove=list(2, "echo")}
parallel --nonall --sshloginfile hostnames hostname#! enter=FALSE
C-C#! literal=FALSE
echo 'ip-172-31-23-204\nip-172-31-23-205'
```
Here, the `--sshloginfile` or `--slf` option is used to refer to the file *hostnames*.
The `--nonall` option instructs `parallel` to execute the same command on every remote machine in the *hostnames* file without using any parameters.
Remember, if you don’t have any remote machines to utilize, you can replace `--slf hostnames` with `--sshlogin :` so that the command is run on your local machine:
```{console, remove=list(2, "echo")}
parallel --nonall --sshlogin : hostname#! enter=FALSE
C-C#! literal=FALSE
echo 'data-science-toolbox'
```
Running the same command once on every remote machine requires only one core per machine. If you wanted to distribute the list of arguments passed in to `parallel`, then it could potentially use more than one core. If the number of cores is not specified explicitly, `parallel` will try to determine this.
```{console, remove=list(2, 4, "fake")}
alias fake=echo
seq 2 | parallel --slf hostnames echo 2>&1#! enter=FALSE
C-C#! literal=FALSE
fake 'bash: parallel: command not found' &&
fake -n 'parallel: Warning: Could not figure out number of cpus on' &&
fake ' ec2-54-88-122-140.compute-1.amazonaws.com (). Using 1.' &&
fake '1' &&
fake '2'
```
In this case, I have `parallel` installed on one of the two remote machines.
I'm getting a warning message indicating that `parallel` is not found on one of them.
As a result, `parallel` cannot determine the number of cores and will default to using one core.
When you receive this warning message, you can do one of the following four things:
- Don’t worry, and be happy with using one core per machine
- Specify the number of jobs for each machine via the `--jobs` or `-j` option
- Specify the number of cores to use per machine by putting, for example, *2/* if you want two cores, in front of each hostname in the *hostnames* file
- Install `parallel` using a package manager. For example, if the remote machines all run Ubuntu:
```{console}
parallel --nonall --slf hostnames "sudo apt-get install -y parallel"#! enter=FALSE
C-C#! literal=FALSE
```
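The third option, prefixing each hostname with a core count, is easy to script. The following sketch assumes a *hostnames* file with one hostname per line, as created earlier. To avoid touching any real file it works in a scratch directory with sample hostnames, and the output name *hostnames-2cores* is invented for this example.

```bash
tmp=$(mktemp -d)   # scratch directory so that no real file is overwritten

# Sample hostnames, for illustration only.
printf '%s\n' \
  'ec2-54-88-122-140.compute-1.amazonaws.com' \
  'ec2-54-88-89-208.compute-1.amazonaws.com' > "$tmp/hostnames"

# Prefix every line with "2/" to request two cores per machine.
sed 's|^|2/|' "$tmp/hostnames" > "$tmp/hostnames-2cores"
cat "$tmp/hostnames-2cores"
```

The resulting file could then be passed to `parallel` with `--slf` in place of the original *hostnames* file.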
### Distributing Local Data among Remote Machines
The second flavor of distributed processing is to distribute local data directly among remote machines.
Imagine that you have one very large dataset that you want to process using multiple remote machines.
For simplicity, let's sum all integers from 1 to 1000.
First, let’s double check that your input is actually being distributed by printing the hostname of the remote machine and the length of the input it received using `wc`:
```{console, remove=list(3, "echo")}
seq 1000 | parallel -N100 --pipe --slf hostnames "(hostname; wc -l) | paste -sd:"#! enter=FALSE
C-C#! literal=FALSE
echo 'ip-172-31-23-204:100' &&
echo 'ip-172-31-23-205:100' &&
echo 'ip-172-31-23-205:100' &&
echo 'ip-172-31-23-204:100' &&
echo 'ip-172-31-23-205:100' &&
echo 'ip-172-31-23-204:100' &&
echo 'ip-172-31-23-205:100' &&
echo 'ip-172-31-23-204:100' &&
echo 'ip-172-31-23-205:100' &&
echo 'ip-172-31-23-204:100'
```
Excellent. You can see that your 1000 numbers get distributed evenly in subsets of 100 (as specified by `-N100`).
Now, you’re ready to sum all those numbers:
```{console, remove=list(2, "echo")}
seq 1000 | parallel -N100 --pipe --slf hostnames "paste -sd+ | bc" | paste -sd+ | bc#! enter=FALSE
C-C#! literal=FALSE
echo '500500'
```
Here, you immediately also sum the ten sums you get back from the remote machines.
Let’s check that the answer is correct by doing the same calculation without `parallel`:
```{console}
seq 1000 | paste -sd+ | bc
```
Good, that works.
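If you don't have any remote machines at hand, the idea behind this computation (sum each chunk of 100 numbers separately, then sum the partial sums) can be sketched locally. The example below uses `awk` instead of `parallel` to form the chunks, so it only illustrates the arithmetic, not the distribution. The function name `partial_sums` is made up.

```bash
# partial_sums is a hypothetical helper: it sums the numbers 1 to 1000 in
# ten chunks of 100 lines each, printing one partial sum per chunk.
partial_sums() {
  seq 1000 |
  awk '{ sum[int((NR - 1) / 100)] += $1 }
       END { for (k = 0; k < 10; k++) print sum[k] }'
}

partial_sums                                             # ten partial sums
partial_sums | awk '{ total += $1 } END { print total }' # their combined total
```

The combined total equals the result of the serial calculation above, which is what makes summing a task that can be split into independent chunks.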
If you have a larger pipeline that you want to execute on the remote machines, you can also put it in a separate script and upload it with `parallel`.
I'll demonstrate this by creating a very simple command-line tool called `add`:
```{console create_add, marker="#~"}
echo '#!/usr/bin/env bash' > add
echo 'paste -sd+ | bc' >> add
bat add
chmod u+x add
seq 1000 | ./add
```
Using the `--basefile` option, `parallel` first uploads the file *add* to all remote machines before running the jobs:
<!-- #TODO: Explain --pipe -->
```{console use_add, remove=list(4, "echo")}
seq 1000 |
parallel -N100 --basefile add --pipe --slf hostnames './add' |
./add #! enter=FALSE
C-C#! literal=FALSE
echo '500500'
```
Summing 1000 numbers is of course only a toy example.
Plus, it would've been much faster to do this locally.
Still, I hope it’s clear from this that `parallel` can be incredibly powerful.
### Processing Files on Remote Machines
The third flavor of distributed processing is to send files to remote machines, process them, and retrieve the results.
Imagine that you want to count, for each borough of New York City, how many 311 service calls it receives.
You don’t have that data on your local machine yet, so let’s first obtain it from the free [NYC Open Data API](https://data.cityofnewyork.us/):
```{console nyc}
seq 0 100 900 | parallel "curl -sL 'http://data.cityofnewyork.us/resource/erm2-nwe9.json?\$limit=100&\$offset={}' | jq -c '.[]' | gzip > nyc-{#}.json.gz"
```
You now have 10 files containing compressed JSON data:
```{console nyc_ls}
l nyc*json.gz
```
Note that `jq -c '.[]'` is used to flatten the array of JSON objects so that there’s one object per line, with a total of 100 lines per file.
Using `zcat` [@zcat], you can directly print the contents of a compressed file:
```{console zcat_trim}
zcat nyc-1.json.gz | trim
```
Let’s see what one line of JSON looks like:
```{console zcat_head_1}
zcat nyc-1.json.gz | head -n 1
```
If you were to get the total number of service calls per borough on your local machine, you would run the following command:
```{console, callouts=1:6}
zcat nyc*json.gz |
jq -r '.borough' |
tr '[A-Z] ' '[a-z]_' |
sort | uniq -c | sort -nr |
awk '{print $2","$1}' |
header -a borough,count |
csvlook
```
<1> Expand all compressed files using `zcat`.
<2> For each call, extract the name of the borough using `jq`.
<3> Convert borough names to lowercase and replace spaces with underscores (because `awk` splits on whitespace by default).
<4> Count the occurrences of each borough using `sort` and `uniq`.
<5> Reverse the two columns and delimit them with a comma using `awk`.
<6> Add a header using `header`.
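If you'd like to try the counting steps in isolation, the following sketch runs steps 3 to 5 on a handful of made-up borough names. It leaves out `jq`, `header`, and `csvlook` so that it only needs standard tools. The function name `count_boroughs` and the sample input are assumptions for illustration, not real 311 data.

```bash
# Steps 3 to 5 of the pipeline above, wrapped in a hypothetical helper.
count_boroughs() {
  tr '[A-Z] ' '[a-z]_' |        # lowercase; spaces become underscores
  sort | uniq -c | sort -nr |   # count occurrences, most frequent first
  awk '{print $2","$1}'         # swap the columns and join them with a comma
}

# Made-up sample input: one borough name per line.
printf '%s\n' 'STATEN ISLAND' 'BROOKLYN' 'QUEENS' \
              'BROOKLYN' 'STATEN ISLAND' 'BROOKLYN' |
  count_boroughs
```

For this input the result is `brooklyn,3`, `staten_island,2`, and `queens,1`, each on its own line. Without the `tr` step, `awk` would see `STATEN` and `ISLAND` as separate fields and print only the first word.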
Imagine, for a moment, that your own machine is so slow that you simply cannot perform this pipeline locally.
You can use `parallel` to distribute the local files among the remote machines, let them do the processing, and retrieve the results:
```{console, callouts=c(1, 2, 3, 4, 6)}
ls *.json.gz |
parallel -v --basefile jq \
--trc {.}.csv \
--slf hostnames \
"zcat {} | ./jq -r '.borough' | tr '[A-Z] ' '[a-z]_' | sort | uniq -c | awk '{print \$2\",\"\$1}' > {.}.csv"#! enter=FALSE
C-C#! literal=FALSE
```
<1> Print the list of files and pipe it into `parallel`.
<2> Transmit the `jq` binary to each remote machine. Luckily, `jq` has no dependencies. This file will be removed from the remote machines afterwards because I specified the `--trc` option (which implies the `--cleanup` option). Note that the pipeline uses `./jq` instead of just `jq`. That's because the pipeline needs to use the version which was uploaded and not the version that may or may not be on the search path.
<3> The command-line argument `--trc {.}.csv` is short for `--transfer --return {.}.csv --cleanup`. (The replacement string *`{.}`* gets replaced with the input filename without the last extension.) Here, this means that the JSON file gets transferred to the remote machine, the CSV file gets returned to the local machine, and both files will be removed from the remote machine after each job.
<4> Specify a list of hostnames. Remember, if you want to try this out locally, you can specify `--sshlogin :` instead of `--slf hostnames`.
<5> Note the escaping in the `awk` expression. Quoting can sometimes be tricky. Here, the dollar signs and the double quotes are escaped. If quoting ever gets too confusing, remember that you can put the pipeline into a separate command-line tool, just as I did with `add`.
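To check what the escaping in the `awk` expression amounts to, you can store the same double-quoted string in a variable and print it. This is only a sketch: the variable name `cmd` is mine, and `sh -c` stands in for the shell that eventually runs the job, which simplifies what `parallel` does over SSH.

```bash
# The awk part of the command, quoted exactly as in the parallel invocation.
cmd="awk '{print \$2\",\"\$1}'"

# After the outer double quotes are processed, this is the awk command that remains.
printf '%s\n' "$cmd"

# Feed one line in `uniq -c` style through it.
printf '      3 bronx\n' | sh -c "$cmd"
```

The first command prints `awk '{print $2","$1}'`, which is the expression from the local pipeline, and the second prints `bronx,3`.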
```{console, include=FALSE}
ls *.json.gz |
parallel -v --basefile jq \
--trc {.}.csv \
--sshlogin : \
"zcat {} | jq -r '.borough' | tr '[A-Z] ' '[a-z]_' | sort | uniq -c | awk '{print \$2\",\"\$1}' > {.}.csv"
```
If you were to run `ls` on one of the remote machines during this process, you would see that `parallel` indeed transfers (and cleans up) the binary `jq`, the JSON files, and the CSV files:
```{console, remove=list(2, "echo")}
ssh $(head -n 1 hostnames) ls#! enter=FALSE
C-C#! literal=FALSE
echo 'nyc-1.json.csv' &&
echo 'nyc-1.json.gz' &&
echo 'jq'
```
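The name `nyc-1.json.csv` in this listing follows from how `{.}` works: only the last extension (`.gz`) is removed before `.csv` is appended. As a rough analogy, and not how `parallel` implements it, the shell's own parameter expansion gives the same result. The variable names `f` and `stem` are mine.

```bash
f='nyc-1.json.gz'
stem="${f%.*}"       # strip the last extension, similar in effect to {.}
echo "${stem}"       # prints nyc-1.json
echo "${stem}.csv"   # prints nyc-1.json.csv
```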
Each CSV file looks something like this:
```{console, remove=list(2, "echo")}
cat nyc-1.json.csv #! enter=FALSE
C-C#! literal=FALSE
echo 'bronx,3' &&
echo 'brooklyn,5' &&
echo 'manhattan,24' &&
echo 'queens,3' &&
echo 'staten_island,2'
```
You can sum the counts in each CSV file using `rush` [@rush] and the tidyverse:
```{console}
cat nyc*csv | header -a borough,count |
rush run -t 'group_by(df, borough) %>% summarize(count = sum(count))' - |
csvsort -rc count | csvlook
```
Or, if you prefer to use SQL to aggregate results, you can use `csvsql` as discussed in [Chapter 5](#chapter-5-scrubbing-data):
```{console}
cat nyc*csv | header -a borough,count |
csvsql --query 'SELECT borough, SUM(count) AS count FROM stdin GROUP BY borough ORDER BY count DESC' |
csvlook
```
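If neither R nor SQL is at hand, the same aggregation can be sketched with `awk` and `sort` alone. The function name `sum_counts` is hypothetical, and the sample rows below are made up to imitate two headerless CSV files concatenated together.

```bash
# Sum the count column per borough, then sort by the total in descending order.
sum_counts() {
  awk -F, '{ total[$1] += $2 } END { for (b in total) print b "," total[b] }' |
  sort -t, -k2,2nr
}

# Made-up rows standing in for the output of `cat nyc*csv`.
printf '%s\n' 'bronx,3' 'brooklyn,5' 'manhattan,24' 'bronx,4' 'brooklyn,10' |
  sum_counts
```

For these rows the output is `manhattan,24`, `brooklyn,15`, and `bronx,7`. Because this version expects no header line, you would pipe the output of `cat nyc*csv` into it directly rather than adding one with `header` first.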
## Summary
As a data scientist, you work with data--occasionally a lot of data.
This means that sometimes you need to run a command multiple times or distribute data-intensive commands over multiple cores.
In this chapter I have shown you how easy it is to parallelize commands.
`parallel` is a very powerful and flexible tool to speed up ordinary command-line tools and distribute them.
It offers a lot of functionality and in this chapter I’ve only been able to scratch the surface.
In the next chapter I'm going to cover the fourth step of the OSEMN model: modeling data.
## For Further Exploration
- Once you have a basic understanding of `parallel` and its most important options, I recommend that you take a look at [its online tutorial](https://www.gnu.org/software/parallel/parallel_tutorial.html). You'll learn, among other things, different ways of specifying input, how to keep a log of all the jobs, and how to timeout, resume, and retry jobs. As Ole Tange, the creator of `parallel`, says in this tutorial, "Your command line will love you for it."