forked from apache/kafka
/
UnifiedLog.scala
2220 lines (1957 loc) · 105 KB
/
UnifiedLog.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import com.yammer.metrics.core.MetricName
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.remote.RemoteLogManager
import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, PartitionMetadataFile, RequestLocal}
import kafka.utils._
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.DescribeProducersResponseData
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.ListOffsetsRequest
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET
import org.apache.kafka.common.requests.ProduceResponse.RecordError
import org.apache.kafka.common.utils.{PrimitiveRef, Time, Utils}
import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams}
import java.io.{File, IOException}
import java.nio.file.Files
import java.util
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import java.util.{Collections, Optional, OptionalInt, OptionalLong}
import scala.annotation.nowarn
import scala.collection.mutable.ListBuffer
import scala.collection.{Seq, immutable, mutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
/**
* A log which presents a unified view of local and tiered log segments.
*
* The log consists of tiered and local segments with the tiered portion of the log being optional. There could be an
* overlap between the tiered and local segments. The active segment is always guaranteed to be local. If tiered segments
* are present, they always appear at the beginning of the log, followed by an optional region of overlap, followed by the local
* segments including the active segment.
*
* NOTE: this class handles state and behavior specific to tiered segments as well as any behavior combining both tiered
* and local segments. The state and behavior specific to local segments are handled by the encapsulated LocalLog instance.
*
* @param logStartOffset The earliest offset allowed to be exposed to kafka client.
* The logStartOffset can be updated by :
* - user's DeleteRecordsRequest
* - broker's log retention
* - broker's log truncation
* - broker's log recovery
* The logStartOffset is used to decide the following:
* - Log deletion. LogSegment whose nextOffset <= log's logStartOffset can be deleted.
* It may trigger log rolling if the active segment is deleted.
* - Earliest offset of the log in response to ListOffsetRequest. To avoid OffsetOutOfRange exception after user seeks to earliest offset,
* we make sure that logStartOffset <= log's highWatermark
* Other activities such as log cleaning are not affected by logStartOffset.
* @param localLog The LocalLog instance containing non-empty log segments recovered from disk
* @param brokerTopicStats Container for Broker Topic Yammer Metrics
* @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
* @param leaderEpochCache The LeaderEpochFileCache instance (if any) containing state associated
* with the provided logStartOffset and nextOffsetMetadata
* @param producerStateManager The ProducerStateManager instance containing state associated with the provided segments
* @param _topicId optional Uuid to specify the topic ID for the topic if it exists. Should only be specified when
* first creating the log through Partition.makeLeader or Partition.makeFollower. When reloading a log,
* this field will be populated by reading the topic ID value from partition.metadata if it exists.
* @param keepPartitionMetadataFile boolean flag to indicate whether the partition.metadata file should be kept in the
* log directory. A partition.metadata file is only created when the raft controller is used
* or the ZK controller and this broker's inter-broker protocol version is at least 2.8.
* This file will persist the topic ID on the broker. If inter-broker protocol for a ZK controller
* is downgraded below 2.8, a topic ID may be lost and a new ID generated upon re-upgrade.
* If the inter-broker protocol version on a ZK cluster is below 2.8, partition.metadata
* will be deleted to avoid ID conflicts upon re-upgrade.
* @param remoteStorageSystemEnable flag to indicate whether the system level remote log storage is enabled or not.
*/
@threadsafe
class UnifiedLog(@volatile var logStartOffset: Long,
private val localLog: LocalLog,
val brokerTopicStats: BrokerTopicStats,
val producerIdExpirationCheckIntervalMs: Int,
@volatile var leaderEpochCache: Option[LeaderEpochFileCache],
val producerStateManager: ProducerStateManager,
@volatile private var _topicId: Option[Uuid],
val keepPartitionMetadataFile: Boolean,
val remoteStorageSystemEnable: Boolean = false,
@volatile private var logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER) extends Logging {
import kafka.log.UnifiedLog._
// Registers this log's metrics under the legacy `Log` type name (rather than `UnifiedLog`)
// so that existing dashboards and alerting keep working.
private val metricsGroup = new KafkaMetricsGroup(this.getClass) {
  // For compatibility, metrics are defined to be under `Log` class
  override def metricName(name: String, tags: util.Map[String, String]): MetricName = {
    KafkaMetricsGroup.explicitMetricName(getClass.getPackage.getName, "Log", name, tags)
  }
}
this.logIdent = s"[UnifiedLog partition=$topicPartition, dir=$parentDir] "
/* A lock that guards all modifications to the log */
private val lock = new Object
private val validatorMetricsRecorder = newValidatorMetricsRecorder(brokerTopicStats.allTopicsStats)
/* The earliest offset which is part of an incomplete transaction. This is used to compute the
* last stable offset (LSO) in ReplicaManager. Note that it is possible that the "true" first unstable offset
* gets removed from the log (through record or segment deletion). In this case, the first unstable offset
* will point to the log start offset, which may actually be either part of a completed transaction or not
* part of a transaction at all. However, since we only use the LSO for the purpose of restricting the
* read_committed consumer to fetching decided data (i.e. committed, aborted, or non-transactional), this
* temporary abuse seems justifiable and saves us from scanning the log after deletion to find the first offsets
* of each ongoing transaction in order to compute a new first unstable offset. It is possible, however,
* that this could result in disagreement between replicas depending on when they began replicating the log.
* In the worst case, the LSO could be seen by a consumer to go backwards.
*/
@volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None
/* Keep track of the current high watermark in order to ensure that segments containing offsets at or above it are
* not eligible for deletion. This means that the active segment is only eligible for deletion if the high watermark
* equals the log end offset (which may never happen for a partition under consistent load). This is needed to
* prevent the log start offset (which is exposed in fetch responses) from getting ahead of the high watermark.
*/
@volatile private var highWatermarkMetadata: LogOffsetMetadata = new LogOffsetMetadata(logStartOffset)
@volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None
@volatile private[kafka] var _localLogStartOffset: Long = logStartOffset
def localLogStartOffset(): Long = _localLogStartOffset
@volatile private var highestOffsetInRemoteStorage: Long = -1L
// One-time construction-side initialization: set up the partition metadata file, normalize the
// start offset (which also bounds the high watermark and recovery point), compute the first
// unstable offset, resolve the topic ID, and publish the initial high watermark to the listener.
locally {
  initializePartitionMetadata()
  updateLogStartOffset(logStartOffset)
  maybeIncrementFirstUnstableOffset()
  initializeTopicId()
  logOffsetsListener.onHighWatermarkUpdated(highWatermarkMetadata.messageOffset)
}
/** Replace the listener that is notified of offset changes (e.g. high watermark updates). */
def setLogOffsetsListener(listener: LogOffsetsListener): Unit = {
  logOffsetsListener = listener
}
/**
 * Whether tiered (remote) storage is in effect for this log: the system-level flag, the
 * per-topic config flag, and the topic itself must all permit it.
 */
def remoteLogEnabled(): Boolean = {
  if (!remoteStorageSystemEnable) {
    false
  } else {
    // Remote log is enabled only for non-compact and non-internal topics
    val topic = topicPartition.topic()
    val excludedTopic = config.compact ||
      Topic.isInternal(topic) ||
      TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME.equals(topic) ||
      Topic.CLUSTER_METADATA_TOPIC_NAME.equals(topic)
    !excludedTopic && config.remoteLogConfig.remoteStorageEnable
  }
}
/**
* Initialize topic ID information for the log by maintaining the partition metadata file and setting the in-memory _topicId.
* Delete partition metadata file if the version does not support topic IDs.
* Set _topicId based on a few scenarios:
* - Recover topic ID if present and topic IDs are supported. Ensure we do not try to assign a provided topicId that is inconsistent
* with the ID on file.
* - If we were provided a topic ID when creating the log, partition metadata files are supported, and one does not yet exist
* set _topicId and write to the partition metadata file.
* - Otherwise set _topicId to None
*/
def initializeTopicId(): Unit = {
  val partMetadataFile = partitionMetadataFile.getOrElse(
    throw new KafkaException("The partitionMetadataFile should have been initialized"))
  if (partMetadataFile.exists()) {
    if (keepPartitionMetadataFile) {
      val fileTopicId = partMetadataFile.read().topicId
      // A topic ID supplied at construction must agree with the one already persisted on disk.
      // Fix: added the missing space between the two concatenated message fragments, which
      // previously rendered as "...partition $topicPartition,but log...".
      if (_topicId.isDefined && !_topicId.contains(fileTopicId))
        throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition, " +
          s"but log already contained topic ID $fileTopicId")
      _topicId = Some(fileTopicId)
    } else {
      // Topic IDs are not supported on this broker (e.g. ZK IBP < 2.8): delete the file so a
      // stale ID cannot conflict on a later re-upgrade. Deletion failure is logged, not fatal.
      try partMetadataFile.delete()
      catch {
        case e: IOException =>
          error(s"Error while trying to delete partition metadata file $partMetadataFile", e)
      }
    }
  } else if (keepPartitionMetadataFile) {
    // No file yet: persist the provided topic ID (if any) and schedule an async flush.
    _topicId.foreach(partMetadataFile.record)
    scheduler.scheduleOnce("flush-metadata-file", () => maybeFlushMetadataFile())
  } else {
    // We want to keep the file and the in-memory topic ID in sync.
    _topicId = None
  }
}
/** The in-memory topic ID, if one has been assigned or recovered from the metadata file. */
def topicId: Option[Uuid] = _topicId
// Thin delegating accessors to the encapsulated LocalLog instance.
def dir: File = localLog.dir
def parentDir: String = localLog.parentDir
def parentDirFile: File = localLog.parentDirFile
def name: String = localLog.name
def recoveryPoint: Long = localLog.recoveryPoint
def topicPartition: TopicPartition = localLog.topicPartition
def time: Time = localLog.time
def scheduler: Scheduler = localLog.scheduler
def config: LogConfig = localLog.config
def logDirFailureChannel: LogDirFailureChannel = localLog.logDirFailureChannel
/**
 * Swap in a new LogConfig and return the previous one. If the record format version changed,
 * the leader epoch cache is re-initialized, since its applicability depends on the format.
 */
def updateConfig(newConfig: LogConfig): LogConfig = {
  val previousConfig = localLog.config
  localLog.updateConfig(newConfig)
  // A record-format change invalidates the leader epoch cache, so rebuild it.
  if (newConfig.recordVersion != previousConfig.recordVersion)
    initializeLeaderEpochCache()
  previousConfig
}
def highWatermark: Long = highWatermarkMetadata.messageOffset
/**
* Update the high watermark to a new offset. The new high watermark will be lower
* bounded by the log start offset and upper bounded by the log end offset.
*
* This is intended to be called by the leader when initializing the high watermark.
*
* @param hw the suggested new value for the high watermark
* @return the updated high watermark offset
*/
// Message-offset-only convenience overload; see the metadata variant for bounding semantics.
def updateHighWatermark(hw: Long): Long = updateHighWatermark(new LogOffsetMetadata(hw))
/**
* Update high watermark with offset metadata. The new high watermark will be lower
* bounded by the log start offset and upper bounded by the log end offset.
*
* @param highWatermarkMetadata the suggested high watermark with offset metadata
* @return the updated high watermark offset
*/
/**
 * Set the high watermark, clamping the proposed value into [logStartOffset, logEndOffset].
 * When clamped to the log end, the log end's full offset metadata is reused.
 *
 * @param highWatermarkMetadata the suggested high watermark with offset metadata
 * @return the updated high watermark offset
 */
def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
  val logEndMetadata = localLog.logEndOffsetMetadata
  val proposed = highWatermarkMetadata.messageOffset
  val bounded =
    if (proposed < logStartOffset) new LogOffsetMetadata(logStartOffset)
    else if (proposed >= logEndMetadata.messageOffset) logEndMetadata
    else highWatermarkMetadata
  updateHighWatermarkMetadata(bounded)
  bounded.messageOffset
}
/**
* Update the high watermark to a new value if and only if it is larger than the old value. It is
* an error to update to a value which is larger than the log end offset.
*
* This method is intended to be used by the leader to update the high watermark after follower
* fetch offsets have been updated.
*
* @return the old high watermark, if updated by the new value
*/
/**
 * Advance the high watermark monotonically; returns the previous value if it moved.
 * Updating past the log end offset is an error. The metadata is also refreshed when the
 * offset is unchanged but now resides on a newer segment (i.e. after a log roll).
 */
