Skip to content

Commit

Permalink
feat: Add Infrastructure to Run Tests on Synapse (#1014)
Browse files Browse the repository at this point in the history
* add livy api wrappers

* intermediate stage

* progress

* finish running tests on synapse

* add testing step for synapse in ADO

* remove dummy test

* fix job names

* fix compile issue for !!

* fix conda activation in test

* fix conda init for synapse test

* hotfix: remove useless import

* update: fixed e2e

* update: fixed e2e

* update: fixed e2e

* update: fixed e2e

* update: add synapse test

* update: add Synapse Tests

* update: renamed files

* update: renamed files

* update: renamed files

* update: renamed files

* update: renamed files

* update: renamed files

* update: renamed files

* update: add synapse config

* update: add token

* update: add synapse test framework

* update: add synapse test framework

* rename

* remove py files

* add convert

* update: format code

* bugfix: upload and submit job

* update: update spark job setting

* update: update ipynb files

* bugfix: synapse job bugfix

* update: PROD to PPE

* bugfix: fix bug

* bugfix: build bug

* bugfix: build bug

* bugfix: linux test run

* update: sorted python files

* bugfix: use absolute path

* update: update pipeline.yaml

* update: update pipeline.yaml

* update: gated build bug fix

* bugfix: fix job failed bug

* bugfix: fix job failed bug

* bugfix: fix job failed bug

* update: gated build bug fix

* update: gated build bug fix

* update: gated build bug fix

* update: gated build bug fix

* update: gated build bug fix

* update: ready to PR

* update: ready to PR

* bugfix: http failures

* bugfix: reduce pool number

* bugfix: change to E2E

* merge master

* update: update synapse workspace and storage account

* update: update synapse workspace and storage account

* update: update synapse workspace and storage account

* update: update synapse workspace and storage account

* update: update synapse workspace and storage account

* update: update synapse workspace and storage account

* update: update synapse workspace and storage account

* bugfix: sample notebook in synapse

* resolve conflict

* add files

* update: synapse bug fix

* update: synapse bug fix

* update: synapse bug fix

* update: rename DatabricksTests

* update: update samples

* update: update samples

* update: update samples

* update: merge master

* fix: disable ivy_cache

* fix: E2E on Synapse

* fix: E2E on Synapse

* fix: E2E on Synapse

* fix: E2E on Synapse

* fix: E2E on Synapse

* fix: remove notebook

* fix: E2E on Synapse

* fix: E2E on Synapse

* fix: E2E on Synapse

* fix: E2E on Synapse

* feat: E2E in Synapse

* feat: E2E in Synapse

* feat: E2E in Synapse

* feat: E2E in Synapse

Co-authored-by: Rohit Agrawal (AI+C) <ragrawal@microsoft.com>
Co-authored-by: xuwq1993 <azureuser@wenqxvm.ootabanf4lauvfxj2zqn1gsxld.ix.internal.cloudapp.net>
  • Loading branch information
3 people committed Aug 20, 2021
1 parent de4b47b commit 3ae67ab
Show file tree
Hide file tree
Showing 27 changed files with 1,546 additions and 898 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Expand Up @@ -46,6 +46,10 @@ package-lock.json
node_modules/
.Rproj.user

# Notebook Test Files
/notebooks/*.py
/notebooks/*.txt

# R output
*.Rout

Expand Down
2 changes: 2 additions & 0 deletions core/src/test/scala/com/microsoft/ml/spark/Secrets.scala
Expand Up @@ -51,5 +51,7 @@ object Secrets {
// Secrets are resolved lazily on first access; the string argument is the
// secret's name in the backing secret store (store details not visible here —
// presumably Azure Key Vault via getSecret; confirm against Secrets' header).
lazy val TranslatorKey: String = getSecret("translator-key")
lazy val PowerbiURL: String = getSecret("powerbi-url")
lazy val AdbToken: String = getSecret("adb-token")
// Storage-account key and service-principal key consumed by the Synapse
// E2E test infrastructure added in this change.
lazy val SynapseStorageKey: String = getSecret("mmlsparkeuap-key")
lazy val SynapseSpnKey: String = getSecret("synapse-spn-key")

}
Expand Up @@ -3,18 +3,17 @@

package com.microsoft.ml.spark.nbtest

import java.util.concurrent.TimeUnit

import com.microsoft.ml.spark.core.test.base.TestBase
import com.microsoft.ml.spark.nbtest.DatabricksUtilities._

import java.util.concurrent.TimeUnit
import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.language.existentials

/** Tests to validate fuzzing of modules. */
class NotebookTests extends TestBase {
class DatabricksTests extends TestBase {

test("Databricks Notebooks") {
val clusterId = createClusterInPool(ClusterName, PoolId)
Expand Down
@@ -0,0 +1,81 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.ml.spark.nbtest

import com.microsoft.ml.spark.core.test.base.TestBase
import com.microsoft.ml.spark.nbtest.SynapseUtilities.exec

import java.io.File
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.language.existentials
import scala.sys.process.Process

/** End-to-end tests that convert the sample notebooks to Python scripts and
  * run each of them as a Livy batch job on the Azure Synapse Spark pools.
  */
class SynapseTests extends TestBase {

  test("Synapse") {

    // Convert every sample notebook to a .py script with nbconvert.
    // BUGFIX: the original non-Windows branch built a Process but never ran
    // it, so conversion silently did nothing on Linux. The command also needs
    // a shell ("bash -c") for the `;` chaining and the `*.ipynb` glob; `.!`
    // actually executes it and waits for completion.
    val os = sys.props("os.name").toLowerCase
    os match {
      case x if x contains "windows" =>
        exec("conda activate mmlspark && jupyter nbconvert --to script .\\notebooks\\*.ipynb")
      case _ =>
        Process(Seq("bash", "-c",
          "conda init bash; conda activate mmlspark; jupyter nbconvert --to script ./notebooks/*.ipynb")).!
    }

    // Strip spaces and dashes from the generated file names so the later
    // filterNot guards accept them; foreach (not map) since the results are
    // not used, and a failed rename is surfaced instead of silently ignored.
    SynapseUtilities.listPythonFiles().foreach { f =>
      val sanitized = f
        .replace(" ", "")
        .replace("-", "")
      if (!new File(f).renameTo(new File(sanitized))) {
        println(s"warning: failed to rename $f to $sanitized")
      }
    }

    val workspaceName = "mmlspark"
    val sparkPools = Array("buildpool", "buildpool2", "buildpool3")

    // Submit each sanitized script as a Livy batch job, picking whichever
    // pool monitorPool reports as available; drop jobs that failed to submit
    // (state == "none").
    val livyBatchJobs = SynapseUtilities.listPythonJobFiles()
      .filterNot(_.contains(" "))
      .filterNot(_.contains("-"))
      .map(f => {
        val poolName = SynapseUtilities.monitorPool(workspaceName, sparkPools)
        val livyUrl = "https://" +
          workspaceName +
          ".dev.azuresynapse.net/livyApi/versions/2019-11-01-preview/sparkPools/" +
          poolName +
          "/batches"
        val livyBatch: LivyBatch = SynapseUtilities.uploadAndSubmitNotebook(livyUrl, f)
        println(s"submitted livy job: ${livyBatch.id} to sparkPool: $poolName")
        LivyBatchJob(livyBatch, poolName, livyUrl)
      })
      .filterNot(_.livyBatch.state == "none")

    try {
      // Poll every submitted job in parallel until it succeeds or times out.
      val batchFutures: Array[Future[Any]] = livyBatchJobs.map((batchJob: LivyBatchJob) => {
        Future {
          val batch = batchJob.livyBatch
          val livyUrl = batchJob.livyUrl

          if (batch.state != "success") {
            SynapseUtilities.retry(batch.id, livyUrl, SynapseUtilities.TimeoutInMillis, System.currentTimeMillis())
          }
        }(ExecutionContext.global)
      })

      // Await is acceptable here: this is the outermost edge of a test.
      val failures = batchFutures
        .map(f => Await.ready(f, Duration(SynapseUtilities.TimeoutInMillis.toLong, TimeUnit.MILLISECONDS)).value.get)
        .filter(f => f.isFailure)
      assert(failures.isEmpty)
    }
    catch {
      // Catching Throwable is deliberate: on ANY failure we must cancel the
      // still-running batches so the shared pools are not left occupied, and
      // the rethrow preserves the original failure for the test report.
      case t: Throwable =>
        livyBatchJobs.foreach { batchJob =>
          println(s"Cancelling job ${batchJob.livyBatch.id}")
          SynapseUtilities.cancelRun(batchJob.livyUrl, batchJob.livyBatch.id)
        }
        throw t
    }
  }
}

0 comments on commit 3ae67ab

Please sign in to comment.