
Spark 3 (#970)
* feat: Update to Spark 3.0

Co-authored-by: Mark Hamilton <marhamil@microsoft.com>
Co-authored-by: Daniel Ciborowski <dciborow@microsoft.com>
Co-authored-by: Ilya Matiach <ilmat@microsoft.com>
Co-authored-by: Markus Cozowicz <marcozo@microsoft.com>
Co-authored-by: Jack Gerrits <jagerrit@microsoft.com>
6 people committed Jan 28, 2021
1 parent 5a5147a commit 6218a5b
Showing 106 changed files with 912 additions and 1,027 deletions.
59 changes: 33 additions & 26 deletions build.sbt
@@ -11,12 +11,15 @@ import scala.sys.process.Process
val condaEnvName = "mmlspark"
name := "mmlspark"
organization := "com.microsoft.ml.spark"
scalaVersion := "2.11.12"
scalaVersion := "2.12.10"
val sparkVersion = "3.0.1"

val sparkVersion = "2.4.5"
//val scalaMajorVersion = settingKey[String]("scalaMajorVersion")
//scalaMajorVersion := {scalaVersion.value.split(".".toCharArray).dropRight(0).mkString(".")}
val scalaMajorVersion = 2.12

val excludes = Seq(
ExclusionRule("org.apache.spark", "spark-tags_2.11"),
ExclusionRule("org.apache.spark", s"spark-tags_$scalaMajorVersion"),
ExclusionRule("org.scalatic"),
ExclusionRule("org.scalatest")
)
@@ -35,9 +38,9 @@ libraryDependencies ++= Seq(
"org.apache.httpcomponents" % "httpclient" % "4.5.6" excludeAll (excludes: _*),
"org.apache.httpcomponents" % "httpmime" % "4.5.6" excludeAll (excludes: _*),
"com.microsoft.ml.lightgbm" % "lightgbmlib" % "2.3.180" excludeAll (excludes: _*),
"com.github.vowpalwabbit" % "vw-jni" % "8.8.1" excludeAll (excludes: _*),
"com.linkedin.isolation-forest" %% "isolation-forest_2.4.3" % "0.3.2" excludeAll (excludes: _*),
"org.apache.spark" %% "spark-avro" % sparkVersion % "provided",
"com.github.vowpalwabbit" % "vw-jni" % "8.9.1" excludeAll (excludes: _*),
"com.linkedin.isolation-forest" %% "isolation-forest_3.0.0" % "1.0.1" excludeAll (excludes: _*),
"org.apache.spark" %% "spark-avro" % sparkVersion % "provided"
)

def txt(e: Elem, label: String): String = "\"" + e.child.filter(_.label == label).flatMap(_.text).mkString + "\""
@@ -111,9 +114,11 @@ def activateCondaEnv: Seq[String] = {
}
}



val packagePythonTask = TaskKey[Unit]("packagePython", "Package python sdk")
val genDir = join("target", "scala-2.11", "generated")
val unidocDir = join("target", "scala-2.11", "unidoc")
val genDir = join("target", s"scala-${scalaMajorVersion}", "generated")
val unidocDir = join("target", s"scala-${scalaMajorVersion}", "unidoc")
val pythonSrcDir = join(genDir.toString, "src", "python")
val unifiedDocDir = join(genDir.toString, "doc")
val pythonDocDir = join(unifiedDocDir.toString, "pyspark")
@@ -198,7 +203,7 @@ val publishR = TaskKey[Unit]("publishR", "publish R package to blob")
publishR := {
val s = streams.value
(runMain in Test).toTask(" com.microsoft.ml.spark.codegen.CodeGen").value
val rPackage = join("target", "scala-2.11", "generated", "package", "R")
val rPackage = join("target", s"scala-${scalaMajorVersion}", "generated", "package", "R")
.listFiles().head
singleUploadToBlob(rPackage.toString, rPackage.getName, "rrr", s.log)
}
@@ -207,7 +212,7 @@ packagePythonTask := {
val s = streams.value
(runMain in Test).toTask(" com.microsoft.ml.spark.codegen.CodeGen").value
createCondaEnvTask.value
val destPyDir = join("target", "scala-2.11", "classes", "mmlspark")
val destPyDir = join("target", s"scala-${scalaMajorVersion}", "classes", "mmlspark")
if (destPyDir.exists()) FileUtils.forceDelete(destPyDir)
FileUtils.copyDirectory(join(pythonSrcDir.getAbsolutePath, "mmlspark"), destPyDir)

@@ -243,7 +248,7 @@ testPythonTask := {
"--cov-report=xml",
"mmlsparktest"
),
new File("target/scala-2.11/generated/test/python/"),
new File(s"target/scala-${scalaMajorVersion}/generated/test/python/")
) ! s.log
}

@@ -252,7 +257,7 @@ val datasetName = "datasets-2020-08-27.tgz"
val datasetUrl = new URL(s"https://mmlspark.blob.core.windows.net/installers/$datasetName")
val datasetDir = settingKey[File]("The directory that holds the dataset")
datasetDir := {
join(target.value.toString, "scala-2.11", "datasets", datasetName.split(".".toCharArray.head).head)
join(target.value.toString, s"scala-${scalaMajorVersion}", "datasets", datasetName.split(".".toCharArray.head).head)
}

getDatasetsTask := {
@@ -270,19 +275,19 @@ genBuildInfo := {

val buildInfo =
s"""
|MMLSpark Build and Release Information
|---------------
|
|### Maven Coordinates
| `${organization.value}:${name.value}_2.11:${version.value}`
|
|### Maven Resolver
| `https://mmlspark.azureedge.net/maven`
|
|### Documentation Pages:
|[Scala Documentation](https://mmlspark.blob.core.windows.net/docs/${version.value}/scala/index.html)
|[Python Documentation](https://mmlspark.blob.core.windows.net/docs/${version.value}/pyspark/index.html)
|
|MMLSpark Build and Release Information
|---------------
|
|### Maven Coordinates
| `${organization.value}:${name.value}_${scalaMajorVersion}:${version.value}`
|
|### Maven Resolver
| `https://mmlspark.azureedge.net/maven`
|
|### Documentation Pages:
|[Scala Documentation](https://mmlspark.blob.core.windows.net/docs/${version.value}/scala/index.html)
|[Python Documentation](https://mmlspark.blob.core.windows.net/docs/${version.value}/pyspark/index.html)
|
""".stripMargin

val infoFile = join("target", "Build.md")
@@ -364,7 +369,7 @@ val settings = Seq(
case x => MergeStrategy.first
},
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false),
buildInfoPackage := "com.microsoft.ml.spark.build") //++
buildInfoPackage := "com.microsoft.ml.spark.build")

lazy val mmlspark = (project in file("."))
.enablePlugins(BuildInfoPlugin)
@@ -414,3 +419,5 @@ pgpPublicRing := {
dynverSonatypeSnapshots in ThisBuild := true
dynverSeparator in ThisBuild := "-"
publishTo := sonatypePublishToBundle.value

// Cache Break 1
4 changes: 3 additions & 1 deletion environment.yaml
@@ -1,7 +1,9 @@
name: mmlspark
channels:
- conda-forge
dependencies:
- python=3.6
- pyspark=2.4.3
- pyspark=3.0.1
- requests
- pip:
- wheel
Changes to a sample notebook (filename not shown)
@@ -174,10 +174,16 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tokenizeUDF = udf(word_tokenize, ArrayType(StringType()))\n",
"def safe_tokenize(sent):\n",
" try:\n",
" return word_tokenize(sent)\n",
" except LookupError:\n",
" prepNLTK(None)\n",
" return word_tokenize(sent)\n",
"\n",
"tokenizeUDF = udf(safe_tokenize, ArrayType(StringType()))\n",
"df = df.withColumn(\"tokens\",tokenizeUDF(\"sentence\"))\n",
"\n",
"countUDF = udf(len, IntegerType())\n",
@@ -203,9 +209,15 @@
"featurizeUDF = udf(featurize, ArrayType(FloatType()))\n",
"\n",
"df = df.withColumn(\"features\", featurizeUDF(\"tokens\")).cache()\n",
"safe_show(df, 3) # Can be flaky on build server\n",
"safe_show(df, 5) # Can be flaky on build server\n",
" \n"
]
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
},
{
"cell_type": "markdown",
@@ -338,15 +350,6 @@
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.3"
},
"pycharm": {
"stem_cell": {
"cell_type": "raw",
"source": [],
"metadata": {
"collapsed": false
}
}
}
},
"nbformat": 4,
Changes to a sample notebook (filename not shown)
@@ -94,8 +94,8 @@
"outputs": [],
"source": [
"from mmlspark.lightgbm import LightGBMRegressionModel\n",
"model.saveNativeModel(\"mymodel\")\n",
"model = LightGBMRegressionModel.loadNativeModelFromFile(\"mymodel\")"
"model.saveNativeModel(\"/mymodel\")\n",
"model = LightGBMRegressionModel.loadNativeModelFromFile(\"/mymodel\")"
]
},
{
@@ -175,4 +175,4 @@
},
"nbformat": 4,
"nbformat_minor": 2
}
}
Changes to a sample notebook (filename not shown)
@@ -153,7 +153,7 @@
"testCat = DataConversion(cols=[\"Carrier\",\"DepTimeBlk\",\"ArrTimeBlk\"],\n",
" convertTo=\"toCategorical\") \\\n",
" .transform(test)\n",
"lr = LinearRegression().setSolver(\"l-bfgs\").setRegParam(0.1) \\\n",
"lr = LinearRegression().setRegParam(0.1) \\\n",
" .setElasticNetParam(0.3)\n",
"model = TrainRegressor(model=lr, labelCol=\"ArrDelay\").fit(trainCat)"
]
@@ -232,15 +232,6 @@
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.3"
},
"pycharm": {
"stem_cell": {
"cell_type": "raw",
"source": [],
"metadata": {
"collapsed": false
}
}
}
},
"nbformat": 4,
2 changes: 1 addition & 1 deletion pipeline.yaml
@@ -250,7 +250,7 @@ jobs:
chmod +x .codecov
echo "Starting Codecov Upload"
./.codecov -t $(codecov-token) -f coverage.xml
./.codecov -t $(codecov-token) -f target/scala-2.11/coverage-report/cobertura.xml
./.codecov -t $(codecov-token) -f target/scala-2.12/coverage-report/cobertura.xml
displayName: Upload Coverage Report To Codecov.io
condition: succeededOrFailed()
6 changes: 3 additions & 3 deletions src/main/python/mmlspark/cyber/feature/indexers.py
@@ -2,14 +2,14 @@

from typing import List

from mmlspark.cyber.utils.spark_utils import DataFrameUtils, ExplainBuilder
from mmlspark.cyber.utils.spark_utils import DataFrameUtils, ExplainBuilder, HasSetInputCol, HasSetOutputCol

from pyspark.ml import Estimator, Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params
from pyspark.sql import DataFrame, functions as f


class IdIndexerModel(Transformer, HasInputCol, HasOutputCol):
class IdIndexerModel(Transformer, HasSetInputCol, HasSetOutputCol):
partitionKey = Param(
Params._dummy(),
"partitionKey",
@@ -43,7 +43,7 @@ def _transform(self, df):
)


class IdIndexer(Estimator, HasInputCol, HasOutputCol):
class IdIndexer(Estimator, HasSetInputCol, HasSetOutputCol):
partitionKey = Param(
Params._dummy(),
"partitionKey",
6 changes: 3 additions & 3 deletions src/main/python/mmlspark/cyber/feature/scalers.py
@@ -3,7 +3,7 @@
from abc import ABC, abstractmethod
from typing import Callable, Dict, List, Optional, Union

from mmlspark.cyber.utils.spark_utils import ExplainBuilder
from mmlspark.cyber.utils.spark_utils import ExplainBuilder, HasSetInputCol, HasSetOutputCol

from pyspark.ml import Estimator, Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params
@@ -15,7 +15,7 @@ def _pyudf(func, use_pandas):
return pandas_udf(func, t.DoubleType(), PandasUDFType.SCALAR) if use_pandas else udf(func, t.DoubleType())


class PerPartitionScalarScalerModel(ABC, Transformer, HasInputCol, HasOutputCol):
class PerPartitionScalarScalerModel(ABC, Transformer, HasSetInputCol, HasSetOutputCol):
partitionKey = Param(
Params._dummy(),
"partitionKey",
@@ -83,7 +83,7 @@ def _transform(self, df: DataFrame) -> DataFrame:
return with_stats_df.withColumn(output_col, stats_method(f.col(input_col)))


class PerPartitionScalarScalerEstimator(ABC, Estimator, HasInputCol, HasOutputCol):
class PerPartitionScalarScalerEstimator(ABC, Estimator, HasSetInputCol, HasSetOutputCol):
partitionKey = Param(
Params._dummy(),
"partitionKey",
14 changes: 14 additions & 0 deletions src/main/python/mmlspark/cyber/utils/spark_utils.py
@@ -175,3 +175,17 @@ def build(explainable: Any, **kwargs):

# noinspection PyProtectedMember
explainable._set(**kwargs)

class HasSetInputCol(HasInputCol):
def setInputCol(self, value):
"""
Sets the value of :py:attr:`inputCol`.
"""
return self.set(self.inputCol, value)

class HasSetOutputCol(HasOutputCol):
def setOutputCol(self, value):
"""
Sets the value of :py:attr:`outputCol`.
"""
return self.set(self.outputCol, value)
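
A note on the two mixins added above: PySpark 3.0 removed the auto-generated setInputCol/setOutputCol setters from the shared-param mixins (HasInputCol/HasOutputCol), which appears to be why HasSetInputCol/HasSetOutputCol are introduced here and mixed into the cyber transformers. The following is a minimal illustrative sketch, not part of this commit — the transformer and column names are hypothetical — showing how a Transformer built on these mixins keeps chainable setters.

# Hypothetical usage sketch (not from this commit): a Transformer that keeps
# chainable setInputCol/setOutputCol setters under Spark 3 via the new mixins.
from pyspark.ml import Transformer
from pyspark.sql import functions as f

from mmlspark.cyber.utils.spark_utils import HasSetInputCol, HasSetOutputCol


class UpperCaseTransformer(Transformer, HasSetInputCol, HasSetOutputCol):
    """Copies the input column into the output column, upper-cased."""

    def _transform(self, df):
        return df.withColumn(self.getOutputCol(), f.upper(f.col(self.getInputCol())))


# The setters come from HasSetInputCol/HasSetOutputCol rather than from
# HasInputCol/HasOutputCol (which no longer generate them in PySpark 3.0):
# upper = UpperCaseTransformer().setInputCol("name").setOutputCol("name_upper")
# upper.transform(df).show()
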
Changes to another Python source file (filename not shown)
@@ -6,7 +6,7 @@
from mmlspark.recommendation._RankingTrainValidationSplit import _RankingTrainValidationSplit
from pyspark import keyword_only
from pyspark.ml.param import Params
from pyspark.ml.tuning import ValidatorParams
from pyspark.ml.tuning import _ValidatorParams
from pyspark.ml.util import *
from pyspark import keyword_only
from pyspark.ml.param import Params, Param, TypeConverters
@@ -20,7 +20,7 @@


@inherit_doc
class RankingTrainValidationSplit(Estimator, ValidatorParams):
class RankingTrainValidationSplit(Estimator, _ValidatorParams):
trainRatio = Param(Params._dummy(), "trainRatio", "Param for ratio between train and\
validation data. Must be between 0 and 1.", typeConverter=TypeConverters.toFloat)
userCol = Param(Params._dummy(), "userCol",
Changes to another Python source file (filename not shown)
@@ -7,7 +7,7 @@
basestring = str

from pyspark.ml.common import inherit_doc
from pyspark.ml.tuning import ValidatorParams
from pyspark.ml.tuning import _ValidatorParams
from pyspark.ml.util import *
from mmlspark.recommendation._RankingTrainValidationSplitModel import _RankingTrainValidationSplitModel
from pyspark.ml.wrapper import JavaParams
@@ -16,7 +16,7 @@

# Load information from java_stage to the instance.
@inherit_doc
class RankingTrainValidationSplitModel(_RankingTrainValidationSplitModel, ValidatorParams):
class RankingTrainValidationSplitModel(_RankingTrainValidationSplitModel, _ValidatorParams):

def __init__(self, bestModel=None, validationMetrics=[]):
super(RankingTrainValidationSplitModel, self).__init__()
Changes to a Scala source file (filename not shown)
@@ -5,7 +5,7 @@ package com.microsoft.ml.spark.automl

import org.apache.spark.ml.param._

import scala.collection.{JavaConversions, mutable}
import scala.collection.{JavaConverters, mutable}
import scala.util.Random

abstract class RangeHyperParam[T](val min: T, val max: T, val seed: Long) extends Dist[T] {
@@ -77,7 +77,7 @@ object HyperParamUtils {
* @return A RangeHyperParam matched to the given type for min and max values.
*/
def getDiscreteHyperParam(values: java.util.ArrayList[_], seed: Long = 0): DiscreteHyperParam[_] = {
val valuesList = JavaConversions.asScalaBuffer(values).toList
val valuesList = JavaConverters.asScalaBuffer(values).toList
new DiscreteHyperParam(valuesList, seed)
}
}
(remaining changed files not shown)
