Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ public interface TransactionProcessor
/** Shared queue to transfer binlog events from OpenReplicator to ORlistener thread **/
private BlockingQueue<BinlogEventV4> _binlogEventQueue = null;

/** Milli sec timeout for _binlogEventQueue operation **/
private long _queueTimeoutMs = 100L;

public ORListener(String name,
int currentFileNumber,
Logger log,
Expand All @@ -144,7 +147,8 @@ public ORListener(String name,
Map<String, Short> tableUriToSrcIdMap,
Map<String, String> tableUriToSrcNameMap,
SchemaRegistryService schemaRegistryService,
int maxQueueSize)
int maxQueueSize,
long queueTimeoutMs)
{
super("ORListener_" + name);
_log = log;
Expand All @@ -155,6 +159,7 @@ public ORListener(String name,
_schemaRegistryService = schemaRegistryService;
_currFileNum = currentFileNumber;
_binlogEventQueue = new LinkedBlockingQueue<BinlogEventV4>(maxQueueSize);
_queueTimeoutMs = queueTimeoutMs;
}

@Override
Expand All @@ -163,7 +168,7 @@ public void onEvents(BinlogEventV4 event)
boolean isPut = false;
do {
try {
isPut = _binlogEventQueue.offer(event, 100, TimeUnit.MILLISECONDS);
isPut = _binlogEventQueue.offer(event, _queueTimeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
_log.error("failed to put binlog event to binlogEventQueue event: " + event, e);
}
Expand Down Expand Up @@ -638,9 +643,24 @@ public void run() {

eventList.clear();
int eventNumber = _binlogEventQueue.drainTo(eventList);
if (eventNumber == 0)
{
try
{
event = _binlogEventQueue.poll(_queueTimeoutMs, TimeUnit.MILLISECONDS);
if (event != null)
{
eventList.add(event);
eventNumber = eventList.size();
}
}
catch (InterruptedException e)
{
_log.info("Interrupted when poll from _binlogEventQueue!!");
}
}
for (int i = 0; i < eventNumber; i++)
{

event = eventList.get(i);
if (event == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ void initOpenReplicator(long scn)
String binlogFile = String.format("%s.%06d", _binlogFilePrefix, logid);
// we should use a new ORListener to drop the left events in binlogEventQueue and the half processed transaction.
_orListener = new ORListener(_sourceName, logid, _log, _binlogFilePrefix, _producerThread, _tableUriToSrcIdMap,
_tableUriToSrcNameMap, _schemaRegistryService, 200);
_tableUriToSrcNameMap, _schemaRegistryService, 200, 100L);

_or.setBinlogFileName(binlogFile);
_or.setBinlogPosition(offset);
Expand Down