Skip to content

Commit f8da211

Browse files
author
Luis Soares
committed
WL#13901: service to read change streams (binlogs) from local storage
Defines a service API that abstracts reading binary logs from local storage. Provides an implementation of that service. Change-Id: I5d8436c84d543f855ac35116f17e9433032fcea9
1 parent 386f8fa commit f8da211

33 files changed

+4772
-67
lines changed
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/* Copyright (c) 2023, Oracle and/or its affiliates.
2+
3+
This program is free software; you can redistribute it and/or modify
4+
it under the terms of the GNU General Public License, version 2.0,
5+
as published by the Free Software Foundation.
6+
7+
This program is also distributed with certain software (including
8+
but not limited to OpenSSL) that is licensed under separate terms,
9+
as designated in a particular file or component or in included license
10+
documentation. The authors of MySQL hereby grant you an additional
11+
permission to link the program and your derivative works with the
12+
separately licensed software that they have included with MySQL.
13+
14+
This program is distributed in the hope that it will be useful,
15+
but WITHOUT ANY WARRANTY; without even the implied warranty of
16+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17+
GNU General Public License, version 2.0, for more details.
18+
19+
You should have received a copy of the GNU General Public License
20+
along with this program; if not, write to the Free Software
21+
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22+
23+
#ifndef BINLOG_STORAGE_ITERATOR_SERVICE_HEADERS_H
24+
#define BINLOG_STORAGE_ITERATOR_SERVICE_HEADERS_H
25+
#include <cstdint>
26+
#include <string>
27+
28+
#include <mysql/components/service.h>
29+
#include <mysql/components/services/dynamic_privilege.h>
30+
#include <mysql/plugin_audit_message_types.h>
31+
32+
DEFINE_SERVICE_HANDLE(my_h_binlog_storage_iterator);
33+
34+
enum Binlog_iterator_service_init_status : uint16_t {
35+
/// @brief Iterator was successfully initialized.
36+
kBinlogIteratorInitOk = 0,
37+
38+
/// @brief Returned when the required GTIDs have already been purged and
39+
/// therefore the iterator cannot fetch the needed entries. Caller should
40+
/// still call the deinit function on the iterator.
41+
kBinlogIteratorIniErrorPurgedGtids,
42+
43+
/// @brief Returned when the log is closed and therefore the iterator cannot
44+
/// get the change entries. Caller should still call the deinit function on
45+
/// the iterator.
46+
kBinlogIteratorInitErrorLogClosed,
47+
48+
/// @brief Failure to initialize iterator due to undefined error. Caller
49+
/// should still call the deinit function on the iterator.
50+
kBinlogIteratorInitErrorUnspecified
51+
};
52+
53+
/// @brief This enumeration lists the possible return values for the get
54+
/// function
55+
enum Binlog_iterator_service_get_status : uint16_t {
56+
/// @brief returned when the get operation succeeded.
57+
///
58+
/// If the get operation succeeds, this also means that the iterator advances.
59+
kBinlogIteratorGetOk = 0,
60+
61+
/// @brief returned when there are no more entries to get.
62+
///
63+
/// The iterator remains open and you can call get on it again. If
64+
/// more content has been created in the meantime, get will get it
65+
/// for you.
66+
kBinlogIteratorGetEndOfChanges,
67+
68+
/// @brief returned whenever the get was called with an insufficient buffer.
69+
///
70+
/// The iterator does not advance and the caller can call get again with a
71+
/// larger buffer.
72+
kBinlogIteratorGetInsufficientBuffer,
73+
74+
/// @brief Returned when there is an unrecoverable error and this iterator has
75+
/// been closed. The caller still needs to deinitialize the iterator.
76+
kBinlogIteratorGetErrorClosed,
77+
78+
/// @brief returned whenever the iterator context is invalid.
79+
///
80+
/// The iterator became invalid and therefore it cannot be used successfully
81+
/// from now onwards. I must still be de-initialized to release resources.
82+
kBinlogIteratorGetErrorInvalid,
83+
84+
/// @brief returned whenever there was an unspecified error attempting to get
85+
/// the next entry.
86+
///
87+
/// In case of an unspecified error, the caller can retry but there is no
88+
/// guarantee whether the retry is successful or not.
89+
kBinlogIteratorGetErrorUnspecified
90+
};
91+
92+
BEGIN_SERVICE_DEFINITION(binlog_storage_iterator)
93+
94+
/// @brief Initializes the iterator.
95+
///
96+
/// my_h_binlog_storage_iterator is the service handle defined and is an opaque
97+
/// pointer to the stream state.
98+
///
99+
/// @param[out] iterator where the iterator created will be stored.
100+
///
101+
/// @param[in] excluded_gtids_as_string The set of GTIDs to filter out from the
102+
/// iterator.
103+
///
104+
/// @retval kBinlogIteratorInitOk @see
105+
/// Binlog_iterator_service_init_status#kBinlogIteratorInitOk
106+
/// @retval kBinlogIteratorIniErrorPurgedGtids @see
107+
/// Binlog_iterator_service_init_status#kBinlogIteratorIniErrorPurgedGtids
108+
/// @retval kBinlogIteratorInitErrorLogClosed @see
109+
/// Binlog_iterator_service_init_status#kBinlogIteratorInitErrorLogClosed
110+
/// @retval kBinlogIteratorInitErrorUnspecified @see
111+
/// Binlog_iterator_service_init_status#kBinlogIteratorGetErrorUnspecified
112+
DECLARE_METHOD(Binlog_iterator_service_init_status, init,
113+
(my_h_binlog_storage_iterator * iterator,
114+
const char *excluded_gtids_as_string));
115+
116+
/// @brief Shall get the next event in the iterator.
117+
///
118+
/// Gets the next event in the iterator. If there are no more events in the
119+
/// iterator, it just returns immediately. Note that this function will also
120+
/// advance the iterator if the operation is successful or the next entries are
121+
/// to be skipped.
122+
///
123+
/// In case the error is kBinlogIteratorGetErrorInvalid or
124+
/// kBinlogIteratorGetErrorUnspecified the iterator must be de initialized by
125+
/// calling deinit. If the caller attempts to call get again, then the
126+
/// same error is returned.
127+
///
128+
/// @param iterator the iterator reference to use use during the get operation.
129+
///
130+
/// @param[in,out] buffer the buffer to store the raw change stream bytes for
131+
/// the next entry fetched from the given iterator.
132+
///
133+
/// @param[in] buffer_capacity the capacity of the buffer where the bytes are to
134+
/// be stored.
135+
///
136+
/// @param[out] bytes_read the amount of bytes read and put into the buffer.
137+
///
138+
/// @retval kBinlogIteratorGetOk
139+
/// Binlog_iterator_service_get_status#kBinlogIteratorGetOk
140+
/// @retval kBinlogIteratorGetEndOfChanges
141+
/// Binlog_iterator_service_get_status#kBinlogIteratorGetEndOfChanges
142+
/// @retval kBinlogIteratorGetInsufficientBuffer
143+
/// Binlog_iterator_service_get_status#kBinlogIteratorGetInsufficientBuffer
144+
/// @retval kBinlogIteratorGetErrorClosed @see
145+
/// Binlog_iterator_service_get_status#kBinlogIteratorGetErrorClosed
146+
/// @retval kBinlogIteratorGetErrorInvalid @see
147+
/// Binlog_iterator_service_get_status#kBinlogIteratorGetErrorInvalid
148+
/// @retval kBinlogIteratorGetErrorUnspecified @see
149+
/// Binlog_iterator_service_get_status#kBinlogIteratorGetErrorUnspecified
150+
DECLARE_METHOD(Binlog_iterator_service_get_status, get,
151+
(my_h_binlog_storage_iterator iterator, unsigned char *buffer,
152+
uint64_t buffer_capacity, uint64_t *bytes_read));
153+
154+
/// @brief Destroys the iterator and releases resources associated with it.
155+
///
156+
/// @param[in] iterator the iterator to destroy.
157+
DECLARE_METHOD(void, deinit, (my_h_binlog_storage_iterator iterator));
158+
159+
/// @brief Gets details about the entry's storage in a JSON format.
160+
///
161+
/// Allows the caller to get information about the underlying storage. Some
162+
/// implementations may return a name and a position, for instance.
163+
///
164+
/// @param[in] iterator the iterator handle.
165+
///
166+
/// @param[in,out] buffer The buffer to store the information in.
167+
///
168+
/// @param[in,out] size As input, the size of the buffer provided. As output,
169+
/// the size of the data copied into the buffer.
170+
///
171+
/// @return true if there was an error, false otherwise.
172+
DECLARE_BOOL_METHOD(get_storage_details, (my_h_binlog_storage_iterator iterator,
173+
char *buffer, uint64_t *size));
174+
175+
/// @brief Gets the size of the next entry to fetch from the iterator.
176+
///
177+
/// Useful to drive reallocations.
178+
///
179+
/// @param[in] iterator the iterator being operated.
180+
/// @param[out] size a pointer to store the size of the next entry to get.
181+
///
182+
/// @return false on success, true otherwise. Note that if the iterator has
183+
/// reached the end of changes, then it means that an error shall be returned.
184+
DECLARE_BOOL_METHOD(get_next_entry_size,
185+
(my_h_binlog_storage_iterator iterator, uint64_t *size));
186+
187+
END_SERVICE_DEFINITION(binlog_storage_iterator)
188+
189+
#endif /* BINLOG_STORAGE_ITERATOR_SERVICE_HEADERS_H */

