feat: merge SynapseE2E test into Spark3.1 branch and test (#1382)
* revert: revert changes of spark 3.2

* fix: change azure-ai-textanalytics dependency to shaded jar and rename namespace to make it compatible with spark 3.1

* allow branch spark3.1 to trigger pipeline

* fix shaded jar

* fix fasterxml by adding it ahead of coreDependencies

* fix io.netty issue

* fix io.netty issue

* fix databricks conflicts

* fix libraries syntax

* exclude io.netty:netty-tcnative-boringssl-static

* update adbRuntime

* exclude org.antlr while installing libraries on dbx clusters

* fix adbruntime

* fix adbruntime

* fix adb runtime

* fix adb submit job error

* ignore geospatialServices notebooks for adb because adb 9.1 runtime doesn't support sending http requests to them

* fix: Make SynapseE2E Tests work now with Spark 3.2 (#1362)

* Trying to use only pool with Spark 3.2

* Updating install instructions for synapse to use 0.9.4

* Changing syntax to grab ipynb files

* Line breaking to comply with styling

* Changing ipynb filter for windows

* Fixing string new line syntax

* Improvements to SynapseTests

* Adding more spark pools 3.2

* Adjusting list tests not to assert

* Improving dev doc, livyPayLoad

* Changing SynapseWS to mmlsparkppe

* Changing synapse URL to dogfood

* Removing dogfood from token acquisition

* Fixing excludes syntax

* Adding 2 more Apache Spark Pools

* Improving the developer docs

* Adjusting indentation on developer-readme

* Bumping Synapse test timeout to 40 min

* Applying PR feedback

Co-authored-by: Serena Ruan <82044803+serena-ruan@users.noreply.github.com>

* change to spark3.1 pools

* add more spark pools

* Show detailed response of livy

* Update URL because spark3.1 is in prod already

* Update SynapseTests.scala

* Update SynapseUtilities.scala

* fix: remove concurrency parameter for MVAD (#1383)

* remove concurrency parameter for MVAD

* fix: fix node-fetch version security & error in MVAD sample

Co-authored-by: Mark Hamilton <mhamilton723@gmail.com>

* fix: expose response error out for better debugging if the error is returned by http directly (#1391)

* merge `turn synapse tests into multiple test`

Co-authored-by: Ric Serradas <riserrad@microsoft.com>
Co-authored-by: Mark Hamilton <mhamilton723@gmail.com>
3 people committed Feb 12, 2022
1 parent bf9dc13 commit d1b5151
Showing 14 changed files with 306 additions and 184 deletions.
@@ -115,7 +115,7 @@ trait MADHttpRequest extends HasURL with HasSubscriptionKey with HasAsyncReply {

trait MADBase extends HasOutputCol
with MADHttpRequest with HasSetLocation with HasInputCols
with ConcurrencyParams with ComplexParamsWritable with Wrappable
with ComplexParamsWritable with Wrappable
with HasSubscriptionKey with HasErrorCol with BasicLogging {

val startTime = new Param[String](this, "startTime", "A required field, start time" +
@@ -400,8 +400,14 @@ class FitMultivariateAnomaly(override val uid: String) extends Estimator[DetectM
val responseDict = IOUtils.toString(response.entity.get.content, "UTF-8")
.parseJson.asJsObject.fields

this.setDiagnosticsInfo(responseDict("modelInfo").asJsObject.fields
.get("diagnosticsInfo").map(_.convertTo[DiagnosticsInfo]).get)
val modelInfoFields = responseDict("modelInfo").asJsObject.fields

if (modelInfoFields.get("status").get.asInstanceOf[JsString].value == "FAILED") {
val errors = modelInfoFields.get("errors").map(_.convertTo[Seq[DMAError]]).get.toJson.compactPrint
throw new RuntimeException(s"Caught errors during fitting: $errors")
}

this.setDiagnosticsInfo(modelInfoFields.get("diagnosticsInfo").map(_.convertTo[DiagnosticsInfo]).get)

val simpleDetectMultivariateAnomaly = new DetectMultivariateAnomaly()
.setSubscriptionKey(getSubscriptionKey)
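
With the FAILED-status check added above, a failed training run now raises a RuntimeException carrying the service's own error payload instead of a less informative failure further down. A minimal caller-side sketch, assuming a ScalaTest context and placeholder values (anomalyKey, connectionString, startTime, endTime, intermediateSaveDir, timestampColumn, inputColumns, df) defined as in the suite below:

```scala
import org.scalatest.Assertions.intercept

// All identifiers used as setter arguments here are placeholders; the import of
// FitMultivariateAnomaly from the cognitive package is assumed.
val caught = intercept[RuntimeException] {
  new FitMultivariateAnomaly()
    .setSubscriptionKey(anomalyKey)
    .setLocation("westus2")
    .setConnectionString(connectionString)
    .setOutputCol("result")
    .setStartTime(startTime)
    .setEndTime(endTime)
    .setIntermediateSaveDir(intermediateSaveDir)
    .setTimestampCol(timestampColumn)
    .setInputCols(inputColumns)
    .setSlidingWindow(200)
    .fit(df.limit(50)) // too few rows, so the service reports a training error
}
assert(caught.getMessage.contains("Caught errors during fitting"))
```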
@@ -471,10 +477,18 @@ class DetectMultivariateAnomaly(override val uid: String) extends Model[DetectMu
val response = handlingFunc(Client, request)

val responseJson = IOUtils.toString(response.entity.get.content, "UTF-8")
.parseJson.asJsObject.fields("results").toString()
.parseJson.asJsObject.fields

val summary = responseJson.get("summary").map(_.convertTo[DMASummary]).get
if (summary.status == "FAILED") {
val errors = summary.errors.get.toJson.compactPrint
throw new RuntimeException(s"Caught errors during inference: $errors")
}

val results = responseJson.get("results").get.toString()

val outputDF = df.sparkSession.read
.json(df.sparkSession.createDataset(Seq(responseJson))(Encoders.STRING))
.json(df.sparkSession.createDataset(Seq(results))(Encoders.STRING))
.toDF()
.sort(col("timestamp").asc)
.withColumnRenamed("timestamp", "resultTimestamp")
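
The JSON-to-DataFrame step itself is unchanged; the new code only checks summary.status first and then pulls results out of the already-parsed fields. A self-contained sketch of that read step, with a made-up results payload:

```scala
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("mvad-results").getOrCreate()

// Made-up stand-in for the "results" array returned by the service.
val results = """[{"timestamp":"2021-01-01T00:00:00Z","isAnomaly":false},""" +
  """{"timestamp":"2021-01-01T00:01:00Z","isAnomaly":true}]"""

val outputDF = spark.read
  .json(spark.createDataset(Seq(results))(Encoders.STRING)) // one JSON array becomes multiple rows
  .sort(col("timestamp").asc)
  .withColumnRenamed("timestamp", "resultTimestamp")

outputDF.show(truncate = false)
```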
@@ -95,4 +95,6 @@ object MADJsonProtocol extends DefaultJsonProtocol {
implicit val DMAContributorEnc: RootJsonFormat[DMAContributor] = jsonFormat2(DMAContributor.apply)
implicit val DMAValueEnc: RootJsonFormat[DMAValue] = jsonFormat4(DMAValue.apply)
implicit val DMAResEnc: RootJsonFormat[DMAResult] = jsonFormat3(DMAResult.apply)
implicit val DMASetupInfoEnc: RootJsonFormat[DMASetupInfo] = jsonFormat3(DMASetupInfo.apply)
implicit val DMASummaryEnc: RootJsonFormat[DMASummary] = jsonFormat4(DMASummary.apply)
}
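
The two new encoders follow the same `jsonFormatN` pattern used throughout this protocol object. A standalone spray-json sketch with a hypothetical case class (not the real DMASummary shape):

```scala
import spray.json._
import spray.json.DefaultJsonProtocol._

// Hypothetical payload shape, used only to illustrate the jsonFormatN registration.
case class StatusSummary(status: String, errors: Option[Seq[String]])
implicit val statusSummaryEnc: RootJsonFormat[StatusSummary] = jsonFormat2(StatusSummary.apply)

val summary = """{"status": "FAILED", "errors": ["Not enough data."]}"""
  .parseJson
  .convertTo[StatusSummary]

assert(summary.status == "FAILED")
assert(summary.errors.exists(_.contains("Not enough data.")))
```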
@@ -128,7 +128,6 @@ class FitMultivariateAnomalySuite extends EstimatorFuzzing[FitMultivariateAnomal
.setIntermediateSaveDir(intermediateSaveDir)
.setTimestampCol(timestampColumn)
.setInputCols(inputColumns)
.setConcurrency(5)

test("SimpleMultiAnomalyEstimator basic usage with connectionString") {

@@ -205,6 +204,40 @@ class FitMultivariateAnomalySuite extends EstimatorFuzzing[FitMultivariateAnomal
"or {storageName, storageKey, endpoint, sasToken, containerName} in order to access the blob container"))
}

test("Expose correct error message during fitting") {
val caught = intercept[RuntimeException] {
val testDf = df.limit(50)
val smae = simpleMultiAnomalyEstimator
.setSlidingWindow(200)
.setConnectionString(connectionString)
smae.fit(testDf)
}
assert(caught.getMessage.contains("Not enough data."))
}

test("Expose correct error message during inference") {
val caught = intercept[RuntimeException] {
val testDf = df.limit(50)
val smae = simpleMultiAnomalyEstimator
.setSlidingWindow(200)
.setConnectionString(connectionString)
val model = smae.fit(df)
modelIdList ++= Seq(model.getModelId)
smae.cleanUpIntermediateData()
val diagnosticsInfo = smae.getDiagnosticsInfo.get
assert(diagnosticsInfo.variableStates.get.length.equals(3))

model.setStartTime(startTime)
.setEndTime(endTime)
.setOutputCol("result")
.setTimestampCol(timestampColumn)
.setInputCols(inputColumns)
.transform(testDf)
.collect()
}
assert(caught.getMessage.contains("Not enough data."))
}

override def testSerialization(): Unit = {
println("ignore the Serialization Fuzzing test because fitting process takes more than 3 minutes")
}
@@ -26,6 +26,14 @@ object FileUtilities {
val CREATE = S.CREATE
}

def recursiveListFiles(f: File): Array[File] = {
val these = f.listFiles()
these ++ these
.filter(_.isDirectory)
.flatMap(recursiveListFiles)
.filter(!_.isDirectory)
}
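
A short usage sketch of the recursiveListFiles helper above; the package path and the relative `notebooks` directory are assumptions:

```scala
import java.io.File
import com.microsoft.azure.synapse.ml.core.env.FileUtilities // assumed package path

// Collect every .ipynb file under the notebooks directory, however deeply nested.
val notebooks: Array[File] = FileUtilities
  .recursiveListFiles(new File("notebooks"))
  .filter(_.getName.endsWith(".ipynb"))

notebooks.foreach(nb => println(nb.getCanonicalPath))
```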

def allFiles(dir: File, pred: (File => Boolean) = null): Array[File] = {
def loop(dir: File): Array[File] = {
val (dirs, files) = dir.listFiles.sorted.partition(_.isDirectory)
@@ -4,7 +4,7 @@
package com.microsoft.azure.synapse.ml.nbtest

import com.microsoft.azure.synapse.ml.core.test.base.TestBase
import DatabricksUtilities._
import com.microsoft.azure.synapse.ml.nbtest.DatabricksUtilities._

import java.util.concurrent.TimeUnit
import scala.collection.mutable
@@ -15,57 +15,67 @@ import scala.language.existentials
/** Tests to validate fuzzing of modules. */
class DatabricksTests extends TestBase {

test("Databricks Notebooks") {
val clusterId = createClusterInPool(ClusterName, PoolId)
val jobIdsToCancel = mutable.ListBuffer[Int]()
try {
println("Checking if cluster is active")
tryWithRetries(Seq.fill(60 * 15)(1000).toArray) { () =>
assert(isClusterActive(clusterId))
}
println("Installing libraries")
installLibraries(clusterId)
tryWithRetries(Seq.fill(60 * 3)(1000).toArray) { () =>
assert(isClusterActive(clusterId))
}
println(s"Creating folder $Folder")
workspaceMkDir(Folder)

println(s"Submitting jobs")
val parJobIds = ParallizableNotebooks.map(uploadAndSubmitNotebook(clusterId, _))
parJobIds.foreach(jobIdsToCancel.append(_))

println(s"Submitted ${parJobIds.length} for execution: ${parJobIds.toList}")

println(s"Monitoring Parallel Jobs...")
val monitors = parJobIds.map((runId: Int) => monitorJob(runId, TimeoutInMillis, logLevel = 2))

println(s"Awaiting parallelizable jobs...")
val parFailures = monitors
.map(Await.ready(_, Duration(TimeoutInMillis.toLong, TimeUnit.MILLISECONDS)).value.get)
.filter(_.isFailure)

println(s"Submitting nonparallelizable job...")
val nonParFailutes = NonParallizableNotebooks.toIterator.map { nb =>
val jid = uploadAndSubmitNotebook(clusterId, nb)
jobIdsToCancel.append(jid)
val monitor = monitorJob(jid, TimeoutInMillis, logLevel = 2)
Await.ready(monitor, Duration(TimeoutInMillis.toLong, TimeUnit.MILLISECONDS)).value.get
}.filter(_.isFailure).toArray

assert(parFailures.isEmpty && nonParFailutes.isEmpty)
} finally {
jobIdsToCancel.foreach { jid =>
println(s"Cancelling job $jid")
cancelRun(jid)
}
deleteCluster(clusterId)
val clusterId: String = createClusterInPool(ClusterName, PoolId)
val jobIdsToCancel: mutable.ListBuffer[Int] = mutable.ListBuffer[Int]()

println("Checking if cluster is active")
tryWithRetries(Seq.fill(60 * 15)(1000).toArray) { () =>
assert(isClusterActive(clusterId))
}
println("Installing libraries")
installLibraries(clusterId)
tryWithRetries(Seq.fill(60 * 3)(1000).toArray) { () =>
assert(isClusterActive(clusterId))
}
println(s"Creating folder $Folder")
workspaceMkDir(Folder)

println(s"Submitting jobs")
val parNotebookRuns: Seq[DatabricksNotebookRun] = ParallelizableNotebooks.map(uploadAndSubmitNotebook(clusterId, _))
parNotebookRuns.foreach(notebookRun => jobIdsToCancel.append(notebookRun.runId))

println(s"Submitted ${parNotebookRuns.length} for execution: ${parNotebookRuns.map(_.runId).toList}")

assert(parNotebookRuns.nonEmpty)

parNotebookRuns.foreach(run => {
println(s"Testing ${run.notebookName}")

test(run.notebookName) {
val result = Await.ready(
run.monitor(logLevel = 0),
Duration(TimeoutInMillis.toLong, TimeUnit.MILLISECONDS)).value.get

assert(result.isSuccess)
}
})

println(s"Submitting nonparallelizable job...")
NonParallelizableNotebooks.toIterator.foreach(notebook => {
val run: DatabricksNotebookRun = uploadAndSubmitNotebook(clusterId, notebook)
jobIdsToCancel.append(run.runId)

test(run.notebookName) {
val result = Await.ready(
run.monitor(logLevel = 0),
Duration(TimeoutInMillis.toLong, TimeUnit.MILLISECONDS)).value.get

assert(result.isSuccess)
}
})

protected override def afterAll(): Unit = {
println("Suite DatabricksTests finished. Running afterAll procedure...")
jobIdsToCancel.foreach(cancelRun)

deleteCluster(clusterId)
println(s"Deleted cluster with Id $clusterId.")

super.afterAll()
}

ignore("list running jobs for convenievce") {
val obj = databricksGet("jobs/runs/list?active_only=true&limit=1000")
println(obj)
}

}
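
The suite now registers one ScalaTest test per submitted notebook at construction time, so a single failing notebook no longer masks the others. A stripped-down sketch of that registration pattern, using a generic AnyFunSuite and hypothetical notebook names:

```scala
import org.scalatest.funsuite.AnyFunSuite

class PerNotebookSuite extends AnyFunSuite {

  // Hypothetical stand-ins for the submitted notebook runs.
  private val notebookNames = Seq("Classification", "Regression", "TextAnalytics")

  // Each test(...) call during construction registers an independent test case,
  // so every notebook gets its own pass/fail result.
  notebookNames.foreach { name =>
    test(s"notebook $name") {
      assert(name.nonEmpty) // stand-in for awaiting and asserting the monitored run
    }
  }
}
```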
@@ -13,6 +13,7 @@ import SprayImplicits._
import com.microsoft.azure.synapse.ml.Secrets
import com.microsoft.azure.synapse.ml.build.BuildInfo
import com.microsoft.azure.synapse.ml.core.env.StreamUtilities._
import com.microsoft.azure.synapse.ml.nbtest.DatabricksUtilities.{TimeoutInMillis, monitorJob}
import org.apache.commons.io.IOUtils
import org.apache.http.client.methods.{HttpGet, HttpPost}
import org.apache.http.entity.StringEntity
@@ -63,31 +64,27 @@ object DatabricksUtilities extends HasHttpClient {
// Execution Params
val TimeoutInMillis: Int = 40 * 60 * 1000

def recursiveListFiles(f: File): Array[File] = {
val files = f.listFiles()
files.filter(_.isFile) ++ files.filter(_.isDirectory).flatMap(recursiveListFiles)
}
val NotebookFiles: Array[File] = FileUtilities.recursiveListFiles(
FileUtilities.join(
BuildInfo.baseDirectory.getParent, "notebooks").getCanonicalFile)

val NotebookFiles: Array[File] = recursiveListFiles(FileUtilities.join(BuildInfo.baseDirectory.getParent,
"notebooks").getCanonicalFile)
val ParallelizableNotebooks: Seq[File] = NotebookFiles.filterNot(_.isDirectory)
// filter out geospatialServices cuz ADB's 9.1 Runtime doesn't support sending requests to them
.filterNot(_.getName.contains("GeospatialServices"))

val ParallizableNotebooks: Seq[File] = NotebookFiles

val NonParallizableNotebooks: Seq[File] = Nil
val NonParallelizableNotebooks: Seq[File] = Nil

def retry[T](backoffs: List[Int], f: () => T): T = {
def retry[T](backoffList: List[Int], f: () => T): T = {
try {
f()
} catch {
case t: Throwable =>
val waitTime = backoffs.headOption.getOrElse(throw t)
val waitTime = backoffList.headOption.getOrElse(throw t)
println(s"Caught error: $t with message ${t.getMessage}, waiting for $waitTime")
blocking {
Thread.sleep(waitTime.toLong)
}
retry(backoffs.tail, f)
retry(backoffList.tail, f)
}
}
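
A hypothetical use of the retry helper above, sleeping 1 s, 2 s, then 4 s between failed attempts before rethrowing; the URL is a placeholder:

```scala
import com.microsoft.azure.synapse.ml.nbtest.DatabricksUtilities.retry

// Any transiently failing call can be wrapped the same way.
val health: String = retry(List(1000, 2000, 4000), () =>
  scala.io.Source.fromURL("https://example.invalid/health").mkString
)
println(health)
```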

@@ -217,7 +214,7 @@ object DatabricksUtilities extends HasHttpClient {
()
}

def submitRun(clusterId: String, notebookPath: String, timeout: Int = 10 * 60): Int = {
def submitRun(clusterId: String, notebookPath: String): Int = {
val body =
s"""
|{
@@ -308,9 +305,15 @@ object DatabricksUtilities extends HasHttpClient {
}(ExecutionContext.global)
}

def uploadAndSubmitNotebook(clusterId: String, notebookFile: File): Int = {
uploadNotebook(notebookFile, Folder + "/" + notebookFile.getName)
submitRun(clusterId, Folder + "/" + notebookFile.getName)
def uploadAndSubmitNotebook(clusterId: String, notebookFile: File): DatabricksNotebookRun = {
val destination: String = Folder + "/" + notebookFile.getName
uploadNotebook(notebookFile, destination)
val runId: Int = submitRun(clusterId, destination)
val run: DatabricksNotebookRun = DatabricksNotebookRun(runId, notebookFile.getName)

println(s"Successfully submitted job run id ${run.runId} for notebook ${run.notebookName}")

run
}

def cancelRun(runId: Int): Unit = {
@@ -357,5 +360,10 @@ object DatabricksUtilities extends HasHttpClient {
def cancelAllJobs(clusterId: String): Unit = {
listActiveJobs(clusterId).foreach(cancelRun)
}
}

case class DatabricksNotebookRun(runId: Int, notebookName: String) {
def monitor(logLevel: Int = 2): Future[Any] = {
monitorJob(runId, TimeoutInMillis, logLevel)
}
}
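
Putting the new pieces together, a sketch of the submit-and-monitor flow; the cluster id and notebook path are placeholders:

```scala
import java.io.File
import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import com.microsoft.azure.synapse.ml.nbtest.DatabricksNotebookRun
import com.microsoft.azure.synapse.ml.nbtest.DatabricksUtilities._

// "0212-123456-example" and the notebook path are made-up values.
val run: DatabricksNotebookRun =
  uploadAndSubmitNotebook("0212-123456-example", new File("notebooks/Example.ipynb"))

val outcome = Await.ready(
  run.monitor(logLevel = 0),
  Duration(TimeoutInMillis.toLong, TimeUnit.MILLISECONDS)).value.get

assert(outcome.isSuccess)
```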