def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
  if (newHighWatermark.messageOffset > logEndOffset)
    throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
      s"log end offset ${localLog.logEndOffsetMetadata}")
  lock.synchronized {
    val current = fetchHighWatermarkMetadata
    val advanced = current.messageOffset < newHighWatermark.messageOffset
    val sameOffsetNewerSegment =
      current.messageOffset == newHighWatermark.messageOffset && current.onOlderSegment(newHighWatermark)
    if (advanced || sameOffsetNewerSegment) {
      updateHighWatermarkMetadata(newHighWatermark)
      Some(current)
    } else {
      None
    }
  }
}
/**
* Update high watermark with a new value. The new high watermark will be lower
* bounded by the log start offset and upper bounded by the log end offset.
*
* This method is intended to be used by the follower to update its high watermark after
* replication from the leader.
*
* @return the new high watermark if the high watermark changed, None otherwise.
*/
/**
 * Follower-side high watermark update: apply the (bounded) new value and report the resulting
 * offset only if it differs from the previous high watermark.
 *
 * @return the new high watermark if the high watermark changed, None otherwise.
 */
def maybeUpdateHighWatermark(hw: Long): Option[Long] = lock.synchronized {
  val previous = highWatermarkMetadata
  val updated = updateHighWatermark(new LogOffsetMetadata(hw))
  if (updated == previous.messageOffset) None
  else Some(updated)
}
/**
* Get the offset and metadata for the current high watermark. If offset metadata is not
* known, this will do a lookup in the index and cache the result.
*/
/**
 * Return the high watermark with full offset metadata. If only the message offset is cached,
 * resolve the segment position under the lock, cache it, and return the resolved value.
 */
