Skip to content

Commit

Permalink
Improved factoring:
Browse files Browse the repository at this point in the history
1. AbstractBinaryLogClient now no longer depends on EventListener or LifecycleListener.
2. LifecycleListener goes back to accepting BinaryLogClient arguments.

Still haven't run tests.
  • Loading branch information
ldcasillas-progreso committed Apr 18, 2014
1 parent 8edbda1 commit f39aceb
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void connect() throws IOException {
if (logger.isLoggable(Level.INFO)) {
logger.info("Connected to " + hostname + ":" + port + " at " + binlogFilename + "/" + binlogPosition);
}
getLifecycleListener().onConnect(this);
onConnect();
if (keepAlive && !isKeepAliveThreadRunning()) {
spawnKeepAliveThread();
}
Expand Down Expand Up @@ -321,7 +321,7 @@ private void listenForEventPackets() throws IOException {
event = eventDeserializer.nextEvent(inputStream);
} catch (Exception e) {
if (isConnected()) {
getLifecycleListener().onEventDeserializationFailure(this, e);
onEventDeserializationFailure(e);
}
continue;
}
Expand All @@ -332,7 +332,7 @@ private void listenForEventPackets() throws IOException {
}
} catch (Exception e) {
if (isConnected()) {
getLifecycleListener().onCommunicationFailure(this, e);
onCommunicationFailure(e);
}
} finally {
if (isConnected()) {
Expand Down Expand Up @@ -369,7 +369,7 @@ private ResultSetRowPacket[] readResultSet() throws IOException {
}

private void notifyEventListener(Event event) {
getEventListener().onEvent(event);
onEvent(event);
}

/**
Expand Down Expand Up @@ -415,12 +415,34 @@ private void disconnectChannel() throws IOException {
channel.close();
}
} finally {
getLifecycleListener().onDisconnect(this);
onDisconnect();
}
}

public abstract BinaryLogClient.LifecycleListener getLifecycleListener();
/**
* Invoked once for each {@link Event}, in the order they are processed.
*/
protected abstract void onEvent(Event event);

public abstract BinaryLogClient.EventListener getEventListener();
/**
* Invoked when a connection is established.
*/
protected abstract void onConnect();

/**
* It's guarantied to be called before {@link #onDisconnect()}) in case of communication failure.
*/
protected abstract void onCommunicationFailure(Exception ex);

/**
* Called in case of failed event deserialization. Note this type of error does NOT cause client to
* disconnect. If you wish to stop receiving events you'll need to fire client.disconnect() manually.
*/
protected abstract void onEventDeserializationFailure(Exception ex);

/**
* Called upon disconnect (regardless of the reason).
*/
protected abstract void onDisconnect();

}
116 changes: 67 additions & 49 deletions src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,17 @@
package com.github.shyiko.mysql.binlog;

import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeader;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean;
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
import com.github.shyiko.mysql.binlog.network.SocketFactory;
import com.github.shyiko.mysql.binlog.network.protocol.ErrorPacket;
import com.github.shyiko.mysql.binlog.network.protocol.GreetingPacket;
import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel;
import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket;
import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.PingCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand;

import java.io.EOFException;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -126,7 +100,7 @@ public void connect(long timeoutInMilliseconds) throws IOException, TimeoutExcep
final CountDownLatch countDownLatch = new CountDownLatch(1);
BinaryLogClient.AbstractLifecycleListener connectListener = new BinaryLogClient.AbstractLifecycleListener() {
@Override
public void onConnect(AbstractBinaryLogClient client) {
public void onConnect(BinaryLogClient client) {
countDownLatch.countDown();
}
};
Expand Down Expand Up @@ -164,8 +138,8 @@ public void run() {
}

@Override
public EventListener getEventListener() {
return eventListener;
protected void onEvent(Event event) {
eventListener.onEvent(event);
}

/**
Expand Down Expand Up @@ -197,9 +171,25 @@ public void unregisterEventListener(EventListener listener) {
eventListener.unregisterEventListener(listener);
}


@Override
public LifecycleListener getLifecycleListener() {
return lifecycleListener;
protected void onConnect() {
lifecycleListener.onConnect(this);
}

@Override
protected void onCommunicationFailure(Exception ex) {
lifecycleListener.onCommunicationFailure(this, ex);
}

@Override
protected void onEventDeserializationFailure(Exception ex) {
lifecycleListener.onEventDeserializationFailure(this, ex);
}

@Override
protected void onDisconnect() {
lifecycleListener.onDisconnect(this);
}

/**
Expand Down Expand Up @@ -239,9 +229,16 @@ public interface EventListener {
void onEvent(Event event);
}

/**
* An {@link EventListener} that rebroadcasts events to a dynamically managed list of other event listeners.
*/
public class BroadcastEventListener implements EventListener {
private final List<EventListener> eventListeners = new LinkedList<EventListener>();

/**
* Rebroadcast the event to the child listeners. If any of the children throws an exception, we log it and
* continue with the next one.
*/
@Override
public void onEvent(Event event) {
for (BinaryLogClient.EventListener eventListener : eventListeners) {
Expand Down Expand Up @@ -305,25 +302,27 @@ public interface LifecycleListener {

/**
* Called once client has successfully logged in but before started to receive binlog events.
* @param client
*/
void onConnect(AbstractBinaryLogClient client);
void onConnect(BinaryLogClient client);

/**
* It's guarantied to be called before {@link #onDisconnect(AbstractBinaryLogClient)}) in case of
* It's guarantied to be called before {@link #onDisconnect(BinaryLogClient)}) in case of
* communication failure.
*/
void onCommunicationFailure(AbstractBinaryLogClient client, Exception ex);
void onCommunicationFailure(BinaryLogClient client, Exception ex);

/**
* Called in case of failed event deserialization. Note this type of error does NOT cause client to
* disconnect. If you wish to stop receiving events you'll need to fire client.disconnect() manually.
*/
void onEventDeserializationFailure(AbstractBinaryLogClient client, Exception ex);
void onEventDeserializationFailure(BinaryLogClient client, Exception ex);

/**
* Called upon disconnect (regardless of the reason).
* @param client
*/
void onDisconnect(AbstractBinaryLogClient client);
void onDisconnect(BinaryLogClient client);
}

/**
Expand All @@ -332,44 +331,63 @@ public interface LifecycleListener {
public static abstract class AbstractLifecycleListener implements LifecycleListener {

@Override
public void onConnect(AbstractBinaryLogClient client) {
public void onConnect(BinaryLogClient client) {
}

@Override
public void onCommunicationFailure(AbstractBinaryLogClient client, Exception ex) {
public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
}

@Override
public void onEventDeserializationFailure(AbstractBinaryLogClient client, Exception ex) {
public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
}

@Override
public void onDisconnect(AbstractBinaryLogClient client) {
public void onDisconnect(BinaryLogClient client) {
}

}

/**
* A {@link LifecycleListener} that rebroadcasts events to a dynamic list of children.
*/
public static class BroadcastLifecycleListener implements LifecycleListener {
final List<LifecycleListener> lifecycleListeners = new LinkedList<LifecycleListener>();

@Override
public void onConnect(AbstractBinaryLogClient client) {
throw new UnsupportedOperationException("UNIMPLEMENTED"); // TODO
public void onConnect(BinaryLogClient client) {
synchronized(lifecycleListeners) {
for (LifecycleListener listener : lifecycleListeners) {
listener.onConnect(client);
}
}
}

@Override
public void onCommunicationFailure(AbstractBinaryLogClient client, Exception ex) {
throw new UnsupportedOperationException("UNIMPLEMENTED"); // TODO
public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
synchronized(lifecycleListeners) {
for (LifecycleListener listener : lifecycleListeners) {
listener.onCommunicationFailure(client, ex);
}
}
}

@Override
public void onEventDeserializationFailure(AbstractBinaryLogClient client, Exception ex) {
throw new UnsupportedOperationException("UNIMPLEMENTED"); // TODO
public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
synchronized(lifecycleListeners) {
for (LifecycleListener listener : lifecycleListeners) {
listener.onEventDeserializationFailure(client, ex);
}
}
}

@Override
public void onDisconnect(AbstractBinaryLogClient client) {
throw new UnsupportedOperationException("UNIMPLEMENTED"); // TODO
public void onDisconnect(BinaryLogClient client) {
synchronized(lifecycleListeners) {
for (LifecycleListener listener : lifecycleListeners) {
listener.onDisconnect(client);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.github.shyiko.mysql.binlog.jmx;

import com.github.shyiko.mysql.binlog.AbstractBinaryLogClient;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeader;
Expand Down Expand Up @@ -100,24 +99,24 @@ public void onEvent(Event event) {
}

@Override
public void onEventDeserializationFailure(AbstractBinaryLogClient client, Exception ex) {
public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
numberOfSkippedEvents.getAndIncrement();
lastEventHeader.set(null);
timestampOfLastEvent.set(getCurrentTimeMillis());
totalNumberOfEventsSeen.getAndIncrement();
}

@Override
public void onDisconnect(AbstractBinaryLogClient client) {
public void onDisconnect(BinaryLogClient client) {
numberOfDisconnects.getAndIncrement();
}

@Override
public void onConnect(AbstractBinaryLogClient client) {
public void onConnect(BinaryLogClient client) {
}

@Override
public void onCommunicationFailure(AbstractBinaryLogClient client, Exception ex) {
public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
}

protected long getCurrentTimeMillis() {
Expand Down

0 comments on commit f39aceb

Please sign in to comment.