[SPARK-47574][INFRA] Introduce Structured Logging Framework
### What changes were proposed in this pull request?

Introduce a Structured Logging Framework as described in [SPIP: Structured Logging Framework for Apache Spark](https://docs.google.com/document/d/1rATVGmFLNVLmtxSpWrEceYm7d-ocgu8ofhryVs4g3XU/edit?usp=sharing).
* The default logging output format will be JSON Lines. For example:
```
{
   "ts":"2023-03-12T12:02:46.661-0700",
   "level":"ERROR",
   "msg":"Cannot determine whether executor 289 is alive or not",
   "context":{
       "executor_id":"289"
   },
   "exception":{
      "class":"org.apache.spark.SparkException",
      "msg":"Exception thrown in awaitResult",
      "stackTrace":"..."
   },
   "source":"BlockManagerMasterEndpoint"
}
```
* Introduce a new configuration `spark.log.structuredLogging.enabled` to control the default log4j configuration. It is `true` by default; users can disable it to get plain-text log output.
* The change starts with the `logError` method. Example change to the API:
from
```
logError(s"Cannot determine whether executor $executorId is alive or not.", e)
```
to
```
logError(log"Cannot determine whether executor ${MDC(EXECUTOR_ID, executorId)} is alive or not.", e)
```

### Why are the changes needed?

To enhance Apache Spark's logging system by implementing structured logging. This transition changes the default log output format from plain text to JSON Lines, making the logs easier to query and analyze.
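
As a hedged illustration (not part of this PR) of why JSON Lines output is easier to analyze: the logs can be loaded and queried with Spark itself. The file path below is an assumption; the field names follow the example output above.
```
import org.apache.spark.sql.SparkSession

// Hypothetical sketch: query a structured driver log with Spark.
// The log path is an assumption; the field names ("level", "context.executor_id",
// "ts", "msg") match the example JSON output above.
object QueryStructuredLogs {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("QueryStructuredLogs").getOrCreate()
    import spark.implicits._

    val logs = spark.read.json("/path/to/driver-log.json")
    logs
      .filter($"level" === "ERROR" && $"context.executor_id" === "289")
      .select("ts", "msg")
      .show(truncate = false)
  }
}
```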

### Does this PR introduce _any_ user-facing change?

Yes, the default log output format will be JSON Lines instead of plain text. Users can restore the plain-text output by setting the configuration `spark.log.structuredLogging.enabled` to `false`.
If a user has a customized log4j configuration, there is no change in the log output.

### How was this patch tested?

New unit tests.

### Was this patch authored or co-authored using generative AI tooling?

Yes, some of the code comments are from GitHub Copilot.

Closes apache#45729 from gengliangwang/LogInterpolator.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
gengliangwang committed Mar 29, 2024
1 parent a8b247e commit 874d033
Showing 14 changed files with 441 additions and 4 deletions.
4 changes: 4 additions & 0 deletions common/utils/pom.xml
@@ -98,6 +98,10 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-layout-template-json</artifactId>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
38 changes: 38 additions & 0 deletions common/utils/src/main/resources/org/apache/spark/SparkLayout.json
@@ -0,0 +1,38 @@
{
"ts": {
"$resolver": "timestamp"
},
"level": {
"$resolver": "level",
"field": "name"
},
"msg": {
"$resolver": "message",
"stringified": true
},
"context": {
"$resolver": "mdc"
},
"exception": {
"class": {
"$resolver": "exception",
"field": "className"
},
"msg": {
"$resolver": "exception",
"field": "message",
"stringified": true
},
"stacktrace": {
"$resolver": "exception",
"field": "stackTrace",
"stackTrace": {
"stringified": true
}
}
},
"logger": {
"$resolver": "logger",
"field": "name"
}
}
4 changes: 2 additions & 2 deletions common/utils/src/main/resources/org/apache/spark/log4j2-defaults.properties
@@ -22,8 +22,8 @@ rootLogger.appenderRef.stdout.ref = console
appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex
appender.console.layout.type = JsonTemplateLayout
appender.console.layout.eventTemplateUri = classpath:org/apache/spark/SparkLayout.json

# Settings to quiet third party logs that are too verbose
logger.jetty.name = org.sparkproject.jetty
55 changes: 55 additions & 0 deletions common/utils/src/main/resources/org/apache/spark/log4j2-pattern-layout-defaults.properties
@@ -0,0 +1,55 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set everything to be logged to the console
rootLogger.level = info
rootLogger.appenderRef.stdout.ref = console

appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

# Settings to quiet third party logs that are too verbose
logger.jetty.name = org.sparkproject.jetty
logger.jetty.level = warn
logger.jetty2.name = org.sparkproject.jetty.util.component.AbstractLifeCycle
logger.jetty2.level = error
logger.repl1.name = org.apache.spark.repl.SparkIMain$exprTyper
logger.repl1.level = info
logger.repl2.name = org.apache.spark.repl.SparkILoop$SparkILoopInterpreter
logger.repl2.level = info

# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
logger.repl.name = org.apache.spark.repl.Main
logger.repl.level = warn

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs
# in SparkSQL with Hive support
logger.metastore.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler
logger.metastore.level = fatal
logger.hive_functionregistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry
logger.hive_functionregistry.level = error

# Parquet related logging
logger.parquet.name = org.apache.parquet.CorruptStatistics
logger.parquet.level = error
logger.parquet2.name = parquet.CorruptStatistics
logger.parquet2.level = error
25 changes: 25 additions & 0 deletions common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.internal

/**
* Various keys used for mapped diagnostic contexts (MDC) in logging.
* All structured logging keys should be defined here for standardization.
*/
object LogKey extends Enumeration {
val EXECUTOR_ID = Value
}
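
As a hedged usage sketch (not part of this diff), a class mixing in `Logging` can reference these keys through the new `log` interpolator defined in `Logging.scala` below; the class and method names here are invented for illustration.
```
import org.apache.spark.internal.{LogKey, Logging, MDC}

// Hypothetical illustration: a class mixing in Logging emits a structured error.
// With structured logging enabled, EXECUTOR_ID is lower-cased to "executor_id"
// and appears under the "context" field of the JSON output shown above.
class ExecutorLivenessMonitor extends Logging {
  def reportUnknownState(executorId: String, e: Throwable): Unit = {
    logError(
      log"Cannot determine whether executor ${MDC(LogKey.EXECUTOR_ID, executorId)} is alive or not.",
      e)
  }
}
```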
105 changes: 103 additions & 2 deletions common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -17,9 +17,12 @@

package org.apache.spark.internal

import java.util.Locale

import scala.jdk.CollectionConverters._

import org.apache.logging.log4j.{Level, LogManager}
import org.apache.logging.log4j.{CloseableThreadContext, Level, LogManager}
import org.apache.logging.log4j.CloseableThreadContext.Instance
import org.apache.logging.log4j.core.{Filter, LifeCycle, LogEvent, Logger => Log4jLogger, LoggerContext}
import org.apache.logging.log4j.core.appender.ConsoleAppender
import org.apache.logging.log4j.core.config.DefaultConfiguration
@@ -29,6 +32,38 @@ import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.internal.Logging.SparkShellLoggingFilter
import org.apache.spark.util.SparkClassUtils

/**
* Mapped Diagnostic Context (MDC) that will be used in log messages.
* The values of the MDC will be inline in the log message, while the key-value pairs will be
* part of the ThreadContext.
*/
case class MDC(key: LogKey.Value, value: String)

/**
* Wrapper class for log messages that include a logging context.
* This is used as the return type of the string interpolator `LogStringContext`.
*/
case class MessageWithContext(message: String, context: Option[Instance])

/**
* Companion class for lazy evaluation of the MessageWithContext instance.
*/
class LogEntry(messageWithContext: => MessageWithContext) {
def message: String = messageWithContext.message

def context: Option[Instance] = messageWithContext.context
}

/**
* Companion object for the wrapper to enable implicit conversions
*/
object LogEntry {
import scala.language.implicitConversions

implicit def from(msgWithCtx: => MessageWithContext): LogEntry =
new LogEntry(msgWithCtx)
}

/**
* Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
* logging messages at different levels using methods that only evaluate parameters lazily if the
@@ -55,6 +90,33 @@ trait Logging {
log_
}

implicit class LogStringContext(val sc: StringContext) {
def log(args: MDC*): MessageWithContext = {
val processedParts = sc.parts.iterator
val sb = new StringBuilder(processedParts.next())
lazy val map = new java.util.HashMap[String, String]()

args.foreach { mdc =>
sb.append(mdc.value)
if (Logging.isStructuredLoggingEnabled) {
map.put(mdc.key.toString.toLowerCase(Locale.ROOT), mdc.value)
}

if (processedParts.hasNext) {
sb.append(processedParts.next())
}
}

// Create a CloseableThreadContext and apply the context map
val closeableContext = if (Logging.isStructuredLoggingEnabled) {
Some(CloseableThreadContext.putAll(map))
} else {
None
}
MessageWithContext(sb.toString(), closeableContext)
}
}

// Log methods that take only a String
protected def logInfo(msg: => String): Unit = {
if (log.isInfoEnabled) log.info(msg)
@@ -76,6 +138,20 @@
if (log.isErrorEnabled) log.error(msg)
}

protected def logError(entry: LogEntry): Unit = {
if (log.isErrorEnabled) {
log.error(entry.message)
entry.context.map(_.close())
}
}

protected def logError(entry: LogEntry, throwable: Throwable): Unit = {
if (log.isErrorEnabled) {
log.error(entry.message, throwable)
entry.context.map(_.close())
}
}

// Log methods that take Throwables (Exceptions/Errors) too
protected def logInfo(msg: => String, throwable: Throwable): Unit = {
if (log.isInfoEnabled) log.info(msg, throwable)
@@ -132,7 +208,11 @@ trait Logging {
// scalastyle:off println
if (Logging.islog4j2DefaultConfigured()) {
Logging.defaultSparkLog4jConfig = true
val defaultLogProps = "org/apache/spark/log4j2-defaults.properties"
val defaultLogProps = if (Logging.isStructuredLoggingEnabled) {
"org/apache/spark/log4j2-defaults.properties"
} else {
"org/apache/spark/log4j2-pattern-layout-defaults.properties"
}
Option(SparkClassUtils.getSparkClassLoader.getResource(defaultLogProps)) match {
case Some(url) =>
val context = LogManager.getContext(false).asInstanceOf[LoggerContext]
@@ -190,6 +270,7 @@ private[spark] object Logging {
@volatile private var initialized = false
@volatile private var defaultRootLevel: Level = null
@volatile private var defaultSparkLog4jConfig = false
@volatile private var structuredLoggingEnabled = true
@volatile private[spark] var sparkShellThresholdLevel: Level = null
@volatile private[spark] var setLogLevelPrinted: Boolean = false

@@ -259,6 +340,26 @@
.getConfiguration.isInstanceOf[DefaultConfiguration])
}

/**
* Enable Structured logging framework.
*/
private[spark] def enableStructuredLogging(): Unit = {
structuredLoggingEnabled = true
}

/**
* Disable Structured logging framework.
*/
private[spark] def disableStructuredLogging(): Unit = {
structuredLoggingEnabled = false
}

/**
* Return true if Structured logging framework is enabled.
*/
private[spark] def isStructuredLoggingEnabled: Boolean = {
structuredLoggingEnabled
}

private[spark] class SparkShellLoggingFilter extends AbstractFilter {
private var status = LifeCycle.State.INITIALIZING
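
A design note on the `Logging.scala` changes above: `LogEntry` takes the `MessageWithContext` as a by-name parameter, and the implicit conversion in its companion object is by-name as well, so the interpolation and the MDC map are only evaluated when the target log level is enabled. A minimal, hypothetical sketch of that behavior:
```
import org.apache.spark.internal.{LogKey, Logging, MDC}

// Hypothetical sketch (not part of this diff): expensiveLookup() below is never
// called when ERROR logging is disabled, because logError(entry: LogEntry) only
// forces entry.message / entry.context after checking log.isErrorEnabled.
class LazyLoggingDemo extends Logging {
  def expensiveLookup(): String = { Thread.sleep(1000); "289" }

  def report(): Unit = {
    logError(log"Cannot determine whether executor ${MDC(LogKey.EXECUTOR_ID, expensiveLookup())} is alive or not.")
  }
}
```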
50 changes: 50 additions & 0 deletions common/utils/src/test/resources/log4j2.properties
@@ -0,0 +1,50 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

rootLogger.level = info
rootLogger.appenderRef.file.ref = ${sys:test.appender:-File}

appender.file.type = File
appender.file.name = File
appender.file.fileName = target/unit-tests.log
appender.file.layout.type = JsonTemplateLayout
appender.file.layout.eventTemplateUri = classpath:org/apache/spark/SparkLayout.json

# Structured Logging Appender
appender.structured.type = File
appender.structured.name = structured
appender.structured.fileName = target/structured.log
appender.structured.layout.type = JsonTemplateLayout
appender.structured.layout.eventTemplateUri = classpath:org/apache/spark/SparkLayout.json

# Pattern Logging Appender
appender.pattern.type = File
appender.pattern.name = pattern
appender.pattern.fileName = target/pattern.log
appender.pattern.layout.type = PatternLayout
appender.pattern.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

# Custom loggers
logger.structured.name = org.apache.spark.util.StructuredLoggingSuite
logger.structured.level = info
logger.structured.appenderRefs = structured
logger.structured.appenderRef.structured.ref = structured

logger.pattern.name = org.apache.spark.util.PatternLoggingSuite
logger.pattern.level = info
logger.pattern.appenderRefs = pattern
logger.pattern.appenderRef.pattern.ref = pattern
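
Based on the appender and logger routing above, a test can log through the new interpolator and assert on the JSON written to `target/structured.log`. The sketch below is hypothetical; the actual `StructuredLoggingSuite` added by this PR may look different.
```
package org.apache.spark.util

import java.nio.file.{Files, Paths}

import scala.jdk.CollectionConverters._

import org.scalatest.funsuite.AnyFunSuite

import org.apache.spark.internal.{LogKey, Logging, MDC}

// Hypothetical sketch only; the package and class name merely match the logger
// routing above so that output lands in target/structured.log.
class StructuredLoggingSuite extends AnyFunSuite with Logging {
  test("MDC value appears in the JSON context field") {
    logError(log"Cannot determine whether executor ${MDC(LogKey.EXECUTOR_ID, "289")} is alive or not.")
    val lines = Files.readAllLines(Paths.get("target/structured.log")).asScala
    assert(lines.exists(_.contains("\"executor_id\":\"289\"")))
  }
}
```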