Skip to content

Commit a934c71

Browse files
author
Sven Sandberg
committed
BUG#33666652: Server may decompress entire Transaction_payload_log_event in memory
* Background Binary log compression is implemented by compressing many events into one. There were decompressing iterators over the file, which yielded the events of the binary log one at a time, including the decompressed events. * Problem When the iterators reached a Transaction_payload_log_event, they decompressed the full payload, even if only one event at a time was yielded to the caller. This is wasteful. * Solution - The decompressor was mixing memory management with decompressor glue. Separated the responsibilities so the decompressor only is compressor glue, and the new class Growable_buffer manages memory for a growable buffer. - The decompressor APIs were implemented as "iterators". However, the implementation violated a common iterator idiom by holding an error state. This made it necessary to keep the iterator object after it had reached the end iteraton in order to check for the error, which maked it unusable for e.g. range-based for loops. So, changed to using a "stream" instead, where it is more natural to check for errors using the typical idioms. - Made the stream classes decompress only one event at a time. - Removed the abstract base class common to compressors and decompressors. - Changed the Decompressor API to better match what decompressor libraries provide and what API clients need. Now the use pattern is: - Call 'feed' to provide input to decompress - Call 'decompress' produce a given number of decompressed bytes. - Repeat the above as many times as needed. - The above process can be aborted at any time (e.g. after error) by calling 'reset'. - The caller doesn't need to track how much is left to compress; it only needs to check whether the compress operation failed. - Removed use of ZSTD's "advanced" decompression API. We were not using the features it provided, and some platforms were using an older version where this API was unstable. - Made all instantiations of template class Basic_binlog_file_reader have a common base class, IBasic_binlog_file_reader. Before, the old "Iterator" class had to perform a reinterpret_cast (in binlog.cc:show_binlog_events) in order to reuse the same code for Relaylog_ifile and Binlog_ifile. This is dangerous and we should never use such casts; it would lead to crashes if the types are not binary compatible. Now, the new "stream" class just uses a pointer to the base class and resolves whether the object is a Relaylog_ifile or a Binlog_ifile using polymorphism. Change-Id: I1e4872597fb6b37bda03682eadd058dd59b21a60
1 parent ca51ab1 commit a934c71

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+3921
-1304
lines changed

client/mysqlbinlog.cc

+81-61
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
of the log; if it is the 3rd then there is this combination:
3232
Format_desc_of_slave, Rotate_of_master, Format_desc_of_master.
3333
*/
34-
3534
#include "client/mysqlbinlog.h"
3635

3736
#include <fcntl.h>
@@ -43,14 +42,15 @@
4342
#include <time.h>
4443
#include <algorithm>
4544
#include <map>
45+
#include <sstream>
4646
#include <utility>
4747