private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
  localLog.checkIfMemoryMappedBufferClosed()
  val cached = highWatermarkMetadata
  if (!cached.messageOffsetOnly) {
    cached
  } else {
    lock.synchronized {
      val resolved = convertToOffsetMetadataOrThrow(highWatermark)
      updateHighWatermarkMetadata(resolved)
      resolved
    }
  }
}
/**
 * Unconditionally install new high watermark metadata and propagate the value to the producer
 * state manager and the registered offsets listener. Callers perform bounds checks; a
 * non-monotonic move here is only logged, not rejected.
 */
private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
  if (newHighWatermark.messageOffset < 0)
    throw new IllegalArgumentException("High watermark offset should be non-negative")
  lock synchronized {
    if (newHighWatermark.messageOffset < highWatermarkMetadata.messageOffset) {
      warn(s"Non-monotonic update of high watermark from $highWatermarkMetadata to $newHighWatermark")
    }
    highWatermarkMetadata = newHighWatermark
    producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
    logOffsetsListener.onHighWatermarkUpdated(newHighWatermark.messageOffset)
    // The first unstable offset is capped by the high watermark, so it may advance now.
    maybeIncrementFirstUnstableOffset()
  }
  trace(s"Setting high watermark $newHighWatermark")
}
/**
* Get the first unstable offset. Unlike the last stable offset, which is always defined,
* the first unstable offset only exists if there are transactions in progress.
*
* @return the first unstable offset, if it exists
*/
private[log] def firstUnstableOffset: Option[Long] = firstUnstableOffsetMetadata.map(_.messageOffset)
/**
 * Return the last stable offset with full metadata: the first unstable offset if one exists
 * below the high watermark, otherwise the high watermark itself. If the unstable offset is
 * cached with only a message offset, resolve its segment position and cache the result.
 */
private def fetchLastStableOffsetMetadata: LogOffsetMetadata = {
  localLog.checkIfMemoryMappedBufferClosed()
  // cache the current high watermark to avoid a concurrent update invalidating the range check
  val highWatermarkMetadata = fetchHighWatermarkMetadata
  firstUnstableOffsetMetadata match {
    case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermarkMetadata.messageOffset =>
      if (offsetMetadata.messageOffsetOnly) {
        lock synchronized {
          val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
          // Only cache if the field still holds the value we resolved (avoids clobbering a
          // concurrent update that happened before we acquired the lock).
          if (firstUnstableOffsetMetadata.contains(offsetMetadata))
            firstUnstableOffsetMetadata = Some(fullOffset)
          fullOffset
        }
      } else {
        offsetMetadata
      }
    case _ => highWatermarkMetadata
  }
}
/**
* The last stable offset (LSO) is defined as the first offset such that all lower offsets have been "decided."
* Non-transactional messages are considered decided immediately, but transactional messages are only decided when
* the corresponding COMMIT or ABORT marker is written. This implies that the last stable offset will be equal
* to the high watermark if there are no transactional messages in the log. Note also that the LSO cannot advance
* beyond the high watermark.
*/
// LSO = first unstable offset when it is below the high watermark, else the high watermark.
def lastStableOffset: Long = {
  firstUnstableOffsetMetadata
    .map(_.messageOffset)
    .filter(_ < highWatermark)
    .getOrElse(highWatermark)
}
def lastStableOffsetLag: Long = highWatermark - lastStableOffset
/**
* Fully materialize and return an offset snapshot including segment position info. This method will update
* the LogOffsetMetadata for the high watermark and last stable offset if they are message-only. Throws an
* offset out of range error if the segment info cannot be loaded.
*/
/**
 * Build a fully-materialized offset snapshot. Fetching the LSO and high watermark first may
 * resolve and cache their segment positions; an offset-out-of-range error is thrown if the
 * backing segment info cannot be loaded.
 */
