From c050a74f39e5fb761b584299259e67c4a3b4ecf3 Mon Sep 17 00:00:00 2001 From: Brendan Walsh Date: Fri, 22 Mar 2024 12:58:46 -0700 Subject: [PATCH] chore: Adding Spark35 support --- README.md | 50 ++++++++++++++----- build.sbt | 4 +- .../ml/services/bing/BingImageSearch.scala | 4 +- .../speech/SpeakerEmotionInference.scala | 4 +- .../ml/services/speech/SpeechToTextSDK.scala | 4 +- .../ml/services/speech/TextToSpeech.scala | 4 +- .../synapse/ml/core/env/PackageUtils.scala | 2 +- .../ml/core/schema/SparkBindings.scala | 4 +- .../synapse/ml/explainers/LIMEBase.scala | 4 +- .../ml/featurize/text/MultiNGram.scala | 4 +- .../ml/io/binary/BinaryFileReader.scala | 4 +- .../synapse/ml/io/http/HTTPTransformer.scala | 4 +- .../synapse/ml/io/image/ImageUtils.scala | 8 +-- .../ml/stages/MiniBatchTransformer.scala | 6 +-- .../ml/stages/PartitionConsolidator.scala | 4 +- .../ml/train/ComputeModelStatistics.scala | 4 +- .../source/image/PatchedImageFileFormat.scala | 4 +- .../streaming/DistributedHTTPSource.scala | 6 +-- .../injections/OptimizedCKNNFitting.scala | 15 +++--- .../azure/synapse/ml/codegen/RTestGen.scala | 2 +- .../ml/flaky/PartitionConsolidatorSuite.scala | 4 +- .../synapse/ml/io/split1/PowerBiSuite.scala | 4 +- .../ml/io/split2/DistributedHTTPSuite.scala | 6 +-- .../ml/nbtest/DatabricksUtilities.scala | 13 ++--- .../SynapseExtensionUtilities.scala | 2 +- .../synapse/ml/nbtest/SynapseUtilities.scala | 2 +- .../ml/stages/MiniBatchTransformerSuite.scala | 6 +-- .../stages/StratifiedRepartitionSuite.scala | 4 +- .../azure/synapse/ml/onnx/ONNXModel.scala | 4 +- .../Multivariate Anomaly Detection.ipynb | 2 +- docs/Get Started/Install SynapseML.md | 17 ++++++- environment.yml | 2 +- .../VerifyLightGBMClassifierStream.scala | 8 +-- .../azure/synapse/ml/opencv/OpenCVUtils.scala | 4 +- pipeline.yaml | 2 +- start | 2 +- tools/docker/demo/Dockerfile | 2 +- tools/docker/minimal/Dockerfile | 2 +- tools/dotnet/dotnetSetup.sh | 8 +-- tools/tests/run_r_tests.R | 2 +- .../ml/vw/VowpalWabbitBaseLearner.scala | 4 +- .../ml/vw/VowpalWabbitBaseModelSpark.scala | 4 +- .../ml/vw/VowpalWabbitBaseProgressive.scala | 4 +- .../synapse/ml/vw/VowpalWabbitGeneric.scala | 4 +- .../ml/vw/VerifyVowpalWabbitClassifier.scala | 4 +- website/src/pages/index.js | 30 +++++++++-- 46 files changed, 174 insertions(+), 113 deletions(-) diff --git a/README.md b/README.md index 2183794514..1a0e0ec2a3 100644 --- a/README.md +++ b/README.md @@ -26,9 +26,9 @@ SynapseML requires Scala 2.12, Spark 3.4+, and Python 3.8+. 
- [Features](#features) - [Documentation and Examples](#documentation-and-examples) - [Setup and installation](#setup-and-installation) + - [Microsoft Fabric](#microsoft-fabric) - [Synapse Analytics](#synapse-analytics) - [Databricks](#databricks) - - [Microsoft Fabric](#microsoft-fabric) - [Python Standalone](#python-standalone) - [Spark Submit](#spark-submit) - [SBT](#sbt) @@ -72,17 +72,25 @@ For quickstarts, documentation, demos, and examples please see our [website](htt First select the correct platform that you are installing SynapseML into: -- [Microsoft Fabric](#microsoft-fabric) -- [Synapse Analytics](#synapse-analytics) -- [Databricks](#databricks) -- [Python Standalone](#python-standalone) -- [Spark Submit](#spark-submit) -- [SBT](#sbt) -- [Apache Livy and HDInsight](#apache-livy-and-hdinsight) -- [Docker](#docker) -- [R](#r) -- [C# (.NET)](#c-net) -- [Building from source](#building-from-source) +- [Synapse Machine Learning](#synapse-machine-learning) + - [Features](#features) + - [Documentation and Examples](#documentation-and-examples) + - [Setup and installation](#setup-and-installation) + - [Microsoft Fabric](#microsoft-fabric) + - [Synapse Analytics](#synapse-analytics) + - [Databricks](#databricks) + - [Python Standalone](#python-standalone) + - [Spark Submit](#spark-submit) + - [SBT](#sbt) + - [Apache Livy and HDInsight](#apache-livy-and-hdinsight) + - [Docker](#docker) + - [R](#r) + - [C# (.NET)](#c-net) + - [Building from source](#building-from-source) + - [Papers](#papers) + - [Learn More](#learn-more) + - [Contributing \& feedback](#contributing--feedback) + - [Other relevant projects](#other-relevant-projects) @@ -113,6 +121,22 @@ In Microsoft Fabric notebooks SynapseML is already installed. To change the vers In Azure Synapse notebooks please place the following in the first cell of your notebook. 
+- For Spark 3.5 Pools: + +```bash +%%configure -f +{ + "name": "synapseml", + "conf": { + "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.3", + "spark.jars.repositories": "https://mmlspark.azureedge.net/maven", + "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind", + "spark.yarn.user.classpath.first": "true", + "spark.sql.parquet.enableVectorizedReader": "false" + } +} +``` + - For Spark 3.4 Pools: ```bash @@ -120,7 +144,7 @@ In Azure Synapse notebooks please place the following in the first cell of your { "name": "synapseml", "conf": { - "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.2", + "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.2-spark3.4", "spark.jars.repositories": "https://mmlspark.azureedge.net/maven", "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind", "spark.yarn.user.classpath.first": "true", diff --git a/build.sbt b/build.sbt index 15bdea4bb1..776cb16cf8 100644 --- a/build.sbt +++ b/build.sbt @@ -7,7 +7,7 @@ import scala.xml.transform.{RewriteRule, RuleTransformer} import scala.xml.{Node => XmlNode, NodeSeq => XmlNodeSeq, _} val condaEnvName = "synapseml" -val sparkVersion = "3.4.1" +val sparkVersion = "3.5.0" name := "synapseml" ThisBuild / organization := "com.microsoft.azure" ThisBuild / scalaVersion := "2.12.17" @@ -34,7 +34,7 @@ val extraDependencies = Seq( "com.jcraft" % "jsch" % "0.1.54", "org.apache.httpcomponents.client5" % "httpclient5" % "5.1.3", "org.apache.httpcomponents" % "httpmime" % "4.5.13", - "com.linkedin.isolation-forest" %% "isolation-forest_3.4.2" % "3.0.4" + "com.linkedin.isolation-forest" %% "isolation-forest_3.5.0" % "3.0.5" exclude("com.google.protobuf", "protobuf-java") exclude("org.apache.spark", "spark-mllib_2.12") exclude("org.apache.spark", "spark-core_2.12") exclude("org.apache.spark", "spark-avro_2.12") exclude("org.apache.spark", "spark-sql_2.12"), diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/bing/BingImageSearch.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/bing/BingImageSearch.scala index b90cef5881..7d77704f5b 100644 --- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/bing/BingImageSearch.scala +++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/bing/BingImageSearch.scala @@ -15,7 +15,7 @@ import org.apache.spark.injections.UDFUtils import org.apache.spark.ml.ComplexParamsReadable import org.apache.spark.ml.util._ import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.functions.{col, explode} import org.apache.spark.sql.types._ import spray.json.DefaultJsonProtocol._ @@ -44,7 +44,7 @@ object BingImageSearch extends ComplexParamsReadable[BingImageSearch] with Seria ): Lambda = { Lambda({ df => val outputSchema = df.schema.add(bytesCol, BinaryType, nullable = true) - val encoder = RowEncoder(outputSchema) + val encoder = ExpressionEncoder(outputSchema) df.toDF().mapPartitions { rows => val futures = rows.map { row: Row => (Future { diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeakerEmotionInference.scala 
b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeakerEmotionInference.scala index 442cc21118..000a07b695 100644 --- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeakerEmotionInference.scala +++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeakerEmotionInference.scala @@ -12,7 +12,7 @@ import org.apache.http.entity.{AbstractHttpEntity, StringEntity} import org.apache.spark.ml.util.Identifiable import org.apache.spark.ml.{ComplexParamsReadable, NamespaceInjections, PipelineModel, Transformer} import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.types.{DataType, StringType, StructType} import spray.json.DefaultJsonProtocol.StringJsonFormat @@ -93,7 +93,7 @@ class SpeakerEmotionInference(override val uid: String) converter(row.getAs[Row](row.fieldIndex(getOutputCol))) ) new GenericRowWithSchema((row.toSeq.dropRight(1) ++ Seq(ssml)).toArray, newSchema): Row - })(RowEncoder({ + })(ExpressionEncoder({ newSchema })) }) diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala index 4a034b1f80..7596a07fa1 100644 --- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala +++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala @@ -24,7 +24,7 @@ import org.apache.spark.injections.SConf import org.apache.spark.ml.param._ import org.apache.spark.ml.util._ import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer} -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Dataset, Row} @@ -400,7 +400,7 @@ abstract class SpeechSDKBase extends Transformer ArrayType(responseTypeBinding.schema) } - val enc = RowEncoder(enrichedDf.schema.add(getOutputCol, addedSchema)) + val enc = ExpressionEncoder(enrichedDf.schema.add(getOutputCol, addedSchema)) val sc = df.sparkSession.sparkContext val bConf = sc.broadcast(new SConf(sc.hadoopConfiguration)) val isUriAudio = df.schema(getAudioDataCol).dataType match { diff --git a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/TextToSpeech.scala b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/TextToSpeech.scala index bedd949cc4..67761ee4e3 100644 --- a/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/TextToSpeech.scala +++ b/cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/TextToSpeech.scala @@ -15,7 +15,7 @@ import org.apache.hadoop.io.{IOUtils => HUtils} import org.apache.spark.ml.param.{Param, ParamMap} import org.apache.spark.ml.util._ import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer} -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.util.SerializableConfiguration @@ -152,7 +152,7 @@ class TextToSpeech(override val uid: String) } 
Row.fromSeq(row.toSeq ++ Seq(errorRow)) }.get - }(RowEncoder(dataset.schema.add(getErrorCol, SpeechSynthesisError.schema))) + }(ExpressionEncoder(dataset.schema.add(getErrorCol, SpeechSynthesisError.schema))) } override def copy(extra: ParamMap): Transformer = defaultCopy(extra) diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/core/env/PackageUtils.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/core/env/PackageUtils.scala index 6041b9b307..95a97c010f 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/core/env/PackageUtils.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/core/env/PackageUtils.scala @@ -18,7 +18,7 @@ object PackageUtils { val PackageName = s"synapseml_$ScalaVersionSuffix" val PackageMavenCoordinate = s"$PackageGroup:$PackageName:${BuildInfo.version}" - private val AvroCoordinate = "org.apache.spark:spark-avro_2.12:3.4.1" + private val AvroCoordinate = "org.apache.spark:spark-avro_2.12:3.5.0" val PackageRepository: String = SparkMLRepository // If testing onnx package with snapshots repo, make sure to switch to using diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/core/schema/SparkBindings.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/core/schema/SparkBindings.scala index 2fb39424ab..9d03df8285 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/core/schema/SparkBindings.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/core/schema/SparkBindings.scala @@ -5,7 +5,7 @@ package com.microsoft.azure.synapse.ml.core.schema import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.types.StructType import scala.reflect.runtime.universe.TypeTag @@ -14,7 +14,7 @@ abstract class SparkBindings[T: TypeTag] extends Serializable { lazy val schema: StructType = enc.schema private lazy val enc: ExpressionEncoder[T] = ExpressionEncoder[T]().resolveAndBind() - private lazy val rowEnc: ExpressionEncoder[Row] = RowEncoder(enc.schema).resolveAndBind() + private lazy val rowEnc: ExpressionEncoder[Row] = ExpressionEncoder(enc.schema).resolveAndBind() // WARNING: each time you use this function on a dataframe, you should make a new converter. 
// Spark does some magic that makes this leak memory if re-used on a diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/LIMEBase.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/LIMEBase.scala index 483b058f17..f4abbc7cec 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/LIMEBase.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/LIMEBase.scala @@ -14,7 +14,7 @@ import org.apache.spark.ml.Transformer import org.apache.spark.ml.linalg.SQLDataTypes.VectorType import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.param._ -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ @@ -44,7 +44,7 @@ object LIMEUtils extends SLogging { case field if colsToSquish.contains(field.name) => StructField(field.name, ArrayType(field.dataType)) case f => f }) - val encoder = RowEncoder(schema) + val encoder = ExpressionEncoder(schema) val indiciesToSquish = colsToSquish.map(df.schema.fieldIndex) df.mapPartitions { it => val isEmpty = it.isEmpty diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/text/MultiNGram.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/text/MultiNGram.scala index fd958d1216..1388d84045 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/text/MultiNGram.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/text/MultiNGram.scala @@ -12,7 +12,7 @@ import org.apache.spark.ml._ import org.apache.spark.ml.feature._ import org.apache.spark.ml.param._ import org.apache.spark.ml.util._ -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Dataset, Row} @@ -56,7 +56,7 @@ class MultiNGram(override val uid: String) .map(col => row.getAs[Seq[String]](col)) .reduce(_ ++ _) Row.fromSeq(row.toSeq :+ mergedNGrams) - }(RowEncoder(intermediateDF.schema.add(getOutputCol, ArrayType(StringType)))) + }(ExpressionEncoder(intermediateDF.schema.add(getOutputCol, ArrayType(StringType)))) .drop(intermediateOutputCols: _*) }, dataset.columns.length) } diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/io/binary/BinaryFileReader.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/io/binary/BinaryFileReader.scala index 1268750cc7..0bed65a49e 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/io/binary/BinaryFileReader.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/io/binary/BinaryFileReader.scala @@ -8,7 +8,7 @@ import com.microsoft.azure.synapse.ml.core.schema.BinaryFileSchema import com.microsoft.azure.synapse.ml.core.utils.AsyncUtils import org.apache.commons.io.IOUtils import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.types.BinaryType import org.apache.spark.sql.{DataFrame, Row, SparkSession} @@ -85,7 +85,7 @@ object BinaryFileReader { timeout: Int ): DataFrame = { val outputSchema = df.schema.add(bytesCol, BinaryType, nullable = true) - val encoder = RowEncoder(outputSchema) + val encoder = ExpressionEncoder(outputSchema) val hconf = ConfUtils.getHConf(df) df.mapPartitions { 
rows => diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/HTTPTransformer.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/HTTPTransformer.scala index 8d942e34b1..ad74bd11c4 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/HTTPTransformer.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/HTTPTransformer.scala @@ -13,7 +13,7 @@ import org.apache.spark.injections.UDFUtils import org.apache.spark.ml.param._ import org.apache.spark.ml.util.Identifiable import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer} -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Dataset, Row} @@ -118,7 +118,7 @@ class HTTPTransformer(val uid: String) override def transform(dataset: Dataset[_]): DataFrame = { logTransform[DataFrame]({ val df = dataset.toDF() - val enc = RowEncoder(transformSchema(df.schema)) + val enc = ExpressionEncoder(transformSchema(df.schema)) val colIndex = df.schema.fieldNames.indexOf(getInputCol) val fromRow = HTTPRequestData.makeFromRowConverter val toRow = HTTPResponseData.makeToRowConverter diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/io/image/ImageUtils.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/io/image/ImageUtils.scala index 6d0d4f8e1a..c4ef7130d1 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/io/image/ImageUtils.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/io/image/ImageUtils.scala @@ -11,7 +11,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.ml.ImageInjections import org.apache.spark.ml.image.ImageSchema import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.{DataFrame, Row} import java.awt.color.ColorSpace @@ -117,7 +117,7 @@ object ImageUtils { def readFromPaths(df: DataFrame, pathCol: String, imageCol: String = "image"): DataFrame = { val outputSchema = df.schema.add(imageCol, ImageSchema.columnSchema) - val encoder = RowEncoder(outputSchema) + val encoder = ExpressionEncoder(outputSchema) val hconf = ConfUtils.getHConf(df) df.mapPartitions { rows => rows.map { row => @@ -133,7 +133,7 @@ object ImageUtils { def readFromBytes(df: DataFrame, pathCol: String, bytesCol: String, imageCol: String = "image"): DataFrame = { val outputSchema = df.schema.add(imageCol, ImageSchema.columnSchema) - val encoder = RowEncoder(outputSchema) + val encoder = ExpressionEncoder(outputSchema) df.mapPartitions { rows => rows.map { row => val path = row.getAs[String](pathCol) @@ -150,7 +150,7 @@ object ImageUtils { imageCol: String = "image", dropPrefix: Boolean = false): DataFrame = { val outputSchema = df.schema.add(imageCol, ImageSchema.columnSchema) - val encoder = RowEncoder(outputSchema) + val encoder = ExpressionEncoder(outputSchema) df.mapPartitions { rows => rows.map { row => val encoded = row.getAs[String](bytesCol) diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/MiniBatchTransformer.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/MiniBatchTransformer.scala index 0fef316649..9c15aa5809 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/MiniBatchTransformer.scala +++ 
b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/MiniBatchTransformer.scala @@ -9,7 +9,7 @@ import com.microsoft.azure.synapse.ml.param.TransformerParam import org.apache.spark.ml.Transformer import org.apache.spark.ml.param._ import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable} -import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Dataset, Row} @@ -35,7 +35,7 @@ trait MiniBatchBase extends Transformer with DefaultParamsWritable with Wrappabl def transform(dataset: Dataset[_]): DataFrame = { logTransform[DataFrame]({ val outputSchema = transformSchema(dataset.schema) - implicit val outputEncoder: ExpressionEncoder[Row] = RowEncoder(outputSchema) + implicit val outputEncoder: ExpressionEncoder[Row] = ExpressionEncoder(outputSchema) dataset.toDF().mapPartitions { it => if (it.isEmpty) { it @@ -215,7 +215,7 @@ class FlattenBatch(val uid: String) override def transform(dataset: Dataset[_]): DataFrame = { logTransform[DataFrame]({ val outputSchema = transformSchema(dataset.schema) - implicit val outputEncoder: ExpressionEncoder[Row] = RowEncoder(outputSchema) + implicit val outputEncoder: ExpressionEncoder[Row] = ExpressionEncoder(outputSchema) dataset.toDF().mapPartitions(it => it.flatMap { rowOfLists => diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/PartitionConsolidator.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/PartitionConsolidator.scala index bd6e1340bf..494f2a945f 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/PartitionConsolidator.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/PartitionConsolidator.scala @@ -9,7 +9,7 @@ import com.microsoft.azure.synapse.ml.logging.{FeatureNames, SynapseMLLogging} import org.apache.spark.ml.param._ import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable} import org.apache.spark.ml.{ComplexParamsWritable, Transformer} -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Dataset, Row} @@ -39,7 +39,7 @@ class PartitionConsolidator(val uid: String) } else { Iterator() } - }(RowEncoder(dataset.schema)) + }(ExpressionEncoder(dataset.schema)) }, dataset.columns.length) } diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputeModelStatistics.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputeModelStatistics.scala index 277dbf0f97..fa609a6e51 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputeModelStatistics.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputeModelStatistics.scala @@ -17,7 +17,7 @@ import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, Multiclas import org.apache.spark.mllib.linalg.{Matrices, Matrix} import org.apache.spark.rdd.RDD import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ @@ -252,7 +252,7 @@ class ComputeModelStatistics(override val uid: String) extends Transformer confusionMatrix: Matrix, resultDF: DataFrame): DataFrame = { 
val schema = resultDF.schema.add(MetricConstants.ConfusionMatrix, SQLDataTypes.MatrixType) - resultDF.map { row => Row.fromSeq(row.toSeq :+ confusionMatrix.asML) }(RowEncoder(schema)) + resultDF.map { row => Row.fromSeq(row.toSeq :+ confusionMatrix.asML) }(ExpressionEncoder(schema)) } private def selectAndCastToDF(dataset: Dataset[_], diff --git a/core/src/main/scala/org/apache/spark/ml/source/image/PatchedImageFileFormat.scala b/core/src/main/scala/org/apache/spark/ml/source/image/PatchedImageFileFormat.scala index 299881b1bb..4346d88b22 100644 --- a/core/src/main/scala/org/apache/spark/ml/source/image/PatchedImageFileFormat.scala +++ b/core/src/main/scala/org/apache/spark/ml/source/image/PatchedImageFileFormat.scala @@ -13,7 +13,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.ml.image.ImageSchema import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources._ @@ -118,7 +118,7 @@ class PatchedImageFileFormat extends ImageFileFormat with Serializable with Logg if (requiredSchema.isEmpty) { filteredResult.map(_ => emptyUnsafeRow) } else { - val converter = RowEncoder(requiredSchema) + val converter = ExpressionEncoder(requiredSchema) filteredResult.map(row => converter.createSerializer()(row)) } } diff --git a/core/src/main/scala/org/apache/spark/sql/execution/streaming/DistributedHTTPSource.scala b/core/src/main/scala/org/apache/spark/sql/execution/streaming/DistributedHTTPSource.scala index 15f6886619..f41218b0e0 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/streaming/DistributedHTTPSource.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/streaming/DistributedHTTPSource.scala @@ -7,7 +7,7 @@ import com.microsoft.azure.synapse.ml.io.http.{HTTPRequestData, HTTPResponseData import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer} import org.apache.spark.internal.Logging import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2} import org.apache.spark.sql.execution.streaming.continuous.HTTPSourceV2 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider, StreamSourceProvider} @@ -218,7 +218,7 @@ class DistributedHTTPSource(name: String, private[spark] val infoSchema = new StructType() .add("machine", StringType).add("ip", StringType).add("id", StringType) - private[spark] val infoEnc = RowEncoder(infoSchema) + private[spark] val infoEnc = ExpressionEncoder(infoSchema) // Access point to run code on nodes through mapPartitions // TODO do this by hooking deeper into spark, @@ -284,7 +284,7 @@ class DistributedHTTPSource(name: String, .map{ case (id, request) => Row.fromSeq(Seq(Row(null, id, null), toRow(request))) //scalastyle:ignore null }.toIterator - }(RowEncoder(HTTPSourceV2.Schema)) + }(ExpressionEncoder(HTTPSourceV2.Schema)) } override def commit(end: OffsetV2): Unit = synchronized { diff --git a/core/src/main/scala/org/apache/spark/sql/types/injections/OptimizedCKNNFitting.scala b/core/src/main/scala/org/apache/spark/sql/types/injections/OptimizedCKNNFitting.scala index 3ab64e7548..306991c0f5 100644 --- 
a/core/src/main/scala/org/apache/spark/sql/types/injections/OptimizedCKNNFitting.scala +++ b/core/src/main/scala/org/apache/spark/sql/types/injections/OptimizedCKNNFitting.scala @@ -8,6 +8,7 @@ import com.microsoft.azure.synapse.ml.logging.SynapseMLLogging import com.microsoft.azure.synapse.ml.nn._ import org.apache.spark.ml.linalg.Vector import org.apache.spark.sql.Dataset +import org.apache.spark.sql.catalyst.types.PhysicalDataType import org.apache.spark.sql.types._ trait OptimizedCKNNFitting extends ConditionalKNNParams with SynapseMLLogging { @@ -35,12 +36,12 @@ trait OptimizedCKNNFitting extends ConditionalKNNParams with SynapseMLLogging { protected def fitOptimized(dataset: Dataset[_]): ConditionalKNNModel = { - val vt = dataset.schema(getValuesCol).dataType - val lt = dataset.schema(getLabelCol).dataType + val vt = PhysicalDataType.apply(dataset.schema(getValuesCol).dataType) + val lt = PhysicalDataType.apply(dataset.schema(getLabelCol).dataType) (vt, lt) match { - case (avt: AtomicType, alt: AtomicType) => fitGeneric[avt.InternalType, alt.InternalType](dataset) - case (avt: AtomicType, _) => fitGeneric[avt.InternalType, Any](dataset) - case (_, alt: AtomicType) => fitGeneric[Any, alt.InternalType](dataset) + case (avt: PhysicalDataType, alt: PhysicalDataType) => fitGeneric[avt.InternalType, alt.InternalType](dataset) + case (avt: PhysicalDataType, _) => fitGeneric[avt.InternalType, Any](dataset) + case (_, alt: PhysicalDataType) => fitGeneric[Any, alt.InternalType](dataset) case _ => fitGeneric[Any, Any](dataset) } } @@ -69,8 +70,8 @@ trait OptimizedKNNFitting extends KNNParams with SynapseMLLogging { protected def fitOptimized(dataset: Dataset[_]): KNNModel = { - dataset.schema(getValuesCol).dataType match { - case avt: AtomicType => fitGeneric[avt.InternalType](dataset) + PhysicalDataType.apply(dataset.schema(getValuesCol).dataType) match { + case avt: PhysicalDataType => fitGeneric[avt.InternalType](dataset) case _ => fitGeneric[Any](dataset) } } diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/codegen/RTestGen.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/codegen/RTestGen.scala index 2a86894bc2..a409ad0d0b 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/codegen/RTestGen.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/codegen/RTestGen.scala @@ -101,7 +101,7 @@ object RTestGen { | "spark.sql.shuffle.partitions=10", | "spark.sql.crossJoin.enabled=true") | - |sc <- spark_connect(master = "local", version = "3.4.1", config = conf) + |sc <- spark_connect(master = "local", version = "3.5.0", config = conf) | |""".stripMargin, StandardOpenOption.CREATE) diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/flaky/PartitionConsolidatorSuite.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/flaky/PartitionConsolidatorSuite.scala index acbc5e3e12..3469ee6975 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/flaky/PartitionConsolidatorSuite.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/flaky/PartitionConsolidatorSuite.scala @@ -7,7 +7,7 @@ import com.microsoft.azure.synapse.ml.core.test.base.{TestBase, TimeLimitedFlaky import com.microsoft.azure.synapse.ml.core.test.fuzzing.{TestObject, TransformerFuzzing} import com.microsoft.azure.synapse.ml.stages.PartitionConsolidator import org.apache.spark.ml.util.MLReadable -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.types.{DoubleType, 
StructType} import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.scalatest.Assertion @@ -66,7 +66,7 @@ class PartitionConsolidatorSuite extends TransformerFuzzing[PartitionConsolidato println(baseDF.count()) def getDF: Dataset[Row] = baseDF.map { x => Thread.sleep(10); x }( - RowEncoder(new StructType().add("values", DoubleType))) + ExpressionEncoder(new StructType().add("values", DoubleType))) val t1 = getTime(3)( getDF.foreach(_ => ()))._2 diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/io/split1/PowerBiSuite.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/io/split1/PowerBiSuite.scala index 50d2d09bc4..2d932a9991 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/io/split1/PowerBiSuite.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/io/split1/PowerBiSuite.scala @@ -8,7 +8,7 @@ import com.microsoft.azure.synapse.ml.core.test.base.TestBase import com.microsoft.azure.synapse.ml.io.powerbi.PowerBIWriter import org.apache.spark.SparkException import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.functions.{current_timestamp, lit} import java.io.File @@ -33,7 +33,7 @@ class PowerBiSuite extends TestBase with FileReaderUtils { .createDataFrame(rows, df.schema) .coalesce(1).cache() df2.count() - df2.map({x => Thread.sleep(10); x})(RowEncoder(df2.schema)) + df2.map({x => Thread.sleep(10); x})(ExpressionEncoder(df2.schema)) } test("write to powerBi") { diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/io/split2/DistributedHTTPSuite.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/io/split2/DistributedHTTPSuite.scala index 4303384f03..bac6d152ea 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/io/split2/DistributedHTTPSuite.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/io/split2/DistributedHTTPSuite.scala @@ -14,7 +14,7 @@ import org.apache.commons.io.IOUtils import org.apache.http.client.methods.HttpPost import org.apache.http.entity.{FileEntity, StringEntity} import org.apache.http.impl.client.{BasicResponseHandler, CloseableHttpClient} -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.streaming.DistributedHTTPSourceProvider import org.apache.spark.sql.functions.{col, length} import org.apache.spark.sql.streaming.{DataStreamReader, DataStreamWriter, StreamingQuery} @@ -396,12 +396,12 @@ class DistributedHTTPSuite extends TestBase with Flaky with HTTPTestUtils { .mapPartitions { _ => Foo.get.increment() Iterator(Row(Foo.get.state)) - }(RowEncoder(new StructType().add("state", IntegerType))).cache() + }(ExpressionEncoder(new StructType().add("state", IntegerType))).cache() val States1: Array[Row] = DF.collect() val DF2: DataFrame = DF.mapPartitions { _ => Iterator(Row(Foo.get.state)) - }(RowEncoder(new StructType().add("state", IntegerType))) + }(ExpressionEncoder(new StructType().add("state", IntegerType))) val States2: Array[Row] = DF2.collect() assert(States2.forall(_.getInt(0) === States2.length)) } diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala index fe3c488fd0..32195677db 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala +++ 
b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/DatabricksUtilities.scala @@ -30,11 +30,11 @@ object DatabricksUtilities { // ADB Info val Region = "eastus" - val PoolName = "synapseml-build-13.3" - val GpuPoolName = "synapseml-build-13.3-gpu" - val AdbRuntime = "13.3.x-scala2.12" - // https://docs.databricks.com/en/release-notes/runtime/13.3lts-ml.html - val AdbGpuRuntime = "13.3.x-gpu-ml-scala2.12" + val PoolName = "synapseml-build-14.3" + val GpuPoolName = "synapseml-build-14.3-gpu" + val AdbRuntime = "14.3.x-scala2.12" + // https://docs.databricks.com/en/release-notes/runtime/14.3lts-ml.html + val AdbGpuRuntime = "14.3.x-gpu-ml-scala2.12" val NumWorkers = 5 val AutoTerminationMinutes = 15 @@ -64,7 +64,7 @@ object DatabricksUtilities { "pdf2image", "pdfminer.six", "sqlparse", - "raiwidgets", + // "raiwidgets", // Broken on ADB "interpret-community", "numpy==1.22.4", "unstructured==0.10.24", @@ -105,6 +105,7 @@ object DatabricksUtilities { .filterNot(_.getAbsolutePath.contains("Fine-tune")) .filterNot(_.getAbsolutePath.contains("GPU")) .filterNot(_.getAbsolutePath.contains("Explanation Dashboard")) // TODO Remove this exclusion + .filterNot(_.getAbsolutePath.contains("Isolation Forests")) // TODO Remove this exclusion when raiwidgets is fixed val GPUNotebooks: Seq[File] = ParallelizableNotebooks.filter(_.getAbsolutePath.contains("Fine-tune")) diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseExtension/SynapseExtensionUtilities.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseExtension/SynapseExtensionUtilities.scala index 48683cc413..9ee918c31e 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseExtension/SynapseExtensionUtilities.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseExtension/SynapseExtensionUtilities.scala @@ -83,7 +83,7 @@ object SynapseExtensionUtilities { |"{ | 'Default${store}ArtifactId': '$storeId', | 'ExecutableFile': '$path', - | 'SparkVersion':'3.4', + | 'SparkVersion':'3.5', | 'SparkSettings': { | 'spark.jars.packages' : '$SparkMavenPackageList', | 'spark.jars.repositories' : '$SparkMavenRepositoryList', diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseUtilities.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseUtilities.scala index 6128f1cced..2e10f45b52 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseUtilities.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/nbtest/SynapseUtilities.scala @@ -254,7 +254,7 @@ object SynapseUtilities { | "nodeSizeFamily": "MemoryOptimized", | "provisioningState": "Succeeded", | "sessionLevelPackagesEnabled": "true", - | "sparkVersion": "3.4" + | "sparkVersion": "3.5" | } |} |""".stripMargin diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/stages/MiniBatchTransformerSuite.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/stages/MiniBatchTransformerSuite.scala index 8d43eddbcf..d446305e5a 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/stages/MiniBatchTransformerSuite.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/stages/MiniBatchTransformerSuite.scala @@ -8,7 +8,7 @@ import com.microsoft.azure.synapse.ml.core.test.fuzzing.{TestObject, Transformer import com.microsoft.azure.synapse.ml.param.DataFrameEquality import org.apache.spark.injections.UDFUtils import org.apache.spark.ml.util.MLReadable -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.functions.{col, lit} import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructType} @@ -30,10 +30,10 @@ trait MiniBatchTestUtils extends TestBase with DataFrameEquality { def basicTest(t: MiniBatchBase): Assertion = { val delay = 5 - val slowDf = df.map { x => Thread.sleep(3 * delay.toLong); x }(RowEncoder(df.schema)) + val slowDf = df.map { x => Thread.sleep(3 * delay.toLong); x }(ExpressionEncoder(df.schema)) val df2 = t.transform(slowDf) - val df3 = df2.map { x => Thread.sleep(10 * delay.toLong); x }(RowEncoder(df2.schema)) + val df3 = df2.map { x => Thread.sleep(10 * delay.toLong); x }(ExpressionEncoder(df2.schema)) assert(df3.schema == new StructType() .add("in1", ArrayType(IntegerType)) diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/stages/StratifiedRepartitionSuite.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/stages/StratifiedRepartitionSuite.scala index 16ca75c13e..2e77eebf1e 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/stages/StratifiedRepartitionSuite.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/stages/StratifiedRepartitionSuite.scala @@ -8,7 +8,7 @@ import com.microsoft.azure.synapse.ml.core.test.fuzzing.{TestObject, Transformer import org.apache.spark.TaskContext import org.apache.spark.ml.util.MLReadable import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.types.{IntegerType, StringType, StructType} class StratifiedRepartitionSuite extends TestBase with TransformerFuzzing[StratifiedRepartition] { @@ -37,7 +37,7 @@ class StratifiedRepartitionSuite extends TestBase with TransformerFuzzing[Strati test("Assert doing a stratified repartition will ensure all keys exist across all partitions") { val inputSchema = new StructType() .add(values, IntegerType).add(colors, StringType).add(const, IntegerType) - val inputEnc = RowEncoder(inputSchema) + val inputEnc = ExpressionEncoder(inputSchema) val valuesFieldIndex = inputSchema.fieldIndex(values) val numPartitions = 3 val trainData = input.repartition(numPartitions).select(values, colors, const) diff --git a/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ONNXModel.scala b/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ONNXModel.scala index 3378cd9721..491ee64d67 100644 --- a/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ONNXModel.scala +++ b/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ONNXModel.scala @@ -24,7 +24,7 @@ import org.apache.spark.ml.linalg.{SQLDataTypes, Vector} import org.apache.spark.ml.param._ import org.apache.spark.ml.util.Identifiable import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.functions.col import org.apache.spark.sql.types._ import org.apache.spark.{SparkContext, TaskContext} @@ -230,7 +230,7 @@ class ONNXModel(override val uid: String) def transformInner(dataset: Dataset[_], inputSchema: StructType): DataFrame = logTransform ({ val modelOutputSchema = getModelOutputSchema(inputSchema) - implicit val enc: Encoder[Row] = RowEncoder( + implicit val enc: Encoder[Row] = ExpressionEncoder( StructType(modelOutputSchema.map(f => StructField(f.name, 
ArrayType(f.dataType)))) ) diff --git a/docs/Explore Algorithms/AI Services/Multivariate Anomaly Detection.ipynb b/docs/Explore Algorithms/AI Services/Multivariate Anomaly Detection.ipynb index eccbd4e83f..009daede03 100644 --- a/docs/Explore Algorithms/AI Services/Multivariate Anomaly Detection.ipynb +++ b/docs/Explore Algorithms/AI Services/Multivariate Anomaly Detection.ipynb @@ -426,7 +426,7 @@ "\n", "anoms = list(rdf[\"severity\"] >= minSeverity)\n", "_, _, ymin, ymax = plt.axis()\n", - "plt.vlines(np.where(anoms), ymin=ymin, ymax=ymax, color=\"r\", alpha=0.8)\n", + "plt.vlines(list(np.where(anoms)[0]), ymin=ymin, ymax=ymax, color=\"r\", alpha=0.8)\n", "\n", "plt.legend()\n", "plt.title(\n", diff --git a/docs/Get Started/Install SynapseML.md b/docs/Get Started/Install SynapseML.md index 1af8bf1f10..5c48ace4bb 100644 --- a/docs/Get Started/Install SynapseML.md +++ b/docs/Get Started/Install SynapseML.md @@ -26,13 +26,28 @@ SynapseML is already installed in Microsoft Fabric notebooks. To change the vers SynapseML is already installed in Synapse Analytics notebooks. To change the version please place the following in the first cell of your notebook: +For Spark3.5 pools +```python +%%configure -f +{ + "name": "synapseml", + "conf": { + "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.3", + "spark.jars.repositories": "https://mmlspark.azureedge.net/maven", + "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind", + "spark.yarn.user.classpath.first": "true", + "spark.sql.parquet.enableVectorizedReader": "false" + } +} +``` + For Spark3.4 pools ```python %%configure -f { "name": "synapseml", "conf": { - "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.2", + "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.2-spark3.4", "spark.jars.repositories": "https://mmlspark.azureedge.net/maven", "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind", "spark.yarn.user.classpath.first": "true", diff --git a/environment.yml b/environment.yml index 257c657630..14705ff67d 100644 --- a/environment.yml +++ b/environment.yml @@ -11,7 +11,7 @@ dependencies: - r-devtools=2.4.2 - pip: - pyarrow>=0.15.0 - - pyspark==3.4.1 + - pyspark==3.5.0 - pandas==1.2.5 - wheel - sphinx==4.2.0 diff --git a/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split5/VerifyLightGBMClassifierStream.scala b/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split5/VerifyLightGBMClassifierStream.scala index a99355cc69..ea96e91b57 100644 --- a/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split5/VerifyLightGBMClassifierStream.scala +++ b/lightgbm/src/test/scala/com/microsoft/azure/synapse/ml/lightgbm/split5/VerifyLightGBMClassifierStream.scala @@ -11,7 +11,7 @@ import com.microsoft.azure.synapse.ml.lightgbm.split1._ import org.apache.spark.TaskContext import org.apache.spark.ml.feature.{LabeledPoint, VectorAssembler} import org.apache.spark.ml.linalg.{DenseVector, Vector, Vectors} -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.functions._ import org.apache.spark.sql.{DataFrame, Row} @@ -455,7 +455,7 @@ class VerifyLightGBMClassifierStream extends LightGBMClassifierTestData { } else { 
rows } - }(RowEncoder(baseDF.schema)) + }(ExpressionEncoder(baseDF.schema)) assertFitWithoutErrors(baseModel, df) } @@ -470,7 +470,7 @@ class VerifyLightGBMClassifierStream extends LightGBMClassifierTestData { } else { rows } - })(RowEncoder(baseDF.schema)) + })(ExpressionEncoder(baseDF.schema)) val model = new LightGBMClassifier() .setLabelCol(labelCol) @@ -493,7 +493,7 @@ class VerifyLightGBMClassifierStream extends LightGBMClassifierTestData { } else { rows } - })(RowEncoder(baseDF.schema)) + })(ExpressionEncoder(baseDF.schema)) // Validate fit works and doesn't get stuck assertFitWithoutErrors(baseModel, df) diff --git a/opencv/src/main/scala/com/microsoft/azure/synapse/ml/opencv/OpenCVUtils.scala b/opencv/src/main/scala/com/microsoft/azure/synapse/ml/opencv/OpenCVUtils.scala index 57fa63a0d2..a943bc3660 100644 --- a/opencv/src/main/scala/com/microsoft/azure/synapse/ml/opencv/OpenCVUtils.scala +++ b/opencv/src/main/scala/com/microsoft/azure/synapse/ml/opencv/OpenCVUtils.scala @@ -5,7 +5,7 @@ package com.microsoft.azure.synapse.ml.opencv import com.microsoft.azure.synapse.ml.core.env.NativeLoader import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder object OpenCVUtils { /** This object will load the openCV binaries when the object is referenced @@ -27,7 +27,7 @@ object OpenCVUtils { } private[ml] def loadOpenCV(df: DataFrame): DataFrame = { - val encoder = RowEncoder(df.schema) + val encoder = ExpressionEncoder(df.schema) df.mapPartitions(loadOpenCVFunc)(encoder) } diff --git a/pipeline.yaml b/pipeline.yaml index 1d7ec29d0c..ab0d50ffd0 100644 --- a/pipeline.yaml +++ b/pipeline.yaml @@ -529,7 +529,7 @@ jobs: fi sbt publishM2 - SPARK_VERSION=3.4.1 + SPARK_VERSION=3.5.0 HADOOP_VERSION=3 wget https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz (timeout 20m sbt "project $(PACKAGE)" coverage testR) || (echo "retrying" && timeout 20m sbt "project $(PACKAGE)" coverage testR) || (echo "retrying" && timeout 20m sbt "project $(PACKAGE)" coverage testR) diff --git a/start b/start index 75219e9459..4031ea5b4e 100644 --- a/start +++ b/start @@ -2,7 +2,7 @@ export OPENMPI_VERSION="3.1.2" -export SPARK_VERSION="3.4.1" +export SPARK_VERSION="3.5.0" export HADOOP_VERSION="3.3" export SYNAPSEML_VERSION="1.0.2" # Binder compatibility version diff --git a/tools/docker/demo/Dockerfile b/tools/docker/demo/Dockerfile index 7def1662d8..f42d8e855d 100644 --- a/tools/docker/demo/Dockerfile +++ b/tools/docker/demo/Dockerfile @@ -3,7 +3,7 @@ FROM mcr.microsoft.com/oss/mirror/docker.io/library/ubuntu:20.04 ARG SYNAPSEML_VERSION=1.0.2 ARG DEBIAN_FRONTEND=noninteractive -ENV SPARK_VERSION=3.4.1 +ENV SPARK_VERSION=3.5.0 ENV HADOOP_VERSION=3 ENV SYNAPSEML_VERSION=${SYNAPSEML_VERSION} ENV JAVA_HOME /usr/lib/jvm/java-1.11.0-openjdk-amd64 diff --git a/tools/docker/minimal/Dockerfile b/tools/docker/minimal/Dockerfile index 9b6b3522e8..9bad04f134 100644 --- a/tools/docker/minimal/Dockerfile +++ b/tools/docker/minimal/Dockerfile @@ -3,7 +3,7 @@ FROM mcr.microsoft.com/oss/mirror/docker.io/library/ubuntu:20.04 ARG SYNAPSEML_VERSION=1.0.2 ARG DEBIAN_FRONTEND=noninteractive -ENV SPARK_VERSION=3.4.1 +ENV SPARK_VERSION=3.5.0 ENV HADOOP_VERSION=3 ENV SYNAPSEML_VERSION=${SYNAPSEML_VERSION} ENV JAVA_HOME /usr/lib/jvm/java-1.11.0-openjdk-amd64 diff --git a/tools/dotnet/dotnetSetup.sh b/tools/dotnet/dotnetSetup.sh index c378cfa524..dc085759dd 100644 --- 
a/tools/dotnet/dotnetSetup.sh +++ b/tools/dotnet/dotnetSetup.sh @@ -20,11 +20,11 @@ echo "##vso[task.setvariable variable=DOTNET_WORKER_DIR]$DOTNET_WORKER_DIR" # Install Sleet dotnet tool install -g sleet -# Install Apache Spark-3.4.1 -curl https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz -o spark-3.4.1-bin-hadoop3.tgz +# Install Apache Spark-3.5.0 +curl https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz -o spark-3.5.0-bin-hadoop3.tgz mkdir ~/bin -tar -xzvf spark-3.4.1-bin-hadoop3.tgz -C ~/bin -export SPARK_HOME=~/bin/spark-3.4.1-bin-hadoop3/ +tar -xzvf spark-3.5.0-bin-hadoop3.tgz -C ~/bin +export SPARK_HOME=~/bin/spark-3.5.0-bin-hadoop3/ export PATH=$SPARK_HOME/bin:$PATH echo "##vso[task.setvariable variable=SPARK_HOME]$SPARK_HOME" echo "##vso[task.setvariable variable=PATH]$SPARK_HOME/bin:$PATH" diff --git a/tools/tests/run_r_tests.R b/tools/tests/run_r_tests.R index a5a61260f2..47d255b9d6 100644 --- a/tools/tests/run_r_tests.R +++ b/tools/tests/run_r_tests.R @@ -3,7 +3,7 @@ if (!require("sparklyr")) { library("sparklyr") } -spark_install_tar(paste(getwd(), "/../../../../../../spark-3.4.1-bin-hadoop3.tgz", sep = "")) +spark_install_tar(paste(getwd(), "/../../../../../../spark-3.5.0-bin-hadoop3.tgz", sep = "")) options("testthat.output_file" = "../../../../r-test-results.xml") devtools::test(reporter = JunitReporter$new()) diff --git a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseLearner.scala b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseLearner.scala index 8c0178aebe..699a737da8 100644 --- a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseLearner.scala +++ b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseLearner.scala @@ -8,7 +8,7 @@ import com.microsoft.azure.synapse.ml.core.utils.{FaultToleranceUtils, ParamsStr import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast import org.apache.spark.ml.param.{Param, StringArrayParam} -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.functions.{col, lit, spark_partition_id} import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Dataset, Encoders, Row, SparkSession} @@ -312,7 +312,7 @@ trait VowpalWabbitBaseLearner extends VowpalWabbitBase { // construct buffer & schema for buffered predictions val predictionBuffer = createPredictionBuffer(schema) - val encoder = RowEncoder(predictionBuffer.schema) + val encoder = ExpressionEncoder(predictionBuffer.schema) // always include preserve perf counters to make sure all information is retained in serialized model for // model merging diff --git a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseModelSpark.scala b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseModelSpark.scala index 92163a942f..d030167151 100644 --- a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseModelSpark.scala +++ b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseModelSpark.scala @@ -3,7 +3,7 @@ package com.microsoft.azure.synapse.ml.vw -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.types.{StructField} import org.vowpalwabbit.spark.VowpalWabbitExample @@ -41,7 +41,7 @@ trait VowpalWabbitBaseModelSpark val 
outputSchema = dataset.schema.add(StructField(vowpalWabbitPredictionCol, schemaForPredictionType, false)) // create a fitting row encoder - val rowEncoder = RowEncoder(outputSchema) + val rowEncoder = ExpressionEncoder(outputSchema) dataset.toDF.mapPartitions(inputRows => { inputRows.map { row => { diff --git a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseProgressive.scala b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseProgressive.scala index 8a265b0717..7c781175ee 100644 --- a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseProgressive.scala +++ b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseProgressive.scala @@ -6,7 +6,7 @@ package com.microsoft.azure.synapse.ml.vw import org.apache.spark.TaskContext import org.apache.spark.ml.Transformer import org.apache.spark.sql.{DataFrame, Dataset, Row} -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.types.StructType import org.vowpalwabbit.spark.VowpalWabbitNative @@ -114,7 +114,7 @@ trait VowpalWabbitBaseProgressive // TODO: barrier mode? // TODO: check w/ Stage ID (different stages) - val encoder = RowEncoder(schema) + val encoder = ExpressionEncoder(schema) df .mapPartitions(inputRows => diff --git a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitGeneric.scala b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitGeneric.scala index c1b3f8e0bc..7537a3511f 100644 --- a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitGeneric.scala +++ b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitGeneric.scala @@ -9,7 +9,7 @@ import com.microsoft.azure.synapse.ml.logging.{FeatureNames, SynapseMLLogging} import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Estimator, Model} import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.util.Identifiable -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession, functions => F} import org.apache.spark.sql.types.{StringType, StructField, StructType} @@ -105,7 +105,7 @@ class VowpalWabbitGenericModel(override val uid: String) val inputColIdx = df.schema.fieldIndex(getInputCol) val predictToSeq = VowpalWabbitPrediction.getPredictionFunc(vw) - val rowEncoder = RowEncoder(schemaForPredictionType) + val rowEncoder = ExpressionEncoder(schemaForPredictionType) df.mapPartitions(inputRows => { inputRows.map { row => { diff --git a/vw/src/test/scala/com/microsoft/azure/synapse/ml/vw/VerifyVowpalWabbitClassifier.scala b/vw/src/test/scala/com/microsoft/azure/synapse/ml/vw/VerifyVowpalWabbitClassifier.scala index 3a9ca46cd4..7d89a001d1 100644 --- a/vw/src/test/scala/com/microsoft/azure/synapse/ml/vw/VerifyVowpalWabbitClassifier.scala +++ b/vw/src/test/scala/com/microsoft/azure/synapse/ml/vw/VerifyVowpalWabbitClassifier.scala @@ -9,7 +9,7 @@ import org.apache.spark.TaskContext import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator} import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder} import org.apache.spark.ml.util.MLReadable -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DoubleType, IntegerType} import 
org.apache.spark.sql.{DataFrame, Dataset, Row} @@ -199,7 +199,7 @@ class VerifyVowpalWabbitClassifier extends Benchmarks with EstimatorFuzzing[Vowp .setNumPasses(3) .setLabelConversion(false) - val infoEnc = RowEncoder(dataset.schema) + val infoEnc = ExpressionEncoder(dataset.schema) val trainData = dataset .mapPartitions(iter => { val ctx = TaskContext.get diff --git a/website/src/pages/index.js b/website/src/pages/index.js index 83e884fec6..f54f733673 100644 --- a/website/src/pages/index.js +++ b/website/src/pages/index.js @@ -263,7 +263,7 @@ function Home() { >

 SynapseML can be installed on Synapse adding the following to the first cell of a notebook:

-For Spark3.4 pools:
+For Spark3.5 pools:
+For Spark3.4 pools:
+
@@ -315,7 +330,7 @@ function Home() {
 SynapseML can be conveniently installed on existing Spark clusters via the --packages option, examples:

 For the coordinates:

-Spark 3.4 Cluster:
+Spark 3.5 Cluster:
+Spark 3.4 Cluster:
+Spark 3.3 Cluster:
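
Note on the encoder changes that recur throughout this patch: most of the Scala hunks replace `RowEncoder(schema)` with `ExpressionEncoder(schema)`, because Spark 3.5 moved the schema-to-`ExpressionEncoder[Row]` factory from the `RowEncoder` companion object onto `ExpressionEncoder`. Below is a minimal, self-contained sketch of that call-site pattern, assuming a Spark 3.5.0 dependency on the classpath; the `path`/`bytes` column names and the sample data are illustrative only and are not taken from the patch.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.{BinaryType, StructType}

object EncoderMigrationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("encoder-migration-sketch")
      .getOrCreate()
    import spark.implicits._

    // Illustrative input frame; "path" is a hypothetical column name.
    val df = Seq("a", "bb", "ccc").toDF("path")

    // Output schema: the input columns plus one appended binary column, mirroring how
    // BinaryFileReader and ImageUtils build their mapPartitions output schemas in this patch.
    val outputSchema: StructType = df.schema.add("bytes", BinaryType, nullable = true)

    // Spark 3.4 and earlier: RowEncoder(outputSchema)
    // Spark 3.5: the equivalent Row encoder comes from ExpressionEncoder's companion object.
    val encoder: ExpressionEncoder[Row] = ExpressionEncoder(outputSchema)

    val withBytes = df.mapPartitions { rows =>
      rows.map(row => Row.fromSeq(row.toSeq :+ row.getString(0).getBytes("UTF-8")))
    }(encoder)

    withBytes.show()
    spark.stop()
  }
}
```

Nothing else in the `mapPartitions`/`map` call sites changes; only the expression that builds the `Encoder[Row]` from a `StructType` is different, which is why the diff touches many files but each hunk is a one-line substitution.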