Merge branch 'release/1.1.4'

commit e34ba645bb852e9751035acfd0ed736efbee68ba (2 parents: 8833110 + 77f8e44)
Authored by Sam Ritchie (sritchie)
README.md (6 changes)
@@ -12,8 +12,4 @@ For Cascading 1.2 support, use
 
 For Cascading 2.0, use
 
-    [backtype/dfs-datastores-cascading "1.1.0"]
-
-
-
-
+    [backtype/dfs-datastores-cascading "1.1.3"]
project.clj (4 changes)

@@ -1,4 +1,4 @@
-(defproject backtype/dfs-datastores-cascading "1.1.1"
+(defproject backtype/dfs-datastores-cascading "1.1.4"
   :java-source-path "src/jvm"
   :source-path "src/clj"
   :java-test-path "test/jvm"
@@ -8,7 +8,7 @@
   :junit-options {:fork "off" :haltonfailure "on"}
   :repositories {"conjars" "http://conjars.org/repo"}
   :dependencies [[backtype/dfs-datastores "1.1.0"]
-                 [cascading/cascading-hadoop "2.0.0-wip-184"
+                 [cascading/cascading-hadoop "2.0.0-wip-281"
                   :exclusions [org.codehaus.janino/janino
                                org.apache.hadoop/hadoop-core]]]
   :dev-dependencies [[org.apache.hadoop/hadoop-core "0.20.2-dev"]
src/jvm/backtype/cascading/tap/NullTap.java (20 changes)

@@ -1,7 +1,6 @@
 package backtype.cascading.tap;
 
 import cascading.flow.FlowProcess;
-import cascading.flow.hadoop.HadoopFlowProcess;
 import cascading.scheme.Scheme;
 import cascading.scheme.SinkCall;
 import cascading.scheme.SourceCall;
@@ -18,28 +17,32 @@
 
 public class NullTap extends Tap {
 
-  public static class NullScheme extends Scheme<HadoopFlowProcess, JobConf, RecordReader, OutputCollector, Object[], Object[]> {
+  public static class NullScheme
+      extends Scheme<FlowProcess<JobConf>, JobConf, RecordReader, OutputCollector, Object[], Object[]> {
+
     public NullScheme() {
       super(Fields.ALL);
     }
 
     @Override
-    public void sourceConfInit(HadoopFlowProcess prcs, Tap tap, JobConf config) {
+    public void sourceConfInit(FlowProcess<JobConf> prcs,
+        Tap<FlowProcess<JobConf>, JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
       throw new IllegalArgumentException("Cannot use as a source");
     }
 
     @Override
-    public void sinkConfInit(HadoopFlowProcess prcs, Tap tap, JobConf conf) {
+    public void sinkConfInit(FlowProcess<JobConf> prcs,
+        Tap<FlowProcess<JobConf>, JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
       conf.setOutputFormat(NullOutputFormat.class);
     }
 
     @Override
-    public boolean source(HadoopFlowProcess prcs, SourceCall<Object[], RecordReader> sc) throws IOException {
+    public boolean source(FlowProcess<JobConf> prcs, SourceCall<Object[], RecordReader> sc) throws IOException {
       throw new IllegalArgumentException("cannot source");
     }
 
     @Override
-    public void sink(HadoopFlowProcess prcs, SinkCall<Object[], OutputCollector> sourceCall) throws IOException {
+    public void sink(FlowProcess<JobConf> prcs, SinkCall<Object[], OutputCollector> sourceCall) throws IOException {
     }
   }
 
@@ -56,6 +59,11 @@ public NullTap() {
     throw new UnsupportedOperationException("Not supported yet.");
   }
 
+  @Override public TupleEntryCollector openForWrite(FlowProcess flowProcess, Object o)
+      throws IOException {
+    return null;
+  }
+
   @Override public boolean createResource(Object o) throws IOException {
     return true;
   }
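The NullTap changes track the Cascading 2.0 API migration: the concrete HadoopFlowProcess in Scheme and Tap signatures gives way to the generic FlowProcess<JobConf>, and the new openForWrite override fills out the expanded Tap contract in the wip-281 build. As a rough usage sketch, NullTap acts as a sink that discards a flow's output via NullOutputFormat; note that the Hfs, TextLine, and HadoopFlowConnector names and packages below follow the final Cascading 2.0 layout and may differ slightly in wip builds:

    import backtype.cascading.tap.NullTap;
    import cascading.flow.Flow;
    import cascading.flow.hadoop.HadoopFlowConnector;
    import cascading.pipe.Pipe;
    import cascading.scheme.hadoop.TextLine;
    import cascading.tap.Tap;
    import cascading.tap.hadoop.Hfs;

    public class NullTapExample {
      public static void main(String[] args) {
        Tap source = new Hfs(new TextLine(), args[0]); // any existing input path
        Tap sink = new NullTap();                      // NullScheme wires output to NullOutputFormat
        Pipe pipe = new Pipe("identity");              // no-op pipe assembly
        Flow flow = new HadoopFlowConnector().connect(source, sink, pipe);
        flow.complete();                               // runs the job; all output tuples are discarded
      }
    }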
src/jvm/backtype/cascading/tap/PailTap.java (90 changes)

@@ -1,11 +1,8 @@
 package backtype.cascading.tap;
 
 import backtype.hadoop.pail.*;
-import backtype.support.CascadingUtils;
 import backtype.support.Utils;
-import cascading.flow.Flow;
-import cascading.flow.FlowListener;
-import cascading.flow.hadoop.HadoopFlowProcess;
+import cascading.flow.FlowProcess;
 import cascading.scheme.Scheme;
 import cascading.scheme.SinkCall;
 import cascading.scheme.SourceCall;
@@ -28,11 +25,12 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-public class PailTap extends Hfs implements FlowListener {
+public class PailTap extends Hfs {
   private static Logger LOG = Logger.getLogger(PailTap.class);
 
   public static PailSpec makeSpec(PailSpec given, PailStructure structure) {
@@ -62,7 +60,7 @@ public PailTapOptions(PailSpec spec, String fieldName, List<String>[] attrs, Pai
   }
 
 
-  public class PailScheme extends Scheme<HadoopFlowProcess, JobConf, RecordReader, OutputCollector, Object[], Object[]> {
+  public class PailScheme extends Scheme<FlowProcess<JobConf>, JobConf, RecordReader, OutputCollector, Object[], Object[]> {
     private PailTapOptions _options;
 
     public PailScheme(PailTapOptions options) {
@@ -109,7 +107,8 @@ public PailStructure getStructure() {
     }
 
     @Override
-    public void sourceConfInit(HadoopFlowProcess process, Tap tap, JobConf conf) {
+    public void sourceConfInit(FlowProcess<JobConf> process,
+        Tap<FlowProcess<JobConf>, JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
       Pail p;
       try {
         p = new Pail(_pailRoot); //make sure it exists
@@ -120,8 +119,8 @@ public void sourceConfInit(HadoopFlowProcess process, Tap tap, JobConf conf) {
       PailFormatFactory.setPailPathLister(conf, _options.lister);
     }
 
-    @Override
-    public void sinkConfInit(HadoopFlowProcess prcs, Tap tap, JobConf conf) {
+    @Override public void sinkConfInit(FlowProcess<JobConf> flowProcess,
+        Tap<FlowProcess<JobConf>, JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
       conf.setOutputFormat(PailOutputFormat.class);
       Utils.setObject(conf, PailOutputFormat.SPEC_ARG, getSpec());
       try {
@@ -132,7 +131,7 @@ public void sinkConfInit(HadoopFlowProcess prcs, Tap tap, JobConf conf) {
     }
 
     @Override
-    public void sourcePrepare(HadoopFlowProcess flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
+    public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
      sourceCall.setContext(new Object[2]);
 
      sourceCall.getContext()[0] = sourceCall.getInput().createKey();
@@ -140,7 +139,7 @@ public void sourcePrepare(HadoopFlowProcess flowProcess, SourceCall<Object[], Re
     }
 
     @Override
-    public boolean source(HadoopFlowProcess process, SourceCall<Object[], RecordReader> sourceCall) throws IOException {
+    public boolean source(FlowProcess<JobConf> process, SourceCall<Object[], RecordReader> sourceCall) throws IOException {
       Object k = sourceCall.getContext()[0];
       Object v = sourceCall.getContext()[1];
       boolean result = sourceCall.getInput().next(k, v);
@@ -152,7 +151,7 @@ public boolean source(HadoopFlowProcess process, SourceCall<Object[], RecordRead
     }
 
     @Override
-    public void sink(HadoopFlowProcess process, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
+    public void sink(FlowProcess<JobConf> process, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
       TupleEntry tuple = sinkCall.getOutgoingEntry();
 
       Object obj = tuple.getObject(0);
@@ -198,7 +197,7 @@ public boolean deleteResource(JobConf conf) throws IOException {
 
   //no good way to override this, just had to copy/paste and modify
   @Override
-  public void sourceConfInit(HadoopFlowProcess process, JobConf conf) {
+  public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
     try {
       Path root = getQualifiedPath(conf);
       if(_options.attrs!=null && _options.attrs.length>0) {
@@ -223,7 +222,8 @@ public void sourceConfInit(HadoopFlowProcess process, JobConf conf) {
   }
 
   private void makeLocal(JobConf conf, Path qualifiedPath, String infoMessage) {
-    if( !conf.get( "mapred.job.tracker", "" ).equalsIgnoreCase( "local" ) && qualifiedPath.toUri().getScheme().equalsIgnoreCase( "file" ) )
+    if( !conf.get( "mapred.job.tracker", "" ).equalsIgnoreCase( "local" ) &&
+        qualifiedPath.toUri().getScheme().equalsIgnoreCase( "file" ) )
     {
       if( LOG.isInfoEnabled() )
         LOG.info( infoMessage + toString() );
@@ -233,48 +233,36 @@ private void makeLocal(JobConf conf, Path qualifiedPath, String infoMessage) {
   }
 
   @Override
-  public void sinkConfInit(HadoopFlowProcess process, JobConf conf) {
-    if(_options.attrs!=null && _options.attrs.length>0) {
+  public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) {
+    if(_options.attrs!=null && _options.attrs.length > 0) {
       throw new TapException("can't declare attributes in a sink");
     }
     super.sinkConfInit(process, conf);
   }
 
-  public void onCompleted(Flow flow) {
-    try {
-      //only if it's a sink
-      if(flow.getFlowStats().isSuccessful() && CascadingUtils.isSinkOf(this, flow)) {
-        Pail p = Pail.create(_pailRoot, ((PailScheme)getScheme()).getSpec(), false);
-        FileSystem fs = p.getFileSystem();
-        Path tmpPath = new Path(_pailRoot, "_temporary");
-        if(fs.exists(tmpPath)) {
-          LOG.info("Deleting _temporary directory left by Hadoop job: " + tmpPath.toString());
-          fs.delete(tmpPath, true);
-        }
-
-        Path tmp2Path = new Path(_pailRoot, "_temporary2");
-        if(fs.exists(tmp2Path)) {
-          LOG.info("Deleting _temporary2 directory: " + tmp2Path.toString());
-          fs.delete(tmp2Path, true);
-        }
-
-        Path logPath = new Path(_pailRoot, "_logs");
-        if(fs.exists(logPath)) {
-          LOG.info("Deleting _logs directory left by Hadoop job: " + logPath.toString());
-          fs.delete(logPath, true);
-        }
-      }
-    } catch(IOException e) {
-      throw new TapException(e);
+  @Override
+  public boolean commitResource(JobConf conf) throws IOException {
+    Pail p = Pail.create(_pailRoot, ((PailScheme)getScheme()).getSpec(), false);
+    FileSystem fs = p.getFileSystem();
+    Path tmpPath = new Path(_pailRoot, "_temporary");
+    if(fs.exists(tmpPath)) {
+      LOG.info("Deleting _temporary directory left by Hadoop job: " + tmpPath.toString());
+      fs.delete(tmpPath, true);
    }
-  }
 
-  public void onStarting(Flow flow) {}
+    Path tmp2Path = new Path(_pailRoot, "_temporary2");
+    if(fs.exists(tmp2Path)) {
+      LOG.info("Deleting _temporary2 directory: " + tmp2Path.toString());
+      fs.delete(tmp2Path, true);
+    }
 
-  public void onStopping(Flow flow) {}
+    Path logPath = new Path(_pailRoot, "_logs");
+    if(fs.exists(logPath)) {
+      LOG.info("Deleting _logs directory left by Hadoop job: " + logPath.toString());
+      fs.delete(logPath, true);
+    }
 
-  public boolean onThrowable(Flow flow, Throwable thrwbl) {
-    return false;
+    return true;
  }
 
@@ -290,15 +278,11 @@ public boolean equals(Object object) {
     PailTap other = (PailTap) object;
     Set<List<String>> myattrs = new HashSet<List<String>>();
     if(_options.attrs!=null) {
-      for(List<String> a: _options.attrs) {
-        myattrs.add(a);
-      }
+      Collections.addAll(myattrs, _options.attrs);
     }
     Set<List<String>> otherattrs = new HashSet<List<String>>();
     if(other._options.attrs!=null) {
-      for(List<String> a: other._options.attrs) {
-        otherattrs.add(a);
-      }
+      Collections.addAll(otherattrs, other._options.attrs);
     }
     return _pailRoot.equals(other._pailRoot) && myattrs.equals(otherattrs);
   }
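The most substantive change here: PailTap no longer implements FlowListener to clean up after a job. Cascading 2.0 added a commit/rollback lifecycle on Tap, and commitResource is invoked on a sink tap once the flow completes successfully, so the _temporary, _temporary2, and _logs cleanup moves there and the tap no longer needs to register itself as a listener or check whether it is a sink. A minimal sketch of the same pattern for a custom Hfs subclass follows; CleanupTap is hypothetical, and the Hfs constructor, getPath(), and package locations are assumed from the final Cascading 2.0 API:

    import java.io.IOException;

    import cascading.scheme.Scheme;
    import cascading.tap.hadoop.Hfs;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    // Hypothetical tap illustrating the commitResource pattern PailTap adopts above.
    public class CleanupTap extends Hfs {
      public CleanupTap(Scheme scheme, String path) {
        super(scheme, path);
      }

      @Override
      public boolean commitResource(JobConf conf) throws IOException {
        // Called once on this sink after the flow succeeds; a safe place for
        // cleanup, with no FlowListener registration or is-this-a-sink check.
        Path tmp = new Path(getPath(), "_temporary");
        FileSystem fs = tmp.getFileSystem(conf);
        if (fs.exists(tmp)) {
          fs.delete(tmp, true); // remove Hadoop job scratch space
        }
        return true; // report a clean commit
      }
    }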