4848
#include "caching_sha2_passwordopt-vars.h"
4949
#include "client/client_priv.h"
5050
#include "compression.h"
5151
#include "libbinlogevents/include/codecs/factory.h"
5252
#include "libbinlogevents/include/compression/factory.h"
53-
#include "libbinlogevents/include/compression/iterator.h"
53+
#include "libbinlogevents/include/compression/payload_event_buffer_istream.h"
5454
#include "libbinlogevents/include/trx_boundary_parser.h"
5555
#include "my_byteorder.h"
5656
#include "my_dbug.h"
@@ -61,6 +61,7 @@
6161
#include "my_time.h"
6262
#include "prealloced_array.h"
6363
#include "print_version.h"
64+
#include "scope_guard.h"
6465
#include "sql/binlog_reader.h"
6566
#include "sql/log_event.h"
6667
#include "sql/my_decimal.h"
@@ -136,7 +137,6 @@ class Database_rewrite {
136137
Rewrite_payload_result rewrite_inner_events(
137138
binary_log::transaction::compression::type compression_type,
138139
const char *orig_payload, std::size_t orig_payload_size,
139-
std::size_t orig_payload_uncompressed_size,
140140
const binary_log::Format_description_event &fde) {
141141
// to return error or not
142142
auto err{false};
@@ -152,10 +152,17 @@ class Database_rewrite {
152152
// RAII objects
153153
Buffer_realloc_manager ibuffer_dealloc_guard(&ibuffer);
154154

155-
// iterator to decompress events
156-
binary_log::transaction::compression::Iterable_buffer it(
157-
orig_payload, orig_payload_size, orig_payload_uncompressed_size,
158-
compression_type);
155+
// stream to decompress events
156+
using Buffer_istream_t =
157+
binary_log::transaction::compression::Payload_event_buffer_istream;
158+
using Buffer_ptr_t = Buffer_istream_t::Buffer_ptr_t;
159+
160+
Buffer_istream_t istream(
161+
reinterpret_cast<const unsigned char *>(orig_payload),
162+
orig_payload_size, compression_type);
163+
Buffer_ptr_t buffer_ptr;
164+
165+
// compressor to compress again
159166
using Compress_status_t =
160167
binary_log::transaction::compression::Compress_status;
161168
using Managed_buffer_sequence_t =
@@ -167,20 +174,21 @@ class Database_rewrite {
167174
compression_type);
168175

169176
// rewrite and compress
170-
for (auto ptr : it) {
171-
std::size_t ev_len{uint4korr(ptr + EVENT_LEN_OFFSET)};
177+
while (istream >> buffer_ptr) {
178+
/// @todo: don't copy, just use the Decompressor's managed buffer
172179

173180
// reserve input buffer size (we are modifying the input buffer contents
174181
// before compressing it back).
175182
std::tie(ibuffer, ibuffer_capacity, err) =
176-
reserve(ibuffer, ibuffer_capacity, ev_len);
183+
reserve(ibuffer, ibuffer_capacity, buffer_ptr->size());
177184
if (err) return error_val;
178-
memcpy(ibuffer, ptr, ev_len);
185+
memcpy(ibuffer, buffer_ptr->data(), buffer_ptr->size());
179186

180187
// rewrite the database name if needed
188+
std::size_t ev_len = 0;
181189
std::tie(ibuffer, ibuffer_capacity, ev_len, err) =
182-
m_event_rewriter.rewrite_event(ibuffer, ibuffer_capacity, ev_len,
183-
fde);
190+
m_event_rewriter.rewrite_event(ibuffer, ibuffer_capacity,
191+
buffer_ptr->size(), fde);
184192
if (err) return error_val;
185193

186194
compressor->feed(ibuffer, ev_len);
@@ -189,6 +197,10 @@ class Database_rewrite {
189197
return error_val;
190198
obuffer_size_uncompressed += ev_len;
191199
}
200+
if (istream.has_error()) {
201+
error("%s", istream.get_error_str().c_str());
202+
return error_val;
203+
}
192204

193205
if (compressor->finish(managed_buffer_sequence) !=
194206
Compress_status_t::success)
@@ -230,7 +242,6 @@ class Database_rewrite {
230242

231243
auto orig_payload{tpe.get_payload()};
232244
auto orig_payload_size{tpe.get_payload_size()};
233-
auto orig_payload_uncompressed_size{tpe.get_uncompressed_size()};
234245
auto orig_payload_compression_type{tpe.get_compression_type()};
235246

236247
unsigned char *rewritten_payload{nullptr};
@@ -247,8 +258,7 @@ class Database_rewrite {
247258
rewritten_payload_size, rewritten_payload_uncompressed_size,
248259
rewrite_payload_res) =
249260
rewrite_inner_events(orig_payload_compression_type, orig_payload,
250-
orig_payload_size,
251-
orig_payload_uncompressed_size, fde);
261+
orig_payload_size, fde);
252262

253263
if (rewrite_payload_res) return Rewrite_result{nullptr, 0, 0, true};
254264

@@ -2730,12 +2740,12 @@ static Exit_status dump_remote_log_entries(PRINT_EVENT_INFO *print_event_info,
27302740

27312741
if (!raw_mode || (type == binary_log::ROTATE_EVENT) ||
27322742
(type == binary_log::FORMAT_DESCRIPTION_EVENT)) {
2733-
Binlog_read_error read_error = binlog_event_deserialize(
2743+
Binlog_read_error read_status = binlog_event_deserialize(
27342744
reinterpret_cast<unsigned char *>(event_buf), event_len,
27352745
&glob_description_event, opt_verify_binlog_checksum, &ev);
2736-
2737-
if (read_error.has_error()) {
2738-
error("Could not construct log event object: %s", read_error.get_str());
2746+
if (read_status.has_error()) {
2747+
error("Could not construct log event object: %s",
2748+
read_status.get_str());
27392749
my_free(event_buf);
27402750
return ERROR_STOP;
27412751
}
@@ -3070,10 +3080,10 @@ static Exit_status dump_local_log_entries(PRINT_EVENT_INFO *print_event_info,
30703080
// truncated: this may occur when the file is concurrently
30713081
// written by mysqld.
30723082
auto fde_flags =
3073-
mysqlbinlog_file_reader.format_description_event()->header()->flags;
3083+
mysqlbinlog_file_reader.format_description_event().header()->flags;
30743084
auto in_use_flag = fde_flags & LOG_EVENT_BINLOG_IN_USE_F;
30753085
if (in_use_flag != 0 && error_type == Binlog_read_error::TRUNC_EVENT) {
3076-
fprintf(result_file, "# File ends with a truncated event.\n");
3086+
warning("File ends with a truncated event.");
30773087
return OK_CONTINUE;
30783088
}
30793089

@@ -3395,61 +3405,70 @@ void Transaction_payload_log_event::print(FILE *,
33953405
Format_description_event fde_no_crc = glob_description_event;
33963406
fde_no_crc.footer()->checksum_alg = binary_log::BINLOG_CHECKSUM_ALG_OFF;
33973407

3398-
bool error{false};
33993408
IO_CACHE *const head = &info->head_cache;
34003409
size_t current_buffer_size = 1024;
3401-
auto buffer = (uchar *)my_malloc(PSI_NOT_INSTRUMENTED, current_buffer_size,
3402-
MYF(MY_WME));
3410+
auto *buffer = static_cast<uchar *>(
3411+
my_malloc(PSI_NOT_INSTRUMENTED, current_buffer_size, MYF(MY_WME)));
3412+
if (buffer == nullptr) {
3413+
head->error = -1;
3414+
error("Out of memory.");
3415+
return;
3416+
}
3417+
Scope_guard free_buffer_guard([&] { my_free(buffer); });
34033418
if (!info->short_form) {
34043419
std::ostringstream oss;
34053420
oss << "\tTransaction_Payload\t" << to_string() << std::endl;
3406-
oss << "# Start of compressed events!" << std::endl;
3421+
oss << "# Start of compressed events." << std::endl;
34073422
print_header(head, info, false);
34083423
my_b_printf(head, "%s", oss.str().c_str());
34093424
}
34103425

34113426
// print the payload
3412-
binary_log::transaction::compression::Iterable_buffer it(
3413-
m_payload, m_payload_size, m_uncompressed_size, m_compression_type);
3414-
3415-
for (auto ptr : it) {
3427+
using Buffer_istream_t =
3428+
binary_log::transaction::compression::Payload_event_buffer_istream;
3429+
Buffer_istream_t istream(*this);
3430+
Buffer_istream_t::Buffer_ptr_t original_event_buffer;
3431+
while (istream >> original_event_buffer) {
34163432
Log_event *ev = nullptr;
34173433
bool is_deferred_event = false;
34183434

34193435
// fix the checksum part
3420-
size_t event_len = uint4korr(ptr + EVENT_LEN_OFFSET);
3421-
3422-
// resize the buffer we are using to handle the event if needed
3423-
if (event_len > current_buffer_size) {
3436+
size_t event_len = original_event_buffer->size();
3437+
3438+
// Resize the buffer we are using to handle the event if needed.
3439+
//
3440+
// The condition `buffer==nullptr` is redundant, because if buffer
3441+
// is null, then current_buffer_size is 0, and event_len is
3442+
// guaranteed to be greater than 0 when `operator>>` completed
3443+
// without taking the stream to an error state. But clang-tidy
3444+
// doesn't know that event_len is guaranteed to be greater than
3445+
// zero, and reports a possible memory leak.
3446+
if (buffer == nullptr || event_len > current_buffer_size) {
34243447
current_buffer_size =
34253448
round(((event_len + BINLOG_CHECKSUM_LEN) / 1024.0) + 1) * 1024;
3426-
buffer = (uchar *)my_realloc(PSI_NOT_INSTRUMENTED, buffer,
3427-
current_buffer_size, MYF(0));
3428-
3429-
/* purecov: begin inspected */
3430-
if (!buffer) {
3431-
// OOM
3449+
auto *new_buffer = static_cast<uchar *>(my_realloc(
3450+
PSI_NOT_INSTRUMENTED, buffer, current_buffer_size, MYF(0)));
3451+
if (new_buffer == nullptr) {
34323452
head->error = -1;
3433-
my_b_printf(head, "# Out of memory!");
3434-
goto end;
3453+
error("Out of memory.");
3454+
return;
34353455
}
3436-
/* purecov: end */
3456+
buffer = new_buffer;
34373457
}
34383458

3439-
memcpy(buffer, ptr, event_len);
3459+
memcpy(buffer, original_event_buffer->data(), event_len);
34403460

34413461
// rewrite the database name if needed
3442-
std::tie(buffer, current_buffer_size, event_len, error) =
3462+
bool rewrite_error{false};
3463+
std::tie(buffer, current_buffer_size, event_len, rewrite_error) =
34433464
global_database_rewriter.rewrite(buffer, current_buffer_size, event_len,
34443465
fde_no_crc);
34453466

3446-
/* purecov: begin inspected */
3447-
if (error) {
3467+
if (rewrite_error) {
34483468
head->error = -1;
3449-
my_b_printf(head, "# Error while rewriting db for compressed events!");
3450-
goto end;
3469+
error("Error rewriting db for compressed events.");
3470+
return;
34513471
}
3452-
/* purecov: end */
34533472

34543473
// update the CRC
34553474
if (has_crc) {
@@ -3459,14 +3478,13 @@ void Transaction_payload_log_event::print(FILE *,
34593478
}
34603479

34613480
// now deserialize the event
3462-
if (binlog_event_deserialize((const unsigned char *)buffer, event_len,
3463-
&glob_description_event, true, &ev)) {
3464-
/* purecov: begin inspected */
3481+
Binlog_read_error read_error =
3482+
binlog_event_deserialize((const unsigned char *)buffer, event_len,
3483+
&glob_description_event, true, &ev);
3484+
if (read_error.has_error()) {
34653485
head->error = -1;
3466-
my_b_printf(
3467-
head, "# Error while handling compressed events! Corrupted binlog?");
3468-
goto end;
3469-
/* purecov: end */
3486+
error("Error decoding Payload_log_event: %s.", read_error.get_str());
3487+
return;
34703488
}
34713489

34723490
switch (ev->get_type_code()) {
@@ -3498,10 +3516,12 @@ void Transaction_payload_log_event::print(FILE *,
34983516
current_buffer_size = 0; /* purecov: inspected */
34993517
}
35003518
}
3519+
if (istream.has_error()) {
3520+
error("%s", istream.get_error_str().c_str());
3521+
head->error = -1;
3522+
return;
3523+
}
35013524

3502-
if (!info->short_form) my_b_printf(head, "# End of compressed events!\n");
3503-
3504-
end:
3505-
my_free(buffer);
3525+
if (!info->short_form) my_b_printf(head, "# End of compressed events.\n");
35063526
}
35073527
#endif

0 commit comments

Comments
 (0)