diff --git a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/BingImageSearch.scala b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/BingImageSearch.scala index 776b84524d..60e2aa00a3 100644 --- a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/BingImageSearch.scala +++ b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/BingImageSearch.scala @@ -3,27 +3,25 @@ package com.microsoft.ml.spark.cognitive -import java.net.URL import com.microsoft.ml.spark.core.utils.AsyncUtils import com.microsoft.ml.spark.logging.BasicLogging import com.microsoft.ml.spark.stages.Lambda import org.apache.commons.io.IOUtils import org.apache.http.client.methods.{HttpGet, HttpRequestBase} import org.apache.http.entity.AbstractHttpEntity -import org.apache.spark.binary.ConfUtils import org.apache.spark.injections.UDFUtils import org.apache.spark.ml.ComplexParamsReadable import org.apache.spark.ml.param.ServiceParam import org.apache.spark.ml.util._ +import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.catalyst.expressions.GenericRow -import org.apache.spark.sql.functions.{col, explode, udf} +import org.apache.spark.sql.functions.{col, explode} import org.apache.spark.sql.types._ -import org.apache.spark.sql.{DataFrame, Row} +import spray.json.DefaultJsonProtocol._ +import java.net.URL import scala.concurrent.duration.Duration import scala.concurrent.{ExecutionContext, Future} -import spray.json.DefaultJsonProtocol._ object BingImageSearch extends ComplexParamsReadable[BingImageSearch] with Serializable { @@ -68,13 +66,15 @@ object BingImageSearch extends ComplexParamsReadable[BingImageSearch] with Seria class BingImageSearch(override val uid: String) extends CognitiveServicesBase(uid) - with HasCognitiveServiceInput with HasInternalJsonOutputParser with BasicLogging { + with HasCognitiveServiceInput with HasInternalJsonOutputParser with BasicLogging with HasSetLinkedService { logClass() override protected lazy val pyInternalWrapper = true def this() = this(Identifiable.randomUID("BingImageSearch")) + def urlPath: String = "/v7.0/images/search" + setDefault(url -> "https://api.bing.microsoft.com/v7.0/images/search") override def prepareMethod(): HttpRequestBase = new HttpGet() diff --git a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/CognitiveServiceBase.scala b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/CognitiveServiceBase.scala index 26f9efaa40..8cacdf8292 100644 --- a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/CognitiveServiceBase.scala +++ b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/CognitiveServiceBase.scala @@ -241,6 +241,19 @@ trait HasSetLinkedService extends Wrappable with HasURL with HasSubscriptionKey } } +trait HasSetLinkedServiceUsingLocation extends HasSetLinkedService with HasSetLocation { + override def setLinkedService(v: String): this.type = { + val classPath = "mssparkutils.cognitiveService" + val linkedServiceClass = ScalaClassLoader(getClass.getClassLoader).tryToLoadClass(classPath) + val locationMethod = linkedServiceClass.get.getMethod("getLocation", v.getClass) + val keyMethod = linkedServiceClass.get.getMethod("getKey", v.getClass) + val location = locationMethod.invoke(linkedServiceClass.get, v).toString + val key = keyMethod.invoke(linkedServiceClass.get, v).toString + setLocation(location) + setSubscriptionKey(key) + } +} + trait HasSetLocation extends Wrappable with HasURL with HasUrlPath { override def pyAdditionalMethods: 
String = super.pyAdditionalMethods + { """ @@ -277,6 +290,12 @@ abstract class CognitiveServicesBaseNoHandler(val uid: String) extends Transform assert(badColumns.isEmpty, s"Could not find dynamic columns: $badColumns in columns: ${schema.fieldNames.toSet}") + val missingRequiredParams = this.getRequiredParams.filter { + p => this.get(p).isEmpty && this.getDefault(p).isEmpty + } + assert(missingRequiredParams.isEmpty, + s"Missing required params: ${missingRequiredParams.map(s => s.name).mkString("(", ", ", ")")}") + val dynamicParamCols = getVectorParamMap.values.toList.map(col) match { case Nil => Seq(lit(false).alias("placeholder")) case l => l diff --git a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/ComputerVision.scala b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/ComputerVision.scala index 74f3a546ff..238242825a 100644 --- a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/ComputerVision.scala +++ b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/ComputerVision.scala @@ -312,8 +312,8 @@ class RecognizeText(override val uid: String) "printed text recognition is performed. If 'Handwritten' is specified," + " handwriting recognition is performed", { - case Left(_) => true - case Right(s) => Set("Printed", "Handwritten")(s) + case Left(s) => Set("Printed", "Handwritten")(s) + case Right(_) => true }, isURLParam = true) def getMode: String = getScalarParam(mode) @@ -361,8 +361,8 @@ class ReadImage(override val uid: String) " so only provide a language code if you would like to force the documented" + " to be processed as that specific language.", { - case Left(_) => true - case Right(s) => Set("en", "nl", "fr", "de", "it", "pt", "es")(s) + case Left(s) => Set("en", "nl", "fr", "de", "it", "pt", "es")(s) + case Right(_) => true }, isURLParam = true) def setLanguage(v: String): this.type = setScalarParam(language, v) diff --git a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/DocumentTranslator.scala b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/DocumentTranslator.scala index c3a23bd3e6..76ede8bb5a 100644 --- a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/DocumentTranslator.scala +++ b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/DocumentTranslator.scala @@ -20,6 +20,7 @@ import org.apache.spark.sql.{DataFrame, Dataset, Row} import spray.json._ import java.net.URI +import scala.reflect.internal.util.ScalaClassLoader trait DocumentTranslatorAsyncReply extends BasicAsyncReply { @@ -137,6 +138,17 @@ class DocumentTranslator(override val uid: String) extends CognitiveServicesBase ))).toJson.compactPrint, ContentType.APPLICATION_JSON)) } + override def setLinkedService(v: String): this.type = { + val classPath = "mssparkutils.cognitiveService" + val linkedServiceClass = ScalaClassLoader(getClass.getClassLoader).tryToLoadClass(classPath) + val nameMethod = linkedServiceClass.get.getMethod("getName", v.getClass) + val keyMethod = linkedServiceClass.get.getMethod("getKey", v.getClass) + val name = nameMethod.invoke(linkedServiceClass.get, v).toString + val key = keyMethod.invoke(linkedServiceClass.get, v).toString + setServiceName(name) + setSubscriptionKey(key) + } + override def setServiceName(v: String): this.type = { super.setServiceName(v) setUrl(s"https://$getServiceName.cognitiveservices.azure.com/" + urlPath) diff --git a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/FormRecognizer.scala b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/FormRecognizer.scala index 
cfc82209c2..4adf873660 100644 --- a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/FormRecognizer.scala +++ b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/FormRecognizer.scala @@ -71,8 +71,8 @@ trait HasModelID extends HasServiceParams { trait HasLocale extends HasServiceParams { val locale = new ServiceParam[String](this, "locale", "Locale of the receipt. Supported" + " locales: en-AU, en-CA, en-GB, en-IN, en-US.", { - case Left(_) => true - case Right(s) => Set("en-AU", "en-CA", "en-GB", "en-IN", "en-US")(s) + case Left(s) => Set("en-AU", "en-CA", "en-GB", "en-IN", "en-US")(s) + case Right(_) => true }, isURLParam = true) def setLocale(v: String): this.type = setScalarParam(locale, v) @@ -258,7 +258,7 @@ object ListCustomModels extends ComplexParamsReadable[ListCustomModels] class ListCustomModels(override val uid: String) extends CognitiveServicesBase(uid) with HasCognitiveServiceInput with HasInternalJsonOutputParser - with HasSetLocation with BasicLogging { + with HasSetLocation with HasSetLinkedService with BasicLogging { logClass() def this() = this(Identifiable.randomUID("ListCustomModels")) @@ -283,7 +283,7 @@ object GetCustomModel extends ComplexParamsReadable[GetCustomModel] class GetCustomModel(override val uid: String) extends CognitiveServicesBase(uid) with HasCognitiveServiceInput with HasInternalJsonOutputParser - with HasSetLocation with BasicLogging with HasModelID { + with HasSetLocation with HasSetLinkedService with BasicLogging with HasModelID { logClass() def this() = this(Identifiable.randomUID("GetCustomModel")) diff --git a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/SpeechToText.scala b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/SpeechToText.scala index b52c36583c..52706fba99 100644 --- a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/SpeechToText.scala +++ b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/SpeechToText.scala @@ -21,7 +21,7 @@ object SpeechToText extends ComplexParamsReadable[SpeechToText] with Serializabl class SpeechToText(override val uid: String) extends CognitiveServicesBase(uid) with HasCognitiveServiceInput with HasInternalJsonOutputParser with HasSetLocation with BasicLogging - with HasSetLinkedService { + with HasSetLinkedServiceUsingLocation { logClass() def this() = this(Identifiable.randomUID("SpeechToText")) diff --git a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/SpeechToTextSDK.scala b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/SpeechToTextSDK.scala index 8e261f5bfe..af86ef1f9d 100644 --- a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/SpeechToTextSDK.scala +++ b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/SpeechToTextSDK.scala @@ -77,7 +77,7 @@ private[ml] class BlockingQueueIterator[T](lbq: LinkedBlockingQueue[Option[T]], abstract class SpeechSDKBase extends Transformer with HasSetLocation with HasServiceParams with HasOutputCol with HasURL with HasSubscriptionKey with ComplexParamsWritable with BasicLogging - with HasSetLinkedService { + with HasSetLinkedServiceUsingLocation { type ResponseType <: SharedSpeechFields @@ -198,16 +198,6 @@ abstract class SpeechSDKBase extends Transformer def urlPath: String = "/sts/v1.0/issuetoken" - override def setLinkedService(v: String): this.type = { - val classPath = "mssparkutils.cognitiveService" - val linkedServiceClass = ScalaClassLoader(getClass.getClassLoader).tryToLoadClass(classPath) - val locationMethod = linkedServiceClass.get.getMethod("getLocation", 
v.getClass) - val keyMethod = linkedServiceClass.get.getMethod("getKey", v.getClass) - val location = locationMethod.invoke(linkedServiceClass.get, v).toString - val key = keyMethod.invoke(linkedServiceClass.get, v).toString - setLocation(location) - setSubscriptionKey(key) - } setDefault(language -> Left("en-us")) setDefault(profanity -> Left("Masked")) diff --git a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/TextAnalytics.scala b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/TextAnalytics.scala index c85f7780f8..ddbda40e49 100644 --- a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/TextAnalytics.scala +++ b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/TextAnalytics.scala @@ -95,6 +95,12 @@ abstract class TextAnalyticsBase(override val uid: String) extends CognitiveServ override protected def getInternalTransformer(schema: StructType): PipelineModel = { val dynamicParamColName = DatasetExtensions.findUnusedColumnName("dynamic", schema) + val missingRequiredParams = this.getRequiredParams.filter { + p => this.get(p).isEmpty && this.getDefault(p).isEmpty + } + assert(missingRequiredParams.isEmpty, + s"Missing required params: ${missingRequiredParams.map(s => s.name).mkString("(", ", ", ")")}") + def reshapeToArray(parameterName: String): Option[(Transformer, String, String)] = { val reshapedColName = DatasetExtensions.findUnusedColumnName(parameterName, schema) getVectorParamMap.get(parameterName).flatMap { @@ -296,6 +302,18 @@ class NER(override val uid: String) extends TextAnalyticsBase(uid) with BasicLog def urlPath: String = "/text/analytics/v3.0/entities/recognition/general" } +object PII extends ComplexParamsReadable[PII] + +class PII(override val uid: String) extends TextAnalyticsBase(uid) with BasicLogging { + logClass() + + def this() = this(Identifiable.randomUID("PII")) + + override def responseDataType: StructType = PIIResponseV3.schema + + def urlPath: String = "/text/analytics/v3.1/entities/recognition/pii" +} + object LanguageDetector extends ComplexParamsReadable[LanguageDetector] class LanguageDetector(override val uid: String) diff --git a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/TextAnalyticsSDK.scala b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/TextAnalyticsSDK.scala index d84d0dc299..71acdaf669 100644 --- a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/TextAnalyticsSDK.scala +++ b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/TextAnalyticsSDK.scala @@ -30,12 +30,33 @@ import scala.concurrent.duration.Duration import scala.concurrent.{ExecutionContext, Future} trait HasOptions extends HasServiceParams { - val options = new Param[TextAnalyticsRequestOptionsV4]( - this, name = "options", "text analytics request options") + val modelVersion = new Param[String]( + this, name = "modelVersion", "modelVersion option") - def getOptions: Option[TextAnalyticsRequestOptionsV4] = get(options) + def getModelVersion: Option[String] = get(modelVersion) - def setOptions(v: TextAnalyticsRequestOptionsV4): this.type = set(options, v) + def setModelVersion(v: String): this.type = set(modelVersion, v) + + val includeStatistics = new Param[Boolean]( + this, name = "includeStatistics", "includeStatistics option") + + def getIncludeStatistics: Option[Boolean] = get(includeStatistics) + + def setIncludeStatistics(v: Boolean): this.type = set(includeStatistics, v) + + val disableServiceLogs = new Param[Boolean]( + this, name = "disableServiceLogs", "disableServiceLogs option") + + def 
getDisableServiceLogs: Option[Boolean] = get(disableServiceLogs) + + def setDisableServiceLogs(v: Boolean): this.type = set(disableServiceLogs, v) + + val includeOpinionMining = new Param[Boolean]( + this, name = "includeOpinionMining", "includeOpinionMining option") + + def getIncludeOpinionMining: Option[Boolean] = get(includeOpinionMining) + + def setIncludeOpinionMining(v: Boolean): this.type = set(includeOpinionMining, v) } @@ -49,7 +70,6 @@ abstract class TextAnalyticsSDKBase[T]() val responseBinding: SparkBindings[TAResponseV4[T]] def invokeTextAnalytics(client: TextAnalyticsClient, - options: Option[TextAnalyticsRequestOptions], text: Seq[String], lang: Seq[String]): TAResponseV4[T] @@ -68,14 +88,9 @@ abstract class TextAnalyticsSDKBase[T]() .map(ct => Duration.fromNanos((ct * math.pow(10, 9)).toLong)) //scalastyle:ignore magic.number .getOrElse(Duration.Inf) - val requestOptions = get(options) match { - case Some(o) => Some(toSDK(o)) - case None => None - } - val futures = rows.map { row => Future { - val results = invokeTextAnalytics(client, requestOptions, getValue(row, text), getValue(row, language)) + val results = invokeTextAnalytics(client, getValue(row, text), getValue(row, language)) Row.fromSeq(row.toSeq ++ Seq(toRow(results))) // Adding a new column }(ExecutionContext.global) } @@ -153,14 +168,18 @@ class LanguageDetectionV4(override val uid: String) override val responseBinding: SparkBindings[TAResponseV4[DetectedLanguageV4]] = DetectLanguageResponseV4 override def invokeTextAnalytics(client: TextAnalyticsClient, - options: Option[TextAnalyticsRequestOptions], input: Seq[String], hints: Seq[String]): TAResponseV4[DetectedLanguageV4] = { val documents = (input, hints, input.indices).zipped.map { (doc, hint, i) => new DetectLanguageInput(i.toString, doc, hint) }.asJava - val response = client.detectLanguageBatchWithResponse(documents, options.orNull, Context.NONE).getValue + val options = new TextAnalyticsRequestOptions() + .setModelVersion(getModelVersion.getOrElse("latest")) + .setIncludeStatistics(getIncludeStatistics.getOrElse(false)) + .setServiceLogsDisabled(getDisableServiceLogs.getOrElse(false)) + + val response = client.detectLanguageBatchWithResponse(documents, options, Context.NONE).getValue toResponse(response.asScala, response.getModelVersion) } } @@ -176,14 +195,17 @@ class KeyphraseExtractionV4(override val uid: String) override val responseBinding: SparkBindings[TAResponseV4[KeyphraseV4]] = KeyPhraseResponseV4 override def invokeTextAnalytics(client: TextAnalyticsClient, - options: Option[TextAnalyticsRequestOptions], input: Seq[String], lang: Seq[String]): TAResponseV4[KeyphraseV4] = { val documents = (input, lang, lang.indices).zipped.map { (doc, lang, i) => new TextDocumentInput(i.toString, doc).setLanguage(lang) }.asJava + val options = new TextAnalyticsRequestOptions() + .setModelVersion(getModelVersion.getOrElse("latest")) + .setIncludeStatistics(getIncludeStatistics.getOrElse(false)) + .setServiceLogsDisabled(getDisableServiceLogs.getOrElse(false)) - val response = client.extractKeyPhrasesBatchWithResponse(documents, options.orNull, Context.NONE).getValue + val response = client.extractKeyPhrasesBatchWithResponse(documents, options, Context.NONE).getValue toResponse(response.asScala, response.getModelVersion) } } @@ -194,19 +216,25 @@ class TextSentimentV4(override val uid: String) extends TextAnalyticsSDKBase[SentimentScoredDocumentV4]() { logClass() - def this() = this(Identifiable.randomUID("KeyphraseExtractionV4")) + def this() = 
this(Identifiable.randomUID("TextSentimentV4")) override val responseBinding: SparkBindings[TAResponseV4[SentimentScoredDocumentV4]] = SentimentResponseV4 override def invokeTextAnalytics(client: TextAnalyticsClient, - options: Option[TextAnalyticsRequestOptions], input: Seq[String], lang: Seq[String]): TAResponseV4[SentimentScoredDocumentV4] = { + val documents = (input, lang, lang.indices).zipped.map { (doc, lang, i) => new TextDocumentInput(i.toString, doc).setLanguage(lang) }.asJava - val response = client.analyzeSentimentBatchWithResponse(documents, options.orNull, Context.NONE).getValue + val options = new AnalyzeSentimentOptions() + .setModelVersion(getModelVersion.getOrElse("latest")) + .setIncludeStatistics(getIncludeStatistics.getOrElse(false)) + .setServiceLogsDisabled(getDisableServiceLogs.getOrElse(false)) + .setIncludeOpinionMining(getIncludeOpinionMining.getOrElse(true)) + + val response = client.analyzeSentimentBatchWithResponse(documents, options, Context.NONE).getValue toResponse(response.asScala, response.getModelVersion) } } @@ -221,14 +249,18 @@ class PIIV4(override val uid: String) extends TextAnalyticsSDKBase[PIIEntityColl override val responseBinding: SparkBindings[TAResponseV4[PIIEntityCollectionV4]] = PIIResponseV4 override def invokeTextAnalytics(client: TextAnalyticsClient, - options: Option[TextAnalyticsRequestOptions], input: Seq[String], lang: Seq[String]): TAResponseV4[PIIEntityCollectionV4] = { val documents = (input, lang, lang.indices).zipped.map { (doc, lang, i) => new TextDocumentInput(i.toString, doc).setLanguage(lang) }.asJava - val response = client.recognizePiiEntitiesBatchWithResponse(documents, null, Context.NONE).getValue + val options = new RecognizePiiEntitiesOptions() + .setModelVersion(getModelVersion.getOrElse("latest")) + .setIncludeStatistics(getIncludeStatistics.getOrElse(false)) + .setServiceLogsDisabled(getDisableServiceLogs.getOrElse(false)) + + val response = client.recognizePiiEntitiesBatchWithResponse(documents, options, Context.NONE).getValue toResponse(response.asScala, response.getModelVersion) } } @@ -243,14 +275,17 @@ class HealthcareV4(override val uid: String) extends TextAnalyticsSDKBase[Health override val responseBinding: SparkBindings[TAResponseV4[HealthEntitiesResultV4]] = HealthcareResponseV4 override def invokeTextAnalytics(client: TextAnalyticsClient, - options: Option[TextAnalyticsRequestOptions], input: Seq[String], lang: Seq[String]): TAResponseV4[HealthEntitiesResultV4] = { val documents = (input, lang, lang.indices).zipped.map { (doc, lang, i) => new TextDocumentInput(i.toString, doc).setLanguage(lang) }.asJava + val options = new AnalyzeHealthcareEntitiesOptions() + .setModelVersion(getModelVersion.getOrElse("latest")) + .setIncludeStatistics(getIncludeStatistics.getOrElse(false)) + .setServiceLogsDisabled(getDisableServiceLogs.getOrElse(false)) - val poller = client.beginAnalyzeHealthcareEntities(documents, null, Context.NONE) + val poller = client.beginAnalyzeHealthcareEntities(documents, options, Context.NONE) poller.waitForCompletion() val pagedResults = poller.getFinalResult.asScala @@ -268,14 +303,18 @@ class EntityLinkingV4(override val uid: String) extends TextAnalyticsSDKBase[Lin override val responseBinding: SparkBindings[TAResponseV4[LinkedEntityCollectionV4]] = LinkedEntityResponseV4 override def invokeTextAnalytics(client: TextAnalyticsClient, - options: Option[TextAnalyticsRequestOptions], input: Seq[String], lang: Seq[String]): TAResponseV4[LinkedEntityCollectionV4] = { val documents = (input, 
lang, lang.indices).zipped.map { (doc, lang, i) => new TextDocumentInput(i.toString, doc).setLanguage(lang) }.asJava - val response = client.recognizeLinkedEntitiesBatchWithResponse(documents, options.orNull, Context.NONE).getValue + val options = new RecognizeLinkedEntitiesOptions() + .setModelVersion(getModelVersion.getOrElse("latest")) + .setIncludeStatistics(getIncludeStatistics.getOrElse(false)) + .setServiceLogsDisabled(getDisableServiceLogs.getOrElse(false)) + + val response = client.recognizeLinkedEntitiesBatchWithResponse(documents, options, Context.NONE).getValue toResponse(response.asScala, response.getModelVersion) } } @@ -290,14 +329,18 @@ class NERV4(override val uid: String) extends TextAnalyticsSDKBase[NERCollection override val responseBinding: SparkBindings[TAResponseV4[NERCollectionV4]] = NERResponseV4 override def invokeTextAnalytics(client: TextAnalyticsClient, - options: Option[TextAnalyticsRequestOptions], input: Seq[String], lang: Seq[String]): TAResponseV4[NERCollectionV4] = { val documents = (input, lang, lang.indices).zipped.map { (doc, lang, i) => new TextDocumentInput(i.toString, doc).setLanguage(lang) }.asJava - val response = client.recognizeEntitiesBatchWithResponse(documents, options.orNull, Context.NONE).getValue + val options = new RecognizeEntitiesOptions() + .setModelVersion(getModelVersion.getOrElse("latest")) + .setIncludeStatistics(getIncludeStatistics.getOrElse(false)) + .setServiceLogsDisabled(getDisableServiceLogs.getOrElse(false)) + + val response = client.recognizeEntitiesBatchWithResponse(documents, options, Context.NONE).getValue toResponse(response.asScala, response.getModelVersion) } } diff --git a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/TextAnalyticsSDKSchemasV4.scala b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/TextAnalyticsSDKSchemasV4.scala index 9e17dc0efd..286af23123 100644 --- a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/TextAnalyticsSDKSchemasV4.scala +++ b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/TextAnalyticsSDKSchemasV4.scala @@ -39,10 +39,6 @@ case class TAWarningV4(warningCode: String, message: String) case class TextDocumentInputs(id: String, text: String) -case class TextAnalyticsRequestOptionsV4(modelVersion: String, - includeStatistics: Boolean, - disableServiceLogs: Boolean) - case class KeyphraseV4(keyPhrases: Seq[String], warnings: Seq[TAWarningV4]) case class SentimentConfidenceScoreV4(negative: Double, neutral: Double, positive: Double) @@ -55,11 +51,11 @@ case class SentimentScoredDocumentV4(sentiment: String, case class SentimentSentenceV4(text: String, sentiment: String, confidenceScores: SentimentConfidenceScoreV4, - opinion: Option[Seq[OpinionV4]], + opinions: Option[Seq[OpinionV4]], offset: Int, length: Int) -case class OpinionV4(target: TargetV4, assessment: Seq[AssessmentV4]) +case class OpinionV4(target: TargetV4, assessments: Seq[AssessmentV4]) case class TargetV4(text: String, sentiment: String, @@ -336,13 +332,6 @@ object SDKConverters { entity.getEntities.getWarnings.asScala.toSeq.map(fromSDK)) } - def toSDK(textAnalyticsRequestOptionsV4: TextAnalyticsRequestOptionsV4): TextAnalyticsRequestOptions = { - new TextAnalyticsRequestOptions() - .setModelVersion(textAnalyticsRequestOptionsV4.modelVersion) - .setIncludeStatistics(textAnalyticsRequestOptionsV4.includeStatistics) - .setServiceLogsDisabled(textAnalyticsRequestOptionsV4.disableServiceLogs) - } - def unpackResult[T <: TextAnalyticsResult, U](result: T)(implicit converter: T => U): 
(Option[TAErrorV4], Option[DocumentStatistics], Option[U]) = { if (result.isError) { diff --git a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/TextAnalyticsSchemas.scala b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/TextAnalyticsSchemas.scala index e37b41ee62..5c0bc3a464 100644 --- a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/TextAnalyticsSchemas.scala +++ b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/TextAnalyticsSchemas.scala @@ -105,6 +105,22 @@ case class NEREntityV3(text: String, length: Integer, confidenceScore: Double) +// NER Pii Schemas + +object PIIResponseV3 extends SparkBindings[TAResponse[PIIDocV3]] + +case class PIIDocV3(id: String, + entities: Seq[PIIEntityV3], + warnings: Seq[TAWarning], + statistics: Option[DocumentStatistics]) + +case class PIIEntityV3(text: String, + category: String, + subcategory: Option[String] = None, + offset: Integer, + length: Integer, + confidenceScore: Double) + // KeyPhrase Schemas object KeyPhraseResponseV3 extends SparkBindings[TAResponse[KeyPhraseScoreV3]] diff --git a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/TextTranslator.scala b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/TextTranslator.scala index b6d2623b6f..2eb1f6adf6 100644 --- a/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/TextTranslator.scala +++ b/cognitive/src/main/scala/com/microsoft/ml/spark/cognitive/TextTranslator.scala @@ -3,14 +3,19 @@ package com.microsoft.ml.spark.cognitive +import com.microsoft.ml.spark.core.schema.DatasetExtensions +import com.microsoft.ml.spark.io.http.SimpleHTTPTransformer import com.microsoft.ml.spark.logging.BasicLogging -import org.apache.http.client.methods.{HttpEntityEnclosingRequestBase, HttpRequestBase} +import com.microsoft.ml.spark.stages.{DropColumns, Lambda, UDFTransformer} +import org.apache.http.client.methods.{HttpEntityEnclosingRequestBase, HttpPost, HttpRequestBase} import org.apache.http.entity.{AbstractHttpEntity, ContentType, StringEntity} -import org.apache.spark.ml.ComplexParamsReadable +import org.apache.spark.injections.UDFUtils +import org.apache.spark.ml.{ComplexParamsReadable, NamespaceInjections, PipelineModel, Transformer} import org.apache.spark.ml.param.ServiceParam import org.apache.spark.ml.util.Identifiable import org.apache.spark.sql.Row -import org.apache.spark.sql.types.{ArrayType, DataType, StructType} +import org.apache.spark.sql.functions.{array, col, lit, struct} +import org.apache.spark.sql.types.{ArrayType, DataType, StringType, StructType} import spray.json.DefaultJsonProtocol._ import spray.json._ @@ -38,6 +43,8 @@ trait HasTextInput extends HasServiceParams { def setText(v: Seq[String]): this.type = setScalarParam(text, v) + def setText(v: String): this.type = setScalarParam(text, Seq(v)) + def getTextCol: String = getVectorParam(text) def setTextCol(v: String): this.type = setVectorParam(text, v) @@ -72,79 +79,124 @@ trait HasToLanguage extends HasServiceParams { def getToLanguageCol: String = getVectorParam(toLanguage) } -trait TextAsOnlyEntity extends HasTextInput with HasCognitiveServiceInput { - - override protected def prepareEntity: Row => Option[AbstractHttpEntity] = { - r => - Some(new StringEntity( - getValueOpt(r, text) - .map(x => x.map(y => Map("Text" -> y))).toJson.compactPrint, ContentType.APPLICATION_JSON)) - } -} - -abstract class TextTranslatorBase(override val uid: String) extends CognitiveServicesBase(uid) - with HasInternalJsonOutputParser with HasCognitiveServiceInput with 
HasSubscriptionRegion - with HasSetLocation with HasSetLinkedService { - - protected val subscriptionRegionHeaderName = "Ocp-Apim-Subscription-Region" - - override protected def contentType: Row => String = { _ => "application/json; charset=UTF-8" } +trait TextAsOnlyEntity extends HasTextInput with HasCognitiveServiceInput with HasSubscriptionRegion { override protected def inputFunc(schema: StructType): Row => Option[HttpRequestBase] = { - val rowToUrl = prepareUrl - val rowToEntity = prepareEntity; { row: Row => if (shouldSkip(row)) { None + } else if (getValue(row, text).forall(Option(_).isEmpty)) { + None } else { - val req = prepareMethod() - req.setURI(new URI(rowToUrl(row))) - getValueOpt(row, subscriptionKey).foreach( - req.setHeader(subscriptionKeyHeaderName, _)) - getValueOpt(row, subscriptionRegion).foreach( - req.setHeader(subscriptionRegionHeaderName, _) - ) - req.setHeader("Content-Type", contentType(row)) - - req match { - case er: HttpEntityEnclosingRequestBase => - rowToEntity(row).foreach(er.setEntity) - case _ => + val urlParams: Array[ServiceParam[Any]] = + getUrlParams.asInstanceOf[Array[ServiceParam[Any]]] + + val texts = getValue(row, text) + + val base = getUrl + "?api-version=3.0" + val appended = if (!urlParams.isEmpty) { + "&" + URLEncodingUtils.format(urlParams.flatMap(p => + getValueOpt(row, p).map { + val pName = p.name match { + case "fromLanguage" => "from" + case "toLanguage" => "to" + case s => s + } + v => pName -> p.toValueString(v) + } + ).toMap) + } else { + "" } - Some(req) + + val post = new HttpPost(base + appended) + getValueOpt(row, subscriptionKey).foreach(post.setHeader("Ocp-Apim-Subscription-Key", _)) + getValueOpt(row, subscriptionRegion).foreach(post.setHeader("Ocp-Apim-Subscription-Region", _)) + post.setHeader("Content-Type", "application/json; charset=UTF-8") + + val json = texts.map(s => Map("Text" -> s)).toJson.compactPrint + post.setEntity(new StringEntity(json, "UTF-8")) + Some(post) } } } - override protected def prepareUrl: Row => String = { - val urlParams: Array[ServiceParam[Any]] = - getUrlParams.asInstanceOf[Array[ServiceParam[Any]]]; + override protected def prepareEntity: Row => Option[AbstractHttpEntity] = { _ => None } +} + +abstract class TextTranslatorBase(override val uid: String) extends CognitiveServicesBase(uid) + with HasInternalJsonOutputParser with HasSubscriptionRegion + with HasSetLocation with HasSetLinkedServiceUsingLocation { + - // This semicolon is needed to avoid argument confusion - def replaceName(s: String): String = { - if (s == "fromLanguage") { - "from" - } else if (s == "toLanguage") { - "to" - } else { - s + protected def reshapeColumns(schema: StructType, parameterNames: Seq[String]) + : Seq[(Transformer, String, String)] = { + + def reshapeToArray(parameterName: String): Option[(Transformer, String, String)] = { + val reshapedColName = DatasetExtensions.findUnusedColumnName(parameterName, schema) + getVectorParamMap.get(parameterName).flatMap { + case c if schema(c).dataType == StringType => + Some((Lambda(_.withColumn(reshapedColName, array(col(getVectorParam(parameterName))))), + getVectorParam(parameterName), + reshapedColName)) + case _ => None } } - { row: Row => - val base = getUrl + "?api-version=3.0" - val appended = if (!urlParams.isEmpty) { - "&" + URLEncodingUtils.format(urlParams.flatMap(p => - getValueOpt(row, p).map { - v => replaceName(p.name) -> p.toValueString(v) - } - ).toMap) - } else { - "" + + parameterNames.flatMap(x => reshapeToArray(x)) + } + + // noinspection ScalaStyle + 
protected def customGetInternalTransformer(schema: StructType, + parameterNames: Seq[String]): PipelineModel = { + val dynamicParamColName = DatasetExtensions.findUnusedColumnName("dynamic", schema) + + val missingRequiredParams = this.getRequiredParams.filter { + p => this.get(p).isEmpty && this.getDefault(p).isEmpty + } + assert(missingRequiredParams.isEmpty, + s"Missing required params: ${missingRequiredParams.map(s => s.name).mkString("(", ", ", ")")}") + + val reshapeCols = reshapeColumns(schema, parameterNames) + + val newColumnMapping = reshapeCols.map { + case (_, oldCol, newCol) => (oldCol, newCol) + }.toMap + + val columnsToGroup = getVectorParamMap.values.size match { + case 0 => getVectorParamMap.values.toList.map(col) match { + case Nil => Seq(lit(false).alias("placeholder")) + case l => l } - base + appended + case _ => getVectorParamMap.map { case (_, oldCol) => + val newCol = newColumnMapping.getOrElse(oldCol, oldCol) + col(newCol).alias(oldCol) + }.toSeq } + + val stages = reshapeCols.map(_._1).toArray ++ Array( + Lambda(_.withColumn( + dynamicParamColName, + struct(columnsToGroup: _*))), + new SimpleHTTPTransformer() + .setInputCol(dynamicParamColName) + .setOutputCol(getOutputCol) + .setInputParser(getInternalInputParser(schema)) + .setOutputParser(getInternalOutputParser(schema)) + .setHandler(getHandler) + .setConcurrency(getConcurrency) + .setConcurrentTimeout(get(concurrentTimeout)) + .setErrorCol(getErrorCol), + new DropColumns().setCols(Array( + dynamicParamColName) ++ newColumnMapping.values.toArray.asInstanceOf[Array[String]]) + ) + + NamespaceInjections.pipelineModel(stages) } + override protected def getInternalTransformer(schema: StructType): PipelineModel = + customGetInternalTransformer(schema, Seq("text")) + override def setLocation(v: String): this.type = { setSubscriptionRegion(v) setUrl("https://api.cognitive.microsofttranslator.com/" + urlPath) @@ -162,6 +214,51 @@ class Translate(override val uid: String) extends TextTranslatorBase(uid) def urlPath: String = "translate" + override protected def inputFunc(schema: StructType): Row => Option[HttpRequestBase] = { + { row: Row => + if (shouldSkip(row)) { + None + } else if (getValue(row, text).forall(Option(_).isEmpty)) { + None + } else if (getValue(row, toLanguage).forall(Option(_).isEmpty)) { + None + } else { + val urlParams: Array[ServiceParam[Any]] = + getUrlParams.asInstanceOf[Array[ServiceParam[Any]]] + + val texts = getValue(row, text) + + val base = getUrl + "?api-version=3.0" + val appended = if (!urlParams.isEmpty) { + "&" + URLEncodingUtils.format(urlParams.flatMap(p => + getValueOpt(row, p).map { + val pName = p.name match { + case "fromLanguage" => "from" + case "toLanguage" => "to" + case s => s + } + v => pName -> p.toValueString(v) + } + ).toMap) + } else { + "" + } + + val post = new HttpPost(base + appended) + getValueOpt(row, subscriptionKey).foreach(post.setHeader("Ocp-Apim-Subscription-Key", _)) + getValueOpt(row, subscriptionRegion).foreach(post.setHeader("Ocp-Apim-Subscription-Region", _)) + post.setHeader("Content-Type", "application/json; charset=UTF-8") + + val json = texts.map(s => Map("Text" -> s)).toJson.compactPrint + post.setEntity(new StringEntity(json, "UTF-8")) + Some(post) + } + } + } + + override protected def getInternalTransformer(schema: StructType): PipelineModel = + customGetInternalTransformer(schema, Seq("text", "toLanguage")) + val toLanguage = new ServiceParam[Seq[String]](this, "toLanguage", "Specifies the language of the output" + " text. 
The target language must be one of the supported languages included in the translation scope." + " For example, use to=de to translate to German. It's possible to translate to multiple languages simultaneously" + @@ -171,6 +268,8 @@ class Translate(override val uid: String) extends TextTranslatorBase(uid) def setToLanguage(v: Seq[String]): this.type = setScalarParam(toLanguage, v) + def setToLanguage(v: String): this.type = setScalarParam(toLanguage, Seq(v)) + def setToLanguageCol(v: String): this.type = setVectorParam(toLanguage, v) val fromLanguage = new ServiceParam[String](this, "fromLanguage", "Specifies the language of the input" + @@ -186,8 +285,8 @@ class Translate(override val uid: String) extends TextTranslatorBase(uid) val textType = new ServiceParam[String](this, "textType", "Defines whether the text being" + " translated is plain text or HTML text. Any HTML needs to be a well-formed, complete element. Possible values" + " are: plain (default) or html.", { - case Left(_) => true - case Right(s) => Set("plain", "html")(s) + case Left(s) => Set("plain", "html")(s) + case Right(_) => true }, isURLParam = true) def setTextType(v: String): this.type = setScalarParam(textType, v) @@ -206,8 +305,8 @@ class Translate(override val uid: String) extends TextTranslatorBase(uid) val profanityAction = new ServiceParam[String](this, "profanityAction", "Specifies how" + " profanities should be treated in translations. Possible values are: NoAction (default), Marked or Deleted. ", { - case Left(_) => true - case Right(s) => Set("NoAction", "Marked", "Deleted")(s) + case Left(s) => Set("NoAction", "Marked", "Deleted")(s) + case Right(_) => true }, isURLParam = true) def setProfanityAction(v: String): this.type = setScalarParam(profanityAction, v) @@ -216,8 +315,8 @@ class Translate(override val uid: String) extends TextTranslatorBase(uid) val profanityMarker = new ServiceParam[String](this, "profanityMarker", "Specifies how" + " profanities should be marked in translations. 
Possible values are: Asterisk (default) or Tag.", { - case Left(_) => true - case Right(s) => Set("Asterisk", "Tag")(s) + case Left(s) => Set("Asterisk", "Tag")(s) + case Right(_) => true }, isURLParam = true) def setProfanityMarker(v: String): this.type = setScalarParam(profanityMarker, v) @@ -378,6 +477,8 @@ trait HasTextAndTranslationInput extends HasServiceParams { def setTextAndTranslation(v: Seq[(String, String)]): this.type = setScalarParam(textAndTranslation, v) + def setTextAndTranslation(v: (String, String)): this.type = setScalarParam(textAndTranslation, Seq(v)) + def getTextAndTranslationCol: String = getVectorParam(textAndTranslation) def setTextAndTranslationCol(v: String): this.type = setVectorParam(textAndTranslation, v) @@ -387,20 +488,66 @@ trait HasTextAndTranslationInput extends HasServiceParams { object DictionaryExamples extends ComplexParamsReadable[DictionaryExamples] class DictionaryExamples(override val uid: String) extends TextTranslatorBase(uid) - with HasTextAndTranslationInput with HasFromLanguage with HasToLanguage with BasicLogging { + with HasTextAndTranslationInput with HasFromLanguage with HasToLanguage + with HasCognitiveServiceInput with BasicLogging { logClass() def this() = this(Identifiable.randomUID("DictionaryExamples")) def urlPath: String = "dictionary/examples" - override protected def prepareEntity: Row => Option[AbstractHttpEntity] = { - r => - Some(new StringEntity( - getValue(r, textAndTranslation).asInstanceOf[Seq[Row]] - .map(x => Map("Text" -> x.getString(0), "Translation" -> x.getString(1))) - .toJson.compactPrint, ContentType.APPLICATION_JSON)) + override protected def inputFunc(schema: StructType): Row => Option[HttpRequestBase] = { + { row: Row => + if (shouldSkip(row)) { + None + } else { + val urlParams: Array[ServiceParam[Any]] = + getUrlParams.asInstanceOf[Array[ServiceParam[Any]]] + + val textAndTranslations = getValue(row, textAndTranslation) + if (textAndTranslations.isEmpty) + None + else { + + val base = getUrl + "?api-version=3.0" + val appended = if (!urlParams.isEmpty) { + "&" + URLEncodingUtils.format(urlParams.flatMap(p => + getValueOpt(row, p).map { + val pName = p.name match { + case "fromLanguage" => "from" + case "toLanguage" => "to" + case s => s + } + v => pName -> p.toValueString(v) + } + ).toMap) + } else { + "" + } + + val post = new HttpPost(base + appended) + getValueOpt(row, subscriptionKey).foreach(post.setHeader("Ocp-Apim-Subscription-Key", _)) + getValueOpt(row, subscriptionRegion).foreach(post.setHeader("Ocp-Apim-Subscription-Region", _)) + post.setHeader("Content-Type", "application/json; charset=UTF-8") + + val json = textAndTranslations.head.getClass.getTypeName match { + case "scala.Tuple2" => textAndTranslations.map( + t => Map("Text" -> t._1, "Translation" -> t._2)).toJson.compactPrint + case _ => textAndTranslations.asInstanceOf[Seq[Row]].map( + s => Map("Text" -> s.getString(0), "Translation" -> s.getString(1))).toJson.compactPrint + } + + post.setEntity(new StringEntity(json, "UTF-8")) + Some(post) + } + } + } } + override protected def prepareEntity: Row => Option[AbstractHttpEntity] = { _ => None } + + override protected def getInternalTransformer(schema: StructType): PipelineModel = + customGetInternalTransformer(schema, Seq("textAndTranslation")) + override def responseDataType: DataType = ArrayType(DictionaryExamplesResponse.schema) } diff --git a/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/AnamolyDetectionSuite.scala 
b/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/AnamolyDetectionSuite.scala index 9da24d479a..d3d3b556f5 100644 --- a/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/AnamolyDetectionSuite.scala +++ b/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/AnamolyDetectionSuite.scala @@ -9,7 +9,7 @@ import com.microsoft.ml.spark.core.test.base.TestBase import com.microsoft.ml.spark.core.test.fuzzing.{TestObject, TransformerFuzzing} import org.apache.spark.ml.util.MLReadable import org.apache.spark.sql.{DataFrame, Row} -import org.apache.spark.sql.functions.{col, collect_list, lit, struct} +import org.apache.spark.sql.functions.{col, collect_list, lit, sort_array, struct} trait AnomalyKey { lazy val anomalyKey = sys.env.getOrElse("ANOMALY_API_KEY", Secrets.AnomalyApiKey) @@ -39,7 +39,7 @@ trait AnomalyDetectorSuiteBase extends TestBase with AnomalyKey { .withColumn("group", lit(1)) .withColumn("inputs", struct(col("timestamp"), col("value"))) .groupBy(col("group")) - .agg(collect_list(col("inputs")).alias("inputs")) + .agg(sort_array(collect_list(col("inputs"))).alias("inputs")) lazy val df2: DataFrame = Seq( ("2000-01-24T08:46:00Z", 826.0), @@ -61,7 +61,7 @@ trait AnomalyDetectorSuiteBase extends TestBase with AnomalyKey { .withColumn("group", lit(1)) .withColumn("inputs", struct(col("timestamp"), col("value"))) .groupBy(col("group")) - .agg(collect_list(col("inputs")).alias("inputs")) + .agg(sort_array(collect_list(col("inputs"))).alias("inputs")) } @@ -93,6 +93,20 @@ class DetectLastAnomalySuite extends TransformerFuzzing[DetectLastAnomaly] with assert(result.isAnomaly) } + test("Throw errors if required fields not set") { + val caught = intercept[AssertionError] { + new DetectLastAnomaly() + .setSubscriptionKey(anomalyKey) + .setLocation("westus2") + .setOutputCol("anomalies") + .setErrorCol("errors") + .transform(df).collect() + } + assert(caught.getMessage.contains("Missing required params")) + assert(caught.getMessage.contains("granularity")) + assert(caught.getMessage.contains("series")) + } + override def testObjects(): Seq[TestObject[DetectLastAnomaly]] = Seq(new TestObject(ad, df)) @@ -117,6 +131,19 @@ class DetectAnomaliesSuite extends TransformerFuzzing[DetectAnomalies] with Anom assert(result.isAnomaly.count({b => b}) == 2) } + test("Throw errors if required fields not set") { + val caught = intercept[AssertionError] { + new DetectAnomalies() + .setSubscriptionKey(anomalyKey) + .setLocation("westus2") + .setOutputCol("anomalies") + .transform(df).collect() + } + assert(caught.getMessage.contains("Missing required params")) + assert(caught.getMessage.contains("granularity")) + assert(caught.getMessage.contains("series")) + } + override def testObjects(): Seq[TestObject[DetectAnomalies]] = Seq(new TestObject(ad, df)) @@ -181,6 +208,19 @@ class SimpleDetectAnomaliesSuite extends TransformerFuzzing[SimpleDetectAnomalie .show(truncate=false) } + test("Throw errors if required fields not set") { + val caught = intercept[AssertionError] { + new SimpleDetectAnomalies() + .setSubscriptionKey(anomalyKey) + .setLocation("westus2") + .setOutputCol("anomalies") + .setGroupbyCol("group") + .transform(sdf).collect() + } + assert(caught.getMessage.contains("Missing required params")) + assert(caught.getMessage.contains("granularity")) + } + //TODO Nulls, different cardinalities override def testObjects(): Seq[TestObject[SimpleDetectAnomalies]] = diff --git a/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/ComputerVisionSuite.scala 
b/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/ComputerVisionSuite.scala index c0f3076565..acaf07db6c 100644 --- a/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/ComputerVisionSuite.scala +++ b/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/ComputerVisionSuite.scala @@ -493,6 +493,11 @@ class DescribeImageSuite extends TransformerFuzzing[DescribeImage] assert(tags("person") && tags("glasses")) } + override def assertDFEq(df1: DataFrame, df2: DataFrame)(implicit eq: Equality[DataFrame]): Unit = { + super.assertDFEq(df1.select("descriptions.description.tags", "descriptions.description.captions.text"), + df2.select("descriptions.description.tags", "descriptions.description.captions.text"))(eq) + } + override def testObjects(): Seq[TestObject[DescribeImage]] = Seq(new TestObject(t, df)) diff --git a/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/FaceSuite.scala b/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/FaceSuite.scala index c4e28aeea6..35dd109531 100644 --- a/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/FaceSuite.scala +++ b/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/FaceSuite.scala @@ -98,6 +98,18 @@ class FindSimilarFaceSuite extends TransformerFuzzing[FindSimilarFace] with Cogn assert(numMatches === List(1, 2, 2)) } + test("Throw errors if required fields not set") { + val caught = intercept[AssertionError] { + new FindSimilarFace() + .setSubscriptionKey(cognitiveKey) + .setLocation("eastus") + .setOutputCol("similar") + .transform(faceIdDF).collect() + } + assert(caught.getMessage.contains("Missing required params")) + assert(caught.getMessage.contains("faceId")) + } + override def testObjects(): Seq[TestObject[FindSimilarFace]] = Seq(new TestObject(findSimilar, faceIdDF)) @@ -147,6 +159,18 @@ class GroupFacesSuite extends TransformerFuzzing[GroupFaces] with CognitiveKey { assert(numMatches === List(2, 2, 2)) } + test("Throw errors if required fields not set") { + val caught = intercept[AssertionError] { + new GroupFaces() + .setSubscriptionKey(cognitiveKey) + .setLocation("eastus") + .setOutputCol("grouping") + .transform(faceIdDF).collect() + } + assert(caught.getMessage.contains("Missing required params")) + assert(caught.getMessage.contains("faceIds")) + } + override def testObjects(): Seq[TestObject[GroupFaces]] = Seq(new TestObject(group, faceIdDF)) @@ -229,6 +253,19 @@ class IdentifyFacesSuite extends TransformerFuzzing[IdentifyFaces] with Cognitiv assert(matches === List(satyaId, bradId, bradId)) } + test("Throw errors if required fields not set") { + val caught = intercept[AssertionError] { + new IdentifyFaces() + .setSubscriptionKey(cognitiveKey) + .setLocation("eastus") + .setPersonGroupId(pgId) + .setOutputCol("identified_faces") + .transform(df).collect() + } + assert(caught.getMessage.contains("Missing required params")) + assert(caught.getMessage.contains("faceIds")) + } + override def testObjects(): Seq[TestObject[IdentifyFaces]] = Seq(new TestObject(id, df)) override def reader: MLReadable[_] = IdentifyFaces diff --git a/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/FormRecognizerSuite.scala b/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/FormRecognizerSuite.scala index 4b97b43fee..6814ef2c37 100644 --- a/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/FormRecognizerSuite.scala +++ b/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/FormRecognizerSuite.scala @@ 
-535,6 +535,18 @@ class GetCustomModelSuite extends TransformerFuzzing[GetCustomModel] """"SALESPERSON","SERVICE ADDRESS:","SHIP TO:","SHIPPED VIA","TERMS","TOTAL","UNIT PRICE"]}}""").stripMargin) } + test("Throw errors if required fields not set") { + val caught = intercept[AssertionError] { + new GetCustomModel() + .setSubscriptionKey(cognitiveKey).setLocation("eastus") + .setIncludeKeys(true) + .setOutputCol("model") + .transform(pathDf).collect() + } + assert(caught.getMessage.contains("Missing required params")) + assert(caught.getMessage.contains("modelId")) + } + override def testObjects(): Seq[TestObject[GetCustomModel]] = Seq(new TestObject(getCustomModel, pathDf)) @@ -586,6 +598,17 @@ class AnalyzeCustomModelSuite extends TransformerFuzzing[AnalyzeCustomModel] assert(results.head.getString(2) === "") } + test("Throw errors if required fields not set") { + val caught = intercept[AssertionError] { + new AnalyzeCustomModel() + .setSubscriptionKey(cognitiveKey).setLocation("eastus") + .setImageUrlCol("source").setOutputCol("form") + .transform(imageDf4).collect() + } + assert(caught.getMessage.contains("Missing required params")) + assert(caught.getMessage.contains("modelId")) + } + override def testObjects(): Seq[TestObject[AnalyzeCustomModel]] = Seq(new TestObject(analyzeCustomModel, imageDf4)) diff --git a/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/ImageSearchSuite.scala b/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/ImageSearchSuite.scala index d599b67a69..77829f15c5 100644 --- a/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/ImageSearchSuite.scala +++ b/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/ImageSearchSuite.scala @@ -103,6 +103,20 @@ class ImageSearchSuite extends TransformerFuzzing[BingImageSearch] assert(ddf.collect().head.getAs[Row]("images") != null) } + test("Throw errors if required fields not set") { + val caught = intercept[AssertionError] { + new BingImageSearch() + .setSubscriptionKey(searchKey) + .setOffsetCol("offsets") + .setCount(10) + .setImageType("photo") + .setOutputCol("images") + .transform(requestParameters).collect() + } + assert(caught.getMessage.contains("Missing required params")) + assert(caught.getMessage.contains("q")) + } + override lazy val dfEq: Equality[DataFrame] = new Equality[DataFrame] { def areEqual(a: DataFrame, b: Any): Boolean = (a.schema === b.asInstanceOf[DataFrame].schema) && diff --git a/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/TextAnalyticsSDKSuite.scala b/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/TextAnalyticsSDKSuite.scala index cb41bd5c68..39d625bbf3 100644 --- a/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/TextAnalyticsSDKSuite.scala +++ b/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/TextAnalyticsSDKSuite.scala @@ -164,10 +164,9 @@ class LanguageDetectionSuiteV4 extends TestBase with DataFrameEquality with Text test("Language Detection - Overriding request options and including statistics") { val replies = getDetector - .setOptions(TextAnalyticsRequestOptionsV4( - modelVersion = "latest", - includeStatistics = true, - disableServiceLogs = false)) + .setModelVersion("latest") + .setIncludeStatistics(true) + .setDisableServiceLogs(false) .transform(df) .select("output.statistics") .collect() @@ -180,10 +179,9 @@ class LanguageDetectionSuiteV4 extends TestBase with DataFrameEquality with Text val caught = intercept[SparkException] { getDetector - 
.setOptions(TextAnalyticsRequestOptionsV4( - modelVersion = "oopsie doopsie", - includeStatistics = false, - disableServiceLogs = false)) + .setModelVersion("invalid model") + .setIncludeStatistics(true) + .setDisableServiceLogs(false) .transform(df) .collect() } @@ -193,10 +191,9 @@ class LanguageDetectionSuiteV4 extends TestBase with DataFrameEquality with Text test("Language Detection - Disable logs") { val replies = getDetector - .setOptions(TextAnalyticsRequestOptionsV4( - modelVersion = "latest", - includeStatistics = false, - disableServiceLogs = true)) + .setModelVersion("latest") + .setIncludeStatistics(false) + .setDisableServiceLogs(true) .transform(df) .select("output.result") .collect() @@ -244,6 +241,27 @@ class SentimentAnalysisSuiteV4 extends TestBase with DataFrameEquality with Text .setTextCol("text") .setOutputCol("output") + test("Sentiment Analysis - Include Opinion Mining") { + val replies = getDetector + .setIncludeOpinionMining(true) + .transform(batchedDF) + .select("output") + .collect() + assert(replies(0).schema(0).name == "output") + df.printSchema() + df.show() + val fromRow = SentimentResponseV4.makeFromRowConverter + + val outResponse = fromRow(replies(0).getAs[GenericRowWithSchema]("output")) + + val opinions = outResponse.result.head.get.sentences.head.opinions + + assert(opinions != null) + + assert(opinions.get.head.target.text == "rain") + assert(opinions.get.head.target.sentiment == "negative") + } + test("Sentiment Analysis - Output Assertion") { val replies = getDetector.transform(batchedDF) .select("output") @@ -313,6 +331,17 @@ class SentimentAnalysisSuiteV4 extends TestBase with DataFrameEquality with Text assert(codes(0).get(0).toString == "InvalidDocument") } + test("Sentiment Analysis - Opinion Mining") { + val replies = getDetector.transform(invalidDocDf) + .select("output.error.errorMessage", "output.error.errorCode") + .collect() + val errors = replies.map(row => row.getList(0)) + val codes = replies.map(row => row.getList(1)) + + assert(errors(0).get(0).toString == "Document text is empty.") + assert(codes(0).get(0).toString == "InvalidDocument") + } + test("Sentiment Analysis - Assert Confidence Score") { val replies = getDetector.transform(batchedDF) .select("output") @@ -531,8 +560,6 @@ class HealthcareSuiteV4 extends TestBase with DataFrameEquality with TextKey { ("en", "6-drops of Vitamin B-12 every evening") ).toDF("lang", "text") - val options: TextAnalyticsRequestOptionsV4 = new TextAnalyticsRequestOptionsV4("", true, false) - lazy val extractor: HealthcareV4 = new HealthcareV4() .setSubscriptionKey(textKey) .setLocation("eastus") diff --git a/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/TextAnalyticsSuite.scala b/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/TextAnalyticsSuite.scala index 23c84df185..c700910ee9 100644 --- a/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/TextAnalyticsSuite.scala +++ b/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/TextAnalyticsSuite.scala @@ -310,7 +310,7 @@ class KeyPhraseExtractorV3Suite extends TransformerFuzzing[KeyPhraseExtractor] w println(results) assert(results(0).getSeq[String](0).toSet === Set("Hello world", "input text")) - assert(results(2).getSeq[String](0).toSet === Set("mucho tráfico", "carretera", "ayer")) + assert(results(2).getSeq[String](0).toSet === Set("mucho tráfico", "día", "carretera", "ayer")) } override def testObjects(): Seq[TestObject[KeyPhraseExtractor]] = @@ -396,3 +396,44 @@ class NERSuiteV3 
extends TransformerFuzzing[NER] with TextKey { override def reader: MLReadable[_] = NER } + +class PIISuiteV3 extends TransformerFuzzing[PII] with TextKey { + import spark.implicits._ + + lazy val df: DataFrame = Seq( + ("1", "en", "My SSN is 859-98-0987"), + ("2", "en", + "Your ABA number - 111000025 - is the first 9 digits in the lower left hand corner of your personal check."), + ("3", "en", "Is 998.214.865-68 your Brazilian CPF number?") + ).toDF("id", "language", "text") + + lazy val n: PII = new PII() + .setSubscriptionKey(textKey) + .setLocation("eastus") + .setLanguage("en") + .setOutputCol("response") + + test("Basic Usage") { + val results = n.transform(df) + val matches = results.withColumn("match", + col("response") + .getItem(0) + .getItem("entities") + .getItem(0)) + .select("match") + + val testRow = matches.collect().head(0).asInstanceOf[GenericRowWithSchema] + + assert(testRow.getAs[String]("text") === "859-98-0987") + assert(testRow.getAs[Int]("offset") === 10) + assert(testRow.getAs[Int]("length") === 11) + assert(testRow.getAs[Double]("confidenceScore") > 0.6) + assert(testRow.getAs[String]("category") === "USSocialSecurityNumber") + + } + + override def testObjects(): Seq[TestObject[PII]] = + Seq(new TestObject[PII](n, df)) + + override def reader: MLReadable[_] = PII +} diff --git a/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/TranslatorSuite.scala b/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/TranslatorSuite.scala index 63f6c8fdae..29e780ac7f 100644 --- a/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/TranslatorSuite.scala +++ b/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split1/TranslatorSuite.scala @@ -10,7 +10,6 @@ import com.microsoft.ml.spark.core.test.fuzzing.{TestObject, TransformerFuzzing} import org.apache.spark.ml.util.MLReadable import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions.{col, flatten} -import org.scalactic.Equality trait TranslatorKey { lazy val translatorKey: String = sys.env.getOrElse("TRANSLATOR_KEY", Secrets.TranslatorKey) @@ -24,7 +23,7 @@ trait TranslatorUtils extends TestBase { lazy val textDf1: DataFrame = Seq(List("Hello, what is your name?")).toDF("text") - lazy val textDf2: DataFrame = Seq(List("Hello, what is your name?", "Bye")).toDF("text") + lazy val textDf2: DataFrame = Seq(List("Hello, what is your name?", "Bye")).toDF("text") lazy val textDf3: DataFrame = Seq(List("This is bullshit.")).toDF("text") @@ -34,6 +33,12 @@ trait TranslatorUtils extends TestBase { lazy val textDf5: DataFrame = Seq(List("The word word " + "or phrase is a dictionary entry.")).toDF("text") + lazy val textDf6: DataFrame = Seq(("Hi, this is Synapse!", "zh-Hans"), + (null, "zh-Hans"), ("test", null)) + .toDF("text", "language") + + lazy val emptyDf: DataFrame = Seq("").toDF() + } class TranslateSuite extends TransformerFuzzing[Translate] @@ -46,24 +51,47 @@ class TranslateSuite extends TransformerFuzzing[Translate] .setOutputCol("translation") .setConcurrency(5) - def translationTextTest(translator: Translate, - df: DataFrame, - expectString: String): Boolean = { - val results = translator + def getTranslationTextResult(translator: Translate, + df: DataFrame): DataFrame = { + translator .transform(df) .withColumn("translation", flatten(col("translation.translations"))) .withColumn("translation", col("translation.text")) - .select("translation").collect() - val headStr = results.head.getSeq(0).mkString("\n") - headStr === expectString + .select("translation") } 
test("Translate multiple pieces of text with language autodetection") { - assert( - translationTextTest( - translate.setToLanguage(Seq("zh-Hans")), textDf2, "你好,你叫什么名字?\n再见" - ) - ) + val result1 = getTranslationTextResult(translate.setToLanguage(Seq("zh-Hans")), textDf2).collect() + assert(result1(0).getSeq(0).mkString("\n") == "你好,你叫什么名字?\n再见") + + val translate1: Translate = new Translate() + .setSubscriptionKey(translatorKey) + .setLocation("eastus") + .setText("Hi, this is Synapse!") + .setOutputCol("translation") + .setConcurrency(5) + val result3 = getTranslationTextResult(translate1.setToLanguage("zh-Hans"), emptyDf).collect() + assert(result3(0).getSeq(0).mkString("\n") == "嗨, 这是突触!") + + val translate2: Translate = new Translate() + .setSubscriptionKey(translatorKey) + .setLocation("eastus") + .setTextCol("text") + .setToLanguageCol("language") + .setOutputCol("translation") + .setConcurrency(5) + val result4 = getTranslationTextResult(translate2, textDf6).collect() + assert(result4(0).getSeq(0).mkString("") == "嗨, 这是突触!") + assert(result4(1).get(0) == null) + assert(result4(2).get(0) == null) + } + + test("Throw errors if required fields not set") { + val caught = intercept[AssertionError] { + translate.transform(textDf2).collect() + } + assert(caught.getMessage.contains("Missing required params")) + assert(caught.getMessage.contains("toLanguage")) } test("Translate with transliteration") { @@ -80,31 +108,22 @@ class TranslateSuite extends TransformerFuzzing[Translate] } test("Translate to multiple languages") { - assert( - translationTextTest( - translate.setToLanguage(Seq("zh-Hans", "de")), textDf1, "你好,你叫什么名字?\nHallo, wie heißt du?" - ) - ) + val result1 = getTranslationTextResult(translate.setToLanguage(Seq("zh-Hans", "de")), textDf1).collect() + assert(result1(0).getSeq(0).mkString("\n") == "你好,你叫什么名字?\nHallo, wie heißt du?") } test("Handle profanity") { - assert( - translationTextTest( - translate.setFromLanguage("en").setToLanguage(Seq("de")).setProfanityAction("Marked"), - textDf3, - "Das ist ***." // problem with Rest API "freaking" -> the marker disappears *** no difference - ) - ) + val result1 = getTranslationTextResult( + translate.setFromLanguage("en").setToLanguage(Seq("de")).setProfanityAction("Marked"), textDf3).collect() + assert(result1(0).getSeq(0).mkString("\n") == "Das ist ***.") + // problem with Rest API "freaking" -> the marker disappears *** no difference } test("Translate content with markup and decide what's translated") { - assert( - translationTextTest( - translate.setFromLanguage("en").setToLanguage(Seq("zh-Hans")).setTextType("html"), - textDf4, - "
This will not be translated.
这将被翻译。
" - ) - ) + val result1 = getTranslationTextResult( + translate.setFromLanguage("en").setToLanguage(Seq("zh-Hans")).setTextType("html"), textDf4).collect() + assert(result1(0).getSeq(0).mkString("\n") == + "
This will not be translated.
这将被翻译。
") } test("Obtain alignment information") { @@ -138,15 +157,12 @@ class TranslateSuite extends TransformerFuzzing[Translate] } test("Translate with dynamic dictionary") { - assert( - translationTextTest( - translate.setToLanguage(Seq("de")), textDf5, "Das Wort wordomatic ist ein Wörterbucheintrag." - ) - ) + val result1 = getTranslationTextResult(translate.setToLanguage(Seq("de")), textDf5).collect() + assert(result1(0).getSeq(0).mkString("\n") == "Das Wort wordomatic ist ein Wörterbucheintrag.") } override def testObjects(): Seq[TestObject[Translate]] = - Seq(new TestObject(translate, textDf1)) + Seq(new TestObject(translate.setToLanguage(Seq("zh-Hans")), textDf1)) override def reader: MLReadable[_] = Translate } @@ -176,6 +192,20 @@ class TransliterateSuite extends TransformerFuzzing[Transliterate] assert(results.head.getSeq(1).mkString("\n") === "Latn\nLatn") } + test("Throw errors if required fields not set") { + val caught = intercept[AssertionError] { + new Transliterate() + .setSubscriptionKey(translatorKey) + .setLocation("eastus") + .setTextCol("text") + .transform(textDf2).collect() + } + assert(caught.getMessage.contains("Missing required params")) + assert(caught.getMessage.contains("language")) + assert(caught.getMessage.contains("fromScript")) + assert(caught.getMessage.contains("toScript")) + } + override def testObjects(): Seq[TestObject[Transliterate]] = Seq(new TestObject(transliterate, transDf)) @@ -252,6 +282,19 @@ class DictionaryLookupSuite extends TransformerFuzzing[DictionaryLookup] assert(headStr === "volar\nmosca\noperan\npilotar\nmoscas\nmarcha") } + test("Throw errors if required fields not set") { + val caught = intercept[AssertionError] { + new DictionaryLookup() + .setSubscriptionKey(translatorKey) + .setLocation("eastus") + .setTextCol("text") + .transform(textDf2).collect() + } + assert(caught.getMessage.contains("Missing required params")) + assert(caught.getMessage.contains("fromLanguage")) + assert(caught.getMessage.contains("toLanguage")) + } + override def testObjects(): Seq[TestObject[DictionaryLookup]] = Seq(new TestObject(dictionaryLookup, dictDf)) @@ -270,21 +313,47 @@ class DictionaryExamplesSuite extends TransformerFuzzing[DictionaryExamples] .setLocation("eastus") .setFromLanguage("en") .setToLanguage("es") - .setTextAndTranslationCol("textAndTranslation") .setOutputCol("result") - test("Dictionary Examples") { - val results = dictionaryExamples.transform(dictDf) + def dictionaryExamplesTest(dictExamples: DictionaryExamples, + df: DataFrame): DataFrame = { + dictExamples + .transform(df) .withColumn("examples", flatten(col("result.examples"))) .withColumn("sourceTerm", col("examples.sourceTerm")) .withColumn("targetTerm", col("examples.targetTerm")) - .select("sourceTerm", "targetTerm").collect() - assert(results.head.getSeq(0).head.toString === "fly") - assert(results.head.getSeq(1).head.toString === "volar") + .select("sourceTerm", "targetTerm") + } + + test("Dictionary Examples") { + val result1 = dictionaryExamplesTest(dictionaryExamples + .setTextAndTranslationCol("textAndTranslation"), dictDf) + .collect() + assert(result1.head.getSeq(0).head.toString === "fly") + assert(result1.head.getSeq(1).head.toString === "volar") + + val result2 = dictionaryExamplesTest(dictionaryExamples + .setTextAndTranslation(("fly", "volar")), emptyDf) + .collect() + + assert(result2.head.getSeq(0).head.toString === "fly") + assert(result2.head.getSeq(1).head.toString === "volar") + } + + test("Throw errors if required fields not set") { + val caught = 
intercept[AssertionError] { + new DictionaryExamples() + .setSubscriptionKey(translatorKey) + .setLocation("eastus") + .transform(dictDf).collect() + } + assert(caught.getMessage.contains("Missing required params")) + assert(caught.getMessage.contains("fromLanguage")) + assert(caught.getMessage.contains("toLanguage")) } override def testObjects(): Seq[TestObject[DictionaryExamples]] = - Seq(new TestObject(dictionaryExamples, dictDf)) + Seq(new TestObject(dictionaryExamples.setTextAndTranslationCol("textAndTranslation"), dictDf)) override def reader: MLReadable[_] = DictionaryExamples } diff --git a/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split2/SpeechToTextSuite.scala b/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split2/SpeechToTextSuite.scala index 46a3893021..65949bf096 100644 --- a/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split2/SpeechToTextSuite.scala +++ b/cognitive/src/test/scala/com/microsoft/ml/spark/cognitive/split2/SpeechToTextSuite.scala @@ -72,6 +72,19 @@ class SpeechToTextSuite extends TransformerFuzzing[SpeechToText] result.NBest.get.head.Display.contains("this is a test") } + test("Throw errors if required fields not set") { + val caught = intercept[AssertionError] { + new SpeechToText() + .setSubscriptionKey(cognitiveKey) + .setLocation(region) + .setOutputCol("text") + .transform(df).collect() + } + assert(caught.getMessage.contains("Missing required params")) + assert(caught.getMessage.contains("audioData")) + assert(caught.getMessage.contains("language")) + } + override def testObjects(): Seq[TestObject[SpeechToText]] = Seq(new TestObject(stt, df)) diff --git a/notebooks/Cognitive Services - Overview.ipynb b/notebooks/CognitiveServices - Overview.ipynb similarity index 100% rename from notebooks/Cognitive Services - Overview.ipynb rename to notebooks/CognitiveServices - Overview.ipynb diff --git a/notebooks/CognitiveServices - Predictive Maintenance.ipynb b/notebooks/CognitiveServices - Predictive Maintenance.ipynb new file mode 100644 index 0000000000..1c7651c0fb --- /dev/null +++ b/notebooks/CognitiveServices - Predictive Maintenance.ipynb @@ -0,0 +1,412 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "source": [ + "# Recipe: Predictive maintenance with the Cognitive Services for Big Data\n", + "\n", + "This recipe shows how you can use Azure Synapse Analytics and Cognitive Services on Apache Spark for predictive maintenance of IoT devices. We'll follow along with the [CosmosDB and Synapse Link](https://github.com/Azure-Samples/cosmosdb-synapse-link-samples) sample. To keep things simple, in this recipe we'll read the data straight from a CSV file rather than getting streamed data through CosmosDB and Synapse Link. We strongly encourage you to look over the Synapse Link sample.\n", + "\n", + "## Hypothetical scenario\n", + "\n", + "The hypothetical scenario is a power plant where IoT devices monitor [steam turbines](https://en.wikipedia.org/wiki/Steam_turbine). The IoTSignals collection has Revolutions per minute (RPM) and Megawatts (MW) data for each turbine, and the turbine signals are analyzed to detect anomalous readings.\n", + "\n", + "Outliers can appear in the data at random intervals. In those situations, RPM values go up and MW output goes down, for circuit protection.
The idea is to catch moments when both signals vary at the same time but in different directions.\n", + "\n", + "## Prerequisites\n", + "\n", + "* An Azure subscription - [Create one for free](https://azure.microsoft.com/free/cognitive-services)\n", + "* [Azure Synapse workspace](../../../synapse-analytics/quickstart-create-workspace.md) configured with a [serverless Apache Spark pool](../../../synapse-analytics/quickstart-create-apache-spark-pool-portal.md)\n", + "\n", + "## Setup\n", + "\n", + "### Create an Anomaly Detector resource\n", + "\n", + "Azure Cognitive Services are represented by Azure resources that you subscribe to. Create a resource for Anomaly Detector using the [Azure portal](../../cognitive-services-apis-create-account.md) or [Azure CLI](../../cognitive-services-apis-create-account-cli.md). You can also:\n", + "\n", + "- View an existing resource in the [Azure portal](https://portal.azure.com/).\n", + "\n", + "Make note of the endpoint and the key for this resource; you'll need them in this guide." + ], + "metadata": { + "application/vnd.databricks.v1+cell": { + "title": "", + "showTitle": false, + "inputWidgets": {}, + "nuid": "912ab522-ac44-4301-802d-9fc01dc33112" + } + } + }, + { + "cell_type": "markdown", + "source": [ + "## Enter your service keys\n\nLet's start by adding your key and location." + ], + "metadata": { + "application/vnd.databricks.v1+cell": { + "title": "", + "showTitle": false, + "inputWidgets": {}, + "nuid": "df7b3b14-9abb-459d-91f2-7265f1c5969a" + } + } + }, + { + "cell_type": "code", + "source": [ + "import os\n\nservice_key = os.environ[\"ANOMALY_API_KEY\"] # Paste your anomaly detector key here\nlocation = \"westus2\" # Paste your anomaly detector location here\n\nassert (service_key is not None)" + ], + "metadata": { + "application/vnd.databricks.v1+cell": { + "title": "", + "showTitle": false, + "inputWidgets": {}, + "nuid": "84a76c12-a8da-4c9f-8eaf-f675ef1fab7e" + } + }, + "outputs": [ + { + "output_type": "display_data", + "metadata": { + "application/vnd.databricks.v1+output": { + "data": "", + "errorSummary": "", + "metadata": {}, + "errorTraceType": null, + "type": "ipynbError", + "arguments": {} + } + }, + "data": { + "text/html": [ + "" + ] + } + } + ], + "execution_count": 0 + }, + { + "cell_type": "markdown", + "source": [ + "## Read data into a DataFrame\n\nNext, let's read the IoTSignals file into a DataFrame. Open a new notebook in your Synapse workspace and create a DataFrame from the file."
+ ], + "metadata": { + "application/vnd.databricks.v1+cell": { + "title": "", + "showTitle": false, + "inputWidgets": {}, + "nuid": "f341d984-88bf-4c17-a4c5-b9473db630bb" + } + } + }, + { + "cell_type": "code", + "source": [ + "df_signals = spark.read.csv(\"wasbs://publicwasb@mmlspark.blob.core.windows.net/iot/IoTSignals.csv\", header=True, inferSchema=True)" + ], + "metadata": { + "application/vnd.databricks.v1+cell": { + "title": "", + "showTitle": false, + "inputWidgets": {}, + "nuid": "2a0632b8-69b0-4446-b0b2-5e507d4d2fb4" + } + }, + "outputs": [ + { + "output_type": "display_data", + "metadata": { + "application/vnd.databricks.v1+output": { + "data": "", + "errorSummary": "", + "metadata": {}, + "errorTraceType": null, + "type": "ipynbError", + "arguments": {} + } + }, + "data": { + "text/html": [ + "" + ] + } + } + ], + "execution_count": 0 + }, + { + "cell_type": "markdown", + "source": [ + "### Run anomaly detection using Cognitive Services on Spark\n\nThe goal is to find instances where the signals from the IoT devices were outputting anomalous values so that we can see when something is going wrong and do predictive maintenance. To do that, let's use Anomaly Detector on Spark:" + ], + "metadata": { + "application/vnd.databricks.v1+cell": { + "title": "", + "showTitle": false, + "inputWidgets": {}, + "nuid": "65b25fe2-2cb7-454b-85d8-355a42e2023b" + } + } + }, + { + "cell_type": "code", + "source": [ + "from pyspark.sql.functions import col, struct\nfrom mmlspark.cognitive import SimpleDetectAnomalies\nfrom mmlspark.core.spark import FluentAPI\n\ndetector = (SimpleDetectAnomalies()\n .setSubscriptionKey(service_key)\n .setLocation(location)\n .setOutputCol(\"anomalies\")\n .setGroupbyCol(\"grouping\")\n .setSensitivity(95)\n .setGranularity(\"secondly\"))\n\ndf_anomaly = (df_signals\n .where(col(\"unitSymbol\") == 'RPM')\n .withColumn(\"timestamp\", col(\"dateTime\").cast(\"string\"))\n .withColumn(\"value\", col(\"measureValue\").cast(\"double\"))\n .withColumn(\"grouping\", struct(\"deviceId\"))\n .mlTransform(detector)).cache()\n\ndf_anomaly.createOrReplaceTempView('df_anomaly')" + ], + "metadata": { + "application/vnd.databricks.v1+cell": { + "title": "", + "showTitle": false, + "inputWidgets": {}, + "nuid": "c3155f67-5c30-4de8-b503-3a7451712945" + } + }, + "outputs": [ + { + "output_type": "display_data", + "metadata": { + "application/vnd.databricks.v1+output": { + "data": "", + "errorSummary": "", + "metadata": {}, + "errorTraceType": null, + "type": "ipynbError", + "arguments": {} + } + }, + "data": { + "text/html": [ + "" + ] + } + } + ], + "execution_count": 0 + }, + { + "cell_type": "markdown", + "source": [ + "Let's take a look at the data:" + ], + "metadata": { + "application/vnd.databricks.v1+cell": { + "title": "", + "showTitle": false, + "inputWidgets": {}, + "nuid": "694c601e-fd19-4de1-a358-9759123fe1c0" + } + } + }, + { + "cell_type": "code", + "source": [ + "df_anomaly.select(\"timestamp\",\"value\",\"deviceId\",\"anomalies.isAnomaly\").show(3)\n" + ], + "metadata": { + "application/vnd.databricks.v1+cell": { + "title": "", + "showTitle": false, + "inputWidgets": {}, + "nuid": "2d4feed9-e879-4a7b-93d6-6007077c3544" + } + }, + "outputs": [ + { + "output_type": "display_data", + "metadata": { + "application/vnd.databricks.v1+output": { + "data": "", + "errorSummary": "", + "metadata": {}, + "errorTraceType": null, + "type": "ipynbError", + "arguments": {} + } + }, + "data": { + "text/html": [ + "" + ] + } + } + ], + "execution_count": 0 + }, + { + "cell_type": 
"markdown", + "source": [ + "This cell should yield a result that looks like:\n\n| timestamp | value | deviceId | isAnomaly |\n|:--------------------|--------:|:-----------|:------------|\n| 2020-05-01 18:33:51 | 3174 | dev-7 | False |\n| 2020-05-01 18:33:52 | 2976 | dev-7 | False |\n| 2020-05-01 18:33:53 | 2714 | dev-7 | False |" + ], + "metadata": { + "application/vnd.databricks.v1+cell": { + "title": "", + "showTitle": false, + "inputWidgets": {}, + "nuid": "eef1bc9c-c57a-4e8f-81aa-d791e4eb3d80" + } + } + }, + { + "cell_type": "markdown", + "source": [ + "## Visualize anomalies for one of the devices\n\nIoTSignals.csv has signals from multiple IoT devices. We'll focus on a specific device and visualize anomalous outputs from the device." + ], + "metadata": { + "application/vnd.databricks.v1+cell": { + "title": "", + "showTitle": false, + "inputWidgets": {}, + "nuid": "b5285c35-06ab-4601-af88-ce60168fd6c7" + } + } + }, + { + "cell_type": "code", + "source": [ + "df_anomaly_single_device = spark.sql(\"\"\"\nselect\n timestamp,\n measureValue,\n anomalies.expectedValue,\n anomalies.expectedValue + anomalies.upperMargin as expectedUpperValue,\n anomalies.expectedValue - anomalies.lowerMargin as expectedLowerValue,\n case when anomalies.isAnomaly=true then 1 else 0 end as isAnomaly\nfrom\n df_anomaly\nwhere deviceid = 'dev-1' and timestamp < '2020-04-29'\norder by timestamp\nlimit 200\"\"\")" + ], + "metadata": { + "application/vnd.databricks.v1+cell": { + "title": "", + "showTitle": false, + "inputWidgets": {}, + "nuid": "b83b1a05-40ef-4262-9291-38b099a3e14f" + } + }, + "outputs": [ + { + "output_type": "display_data", + "metadata": { + "application/vnd.databricks.v1+output": { + "data": "", + "errorSummary": "", + "metadata": {}, + "errorTraceType": null, + "type": "ipynbError", + "arguments": {} + } + }, + "data": { + "text/html": [ + "" + ] + } + } + ], + "execution_count": 0 + }, + { + "cell_type": "markdown", + "source": [ + "Now that we have created a dataframe that represents the anomalies for a particular device, we can visualize these anomalies:" + ], + "metadata": { + "application/vnd.databricks.v1+cell": { + "title": "", + "showTitle": false, + "inputWidgets": {}, + "nuid": "4f8c004b-fa72-4690-b1df-f90a27b9e33e" + } + } + }, + { + "cell_type": "code", + "source": [ + "import matplotlib.pyplot as plt\nfrom pyspark.sql.functions import col\n\nadf = df_anomaly_single_device.toPandas()\nadf_subset = df_anomaly_single_device.where(col(\"isAnomaly\") == 1).toPandas()\n\nplt.figure(figsize=(23,8))\nplt.plot(adf['timestamp'],adf['expectedUpperValue'], color='darkred', linestyle='solid', linewidth=0.25, label='UpperMargin')\nplt.plot(adf['timestamp'],adf['expectedValue'], color='darkgreen', linestyle='solid', linewidth=2, label='Expected Value')\nplt.plot(adf['timestamp'],adf['measureValue'], 'b', color='royalblue', linestyle='dotted', linewidth=2, label='Actual')\nplt.plot(adf['timestamp'],adf['expectedLowerValue'], color='black', linestyle='solid', linewidth=0.25, label='Lower Margin')\nplt.plot(adf_subset['timestamp'],adf_subset['measureValue'], 'ro', label = 'Anomaly')\nplt.legend()\nplt.title('RPM Anomalies with Confidence Intervals')\nplt.show()" + ], + "metadata": { + "application/vnd.databricks.v1+cell": { + "title": "", + "showTitle": false, + "inputWidgets": {}, + "nuid": "adc8243e-47c3-4957-ae24-9ce34ef64407" + } + }, + "outputs": [ + { + "output_type": "display_data", + "metadata": { + "application/vnd.databricks.v1+output": { + "data": "", + "errorSummary": "", + "metadata": 
{}, + "errorTraceType": null, + "type": "ipynbError", + "arguments": {} + } + }, + "data": { + "text/html": [ + "" + ] + } + } + ], + "execution_count": 0 + }, + { + "cell_type": "markdown", + "source": [ + "If successful, your output will look like this:\n\n![Anomaly Detector Plot](https://github.com/MicrosoftDocs/azure-docs/raw/master/articles/cognitive-services/big-data/media/anomaly-output.png)\n\n## Next steps\n\nLearn how to do predictive maintenance at scale with Azure Cognitive Services, Azure Synapse Analytics, and Azure CosmosDB. For more information, see the full sample on [GitHub](https://github.com/Azure-Samples/cosmosdb-synapse-link-samples)." + ], + "metadata": { + "application/vnd.databricks.v1+cell": { + "title": "", + "showTitle": false, + "inputWidgets": {}, + "nuid": "1d53317c-f5e9-415e-b43f-6625b6760ea6" + } + } + }, + { + "cell_type": "code", + "source": [ + "" + ], + "metadata": { + "application/vnd.databricks.v1+cell": { + "title": "", + "showTitle": false, + "inputWidgets": {}, + "nuid": "42e83063-6c0a-4d24-8bcd-9fa305869e14" + } + }, + "outputs": [ + { + "output_type": "display_data", + "metadata": { + "application/vnd.databricks.v1+output": { + "data": "", + "errorSummary": "", + "metadata": {}, + "errorTraceType": null, + "type": "ipynbError", + "arguments": {} + } + }, + "data": { + "text/html": [ + "" + ] + } + } + ], + "execution_count": 0 + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "notebookName": "Cognitive Services: Predictive Maintenance", + "dashboards": [], + "notebookMetadata": { + "pythonIndentUnit": 2 + }, + "language": "python", + "widgets": {}, + "notebookOrigID": 862073629653459 + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} \ No newline at end of file diff --git a/tools/docker/clean_acr.py b/tools/docker/clean_acr.py new file mode 100644 index 0000000000..b2f7f6b4cd --- /dev/null +++ b/tools/docker/clean_acr.py @@ -0,0 +1,40 @@ +import os +import json +from azure.storage.blob import BlobClient +import sys +import subprocess +from tqdm import tqdm + +acr = "mmlsparkmcr" +container = "acrbackup" +rg = "marhamil-mmlspark" +pipeline = "mmlsparkacrexport3" + +conn_string = sys.argv[1] + +repos = json.loads(os.popen( + 'az acr repository list -n {}'.format(acr)).read()) +for repo in repos: + tags = json.loads(os.popen( + 'az acr repository show-tags -n {} --repository {} --orderby time_desc'.format(acr, repo)).read()) + + for tag in tqdm(tags): + target_blob = repo + "/" + tag + ".tar" + image = repo + ":" + tag + + backup_exists = BlobClient.from_connection_string( + conn_string, container_name=container, blob_name=target_blob).exists() + if not backup_exists: + subprocess.run(["sudo", "az", "acr", "pipeline-run", "create", "--resource-group", rg, + "--registry", acr, "--pipeline", pipeline, "--name", str(abs(hash(target_blob))), + "--pipeline-type", "export", "--storage-blob", target_blob, "-a", image]) + print("Transferred {}".format(target_blob)) + else: + print("Skipped existing {}".format(image)) + + backup_exists = BlobClient.from_connection_string( + conn_string, container_name=container, blob_name=target_blob).exists() + if backup_exists: + print("Deleting {}".format(image)) + result = os.system("az acr repository delete --name {} --image {} --yes".format(acr, image)) + assert result == 0
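
A minimal usage sketch for the new tools/docker/clean_acr.py script, assuming the Azure CLI is already logged in and the acrbackup storage-account connection string is passed as the first argument (the script reads it from sys.argv[1]), so it would be invoked roughly as: python tools/docker/clean_acr.py "<connection-string>". The snippet below sketches a pre-flight check built on the same azure-storage-blob call the script uses; the repository and tag names are placeholders, not values from the patch.

```python
# Hypothetical pre-flight check: does an image already have a .tar backup in
# the "acrbackup" container that clean_acr.py exports to?
# Assumes `pip install azure-storage-blob` and a valid connection string.
import sys

from azure.storage.blob import BlobClient

conn_string = sys.argv[1]                # same argument clean_acr.py expects
repo, tag = "mmlspark/release", "1.0.0"  # placeholder repository and tag
target_blob = f"{repo}/{tag}.tar"        # blob naming used by clean_acr.py

backup_exists = BlobClient.from_connection_string(
    conn_string, container_name="acrbackup", blob_name=target_blob).exists()
print(f"{repo}:{tag} backup present: {backup_exists}")
```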