
Sync automation sanity #867

Merged
merged 35 commits into develop from sync_automation_sanity on May 8, 2019

Conversation

@almogdepaz
Member

commented Apr 28, 2019

No description provided.

@almogdepaz almogdepaz requested a review from y0sher Apr 29, 2019

almogdepaz added some commits May 2, 2019

fix sync concurrent bug
add new automation sync sanity

almogdepaz added some commits May 7, 2019

fix handlers bug
add debug logs
Merge branch 'develop' into sync_automation_sanity
# Conflicts:
#	tests/config.yaml
func (app *SyncApp) Start(cmd *cobra.Command, args []string) {
	// start p2p services
	lg := log.New("sync_test", "", "")
	lg.Info("Initializing P2P services")
	lg.Info("Start!!!!!!!!!!!!!!!!!!!!!!!!!!!")

@y0sher

y0sher May 8, 2019

Collaborator

maybe a better log message :)

	if sendErr != nil {
		p.Error("Error sending response message, err:", sendErr)
	p.Debug("handleRequestMessage start")
	if foo, okFoo := p.msgRequestHandlers[MessageType(data.MsgType)]; okFoo {

@y0sher

y0sher May 8, 2019

Collaborator

in my opinion -

foo, okFoo := ...
if !okFoo {
return
}

is cleaner, though you can't use the if := thing :)

@almogdepaz

almogdepaz May 8, 2019

Author Member

true
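
A minimal sketch of the early-return shape suggested above, using the names from the diff (the surrounding handler context is assumed):

foo, okFoo := p.msgRequestHandlers[MessageType(data.MsgType)]
if !okFoo {
	// no handler registered for this message type, nothing to do
	return
}
// happy path continues unindented
payload := foo(msg)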

	if payload := foo(msg); payload != nil {
		p.Debug("handle request type %v", data.MsgType)
		rmsg := &service.DataMsgWrapper{MsgType: data.MsgType, ReqID: data.ReqID, Payload: payload}
		sendErr := p.network.SendWrappedMessage(msg.Sender(), p.name, rmsg)

@y0sher

y0sher May 8, 2019

Collaborator

here the if := notation would be useful, since sendErr isn't used outside that if
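
For illustration, the if-scoped form being referred to, applied to the send call from the snippet above:

// sendErr lives only inside the if, so it can't leak into later code
if sendErr := p.network.SendWrappedMessage(msg.Sender(), p.name, rmsg); sendErr != nil {
	p.Error("Error sending response message, err:", sendErr)
}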

 	foo, okFoo := p.resHandlers[headers.ReqID]
-	p.pendMutex.Unlock()
+	p.pendMutex.RUnlock()

@y0sher

y0sher May 8, 2019

Collaborator

just a note for the future: maybe we can make the response handlers static (like the request handlers), and then we won't have to append them, lock-protect the map, and run a different callback every time.

@almogdepaz

almogdepaz May 8, 2019

Author Member

good idea!
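
A rough sketch of the static-handler idea; layerHashMsg, blockIdsMsg, the handler funcs, and the reqID type are invented for illustration, not code from this PR:

// registered once at construction time and read-only afterwards,
// so lookups need no mutex
var resHandlers = map[MessageType]func(payload []byte, reqID uint64){
	layerHashMsg: handleLayerHashResponse,
	blockIdsMsg:  handleBlockIdsResponse,
}

func (p *protocol) dispatchResponse(headers *service.DataMsgWrapper) {
	if h, ok := resHandlers[MessageType(headers.MsgType)]; ok {
		h(headers.Payload, headers.ReqID)
	}
}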


	ch := make(chan types.LayerID, 1)
	app.sync = sync.NewSync(swarm, msh, sync.BlockValidatorMock{}, conf, ch, lg.WithName("syncer"))
	ch <- 101

@y0sher

y0sher May 8, 2019

Collaborator

this method has a lot of hard-coded values. it will be easier to understand if they're assigned to variables with meaningful names before use, and it will also make them easier to change in the future. (or just add some comments about them and what happens here)

@almogdepaz

almogdepaz May 8, 2019

Author Member

the sync conf will be moved to configuration, I'll open a task for the other ones
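
For example, the snippet above could name its magic numbers (identifiers invented for illustration; 101 is the layer id pushed into the tick channel):

const (
	tickBuffer     = 1   // capacity of the layer-tick channel
	firstTickLayer = 101 // layer id the test kicks syncing off from
)

ch := make(chan types.LayerID, tickBuffer)
app.sync = sync.NewSync(swarm, msh, sync.BlockValidatorMock{}, conf, ch, lg.WithName("syncer"))
ch <- firstTickLayer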

@@ -93,7 +93,7 @@ func (s *Syncer) run() {
 	s.currentLayerMutex.Lock()
 	s.currentLayer = layer
 	s.currentLayerMutex.Unlock()
-	s.Debug("sync got tick for layer %v", layer)
+	s.Info("sync got tick for layer %v", layer)

@y0sher

y0sher May 8, 2019

Collaborator

debug?

@y0sher

y0sher May 8, 2019

Collaborator

since this will stay info in go-spacemesh.

@@ -131,24 +131,19 @@ func (s *Syncer) maxSyncLayer() types.LayerID {
 func (s *Syncer) Synchronise() {
 	for currenSyncLayer := s.VerifiedLayer() + 1; currenSyncLayer < s.maxSyncLayer(); currenSyncLayer++ {

@y0sher

y0sher May 8, 2019

Collaborator

just noting the typo here: currenSyncLayer should be currentSyncLayer

		close(ch)
	}()

	for _, bid := range <-ch {

@y0sher

y0sher May 8, 2019

Collaborator

you can use for { select {} } and join the above goroutine here

@almogdepaz

almogdepaz May 8, 2019

Author Member

I know, the select has its own disadvantages here.
I'm keeping it this way in the meantime until I find a better pattern.
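
For reference, the for/select shape being suggested, sketched with assumed names; note the timer is created once outside the loop so it isn't reset on every iteration:

timeout := time.After(s.RequestTimeout)
for {
	select {
	case ids, ok := <-ch:
		if !ok {
			return // channel closed, all responses consumed
		}
		for _, bid := range ids {
			_ = bid // handle each block id here
		}
	case <-timeout:
		s.Info("timeout, not all peers responded to ids request")
		return
	}
}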

	<-time.After(s.RequestTimeout)
	s.Info("timeout, not all peers responded to ids request")
	close(kill)
	close(ch)

@y0sher

y0sher May 8, 2019

Collaborator

are you sure this won't cause a send on ch after it's closed?

@almogdepaz

almogdepaz May 8, 2019

Author Member

yes, we close it so the range will exit when finished

@y0sher

y0sher May 8, 2019

Collaborator

it still seems to me like ch can be closed while a value is being sent on it..
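
One way to rule that race out, sketched with a WaitGroup; requestFrom is a hypothetical stand-in for the real per-peer request, and the channel names are taken from the snippets above:

var wg sync.WaitGroup
for _, p := range peers {
	wg.Add(1)
	go func(p p2p.Peer) {
		defer wg.Done()
		v := requestFrom(p) // hypothetical per-peer request, may block
		select {
		case ch <- v: // deliver this peer's result
		case <-kill:  // timeout fired, drop the result instead of sending
		}
	}(p)
}
go func() {
	wg.Wait()
	close(ch) // safe: every sender has returned once Wait unblocks
}()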

			ch <- v
		}
	case <-kill:
		s.Info("ids request to %v timed out", p)

@y0sher

y0sher May 8, 2019

Collaborator

maybe close ch here (not a must)

@almogdepaz

almogdepaz May 8, 2019

Author Member

it's closed by the routine that closes the kill chan

 	resCounter := len(peers)
-	for _, p := range peers {
-		c, err := s.sendLayerHashRequest(p, index)
+	for _, peer := range peers {

@y0sher

y0sher May 8, 2019

Collaborator

this looks quite similar to the getIdsForHashes func, maybe they can share some code.
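
One possible shared shape, as a hypothetical helper (not code from this PR; requires a Go version with generics): fan the same request out to every peer and merge the replies onto one channel.

func fanOut[T any](peers []p2p.Peer, request func(p2p.Peer) (chan T, error)) chan T {
	out := make(chan T, len(peers))
	var wg sync.WaitGroup
	for _, peer := range peers {
		c, err := request(peer)
		if err != nil {
			continue // skip peers we failed to reach
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			if v, ok := <-c; ok {
				out <- v // forward the single reply
			}
		}()
	}
	go func() { wg.Wait(); close(out) }()
	return out
}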

 	return m, nil
 }

 func (s *Syncer) sendLayerHashRequest(peer p2p.Peer, layer types.LayerID) (chan *peerHashPair, error) {
-	s.Debug("send layer hash request Peer: %v layer: %v", peer, layer)
 	ch := make(chan *peerHashPair)
+	s.Info("send layer hash request Peer: %v layer: %v", peer, layer)

@y0sher

y0sher May 8, 2019

Collaborator

debug?


-	ch := make(chan []types.BlockID)
 	s.Info("send blockIds request Peer: ", peer, " layer: ", idx)
+	ch := make(chan []types.BlockID, 1)
 	foo := func(msg []byte) {
 		defer close(ch)
 		ids, err := types.BytesToBlockIds(msg)

@y0sher

y0sher May 8, 2019

Collaborator

isn't ids of type []BlockID? why do you reassign it to lyrIds?

    assert res3
    assert res4

    delete_deployment(inf.deployment_name, testconfig['namespace'])

@y0sher

y0sher May 8, 2019

Collaborator

you don't delete all the infs?

    time.sleep(3 * 60)

    fields = {"M": "sync done"}
    res1 = query_message(current_index, testconfig['namespace'], inf1.pods[0]['name'], fields, False)

@y0sher

y0sher May 8, 2019

Collaborator

maybe you could call query_message once and count the hits

@@ -142,7 +142,7 @@ def query_message(indx, namespace, client_po_name, fields, findFails=False):
     print("====================================================================")
     if findFails:
         print("Looking for pods that didn't hit:")
-        podnames = [hit.kubernetes.pod_name for hit in hits]
+        podnames = set([hit.kubernetes.pod_name for hit in hits])

@y0sher

y0sher May 8, 2019

Collaborator

not sure that this is what we want every time.. maybe it should be done per test.

@y0sher

y0sher approved these changes May 8, 2019

almogdepaz added some commits May 8, 2019

@almogdepaz almogdepaz merged commit 592c688 into develop May 8, 2019
