Skip to content

Commit

Permalink
add ability to override min/max ids
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanmarz committed Apr 6, 2010
1 parent 5761d53 commit 829119f
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 9 deletions.
20 changes: 20 additions & 0 deletions src/jvm/cascading/dbmigrate/hadoop/DBConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public class DBConfiguration {
public static final String INPUT_COLUMN_NAMES_PROPERTY = "mapred.jdbc.input.column.names";
public static final String PRIMARY_KEY_COLUMN = "mapred.jdbc.primary.key.name";
public static final String NUM_CHUNKS = "mapred.jdbc.num.chunks";
// Job-configuration key under which setMinId stores an optional minimum-id override.
public static final String MIN_ID = "dbmigrate.min.id";
// Job-configuration key under which setMaxId stores an optional maximum-id override.
public static final String MAX_ID = "dbmigrate.max.id";

public void configureDB(String driverClass, String dbUrl, String userName, String passwd) {
job.set(DRIVER_CLASS_PROPERTY, driverClass);
Expand Down Expand Up @@ -98,5 +100,23 @@ public void setNumChunks(int numChunks) {
/**
 * Reads the chunk count from the job configuration.
 *
 * @return the result of looking up NUM_CHUNKS, with 10 passed as the default argument
 */
public int getNumChunks() {
    final int defaultChunks = 10;
    return job.getInt(NUM_CHUNKS, defaultChunks);
}

/**
 * Records a minimum-id override in the job configuration.
 *
 * @param id value written under the MIN_ID key
 */
public void setMinId(long id) {
    job.setLong(MIN_ID, id);
}

/**
 * Looks up the minimum-id override in the job configuration.
 *
 * @return null when nothing is stored under MIN_ID; otherwise the long read
 *         from that key (with -1 passed as the fallback argument), boxed
 */
public Long getMinId() {
    boolean overridePresent = job.get(MIN_ID) != null;
    if (overridePresent) {
        return Long.valueOf(job.getLong(MIN_ID, -1));
    }
    return null;
}

/**
 * Records a maximum-id override in the job configuration.
 *
 * @param id value written under the MAX_ID key
 */
public void setMaxId(long id) {
    job.setLong(MAX_ID, id);
}

/**
 * Looks up the maximum-id override in the job configuration.
 *
 * @return null when nothing is stored under MAX_ID; otherwise the long read
 *         from that key (with -1 passed as the fallback argument), boxed
 */
public Long getMaxId() {
    boolean overridePresent = job.get(MAX_ID) != null;
    if (overridePresent) {
        return Long.valueOf(job.getLong(MAX_ID, -1));
    }
    return null;
}
}

15 changes: 9 additions & 6 deletions src/jvm/cascading/dbmigrate/hadoop/DBInputFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ public RecordReader<LongWritable, TupleWrapper> getRecordReader(InputSplit split
return new DBRecordReader((DBInputSplit) split, job);
}

private long getMaxId(Connection conn, String tableName, String col) {
private long getMaxId(DBConfiguration conf, Connection conn, String tableName, String col) {
if(conf.getMaxId()!=null) return conf.getMaxId();
try {
PreparedStatement s = conn.prepareStatement("SELECT MAX(" + col + ") FROM " + tableName);
ResultSet rs = s.executeQuery();
Expand All @@ -193,7 +194,8 @@ private long getMaxId(Connection conn, String tableName, String col) {
}
}

private long getMinId(Connection conn, String tableName, String col ) {
private long getMinId(DBConfiguration conf, Connection conn, String tableName, String col ) {
if(conf.getMinId()!=null) return conf.getMinId();
try {
PreparedStatement s = conn.prepareStatement("SELECT MIN(" + col + ") FROM " + tableName);
ResultSet rs = s.executeQuery();
Expand All @@ -213,8 +215,8 @@ public InputSplit[] getSplits(JobConf job, int ignored) throws IOException {
int chunks = conf.getNumChunks();
Connection conn = conf.getConnection();
String primarykeycolumn = conf.getPrimaryKeyColumn();
long maxId = getMaxId(conn, conf.getInputTableName(), conf.getPrimaryKeyColumn());
long minId = getMinId(conn, conf.getInputTableName(), conf.getPrimaryKeyColumn());
long maxId = getMaxId(conf, conn, conf.getInputTableName(), conf.getPrimaryKeyColumn());
long minId = getMinId(conf, conn, conf.getInputTableName(), conf.getPrimaryKeyColumn());
long chunkSize = ((maxId-minId+1) / chunks);
InputSplit[] ret = new InputSplit[chunks];

Expand All @@ -232,12 +234,13 @@ public InputSplit[] getSplits(JobConf job, int ignored) throws IOException {
}
}

public static void setInput(JobConf job, int numChunks, String databaseDriver, String username, String pwd, String dburl, String tableName, String pkColumn, String... columnNames) {
public static void setInput(JobConf job, int numChunks, String databaseDriver, String username, String pwd, String dburl, String tableName, String pkColumn, Long minId, Long maxId, String... columnNames) {
job.setInputFormat(DBInputFormat.class);

DBConfiguration dbConf = new DBConfiguration(job);
dbConf.configureDB(databaseDriver, dburl, username, pwd);

// Only record the overrides the caller actually supplied; a null bound is left unset.
if(minId!=null) dbConf.setMinId(minId.longValue());
// The upper bound must go through setMaxId: writing it via setMinId would clobber
// the min override with the max value and leave the max override unset.
if(maxId!=null) dbConf.setMaxId(maxId.longValue());
dbConf.setInputTableName(tableName);
dbConf.setInputColumnNames(columnNames);
dbConf.setPrimaryKeyColumn(pkColumn);
Expand Down
17 changes: 14 additions & 3 deletions src/jvm/cascading/dbmigrate/tap/DBMigrateTap.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@


public class DBMigrateTap extends Tap {
// Optional settings for a DBMigrateTap; both fields default to null.
public static class Options {
// Minimum-id override; passed to DBInputFormat.setInput by DBMigrateScheme.sourceInit.
public Long minId = null;
// Maximum-id override; passed to DBInputFormat.setInput by DBMigrateScheme.sourceInit.
public Long maxId = null;
}

public class DBMigrateScheme extends Scheme {
String dbDriver;
String dbUrl;
Expand All @@ -32,8 +37,9 @@ public class DBMigrateScheme extends Scheme {
String pkColumn;
String[] columnNames;
int numChunks;
Options options;

public DBMigrateScheme(int numChunks, String dbDriver, String dbUrl, String username, String pwd, String tableName, String pkColumn, String[] columnNames) {
public DBMigrateScheme(int numChunks, String dbDriver, String dbUrl, String username, String pwd, String tableName, String pkColumn, String[] columnNames, Options options) {
super(new Fields(columnNames));
this.dbDriver = dbDriver;
this.dbUrl = dbUrl;
Expand All @@ -43,14 +49,15 @@ public DBMigrateScheme(int numChunks, String dbDriver, String dbUrl, String user
this.pkColumn = pkColumn;
this.columnNames = columnNames;
this.numChunks = numChunks;
this.options = options;
}

@Override
public void sourceInit(Tap tap, JobConf jc) throws IOException {
// a hack for MultiInputFormat to see that there is a child format
FileInputFormat.setInputPaths( jc, getPath() );

DBInputFormat.setInput(jc, numChunks, dbDriver, username, pwd, dbUrl, tableName, pkColumn, columnNames);
DBInputFormat.setInput(jc, numChunks, dbDriver, username, pwd, dbUrl, tableName, pkColumn, options.minId, options.maxId, columnNames);
}

@Override
Expand All @@ -72,7 +79,11 @@ public void sink(TupleEntry te, OutputCollector oc) throws IOException {
String connectionUrl;

public DBMigrateTap(int numChunks, String dbDriver, String dbUrl, String username, String pwd, String tableName, String pkColumn, String[] columnNames) {
setScheme(new DBMigrateScheme(numChunks, dbDriver, dbUrl, username, pwd, tableName, pkColumn, columnNames));
this(numChunks, dbDriver, dbUrl, username, pwd, tableName, pkColumn, columnNames, new Options());
}

/**
 * Creates a tap whose scheme is a DBMigrateScheme built from the given
 * arguments, and remembers the database URL as this tap's connection URL.
 *
 * @param numChunks   chunk count forwarded to the scheme
 * @param dbDriver    JDBC driver class name forwarded to the scheme
 * @param dbUrl       database URL; forwarded to the scheme and stored in connectionUrl
 * @param username    database user forwarded to the scheme
 * @param pwd         database password forwarded to the scheme
 * @param tableName   table name forwarded to the scheme
 * @param pkColumn    primary-key column name forwarded to the scheme
 * @param columnNames column names forwarded to the scheme
 * @param options     min/max id overrides forwarded to the scheme
 */
public DBMigrateTap(int numChunks, String dbDriver, String dbUrl, String username, String pwd, String tableName, String pkColumn, String[] columnNames, Options options) {
    DBMigrateScheme scheme = new DBMigrateScheme(
        numChunks, dbDriver, dbUrl, username, pwd, tableName, pkColumn, columnNames, options);
    setScheme(scheme);
    this.connectionUrl = dbUrl;
}

Expand Down

0 comments on commit 829119f

Please sign in to comment.