Commit ca51ab1

Author: Sven Sandberg
BUG#33588473: Binlog compression breaks when compressed size exceeds 1 GiB [step 2]
* Background

Binary log events have a hard limit of 1 GiB. The server cannot read
bigger events. If a bigger event would end up in the binary log, the
operation that was reading it fails.

Big transactions are usually split up into multiple smaller events. So
it is possible to have transactions much bigger than 1 GiB, which will
then consist of events smaller than 1 GiB.

Binlog compression compresses the transaction as a whole and stores
the compressed data in a single event.

* Problem

A transaction that consists of events smaller than 1 GiB, but whose
total size adds up to more than 1 GiB, and where even compression does
not make it smaller than 1 GiB, was nevertheless written to the binary
log as one event bigger than 1 GiB. This made the binary log unusable;
effectively it was corrupted since neither the server nor other tools
could read it.

* Solution

When the compressed data grows bigger than 1 GiB, fall back to
uncompressed.

* Summary of changes

1. Buffer encapsulation:

   The code had buffer management spread over various components; the
   caller had to allocate an output buffer, which was then resized as
   needed in the compressor. The compressor class was not only
   integration glue around the compression library, but also resizable
   buffer management. The ownership of buffers was unclear.

   The API for the buffers was not used in an efficient way: the
   caller allocated a buffer the size of the input, and the compressor
   would resize it when the output grew bigger than the buffer. This
   can happen only on hard-to-compress data, and ZSTD guarantees that
   the output size is at most roughly 257/256 of the input size. On
   compressible input this wastes memory on the output buffer. It's
   better to start with a small buffer and grow it as needed.

   Fixed by defining self-contained classes that manage buffers in a
   well-defined manner. This is in
   libbinlogevents/{include,src}/buffer/*

   Also separated the logic for managing the buffer from the heuristic
   to compute how much the buffer should grow; the latter is
   grow_policy.h. The Grow_policy class has a maximum capacity, and
   that is what we use to bound the capacity to 1 GiB in order to fix
   the bug.

   Compressors don't need contiguous output buffers, since contiguity
   is mainly useful for data that needs to be decoded/parsed in some
   way, but compressed data is opaque by nature. So we use a
   "Buffer_sequence": a vector of buffers. When the Buffer_sequence
   needs to grow, it just adds another buffer to the vector.
   Therefore, it never needs to copy data (as growable, contiguous
   buffers need to do when they are reallocated).

   Using Buffer_sequence instead of char* made it necessary to change
   code that handled compressed data, in log_event.cc and
   control_events.cc/.h.

   We allow callers to use custom memory allocation for
   Growable_buffer_sequence. Therefore, created
   resource::Memory_resource and resource::Allocator, which are
   similar to std::pmr::memory_resource and
   std::pmr::polymorphic_allocator, except those are not currently
   supported on all our platforms.

2. Class hierarchy:

   The class hierarchy consisted of:
   - a common abstract base class for compressors and decompressors
     (Base_compressor_decompressor);
   - two direct, abstract subclasses specializing to compressors
     (Compressor) and decompressors (Decompressor);
   - two direct subclasses of each of those: the Zstd and None
     compressors, and the Zstd and None decompressors.

   The common base class mainly existed because the buffer management
   was mixed with the compression library glue, and some of the buffer
   management happened to be common to compressors and decompressors.
   It was never useful to use polymorphism to hold a pointer to a
   Base_compressor_decompressor without knowing which concrete
   subclass was instantiated.

   Now that we factored out buffer handling from the compression
   library glue, we can also decouple compressors and decompressors.
   So Compressor no longer inherits from Base_compressor_decompressor.

   Also separated the public interface used by clients from the
   private, pure virtual API that implementations need to define.

3. Compression level API:

   The API to set compression level was defined in the base class,
   Compressor. But compression levels are algorithm-specific (and in
   fact setting the level was a no-op in the None_comp class), so
   removed this API from the base class. Compression level is now
   settable only in Zstd_comp.

4. Preprocessor conditions to use different versions of ZSTD:

   Our code used ZSTD's "advanced API", which became stable in ZSTD
   1.4.0. However, we support platforms that have system libraries
   that use ZSTD 1.2.0, and we support linking with system libraries.
   Therefore, we used preprocessor conditions to select between two
   implementations. However, we bundle ZSTD 1.5.0 with the server and
   our CI jobs only test with the bundled version. So the
   compatibility code for 1.2.0 was untested. But we only used a
   subset of the "advanced API" that had usable counterparts also in
   the old API. So, refactored to use the lowest common denominator,
   the non-advanced API, and removed the preprocessor conditions.

   The use of ZSTD functions was also unnecessarily complex:
   - `ZSTD_CCtx_setPledgedSrcSize(m_ctx, ZSTD_CONTENTSIZE_UNKNOWN)` is
     pointless; it gives no information to the library.
   - It suffices to use just ZSTD_initCStream to reset the stream
     state; we don't need to use ZSTD_CCtx_reset when the compression
     level is unchanged.

5. Compression API:

   Changed the Compressor API to match better what compressor
   libraries provide and what users need. Now the use pattern is:
   - Call 'feed' to provide input to compress.
   - Call 'compress' to use up the input. This may not produce all
     output; some output may remain in the compression library.
   - Repeat the above as many times as needed.
   - Call 'finish' to produce remaining output.
   - The above process can be aborted at any time (e.g. after error)
     by calling 'reset'.
   - The caller doesn't need to track how much is left to compress; it
     only needs to check whether the compress or finish operation
     failed.

   The change in API made it necessary to change users of the API:
   binlog.cc, mysqlbinlog.cc, rpl_context.cc/.h

6. Simplify compression code in binlog.cc:

   There was one big function that both checked the preconditions for
   compressing a transaction, and performed the actual compression.
   Factored out the checking of preconditions to a separate function.

   The compression code in binlog.cc would set a thread stage and
   later restore it. Setting and restoring thread stages is a common
   pattern in our code. In general, setting and restoring anything is
   better programmed using scope guards. Defined a new RAII class for
   this (sql_class.h), and used it in binlog.cc

7. Use [[nodiscard]] in functions that can fail. Worked around a gcc
   bug that made this attribute nonfunctional in some cases, in
   nodiscard.h

8. Changed to new coding standards for replication:
   - Use only line comments, not block comments
   - Use lowercase for constants

9. Wrote unit tests for the buffer management:
   buffer_sequence-t.cc, grow_policy-t.cc

10. Wrote mtr tests for the bugfix: rpl_compress_1g and
    rpl_compress_bound

11. Disabled two unit tests that depended on the old compression API.
    These also depend on the decompression API, which will be
    refactored in the next patch. Therefore it is easier to fix them
    once both APIs are stable. See
    unittest/gunit/binlogevents/CMakeLists.txt

Change-Id: Idb500ead49770c16ff8389a972d29f3ff190305a
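The feed / compress / finish / reset pattern from item 5, combined with the size-capped fallback from the Solution section, can be sketched roughly as follows. This is only an illustration under stated assumptions: `Status`, `Bounded_chunks`, `Toy_rle_compressor` and `compress_or_fall_back` are hypothetical names, a toy run-length encoder stands in for ZSTD, and none of this is the code added by the commit.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical status codes, loosely modeled on the Compress_status type
// that appears in the mysqlbinlog.cc diff.
enum class Status { success, exceeds_max_size };

// Hypothetical output container: separately stored chunks with a hard cap
// on the total size. Growing appends a chunk; existing data is never moved.
class Bounded_chunks {
 public:
  explicit Bounded_chunks(std::size_t max_size) : m_max_size(max_size) {}
  Status append(const std::string &data) {
    assert(m_size <= m_max_size);
    if (data.size() > m_max_size - m_size) return Status::exceeds_max_size;
    if (!data.empty()) m_chunks.push_back(data);
    m_size += data.size();
    return Status::success;
  }
  std::string str() const {
    std::string ret;
    ret.reserve(m_size);
    for (const auto &chunk : m_chunks) ret += chunk;
    return ret;
  }

 private:
  std::size_t m_max_size;
  std::size_t m_size{0};
  std::vector<std::string> m_chunks;
};

// Toy run-length encoder standing in for a real compression library.
// Output format: (count byte, value byte) pairs.
class Toy_rle_compressor {
 public:
  // Provide input; nothing is produced yet.
  void feed(const std::string &input) { m_pending += input; }
  // Consume all pending input. The last run stays inside the compressor,
  // because the next feed may extend it.
  Status compress(Bounded_chunks &out) {
    std::string encoded;
    for (char c : m_pending) {
      if (m_run_length > 0 && m_run_length < 255 && c == m_run_char) {
        ++m_run_length;
      } else {
        flush_run(encoded);
        m_run_char = c;
        m_run_length = 1;
      }
    }
    m_pending.clear();
    return out.append(encoded);
  }
  // Produce the output that is still held back.
  Status finish(Bounded_chunks &out) {
    std::string encoded;
    flush_run(encoded);
    return out.append(encoded);
  }
  // Abort: drop pending input and internal state.
  void reset() {
    m_pending.clear();
    m_run_length = 0;
  }

 private:
  void flush_run(std::string &encoded) {
    if (m_run_length == 0) return;
    encoded.push_back(static_cast<char>(m_run_length));
    encoded.push_back(m_run_char);
    m_run_length = 0;
  }
  std::string m_pending;
  char m_run_char{0};
  int m_run_length{0};
};

// Returns true and stores compressed data in `result` if it fits within
// max_size; otherwise stores the plain concatenation and returns false.
bool compress_or_fall_back(const std::vector<std::string> &events,
                           std::size_t max_size, std::string &result) {
  Toy_rle_compressor compressor;
  Bounded_chunks out(max_size);
  bool ok = true;
  for (const auto &event : events) {
    compressor.feed(event);
    if (compressor.compress(out) != Status::success) {
      ok = false;
      break;
    }
  }
  if (ok && compressor.finish(out) != Status::success) ok = false;
  if (ok) {
    result = out.str();
    return true;
  }
  compressor.reset();
  result.clear();
  for (const auto &event : events) result += event;
  return false;
}
```

The caller only checks the status of `compress` and `finish`; it never tracks how much input is left. In the real server the uncompressed fallback would write the original events individually rather than concatenating them into one string.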
1 parent d570434 commit ca51ab1
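Item 1 of the commit message separates buffer management from the heuristic that decides how much a buffer should grow, and uses the heuristic's maximum capacity to enforce the 1 GiB bound. A minimal sketch of such a heuristic is shown below. The struct, its fields, and the function name are assumptions made for illustration; the actual class in the commit may use different parameters and a different interface.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <optional>

// Hypothetical parameters for a growth heuristic.
struct Grow_params {
  std::size_t max_size;    // hard cap, e.g. 1 GiB for binary log events
  double grow_factor;      // grow to at least current capacity times this
  std::size_t block_size;  // round the result up to a multiple of this
};

// Returns the new capacity, or std::nullopt if `requested` can never be
// satisfied because it exceeds the cap. The caller can then fall back,
// for example to writing the data uncompressed.
std::optional<std::size_t> compute_new_capacity(const Grow_params &p,
                                                std::size_t current,
                                                std::size_t requested) {
  assert(p.block_size > 0);
  if (requested > p.max_size) return std::nullopt;
  if (requested <= current) return current;
  // Grow geometrically, but never by less than what was asked for.
  std::size_t target = std::max(
      requested, static_cast<std::size_t>(current * p.grow_factor));
  // Round up to a whole number of blocks.
  std::size_t remainder = target % p.block_size;
  if (remainder != 0) target += p.block_size - remainder;
  // Never exceed the cap, even after growing and rounding.
  return std::min(target, p.max_size);
}
```

Keeping the cap inside the heuristic means the buffer class itself needs no knowledge of the 1 GiB event limit; it only has to handle the case where no new capacity is offered.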

48 files changed (+4917, -671 lines)

client/mysqlbinlog.cc

Lines changed: 21 additions & 23 deletions
@@ -143,33 +143,29 @@ class Database_rewrite {
     auto error_val = Rewrite_payload_result{nullptr, 0, 0, 0, true};
 
     // output variables
-    unsigned char *obuffer{nullptr};
-    std::size_t obuffer_size{0};
-    std::size_t obuffer_capacity{0};
     std::size_t obuffer_size_uncompressed{0};
 
     // temporary buffer for holding uncompressed and rewritten events
     unsigned char *ibuffer{nullptr};
     std::size_t ibuffer_capacity{0};
 
     // RAII objects
-    Buffer_realloc_manager obuffer_dealloc_guard(&obuffer);
     Buffer_realloc_manager ibuffer_dealloc_guard(&ibuffer);
 
     // iterator to decompress events
     binary_log::transaction::compression::Iterable_buffer it(
         orig_payload, orig_payload_size, orig_payload_uncompressed_size,
         compression_type);
-
-    // compressor to compress this again
+    using Compress_status_t =
+        binary_log::transaction::compression::Compress_status;
+    using Managed_buffer_sequence_t =
+        mysqlns::buffer::Managed_buffer_sequence<>;
+    using Char_t = Managed_buffer_sequence_t::Char_t;
+    Managed_buffer_sequence_t managed_buffer_sequence;
     auto compressor =
         binary_log::transaction::compression::Factory::build_compressor(
             compression_type);
 
-    compressor->set_buffer(obuffer, obuffer_size);
-    compressor->reserve(orig_payload_uncompressed_size);
-    compressor->open();
-
     // rewrite and compress
     for (auto ptr : it) {
       std::size_t ev_len{uint4korr(ptr + EVENT_LEN_OFFSET)};
@@ -187,25 +183,27 @@ class Database_rewrite {
                        fde);
       if (err) return error_val;
 
-      auto left{ev_len};
-      while (left > 0 && !err) {
-        auto pos{ibuffer + (ev_len - left)};
-        std::tie(left, err) = compressor->compress(pos, left);
-      }
-
-      if (err) return error_val;
+      compressor->feed(ibuffer, ev_len);
+      if (compressor->compress(managed_buffer_sequence) !=
+          Compress_status_t::success)
+        return error_val;
       obuffer_size_uncompressed += ev_len;
     }
 
-    compressor->close();
-    std::tie(obuffer, obuffer_size, obuffer_capacity) =
-        compressor->get_buffer();
+    if (compressor->finish(managed_buffer_sequence) !=
+        Compress_status_t::success)
+      return error_val;
 
-    // do not dispose of the obuffer (disable RAII for obuffer)
-    obuffer_dealloc_guard.release();
+    // Get contiguous output buffer from managed_buffer_sequence
+    auto *obuffer = static_cast<Char_t *>(
+        malloc(managed_buffer_sequence.read_part().size()));
+    if (obuffer == nullptr) return error_val;
+    managed_buffer_sequence.read_part().copy(obuffer);
 
     // set the new one and adjust event settings
-    return Rewrite_payload_result{obuffer, obuffer_capacity, obuffer_size,
+    return Rewrite_payload_result{obuffer,
+                                  managed_buffer_sequence.read_part().size(),
+                                  managed_buffer_sequence.read_part().size(),
                                   obuffer_size_uncompressed, false};
   }
 
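The last hunk above gathers the non-contiguous compressor output into a single malloc'ed buffer. The sketch below shows the same gather step in isolation, with the size computed once and the allocation held by a smart pointer until the caller takes over. `Chunk_list`, `Free_deleter`, `Malloc_buffer` and `copy_to_contiguous` are hypothetical names, and a vector of strings stands in for the buffer sequence; this is not the mysqlbinlog.cc code.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <string>
#include <vector>

// Deleter so that std::unique_ptr releases malloc'ed memory with free().
struct Free_deleter {
  void operator()(unsigned char *p) const { std::free(p); }
};
using Malloc_buffer = std::unique_ptr<unsigned char, Free_deleter>;

// Hypothetical stand-in for a sequence of separately allocated buffers.
using Chunk_list = std::vector<std::string>;

// Copies all chunks into one contiguous malloc'ed buffer.
// Returns a null pointer only if the allocation failed.
Malloc_buffer copy_to_contiguous(const Chunk_list &chunks,
                                 std::size_t &size_out) {
  size_out = 0;
  for (const auto &chunk : chunks) size_out += chunk.size();
  // malloc(0) is allowed to return a null pointer, so request at least
  // one byte; then a null result always means an allocation failure.
  Malloc_buffer buffer(
      static_cast<unsigned char *>(std::malloc(size_out > 0 ? size_out : 1)));
  if (!buffer) return buffer;
  std::size_t position = 0;
  for (const auto &chunk : chunks) {
    if (chunk.empty()) continue;
    std::memcpy(buffer.get() + position, chunk.data(), chunk.size());
    position += chunk.size();
  }
  assert(position == size_out);
  return buffer;
}
```

A caller that must hand a raw pointer to an API that later frees it can call `release()` on the returned `Malloc_buffer` once every failure path has been passed.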

Lines changed: 237 additions & 0 deletions
@@ -0,0 +1,237 @@
+/* Copyright (c) 2023, Oracle and/or its affiliates.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License, version 2.0,
+   as published by the Free Software Foundation.
+
+   This program is also distributed with certain software (including
+   but not limited to OpenSSL) that is licensed under separate terms,
+   as designated in a particular file or component or in included license
+   documentation. The authors of MySQL hereby grant you an additional
+   permission to link the program and your derivative works with the
+   separately licensed software that they have included with MySQL.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License, version 2.0, for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+
+/// @addtogroup Replication
+/// @{
+///
+/// @file buffer_sequence_view.h
+///
+/// Container class that provides a sequence of buffers to the caller.
+/// This is intended for capturing the output from compressors.
+
+#ifndef MYSQL_BUFFER_BUFFER_SEQUENCE_VIEW_H_
+#define MYSQL_BUFFER_BUFFER_SEQUENCE_VIEW_H_
+
+#include <algorithm>    // std::min
+#include <cassert>      // assert
+#include <cstring>      // std::memcpy
+#include <limits>       // std::numeric_limits
+#include <memory>       // std::allocator
+#include <type_traits>  // std::conditional
+#include <vector>       // std::vector
+#include "libbinlogevents/include/resource/allocator.h"  // mysqlns::resource::Allocator
+
+#include "buffer_view.h"                        // buffer::Buffer_view
+#include "grow_calculator.h"                    // buffer::Grow_calculator
+#include "grow_status.h"                        // buffer::Grow_status
+#include "libbinlogevents/include/nodiscard.h"  // NODISCARD
+
+#include "libbinlogevents/include/wrapper_functions.h"  // BAPI_TRACE
+
+namespace mysqlns::buffer {
+
+/// Sequence of memory buffers.
+///
+/// This is a minimal class with just a sequence of buffers. It does
+/// not have a read/write position (@see Rw_buffer_sequence). It does
+/// not have methods to grow the buffer sequence (@see
+/// Managed_buffer_sequence).
+///
+/// @tparam Char_tp The type of elements stored in the buffer:
+/// typically unsigned char.
+///
+/// @tparam Container_tp The type of container to hold the buffers.
+/// This defaults to std::vector, but std::list is also possible.
+///
+/// @tparam const_tp If true, use const iterators instead of non-const
+/// iterators to represent the beginning and end of the container.
+template <class Char_tp = unsigned char,
+          template <class Element_tp, class Allocator_tp> class Container_tp =
+              std::vector,
+          bool const_tp = false>
+class Buffer_sequence_view {
+ public:
+  using Char_t = Char_tp;
+  using Size_t = std::size_t;
+  using Buffer_view_t = Buffer_view<Char_t>;
+  using Buffer_allocator_t = mysqlns::resource::Allocator<Buffer_view_t>;
+  using Container_t = Container_tp<Buffer_view_t, Buffer_allocator_t>;
+  using Const_iterator_t = typename Container_t::const_iterator;
+  using Iterator_t =
+      typename std::conditional<const_tp, Const_iterator_t,
+                                typename Container_t::iterator>::type;
+
+ private:
+  /// Indicates that @c m_size has not yet been computed.
+  static constexpr Size_t uninitialized_size =
+      std::numeric_limits<Size_t>::max();
+
+ public:
+  /// Construct a Buffer_sequence_view with buffers in the range given by
+  /// the iterators.
+  ///
+  /// This copies only the iterators; the underlying container and the
+  /// buffers contained in the container are not copied.
+  ///
+  /// @param begin_arg Iterator to the first buffer.
+  ///
+  /// @param end_arg Iterator to one-past-the-last buffer.
+  ///
+  /// @param size_arg The total size of all buffers from begin_arg to
+  /// end_arg. This is an optimization only: if the parameter is
+  /// omitted, it will be computed the next time it is needed.
+  Buffer_sequence_view(Iterator_t begin_arg, Iterator_t end_arg,
+                       Size_t size_arg = uninitialized_size)
+      : m_begin(begin_arg), m_end(end_arg), m_size(size_arg) {}
+
+  // Disallow copy, implement move.
+  Buffer_sequence_view(Buffer_sequence_view &) = delete;
+  Buffer_sequence_view(Buffer_sequence_view &&other) noexcept = default;
+  Buffer_sequence_view &operator=(Buffer_sequence_view &) = delete;
+  Buffer_sequence_view &operator=(Buffer_sequence_view &&) noexcept = default;
+
+  virtual ~Buffer_sequence_view() = default;
+
+  /// Iterator to the first buffer.
+  Iterator_t begin() { return m_begin; }
+
+  /// Iterator to the last buffer.
+  Iterator_t end() { return m_end; }
+
+  /// Iterator to the first buffer.
+  Const_iterator_t begin() const { return m_begin; }
+
+  /// Iterator to the last buffer.
+  Const_iterator_t end() const { return m_end; }
+
+  /// Const iterator pointing to the first buffer.
+  Const_iterator_t cbegin() const { return m_begin; }
+
+  /// Const iterator pointing to the last buffer.
+  Const_iterator_t cend() const { return m_end; }
+
+  /// Copy all data to the given, contiguous output buffer.
+  ///
+  /// The caller is responsible for providing a buffer of at
+  /// least @c size() bytes.
+  ///
+  /// @param destination The target buffer.
+  template <class Destination_char_t>
+  void copy(Destination_char_t *destination) const {
+    BAPI_TRACE;
+    Size_t position = 0;
+    for (const auto &buffer : *this) {
+      std::memcpy(destination + position, buffer.data(), buffer.size());
+      position += buffer.size();
+    }
+  }
+
+  /// Return a copy of all the data in this object, as a `std::string`
+  /// object.
+  template <class Str_char_t = char,
+            class Str_traits_t = std::char_traits<Str_char_t>,
+            class Str_allocator_t = std::allocator<Str_char_t>>
+  std::basic_string<Str_char_t, Str_traits_t, Str_allocator_t> str(
+      const Str_allocator_t &allocator = Str_allocator_t()) {
+    std::basic_string<Str_char_t, Str_traits_t, Str_allocator_t> ret(
+        this->size(), '\0', allocator);
+    copy(ret.data());
+    return ret;
+  }
+
+  /// Return the total size of all buffers.
+  Size_t size() const {
+    if (m_size == uninitialized_size) {
+      Size_t size = 0;
+      for (const auto &buffer : *this) size += buffer.size();
+      m_size = size;
+    }
+    return m_size;
+  }
+
+  /// In debug mode, return a string that describes the internal
+  /// structure of this object, to use for debugging.
+  ///
+  /// @param show_contents If true, includes the buffer contents.
+  /// Otherwise, just pointers and sizes.
+  ///
+  /// @param indent If 0, put all info on one line. Otherwise, put
+  /// each field on its own line and indent the given number of
+  /// two-space levels.
+  ///
+  /// @return String that describes the internal structure of this
+  /// Buffer_sequence_view.
+  std::string debug_string([[maybe_unused]] bool show_contents = false,
+                           [[maybe_unused]] int indent = 0) const {
+#ifdef NDEBUG
+    return "";
+#else
+    std::ostringstream ss;
+    // whitespace following the comma: newline + indentation, or just space
+    std::string ws;
+    if (indent != 0)
+      ws = std::string("\n") +
+           std::string(static_cast<std::string::size_type>(indent * 2), ' ');
+    else
+      ws = " ";
+    // separator = comma + whitespace
+    std::string sep = "," + ws;
+    // whitespace / separator with one level deeper indentation
+    std::string ws2 = (indent != 0) ? (ws + " ") : ws;
+    std::string sep2 = (indent != 0) ? (sep + " ") : sep;
+    // clang-format off
+    ss << "Buffer_sequence_view(ptr=" << (const void *)this
+       << sep << "size=" << size()
+       << sep << "buffers.ptr=" << (const void *)&*this->begin()
+       << sep << "buffers=[";
+    // clang-format on
+    bool first = true;
+    for (auto &buffer : *this) {
+      if (first) {
+        if (indent != 0) ss << ws2;
+        first = false;
+      } else {
+        ss << sep2;
+      }
+      ss << buffer.debug_string(show_contents);
+    }
+    ss << "])";
+    return ss.str();
+#endif
+  }
+
+ private:
+  /// Iterator to beginning of buffer.
+  Iterator_t m_begin;
+
+  /// Iterator to end of buffer.
+  Iterator_t m_end;
+
+  /// Total size of all buffers, cached.
+  mutable Size_t m_size;
+};
+
+}  // namespace mysqlns::buffer
+
+/// @} (end of group Replication)
+
+#endif  // MYSQL_BUFFER_BUFFER_SEQUENCE_VIEW_H_
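The header above represents a buffer sequence as a pair of iterators plus a total size that is computed lazily and cached in a `mutable` member, with the maximum `size_t` value as the "not yet computed" sentinel. The following stripped-down sketch shows just that idea. `Chunk_range_view` is a hypothetical name, and `std::string` chunks stand in for `Buffer_view`; it does not reproduce the templates or allocator support of the real class.

```cpp
#include <cassert>
#include <cstddef>
#include <limits>
#include <string>
#include <vector>

// Non-owning view over a range of chunks held in some other container.
class Chunk_range_view {
 private:
  // Sentinel meaning "total size has not been computed yet".
  static constexpr std::size_t unknown_size =
      std::numeric_limits<std::size_t>::max();

 public:
  using Iterator = std::vector<std::string>::const_iterator;

  // Stores only the iterators; the chunks themselves are not copied.
  // Passing the size is an optimization for callers that already know it.
  Chunk_range_view(Iterator begin, Iterator end,
                   std::size_t size = unknown_size)
      : m_begin(begin), m_end(end), m_size(size) {}

  Iterator begin() const { return m_begin; }
  Iterator end() const { return m_end; }

  // Total size of all chunks, computed on first use and then cached.
  std::size_t size() const {
    if (m_size == unknown_size) {
      std::size_t size = 0;
      for (const auto &chunk : *this) size += chunk.size();
      m_size = size;
    }
    return m_size;
  }

  // Contiguous copy of all the data.
  std::string str() const {
    std::string ret;
    ret.reserve(size());
    for (const auto &chunk : *this) ret += chunk;
    assert(ret.size() == size());
    return ret;
  }

 private:
  Iterator m_begin;
  Iterator m_end;
  mutable std::size_t m_size;  // cache; written from const size()
};
```

Two caveats apply to any view of this shape: the iterators become invalid if the underlying vector reallocates, and writing the cache from a const member function is not safe when several threads share one view without synchronization.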
