Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/optional meta data extraction #471

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 13 additions & 0 deletions pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ def __connect_to_ctl(self):
self._ctl_connection_settings["autocommit"] = True
self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings)
self._ctl_connection._get_table_information = self.__get_table_information
self._ctl_connection._get_dbms = self.__get_dbms
self.__connected_ctl = True

def __checksum_enabled(self):
Expand Down Expand Up @@ -674,5 +675,17 @@ def __get_table_information(self, schema, table):
else:
raise error

def __get_dbms(self):
if not self.__connected_ctl:
self.__connect_to_ctl()

cur = self._ctl_connection.cursor()
cur.execute("SELECT VERSION();")
version_info = cur.fetchone().get('VERSION()', '')

if 'MariaDB' in version_info:
return 'mariadb'
return 'mysql'

def __iter__(self):
return iter(self.fetchone, None)
3 changes: 3 additions & 0 deletions pymysqlreplication/constants/BINLOG.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,6 @@
MARIADB_GTID_EVENT = 0xa2
MARIADB_GTID_GTID_LIST_EVENT = 0xa3
MARIADB_START_ENCRYPTION_EVENT = 0xa4

# Common-Footer
BINLOG_CHECKSUM_LEN = 4
3 changes: 3 additions & 0 deletions pymysqlreplication/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,3 +500,6 @@ def read_string(self):
string += char

return string

def bytes_to_read(self):
return len(self.packet._data) - self.packet._position
289 changes: 288 additions & 1 deletion pymysqlreplication/row_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json

from pymysql.charset import charset_by_name
from enum import Enum

from .event import BinLogEvent
from .exceptions import TableMetadataUnavailableError
Expand Down Expand Up @@ -552,7 +553,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
super().__init__(from_packet, event_size,
table_map, ctl_connection, **kwargs)
if self._processed:
#Body
# Body
self.columns_present_bitmap = self.packet.read(
(self.number_of_columns + 7) / 8)
self.columns_present_bitmap2 = self.packet.read(
Expand All @@ -577,6 +578,40 @@ def _dump(self):
row["before_values"][key],
row["after_values"][key]))

class OptionalMetaData:
def __init__(self):
self.unsigned_column_list = []
self.default_charset_collation = None
self.charset_collation = {}
self.column_charset = []
self.column_name_list = []
self.set_str_value_list = []
self.set_enum_str_value_list = []
self.geometry_type_list = []
self.simple_primary_key_list = []
self.primary_keys_with_prefix = {}
self.enum_and_set_default_charset = None
self.enum_and_set_charset_collation = {}
self.enum_and_set_default_column_charset_list = []
self.charset_collation_list = []
self.enum_and_set_collation_list = []
self.visibility_list = []

def dump(self):
print("=== %s ===" % self.__class__.__name__)
print("unsigned_column_list: %s" % self.unsigned_column_list)
print("default_charset_collation: %s" % self.default_charset_collation)
print("charset_collation: %s" % self.charset_collation)
print("column_charset: %s" % self.column_charset)
print("column_name_list: %s" % self.column_name_list)
print("set_str_value_list : %s" % self.set_str_value_list)
print("set_enum_str_value_list : %s" % self.set_enum_str_value_list)
print("geometry_type_list : %s" % self.geometry_type_list)
print("simple_primary_key_list: %s" % self.simple_primary_key_list)
print("primary_keys_with_prefix: %s" % self.primary_keys_with_prefix)
print("visibility_list: %s" % self.visibility_list)
print("charset_collation_list: %s" % self.charset_collation_list)
print("enum_and_set_collation_list: %s" % self.enum_and_set_collation_list)

class TableMapEvent(BinLogEvent):
"""This event describes the structure of a table.
Expand Down Expand Up @@ -633,6 +668,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
else:
self.column_schemas = self._ctl_connection._get_table_information(self.schema, self.table)

self.dbms = self._ctl_connection._get_dbms()
ordinal_pos_loc = 0

if self.column_count != 0:
Expand Down Expand Up @@ -675,6 +711,8 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
# ith column is nullable if (i - 1)th bit is set to True, not nullable otherwise
## Refer to definition of and call to row.event._is_null() to interpret bitmap corresponding to columns
self.null_bitmask = self.packet.read((self.column_count + 7) / 8)
# optional meta Data
self.optional_metadata = self._get_optional_meta_data()

def get_table(self):
return self.table_obj
Expand All @@ -685,3 +723,252 @@ def _dump(self):
print("Schema: %s" % (self.schema))
print("Table: %s" % (self.table))
print("Columns: %s" % (self.column_count))
self.optional_metadata.dump()

def _get_optional_meta_data(self):
"""
DEFAULT_CHARSET and COLUMN_CHARSET don't appear together,
and ENUM_AND_SET_DEFAULT_CHARSET and ENUM_AND_SET_COLUMN_CHARSET don't appear together.
They are just alternative ways to pack character set information.
When binlogging, it logs character sets in the way that occupies least storage.

