Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
Expand All @@ -23,6 +26,7 @@
import com.google.code.or.binlog.impl.event.AbstractBinlogEventV4;
import com.google.code.or.binlog.impl.event.DeleteRowsEvent;
import com.google.code.or.binlog.impl.event.DeleteRowsEventV2;
import com.google.code.or.binlog.impl.event.FormatDescriptionEvent;
import com.google.code.or.binlog.impl.event.QueryEvent;
import com.google.code.or.binlog.impl.event.RotateEvent;
import com.google.code.or.binlog.impl.event.TableMapEvent;
Expand Down Expand Up @@ -54,6 +58,7 @@
import com.google.code.or.common.glossary.column.TinyColumn;
import com.google.code.or.common.glossary.column.YearColumn;
import com.linkedin.databus.core.DatabusRuntimeException;
import com.linkedin.databus.core.DatabusThreadBase;
import com.linkedin.databus.core.DbusOpcode;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.producers.ds.DbChangeEntry;
Expand All @@ -74,7 +79,7 @@
* This class is responsible for converting Bin log events to Avro records using schemaRegistry and calling an application callback
* to let the application generate DbusEvent and append to EventBuffer.
*/
class ORListener implements BinlogEventListener
class ORListener extends DatabusThreadBase implements BinlogEventListener
{
/**
* Application Callback from this class to process one transaction
Expand Down Expand Up @@ -134,143 +139,40 @@ public interface TransactionProcessor
/** Flag to indicate the begining of the txn is seen. Used to indicate **/
private boolean _isBeginTxnSeen = false;

public ORListener(int currentFileNumber,
private BlockingQueue<BinlogEventV4> binlogEventQueue = null;

public ORListener(String name,
int currentFileNumber,
Logger log,
String binlogFilePrefix,
TransactionProcessor txnProcessor,
Map<String, Short> tableUriToSrcIdMap,
Map<String, String> tableUriToSrcNameMap,
SchemaRegistryService schemaRegistryService)
SchemaRegistryService schemaRegistryService,
int maxQueueSize)
{
super("ORListener_" + name);
_log = log;
_txnProcessor = txnProcessor;
_binlogFilePrefix = binlogFilePrefix;
_tableUriToSrcIdMap = tableUriToSrcIdMap;
_tableUriToSrcNameMap = tableUriToSrcNameMap;
_schemaRegistryService = schemaRegistryService;
_currFileNum = currentFileNumber;
binlogEventQueue = new LinkedBlockingQueue<BinlogEventV4>(maxQueueSize);
}

@Override
public void onEvents(BinlogEventV4 event)
{
if ( event == null)
{
_log.error("Received null event");
return;
}

// Beginning of Txn
if (event instanceof QueryEvent)
{
QueryEvent qe = (QueryEvent)event;
String sql = qe.getSql().toString();
if ("BEGIN".equalsIgnoreCase(sql))
{
_isBeginTxnSeen = true;
_log.info("BEGIN sql: " + sql);
_currTxnSizeInBytes = event.getHeader().getEventLength();
startXtion(qe);
return;
}
}

if ( ! _isBeginTxnSeen )
{
if (_log.isDebugEnabled())
{
_log.debug("Skipping event (" + event
+ ") as this is before the start of first transaction");
boolean isPut = false;
do {
try {
isPut = binlogEventQueue.offer(event, 100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
_log.error("failed to put binlog event to binlogEventQueue event: " + event, e);
}
return;
}

_currTxnSizeInBytes += event.getHeader().getEventLength();

if (event instanceof QueryEvent)
{
QueryEvent qe = (QueryEvent)event;
String sql = qe.getSql().toString();
if ("COMMIT".equalsIgnoreCase(sql))
{
_log.debug("COMMIT sql: " + sql);
endXtion(qe);
return;
}
else if ("ROLLBACK".equalsIgnoreCase(sql))
{
_log.debug("ROLLBACK sql: " + sql);
rollbackXtion(qe);
return;
}
else
{
// Ignore DDL statements for now
_log.debug("Likely DDL statement sql: " + sql);
return;
}
}
else if (event instanceof RotateEvent)
{
RotateEvent re = (RotateEvent)event;
String fileName = re.getBinlogFileName().toString();
_log.info("File Rotated : FileName :" + fileName + ", _binlogFilePrefix :" + _binlogFilePrefix);
String fileNumStr = fileName.substring(fileName.lastIndexOf(_binlogFilePrefix) + _binlogFilePrefix.length() + 1);
_currFileNum = Integer.parseInt(fileNumStr);
}
else if (event instanceof XidEvent)
{
XidEvent xe = (XidEvent)event;
long xid = xe.getXid();
_log.debug("Treating XID event with xid = " + xid + " as commit for the transaction");
endXtion(xe);
return;
}
else if (event instanceof WriteRowsEvent)
{
WriteRowsEvent wre = (WriteRowsEvent)event;
insertRows(wre);
}
else if (event instanceof WriteRowsEventV2)
{
WriteRowsEventV2 wre = (WriteRowsEventV2)event;
insertRows(wre);
}
else if (event instanceof UpdateRowsEvent)
{
UpdateRowsEvent ure = (UpdateRowsEvent)event;
updateRows(ure);
}
else if (event instanceof UpdateRowsEventV2)
{
UpdateRowsEventV2 ure = (UpdateRowsEventV2)event;
updateRows(ure);
}
else if (event instanceof DeleteRowsEventV2)
{
DeleteRowsEventV2 dre = (DeleteRowsEventV2)event;
deleteRows(dre);
}
else if (event instanceof DeleteRowsEvent)
{
DeleteRowsEvent dre = (DeleteRowsEvent)event;
deleteRows(dre);
}
else if (event instanceof TableMapEvent)
{
TableMapEvent tme = (TableMapEvent)event;
processTableMapEvent(tme);
}
else
{
_log.warn("Skipping !! Unknown OR event e: " + event);
return;
}

if ( _log.isDebugEnabled())
{
_log.debug("e: " + event);
}
} while (!isPut && !isShutdownRequested());
}


Expand Down Expand Up @@ -774,4 +676,153 @@ public static long scn(int logId, int offset)
scn |= offset;
return scn;
}

@Override
public void run() {
List<BinlogEventV4> eventList = new ArrayList<BinlogEventV4>();
BinlogEventV4 event;
boolean isShutdown = false;
while (!isShutdown)
{
eventList.clear();
int eventNumber = binlogEventQueue.drainTo(eventList);
for (int i = 0; i < eventNumber; i++)
{
if (isShutdownRequested())
{
isShutdown = true;
break;
}

event = eventList.get(i);
if (event == null)
{
_log.error("Received null event");
continue;
}

try {
// Beginning of Txn
if (event instanceof QueryEvent)
{
QueryEvent qe = (QueryEvent)event;
String sql = qe.getSql().toString();
if ("BEGIN".equalsIgnoreCase(sql))
{
_isBeginTxnSeen = true;
_log.info("BEGIN sql: " + sql);
_currTxnSizeInBytes = event.getHeader().getEventLength();
startXtion(qe);
continue;
}
}

if ( ! _isBeginTxnSeen )
{
if (_log.isDebugEnabled())
{
_log.debug("Skipping event (" + event
+ ") as this is before the start of first transaction");
}
continue;
}

_currTxnSizeInBytes += event.getHeader().getEventLength();

if (event instanceof QueryEvent)
{
QueryEvent qe = (QueryEvent)event;
String sql = qe.getSql().toString();
if ("COMMIT".equalsIgnoreCase(sql))
{
_log.debug("COMMIT sql: " + sql);
endXtion(qe);
continue;
}
else if ("ROLLBACK".equalsIgnoreCase(sql))
{
_log.debug("ROLLBACK sql: " + sql);
rollbackXtion(qe);
continue;
}
else
{
// Ignore DDL statements for now
_log.debug("Likely DDL statement sql: " + sql);
continue;
}
}
else if (event instanceof RotateEvent)
{
RotateEvent re = (RotateEvent)event;
String fileName = re.getBinlogFileName().toString();
_log.info("File Rotated : FileName :" + fileName + ", _binlogFilePrefix :" + _binlogFilePrefix);
String fileNumStr = fileName.substring(fileName.lastIndexOf(_binlogFilePrefix) + _binlogFilePrefix.length() + 1);
_currFileNum = Integer.parseInt(fileNumStr);
}
else if (event instanceof XidEvent)
{
XidEvent xe = (XidEvent)event;
long xid = xe.getXid();
_log.debug("Treating XID event with xid = " + xid + " as commit for the transaction");
endXtion(xe);
continue;
}
else if (event instanceof WriteRowsEvent)
{
WriteRowsEvent wre = (WriteRowsEvent)event;
insertRows(wre);
}
else if (event instanceof WriteRowsEventV2)
{
WriteRowsEventV2 wre = (WriteRowsEventV2)event;
insertRows(wre);
}
else if (event instanceof UpdateRowsEvent)
{
UpdateRowsEvent ure = (UpdateRowsEvent)event;
updateRows(ure);
}
else if (event instanceof UpdateRowsEventV2)
{
UpdateRowsEventV2 ure = (UpdateRowsEventV2)event;
updateRows(ure);
}
else if (event instanceof DeleteRowsEventV2)
{
DeleteRowsEventV2 dre = (DeleteRowsEventV2)event;
deleteRows(dre);
}
else if (event instanceof DeleteRowsEvent)
{
DeleteRowsEvent dre = (DeleteRowsEvent)event;
deleteRows(dre);
}
else if (event instanceof TableMapEvent)
{
TableMapEvent tme = (TableMapEvent)event;
processTableMapEvent(tme);
}
else if (event instanceof FormatDescriptionEvent)
{
// we don't need process this event
_log.info("receive FormatDescriptionEvent event");
}
else
{
_log.warn("Skipping !! Unknown OR event e: " + event);
continue;
}

if ( _log.isDebugEnabled())
{
_log.debug("e: " + event);
}
} catch (Exception e) {
_log.error("failed to process binlog event, event: " + event, e);
}
}
}
_log.info("ORListener Thread done");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,14 @@ public class EventProducerThread extends DatabusThreadBase implements Transactio

private final long _sinceScn;

private ORListener _orListener;

private String _sourceName;

public EventProducerThread(String sourceName, long sinceScn)
{
super("OpenReplicator_" + sourceName);
_sourceName = sourceName;
_sinceScn = sinceScn;
}

Expand All @@ -324,18 +329,19 @@ public void run()
int logid = logid(_sinceScn);

String binlogFile = String.format("%s.%06d", _binlogFilePrefix, logid);
ORListener orl = new ORListener(logid, _log, _binlogFilePrefix,
_producerThread, _tableUriToSrcIdMap, _tableUriToSrcNameMap,
_schemaRegistryService);
_orListener = new ORListener(_sourceName, logid, _log,
_binlogFilePrefix, _producerThread, _tableUriToSrcIdMap,
_tableUriToSrcNameMap, _schemaRegistryService, 200);

_or.setBinlogFileName(binlogFile);
_or.setBinlogPosition(offset);
_or.setBinlogEventListener(orl);
_or.setBinlogEventListener(_orListener);

try
{
_log.info(String.format("Open Replicator starting from %s@%d", binlogFile, offset));
_or.start();
_orListener.start();
} catch (Exception e)
{
throw new DatabusRuntimeException("failed to start open replicator: " + e.getMessage(), e);
Expand Down Expand Up @@ -382,6 +388,9 @@ public void onEndTransaction(Transaction txn)
{
try
{
// Because the current thread is orListener thread, so shutdown() will block the thread itself.
// So we just set orListener's shutdown flag, the orListener thead will exit immediately after this function.
_orListener.shutdownAsynchronously();
_or.stop(10, TimeUnit.SECONDS);
}
catch (Exception e)
Expand Down