From 6fdf0ed23d8d3cb8a82845e1ae45f3f3734fee9b Mon Sep 17 00:00:00 2001 From: "cucuridas@gamil.com" <3310223@naver.com> Date: Sat, 19 Aug 2023 14:02:02 +0900 Subject: [PATCH 1/8] Starting to develop 'logging' --- pymysqlreplication/logging.py | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 pymysqlreplication/logging.py diff --git a/pymysqlreplication/logging.py b/pymysqlreplication/logging.py new file mode 100644 index 00000000..4bffa5d3 --- /dev/null +++ b/pymysqlreplication/logging.py @@ -0,0 +1,6 @@ +import logging + +class BaseLogging(): + def __init__(self) -> None: + print("MockUP") + From f170571d953fe49017a7a79aa4b6f6dff115ccee Mon Sep 17 00:00:00 2001 From: "cucuridas@gamil.com" <3310223@naver.com> Date: Fri, 25 Aug 2023 21:58:49 +0900 Subject: [PATCH 2/8] Add logging for binlog stream reader attribute --- pymysqlreplication/binlogstream.py | 10 ++++++++-- pymysqlreplication/logging.py | 25 +++++++++++++++++++++++-- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index fa65aa22..0c24dda4 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -18,7 +18,7 @@ from .exceptions import BinLogNotEnabled from .row_event import ( UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent) - +from .logging import RenderingLog try: from pymysql.constants.COMMAND import COM_BINLOG_DUMP_GTID except ImportError: @@ -141,7 +141,8 @@ def __init__(self, connection_settings, server_id, fail_on_table_metadata_unavailable=False, slave_heartbeat=None, is_mariadb=False, - ignore_decode_errors=False): + ignore_decode_errors=False, + logging=True): """ Attributes: ctl_connection_settings: Connection settings for cluster holding @@ -219,6 +220,8 @@ def __init__(self, connection_settings, server_id, self.auto_position = auto_position self.skip_to_timestamp = skip_to_timestamp self.is_mariadb = is_mariadb + if logging: + self.__set_logging() if end_log_pos: self.is_past_end_log_pos = False @@ -639,6 +642,9 @@ def __get_table_information(self, schema, table): continue else: raise error + + def __set_logging(self): + RenderingLog(self.__dict__) def __iter__(self): return iter(self.fetchone, None) diff --git a/pymysqlreplication/logging.py b/pymysqlreplication/logging.py index 4bffa5d3..4844d749 100644 --- a/pymysqlreplication/logging.py +++ b/pymysqlreplication/logging.py @@ -1,6 +1,27 @@ import logging -class BaseLogging(): +class BaseBinlogStreaRemaderLog: def __init__(self) -> None: - print("MockUP") + logging.info("""======Start Binlog stream reader======""") + + +class IsMariaDBLog: + def __init__(self) -> None: + logging.info("Set 'is_mariaDB' ON\n") + + +class NotLogging: + pass + + +class RenderingLog: + __logging_list = { + "is_mariadb":IsMariaDBLog + } + def __init__(self,obj_attr) -> None: + BaseBinlogStreaRemaderLog() + for parameter in obj_attr: + log_parm = self.__logging_list.get(parameter) + if log_parm: + log_parm() From 4f912ee29cb194f7d99998d130f9addd7b957149 Mon Sep 17 00:00:00 2001 From: "cucuridas@gamil.com" <3310223@naver.com> Date: Mon, 28 Aug 2023 14:49:28 +0900 Subject: [PATCH 3/8] fix - fixed how to logging prameter in BInLogStreamReader --- pymysqlreplication/binlogstream.py | 19 +++++++++++++------ pymysqlreplication/logging.py | 27 --------------------------- 2 files changed, 13 insertions(+), 33 deletions(-) delete mode 100644 pymysqlreplication/logging.py diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 0c24dda4..94a23c1e 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -2,6 +2,7 @@ import pymysql import struct +import logging from distutils.version import LooseVersion from pymysql.constants.COMMAND import COM_BINLOG_DUMP, COM_REGISTER_SLAVE @@ -18,7 +19,6 @@ from .exceptions import BinLogNotEnabled from .row_event import ( UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent) -from .logging import RenderingLog try: from pymysql.constants.COMMAND import COM_BINLOG_DUMP_GTID except ImportError: @@ -142,7 +142,7 @@ def __init__(self, connection_settings, server_id, slave_heartbeat=None, is_mariadb=False, ignore_decode_errors=False, - logging=True): + parma_logging=True,): """ Attributes: ctl_connection_settings: Connection settings for cluster holding @@ -220,8 +220,8 @@ def __init__(self, connection_settings, server_id, self.auto_position = auto_position self.skip_to_timestamp = skip_to_timestamp self.is_mariadb = is_mariadb - if logging: - self.__set_logging() + if parma_logging: + self.__log_valid_parameters() if end_log_pos: self.is_past_end_log_pos = False @@ -643,8 +643,15 @@ def __get_table_information(self, schema, table): else: raise error - def __set_logging(self): - RenderingLog(self.__dict__) + def __log_valid_parameters(self): + ignore_list = ["allowed_events_in_packet","table_map"] + + for parameter,value in self.__dict__.items(): + if parameter.startswith("_BinLogStreamReader__") : + parameter = parameter.replace("_BinLogStreamReader__","") + if value != None and value != False and parameter not in ignore_list: + comment = "Set parameter - {} , value - {}".format(parameter,value) + logging.info(comment) def __iter__(self): return iter(self.fetchone, None) diff --git a/pymysqlreplication/logging.py b/pymysqlreplication/logging.py deleted file mode 100644 index 4844d749..00000000 --- a/pymysqlreplication/logging.py +++ /dev/null @@ -1,27 +0,0 @@ -import logging - -class BaseBinlogStreaRemaderLog: - def __init__(self) -> None: - logging.info("""======Start Binlog stream reader======""") - - -class IsMariaDBLog: - def __init__(self) -> None: - logging.info("Set 'is_mariaDB' ON\n") - - -class NotLogging: - pass - - -class RenderingLog: - __logging_list = { - "is_mariadb":IsMariaDBLog - } - def __init__(self,obj_attr) -> None: - BaseBinlogStreaRemaderLog() - for parameter in obj_attr: - log_parm = self.__logging_list.get(parameter) - if log_parm: - log_parm() - From 3248af902b4004e3ed54703f768040555748b96d Mon Sep 17 00:00:00 2001 From: Pilmo Date: Thu, 31 Aug 2023 00:18:00 +0900 Subject: [PATCH 4/8] modify frozen set to List[str] --- pymysqlreplication/binlogstream.py | 50 +++++++++++++++++------------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 94a23c1e..2c36efb5 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -19,6 +19,7 @@ from .exceptions import BinLogNotEnabled from .row_event import ( UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent) + try: from pymysql.constants.COMMAND import COM_BINLOG_DUMP_GTID except ImportError: @@ -32,7 +33,6 @@ class ReportSlave(object): - """Represent the values that you may report when connecting as a slave to a master. SHOW SLAVE HOSTS related""" @@ -67,7 +67,7 @@ def __init__(self, value): self.hostname = value def __repr__(self): - return '' %\ + return '' % \ (self.hostname, self.username, self.password, self.port) def encoded(self, server_id, master_id=0): @@ -122,7 +122,6 @@ def encoded(self, server_id, master_id=0): class BinLogStreamReader(object): - """Connect to replication stream and read event """ report_slave = None @@ -142,7 +141,7 @@ def __init__(self, connection_settings, server_id, slave_heartbeat=None, is_mariadb=False, ignore_decode_errors=False, - parma_logging=True,): + parma_logging=True, ): """ Attributes: ctl_connection_settings: Connection settings for cluster holding @@ -313,7 +312,7 @@ def __connect_to_stream(self): 4294967)) # If heartbeat is too low, the connection will disconnect before, # this is also the behavior in mysql - heartbeat = float(min(net_timeout/2., self.slave_heartbeat)) + heartbeat = float(min(net_timeout / 2., self.slave_heartbeat)) if heartbeat > 4294967: heartbeat = 4294967 @@ -346,7 +345,7 @@ def __connect_to_stream(self): cur.close() prelude = struct.pack(' Date: Thu, 31 Aug 2023 14:21:35 +0900 Subject: [PATCH 5/8] fix - Delete annotations and configuration of log --- pymysqlreplication/binlogstream.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 2c36efb5..bb92f1f6 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -644,8 +644,7 @@ def __get_table_information(self, schema, table): raise error def __log_valid_parameters(self): - # ignore_list = ["allowed_events_in_packet", "table_map"] - logging.basicConfig(level=logging.INFO) + ignore_list = ["allowed_events_in_packet", "table_map"] for parameter, value in self.__dict__.items(): if not value: continue From 572b8f149c84e27bc8ab514f2c739b193aad0d6f Mon Sep 17 00:00:00 2001 From: "cucuridas@gamil.com" <3310223@naver.com> Date: Thu, 31 Aug 2023 14:42:02 +0900 Subject: [PATCH 6/8] fix typo - add ',' string --- pymysqlreplication/binlogstream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 2dfd4551..1bcf6abf 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -143,7 +143,7 @@ def __init__(self, connection_settings, server_id, slave_heartbeat=None, is_mariadb=False, annotate_rows_event=False, - ignore_decode_errors=False + ignore_decode_errors=False, parma_logging=True, ): """ Attributes: From c79caabc149639a38b22237315b055745ce1f031 Mon Sep 17 00:00:00 2001 From: Pilmo Date: Sat, 2 Sep 2023 16:09:53 +0900 Subject: [PATCH 7/8] Add description about logging parameter --- pymysqlreplication/binlogstream.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 9171ea57..47ecbb6e 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -145,8 +145,8 @@ def __init__(self, connection_settings, server_id, is_mariadb=False, annotate_rows_event=False, ignore_decode_errors=False, - parma_logging=True, - verify_checksum=False,): + verify_checksum=False, + enable_logging=True,): """ Attributes: ctl_connection_settings: Connection settings for cluster holding @@ -189,6 +189,8 @@ def __init__(self, connection_settings, server_id, ignore_decode_errors: If true, any decode errors encountered when reading column data will be ignored. verify_checksum: If true, verify events read from the binary log by examining checksums. + enable_logging: When set to True, logs various details helpful for debugging and monitoring + When set to False, logging is disabled to enhance performance. """ self.__connection_settings = connection_settings @@ -230,7 +232,7 @@ def __init__(self, connection_settings, server_id, self.skip_to_timestamp = skip_to_timestamp self.is_mariadb = is_mariadb self.__annotate_rows_event = annotate_rows_event - if parma_logging: + if enable_logging: self.__log_valid_parameters() if end_log_pos: @@ -615,7 +617,6 @@ def _allowed_event_list(self, only_events, ignored_events, if only_events is not None: events = set(only_events) else: - a = QueryEvent events = set(( QueryEvent, RotateEvent, @@ -682,20 +683,18 @@ def __get_table_information(self, schema, table): raise error def __log_valid_parameters(self): - ignore_list = ["allowed_events_in_packet", "table_map"] + ignored = ["allowed_events", "table_map"] for parameter, value in self.__dict__.items(): - if not value: - continue if parameter.startswith("_BinLogStreamReader__"): parameter = parameter.replace("_BinLogStreamReader__", "") - if type(value) == bool or type(value) == int: - comment = f"{parameter} is {value}" - elif type(value) == frozenset: + if parameter in ignored or not value: + continue + if type(value) == frozenset: string_list = [str(item).split()[-1][:-2].split('.')[2] for item in value] items = ", ".join(string_list) comment = f"{parameter}: [{items}]" else: - comment = "Set parameter - {}, value - {}".format(parameter, value) + comment = f"{parameter}: {value}" logging.info(comment) def __iter__(self): From 588e9744314f5435d8b1b21d22165a04275988bb Mon Sep 17 00:00:00 2001 From: "cucuridas@gamil.com" <3310223@naver.com> Date: Wed, 13 Sep 2023 03:37:20 +0900 Subject: [PATCH 8/8] merge upstrem/main --- CHANGELOG | 17 +- docs/conf.py | 4 +- pymysqlreplication/constants/BINLOG.py | 3 + pymysqlreplication/packet.py | 3 + pymysqlreplication/row_event.py | 291 ++++++++++++++++++++++++- pymysqlreplication/tests/base.py | 7 + pymysqlreplication/tests/test_basic.py | 183 +++++++++++++++- setup.py | 2 +- 8 files changed, 501 insertions(+), 9 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index ea70c81e..8a0ce31e 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -195,4 +195,19 @@ * Fix release error 0.43.0 23/07/2023 -* Bump PyMySQL to 1.1.0 to solve : LookupError: unknown encoding: utf8mb3 \ No newline at end of file +* Bump PyMySQL to 1.1.0 to solve : LookupError: unknown encoding: utf8mb3 + +0.44.0 12/09/2023 +* Add MariadbAnnotateRowsEvent +* Add RandEvent +* Add MariadbStartEncryptionEvent +* Add RowsQueryLogEvent +* Add MariadbBinLogCheckPointEvent +* Add PreviousGtidsEvent +* Add UserVarEvent +* Fix Bug increase history list length and snapshot : Mysql 8.0 version connection was being created along with the opening of a transaction. This led to a problem with looking at the snapshot before creating the table +* Fix Avoid UnicodeDecodeError for non-utf8 QueryEvents +* Enhance Data Integrity with Binlog Event Checksum Verification +* Fix Bug table map event read null_bitmask packet +* Fix Timestamp conversion to return UTC instead of local timezone +* Optimize Handler_read_rnd by removing ORDER BY clause \ No newline at end of file diff --git a/docs/conf.py b/docs/conf.py index de4e1ded..53a25342 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -48,9 +48,9 @@ # built documents. # # The short X.Y version. -version = '0.43' +version = '0.44' # The full version, including alpha/beta/rc tags. -release = '0.43' +release = '0.44' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/pymysqlreplication/constants/BINLOG.py b/pymysqlreplication/constants/BINLOG.py index a95b28ea..71e5faf4 100644 --- a/pymysqlreplication/constants/BINLOG.py +++ b/pymysqlreplication/constants/BINLOG.py @@ -49,3 +49,6 @@ MARIADB_GTID_EVENT = 0xa2 MARIADB_GTID_GTID_LIST_EVENT = 0xa3 MARIADB_START_ENCRYPTION_EVENT = 0xa4 + +# Common-Footer +BINLOG_CHECKSUM_LEN = 4 \ No newline at end of file diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index 665caebe..5a51670b 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -500,3 +500,6 @@ def read_string(self): string += char return string + + def bytes_to_read(self): + return len(self.packet._data) - self.packet._position \ No newline at end of file diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index fcd138d3..18ff2728 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -6,6 +6,7 @@ import json from pymysql.charset import charset_by_name +from enum import Enum from .event import BinLogEvent from .exceptions import TableMetadataUnavailableError @@ -62,7 +63,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) # ndb information if self.extra_data_type == 0: - self.nbd_info_length, self.nbd_info_format = struct.unpack(' BINLOG.BINLOG_CHECKSUM_LEN: + option_metadata_type = self.packet.read(1)[0] + length = self.packet.read_length_coded_binary() + field_type: MetadataFieldType = MetadataFieldType.by_index(option_metadata_type) + + if field_type == MetadataFieldType.SIGNEDNESS: + signed_column_list = self._convert_include_non_numeric_column( + self._read_bool_list(length, True)) + optional_metadata.unsigned_column_list = signed_column_list + + elif field_type == MetadataFieldType.DEFAULT_CHARSET: + optional_metadata.default_charset_collation, optional_metadata.charset_collation = self._read_default_charset( + length) + optional_metadata.charset_collation_list = self._parsed_column_charset_by_default_charset( + optional_metadata.default_charset_collation, + optional_metadata.charset_collation, + self._is_character_column) + + elif field_type == MetadataFieldType.COLUMN_CHARSET: + optional_metadata.column_charset = self._read_ints(length) + optional_metadata.charset_collation_list = self._parsed_column_charset_by_column_charset( + optional_metadata.column_charset, self._is_character_column) + + elif field_type == MetadataFieldType.COLUMN_NAME: + optional_metadata.column_name_list = self._read_column_names(length) + + elif field_type == MetadataFieldType.SET_STR_VALUE: + optional_metadata.set_str_value_list = self._read_type_values(length) + + elif field_type == MetadataFieldType.ENUM_STR_VALUE: + optional_metadata.set_enum_str_value_list = self._read_type_values(length) + + elif field_type == MetadataFieldType.GEOMETRY_TYPE: + optional_metadata.geometry_type_list = self._read_ints(length) + + elif field_type == MetadataFieldType.SIMPLE_PRIMARY_KEY: + optional_metadata.simple_primary_key_list = self._read_ints(length) + + elif field_type == MetadataFieldType.PRIMARY_KEY_WITH_PREFIX: + optional_metadata.primary_keys_with_prefix = self._read_primary_keys_with_prefix(length) + + elif field_type == MetadataFieldType.ENUM_AND_SET_DEFAULT_CHARSET: + optional_metadata.enum_and_set_default_charset, optional_metadata.enum_and_set_charset_collation = self._read_default_charset( + length) + + optional_metadata.enum_and_set_collation_list = self._parsed_column_charset_by_default_charset( + optional_metadata.enum_and_set_default_charset, + optional_metadata.enum_and_set_charset_collation, + self._is_enum_or_set_column) + + elif field_type == MetadataFieldType.ENUM_AND_SET_COLUMN_CHARSET: + optional_metadata.enum_and_set_default_column_charset_list = self._read_ints(length) + + optional_metadata.enum_and_set_collation_list = self._parsed_column_charset_by_column_charset( + optional_metadata.enum_and_set_default_column_charset_list, self._is_enum_or_set_column) + + elif field_type == MetadataFieldType.VISIBILITY: + optional_metadata.visibility_list = self._read_bool_list(length, False) + + return optional_metadata + + def _convert_include_non_numeric_column(self, signedness_bool_list): + # The incoming order of columns in the packet represents the indices of the numeric columns. + # Thus, it transforms non-numeric columns to align with the sorting. + bool_list = [] + position = 0 + for i in range(self.column_count): + column_type = self.columns[i].type + if self._is_numeric_column(column_type): + if signedness_bool_list[position]: + bool_list.append(True) + else: + bool_list.append(False) + position += 1 + else: + bool_list.append(False) + + return bool_list + + def _parsed_column_charset_by_default_charset(self, default_charset_collation: int, column_charset_collation: dict, + column_type_detect_function): + column_charset = [] + for i in range(self.column_count): + column_type = self.columns[i].type + if not column_type_detect_function(column_type, dbms=self.dbms): + continue + elif i not in column_charset_collation.keys(): + column_charset.append(default_charset_collation) + else: + column_charset.append(column_charset_collation[i]) + + return column_charset + + def _parsed_column_charset_by_column_charset(self, column_charset_list: list, column_type_detect_function): + column_charset = [] + position = 0 + if len(column_charset_list) == 0: + return + for i in range(self.column_count): + column_type = self.columns[i].type + if not column_type_detect_function(column_type, dbms=self.dbms): + continue + else: + column_charset.append(column_charset_list[position]) + position += 1 + + return column_charset + + def _read_bool_list(self, read_byte_length, signedness_flag): + # if signedness_flag true + # The order of the index in the packet is only the index between the numeric_columns. + # Therefore, we need to use numeric_column_count when calculating bits. + bool_list = [] + bytes_data = self.packet.read(read_byte_length) + + byte = 0 + byte_idx = 0 + bit_idx = 0 + + for i in range(self.column_count): + column_type = self.columns[i].type + if not self._is_numeric_column(column_type) and signedness_flag: + continue + if bit_idx == 0: + byte = bytes_data[byte_idx] + byte_idx += 1 + bool_list.append((byte & (0b10000000 >> bit_idx)) != 0) + bit_idx = (bit_idx + 1) % 8 + return bool_list + + def _read_default_charset(self, length): + charset = {} + read_until = self.packet.read_bytes + length + if self.packet.read_bytes >= read_until: + return + default_charset_collation = self.packet.read_length_coded_binary() + while self.packet.read_bytes < read_until: + column_index = self.packet.read_length_coded_binary() + charset_collation = self.packet.read_length_coded_binary() + charset[column_index] = charset_collation + + return default_charset_collation, charset + + def _read_ints(self, length): + result = [] + read_until = self.packet.read_bytes + length + while self.packet.read_bytes < read_until: + result.append(self.packet.read_length_coded_binary()) + return result + + def _read_column_names(self, length): + result = [] + read_until = self.packet.read_bytes + length + while self.packet.read_bytes < read_until: + result.append(self.packet.read_variable_length_string().decode()) + return result + + def _read_type_values(self, length): + result = [] + read_until = self.packet.read_bytes + length + if self.packet.read_bytes >= read_until: + return + while self.packet.read_bytes < read_until: + type_value_list = [] + value_count = self.packet.read_length_coded_binary() + for i in range(value_count): + value = self.packet.read_variable_length_string() + decode_value = "" + try: + decode_value = value.decode() + except UnicodeDecodeError: + # ignore not utf-8 decode type + pass + type_value_list.append(decode_value) + result.append(type_value_list) + return result + + def _read_primary_keys_with_prefix(self, length): + ints = self._read_ints(length) + result = {} + for i in range(0, len(ints), 2): + result[ints[i]] = ints[i + 1] + return result + + @staticmethod + def _is_character_column(column_type, dbms='mysql'): + if column_type in [FIELD_TYPE.STRING, FIELD_TYPE.VAR_STRING, FIELD_TYPE.VARCHAR, FIELD_TYPE.BLOB]: + return True + if column_type == FIELD_TYPE.GEOMETRY and dbms == 'mariadb': + return True + return False + + @staticmethod + def _is_enum_column(column_type): + if column_type == FIELD_TYPE.ENUM: + return True + return False + + @staticmethod + def _is_set_column(column_type): + if column_type == FIELD_TYPE.SET: + return True + return False + + @staticmethod + def _is_enum_or_set_column(column_type, dbms='mysql'): + if column_type in [FIELD_TYPE.ENUM, FIELD_TYPE.SET]: + return True + return False + + @staticmethod + def _is_numeric_column(column_type): + if column_type in [FIELD_TYPE.TINY, FIELD_TYPE.SHORT, FIELD_TYPE.INT24, FIELD_TYPE.LONG, + FIELD_TYPE.LONGLONG, FIELD_TYPE.NEWDECIMAL, FIELD_TYPE.FLOAT, + FIELD_TYPE.DOUBLE, + FIELD_TYPE.YEAR]: + return True + return False + +class MetadataFieldType(Enum): + SIGNEDNESS = 1 # Signedness of numeric columns + DEFAULT_CHARSET = 2 # Charsets of character columns + COLUMN_CHARSET = 3 # Charsets of character columns + COLUMN_NAME = 4 # Names of columns + SET_STR_VALUE = 5 # The string values of SET columns + ENUM_STR_VALUE = 6 # The string values in ENUM columns + GEOMETRY_TYPE = 7 # The real type of geometry columns + SIMPLE_PRIMARY_KEY = 8 # The primary key without any prefix + PRIMARY_KEY_WITH_PREFIX = 9 # The primary key with some prefix + ENUM_AND_SET_DEFAULT_CHARSET = 10 # Charsets of ENUM and SET columns + ENUM_AND_SET_COLUMN_CHARSET = 11 # Charsets of ENUM and SET columns + VISIBILITY = 12 + UNKNOWN_METADATA_FIELD_TYPE = 128 + + @staticmethod + def by_index(index): + return MetadataFieldType(index) diff --git a/pymysqlreplication/tests/base.py b/pymysqlreplication/tests/base.py index 301ee3e9..88acda33 100644 --- a/pymysqlreplication/tests/base.py +++ b/pymysqlreplication/tests/base.py @@ -63,6 +63,13 @@ def isMySQL80AndMore(self): version = float(self.getMySQLVersion().rsplit('.', 1)[0]) return version >= 8.0 + def isMySQL8014AndMore(self): + version = float(self.getMySQLVersion().rsplit(".", 1)[0]) + version_detail = int(self.getMySQLVersion().rsplit(".", 1)[1]) + if version > 8.0: + return True + return version == 8.0 and version_detail >= 14 + def isMariaDB(self): if self.__is_mariaDB is None: self.__is_mariaDB = "MariaDB" in self.execute("SELECT VERSION()").fetchone()[0] diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index f03b5663..c07b84c3 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -21,9 +21,11 @@ from pymysqlreplication.packet import BinLogPacketWrapper from pymysql.protocol import MysqlPacket -__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", - "TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader", "TestStatementConnectionSetting", - "TestRowsQueryLogEvents"] +__all__ = [ + "TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", + "TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader", "TestStatementConnectionSetting", + "TestRowsQueryLogEvents", "TestOptionalMetaData" +] class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase): @@ -1444,6 +1446,181 @@ def test_query_event_latin1(self): assert event.query == r"CREATE TABLE test_latin1_\xd6\xc6\xdb (a INT)" +class TestOptionalMetaData(base.PyMySQLReplicationTestCase): + def setUp(self): + super(TestOptionalMetaData, self).setUp() + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + only_events=(TableMapEvent,), + fail_on_table_metadata_unavailable=True + ) + if not self.isMySQL8014AndMore(): + self.skipTest("Mysql version is under 8.0.14 - pass TestOptionalMetaData") + self.execute("SET GLOBAL binlog_row_metadata='FULL';") + + def test_signedness(self): + create_query = "CREATE TABLE test_signedness (col1 INT, col2 INT UNSIGNED);" + insert_query = "INSERT INTO test_signedness VALUES (-10, 10);" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + self.assertEqual(event.optional_metadata.unsigned_column_list, [False, True]) + + def test_default_charset(self): + create_query = "CREATE TABLE test_default_charset (name VARCHAR(50)) CHARACTER SET utf8mb4;" + insert_query = "INSERT INTO test_default_charset VALUES ('Hello, World!');" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + if self.isMariaDB(): + self.assertEqual(event.optional_metadata.default_charset_collation, 45) + else: + self.assertEqual(event.optional_metadata.default_charset_collation, 255) + + def test_column_charset(self): + create_query = "CREATE TABLE test_column_charset (col1 VARCHAR(50), col2 VARCHAR(50) CHARACTER SET binary, col3 VARCHAR(50) CHARACTER SET latin1);" + insert_query = "INSERT INTO test_column_charset VALUES ('python', 'mysql', 'replication');" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + if self.isMariaDB(): + self.assertEqual(event.optional_metadata.column_charset, [45, 63, 8]) + else: + self.assertEqual(event.optional_metadata.column_charset, [255, 63, 8]) + + def test_column_name(self): + create_query = "CREATE TABLE test_column_name (col_int INT, col_varchar VARCHAR(30), col_bool BOOL);" + insert_query = "INSERT INTO test_column_name VALUES (1, 'Hello', true);" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + self.assertEqual(event.optional_metadata.column_name_list, ['col_int', 'col_varchar', 'col_bool']) + + def test_set_str_value(self): + create_query = "CREATE TABLE test_set_str_value (skills SET('Programming', 'Writing', 'Design'));" + insert_query = "INSERT INTO test_set_str_value VALUES ('Programming,Writing');" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + self.assertEqual(event.optional_metadata.set_str_value_list, [['Programming', 'Writing', 'Design']]) + + def test_enum_str_value(self): + create_query = "CREATE TABLE test_enum_str_value (pet ENUM('Dog', 'Cat'));" + insert_query = "INSERT INTO test_enum_str_value VALUES ('Cat');" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + self.assertEqual(event.optional_metadata.set_enum_str_value_list, [['Dog', 'Cat']]) + + def test_geometry_type(self): + create_query = "CREATE TABLE test_geometry_type (location POINT);" + insert_query = "INSERT INTO test_geometry_type VALUES (Point(37.123, 125.987));" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + self.assertEqual(event.optional_metadata.geometry_type_list, [1]) + + def test_simple_primary_key(self): + create_query = "CREATE TABLE test_simple_primary_key (c_key1 INT, c_key2 INT, c_not_key INT, PRIMARY KEY(c_key1, c_key2));" + insert_query = "INSERT INTO test_simple_primary_key VALUES (1, 2, 3);" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + self.assertEqual(event.optional_metadata.simple_primary_key_list, [0, 1]) + + def test_primary_key_with_prefix(self): + create_query = "CREATE TABLE test_primary_key_with_prefix (c_key1 CHAR(100), c_key2 CHAR(10), c_not_key INT, c_key3 CHAR(100), PRIMARY KEY(c_key1(5), c_key2, c_key3(10)));" + insert_query = "INSERT INTO test_primary_key_with_prefix VALUES('1', '2', 3, '4');" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + self.assertEqual(event.optional_metadata.primary_keys_with_prefix, {0: 5, 1: 0, 3: 10}) + + def test_enum_and_set_default_charset(self): + create_query = "CREATE TABLE test_enum_and_set_default_charset (pet ENUM('Dog', 'Cat'), skills SET('Programming', 'Writing', 'Design')) CHARACTER SET utf8mb4;" + insert_query = "INSERT INTO test_enum_and_set_default_charset VALUES('Dog', 'Design');" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + if self.isMariaDB(): + self.assertEqual(event.optional_metadata.enum_and_set_collation_list, [45, 45]) + else: + self.assertEqual(event.optional_metadata.enum_and_set_collation_list, [255, 255]) + + def test_enum_and_set_column_charset(self): + create_query = "CREATE TABLE test_enum_and_set_column_charset (pet ENUM('Dog', 'Cat') CHARACTER SET utf8mb4, number SET('00', '01', '10', '11') CHARACTER SET binary);" + insert_query = "INSERT INTO test_enum_and_set_column_charset VALUES('Cat', '10');" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + if self.isMariaDB(): + self.assertEqual(event.optional_metadata.enum_and_set_collation_list, [45, 63]) + else: + self.assertEqual(event.optional_metadata.enum_and_set_collation_list, [255, 63]) + + def test_visibility(self): + create_query = "CREATE TABLE test_visibility (name VARCHAR(50), secret_key VARCHAR(50) DEFAULT 'qwerty' INVISIBLE);" + insert_query = "INSERT INTO test_visibility VALUES('Audrey');" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + if not self.isMariaDB(): + self.assertEqual(event.optional_metadata.visibility_list, [True, False]) + + def tearDown(self): + self.execute("SET GLOBAL binlog_row_metadata='MINIMAL';") + super(TestOptionalMetaData, self).tearDown() + if __name__ == "__main__": import unittest unittest.main() diff --git a/setup.py b/setup.py index 31cda19b..8d2cc77b 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ def run(self): unittest.main(tests, argv=sys.argv[:1]) -version = "0.43.0" +version = "0.44.0" this_directory = Path(__file__).parent long_description = (this_directory / "README.md").read_text()