def fetchOffsetSnapshot: LogOffsetSnapshot = {
  val lastStableMetadata = fetchHighWatermarkMetadata match {
    case _ => fetchLastStableOffsetMetadata // placeholder removed below
  }
  new LogOffsetSnapshot(
    logStartOffset,
    localLog.logEndOffsetMetadata,
    fetchHighWatermarkMetadata,
    lastStableMetadata
  )
}
private var metricNames: Map[String, java.util.Map[String, String]] = Map.empty
newMetrics()
// (Re-)register this log's gauges, tagging future replicas with "is-future" so they do not
// collide with the current replica's metrics. The tag map is remembered so the same metrics
// can be removed later.
private[log] def newMetrics(): Unit = {
  val tags = (Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString) ++
    (if (isFuture) Map("is-future" -> "true") else Map.empty)).asJava
  metricsGroup.newGauge(LogMetricNames.NumLogSegments, () => numberOfSegments, tags)
  metricsGroup.newGauge(LogMetricNames.LogStartOffset, () => logStartOffset, tags)
  metricsGroup.newGauge(LogMetricNames.LogEndOffset, () => logEndOffset, tags)
  metricsGroup.newGauge(LogMetricNames.Size, () => size, tags)
  metricNames = Map(LogMetricNames.NumLogSegments -> tags,
    LogMetricNames.LogStartOffset -> tags,
    LogMetricNames.LogEndOffset -> tags,
    LogMetricNames.Size -> tags)
}
val producerExpireCheck = scheduler.schedule("PeriodicProducerExpirationCheck", () => removeExpiredProducers(time.milliseconds),
producerIdExpirationCheckIntervalMs, producerIdExpirationCheckIntervalMs)
// Visible for testing
// Visible for testing
/** Expire idle producer IDs; invoked periodically by the scheduled PeriodicProducerExpirationCheck. */
def removeExpiredProducers(currentTimeMs: Long): Unit = {
  lock synchronized {
    producerStateManager.removeExpiredProducers(currentTimeMs)
  }
}
/**
 * Rebuild producer state up to lastOffset, then refresh the first unstable offset and
 * re-bound the high watermark against the (possibly truncated) log end offset.
 */
def loadProducerState(lastOffset: Long): Unit = lock synchronized {
  rebuildProducerState(lastOffset, producerStateManager)
  maybeIncrementFirstUnstableOffset()
  updateHighWatermark(localLog.logEndOffsetMetadata)
}
/** The record format version currently configured for this log. */
private def recordVersion: RecordVersion = config.recordVersion
// Create the handle for the partition.metadata file in this log's directory.
private def initializePartitionMetadata(): Unit = lock synchronized {
  val partitionMetadata = PartitionMetadataFile.newFile(dir)
  partitionMetadataFile = Some(new PartitionMetadataFile(partitionMetadata, logDirFailureChannel))
}
/** Flush the partition metadata file if it exists and has pending writes. */
private def maybeFlushMetadataFile(): Unit = {
  partitionMetadataFile.foreach(_.maybeFlush())
}
/** Only used for ZK clusters when we update and start using topic IDs on existing topics */
/**
 * Assign a topic ID to this log (ZK clusters adopting topic IDs on existing topics).
 * Throws InconsistentTopicIdException if a different ID is already assigned. The ID is only
 * persisted when partition metadata files are kept and one does not already exist.
 */
def assignTopicId(topicId: Uuid): Unit = {
  _topicId match {
    case Some(currentId) =>
      // Fix: added the missing space between the two concatenated message fragments, which
      // previously rendered as "...partition $topicPartition,but log...".
      if (!currentId.equals(topicId)) {
        throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition, " +
          s"but log already contained topic ID $currentId")
      }
    case None =>
      if (keepPartitionMetadataFile) {
        _topicId = Some(topicId)
        partitionMetadataFile match {
          case Some(partMetadataFile) =>
            if (!partMetadataFile.exists()) {
              partMetadataFile.record(topicId)
              scheduler.scheduleOnce("flush-metadata-file", () => maybeFlushMetadataFile())
            }
          case _ => warn(s"The topic id $topicId will not be persisted to the partition metadata file " +
            "since the partition is deleted")
        }
      }
  }
}
// (Re-)create the leader epoch cache from the checkpoint file, honoring the current record version.
private def initializeLeaderEpochCache(): Unit = lock synchronized {
  leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, recordVersion, logIdent)
}
private def updateHighWatermarkWithLogEndOffset(): Unit = {
  // Update the high watermark in case it has gotten ahead of the log end offset following a truncation
  // or if a new segment has been rolled and the offset metadata needs to be updated.
  if (highWatermark >= localLog.logEndOffset) {
    updateHighWatermarkMetadata(localLog.logEndOffsetMetadata)
  }
}
/**
 * Set the log start offset and pull the high watermark and recovery point forward with it —
 * neither is allowed to trail the start of the log.
 */
