-
Notifications
You must be signed in to change notification settings - Fork 819
/
DatabricksTests.scala
71 lines (58 loc) · 2.48 KB
/
DatabricksTests.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.
package com.microsoft.ml.spark.nbtest
import com.microsoft.ml.spark.core.test.base.TestBase
import com.microsoft.ml.spark.nbtest.DatabricksUtilities._
import java.util.concurrent.TimeUnit
import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.language.existentials
import scala.util.control.NonFatal
/** Tests to validate fuzzing of modules. */
/** End-to-end smoke tests: spin up a Databricks cluster from a pool, install the
  * library, upload and run the example notebooks, and assert that every run succeeds.
  * All submitted runs are cancelled and the cluster deleted in cleanup, even on failure.
  */
class DatabricksTests extends TestBase {

  test("Databricks Notebooks") {
    val clusterId = createClusterInPool(ClusterName, PoolId)
    // Every submitted run id is recorded here so the finally-block can cancel it.
    val jobIdsToCancel = mutable.ListBuffer[Int]()
    try {
      println("Checking if cluster is active")
      // Poll once per second for up to 15 minutes while the cluster starts.
      tryWithRetries(Seq.fill(60 * 15)(1000).toArray) { () =>
        assert(isClusterActive(clusterId))
      }
      println("Installing libraries")
      installLibraries(clusterId)
      // Library installation can briefly restart the cluster; wait up to 3 minutes.
      tryWithRetries(Seq.fill(60 * 3)(1000).toArray) { () =>
        assert(isClusterActive(clusterId))
      }
      println(s"Creating folder $Folder")
      workspaceMkDir(Folder)

      println("Submitting jobs")
      val parJobIds = ParallizableNotebooks.map(uploadAndSubmitNotebook(clusterId, _))
      jobIdsToCancel ++= parJobIds
      println(s"Submitted ${parJobIds.length} for execution: ${parJobIds.toList}")

      println("Monitoring Parallel Jobs...")
      val monitors = parJobIds.map((runId: Int) => monitorJob(runId, TimeoutInMillis, logLevel = 2))

      println("Awaiting parallelizable jobs...")
      // Await.ready never throws on job failure; the outcome is inspected via .value.
      val parFailures = monitors
        .map(Await.ready(_, Duration(TimeoutInMillis.toLong, TimeUnit.MILLISECONDS)).value.get)
        .filter(_.isFailure)

      println("Submitting nonparallelizable job...")
      // Non-parallelizable notebooks must run one at a time: submit, then await,
      // before moving to the next (the iterator keeps this lazy and sequential).
      val nonParFailures = NonParallizableNotebooks.toIterator.map { nb =>
        val jid = uploadAndSubmitNotebook(clusterId, nb)
        jobIdsToCancel.append(jid)
        val monitor = monitorJob(jid, TimeoutInMillis, logLevel = 2)
        Await.ready(monitor, Duration(TimeoutInMillis.toLong, TimeUnit.MILLISECONDS)).value.get
      }.filter(_.isFailure).toArray

      assert(parFailures.isEmpty && nonParFailures.isEmpty)
    } finally {
      // Best-effort cleanup: one failed cancellation must not skip the remaining
      // cancellations or the cluster deletion (which would leak a live cluster).
      jobIdsToCancel.foreach { jid =>
        println(s"Cancelling job $jid")
        try cancelRun(jid)
        catch { case NonFatal(e) => println(s"Failed to cancel run $jid: $e") }
      }
      deleteCluster(clusterId)
    }
  }

  // Manual helper (kept ignored): dump the currently active runs for debugging.
  ignore("list running jobs for convenience") {
    val obj = databricksGet("jobs/runs/list?active_only=true&limit=1000")
    println(obj)
  }
}