[SPARK-18694][SS] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException

## What changes were proposed in this pull request?

- Add StreamingQuery.explain and exception to Python.
- Fix StreamingQueryException to not expose `OffsetSeq`.
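
A quick usage sketch of the two new Python APIs, based on the doctest added below (`demo` is an arbitrary query name; the source path is the test fixture used elsewhere in this patch):

```python
sdf = spark.readStream.format('text').load('python/test_support/sql/streaming')
sq = sdf.writeStream.format('memory').queryName('demo').start()
sq.processAllAvailable()
sq.explain()           # prints only the physical plan, from the Python process
sq.explain(True)       # prints parsed/analyzed/optimized/physical plans
print(sq.exception())  # None while the query has not failed
sq.stop()
```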

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes apache#16125 from zsxwing/py-streaming-explain.
zsxwing authored and uzadude committed Jan 27, 2017
1 parent 4c01671 commit 7afdae9
Showing 8 changed files with 119 additions and 25 deletions.
9 changes: 8 additions & 1 deletion project/MimaExcludes.scala
@@ -106,7 +106,14 @@ object MimaExcludes {
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_="),

// [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables"),

// [SPARK-18694] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryException$"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.startOffset"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.endOffset"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query")
)
}

40 changes: 40 additions & 0 deletions python/pyspark/sql/streaming.py
@@ -30,6 +30,7 @@
from pyspark.rdd import ignore_unicode_prefix
from pyspark.sql.readwriter import OptionUtils, to_str
from pyspark.sql.types import *
+from pyspark.sql.utils import StreamingQueryException

__all__ = ["StreamingQuery", "StreamingQueryManager", "DataStreamReader", "DataStreamWriter"]

@@ -132,6 +133,45 @@ def stop(self):
"""
self._jsq.stop()

+    @since(2.1)
+    def explain(self, extended=False):
+        """Prints the (logical and physical) plans to the console for debugging purposes.
+
+        :param extended: boolean, default ``False``. If ``False``, prints only the physical plan.
+
+        >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start()
+        >>> sq.processAllAvailable()  # Wait a bit to generate the runtime plans.
+        >>> sq.explain()
+        == Physical Plan ==
+        ...
+        >>> sq.explain(True)
+        == Parsed Logical Plan ==
+        ...
+        == Analyzed Logical Plan ==
+        ...
+        == Optimized Logical Plan ==
+        ...
+        == Physical Plan ==
+        ...
+        >>> sq.stop()
+        """
+        # Cannot call `_jsq.explain(...)` because it will print in the JVM process.
+        # We should print it in the Python process.
+        print(self._jsq.explainInternal(extended))
+
+    @since(2.1)
+    def exception(self):
+        """
+        :return: the StreamingQueryException if the query was terminated by an exception, or None.
+        """
+        if self._jsq.exception().isDefined():
+            je = self._jsq.exception().get()
+            msg = je.toString().split(': ', 1)[1]  # Drop the Java StreamingQueryException type info
+            stackTrace = '\n\t at '.join(map(lambda x: x.toString(), je.getStackTrace()))
+            return StreamingQueryException(msg, stackTrace)
+        else:
+            return None


class StreamingQueryManager(object):
"""A class to manage all the :class:`StreamingQuery` StreamingQueries active.
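
For context, the `StreamingQueryException` imported at the top of streaming.py is a thin Python-side wrapper around the captured JVM error. A minimal sketch of what such a wrapper looks like, assuming the shape of the existing `pyspark/sql/utils.py` helpers (that file is not part of this diff):

```python
class CapturedException(Exception):
    """Sketch of a wrapper for an exception captured from the JVM."""
    def __init__(self, desc, stackTrace):
        self.desc = desc              # message text, minus the Java class name
        self.stackTrace = stackTrace  # pre-rendered JVM stack trace string

    def __str__(self):
        return repr(self.desc)


class StreamingQueryException(CapturedException):
    """Exception that stopped a StreamingQuery."""
```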
29 changes: 29 additions & 0 deletions python/pyspark/sql/tests.py
@@ -1137,6 +1137,35 @@ def test_stream_await_termination(self):
q.stop()
shutil.rmtree(tmpPath)

+    def test_stream_exception(self):
+        sdf = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
+        sq = sdf.writeStream.format('memory').queryName('query_explain').start()
+        try:
+            sq.processAllAvailable()
+            self.assertEqual(sq.exception(), None)
+        finally:
+            sq.stop()
+
+        from pyspark.sql.functions import col, udf
+        from pyspark.sql.utils import StreamingQueryException
+        bad_udf = udf(lambda x: 1 / 0)
+        sq = sdf.select(bad_udf(col("value")))\
+            .writeStream\
+            .format('memory')\
+            .queryName('this_query')\
+            .start()
+        try:
+            # Process some data to fail the query
+            sq.processAllAvailable()
+            self.fail("bad udf should fail the query")
+        except StreamingQueryException as e:
+            # This is expected
+            self.assertTrue("ZeroDivisionError" in e.desc)
+        finally:
+            sq.stop()
+        self.assertTrue(type(sq.exception()) is StreamingQueryException)
+        self.assertTrue("ZeroDivisionError" in sq.exception().desc)

def test_query_manager_await_termination(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
for q in self.spark._wrapped.streams.active:
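
The same wrapper also surfaces through `awaitTermination()`. A sketch of the user-side pattern, assuming `pyspark/sql/utils.py` maps the JVM exception class onto the Python one:

```python
from pyspark.sql.utils import StreamingQueryException

try:
    sq.awaitTermination()
except StreamingQueryException as e:
    print(e.desc)        # message without the Java class prefix
    print(e.stackTrace)  # rendered JVM stack trace, for debugging
```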
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -93,7 +93,7 @@ class StreamExecution(
* once, since the field's value may change at any time.
*/
@volatile
-  protected var availableOffsets = new StreamProgress
+  var availableOffsets = new StreamProgress

/** The current batchId or -1 if execution has not yet been initialized. */
protected var currentBatchId: Long = -1
@@ -263,7 +263,8 @@
this,
s"Query $name terminated with exception: ${e.getMessage}",
e,
-          Some(committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)))
+          committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString,
+          availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString)
logError(s"Query $name terminated with error", e)
updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
// Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -24,32 +24,42 @@ import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecut
* :: Experimental ::
 * Exception that stopped a [[StreamingQuery]]. Use `cause` to get the actual exception
* that caused the failure.
- * @param query Query that caused the exception
* @param message Message of this exception
* @param cause Internal cause of this exception
- * @param startOffset Starting offset (if known) of the range of data in which exception occurred
- * @param endOffset Ending offset (if known) of the range of data in which exception occurred
+ * @param startOffset Starting offset in JSON of the range of data in which exception occurred
+ * @param endOffset Ending offset in JSON of the range of data in which exception occurred
* @since 2.0.0
*/
@Experimental
-class StreamingQueryException private[sql](
-    @transient val query: StreamingQuery,
+class StreamingQueryException private(
+    causeString: String,
     val message: String,
     val cause: Throwable,
-    val startOffset: Option[OffsetSeq] = None,
-    val endOffset: Option[OffsetSeq] = None)
+    val startOffset: String,
+    val endOffset: String)
extends Exception(message, cause) {

+  private[sql] def this(
+      query: StreamingQuery,
+      message: String,
+      cause: Throwable,
+      startOffset: String,
+      endOffset: String) {
+    this(
+      // scalastyle:off
+      s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}
+         |
+         |${query.asInstanceOf[StreamExecution].toDebugString}
+         """.stripMargin,
+      // scalastyle:on
+      message,
+      cause,
+      startOffset,
+      endOffset)
+  }

/** Time when the exception occurred */
val time: Long = System.currentTimeMillis

-  override def toString(): String = {
-    val causeStr =
-      s"${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}"
-    s"""
-       |$causeStr
-       |
-       |${query.asInstanceOf[StreamExecution].toDebugString}
-       """.stripMargin
-  }
+  override def toString(): String = causeString
}
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -38,6 +38,13 @@ import org.apache.spark.annotation.Experimental
class StateOperatorProgress private[sql](
val numRowsTotal: Long,
val numRowsUpdated: Long) {

+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
private[sql] def jsonValue: JValue = {
("numRowsTotal" -> JInt(numRowsTotal)) ~
("numRowsUpdated" -> JInt(numRowsUpdated))
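
On the Python side, progress objects surface as dictionaries parsed from this JSON. A sketch, assuming PySpark's `StreamingQuery.lastProgress` returns the parsed dict (or `None` before the first trigger):

```python
progress = sq.lastProgress
if progress is not None:
    for op in progress.get("stateOperators", []):
        print(op["numRowsTotal"], op["numRowsUpdated"])
```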
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -412,8 +412,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
eventually("microbatch thread not stopped after termination with failure") {
assert(!currentStream.microBatchThread.isAlive)
}
-        verify(thrownException.query.eq(currentStream),
-          s"incorrect query reference in exception")
verify(currentStream.exception === Some(thrownException),
s"incorrect exception returned by query.exception()")

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -103,10 +103,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
TestAwaitTermination(ExpectException[SparkException]),
TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000),
TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
-      AssertOnQuery(
-        q => q.exception.get.startOffset.get.offsets ===
-          q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").offsets,
-        "incorrect start offset on exception")
+      AssertOnQuery(q => {
+        q.exception.get.startOffset ===
+          q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").toString &&
+          q.exception.get.endOffset ===
+          q.availableOffsets.toOffsetSeq(Seq(inputData), "{}").toString
+      }, "incorrect start offset or end offset on exception")
)
}