private def updateLogStartOffset(offset: Long): Unit = {
  logStartOffset = offset
  if (offset > highWatermark)
    updateHighWatermark(offset)
  if (offset > localLog.recoveryPoint)
    localLog.updateRecoveryPoint(offset)
}
/** Monotonically advance the highest offset copied to remote storage; warns if tiering is disabled. */
def updateHighestOffsetInRemoteStorage(offset: Long): Unit = {
  if (remoteLogEnabled()) {
    // Only ever move the watermark forward.
    if (offset > highestOffsetInRemoteStorage)
      highestOffsetInRemoteStorage = offset
  } else {
    warn(s"Unable to update the highest offset in remote storage with offset $offset since remote storage is not enabled. The existing highest offset is $highestOffsetInRemoteStorage.")
  }
}
// Rebuild producer state until lastOffset. This method may be called from the recovery code path, and thus must be
// free of all side-effects, i.e. it must not update any log-specific state.
// Recover producer state from segments up to lastOffset. Called from the recovery path as well,
// so it must stay free of side effects on other log state.
private def rebuildProducerState(lastOffset: Long,
producerStateManager: ProducerStateManager): Unit = lock synchronized {
  localLog.checkIfMemoryMappedBufferClosed()
  UnifiedLog.rebuildProducerState(producerStateManager, localLog.segments, logStartOffset, lastOffset, recordVersion, time,
    reloadFromCleanShutdown = false, logIdent)
}
@threadsafe
/** True if some open transaction has exceeded its allowed duration as of currentTimeMs. */
def hasLateTransaction(currentTimeMs: Long): Boolean = {
  producerStateManager.hasLateTransaction(currentTimeMs)
}
@threadsafe
/** Number of producer IDs currently tracked by the producer state manager. */
def producerIdCount: Int = producerStateManager.producerIdCount
/** Snapshot of all active producers, converted to DescribeProducers response entries. */
def activeProducers: Seq[DescribeProducersResponseData.ProducerState] = {
  lock synchronized {
    producerStateManager.activeProducers.asScala.map { case (producerId, state) =>
      val producerState = new DescribeProducersResponseData.ProducerState()
      producerState.setProducerId(producerId)
      producerState.setProducerEpoch(state.producerEpoch)
      producerState.setLastSequence(state.lastSeq)
      producerState.setLastTimestamp(state.lastTimestamp)
      producerState.setCoordinatorEpoch(state.coordinatorEpoch)
      // -1 signals "no transaction in progress" to the response schema.
      producerState.setCurrentTxnStartOffset(state.currentTxnFirstOffset.orElse(-1L))
      producerState
    }
  }.toSeq
}
/** Map each active producer ID to the last sequence number it wrote. */
private[log] def activeProducersWithLastSequence: mutable.Map[Long, Int] = lock synchronized {
  val lastSeqByProducer = mutable.Map.empty[Long, Int]
  producerStateManager.activeProducers.forEach { case (producerId, entry) =>
    lastSeqByProducer += producerId.toLong -> entry.lastSeq
  }
  lastSeqByProducer
}
/** Map each active producer ID to its last record (epoch plus optional last data offset). */
private[log] def lastRecordsOfActiveProducers: mutable.Map[Long, LastRecord] = lock synchronized {
  val lastRecords = mutable.Map.empty[Long, LastRecord]
  producerStateManager.activeProducers.forEach { case (producerId, entry) =>
    // A negative lastDataOffset means the producer has not written a data record yet.
    val lastDataOffset =
      if (entry.lastDataOffset >= 0) OptionalLong.of(entry.lastDataOffset)
      else OptionalLong.empty()
    lastRecords += producerId.toLong -> new LastRecord(lastDataOffset, entry.producerEpoch)
  }
  lastRecords
}
/**
* Maybe create and return the verification guard object for the given producer ID if the transaction is not yet ongoing.
* Creation starts the verification process. Otherwise return null.
*/
/**
 * Start transaction verification for the producer unless a transaction is already ongoing:
 * returns the verification guard in the former case, null in the latter.
 */
def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): Object = lock synchronized {
  if (!hasOngoingTransaction(producerId))
    maybeCreateVerificationGuard(producerId, sequence, epoch)
  else
    null
}
/**
* Maybe create the VerificationStateEntry for the given producer ID -- always return the verification guard
*/
// Delegates to the producer state manager; returns the (possibly just-created) entry's guard.
def maybeCreateVerificationGuard(producerId: Long,
sequence: Int,
epoch: Short): Object = lock synchronized {
  producerStateManager.maybeCreateVerificationStateEntry(producerId, sequence, epoch).verificationGuard
}
/**
* If an VerificationStateEntry is present for the given producer ID, return its verification guard, otherwise, return null.
*/
/** The verification guard for the producer ID, or null if no verification entry exists. */
def verificationGuard(producerId: Long): Object = lock synchronized {
  Option(producerStateManager.verificationStateEntry(producerId))
    .map(_.verificationGuard)
    .orNull
}
/**
* Return true if the given producer ID has a transaction ongoing.
*/
/** True if the producer is active and currently has a transaction's first offset recorded. */
def hasOngoingTransaction(producerId: Long): Boolean = lock synchronized {
  Option(producerStateManager.activeProducers.get(producerId))
    .exists(_.currentTxnFirstOffset.isPresent)
}
/**
* The number of segments in the log.
* Take care! this is an O(n) operation.
*/
def numberOfSegments: Int = localLog.segments.numberOfSegments
/**
* Close this log.
* The memory mapped buffer for index files of this log will be left open until the log is deleted.
*/
/**
 * Close this log: detach the offsets listener, flush partition metadata, cancel the producer
 * expiration task, snapshot producer state, and finally close the local log. Index memory maps
 * remain open until the log is deleted.
 */
