-
Notifications
You must be signed in to change notification settings - Fork 0
/
external_sort.cc
110 lines (85 loc) · 3.76 KB
/
external_sort.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#include "external_sort.hh"
#include <seastar/core/sharded.hh>
#include "app_config.hh"
#include "first_pass_service.hh"
#include "second_pass_service.hh"
#include "verify_service.hh"
// Sorts `config.input_filename` and writes the result to
// `config.output_filename` using the sharded sort services.
//
// Stages, each fully awaited before the next starts:
//   1. first pass  - first_pass_service::run() on every shard;
//   2. second pass - second_pass_service::run() on every shard, each shard's
//                    instance constructed with the file count reported by its
//                    local first_pass_service (get_total_files());
//   3. final pass  - one second_pass_service::run() on shard 1 (shard 0 when
//                    only one shard exists), constructed with smp::count as
//                    its file count and the output filename; per the log
//                    message it merges the intermediate files into one file;
//   4. verify      - only if config.verify_results: compare input/output
//                    sizes, then verify_service::run() on every shard.
//
// Errors: any exception from the stages is caught and logged;
// verification_exception is logged as a verification failure. In both cases
// the returned future resolves normally.
// NOTE(review): since failures are only logged, callers cannot distinguish a
// failed sort from a successful one - consider propagating the error.
//
// Cleanup always runs: services are stopped in reverse start order, then the
// opened files are closed. A failing cleanup step is logged and does not
// prevent the remaining steps, so every seastar::sharded instance is stopped
// before it is destroyed (Seastar requires stop() before destruction).
seastar::future<> external_sort(const app_config &config) {
    logger.info("Starting external sort on file : {}", config.input_filename);
    seastar::sharded<first_pass_service> fps;
    seastar::sharded<second_pass_service> sps;
    seastar::sharded<second_pass_service> final_ps;
    seastar::sharded<verify_service> vs;
    seastar::file input_file, output_file;
    try {
        input_file = co_await seastar::open_file_dma(config.input_filename,
                                                     seastar::open_flags::ro);
        // initialize the first pass service across shards; shards receive a
        // dup()'d handle of the input file rather than the shard-local object
        co_await fps.start(input_file.dup(), config.temp_working_dir);
        logger.info("Running first pass");
        // run the first pass
        co_await fps.invoke_on_all([](first_pass_service &local_service) {
            return local_service.run();
        });
        logger.info("Completed first pass");

        logger.info("Running second pass");
        // each shard's second pass is constructed with the number of files
        // its local first pass reports
        auto get_num_of_files = [](const first_pass_service &local_fps) {
            return local_fps.get_total_files();
        };
        co_await sps.start(
            config.temp_working_dir,
            seastar::sharded_parameter(get_num_of_files, std::ref(fps)));
        // run the second pass
        co_await sps.invoke_on_all([](second_pass_service &local_service) {
            return local_service.run();
        });
        logger.info("Completed second pass");

        logger.info("Running a final pass merging all intermediate files into "
                    "a single sorted file");
        // the final pass is told to merge smp::count files - presumably one
        // per shard produced by the second pass (TODO confirm)
        co_await final_ps.start(config.temp_working_dir, seastar::smp::count,
                                config.output_filename);
        // run it on another shard if one is available, otherwise locally
        const unsigned final_shard = seastar::smp::count > 1 ? 1 : 0;
        co_await final_ps.invoke_on(final_shard,
                                    [](second_pass_service &local_service) {
                                        return local_service.run();
                                    });
        logger.info("Completed sorting the given file");
        logger.info("Sorted file is stored at : {}", config.output_filename);

        if (config.verify_results) {
            output_file = co_await seastar::open_file_dma(
                config.output_filename, seastar::open_flags::ro);
            logger.info("Verifying the sorted result file");
            // cheap whole-file check first: no need to start the verify
            // service when the sizes already disagree
            if (co_await input_file.size() != co_await output_file.size()) {
                throw verification_exception(
                    "sorted result file has a different size than the input "
                    "file");
            }
            // initialize and run the verify service across shards
            co_await vs.start(output_file.dup());
            co_await vs.invoke_on_all([](verify_service &local_service) {
                return local_service.run();
            });
            // none of the shards threw an exception => verification succeeded
            logger.info("Result file verification succeeded!");
        }
    } catch (const verification_exception &ex) {
        logger.error("Result file verification failed : {}", ex.what());
    } catch (...) {
        logger.error("external sort failed with following error : {}",
                     std::current_exception());
    }

    // Cleanup. co_await is not allowed inside a catch handler, so each step's
    // failure is logged via handle_exception() instead; this guarantees every
    // step runs even when an earlier one fails.
    auto log_cleanup_failure = [](const char *step) {
        return [step](std::exception_ptr ep) {
            logger.error("external sort cleanup step '{}' failed : {}", step,
                         ep);
        };
    };
    // stop services in reverse order of start; stop() on a sharded that was
    // never started is a no-op
    co_await vs.stop().handle_exception(
        log_cleanup_failure("stop verify service"));
    co_await final_ps.stop().handle_exception(
        log_cleanup_failure("stop final pass service"));
    co_await sps.stop().handle_exception(
        log_cleanup_failure("stop second pass service"));
    co_await fps.stop().handle_exception(
        log_cleanup_failure("stop first pass service"));
    // then release the shard-local file objects this function opened
    if (output_file) {
        co_await output_file.close().handle_exception(
            log_cleanup_failure("close output file"));
    }
    if (input_file) {
        co_await input_file.close().handle_exception(
            log_cleanup_failure("close input file"));
    }
}