Skip to content
Switch branches/tags
Go to file
Cannot retrieve contributors at this time
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at
package netlog
import (
// IntegrityErrorType is the category of possible errors in the data.
type IntegrityErrorType string
const (
errLimit = 1000
// IntegrityChecksumErr is returned when the checksum in the message
// header doesn't match the checksum recalculated from the payload.
IntegrityChecksumErr IntegrityErrorType = "checksum"
// IntegrityLengthErr is returned when the length in the message
// header doesn't match the length of the payload.
IntegrityLengthErr IntegrityErrorType = "length"
// IntegrityUnknownErr is returned when data can not be read because
// of an underlying error reading the data.
IntegrityUnknownErr IntegrityErrorType = "unknown"
// IntegrityError is the struct with metadata about an any integrity error found.
type IntegrityError struct {
Offset int64 `json:"offset"`
ODelta int `json:"odelta"`
Type IntegrityErrorType `json:"type"`
Expected string `json:"expected"`
Actual string `json:"actual"`
// IntegrityChecker is used to check the integrity of an entire topic.
type IntegrityChecker struct {
sc *biglog.Scanner
// NewIntegrityChecker creates a new integrity checker for a given topic.
func NewIntegrityChecker(t *Topic, from int64) (*IntegrityChecker, error) {
sc, err := biglog.NewScanner(, from)
if err != nil {
return nil, err
return &IntegrityChecker{sc: sc}, nil
// Check reads all data collecting errors which then returns.
// Is recommended to pass a cancellable context since this operation can be slow.
func (ic *IntegrityChecker) Check(ctx context.Context) (errors []*IntegrityError) {
for {
if len(errors) >= errLimit {
return errors
select {
case <-ctx.Done():
return errors
m, o, d, err := ic.scan()
if err == ErrEndOfTopic {
return errors
if err != nil {
errors = append(errors, &IntegrityError{
Offset: o,
ODelta: d,
Type: IntegrityUnknownErr,
Actual: err.Error(),
iErr := CheckMessageIntegrity(m, d)
if iErr != nil {
iErr.Offset = o
errors = append(errors, iErr)
// CheckMessageIntegrity checks the integrity of a single message
func CheckMessageIntegrity(m Message, delta int) *IntegrityError {
if !m.ChecksumOK() {
return &IntegrityError{
ODelta: delta,
Type: IntegrityChecksumErr,
Expected: strconv.Itoa(int(crc32.ChecksumIEEE(m.Bytes()))),
Actual: strconv.Itoa(int(m.CRC32())),
if int(m.PLength()) != len(m.Payload()) {
return &IntegrityError{
ODelta: delta,
Type: IntegrityLengthErr,
Expected: strconv.Itoa(int(m.PLength())),
Actual: strconv.Itoa(len(m.Payload())),
// TODO check compressed integrity
return nil
func (ic *IntegrityChecker) scan() (Message, int64, int, error) {
ok :=
if ok {
return Message(,,,
if != nil {
return nil, -1, -1,
return nil, -1, -1, ErrEndOfTopic
// Close releases the underlying resources.
func (ic *IntegrityChecker) Close() error {