diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index ecc02dce..4bf28cd6 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -9,12 +9,13 @@ from .packet import BinLogPacketWrapper from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT +from .constants.FLAG import FLAG_STMT_END_F, FLAG_NO_FOREIGN_KEY_CHECKS_F, FLAG_RELAXED_UNIQUE_CHECKS_F, FLAG_COMPLETE_ROWS_F from .event import NotImplementedEvent - MYSQL_EXPECTED_ERROR_CODES = [2013, 2006] #2013 Connection Lost #2006 MySQL server has gone away + class BinLogStreamReader(object): """Connect to replication stream and read event """ @@ -45,6 +46,7 @@ def __init__(self, connection_settings={}, resume_stream=False, #Store table meta information self.table_map = {} + self.dict_table_id = {} self.log_pos = log_pos self.log_file = log_file @@ -154,9 +156,9 @@ def fetchone(self): self._ctl_connection, self.__use_checksum) if binlog_event.event_type == TABLE_MAP_EVENT: - self.table_map[binlog_event.event.table_id] = \ - binlog_event.event.get_table() - + if binlog_event.event.table_id not in self.table_map: + self.table_map[binlog_event.event.table_id] = binlog_event.event.get_table() + self.__manage_table_map(binlog_event.event.table_id) if binlog_event.event_type == ROTATE_EVENT: self.log_pos = binlog_event.event.position self.log_file = binlog_event.event.next_binlog @@ -166,16 +168,18 @@ def fetchone(self): # wrong table schema. # The fix is to rely on the fact that MySQL will also rotate to a new binlog file every time it # restarts. That means every rotation we see *could* be a sign of restart and so potentially - # invalidates all our cached table id to schema mappings. This means we have to load them all + # invalidates all our cached table id to schema mappings. This means we have to load them all # again for each logfile which is potentially wasted effort but we can't really do much better # without being broken in restart case self.table_map = {} + self.dict_table_id = {} elif binlog_event.log_pos: self.log_pos = binlog_event.log_pos + self.__manage_if_last_event_of_statement(binlog_event) + if self.__filter_event(binlog_event.event): continue - return binlog_event.event def __filter_event(self, event): @@ -215,5 +219,42 @@ def __get_table_information(self, schema, table): else: raise error + def __manage_if_last_event_of_statement(self, last_event_statement): + """ + looking for flags to FLAG_STMT_END_F + if event is the last event of a statement + """ + for row_event in last_event_statement: + if isinstance(event, row_event): + if event.flags & FLAG_STMT_END_F : + key = "%s.%s" % (self.table_map[event.table_id].schema,self.table_map[event.table_id].table) + + # key exists ? + if key in self.dict_table_id: + # keep the last one + del self.dict_table_id[key] + + # id exists ? + if event.table_id in self.table_map: + # del it + del self.table_map[event.table_id] + break + + def __manage_table_map(self, new_table_id): + """ + Looking for a duplicate entry in self.table_map (same schema.table with old table_id) + from the new_table_id + """ + key = "%s.%s" % (self.table_map[new_table_id].schema,self.table_map[new_table_id].table) + + # key exists ? + if key in self.dict_table_id: + # get old entry + if self.dict_table_id[key] in self.table_map: + # del it + del self.table_map[self.dict_table_id[key]] + # keep the last one + self.dict_table_id[key] = new_table_id + def __iter__(self): return iter(self.fetchone, None) diff --git a/pymysqlreplication/constants/FLAG.py b/pymysqlreplication/constants/FLAG.py new file mode 100644 index 00000000..36e9e5f8 --- /dev/null +++ b/pymysqlreplication/constants/FLAG.py @@ -0,0 +1,11 @@ +# Last event of a statement +FLAG_STMT_END_F = 1 << 0 +# Value of the OPTION_NO_FOREIGN_KEY_CHECKS flag in thd->options +FLAG_NO_FOREIGN_KEY_CHECKS_F = 1 << 1 +# Value of the OPTION_RELAXED_UNIQUE_CHECKS flag in thd->options +FLAG_RELAXED_UNIQUE_CHECKS_F = 1 << 2 +# +# Indicates that rows in this event are complete, that is contain +# values for all columns of the table. +# +FLAG_COMPLETE_ROWS_F = 1 << 3