Skip to content

Commit

Permalink
Merge pull request #47 from zendesk/config_to_context
Browse files Browse the repository at this point in the history
REFACTOR-ONLY refactor some functionality out of MaxwellConfig
  • Loading branch information
Ben Osheroff committed Apr 20, 2015
2 parents 944ea61 + 2e8ab33 commit f41354a
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 124 deletions.
18 changes: 9 additions & 9 deletions src/main/java/com/zendesk/maxwell/Maxwell.java
Expand Up @@ -2,7 +2,6 @@

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.slf4j.Logger;
Expand All @@ -17,6 +16,7 @@
public class Maxwell {
private Schema schema;
private MaxwellConfig config;
private MaxwellContext context;
static final Logger LOGGER = LoggerFactory.getLogger(Maxwell.class);

private void initFirstRun(Connection connection) throws SQLException, IOException, SchemaSyncError {
Expand All @@ -28,19 +28,20 @@ private void initFirstRun(Connection connection) throws SQLException, IOExceptio
SchemaStore store = new SchemaStore(connection, this.schema, pos);
store.save();

this.config.setInitialPosition(pos);
this.context.setInitialPosition(pos);
}

private void run(String[] args) throws Exception {
this.config = MaxwellConfig.buildConfig("config.properties", args);
this.context = new MaxwellContext(this.config);

try ( Connection connection = this.config.getConnectionPool().getConnection() ) {
try ( Connection connection = this.context.getConnectionPool().getConnection() ) {
MaxwellMysqlStatus.ensureMysqlState(connection);
SchemaStore.ensureMaxwellSchema(connection);

if ( this.config.getInitialPosition() != null ) {
LOGGER.info("Maxwell is booting, starting at " + this.config.getInitialPosition());
SchemaStore store = SchemaStore.restore(connection, this.config.getInitialPosition());
if ( this.context.getInitialPosition() != null ) {
LOGGER.info("Maxwell is booting, starting at " + this.context.getInitialPosition());
SchemaStore store = SchemaStore.restore(connection, this.context.getInitialPosition());
this.schema = store.getSchema();
} else {
initFirstRun(connection);
Expand All @@ -51,10 +52,9 @@ private void run(String[] args) throws Exception {
return;
}

AbstractProducer producer = this.config.getProducer();
AbstractProducer producer = this.context.getProducer();

MaxwellParser p = new MaxwellParser(this.schema, producer);
p.setConfig(this.config);
MaxwellParser p = new MaxwellParser(this.schema, producer, this.context, this.context.getInitialPosition());
p.run();

}
Expand Down
85 changes: 7 additions & 78 deletions src/main/java/com/zendesk/maxwell/MaxwellConfig.java
@@ -1,27 +1,21 @@
package com.zendesk.maxwell;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Enumeration;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import snaq.db.ConnectionPool;
import joptsimple.BuiltinHelpFormatter;
import joptsimple.OptionParser;
import joptsimple.OptionSet;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.zendesk.maxwell.producer.AbstractProducer;
import com.zendesk.maxwell.producer.FileProducer;
import com.zendesk.maxwell.producer.MaxwellKafkaProducer;
import com.zendesk.maxwell.schema.SchemaPosition;

public class MaxwellConfig {
static final Logger LOGGER = LoggerFactory.getLogger(MaxwellConfig.class);
Expand All @@ -31,15 +25,11 @@ public class MaxwellConfig {
public String mysqlUser;
public String mysqlPassword;

private BinlogPosition initialPosition;
private final Properties kafkaProperties;
private String kafkaTopic;
private String producerType;
private String outputFile;
private Long serverID;
private SchemaPosition schemaPosition;
public final Properties kafkaProperties;
public String kafkaTopic;
public String producerType;
public String outputFile;

private ConnectionPool connectionPool;

public MaxwellConfig() {
this.kafkaProperties = new Properties();
Expand All @@ -51,40 +41,6 @@ public String getConnectionURI() {
return "jdbc:mysql://" + mysqlHost + ":" + mysqlPort;
}

public ConnectionPool getConnectionPool() {
if ( this.connectionPool != null )
return this.connectionPool;

this.connectionPool = new ConnectionPool("MaxwellConnectionPool", 10, 0, 10, getConnectionURI(), mysqlUser, mysqlPassword);
return this.connectionPool;
}

public void terminate() {
this.schemaPosition.stop();
this.schemaPosition = null;
this.connectionPool.release();
this.connectionPool = null;
}

private SchemaPosition getSchemaPosition() throws SQLException {
if ( this.schemaPosition == null ) {
this.schemaPosition = new SchemaPosition(this.getConnectionPool(), this.getServerID());
this.schemaPosition.start();
}
return this.schemaPosition;
}

public BinlogPosition getInitialPosition() throws FileNotFoundException, IOException, SQLException {
if ( this.initialPosition != null )
return this.initialPosition;

this.initialPosition = getSchemaPosition().get();
return this.initialPosition;
}

public void setInitialPosition(BinlogPosition position) throws SQLException {
this.getSchemaPosition().set(position);
}

private OptionParser getOptionParser() {
OptionParser parser = new OptionParser();
Expand Down Expand Up @@ -196,31 +152,4 @@ private void usage(String string) {
public Properties getKafkaProperties() {
return this.kafkaProperties;
}

public AbstractProducer getProducer() throws IOException {
switch ( this.producerType ) {
case "file":
return new FileProducer(this, this.outputFile);
case "kafka":
return new MaxwellKafkaProducer(this, this.kafkaProperties, this.kafkaTopic);
case "stdout":
default:
return new StdoutProducer(this);
}
}

public Long getServerID() throws SQLException {
if ( this.serverID != null)
return this.serverID;

try ( Connection c = getConnectionPool().getConnection() ) {
ResultSet rs = c.createStatement().executeQuery("SELECT @@server_id as server_id");
if ( !rs.next() ) {
throw new RuntimeException("Could not retrieve server_id!");
}
this.serverID = rs.getLong("server_id");
return this.serverID;
}
}

}
88 changes: 88 additions & 0 deletions src/main/java/com/zendesk/maxwell/MaxwellContext.java
@@ -0,0 +1,88 @@
package com.zendesk.maxwell;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;

import com.zendesk.maxwell.producer.*;
import com.zendesk.maxwell.schema.SchemaPosition;

import snaq.db.ConnectionPool;

public class MaxwellContext {
private final ConnectionPool connectionPool;
private final MaxwellConfig config;
private SchemaPosition schemaPosition;
private Long serverID;
private BinlogPosition initialPosition;

public MaxwellContext(MaxwellConfig config) {
this.config = config;
this.connectionPool = new ConnectionPool("MaxwellConnectionPool", 10, 0, 10,
config.getConnectionURI(), config.mysqlUser, config.mysqlPassword);
}

public MaxwellConfig getConfig() {
return this.config;
}

public ConnectionPool getConnectionPool() {
return this.connectionPool;
}

public void terminate() {
if ( this.schemaPosition != null ) {
this.schemaPosition.stop();
this.schemaPosition = null;
}
this.connectionPool.release();
}

private SchemaPosition getSchemaPosition() throws SQLException {
if ( this.schemaPosition == null ) {
this.schemaPosition = new SchemaPosition(this.getConnectionPool(), this.getServerID());
this.schemaPosition.start();
}
return this.schemaPosition;
}

public BinlogPosition getInitialPosition() throws FileNotFoundException, IOException, SQLException {
if ( this.initialPosition != null )
return this.initialPosition;

this.initialPosition = getSchemaPosition().get();
return this.initialPosition;
}

public void setInitialPosition(BinlogPosition position) throws SQLException {
this.getSchemaPosition().set(position);
}

public Long getServerID() throws SQLException {
if ( this.serverID != null)
return this.serverID;

try ( Connection c = getConnectionPool().getConnection() ) {
ResultSet rs = c.createStatement().executeQuery("SELECT @@server_id as server_id");
if ( !rs.next() ) {
throw new RuntimeException("Could not retrieve server_id!");
}
this.serverID = rs.getLong("server_id");
return this.serverID;
}
}

public AbstractProducer getProducer() throws IOException {
switch ( this.config.producerType ) {
case "file":
return new FileProducer(this, this.config.outputFile);
case "kafka":
return new MaxwellKafkaProducer(this, this.config.getKafkaProperties(), this.config.kafkaTopic);
case "stdout":
default:
return new StdoutProducer(this);
}
}
}
27 changes: 13 additions & 14 deletions src/main/java/com/zendesk/maxwell/MaxwellParser.java
@@ -1,5 +1,5 @@
package com.zendesk.maxwell;
import java.io.FileNotFoundException;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
Expand Down Expand Up @@ -41,37 +41,36 @@ public class MaxwellParser {

private final MaxwellTableCache tableCache = new MaxwellTableCache();
private final OpenReplicator replicator;
private MaxwellConfig config;
private final MaxwellContext context;
private final AbstractProducer producer;

static final Logger LOGGER = LoggerFactory.getLogger(MaxwellParser.class);

public MaxwellParser(Schema currentSchema, AbstractProducer producer) throws Exception {
public MaxwellParser(Schema currentSchema, AbstractProducer producer, MaxwellContext ctx, BinlogPosition start) throws Exception {
this.schema = currentSchema;

TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
this.binlogEventListener = new MaxwellBinlogEventListener(queue);

this.replicator = new OpenReplicator();

this.replicator.setBinlogEventListener(this.binlogEventListener);

this.replicator.setHost(ctx.getConfig().mysqlHost);
this.replicator.setUser(ctx.getConfig().mysqlUser);
this.replicator.setPassword(ctx.getConfig().mysqlPassword);
this.replicator.setPort(ctx.getConfig().mysqlPort);

this.producer = producer;

this.context = ctx;
this.setBinlogPosition(start);
}

public void setBinlogPosition(BinlogPosition p) {
this.replicator.setBinlogFileName(p.getFile());
this.replicator.setBinlogPosition(p.getOffset());
}

public void setConfig(MaxwellConfig c) throws FileNotFoundException, IOException, SQLException {
this.replicator.setHost(c.mysqlHost);
this.replicator.setUser(c.mysqlUser);
this.replicator.setPassword(c.mysqlPassword);
this.replicator.setPort(c.mysqlPort);
this.config = c;
setBinlogPosition(c.getInitialPosition());
}

public void setPort(int port) {
this.replicator.setPort(port);
}
Expand Down Expand Up @@ -191,7 +190,7 @@ private void processQueryEvent(QueryEvent event) throws SchemaSyncError, SQLExce
tableCache.clear();
BinlogPosition p = eventBinlogPosition(event);
LOGGER.info("storing schema @" + p + " after applying \"" + sql.replace('\n',' ') + "\"");
try ( Connection c = this.config.getConnectionPool().getConnection() ) {
try ( Connection c = this.context.getConnectionPool().getConnection() ) {
new SchemaStore(c, schema, p).save();
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/zendesk/maxwell/StdoutProducer.java
Expand Up @@ -3,15 +3,15 @@
import com.zendesk.maxwell.producer.AbstractProducer;

public class StdoutProducer extends AbstractProducer {
public StdoutProducer(MaxwellConfig config) {
super(config);
public StdoutProducer(MaxwellContext context) {
super(context);
}

@Override
public void push(MaxwellAbstractRowsEvent e) throws Exception {
for ( String json : e.toJSONStrings() ) {
System.out.println(json);
}
this.config.setInitialPosition(e.getNextBinlogPosition());
this.context.setInitialPosition(e.getNextBinlogPosition());
}
}
@@ -1,13 +1,13 @@
package com.zendesk.maxwell.producer;

import com.zendesk.maxwell.MaxwellAbstractRowsEvent;
import com.zendesk.maxwell.MaxwellConfig;
import com.zendesk.maxwell.MaxwellContext;

public abstract class AbstractProducer {
protected final MaxwellConfig config;
protected final MaxwellContext context;

public AbstractProducer(MaxwellConfig config) {
this.config = config;
public AbstractProducer(MaxwellContext context) {
this.context = context;
}
abstract public void push(MaxwellAbstractRowsEvent e) throws Exception;

Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/zendesk/maxwell/producer/FileProducer.java
Expand Up @@ -5,14 +5,14 @@
import java.io.IOException;

import com.zendesk.maxwell.MaxwellAbstractRowsEvent;
import com.zendesk.maxwell.MaxwellConfig;
import com.zendesk.maxwell.MaxwellContext;

public class FileProducer extends AbstractProducer {
private final File file;
private final FileWriter fileWriter;

public FileProducer(MaxwellConfig config, String filename) throws IOException {
super(config);
public FileProducer(MaxwellContext context, String filename) throws IOException {
super(context);
this.file = new File(filename);
this.fileWriter = new FileWriter(this.file, true);
}
Expand All @@ -25,6 +25,6 @@ public void push(MaxwellAbstractRowsEvent e) throws Exception {
this.fileWriter.flush();
}
this.onComplete(e);
config.setInitialPosition(e.getNextBinlogPosition());
context.setInitialPosition(e.getNextBinlogPosition());
}
}

0 comments on commit f41354a

Please sign in to comment.