[SPARK-47598][CORE] MLLib: Migrate logError with variables to structured logging framework

### What changes were proposed in this pull request?
This PR migrates `logError` calls that embed variables in the `MLLib` module to the structured logging framework.
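
A minimal before/after sketch of the pattern (hedged: the wrapping class is illustrative; `COUNT`, `MDC`, and the `log"..."` interpolator all appear in the diffs below):

```scala
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.COUNT

class LabelCheckerSketch extends Logging {
  def check(numInvalid: Long): Unit = {
    // Before: plain string interpolation; the variable is only text inside the message.
    logError(s"Found $numInvalid invalid labels")
    // After: the log"..." interpolator wraps the variable in MDC, so it is also
    // emitted as a structured key/value pair alongside the rendered message.
    logError(log"Found ${MDC(COUNT, numInvalid)} invalid labels")
  }
}
```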

### Why are the changes needed?
To enhance Apache Spark's logging system by implementing structured logging.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Passed GA (GitHub Actions CI).

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#45837 from panbingkun/SPARK-47598.

Authored-by: panbingkun <panbingkun@baidu.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
panbingkun authored and gengliangwang committed Apr 4, 2024
1 parent e3405c1 commit 3fd0cd6
Showing 11 changed files with 99 additions and 42 deletions.
@@ -28,6 +28,7 @@ object LogKey extends Enumeration {
val BLOCK_MANAGER_ID = Value
val BROADCAST_ID = Value
val BUCKET = Value
val CATEGORICAL_FEATURES = Value
val CLASS_LOADER = Value
val CLASS_NAME = Value
val COMMAND = Value
@@ -44,17 +45,22 @@ object LogKey extends Enumeration {
val EXIT_CODE = Value
val HOST = Value
val JOB_ID = Value
val LEARNING_RATE = Value
val LINE = Value
val LINE_NUM = Value
val MASTER_URL = Value
val MAX_ATTEMPTS = Value
val MAX_CATEGORIES = Value
val MAX_EXECUTOR_FAILURES = Value
val MAX_SIZE = Value
val MIN_SIZE = Value
val NUM_ITERATIONS = Value
val OLD_BLOCK_MANAGER_ID = Value
val OPTIMIZER_CLASS_NAME = Value
val PARTITION_ID = Value
val PATH = Value
val POD_ID = Value
val RANGE = Value
val REASON = Value
val REMOTE_ADDRESS = Value
val RETRY_COUNT = Value
@@ -63,6 +69,7 @@
val SIZE = Value
val STAGE_ID = Value
val SUBMISSION_ID = Value
val SUBSAMPLING_RATE = Value
val TASK_ATTEMPT_ID = Value
val TASK_ID = Value
val TASK_NAME = Value
@@ -117,7 +117,7 @@ trait Logging {
}
}

private def withLogContext(context: java.util.HashMap[String, String])(body: => Unit): Unit = {
protected def withLogContext(context: java.util.HashMap[String, String])(body: => Unit): Unit = {
val threadContext = CloseableThreadContext.putAll(context)
try {
body
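
The `private` → `protected` change above is what lets subclasses reuse the MDC plumbing. A hedged sketch (the class name is invented; the body mirrors the `Instrumentation` overrides later in this commit):

```scala
import org.apache.spark.internal.{LogEntry, Logging}

class PrefixedLoggerSketch(prefix: String) extends Logging {
  def errorWithPrefix(entry: LogEntry): Unit = {
    if (log.isErrorEnabled) {
      // withLogContext pushes the entry's key/value pairs onto the Log4j
      // thread context (CloseableThreadContext) for the duration of the call.
      withLogContext(entry.context) {
        log.error(prefix + entry.message)
      }
    }
  }
}
```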
@@ -25,7 +25,8 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{COUNT, RANGE}
import org.apache.spark.ml.feature._
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.optim.aggregator._
@@ -36,6 +37,7 @@ import org.apache.spark.ml.stat._
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DatasetUtils._
import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.storage.StorageLevel
@@ -220,10 +222,11 @@ class LinearSVC @Since("2.2.0") (
instr.logNumFeatures(numFeatures)

if (numInvalid != 0) {
val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " +
s"Found $numInvalid invalid labels."
val msg = log"Classification labels should be in " +
log"${MDC(RANGE, s"[0 to ${numClasses - 1}]")}. " +
log"Found ${MDC(COUNT, numInvalid)} invalid labels."
instr.logError(msg)
throw new SparkException(msg)
throw new SparkException(msg.message)
}

val featuresStd = summarizer.std.toArray
@@ -249,9 +252,7 @@
regularization, optimizer)

if (rawCoefficients == null) {
val msg = s"${optimizer.getClass.getName} failed."
instr.logError(msg)
throw new SparkException(msg)
MLUtils.optimizerFailed(instr, optimizer.getClass)
}

val coefficientArray = Array.tabulate(numFeatures) { i =>
@@ -27,7 +27,8 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{COUNT, RANGE}
import org.apache.spark.ml.feature._
import org.apache.spark.ml.impl.Utils
import org.apache.spark.ml.linalg._
@@ -530,10 +531,11 @@ class LogisticRegression @Since("1.2.0") (
}

if (numInvalid != 0) {
val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " +
s"Found $numInvalid invalid labels."
val msg = log"Classification labels should be in " +
log"${MDC(RANGE, s"[0 to ${numClasses - 1}]")}. " +
log"Found ${MDC(COUNT, numInvalid)} invalid labels."
instr.logError(msg)
throw new SparkException(msg)
throw new SparkException(msg.message)
}

instr.logNumClasses(numClasses)
@@ -634,9 +636,7 @@
initialSolution.toArray, regularization, optimizer)

if (allCoefficients == null) {
val msg = s"${optimizer.getClass.getName} failed."
instr.logError(msg)
throw new SparkException(msg)
MLUtils.optimizerFailed(instr, optimizer.getClass)
}

val allCoefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept,
@@ -23,7 +23,6 @@ import breeze.linalg.{DenseVector => BDV}
import breeze.optimize.{CachedDiffFunction, LBFGS => BreezeLBFGS}
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.ml.PredictorParams
@@ -271,9 +270,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
optimizer, initialSolution)

if (rawCoefficients == null) {
val msg = s"${optimizer.getClass.getName} failed."
instr.logError(msg)
throw new SparkException(msg)
MLUtils.optimizerFailed(instr, optimizer.getClass)
}

val coefficientArray = Array.tabulate(numFeatures) { i =>
@@ -25,7 +25,6 @@ import breeze.stats.distributions.Rand.FixedSeed.randBasis
import breeze.stats.distributions.StudentsT
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.ml.{PipelineStage, PredictorParams}
@@ -428,9 +427,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
featuresMean, featuresStd, initialSolution, regularization, optimizer)

if (parameters == null) {
val msg = s"${optimizer.getClass.getName} failed."
instr.logError(msg)
throw new SparkException(msg)
MLUtils.optimizerFailed(instr, optimizer.getClass)
}

val model = createModel(parameters, yMean, yStd, featuresMean, featuresStd)
@@ -27,7 +27,7 @@ import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{LogEntry, Logging}
import org.apache.spark.ml.{MLEvents, PipelineStage}
import org.apache.spark.ml.param.{Param, Params}
import org.apache.spark.rdd.RDD
@@ -84,20 +84,53 @@ private[spark] class Instrumentation private () extends Logging with MLEvents {
super.logWarning(prefix + msg)
}

/**
* Logs a LogEntry whose message carries a prefix that uniquely identifies the training session.
*/
override def logWarning(entry: LogEntry): Unit = {
if (log.isWarnEnabled) {
withLogContext(entry.context) {
log.warn(prefix + entry.message)
}
}
}

/**
* Logs an error message with a prefix that uniquely identifies the training session.
*/
override def logError(msg: => String): Unit = {
super.logError(prefix + msg)
}

/**
* Logs a LogEntry whose message carries a prefix that uniquely identifies the training session.
*/
override def logError(entry: LogEntry): Unit = {
if (log.isErrorEnabled) {
withLogContext(entry.context) {
log.error(prefix + entry.message)
}
}
}

/**
* Logs an info message with a prefix that uniquely identifies the training session.
*/
override def logInfo(msg: => String): Unit = {
super.logInfo(prefix + msg)
}

/**
* Logs a LogEntry whose message carries a prefix that uniquely identifies the training session.
*/
override def logInfo(entry: LogEntry): Unit = {
if (log.isInfoEnabled) {
withLogContext(entry.context) {
log.info(prefix + entry.message)
}
}
}

/**
* Logs the value of the given parameters for the estimator being used in this session.
*/
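
Caller-side, the estimator changes above hand these overloads a `MessageWithContext` and reuse its `.message` (the rendered text, without the MDC markup) for the exception, since `SparkException` takes a plain `String`. A hedged sketch — the names are illustrative, and it sits in an `org.apache.spark` subpackage only because `Instrumentation` is `private[spark]`:

```scala
package org.apache.spark.ml.sketch // hypothetical package, for access only

import org.apache.spark.SparkException
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.COUNT
import org.apache.spark.ml.util.Instrumentation

class TrainerSketch extends Logging {
  def failOnInvalid(instr: Instrumentation, numInvalid: Long): Unit = {
    if (numInvalid != 0) {
      val msg = log"Found ${MDC(COUNT, numInvalid)} invalid labels."
      instr.logError(msg)                   // structured: session prefix + MDC context
      throw new SparkException(msg.message) // plain rendered text
    }
  }
}
```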
@@ -18,7 +18,8 @@
package org.apache.spark.mllib.util

import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{COUNT, RANGE}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

@@ -37,7 +38,8 @@ object DataValidators extends Logging {
val binaryLabelValidator: RDD[LabeledPoint] => Boolean = { data =>
val numInvalid = data.filter(x => x.label != 1.0 && x.label != 0.0).count()
if (numInvalid != 0) {
logError("Classification labels should be 0 or 1. Found " + numInvalid + " invalid labels")
logError(log"Classification labels should be 0 or 1. " +
log"Found ${MDC(COUNT, numInvalid)} invalid labels")
}
numInvalid == 0
}
@@ -53,8 +55,9 @@
val numInvalid = data.filter(x =>
x.label - x.label.toInt != 0.0 || x.label < 0 || x.label > k - 1).count()
if (numInvalid != 0) {
logError("Classification labels should be in {0 to " + (k - 1) + "}. " +
"Found " + numInvalid + " invalid labels")
logError(log"Classification labels should be in " +
log"${MDC(RANGE, s"[0 to ${k - 1}]")}. " +
log"Found ${MDC(COUNT, numInvalid)} invalid labels")
}
numInvalid == 0
}
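
For orientation, a hedged usage sketch of these validators (the `SparkContext` setup and the object name are assumptions, not part of this commit):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.DataValidators

object ValidatorSketch {
  def run(sc: SparkContext): Boolean = {
    val data = sc.parallelize(Seq(
      LabeledPoint(0.0, Vectors.dense(1.0)),
      LabeledPoint(2.0, Vectors.dense(2.0)))) // label 2.0 is invalid for binary
    // Returns false and emits the structured error above with COUNT = 1.
    DataValidators.binaryLabelValidator(data)
  }
}
```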
10 changes: 9 additions & 1 deletion mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -22,8 +22,10 @@ import scala.reflect.ClassTag

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.OPTIMIZER_CLASS_NAME
import org.apache.spark.ml.linalg.{MatrixUDT => MLMatrixUDT, VectorUDT => MLVectorUDT}
import org.apache.spark.ml.util.Instrumentation
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.BLAS.dot
import org.apache.spark.mllib.regression.LabeledPoint
@@ -593,4 +595,10 @@ object MLUtils extends Logging {
math.log1p(math.exp(x))
}
}

def optimizerFailed(instr: Instrumentation, optimizerClass: Class[_]): Unit = {
val msg = log"${MDC(OPTIMIZER_CLASS_NAME, optimizerClass.getName)} failed."
instr.logError(msg)
throw new SparkException(msg.message)
}
}
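
A hedged sketch of what each optimizer-failure call site reduces to (cf. the LinearSVC, LogisticRegression, AFTSurvivalRegression, and LinearRegression hunks above); the wrapping object, method, and arguments are stand-ins for the values in scope at the real call sites:

```scala
package org.apache.spark.ml.sketch // hypothetical package; Instrumentation is private[spark]

import org.apache.spark.ml.util.Instrumentation
import org.apache.spark.mllib.util.MLUtils

object OptimizerFailureSketch {
  def checkResult(
      instr: Instrumentation,
      optimizer: AnyRef,
      rawCoefficients: Array[Double]): Array[Double] = {
    if (rawCoefficients == null) {
      // Logs "<optimizer class name> failed." with OPTIMIZER_CLASS_NAME in the
      // MDC and then throws SparkException carrying the rendered message.
      MLUtils.optimizerFailed(instr, optimizer.getClass)
    }
    rawCoefficients
  }
}
```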
@@ -18,7 +18,8 @@
package org.apache.spark.ml.feature

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{CATEGORICAL_FEATURES, MAX_CATEGORIES}
import org.apache.spark.ml.attribute._
import org.apache.spark.ml.linalg.{SparseVector, Vector, Vectors}
import org.apache.spark.ml.param.ParamsSuite
@@ -175,8 +176,10 @@ class VectorIndexerSuite extends MLTest with DefaultReadWriteTest with Logging {
maxCategories: Int,
categoricalFeatures: Set[Int]): Unit = {
val collectedData = data.collect().map(_.getAs[Vector](0))
val errMsg = s"checkCategoryMaps failed for input with maxCategories=$maxCategories," +
s" categoricalFeatures=${categoricalFeatures.mkString(", ")}"

val errMsg = log"checkCategoryMaps failed for input with " +
log"maxCategories=${MDC(MAX_CATEGORIES, maxCategories)} " +
log"categoricalFeatures=${MDC(CATEGORICAL_FEATURES, categoricalFeatures.mkString(", "))}"
try {
val vectorIndexer = getIndexer.setMaxCategories(maxCategories)
val model = vectorIndexer.fit(data)
@@ -210,8 +213,8 @@ class VectorIndexerSuite extends MLTest with DefaultReadWriteTest with Logging {
assert(attr.values.get === origValueSet.toArray.sorted.map(_.toString))
assert(attr.isOrdinal.get === false)
case _ =>
throw new RuntimeException(errMsg + s". Categorical feature $feature failed" +
s" metadata check. Found feature attribute: $featureAttr.")
throw new RuntimeException(errMsg.message + s". Categorical feature $feature " +
s"failed metadata check. Found feature attribute: $featureAttr.")
}
}
// Check numerical feature metadata.
@@ -222,8 +225,8 @@
case attr: NumericAttribute =>
assert(featureAttr.index.get === feature)
case _ =>
throw new RuntimeException(errMsg + s". Numerical feature $feature failed" +
s" metadata check. Found feature attribute: $featureAttr.")
throw new RuntimeException(errMsg.message + s". Numerical feature $feature " +
s"failed metadata check. Found feature attribute: $featureAttr.")
}
}
}
@@ -18,6 +18,8 @@
package org.apache.spark.mllib.tree

import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.{MDC, MessageWithContext}
import org.apache.spark.internal.LogKey.{LEARNING_RATE, NUM_ITERATIONS, SUBSAMPLING_RATE}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.configuration.{BoostingStrategy, Strategy}
import org.apache.spark.mllib.tree.configuration.Algo._
@@ -33,6 +35,15 @@ import org.apache.spark.util.Utils
*/
class GradientBoostedTreesSuite extends SparkFunSuite with MLlibTestSparkContext {

private def buildErrorLog(
numIterations: Int,
learningRate: Double,
subsamplingRate: Double): MessageWithContext = {
log"FAILED for numIterations=${MDC(NUM_ITERATIONS, numIterations)}, " +
log"learningRate=${MDC(LEARNING_RATE, learningRate)}, " +
log"subsamplingRate=${MDC(SUBSAMPLING_RATE, subsamplingRate)}"
}

test("Regression with continuous features: SquaredError") {
GradientBoostedTreesSuite.testCombinations.foreach {
case (numIterations, learningRate, subsamplingRate) =>
@@ -51,8 +62,7 @@
gbt, GradientBoostedTreesSuite.data.toImmutableArraySeq, 0.06)
} catch {
case e: java.lang.AssertionError =>
logError(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," +
s" subsamplingRate=$subsamplingRate")
logError(buildErrorLog(numIterations, learningRate, subsamplingRate))
throw e
}

@@ -82,8 +92,7 @@ class GradientBoostedTreesSuite extends SparkFunSuite with MLlibTestSparkContext
gbt, GradientBoostedTreesSuite.data.toImmutableArraySeq, 0.85, "mae")
} catch {
case e: java.lang.AssertionError =>
logError(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," +
s" subsamplingRate=$subsamplingRate")
logError(buildErrorLog(numIterations, learningRate, subsamplingRate))
throw e
}

@@ -114,8 +123,7 @@ class GradientBoostedTreesSuite extends SparkFunSuite with MLlibTestSparkContext
gbt, GradientBoostedTreesSuite.data.toImmutableArraySeq, 0.9)
} catch {
case e: java.lang.AssertionError =>
logError(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," +
s" subsamplingRate=$subsamplingRate")
logError(buildErrorLog(numIterations, learningRate, subsamplingRate))
throw e
}

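Finally, a hedged, self-contained sketch of the `MessageWithContext` composition that `buildErrorLog` relies on (the object name is invented, and the assert assumes MDC values render via `toString`):

```scala
import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
import org.apache.spark.internal.LogKey.{LEARNING_RATE, NUM_ITERATIONS}

object GbtLogSketch extends Logging {
  def main(args: Array[String]): Unit = {
    // log"..." fragments concatenate with `+`, merging their MDC contexts.
    val entry: MessageWithContext =
      log"FAILED for numIterations=${MDC(NUM_ITERATIONS, 10)}, " +
        log"learningRate=${MDC(LEARNING_RATE, 0.1)}"
    assert(entry.message == "FAILED for numIterations=10, learningRate=0.1")
    logError(entry) // the keys travel with the message as structured fields
  }
}
```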
