forked from googleapis/google-cloud-cpp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gcs2cbt.cc
311 lines (275 loc) · 10.8 KB
/
gcs2cbt.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "google/cloud/bigtable/admin/bigtable_table_admin_client.h"
#include "google/cloud/bigtable/mutation_batcher.h"
#include "google/cloud/bigtable/table.h"
#include "google/cloud/storage/client.h"
#include "google/cloud/internal/getenv.h"
#include "google/cloud/internal/random.h"
#include "absl/strings/match.h"
#include "absl/strings/str_split.h"
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <fstream>
#include <future>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
/**
* @file
*
* Shows how to upload CSV data from Google Cloud Storage to Google Cloud
* Bigtable.
*/
namespace cbt = ::google::cloud::bigtable;
namespace gcs = ::google::cloud::storage;
namespace {
/// The configuration for this program, filled in by ParseArgs().
struct Options {
  // The field separator in the CSV file. Defaults to ',' as documented in the
  // usage message; previously this was left uninitialized, so omitting the
  // --separator flag split lines on an indeterminate character.
  char separator = ',';
  // Zero-based indices of the fields that form the row key; -1 means "use the
  // input line number".
  std::vector<int> keys;
  // Inserted between the key fields when composing the row key.
  std::string keys_separator;
  std::string project_id;
  std::string instance_id;
  std::string table_id;
  // The column family that receives all the cells.
  std::string family;
  // The GCS bucket and object holding the CSV data.
  std::string bucket;
  std::string object;
};
Options ParseArgs(int argc, char* argv[]);
} // anonymous namespace
int main(int argc, char* argv[]) try {
  auto const options = ParseArgs(argc, argv);

  // Create a connection to Cloud Bigtable and an object to manipulate the
  // specific table used in this demo.
  cbt::Table table(cbt::MakeDataClient(options.project_id, options.instance_id),
                   options.table_id);
  // The batcher limits the number of outstanding mutations and groups small
  // mutations into larger batches for better throughput.
  cbt::MutationBatcher batcher(table);

  // How often do we print a progress marker ('.') in the reader thread.
  int const report_reader_progress_rate = 500000;
  // How often do we print a progress marker ('+') in the worker threads.
  int const report_worker_progress_rate = 500000;

  // The size of the thread pool pushing data to Cloud Bigtable. Leave one
  // core for the reader (main) thread, but always start at least one worker:
  // with zero workers nothing drains the completion queue and the first wait
  // on an admission future would deadlock. Note that
  // hardware_concurrency() may return 0 when the value is not computable.
  std::size_t const thread_pool_size = []() -> std::size_t {
    auto const hw = std::thread::hardware_concurrency();
    if (hw > 1) return hw - 1;
    return 1;
  }();

  std::cout << "Starting " << thread_pool_size << " workers ..." << std::flush;
  google::cloud::CompletionQueue cq;
  std::vector<std::thread> thread_pool;
  for (std::size_t i = 0; i != thread_pool_size; ++i) {
    thread_pool.emplace_back([&cq] { cq.Run(); });
  }
  std::cout << " DONE\n";

  gcs::Client client(
      google::cloud::Options{}.set<gcs::ProjectIdOption>(options.project_id));

  // The main thread just reads the file one line at a time.
  auto is = client.ReadObject(options.bucket, options.object);
  std::string line;
  // The first line holds the headers; they become the column names.
  std::getline(is, line, '\n');
  int lineno = 0;
  std::vector<std::string> const headers =
      absl::StrSplit(line, options.separator);
  std::cout << "# HEADER " << line << "\n";

  std::cout << "Reading input file " << std::flush;
  auto start = std::chrono::steady_clock::now();

  // Updated from the worker threads as mutations complete, so it must be
  // atomic.
  std::atomic<int> apply_finished_count(0);
  // Invoked (on a worker thread) when an AsyncApply() completes; prints a
  // progress marker and reports failed mutations.
  auto report_progress_callback =
      [=, &apply_finished_count](
          ::google::cloud::future<::google::cloud::Status> status_future) {
        if ((apply_finished_count.fetch_add(1) + 1) %
                report_worker_progress_rate ==
            0) {
          std::cout << '+' << std::flush;
        }
        auto status = status_future.get();
        if (!status.ok()) {
          std::cerr << "Apply failed: " << status << "\n";
        }
      };

  while (std::getline(is, line, '\n')) {
    ++lineno;
    // Stop at the first empty line; this treats the newline terminating the
    // last record as the end of the data.
    if (line.empty()) {
      break;
    }
    std::vector<std::string> const parsed =
        absl::StrSplit(line, options.separator);

    // All the cells in this row share the same timestamp.
    using std::chrono::milliseconds;
    auto ts = std::chrono::duration_cast<milliseconds>(
        std::chrono::system_clock::now().time_since_epoch());

    // Create the row key by concatenating the desired columns.
    std::string row_key;
    std::string row_key_separator;
    for (auto&& index : options.keys) {
      row_key += row_key_separator;
      row_key_separator = options.keys_separator;
      if (index == -1) {
        // Magic index, use the line number
        row_key += std::to_string(lineno);
      } else {
        // .at() throws std::out_of_range (caught below) if the line has fewer
        // fields than the key requires.
        row_key += parsed.at(index);
      }
    }

    // Create a mutation that inserts one column per field, the name of the
    // column is derived from the header. Extra fields without a header (or
    // headers without a field) are ignored.
    cbt::SingleRowMutation mutation(row_key);
    auto field_count = std::min(headers.size(), parsed.size());
    for (std::size_t i = 0; i != field_count; ++i) {
      mutation.emplace_back(
          cbt::SetCell(options.family, headers[i], ts, parsed[i]));
    }
    auto admission_and_completion = batcher.AsyncApply(cq, std::move(mutation));
    admission_and_completion.second.then(report_progress_callback);
    if (lineno % report_reader_progress_rate == 0) {
      std::cout << '.' << std::flush;
    }
    // Wait until there is space in buffers.
    admission_and_completion.first.get();
  }
  // Block until all the outstanding mutations complete.
  batcher.AsyncWaitForNoPendingRequests().get();
  std::cout << " DONE\n";

  std::cout << "Waiting for worker threads " << std::flush;
  // Let the workers know that they can exit.
  cq.Shutdown();
  for (auto& thread : thread_pool) {
    thread.join();
  }
  std::cout << " DONE\n";

  auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
      std::chrono::steady_clock::now() - start);
  std::cout << "Total running time " << elapsed.count() << "s\n";

  return 0;
} catch (std::exception const& ex) {
  std::cerr << "Standard exception raised: " << ex.what() << "\n";
  return 1;
}
namespace {
/// Consume the next positional argument from @p argv.
///
/// Pops elements from @p argv (starting at index 1) as they are processed.
/// Option flags (--separator, --key, --key-separator) update @p options;
/// the first non-option argument is returned. Throws std::runtime_error with
/// the usage text when the arguments run out, when --help is requested, or
/// when a flag is malformed; @p arg_name is used in the error message.
std::string ConsumeArg(Options& options, std::vector<std::string>& argv,
                       char const* arg_name) {
  std::string const separator_option = "--separator=";
  std::string const key_option = "--key=";
  std::string const keys_separator_option = "--key-separator=";
  std::string const usage = R""(
[options] <project> <instance> <table> <family> <bucket> <object>
The options are:
    --help: produce this help.
    --separator=c: use the 'c' character instead of comma (',') to separate the
        values in the CSV file.
    --key=N: use field number N as part of the row key. The fields are numbered
        starting at one. They are concatenated in the order provided.
    --key-separator=sep: use 'sep' to separate the fields when forming the row
        key.
    project: the Google Cloud Platform project id for your table.
    instance: the Cloud Bigtable instance hosting your table.
    table: the table where you want to upload the CSV file.
    family: the column family where you want to upload the CSV file.
    bucket: the name of the GCS bucket that contains the data.
    object: the name of the GCS object that contains the data.
)"";
  // Builds the usage error. Note that substr(npos) throws, so we must guard
  // against a program name that contains no '/' at all.
  auto usage_error = [&argv, &usage](std::string const& prefix) {
    std::string cmd = argv.empty() ? std::string("program") : argv[0];
    auto const last_slash = cmd.find_last_of('/');
    if (last_slash != std::string::npos) cmd = cmd.substr(last_slash + 1);
    std::ostringstream os;
    os << prefix << "Usage: " << cmd << usage << "\n";
    return std::runtime_error(os.str());
  };
  while (argv.size() >= 2) {
    std::string argument(argv[1]);
    argv.erase(argv.begin() + 1);
    if (argument == "--help") {
      // --help is documented to produce the usage message; previously this
      // branch was empty and the flag was silently ignored.
      throw usage_error("");
    } else if (absl::StartsWith(argument, separator_option)) {
      auto const value = argument.substr(separator_option.size());
      if (value.empty()) {
        throw usage_error("The --separator option requires a value\n");
      }
      options.separator = value[0];
    } else if (absl::StartsWith(argument, key_option)) {
      // The user numbers fields starting at 1; internally they are 0-based.
      options.keys.push_back(std::stoi(argument.substr(key_option.size())) - 1);
    } else if (absl::StartsWith(argument, keys_separator_option)) {
      options.keys_separator = argument.substr(keys_separator_option.size());
    } else {
      return argument;
    }
  }
  throw usage_error(std::string("Missing argument ") + arg_name + "\n");
}
/// Parse the command-line arguments into an Options structure.
///
/// The six positional arguments are consumed in order; option flags may
/// appear anywhere before the argument they precede.
Options ParseArgsNoAutoRun(int argc, char const* const argv[]) {
  std::vector<std::string> args(argv, argv + argc);
  Options opts;
  // Consume the positional arguments in a fixed, table-driven order.
  struct Positional {
    std::string* destination;
    char const* name;
  } const positionals[] = {
      {&opts.project_id, "project_id"}, {&opts.instance_id, "instance_id"},
      {&opts.table_id, "table_id"},     {&opts.family, "family"},
      {&opts.bucket, "bucket"},         {&opts.object, "object"},
  };
  for (auto const& p : positionals) {
    *p.destination = ConsumeArg(opts, args, p.name);
  }
  // If the user does not say, use the first column as the row key.
  if (opts.keys.empty()) opts.keys.push_back(0);
  return opts;
}
/// Setup test versions of the Bigtable and Google Cloud Storage environments
/// and return options pointing to those versions.
///
/// Creates a small CSV object in the test bucket and (if needed) the target
/// table, then returns the Options a user would have passed on the command
/// line to upload that object. Throws std::runtime_error if any required
/// environment variable is unset, or if the setup RPCs fail.
Options AutoRun() {
  using ::google::cloud::internal::GetEnv;
  // NOTE(review): Sample is imported but not used in this function.
  using ::google::cloud::internal::Sample;
  namespace cbta = ::google::cloud::bigtable_admin;
  // Validate all the required environment variables up front, so the error
  // message names the missing variable instead of failing mid-setup.
  for (auto const& var :
       {"GOOGLE_CLOUD_PROJECT", "GOOGLE_CLOUD_CPP_BIGTABLE_TEST_INSTANCE_ID",
        "GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME"}) {
    auto const value = GetEnv(var).value_or("");
    if (!value.empty()) continue;
    std::ostringstream os;
    os << "The environment variable " << var << " is not set or empty";
    throw std::runtime_error(std::move(os).str());
  }
  auto const project_id = GetEnv("GOOGLE_CLOUD_PROJECT").value();
  auto const instance_id =
      GetEnv("GOOGLE_CLOUD_CPP_BIGTABLE_TEST_INSTANCE_ID").value();
  auto const bucket_name =
      GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME").value();
  auto constexpr kTableId = "gcs2cbt-auto-run";
  auto constexpr kObjectName = "gcs2cbt-sample-data.csv";
  auto gcs_client = gcs::Client();
  // A tiny CSV file: one header line plus two data rows. The first field
  // (RowId) becomes the row key via the "--key=1" flag below.
  auto constexpr kTestData = R"""(RowId,Header1,Header2,Header3
1,v1,v2,v3
3,v1,v2,v3
)""";
  // .value() throws if the upload fails.
  gcs_client.InsertObject(bucket_name, kObjectName, kTestData).value();
  // Create the table schema: a single column family "fam" that keeps at most
  // two versions of each cell.
  google::bigtable::admin::v2::GcRule gc;
  gc.set_max_num_versions(2);
  google::bigtable::admin::v2::Table t;
  auto& families = *t.mutable_column_families();
  *families["fam"].mutable_gc_rule() = std::move(gc);
  cbta::BigtableTableAdminClient admin(
      cbta::MakeBigtableTableAdminConnection());
  auto schema = admin.CreateTable(cbt::InstanceName(project_id, instance_id),
                                  kTableId, t);
  // Throw the error unless it is "already exists"
  if (!schema &&
      schema.status().code() != google::cloud::StatusCode::kAlreadyExists) {
    // .value() on a failed StatusOr throws the wrapped error.
    (void)schema.value();
  }
  // Synthesize the command line a user would have typed and reuse the normal
  // argument parser.
  char const* argv[] = {
      "auto-run",          "--key=1", "--separator=,", project_id.c_str(),
      instance_id.c_str(), kTableId,  "fam",           bucket_name.c_str(),
      kObjectName};
  int argc = sizeof(argv) / sizeof(argv[0]);
  return ParseArgsNoAutoRun(argc, argv);
}
/// Parse the command line, or build a self-contained test configuration when
/// GOOGLE_CLOUD_CPP_AUTO_RUN_EXAMPLES is set to "yes" (used by the CI builds).
Options ParseArgs(int argc, char* argv[]) {
  using ::google::cloud::internal::GetEnv;
  auto const auto_run_flag =
      GetEnv("GOOGLE_CLOUD_CPP_AUTO_RUN_EXAMPLES").value_or("");
  if (auto_run_flag == "yes") return AutoRun();
  return ParseArgsNoAutoRun(argc, argv);
}
} // namespace