Skip to content

Commit

Permalink
re-C3'ing the VersionedTap/KeyValueByteScheme that were absorbed from…
Browse files Browse the repository at this point in the history
… dfs-datastores
  • Loading branch information
Cyrille Chépélov (TP12) committed Jan 26, 2016
1 parent b228b73 commit a691774
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
Expand All @@ -29,7 +30,7 @@ public static byte[] getBytes(BytesWritable key) {
}

@Override
public boolean source(FlowProcess<JobConf> flowProcess,
public boolean source(FlowProcess<? extends Configuration> flowProcess,
SourceCall<Object[], RecordReader> sourceCall) throws IOException {
BytesWritable key = (BytesWritable) sourceCall.getContext()[0];
BytesWritable value = (BytesWritable) sourceCall.getContext()[1];
Expand All @@ -47,7 +48,7 @@ public boolean source(FlowProcess<JobConf> flowProcess,
}

@Override
public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Void, OutputCollector> sinkCall)
public void sink(FlowProcess<? extends Configuration> flowProcess, SinkCall<Void, OutputCollector> sinkCall)
throws IOException {
TupleEntry tupleEntry = sinkCall.getOutgoingEntry();

Expand All @@ -57,4 +58,3 @@ public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Void, OutputCollecto
sinkCall.getOutput().collect(new BytesWritable(key), new BytesWritable(val));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
Expand All @@ -30,7 +31,7 @@ public static enum TapMode {SOURCE, SINK}
// sink-specific
private String newVersionPath;

public VersionedTap(String dir, Scheme<JobConf,RecordReader,OutputCollector,?,?> scheme, TapMode mode)
public VersionedTap(String dir, Scheme<Configuration,RecordReader,OutputCollector,?,?> scheme, TapMode mode)
throws IOException {
super(scheme, dir);
this.mode = mode;
Expand Down Expand Up @@ -59,11 +60,11 @@ public String getOutputDirectory() {
return getPath().toString();
}

public VersionedStore getStore(JobConf conf) throws IOException {
public VersionedStore getStore(Configuration conf) throws IOException {
return new VersionedStore(FileSystem.get(conf), getOutputDirectory());
}

public String getSourcePath(JobConf conf) {
public String getSourcePath(Configuration conf) {
VersionedStore store;
try {
store = getStore(conf);
Expand All @@ -77,7 +78,7 @@ public String getSourcePath(JobConf conf) {
}
}

public String getSinkPath(JobConf conf) {
public String getSinkPath(Configuration conf) {
try {
VersionedStore store = getStore(conf);
String sinkPath = (version == null) ? store.createVersion() : store.createVersion(version);
Expand All @@ -91,33 +92,37 @@ public String getSinkPath(JobConf conf) {
}

@Override
public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
super.sourceConfInit(process, conf);
FileInputFormat.setInputPaths(conf, getSourcePath(conf));
public void sourceConfInit(FlowProcess<? extends Configuration> process, Configuration conf) {
JobConf jobConf = new JobConf(conf);

super.sourceConfInit(process, jobConf);
FileInputFormat.setInputPaths(jobConf, getSourcePath(jobConf));
}

@Override
public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) {
super.sinkConfInit(process, conf);
public void sinkConfInit(FlowProcess<? extends Configuration> process, Configuration conf) {
JobConf jobConf = new JobConf(conf);

super.sinkConfInit(process, jobConf);

if (newVersionPath == null)
newVersionPath = getSinkPath(conf);
newVersionPath = getSinkPath(jobConf);

FileOutputFormat.setOutputPath(conf, new Path(newVersionPath));
FileOutputFormat.setOutputPath(jobConf, new Path(newVersionPath));
}

@Override
public boolean resourceExists(JobConf jc) throws IOException {
public boolean resourceExists(Configuration jc) throws IOException {
return getStore(jc).mostRecentVersion() != null;
}

@Override
public boolean createResource(JobConf jc) throws IOException {
public boolean createResource(Configuration jc) throws IOException {
throw new UnsupportedOperationException("Not supported yet.");
}

@Override
public boolean deleteResource(JobConf jc) throws IOException {
public boolean deleteResource(Configuration jc) throws IOException {
throw new UnsupportedOperationException("Not supported yet.");
}

Expand All @@ -131,26 +136,28 @@ public String getIdentifier() {
}

@Override
public long getModifiedTime(JobConf conf) throws IOException {
public long getModifiedTime(Configuration conf) throws IOException {
VersionedStore store = getStore(conf);
return (mode == TapMode.SINK) ? 0 : store.mostRecentVersion();
}

@Override
public boolean commitResource(JobConf conf) throws IOException {
VersionedStore store = new VersionedStore(FileSystem.get(conf), getOutputDirectory());
public boolean commitResource(Configuration conf) throws IOException {
JobConf jobConf = new JobConf(conf);

VersionedStore store = new VersionedStore(FileSystem.get(jobConf), getOutputDirectory());

if (newVersionPath != null) {
store.succeedVersion(newVersionPath);
markSuccessfulOutputDir(new Path(newVersionPath), conf);
markSuccessfulOutputDir(new Path(newVersionPath), jobConf);
newVersionPath = null;
store.cleanup(getVersionsToKeep());
}

return true;
}

private static void markSuccessfulOutputDir(Path path, JobConf conf) throws IOException {
private static void markSuccessfulOutputDir(Path path, Configuration conf) throws IOException {
FileSystem fs = FileSystem.get(conf);
// create a file in the folder to mark it
if (fs.exists(path)) {
Expand All @@ -160,7 +167,7 @@ private static void markSuccessfulOutputDir(Path path, JobConf conf) throws IOEx
}

@Override
public boolean rollbackResource(JobConf conf) throws IOException {
public boolean rollbackResource(Configuration conf) throws IOException {
if (newVersionPath != null) {
getStore(conf).failVersion(newVersionPath);
newVersionPath = null;
Expand Down

0 comments on commit a691774

Please sign in to comment.