From d1b515170a999df73b6278eae8ea3e83681b478e Mon Sep 17 00:00:00 2001
From: Serena Ruan <82044803+serena-ruan@users.noreply.github.com>
Date: Sat, 12 Feb 2022 10:50:38 +0800
Subject: [PATCH] feat: merge SynapseE2E test into Spark3.1 branch and test (#1382)

* revert: revert changes of spark 3.2
* fix: change azure-ai-textanalytics dependency to shaded jar and rename namespace to make it compatible with spark 3.1
* allow branch spark3.1 to trigger pipeline
* fix shaded jar
* fix fasterxml by adding it ahead of coreDependencies
* fix io.netty issue
* fix io.netty issue
* fix databricks conflicts
* fix libraries syntax
* exclude io.netty:netty-tcnative-boringssl-static
* update adbRuntime
* exclude org.antlr while installing libraries on dbx clusters
* fix adbruntime
* fix adbruntime
* fix adb runtime
* fix adb submit job error
* ignore geospatialServices notebooks for adb because adb 9.1 runtime doesn't support sending http requests to them
* fix: Make SynapseE2E Tests work now with Spark 3.2 (#1362)
* Trying to use only pool with Spark 3.2
* Updating install instructions for synapse to use 0.9.4
* Changing syntax to grab ipynb files
* Line breaking to comply with styling
* Changing ipynb filter for windows
* Fixing string new line syntax
* Improvements to SynapseTests
* Adding more spark pools 3.2
* Adjusting list tests not to assert
* Improving dev doc, livyPayLoad
* Changing SynapseWS to mmlsparkppe
* Changing synapse URL to dogfood
* Removing dogfood from token acquisition
* Fixing exludes syntax
* Adding 2 more Apache Spark Pools
* Improving the developer docs
* Adjusting identation on developer-readme
* Bumping Synapse test timeout to 40 min
* Applying PR feedback

Co-authored-by: Serena Ruan <82044803+serena-ruan@users.noreply.github.com>

* change to spark3.1 pools
* add more spark pools
* Show detailed response of livy
* Update url cuz spark3.1 is in prod already
* Update SynapseTests.scala
* Update SynapseUtilities.scala
* fix: remove concurrency parameter for MVAD (#1383)
* remove concurrency parameter for MVAD
* fix: fix node-fetch version security & error in MVAD sample

Co-authored-by: Mark Hamilton

* fix: expose response error out for better debugging if the error is returned by http directly (#1391)
* merge `turn synapse tests into multiple test`

Co-authored-by: Ric Serradas
Co-authored-by: Mark Hamilton
---
 .../MultivariateAnomalyDetection.scala | 24 +++-
 .../MultivariateAnomalyDetectorSchemas.scala | 2 +
 .../MultivariateAnamolyDetectionSuite.scala | 35 +++++-
 .../synapse/ml/core/env/FileUtilities.scala | 8 ++
 .../synapse/ml/nbtest/DatabricksTests.scala | 104 +++++++++--------
 .../ml/nbtest/DatabricksUtilities.scala | 40 ++++---
 .../synapse/ml/nbtest/SynapseTests.scala | 109 +++++++++---------
 .../synapse/ml/nbtest/SynapseUtilities.scala | 91 +++++++++------
 pipeline.yaml | 1 +
 .../estimators/cognitive/_MAD.md | 6 +-
 website/docs/reference/developer-readme.md | 33 ++++--
 website/package.json | 3 +-
 .../estimators/cognitive/_MAD.md | 6 +-
 website/yarn.lock | 28 ++++-
 14 files changed, 306 insertions(+), 184 deletions(-)

diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/MultivariateAnomalyDetection.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/MultivariateAnomalyDetection.scala
index ba326a8eb3..9437115719 100644
--- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/MultivariateAnomalyDetection.scala
+++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/MultivariateAnomalyDetection.scala
@@ -115,7 +115,7 @@ trait MADHttpRequest extends HasURL with HasSubscriptionKey with HasAsyncReply { trait MADBase extends HasOutputCol with MADHttpRequest with HasSetLocation with HasInputCols - with ConcurrencyParams with ComplexParamsWritable with Wrappable + with ComplexParamsWritable with Wrappable with HasSubscriptionKey with HasErrorCol with BasicLogging { val startTime = new Param[String](this, "startTime", "A required field, start time" + @@ -400,8 +400,14 @@ class FitMultivariateAnomaly(override val uid: String) extends Estimator[DetectM val responseDict = IOUtils.toString(response.entity.get.content, "UTF-8") .parseJson.asJsObject.fields - this.setDiagnosticsInfo(responseDict("modelInfo").asJsObject.fields - .get("diagnosticsInfo").map(_.convertTo[DiagnosticsInfo]).get) + val modelInfoFields = responseDict("modelInfo").asJsObject.fields + + if (modelInfoFields.get("status").get.asInstanceOf[JsString].value == "FAILED") { + val errors = modelInfoFields.get("errors").map(_.convertTo[Seq[DMAError]]).get.toJson.compactPrint + throw new RuntimeException(s"Caught errors during fitting: $errors") + } + + this.setDiagnosticsInfo(modelInfoFields.get("diagnosticsInfo").map(_.convertTo[DiagnosticsInfo]).get) val simpleDetectMultivariateAnomaly = new DetectMultivariateAnomaly() .setSubscriptionKey(getSubscriptionKey) @@ -471,10 +477,18 @@ class DetectMultivariateAnomaly(override val uid: String) extends Model[DetectMu val response = handlingFunc(Client, request) val responseJson = IOUtils.toString(response.entity.get.content, "UTF-8") - .parseJson.asJsObject.fields("results").toString() + .parseJson.asJsObject.fields + + val summary = responseJson.get("summary").map(_.convertTo[DMASummary]).get + if (summary.status == "FAILED") { + val errors = summary.errors.get.toJson.compactPrint + throw new RuntimeException(s"Caught errors during inference: $errors") + } + + val results = responseJson.get("results").get.toString() val outputDF = df.sparkSession.read - .json(df.sparkSession.createDataset(Seq(responseJson))(Encoders.STRING)) + .json(df.sparkSession.createDataset(Seq(results))(Encoders.STRING)) .toDF() .sort(col("timestamp").asc) .withColumnRenamed("timestamp", "resultTimestamp") diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/MultivariateAnomalyDetectorSchemas.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/MultivariateAnomalyDetectorSchemas.scala index 43ec9d4634..9b48c4d30d 100644 --- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/MultivariateAnomalyDetectorSchemas.scala +++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/cognitive/MultivariateAnomalyDetectorSchemas.scala @@ -95,4 +95,6 @@ object MADJsonProtocol extends DefaultJsonProtocol { implicit val DMAContributorEnc: RootJsonFormat[DMAContributor] = jsonFormat2(DMAContributor.apply) implicit val DMAValueEnc: RootJsonFormat[DMAValue] = jsonFormat4(DMAValue.apply) implicit val DMAResEnc: RootJsonFormat[DMAResult] = jsonFormat3(DMAResult.apply) + implicit val DMASetupInfoEnc: RootJsonFormat[DMASetupInfo] = jsonFormat3(DMASetupInfo.apply) + implicit val DMASummaryEnc: RootJsonFormat[DMASummary] = jsonFormat4(DMASummary.apply) } diff --git a/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/cognitive/split4/MultivariateAnamolyDetectionSuite.scala b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/cognitive/split4/MultivariateAnamolyDetectionSuite.scala index 5bcf11ef65..03eeb2d00c 100644 --- 
a/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/cognitive/split4/MultivariateAnamolyDetectionSuite.scala +++ b/cognitive/src/test/scala/com/microsoft/azure/synapse/ml/cognitive/split4/MultivariateAnamolyDetectionSuite.scala @@ -128,7 +128,6 @@ class FitMultivariateAnomalySuite extends EstimatorFuzzing[FitMultivariateAnomal .setIntermediateSaveDir(intermediateSaveDir) .setTimestampCol(timestampColumn) .setInputCols(inputColumns) - .setConcurrency(5) test("SimpleMultiAnomalyEstimator basic usage with connectionString") { @@ -205,6 +204,40 @@ class FitMultivariateAnomalySuite extends EstimatorFuzzing[FitMultivariateAnomal "or {storageName, storageKey, endpoint, sasToken, containerName} in order to access the blob container")) } + test("Expose correct error message during fitting") { + val caught = intercept[RuntimeException] { + val testDf = df.limit(50) + val smae = simpleMultiAnomalyEstimator + .setSlidingWindow(200) + .setConnectionString(connectionString) + smae.fit(testDf) + } + assert(caught.getMessage.contains("Not enough data.")) + } + + test("Expose correct error message during inference") { + val caught = intercept[RuntimeException] { + val testDf = df.limit(50) + val smae = simpleMultiAnomalyEstimator + .setSlidingWindow(200) + .setConnectionString(connectionString) + val model = smae.fit(df) + modelIdList ++= Seq(model.getModelId) + smae.cleanUpIntermediateData() + val diagnosticsInfo = smae.getDiagnosticsInfo.get + assert(diagnosticsInfo.variableStates.get.length.equals(3)) + + model.setStartTime(startTime) + .setEndTime(endTime) + .setOutputCol("result") + .setTimestampCol(timestampColumn) + .setInputCols(inputColumns) + .transform(testDf) + .collect() + } + assert(caught.getMessage.contains("Not enough data.")) + } + override def testSerialization(): Unit = { println("ignore the Serialization Fuzzing test because fitting process takes more than 3 minutes") } diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/core/env/FileUtilities.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/core/env/FileUtilities.scala index f2b2907f9e..c59c133b3e 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/core/env/FileUtilities.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/core/env/FileUtilities.scala @@ -26,6 +26,14 @@ object FileUtilities { val CREATE = S.CREATE } + def recursiveListFiles(f: File): Array[File] = { + val these = f.listFiles() + these ++ these + .filter(_.isDirectory) + .flatMap(recursiveListFiles) + .filter(!_.isDirectory) + } + def allFiles(dir: File, pred: (File => Boolean) = null): Array[File] = { def loop(dir: File): Array[File] = { val (dirs, files) = dir.listFiles.sorted.partition(_.isDirectory) diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksTests.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksTests.scala index 63c88ac131..f8093bdcc7 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksTests.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksTests.scala @@ -4,7 +4,7 @@ package com.microsoft.azure.synapse.ml.nbtest import com.microsoft.azure.synapse.ml.core.test.base.TestBase -import DatabricksUtilities._ +import com.microsoft.azure.synapse.ml.nbtest.DatabricksUtilities._ import java.util.concurrent.TimeUnit import scala.collection.mutable @@ -15,57 +15,67 @@ import scala.language.existentials /** Tests to validate fuzzing of modules. 
*/ class DatabricksTests extends TestBase { - test("Databricks Notebooks") { - val clusterId = createClusterInPool(ClusterName, PoolId) - val jobIdsToCancel = mutable.ListBuffer[Int]() - try { - println("Checking if cluster is active") - tryWithRetries(Seq.fill(60 * 15)(1000).toArray) { () => - assert(isClusterActive(clusterId)) - } - println("Installing libraries") - installLibraries(clusterId) - tryWithRetries(Seq.fill(60 * 3)(1000).toArray) { () => - assert(isClusterActive(clusterId)) - } - println(s"Creating folder $Folder") - workspaceMkDir(Folder) - - println(s"Submitting jobs") - val parJobIds = ParallizableNotebooks.map(uploadAndSubmitNotebook(clusterId, _)) - parJobIds.foreach(jobIdsToCancel.append(_)) - - println(s"Submitted ${parJobIds.length} for execution: ${parJobIds.toList}") - - println(s"Monitoring Parallel Jobs...") - val monitors = parJobIds.map((runId: Int) => monitorJob(runId, TimeoutInMillis, logLevel = 2)) - - println(s"Awaiting parallelizable jobs...") - val parFailures = monitors - .map(Await.ready(_, Duration(TimeoutInMillis.toLong, TimeUnit.MILLISECONDS)).value.get) - .filter(_.isFailure) - - println(s"Submitting nonparallelizable job...") - val nonParFailutes = NonParallizableNotebooks.toIterator.map { nb => - val jid = uploadAndSubmitNotebook(clusterId, nb) - jobIdsToCancel.append(jid) - val monitor = monitorJob(jid, TimeoutInMillis, logLevel = 2) - Await.ready(monitor, Duration(TimeoutInMillis.toLong, TimeUnit.MILLISECONDS)).value.get - }.filter(_.isFailure).toArray - - assert(parFailures.isEmpty && nonParFailutes.isEmpty) - } finally { - jobIdsToCancel.foreach { jid => - println(s"Cancelling job $jid") - cancelRun(jid) - } - deleteCluster(clusterId) + val clusterId: String = createClusterInPool(ClusterName, PoolId) + val jobIdsToCancel: mutable.ListBuffer[Int] = mutable.ListBuffer[Int]() + + println("Checking if cluster is active") + tryWithRetries(Seq.fill(60 * 15)(1000).toArray) { () => + assert(isClusterActive(clusterId)) + } + println("Installing libraries") + installLibraries(clusterId) + tryWithRetries(Seq.fill(60 * 3)(1000).toArray) { () => + assert(isClusterActive(clusterId)) + } + println(s"Creating folder $Folder") + workspaceMkDir(Folder) + + println(s"Submitting jobs") + val parNotebookRuns: Seq[DatabricksNotebookRun] = ParallelizableNotebooks.map(uploadAndSubmitNotebook(clusterId, _)) + parNotebookRuns.foreach(notebookRun => jobIdsToCancel.append(notebookRun.runId)) + + println(s"Submitted ${parNotebookRuns.length} for execution: ${parNotebookRuns.map(_.runId).toList}") + + assert(parNotebookRuns.nonEmpty) + + parNotebookRuns.foreach(run => { + println(s"Testing ${run.notebookName}") + + test(run.notebookName) { + val result = Await.ready( + run.monitor(logLevel = 0), + Duration(TimeoutInMillis.toLong, TimeUnit.MILLISECONDS)).value.get + + assert(result.isSuccess) } + }) + + println(s"Submitting nonparallelizable job...") + NonParallelizableNotebooks.toIterator.foreach(notebook => { + val run: DatabricksNotebookRun = uploadAndSubmitNotebook(clusterId, notebook) + jobIdsToCancel.append(run.runId) + + test(run.notebookName) { + val result = Await.ready( + run.monitor(logLevel = 0), + Duration(TimeoutInMillis.toLong, TimeUnit.MILLISECONDS)).value.get + + assert(result.isSuccess) + } + }) + + protected override def afterAll(): Unit = { + println("Suite DatabricksTests finished. 
Running afterAll procedure...") + jobIdsToCancel.foreach(cancelRun) + + deleteCluster(clusterId) + println(s"Deleted cluster with Id $clusterId.") + + super.afterAll() } ignore("list running jobs for convenievce") { val obj = databricksGet("jobs/runs/list?active_only=true&limit=1000") println(obj) } - } diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala index ef845aeafd..ac1b0e9872 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala @@ -13,6 +13,7 @@ import SprayImplicits._ import com.microsoft.azure.synapse.ml.Secrets import com.microsoft.azure.synapse.ml.build.BuildInfo import com.microsoft.azure.synapse.ml.core.env.StreamUtilities._ +import com.microsoft.azure.synapse.ml.nbtest.DatabricksUtilities.{TimeoutInMillis, monitorJob} import org.apache.commons.io.IOUtils import org.apache.http.client.methods.{HttpGet, HttpPost} import org.apache.http.entity.StringEntity @@ -63,31 +64,27 @@ object DatabricksUtilities extends HasHttpClient { // Execution Params val TimeoutInMillis: Int = 40 * 60 * 1000 - def recursiveListFiles(f: File): Array[File] = { - val files = f.listFiles() - files.filter(_.isFile) ++ files.filter(_.isDirectory).flatMap(recursiveListFiles) - } + val NotebookFiles: Array[File] = FileUtilities.recursiveListFiles( + FileUtilities.join( + BuildInfo.baseDirectory.getParent, "notebooks").getCanonicalFile) - val NotebookFiles: Array[File] = recursiveListFiles(FileUtilities.join(BuildInfo.baseDirectory.getParent, - "notebooks").getCanonicalFile) + val ParallelizableNotebooks: Seq[File] = NotebookFiles.filterNot(_.isDirectory) // filter out geospatialServices cuz ADB's 9.1 Runtime doesn't support sending requests to them .filterNot(_.getName.contains("GeospatialServices")) - val ParallizableNotebooks: Seq[File] = NotebookFiles - - val NonParallizableNotebooks: Seq[File] = Nil + val NonParallelizableNotebooks: Seq[File] = Nil - def retry[T](backoffs: List[Int], f: () => T): T = { + def retry[T](backoffList: List[Int], f: () => T): T = { try { f() } catch { case t: Throwable => - val waitTime = backoffs.headOption.getOrElse(throw t) + val waitTime = backoffList.headOption.getOrElse(throw t) println(s"Caught error: $t with message ${t.getMessage}, waiting for $waitTime") blocking { Thread.sleep(waitTime.toLong) } - retry(backoffs.tail, f) + retry(backoffList.tail, f) } } @@ -217,7 +214,7 @@ object DatabricksUtilities extends HasHttpClient { () } - def submitRun(clusterId: String, notebookPath: String, timeout: Int = 10 * 60): Int = { + def submitRun(clusterId: String, notebookPath: String): Int = { val body = s""" |{ @@ -308,9 +305,15 @@ object DatabricksUtilities extends HasHttpClient { }(ExecutionContext.global) } - def uploadAndSubmitNotebook(clusterId: String, notebookFile: File): Int = { - uploadNotebook(notebookFile, Folder + "/" + notebookFile.getName) - submitRun(clusterId, Folder + "/" + notebookFile.getName) + def uploadAndSubmitNotebook(clusterId: String, notebookFile: File): DatabricksNotebookRun = { + val destination: String = Folder + "/" + notebookFile.getName + uploadNotebook(notebookFile, destination) + val runId: Int = submitRun(clusterId, destination) + val run: DatabricksNotebookRun = DatabricksNotebookRun(runId, notebookFile.getName) + + println(s"Successfully submitted job run id ${run.runId} for 
notebook ${run.notebookName}") + + run } def cancelRun(runId: Int): Unit = { @@ -357,5 +360,10 @@ object DatabricksUtilities extends HasHttpClient { def cancelAllJobs(clusterId: String): Unit = { listActiveJobs(clusterId).foreach(cancelRun) } +} +case class DatabricksNotebookRun(runId: Int, notebookName: String) { + def monitor(logLevel: Int = 2): Future[Any] = { + monitorJob(runId, TimeoutInMillis, logLevel) + } } diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseTests.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseTests.scala index 54009bfdf5..47cccb5ef9 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseTests.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseTests.scala @@ -4,78 +4,75 @@ package com.microsoft.azure.synapse.ml.nbtest import com.microsoft.azure.synapse.ml.core.test.base.TestBase -import SynapseUtilities.exec +import com.microsoft.azure.synapse.ml.nbtest.SynapseUtilities.exec import java.io.File +import java.nio.file.{Path, Paths} import java.util.concurrent.TimeUnit +import scala.concurrent.Await import scala.concurrent.duration.Duration -import scala.concurrent.{Await, ExecutionContext, Future} import scala.language.existentials import scala.sys.process.Process /** Tests to validate fuzzing of modules. */ class SynapseTests extends TestBase { + val os: String = sys.props("os.name").toLowerCase + os match { + case x if x contains "windows" => + exec("conda activate synapseml " + + "&& jupyter nbconvert --to script .\\notebooks\\features\\**\\*.ipynb") + case _ => + Process(s"conda init bash; conda activate synapseml; " + + "jupyter nbconvert --to script ./notebooks/features/**/*.ipynb") + } - ignore("Synapse") { + SynapseUtilities.listPythonFiles().map(f => { + val newPath = f + .replace(" ", "") + .replace("-", "") + new File(f).renameTo(new File(newPath)) + }) - val os = sys.props("os.name").toLowerCase - os match { - case x if x contains "windows" => - exec("conda activate synapseml && jupyter nbconvert --to script .\\notebooks\\*.ipynb") - case _ => - Process(s"conda init bash; conda activate synapseml; jupyter nbconvert --to script ./notebooks/*.ipynb") - } + val workspaceName = "mmlsparkppe" + val sparkPools: Array[String] = Array( + "e2etstspark31i1", + "e2etstspark31i2", + "e2etstspark31i3", + "e2etstspark31i4", + "e2etstspark31i5") - SynapseUtilities.listPythonFiles().map(f => { - val newPath = f - .replace(" ", "") - .replace("-", "") - new File(f).renameTo(new File(newPath)) - }) + SynapseUtilities.listPythonJobFiles() + .filterNot(_.contains(" ")) + .filterNot(_.contains("-")) + .foreach(file => { + val poolName = SynapseUtilities.monitorPool(workspaceName, sparkPools) + val livyUrl = "https://" + + workspaceName + + ".dev.azuresynapse-dogfood.net/livyApi/versions/2019-11-01-preview/sparkPools/" + + poolName + + "/batches" + val livyBatch: LivyBatch = SynapseUtilities.uploadAndSubmitNotebook(livyUrl, file) + val path: Path = Paths.get(file) + val fileName: String = path.getFileName.toString - val workspaceName = "mmlspark" - val sparkPools = Array("buildpool", "buildpool2", "buildpool3") + println(s"submitted livy job: ${livyBatch.id} for file $fileName to sparkPool: $poolName") - val livyBatchJobs = SynapseUtilities.listPythonJobFiles() - .filterNot(_.contains(" ")) - .filterNot(_.contains("-")) - .map(f => { - val poolName = SynapseUtilities.monitorPool(workspaceName, sparkPools) - val livyUrl = "https://" + - workspaceName + - 
".dev.azuresynapse.net/livyApi/versions/2019-11-01-preview/sparkPools/" + - poolName + - "/batches" - val livyBatch: LivyBatch = SynapseUtilities.uploadAndSubmitNotebook(livyUrl, f) - println(s"submitted livy job: ${livyBatch.id} to sparkPool: $poolName") - LivyBatchJob(livyBatch, poolName, livyUrl) - }) - .filterNot(_.livyBatch.state == "none") + val livyBatchJob: LivyBatchJob = LivyBatchJob(livyBatch, poolName, livyUrl) - try { - val batchFutures: Array[Future[Any]] = livyBatchJobs.map((batchJob: LivyBatchJob) => { - Future { - val batch = batchJob.livyBatch - val livyUrl = batchJob.livyUrl + test(fileName) { + try { + val result = Await.ready( + livyBatchJob.monitor(), + Duration(SynapseUtilities.TimeoutInMillis.toLong, TimeUnit.MILLISECONDS)).value.get - if (batch.state != "success") { - SynapseUtilities.retry(batch.id, livyUrl, SynapseUtilities.TimeoutInMillis, System.currentTimeMillis()) - } - }(ExecutionContext.global) - }) + assert(result.isSuccess) + } catch { + case t: Throwable => + println(s"Cancelling job ${livyBatchJob.livyBatch.id} for file $fileName") + SynapseUtilities.cancelRun(livyBatchJob.livyUrl, livyBatchJob.livyBatch.id) - val failures = batchFutures - .map(f => Await.ready(f, Duration(SynapseUtilities.TimeoutInMillis.toLong, TimeUnit.MILLISECONDS)).value.get) - .filter(f => f.isFailure) - assert(failures.isEmpty) - } - catch { - case t: Throwable => - livyBatchJobs.foreach { batchJob => - println(s"Cancelling job ${batchJob.livyBatch.id}") - SynapseUtilities.cancelRun(batchJob.livyUrl, batchJob.livyBatch.id) + throw t } - throw t - } - } + } + }) } diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseUtilities.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseUtilities.scala index 580435161f..153d4ba535 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseUtilities.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseUtilities.scala @@ -23,7 +23,7 @@ import spray.json._ import java.io.{File, InputStream} import java.util import scala.annotation.tailrec -import scala.concurrent.{TimeoutException, blocking} +import scala.concurrent.{ExecutionContext, Future, TimeoutException, blocking} import scala.io.Source import scala.language.postfixOps import scala.sys.process._ @@ -36,7 +36,19 @@ case class LivyBatch(id: Int, case class LivyBatchJob(livyBatch: LivyBatch, sparkPool: String, - livyUrl: String) + livyUrl: String) { + def monitor(): Future[Any] = { + Future { + if(livyBatch.state != "success") { + SynapseUtilities.retry( + livyBatch.id, + livyUrl, + SynapseUtilities.TimeoutInMillis, + System.currentTimeMillis()) + } + }(ExecutionContext.global) + } +} case class Application(state: String, name: String, @@ -51,18 +63,19 @@ object SynapseUtilities extends HasHttpClient { lazy val Token: String = getSynapseToken val Folder = s"build_${BuildInfo.version}/scripts" - val TimeoutInMillis: Int = 20 * 60 * 1000 + val TimeoutInMillis: Int = 30 * 60 * 1000 // 30 minutes val StorageAccount: String = "mmlsparkeuap" - val StorageContainer: String = "synapse" + val StorageContainer: String = "mmlsparkppefs" val TenantId: String = "72f988bf-86f1-41af-91ab-2d7cd011db47" val ClientId: String = "85dde348-dd2b-43e5-9f5a-22262af45332" def listPythonFiles(): Array[String] = { - Option( - FileUtilities - .join(BuildInfo.baseDirectory.getParent, "notebooks") + Option({ + val rootDirectory = FileUtilities + .join(BuildInfo.baseDirectory.getParent, "notebooks/features") .getCanonicalFile - 
.listFiles() + + FileUtilities.recursiveListFiles(rootDirectory) .filter(_.getAbsolutePath.endsWith(".py")) .filter(_.getAbsolutePath.contains("-")) .filterNot(_.getAbsolutePath.contains("CyberML")) @@ -73,38 +86,43 @@ object SynapseUtilities extends HasHttpClient { .filterNot(_.getAbsolutePath.contains("Overview")) .filterNot(_.getAbsolutePath.contains("ModelInterpretation")) .filterNot(_.getAbsolutePath.contains("Interpretability")) - .map(file => file.getAbsolutePath)) - .get - .sorted + .map(file => file.getAbsolutePath) + }) + .get + .sorted } def listPythonJobFiles(): Array[String] = { - Option( - FileUtilities - .join(BuildInfo.baseDirectory.getParent, "notebooks") - .getCanonicalFile - .listFiles() - .filter(_.getAbsolutePath.endsWith(".py")) - .filterNot(_.getAbsolutePath.contains("-")) - .filterNot(_.getAbsolutePath.contains(" ")) - .map(file => file.getAbsolutePath)) - .get - .sorted + Option({ + val rootDirectory = FileUtilities + .join(BuildInfo.baseDirectory.getParent, "notebooks/features") + .getCanonicalFile + + FileUtilities.recursiveListFiles(rootDirectory) + .filter(_.getAbsolutePath.endsWith(".py")) + .filterNot(_.getAbsolutePath.contains("-")) + .filterNot(_.getAbsolutePath.contains(" ")) + .map(file => file.getAbsolutePath) + }) + .get + .sorted } def listNoteBookFiles(): Array[String] = { - Option( - FileUtilities - .join(BuildInfo.baseDirectory.getParent, "notebooks") + Option({ + val rootDirectory = FileUtilities + .join(BuildInfo.baseDirectory.getParent, "notebooks/features") .getCanonicalFile - .listFiles() + + FileUtilities.recursiveListFiles(rootDirectory) .filter(_.getAbsolutePath.endsWith(".ipynb")) - .map(file => file.getAbsolutePath)) - .get - .sorted + .map(file => file.getAbsolutePath) + }) + .get + .sorted } - def postMortem(batch: LivyBatch, livyUrl: String): LivyBatch = { + def postMortem(batch: LivyBatch): LivyBatch = { batch.log.foreach(println) write(batch) batch @@ -122,7 +140,7 @@ object SynapseUtilities extends HasHttpClient { def showSubmittingJobs(workspaceName: String, poolName: String): Applications = { val uri: String = "https://" + - s"$workspaceName.dev.azuresynapse.net" + + s"$workspaceName.dev.azuresynapse-dogfood.net" + "/monitoring/workloadTypes/spark/applications" + "?api-version=2020-10-01-preview" + "&filter=(((state%20eq%20%27Queued%27)%20or%20(state%20eq%20%27Submitting%27))" + @@ -152,7 +170,7 @@ object SynapseUtilities extends HasHttpClient { readyPool } else { - println(s"None spark pool is ready to submit job, waiting 10s") + println(s"No spark pool is ready to submit a new job, waiting 10s") blocking { Thread.sleep(10000) } @@ -171,11 +189,11 @@ object SynapseUtilities extends HasHttpClient { throw new TimeoutException(s"Job $id timed out.") } else if (batch.state == "dead") { - postMortem(batch, livyUrl) + postMortem(batch) throw new RuntimeException(s"Dead") } else if (batch.state == "error") { - postMortem(batch, livyUrl) + postMortem(batch) throw new RuntimeException(s"Error") } else { @@ -243,7 +261,8 @@ object SynapseUtilities extends HasHttpClient { val excludes: String = "org.scala-lang:scala-reflect," + "org.apache.spark:spark-tags_2.12," + "org.scalactic:scalactic_2.12," + - "org.scalatest:scalatest_2.12" + "org.scalatest:scalatest_2.12," + + "org.slf4j:slf4j-api" val livyPayload: String = s""" @@ -257,7 +276,7 @@ object SynapseUtilities extends HasHttpClient { | "numExecutors" : 2, | "conf" : | { - | "spark.jars.packages" : "com.microsoft.azure:synapseml:${BuildInfo.version}", + | "spark.jars.packages" : 
"com.microsoft.azure:synapseml_2.12:${BuildInfo.version}", | "spark.jars.repositories" : "https://mmlspark.azureedge.net/maven", | "spark.jars.excludes": "$excludes", | "spark.driver.userClassPathFirst": "true", diff --git a/pipeline.yaml b/pipeline.yaml index e8fc5118cc..d526fec191 100644 --- a/pipeline.yaml +++ b/pipeline.yaml @@ -2,6 +2,7 @@ resources: - repo: self trigger: + batch: true branches: include: - master diff --git a/website/docs/documentation/estimators/cognitive/_MAD.md b/website/docs/documentation/estimators/cognitive/_MAD.md index be792f7fdb..a4b2860411 100644 --- a/website/docs/documentation/estimators/cognitive/_MAD.md +++ b/website/docs/documentation/estimators/cognitive/_MAD.md @@ -40,8 +40,7 @@ fitMultivariateAnomaly = (FitMultivariateAnomaly() .setTimestampCol(timestampColumn) .setInputCols(inputColumns) .setSlidingWindow(200) - .setConnectionString(connectionString) - .setConcurrency(5)) + .setConnectionString(connectionString)) # uncomment below for fitting your own dataframe # model = fitMultivariateAnomaly.fit(df) @@ -74,8 +73,7 @@ val fitMultivariateAnomaly = (new FitMultivariateAnomaly() .setTimestampCol(timestampColumn) .setInputCols(inputColumns) .setSlidingWindow(200) - .setConnectionString(connectionString) - .setConcurrency(5)) + .setConnectionString(connectionString)) val df = (spark.read.format("csv") .option("header", True) diff --git a/website/docs/reference/developer-readme.md b/website/docs/reference/developer-readme.md index d711c04b5f..cd9624b645 100644 --- a/website/docs/reference/developer-readme.md +++ b/website/docs/reference/developer-readme.md @@ -8,20 +8,33 @@ description: SynapseML Development Setup # SynapseML Development Setup 1) [Install SBT](https://www.scala-sbt.org/1.x/docs/Setup.html) - - Make sure to download JDK 11 if you don't have it -3) Fork the repository on github - - This is required if you would like to make PRs. If you choose the fork option, replace the clone link below with that of your fork. -2) Git Clone your fork, or the repo directly - - `git clone https://github.com/Microsoft/SynapseML.git` - - NOTE: If you would like to contribute to synapseml regularly, add your fork as a remote named ``origin`` and Microsoft/SynapseML as a remote named ``upstream`` -3) Run sbt to compile and grab datasets + - Make sure to download [JDK 11](https://www.oracle.com/java/technologies/javase/jdk11-archive-downloads.html) if you don't have it +2) Fork the repository on github + - See how to here: [Fork a repo - GitHub Docs](https://docs.github.com/en/get-started/quickstart/fork-a-repo) +3) Clone your fork + - `git clone https://github.com//SynapseML.git` + - This will automatically add your fork as the default remote, called `origin` +4) Add another Git Remote to track the original SynapseML repo. 
It's recommended to call it `upstream`: + - `git remote add upstream https://github.com/microsoft/SynapseML.git` + - See more about Git remotes here: [Git - Working with remotes](https://git-scm.com/book/en/v2/Git-Basics-Working-with-Remotes) +5) Run sbt to compile and grab datasets - `cd synapseml` - `sbt setup` -4) [Install IntelliJ](https://www.jetbrains.com/idea/download) - - Install Scala plugins during install -5) Configure IntelliJ +6) [Install IntelliJ](https://www.jetbrains.com/idea/download) +7) Configure IntelliJ + - Install [Scala plugin](https://plugins.jetbrains.com/plugin/1347-scala) during initialization - **OPEN** the synapseml directory - If the project does not automatically import,click on `build.sbt` and import project +8) Prepare your Python Environment + - Install [Miniconda](https://docs.conda.io/en/latest/miniconda.html) + - Note: if you want to run conda commands from IntelliJ, you may need to select the option to add conda to PATH during installation. + - Activate the `synapseml` conda environment by running `conda env create -f environment.yaml` from the `synapseml` directory. + +> NOTE +> +> If you will be regularly contributing to the SynapseML repo, you'll want to keep your fork synced with the +> upstream repository. Please read [this GitHub doc](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork) +> to know more and learn techniques about how to do it. # Publishing and Using Build Secrets diff --git a/website/package.json b/website/package.json index cd1219d649..61fe891f5a 100644 --- a/website/package.json +++ b/website/package.json @@ -38,7 +38,8 @@ "@docusaurus/**/browserslist": "^4.16.5", "@docusaurus/**/nth-check": "^2.0.1", "yarn-audit-fix/**/ansi-regex": "^5.0.1", - "@docusaurus/**/ansi-regex": "^5.0.1" + "@docusaurus/**/ansi-regex": "^5.0.1", + "node-fetch": "^2.6.7" }, "browserslist": { "production": [ diff --git a/website/versioned_docs/version-0.9.5/documentation/estimators/cognitive/_MAD.md b/website/versioned_docs/version-0.9.5/documentation/estimators/cognitive/_MAD.md index be792f7fdb..a4b2860411 100644 --- a/website/versioned_docs/version-0.9.5/documentation/estimators/cognitive/_MAD.md +++ b/website/versioned_docs/version-0.9.5/documentation/estimators/cognitive/_MAD.md @@ -40,8 +40,7 @@ fitMultivariateAnomaly = (FitMultivariateAnomaly() .setTimestampCol(timestampColumn) .setInputCols(inputColumns) .setSlidingWindow(200) - .setConnectionString(connectionString) - .setConcurrency(5)) + .setConnectionString(connectionString)) # uncomment below for fitting your own dataframe # model = fitMultivariateAnomaly.fit(df) @@ -74,8 +73,7 @@ val fitMultivariateAnomaly = (new FitMultivariateAnomaly() .setTimestampCol(timestampColumn) .setInputCols(inputColumns) .setSlidingWindow(200) - .setConnectionString(connectionString) - .setConcurrency(5)) + .setConnectionString(connectionString)) val df = (spark.read.format("csv") .option("header", True) diff --git a/website/yarn.lock b/website/yarn.lock index b7893066a7..a06b750cf4 100644 --- a/website/yarn.lock +++ b/website/yarn.lock @@ -6534,10 +6534,12 @@ node-emoji@^1.10.0: dependencies: lodash.toarray "^4.4.0" -node-fetch@2.6.1: - version "2.6.1" - resolved "https://registry.yarnpkg.com/node-fetch/-/node-fetch-2.6.1.tgz#045bd323631f76ed2e2b55573394416b639a0052" - integrity sha512-V4aYg89jEoVRxRb2fJdAg8FHvI7cEyYdVAh94HH0UIK8oJxUfkjlDQN9RbMx+bEjP7+ggMiFRprSti032Oipxw== +node-fetch@2.6.1, node-fetch@^2.6.7: + version "2.6.7" + resolved 
"https://registry.yarnpkg.com/node-fetch/-/node-fetch-2.6.7.tgz#24de9fba827e3b4ae44dc8b20256a379160052ad" + integrity sha512-ZjMPFEfVx5j+y2yF35Kzx5sF7kDzxuDj6ziH4FFbOp87zKDZNx8yExJIb05OGF4Nlt9IHFIMBkRl41VdvcNdbQ== + dependencies: + whatwg-url "^5.0.0" node-forge@^1.2.0: version "1.2.1" @@ -8603,6 +8605,11 @@ totalist@^1.0.0: resolved "https://registry.yarnpkg.com/totalist/-/totalist-1.1.0.tgz#a4d65a3e546517701e3e5c37a47a70ac97fe56df" integrity sha512-gduQwd1rOdDMGxFG1gEvhV88Oirdo2p+KjoYFU7k2g+i7n6AFFbDQ5kMPUsW0pNbfQsB/cwXvT1i4Bue0s9g5g== +tr46@~0.0.3: + version "0.0.3" + resolved "https://registry.yarnpkg.com/tr46/-/tr46-0.0.3.tgz#8184fd347dac9cdc185992f3a6622e14b9d9ab6a" + integrity sha1-gYT9NH2snNwYWZLzpmIuFLnZq2o= + trim-trailing-lines@^1.0.0: version "1.1.4" resolved "https://registry.yarnpkg.com/trim-trailing-lines/-/trim-trailing-lines-1.1.4.tgz#bd4abbec7cc880462f10b2c8b5ce1d8d1ec7c2c0" @@ -9015,6 +9022,11 @@ web-namespaces@^1.0.0, web-namespaces@^1.1.2: resolved "https://registry.yarnpkg.com/web-namespaces/-/web-namespaces-1.1.4.tgz#bc98a3de60dadd7faefc403d1076d529f5e030ec" integrity sha512-wYxSGajtmoP4WxfejAPIr4l0fVh+jeMXZb08wNc0tMg6xsfZXj3cECqIK0G7ZAqUq0PP8WlMDtaOGVBTAWztNw== +webidl-conversions@^3.0.0: + version "3.0.1" + resolved "https://registry.yarnpkg.com/webidl-conversions/-/webidl-conversions-3.0.1.tgz#24534275e2a7bc6be7bc86611cc16ae0a5654871" + integrity sha1-JFNCdeKnvGvnvIZhHMFq4KVlSHE= + webpack-bundle-analyzer@^4.4.2: version "4.4.2" resolved "https://registry.yarnpkg.com/webpack-bundle-analyzer/-/webpack-bundle-analyzer-4.4.2.tgz#39898cf6200178240910d629705f0f3493f7d666" @@ -9155,6 +9167,14 @@ websocket-extensions@>=0.1.1: resolved "https://registry.yarnpkg.com/websocket-extensions/-/websocket-extensions-0.1.4.tgz#7f8473bc839dfd87608adb95d7eb075211578a42" integrity sha512-OqedPIGOfsDlo31UNwYbCFMSaO9m9G/0faIHj5/dZFDMFqPTcx6UwqyOy3COEaEOg/9VsGIpdqn62W5KhoKSpg== +whatwg-url@^5.0.0: + version "5.0.0" + resolved "https://registry.yarnpkg.com/whatwg-url/-/whatwg-url-5.0.0.tgz#966454e8765462e37644d3626f6742ce8b70965d" + integrity sha1-lmRU6HZUYuN2RNNib2dCzotwll0= + dependencies: + tr46 "~0.0.3" + webidl-conversions "^3.0.0" + which@^1.3.1: version "1.3.1" resolved "https://registry.yarnpkg.com/which/-/which-1.3.1.tgz#a45043d54f5805316da8d62f9f50918d3da70b0a"