def close(): Unit = {
  debug("Closing log")
  lock synchronized {
    logOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER
    maybeFlushMetadataFile()
    localLog.checkIfMemoryMappedBufferClosed()
    // Cancel even if the expiration check is mid-run.
    producerExpireCheck.cancel(true)
    maybeHandleIOException(s"Error while renaming dir for $topicPartition in dir ${dir.getParent}") {
      // We take a snapshot at the last written offset to hopefully avoid the need to scan the log
      // after restarting and to ensure that we cannot inadvertently hit the upgrade optimization
      // (the clean shutdown file is written after the logs are all closed).
      producerStateManager.takeSnapshot()
    }
    localLog.close()
  }
}
/**
* Rename the directory of the local log. If the log's directory is being renamed for async deletion due to a
* StopReplica request, then the shouldReinitialize parameter should be set to false, otherwise it should be set to true.
*
* @param name The new name that this log's directory is being renamed to
* @param shouldReinitialize Whether the log's metadata should be reinitialized after renaming
* @throws KafkaStorageException if rename fails
*/
/**
 * Rename the directory of the local log and either re-initialize (normal move) or discard
 * (async deletion after StopReplica, shouldReinitialize = false) the leader epoch cache and
 * partition metadata file handles.
 *
 * @param name The new name that this log's directory is being renamed to
 * @param shouldReinitialize Whether the log's metadata should be reinitialized after renaming
 * @throws KafkaStorageException if rename fails
 */
def renameDir(name: String, shouldReinitialize: Boolean): Unit = {
  lock synchronized {
    maybeHandleIOException(s"Error while renaming dir for $topicPartition in log dir ${dir.getParent}") {
      // Flush partitionMetadata file before initializing again
      maybeFlushMetadataFile()
      if (localLog.renameDir(name)) {
        // The snapshot files live under the log dir, so the manager must learn the new path.
        producerStateManager.updateParentDir(dir)
        if (shouldReinitialize) {
          // re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference
          // the checkpoint file in renamed log directory
          initializeLeaderEpochCache()
          initializePartitionMetadata()
        } else {
          leaderEpochCache = None
          partitionMetadataFile = None
        }
      }
    }
  }
}
/**
 * Release the file handles held by this log without writing anything to disk.
 * Invoked when the hosting log directory has been marked offline.
 */
def closeHandlers(): Unit = {
  debug("Closing handlers")
  lock.synchronized {
    localLog.closeHandlers()
  }
}
/**
 * Append this message set to the active segment of the local log, assigning offsets and Partition Leader Epochs.
 *
 * @param records The records to append
 * @param leaderEpoch The epoch of the appending leader, stamped onto the messages when offsets are assigned
 * @param origin Declares the origin of the append which affects required validations
 * @param interBrokerProtocolVersion Inter-broker message protocol version
 * @param requestLocal request local instance
 * @param verificationGuard guard object for transactional verification; default is null (no guard)
 * @throws KafkaStorageException If the append fails due to an I/O error.
 * @return Information about the appended messages including the first and last offset.
 */
def appendAsLeader(records: MemoryRecords,
                   leaderEpoch: Int,
                   origin: AppendOrigin = AppendOrigin.CLIENT,
                   interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latest,
                   requestLocal: RequestLocal = RequestLocal.NoCaching,
                   verificationGuard: Object = null): LogAppendInfo = {
  // Offsets are assigned here unless the records come from the Raft leader path,
  // which supplies its own offsets.
  val assignOffsets = origin != AppendOrigin.RAFT_LEADER
  append(records,
    origin = origin,
    interBrokerProtocolVersion = interBrokerProtocolVersion,
    validateAndAssignOffsets = assignOffsets,
    leaderEpoch = leaderEpoch,
    requestLocal = Some(requestLocal),
    verificationGuard = verificationGuard,
    ignoreRecordSize = false)
}
/**
 * Append this message set to the active segment of the local log without assigning offsets or
 * Partition Leader Epochs: the offsets already present in the records are taken as-is.
 *
 * @param records The records to append
 * @throws KafkaStorageException If the append fails due to an I/O error.
 * @return Information about the appended messages including the first and last offset.
 */
def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
  // Record-size validation is skipped (ignoreRecordSize = true) because the leader
  // has already accepted these records.
  append(
    records,
    origin = AppendOrigin.REPLICATION,
    interBrokerProtocolVersion = MetadataVersion.latest,
    validateAndAssignOffsets = false,
    leaderEpoch = -1,
    requestLocal = None,
    verificationGuard = null,
    ignoreRecordSize = true)
}
/**
 * Append this message set to the active segment of the local log, rolling over to a fresh segment if necessary.
 *
 * This method will generally be responsible for assigning offsets to the messages,
 * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
 *
 * @param records The log records to append
 * @param origin Declares the origin of the append which affects required validations
 * @param interBrokerProtocolVersion Inter-broker message protocol version
 * @param validateAndAssignOffsets Should the log assign offsets to this message set or blindly apply what it is given
 * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
 * @param requestLocal The request local instance if validateAndAssignOffsets is true
 * @param verificationGuard Guard object used when validating the producers' transactional state
 * @param ignoreRecordSize true to skip validation of record size.
 * @throws KafkaStorageException If the append fails due to an I/O error.
 * @throws OffsetsOutOfOrderException If out of order offsets found in 'records'
 * @throws UnexpectedAppendOffsetException If the first or last offset in append is less than next offset
 * @return Information about the appended messages including the first and last offset.
 */
