From 0fb1dea7f0c62ce047ef93481e9f095a87e803da Mon Sep 17 00:00:00 2001 From: Duprat Date: Thu, 19 Dec 2013 20:49:48 +0100 Subject: [PATCH 1/5] Create yieldbinlogstream.py --- pymysqlreplication/yieldbinlogstream.py | 295 ++++++++++++++++++++++++ 1 file changed, 295 insertions(+) create mode 100644 pymysqlreplication/yieldbinlogstream.py diff --git a/pymysqlreplication/yieldbinlogstream.py b/pymysqlreplication/yieldbinlogstream.py new file mode 100644 index 00000000..c8b703b1 --- /dev/null +++ b/pymysqlreplication/yieldbinlogstream.py @@ -0,0 +1,295 @@ +# -*- coding: utf-8 -*- + +import pymysql +import pymysql.cursors +import struct + +from pymysql.constants.COMMAND import COM_BINLOG_DUMP +from pymysql.util import int2byte + +from pymysqlreplication.packet import BinLogPacketWrapper +from pymysqlreplication.constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT +from pymysqlreplication.event import NotImplementedEvent, QueryEvent +from pymysqlreplication.row_event import WriteRowsEvent, DeleteRowsEvent, UpdateRowsEvent + +import time + +last_event_statement = (WriteRowsEvent, DeleteRowsEvent, UpdateRowsEvent) + +# have to be in constants.FlagsEvent.py +#------------------------------------------ +# Last event of a statement */ +FLAG_STMT_END_F = 1 << 0 +# Value of the OPTION_NO_FOREIGN_KEY_CHECKS flag in thd->options */ +FLAG_NO_FOREIGN_KEY_CHECKS_F = 1 << 1 +# Value of the OPTION_RELAXED_UNIQUE_CHECKS flag in thd->options */ +FLAG_RELAXED_UNIQUE_CHECKS_F = 1 << 2 +# +# Indicates that rows in this event are complete, that is contain +# values for all columns of the table. +# +FLAG_COMPLETE_ROWS_F = 1 << 3 + +class YieldBinLogStreamReader(object): + """Connect to replication stream and read event + """ + + def __init__(self, connection_settings={}, resume_stream=False, + blocking=False, only_events=None, server_id=255, + log_file=None, log_pos=None, filter_non_implemented_events=True): + """ + Attributes: + resume_stream: Start for event from position or the latest event of + binlog or from older available event + blocking: Read on stream is blocking + only_events: Array of allowed events + log_file: Set replication start log file + log_pos: Set replication start log pos + """ + self.__connection_settings = connection_settings + self.__connection_settings["charset"] = "utf8" + + self.__connected_stream = False + self.__connected_ctl = False + self.__resume_stream = resume_stream + self.__blocking = blocking + self.__only_events = only_events + self.__filter_non_implemented_events = filter_non_implemented_events + self.__server_id = server_id + self.__use_checksum = False + + #Store table meta information + self.table_map = {} + # added by YD - 2013/12/12 + #----------------------- + self.dict_table_id = {} + + self.log_pos = log_pos + self.log_file = log_file + + def close(self): + if self.__connected_stream: + self._stream_connection.close() + self.__connected_stream = False + if self.__connected_ctl: + self._ctl_connection.close() + self.__connected_ctl = False + + def __connect_to_ctl(self): + self._ctl_connection_settings = dict(self.__connection_settings) + self._ctl_connection_settings["db"] = "information_schema" + self._ctl_connection_settings["cursorclass"] = \ + pymysql.cursors.DictCursor + self._ctl_connection = pymysql.connect(**self._ctl_connection_settings) + self._ctl_connection._get_table_information = self.__get_table_information + self.__connected_ctl = True + + def __checksum_enabled(self): + '''Return True if binlog-checksum = CRC32. Only for MySQL > 5.6 ''' + cur = self._stream_connection.cursor() + cur.execute("SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'") + result = cur.fetchone() + cur.close() + + if result is None: + return False + var, value = result[:2] + if value == 'NONE': + return False + return True + + def __connect_to_stream(self): + # log_pos (4) -- position in the binlog-file to start the stream with + # flags (2) BINLOG_DUMP_NON_BLOCK (0 or 1) + # server_id (4) -- server id of this slave + # log_file (string.EOF) -- filename of the binlog on the master + self._stream_connection = pymysql.connect(**self.__connection_settings) + + self.__use_checksum = self.__checksum_enabled() + + #If cheksum is enabled we need to inform the server about the that we support it + if self.__use_checksum: + cur = self._stream_connection.cursor() + cur.execute("set @master_binlog_checksum= @@global.binlog_checksum") + cur.close() + + # only when log_file and log_pos both provided, the position info is + # valid, if not, get the current position from master + if self.log_file is None or self.log_pos is None: + cur = self._stream_connection.cursor() + cur.execute("SHOW MASTER STATUS") + self.log_file, self.log_pos = cur.fetchone()[:2] + cur.close() + + prelude = struct.pack(' Date: Sun, 5 Jan 2014 20:49:35 +0100 Subject: [PATCH 2/5] Merge yieldbinlogstream.py into binlogstream.py --- pymysqlreplication/binlogstream.py | 61 ++++- pymysqlreplication/yieldbinlogstream.py | 295 ------------------------ 2 files changed, 56 insertions(+), 300 deletions(-) delete mode 100644 pymysqlreplication/yieldbinlogstream.py diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index ecc02dce..af918341 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -11,9 +11,21 @@ from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT from .event import NotImplementedEvent - MYSQL_EXPECTED_ERROR_CODES = [2013, 2006] #2013 Connection Lost #2006 MySQL server has gone away +# have to be in constants.FlagsEvent.py +#------------------------------------------ +# Last event of a statement */ +FLAG_STMT_END_F = 1 << 0 +# Value of the OPTION_NO_FOREIGN_KEY_CHECKS flag in thd->options */ +FLAG_NO_FOREIGN_KEY_CHECKS_F = 1 << 1 +# Value of the OPTION_RELAXED_UNIQUE_CHECKS flag in thd->options */ +FLAG_RELAXED_UNIQUE_CHECKS_F = 1 << 2 +# +# Indicates that rows in this event are complete, that is contain +# values for all columns of the table. +# +FLAG_COMPLETE_ROWS_F = 1 << 3 class BinLogStreamReader(object): """Connect to replication stream and read event @@ -45,6 +57,7 @@ def __init__(self, connection_settings={}, resume_stream=False, #Store table meta information self.table_map = {} + self.dict_table_id = {} self.log_pos = log_pos self.log_file = log_file @@ -154,9 +167,9 @@ def fetchone(self): self._ctl_connection, self.__use_checksum) if binlog_event.event_type == TABLE_MAP_EVENT: - self.table_map[binlog_event.event.table_id] = \ - binlog_event.event.get_table() - + if binlog_event.event.table_id not in self.table_map: + self.table_map[binlog_event.event.table_id] = binlog_event.event.get_table() + self.__manage_table_map(binlog_event.event.table_id) if binlog_event.event_type == ROTATE_EVENT: self.log_pos = binlog_event.event.position self.log_file = binlog_event.event.next_binlog @@ -166,7 +179,7 @@ def fetchone(self): # wrong table schema. # The fix is to rely on the fact that MySQL will also rotate to a new binlog file every time it # restarts. That means every rotation we see *could* be a sign of restart and so potentially - # invalidates all our cached table id to schema mappings. This means we have to load them all + # invalidates all our cached table id to schema mappings. This means we have to load them all # again for each logfile which is potentially wasted effort but we can't really do much better # without being broken in restart case self.table_map = {} @@ -215,5 +228,43 @@ def __get_table_information(self, schema, table): else: raise error + def __manage_if_last_event_of_statement(self, event): + """ + looking for flags to FLAG_STMT_END_F + if event is the last event of a statement + """ + for row_event in last_event_statement: + if isinstance(event, row_event): + if event.flags & FLAG_STMT_END_F : + key = "%s.%s" % (self.table_map[event.table_id].schema,self.table_map[event.table_id].table) + + # key exists ? + if key in self.dict_table_id: + # keep the last one + del self.dict_table_id[key] + + # id exists ? + if event.table_id in self.table_map: + # del it + del self.table_map[event.table_id] + break + + def __manage_table_map(self, new_table_id): + """ + # added by YD - 2013/12/12 + Looking for a duplicate entry in self.table_map (same schema.table with old table_id) + from the new_table_id + """ + key = "%s.%s" % (self.table_map[new_table_id].schema,self.table_map[new_table_id].table) + + # key exists ? + if key in self.dict_table_id: + # get old entry + if self.dict_table_id[key] in self.table_map: + # del it + del self.table_map[self.dict_table_id[key]] + # keep the last one + self.dict_table_id[key] = new_table_id + def __iter__(self): return iter(self.fetchone, None) diff --git a/pymysqlreplication/yieldbinlogstream.py b/pymysqlreplication/yieldbinlogstream.py deleted file mode 100644 index c8b703b1..00000000 --- a/pymysqlreplication/yieldbinlogstream.py +++ /dev/null @@ -1,295 +0,0 @@ -# -*- coding: utf-8 -*- - -import pymysql -import pymysql.cursors -import struct - -from pymysql.constants.COMMAND import COM_BINLOG_DUMP -from pymysql.util import int2byte - -from pymysqlreplication.packet import BinLogPacketWrapper -from pymysqlreplication.constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT -from pymysqlreplication.event import NotImplementedEvent, QueryEvent -from pymysqlreplication.row_event import WriteRowsEvent, DeleteRowsEvent, UpdateRowsEvent - -import time - -last_event_statement = (WriteRowsEvent, DeleteRowsEvent, UpdateRowsEvent) - -# have to be in constants.FlagsEvent.py -#------------------------------------------ -# Last event of a statement */ -FLAG_STMT_END_F = 1 << 0 -# Value of the OPTION_NO_FOREIGN_KEY_CHECKS flag in thd->options */ -FLAG_NO_FOREIGN_KEY_CHECKS_F = 1 << 1 -# Value of the OPTION_RELAXED_UNIQUE_CHECKS flag in thd->options */ -FLAG_RELAXED_UNIQUE_CHECKS_F = 1 << 2 -# -# Indicates that rows in this event are complete, that is contain -# values for all columns of the table. -# -FLAG_COMPLETE_ROWS_F = 1 << 3 - -class YieldBinLogStreamReader(object): - """Connect to replication stream and read event - """ - - def __init__(self, connection_settings={}, resume_stream=False, - blocking=False, only_events=None, server_id=255, - log_file=None, log_pos=None, filter_non_implemented_events=True): - """ - Attributes: - resume_stream: Start for event from position or the latest event of - binlog or from older available event - blocking: Read on stream is blocking - only_events: Array of allowed events - log_file: Set replication start log file - log_pos: Set replication start log pos - """ - self.__connection_settings = connection_settings - self.__connection_settings["charset"] = "utf8" - - self.__connected_stream = False - self.__connected_ctl = False - self.__resume_stream = resume_stream - self.__blocking = blocking - self.__only_events = only_events - self.__filter_non_implemented_events = filter_non_implemented_events - self.__server_id = server_id - self.__use_checksum = False - - #Store table meta information - self.table_map = {} - # added by YD - 2013/12/12 - #----------------------- - self.dict_table_id = {} - - self.log_pos = log_pos - self.log_file = log_file - - def close(self): - if self.__connected_stream: - self._stream_connection.close() - self.__connected_stream = False - if self.__connected_ctl: - self._ctl_connection.close() - self.__connected_ctl = False - - def __connect_to_ctl(self): - self._ctl_connection_settings = dict(self.__connection_settings) - self._ctl_connection_settings["db"] = "information_schema" - self._ctl_connection_settings["cursorclass"] = \ - pymysql.cursors.DictCursor - self._ctl_connection = pymysql.connect(**self._ctl_connection_settings) - self._ctl_connection._get_table_information = self.__get_table_information - self.__connected_ctl = True - - def __checksum_enabled(self): - '''Return True if binlog-checksum = CRC32. Only for MySQL > 5.6 ''' - cur = self._stream_connection.cursor() - cur.execute("SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'") - result = cur.fetchone() - cur.close() - - if result is None: - return False - var, value = result[:2] - if value == 'NONE': - return False - return True - - def __connect_to_stream(self): - # log_pos (4) -- position in the binlog-file to start the stream with - # flags (2) BINLOG_DUMP_NON_BLOCK (0 or 1) - # server_id (4) -- server id of this slave - # log_file (string.EOF) -- filename of the binlog on the master - self._stream_connection = pymysql.connect(**self.__connection_settings) - - self.__use_checksum = self.__checksum_enabled() - - #If cheksum is enabled we need to inform the server about the that we support it - if self.__use_checksum: - cur = self._stream_connection.cursor() - cur.execute("set @master_binlog_checksum= @@global.binlog_checksum") - cur.close() - - # only when log_file and log_pos both provided, the position info is - # valid, if not, get the current position from master - if self.log_file is None or self.log_pos is None: - cur = self._stream_connection.cursor() - cur.execute("SHOW MASTER STATUS") - self.log_file, self.log_pos = cur.fetchone()[:2] - cur.close() - - prelude = struct.pack(' Date: Sun, 5 Jan 2014 20:57:54 +0100 Subject: [PATCH 3/5] Move constant to another file --- pymysqlreplication/binlogstream.py | 15 +-------------- pymysqlreplication/constants/FLAG.py | 11 +++++++++++ 2 files changed, 12 insertions(+), 14 deletions(-) create mode 100644 pymysqlreplication/constants/FLAG.py diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index af918341..096bed8a 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -9,23 +9,11 @@ from .packet import BinLogPacketWrapper from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT +from .constants.FLAG import FLAG_STMT_END_F, FLAG_NO_FOREIGN_KEY_CHECKS_F, FLAG_RELAXED_UNIQUE_CHECKS_F, FLAG_COMPLETE_ROWS_F from .event import NotImplementedEvent MYSQL_EXPECTED_ERROR_CODES = [2013, 2006] #2013 Connection Lost #2006 MySQL server has gone away -# have to be in constants.FlagsEvent.py -#------------------------------------------ -# Last event of a statement */ -FLAG_STMT_END_F = 1 << 0 -# Value of the OPTION_NO_FOREIGN_KEY_CHECKS flag in thd->options */ -FLAG_NO_FOREIGN_KEY_CHECKS_F = 1 << 1 -# Value of the OPTION_RELAXED_UNIQUE_CHECKS flag in thd->options */ -FLAG_RELAXED_UNIQUE_CHECKS_F = 1 << 2 -# -# Indicates that rows in this event are complete, that is contain -# values for all columns of the table. -# -FLAG_COMPLETE_ROWS_F = 1 << 3 class BinLogStreamReader(object): """Connect to replication stream and read event @@ -251,7 +239,6 @@ def __manage_if_last_event_of_statement(self, event): def __manage_table_map(self, new_table_id): """ - # added by YD - 2013/12/12 Looking for a duplicate entry in self.table_map (same schema.table with old table_id) from the new_table_id """ diff --git a/pymysqlreplication/constants/FLAG.py b/pymysqlreplication/constants/FLAG.py new file mode 100644 index 00000000..36e9e5f8 --- /dev/null +++ b/pymysqlreplication/constants/FLAG.py @@ -0,0 +1,11 @@ +# Last event of a statement +FLAG_STMT_END_F = 1 << 0 +# Value of the OPTION_NO_FOREIGN_KEY_CHECKS flag in thd->options +FLAG_NO_FOREIGN_KEY_CHECKS_F = 1 << 1 +# Value of the OPTION_RELAXED_UNIQUE_CHECKS flag in thd->options +FLAG_RELAXED_UNIQUE_CHECKS_F = 1 << 2 +# +# Indicates that rows in this event are complete, that is contain +# values for all columns of the table. +# +FLAG_COMPLETE_ROWS_F = 1 << 3 From f0b12f3f2430e76d0f1d90761bf6664b91be1d36 Mon Sep 17 00:00:00 2001 From: Julien Duponchelle Date: Tue, 15 Jul 2014 15:12:53 +0200 Subject: [PATCH 4/5] Reset dict table id also --- pymysqlreplication/binlogstream.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 096bed8a..99a17d72 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -15,6 +15,7 @@ MYSQL_EXPECTED_ERROR_CODES = [2013, 2006] #2013 Connection Lost #2006 MySQL server has gone away + class BinLogStreamReader(object): """Connect to replication stream and read event """ @@ -171,6 +172,7 @@ def fetchone(self): # again for each logfile which is potentially wasted effort but we can't really do much better # without being broken in restart case self.table_map = {} + self.dict_table_id = {} elif binlog_event.log_pos: self.log_pos = binlog_event.log_pos From c8a6732b699cb8af2fbc918308799020ac4313a4 Mon Sep 17 00:00:00 2001 From: Julien Duponchelle Date: Tue, 15 Jul 2014 15:58:19 +0200 Subject: [PATCH 5/5] Manage last event --- pymysqlreplication/binlogstream.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 99a17d72..4bf28cd6 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -176,9 +176,10 @@ def fetchone(self): elif binlog_event.log_pos: self.log_pos = binlog_event.log_pos + self.__manage_if_last_event_of_statement(binlog_event) + if self.__filter_event(binlog_event.event): continue - return binlog_event.event def __filter_event(self, event): @@ -218,7 +219,7 @@ def __get_table_information(self, schema, table): else: raise error - def __manage_if_last_event_of_statement(self, event): + def __manage_if_last_event_of_statement(self, last_event_statement): """ looking for flags to FLAG_STMT_END_F if event is the last event of a statement