TLV format data (TYPE, LENGTH, VALUE)
"""
optional_metadata = OptionalMetaData()
while self.packet.bytes_to_read() > BINLOG.BINLOG_CHECKSUM_LEN:
option_metadata_type = self.packet.read(1)[0]
length = self.packet.read_length_coded_binary()
field_type: MetadataFieldType = MetadataFieldType.by_index(option_metadata_type)

if field_type == MetadataFieldType.SIGNEDNESS:
signed_column_list = self._convert_include_non_numeric_column(
self._read_bool_list(length, True))
optional_metadata.unsigned_column_list = signed_column_list

elif field_type == MetadataFieldType.DEFAULT_CHARSET:
optional_metadata.default_charset_collation, optional_metadata.charset_collation = self._read_default_charset(
length)
optional_metadata.charset_collation_list = self._parsed_column_charset_by_default_charset(
optional_metadata.default_charset_collation,
optional_metadata.charset_collation,
self._is_character_column)

elif field_type == MetadataFieldType.COLUMN_CHARSET:
optional_metadata.column_charset = self._read_ints(length)
optional_metadata.charset_collation_list = self._parsed_column_charset_by_column_charset(
optional_metadata.column_charset, self._is_character_column)

elif field_type == MetadataFieldType.COLUMN_NAME:
optional_metadata.column_name_list = self._read_column_names(length)

elif field_type == MetadataFieldType.SET_STR_VALUE:
optional_metadata.set_str_value_list = self._read_type_values(length)

elif field_type == MetadataFieldType.ENUM_STR_VALUE:
optional_metadata.set_enum_str_value_list = self._read_type_values(length)

elif field_type == MetadataFieldType.GEOMETRY_TYPE:
optional_metadata.geometry_type_list = self._read_ints(length)

elif field_type == MetadataFieldType.SIMPLE_PRIMARY_KEY:
optional_metadata.simple_primary_key_list = self._read_ints(length)

elif field_type == MetadataFieldType.PRIMARY_KEY_WITH_PREFIX:
optional_metadata.primary_keys_with_prefix = self._read_primary_keys_with_prefix(length)

elif field_type == MetadataFieldType.ENUM_AND_SET_DEFAULT_CHARSET:
optional_metadata.enum_and_set_default_charset, optional_metadata.enum_and_set_charset_collation = self._read_default_charset(
length)

optional_metadata.enum_and_set_collation_list = self._parsed_column_charset_by_default_charset(
optional_metadata.enum_and_set_default_charset,
optional_metadata.enum_and_set_charset_collation,
self._is_enum_or_set_column)

elif field_type == MetadataFieldType.ENUM_AND_SET_COLUMN_CHARSET:
optional_metadata.enum_and_set_default_column_charset_list = self._read_ints(length)

optional_metadata.enum_and_set_collation_list = self._parsed_column_charset_by_column_charset(
optional_metadata.enum_and_set_default_column_charset_list, self._is_enum_or_set_column)

elif field_type == MetadataFieldType.VISIBILITY:
optional_metadata.visibility_list = self._read_bool_list(length, False)

return optional_metadata

def _convert_include_non_numeric_column(self, signedness_bool_list):
# The incoming order of columns in the packet represents the indices of the numeric columns.
# Thus, it transforms non-numeric columns to align with the sorting.
bool_list = []
position = 0
for i in range(self.column_count):
column_type = self.columns[i].type
if self._is_numeric_column(column_type):
if signedness_bool_list[position]:
bool_list.append(True)
else:
bool_list.append(False)
position += 1
else:
bool_list.append(False)

return bool_list

def _parsed_column_charset_by_default_charset(self, default_charset_collation: int, column_charset_collation: dict,
column_type_detect_function):
column_charset = []
for i in range(self.column_count):
column_type = self.columns[i].type
if not column_type_detect_function(column_type, dbms=self.dbms):
continue
elif i not in column_charset_collation.keys():
column_charset.append(default_charset_collation)
else:
column_charset.append(column_charset_collation[i])

return column_charset

def _parsed_column_charset_by_column_charset(self, column_charset_list: list, column_type_detect_function):
column_charset = []
position = 0
if len(column_charset_list) == 0:
return
for i in range(self.column_count):
column_type = self.columns[i].type
if not column_type_detect_function(column_type, dbms=self.dbms):
continue
else:
column_charset.append(column_charset_list[position])
position += 1

return column_charset

def _read_bool_list(self, read_byte_length, signedness_flag):
# if signedness_flag true
# The order of the index in the packet is only the index between the numeric_columns.
# Therefore, we need to use numeric_column_count when calculating bits.
bool_list = []
bytes_data = self.packet.read(read_byte_length)

byte = 0
byte_idx = 0
bit_idx = 0

for i in range(self.column_count):
column_type = self.columns[i].type
if not self._is_numeric_column(column_type) and signedness_flag:
continue
if bit_idx == 0:
byte = bytes_data[byte_idx]
byte_idx += 1
bool_list.append((byte & (0b10000000 >> bit_idx)) != 0)
bit_idx = (bit_idx + 1) % 8
return bool_list

def _read_default_charset(self, length):
charset = {}
read_until = self.packet.read_bytes + length
if self.packet.read_bytes >= read_until:
return
default_charset_collation = self.packet.read_length_coded_binary()
while self.packet.read_bytes < read_until:
column_index = self.packet.read_length_coded_binary()
charset_collation = self.packet.read_length_coded_binary()
charset[column_index] = charset_collation

return default_charset_collation, charset

def _read_ints(self, length):
result = []
read_until = self.packet.read_bytes + length
while self.packet.read_bytes < read_until:
result.append(self.packet.read_length_coded_binary())
return result

def _read_column_names(self, length):
result = []
read_until = self.packet.read_bytes + length
while self.packet.read_bytes < read_until:
result.append(self.packet.read_variable_length_string().decode())
return result

def _read_type_values(self, length):
result = []
read_until = self.packet.read_bytes + length
if self.packet.read_bytes >= read_until:
return
while self.packet.read_bytes < read_until:
type_value_list = []
value_count = self.packet.read_length_coded_binary()
for i in range(value_count):
value = self.packet.read_variable_length_string()
decode_value = ""
try:
decode_value = value.decode()
except UnicodeDecodeError:
# ignore not utf-8 decode type
pass
type_value_list.append(decode_value)
result.append(type_value_list)
return result

def _read_primary_keys_with_prefix(self, length):
ints = self._read_ints(length)
result = {}
for i in range(0, len(ints), 2):
result[ints[i]] = ints[i + 1]
return result

@staticmethod
def _is_character_column(column_type, dbms='mysql'):
if column_type in [FIELD_TYPE.STRING, FIELD_TYPE.VAR_STRING, FIELD_TYPE.VARCHAR, FIELD_TYPE.BLOB]:
return True
if column_type == FIELD_TYPE.GEOMETRY and dbms == 'mariadb':
return True
return False

@staticmethod
def _is_enum_column(column_type):
if column_type == FIELD_TYPE.ENUM:
return True
return False

@staticmethod
def _is_set_column(column_type):
if column_type == FIELD_TYPE.SET:
return True
return False

@staticmethod
def _is_enum_or_set_column(column_type, dbms='mysql'):
if column_type in [FIELD_TYPE.ENUM, FIELD_TYPE.SET]:
return True
return False

@staticmethod
def _is_numeric_column(column_type):
if column_type in [FIELD_TYPE.TINY, FIELD_TYPE.SHORT, FIELD_TYPE.INT24, FIELD_TYPE.LONG,
FIELD_TYPE.LONGLONG, FIELD_TYPE.NEWDECIMAL, FIELD_TYPE.FLOAT,
FIELD_TYPE.DOUBLE,
FIELD_TYPE.YEAR]:
return True
return False

class MetadataFieldType(Enum):
SIGNEDNESS = 1 # Signedness of numeric columns
DEFAULT_CHARSET = 2 # Charsets of character columns
COLUMN_CHARSET = 3 # Charsets of character columns
COLUMN_NAME = 4 # Names of columns
SET_STR_VALUE = 5 # The string values of SET columns
ENUM_STR_VALUE = 6 # The string values in ENUM columns
GEOMETRY_TYPE = 7 # The real type of geometry columns
SIMPLE_PRIMARY_KEY = 8 # The primary key without any prefix
PRIMARY_KEY_WITH_PREFIX = 9 # The primary key with some prefix
ENUM_AND_SET_DEFAULT_CHARSET = 10 # Charsets of ENUM and SET columns
ENUM_AND_SET_COLUMN_CHARSET = 11 # Charsets of ENUM and SET columns
VISIBILITY = 12
UNKNOWN_METADATA_FIELD_TYPE = 128

@staticmethod
def by_index(index):
return MetadataFieldType(index)
7 changes: 7 additions & 0 deletions pymysqlreplication/tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ def isMySQL80AndMore(self):
version = float(self.getMySQLVersion().rsplit('.', 1)[0])
return version >= 8.0

def isMySQL8014AndMore(self):
version = float(self.getMySQLVersion().rsplit(".", 1)[0])
version_detail = int(self.getMySQLVersion().rsplit(".", 1)[1])
if version > 8.0:
return True
return version == 8.0 and version_detail >= 14

def isMariaDB(self):
if self.__is_mariaDB is None:
self.__is_mariaDB = "MariaDB" in self.execute("SELECT VERSION()").fetchone()[0]
Expand Down