libbinlogevents/include/binlog_event.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,14 @@ enum Log_event_type {
364364
ENUM_END_EVENT /* end marker */
365365
};
366366

367+
/**
368+
@brief Get the event type as string object
369+
370+
@param type the event type for which to get a textual representation.
371+
@return std::string a text representing the event name.
372+
*/
373+
const std::string &get_event_type_as_string(Log_event_type type);
374+
367375
/**
368376
Struct to pass basic information about a event: type, query, is it ignorable
369377
*/

libbinlogevents/src/binlog_event.cpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
#include <stdint.h>
2727
#include <algorithm>
28+
#include <unordered_map>
2829

2930
const unsigned char checksum_version_split[3] = {5, 6, 1};
3031
const unsigned long checksum_version_product =
@@ -40,6 +41,48 @@ bool debug_simulate_invalid_address = false;
4041

4142
namespace binary_log {
4243

44+
static const std::unordered_map<Log_event_type, const std::string>
45+
event_type_to_string = {{STOP_EVENT, "Stop"},
46+
{QUERY_EVENT, "Query"},
47+
{ROTATE_EVENT, "Rotate"},
48+
{INTVAR_EVENT, "Intvar"},
49+
{APPEND_BLOCK_EVENT, "Append_block"},
50+
{DELETE_FILE_EVENT, "Delete_file"},
51+
{RAND_EVENT, "RAND"},
52+
{USER_VAR_EVENT, "User var"},
53+
{XID_EVENT, "Xid"},
54+
{FORMAT_DESCRIPTION_EVENT, "Format_desc"},
55+
{TABLE_MAP_EVENT, "Table_map"},
56+
{WRITE_ROWS_EVENT_V1, "Write_rows_v1"},
57+
{UPDATE_ROWS_EVENT_V1, "Update_rows_v1"},
58+
{DELETE_ROWS_EVENT_V1, "Delete_rows_v1"},
59+
{BEGIN_LOAD_QUERY_EVENT, "Begin_load_query"},
60+
{EXECUTE_LOAD_QUERY_EVENT, "Execute_load_query"},
61+
{INCIDENT_EVENT, "Incident"},
62+
{IGNORABLE_LOG_EVENT, "Ignorable"},
63+
{ROWS_QUERY_LOG_EVENT, "Rows_query"},
64+
{WRITE_ROWS_EVENT, "Write_rows"},
65+
{UPDATE_ROWS_EVENT, "Update_rows"},
66+
{DELETE_ROWS_EVENT, "Delete_rows"},
67+
{GTID_LOG_EVENT, "Gtid"},
68+
{ANONYMOUS_GTID_LOG_EVENT, "Anonymous_Gtid"},
69+
{PREVIOUS_GTIDS_LOG_EVENT, "Previous_gtids"},
70+
{HEARTBEAT_LOG_EVENT, "Heartbeat"},
71+
{TRANSACTION_CONTEXT_EVENT, "Transaction_context"},
72+
{VIEW_CHANGE_EVENT, "View_change"},
73+
{XA_PREPARE_LOG_EVENT, "XA_prepare"},
74+
{PARTIAL_UPDATE_ROWS_EVENT, "Update_rows_partial"},
75+
{TRANSACTION_PAYLOAD_EVENT, "Transaction_payload"},
76+
{UNKNOWN_EVENT, "Unknown"}};
77+
78+
const std::string &get_event_type_as_string(Log_event_type type) {
79+
try {
80+
return event_type_to_string.at(type);
81+
} catch (const std::out_of_range &) {
82+
return event_type_to_string.at(UNKNOWN_EVENT);
83+
}
84+
}
85+
4386
Log_event_footer::Log_event_footer(Event_reader &reader,
4487
Log_event_type event_type,
4588
const Format_description_event *fde)

0 commit comments

Comments
 (0)