diff --git a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java index 83b6146a..0723d62c 100644 --- a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java +++ b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java @@ -95,18 +95,9 @@ public interface TransactionProcessor public abstract void onEndTransaction(Transaction txn) throws DatabusException; }; - /** Track current table Id as reported in bin logs */ - private long _currTableId = -1; - - /** Track current table name as reported in bin logs */ - private String _currTableName = ""; - /** Transaction object containing the current transaction that is being built from the Binlog **/ private Transaction _transaction = null; - /** Batch of events corresponding to single source (table) in the current transaction **/ - private PerSourceTransaction _perSourceTransaction = null; - /** Track current file number for generating SCN **/ private int _currFileNum; @@ -139,6 +130,10 @@ public interface TransactionProcessor /** Flag to indicate the begining of the txn is seen. Used to indicate **/ private boolean _isBeginTxnSeen = false; + /** Track all the table map events, cleared when the binlog rotated **/ + private final Map _tableMapEvents = new HashMap(); + + /** Shared queue to transfer binlog events from OpenReplicator to ORlistener thread **/ private BlockingQueue _binlogEventQueue = null; public ORListener(String name, @@ -177,43 +172,10 @@ public void onEvents(BinlogEventV4 event) private void processTableMapEvent(TableMapEvent tme) { - String newTableName = tme.getDatabaseName().toString().toLowerCase() + "." + tme.getTableName().toString().toLowerCase(); - long newTableId = tme.getTableId(); - - final boolean areTableNamesEqual = _currTableName.equals(newTableName); - final boolean areTableIdsEqual = (_currTableId == newTableId); - final boolean didTableNameChange = !(areTableNamesEqual && areTableIdsEqual); - final boolean errorTransition = (areTableNamesEqual && !areTableIdsEqual) || (!areTableNamesEqual && areTableIdsEqual); + _tableMapEvents.put(tme.getTableId(), tme); - if (_currTableName.isEmpty() && (_currTableId == -1)) - { - // First TableMapEvent for the transaction. Indicates the first event in the transaction is yet to come - startSource(newTableName, newTableId); - } - else if (didTableNameChange) - { - // Event will come for a new source. Invoke an endSource on currTableName, and a startSource on newTableName - endSource(); - startSource(newTableName, newTableId); - } - else - { - _log.error("Unexpected : TableMap Event obtained :" + tme); - throw new DatabusRuntimeException("Unexpected : TableMap Event obtained :" + - " _currTableName = " + _currTableName + - " _curTableId = " + _currTableId + - " newTableName = " + newTableName + - " newTableId = " + newTableId); - } - - if (errorTransition) - { - throw new DatabusRuntimeException("TableName and TableId should change simultaneously or not" + - " _currTableName = " + _currTableName + - " _curTableId = " + _currTableId + - " newTableName = " + newTableName + - " newTableId = " + newTableId); - } + String newTableName = tme.getDatabaseName().toString().toLowerCase() + "." + tme.getTableName().toString().toLowerCase(); + startSource(newTableName); } private void startXtion(QueryEvent e) @@ -273,57 +235,29 @@ private void rollbackXtion(QueryEvent e) private void reset() { - _currTableName = ""; - _currTableId = -1; - _perSourceTransaction = null; _transaction = null; _currTxnSizeInBytes = 0; } - private void startSource(String newTableName, long newTableId) + private void startSource(String newTableName) { - _currTableName = newTableName; - _currTableId = newTableId; - if (_perSourceTransaction == null) - { - Short srcId = _tableUriToSrcIdMap.get(_currTableName); + Short srcId = _tableUriToSrcIdMap.get(newTableName); - if (null == srcId) - { - throw new DatabusRuntimeException("Could not find a matching logical source for table Uri (" + _currTableName + ")" ); - } - assert(_transaction != null); - _perSourceTransaction = new PerSourceTransaction(srcId); - _transaction.mergePerSourceTransaction(_perSourceTransaction); - } - else - { - throw new DatabusRuntimeException("Seems like a startSource has been received without an endSource for previous source"); - } - } - - private void endSource() - { - if (_perSourceTransaction != null) - { - _perSourceTransaction = null; - } - else + assert (_transaction != null); + if (_transaction.getPerSourceTransaction(srcId) == null) { - throw new DatabusRuntimeException("_perSourceTransaction should not be null in endSource()"); + _transaction.mergePerSourceTransaction(new PerSourceTransaction(srcId)); } } private void deleteRows(DeleteRowsEventV2 dre) { - List lp = dre.getRows(); - frameAvroRecord(dre.getHeader(), lp, DbusOpcode.DELETE); + frameAvroRecord(dre.getTableId(), dre.getHeader(), dre.getRows(), DbusOpcode.DELETE); } private void deleteRows(DeleteRowsEvent dre) { - List lp = dre.getRows(); - frameAvroRecord(dre.getHeader(), lp, DbusOpcode.DELETE); + frameAvroRecord(dre.getTableId(), dre.getHeader(), dre.getRows(), DbusOpcode.DELETE); } private void updateRows(UpdateRowsEvent ure) @@ -335,7 +269,7 @@ private void updateRows(UpdateRowsEvent ure) Row r = pr.getAfter(); lr.add(r); } - frameAvroRecord(ure.getHeader(), lr, DbusOpcode.UPSERT); + frameAvroRecord(ure.getTableId(), ure.getHeader(), lr, DbusOpcode.UPSERT); } private void updateRows(UpdateRowsEventV2 ure) @@ -347,27 +281,29 @@ private void updateRows(UpdateRowsEventV2 ure) Row r = pr.getAfter(); lr.add(r); } - frameAvroRecord(ure.getHeader(), lr, DbusOpcode.UPSERT); + frameAvroRecord(ure.getTableId(), ure.getHeader(), lr, DbusOpcode.UPSERT); } private void insertRows(WriteRowsEvent wre) { - frameAvroRecord(wre.getHeader(), wre.getRows(), DbusOpcode.UPSERT); + frameAvroRecord(wre.getTableId(), wre.getHeader(), wre.getRows(), DbusOpcode.UPSERT); } private void insertRows(WriteRowsEventV2 wre) { - frameAvroRecord(wre.getHeader(), wre.getRows(), DbusOpcode.UPSERT); + frameAvroRecord(wre.getTableId(), wre.getHeader(), wre.getRows(), DbusOpcode.UPSERT); } - private void frameAvroRecord(BinlogEventV4Header bh, List rl, final DbusOpcode doc) + private void frameAvroRecord(long tableId, BinlogEventV4Header bh, List rl, final DbusOpcode doc) { try { final long timestampInNanos = bh.getTimestamp() * 1000000L; final long scn = scn(_currFileNum, (int)bh.getPosition()); final boolean isReplicated = false; - VersionedSchema vs = _schemaRegistryService.fetchLatestVersionedSchemaBySourceName(_tableUriToSrcNameMap.get(_currTableName)); + final TableMapEvent tme = _tableMapEvents.get(tableId); + String tableName = tme.getDatabaseName().toString().toLowerCase() + "." + tme.getTableName().toString().toLowerCase(); + VersionedSchema vs = _schemaRegistryService.fetchLatestVersionedSchemaBySourceName(_tableUriToSrcNameMap.get(tableName)); Schema schema = vs.getSchema(); if ( _log.isDebugEnabled()) @@ -382,7 +318,7 @@ private void frameAvroRecord(BinlogEventV4Header bh, List rl, final DbusOpc List kps = generateKeyPair(cl, schema); DbChangeEntry db = new DbChangeEntry(scn, timestampInNanos, gr, doc, isReplicated, schema, kps); - _perSourceTransaction.mergeDbChangeEntrySet(db); + _transaction.getPerSourceTransaction(_tableUriToSrcIdMap.get(tableName)).mergeDbChangeEntrySet(db); } } catch (NoSuchSchemaException ne) { @@ -734,6 +670,7 @@ else if (event instanceof RotateEvent) _log.info("File Rotated : FileName :" + fileName + ", _binlogFilePrefix :" + _binlogFilePrefix); String fileNumStr = fileName.substring(fileName.lastIndexOf(_binlogFilePrefix) + _binlogFilePrefix.length() + 1); _currFileNum = Integer.parseInt(fileNumStr); + _tableMapEvents.clear(); continue; }