private def append(records: MemoryRecords,
                   origin: AppendOrigin,
                   interBrokerProtocolVersion: MetadataVersion,
                   validateAndAssignOffsets: Boolean,
                   leaderEpoch: Int,
                   requestLocal: Option[RequestLocal],
                   verificationGuard: Object,
                   ignoreRecordSize: Boolean): LogAppendInfo = {
  // We want to ensure the partition metadata file is written to the log dir before any log data is written to disk.
  // This will ensure that any log data can be recovered with the correct topic ID in the case of failure.
  maybeFlushMetadataFile()
  val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, leaderEpoch)
  // return if we have no valid messages or if this is a duplicate of the last appended entry
  if (appendInfo.shallowCount == 0) appendInfo
  else {
    // trim any invalid bytes or partial messages before appending it to the on-disk log
    var validRecords = trimInvalidBytes(records, appendInfo)
    // they are valid, insert them in the log
    lock synchronized {
      maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
        localLog.checkIfMemoryMappedBufferClosed()
        if (validateAndAssignOffsets) {
          // assign offsets to the message set
          val offset = PrimitiveRef.ofLong(localLog.logEndOffset)
          appendInfo.setFirstOffset(Optional.of(new LogOffsetMetadata(offset.value)))
          val validateAndOffsetAssignResult = try {
            val validator = new LogValidator(validRecords,
              topicPartition,
              time,
              appendInfo.sourceCompression,
              appendInfo.targetCompression,
              config.compact,
              config.recordVersion.value,
              config.messageTimestampType,
              config.messageTimestampDifferenceMaxMs,
              leaderEpoch,
              origin,
              interBrokerProtocolVersion
            )
            validator.validateMessagesAndAssignOffsets(offset,
              validatorMetricsRecorder,
              requestLocal.getOrElse(throw new IllegalArgumentException(
                "requestLocal should be defined if assignOffsets is true")
              ).bufferSupplier
            )
          } catch {
            case e: IOException =>
              throw new KafkaException(s"Error validating messages while appending to log $name", e)
          }
          validRecords = validateAndOffsetAssignResult.validatedRecords
          appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs)
          appendInfo.setOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestampMs)
          appendInfo.setLastOffset(offset.value - 1)
          appendInfo.setRecordConversionStats(validateAndOffsetAssignResult.recordConversionStats)
          if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
            appendInfo.setLogAppendTime(validateAndOffsetAssignResult.logAppendTimeMs)
          // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
          // format conversion)
          if (!ignoreRecordSize && validateAndOffsetAssignResult.messageSizeMaybeChanged) {
            validRecords.batches.forEach { batch =>
              if (batch.sizeInBytes > config.maxMessageSize) {
                // we record the original message set size instead of the trimmed size
                // to be consistent with pre-compression bytesRejectedRate recording
                brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
                brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
                // NOTE: the trailing space before "partition" is required; without it the
                // concatenated message reads "...append topartition...".
                throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to " +
                  s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
              }
            }
          }
        } else {
          // we are taking the offsets we are given
          if (!appendInfo.offsetsMonotonic)
            throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
              records.records.asScala.map(_.offset))
          if (appendInfo.firstOrLastOffsetOfFirstBatch < localLog.logEndOffset) {
            // we may still be able to recover if the log is empty
            // one example: fetching from log start offset on the leader which is not batch aligned,
            // which may happen as a result of AdminClient#deleteRecords()
            val firstOffset = appendInfo.firstOffset.map[Long](x => x.messageOffset)
              .orElse(records.batches.iterator().next().baseOffset())
            val firstOrLast = if (appendInfo.firstOffset.isPresent) "First offset" else "Last offset of the first batch"
            throw new UnexpectedAppendOffsetException(
              s"Unexpected offset in append to $topicPartition. $firstOrLast " +
              s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${localLog.logEndOffset}. " +
              s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
              s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
              firstOffset, appendInfo.lastOffset)
          }
        }
        // update the epoch cache with the epoch stamped onto the message by the leader
        validRecords.batches.forEach { batch =>
          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
            maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
          } else {
            // In partial upgrade scenarios, we may get a temporary regression to the message format. In
            // order to ensure the safety of leader election, we clear the epoch cache so that we revert
            // to truncation by high watermark after the next leader election.
            leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
              warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
              cache.clearAndFlush()
            }
          }
        }
        // ensure the size of the message set does not exceed config.segmentSize
        if (validRecords.sizeInBytes > config.segmentSize) {
          throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
            s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
        }
        // maybe roll the log if this segment is full
        val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
        val logOffsetMetadata = new LogOffsetMetadata(
          appendInfo.firstOrLastOffsetOfFirstBatch,
          segment.baseOffset,
          segment.size)
        // now that we have valid records, offsets assigned, and timestamps updated, we need to
        // validate the idempotent/transactional state of the producers and collect some metadata
        val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
          logOffsetMetadata, validRecords, origin, verificationGuard)
        maybeDuplicate match {
          case Some(duplicate) =>
            // This batch was already appended (idempotent producer retry): report the original
            // offsets/timestamp rather than appending again.
            appendInfo.setFirstOffset(Optional.of(new LogOffsetMetadata(duplicate.firstOffset)))
            appendInfo.setLastOffset(duplicate.lastOffset)
            appendInfo.setLogAppendTime(duplicate.timestamp)
            appendInfo.setLogStartOffset(logStartOffset)
          case None =>
            // Before appending update the first offset metadata to include segment information
            appendInfo.setFirstOffset(appendInfo.firstOffset.map { offsetMetadata =>
              new LogOffsetMetadata(offsetMetadata.messageOffset, segment.baseOffset, segment.size)
            })
            // Append the records, and increment the local log end offset immediately after the append because a
            // write to the transaction index below may fail and we want to ensure that the offsets
            // of future appends still grow monotonically. The resulting transaction index inconsistency
            // will be cleaned up after the log directory is recovered. Note that the end offset of the
            // ProducerStateManager will not be updated and the last stable offset will not advance
            // if the append to the transaction index fails.
            localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.offsetOfMaxTimestamp, validRecords)
            updateHighWatermarkWithLogEndOffset()
            // update the producer state
            updatedProducers.values.foreach(producerAppendInfo => producerStateManager.update(producerAppendInfo))
            // update the transaction index with the true last stable offset. The last offset visible
            // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
            completedTxns.foreach { completedTxn =>
              val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
              segment.updateTxnIndex(completedTxn, lastStableOffset)
              producerStateManager.completeTxn(completedTxn)
            }
            // always update the last producer id map offset so that the snapshot reflects the current offset
            // even if there isn't any idempotent data being written
            producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
            // update the first unstable offset (which is used to compute LSO)
            maybeIncrementFirstUnstableOffset()
            trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
              s"first offset: ${appendInfo.firstOffset}, " +
              s"next offset: ${localLog.logEndOffset}, " +
              s"and messages: $validRecords")
            if (localLog.unflushedMessages >= config.flushInterval) flush(false)
        }
        appendInfo
      }
    }
  }
}
/**
 * Record the start offset of the given leader epoch in the leader epoch cache,
 * if a cache is present.
 *
 * @param leaderEpoch the epoch to record
 * @param startOffset the first offset of that epoch
 */
