Skip to content

Commit

Permalink
Updated error message thrown by c.f.FlowStep when unable to find c.t.…
Browse files Browse the repository at this point in the history
…Tap or c.p.Pipe instances in the flow plan due to a Class serialized field not implementing #hashCode() or #equals() and relying in the object identity.
  • Loading branch information
cwensel committed Mar 23, 2011
1 parent 988b74b commit 1104a65
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 2 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ Cascading Change Log

1.2.3 [unreleased]

Updated error message thrown by c.f.FlowStep when unable to find c.t.Tap or c.p.Pipe instances in the flow plan due
to a Class serialized field not implementing #hashCode() or #equals() and relying in the object identity.

Added error message explaining the Hadoop mapred.jobtracker.completeuserjobs.maximum property needs to be increased
when dealing with large numbers of jobs.

Expand Down
23 changes: 22 additions & 1 deletion src/core/cascading/flow/FlowStep.java
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ private void initFromTraps( JobConf conf, Map<String, Tap> traps ) throws IOExce

private void initFromSources( JobConf conf ) throws IOException
{
JobConf[] fromJobs = new JobConf[sources.size()];
JobConf[] fromJobs = new JobConf[ sources.size() ];
int i = 0;

for( Tap tap : sources.keySet() )
Expand Down Expand Up @@ -441,6 +441,8 @@ public Tap getReducerTrap( String name )
*/
public Set<Scope> getPreviousScopes( FlowElement flowElement )
{
assertFlowElement( flowElement );

return graph.incomingEdgesOf( flowElement );
}

Expand All @@ -452,6 +454,8 @@ public Set<Scope> getPreviousScopes( FlowElement flowElement )
*/
public Scope getNextScope( FlowElement flowElement )
{
assertFlowElement( flowElement );

Set<Scope> set = graph.outgoingEdgesOf( flowElement );

if( set.size() != 1 )
Expand All @@ -462,9 +466,26 @@ public Scope getNextScope( FlowElement flowElement )

public Set<Scope> getNextScopes( FlowElement flowElement )
{
assertFlowElement( flowElement );

return graph.outgoingEdgesOf( flowElement );
}

private void assertFlowElement( FlowElement flowElement )
{
if( !graph.containsVertex( flowElement ) )
{
String message = "unable to find %s in plan, class and serializable fields must implement #hashCode() and #equals()";

if( flowElement instanceof Pipe )
message = Util.formatTrace( (Pipe) flowElement, String.format( message, "pipe" ) );
else if( flowElement instanceof Tap )
message = Util.formatTrace( (Tap) flowElement, String.format( message, "tap" ) );

throw new IllegalStateException( message );
}
}

public FlowElement getNextFlowElement( Scope scope )
{
return graph.getEdgeTarget( scope );
Expand Down
14 changes: 14 additions & 0 deletions src/core/cascading/tap/Tap.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.util.Util;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
Expand Down Expand Up @@ -68,6 +69,9 @@ public abstract class Tap implements FlowElement, Serializable
/** Field mode */
SinkMode sinkMode = SinkMode.KEEP;

/** Field trace */
private String trace = Util.captureDebugTrace( getClass() );

/**
* Convenience function to make an array of Tap instances.
*
Expand Down Expand Up @@ -109,6 +113,16 @@ public Scheme getScheme()
return scheme;
}

/**
* Method getTrace return the trace of this object.
*
* @return String
*/
public String getTrace()
{
return trace;
}

/**
* Method isWriteDirect returns true if this instances {@link cascading.tuple.TupleEntryCollector} should be used to sink values.
*
Expand Down
16 changes: 15 additions & 1 deletion src/core/cascading/util/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import cascading.operation.Operation;
import cascading.pipe.Pipe;
import cascading.scheme.Scheme;
import cascading.tap.Tap;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -303,7 +304,7 @@ public static String[] removeNulls( String... strings )
list.add( string );
}

return list.toArray( new String[list.size()] );
return list.toArray( new String[ list.size() ] );
}

public static Collection<String> quote( Collection<String> collection, String quote )
Expand Down Expand Up @@ -529,6 +530,19 @@ public static String formatTrace( Pipe pipe, String message )
return "[" + truncate( pipe.getName(), 25 ) + "][" + trace + "] " + message;
}

public static String formatTrace( Tap tap, String message )
{
if( tap == null )
return message;

String trace = tap.getTrace();

if( trace == null )
return message;

return "[" + truncate( tap.toString(), 25 ) + "][" + trace + "] " + message;
}

public static String formatTrace( Operation operation, String message )
{
if( !( operation instanceof BaseOperation ) )
Expand Down

0 comments on commit 1104a65

Please sign in to comment.