MINOR: Update Scala to 2.13.3 (apache#8931)
I had to fix several compiler errors caused by the deprecation of auto-application of `()`. A related
Xlint flag (`-Xlint:nullary-override`) is no longer valid in 2.13, so we now enable it only
for 2.12. The compiler also flagged two new inliner warnings that required suppression, and
the semantics of `&` in `@nowarn` annotations changed, which required a small change to
one of the warning suppressions.
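
A minimal sketch of both source-level adjustments (the names below are hypothetical, not taken from the Kafka codebase):

```scala
import scala.annotation.nowarn

object Scala2133Fixes {
  def shutdown(): Unit = println("shutting down")

  // Auto-application: referring to a nullary method without parentheses
  // (e.g. `shutdown` instead of `shutdown()`) is deprecated in 2.13.3 and
  // fails builds that treat warnings as errors, so call sites now spell
  // out the empty argument list.
  def stop(): Unit = shutdown()

  // `&` in a @nowarn filter now requires every condition to match the
  // *same* warning, so a combined "cat=deprecation&cat=optimizer" filter
  // is split into one annotation per category.
  @nowarn("cat=deprecation")
  @nowarn("cat=optimizer")
  def perfSensitive(): Unit = shutdown()
}
```

The split `@nowarn` pair mirrors the change applied to `AclAuthorizer` further down in this diff.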

I also removed the deprecation of a number of methods in `KafkaZkClient`: they should not have
been deprecated in the first place, since `KafkaZkClient` is an internal class and these methods
are still used by the Controller and other internal components. This only became visible because
the Scala compiler now respects Java's `@Deprecated` annotation.

Finally, I included a few minor clean-ups (e.g. using `toBuffer` instead of `toList`) while fixing
the compilation warnings.
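
As a rough illustration of the `toBuffer` clean-up (a hypothetical helper, not code from this change): materialising an iterator with `toBuffer` builds a single growable array, which is generally cheaper to construct than the per-element cons cells that `toList` allocates.

```scala
import scala.collection.mutable

object BufferVsList {
  // Materialise an iterator so it can be traversed more than once.
  // `toBuffer` returns an ArrayBuffer-backed mutable.Buffer; `toList`
  // would also work but builds an immutable linked list node by node.
  def materialise(names: Iterator[String]): mutable.Buffer[String] =
    names.toBuffer
}
```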

Noteworthy bug fixes in Scala 2.13.3:

* Fix a 2.13-only bug in the Java collection converters that caused some operations to perform an extra pass
* Fix a 2.13.2 performance regression in `Vector`: restore special cases for small operands in `appendedAll` and `prependedAll`
* Increase laziness of `#::` for `LazyList`
* Fixes related to annotation parsing of `@deprecated` from Java sources in mixed compilation

Full release notes:
https://github.com/scala/scala/releases/tag/v2.13.3

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
ijuma committed Jun 26, 2020
1 parent 9c9a79b commit 7f90a58
Showing 79 changed files with 382 additions and 405 deletions.
2 changes: 1 addition & 1 deletion bin/kafka-run-class.sh
@@ -48,7 +48,7 @@ should_include_file() {
base_dir=$(dirname $0)/..

if [ -z "$SCALA_VERSION" ]; then
-SCALA_VERSION=2.13.2
+SCALA_VERSION=2.13.3
if [[ -f "$base_dir/gradle.properties" ]]; then
SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`
fi
2 changes: 1 addition & 1 deletion bin/windows/kafka-run-class.bat
@@ -27,7 +27,7 @@ set BASE_DIR=%CD%
popd

IF ["%SCALA_VERSION%"] EQU [""] (
-set SCALA_VERSION=2.13.2
+set SCALA_VERSION=2.13.3
)

IF ["%SCALA_BINARY_VERSION%"] EQU [""] (
2 changes: 1 addition & 1 deletion build.gradle
@@ -465,7 +465,6 @@ subprojects {
"-Xlint:delayedinit-select",
"-Xlint:doc-detached",
"-Xlint:missing-interpolator",
"-Xlint:nullary-override",
"-Xlint:nullary-unit",
"-Xlint:option-implicit",
"-Xlint:package-object-classes",
@@ -503,6 +502,7 @@ subprojects {
if (versions.baseScala == '2.12') {
scalaCompileOptions.additionalParameters += [
"-Xlint:by-name-right-associative",
"-Xlint:nullary-override",
"-Xlint:unsound-match"
]
}
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/Kafka.scala
@@ -77,7 +77,7 @@ object Kafka extends Logging {
}

// attach shutdown handler to catch terminating signals as well as normal termination
-Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown)
+Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown())

kafkaServerStartable.startup()
kafkaServerStartable.awaitShutdown()
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -627,7 +627,7 @@ object ConfigCommand extends Config {
}
}

-val entities = entityTypes.map(t => Entity(t, if (sortedNames.hasNext) Some(sanitizeName(t, sortedNames.next)) else None))
+val entities = entityTypes.map(t => Entity(t, if (sortedNames.hasNext) Some(sanitizeName(t, sortedNames.next())) else None))
ConfigEntity(entities.head, if (entities.size > 1) Some(entities(1)) else None)
}

@@ -711,12 +711,12 @@
(userDefaults, ConfigType.User),
(brokerDefaults, ConfigType.Broker))

-private[admin] def entityTypes(): List[String] = {
+private[admin] def entityTypes: List[String] = {
options.valuesOf(entityType).asScala.toList ++
(entityFlags ++ entityDefaultsFlags).filter(entity => options.has(entity._1)).map(_._2)
}

-private[admin] def entityNames(): List[String] = {
+private[admin] def entityNames: List[String] = {
val namesIterator = options.valuesOf(entityName).iterator
options.specs.asScala
.filter(spec => spec.options.contains("entity-name") || spec.options.contains("entity-default"))
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -44,6 +44,8 @@ import org.apache.kafka.common.requests.ListOffsetResponse
import org.apache.kafka.common.ConsumerGroupState
import joptsimple.OptionException

+import scala.annotation.nowarn

object ConsumerGroupCommand extends Logging {

def main(args: Array[String]): Unit = {
@@ -566,6 +568,7 @@
/**
* Returns the state of the specified consumer group and partition assignment states
*/
+@nowarn("cat=optimizer")
def collectGroupOffsets(groupId: String): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
collectGroupsOffsets(List(groupId)).getOrElse(groupId, (None, None))
}
@@ -178,7 +178,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
case Some(partitions) =>
partitions.map(_.topic).toSet
case None =>
-zkClient.getAllPartitions().map(_.topic)
+zkClient.getAllPartitions.map(_.topic)
}

val partitionsFromZk = zkClient.getPartitionsForTopics(topics).flatMap{ case (topic, partitions) =>
@@ -190,7 +190,7 @@
case Some(partitions) =>
partitions.partition(partitionsFromZk.contains)
case None =>
-(zkClient.getAllPartitions(), Set.empty)
+(zkClient.getAllPartitions, Set.empty)
}
PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions)

@@ -1111,7 +1111,7 @@ object ReassignPartitionsCommand extends Logging {
// Check for the presence of the legacy partition reassignment ZNode. This actually
// won't detect all rebalances... only ones initiated by the legacy method.
// This is a limitation of the legacy ZK API.
-val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress()
+val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress
if (reassignPartitionsInProgress) {
// Note: older versions of this tool would modify the broker quotas here (but not
// topic quotas, for some reason). This behavior wasn't documented in the --execute
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -176,16 +176,16 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging {
}

private def setAclIndividually(path: String): Unit = {
-val setPromise = Promise[String]
+val setPromise = Promise[String]()
futures.synchronized {
futures += setPromise.future
}
setAcl(path, setPromise)
}

private def setAclsRecursively(path: String): Unit = {
-val setPromise = Promise[String]
-val childrenPromise = Promise[String]
+val setPromise = Promise[String]()
+val childrenPromise = Promise[String]()
futures.synchronized {
futures += setPromise.future
futures += childrenPromise.future
@@ -279,15 +279,15 @@
future match {
case Some(a) =>
Await.result(a, 6000 millis)
-futures.synchronized { futures.dequeue }
-recurse
+futures.synchronized { futures.dequeue() }
+recurse()
case None =>
}
}
recurse()

} finally {
-zkClient.close
+zkClient.close()
}
}

@@ -117,7 +117,7 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
}

class ChangeNotification {
-def process(): Unit = processNotifications
+def process(): Unit = processNotifications()
}

/**
@@ -143,17 +143,17 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
private def changeNumber(name: String): Long = name.substring(seqNodePrefix.length).toLong

class ChangeEventProcessThread(name: String) extends ShutdownableThread(name = name) {
-override def doWork(): Unit = queue.take().process
+override def doWork(): Unit = queue.take().process()
}

object ChangeNotificationHandler extends ZNodeChildChangeHandler {
override val path: String = seqNodeRoot
-override def handleChildChange(): Unit = addChangeNotification
+override def handleChildChange(): Unit = addChangeNotification()
}

object ZkStateChangeHandler extends StateChangeHandler {
override val name: String = StateChangeHandlers.zkNodeChangeListenerHandler(seqNodeRoot)
-override def afterInitializingSession(): Unit = addChangeNotification
+override def afterInitializingSession(): Unit = addChangeNotification()
}
}

18 changes: 6 additions & 12 deletions core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -416,15 +416,13 @@ class ControllerContext {
partitionLeadershipInfo.get(partition)
}

-def partitionsLeadershipInfo(): Iterable[(TopicPartition, LeaderIsrAndControllerEpoch)] = {
+def partitionsLeadershipInfo: Map[TopicPartition, LeaderIsrAndControllerEpoch] =
partitionLeadershipInfo
-}

-def partitionsWithLeaders(): Set[TopicPartition] = {
-partitionLeadershipInfo.keys.filter(tp => !isTopicQueuedUpForDeletion(tp.topic)).toSet
-}
+def partitionsWithLeaders: Set[TopicPartition] =
+partitionLeadershipInfo.keySet.filter(tp => !isTopicQueuedUpForDeletion(tp.topic))

-def partitionsWithOfflineLeader(): Set[TopicPartition] = {
+def partitionsWithOfflineLeader: Set[TopicPartition] = {
partitionLeadershipInfo.filter { case (topicPartition, leaderIsrAndControllerEpoch) =>
!isReplicaOnline(leaderIsrAndControllerEpoch.leaderAndIsr.leader, topicPartition) &&
!isTopicQueuedUpForDeletion(topicPartition.topic)
@@ -439,13 +437,9 @@
}.keySet
}

-def clearPartitionLeadershipInfo(): Unit = {
-partitionLeadershipInfo.clear()
-}
+def clearPartitionLeadershipInfo(): Unit = partitionLeadershipInfo.clear()

-def partitionWithLeadersCount(): Int = {
-partitionLeadershipInfo.size
-}
+def partitionWithLeadersCount: Int = partitionLeadershipInfo.size

private def updatePreferredReplicaImbalanceMetric(partition: TopicPartition,
oldReplicaAssignment: Option[ReplicaAssignment],
10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/controller/KafkaController.scala
@@ -260,7 +260,7 @@ class KafkaController(val config: KafkaConfig,
info("starting the token expiry check scheduler")
tokenCleanScheduler.startup()
tokenCleanScheduler.schedule(name = "delete-expired-tokens",
-fun = () => tokenManager.expireTokens,
+fun = () => tokenManager.expireTokens(),
period = config.delegationTokenExpiryCheckIntervalMs,
unit = TimeUnit.MILLISECONDS)
}
@@ -439,7 +439,7 @@ class KafkaController(val config: KafkaConfig,
val (newOfflineReplicasForDeletion, newOfflineReplicasNotForDeletion) =
newOfflineReplicas.partition(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))

-val partitionsWithOfflineLeader = controllerContext.partitionsWithOfflineLeader()
+val partitionsWithOfflineLeader = controllerContext.partitionsWithOfflineLeader

// trigger OfflinePartition state for all partitions whose current leader is one amongst the newOfflineReplicas
partitionStateMachine.handleStateChanges(partitionsWithOfflineLeader.toSeq, OfflinePartition)
@@ -931,10 +931,10 @@ class KafkaController(val config: KafkaConfig,
* @param shouldRemoveReassignment Predicate indicating which partition reassignments should be removed
*/
private def maybeRemoveFromZkReassignment(shouldRemoveReassignment: (TopicPartition, Seq[Int]) => Boolean): Unit = {
-if (!zkClient.reassignPartitionsInProgress())
+if (!zkClient.reassignPartitionsInProgress)
return

-val reassigningPartitions = zkClient.getPartitionReassignment()
+val reassigningPartitions = zkClient.getPartitionReassignment
val (removingPartitions, updatedPartitionsBeingReassigned) = reassigningPartitions.partition { case (tp, replicas) =>
shouldRemoveReassignment(tp, replicas)
}
@@ -1516,7 +1516,7 @@ class KafkaController(val config: KafkaConfig,
val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]

-zkClient.getPartitionReassignment().foreach { case (tp, targetReplicas) =>
+zkClient.getPartitionReassignment.foreach { case (tp, targetReplicas) =>
maybeBuildReassignment(tp, Some(targetReplicas)) match {
case Some(context) => partitionsToReassign.put(tp, context)
case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
@@ -174,7 +174,7 @@ class GroupMetadataManager(brokerId: Int,
scheduler.startup()
if (enableMetadataExpiration) {
scheduler.schedule(name = "delete-expired-group-metadata",
-fun = () => cleanupGroupMetadata,
+fun = () => cleanupGroupMetadata(),
period = config.offsetsRetentionCheckIntervalMs,
unit = TimeUnit.MILLISECONDS)
}
@@ -752,7 +752,7 @@ class GroupMetadataManager(brokerId: Int,
onGroupUnloaded: GroupMetadata => Unit): Unit = {
val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
info(s"Scheduling unloading of offsets and group metadata from $topicPartition")
-scheduler.schedule(topicPartition.toString, () => removeGroupsAndOffsets)
+scheduler.schedule(topicPartition.toString, () => removeGroupsAndOffsets())

def removeGroupsAndOffsets(): Unit = {
var numOffsetsRemoved = 0
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/Log.scala
@@ -785,7 +785,7 @@ class Log(@volatile private var _dir: File,
var truncated = false

while (unflushed.hasNext && !truncated) {
-val segment = unflushed.next
+val segment = unflushed.next()
info(s"Recovering unflushed segment ${segment.baseOffset}")
val truncatedBytes =
try {
@@ -2270,7 +2270,7 @@ class Log(@volatile private var _dir: File,

if (asyncDelete) {
info(s"Scheduling segments for deletion ${segments.mkString(",")}")
scheduler.schedule("delete-file", () => deleteSegments, delay = config.fileDeleteDelayMs)
scheduler.schedule("delete-file", () => deleteSegments(), delay = config.fileDeleteDelayMs)
} else {
deleteSegments()
}
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/log/LogManager.scala
@@ -318,7 +318,7 @@ class LogManager(logDirs: Seq[File],

var recoveryPoints = Map[TopicPartition, Long]()
try {
-recoveryPoints = this.recoveryPointCheckpoints(dir).read
+recoveryPoints = this.recoveryPointCheckpoints(dir).read()
} catch {
case e: Exception =>
warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory $dir", e)
@@ -327,7 +327,7 @@

var logStartOffsets = Map[TopicPartition, Long]()
try {
-logStartOffsets = this.logStartOffsetCheckpoints(dir).read
+logStartOffsets = this.logStartOffsetCheckpoints(dir).read()
} catch {
case e: Exception =>
warn(s"Error occurred while reading log-start-offset-checkpoint file of directory $dir", e)
@@ -1039,7 +1039,7 @@ class LogManager(logDirs: Seq[File],
debug(s"Checking if flush is needed on ${topicPartition.topic} flush interval ${log.config.flushMs}" +
s" last flushed ${log.lastFlushTime} time since last flush: $timeSinceLastFlush")
if(timeSinceLastFlush >= log.config.flushMs)
-log.flush
+log.flush()
} catch {
case e: Throwable =>
error(s"Error flushing topic ${topicPartition.topic}", e)
@@ -68,7 +68,7 @@ object AclAuthorizer {
def find(p: AclEntry => Boolean): Option[AclEntry] = {
// Lazily iterate through the inner `Seq` elements and stop as soon as we find a match
val it = seqs.iterator.flatMap(_.find(p))
-if (it.hasNext) Some(it.next)
+if (it.hasNext) Some(it.next())
else None
}

@@ -367,7 +367,8 @@ class AclAuthorizer extends Authorizer with Logging {
} else false
}

@nowarn("cat=deprecation&cat=optimizer")
@nowarn("cat=deprecation")
@nowarn("cat=optimizer")
private def matchingAcls(resourceType: ResourceType, resourceName: String): AclSeqs = {
// this code is performance sensitive, make sure to run AclAuthorizerBenchmark after any changes

@@ -523,7 +524,7 @@ class AclAuthorizer extends Authorizer with Logging {
}
}

-if(!writeComplete)
+if (!writeComplete)
throw new IllegalStateException(s"Failed to update ACLs for $resource after trying a maximum of $maxUpdateRetries times")

if (newVersionedAcls.acls != currentVersionedAcls.acls) {
@@ -538,6 +539,7 @@
}
}

@nowarn("cat=optimizer")
private def getAclsFromCache(resource: ResourcePattern): VersionedAcls = {
aclCache.getOrElse(resource, throw new IllegalArgumentException(s"ACLs do not exist in the cache for resource $resource"))
}
@@ -548,9 +550,9 @@

private def updateCache(resource: ResourcePattern, versionedAcls: VersionedAcls): Unit = {
if (versionedAcls.acls.nonEmpty) {
-aclCache = aclCache + (resource -> versionedAcls)
+aclCache = aclCache.updated(resource, versionedAcls)
} else {
-aclCache = aclCache - resource
+aclCache -= resource
}
}

@@ -651,7 +651,7 @@ abstract class AbstractFetcherThread(name: String,
} finally partitionMapLock.unlock()
}

-def partitionCount(): Int = {
+def partitionCount: Int = {
partitionMapLock.lockInterruptibly()
try partitionStates.size
finally partitionMapLock.unlock()
