Skip to content

Commit

Permalink
refactored p2p blockchain syncronization
Browse files Browse the repository at this point in the history
  • Loading branch information
xiphon committed Aug 16, 2018
1 parent bb15ecf commit 9c39fa8
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 66 deletions.
23 changes: 12 additions & 11 deletions network/pasl/connection.go
Expand Up @@ -95,14 +95,18 @@ func (this *PascalConnection) GetState() (uint32, []byte) {
return this.state.height, this.state.prevSafeboxHash
}

func (this *PascalConnection) StartBlocksDownloading(from, to uint32, downloadingDone chan<- interface{}) error {
func (this *PascalConnection) BlocksGet(from, to uint32) []safebox.SerializedBlock {
packet := utils.Serialize(packetGetBlocksRequest{
FromIndex: from,
ToIndex: to,
})

onSuccess := func(response *requestResponse, payload []byte) error {
defer func() { downloadingDone <- nil }()
blocks := make([]safebox.SerializedBlock, 0)

finished := sync.WaitGroup{}
finished.Add(1)
err := this.underlying.sendRequest(getBlocks, packet, func(response *requestResponse, payload []byte) error {
defer finished.Done()

if response == nil {
return errors.New("GetBlocks request failed")
Expand All @@ -112,18 +116,15 @@ func (this *PascalConnection) StartBlocksDownloading(from, to uint32, downloadin
if err := utils.Deserialize(&packet, bytes.NewBuffer(payload)); err != nil {
return err
}
for _, it := range packet.Blocks {
this.onNewBlock <- &eventNewBlock{
event: event{this},
SerializedBlock: it,
shouldBroadcast: false,
}
}

blocks = append(blocks, packet.Blocks...)
return nil
})
if err == nil {
finished.Wait()
}

return this.underlying.sendRequest(getBlocks, packet, onSuccess)
return blocks
}

func (this *PascalConnection) BroadcastTx(operation *tx.Tx) {
Expand Down
137 changes: 82 additions & 55 deletions network/pasl/manager.go
Expand Up @@ -20,6 +20,7 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
package pasl

import (
"context"
"io"
"math/rand"
"sync"
Expand Down Expand Up @@ -53,45 +54,53 @@ type manager struct {

waitGroup sync.WaitGroup

blockchain *blockchain.Blockchain
nonce []byte
blockchain *blockchain.Blockchain
nonce []byte

timeoutRequest time.Duration
peerUpdates chan<- PeerInfo
onStateUpdate chan *PascalConnection
onNewBlock chan *eventNewBlock
onNewOperation chan *eventNewOperation
closed chan *PascalConnection
initializedConnections map[*PascalConnection]uint32
downloading bool
downloadingDone chan interface{}
initializedConnections sync.Map
doSyncValue bool
doSync *sync.Cond
}

func WithManager(nonce []byte, blockchain *blockchain.Blockchain, peerUpdates chan<- PeerInfo, timeoutRequest time.Duration, callback func(network.Manager) error) error {
manager := &manager{
timeoutRequest: timeoutRequest,
blockchain: blockchain,
nonce: nonce,
peerUpdates: peerUpdates,
onStateUpdate: make(chan *PascalConnection),
onNewOperation: make(chan *eventNewOperation),
closed: make(chan *PascalConnection),
onNewBlock: make(chan *eventNewBlock),
initializedConnections: make(map[*PascalConnection]uint32),
downloading: false,
downloadingDone: make(chan interface{}),
timeoutRequest: timeoutRequest,
blockchain: blockchain,
nonce: nonce,
peerUpdates: peerUpdates,
onStateUpdate: make(chan *PascalConnection),
onNewOperation: make(chan *eventNewOperation),
closed: make(chan *PascalConnection),
onNewBlock: make(chan *eventNewBlock),
doSync: sync.NewCond(&sync.Mutex{}),
}
defer manager.waitGroup.Wait()

stop := make(chan bool)
signalSync := func() {
manager.doSync.L.Lock()
manager.doSyncValue = true
manager.doSync.Broadcast()
manager.doSync.L.Unlock()
}
defer signalSync()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

manager.waitGroup.Add(1)
go func() {
defer manager.waitGroup.Done()

for {
select {
case event := <-manager.onNewBlock:
if err := manager.blockchain.AddBlockSerialized(&event.SerializedBlock); err != nil {
utils.Tracef("[P2P] AddBlockSerialized %d failed %v", event.SerializedBlock.Header.Index, err)
utils.Tracef("[P2P %p] AddBlockSerialized %d failed %v", event.source, event.SerializedBlock.Header.Index, err)
if event.shouldBroadcast {
manager.forEachConnection(func(conn *PascalConnection) {
conn.BroadcastBlock(&event.SerializedBlock)
Expand All @@ -107,65 +116,83 @@ func WithManager(nonce []byte, blockchain *blockchain.Blockchain, peerUpdates ch
conn.BroadcastTx(&event.Tx)
}, event.source)
}
case <-manager.downloadingDone:
manager.downloading = false
manager.startDownloading()
case conn := <-manager.closed:
delete(manager.initializedConnections, conn)
manager.initializedConnections.Delete(conn)
case conn := <-manager.onStateUpdate:
connHeight, _ := conn.GetState()
manager.initializedConnections[conn] = connHeight
manager.startDownloading()
case <-stop:
manager.initializedConnections.Store(conn, connHeight)
signalSync()
case <-ctx.Done():
return
}
}
utils.Tracef("Exit1")
}()

err := callback(manager)
stop <- true
manager.waitGroup.Add(1)
go func() {
defer manager.waitGroup.Done()

return err
}
for {
select {
case <-ctx.Done():
return
default:
if !manager.sync() {
manager.doSync.L.Lock()
if !manager.doSyncValue {
manager.doSync.Wait()
manager.doSyncValue = false
}
manager.doSync.L.Unlock()
}
}
}
utils.Tracef("Exit2")
}()

func (this *manager) startDownloading() {
if this.downloading {
return
}
return callback(manager)
}

func (this *manager) sync() bool {
nodeHeight, _ := this.blockchain.GetState()

candidates := make(map[uint32]*PascalConnection)
for conn, height := range this.initializedConnections {
if height > nodeHeight {
candidates[rand.Uint32()] = conn
candidates := make([]*PascalConnection, 0)
this.initializedConnections.Range(func(conn, height interface{}) bool {
if height.(uint32) > nodeHeight {
candidates = append(candidates, conn.(*PascalConnection))
}
return true
})

candidatesTotal := len(candidates)
if candidatesTotal == 0 {
return false
}

for _, conn := range candidates {
height, _ := conn.GetState()
utils.Tracef("[P2P %p] Remote node height %d (%d blocks ahead)", conn, height, height-nodeHeight)

this.downloading = true
to := utils.MinUint32(nodeHeight+defaults.NetworkBlocksPerRequest-1, height-1)
if err := conn.StartBlocksDownloading(nodeHeight, to, this.downloadingDone); err == nil {
utils.Tracef("[P2P %p] Downloading blocks #%d .. #%d", conn, nodeHeight, to)
break
} else {
this.downloading = false
selected := rand.Int() % candidatesTotal
conn := candidates[selected]
height, _ := conn.GetState()
utils.Tracef("[P2P %p] Fetching blocks %d -> %d (%d blocks ahead)", conn, nodeHeight, height, height-nodeHeight)

to := utils.MinUint32(nodeHeight+defaults.NetworkBlocksPerRequest-1, height-1)
blocks := conn.BlocksGet(nodeHeight, to)
for _, block := range blocks {
if err := this.blockchain.AddBlockSerialized(&block); err != nil {
utils.Tracef("[P2P %p] Block #%d verification failed %v", block.Header.Index, err)
}
}

if !this.downloading && len(candidates) == 0 {
utils.Tracef("On main chain, height %d", nodeHeight)
}
return true
}

func (this *manager) forEachConnection(fn func(*PascalConnection), except *PascalConnection) {
for conn := range this.initializedConnections {
this.initializedConnections.Range(func(conn, height interface{}) bool {
if conn != except {
fn(conn)
fn(conn.(*PascalConnection))
}
}
return true
})
}

func (this *manager) OnOpen(address string, transport io.WriteCloser, isOutgoing bool) (interface{}, error) {
Expand Down

0 comments on commit 9c39fa8

Please sign in to comment.