forked from wavii/spread
-
Notifications
You must be signed in to change notification settings - Fork 0
/
explode_app.cpp
112 lines (95 loc) · 3.52 KB
/
explode_app.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#include <fstream>
#include <iostream>
#include <iterator>
#include <limits>
#include <string>
#include <vector>
#include <boost/asio.hpp>
#include <boost/scoped_array.hpp>
#include <boost/smart_ptr.hpp>
#include <boost/program_options.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <spread/exploder.h>
#include <spread/progress.h>
#include <spread/endpoint.h>
using namespace spread;
using namespace boost;
using boost::asio::ip::tcp;
using namespace std;
namespace po = boost::program_options;
void run(exploder & e, istream& in)
{
// any larger, and we may try to write to a multi_frame that never has enough space
static const int BUF_SIZE = multi_frame::MAX_SIZE - frame::FRAME_SIZE;
scoped_array<char> buffer(new char[BUF_SIZE]);
ios::sync_with_stdio(false); // makes a big difference on buffered i/o
for (int line = 1; !in.eof(); ++line)
{
in.getline(buffer.get(), BUF_SIZE - 1); // leave 1 for us to inject back the newline
if (buffer[0] == '\0')
continue;
if (in.fail()) // line was too long?
{
cerr << "Skipping line <" << line << ">: line is probably too long" << endl;
in.clear(); // clear state
in.ignore(numeric_limits<streamsize>::max(), '\n');
continue;
}
buffer[in.gcount() - 1] = '\n'; // inject back the newline
buffer[in.gcount()] = '\0';
e << buffer.get();
}
}
int main(int argc, char * argv[])
{
int local_port = 0;
vector<string> remotes;
string remotes_filename;
string input_filename;
string output_filename;
po::options_description desc("Allowed options");
desc.add_options()
("help,h", "produce help message")
("port,p", po::value<int>(&local_port)->default_value(9998), "The local port")
("host,h", po::value< vector<string> >(&remotes)->composing(), "An address, in a host:port format. Can specify multiple -r args.")
("hostfile,f", po::value< string >(&remotes_filename), "A file containing addresses, in host:port format. Can combine with -r args.")
("input,i", po::value<string>(&input_filename), "The input file. If not specified it will use stdin")
("ouput,o", po::value<string>(&output_filename), "The output file. If not specified it will use stdout")
("progress", "Print transmission stats once every 5 seconds.")
;
po::variables_map vm;
try
{
po::store(po::parse_command_line(argc, argv, desc), vm);
po::notify(vm);
}
catch(std::exception& e)
{
cerr << e.what() << "\n";
return 1;
}
if (!remotes_filename.empty())
{
ifstream in(remotes_filename.c_str());
copy(istream_iterator<string>(in), istream_iterator<string>(), back_inserter(remotes));
}
if (vm.count("help") || !local_port || remotes.empty())
{
cerr << "Usage: explode [options] < input\n";
cerr << desc;
return 0;
}
vector<tcp::endpoint> endpoints;
address_to_endpoint(9998, remotes, endpoints);
ifstream in(input_filename.c_str());
ofstream out(output_filename.c_str());
asio::io_service io;
shared_ptr<thread> pt; // give the io_service a separate thread, so we can do blocking i/o on the main thread
{
exploder e(io, tcp::endpoint(tcp::v4(), local_port), endpoints, output_filename.empty() ? cout : out,
shared_ptr<progress>(vm.count("progress") ? new progress(io) : NULL));
pt.reset(new thread(bind(&asio::io_service::run, &io)));
run(e, input_filename.empty() ? cin : in);
}
pt->join(); // wait for any pending io_service events to flush
return 0;
}