
FileSource InvalidSourceTap Race Condition #1539

Closed
benpence opened this issue Mar 23, 2016 · 5 comments

@benpence
Contributor

FileSource.createHdfsReadTap, when strict mode is false and there are no good paths, will create an InvalidSourceTap. FileSource.validateTaps will not throw if a good path has appeared since createHdfsReadTap ran, so the flow proceeds with an invalid tap. This sounds like a problem, albeit one that only occurs when strict mode is disabled.
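The race described above is a classic check-then-act problem: the tap is chosen in one step and the paths are validated in a separate, later step. A minimal sketch of the shape of the bug (the class, field, and method names here are illustrative stand-ins, not the real Scalding API):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class TapRaceSketch {
    // Hypothetical stand-in for "does a good HDFS path exist right now?"
    static final AtomicBoolean pathIsGood = new AtomicBoolean(false);

    // Phase 1: with strict mode off and no good paths, a placeholder
    // InvalidSourceTap is chosen.
    static String createHdfsReadTap(boolean strict) {
        if (!strict && !pathIsGood.get()) return "InvalidSourceTap";
        return "GoodTap";
    }

    // Phase 2: validation re-checks the paths independently of phase 1.
    static void validateTaps() {
        if (!pathIsGood.get()) throw new IllegalStateException("invalid source");
    }

    public static void main(String[] args) {
        String tap = createHdfsReadTap(false); // sees no good path yet
        pathIsGood.set(true);                  // a good path appears in between
        validateTaps();                        // passes, even though the tap is invalid
        System.out.println(tap);               // the flow still holds "InvalidSourceTap"
    }
}
```

Because validation re-reads the filesystem instead of checking what phase 1 actually decided, the flow can be planned with an InvalidSourceTap and still pass validation.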

@benpence benpence added the bug label Mar 23, 2016
@jnievelt
Contributor

Yeah, here's a small repro that can be run in non-strict Hdfs mode. It overrides validateTaps to simulate the case where one of the paths became usable between tap creation and validation.

The effect is amplified for Twitter because our internal source definitions can trigger this in strict mode as well.

import com.twitter.scalding._
import com.twitter.scalding.source._
val input = new FixedTypedText[String](TypedText.TAB, "foo_bad_path") {
  override def validateTaps(mode: Mode): Unit = {}
}
TypedPipe.from(input).write(NullSink)
Mar 24, 2016 2:17:53 AM cascading.flow.hadoop.util.HadoopUtil findMainClass
INFO: resolving application jar from found main method on: com.twitter.scalding.BaseScaldingShell$class
Mar 24, 2016 2:17:53 AM cascading.flow.hadoop.planner.HadoopPlanner initialize
INFO: using application jar: /opt/scalding/309/libs/624.jar
Mar 24, 2016 2:17:53 AM cascading.property.AppProps getAppID
INFO: using app.id: 29548BACD55B4ECC8AD015FA158A8AAF
Mar 24, 2016 2:17:53 AM cascading.util.Version printBanner
INFO: Concurrent, Inc - Cascading 2.5.6-t-7
Mar 24, 2016 2:17:53 AM cascading.flow.BaseFlow logInfo
INFO: [ScaldingShell] starting
Mar 24, 2016 2:17:53 AM cascading.flow.BaseFlow logInfo
INFO: [ScaldingShell]  source: InvalidSourceTap["NullScheme"]["InvalidSourceTap-a47ddbbe-d92f-4513-b5a3-334835f4190c"]
Mar 24, 2016 2:17:53 AM cascading.flow.BaseFlow logInfo
INFO: [ScaldingShell]  sink: NullTap["NullScheme"]["nullTap"]
Mar 24, 2016 2:17:53 AM cascading.flow.BaseFlow logInfo
INFO: [ScaldingShell]  parallel execution is enabled: true
Mar 24, 2016 2:17:53 AM cascading.flow.BaseFlow logInfo
INFO: [ScaldingShell]  starting jobs: 1
Mar 24, 2016 2:17:53 AM cascading.flow.BaseFlow logInfo
INFO: [ScaldingShell]  allocating threads: 1
Mar 24, 2016 2:17:53 AM com.twitter.scalding.reducer_estimation.ReducerEstimatorStepStrategy$ apply
INFO: ScaldingShell is a map-only step. Skipping reducer estimation.
Mar 24, 2016 2:17:53 AM cascading.flow.planner.BaseFlowStep logInfo
INFO: [ScaldingShell] starting step: (1/1) nullTap
Mar 24, 2016 2:18:14 AM cascading.flow.BaseFlow logInfo
INFO: [ScaldingShell] stopping all jobs
Mar 24, 2016 2:18:14 AM cascading.flow.planner.BaseFlowStep logInfo
INFO: [ScaldingShell] stopping: (1/1) nullTap
Mar 24, 2016 2:18:14 AM cascading.flow.BaseFlow logInfo
INFO: [ScaldingShell] stopped all jobs
Flow execution failed!
cascading.flow.FlowException: unhandled exception
    at cascading.flow.BaseFlow.complete(BaseFlow.java:894)
    at com.twitter.scalding.Execution$$anonfun$waitFor$2.apply(Execution.scala:710)
    at com.twitter.scalding.Execution$$anonfun$waitFor$2.apply(Execution.scala:709)
    at scala.util.Try$.apply(Try.scala:192)
    at com.twitter.scalding.Execution$.waitFor(Execution.scala:709)
    at com.twitter.scalding.ExecutionContext$$anonfun$waitFor$1.apply(ExecutionContext.scala:140)
    at com.twitter.scalding.ExecutionContext$$anonfun$waitFor$1.apply(ExecutionContext.scala:140)
    at scala.util.Success.flatMap(Try.scala:231)
    at com.twitter.scalding.ExecutionContext$class.waitFor(ExecutionContext.scala:140)
    at com.twitter.scalding.ExecutionContext$$anon$1.waitFor(ExecutionContext.scala:165)
    at com.twitter.scalding.BaseReplState$class.run(ReplImplicits.scala:187)
    at com.twitter.scalding_internal.repl.ReplState$.run(ReplState.scala:8)
    at $line7.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:32)
    at $line7.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$.<clinit>(<console>)
    at $line7.$eval$.$print$lzycompute(<console>:7)
    at $line7.$eval$.$print(<console>:6)
    at $line7.$eval.$print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:784)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1039)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:636)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:635)
    at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:635)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:567)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:563)
    at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:802)
    at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:836)
    at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:694)
    at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:404)
    at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:424)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:925)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:911)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:911)
    at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
    at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:911)
    at com.twitter.scalding.BaseScaldingShell$class.process(ScaldingShell.scala:98)
    at com.twitter.scalding_internal.repl.TwitterScaldingShell$.process(Shell.scala:9)
    at com.twitter.scalding.BaseScaldingShell$class.main(ScaldingShell.scala:126)
    at com.twitter.scalding_internal.repl.TwitterScaldingShell$.main(Shell.scala:9)
    at com.twitter.scalding_internal.repl.TwitterScaldingShell.main(Shell.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.lang.StackOverflowError
    at java.util.zip.Inflater.<init>(Inflater.java:102)
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:76)
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:90)
    at cascading.flow.hadoop.util.JavaObjectSerializer.deserializeList(JavaObjectSerializer.java:231)
    at cascading.flow.hadoop.util.JavaObjectSerializer.deserialize(JavaObjectSerializer.java:76)
    at cascading.flow.hadoop.util.HadoopUtil.deserializeBase64(HadoopUtil.java:312)
    at cascading.tap.hadoop.io.MultiInputFormat.getConfigs(MultiInputFormat.java:107)
    at cascading.tap.hadoop.io.MultiInputFormat.getSplits(MultiInputFormat.java:128)
    at cascading.tap.hadoop.io.MultiInputFormat.getSplits(MultiInputFormat.java:200)
    at cascading.tap.hadoop.io.MultiInputFormat.getSplits(MultiInputFormat.java:134)
    at cascading.tap.hadoop.io.MultiInputFormat.getSplits(MultiInputFormat.java:200)
    at cascading.tap.hadoop.io.MultiInputFormat.getSplits(MultiInputFormat.java:134)
    at cascading.tap.hadoop.io.MultiInputFormat.getSplits(MultiInputFormat.java:200)
    at cascading.tap.hadoop.io.MultiInputFormat.getSplits(MultiInputFormat.java:134)
    at cascading.tap.hadoop.io.MultiInputFormat.getSplits(MultiInputFormat.java:200)
    at cascading.tap.hadoop.io.MultiInputFormat.getSplits(MultiInputFormat.java:134)
    at cascading.tap.hadoop.io.MultiInputFormat.getSplits(MultiInputFormat.java:200)
    at cascading.tap.hadoop.io.MultiInputFormat.getSplits(MultiInputFormat.java:134)
    at cascading.tap.hadoop.io.MultiInputFormat.getSplits(MultiInputFormat.java:200)
    at cascading.tap.hadoop.io.MultiInputFormat.getSplits(MultiInputFormat.java:134)

@johnynek
Collaborator

Whoa, a StackOverflowError. That should never happen, right? Why the infinite loop?

@benpence
Contributor Author

We've been tackling this internally. Basically, MultiInputFormat defers split calculation to its serialized child InputFormats, and one of the children we've added is itself a MultiInputFormat. The config never dequeues any of these serialized formats, so the nested MultiInputFormat sees the same list and, I think, does exactly what its parent did, recursing indefinitely.
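The recursion pattern in the stack trace can be modeled in a few lines. This is a simplified illustration of the mechanism, not the real Cascading code: a made-up getSplits that delegates over a list of serialized format names, where a nested "MultiInputFormat" entry is handed the same unconsumed list its parent saw. A depth guard is added here only so the sketch fails cleanly; the real code has none, hence the StackOverflowError.

```java
import java.util.List;

public class MultiInputRecursionSketch {
    // Illustrative model: getSplits iterates the serialized child formats,
    // delegating to each. Because the config list is never "dequeued", a
    // nested multi-format recurses on the very same list, forever.
    static int getSplits(List<String> formats, int depth) {
        if (depth > 100) {
            // The real code has no such guard; the JVM eventually dies
            // with a StackOverflowError instead.
            throw new IllegalStateException("unbounded recursion");
        }
        int splits = 0;
        for (String f : formats) {
            if (f.equals("MultiInputFormat")) {
                // Bug: the child receives the whole list, not its own tail.
                splits += getSplits(formats, depth + 1);
            } else {
                splits += 1;
            }
        }
        return splits;
    }

    public static void main(String[] args) {
        try {
            getSplits(List.of("MultiInputFormat", "TextInputFormat"), 0);
        } catch (IllegalStateException e) {
            System.out.println("recursed without bound, as in the stack trace");
        }
    }
}
```

The fix direction this suggests is for each delegation to consume its own entry from the serialized config so the child only ever sees the remaining formats.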

@isnotinvain
Contributor

This is fixed right?

@benpence
Contributor Author

benpence commented Oct 3, 2016

@isnotinvain Yes, it is fixed to my knowledge. Next time I will use "Fixes #..."

@benpence benpence closed this as completed Oct 3, 2016