def maybeAssignEpochStartOffset(leaderEpoch: Int, startOffset: Long): Unit =
  leaderEpochCache.foreach(_.assign(leaderEpoch, startOffset))
/** The most recent leader epoch recorded in the epoch cache, if any. */
def latestEpoch: Option[Int] =
  for {
    cache <- leaderEpochCache
    epoch <- cache.latestEpoch.asScala
  } yield epoch
/**
 * Look up the end offset for the requested leader epoch via the leader epoch cache.
 *
 * @param leaderEpoch the epoch to query
 * @return the found epoch and its end offset, or None when no cache exists or the
 *         cache reports an undefined offset for the epoch
 */
def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = {
  leaderEpochCache.flatMap { cache =>
    val result = cache.endOffsetFor(leaderEpoch, logEndOffset)
    val epochFound = result.getKey
    val offsetFound = result.getValue
    if (offsetFound == UNDEFINED_EPOCH_OFFSET) None
    else Some(new OffsetAndEpoch(offsetFound, epochFound))
  }
}
/**
 * Recompute the cached first unstable offset (used to derive the last stable offset).
 * When the producer state manager's first unstable offset carries only a message offset,
 * or lies below the log start offset, it is re-resolved to full offset metadata first.
 */
private def maybeIncrementFirstUnstableOffset(): Unit = lock synchronized {
  localLog.checkIfMemoryMappedBufferClosed()
  val candidate = producerStateManager.firstUnstableOffset.asScala.map { logOffsetMetadata =>
    if (logOffsetMetadata.messageOffsetOnly || logOffsetMetadata.messageOffset < logStartOffset) {
      // Clamp to the log start offset and resolve the full (segment-aware) metadata.
      val resolvedOffset = math.max(logOffsetMetadata.messageOffset, logStartOffset)
      convertToOffsetMetadataOrThrow(resolvedOffset)
    } else {
      logOffsetMetadata
    }
  }
  if (candidate != this.firstUnstableOffsetMetadata) {
    debug(s"First unstable offset updated to $candidate")
    this.firstUnstableOffsetMetadata = candidate
  }
}
/**
 * Increment the log start offset if the provided offset is larger.
 *
 * If the log start offset changed, then this method also updates a few key offsets such that
 * `logStartOffset <= logStableOffset <= highWatermark`. The leader epoch cache is also updated
 * such that all offsets referenced in that component point to valid offsets in this log.
 *
 * @param newLogStartOffset the candidate new log start offset; ignored unless larger than the current one
 * @param reason the reason for the increment, used only for logging
 * @throws OffsetOutOfRangeException if the log start offset is greater than the high watermark
 * @return true if the log start offset was updated; otherwise false
 */
def maybeIncrementLogStartOffset(newLogStartOffset: Long, reason: LogStartOffsetIncrementReason): Boolean = {
  // We don't have to write the log start offset to log-start-offset-checkpoint immediately.
  // The deleteRecordsOffset may be lost only if all in-sync replicas of this broker are shutdown
  // in an unclean manner within log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is low.
  var updatedLogStartOffset = false
  maybeHandleIOException(s"Exception while increasing log start offset for $topicPartition to $newLogStartOffset in dir ${dir.getParent}") {
    lock synchronized {
      if (newLogStartOffset > highWatermark)
        throw new OffsetOutOfRangeException(s"Cannot increment the log start offset to $newLogStartOffset of partition $topicPartition " +
          s"since it is larger than the high watermark $highWatermark")
      localLog.checkIfMemoryMappedBufferClosed()
      if (newLogStartOffset > logStartOffset) {
        updatedLogStartOffset = true
        updateLogStartOffset(newLogStartOffset)
        _localLogStartOffset = newLogStartOffset
        info(s"Incremented log start offset to $newLogStartOffset due to $reason")
        // `logStartOffset` here reads the value just set by updateLogStartOffset above,
        // so the epoch cache is truncated to the new start offset.
        leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
        producerStateManager.onLogStartOffsetIncremented(newLogStartOffset)
        // The first unstable offset may now lie below the new start offset; recompute it.
        maybeIncrementFirstUnstableOffset()
      }
    }
  }
  updatedLogStartOffset
}
private def analyzeAndValidateProducerState(appendOffsetMetadata: LogOffsetMetadata,
records: MemoryRecords,
origin: AppendOrigin,
requestVerificationGuard: Object):
(mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {