Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,9 @@ public interface TransactionProcessor
public abstract void onEndTransaction(Transaction txn) throws DatabusException;
};

/** Track current table Id as reported in bin logs */
private long _currTableId = -1;

/** Track current table name as reported in bin logs */
private String _currTableName = "";

/** Transaction object containing the current transaction that is being built from the Binlog **/
private Transaction _transaction = null;

/** Batch of events corresponding to single source (table) in the current transaction **/
private PerSourceTransaction _perSourceTransaction = null;

/** Track current file number for generating SCN **/
private int _currFileNum;

Expand Down Expand Up @@ -139,6 +130,10 @@ public interface TransactionProcessor
/** Flag to indicate the begining of the txn is seen. Used to indicate **/
private boolean _isBeginTxnSeen = false;

/** Track all the table map events, cleared when the binlog rotated **/
private final Map<Long, TableMapEvent> _tableMapEvents = new HashMap<Long, TableMapEvent>();

/** Shared queue to transfer binlog events from OpenReplicator to ORlistener thread **/
private BlockingQueue<BinlogEventV4> _binlogEventQueue = null;

public ORListener(String name,
Expand Down Expand Up @@ -177,43 +172,10 @@ public void onEvents(BinlogEventV4 event)

private void processTableMapEvent(TableMapEvent tme)
{
String newTableName = tme.getDatabaseName().toString().toLowerCase() + "." + tme.getTableName().toString().toLowerCase();
long newTableId = tme.getTableId();

final boolean areTableNamesEqual = _currTableName.equals(newTableName);
final boolean areTableIdsEqual = (_currTableId == newTableId);
final boolean didTableNameChange = !(areTableNamesEqual && areTableIdsEqual);
final boolean errorTransition = (areTableNamesEqual && !areTableIdsEqual) || (!areTableNamesEqual && areTableIdsEqual);
_tableMapEvents.put(tme.getTableId(), tme);

if (_currTableName.isEmpty() && (_currTableId == -1))
{
// First TableMapEvent for the transaction. Indicates the first event in the transaction is yet to come
startSource(newTableName, newTableId);
}
else if (didTableNameChange)
{
// Event will come for a new source. Invoke an endSource on currTableName, and a startSource on newTableName
endSource();
startSource(newTableName, newTableId);
}
else
{
_log.error("Unexpected : TableMap Event obtained :" + tme);
throw new DatabusRuntimeException("Unexpected : TableMap Event obtained :" +
" _currTableName = " + _currTableName +
" _curTableId = " + _currTableId +
" newTableName = " + newTableName +
" newTableId = " + newTableId);
}

if (errorTransition)
{
throw new DatabusRuntimeException("TableName and TableId should change simultaneously or not" +
" _currTableName = " + _currTableName +
" _curTableId = " + _currTableId +
" newTableName = " + newTableName +
" newTableId = " + newTableId);
}
String newTableName = tme.getDatabaseName().toString().toLowerCase() + "." + tme.getTableName().toString().toLowerCase();
startSource(newTableName);
}

private void startXtion(QueryEvent e)
Expand Down Expand Up @@ -273,57 +235,29 @@ private void rollbackXtion(QueryEvent e)

private void reset()
{
_currTableName = "";
_currTableId = -1;
_perSourceTransaction = null;
_transaction = null;
_currTxnSizeInBytes = 0;
}

private void startSource(String newTableName, long newTableId)
private void startSource(String newTableName)
{
_currTableName = newTableName;
_currTableId = newTableId;
if (_perSourceTransaction == null)
{
Short srcId = _tableUriToSrcIdMap.get(_currTableName);
Short srcId = _tableUriToSrcIdMap.get(newTableName);

if (null == srcId)
{
throw new DatabusRuntimeException("Could not find a matching logical source for table Uri (" + _currTableName + ")" );
}
assert(_transaction != null);
_perSourceTransaction = new PerSourceTransaction(srcId);
_transaction.mergePerSourceTransaction(_perSourceTransaction);
}
else
{
throw new DatabusRuntimeException("Seems like a startSource has been received without an endSource for previous source");
}
}

private void endSource()
{
if (_perSourceTransaction != null)
{
_perSourceTransaction = null;
}
else
assert (_transaction != null);
if (_transaction.getPerSourceTransaction(srcId) == null)
{
throw new DatabusRuntimeException("_perSourceTransaction should not be null in endSource()");
_transaction.mergePerSourceTransaction(new PerSourceTransaction(srcId));
}
}

private void deleteRows(DeleteRowsEventV2 dre)
{
List<Row> lp = dre.getRows();
frameAvroRecord(dre.getHeader(), lp, DbusOpcode.DELETE);
frameAvroRecord(dre.getTableId(), dre.getHeader(), dre.getRows(), DbusOpcode.DELETE);
}

private void deleteRows(DeleteRowsEvent dre)
{
List<Row> lp = dre.getRows();
frameAvroRecord(dre.getHeader(), lp, DbusOpcode.DELETE);
frameAvroRecord(dre.getTableId(), dre.getHeader(), dre.getRows(), DbusOpcode.DELETE);
}

private void updateRows(UpdateRowsEvent ure)
Expand All @@ -335,7 +269,7 @@ private void updateRows(UpdateRowsEvent ure)
Row r = pr.getAfter();
lr.add(r);
}
frameAvroRecord(ure.getHeader(), lr, DbusOpcode.UPSERT);
frameAvroRecord(ure.getTableId(), ure.getHeader(), lr, DbusOpcode.UPSERT);
}

private void updateRows(UpdateRowsEventV2 ure)
Expand All @@ -347,27 +281,29 @@ private void updateRows(UpdateRowsEventV2 ure)
Row r = pr.getAfter();
lr.add(r);
}
frameAvroRecord(ure.getHeader(), lr, DbusOpcode.UPSERT);
frameAvroRecord(ure.getTableId(), ure.getHeader(), lr, DbusOpcode.UPSERT);
}

private void insertRows(WriteRowsEvent wre)
{
frameAvroRecord(wre.getHeader(), wre.getRows(), DbusOpcode.UPSERT);
frameAvroRecord(wre.getTableId(), wre.getHeader(), wre.getRows(), DbusOpcode.UPSERT);
}

private void insertRows(WriteRowsEventV2 wre)
{
frameAvroRecord(wre.getHeader(), wre.getRows(), DbusOpcode.UPSERT);
frameAvroRecord(wre.getTableId(), wre.getHeader(), wre.getRows(), DbusOpcode.UPSERT);
}

private void frameAvroRecord(BinlogEventV4Header bh, List<Row> rl, final DbusOpcode doc)
private void frameAvroRecord(long tableId, BinlogEventV4Header bh, List<Row> rl, final DbusOpcode doc)
{
try
{
final long timestampInNanos = bh.getTimestamp() * 1000000L;
final long scn = scn(_currFileNum, (int)bh.getPosition());
final boolean isReplicated = false;
VersionedSchema vs = _schemaRegistryService.fetchLatestVersionedSchemaBySourceName(_tableUriToSrcNameMap.get(_currTableName));
final TableMapEvent tme = _tableMapEvents.get(tableId);
String tableName = tme.getDatabaseName().toString().toLowerCase() + "." + tme.getTableName().toString().toLowerCase();
VersionedSchema vs = _schemaRegistryService.fetchLatestVersionedSchemaBySourceName(_tableUriToSrcNameMap.get(tableName));
Schema schema = vs.getSchema();

if ( _log.isDebugEnabled())
Expand All @@ -382,7 +318,7 @@ private void frameAvroRecord(BinlogEventV4Header bh, List<Row> rl, final DbusOpc
List<KeyPair> kps = generateKeyPair(cl, schema);

DbChangeEntry db = new DbChangeEntry(scn, timestampInNanos, gr, doc, isReplicated, schema, kps);
_perSourceTransaction.mergeDbChangeEntrySet(db);
_transaction.getPerSourceTransaction(_tableUriToSrcIdMap.get(tableName)).mergeDbChangeEntrySet(db);
}
} catch (NoSuchSchemaException ne)
{
Expand Down Expand Up @@ -734,6 +670,7 @@ else if (event instanceof RotateEvent)
_log.info("File Rotated : FileName :" + fileName + ", _binlogFilePrefix :" + _binlogFilePrefix);
String fileNumStr = fileName.substring(fileName.lastIndexOf(_binlogFilePrefix) + _binlogFilePrefix.length() + 1);
_currFileNum = Integer.parseInt(fileNumStr);
_tableMapEvents.clear();
continue;
}

Expand Down