Skip to content

Commit

Permalink
Merge branch 'yurii/5615-synchronization-engine-queue' of https://git…
Browse files Browse the repository at this point in the history
…hub.com/onflow/flow-go into yurii/5615-synchronization-engine-queue
  • Loading branch information
durkmurder committed Jul 5, 2021
2 parents e75c7fd + 879c998 commit a00f07e
Show file tree
Hide file tree
Showing 45 changed files with 611 additions and 5,080 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ lint:
# GO111MODULE=on revive -config revive.toml -exclude storage/ledger/trie ./...
golangci-lint run -v --build-tags relic ./...

.PHONY: fix-lint
fix-lint:
# GO111MODULE=on revive -config revive.toml -exclude storage/ledger/trie ./...
golangci-lint run -v --build-tags relic --fix ./...

# Runs unit tests, SKIP FOR NOW linter, coverage
.PHONY: ci
ci: install-tools tidy test # lint coverage
Expand Down
6 changes: 5 additions & 1 deletion cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,20 @@ func main() {

resultApprovalSigVerifier := signature.NewAggregationVerifier(encoding.ResultApprovalTag)

sealValidator := validation.NewSealValidator(
sealValidator, err := validation.NewSealValidator(
node.State,
node.Storage.Headers,
node.Storage.Index,
node.Storage.Results,
node.Storage.Seals,
chunkAssigner,
resultApprovalSigVerifier,
requiredApprovalsForSealConstruction,
requiredApprovalsForSealVerification,
conMetrics)
if err != nil {
return fmt.Errorf("could not instantiate seal validator: %w", err)
}

mutableState, err = badgerState.NewFullConsensusState(
state,
Expand Down
2 changes: 1 addition & 1 deletion consensus/hotstuff/notifications/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (t *TelemetryConsumer) OnQcConstructedFromVotes(qc *flow.QuorumCertificate)
t.pathHandler.NextStep().
Uint64("qc_block_view", qc.View).
Hex("qc_block_id", qc.BlockID[:]).
Msg("OnQcIncorporated")
Msg("OnQcConstructedFromVotes")
}

func (t *TelemetryConsumer) OnQcIncorporated(qc *flow.QuorumCertificate) {
Expand Down
60 changes: 46 additions & 14 deletions crypto/relic_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,48 @@ chmod -R +w "$(pwd)"
mkdir -p "$DIR/relic/build"
pushd "$DIR/relic/build"

# Set RELIC config for Flow

#
# make cmake print its CC interpretation
CMAKE_FILE="${DIR}/relic/CMakeLists.txt"
# parameter expansion is not suitable here
# shellcheck disable=SC2089
CMAKE_PRINT_CC="message ( STATUS \"CC=\$ENV{CC}\" )"
# Make the cmake run print its interpretation of CC
echo "$CMAKE_PRINT_CC" >> "${CMAKE_FILE}"

# Probe cmake's MakeFile generation and extract the CC version
CMAKE_TEMP=$(mktemp)
cmake .. > "$CMAKE_TEMP"
CC_VAL="$(tail -n 5 "$CMAKE_TEMP" | grep -oE -m 1 'CC=.*$')"
CC_VAL="${CC_VAL:3}"

# de-mangle the CMakeLists file, using a temporary file for BSD compatibility
sed '$d' ../CMakeLists.txt > "$CMAKE_TEMP"
mv "$CMAKE_TEMP" ../CMakeLists.txt

# default to which
CC_VAL=${CC_VAL:-"$(which cc)"}
CC_VERSION_STR="$($CC_VAL --version)"

# we use uname to record which arch we are running on
ARCH=$(uname -m 2>/dev/null ||true)

if [[ "$ARCH" =~ ^(arm64|armv7|armv7s)$ && "${CC_VERSION_STR[0]}" =~ (clang) ]]; then
# the "-march=native" option is not supported with clang on ARM
MARCH=""
else
MARCH="-march=native"
fi

# Set RELIC config for Flow
COMP=(-DCOMP="-O3 -funroll-loops -fomit-frame-pointer ${MARCH} -mtune=native")
GENERAL=(-DTIMER=CYCLE -DCHECK=OFF -DVERBS=OFF)
LIBS=(-DSHLIB=OFF -DSTLIB=ON)
COMP=(-DCOMP="-O3 -funroll-loops -fomit-frame-pointer -march=native -mtune=native")
RAND=(-DRAND=HASHD -DSEED=)

#
BN_REP=(-DALLOC=AUTO -DALIGN=1 -DWSIZE=64 -DBN_PRECI=1024 -DBN_MAGNI=DOUBLE)
ARITH=(-DARITH=EASY)
ARITH=(-DARITH=EASY)
PRIME=(-DFP_PRIME=381)

#
Expand All @@ -34,21 +65,22 @@ EP_METH=(-DEP_MIXED=ON -DEP_PLAIN=OFF -DEP_SUPER=OFF -DEP_DEPTH=4 -DEP_WIDTH=2 \
-DEP_CTMAP=ON -DEP_METHD="JACOB;LWNAF;COMBS;INTER")
PP_METH=(-DPP_METHD="LAZYR;OATEP")

# Generate make files using cmake
# run cmake
cmake "${COMP[@]}" "${GENERAL[@]}" \
"${LIBS[@]}" "${RAND[@]}" \
"${BN_REP[@]}" "${ARITH[@]}" \
"${PRIME[@]}" "${PRIMES[@]}" \
"${EP_METH[@]}" \
"${BN_METH[@]}" \
"${FP_METH[@]}" \
"${FPX_METH[@]}" \
"${PP_METH[@]}" ..
"${LIBS[@]}" "${RAND[@]}" \
"${BN_REP[@]}" "${ARITH[@]}" \
"${PRIME[@]}" "${PRIMES[@]}" \
"${EP_METH[@]}" \
"${BN_METH[@]}" \
"${FP_METH[@]}" \
"${FPX_METH[@]}" \
"${PP_METH[@]}" ..


# Compile the static library
make clean
make relic_s -j8
rm -f CMakeCache.txt

popd
popd
popd
21 changes: 16 additions & 5 deletions engine/consensus/approvals/aggregated_signatures.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package approvals

import (
"fmt"
"sync"

"github.com/onflow/flow-go/model/flow"
Expand All @@ -14,23 +15,33 @@ type AggregatedSignatures struct {
numberOfChunks uint64
}

func NewAggregatedSignatures(chunks uint64) *AggregatedSignatures {
// NewAggregatedSignatures instantiates a AggregatedSignatures. Requires that
// number of chunks is positive integer. Errors otherwise.
func NewAggregatedSignatures(chunks uint64) (*AggregatedSignatures, error) {
if chunks < 1 {
return nil, fmt.Errorf("number of chunks must be positive but got %d", chunks)
}
return &AggregatedSignatures{
signatures: make(map[uint64]flow.AggregatedSignature, chunks),
lock: sync.RWMutex{},
numberOfChunks: chunks,
}
}, nil
}

// PutSignature adds the AggregatedSignature from the collector to `aggregatedSignatures`.
// The returned int is the resulting number of approved chunks.
func (as *AggregatedSignatures) PutSignature(chunkIndex uint64, aggregatedSignature flow.AggregatedSignature) uint64 {
// Errors if chunk index exceeds valid range.
func (as *AggregatedSignatures) PutSignature(chunkIndex uint64, aggregatedSignature flow.AggregatedSignature) (uint64, error) {
if chunkIndex >= as.numberOfChunks {
return uint64(len(as.signatures)), fmt.Errorf("chunk index must be in range [0, %d] but is %d", as.numberOfChunks-1, chunkIndex)
}

as.lock.Lock()
defer as.lock.Unlock()
if _, found := as.signatures[chunkIndex]; !found {
as.signatures[chunkIndex] = aggregatedSignature
}
return uint64(len(as.signatures))
return uint64(len(as.signatures)), nil
}

// HasSignature returns boolean depending if we have signature for particular chunk
Expand All @@ -43,7 +54,7 @@ func (as *AggregatedSignatures) HasSignature(chunkIndex uint64) bool {

// Collect returns array with aggregated signature for each chunk
func (as *AggregatedSignatures) Collect() []flow.AggregatedSignature {
aggregatedSigs := make([]flow.AggregatedSignature, len(as.signatures))
aggregatedSigs := make([]flow.AggregatedSignature, as.numberOfChunks)

as.lock.RLock()
defer as.lock.RUnlock()
Expand Down
168 changes: 168 additions & 0 deletions engine/consensus/approvals/aggregated_signatures_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package approvals

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/utils/unittest"
)

// TestAggregatedSignatures_NoChunks verifies that NewAggregatedSignatures
// errors when initialized with 0 chunks. This is compatible with Flow's
// architecture, because each block contains at least one chunk (the system chunk).
func TestAggregatedSignatures_NoChunks(t *testing.T) {
_, err := NewAggregatedSignatures(0)
require.Error(t, err)
}

// TestAggregatedSignatures_PutSignature tests putting signature with different indexes
func TestAggregatedSignatures_PutSignature(t *testing.T) {
// create NewAggregatedSignatures for block with chunk indices 0, 1, ... , 9
sigs, err := NewAggregatedSignatures(10)
require.NoError(t, err)
require.Empty(t, sigs.signatures)

// add signature for chunk with index 3
n, err := sigs.PutSignature(3, flow.AggregatedSignature{})
require.NoError(t, err)
require.Equal(t, uint64(1), n)
require.Len(t, sigs.signatures, 1)

// attempt to add signature for chunk with index 10 should error
n, err = sigs.PutSignature(sigs.numberOfChunks, flow.AggregatedSignature{})
require.Error(t, err)
require.Equal(t, uint64(1), n)
}

// TestAggregatedSignatures_Repeated_PutSignature tests that repeated calls to
// PutSignature for the same chunk index are no-ops except for the first one.
func TestAggregatedSignatures_Repeated_PutSignature(t *testing.T) {
// create NewAggregatedSignatures for block with chunk indices 0, 1, ... , 9
sigs, err := NewAggregatedSignatures(10)
require.NoError(t, err)
require.Empty(t, sigs.signatures)

// add signature for chunk with index 3
as1 := flow.AggregatedSignature{
VerifierSignatures: unittest.SignaturesFixture(22),
SignerIDs: unittest.IdentifierListFixture(22),
}
n, err := sigs.PutSignature(3, as1)
require.NoError(t, err)
require.Equal(t, uint64(1), n)

// add _different_ sig for chunk index 3 (should be no-op)
as2 := flow.AggregatedSignature{
VerifierSignatures: unittest.SignaturesFixture(2),
SignerIDs: unittest.IdentifierListFixture(2),
}
n, err = sigs.PutSignature(3, as2)
require.NoError(t, err)
require.Equal(t, uint64(1), n)

aggSigs := sigs.Collect()
for idx, s := range aggSigs {
if idx == 3 {
require.Equal(t, 22, s.CardinalitySignerSet())
} else {
require.Equal(t, 0, s.CardinalitySignerSet())
}
}
}

// TestAggregatedSignatures_Repeated_Signer tests that repeated calls to
// PutSignature for the same chunk index are no-ops except for the first one.
func TestAggregatedSignatures_Repeated_Signer(t *testing.T) {
// create NewAggregatedSignatures for block with chunk indices 0, 1, ... , 9
sigs, err := NewAggregatedSignatures(10)
require.NoError(t, err)
require.Empty(t, sigs.signatures)

// add signature for chunk with index 3
as1 := flow.AggregatedSignature{
VerifierSignatures: unittest.SignaturesFixture(22),
SignerIDs: unittest.IdentifierListFixture(22),
}
n, err := sigs.PutSignature(3, as1)
require.NoError(t, err)
require.Equal(t, uint64(1), n)

// add _different_ sig for chunk index 3 (should be no-op)
as2 := flow.AggregatedSignature{
VerifierSignatures: unittest.SignaturesFixture(2),
SignerIDs: unittest.IdentifierListFixture(2),
}
n, err = sigs.PutSignature(3, as2)
require.NoError(t, err)
require.Equal(t, uint64(1), n)

aggSigs := sigs.Collect()
for idx, s := range aggSigs {
if idx == 3 {
require.Equal(t, 22, s.CardinalitySignerSet())
} else {
require.Equal(t, 0, s.CardinalitySignerSet())
}
}
}

// TestAggregatedSignatures_PutSignature_Sequence tests PutSignature for a full sequence
func TestAggregatedSignatures_PutSignature_Sequence(t *testing.T) {
chunks := uint64(10)
sigs, err := NewAggregatedSignatures(chunks)
require.NoError(t, err)

for index := uint64(0); index < chunks; index++ {
n, err := sigs.PutSignature(index, flow.AggregatedSignature{})
require.NoError(t, err)
require.Equal(t, n, index+1)
}
}

// TestAggregatedSignatures_Collect tests that collecting over full signatures and partial signatures behaves as expected
func TestAggregatedSignatures_Collect(t *testing.T) {
chunks := uint64(10)
sigs, err := NewAggregatedSignatures(chunks)
require.NoError(t, err)
sig := flow.AggregatedSignature{}
_, err = sigs.PutSignature(5, sig)
require.NoError(t, err)

// collecting over signatures with missing chunks results in empty array
require.Len(t, sigs.Collect(), int(chunks))
for index := uint64(0); index < chunks; index++ {
_, err := sigs.PutSignature(index, sig)
require.NoError(t, err)
require.Len(t, sigs.Collect(), int(chunks))
}
}

// TestAggregatedSignatures_HasSignature tests that after putting a signature we can get it
func TestAggregatedSignatures_HasSignature(t *testing.T) {
sigs, err := NewAggregatedSignatures(10)
require.NoError(t, err)
index := uint64(5)
_, err = sigs.PutSignature(index, flow.AggregatedSignature{})
require.NoError(t, err)
require.True(t, sigs.HasSignature(index))
require.False(t, sigs.HasSignature(0))
}

// TestAggregatedSignatures_ChunksWithoutAggregatedSignature tests that we can retrieve all chunks with missing signatures
func TestAggregatedSignatures_ChunksWithoutAggregatedSignature(t *testing.T) {
numberOfChunks := uint64(10)
sigs, err := NewAggregatedSignatures(numberOfChunks)
require.NoError(t, err)
_, err = sigs.PutSignature(0, flow.AggregatedSignature{})
require.NoError(t, err)
chunks := sigs.ChunksWithoutAggregatedSignature()
require.Len(t, chunks, int(numberOfChunks)-1)

expectedChunks := make([]uint64, 0, numberOfChunks)
for i := uint64(1); i < numberOfChunks; i++ {
expectedChunks = append(expectedChunks, i)
}
require.ElementsMatch(t, expectedChunks, chunks)
}

0 comments on commit a00f07e

Please sign in to comment.