diff --git a/.gitignore b/.gitignore index ec1a9ac15e..640475c157 100644 --- a/.gitignore +++ b/.gitignore @@ -46,6 +46,10 @@ package-lock.json node_modules/ .Rproj.user +# Notebook Test Files +/notebooks/*.py +/notebooks/*.txt + # R output *.Rout diff --git a/core/src/test/scala/com/microsoft/ml/spark/Secrets.scala b/core/src/test/scala/com/microsoft/ml/spark/Secrets.scala index d703586d18..3e52fa3867 100644 --- a/core/src/test/scala/com/microsoft/ml/spark/Secrets.scala +++ b/core/src/test/scala/com/microsoft/ml/spark/Secrets.scala @@ -51,5 +51,7 @@ object Secrets { lazy val TranslatorKey: String = getSecret("translator-key") lazy val PowerbiURL: String = getSecret("powerbi-url") lazy val AdbToken: String = getSecret("adb-token") + lazy val SynapseStorageKey: String = getSecret("mmlsparkeuap-key") + lazy val SynapseSpnKey: String = getSecret("synapse-spn-key") } diff --git a/core/src/test/scala/com/microsoft/ml/spark/nbtest/NotebookTests.scala b/core/src/test/scala/com/microsoft/ml/spark/nbtest/DatabricksTests.scala similarity index 98% rename from core/src/test/scala/com/microsoft/ml/spark/nbtest/NotebookTests.scala rename to core/src/test/scala/com/microsoft/ml/spark/nbtest/DatabricksTests.scala index b4fb8a37c1..32945f93b8 100644 --- a/core/src/test/scala/com/microsoft/ml/spark/nbtest/NotebookTests.scala +++ b/core/src/test/scala/com/microsoft/ml/spark/nbtest/DatabricksTests.scala @@ -3,18 +3,17 @@ package com.microsoft.ml.spark.nbtest -import java.util.concurrent.TimeUnit - import com.microsoft.ml.spark.core.test.base.TestBase import com.microsoft.ml.spark.nbtest.DatabricksUtilities._ +import java.util.concurrent.TimeUnit import scala.collection.mutable import scala.concurrent.Await import scala.concurrent.duration.Duration import scala.language.existentials /** Tests to validate fuzzing of modules. */ -class NotebookTests extends TestBase { +class DatabricksTests extends TestBase { test("Databricks Notebooks") { val clusterId = createClusterInPool(ClusterName, PoolId) diff --git a/core/src/test/scala/com/microsoft/ml/spark/nbtest/SynapseTests.scala b/core/src/test/scala/com/microsoft/ml/spark/nbtest/SynapseTests.scala new file mode 100644 index 0000000000..e557c99aef --- /dev/null +++ b/core/src/test/scala/com/microsoft/ml/spark/nbtest/SynapseTests.scala @@ -0,0 +1,81 @@ +// Copyright (C) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See LICENSE in project root for information. + +package com.microsoft.ml.spark.nbtest + +import com.microsoft.ml.spark.core.test.base.TestBase +import com.microsoft.ml.spark.nbtest.SynapseUtilities.exec + +import java.io.File +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.language.existentials +import scala.sys.process.Process + +/** Tests to validate fuzzing of modules. 
*/ +class SynapseTests extends TestBase { + + test("Synapse") { + + val os = sys.props("os.name").toLowerCase + os match { + case x if x contains "windows" => + exec("conda activate mmlspark && jupyter nbconvert --to script .\\notebooks\\*.ipynb") + case _ => + Process(s"conda init bash; conda activate mmlspark; jupyter nbconvert --to script ./notebooks/*.ipynb") + } + + SynapseUtilities.listPythonFiles().map(f => { + val newPath = f + .replace(" ", "") + .replace("-", "") + new File(f).renameTo(new File(newPath)) + }) + + val workspaceName = "mmlspark" + val sparkPools = Array("buildpool", "buildpool2", "buildpool3") + + val livyBatchJobs = SynapseUtilities.listPythonJobFiles() + .filterNot(_.contains(" ")) + .filterNot(_.contains("-")) + .map(f => { + val poolName = SynapseUtilities.monitorPool(workspaceName, sparkPools) + val livyUrl = "https://" + + workspaceName + + ".dev.azuresynapse.net/livyApi/versions/2019-11-01-preview/sparkPools/" + + poolName + + "/batches" + val livyBatch: LivyBatch = SynapseUtilities.uploadAndSubmitNotebook(livyUrl, f) + println(s"submitted livy job: ${livyBatch.id} to sparkPool: $poolName") + LivyBatchJob(livyBatch, poolName, livyUrl) + }) + .filterNot(_.livyBatch.state == "none") + + try { + val batchFutures: Array[Future[Any]] = livyBatchJobs.map((batchJob: LivyBatchJob) => { + Future { + val batch = batchJob.livyBatch + val livyUrl = batchJob.livyUrl + + if (batch.state != "success") { + SynapseUtilities.retry(batch.id, livyUrl, SynapseUtilities.TimeoutInMillis, System.currentTimeMillis()) + } + }(ExecutionContext.global) + }) + + val failures = batchFutures + .map(f => Await.ready(f, Duration(SynapseUtilities.TimeoutInMillis.toLong, TimeUnit.MILLISECONDS)).value.get) + .filter(f => f.isFailure) + assert(failures.isEmpty) + } + catch { + case t: Throwable => + livyBatchJobs.foreach { batchJob => + println(s"Cancelling job ${batchJob.livyBatch.id}") + SynapseUtilities.cancelRun(batchJob.livyUrl, batchJob.livyBatch.id) + } + throw t + } + } +} diff --git a/core/src/test/scala/com/microsoft/ml/spark/nbtest/SynapseUtilities.scala b/core/src/test/scala/com/microsoft/ml/spark/nbtest/SynapseUtilities.scala new file mode 100644 index 0000000000..d61c5e3598 --- /dev/null +++ b/core/src/test/scala/com/microsoft/ml/spark/nbtest/SynapseUtilities.scala @@ -0,0 +1,280 @@ +// Copyright (C) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See LICENSE in project root for information. 
+ +package com.microsoft.ml.spark.nbtest + +import com.microsoft.ml.spark.Secrets +import com.microsoft.ml.spark.build.BuildInfo +import com.microsoft.ml.spark.core.env.FileUtilities +import org.apache.commons.io.IOUtils +import org.apache.http.client.entity.UrlEncodedFormEntity +import org.apache.http.client.methods.{HttpDelete, HttpGet, HttpPost} +import org.apache.http.entity.StringEntity +import org.apache.http.message.BasicNameValuePair +import org.json4s.JsonAST.JObject +import org.json4s.jackson.JsonMethods.parse +import org.json4s.jackson.Serialization +import org.json4s.jackson.Serialization.write +import org.json4s.{Formats, NoTypeHints} +import spray.json.DefaultJsonProtocol._ +import spray.json._ +import com.microsoft.ml.spark.io.split2.HasHttpClient + +import java.io.{File, InputStream} +import java.util +import scala.annotation.tailrec +import scala.concurrent.{TimeoutException, blocking} +import scala.io.Source +import scala.language.postfixOps +import scala.sys.process._ + +case class LivyBatch(id: Int, + state: String, + appId: Option[String], + appInfo: Option[JObject], + log: Seq[String]) + +case class LivyBatchJob(livyBatch: LivyBatch, + sparkPool: String, + livyUrl: String) + +case class Application(state: String, + name: String, + livyId: String) + +case class Applications(nJobs: Int, + sparkJobs: Seq[Application]) + +object SynapseUtilities extends HasHttpClient { + + implicit val Fmts: Formats = Serialization.formats(NoTypeHints) + lazy val Token: String = getSynapseToken + + val Folder = s"build_${BuildInfo.version}/scripts" + val TimeoutInMillis: Int = 20 * 60 * 1000 + val StorageAccount: String = "mmlsparkeuap" + val StorageContainer: String = "synapse" + val TenantId: String = "72f988bf-86f1-41af-91ab-2d7cd011db47" + val ClientId: String = "85dde348-dd2b-43e5-9f5a-22262af45332" + + def listPythonFiles(): Array[String] = { + Option( + FileUtilities + .join(BuildInfo.baseDirectory.getParent, "notebooks") + .getCanonicalFile + .listFiles() + .filter(_.getAbsolutePath.endsWith(".py")) + .filter(_.getAbsolutePath.contains("-")) + .filterNot(_.getAbsolutePath.contains("CyberML")) + .filterNot(_.getAbsolutePath.contains("DeepLearning")) + .filterNot(_.getAbsolutePath.contains("ConditionalKNN")) + .filterNot(_.getAbsolutePath.contains("HyperParameterTuning")) + .filterNot(_.getAbsolutePath.contains("Regressor.py")) + .filterNot(_.getAbsolutePath.contains("Overview")) + .filterNot(_.getAbsolutePath.contains("ModelInterpretation")) + .filterNot(_.getAbsolutePath.contains("Interpretability")) + .map(file => file.getAbsolutePath)) + .get + .sorted + } + + def listPythonJobFiles(): Array[String] = { + Option( + FileUtilities + .join(BuildInfo.baseDirectory.getParent, "notebooks") + .getCanonicalFile + .listFiles() + .filter(_.getAbsolutePath.endsWith(".py")) + .filterNot(_.getAbsolutePath.contains("-")) + .filterNot(_.getAbsolutePath.contains(" ")) + .map(file => file.getAbsolutePath)) + .get + .sorted + } + + def listNoteBookFiles(): Array[String] = { + Option( + FileUtilities + .join(BuildInfo.baseDirectory.getParent, "notebooks") + .getCanonicalFile + .listFiles() + .filter(_.getAbsolutePath.endsWith(".ipynb")) + .map(file => file.getAbsolutePath)) + .get + .sorted + } + + def postMortem(batch: LivyBatch, livyUrl: String): LivyBatch = { + batch.log.foreach(println) + write(batch) + batch + } + + def poll(id: Int, livyUrl: String): LivyBatch = { + val getStatusRequest = new HttpGet(s"$livyUrl/$id") + getStatusRequest.setHeader("Authorization", s"Bearer $Token") + val 
statsResponse = client.execute(getStatusRequest) + val batch = parse(IOUtils.toString(statsResponse.getEntity.getContent, "utf-8")).extract[LivyBatch] + statsResponse.close() + batch + } + + def showSubmittingJobs(workspaceName: String, poolName: String): Applications = { + val uri: String = + "https://" + + s"$workspaceName.dev.azuresynapse.net" + + "/monitoring/workloadTypes/spark/applications" + + "?api-version=2020-10-01-preview" + + "&filter=(((state%20eq%20%27Queued%27)%20or%20(state%20eq%20%27Submitting%27))" + + s"%20and%20(sparkPoolName%20eq%20%27$poolName%27))" + val getRequest = new HttpGet(uri) + getRequest.setHeader("Authorization", s"Bearer $Token") + val jobsResponse = client.execute(getRequest) + val activeJobs = + parse(IOUtils.toString(jobsResponse.getEntity.getContent, "utf-8")) + .extract[Applications] + activeJobs + } + + def checkPool(workspaceName: String, poolName: String): Boolean = { + val nSubmittingJob = showSubmittingJobs(workspaceName, poolName).nJobs + nSubmittingJob == 0 + } + + @tailrec + def monitorPool(workspaceName: String, sparkPools: Array[String]): String = { + + val readyPools = sparkPools.filter(checkPool(workspaceName, _)) + + if (readyPools.length > 0) { + val readyPool = readyPools(0) + println(s"Spark Pool: $readyPool is ready") + readyPool + } + else { + println(s"None spark pool is ready to submit job, waiting 10s") + blocking { + Thread.sleep(10000) + } + monitorPool(workspaceName, sparkPools) + } + } + + @tailrec + def retry(id: Int, livyUrl: String, timeout: Int, startTime: Long): LivyBatch = { + val batch = poll(id, livyUrl) + println(s"batch state $id : ${batch.state}") + if (batch.state == "success") { + batch + } else { + if ((System.currentTimeMillis() - startTime) > timeout) { + throw new TimeoutException(s"Job $id timed out.") + } + else if (batch.state == "dead") { + postMortem(batch, livyUrl) + throw new RuntimeException(s"Dead") + } + else if (batch.state == "error") { + postMortem(batch, livyUrl) + throw new RuntimeException(s"Error") + } + else { + blocking { + Thread.sleep(8000) + } + println(s"retrying id $id") + retry(id, livyUrl, timeout, startTime) + } + } + } + + def uploadAndSubmitNotebook(livyUrl: String, notebookPath: String): LivyBatch = { + val convertedPyScript = new File(notebookPath) + val abfssPath = uploadScript(convertedPyScript.getAbsolutePath, s"$Folder/${convertedPyScript.getName}") + submitRun(livyUrl, abfssPath) + } + + private def uploadScript(file: String, dest: String): String = { + exec(s"az storage fs file upload " + + s" -s $file -p $dest -f $StorageContainer " + + s" --overwrite true " + + s" --account-name $StorageAccount --account-key ${Secrets.SynapseStorageKey}") + s"abfss://$StorageContainer@$StorageAccount.dfs.core.windows.net/$dest" + } + + def exec(command: String): String = { + val os = sys.props("os.name").toLowerCase + os match { + case x if x contains "windows" => Seq("cmd", "/C") ++ Seq(command) !! + case _ => command !! 
+ } + } + + def getSynapseToken: String = { + val spnKey: String = Secrets.SynapseSpnKey + + val uri: String = s"https://login.microsoftonline.com/$TenantId/oauth2/token" + + val createRequest = new HttpPost(uri) + createRequest.setHeader("Content-Type", "application/x-www-form-urlencoded") + + val bodyList: util.List[BasicNameValuePair] = new util.ArrayList[BasicNameValuePair]() + bodyList.add(new BasicNameValuePair("grant_type", "client_credentials")) + bodyList.add(new BasicNameValuePair("client_id", s"$ClientId")) + bodyList.add(new BasicNameValuePair("client_secret", s"$spnKey")) + bodyList.add(new BasicNameValuePair("resource", "https://dev.azuresynapse.net/")) + + createRequest.setEntity(new UrlEncodedFormEntity(bodyList, "UTF-8")) + + val response = client.execute(createRequest) + val inputStream: InputStream = response.getEntity.getContent + val pageContent: String = Source.fromInputStream(inputStream).mkString + pageContent.parseJson.asJsObject().fields("access_token").convertTo[String] + } + + def cancelRun(livyUrl: String, batchId: Int): Unit = { + val createRequest = new HttpDelete(s"$livyUrl/$batchId") + createRequest.setHeader("Authorization", s"Bearer $Token") + val response = client.execute(createRequest) + println(response.getEntity.getContent) + } + + private def submitRun(livyUrl: String, path: String): LivyBatch = { + val excludes: String = "org.scala-lang:scala-reflect," + + "org.apache.spark:spark-tags_2.12," + + "org.scalactic:scalactic_2.12," + + "org.scalatest:scalatest_2.12" + + val livyPayload: String = + s""" + |{ + | "file" : "$path", + | "name" : "${path.split('/').last.replace(".py", "")}", + | "driverMemory" : "28g", + | "driverCores" : 4, + | "executorMemory" : "28g", + | "executorCores" : 4, + | "numExecutors" : 2, + | "conf" : + | { + | "spark.jars.packages" : "com.microsoft.ml.spark:mmlspark:${BuildInfo.version}", + | "spark.jars.repositories" : "https://mmlspark.azureedge.net/maven", + | "spark.jars.excludes": "$excludes", + | "spark.driver.userClassPathFirst": "true", + | "spark.executor.userClassPathFirst": "true" + | } + | } + """.stripMargin + + val createRequest = new HttpPost(livyUrl) + createRequest.setHeader("Content-Type", "application/json") + createRequest.setHeader("Authorization", s"Bearer $Token") + createRequest.setEntity(new StringEntity(livyPayload)) + val response = client.execute(createRequest) + val content: String = IOUtils.toString(response.getEntity.getContent, "utf-8") + val batch: LivyBatch = parse(content).extract[LivyBatch] + val status: Int = response.getStatusLine.getStatusCode + assert(status == 200) + batch + } +} diff --git a/environment.yaml b/environment.yaml index 338862d100..50602326d6 100644 --- a/environment.yaml +++ b/environment.yaml @@ -18,3 +18,5 @@ dependencies: - coverage - pytest - pytest-cov + - nbconvert + - ipython diff --git a/notebooks/AzureSearchIndex - Met Artworks.ipynb b/notebooks/AzureSearchIndex - Met Artworks.ipynb index 0368c0b16b..70304755ae 100644 --- a/notebooks/AzureSearchIndex - Met Artworks.ipynb +++ b/notebooks/AzureSearchIndex - Met Artworks.ipynb @@ -16,49 +16,63 @@ }, { "cell_type": "code", + "execution_count": 3, "source": [ - "import os, sys, time, json, requests\n", - "from pyspark.ml import Transformer, Estimator, Pipeline\n", - "from pyspark.ml.feature import SQLTransformer\n", + "import os, sys, time, json, requests\r\n", + "from pyspark.ml import Transformer, Estimator, Pipeline\r\n", + "from pyspark.ml.feature import SQLTransformer\r\n", "from pyspark.sql.functions import lit, 
udf, col, split" ], + "outputs": [], "metadata": { "collapsed": true - }, + } + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\r\n", + " from pyspark.sql import SparkSession\r\n", + " spark = SparkSession.builder.getOrCreate()\r\n", + " from notebookutils.mssparkutils.credentials import getSecret\r\n", + " os.environ['VISION_API_KEY'] = getSecret(\"mmlspark-keys\", \"mmlspark-cs-key\")\r\n", + " os.environ['AZURE_SEARCH_KEY'] = getSecret(\"mmlspark-keys\", \"mmlspark-azure-search-key\")" + ], "outputs": [], - "execution_count": 3 + "metadata": {} }, { "cell_type": "code", + "execution_count": 4, "source": [ - "VISION_API_KEY = os.environ['VISION_API_KEY']\n", - "AZURE_SEARCH_KEY = os.environ['AZURE_SEARCH_KEY']\n", - "search_service = \"mmlspark-azure-search\"\n", + "VISION_API_KEY = os.environ['VISION_API_KEY']\r\n", + "AZURE_SEARCH_KEY = os.environ['AZURE_SEARCH_KEY']\r\n", + "search_service = \"mmlspark-azure-search\"\r\n", "search_index = \"test\"" ], + "outputs": [], "metadata": { "collapsed": true - }, - "outputs": [], - "execution_count": 4 + } }, { "cell_type": "code", + "execution_count": 5, "source": [ - "data = spark.read\\\n", - " .format(\"csv\")\\\n", - " .option(\"header\", True)\\\n", - " .load(\"wasbs://publicwasb@mmlspark.blob.core.windows.net/metartworks_sample.csv\")\\\n", - " .withColumn(\"searchAction\", lit(\"upload\"))\\\n", - " .withColumn(\"Neighbors\", split(col(\"Neighbors\"), \",\").cast(\"array\"))\\\n", - " .withColumn(\"Tags\", split(col(\"Tags\"), \",\").cast(\"array\"))\\\n", + "data = spark.read\\\r\n", + " .format(\"csv\")\\\r\n", + " .option(\"header\", True)\\\r\n", + " .load(\"wasbs://publicwasb@mmlspark.blob.core.windows.net/metartworks_sample.csv\")\\\r\n", + " .withColumn(\"searchAction\", lit(\"upload\"))\\\r\n", + " .withColumn(\"Neighbors\", split(col(\"Neighbors\"), \",\").cast(\"array\"))\\\r\n", + " .withColumn(\"Tags\", split(col(\"Tags\"), \",\").cast(\"array\"))\\\r\n", " .limit(25)" ], + "outputs": [], "metadata": { "collapsed": true - }, - "outputs": [], - "execution_count": 5 + } }, { "cell_type": "markdown", @@ -69,28 +83,28 @@ }, { "cell_type": "code", + "execution_count": 7, "source": [ - "from mmlspark.cognitive import AnalyzeImage\n", - "from mmlspark.stages import SelectColumns\n", - "\n", - "#define pipeline\n", - "describeImage = (AnalyzeImage()\n", - " .setSubscriptionKey(VISION_API_KEY)\n", - " .setLocation(\"eastus\")\n", - " .setImageUrlCol(\"PrimaryImageUrl\")\n", - " .setOutputCol(\"RawImageDescription\")\n", - " .setErrorCol(\"Errors\")\n", - " .setVisualFeatures([\"Categories\", \"Description\", \"Faces\", \"ImageType\", \"Color\", \"Adult\"])\n", - " .setConcurrency(5))\n", - "\n", - "df2 = describeImage.transform(data)\\\n", + "from mmlspark.cognitive import AnalyzeImage\r\n", + "from mmlspark.stages import SelectColumns\r\n", + "\r\n", + "#define pipeline\r\n", + "describeImage = (AnalyzeImage()\r\n", + " .setSubscriptionKey(VISION_API_KEY)\r\n", + " .setLocation(\"eastus\")\r\n", + " .setImageUrlCol(\"PrimaryImageUrl\")\r\n", + " .setOutputCol(\"RawImageDescription\")\r\n", + " .setErrorCol(\"Errors\")\r\n", + " .setVisualFeatures([\"Categories\", \"Description\", \"Faces\", \"ImageType\", \"Color\", \"Adult\"])\r\n", + " .setConcurrency(5))\r\n", + "\r\n", + "df2 = describeImage.transform(data)\\\r\n", " .select(\"*\", \"RawImageDescription.*\").drop(\"Errors\", \"RawImageDescription\")" ], + "outputs": [], 
"metadata": { "collapsed": true - }, - "outputs": [], - "execution_count": 7 + } }, { "cell_type": "markdown", @@ -108,14 +122,20 @@ }, { "cell_type": "code", + "execution_count": 10, "source": [ - "from mmlspark.cognitive import *\ndf2.writeToAzureSearch(\n subscriptionKey=AZURE_SEARCH_KEY,\n actionCol=\"searchAction\",\n serviceName=search_service,\n indexName=search_index,\n keyCol=\"ObjectID\"\n)" + "from mmlspark.cognitive import *\r\n", + "df2.writeToAzureSearch(\r\n", + " subscriptionKey=AZURE_SEARCH_KEY,\r\n", + " actionCol=\"searchAction\",\r\n", + " serviceName=search_service,\r\n", + " indexName=search_index,\r\n", + " keyCol=\"ObjectID\")" ], + "outputs": [], "metadata": { "scrolled": false - }, - "outputs": [], - "execution_count": 10 + } }, { "cell_type": "markdown", @@ -126,23 +146,15 @@ }, { "cell_type": "code", + "execution_count": 12, "source": [ - "url = 'https://{}.search.windows.net/indexes/{}/docs/search?api-version=2019-05-06'.format(search_service, search_index)\nrequests.post(url, json={\"search\": \"Glass\"}, headers = {\"api-key\": AZURE_SEARCH_KEY}).json()" + "url = 'https://{}.search.windows.net/indexes/{}/docs/search?api-version=2019-05-06'.format(search_service, search_index)\r\n", + "requests.post(url, json={\"search\": \"Glass\"}, headers = {\"api-key\": AZURE_SEARCH_KEY}).json()" ], + "outputs": [], "metadata": { "collapsed": true - }, - "outputs": [], - "execution_count": 12 - }, - { - "cell_type": "code", - "source": [ - "# " - ], - "metadata": {}, - "outputs": [], - "execution_count": 13 + } } ], "metadata": { diff --git a/notebooks/Classification - Adult Census with Vowpal Wabbit.ipynb b/notebooks/Classification - Adult Census with Vowpal Wabbit.ipynb index 4608bce764..7a8641a772 100644 --- a/notebooks/Classification - Adult Census with Vowpal Wabbit.ipynb +++ b/notebooks/Classification - Adult Census with Vowpal Wabbit.ipynb @@ -3,7 +3,6 @@ { "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ "# Classification - Adult Census using Vowpal Wabbit in MMLSpark\n", "\n", @@ -12,6 +11,18 @@ ")." ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\n", + " from pyspark.sql import SparkSession\n", + " spark = SparkSession.builder.getOrCreate()" + ] + }, { "cell_type": "code", "execution_count": null, @@ -27,7 +38,6 @@ { "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ "Next, we define a pipeline that includes feature engineering and training of a VW classifier. We use a featurizer provided by VW that hashes the feature names. \n", "Note that VW expects classification labels being -1 or 1. Thus, the income category is mapped to this space before feeding training data into the pipeline." @@ -65,7 +75,6 @@ { "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ "Then, we are ready to train the model by fitting the pipeline with the training data." ] @@ -83,7 +92,6 @@ { "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ "After the model is trained, we apply it to predict the income of each sample in the test set." ] @@ -103,7 +111,6 @@ { "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ "Finally, we evaluate the model performance using `ComputeModelStatistics` function which will compute confusion matrix, accuracy, precision, recall, and AUC by default for classificaiton models." 
] diff --git a/notebooks/Classification - Adult Census.ipynb b/notebooks/Classification - Adult Census.ipynb index 48686b6ad5..6a271e1835 100644 --- a/notebooks/Classification - Adult Census.ipynb +++ b/notebooks/Classification - Adult Census.ipynb @@ -2,47 +2,58 @@ "cells": [ { "cell_type": "markdown", - "metadata": {}, "source": [ "## Classification - Adult Census\n", "\n", "In this example, we try to predict incomes from the *Adult Census* dataset.\n", "\n", "First, we import the packages (use `help(mmlspark)` to view contents)," - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, + "source": [ + "import os\r\n", + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\r\n", + " from pyspark.sql import SparkSession\r\n", + " spark = SparkSession.builder.getOrCreate()" + ], "outputs": [], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, "source": [ - "import numpy as np\n", + "import numpy as np\r\n", "import pandas as pd" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "Now let's read the data and split it to train and test sets:" - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "data = spark.read.parquet(\"wasbs://publicwasb@mmlspark.blob.core.windows.net/AdultCensusIncome.parquet\")\n", - "data = data.select([\"education\", \"marital-status\", \"hours-per-week\", \"income\"])\n", - "train, test = data.randomSplit([0.75, 0.25], seed=123)\n", + "data = spark.read.parquet(\"wasbs://publicwasb@mmlspark.blob.core.windows.net/AdultCensusIncome.parquet\")\r\n", + "data = data.select([\"education\", \"marital-status\", \"hours-per-week\", \"income\"])\r\n", + "train, test = data.randomSplit([0.75, 0.25], seed=123)\r\n", "train.limit(10).toPandas()" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "`TrainClassifier` can be used to initialize and fit a model, it wraps SparkML classifiers.\n", "You can use `help(mmlspark.train.TrainClassifier)` to view the different parameters.\n", @@ -50,53 +61,38 @@ "Note that it implicitly converts the data into the format expected by the algorithm: tokenize\n", "and hash strings, one-hot encodes categorical variables, assembles the features into a vector\n", "and so on. The parameter `numFeatures` controls the number of hashed features." - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from mmlspark.train import TrainClassifier\n", - "from pyspark.ml.classification import LogisticRegression\n", + "from mmlspark.train import TrainClassifier\r\n", + "from pyspark.ml.classification import LogisticRegression\r\n", "model = TrainClassifier(model=LogisticRegression(), labelCol=\"income\", numFeatures=256).fit(train)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "After the model is trained, we score it against the test dataset and view metrics." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, + ], "outputs": [], - "source": [ - "from mmlspark.train import ComputeModelStatistics, TrainedClassifierModel\n", - "prediction = model.transform(test)\n", - "metrics = ComputeModelStatistics().transform(prediction)\n", - "metrics.limit(10).toPandas()" - ] + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "Finally, we save the model so it can be used in a scoring program." - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "model.write().overwrite().save(\"dbfs:/AdultCensus.mml\")" - ] + "if os.environ.get(\"AZURE_SERVICE\", None) != \"Microsoft.ProjectArcadia\":\r\n", + " model.write().overwrite().save(\"dbfs:/AdultCensus.mml\")\r\n", + "else:\r\n", + " model.write().overwrite().save(\"abfss://synapse@mmlsparkeuap.dfs.core.windows.net/models/AdultCensus.mml\")" + ], + "outputs": [], + "metadata": {} } ], "metadata": { @@ -121,4 +117,4 @@ }, "nbformat": 4, "nbformat_minor": 2 -} +} \ No newline at end of file diff --git a/notebooks/Classification - Before and After MMLSpark.ipynb b/notebooks/Classification - Before and After MMLSpark.ipynb index c5895ea93c..bf0430fc5a 100644 --- a/notebooks/Classification - Before and After MMLSpark.ipynb +++ b/notebooks/Classification - Before and After MMLSpark.ipynb @@ -22,6 +22,18 @@ "hyperparameters and choosing the best model." ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\n", + " from pyspark.sql import SparkSession\n", + " spark = SparkSession.builder.getOrCreate()" + ] + }, { "cell_type": "markdown", "metadata": {}, diff --git a/notebooks/Classification - Twitter Sentiment with Vowpal Wabbit.ipynb b/notebooks/Classification - Twitter Sentiment with Vowpal Wabbit.ipynb index 8fcb227301..c7be7427b3 100644 --- a/notebooks/Classification - Twitter Sentiment with Vowpal Wabbit.ipynb +++ b/notebooks/Classification - Twitter Sentiment with Vowpal Wabbit.ipynb @@ -29,8 +29,18 @@ "from mmlspark.vw import VowpalWabbitClassifier\n", "from mmlspark.train import ComputeModelStatistics\n", "from pyspark.mllib.evaluation import BinaryClassificationMetrics\n", - "import matplotlib.pyplot as plt\n", - "%matplotlib inline" + "import matplotlib.pyplot as plt" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\n", + " from pyspark.sql import SparkSession\n", + " spark = SparkSession.builder.getOrCreate()" ] }, { @@ -295,7 +305,7 @@ "plt.ylabel(\"True positive rate\")\n", "plt.plot(x_val, y_val)\n", "# Use display() if you're on Azure Databricks or you can do plt.show()\n", - "display(fig)" + "plt.show()" ] }, { @@ -331,4 +341,4 @@ }, "nbformat": 4, "nbformat_minor": 1 -} +} \ No newline at end of file diff --git a/notebooks/Cognitive Services - Overview.ipynb b/notebooks/Cognitive Services - Overview.ipynb index 8c8cf79745..137c4062a5 100644 --- a/notebooks/Cognitive Services - Overview.ipynb +++ b/notebooks/Cognitive Services - Overview.ipynb @@ -18,6 +18,7 @@ "nbformat_minor": 2, "cells": [ { + "cell_type": "markdown", "source": [ "\n", "\n", @@ -84,10 +85,10 @@ "- [Bing Image search](https://azure.microsoft.com/en-us/services/cognitive-services/bing-image-search-api/) 
([Scala](https://mmlspark.blob.core.windows.net/docs/1.0.0-rc3/scala/com/microsoft/ml/spark/cognitive/BingImageSearch.html), [Python](https://mmlspark.blob.core.windows.net/docs/1.0.0-rc3/pyspark/mmlspark.cognitive.html#module-mmlspark.cognitive.BingImageSearch))\n", "- [Azure Cognitive search](https://docs.microsoft.com/en-us/azure/search/search-what-is-azure-search) ([Scala](https://mmlspark.blob.core.windows.net/docs/1.0.0-rc3/scala/index.html#com.microsoft.ml.spark.cognitive.AzureSearchWriter$), [Python](https://mmlspark.blob.core.windows.net/docs/1.0.0-rc3/scala/index.html#com.microsoft.ml.spark.cognitive.AzureSearchWriter$))\n" ], - "cell_type": "markdown", "metadata": {} }, { + "cell_type": "markdown", "source": [ "## Prerequisites\n", "\n", @@ -98,400 +99,431 @@ "1. Choose the run button (triangle icon) in the upper right corner of the cell, then select **Run Cell**.\n", "1. View results in a table below the cell." ], - "cell_type": "markdown", "metadata": {} }, { + "cell_type": "markdown", "source": [ "## Shared code\n", "\n", "To get started, we'll need to add this code to the project:" ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, + "source": [ + "from pyspark.sql.functions import udf, col\r\n", + "from mmlspark.io.http import HTTPTransformer, http_udf\r\n", + "from requests import Request\r\n", + "from pyspark.sql.functions import lit\r\n", + "from pyspark.ml import PipelineModel\r\n", + "from pyspark.sql.functions import col\r\n", + "import os\r\n" + ], "outputs": [], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, "source": [ - "from mmlspark.cognitive import *\n", - "import os\n", - "\n", - "# A general Cognitive Services key for Text Analytics, Computer Vision and Form Recognizer (or use separate keys that belong to each service)\n", - "service_key = os.environ[\"COGNITIVE_SERVICE_KEY\"]\n", - "# A Bing Search v7 subscription key\n", - "bing_search_key = os.environ[\"BING_IMAGE_SEARCH_KEY\"]\n", - "# An Anomaly Dectector subscription key\n", - "anomaly_key = os.environ[\"ANOMALY_API_KEY\"]\n", - "# A Translator subscription key\n", + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\r\n", + " from pyspark.sql import SparkSession\r\n", + " spark = SparkSession.builder.getOrCreate()\r\n", + " from notebookutils.mssparkutils.credentials import getSecret\r\n", + " os.environ['ANOMALY_API_KEY'] = getSecret(\r\n", + " \"mmlspark-keys\", \"anomaly-api-key\")\r\n", + " os.environ['TEXT_API_KEY'] = getSecret(\"mmlspark-keys\", \"mmlspark-cs-key\")\r\n", + " os.environ['BING_IMAGE_SEARCH_KEY'] = getSecret(\r\n", + " \"mmlspark-keys\", \"mmlspark-bing-search-key\")\r\n", + " os.environ['VISION_API_KEY'] = getSecret(\r\n", + " \"mmlspark-keys\", \"mmlspark-cs-key\")\r\n", + " os.environ['AZURE_SEARCH_KEY'] = getSecret(\r\n", + " \"mmlspark-keys\", \"azure-search-key\")" + ], + "outputs": [], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, + "source": [ + "from mmlspark.cognitive import *\r\n", + "\r\n", + "# A general Cognitive Services key for Text Analytics, Computer Vision and Form Recognizer (or use separate keys that belong to each service)\r\n", + "service_key = os.environ[\"COGNITIVE_SERVICE_KEY\"]\r\n", + "# A Bing Search v7 subscription key\r\n", + "bing_search_key = os.environ[\"BING_IMAGE_SEARCH_KEY\"]\r\n", + "# An Anomaly Dectector subscription key\r\n", + "anomaly_key = os.environ[\"ANOMALY_API_KEY\"]\r\n", + "# A 
Translator subscription key\r\n", "translator_key = os.environ[\"TRANSLATOR_KEY\"]" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "## Text Analytics sample\n", "\n", "The [Text Analytics](../text-analytics/index.yml) service provides several algorithms for extracting intelligent insights from text. For example, we can find the sentiment of given input text. The service will return a score between 0.0 and 1.0 where low scores indicate negative sentiment and high score indicates positive sentiment. This sample uses three simple sentences and returns the sentiment for each." ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from pyspark.sql.functions import col\n", - "\n", - "# Create a dataframe that's tied to it's column names\n", - "df = spark.createDataFrame([\n", - " (\"I am so happy today, its sunny!\", \"en-US\"),\n", - " (\"I am frustrated by this rush hour traffic\", \"en-US\"),\n", - " (\"The cognitive services on spark aint bad\", \"en-US\"),\n", - "], [\"text\", \"language\"])\n", - "\n", - "# Run the Text Analytics service with options\n", - "sentiment = (TextSentiment()\n", - " .setTextCol(\"text\")\n", - " .setLocation(\"eastus\")\n", - " .setSubscriptionKey(service_key)\n", - " .setOutputCol(\"sentiment\")\n", - " .setErrorCol(\"error\")\n", - " .setLanguageCol(\"language\"))\n", - "\n", - "# Show the results of your text query in a table format\n", - "display(sentiment.transform(df).select(\"text\", col(\"sentiment\")[0].getItem(\"sentiment\").alias(\"sentiment\")))" - ] + "# Create a dataframe that's tied to it's column names\r\n", + "df = spark.createDataFrame([\r\n", + " (\"I am so happy today, its sunny!\", \"en-US\"),\r\n", + " (\"I am frustrated by this rush hour traffic\", \"en-US\"),\r\n", + " (\"The cognitive services on spark aint bad\", \"en-US\"),\r\n", + "], [\"text\", \"language\"])\r\n", + "\r\n", + "# Run the Text Analytics service with options\r\n", + "sentiment = (TextSentiment()\r\n", + " .setTextCol(\"text\")\r\n", + " .setLocation(\"eastus\")\r\n", + " .setSubscriptionKey(service_key)\r\n", + " .setOutputCol(\"sentiment\")\r\n", + " .setErrorCol(\"error\")\r\n", + " .setLanguageCol(\"language\"))\r\n", + "\r\n", + "# Show the results of your text query in a table format\r\n", + "display(sentiment.transform(df).select(\"text\", col(\r\n", + " \"sentiment\")[0].getItem(\"sentiment\").alias(\"sentiment\")))" + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "## Translator sample\n", "[Translator](../translator/index.yml) is a cloud-based machine translation service and is part of the Azure Cognitive Services family of cognitive APIs used to build intelligent apps. Translator is easy to integrate in your applications, websites, tools, and solutions. It allows you to add multi-language user experiences in 90 languages and dialects and can be used for text translation with any operating system. In this sample, we do a simple text translation by providing the sentences you want to translate and target languages you want to translate to." 
], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from pyspark.sql.functions import col, flatten\n", - "\n", - "# Create a dataframe including sentences you want to translate\n", - "df = spark.createDataFrame([\n", - " ([\"Hello, what is your name?\", \"Bye\"],)\n", - "], [\"text\",])\n", - "\n", - "# Run the Translator service with options\n", - "translate = (Translate()\n", - " .setSubscriptionKey(translator_key)\n", - " .setLocation(\"eastus\")\n", - " .setTextCol(\"text\")\n", - " .setToLanguage([\"zh-Hans\"])\n", - " .setOutputCol(\"translation\"))\n", - "\n", - "# Show the results of the translation.\n", - "display(translate\n", - " .transform(df)\n", - " .withColumn(\"translation\", flatten(col(\"translation.translations\")))\n", - " .withColumn(\"translation\", col(\"translation.text\"))\n", + "from pyspark.sql.functions import col, flatten\r\n", + "\r\n", + "# Create a dataframe including sentences you want to translate\r\n", + "df = spark.createDataFrame([\r\n", + " ([\"Hello, what is your name?\", \"Bye\"],)\r\n", + "], [\"text\",])\r\n", + "\r\n", + "# Run the Translator service with options\r\n", + "translate = (Translate()\r\n", + " .setSubscriptionKey(translator_key)\r\n", + " .setLocation(\"eastus\")\r\n", + " .setTextCol(\"text\")\r\n", + " .setToLanguage([\"zh-Hans\"])\r\n", + " .setOutputCol(\"translation\"))\r\n", + "\r\n", + "# Show the results of the translation.\r\n", + "display(translate\r\n", + " .transform(df)\r\n", + " .withColumn(\"translation\", flatten(col(\"translation.translations\")))\r\n", + " .withColumn(\"translation\", col(\"translation.text\"))\r\n", " .select(\"translation\"))" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "## Form Recognizer sample\n", "[Form Recognizer](../form-recognizer/index.yml) is a part of Azure Applied AI Services that lets you build automated data processing software using machine learning technology. Identify and extract text, key/value pairs, selection marks, tables, and structure from your documents—the service outputs structured data that includes the relationships in the original file, bounding boxes, confidence and more. In this sample, we analyze a business card image and extract its information into structured data." 
], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from pyspark.sql.functions import col, explode\n", - "\n", - "# Create a dataframe containing the source files\n", - "imageDf = spark.createDataFrame([\n", - " (\"https://mmlspark.blob.core.windows.net/datasets/FormRecognizer/business_card.jpg\",)\n", - "], [\"source\",])\n", - "\n", - "# Run the Form Recognizer service\n", - "analyzeBusinessCards = (AnalyzeBusinessCards()\n", - " .setSubscriptionKey(service_key)\n", - " .setLocation(\"eastus\")\n", - " .setImageUrlCol(\"source\")\n", - " .setOutputCol(\"businessCards\"))\n", - "\n", - "# Show the results of recognition.\n", - "display(analyzeBusinessCards\n", - " .transform(imageDf)\n", - " .withColumn(\"documents\", explode(col(\"businessCards.analyzeResult.documentResults.fields\")))\n", + "from pyspark.sql.functions import col, explode\r\n", + "\r\n", + "# Create a dataframe containing the source files\r\n", + "imageDf = spark.createDataFrame([\r\n", + " (\"https://mmlspark.blob.core.windows.net/datasets/FormRecognizer/business_card.jpg\",)\r\n", + "], [\"source\",])\r\n", + "\r\n", + "# Run the Form Recognizer service\r\n", + "analyzeBusinessCards = (AnalyzeBusinessCards()\r\n", + " .setSubscriptionKey(service_key)\r\n", + " .setLocation(\"eastus\")\r\n", + " .setImageUrlCol(\"source\")\r\n", + " .setOutputCol(\"businessCards\"))\r\n", + "\r\n", + "# Show the results of recognition.\r\n", + "display(analyzeBusinessCards\r\n", + " .transform(imageDf)\r\n", + " .withColumn(\"documents\", explode(col(\"businessCards.analyzeResult.documentResults.fields\")))\r\n", " .select(\"source\", \"documents\"))" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "## Computer Vision sample\n", "\n", "[Computer Vision](../computer-vision/index.yml) analyzes images to identify structure such as faces, objects, and natural-language descriptions. In this sample, we tag a list of images. Tags are one-word descriptions of things in the image like recognizable objects, people, scenery, and actions." ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "# Create a dataframe with the image URLs\n", - "df = spark.createDataFrame([\n", - " (\"https://raw.githubusercontent.com/Azure-Samples/cognitive-services-sample-data-files/master/ComputerVision/Images/objects.jpg\", ),\n", - " (\"https://raw.githubusercontent.com/Azure-Samples/cognitive-services-sample-data-files/master/ComputerVision/Images/dog.jpg\", ),\n", - " (\"https://raw.githubusercontent.com/Azure-Samples/cognitive-services-sample-data-files/master/ComputerVision/Images/house.jpg\", )\n", - " ], [\"image\", ])\n", - "\n", - "# Run the Computer Vision service. 
Analyze Image extracts infortmation from/about the images.\n", - "analysis = (AnalyzeImage()\n", - " .setLocation(\"eastus\")\n", - " .setSubscriptionKey(service_key)\n", - " .setVisualFeatures([\"Categories\",\"Color\",\"Description\",\"Faces\",\"Objects\",\"Tags\"])\n", - " .setOutputCol(\"analysis_results\")\n", - " .setImageUrlCol(\"image\")\n", - " .setErrorCol(\"error\"))\n", - "\n", - "# Show the results of what you wanted to pull out of the images.\n", - "display(analysis.transform(df).select(\"image\", \"analysis_results.description.tags\"))" - ] + "# Create a dataframe with the image URLs\r\n", + "df = spark.createDataFrame([\r\n", + " (\"https://raw.githubusercontent.com/Azure-Samples/cognitive-services-sample-data-files/master/ComputerVision/Images/objects.jpg\", ),\r\n", + " (\"https://raw.githubusercontent.com/Azure-Samples/cognitive-services-sample-data-files/master/ComputerVision/Images/dog.jpg\", ),\r\n", + " (\"https://raw.githubusercontent.com/Azure-Samples/cognitive-services-sample-data-files/master/ComputerVision/Images/house.jpg\", )\r\n", + "], [\"image\", ])\r\n", + "\r\n", + "# Run the Computer Vision service. Analyze Image extracts infortmation from/about the images.\r\n", + "analysis = (AnalyzeImage()\r\n", + " .setLocation(\"eastus\")\r\n", + " .setSubscriptionKey(service_key)\r\n", + " .setVisualFeatures([\"Categories\", \"Color\", \"Description\", \"Faces\", \"Objects\", \"Tags\"])\r\n", + " .setOutputCol(\"analysis_results\")\r\n", + " .setImageUrlCol(\"image\")\r\n", + " .setErrorCol(\"error\"))\r\n", + "\r\n", + "# Show the results of what you wanted to pull out of the images.\r\n", + "display(analysis.transform(df).select(\r\n", + " \"image\", \"analysis_results.description.tags\"))\r\n" + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "## Bing Image Search sample\n", "\n", "[Bing Image Search](../bing-image-search/overview.md) searches the web to retrieve images related to a user's natural language query. In this sample, we use a text query that looks for images with quotes. It returns a list of image URLs that contain photos related to our query." ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from pyspark.ml import PipelineModel\n", - "\n", - "# Number of images Bing will return per query\n", - "imgsPerBatch = 10\n", - "# A list of offsets, used to page into the search results\n", - "offsets = [(i*imgsPerBatch,) for i in range(100)]\n", - "# Since web content is our data, we create a dataframe with options on that data: offsets\n", - "bingParameters = spark.createDataFrame(offsets, [\"offset\"])\n", - "\n", - "# Run the Bing Image Search service with our text query\n", - "bingSearch = (BingImageSearch()\n", - " .setSubscriptionKey(bing_search_key)\n", - " .setOffsetCol(\"offset\")\n", - " .setQuery(\"Martin Luther King Jr. 
quotes\")\n", - " .setCount(imgsPerBatch)\n", - " .setOutputCol(\"images\"))\n", - "\n", - "# Transformer that extracts and flattens the richly structured output of Bing Image Search into a simple URL column\n", - "getUrls = BingImageSearch.getUrlTransformer(\"images\", \"url\")\n", - "\n", - "# This displays the full results returned, uncomment to use\n", - "# display(bingSearch.transform(bingParameters))\n", - "\n", - "# Since we have two services, they are put into a pipeline\n", - "pipeline = PipelineModel(stages=[bingSearch, getUrls])\n", - "\n", - "# Show the results of your search: image URLs\n", - "display(pipeline.transform(bingParameters))" - ] + "# Number of images Bing will return per query\r\n", + "imgsPerBatch = 10\r\n", + "# A list of offsets, used to page into the search results\r\n", + "offsets = [(i*imgsPerBatch,) for i in range(100)]\r\n", + "# Since web content is our data, we create a dataframe with options on that data: offsets\r\n", + "bingParameters = spark.createDataFrame(offsets, [\"offset\"])\r\n", + "\r\n", + "# Run the Bing Image Search service with our text query\r\n", + "bingSearch = (BingImageSearch()\r\n", + " .setSubscriptionKey(bing_search_key)\r\n", + " .setOffsetCol(\"offset\")\r\n", + " .setQuery(\"Martin Luther King Jr. quotes\")\r\n", + " .setCount(imgsPerBatch)\r\n", + " .setOutputCol(\"images\"))\r\n", + "\r\n", + "# Transformer that extracts and flattens the richly structured output of Bing Image Search into a simple URL column\r\n", + "getUrls = BingImageSearch.getUrlTransformer(\"images\", \"url\")\r\n", + "\r\n", + "# This displays the full results returned, uncomment to use\r\n", + "# display(bingSearch.transform(bingParameters))\r\n", + "\r\n", + "# Since we have two services, they are put into a pipeline\r\n", + "pipeline = PipelineModel(stages=[bingSearch, getUrls])\r\n", + "\r\n", + "# Show the results of your search: image URLs\r\n", + "display(pipeline.transform(bingParameters))\r\n" + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "## Speech-to-Text sample\n", "The [Speech-to-text](../speech-service/index-speech-to-text.yml) service converts streams or files of spoken audio to text. In this sample, we transcribe one audio file." 
], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "# Create a dataframe with our audio URLs, tied to the column called \"url\"\n", - "df = spark.createDataFrame([(\"https://mmlspark.blob.core.windows.net/datasets/Speech/audio2.wav\",)\n", - " ], [\"url\"])\n", - "\n", - "# Run the Speech-to-text service to translate the audio into text\n", - "speech_to_text = (SpeechToTextSDK()\n", - " .setSubscriptionKey(service_key)\n", - " .setLocation(\"eastus\")\n", - " .setOutputCol(\"text\")\n", - " .setAudioDataCol(\"url\")\n", - " .setLanguage(\"en-US\")\n", - " .setProfanity(\"Masked\"))\n", - "\n", - "# Show the results of the translation\n", - "display(speech_to_text.transform(df).select(\"url\", \"text.DisplayText\"))" - ] + "# Create a dataframe with our audio URLs, tied to the column called \"url\"\r\n", + "df = spark.createDataFrame([(\"https://mmlspark.blob.core.windows.net/datasets/Speech/audio2.wav\",)\r\n", + " ], [\"url\"])\r\n", + "\r\n", + "# Run the Speech-to-text service to translate the audio into text\r\n", + "speech_to_text = (SpeechToTextSDK()\r\n", + " .setSubscriptionKey(service_key)\r\n", + " .setLocation(\"eastus\")\r\n", + " .setOutputCol(\"text\")\r\n", + " .setAudioDataCol(\"url\")\r\n", + " .setLanguage(\"en-US\")\r\n", + " .setProfanity(\"Masked\"))\r\n", + "\r\n", + "# Show the results of the translation\r\n", + "display(speech_to_text.transform(df).select(\"url\", \"text.DisplayText\"))\r\n" + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "## Anomaly Detector sample\n", "\n", "[Anomaly Detector](../anomaly-detector/index.yml) is great for detecting irregularities in your time series data. In this sample, we use the service to find anomalies in the entire time series." 
], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from pyspark.sql.functions import lit\n", - "\n", - "# Create a dataframe with the point data that Anomaly Detector requires\n", - "df = spark.createDataFrame([\n", - " (\"1972-01-01T00:00:00Z\", 826.0),\n", - " (\"1972-02-01T00:00:00Z\", 799.0),\n", - " (\"1972-03-01T00:00:00Z\", 890.0),\n", - " (\"1972-04-01T00:00:00Z\", 900.0),\n", - " (\"1972-05-01T00:00:00Z\", 766.0),\n", - " (\"1972-06-01T00:00:00Z\", 805.0),\n", - " (\"1972-07-01T00:00:00Z\", 821.0),\n", - " (\"1972-08-01T00:00:00Z\", 20000.0),\n", - " (\"1972-09-01T00:00:00Z\", 883.0),\n", - " (\"1972-10-01T00:00:00Z\", 898.0),\n", - " (\"1972-11-01T00:00:00Z\", 957.0),\n", - " (\"1972-12-01T00:00:00Z\", 924.0),\n", - " (\"1973-01-01T00:00:00Z\", 881.0),\n", - " (\"1973-02-01T00:00:00Z\", 837.0),\n", - " (\"1973-03-01T00:00:00Z\", 9000.0)\n", - "], [\"timestamp\", \"value\"]).withColumn(\"group\", lit(\"series1\"))\n", - "\n", - "# Run the Anomaly Detector service to look for irregular data\n", - "anamoly_detector = (SimpleDetectAnomalies()\n", - " .setSubscriptionKey(anomaly_key)\n", - " .setLocation(\"eastus\")\n", - " .setTimestampCol(\"timestamp\")\n", - " .setValueCol(\"value\")\n", - " .setOutputCol(\"anomalies\")\n", - " .setGroupbyCol(\"group\")\n", - " .setGranularity(\"monthly\"))\n", - "\n", - "# Show the full results of the analysis with the anomalies marked as \"True\"\n", - "display(anamoly_detector.transform(df).select(\"timestamp\", \"value\", \"anomalies.isAnomaly\"))" - ] + "# Create a dataframe with the point data that Anomaly Detector requires\r\n", + "df = spark.createDataFrame([\r\n", + " (\"1972-01-01T00:00:00Z\", 826.0),\r\n", + " (\"1972-02-01T00:00:00Z\", 799.0),\r\n", + " (\"1972-03-01T00:00:00Z\", 890.0),\r\n", + " (\"1972-04-01T00:00:00Z\", 900.0),\r\n", + " (\"1972-05-01T00:00:00Z\", 766.0),\r\n", + " (\"1972-06-01T00:00:00Z\", 805.0),\r\n", + " (\"1972-07-01T00:00:00Z\", 821.0),\r\n", + " (\"1972-08-01T00:00:00Z\", 20000.0),\r\n", + " (\"1972-09-01T00:00:00Z\", 883.0),\r\n", + " (\"1972-10-01T00:00:00Z\", 898.0),\r\n", + " (\"1972-11-01T00:00:00Z\", 957.0),\r\n", + " (\"1972-12-01T00:00:00Z\", 924.0),\r\n", + " (\"1973-01-01T00:00:00Z\", 881.0),\r\n", + " (\"1973-02-01T00:00:00Z\", 837.0),\r\n", + " (\"1973-03-01T00:00:00Z\", 9000.0)\r\n", + "], [\"timestamp\", \"value\"]).withColumn(\"group\", lit(\"series1\"))\r\n", + "\r\n", + "# Run the Anomaly Detector service to look for irregular data\r\n", + "anamoly_detector = (SimpleDetectAnomalies()\r\n", + " .setSubscriptionKey(anomaly_key)\r\n", + " .setLocation(\"eastus\")\r\n", + " .setTimestampCol(\"timestamp\")\r\n", + " .setValueCol(\"value\")\r\n", + " .setOutputCol(\"anomalies\")\r\n", + " .setGroupbyCol(\"group\")\r\n", + " .setGranularity(\"monthly\"))\r\n", + "\r\n", + "# Show the full results of the analysis with the anomalies marked as \"True\"\r\n", + "display(anamoly_detector.transform(df).select(\r\n", + " \"timestamp\", \"value\", \"anomalies.isAnomaly\"))" + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "## Arbitrary web APIs\n", "\n", "With HTTP on Spark, any web service can be used in your big data pipeline. In this example, we use the [World Bank API](http://api.worldbank.org/v2/country/) to get information about various countries around the world." 
], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from requests import Request\n", - "from mmlspark.io.http import HTTPTransformer, http_udf\n", - "from pyspark.sql.functions import udf, col\n", - "\n", - "# Use any requests from the python requests library\n", - "def world_bank_request(country):\n", - " return Request(\"GET\", \"http://api.worldbank.org/v2/country/{}?format=json\".format(country))\n", - "\n", - "# Create a dataframe with spcificies which countries we want data on\n", - "df = (spark.createDataFrame([(\"br\",),(\"usa\",)], [\"country\"])\n", - " .withColumn(\"request\", http_udf(world_bank_request)(col(\"country\"))))\n", - "\n", - "# Much faster for big data because of the concurrency :)\n", - "client = (HTTPTransformer()\n", - " .setConcurrency(3)\n", - " .setInputCol(\"request\")\n", - " .setOutputCol(\"response\"))\n", - "\n", - "# Get the body of the response\n", - "def get_response_body(resp):\n", - " return resp.entity.content.decode()\n", - "\n", - "# Show the details of the country data returned\n", - "display(client.transform(df).select(\"country\", udf(get_response_body)(col(\"response\")).alias(\"response\")))" - ] + "# Use any requests from the python requests library\r\n", + "\r\n", + "def world_bank_request(country):\r\n", + " return Request(\"GET\", \"http://api.worldbank.org/v2/country/{}?format=json\".format(country))\r\n", + "\r\n", + "\r\n", + "# Create a dataframe with spcificies which countries we want data on\r\n", + "df = (spark.createDataFrame([(\"br\",), (\"usa\",)], [\"country\"])\r\n", + " .withColumn(\"request\", http_udf(world_bank_request)(col(\"country\"))))\r\n", + "\r\n", + "# Much faster for big data because of the concurrency :)\r\n", + "client = (HTTPTransformer()\r\n", + " .setConcurrency(3)\r\n", + " .setInputCol(\"request\")\r\n", + " .setOutputCol(\"response\"))\r\n", + "\r\n", + "# Get the body of the response\r\n", + "\r\n", + "\r\n", + "def get_response_body(resp):\r\n", + " return resp.entity.content.decode()\r\n", + "\r\n", + "\r\n", + "# Show the details of the country data returned\r\n", + "display(client.transform(df)\r\n", + " .select(\"country\", udf(get_response_body)(col(\"response\"))\r\n", + " .alias(\"response\")))\r\n" + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "## Azure Cognitive search sample\n", "\n", "In this example, we show how you can enrich data using Cognitive Skills and write to an Azure Search Index using MMLSpark." 
], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from mmlspark.cognitive import *\n", - "\n", - "VISION_API_KEY = os.environ['VISION_API_KEY']\n", - "AZURE_SEARCH_KEY = os.environ['AZURE_SEARCH_KEY']\n", - "search_service = \"mmlspark-azure-search\"\n", - "search_index = \"test-33467690\"\n", - "\n", - "df = spark.createDataFrame([(\"upload\", \"0\", \"https://mmlspark.blob.core.windows.net/datasets/DSIR/test1.jpg\"), \n", - " (\"upload\", \"1\", \"https://mmlspark.blob.core.windows.net/datasets/DSIR/test2.jpg\")], \n", - " [\"searchAction\", \"id\", \"url\"])\n", - "\n", - "tdf = AnalyzeImage()\\\n", - " .setSubscriptionKey(VISION_API_KEY)\\\n", - " .setLocation(\"eastus\")\\\n", - " .setImageUrlCol(\"url\")\\\n", - " .setOutputCol(\"analyzed\")\\\n", - " .setErrorCol(\"errors\")\\\n", - " .setVisualFeatures([\"Categories\", \"Tags\", \"Description\", \"Faces\", \"ImageType\", \"Color\", \"Adult\"])\\\n", - " .transform(df)\\\n", - " .select(\"*\", \"analyzed.*\")\\\n", - " .drop(\"errors\", \"analyzed\")\n", - "\n", - "tdf.writeToAzureSearch(subscriptionKey=AZURE_SEARCH_KEY,\n", - " actionCol=\"searchAction\",\n", - " serviceName=search_service,\n", - " indexName=search_index,\n", - " keyCol=\"id\")" - ] + "VISION_API_KEY = os.environ['VISION_API_KEY']\r\n", + "AZURE_SEARCH_KEY = os.environ['AZURE_SEARCH_KEY']\r\n", + "search_service = \"mmlspark-azure-search\"\r\n", + "search_index = \"test-33467690\"\r\n", + "\r\n", + "df = spark.createDataFrame([(\"upload\", \"0\", \"https://mmlspark.blob.core.windows.net/datasets/DSIR/test1.jpg\"),\r\n", + " (\"upload\", \"1\", \"https://mmlspark.blob.core.windows.net/datasets/DSIR/test2.jpg\")],\r\n", + " [\"searchAction\", \"id\", \"url\"])\r\n", + "\r\n", + "tdf = AnalyzeImage()\\\r\n", + " .setSubscriptionKey(VISION_API_KEY)\\\r\n", + " .setLocation(\"eastus\")\\\r\n", + " .setImageUrlCol(\"url\")\\\r\n", + " .setOutputCol(\"analyzed\")\\\r\n", + " .setErrorCol(\"errors\")\\\r\n", + " .setVisualFeatures([\"Categories\", \"Tags\", \"Description\", \"Faces\", \"ImageType\", \"Color\", \"Adult\"])\\\r\n", + " .transform(df).select(\"*\", \"analyzed.*\")\\\r\n", + " .drop(\"errors\", \"analyzed\")\r\n", + "\r\n", + "tdf.writeToAzureSearch(subscriptionKey=AZURE_SEARCH_KEY,\r\n", + " actionCol=\"searchAction\",\r\n", + " serviceName=search_service,\r\n", + " indexName=search_index,\r\n", + " keyCol=\"id\")\r\n" + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "## See also\n", "\n", "* [Recipe: Anomaly Detection](./recipes/anomaly-detection.md)\n", "* [Recipe: Art Explorer](./recipes/art-explorer.md)" ], - "cell_type": "markdown", "metadata": {} } ] diff --git a/notebooks/CognitiveServices - Celebrity Quote Analysis.ipynb b/notebooks/CognitiveServices - Celebrity Quote Analysis.ipynb index c9f40d6807..81e0d75cd1 100644 --- a/notebooks/CognitiveServices - Celebrity Quote Analysis.ipynb +++ b/notebooks/CognitiveServices - Celebrity Quote Analysis.ipynb @@ -28,6 +28,14 @@ "from pyspark.ml.feature import SQLTransformer\n", "import os\n", "\n", + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\n", + " from pyspark.sql import SparkSession\n", + " spark = SparkSession.builder.getOrCreate()\n", + " from notebookutils.mssparkutils.credentials import getSecret\n", + " os.environ['VISION_API_KEY'] = getSecret(\"mmlspark-keys\", \"mmlspark-cs-key\")\n", + " os.environ['TEXT_API_KEY'] = 
getSecret(\"mmlspark-keys\", \"mmlspark-cs-key\")\n", + " os.environ['BING_IMAGE_SEARCH_KEY'] = getSecret(\"mmlspark-keys\", \"mmlspark-bing-search-key\")\n", + "\n", "#put your service keys here\n", "TEXT_API_KEY = os.environ[\"TEXT_API_KEY\"]\n", "VISION_API_KEY = os.environ[\"VISION_API_KEY\"]\n", diff --git a/notebooks/ConditionalKNN - Exploring Art Across Cultures.ipynb b/notebooks/ConditionalKNN - Exploring Art Across Cultures.ipynb index 8105a5bcdf..9f8480cb69 100644 --- a/notebooks/ConditionalKNN - Exploring Art Across Cultures.ipynb +++ b/notebooks/ConditionalKNN - Exploring Art Across Cultures.ipynb @@ -30,6 +30,7 @@ { "cell_type": "code", "source": [ + "from pyspark.sql.types import BooleanType\n", "from pyspark.sql.types import *\n", "from pyspark.ml.feature import Normalizer\n", "from pyspark.sql.functions import lit, array, array_contains, udf, col, struct\n", @@ -39,7 +40,13 @@ "\n", "import requests\n", "import numpy as np\n", - "import matplotlib.pyplot as plt" + "import matplotlib.pyplot as plt\n", + "\n", + "import os\n", + "\n", + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\n", + " from pyspark.sql import SparkSession\n", + " spark = SparkSession.builder.getOrCreate()\n" ], "metadata": {}, "outputs": [], @@ -71,8 +78,9 @@ "cell_type": "code", "source": [ "# loads the dataset and the two trained CKNN models for querying by medium and culture\n", - "df = spark.read.parquet(\"wasbs://publicwasb@mmlspark.blob.core.windows.net/met_and_rijks.parquet\")\n", - "display(df.drop(\"Norm_Features\"))" + "df = spark.read.parquet(\n", + " \"wasbs://publicwasb@mmlspark.blob.core.windows.net/met_and_rijks.parquet\")\n", + "display(df.drop(\"Norm_Features\"))\n" ], "metadata": {}, "outputs": [], @@ -89,16 +97,14 @@ { "cell_type": "code", "source": [ - "from pyspark.sql.types import BooleanType\n", - "\n", - "#mediums = ['prints', 'drawings', 'ceramics', 'textiles', 'paintings', \"musical instruments\",\"glass\", 'accessories', 'photographs', \"metalwork\", \n", + "# mediums = ['prints', 'drawings', 'ceramics', 'textiles', 'paintings', \"musical instruments\",\"glass\", 'accessories', 'photographs', \"metalwork\",\n", "# \"sculptures\", \"weapons\", \"stone\", \"precious\", \"paper\", \"woodwork\", \"leatherwork\", \"uncategorized\"]\n", "\n", "mediums = ['paintings', 'glass', 'ceramics']\n", "\n", - "#cultures = ['african (general)', 'american', 'ancient american', 'ancient asian', 'ancient european', 'ancient middle-eastern', 'asian (general)', \n", - "# 'austrian', 'belgian', 'british', 'chinese', 'czech', 'dutch', 'egyptian']#, 'european (general)', 'french', 'german', 'greek', \n", - "# 'iranian', 'italian', 'japanese', 'latin american', 'middle eastern', 'roman', 'russian', 'south asian', 'southeast asian', \n", + "# cultures = ['african (general)', 'american', 'ancient american', 'ancient asian', 'ancient european', 'ancient middle-eastern', 'asian (general)',\n", + "# 'austrian', 'belgian', 'british', 'chinese', 'czech', 'dutch', 'egyptian']#, 'european (general)', 'french', 'german', 'greek',\n", + "# 'iranian', 'italian', 'japanese', 'latin american', 'middle eastern', 'roman', 'russian', 'south asian', 'southeast asian',\n", "# 'spanish', 'swiss', 'various']\n", "\n", "cultures = ['japanese', 'american', 'african (general)']\n", @@ -111,9 +117,10 @@ "culture_set = set(cultures)\n", "selected_ids = {\"AK-RBK-17525-2\", \"AK-MAK-1204\", \"AK-RAK-2015-2-9\"}\n", "\n", - "small_df = df.where(udf(lambda medium, culture, id_val: (medium in 
medium_set) or (culture in culture_set) or (id_val in selected_ids), BooleanType())(\"Classification\", \"Culture\", \"id\"))\n", + "small_df = df.where(udf(lambda medium, culture, id_val: (medium in medium_set) or (\n", + " culture in culture_set) or (id_val in selected_ids), BooleanType())(\"Classification\", \"Culture\", \"id\"))\n", "\n", - "small_df.count()" + "small_df.count()\n" ], "metadata": {}, "outputs": [], @@ -131,11 +138,11 @@ "cell_type": "code", "source": [ "medium_cknn = (ConditionalKNN()\n", - " .setOutputCol(\"Matches\")\n", - " .setFeaturesCol(\"Norm_Features\")\n", - " .setValuesCol(\"Thumbnail_Url\")\n", - " .setLabelCol(\"Classification\")\n", - " .fit(small_df))" + " .setOutputCol(\"Matches\")\n", + " .setFeaturesCol(\"Norm_Features\")\n", + " .setValuesCol(\"Thumbnail_Url\")\n", + " .setLabelCol(\"Classification\")\n", + " .fit(small_df))" ], "metadata": {}, "outputs": [], @@ -145,11 +152,11 @@ "cell_type": "code", "source": [ "culture_cknn = (ConditionalKNN()\n", - " .setOutputCol(\"Matches\")\n", - " .setFeaturesCol(\"Norm_Features\")\n", - " .setValuesCol(\"Thumbnail_Url\")\n", - " .setLabelCol(\"Culture\")\n", - " .fit(small_df))" + " .setOutputCol(\"Matches\")\n", + " .setFeaturesCol(\"Norm_Features\")\n", + " .setValuesCol(\"Thumbnail_Url\")\n", + " .setLabelCol(\"Culture\")\n", + " .fit(small_df))\n" ], "metadata": {}, "outputs": [], @@ -170,11 +177,11 @@ "cell_type": "code", "source": [ "def add_matches(classes, cknn, df):\n", - " results = df\n", - " for label in classes:\n", - " results = (cknn.transform(results.withColumn(\"conditioner\", array(lit(label))))\n", - " .withColumnRenamed(\"Matches\", \"Matches_{}\".format(label)))\n", - " return results" + " results = df\n", + " for label in classes:\n", + " results = (cknn.transform(results.withColumn(\"conditioner\", array(lit(label))))\n", + " .withColumnRenamed(\"Matches\", \"Matches_{}\".format(label)))\n", + " return results" ], "metadata": {}, "outputs": [], @@ -191,35 +198,37 @@ "cell_type": "code", "source": [ "def plot_img(axis, url, title):\n", - " try:\n", - " response = requests.get(url)\n", - " img = Image.open(BytesIO(response.content)).convert('RGB')\n", - " axis.imshow(img, aspect=\"equal\")\n", - " except:\n", - " pass\n", - " if title is not None: axis.set_title(title, fontsize=4)\n", - " axis.axis(\"off\")\n", + " try:\n", + " response = requests.get(url)\n", + " img = Image.open(BytesIO(response.content)).convert('RGB')\n", + " axis.imshow(img, aspect=\"equal\")\n", + " except:\n", + " pass\n", + " if title is not None:\n", + " axis.set_title(title, fontsize=4)\n", + " axis.axis(\"off\")\n", + "\n", "\n", "def plot_urls(url_arr, titles, filename):\n", - " nx, ny = url_arr.shape\n", - " \n", - " plt.figure(figsize=(nx*5, ny*5), dpi=1600)\n", - " fig, axes = plt.subplots(ny,nx)\n", - " \n", - " # reshape required in the case of 1 image query\n", - " if len(axes.shape) == 1:\n", - " axes = axes.reshape(1, -1)\n", - " \n", - " for i in range(nx):\n", - " for j in range(ny):\n", - " if j == 0:\n", - " plot_img(axes[j, i], url_arr[i,j], titles[i])\n", - " else:\n", - " plot_img(axes[j, i], url_arr[i,j], None)\n", - " \n", - " plt.savefig(filename, dpi=1600) # saves the results as a PNG\n", - "\n", - " display(plt.show())" + " nx, ny = url_arr.shape\n", + "\n", + " plt.figure(figsize=(nx*5, ny*5), dpi=1600)\n", + " fig, axes = plt.subplots(ny, nx)\n", + "\n", + " # reshape required in the case of 1 image query\n", + " if len(axes.shape) == 1:\n", + " axes = axes.reshape(1, -1)\n", + "\n", + " 
for i in range(nx):\n", + " for j in range(ny):\n", + " if j == 0:\n", + " plot_img(axes[j, i], url_arr[i, j], titles[i])\n", + " else:\n", + " plot_img(axes[j, i], url_arr[i, j], None)\n", + "\n", + " plt.savefig(filename, dpi=1600) # saves the results as a PNG\n", + "\n", + " display(plt.show())" ], "metadata": {}, "outputs": [], @@ -228,14 +237,40 @@ { "cell_type": "markdown", "source": [ - "### Putting it all together\nBelow, we define `test_all()` to take in the data, CKNN models, the art id values to query on, and the file path to save the output visualization to. The medium and culture models were previously trained and loaded." + "### Putting it all together\n", + "Below, we define `test_all()` to take in the data, CKNN models, the art id values to query on, and the file path to save the output visualization to. The medium and culture models were previously trained and loaded." ], "metadata": {} }, { "cell_type": "code", "source": [ - "# main method to test a particular dataset with two CKNN models and a set of art IDs, saving the result to filename.png\n\ndef test_all(data, cknn_medium, cknn_culture, test_ids, root):\n is_nice_obj = udf(lambda obj: obj in test_ids, BooleanType())\n test_df = data.where(is_nice_obj(\"id\"))\n \n results_df_medium = add_matches(mediums, cknn_medium, test_df)\n results_df_culture = add_matches(cultures, cknn_culture, results_df_medium)\n \n results = results_df_culture.collect()\n \n original_urls = [row[\"Thumbnail_Url\"] for row in results]\n \n culture_urls = [ [row[\"Matches_{}\".format(label)][0][\"value\"] for row in results] for label in cultures]\n culture_url_arr = np.array([original_urls] + culture_urls)[:, :]\n plot_urls(culture_url_arr, [\"Original\"] + cultures, root + \"matches_by_culture.png\")\n \n medium_urls = [ [row[\"Matches_{}\".format(label)][0][\"value\"] for row in results] for label in mediums]\n medium_url_arr = np.array([original_urls] + medium_urls)[:, :]\n plot_urls(medium_url_arr, [\"Original\"] + mediums, root + \"matches_by_medium.png\")\n \n return results_df_culture" + "# main method to test a particular dataset with two CKNN models and a set of art IDs, saving the result to filename.png\n", + "\n", + "def test_all(data, cknn_medium, cknn_culture, test_ids, root):\n", + " is_nice_obj = udf(lambda obj: obj in test_ids, BooleanType())\n", + " test_df = data.where(is_nice_obj(\"id\"))\n", + "\n", + " results_df_medium = add_matches(mediums, cknn_medium, test_df)\n", + " results_df_culture = add_matches(cultures, cknn_culture, results_df_medium)\n", + "\n", + " results = results_df_culture.collect()\n", + "\n", + " original_urls = [row[\"Thumbnail_Url\"] for row in results]\n", + "\n", + " culture_urls = [[row[\"Matches_{}\".format(\n", + " label)][0][\"value\"] for row in results] for label in cultures]\n", + " culture_url_arr = np.array([original_urls] + culture_urls)[:, :]\n", + " plot_urls(culture_url_arr, [\"Original\"] +\n", + " cultures, root + \"matches_by_culture.png\")\n", + "\n", + " medium_urls = [[row[\"Matches_{}\".format(\n", + " label)][0][\"value\"] for row in results] for label in mediums]\n", + " medium_url_arr = np.array([original_urls] + medium_urls)[:, :]\n", + " plot_urls(medium_url_arr, [\"Original\"] +\n", + " mediums, root + \"matches_by_medium.png\")\n", + "\n", + " return results_df_culture\n" ], "metadata": {}, "outputs": [], @@ -244,14 +279,20 @@ { "cell_type": "markdown", "source": [ - "### Demo\nThe following cell performs batched queries given desired image IDs and a filename to save the 
visualization.\n\n\n" + "### Demo\n", + "The following cell performs batched queries given desired image IDs and a filename to save the visualization.\n", + "\n", + "\n", + "" ], "metadata": {} }, { "cell_type": "code", "source": [ - "# sample query\nresult_df = test_all(small_df, medium_cknn, culture_cknn, selected_ids, root=\".\")" + "# sample query\n", + "result_df = test_all(small_df, medium_cknn, culture_cknn,\n", + " selected_ids, root=\".\")\n" ], "metadata": {}, "outputs": [], diff --git a/notebooks/HttpOnSpark - Working with Arbitrary Web APIs.ipynb b/notebooks/HttpOnSpark - Working with Arbitrary Web APIs.ipynb index c37d49517c..5a40d43b15 100644 --- a/notebooks/HttpOnSpark - Working with Arbitrary Web APIs.ipynb +++ b/notebooks/HttpOnSpark - Working with Arbitrary Web APIs.ipynb @@ -28,6 +28,12 @@ "metadata": {}, "outputs": [], "source": [ + "import os\n", + "\n", + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\n", + " from pyspark.sql import SparkSession\n", + " spark = SparkSession.builder.getOrCreate()\n", + "\n", "from pyspark.sql.functions import struct\n", "from pyspark.sql.types import *\n", "from mmlspark.io.http import *\n", @@ -72,4 +78,4 @@ }, "nbformat": 4, "nbformat_minor": 1 -} +} \ No newline at end of file diff --git a/notebooks/LightGBM - Overview.ipynb b/notebooks/LightGBM - Overview.ipynb index 8af166b1a7..68bfc175bc 100644 --- a/notebooks/LightGBM - Overview.ipynb +++ b/notebooks/LightGBM - Overview.ipynb @@ -22,13 +22,14 @@ "nbformat_minor": 2, "cells": [ { + "cell_type": "markdown", "source": [ "# LightGBM" ], - "cell_type": "markdown", "metadata": {} }, { + "cell_type": "markdown", "source": [ "[LightGBM](https://github.com/Microsoft/LightGBM) is an open-source,\n", "distributed, high-performance gradient boosting (GBDT, GBRT, GBM, or\n", @@ -59,10 +60,10 @@ "- LightGBMRegressor: used for building regression models. For example, to predict the house price, we could build a regression model with LightGBMRegressor.\n", "- LightGBMRanker: used for building ranking models. For example, to predict website searching result relevance, we could build a ranking model with LightGBMRanker." ], - "cell_type": "markdown", "metadata": {} }, { + "cell_type": "markdown", "source": [ "## Bankruptcy Prediction with LightGBM Classifier\n", "\n", @@ -70,205 +71,223 @@ "\n", "In this example, we use LightGBM to build a classification model in order to predict bankruptcy." 
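A pattern repeated across the notebooks above (Celebrity Quote Analysis, ConditionalKNN, HttpOnSpark) and again in the notebooks below is a small bootstrap cell for running on Azure Synapse (Project Arcadia). A consolidated sketch of that cell, with the Key Vault and secret names taken directly from the diffs above:

```python
import os

# Detect the Azure Synapse (Project Arcadia) runtime via its environment variable.
if os.environ.get("AZURE_SERVICE", None) == "Microsoft.ProjectArcadia":
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()

    # On Synapse, service keys come from Key Vault rather than preset env vars.
    from notebookutils.mssparkutils.credentials import getSecret
    os.environ["VISION_API_KEY"] = getSecret("mmlspark-keys", "mmlspark-cs-key")
    os.environ["BING_IMAGE_SEARCH_KEY"] = getSecret("mmlspark-keys", "mmlspark-bing-search-key")
```

The same check is reused further down to pick per-environment storage paths and to skip matplotlib-based cells when running on Synapse.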
], - "cell_type": "markdown", "metadata": {} }, { + "cell_type": "markdown", "source": [ "#### Read dataset" ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, + "source": [ + "import os\r\n", + "\r\n", + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\r\n", + " from pyspark.sql import SparkSession\r\n", + " spark = SparkSession.builder.getOrCreate()" + ], "outputs": [], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, "source": [ - "df = spark.read.format(\"csv\")\\\n", - " .option(\"header\", True)\\\n", - " .option(\"inferSchema\", True)\\\n", - " .load(\"wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv\")\n", - "# print dataset size\n", - "print(\"records read: \" + str(df.count()))\n", - "print(\"Schema: \")\n", + "df = spark.read.format(\"csv\")\\\r\n", + " .option(\"header\", True)\\\r\n", + " .option(\"inferSchema\", True)\\\r\n", + " .load(\"wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv\")\r\n", + "# print dataset size\r\n", + "print(\"records read: \" + str(df.count()))\r\n", + "print(\"Schema: \")\r\n", "df.printSchema()" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ "display(df)" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "#### Split the dataset into train and test" ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ "train, test = df.randomSplit([0.85, 0.15], seed=1)" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "#### Add featurizer to convert features to vector" ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from pyspark.ml.feature import VectorAssembler\n", - "feature_cols = df.columns[1:]\n", - "featurizer = VectorAssembler(\n", - " inputCols=feature_cols,\n", - " outputCol='features'\n", - ")\n", - "train_data = featurizer.transform(train)['Bankrupt?', 'features']\n", + "from pyspark.ml.feature import VectorAssembler\r\n", + "feature_cols = df.columns[1:]\r\n", + "featurizer = VectorAssembler(\r\n", + " inputCols=feature_cols,\r\n", + " outputCol='features'\r\n", + ")\r\n", + "train_data = featurizer.transform(train)['Bankrupt?', 'features']\r\n", "test_data = featurizer.transform(test)['Bankrupt?', 'features']" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "#### Check if the data is unbalanced" ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ "display(train_data.groupBy(\"Bankrupt?\").count())" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "#### Model Training" ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from mmlspark.lightgbm import LightGBMClassifier\n", + "from mmlspark.lightgbm import LightGBMClassifier\r\n", "model = LightGBMClassifier(objective=\"binary\", featuresCol=\"features\", labelCol=\"Bankrupt?\", isUnbalance=True)" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": 
{}, - "outputs": [], "source": [ "model = model.fit(train_data)" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "By calling \"saveNativeModel\", it allows you to extract the underlying lightGBM model for fast deployment after you train on Spark." ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from mmlspark.lightgbm import LightGBMClassificationModel\n", - "model.saveNativeModel(\"/lgbmclassifier.model\")\n", - "model = LightGBMClassificationModel.loadNativeModelFromFile(\"/lgbmclassifier.model\")" - ] + "from mmlspark.lightgbm import LightGBMClassificationModel\r\n", + "\r\n", + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\r\n", + " model.saveNativeModel(\"/models/lgbmclassifier.model\")\r\n", + " model = LightGBMClassificationModel.loadNativeModelFromFile(\"/models/lgbmclassifier.model\")\r\n", + "else:\r\n", + " model.saveNativeModel(\"/lgbmclassifier.model\")\r\n", + " model = LightGBMClassificationModel.loadNativeModelFromFile(\"/lgbmclassifier.model\")\r\n" + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "#### Feature Importances Visualization" ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "import pandas as pd\n", - "import matplotlib.pyplot as plt\n", - "\n", - "feature_importances = model.getFeatureImportances()\n", - "fi = pd.Series(feature_importances,index = feature_cols)\n", - "fi = fi.sort_values(ascending = True)\n", - "f_index = fi.index\n", - "f_values = fi.values\n", - " \n", - "# print feature importances \n", - "print ('f_index:',f_index)\n", - "print ('f_values:',f_values)\n", - "\n", - "# plot\n", - "x_index = list(range(len(fi)))\n", - "x_index = [x/len(fi) for x in x_index]\n", - "plt.rcParams['figure.figsize'] = (20,20)\n", - "plt.barh(x_index,f_values,height = 0.028 ,align=\"center\",color = 'tan',tick_label=f_index)\n", - "plt.xlabel('importances')\n", - "plt.ylabel('features')\n", + "import pandas as pd\r\n", + "import matplotlib.pyplot as plt\r\n", + "\r\n", + "feature_importances = model.getFeatureImportances()\r\n", + "fi = pd.Series(feature_importances,index = feature_cols)\r\n", + "fi = fi.sort_values(ascending = True)\r\n", + "f_index = fi.index\r\n", + "f_values = fi.values\r\n", + " \r\n", + "# print feature importances \r\n", + "print ('f_index:',f_index)\r\n", + "print ('f_values:',f_values)\r\n", + "\r\n", + "# plot\r\n", + "x_index = list(range(len(fi)))\r\n", + "x_index = [x/len(fi) for x in x_index]\r\n", + "plt.rcParams['figure.figsize'] = (20,20)\r\n", + "plt.barh(x_index,f_values,height = 0.028 ,align=\"center\",color = 'tan',tick_label=f_index)\r\n", + "plt.xlabel('importances')\r\n", + "plt.ylabel('features')\r\n", "plt.show()" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "#### Model Prediction" ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ "predictions = model.transform(test_data)\n", "predictions.limit(10).toPandas()" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ "from mmlspark.train import ComputeModelStatistics\n", "metrics = ComputeModelStatistics(evaluationMetric=\"classification\", labelCol='Bankrupt?', 
scoredLabelsCol='prediction').transform(predictions)\n", "display(metrics)" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "## Quantile Regression for Drug Discovery with LightGBMRegressor\n", "\n", @@ -276,106 +295,103 @@ "\n", "In this example, we show how to use LightGBM to build a simple regression model." ], - "cell_type": "markdown", "metadata": {} }, { + "cell_type": "markdown", "source": [ "#### Read dataset" ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ "triazines = spark.read.format(\"libsvm\")\\\n", " .load(\"wasbs://publicwasb@mmlspark.blob.core.windows.net/triazines.scale.svmlight\")" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ "# print some basic info\n", "print(\"records read: \" + str(triazines.count()))\n", "print(\"Schema: \")\n", "triazines.printSchema()\n", "display(triazines.limit(10))" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "#### Split dataset into train and test" ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ "train, test = triazines.randomSplit([0.85, 0.15], seed=1)" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "#### Model Training" ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ "from mmlspark.lightgbm import LightGBMRegressor\n", "model = LightGBMRegressor(objective='quantile',\n", " alpha=0.2,\n", " learningRate=0.3,\n", " numLeaves=31).fit(train)" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ "print(model.getFeatureImportances())" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "#### Model Prediction" ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ "scoredData = model.transform(test)\n", "display(scoredData)" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ "from mmlspark.train import ComputeModelStatistics\n", "metrics = ComputeModelStatistics(evaluationMetric='regression',\n", @@ -383,27 +399,27 @@ " scoresCol='prediction') \\\n", " .transform(scoredData)\n", "display(metrics)" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "## LightGBM Ranker" ], - "cell_type": "markdown", "metadata": {} }, { + "cell_type": "markdown", "source": [ "#### Read dataset" ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ "df = spark.read.format(\"parquet\").load(\"wasbs://publicwasb@mmlspark.blob.core.windows.net/lightGBMRanker_train.parquet\")\n", "# print some basic info\n", @@ -411,20 +427,20 @@ "print(\"Schema: \")\n", "df.printSchema()\n", "display(df.limit(10))" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "#### Model Training" ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ "from 
mmlspark.lightgbm import LightGBMRanker\n", "\n", @@ -442,34 +458,36 @@ " numIterations=200,\n", " evalAt=[1,3,5],\n", " metric='ndcg')" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ "lgbm_ranker_model = lgbm_ranker.fit(df)" - ] + ], + "outputs": [], + "metadata": {} }, { + "cell_type": "markdown", "source": [ "#### Model Prediction" ], - "cell_type": "markdown", "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ "dt = spark.read.format(\"parquet\").load(\"wasbs://publicwasb@mmlspark.blob.core.windows.net/lightGBMRanker_test.parquet\")\n", "predictions = lgbm_ranker_model.transform(dt)\n", "predictions.limit(10).toPandas()" - ] + ], + "outputs": [], + "metadata": {} } ] } \ No newline at end of file diff --git a/notebooks/ModelInterpretation - Snow Leopard Detection.ipynb b/notebooks/ModelInterpretation - Snow Leopard Detection.ipynb index 4be5c881bc..5c9fb57245 100644 --- a/notebooks/ModelInterpretation - Snow Leopard Detection.ipynb +++ b/notebooks/ModelInterpretation - Snow Leopard Detection.ipynb @@ -2,42 +2,42 @@ "cells": [ { "cell_type": "markdown", - "metadata": {}, "source": [ "## Automated Snow Leopard Detection with Microsoft ML for Apache Spark\n", "\n", - "" - ] + "" + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], "source": [ "import os\n", + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\n", + " from pyspark.sql import SparkSession\n", + " spark = SparkSession.builder.getOrCreate()\n", + " from notebookutils.mssparkutils.credentials import getSecret\n", + " os.environ[\"BING_IMAGE_SEARCH_KEY\"] = getSecret(\"mmlspark-keys\", \"bing-image-search-key\")\n", "\n", "# WARNING this notebook requires alot of memory.\n", "# If you get a heap space error, try dropping the number of images bing returns\n", "# or by writing out the images to parquet first\n", "\n", "# Replace the following with a line like: BING_IMAGE_SEARCH_KEY = \"hdwo2oyd3o928s.....\"\n", - "BING_IMAGE_SEARCH_KEY = os.environ[\"BING_IMAGE_SEARCH_KEY\"] #please add your key here" - ] + "BING_IMAGE_SEARCH_KEY = os.environ[\"BING_IMAGE_SEARCH_KEY\"]" + ], + "outputs": [], + "metadata": { + "collapsed": true + } }, { "cell_type": "code", "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], "source": [ "from mmlspark.cognitive import *\n", "from mmlspark.core.spark import FluentAPI\n", - "import os\n", "from pyspark.sql.functions import lit\n", "\n", "def bingPhotoSearch(name, queries, pages):\n", @@ -56,22 +56,22 @@ " .mlTransform(BingImageSearch.getUrlTransformer(\"images\", \"urls\")) \\\n", " .withColumn(\"labels\", lit(name)) \\\n", " .limit(400)\n" - ] + ], + "outputs": [], + "metadata": { + "collapsed": true + } }, { "cell_type": "markdown", - "metadata": {}, "source": [ - "" - ] + "" + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], "source": [ "def displayDF(df, n=5, image_cols = set([\"urls\"])):\n", " rows = df.take(n)\n", @@ -128,161 +128,162 @@ " displayHTML(style + body)\n", " except:\n", " pass" - ] + ], + "outputs": [], + "metadata": { + "collapsed": true + } }, { "cell_type": "code", "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], "source": [ "snowLeopardQueries = [\"snow leopard\"]\n", 
"snowLeopardUrls = bingPhotoSearch(\"snow leopard\", snowLeopardQueries, pages=100)\n", "displayDF(snowLeopardUrls)" - ] + ], + "outputs": [], + "metadata": { + "collapsed": true + } }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ "randomWords = spark.read.parquet(\"wasbs://publicwasb@mmlspark.blob.core.windows.net/random_words.parquet\").cache()\n", "randomWords.show()" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], "source": [ - "randomLinks = randomWords \\\n", - " .mlTransform(BingImageSearch()\n", - " .setSubscriptionKey(BING_IMAGE_SEARCH_KEY)\n", - " .setCount(10)\n", - " .setQueryCol(\"words\")\n", - " .setOutputCol(\"images\")) \\\n", - " .mlTransform(BingImageSearch.getUrlTransformer(\"images\", \"urls\")) \\\n", - " .withColumn(\"label\", lit(\"other\")) \\\n", - " .limit(400)\n", - " \n", + "randomLinks = randomWords \\\r\n", + " .mlTransform(BingImageSearch()\r\n", + " .setSubscriptionKey(BING_IMAGE_SEARCH_KEY)\r\n", + " .setCount(10)\r\n", + " .setQueryCol(\"words\")\r\n", + " .setOutputCol(\"images\")) \\\r\n", + " .mlTransform(BingImageSearch.getUrlTransformer(\"images\", \"urls\")) \\\r\n", + " .withColumn(\"label\", lit(\"other\")) \\\r\n", + " .limit(400)\r\n", + " \r\n", "displayDF(randomLinks)" - ] + ], + "outputs": [], + "metadata": { + "collapsed": true + } }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "images = snowLeopardUrls.union(randomLinks).distinct().repartition(100)\\\n", - " .mlTransform(BingImageSearch.downloadFromUrls(\"urls\", \"image\", concurrency=5, timeout=5000))\\\n", - " .dropna()\n", - "\n", + "images = snowLeopardUrls.union(randomLinks).distinct().repartition(100)\\\r\n", + " .mlTransform(BingImageSearch.downloadFromUrls(\"urls\", \"image\", concurrency=5, timeout=5000))\\\r\n", + " .dropna()\r\n", + "\r\n", "train, test = images.randomSplit([.7,.3], seed=1)" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from pyspark.ml import Pipeline\n", - "from pyspark.ml.feature import StringIndexer\n", - "from pyspark.ml.classification import LogisticRegression\n", - "from pyspark.sql.functions import udf\n", - "from mmlspark.downloader import ModelDownloader\n", - "from mmlspark.cntk import ImageFeaturizer\n", - "from mmlspark.stages import UDFTransformer\n", - "from pyspark.sql.types import *\n", - "\n", - "def getIndex(row):\n", - " return float(row[1])\n", - "\n", - "try:\n", - " network = ModelDownloader(spark, \"Models/\").downloadByName(\"ResNet50\")\n", - "except:\n", - " network = ModelDownloader(spark, \"dbfs:/Models/\").downloadByName(\"ResNet50\")\n", - "\n", - "model = Pipeline(stages=[\n", - " StringIndexer(inputCol = \"labels\", outputCol=\"index\"),\n", - " ImageFeaturizer(inputCol=\"image\", outputCol=\"features\", cutOutputLayers=1).setModel(network),\n", - " LogisticRegression(maxIter=5, labelCol=\"index\", regParam=10.0),\n", - " UDFTransformer()\\\n", - " .setUDF(udf(getIndex, DoubleType()))\\\n", - " .setInputCol(\"probability\")\\\n", - " .setOutputCol(\"leopard_prob\")\n", - "])\n", - "\n", + "from pyspark.ml import Pipeline\r\n", + "from pyspark.ml.feature import StringIndexer\r\n", + "from pyspark.ml.classification import LogisticRegression\r\n", + "from pyspark.sql.functions import udf\r\n", + "from mmlspark.downloader import 
ModelDownloader\r\n", + "from mmlspark.cntk import ImageFeaturizer\r\n", + "from mmlspark.stages import UDFTransformer\r\n", + "from pyspark.sql.types import *\r\n", + "\r\n", + "def getIndex(row):\r\n", + " return float(row[1])\r\n", + "\r\n", + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\r\n", + " network = ModelDownloader(spark, \"abfss://synapse@mmlsparkeuap.dfs.core.windows.net/models/\").downloadByName(\"ResNet50\")\r\n", + "else:\r\n", + " network = ModelDownloader(spark, \"dbfs:/Models/\").downloadByName(\"ResNet50\")\r\n", + "\r\n", + "model = Pipeline(stages=[\r\n", + " StringIndexer(inputCol = \"labels\", outputCol=\"index\"),\r\n", + " ImageFeaturizer(inputCol=\"image\", outputCol=\"features\", cutOutputLayers=1).setModel(network),\r\n", + " LogisticRegression(maxIter=5, labelCol=\"index\", regParam=10.0),\r\n", + " UDFTransformer()\\\r\n", + " .setUDF(udf(getIndex, DoubleType()))\\\r\n", + " .setInputCol(\"probability\")\\\r\n", + " .setOutputCol(\"leopard_prob\")\r\n", + "])\r\n", + "\r\n", "fitModel = model.fit(train)" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ - "" - ] + "" + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, + "source": [ + "def plotConfusionMatrix(df, label, prediction, classLabels):\r\n", + " from mmlspark.plot import confusionMatrix\r\n", + " import matplotlib.pyplot as plt\r\n", + " fig = plt.figure(figsize=(4.5, 4.5))\r\n", + " confusionMatrix(df, label, prediction, classLabels)\r\n", + " display(fig)\r\n", + "\r\n", + "if os.environ.get(\"AZURE_SERVICE\", None) != \"Microsoft.ProjectArcadia\":\r\n", + " plotConfusionMatrix(fitModel.transform(test), \"index\", \"prediction\", fitModel.stages[0].labels)" + ], + "outputs": [], "metadata": { "collapsed": true - }, - "outputs": [], - "source": [ - "def plotConfusionMatrix(df, label, prediction, classLabels):\n", - " from mmlspark.plot import confusionMatrix\n", - " import matplotlib.pyplot as plt\n", - " fig = plt.figure(figsize=(4.5, 4.5))\n", - " confusionMatrix(df, label, prediction, classLabels)\n", - " display(fig)\n", - "\n", - "plotConfusionMatrix(fitModel.transform(test), \"index\", \"prediction\", fitModel.stages[0].labels)" - ] + } }, { "cell_type": "code", "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], "source": [ - "import urllib.request\n", - "from mmlspark.lime import ImageLIME\n", - "\n", - "test_image_url = \"https://mmlspark.blob.core.windows.net/graphics/SnowLeopardAD/snow_leopard1.jpg\"\n", - "with urllib.request.urlopen(test_image_url) as url:\n", - " barr = url.read()\n", - "test_subsample = spark.createDataFrame([(bytearray(barr),)], [\"image\"])\n", - "\n", - "lime = ImageLIME()\\\n", - " .setModel(fitModel)\\\n", - " .setPredictionCol(\"leopard_prob\")\\\n", - " .setOutputCol(\"weights\")\\\n", - " .setInputCol(\"image\")\\\n", - " .setCellSize(100.0)\\\n", - " .setModifier(50.0)\\\n", - " .setNSamples(300)\n", - "\n", + "import urllib.request\r\n", + "from mmlspark.lime import ImageLIME\r\n", + "\r\n", + "test_image_url = \"https://mmlspark.blob.core.windows.net/graphics/SnowLeopardAD/snow_leopard1.jpg\"\r\n", + "with urllib.request.urlopen(test_image_url) as url:\r\n", + " barr = url.read()\r\n", + "test_subsample = spark.createDataFrame([(bytearray(barr),)], [\"image\"])\r\n", + "\r\n", + "lime = ImageLIME()\\\r\n", + " .setModel(fitModel)\\\r\n", + " .setPredictionCol(\"leopard_prob\")\\\r\n", + " .setOutputCol(\"weights\")\\\r\n", 
+ " .setInputCol(\"image\")\\\r\n", + " .setCellSize(100.0)\\\r\n", + " .setModifier(50.0)\\\r\n", + " .setNSamples(300)\r\n", + "\r\n", "result = lime.transform(test_subsample)" - ] + ], + "outputs": [], + "metadata": { + "collapsed": true + } }, { "cell_type": "code", "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "import PIL, io, numpy as np\n", @@ -304,16 +305,21 @@ " display()\n", "\n", "# Gets first row from the LIME-transformed data frame\n", - "plot_superpixels(result.take(1)[0])" - ] + "if os.environ.get(\"AZURE_SERVICE\", None) != \"Microsoft.ProjectArcadia\":\n", + " plot_superpixels(result.take(1)[0])" + ], + "outputs": [], + "metadata": { + "collapsed": true + } }, { "cell_type": "markdown", - "metadata": {}, "source": [ "### Your results will look like:\n", - "" - ] + "" + ], + "metadata": {} } ], "metadata": { diff --git a/notebooks/OpenCV - Pipeline Image Transformations.ipynb b/notebooks/OpenCV - Pipeline Image Transformations.ipynb index 0368d08757..e6b4cda376 100644 --- a/notebooks/OpenCV - Pipeline Image Transformations.ipynb +++ b/notebooks/OpenCV - Pipeline Image Transformations.ipynb @@ -26,6 +26,11 @@ "metadata": {}, "outputs": [], "source": [ + "import os\n", + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\n", + " from pyspark.sql import SparkSession\n", + " spark = SparkSession.builder.getOrCreate()\n", + "\n", "import mmlspark\n", "import numpy as np\n", "from mmlspark.opencv import toNDArray\n", diff --git a/notebooks/Regression - Auto Imports.ipynb b/notebooks/Regression - Auto Imports.ipynb index a700443528..271a17c751 100644 --- a/notebooks/Regression - Auto Imports.ipynb +++ b/notebooks/Regression - Auto Imports.ipynb @@ -29,6 +29,18 @@ "using `pandas.read_csv()`" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\n", + " from pyspark.sql import SparkSession\n", + " spark = SparkSession.builder.getOrCreate()" + ] + }, { "cell_type": "code", "execution_count": null, diff --git a/notebooks/Regression - Flight Delays with DataCleaning.ipynb b/notebooks/Regression - Flight Delays with DataCleaning.ipynb similarity index 95% rename from notebooks/Regression - Flight Delays with DataCleaning.ipynb rename to notebooks/Regression - Flight Delays with DataCleaning.ipynb index 260ad8e730..c4340228fc 100644 --- a/notebooks/Regression - Flight Delays with DataCleaning.ipynb +++ b/notebooks/Regression - Flight Delays with DataCleaning.ipynb @@ -25,6 +25,18 @@ "First, import the pandas package" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\n", + " from pyspark.sql import SparkSession\n", + " spark = SparkSession.builder.getOrCreate()" + ] + }, { "cell_type": "code", "execution_count": null, diff --git a/notebooks/Regression - Flight Delays.ipynb b/notebooks/Regression - Flight Delays.ipynb index dad2e6c10d..590915e7cc 100644 --- a/notebooks/Regression - Flight Delays.ipynb +++ b/notebooks/Regression - Flight Delays.ipynb @@ -13,6 +13,18 @@ "First, import the packages." 
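The same environment check also drives where model artifacts are read and written: the LightGBM, Snow Leopard, and (below) Flight Delays notebooks branch on it to pick their model paths (`/models/...` or `abfss://...` on Synapse, `dbfs:/...` or the bare root otherwise). A small sketch of that branching; the helper name is chosen here purely for illustration:

```python
import os

# Hypothetical helper mirroring the per-environment model paths used in these notebooks.
def model_path(name):
    if os.environ.get("AZURE_SERVICE", None) == "Microsoft.ProjectArcadia":
        return "/models/" + name   # Azure Synapse (Project Arcadia)
    return "dbfs:/" + name         # Databricks

model_name = model_path("flightDelayModel.mml")  # the pair of paths used in the Flight Delays diff below
```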
] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\n", + " from pyspark.sql import SparkSession\n", + " spark = SparkSession.builder.getOrCreate()" + ] + }, { "cell_type": "code", "execution_count": null, @@ -102,8 +114,11 @@ "metadata": {}, "outputs": [], "source": [ - "import random\n", - "model_name = \"dbfs:/flightDelayModel.mml\"\n", + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\n", + " model_name = \"/models/flightDelayModel.mml\"\n", + "else:\n", + " model_name = \"dbfs:/flightDelayModel.mml\"\n", + "\n", "model.write().overwrite().save(model_name)\n", "flightDelayModel = TrainedRegressorModel.load(model_name)\n", "\n", diff --git a/notebooks/Regression - Vowpal Wabbit vs. LightGBM vs. Linear Regressor.ipynb b/notebooks/Regression - Vowpal Wabbit vs. LightGBM vs. Linear Regressor.ipynb index e091b62d98..51a71519cf 100644 --- a/notebooks/Regression - Vowpal Wabbit vs. LightGBM vs. Linear Regressor.ipynb +++ b/notebooks/Regression - Vowpal Wabbit vs. LightGBM vs. Linear Regressor.ipynb @@ -13,6 +13,18 @@ " [Spark MLlib Linear Regression](https://spark.apache.org/docs/latest/ml-classification-regression.html#linear-regression)." ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\n", + " from pyspark.sql import SparkSession\n", + " spark = SparkSession.builder.getOrCreate()" + ] + }, { "cell_type": "code", "execution_count": null, @@ -20,9 +32,6 @@ "outputs": [], "source": [ "import math\n", - "from matplotlib.colors import ListedColormap, Normalize\n", - "from matplotlib.cm import get_cmap\n", - "import matplotlib.pyplot as plt\n", "from mmlspark.train import ComputeModelStatistics\n", "from mmlspark.vw import VowpalWabbitRegressor, VowpalWabbitFeaturizer\n", "from mmlspark.lightgbm import LightGBMRegressor\n", @@ -54,9 +63,7 @@ "\n", "feature_cols = ['f' + str(i) for i in range(boston.data.shape[1])]\n", "header = ['target'] + feature_cols\n", - "df = spark.createDataFrame(\n", - " pd.DataFrame(data=np.column_stack((boston.target, boston.data)), columns=header)\n", - ").repartition(1)\n", + "df = spark.createDataFrame(pd.DataFrame(data=np.column_stack((boston.target, boston.data)), columns=header)).repartition(1)\n", "print(\"Dataframe has {} rows\".format(df.count()))\n", "display(df.limit(10).toPandas())" ] @@ -67,9 +74,7 @@ "metadata": {}, "outputs": [], "source": [ - "train_data, test_data = df.randomSplit([0.75, 0.25], seed=42)\n", - "train_data.cache()\n", - "test_data.cache()" + "train_data, test_data = df.randomSplit([0.75, 0.25], seed=42)" ] }, { @@ -106,25 +111,7 @@ "features = train_data.columns[1:]\n", "values = train_data.drop('target').toPandas()\n", "ncols = 5\n", - "nrows = math.ceil(len(features) / ncols)\n", - "\n", - "yy = [r['target'] for r in train_data.select('target').collect()]\n", - "\n", - "f, axes = plt.subplots(nrows, ncols, sharey=True, figsize=(30,10))\n", - "f.tight_layout()\n", - "\n", - "for irow in range(nrows):\n", - " axes[irow][0].set_ylabel('target')\n", - " for icol in range(ncols):\n", - " try:\n", - " feat = features[irow*ncols + icol]\n", - " xx = values[feat]\n", - "\n", - " axes[irow][icol].scatter(xx, yy, s=10, alpha=0.25)\n", - " axes[irow][icol].set_xlabel(feat)\n", - " 
axes[irow][icol].get_yaxis().set_ticks([])\n", - " except IndexError:\n", - " f.delaxes(axes[irow][icol])" + "nrows = math.ceil(len(features) / ncols)" ] }, { @@ -142,10 +129,7 @@ "metadata": {}, "outputs": [], "source": [ - "featurizer = VectorAssembler(\n", - " inputCols=feature_cols,\n", - " outputCol='features'\n", - ")\n", + "featurizer = VectorAssembler(inputCols=feature_cols, outputCol='features')\n", "lr_train_data = featurizer.transform(train_data)['target', 'features']\n", "lr_test_data = featurizer.transform(test_data)['target', 'features']\n", "display(lr_train_data.limit(10).toPandas())" @@ -158,9 +142,7 @@ "outputs": [], "source": [ "# By default, `maxIter` is 100. Other params you may want to change include: `regParam`, `elasticNetParam`, etc.\n", - "lr = LinearRegression(\n", - " labelCol='target',\n", - ")\n", + "lr = LinearRegression(labelCol='target')\n", "\n", "lr_model = lr.fit(lr_train_data)\n", "lr_predictions = lr_model.transform(lr_test_data)\n", @@ -188,8 +170,7 @@ "metrics = ComputeModelStatistics(\n", " evaluationMetric='regression',\n", " labelCol='target',\n", - " scoresCol='prediction'\n", - ").transform(lr_predictions)\n", + " scoresCol='prediction').transform(lr_predictions)\n", "\n", "results = metrics.toPandas()\n", "results.insert(0, 'model', ['Spark MLlib - Linear Regression'])\n", @@ -218,8 +199,8 @@ "source": [ "vw_featurizer = VowpalWabbitFeaturizer(\n", " inputCols=feature_cols,\n", - " outputCol='features',\n", - ")\n", + " outputCol='features')\n", + "\n", "vw_train_data = vw_featurizer.transform(train_data)['target', 'features']\n", "vw_test_data = vw_featurizer.transform(test_data)['target', 'features']\n", "display(vw_train_data.limit(10).toPandas())" @@ -243,8 +224,7 @@ "vwr = VowpalWabbitRegressor(\n", " labelCol='target',\n", " args=args,\n", - " numPasses=100,\n", - ")\n", + " numPasses=100)\n", "\n", "# To reduce number of partitions (which will effect performance), use `vw_train_data.repartition(1)`\n", "vw_train_data_2 = vw_train_data.repartition(1).cache()\n", @@ -264,15 +244,14 @@ "metrics = ComputeModelStatistics(\n", " evaluationMetric='regression',\n", " labelCol='target',\n", - " scoresCol='prediction'\n", - ").transform(vw_predictions)\n", + " scoresCol='prediction').transform(vw_predictions)\n", "\n", "vw_result = metrics.toPandas()\n", "vw_result.insert(0, 'model', ['Vowpal Wabbit'])\n", "results = results.append(\n", " vw_result,\n", - " ignore_index=True\n", - ")\n", + " ignore_index=True)\n", + "\n", "display(results)" ] }, @@ -295,8 +274,7 @@ " learningRate=0.3,\n", " numLeaves=31,\n", " labelCol='target',\n", - " numIterations=100,\n", - ")\n", + " numIterations=100)\n", "\n", "# Using one partition since the training dataset is very small\n", "repartitioned_data = lr_train_data.repartition(1).cache()\n", @@ -316,15 +294,15 @@ "metrics = ComputeModelStatistics(\n", " evaluationMetric='regression',\n", " labelCol='target',\n", - " scoresCol='prediction'\n", - ").transform(lg_predictions)\n", + " scoresCol='prediction').transform(lg_predictions)\n", "\n", "lg_result = metrics.toPandas()\n", "lg_result.insert(0, 'model', ['LightGBM'])\n", + "\n", "results = results.append(\n", " lg_result,\n", - " ignore_index=True\n", - ")\n", + " ignore_index=True)\n", + "\n", "display(results)" ] }, @@ -343,30 +321,49 @@ "metadata": {}, "outputs": [], "source": [ - "cmap = get_cmap('YlOrRd')\n", + "if os.environ.get(\"AZURE_SERVICE\", None) != \"Microsoft.ProjectArcadia\":\n", + " from matplotlib.colors import ListedColormap, Normalize\n", 
+ " from matplotlib.cm import get_cmap\n", + " import matplotlib.pyplot as plt\n", + "\n", + " f, axes = plt.subplots(nrows, ncols, sharey=True, figsize=(30,10))\n", + " f.tight_layout()\n", + " yy = [r['target'] for r in train_data.select('target').collect()]\n", + " for irow in range(nrows):\n", + " axes[irow][0].set_ylabel('target')\n", + " for icol in range(ncols):\n", + " try:\n", + " feat = features[irow*ncols + icol]\n", + " xx = values[feat]\n", + " axes[irow][icol].scatter(xx, yy, s=10, alpha=0.25)\n", + " axes[irow][icol].set_xlabel(feat)\n", + " axes[irow][icol].get_yaxis().set_ticks([])\n", + " except IndexError:\n", + " f.delaxes(axes[irow][icol])\n", + "\n", + " cmap = get_cmap('YlOrRd')\n", "\n", - "target = np.array(test_data.select('target').collect()).flatten()\n", - "model_preds = [\n", - " (\"Spark MLlib Linear Regression\", lr_predictions),\n", - " (\"Vowpal Wabbit\", vw_predictions),\n", - " (\"LightGBM\", lg_predictions)\n", - "]\n", + " target = np.array(test_data.select('target').collect()).flatten()\n", + " model_preds = [\n", + " (\"Spark MLlib Linear Regression\", lr_predictions),\n", + " (\"Vowpal Wabbit\", vw_predictions),\n", + " (\"LightGBM\", lg_predictions)]\n", "\n", - "f, axes = plt.subplots(1, len(model_preds), sharey=True, figsize=(18, 6))\n", - "f.tight_layout()\n", + " f, axes = plt.subplots(1, len(model_preds), sharey=True, figsize=(18, 6))\n", + " f.tight_layout()\n", "\n", - "for i, (model_name, preds) in enumerate(model_preds):\n", - " preds = np.array(preds.select('prediction').collect()).flatten()\n", - " err = np.absolute(preds - target)\n", + " for i, (model_name, preds) in enumerate(model_preds):\n", + " preds = np.array(preds.select('prediction').collect()).flatten()\n", + " err = np.absolute(preds - target)\n", "\n", - " norm = Normalize()\n", - " clrs = cmap(np.asarray(norm(err)))[:, :-1]\n", - " axes[i].scatter(preds, target, s=60, c=clrs, edgecolors='#888888', alpha=0.75)\n", - " axes[i].plot((0, 60), (0, 60), linestyle='--', color='#888888')\n", - " axes[i].set_xlabel('Predicted values')\n", - " if i ==0:\n", - " axes[i].set_ylabel('Actual values')\n", - " axes[i].set_title(model_name)" + " norm = Normalize()\n", + " clrs = cmap(np.asarray(norm(err)))[:, :-1]\n", + " axes[i].scatter(preds, target, s=60, c=clrs, edgecolors='#888888', alpha=0.75)\n", + " axes[i].plot((0, 60), (0, 60), linestyle='--', color='#888888')\n", + " axes[i].set_xlabel('Predicted values')\n", + " if i ==0:\n", + " axes[i].set_ylabel('Actual values')\n", + " axes[i].set_title(model_name)" ] }, { diff --git a/notebooks/SparkServing - Deploying a Classifier.ipynb b/notebooks/SparkServing - Deploying a Classifier.ipynb index 400cabd995..854ae260a0 100644 --- a/notebooks/SparkServing - Deploying a Classifier.ipynb +++ b/notebooks/SparkServing - Deploying a Classifier.ipynb @@ -9,6 +9,18 @@ "First, we import needed packages:" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\n", + " from pyspark.sql import SparkSession\n", + " spark = SparkSession.builder.getOrCreate()" + ] + }, { "cell_type": "code", "execution_count": null, @@ -206,4 +218,4 @@ }, "nbformat": 4, "nbformat_minor": 2 -} +} \ No newline at end of file diff --git a/notebooks/TextAnalytics - Amazon Book Reviews with Word2Vec.ipynb b/notebooks/TextAnalytics - Amazon Book Reviews with Word2Vec.ipynb index ec50dce740..9cd06cdd91 100644 --- 
a/notebooks/TextAnalytics - Amazon Book Reviews with Word2Vec.ipynb +++ b/notebooks/TextAnalytics - Amazon Book Reviews with Word2Vec.ipynb @@ -2,188 +2,200 @@ "cells": [ { "cell_type": "markdown", - "metadata": {}, "source": [ "## TextAnalytics - Amazon Book Reviews with Word2Vec\n", "\n", "Yet again, now using the `Word2Vec` Estimator from Spark. We can use the tree-based\n", "learners from spark in this scenario due to the lower dimensionality representation of\n", "features." - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "import pandas as pd\n" - ] + "import os\r\n", + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\r\n", + " from pyspark.sql import SparkSession\r\n", + " spark = SparkSession.builder.getOrCreate()" + ], + "outputs": [], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, + "source": [ + "import pandas as pd\r\n" + ], "outputs": [], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, "source": [ - "data = spark.read.parquet(\"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet\")\n", + "data = spark.read.parquet(\"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet\")\r\n", "data.limit(10).toPandas()" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "Modify the label column to predict a rating greater than 3." - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "processedData = data.withColumn(\"label\", data[\"rating\"] > 3) \\\n", - " .select([\"text\", \"label\"])\n", + "processedData = data.withColumn(\"label\", data[\"rating\"] > 3) \\\r\n", + " .select([\"text\", \"label\"])\r\n", "processedData.limit(5).toPandas()" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "Split the dataset into train, test and validation sets." - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ "train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20])" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "Use `Tokenizer` and `Word2Vec` to generate the features." - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from pyspark.ml import Pipeline\n", - "from pyspark.ml.feature import Tokenizer, Word2Vec\n", - "tokenizer = Tokenizer(inputCol=\"text\", outputCol=\"words\")\n", - "partitions = train.rdd.getNumPartitions()\n", - "word2vec = Word2Vec(maxIter=4, seed=42, inputCol=\"words\", outputCol=\"features\",\n", - " numPartitions=partitions)\n", + "from pyspark.ml import Pipeline\r\n", + "from pyspark.ml.feature import Tokenizer, Word2Vec\r\n", + "tokenizer = Tokenizer(inputCol=\"text\", outputCol=\"words\")\r\n", + "partitions = train.rdd.getNumPartitions()\r\n", + "word2vec = Word2Vec(maxIter=4, seed=42, inputCol=\"words\", outputCol=\"features\",\r\n", + " numPartitions=partitions)\r\n", "textFeaturizer = Pipeline(stages = [tokenizer, word2vec]).fit(train)" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "Transform each of the train, test and validation datasets." 
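Before the splits are transformed below, a side note (not part of the notebook): the fitted featurizer is a PipelineModel whose last stage is the Word2VecModel, so the learned embeddings can be inspected directly. A short sketch assuming the `textFeaturizer` fitted above; the probe word is arbitrary:

```python
# The pipeline is [tokenizer, word2vec]; the last fitted stage is the Word2VecModel.
w2v_model = textFeaturizer.stages[-1]

# Vocabulary vectors and nearest neighbours of an arbitrary token.
display(w2v_model.getVectors().limit(10))
display(w2v_model.findSynonyms("book", 5))
```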
- ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "ptrain = textFeaturizer.transform(train).select([\"label\", \"features\"])\n", - "ptest = textFeaturizer.transform(test).select([\"label\", \"features\"])\n", - "pvalidation = textFeaturizer.transform(validation).select([\"label\", \"features\"])\n", + "ptrain = textFeaturizer.transform(train).select([\"label\", \"features\"])\r\n", + "ptest = textFeaturizer.transform(test).select([\"label\", \"features\"])\r\n", + "pvalidation = textFeaturizer.transform(validation).select([\"label\", \"features\"])\r\n", "ptrain.limit(5).toPandas()" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "Generate several models with different parameters from the training data." - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier\n", - "from mmlspark.train import TrainClassifier\n", - "import itertools\n", - "\n", - "lrHyperParams = [0.05, 0.2]\n", - "logisticRegressions = [LogisticRegression(regParam = hyperParam)\n", - " for hyperParam in lrHyperParams]\n", - "lrmodels = [TrainClassifier(model=lrm, labelCol=\"label\").fit(ptrain)\n", - " for lrm in logisticRegressions]\n", - "\n", - "rfHyperParams = itertools.product([5, 10], [2, 3])\n", - "randomForests = [RandomForestClassifier(numTrees=hyperParam[0], maxDepth=hyperParam[1])\n", - " for hyperParam in rfHyperParams]\n", - "rfmodels = [TrainClassifier(model=rfm, labelCol=\"label\").fit(ptrain)\n", - " for rfm in randomForests]\n", - "\n", - "gbtHyperParams = itertools.product([8, 16], [2, 3])\n", - "gbtclassifiers = [GBTClassifier(maxBins=hyperParam[0], maxDepth=hyperParam[1])\n", - " for hyperParam in gbtHyperParams]\n", - "gbtmodels = [TrainClassifier(model=gbt, labelCol=\"label\").fit(ptrain)\n", - " for gbt in gbtclassifiers]\n", - "\n", + "from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier\r\n", + "from mmlspark.train import TrainClassifier\r\n", + "import itertools\r\n", + "\r\n", + "lrHyperParams = [0.05, 0.2]\r\n", + "logisticRegressions = [LogisticRegression(regParam = hyperParam)\r\n", + " for hyperParam in lrHyperParams]\r\n", + "lrmodels = [TrainClassifier(model=lrm, labelCol=\"label\").fit(ptrain)\r\n", + " for lrm in logisticRegressions]\r\n", + "\r\n", + "rfHyperParams = itertools.product([5, 10], [2, 3])\r\n", + "randomForests = [RandomForestClassifier(numTrees=hyperParam[0], maxDepth=hyperParam[1])\r\n", + " for hyperParam in rfHyperParams]\r\n", + "rfmodels = [TrainClassifier(model=rfm, labelCol=\"label\").fit(ptrain)\r\n", + " for rfm in randomForests]\r\n", + "\r\n", + "gbtHyperParams = itertools.product([8, 16], [2, 3])\r\n", + "gbtclassifiers = [GBTClassifier(maxBins=hyperParam[0], maxDepth=hyperParam[1])\r\n", + " for hyperParam in gbtHyperParams]\r\n", + "gbtmodels = [TrainClassifier(model=gbt, labelCol=\"label\").fit(ptrain)\r\n", + " for gbt in gbtclassifiers]\r\n", + "\r\n", "trainedModels = lrmodels + rfmodels + gbtmodels" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "Find the best model for the given test dataset." 
- ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from mmlspark.automl import FindBestModel\n", - "bestModel = FindBestModel(evaluationMetric=\"AUC\", models=trainedModels).fit(ptest)\n", - "bestModel.getRocCurve().show()\n", - "bestModel.getBestModelMetrics().show()\n", + "from mmlspark.automl import FindBestModel\r\n", + "bestModel = FindBestModel(evaluationMetric=\"AUC\", models=trainedModels).fit(ptest)\r\n", + "bestModel.getRocCurve().show()\r\n", + "bestModel.getBestModelMetrics().show()\r\n", "bestModel.getAllModelMetrics().show()" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "Get the accuracy from the validation dataset." - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "from mmlspark.train import ComputeModelStatistics\n", - "predictions = bestModel.transform(pvalidation)\n", - "metrics = ComputeModelStatistics().transform(predictions)\n", - "print(\"Best model's accuracy on validation set = \"\n", - " + \"{0:.2f}%\".format(metrics.first()[\"accuracy\"] * 100))\n", - "print(\"Best model's AUC on validation set = \"\n", + "from mmlspark.train import ComputeModelStatistics\r\n", + "predictions = bestModel.transform(pvalidation)\r\n", + "metrics = ComputeModelStatistics().transform(predictions)\r\n", + "print(\"Best model's accuracy on validation set = \"\r\n", + " + \"{0:.2f}%\".format(metrics.first()[\"accuracy\"] * 100))\r\n", + "print(\"Best model's AUC on validation set = \"\r\n", " + \"{0:.2f}%\".format(metrics.first()[\"AUC\"] * 100))" - ] + ], + "outputs": [], + "metadata": {} } ], "metadata": { diff --git a/notebooks/TextAnalytics - Amazon Book Reviews.ipynb b/notebooks/TextAnalytics - Amazon Book Reviews.ipynb index 54c20a0c8c..e700ea2beb 100644 --- a/notebooks/TextAnalytics - Amazon Book Reviews.ipynb +++ b/notebooks/TextAnalytics - Amazon Book Reviews.ipynb @@ -2,145 +2,157 @@ "cells": [ { "cell_type": "markdown", - "metadata": {}, "source": [ "## TextAnalytics - Amazon Book Reviews\n", "\n", "Again, try to predict Amazon book ratings greater than 3 out of 5, this time using\n", "the `TextFeaturizer` module which is a composition of several text analytics APIs that\n", "are native to Spark." - ] + ], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, - "outputs": [], "source": [ - "import pandas as pd\n" - ] + "import os\r\n", + "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\r\n", + " from pyspark.sql import SparkSession\r\n", + " spark = SparkSession.builder.getOrCreate()" + ], + "outputs": [], + "metadata": {} }, { "cell_type": "code", "execution_count": null, - "metadata": {}, + "source": [ + "import pandas as pd" + ], "outputs": [], + "metadata": {} + }, + { + "cell_type": "code", + "execution_count": null, "source": [ - "data = spark.read.parquet(\"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet\")\n", + "data = spark.read.parquet(\"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet\")\r\n", "data.limit(10).toPandas()" - ] + ], + "outputs": [], + "metadata": {} }, { "cell_type": "markdown", - "metadata": {}, "source": [ "Use `TextFeaturizer` to generate our features column. We remove stop words, and use TF-IDF\n", "to generate 2²⁰ sparse features." 
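One detail worth keeping in mind for the next cell: `setNumFeatures(1 << 16)` requests 2¹⁶ = 65,536 hash features, whereas the prose above mentions 2²⁰ (which would be `1 << 20`). A trivial check of the value actually passed:

```python
num_features = 1 << 16
assert num_features == 2 ** 16 == 65536  # the value passed to setNumFeatures below
```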
- ]
+ ],
+ "metadata": {}
 },
 {
 "cell_type": "code",
 "execution_count": null,
- "metadata": {},
- "outputs": [],
 "source": [
- "from mmlspark.featurize.text import TextFeaturizer\n",
- "textFeaturizer = TextFeaturizer() \\\n",
- " .setInputCol(\"text\").setOutputCol(\"features\") \\\n",
+ "from mmlspark.featurize.text import TextFeaturizer\r\n",
+ "textFeaturizer = TextFeaturizer() \\\r\n",
+ " .setInputCol(\"text\").setOutputCol(\"features\") \\\r\n",
 " .setUseStopWordsRemover(True).setUseIDF(True).setMinDocFreq(5).setNumFeatures(1 << 16).fit(data)"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
 },
 {
 "cell_type": "code",
 "execution_count": null,
- "metadata": {},
- "outputs": [],
 "source": [
- "processedData = textFeaturizer.transform(data)\n",
+ "processedData = textFeaturizer.transform(data)\r\n",
 "processedData.limit(5).toPandas()"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
 },
 {
 "cell_type": "markdown",
- "metadata": {},
 "source": [
 "Change the label so that we can predict whether the rating is greater than 3 using a binary\n",
 "classifier."
- ]
+ ],
+ "metadata": {}
 },
 {
 "cell_type": "code",
 "execution_count": null,
- "metadata": {},
- "outputs": [],
 "source": [
- "processedData = processedData.withColumn(\"label\", processedData[\"rating\"] > 3) \\\n",
- " .select([\"features\", \"label\"])\n",
+ "processedData = processedData.withColumn(\"label\", processedData[\"rating\"] > 3) \\\r\n",
+ " .select([\"features\", \"label\"])\r\n",
 "processedData.limit(5).toPandas()"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
 },
 {
 "cell_type": "markdown",
- "metadata": {},
 "source": [
 "Train several Logistic Regression models with different regularizations."
- ]
+ ],
+ "metadata": {}
 },
 {
 "cell_type": "code",
 "execution_count": null,
- "metadata": {},
- "outputs": [],
 "source": [
- "train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20])\n",
- "from pyspark.ml.classification import LogisticRegression\n",
- "\n",
- "lrHyperParams = [0.05, 0.1, 0.2, 0.4]\n",
- "logisticRegressions = [LogisticRegression(regParam = hyperParam) for hyperParam in lrHyperParams]\n",
- "\n",
- "from mmlspark.train import TrainClassifier\n",
+ "train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20])\r\n",
+ "from pyspark.ml.classification import LogisticRegression\r\n",
+ "\r\n",
+ "lrHyperParams = [0.05, 0.1, 0.2, 0.4]\r\n",
+ "logisticRegressions = [LogisticRegression(regParam = hyperParam) for hyperParam in lrHyperParams]\r\n",
+ "\r\n",
+ "from mmlspark.train import TrainClassifier\r\n",
 "lrmodels = [TrainClassifier(model=lrm, labelCol=\"label\").fit(train) for lrm in logisticRegressions]"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
 },
 {
 "cell_type": "markdown",
- "metadata": {},
 "source": [
 "Find the model with the best AUC on the test set."
- ]
+ ],
+ "metadata": {}
 },
 {
 "cell_type": "code",
 "execution_count": null,
- "metadata": {},
- "outputs": [],
 "source": [
- "from mmlspark.automl import FindBestModel, BestModel\n",
- "bestModel = FindBestModel(evaluationMetric=\"AUC\", models=lrmodels).fit(test)\n",
- "bestModel.getRocCurve().show()\n",
- "bestModel.getBestModelMetrics().show()\n",
- "bestModel.getAllModelMetrics().show()\n"
- ]
+ "from mmlspark.automl import FindBestModel, BestModel\r\n",
+ "bestModel = FindBestModel(evaluationMetric=\"AUC\", models=lrmodels).fit(test)\r\n",
+ "bestModel.getRocCurve().show()\r\n",
+ "bestModel.getBestModelMetrics().show()\r\n",
+ "bestModel.getAllModelMetrics().show()\r\n"
+ ],
+ "outputs": [],
+ "metadata": {}
 },
 {
 "cell_type": "markdown",
- "metadata": {},
 "source": [
 "Use the optimized `ComputeModelStatistics` API to find the model accuracy."
- ]
+ ],
+ "metadata": {}
 },
 {
 "cell_type": "code",
 "execution_count": null,
- "metadata": {},
- "outputs": [],
 "source": [
- "from mmlspark.train import ComputeModelStatistics\n",
- "predictions = bestModel.transform(validation)\n",
- "metrics = ComputeModelStatistics().transform(predictions)\n",
- "print(\"Best model's accuracy on validation set = \"\n",
+ "from mmlspark.train import ComputeModelStatistics\r\n",
+ "predictions = bestModel.transform(validation)\r\n",
+ "metrics = ComputeModelStatistics().transform(predictions)\r\n",
+ "print(\"Best model's accuracy on validation set = \"\r\n",
 " + \"{0:.2f}%\".format(metrics.first()[\"accuracy\"] * 100))"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
 }
 ],
 "metadata": {
diff --git a/notebooks/Vowpal Wabbit - Overview.ipynb b/notebooks/Vowpal Wabbit - Overview.ipynb
index d9180b303f..0934c36ebc 100644
--- a/notebooks/Vowpal Wabbit - Overview.ipynb
+++ b/notebooks/Vowpal Wabbit - Overview.ipynb
@@ -82,6 +82,19 @@
 "cell_type": "markdown",
 "metadata": {}
 },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import os\n",
+ "\n",
+ "if os.environ.get(\"AZURE_SERVICE\", None) == \"Microsoft.ProjectArcadia\":\n",
+ " from pyspark.sql import SparkSession\n",
+ " spark = SparkSession.builder.getOrCreate()"
+ ]
+ },
 {
 "cell_type": "code",
 "execution_count": null,
diff --git a/pipeline.yaml b/pipeline.yaml
index 9239a62236..d9628745eb 100644
--- a/pipeline.yaml
+++ b/pipeline.yaml
@@ -113,7 +113,49 @@ jobs:
     inputs:
       azureSubscription: 'MMLSpark Build'
       scriptLocation: inlineScript
-      inlineScript: 'sbt "testOnly com.microsoft.ml.spark.nbtest.**"'
+      inlineScript: 'sbt "testOnly com.microsoft.ml.spark.nbtest.DatabricksTests"'
     condition: and(succeeded(), eq(variables.runTests, 'True'))
+  - task: PublishTestResults@2
+    displayName: 'Publish Test Results'
+    inputs:
+      testResultsFiles: '**/test-reports/TEST-*.xml'
+      failTaskOnFailedTests: true
+    condition: and(eq(variables.runTests, 'True'), succeededOrFailed())
+
+- job: E2E_Synapse
+  cancelTimeoutInMinutes: 0
+  pool:
+    vmImage: ubuntu-18.04
+  steps:
+  # - template: templates/ivy_cache.yml
+  - bash: echo "##vso[task.prependpath]$CONDA/bin"
+    displayName: Add conda to PATH
+  - bash: conda info
+  - bash: conda env create -f environment.yaml
+    displayName: Create Anaconda environment
+  - task: AzureKeyVault@1
+    inputs:
+      azureSubscription: 'MMLSpark Build'
+      keyVaultName: mmlspark-keys
+  - bash: |
+      source activate mmlspark
+      jupyter nbconvert --to script ./notebooks/*.ipynb*
+      sbt packagePython
+      sbt publishBlob
+    displayName: Publish Blob Artifacts
+    env:
+      STORAGE_KEY: $(storage-key)
+      NEXUS-UN: $(nexus-un)
+      NEXUS-PW: $(nexus-pw)
+      PGP-PRIVATE: $(pgp-private)
+      PGP-PUBLIC: $(pgp-public)
+      PGP-PW: $(pgp-pw)
+  - task: AzureCLI@1
+    displayName: 'E2E'
+    inputs:
+      azureSubscription: 'MMLSpark Build'
+      scriptLocation: inlineScript
+      inlineScript: 'sbt "testOnly com.microsoft.ml.spark.nbtest.SynapseTests"'
     condition: and(succeeded(), eq(variables.runTests, 'True'))
   - task: PublishTestResults@2
     displayName: 'Publish Test Results'
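The cell added to the notebooks above only creates a SparkSession explicitly when the AZURE_SERVICE environment variable reports Microsoft.ProjectArcadia, the value exposed on Azure Synapse (developed under the code name Project Arcadia); on Databricks the platform already injects a `spark` global, so no branch is needed there. A minimal, self-contained sketch of the same guard, with the trailing read included only as a usage illustration (the parquet path is the one the Amazon Book Reviews notebook already uses):

import os
from pyspark.sql import SparkSession

# Azure Synapse identifies itself via AZURE_SERVICE; only there do the
# notebooks build the session themselves. getOrCreate() is idempotent,
# so re-running the cell is harmless.
if os.environ.get("AZURE_SERVICE", None) == "Microsoft.ProjectArcadia":
    spark = SparkSession.builder.getOrCreate()

# Later cells can then rely on `spark` being defined, e.g.:
df = spark.read.parquet("wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet")
df.limit(10).toPandas()

The guard is inlined per notebook rather than shared through a helper module, presumably so that each .py script produced by the nbconvert step above stays self-contained when it is submitted to a Synapse Spark pool.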