Developed the MariadbAnnotateRowsEvent (#412)
chungeun-choi committed Aug 20, 2023
1 parent ebc84d9 commit c40f6f4
Showing 7 changed files with 173 additions and 63 deletions.
14 changes: 14 additions & 0 deletions docker-compose.yml
@@ -15,3 +15,17 @@ services:
ports:
- 3307:3307
command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates -P 3307

mariadb-10.6:
image: mariadb:10.6
environment:
MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 1
ports:
- "3308:3306"
command: |
--server-id=1
--default-authentication-plugin=mysql_native_password
--log-bin=master-bin
--binlog-format=row
--log-slave-updates=on
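
A quick way to sanity-check the new MariaDB 10.6 service before running the examples or tests is to connect and confirm its binlog settings; a minimal sketch, assuming the container is up locally with the empty root password and the 3308 port mapping configured above:

import pymysql

# Assumes the mariadb-10.6 service above is running (empty root password, port 3308).
conn = pymysql.connect(host="127.0.0.1", port=3308, user="root", password="")
with conn.cursor() as cur:
    cur.execute("SHOW VARIABLES LIKE 'binlog_annotate_row_events'")
    print(cur.fetchone())  # expected: ('binlog_annotate_row_events', 'ON')
    cur.execute("SHOW MASTER STATUS")
    print(cur.fetchone())  # current binlog file name and position
conn.close()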
8 changes: 5 additions & 3 deletions examples/mariadb_gtid/read_event.py
@@ -1,7 +1,7 @@
import pymysql

from pymysqlreplication import BinLogStreamReader, gtid
from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent
from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent, MariadbAnnotateRowsEvent
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent

MARIADB_SETTINGS = {
@@ -65,10 +65,12 @@ def query_server_id(self):
RotateEvent,
WriteRowsEvent,
UpdateRowsEvent,
DeleteRowsEvent
DeleteRowsEvent,
MariadbAnnotateRowsEvent
],
auto_position=gtid,
is_mariadb=True
is_mariadb=True,
annotate_rows_event=True
)

print('Starting reading events from GTID ', gtid)
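
For context, a consumer loop on top of the reader configured in this example might separate the new annotate event from the row events it precedes; a minimal sketch, assuming the BinLogStreamReader instance above is bound to a name such as stream (the decode step is an illustration, the library hands back raw bytes):

for event in stream:
    if isinstance(event, MariadbAnnotateRowsEvent):
        # The annotate event arrives just before the row events it describes.
        print("Annotated SQL:", event.sql_statement.decode("utf-8", errors="replace"))
    else:
        event.dump()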
130 changes: 75 additions & 55 deletions pymysqlreplication/binlogstream.py
@@ -14,7 +14,8 @@
QueryEvent, RotateEvent, FormatDescriptionEvent,
XidEvent, GtidEvent, StopEvent, XAPrepareEvent,
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent)
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent,
MariadbAnnotateRowsEvent)
from .exceptions import BinLogNotEnabled
from .row_event import (
UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent)
@@ -141,6 +142,7 @@ def __init__(self, connection_settings, server_id,
fail_on_table_metadata_unavailable=False,
slave_heartbeat=None,
is_mariadb=False,
annotate_rows_event=False,
ignore_decode_errors=False):
"""
Attributes:
@@ -178,6 +180,8 @@ def __init__(self, connection_settings, server_id,
for semantics
is_mariadb: Flag to indicate it's a MariaDB server, used with auto_position
to point to Mariadb specific GTID.
annotate_rows_event: If True, request ANNOTATE_ROWS events from MariaDB
(sets the BINLOG_SEND_ANNOTATE_ROWS_EVENT dump flag); only used together with 'is_mariadb'
ignore_decode_errors: If true, any decode errors encountered
when reading column data will be ignored.
"""
@@ -219,6 +223,7 @@ def __init__(self, connection_settings, server_id,
self.auto_position = auto_position
self.skip_to_timestamp = skip_to_timestamp
self.is_mariadb = is_mariadb
self.__annotate_rows_event = annotate_rows_event

if end_log_pos:
self.is_past_end_log_pos = False
@@ -331,67 +336,39 @@ def __connect_to_stream(self):
self._register_slave()

if not self.auto_position:
# only when log_file and log_pos both provided, the position info is
# valid, if not, get the current position from master
if self.log_file is None or self.log_pos is None:
cur = self._stream_connection.cursor()
cur.execute("SHOW MASTER STATUS")
master_status = cur.fetchone()
if master_status is None:
raise BinLogNotEnabled()
self.log_file, self.log_pos = master_status[:2]
cur.close()

prelude = struct.pack('<i', len(self.log_file) + 11) \
+ bytes(bytearray([COM_BINLOG_DUMP]))

if self.__resume_stream:
prelude += struct.pack('<I', self.log_pos)
else:
prelude += struct.pack('<I', 4)

flags = 0
if not self.__blocking:
flags |= 0x01 # BINLOG_DUMP_NON_BLOCK
prelude += struct.pack('<H', flags)

prelude += struct.pack('<I', self.__server_id)
prelude += self.log_file.encode()
else:
if self.is_mariadb:
# https://mariadb.com/kb/en/5-slave-registration/
cur = self._stream_connection.cursor()
cur.execute("SET @slave_connect_state='%s'" % self.auto_position)
cur.execute("SET @slave_gtid_strict_mode=1")
cur.execute("SET @slave_gtid_ignore_duplicates=0")
cur.close()

# https://mariadb.com/kb/en/com_binlog_dump/
header_size = (
4 + # binlog pos
2 + # binlog flags
4 + # slave server_id,
4 # requested binlog file name , set it to empty
)

prelude = struct.pack('<i', header_size) + bytes(bytearray([COM_BINLOG_DUMP]))

# binlog pos
prelude += struct.pack('<i', 4)
prelude = self.__set_mariadb_settings()
else:
# only when log_file and log_pos both provided, the position info is
# valid, if not, get the current position from master
if self.log_file is None or self.log_pos is None:
cur = self._stream_connection.cursor()
cur.execute("SHOW MASTER STATUS")
master_status = cur.fetchone()
if master_status is None:
raise BinLogNotEnabled()
self.log_file, self.log_pos = master_status[:2]
cur.close()

prelude = struct.pack('<i', len(self.log_file) + 11) \
+ bytes(bytearray([COM_BINLOG_DUMP]))

if self.__resume_stream:
prelude += struct.pack('<I', self.log_pos)
else:
prelude += struct.pack('<I', 4)

flags = 0

if not self.__blocking:
flags |= 0x01 # BINLOG_DUMP_NON_BLOCK

# binlog flags
prelude += struct.pack('<H', flags)

# server id (4 bytes)
prelude += struct.pack('<I', self.__server_id)

# empty_binlog_name (4 bytes)
prelude += b'\0\0\0\0'

prelude += self.log_file.encode()
else:
if self.is_mariadb:
prelude = self.__set_mariadb_settings()
else:
# Format for mysql packet master_auto_position
#
@@ -473,6 +450,48 @@ def __connect_to_stream(self):
self._stream_connection._next_seq_id = 1
self.__connected_stream = True

def __set_mariadb_settings(self):
# https://mariadb.com/kb/en/5-slave-registration/
cur = self._stream_connection.cursor()
if self.auto_position is not None:
cur.execute("SET @slave_connect_state='%s'" % self.auto_position)
cur.execute("SET @slave_gtid_strict_mode=1")
cur.execute("SET @slave_gtid_ignore_duplicates=0")
cur.close()

# https://mariadb.com/kb/en/com_binlog_dump/
header_size = (
4 + # binlog pos
2 + # binlog flags
4 + # slave server_id,
4 # requested binlog file name , set it to empty
)

prelude = struct.pack('<i', header_size) + bytes(bytearray([COM_BINLOG_DUMP]))

# binlog pos
prelude += struct.pack('<i', 4)

flags = 0

# Enable annotate rows event
if self.__annotate_rows_event:
flags |= 0x02 # BINLOG_SEND_ANNOTATE_ROWS_EVENT

if not self.__blocking:
flags |= 0x01 # BINLOG_DUMP_NON_BLOCK

# binlog flags
prelude += struct.pack('<H', flags)

# server id (4 bytes)
prelude += struct.pack('<I', self.__server_id)

# empty_binlog_name (4 bytes)
prelude += b'\0\0\0\0'

return prelude

def fetchone(self):
while True:
if self.end_log_pos and self.is_past_end_log_pos:
@@ -600,7 +619,8 @@ def _allowed_event_list(self, only_events, ignored_events,
TableMapEvent,
HeartbeatLogEvent,
NotImplementedEvent,
MariadbGtidEvent
MariadbGtidEvent,
MariadbAnnotateRowsEvent
))
if ignored_events is not None:
for e in ignored_events:
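
To make the new flag handling concrete, the two COM_BINLOG_DUMP flag bits toggled in __set_mariadb_settings can be composed in isolation; a minimal sketch mirroring the logic above (the constant names are illustrative and are not defined in the library):

import struct

BINLOG_DUMP_NON_BLOCK = 0x01            # illustrative name for the 0x01 bit
BINLOG_SEND_ANNOTATE_ROWS_EVENT = 0x02  # illustrative name for the 0x02 bit

def binlog_dump_flags(blocking, annotate_rows_event):
    # Mirrors the flag logic in __set_mariadb_settings.
    flags = 0
    if annotate_rows_event:
        flags |= BINLOG_SEND_ANNOTATE_ROWS_EVENT
    if not blocking:
        flags |= BINLOG_DUMP_NON_BLOCK
    return struct.pack('<H', flags)

# A blocking reader with annotate events enabled sends b'\x02\x00'.
assert binlog_dump_flags(True, True) == b'\x02\x00'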
18 changes: 18 additions & 0 deletions pymysqlreplication/event.py
@@ -111,6 +111,24 @@ def _dump(self):
print('GTID:', self.gtid)


class MariadbAnnotateRowsEvent(BinLogEvent):
"""
Annotate rows event
Written by MariaDB before the corresponding row events and carries the SQL statement
that generated them. It is only sent when the stream is opened with
annotate_rows_event=True, which sets the BINLOG_SEND_ANNOTATE_ROWS_EVENT flag in
COM_BINLOG_DUMP.
https://mariadb.com/kb/en/annotate_rows_event/
Attributes:
sql_statement: The annotated SQL statement (raw bytes)
"""
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(MariadbAnnotateRowsEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
self.sql_statement = self.packet.read(event_size)

def _dump(self):
super(MariadbAnnotateRowsEvent, self)._dump()
print("SQL statement :", self.sql_statement)


class RotateEvent(BinLogEvent):
"""Change MySQL bin log file
2 changes: 1 addition & 1 deletion pymysqlreplication/packet.py
@@ -83,7 +83,7 @@ class BinLogPacketWrapper(object):
constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent,
constants.PREVIOUS_GTIDS_LOG_EVENT: event.NotImplementedEvent,
# MariaDB GTID
constants.MARIADB_ANNOTATE_ROWS_EVENT: event.NotImplementedEvent,
constants.MARIADB_ANNOTATE_ROWS_EVENT: event.MariadbAnnotateRowsEvent,
constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.NotImplementedEvent,
constants.MARIADB_GTID_EVENT: event.MariadbGtidEvent,
constants.MARIADB_GTID_GTID_LIST_EVENT: event.NotImplementedEvent,
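
The event_type value 160 asserted in the new test_basic.py test below is the numeric value behind constants.MARIADB_ANNOTATE_ROWS_EVENT; a small sketch of the lookup this mapping performs, with the MariaDB-specific type codes taken from the MariaDB replication protocol (0xa0-0xa3):

# Illustrative lookup mirroring BinLogPacketWrapper's event map after this change.
MARIADB_EVENT_CLASSES = {
    0xa0: "MariadbAnnotateRowsEvent",  # 160, now parsed instead of NotImplementedEvent
    0xa1: "NotImplementedEvent",       # 161, binlog checkpoint
    0xa2: "MariadbGtidEvent",          # 162, GTID
    0xa3: "NotImplementedEvent",       # 163, GTID list
}
print(MARIADB_EVENT_CLASSES[160])  # -> MariadbAnnotateRowsEvent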
26 changes: 26 additions & 0 deletions pymysqlreplication/tests/base.py
@@ -121,3 +121,29 @@ def bin_log_basename(self):
bin_log_basename = cursor.fetchone()[0]
bin_log_basename = bin_log_basename.split("/")[-1]
return bin_log_basename


class PyMySQLReplicationMariaDbTestCase(PyMySQLReplicationTestCase):
def setUp(self):
# default
self.database = {
"host": "localhost",
"user": "root",
"passwd": "",
"port": 3308,
"use_unicode": True,
"charset": "utf8",
"db": "pymysqlreplication_test"
}

self.conn_control = None
db = copy.copy(self.database)
db["db"] = None
self.connect_conn_control(db)
self.execute("DROP DATABASE IF EXISTS pymysqlreplication_test")
self.execute("CREATE DATABASE pymysqlreplication_test")
db = copy.copy(self.database)
self.connect_conn_control(db)
self.stream = None
self.resetBinLog()
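
A short sketch of how the new MariaDB test case might be exercised on its own, assuming the mariadb-10.6 service from docker-compose.yml is up on port 3308 and the package is installed in the current environment:

import unittest
from pymysqlreplication.tests import test_basic

# Load and run only the MariaDB-specific tests added in this commit.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(
    test_basic.TestMariadbBinlogStreamReader)
unittest.TextTestRunner(verbosity=2).run(suite)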

38 changes: 34 additions & 4 deletions pymysqlreplication/tests/test_basic.py
@@ -17,17 +17,17 @@
from pymysqlreplication.constants.BINLOG import *
from pymysqlreplication.row_event import *

__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader"]
__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader"]


class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase):
def ignoredEvents(self):
return [GtidEvent]

def test_allowed_event_list(self):
self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 16)
self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 15)
self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 15)
self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 17)
self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 16)
self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 16)
self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1)

def test_read_query_event(self):
@@ -1002,6 +1002,36 @@ def test_parsing(self):
gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-:1")
gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac::1")

class TestMariadbBinlogStreamReader(base.PyMySQLReplicationMariaDbTestCase):

def test_annotate_rows_event(self):
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
self.execute(query)
# Insert a row so the server writes an ANNOTATE_ROWS event before the row event
query = "BEGIN;"
self.execute(query)
insert_query = b"INSERT INTO test (id, data) VALUES(1, 'Hello')"
self.execute(insert_query)
query = "COMMIT;"
self.execute(query)

self.stream.close()
self.stream = BinLogStreamReader(
self.database,
server_id=1024,
blocking=False,
only_events=[MariadbAnnotateRowsEvent],
is_mariadb=True,
annotate_rows_event=True,
)

event = self.stream.fetchone()
# Check the event type (160 = MARIADB_ANNOTATE_ROWS_EVENT)
self.assertEqual(event.event_type, 160)
# Check the annotated SQL statement
self.assertEqual(event.sql_statement, insert_query)
self.assertIsInstance(event, MariadbAnnotateRowsEvent)


if __name__ == "__main__":
import unittest
