New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pull cascading flowDef thread into cascading_backend #1681
Conversation
// get an immutable copy | ||
val cleanUp = filesToCleanup.synchronized { filesToCleanup.toList } | ||
if (cleanUp.nonEmpty) { | ||
Runtime.getRuntime.addShutdownHook(TempFileCleanup(cleanUp, mode)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since i'm reading this, any thoughts on how we should handle this instead. Not for doing in this PR obviously. I'm guessing we don't run the cleanup right now because we might have done a toIterableExecution and are using it post the run?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a good question. I'm currently thinking of making some methods like:
object Execution {
def forceToDisk[T](t: TypedPipe[T]): Execution[TypedPipe[T]] = ...
def toIterable[T](t: TypedPipe[T]): Execution[Iterable[T]] = ...
}
and just have those two be new subclasses of the Execution trait. Then each Writer can have a functions like:
trait Writer {
def forceToDisk[T](c: Config, m: Mode, t: TypedPipe[T]): Future[TypedPipe[T]]
def toIterable[T](c: Config, m: Mode, t: TypedPipe[T]): Future[Iterable[T]]
}
which execution will just call through to. What do you think of that?
When we get to that stage, basically Execution really is just a memo-izing State/Reader monad. In cats-like description Execution[T] = StateT[Future, (Config, Mode, ExecutionContext), T]
with some memoization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wasn't that clear: if we do the above, the logic about cleanup files can happen when you do forceToDisk, which Writer is driving, so it can keep the list of clean-up files it needs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah i like this ^^ makes it more pluggable, and ultimately separating the 2 notions inside the execution feels like a good place to handle it. The question of maybe when to do the delete is still rough but not sure what to do there. I don't know if we can hook into some sort of hadoop fs shutdown thing. But separating this logic out from typed pipe to the platforms is 👍
👍 lgtm |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. A few minor comments.
sealed trait ToWrite | ||
object ToWrite { | ||
case class SimpleWrite[T](pipe: TypedPipe[T], sink: TypedSink[T]) extends ToWrite | ||
case class PreparedWrite[T](fn: (Config, Mode) => SimpleWrite[T]) extends ToWrite |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to change the visibility of these?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the idea is that we want to be able to write new backends. To do that, you will need to be able to see these types (so, in this case AsyncFlowDefRunner needs to be able to pattern match on them).
} | ||
|
||
/** | ||
* This holds an internal thread to submit run |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"submit run"
filesToCleanup ++= files | ||
} | ||
|
||
private def toFuture[T](t: Try[T]): Future[T] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is already defined elsewhere. Can we just put it in some central place or use bijection's or inline it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's so simple, I hear you, but I don't want to make a public API which we can't delete, not use add a bunch of syntax to call bijection here. Can we let this minor duplication slide?
def finished(mode: Mode): Unit = { | ||
messageQueue.put(Stop) | ||
// get an immutable copy | ||
val cleanUp = filesToCleanup.synchronized { filesToCleanup.toList } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the addition?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The previous stuff was not thread safe.
merging now, and can continue fixes in the next in the series. |
* Pull cascading flowDef thread into cascading_backend * Make ToWrite public * remove unused imports
This is a first step towards factoring Execution so that it does not know about the backend. This is a good cleanup regardless, because we separate the concern of caching from producing Futures from FlowDefs.
But I want to drive towards the
Mode
providing something to take a sequence ofTypedPipe[T], TypedSink[T]
pairs and returning a futureUnit
(and maybe counters) when they have all been written.