diff --git a/.gitignore b/.gitignore index e4fe76746a..225eb31b54 100644 --- a/.gitignore +++ b/.gitignore @@ -69,3 +69,14 @@ node_modules/ .bsp website/.docusaurus null/ + +# pytorch_lightning logs +**/lightning_logs/* + +# pytest +.pytest_cache/ + +# python wheel +**/build/* +**/dist/* +**/*.egg-info/* diff --git a/build.sbt b/build.sbt index 992bfb3e94..4e982dc1da 100644 --- a/build.sbt +++ b/build.sbt @@ -197,7 +197,7 @@ generateDotnetDoc := { val doxygenHelperFile = join(dotnetSrcDir, "DoxygenHelper.txt") if (doxygenHelperFile.exists()) FileUtils.forceDelete(doxygenHelperFile) FileUtils.writeStringToFile(doxygenHelperFile, fileContent, "utf-8") - runCmd(Seq("bash", "-c","cat DoxygenHelper.txt >> Doxyfile", ""), dotnetSrcDir) + runCmd(Seq("bash", "-c", "cat DoxygenHelper.txt >> Doxyfile", ""), dotnetSrcDir) runCmd(Seq("doxygen"), dotnetSrcDir) } @@ -270,9 +270,11 @@ publishPypi := { val publishDocs = TaskKey[Unit]("publishDocs", "publish docs for scala, python and dotnet") publishDocs := { - generatePythonDoc.value - (root / Compile / unidoc).value - generateDotnetDoc.value + Def.sequential( + generatePythonDoc, + generateDotnetDoc, + (root / Compile / unidoc) + ).value val html = """ |
@@ -382,10 +384,10 @@ lazy val cognitive = (project in file("cognitive")) "com.azure" % "azure-ai-textanalytics" % "5.1.4" ), dependencyOverrides ++= Seq( - "com.fasterxml.jackson.core" % "jackson-databind" % "2.12.5", - "com.fasterxml.jackson.core" % "jackson-core" % "2.12.5", - "com.fasterxml.jackson.core" % "jackson-annotations" % "2.12.5", - "com.fasterxml.jackson.dataformat" % "jackson-dataformat-xml" % "2.12.5", + "com.fasterxml.jackson.core" % "jackson-databind" % "2.12.5", + "com.fasterxml.jackson.core" % "jackson-core" % "2.12.5", + "com.fasterxml.jackson.core" % "jackson-annotations" % "2.12.5", + "com.fasterxml.jackson.dataformat" % "jackson-dataformat-xml" % "2.12.5", "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.12.5" ), name := "synapseml-cognitive" diff --git a/core/src/main/python/synapse/doc/conf.py b/core/src/main/python/synapse/doc/conf.py index a1c64ced63..a549447d82 100644 --- a/core/src/main/python/synapse/doc/conf.py +++ b/core/src/main/python/synapse/doc/conf.py @@ -12,11 +12,13 @@ # ones. extensions = [ "sphinx.ext.autodoc", + "sphinx.ext.doctest", "sphinx.ext.intersphinx", "sphinx.ext.mathjax", "sphinx.ext.ifconfig", "sphinx.ext.viewcode", "sphinx.ext.napoleon", + "sphinx_paramlinks", ] # Add any paths that contain templates here, relative to this directory. @@ -106,26 +108,17 @@ # Example configuration for intersphinx: refer to the Python standard library. 
-intersphinx_mapping = {"https://docs.python.org/": None} +intersphinx_mapping = { + "python": ("https://docs.python.org/3", None), + "torch": ("https://pytorch.org/docs/stable/", None), + "numpy": ("https://numpy.org/doc/stable/", None), + "pytorch_lightning": ("https://pytorch-lightning.readthedocs.io/en/stable/", None), + "torchvision": ("https://pytorch.org/vision/stable/", None), +} # intersphinx_mapping = { "scala": ("/scala/index.html", None) } -# -- Mock out pandas+numpy that can't be found ---------------------------- -import sys - -try: - from unittest.mock import MagicMock # python >= 3.3 -except ImportError: - from mock import Mock as MagicMock # older - - -class Mock(MagicMock): - @classmethod - def __getattr__(cls, name): - return MagicMock() - - -MOCK_MODULES = ["numpy", "pandas"] -sys.modules.update((mod_name, Mock()) for mod_name in MOCK_MODULES) +# -- Mock out pandas that can't be found ---------------------------- +autodoc_mock_imports = ["pandas"] # -- Setup AutoStructify -------------------------------------------------- # Use this if we ever want to use markdown pages instead of rst pages. 
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/codegen/CodeGen.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/codegen/CodeGen.scala index 96c784c455..94f2eed06f 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/codegen/CodeGen.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/codegen/CodeGen.scala @@ -6,6 +6,7 @@ package com.microsoft.azure.synapse.ml.codegen import com.microsoft.azure.synapse.ml.build.BuildInfo import com.microsoft.azure.synapse.ml.codegen.CodegenConfigProtocol._ import com.microsoft.azure.synapse.ml.codegen.DotnetCodegen.dotnetGen +import com.microsoft.azure.synapse.ml.codegen.GenerationUtils.indent import com.microsoft.azure.synapse.ml.core.env.FileUtilities._ import com.microsoft.azure.synapse.ml.core.utils.JarLoadingUtils.instantiateServices import org.apache.commons.io.FileUtils @@ -88,6 +89,7 @@ object CodeGen { |Suggests: | testthat (>= 3.0.0) |Config/testthat/edition: 3 + |Encoding: UTF-8 |""".stripMargin) val scalaVersion = BuildInfo.scalaVersion.split(".".toCharArray).dropRight(1).mkString(".") @@ -134,11 +136,25 @@ object CodeGen { } //noinspection ScalaStyle + //scalastyle:off def generatePyPackageData(conf: CodegenConfig): Unit = { if (!conf.pySrcDir.exists()) { conf.pySrcDir.mkdir() } val extraPackage = if (conf.name.endsWith("core")){" + [\"mmlspark\"]"}else{""} + val requireList = if(conf.name.contains("deep-learning")) { + s"""MINIMUM_SUPPORTED_PYTHON_VERSION = "3.8"""".stripMargin + } else "" + val extraRequirements = if (conf.name.contains("deep-learning")) { + s"""extras_require={"extras": [ + | "cmake", + | "horovod==0.25.0", + | "pytorch_lightning>=1.5.0,<1.5.10", + | "torch==1.11.0", + | "torchvision>=0.12.0" + |]}, + |python_requires=f">={MINIMUM_SUPPORTED_PYTHON_VERSION}",""".stripMargin + } else "" writeFile(join(conf.pySrcDir, "setup.py"), s""" |# Copyright (C) Microsoft Corporation. All rights reserved. 
@@ -149,6 +165,8 @@ object CodeGen { |import codecs |import os.path | + |$requireList + | |setup( | name="${conf.name}", | version="${conf.pythonizedVersion}", @@ -171,10 +189,17 @@ object CodeGen { | ], | zip_safe=True, | package_data={"synapseml": ["../LICENSE.txt", "../README.txt"]}, + | project_urls={ + | "Website": "https://microsoft.github.io/SynapseML/", + | "Documentation": "https://mmlspark.blob.core.windows.net/docs/${conf.pythonizedVersion}/pyspark/index.html", + | "Source Code": "https://github.com/Microsoft/SynapseML", + | }, + |${indent(extraRequirements, 1)} |) | |""".stripMargin) } + //scalastyle:on def rGen(conf: CodegenConfig): Unit = { println(s"Generating R for ${conf.jarName}") diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/codegen/DotnetWrappable.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/codegen/DotnetWrappable.scala index 1cc16958b3..b606c11fc7 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/codegen/DotnetWrappable.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/codegen/DotnetWrappable.scala @@ -275,7 +275,9 @@ trait DotnetWrappable extends BaseWrappable { val srcFolders = importPath.mkString(".") .replaceAllLiterally("com.microsoft.azure.synapse.ml", "synapse.ml").split(".".toCharArray) val srcDir = FileUtilities.join((Seq(conf.dotnetSrcDir.toString) ++ srcFolders.toSeq): _*) - srcDir.mkdirs() + if (!srcDir.exists()) { + srcDir.mkdirs() + } Files.write( FileUtilities.join(srcDir, dotnetClassName + ".cs").toPath, dotnetClass().getBytes(StandardCharsets.UTF_8)) diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksTests.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksTests.scala index 38e87598d0..07618d59e4 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksTests.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksTests.scala @@ -3,53 +3,18 @@ package 
com.microsoft.azure.synapse.ml.nbtest -import com.microsoft.azure.synapse.ml.core.test.base.TestBase import com.microsoft.azure.synapse.ml.nbtest.DatabricksUtilities._ import java.util.concurrent.TimeUnit -import scala.collection.mutable +import scala.collection.mutable.ListBuffer import scala.concurrent.Await import scala.concurrent.duration.Duration import scala.language.existentials -class DatabricksTests extends TestBase { +class DatabricksTests extends DatabricksTestHelper { - val clusterId: String = createClusterInPool(ClusterName, PoolId) - val jobIdsToCancel: mutable.ListBuffer[Int] = mutable.ListBuffer[Int]() - - println("Checking if cluster is active") - tryWithRetries(Seq.fill(60 * 15)(1000).toArray) { () => - assert(isClusterActive(clusterId)) - } - println("Installing libraries") - installLibraries(clusterId) - tryWithRetries(Seq.fill(60 * 3)(1000).toArray) { () => - assert(areLibrariesInstalled(clusterId)) - } - println(s"Creating folder $Folder") - workspaceMkDir(Folder) - - println(s"Submitting jobs") - val parNotebookRuns: Seq[DatabricksNotebookRun] = ParallelizableNotebooks.map(uploadAndSubmitNotebook(clusterId, _)) - parNotebookRuns.foreach(notebookRun => jobIdsToCancel.append(notebookRun.runId)) - - println(s"Submitted ${parNotebookRuns.length} for execution: ${parNotebookRuns.map(_.runId).toList}") - - assert(parNotebookRuns.nonEmpty) - - parNotebookRuns.foreach(run => { - println(s"Testing ${run.notebookName}") - - test(run.notebookName) { - val result = Await.ready( - run.monitor(logLevel = 0), - Duration(TimeoutInMillis.toLong, TimeUnit.MILLISECONDS)).value.get - - if (!result.isSuccess){ - throw result.failed.get - } - } - }) + val clusterId: String = createClusterInPool(ClusterName, AdbRuntime, NumWorkers, PoolId, "[]") + val jobIdsToCancel: ListBuffer[Int] = databricksTestHelper(clusterId, Libraries, CPUNotebooks) println(s"Submitting nonparallelizable job...") NonParallelizableNotebooks.toIterator.foreach(notebook => { @@ -68,16 +33,12 
@@ class DatabricksTests extends TestBase { }) protected override def afterAll(): Unit = { - println("Suite DatabricksTests finished. Running afterAll procedure...") - jobIdsToCancel.foreach(cancelRun) - - deleteCluster(clusterId) - println(s"Deleted cluster with Id $clusterId.") + afterAllHelper(jobIdsToCancel, clusterId, ClusterName) super.afterAll() } - ignore("list running jobs for convenievce") { + ignore("list running jobs for convenience") { val obj = databricksGet("jobs/runs/list?active_only=true&limit=1000") println(obj) } diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala index c9abeb0ddb..5c240100c2 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala @@ -6,6 +6,7 @@ package com.microsoft.azure.synapse.ml.nbtest import com.microsoft.azure.synapse.ml.Secrets import com.microsoft.azure.synapse.ml.build.BuildInfo import com.microsoft.azure.synapse.ml.core.env.FileUtilities +import com.microsoft.azure.synapse.ml.core.test.base.TestBase import com.microsoft.azure.synapse.ml.io.http.RESTHelpers import com.microsoft.azure.synapse.ml.nbtest.DatabricksUtilities.{TimeoutInMillis, monitorJob} import com.microsoft.azure.synapse.ml.nbtest.SprayImplicits._ @@ -18,8 +19,10 @@ import spray.json.{JsArray, JsObject, JsValue, _} import java.io.{File, FileInputStream} import java.time.LocalDateTime -import java.util.concurrent.TimeoutException -import scala.concurrent.{ExecutionContext, Future, blocking} +import java.util.concurrent.{TimeUnit, TimeoutException} +import scala.collection.mutable +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future, blocking} //noinspection ScalaStyle object DatabricksUtilities { @@ -27,7 +30,9 @@ object DatabricksUtilities { // ADB 
Info val Region = "eastus" val PoolName = "synapseml-build-10.1" + val GpuPoolName = "synapseml-build-10.4-gpu" val AdbRuntime = "10.1.x-scala2.12" + val AdbGpuRuntime = "10.4.x-gpu-ml-scala2.12" val NumWorkers = 5 val AutoTerminationMinutes = 15 @@ -36,7 +41,9 @@ object DatabricksUtilities { .encode(("token:" + Token).getBytes("UTF-8")) val BaseURL = s"https://$Region.azuredatabricks.net/api/2.0/" lazy val PoolId: String = getPoolIdByName(PoolName) + lazy val GpuPoolId: String = getPoolIdByName(GpuPoolName) lazy val ClusterName = s"mmlspark-build-${LocalDateTime.now()}" + lazy val GPUClusterName = s"mmlspark-build-gpu-${LocalDateTime.now()}" val Folder = s"/SynapseMLBuild/build_${BuildInfo.version}" val ScalaVersion: String = BuildInfo.scalaVersion.split(".".toCharArray).dropRight(1).mkString(".") @@ -56,15 +63,28 @@ object DatabricksUtilities { Map("pypi" -> Map("package" -> "mlflow")) ).toJson.compactPrint + // TODO: install synapse.ml.dl wheel package here + val GPULibraries: String = List( + Map("maven" -> Map("coordinates" -> Version, "repo" -> Repository)) + ).toJson.compactPrint + + val GPUInitScripts: String = List( + Map("dbfs" -> Map("destination" -> "dbfs:/FileStore/horovod/horovod_installation.sh")) + ).toJson.compactPrint + // Execution Params val TimeoutInMillis: Int = 40 * 60 * 1000 val NotebookFiles: Array[File] = FileUtilities.recursiveListFiles( FileUtilities.join( - BuildInfo.baseDirectory.getParent, "notebooks").getCanonicalFile) + BuildInfo.baseDirectory.getParent, "notebooks", "features").getCanonicalFile) val ParallelizableNotebooks: Seq[File] = NotebookFiles.filterNot(_.isDirectory) + val CPUNotebooks: Seq[File] = ParallelizableNotebooks.filterNot(_.getAbsolutePath.contains("simple_deep_learning")) + + val GPUNotebooks: Seq[File] = ParallelizableNotebooks.filter(_.getAbsolutePath.contains("simple_deep_learning")) + val NonParallelizableNotebooks: Seq[File] = Nil def databricksGet(path: String): JsValue = { @@ -117,35 +137,55 @@ object 
DatabricksUtilities { () } + def uploadFileToDBFS(file: File, dest: String): Unit = { + val content = BaseEncoding.base64().encode( + IOUtils.toByteArray(new FileInputStream(file))) + val body = + s""" + |{ + | "contents": "$content", + | "path": "$dest", + | "overwrite": true + |} + """.stripMargin + databricksPost("dbfs/put", body) + () + } + def workspaceRmDir(dir: String): Unit = { val body = s"""{"path": "$dir", "recursive":true}""" databricksPost("workspace/delete", body) () } - def createClusterInPool(clusterName: String, poolId: String): String = { + def createClusterInPool(clusterName: String, + sparkVersion: String, + numWorkers: Int, + poolId: String, + initScripts: String): String = { val body = s""" |{ | "cluster_name": "$clusterName", - | "spark_version": "$AdbRuntime", - | "num_workers": $NumWorkers, + | "spark_version": "$sparkVersion", + | "num_workers": $numWorkers, | "autotermination_minutes": $AutoTerminationMinutes, | "instance_pool_id": "$poolId", | "spark_env_vars": { | "PYSPARK_PYTHON": "/databricks/python3/bin/python3" - | } + | }, + | "init_scripts": $initScripts |} """.stripMargin databricksPost("clusters/create", body).select[String]("cluster_id") } - def installLibraries(clusterId: String): Unit = { + def installLibraries(clusterId: String, libraries: String): Unit = { databricksPost("libraries/install", s""" |{ | "cluster_id": "$clusterId", - | "libraries": $Libraries + | "libraries": $libraries |} """.stripMargin) () @@ -334,6 +374,61 @@ object DatabricksUtilities { } } +abstract class DatabricksTestHelper extends TestBase { + + import DatabricksUtilities._ + + def databricksTestHelper(clusterId: String, libraries: String, notebooks: Seq[File]): mutable.ListBuffer[Int] = { + val jobIdsToCancel: mutable.ListBuffer[Int] = mutable.ListBuffer[Int]() + + println("Checking if cluster is active") + tryWithRetries(Seq.fill(60 * 15)(1000).toArray) { () => + assert(isClusterActive(clusterId)) + } + println("Installing libraries") + 
installLibraries(clusterId, libraries) + tryWithRetries(Seq.fill(60 * 3)(1000).toArray) { () => + assert(areLibrariesInstalled(clusterId)) + } + println(s"Creating folder $Folder") + workspaceMkDir(Folder) + + println(s"Submitting jobs") + val parNotebookRuns: Seq[DatabricksNotebookRun] = notebooks.map(uploadAndSubmitNotebook(clusterId, _)) + parNotebookRuns.foreach(notebookRun => jobIdsToCancel.append(notebookRun.runId)) + + println(s"Submitted ${parNotebookRuns.length} for execution: ${parNotebookRuns.map(_.runId).toList}") + + assert(parNotebookRuns.nonEmpty) + + parNotebookRuns.foreach(run => { + println(s"Testing ${run.notebookName}") + + test(run.notebookName) { + val result = Await.ready( + run.monitor(logLevel = 0), + Duration(TimeoutInMillis.toLong, TimeUnit.MILLISECONDS)).value.get + + if (!result.isSuccess) { + throw result.failed.get + } + } + }) + + jobIdsToCancel + } + + protected def afterAllHelper(jobIdsToCancel: mutable.ListBuffer[Int], + clusterId: String, + clusterName: String): Unit = { + println("Suite test finished. Running afterAll procedure...") + jobIdsToCancel.foreach(cancelRun) + + deleteCluster(clusterId) + println(s"Deleted cluster with Id $clusterId, name $clusterName") + } +} + case class DatabricksNotebookRun(runId: Int, notebookName: String) { def monitor(logLevel: Int = 2): Future[Any] = { monitorJob(runId, TimeoutInMillis, logLevel) diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/GPUTests.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/GPUTests.scala new file mode 100644 index 0000000000..ab2b62411d --- /dev/null +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/GPUTests.scala @@ -0,0 +1,28 @@ +// Copyright (C) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See LICENSE in project root for information. 
+ +package com.microsoft.azure.synapse.ml.nbtest + +import com.microsoft.azure.synapse.ml.build.BuildInfo +import com.microsoft.azure.synapse.ml.core.env.FileUtilities +import com.microsoft.azure.synapse.ml.nbtest.DatabricksUtilities._ + +import java.io.File +import scala.collection.mutable.ListBuffer + +class GPUTests extends DatabricksTestHelper { + + val horovodInstallationScript: File = FileUtilities.join( + BuildInfo.baseDirectory.getParent, "deep-learning", + "src", "main", "python", "horovod_installation.sh").getCanonicalFile + uploadFileToDBFS(horovodInstallationScript, "/FileStore/horovod/horovod_installation.sh") + val clusterId: String = createClusterInPool(GPUClusterName, AdbGpuRuntime, 2, GpuPoolId, GPUInitScripts) + val jobIdsToCancel: ListBuffer[Int] = databricksTestHelper(clusterId, GPULibraries, GPUNotebooks) + + protected override def afterAll(): Unit = { + afterAllHelper(jobIdsToCancel, clusterId, GPUClusterName) + + super.afterAll() + } + +} diff --git a/deep-learning/src/main/python/horovod_installation.sh b/deep-learning/src/main/python/horovod_installation.sh new file mode 100644 index 0000000000..822792fc38 --- /dev/null +++ b/deep-learning/src/main/python/horovod_installation.sh @@ -0,0 +1,44 @@ +#!/bin/bash + +# This is the script used to install horovod on ubuntu2004 with NCCL on databricks + +# exit immediately on failure, or if an undefined variable is used +set -eu + +# Install prerequisite libraries that horovod depends on +pip install pytorch-lightning==1.5.0 +pip install torchvision==0.12.0 + +# Remove Outdated Signing Key: +sudo apt-key del 7fa2af80 + +# Install the new cuda-keyring package: +wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/cuda-keyring_1.0-1_all.deb +sudo dpkg -i cuda-keyring_1.0-1_all.deb + +apt-key adv --fetch-keys http://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64/7fa2af80.pub +wget 
https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64/nvidia-machine-learning-repo-ubuntu2004_1.0.0-1_amd64.deb +dpkg -i ./nvidia-machine-learning-repo-ubuntu2004_1.0.0-1_amd64.deb + + +apt-get update +apt-get install --allow-downgrades --no-install-recommends -y \ +cuda-nvml-dev-11-0=11.0.167-1 \ +cuda-nvcc-11-0=11.0.221-1 \ +cuda-cudart-dev-11-0=11.0.221-1 \ +cuda-libraries-dev-11-0=11.0.3-1 \ +libnccl-dev=2.10.3-1+cuda11.0 \ +libcusparse-dev-11-0=11.1.1.245-1 + +git clone --recursive https://github.com/horovod/horovod.git +cd horovod +# fix version 0.25.0 +git fetch origin refs/tags/v0.25.0:tags/v0.25.0 +git checkout tags/v0.25.0 -b v0.25.0-branch +rm -rf build/ dist/ +HOROVOD_GPU_ALLREDUCE=NCCL HOROVOD_CUDA_HOME=/usr/local/cuda-11/ HOROVOD_WITH_PYTORCH=1 HOROVOD_WITHOUT_MXNET=1 \ +/databricks/python3/bin/python setup.py bdist_wheel + +readlink -f dist/horovod-*.whl + +pip install --no-cache-dir dist/horovod-0.25.0-cp38-cp38-linux_x86_64.whl --force-reinstall --no-deps diff --git a/deep-learning/src/main/python/synapse/ml/dl/DeepVisionClassifier.py b/deep-learning/src/main/python/synapse/ml/dl/DeepVisionClassifier.py new file mode 100644 index 0000000000..3a29483caa --- /dev/null +++ b/deep-learning/src/main/python/synapse/ml/dl/DeepVisionClassifier.py @@ -0,0 +1,295 @@ +# Copyright (C) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See LICENSE in project root for information. 
+ +import sys + +import torchvision.transforms as transforms +from horovod.spark.common.backend import SparkBackend +from horovod.spark.lightning import TorchEstimator +from PIL import Image +from pyspark.context import SparkContext +from pyspark.ml.param.shared import Param, Params +from pytorch_lightning.utilities import _module_available +from synapse.ml.dl.DeepVisionModel import DeepVisionModel +from synapse.ml.dl.LitDeepVisionModel import LitDeepVisionModel +from synapse.ml.dl.utils import keywords_catch +from synapse.ml.dl.PredictionParams import PredictionParams + +_HOROVOD_AVAILABLE = _module_available("horovod") +if _HOROVOD_AVAILABLE: + import horovod + + _HOROVOD_EQUAL_0_25_0 = horovod.__version__ == "0.25.0" + if not _HOROVOD_EQUAL_0_25_0: + raise RuntimeError( + "horovod should be of version 0.25.0, found: {}".format(horovod.__version__) + ) +else: + raise ModuleNotFoundError("module not found: horovod") + + +class DeepVisionClassifier(TorchEstimator, PredictionParams): + + backbone = Param( + Params._dummy(), "backbone", "backbone of the deep vision classifier" + ) + + additional_layers_to_train = Param( + Params._dummy(), + "additional_layers_to_train", + "number of last layers to fine tune for the model, should be between 0 and 3", + ) + + num_classes = Param(Params._dummy(), "num_classes", "number of target classes") + + loss_name = Param( + Params._dummy(), + "loss_name", + "string representation of torch.nn.functional loss function for the underlying pytorch_lightning model, e.g. 
binary_cross_entropy", + ) + + optimizer_name = Param( + Params._dummy(), + "optimizer_name", + "string representation of optimizer function for the underlying pytorch_lightning model", + ) + + dropout_aux = Param( + Params._dummy(), + "dropout_aux", + "numeric value that's applied to googlenet InceptionAux module's dropout layer only: probability of an element to be zeroed", + ) + + transform_fn = Param( + Params._dummy(), + "transform_fn", + "A composition of transforms used to transform and augnment the input image, should be of type torchvision.transforms.Compose", + ) + + @keywords_catch + def __init__( + self, + backbone=None, + additional_layers_to_train=0, + num_classes=None, + optimizer_name="adam", + loss_name="cross_entropy", + dropout_aux=0.7, + transform_fn=None, + # Classifier args + label_col="label", + image_col="image", + prediction_col="prediction", + # TorchEstimator args + num_proc=None, + backend=None, + store=None, + metrics=None, + loss_weights=None, + sample_weight_col=None, + gradient_compression=None, + input_shapes=None, + validation=None, + callbacks=None, + batch_size=None, + val_batch_size=None, + epochs=None, + verbose=1, + random_seed=None, + shuffle_buffer_size=None, + partitions_per_process=None, + run_id=None, + train_minibatch_fn=None, + train_steps_per_epoch=None, + validation_steps_per_epoch=None, + transformation_fn=None, + train_reader_num_workers=None, + trainer_args=None, + val_reader_num_workers=None, + reader_pool_type=None, + label_shapes=None, + inmemory_cache_all=False, + num_gpus=None, + logger=None, + log_every_n_steps=50, + data_module=None, + loader_num_epochs=None, + terminate_on_nan=False, + profiler=None, + debug_data_loader=False, + train_async_data_loader_queue_size=None, + val_async_data_loader_queue_size=None, + use_gpu=True, + mp_start_method=None, + ): + super(DeepVisionClassifier, self).__init__() + + self._setDefault( + backbone=None, + additional_layers_to_train=0, + num_classes=None, + 
optimizer_name="adam", + loss_name="cross_entropy", + dropout_aux=0.7, + transform_fn=None, + feature_cols=["image"], + label_cols=["label"], + label_col="label", + image_col="image", + prediction_col="prediction", + ) + + kwargs = self._kwargs + self._set(**kwargs) + + self._update_input_shapes() + self._update_cols() + self._update_transformation_fn() + + model = LitDeepVisionModel( + backbone=self.getBackbone(), + additional_layers_to_train=self.getAdditionalLayersToTrain(), + num_classes=self.getNumClasses(), + input_shape=self.getInputShapes()[0], + optimizer_name=self.getOptimizerName(), + loss_name=self.getLossName(), + label_col=self.getLabelCol(), + image_col=self.getImageCol(), + dropout_aux=self.getDropoutAUX(), + ) + self._set(model=model) + + def setBackbone(self, value): + return self._set(backbone=value) + + def getBackbone(self): + return self.getOrDefault(self.backbone) + + def setAdditionalLayersToTrain(self, value): + return self._set(additional_layers_to_train=value) + + def getAdditionalLayersToTrain(self): + return self.getOrDefault(self.additional_layers_to_train) + + def setNumClasses(self, value): + return self._set(num_classes=value) + + def getNumClasses(self): + return self.getOrDefault(self.num_classes) + + def setLossName(self, value): + return self._set(loss_name=value) + + def getLossName(self): + return self.getOrDefault(self.loss_name) + + def setOptimizerName(self, value): + return self._set(optimizer_name=value) + + def getOptimizerName(self): + return self.getOrDefault(self.optimizer_name) + + def setDropoutAUX(self, value): + return self._set(dropout_aux=value) + + def getDropoutAUX(self): + return self.getOrDefault(self.dropout_aux) + + def setTransformFn(self, value): + return self._set(transform_fn=value) + + def getTransformFn(self): + return self.getOrDefault(self.transform_fn) + + def _update_input_shapes(self): + if self.getInputShapes() is None: + if self.getBackbone().startswith("inception"): + 
self.setInputShapes([[-1, 3, 299, 299]]) + else: + self.setInputShapes([[-1, 3, 224, 224]]) + + def _update_cols(self): + self.setFeatureCols([self.getImageCol()]) + self.setLabelCols([self.getLabelCol()]) + + def _fit(self, dataset): + return super()._fit(dataset) + + # override this method to provide a correct default backend + def _get_or_create_backend(self): + backend = self.getBackend() + num_proc = self.getNumProc() + if backend is None: + if num_proc is None: + num_proc = self._find_num_proc() + backend = SparkBackend( + num_proc, + stdout=sys.stdout, + stderr=sys.stderr, + prefix_output_with_timestamp=True, + verbose=self.getVerbose(), + ) + elif num_proc is not None: + raise ValueError( + 'At most one of parameters "backend" and "num_proc" may be specified' + ) + return backend + + def _find_num_proc(self): + if self.getUseGpu(): + # set it as number of executors for now (ignoring num_gpus per executor) + sc = SparkContext.getOrCreate() + return sc._jsc.sc().getExecutorMemoryStatus().size() - 1 + return None + + def _update_transformation_fn(self): + if self.getTransformationFn() is None: + if self.getTransformFn() is None: + crop_size = self.getInputShapes()[0][-1] + transform = transforms.Compose( + [ + transforms.RandomResizedCrop(crop_size), + transforms.RandomHorizontalFlip(), + transforms.ToTensor(), + transforms.Normalize( + mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225] + ), + ] + ) + self.setTransformFn(transform) + + def _create_transform_row(image_col, label_col, transform): + def _transform_row(row): + path = row[image_col] + label = row[label_col] + image = Image.open(path).convert("RGB") + image = transform(image).numpy() + return {image_col: image, label_col: label} + + return _transform_row + + self.setTransformationFn( + _create_transform_row( + self.getImageCol(), + self.getLabelCol(), + self.getTransformFn(), + ) + ) + + def get_model_class(self): + return DeepVisionModel + + def _get_model_kwargs(self, model, history, optimizer, 
run_id, metadata): + return dict( + history=history, + model=model, + optimizer=optimizer, + input_shapes=self.getInputShapes(), + run_id=run_id, + _metadata=metadata, + loss=self.getLoss(), + loss_constructors=self.getLossConstructors(), + label_col=self.getLabelCol(), + image_col=self.getImageCol(), + prediction_col=self.getPredictionCol(), + ) diff --git a/deep-learning/src/main/python/synapse/ml/dl/DeepVisionModel.py b/deep-learning/src/main/python/synapse/ml/dl/DeepVisionModel.py new file mode 100644 index 0000000000..7f35112df4 --- /dev/null +++ b/deep-learning/src/main/python/synapse/ml/dl/DeepVisionModel.py @@ -0,0 +1,124 @@ +# Copyright (C) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See LICENSE in project root for information. + +import numpy as np +import torch +import torchvision.transforms as transforms +from horovod.spark.lightning import TorchModel +from PIL import Image +from synapse.ml.dl.PredictionParams import PredictionParams +from pyspark.ml.param import Param, Params, TypeConverters +from pyspark.sql.functions import col, udf +from pyspark.sql.types import DoubleType +from synapse.ml.dl.utils import keywords_catch + + +class DeepVisionModel(TorchModel, PredictionParams): + + transform_fn = Param( + Params._dummy(), + "transform_fn", + "A composition of transforms used to transform and augnment the input image, should be of type torchvision.transforms.Compose", + ) + + @keywords_catch + def __init__( + self, + history=None, + model=None, + input_shapes=None, + optimizer=None, + run_id=None, + _metadata=None, + loss=None, + loss_constructors=None, + # diff from horovod + transform_fn=None, + label_col="label", + image_col="image", + prediction_col="prediction", + ): + super(DeepVisionModel, self).__init__() + + self._setDefault( + optimizer=None, + loss=None, + loss_constructors=None, + input_shapes=None, + transform_fn=None, + label_col="label", + image_col="image", + prediction_col="prediction", + 
feature_columns=["image"], + label_columns=["label"], + outputCols=["output"], + ) + + kwargs = self._kwargs + self._set(**kwargs) + self._update_transform_fn() + self._update_cols() + + def setTransformFn(self, value): + return self._set(transform_fn=value) + + def getTransformFn(self): + return self.getOrDefault(self.transform_fn) + + def setTransformationFn(self, value): + return self._set(transformation_fn=value) + + def getTransformationFn(self): + return self.getOrDefault(self.transformation_fn) + + def _update_transform_fn(self): + if self.getTransformFn() is None: + crop_size = self.getInputShapes()[0][-1] + transform = transforms.Compose( + [ + transforms.CenterCrop(crop_size), + transforms.ToTensor(), + transforms.Normalize( + mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225] + ), + ] + ) + self.setTransformFn(transform) + + def _update_cols(self): + self.setFeatureColumns([self.getImageCol()]) + self.setLabelColoumns([self.getLabelCol()]) + + # override this to open the image if it's a path + def get_prediction_fn(self): + input_shape = self.getInputShapes()[0] + image_col = self.getImageCol() + + def _create_predict_fn(transform): + def predict_fn(model, row): + if type(row[image_col]) == str: + image = Image.open(row[image_col]).convert("RGB") + data = torch.tensor(transform(image).numpy()).reshape(input_shape) + else: + data = torch.tensor([row[image_col]]).reshape(input_shape) + + with torch.no_grad(): + pred = model(data) + + return pred + + return predict_fn + + return _create_predict_fn(self.getTransformFn()) + + # pytorch_lightning module has its own optimizer configuration + def getOptimizer(self): + return None + + def _transform(self, df): + output_df = super()._transform(df) + argmax = udf(lambda v: float(np.argmax(v)), returnType=DoubleType()) + pred_df = output_df.withColumn( + self.getPredictionCol(), argmax(col(self.getOutputCols()[0])) + ) + return pred_df diff --git a/deep-learning/src/main/python/synapse/ml/dl/LitDeepVisionModel.py 
# Copyright (C) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in project root for information.

import inspect

import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from pytorch_lightning.utilities import _module_available

_TORCHVISION_AVAILABLE = _module_available("torchvision")
if _TORCHVISION_AVAILABLE:
    import torchvision

    # Compare numerically: a plain string comparison would wrongly accept
    # e.g. "0.9.0" (lexicographically >= "0.12.0") and could reject newer
    # versions. Local-version suffixes such as "+cu113" are stripped first.
    _torchvision_version = tuple(
        int(part) for part in torchvision.__version__.split("+")[0].split(".")[:2]
    )
    _TORCHVISION_GREATER_EQUAL_0_12_0 = _torchvision_version >= (0, 12)
    if _TORCHVISION_GREATER_EQUAL_0_12_0:
        from torchvision import models
    else:
        raise RuntimeError(
            "torchvision should be >= 0.12.0, found: {}".format(torchvision.__version__)
        )
else:
    raise ModuleNotFoundError("module not found: torchvision")


class LitDeepVisionModel(pl.LightningModule):
    """LightningModule that fine-tunes a pretrained torchvision backbone
    for image classification.

    All pretrained weights are frozen in ``__init__``; the replaced
    classification head (created after freezing, hence trainable) and the
    last ``additional_layers_to_train`` parameterized children (unfrozen in
    ``configure_optimizers``) are the only trainable parts.
    """

    def __init__(
        self,
        backbone,
        additional_layers_to_train,
        num_classes,
        input_shape,
        optimizer_name,
        loss_name,
        label_col,
        image_col,
        dropout_aux=0.7,
    ):
        """
        :param backbone: torchvision model name (a key of ``torchvision.models.__dict__``).
        :param additional_layers_to_train: number of trailing layers to unfreeze (0 to 3).
        :param num_classes: number of output classes for the replaced head.
        :param input_shape: shape each batch is reshaped to, e.g. ``[-1, 3, 224, 224]``.
        :param optimizer_name: case-insensitive name of a ``torch.optim`` optimizer class.
        :param loss_name: case-insensitive name of a ``torch.nn.functional`` loss function.
        :param label_col: key of the label entry in each batch dict.
        :param image_col: key of the image entry in each batch dict.
        :param dropout_aux: dropout rate for googlenet auxiliary classifiers.
        """
        super(LitDeepVisionModel, self).__init__()

        self.backbone = backbone
        self.additional_layers_to_train = additional_layers_to_train
        self.num_classes = num_classes
        self.input_shape = input_shape
        self.optimizer_name = optimizer_name
        self.loss_name = loss_name
        self.label_col = label_col
        self.image_col = image_col
        self.dropout_aux = dropout_aux

        # Validates params and instantiates self.model / self.loss_fn / self.optimizer_fn.
        self._check_params()

        # Freeze those weights
        for p in self.model.parameters():
            p.requires_grad = False

        # Tune certain layers including num_classes. These replacement layers
        # are created AFTER the freeze above, so they stay trainable.
        if backbone.startswith("alexnet"):
            num_ftrs = self.model.classifier[6].in_features
            self.model.classifier[6] = nn.Linear(num_ftrs, num_classes)
        elif backbone.startswith("densenet"):
            self.model.classifier = nn.Linear(
                self.model.classifier.in_features, num_classes
            )
        elif backbone.startswith("efficientnet"):
            num_ftrs = self.model.classifier[-1].in_features
            self.model.classifier = nn.Linear(num_ftrs, num_classes)
        elif backbone.startswith("googlenet"):
            if self.model.aux_logits:
                self.model.aux1 = models.googlenet.InceptionAux(
                    512, num_classes, dropout=self.dropout_aux
                )
                self.model.aux2 = models.googlenet.InceptionAux(
                    528, num_classes, dropout=self.dropout_aux
                )
            self.model.fc = nn.Linear(self.model.fc.in_features, num_classes)
        elif backbone.startswith("inception"):
            """Inception v3
            Be careful, expects (299,299) sized images and has auxiliary output
            """
            # Handle the auxilary net
            self.model.AuxLogits.fc = nn.Linear(
                self.model.AuxLogits.fc.in_features, num_classes
            )
            # Handle the primary net
            self.model.fc = nn.Linear(self.model.fc.in_features, num_classes)
        elif backbone.startswith("mnasnet"):
            num_ftrs = self.model.classifier[-1].in_features
            self.model.classifier[-1] = nn.Linear(num_ftrs, num_classes)
        elif backbone.startswith("mobilenet"):
            num_ftrs = self.model.classifier[-1].in_features
            self.model.classifier[-1] = nn.Linear(num_ftrs, num_classes)
        elif backbone.startswith("regnet"):
            self.model.fc = nn.Linear(self.model.fc.in_features, num_classes)
        elif (
            backbone.startswith("resnet")
            or backbone.startswith("resnext")
            or backbone.startswith("wide_resnet")
        ):
            self.model.fc = nn.Linear(self.model.fc.in_features, num_classes)
        elif backbone.startswith("shufflenet"):
            self.model.fc = nn.Linear(self.model.fc.in_features, num_classes)
        elif backbone.startswith("squeezenet"):
            self.model.classifier[1] = nn.Conv2d(
                512, num_classes, kernel_size=(1, 1), stride=(1, 1)
            )
        elif backbone.startswith("vgg"):
            num_ftrs = self.model.classifier[6].in_features
            self.model.classifier[6] = nn.Linear(num_ftrs, num_classes)
        elif backbone.startswith("vit"):
            num_ftrs = self.model.heads.head.in_features
            self.model.heads.head = nn.Linear(num_ftrs, num_classes)
        elif backbone.startswith("convnext"):
            num_ftrs = self.model.classifier[-1].in_features
            self.model.classifier[-1] = nn.Linear(num_ftrs, num_classes)

        # The Lightning checkpoint also saves the arguments passed into the LightningModule init
        # under the "hyper_parameters" key in the checkpoint.
        self.save_hyperparameters(
            "backbone",
            "additional_layers_to_train",
            "num_classes",
            "input_shape",
            "optimizer_name",
            "loss_name",
            "label_col",
            "image_col",
            "dropout_aux",
        )

    def _check_params(self):
        """Validate constructor params and resolve model, loss and optimizer.

        :raises ValueError: if the backbone, layer count, loss name or
            optimizer name cannot be resolved.
        """
        # TORCHVISION
        if self.backbone in models.__dict__:
            self.model = models.__dict__[self.backbone](pretrained=True)
        # TODO: add HUGGINGFACE.TRANSFORMERS
        else:
            raise ValueError("No model: {} found".format(self.backbone))

        if self.additional_layers_to_train < 0 or self.additional_layers_to_train > 3:
            raise ValueError(
                "additional_layers_to_train has to be between 0 and 3: {} found".format(
                    self.additional_layers_to_train
                )
            )

        if self.loss_name.lower() not in F.__dict__:
            raise ValueError("No loss function: {} found".format(self.loss_name))
        self.loss_fn = F.__dict__[self.loss_name.lower()]

        # Map lowercase class names to torch.optim optimizer classes.
        optimizers_mapping = {
            key.lower(): value
            for key, value in optim.__dict__.items()
            if inspect.isclass(value) and issubclass(value, optim.Optimizer)
        }
        if self.optimizer_name.lower() not in optimizers_mapping:
            raise ValueError("No optimizer: {} found".format(self.optimizer_name))
        self.optimizer_fn = optimizers_mapping[self.optimizer_name.lower()]

    # tune last additional_layers_to_train layers
    def _fine_tune_layers(self):
        """Unfreeze the last ``additional_layers_to_train`` children that
        actually own parameters, walking backwards from the output."""
        children = list(self.model.children())
        added_layer, cur_layer = 0, -1
        while added_layer < self.additional_layers_to_train and -cur_layer < len(
            children
        ):
            tunable = False
            for p in children[cur_layer].parameters():
                p.requires_grad = True
                tunable = True
            # only tune those layers contain parameters
            if tunable:
                added_layer += 1
            cur_layer -= 1

    def forward(self, x):
        # Cast to float: petastorm/Spark batches may arrive as doubles or ints.
        x = x.float()
        return self.model.forward(x)

    def configure_optimizers(self):
        # Unfreeze the requested tail layers before collecting trainable params.
        self._fine_tune_layers()
        params_to_update = filter(lambda p: p.requires_grad, self.model.parameters())
        return self.optimizer_fn(params_to_update)

    def training_step(self, batch, batch_idx):
        loss = self._step(batch, False)
        self.log("train_loss", loss)
        return loss

    def _step(self, batch, validation):
        """Compute the loss for one batch.

        :param batch: dict with at least ``image_col`` and ``label_col`` entries.
        :param validation: True when the model is in eval mode (validation/test);
            eval-mode inception/googlenet return a plain tensor without
            auxiliary logits, so the aux-loss branch must be skipped.
        """
        inputs = {"x": batch[self.image_col].reshape(self.input_shape)}
        label = batch[self.label_col]
        output = self(**inputs)

        if self.backbone.startswith("inception") and not validation:
            # https://discuss.pytorch.org/t/how-to-optimize-inception-model-with-auxiliary-classifiers/7958/9
            loss1 = self.loss_fn(output.logits, label.long())
            loss2 = self.loss_fn(output.aux_logits, label.long())
            loss = loss1 + 0.4 * loss2
        else:
            loss = self.loss_fn(output, label.long())
        return loss

    def validation_step(self, batch, batch_idx):
        loss = self._step(batch, True)
        self.log("val_loss", loss)
        # Return a dict so validation_epoch_end receives indexable outputs;
        # returning None made the epoch-end aggregation fail with a TypeError.
        return {"val_loss": loss}

    def validation_epoch_end(self, outputs):
        avg_loss = (
            torch.stack([x["val_loss"] for x in outputs]).mean()
            if len(outputs) > 0
            else float("inf")
        )
        self.log("avg_val_loss", avg_loss)

    def test_step(self, batch, batch_idx):
        # validation=True: the model is in eval mode during testing, so
        # inception/googlenet emit no auxiliary logits and the training-only
        # aux-loss branch in _step must not run.
        loss = self._step(batch, True)
        return loss
from pyspark.ml.param import Param, Params, TypeConverters


class PredictionParams(Params):
    """Shared Spark ML params mixin holding the three column names used by
    the deep-vision stages: label, image and prediction columns."""

    label_col = Param(
        Params._dummy(),
        "label_col",
        "label column name.",
        typeConverter=TypeConverters.toString,
    )

    image_col = Param(
        Params._dummy(),
        "image_col",
        "image column name.",
        typeConverter=TypeConverters.toString,
    )

    prediction_col = Param(
        Params._dummy(),
        "prediction_col",
        "prediction column name.",
        typeConverter=TypeConverters.toString,
    )

    def __init__(self):
        super(PredictionParams, self).__init__()
        # Defaults mirror the conventional Spark ML column names.
        self._setDefault(
            label_col="label",
            image_col="image",
            prediction_col="prediction",
        )

    def getLabelCol(self):
        """Returns the value of label_col or its default ("label")."""
        return self.getOrDefault(self.label_col)

    def setLabelCol(self, value):
        """Sets :py:attr:`label_col`, the name of the label column."""
        return self._set(label_col=value)

    def getImageCol(self):
        """Returns the value of image_col or its default ("image")."""
        return self.getOrDefault(self.image_col)

    def setImageCol(self, value):
        """Sets :py:attr:`image_col`, the name of the image column."""
        return self._set(image_col=value)

    def getPredictionCol(self):
        """Returns the value of prediction_col or its default ("prediction")."""
        return self.getOrDefault(self.prediction_col)

    def setPredictionCol(self, value):
        """Sets :py:attr:`prediction_col`, the name of the prediction column."""
        return self._set(prediction_col=value)
# Copyright (C) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in project root for information.

from functools import wraps


def keywords_catch(func):
    """Decorator that rejects positional arguments on the wrapped method and
    records the keyword arguments actually supplied in ``self._kwargs``.

    Notes
    -----
    Should only be used to wrap a method where first arg is `self`
    """

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        # Any positional argument (beyond self) is an error by design.
        if args:
            raise TypeError("Method %s forces keyword arguments." % func.__name__)
        # Remember exactly what the caller passed, for later re-use.
        self._kwargs = kwargs
        return func(self, **kwargs)

    return wrapper
import os
import shutil
import subprocess
# Importing the submodule explicitly: a bare "import urllib" does not bind
# urllib.request, so urllib.request.urlretrieve could raise AttributeError.
import urllib.request
from os.path import join

import numpy as np
import pytest
import torchvision.transforms as transforms

IS_WINDOWS = os.name == "nt"
delimiter = "\\" if IS_WINDOWS else "/"
# Four levels above the working directory, with a trailing separator.
dataset_dir = (
    delimiter.join([os.getcwd(), os.pardir, os.pardir, os.pardir, os.pardir])
    + delimiter
)


def _download_dataset():
    """Download and unpack the 17flowers dataset into ``dataset_dir`` and
    split its 1360 jpgs into jpg/train (75%) and jpg/test (25%) folders."""

    urllib.request.urlretrieve(
        "https://mmlspark.blob.core.windows.net/publicwasb/17flowers.tgz",
        dataset_dir + "17flowers.tgz",
    )
    # Start from a clean extraction target.
    if os.path.exists(dataset_dir + "jpg"):
        shutil.rmtree(dataset_dir + "jpg")

    # Pass argv as a list so paths containing spaces survive intact
    # (a str.split() on the formatted command would break them apart).
    subprocess.run(
        ["tar", "-xzf", dataset_dir + "17flowers.tgz", "-C", dataset_dir],
        stdout=subprocess.PIPE,
    )
    os.remove(dataset_dir + "17flowers.tgz")
    files = [
        join(dp, f)
        for dp, dn, filenames in os.walk(dataset_dir + "jpg")
        for f in filenames
        if os.path.splitext(f)[1] == ".jpg"
    ]
    assert len(files) == 1360
    np.random.shuffle(files)
    train_files, test_files = np.split(np.array(files), [int(len(files) * 0.75)])
    train_dir = dataset_dir + "jpg{}train".format(delimiter)
    test_dir = dataset_dir + "jpg{}test".format(delimiter)
    if not os.path.exists(train_dir):
        os.makedirs(train_dir)
    if not os.path.exists(test_dir):
        os.makedirs(test_dir)

    for name in train_files:
        path, image = (
            delimiter.join(name.split(delimiter)[:-1]),
            name.split(delimiter)[-1],
        )
        shutil.move(name, delimiter.join([path, "train", image]))

    for name in test_files:
        path, image = (
            delimiter.join(name.split(delimiter)[:-1]),
            name.split(delimiter)[-1],
        )
        shutil.move(name, delimiter.join([path, "test", image]))


@pytest.fixture(scope="module")
def get_data_path():
    """Module-scoped fixture returning (train_folder, test_folder), downloading
    the dataset on first use."""
    # NOTE: do not pass the bare delimiter to os.path.join — an absolute
    # component ("/" or "\\") makes join() discard everything before it,
    # so the old check looked at "/train" and re-downloaded every run.
    if not os.path.exists(join(dataset_dir, "jpg", "train")):
        _download_dataset()
    train_folder = dataset_dir + "jpg" + delimiter + "train"
    test_folder = dataset_dir + "jpg" + delimiter + "test"
    return train_folder, test_folder
+@pytest.fixture(scope="module") +def transform(): + transform = transforms.Compose( + [ + transforms.RandomResizedCrop(224), + transforms.RandomHorizontalFlip(), + transforms.ToTensor(), + transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), + ] + ) + return transform diff --git a/deep-learning/src/test/python/synapsemltest/dl/test_deep_vision_classifier.py b/deep-learning/src/test/python/synapsemltest/dl/test_deep_vision_classifier.py new file mode 100644 index 0000000000..20e1d45d10 --- /dev/null +++ b/deep-learning/src/test/python/synapsemltest/dl/test_deep_vision_classifier.py @@ -0,0 +1,113 @@ +# Copyright (C) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See LICENSE in project root for information. + +import contextlib +import os +import shutil +import tempfile + +import pytest +from horovod.spark.common.store import LocalStore +from pyspark.ml.evaluation import MulticlassClassificationEvaluator +from pyspark.sql import SparkSession +from pyspark.sql.functions import col +from pyspark.sql.types import DoubleType +from pytorch_lightning.callbacks import ModelCheckpoint +from synapse.ml.dl import * + +from .test_deep_vision_model import MyDummyCallback + + +@contextlib.contextmanager +def tempdir(): + dirpath = tempfile.mkdtemp() + try: + yield dirpath + finally: + shutil.rmtree(dirpath) + + +@contextlib.contextmanager +def local_store(): + with tempdir() as tmp: + store = LocalStore(tmp) + yield store + + +def generate_data(spark, train_folder, test_folder): + train_files = [ + os.path.join(dp, f) + for dp, dn, filenames in os.walk(train_folder) + for f in filenames + if os.path.splitext(f)[1] == ".jpg" + ] + test_files = [ + os.path.join(dp, f) + for dp, dn, filenames in os.walk(test_folder) + for f in filenames + if os.path.splitext(f)[1] == ".jpg" + ] + + def extract_path_and_label(path): + num = int(path.split("/")[-1].split(".")[0].split("_")[1]) + label = num // 81 # Assign the label + return 
(path, label) + + train_df = spark.createDataFrame( + map(extract_path_and_label, train_files), ["image", "label"] + ).withColumn("label", col("label").cast(DoubleType())) + + test_df = spark.createDataFrame( + map(extract_path_and_label, test_files), ["image", "label"] + ).withColumn("label", col("label").cast(DoubleType())) + + return train_df, test_df + + +class CallbackBackend(object): + def run(self, fn, args=(), kwargs={}, env={}): + return [fn(*args, **kwargs)] * self.num_processes() + + def num_processes(self): + return 1 + + +@pytest.mark.skip(reason="not testing this for now") +def test_mobilenet_v2(get_data_path): + spark = SparkSession.builder.master("local[*]").getOrCreate() + + ctx = CallbackBackend() + + epochs = 5 + callbacks = [ + MyDummyCallback(epochs), + ModelCheckpoint(dirpath="target/mobilenet_v2/"), + ] + + train_folder, test_folder = get_data_path + + with local_store() as store: + + deep_vision_classifier = DeepVisionClassifier( + backbone="mobilenet_v2", + store=store, + backend=ctx, + callbacks=callbacks, + num_classes=17, + batch_size=16, + epochs=epochs, + validation=0.1, + ) + + train_df, test_df = generate_data(spark, train_folder, test_folder) + + deep_vision_model = deep_vision_classifier.fit(train_df) + + pred_df = deep_vision_model.transform(test_df) + evaluator = MulticlassClassificationEvaluator( + predictionCol="prediction", labelCol="label", metricName="accuracy" + ) + accuracy = evaluator.evaluate(pred_df) + assert accuracy > 0.5 + + spark.stop() diff --git a/deep-learning/src/test/python/synapsemltest/dl/test_deep_vision_model.py b/deep-learning/src/test/python/synapsemltest/dl/test_deep_vision_model.py new file mode 100644 index 0000000000..13ab6ca773 --- /dev/null +++ b/deep-learning/src/test/python/synapsemltest/dl/test_deep_vision_model.py @@ -0,0 +1,117 @@ +# Copyright (C) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See LICENSE in project root for information. 
+ +import os +import random +from os.path import join + +import numpy as np +import torch +from PIL import Image +from pytorch_lightning import Trainer +from pytorch_lightning.callbacks import Callback +from torch.utils.data import DataLoader, Dataset + +from synapse.ml.dl import * + + +class MyDummyCallback(Callback): + def __init__(self, epochs=10): + self.epochs = epochs + self.epcoh_end_counter = 0 + self.train_epcoh_end_counter = 0 + self.validation_epoch_end_counter = 0 + + def on_init_start(self, trainer): + print("Starting to init trainer!") + + def on_init_end(self, trainer): + print("Trainer is initialized.") + + def on_epoch_end(self, trainer, model): + print("A epoch ended.") + self.epcoh_end_counter += 1 + + def on_train_epoch_end(self, trainer, model, unused=None): + print("A train epoch ended.") + self.train_epcoh_end_counter += 1 + + def on_validation_epoch_end(self, trainer, model, unused=None): + print("A val epoch ended.") + self.validation_epoch_end_counter += 1 + + def on_train_end(self, trainer, model): + print( + "Training ends:" + f"epcoh_end_counter={self.epcoh_end_counter}, " + f"train_epcoh_end_counter={self.train_epcoh_end_counter}, " + f"validation_epoch_end_counter={self.validation_epoch_end_counter} \n" + ) + assert self.train_epcoh_end_counter <= self.epochs + assert ( + self.train_epcoh_end_counter + self.validation_epoch_end_counter + == self.epcoh_end_counter + ) + + +def test_lit_deep_vision_model(transform, get_data_path): + seed = np.random.randint(2147483647) + random.seed(seed) + torch.manual_seed(seed) + + class ImageDataset(Dataset): + def __init__(self, root, transform): + super(ImageDataset, self).__init__() + self.root = join(root) + self.transform = transform + self.images = [f for f in os.listdir(self.root) if f.split(".")[1] == "jpg"] + + def __getitem__(self, index): + image = Image.open(join(self.root, self.images[index])).convert("RGB") + image = self.transform(image) + label = 
int(self.images[index].split(".")[0].split("_")[1]) // 81 + return {"image": image, "label": label} + + def __len__(self): + return len(self.images) + + train_folder, test_folder = get_data_path + + train_loader = DataLoader( + ImageDataset( + train_folder, + transform, + ), + batch_size=16, + shuffle=True, + num_workers=0, + pin_memory=True, + ) + + test_loader = DataLoader( + ImageDataset( + test_folder, + transform, + ), + batch_size=16, + shuffle=True, + num_workers=0, # make sure this is 0 to avoid 'can't pickle local object Dataset' error + pin_memory=True, + ) + + epochs = 10 + model = LitDeepVisionModel( + backbone="resnet50", + additional_layers_to_train=1, + num_classes=17, + input_shape=[-1, 3, 224, 224], + optimizer_name="adam", + loss_name="cross_entropy", + label_col="label", + image_col="image", + ) + + callbacks = [MyDummyCallback(epochs)] + trainer = Trainer(callbacks=callbacks, max_epochs=epochs) + trainer.fit(model, train_dataloaders=train_loader) + trainer.test(model, dataloaders=test_loader) diff --git a/environment.yml b/environment.yml index 6dd9579143..565e335993 100644 --- a/environment.yml +++ b/environment.yml @@ -8,11 +8,12 @@ dependencies: - requests=2.26.0 - pip=21.3 - r-base=4.1.1 - - r-sparklyr=1.7.2 + - r-sparklyr=1.7.6 - r-devtools=2.4.2 - pip: - wheel - sphinx==4.2.0 + - sphinx_paramlinks==0.5.2 - sphinx_rtd_theme - coverage - pytest @@ -24,3 +25,10 @@ dependencies: - twine - jupyter - mlflow + - numpy==1.19.3 + - torch==1.11.0 + - torchvision==0.12.0 + - horovod==0.25.0 + - petastorm>=0.11.0 + - pyarrow>=0.15.0 + - pytorch_lightning==1.5.0 diff --git a/notebooks/features/simple_deep_learning/DeepLearning - Deep Vision Classifier.ipynb b/notebooks/features/simple_deep_learning/DeepLearning - Deep Vision Classifier.ipynb new file mode 100644 index 0000000000..318f26875d --- /dev/null +++ b/notebooks/features/simple_deep_learning/DeepLearning - Deep Vision Classifier.ipynb @@ -0,0 +1,199 @@ +{ + "cells": [ + { + "cell_type": 
"markdown", + "metadata": {}, + "source": [ + "## Deep Learning - Deep Vision Classifier" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Environment Setup -- reinstall horovod based on new version of pytorch" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# install cloudpickle 2.0.0 to add synapse module for usage of horovod\n", + "%pip install cloudpickle==2.0.0 --force-reinstall --no-deps" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import synapse\n", + "import cloudpickle\n", + "\n", + "cloudpickle.register_pickle_by_value(synapse)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "! horovodrun --check-build" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql.functions import udf, col, regexp_replace\n", + "from pyspark.sql.types import IntegerType\n", + "from pyspark.ml.evaluation import MulticlassClassificationEvaluator" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Read Dataset" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def assign_label(path):\n", + " num = int(path.split(\"/\")[-1].split(\".\")[0].split(\"_\")[1])\n", + " return num // 81\n", + "\n", + "\n", + "assign_label_udf = udf(assign_label, IntegerType())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# These files are already uploaded for build test machine\n", + "train_df = (\n", + " spark.read.format(\"binaryFile\")\n", + " .option(\"pathGlobFilter\", \"*.jpg\")\n", + " .load(\"/tmp/17flowers/train\")\n", + " .withColumn(\"image\", regexp_replace(\"path\", \"dbfs:\", \"/dbfs\"))\n", + " .withColumn(\"label\", 
assign_label_udf(col(\"path\")))\n", + " .select(\"image\", \"label\")\n", + ")\n", + "\n", + "display(train_df.limit(100))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "test_df = (\n", + " spark.read.format(\"binaryFile\")\n", + " .option(\"pathGlobFilter\", \"*.jpg\")\n", + " .load(\"/tmp/17flowers/test\")\n", + " .withColumn(\"image\", regexp_replace(\"path\", \"dbfs:\", \"/dbfs\"))\n", + " .withColumn(\"label\", assign_label_udf(col(\"path\")))\n", + " .select(\"image\", \"label\")\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Training" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from horovod.spark.common.store import DBFSLocalStore\n", + "from pytorch_lightning.callbacks import ModelCheckpoint\n", + "from synapse.ml.dl import *\n", + "\n", + "run_output_dir = \"/dbfs/FileStore/test/resnet50\"\n", + "store = DBFSLocalStore(run_output_dir)\n", + "\n", + "epochs = 10\n", + "\n", + "callbacks = [ModelCheckpoint(filename=\"{epoch}-{train_loss:.2f}\")]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "deep_vision_classifier = DeepVisionClassifier(\n", + " backbone=\"resnet50\",\n", + " store=store,\n", + " callbacks=callbacks,\n", + " num_classes=17,\n", + " batch_size=16,\n", + " epochs=epochs,\n", + " validation=0.1,\n", + ")\n", + "\n", + "deep_vision_model = deep_vision_classifier.fit(train_df)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Prediction" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pred_df = deep_vision_model.transform(test_df)\n", + "evaluator = MulticlassClassificationEvaluator(\n", + " predictionCol=\"prediction\", labelCol=\"label\", metricName=\"accuracy\"\n", + ")\n", + "print(\"Test 
accuracy:\", evaluator.evaluate(pred_df))" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3.8.13 ('synapseml')", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.8.13" + }, + "orig_nbformat": 4, + "vscode": { + "interpreter": { + "hash": "e618ed4c1ce23faf0894af85b02b5324888e2d70eeb0f2451cf171faa1cbba7a" + } + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/pipeline.yaml b/pipeline.yaml index 375bd28653..a9271c426e 100644 --- a/pipeline.yaml +++ b/pipeline.yaml @@ -122,6 +122,14 @@ jobs: scriptType: bash inlineScript: 'sbt "testOnly com.microsoft.azure.synapse.ml.nbtest.DatabricksTests"' condition: and(succeeded(), eq(variables.runTests, 'True')) + - task: AzureCLI@2 + displayName: 'GPU tests' + inputs: + azureSubscription: 'MMLSpark Build' + scriptLocation: inlineScript + scriptType: bash + inlineScript: 'sbt "testOnly com.microsoft.azure.synapse.ml.nbtest.GPUTests"' + condition: and(succeeded(), eq(variables.runTests, 'True')) - task: PublishTestResults@2 displayName: 'Publish Test Results' inputs: @@ -450,8 +458,6 @@ jobs: - template: templates/update_cli.yml - template: templates/conda.yml - template: templates/kv.yml - - bash: curl https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz -o spark-3.2.0-bin-hadoop3.2.tgz - displayName: Download spark - task: AzureCLI@2 displayName: 'Test R Code' timeoutInMinutes: 30 diff --git a/tools/tests/run_r_tests.R b/tools/tests/run_r_tests.R index 996966e01d..4e0c8d27f0 100644 --- a/tools/tests/run_r_tests.R +++ b/tools/tests/run_r_tests.R @@ -1,11 +1,4 @@ library("sparklyr") -tryCatch({ - spark_install_find(version = "3.2.0") -}, - error=function(err) { - spark_install_tar("../../../../../../../spark-3.2.0-bin-hadoop3.2.tgz") - } -) - +spark_install(version = "3.2.0", hadoop_version = "3.2") options("testthat.output_file" = "../../../../r-test-results.xml") devtools::test(reporter = JunitReporter$new()) 
diff --git a/website/.gitignore b/website/.gitignore index 99b5174260..4f9eab662e 100644 --- a/website/.gitignore +++ b/website/.gitignore @@ -20,6 +20,9 @@ /docs/features/responsible_ai/* !/docs/features/responsible_ai/Data Balance Analysis.md !/docs/features/responsible_ai/Model Interpretation on Spark.md +!/docs/features/simple_deep_learning +/docs/features/simple_deep_learning/* +!/docs/features/simple_deep_learning/about.md !/docs/features/spark_serving /docs/features/spark_serving/* !/docs/features/spark_serving/about.md diff --git a/website/docs/features/simple_deep_learning/about.md b/website/docs/features/simple_deep_learning/about.md new file mode 100644 index 0000000000..f66fc886f4 --- /dev/null +++ b/website/docs/features/simple_deep_learning/about.md @@ -0,0 +1,38 @@ +--- +title: Deep Vision Classification on Databricks +sidebar_label: Deep Vision Classification on Databricks +--- + +## 1. Re-install horovod using our prepared script + +We build on top of torchvision, horovod and pytorch_lightning, so we need to reinstall horovod by building on specific versions of those packages. +Please download our [horovod installation script](https://mmlspark.blob.core.windows.net/publicwasb/horovod_installation.sh) and upload +it to databricks dbfs. + +Add the path of this script to `Init Scripts` section when configuring the spark cluster. +Restarting the cluster will automatically install horovod v0.25.0 with pytorch_lightning v1.5.0 and torchvision v0.12.0. + +## 2. Install SynapseML Deep Learning Component + +You could install the single synapseml-deep-learning wheel package to get the full functionality of deep vision classification. 
+Run the following command: +```powershell +pip install https://mmlspark.blob.core.windows.net/pip/$SYNAPSEML_SCALA_VERSION/synapseml_deep_learning-$SYNAPSEML_PYTHON_VERSION-py2.py3-none-any.whl +``` + +An alternative is installing the SynapseML jar package in library management section, by adding: +``` +Coordinate: com.microsoft.azure:synapseml_2.12:SYNAPSEML_SCALA_VERSION +Repository: https://mmlspark.azureedge.net/maven +``` +:::note +If you install the jar package, you need to follow the first two cell of this [sample](./DeepLearning%20-%20Deep%20Vision%20Classifier.md/#environment-setup----reinstall-horovod-based-on-new-version-of-pytorch) +to make horovod recognizing our module. +::: + +## 3. Try our sample notebook + +You could follow the rest of this [sample](./DeepLearning%20-%20Deep%20Vision%20Classifier.md) and have a try on your own dataset. + +Supported models (`backbone` parameter for `DeepVisionClassifer`) should be string format of [torchvision supported models](https://github.com/pytorch/vision/blob/v0.12.0/torchvision/models/__init__.py); +You could also check by running `backbone in torchvision.models.__dict__`. diff --git a/website/sidebars.js b/website/sidebars.js index 4c991f14fb..7ad9cc2b03 100644 --- a/website/sidebars.js +++ b/website/sidebars.js @@ -11,6 +11,7 @@ let ss_pages = listExamplePaths("features", "spark_serving"); let ocv_pages = listExamplePaths("features", "opencv"); let cls_pages = listExamplePaths("features", "classification"); let reg_pages = listExamplePaths("features", "regression"); +let dl_pages = listExamplePaths("features", "simple_deep_learning"); let other_pages = listExamplePaths("features", "other"); module.exports = { @@ -88,6 +89,11 @@ module.exports = { label: 'Regression', items: reg_pages, }, + { + type: 'category', + label: 'Simple Deep Learning', + items: dl_pages, + }, { type: 'category', label: 'Other',