Permalink
Cannot retrieve contributors at this time
Join GitHub today
GitHub is home to over 40 million developers working together to host and review code, manage projects, and build software together.
Sign up
Fetching contributors…
| // Copyright 2016 The go-ethereum Authors | |
| // This file is part of the go-ethereum library. | |
| // | |
| // The go-ethereum library is free software: you can redistribute it and/or modify | |
| // it under the terms of the GNU Lesser General Public License as published by | |
| // the Free Software Foundation, either version 3 of the License, or | |
| // (at your option) any later version. | |
| // | |
| // The go-ethereum library is distributed in the hope that it will be useful, | |
| // but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
| // GNU Lesser General Public License for more details. | |
| // | |
| // You should have received a copy of the GNU Lesser General Public License | |
| // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. | |
| package discv5 | |
| import ( | |
| "bytes" | |
| "encoding/binary" | |
| "fmt" | |
| "math" | |
| "math/rand" | |
| "time" | |
| "github.com/ethereum/go-ethereum/common" | |
| "github.com/ethereum/go-ethereum/common/mclock" | |
| "github.com/ethereum/go-ethereum/crypto" | |
| "github.com/ethereum/go-ethereum/log" | |
| ) | |
| const ( | |
| ticketTimeBucketLen = time.Minute | |
| collectFrequency = time.Second * 30 | |
| registerFrequency = time.Second * 60 | |
| maxCollectDebt = 10 | |
| maxRegisterDebt = 5 | |
| keepTicketConst = time.Minute * 10 | |
| keepTicketExp = time.Minute * 5 | |
| targetWaitTime = time.Minute * 10 | |
| topicQueryTimeout = time.Second * 5 | |
| topicQueryResend = time.Minute | |
| // topic radius detection | |
| maxRadius = 0xffffffffffffffff | |
| radiusTC = time.Minute * 20 | |
| radiusBucketsPerBit = 8 | |
| minSlope = 1 | |
| minPeakSize = 40 | |
| maxNoAdjust = 20 | |
| lookupWidth = 8 | |
| minRightSum = 20 | |
| searchForceQuery = 4 | |
| ) | |
| // timeBucket represents absolute monotonic time in minutes. | |
| // It is used as the index into the per-topic ticket buckets. | |
| type timeBucket int | |
| type ticket struct { | |
| topics []Topic | |
| regTime []mclock.AbsTime // Per-topic local absolute time when the ticket can be used. | |
| // The serial number that was issued by the server. | |
| serial uint32 | |
| // Used by registrar, tracks absolute time when the ticket was created. | |
| issueTime mclock.AbsTime | |
| // Fields used only by registrants | |
| node *Node // the registrar node that signed this ticket | |
| refCnt int // tracks number of topics that will be registered using this ticket | |
| pong []byte // encoded pong packet signed by the registrar | |
| } | |
| // ticketRef refers to a single topic in a ticket. | |
| type ticketRef struct { | |
| t *ticket | |
| idx int // index of the topic in t.topics and t.regTime | |
| } | |
| func (ref ticketRef) topic() Topic { | |
| return ref.t.topics[ref.idx] | |
| } | |
| func (ref ticketRef) topicRegTime() mclock.AbsTime { | |
| return ref.t.regTime[ref.idx] | |
| } | |
| func pongToTicket(localTime mclock.AbsTime, topics []Topic, node *Node, p *ingressPacket) (*ticket, error) { | |
| wps := p.data.(*pong).WaitPeriods | |
| if len(topics) != len(wps) { | |
| return nil, fmt.Errorf("bad wait period list: got %d values, want %d", len(topics), len(wps)) | |
| } | |
| if rlpHash(topics) != p.data.(*pong).TopicHash { | |
| return nil, fmt.Errorf("bad topic hash") | |
| } | |
| t := &ticket{ | |
| issueTime: localTime, | |
| node: node, | |
| topics: topics, | |
| pong: p.rawData, | |
| regTime: make([]mclock.AbsTime, len(wps)), | |
| } | |
| // Convert wait periods to local absolute time. | |
| for i, wp := range wps { | |
| t.regTime[i] = localTime + mclock.AbsTime(time.Second*time.Duration(wp)) | |
| } | |
| return t, nil | |
| } | |
| func ticketToPong(t *ticket, pong *pong) { | |
| pong.Expiration = uint64(t.issueTime / mclock.AbsTime(time.Second)) | |
| pong.TopicHash = rlpHash(t.topics) | |
| pong.TicketSerial = t.serial | |
| pong.WaitPeriods = make([]uint32, len(t.regTime)) | |
| for i, regTime := range t.regTime { | |
| pong.WaitPeriods[i] = uint32(time.Duration(regTime-t.issueTime) / time.Second) | |
| } | |
| } | |
| type ticketStore struct { | |
| // radius detector and target address generator | |
| // exists for both searched and registered topics | |
| radius map[Topic]*topicRadius | |
| // Contains buckets (for each absolute minute) of tickets | |
| // that can be used in that minute. | |
| // This is only set if the topic is being registered. | |
| tickets map[Topic]*topicTickets | |
| regQueue []Topic // Topic registration queue for round robin attempts | |
| regSet map[Topic]struct{} // Topic registration queue contents for fast filling | |
| nodes map[*Node]*ticket | |
| nodeLastReq map[*Node]reqInfo | |
| lastBucketFetched timeBucket | |
| nextTicketCached *ticketRef | |
| searchTopicMap map[Topic]searchTopic | |
| nextTopicQueryCleanup mclock.AbsTime | |
| queriesSent map[*Node]map[common.Hash]sentQuery | |
| } | |
| type searchTopic struct { | |
| foundChn chan<- *Node | |
| } | |
| type sentQuery struct { | |
| sent mclock.AbsTime | |
| lookup lookupInfo | |
| } | |
| type topicTickets struct { | |
| buckets map[timeBucket][]ticketRef | |
| nextLookup mclock.AbsTime | |
| nextReg mclock.AbsTime | |
| } | |
| func newTicketStore() *ticketStore { | |
| return &ticketStore{ | |
| radius: make(map[Topic]*topicRadius), | |
| tickets: make(map[Topic]*topicTickets), | |
| regSet: make(map[Topic]struct{}), | |
| nodes: make(map[*Node]*ticket), | |
| nodeLastReq: make(map[*Node]reqInfo), | |
| searchTopicMap: make(map[Topic]searchTopic), | |
| queriesSent: make(map[*Node]map[common.Hash]sentQuery), | |
| } | |
| } | |
| // addTopic starts tracking a topic. If register is true, | |
| // the local node will register the topic and tickets will be collected. | |
| func (s *ticketStore) addTopic(topic Topic, register bool) { | |
| log.Trace("Adding discovery topic", "topic", topic, "register", register) | |
| if s.radius[topic] == nil { | |
| s.radius[topic] = newTopicRadius(topic) | |
| } | |
| if register && s.tickets[topic] == nil { | |
| s.tickets[topic] = &topicTickets{buckets: make(map[timeBucket][]ticketRef)} | |
| } | |
| } | |
| func (s *ticketStore) addSearchTopic(t Topic, foundChn chan<- *Node) { | |
| s.addTopic(t, false) | |
| if s.searchTopicMap[t].foundChn == nil { | |
| s.searchTopicMap[t] = searchTopic{foundChn: foundChn} | |
| } | |
| } | |
| func (s *ticketStore) removeSearchTopic(t Topic) { | |
| if st := s.searchTopicMap[t]; st.foundChn != nil { | |
| delete(s.searchTopicMap, t) | |
| } | |
| } | |
| // removeRegisterTopic deletes all tickets for the given topic. | |
| func (s *ticketStore) removeRegisterTopic(topic Topic) { | |
| log.Trace("Removing discovery topic", "topic", topic) | |
| if s.tickets[topic] == nil { | |
| log.Warn("Removing non-existent discovery topic", "topic", topic) | |
| return | |
| } | |
| for _, list := range s.tickets[topic].buckets { | |
| for _, ref := range list { | |
| ref.t.refCnt-- | |
| if ref.t.refCnt == 0 { | |
| delete(s.nodes, ref.t.node) | |
| delete(s.nodeLastReq, ref.t.node) | |
| } | |
| } | |
| } | |
| delete(s.tickets, topic) | |
| } | |
| func (s *ticketStore) regTopicSet() []Topic { | |
| topics := make([]Topic, 0, len(s.tickets)) | |
| for topic := range s.tickets { | |
| topics = append(topics, topic) | |
| } | |
| return topics | |
| } | |
| // nextRegisterLookup returns the target of the next lookup for ticket collection. | |
| func (s *ticketStore) nextRegisterLookup() (lookupInfo, time.Duration) { | |
| // Queue up any new topics (or discarded ones), preserving iteration order | |
| for topic := range s.tickets { | |
| if _, ok := s.regSet[topic]; !ok { | |
| s.regQueue = append(s.regQueue, topic) | |
| s.regSet[topic] = struct{}{} | |
| } | |
| } | |
| // Iterate over the set of all topics and look up the next suitable one | |
| for len(s.regQueue) > 0 { | |
| // Fetch the next topic from the queue, and ensure it still exists | |
| topic := s.regQueue[0] | |
| s.regQueue = s.regQueue[1:] | |
| delete(s.regSet, topic) | |
| if s.tickets[topic] == nil { | |
| continue | |
| } | |
| // If the topic needs more tickets, return it | |
| if s.tickets[topic].nextLookup < mclock.Now() { | |
| next, delay := s.radius[topic].nextTarget(false), 100*time.Millisecond | |
| log.Trace("Found discovery topic to register", "topic", topic, "target", next.target, "delay", delay) | |
| return next, delay | |
| } | |
| } | |
| // No registration topics found or all exhausted, sleep | |
| delay := 40 * time.Second | |
| log.Trace("No topic found to register", "delay", delay) | |
| return lookupInfo{}, delay | |
| } | |
| func (s *ticketStore) nextSearchLookup(topic Topic) lookupInfo { | |
| tr := s.radius[topic] | |
| target := tr.nextTarget(tr.radiusLookupCnt >= searchForceQuery) | |
| if target.radiusLookup { | |
| tr.radiusLookupCnt++ | |
| } else { | |
| tr.radiusLookupCnt = 0 | |
| } | |
| return target | |
| } | |
| func (s *ticketStore) addTicketRef(r ticketRef) { | |
| topic := r.t.topics[r.idx] | |
| tickets := s.tickets[topic] | |
| if tickets == nil { | |
| log.Warn("Adding ticket to non-existent topic", "topic", topic) | |
| return | |
| } | |
| bucket := timeBucket(r.t.regTime[r.idx] / mclock.AbsTime(ticketTimeBucketLen)) | |
| tickets.buckets[bucket] = append(tickets.buckets[bucket], r) | |
| r.t.refCnt++ | |
| min := mclock.Now() - mclock.AbsTime(collectFrequency)*maxCollectDebt | |
| if tickets.nextLookup < min { | |
| tickets.nextLookup = min | |
| } | |
| tickets.nextLookup += mclock.AbsTime(collectFrequency) | |
| //s.removeExcessTickets(topic) | |
| } | |
| func (s *ticketStore) nextFilteredTicket() (*ticketRef, time.Duration) { | |
| now := mclock.Now() | |
| for { | |
| ticket, wait := s.nextRegisterableTicket() | |
| if ticket == nil { | |
| return ticket, wait | |
| } | |
| log.Trace("Found discovery ticket to register", "node", ticket.t.node, "serial", ticket.t.serial, "wait", wait) | |
| regTime := now + mclock.AbsTime(wait) | |
| topic := ticket.t.topics[ticket.idx] | |
| if s.tickets[topic] != nil && regTime >= s.tickets[topic].nextReg { | |
| return ticket, wait | |
| } | |
| s.removeTicketRef(*ticket) | |
| } | |
| } | |
| func (s *ticketStore) ticketRegistered(ref ticketRef) { | |
| now := mclock.Now() | |
| topic := ref.t.topics[ref.idx] | |
| tickets := s.tickets[topic] | |
| min := now - mclock.AbsTime(registerFrequency)*maxRegisterDebt | |
| if min > tickets.nextReg { | |
| tickets.nextReg = min | |
| } | |
| tickets.nextReg += mclock.AbsTime(registerFrequency) | |
| s.tickets[topic] = tickets | |
| s.removeTicketRef(ref) | |
| } | |
| // nextRegisterableTicket returns the next ticket that can be used | |
| // to register. | |
| // | |
| // If the returned wait time <= zero the ticket can be used. For a positive | |
| // wait time, the caller should requery the next ticket later. | |
| // | |
| // A ticket can be returned more than once with <= zero wait time in case | |
| // the ticket contains multiple topics. | |
| func (s *ticketStore) nextRegisterableTicket() (*ticketRef, time.Duration) { | |
| now := mclock.Now() | |
| if s.nextTicketCached != nil { | |
| return s.nextTicketCached, time.Duration(s.nextTicketCached.topicRegTime() - now) | |
| } | |
| for bucket := s.lastBucketFetched; ; bucket++ { | |
| var ( | |
| empty = true // true if there are no tickets | |
| nextTicket ticketRef // uninitialized if this bucket is empty | |
| ) | |
| for _, tickets := range s.tickets { | |
| //s.removeExcessTickets(topic) | |
| if len(tickets.buckets) != 0 { | |
| empty = false | |
| list := tickets.buckets[bucket] | |
| for _, ref := range list { | |
| //debugLog(fmt.Sprintf(" nrt bucket = %d node = %x sn = %v wait = %v", bucket, ref.t.node.ID[:8], ref.t.serial, time.Duration(ref.topicRegTime()-now))) | |
| if nextTicket.t == nil || ref.topicRegTime() < nextTicket.topicRegTime() { | |
| nextTicket = ref | |
| } | |
| } | |
| } | |
| } | |
| if empty { | |
| return nil, 0 | |
| } | |
| if nextTicket.t != nil { | |
| s.nextTicketCached = &nextTicket | |
| return &nextTicket, time.Duration(nextTicket.topicRegTime() - now) | |
| } | |
| s.lastBucketFetched = bucket | |
| } | |
| } | |
| // removeTicket removes a ticket from the ticket store | |
| func (s *ticketStore) removeTicketRef(ref ticketRef) { | |
| log.Trace("Removing discovery ticket reference", "node", ref.t.node.ID, "serial", ref.t.serial) | |
| // Make nextRegisterableTicket return the next available ticket. | |
| s.nextTicketCached = nil | |
| topic := ref.topic() | |
| tickets := s.tickets[topic] | |
| if tickets == nil { | |
| log.Trace("Removing tickets from unknown topic", "topic", topic) | |
| return | |
| } | |
| bucket := timeBucket(ref.t.regTime[ref.idx] / mclock.AbsTime(ticketTimeBucketLen)) | |
| list := tickets.buckets[bucket] | |
| idx := -1 | |
| for i, bt := range list { | |
| if bt.t == ref.t { | |
| idx = i | |
| break | |
| } | |
| } | |
| if idx == -1 { | |
| panic(nil) | |
| } | |
| list = append(list[:idx], list[idx+1:]...) | |
| if len(list) != 0 { | |
| tickets.buckets[bucket] = list | |
| } else { | |
| delete(tickets.buckets, bucket) | |
| } | |
| ref.t.refCnt-- | |
| if ref.t.refCnt == 0 { | |
| delete(s.nodes, ref.t.node) | |
| delete(s.nodeLastReq, ref.t.node) | |
| } | |
| } | |
| type lookupInfo struct { | |
| target common.Hash | |
| topic Topic | |
| radiusLookup bool | |
| } | |
| type reqInfo struct { | |
| pingHash []byte | |
| lookup lookupInfo | |
| time mclock.AbsTime | |
| } | |
| // returns -1 if not found | |
| func (t *ticket) findIdx(topic Topic) int { | |
| for i, tt := range t.topics { | |
| if tt == topic { | |
| return i | |
| } | |
| } | |
| return -1 | |
| } | |
| func (s *ticketStore) registerLookupDone(lookup lookupInfo, nodes []*Node, ping func(n *Node) []byte) { | |
| now := mclock.Now() | |
| for i, n := range nodes { | |
| if i == 0 || (binary.BigEndian.Uint64(n.sha[:8])^binary.BigEndian.Uint64(lookup.target[:8])) < s.radius[lookup.topic].minRadius { | |
| if lookup.radiusLookup { | |
| if lastReq, ok := s.nodeLastReq[n]; !ok || time.Duration(now-lastReq.time) > radiusTC { | |
| s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now} | |
| } | |
| } else { | |
| if s.nodes[n] == nil { | |
| s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now} | |
| } | |
| } | |
| } | |
| } | |
| } | |
| func (s *ticketStore) searchLookupDone(lookup lookupInfo, nodes []*Node, query func(n *Node, topic Topic) []byte) { | |
| now := mclock.Now() | |
| for i, n := range nodes { | |
| if i == 0 || (binary.BigEndian.Uint64(n.sha[:8])^binary.BigEndian.Uint64(lookup.target[:8])) < s.radius[lookup.topic].minRadius { | |
| if lookup.radiusLookup { | |
| if lastReq, ok := s.nodeLastReq[n]; !ok || time.Duration(now-lastReq.time) > radiusTC { | |
| s.nodeLastReq[n] = reqInfo{pingHash: nil, lookup: lookup, time: now} | |
| } | |
| } // else { | |
| if s.canQueryTopic(n, lookup.topic) { | |
| hash := query(n, lookup.topic) | |
| if hash != nil { | |
| s.addTopicQuery(common.BytesToHash(hash), n, lookup) | |
| } | |
| } | |
| //} | |
| } | |
| } | |
| } | |
| func (s *ticketStore) adjustWithTicket(now mclock.AbsTime, targetHash common.Hash, t *ticket) { | |
| for i, topic := range t.topics { | |
| if tt, ok := s.radius[topic]; ok { | |
| tt.adjustWithTicket(now, targetHash, ticketRef{t, i}) | |
| } | |
| } | |
| } | |
| func (s *ticketStore) addTicket(localTime mclock.AbsTime, pingHash []byte, ticket *ticket) { | |
| log.Trace("Adding discovery ticket", "node", ticket.node.ID, "serial", ticket.serial) | |
| lastReq, ok := s.nodeLastReq[ticket.node] | |
| if !(ok && bytes.Equal(pingHash, lastReq.pingHash)) { | |
| return | |
| } | |
| s.adjustWithTicket(localTime, lastReq.lookup.target, ticket) | |
| if lastReq.lookup.radiusLookup || s.nodes[ticket.node] != nil { | |
| return | |
| } | |
| topic := lastReq.lookup.topic | |
| topicIdx := ticket.findIdx(topic) | |
| if topicIdx == -1 { | |
| return | |
| } | |
| bucket := timeBucket(localTime / mclock.AbsTime(ticketTimeBucketLen)) | |
| if s.lastBucketFetched == 0 || bucket < s.lastBucketFetched { | |
| s.lastBucketFetched = bucket | |
| } | |
| if _, ok := s.tickets[topic]; ok { | |
| wait := ticket.regTime[topicIdx] - localTime | |
| rnd := rand.ExpFloat64() | |
| if rnd > 10 { | |
| rnd = 10 | |
| } | |
| if float64(wait) < float64(keepTicketConst)+float64(keepTicketExp)*rnd { | |
| // use the ticket to register this topic | |
| //fmt.Println("addTicket", ticket.node.ID[:8], ticket.node.addr().String(), ticket.serial, ticket.pong) | |
| s.addTicketRef(ticketRef{ticket, topicIdx}) | |
| } | |
| } | |
| if ticket.refCnt > 0 { | |
| s.nextTicketCached = nil | |
| s.nodes[ticket.node] = ticket | |
| } | |
| } | |
| func (s *ticketStore) canQueryTopic(node *Node, topic Topic) bool { | |
| qq := s.queriesSent[node] | |
| if qq != nil { | |
| now := mclock.Now() | |
| for _, sq := range qq { | |
| if sq.lookup.topic == topic && sq.sent > now-mclock.AbsTime(topicQueryResend) { | |
| return false | |
| } | |
| } | |
| } | |
| return true | |
| } | |
| func (s *ticketStore) addTopicQuery(hash common.Hash, node *Node, lookup lookupInfo) { | |
| now := mclock.Now() | |
| qq := s.queriesSent[node] | |
| if qq == nil { | |
| qq = make(map[common.Hash]sentQuery) | |
| s.queriesSent[node] = qq | |
| } | |
| qq[hash] = sentQuery{sent: now, lookup: lookup} | |
| s.cleanupTopicQueries(now) | |
| } | |
| func (s *ticketStore) cleanupTopicQueries(now mclock.AbsTime) { | |
| if s.nextTopicQueryCleanup > now { | |
| return | |
| } | |
| exp := now - mclock.AbsTime(topicQueryResend) | |
| for n, qq := range s.queriesSent { | |
| for h, q := range qq { | |
| if q.sent < exp { | |
| delete(qq, h) | |
| } | |
| } | |
| if len(qq) == 0 { | |
| delete(s.queriesSent, n) | |
| } | |
| } | |
| s.nextTopicQueryCleanup = now + mclock.AbsTime(topicQueryTimeout) | |
| } | |
| func (s *ticketStore) gotTopicNodes(from *Node, hash common.Hash, nodes []rpcNode) (timeout bool) { | |
| now := mclock.Now() | |
| //fmt.Println("got", from.addr().String(), hash, len(nodes)) | |
| qq := s.queriesSent[from] | |
| if qq == nil { | |
| return true | |
| } | |
| q, ok := qq[hash] | |
| if !ok || now > q.sent+mclock.AbsTime(topicQueryTimeout) { | |
| return true | |
| } | |
| inside := float64(0) | |
| if len(nodes) > 0 { | |
| inside = 1 | |
| } | |
| s.radius[q.lookup.topic].adjust(now, q.lookup.target, from.sha, inside) | |
| chn := s.searchTopicMap[q.lookup.topic].foundChn | |
| if chn == nil { | |
| //fmt.Println("no channel") | |
| return false | |
| } | |
| for _, node := range nodes { | |
| ip := node.IP | |
| if ip.IsUnspecified() || ip.IsLoopback() { | |
| ip = from.IP | |
| } | |
| n := NewNode(node.ID, ip, node.UDP, node.TCP) | |
| select { | |
| case chn <- n: | |
| default: | |
| return false | |
| } | |
| } | |
| return false | |
| } | |
| type topicRadius struct { | |
| topic Topic | |
| topicHashPrefix uint64 | |
| radius, minRadius uint64 | |
| buckets []topicRadiusBucket | |
| converged bool | |
| radiusLookupCnt int | |
| } | |
| type topicRadiusEvent int | |
| const ( | |
| trOutside topicRadiusEvent = iota | |
| trInside | |
| trNoAdjust | |
| trCount | |
| ) | |
| type topicRadiusBucket struct { | |
| weights [trCount]float64 | |
| lastTime mclock.AbsTime | |
| value float64 | |
| lookupSent map[common.Hash]mclock.AbsTime | |
| } | |
| func (b *topicRadiusBucket) update(now mclock.AbsTime) { | |
| if now == b.lastTime { | |
| return | |
| } | |
| exp := math.Exp(-float64(now-b.lastTime) / float64(radiusTC)) | |
| for i, w := range b.weights { | |
| b.weights[i] = w * exp | |
| } | |
| b.lastTime = now | |
| for target, tm := range b.lookupSent { | |
| if now-tm > mclock.AbsTime(respTimeout) { | |
| b.weights[trNoAdjust] += 1 | |
| delete(b.lookupSent, target) | |
| } | |
| } | |
| } | |
| func (b *topicRadiusBucket) adjust(now mclock.AbsTime, inside float64) { | |
| b.update(now) | |
| if inside <= 0 { | |
| b.weights[trOutside] += 1 | |
| } else { | |
| if inside >= 1 { | |
| b.weights[trInside] += 1 | |
| } else { | |
| b.weights[trInside] += inside | |
| b.weights[trOutside] += 1 - inside | |
| } | |
| } | |
| } | |
| func newTopicRadius(t Topic) *topicRadius { | |
| topicHash := crypto.Keccak256Hash([]byte(t)) | |
| topicHashPrefix := binary.BigEndian.Uint64(topicHash[0:8]) | |
| return &topicRadius{ | |
| topic: t, | |
| topicHashPrefix: topicHashPrefix, | |
| radius: maxRadius, | |
| minRadius: maxRadius, | |
| } | |
| } | |
| func (r *topicRadius) getBucketIdx(addrHash common.Hash) int { | |
| prefix := binary.BigEndian.Uint64(addrHash[0:8]) | |
| var log2 float64 | |
| if prefix != r.topicHashPrefix { | |
| log2 = math.Log2(float64(prefix ^ r.topicHashPrefix)) | |
| } | |
| bucket := int((64 - log2) * radiusBucketsPerBit) | |
| max := 64*radiusBucketsPerBit - 1 | |
| if bucket > max { | |
| return max | |
| } | |
| if bucket < 0 { | |
| return 0 | |
| } | |
| return bucket | |
| } | |
| func (r *topicRadius) targetForBucket(bucket int) common.Hash { | |
| min := math.Pow(2, 64-float64(bucket+1)/radiusBucketsPerBit) | |
| max := math.Pow(2, 64-float64(bucket)/radiusBucketsPerBit) | |
| a := uint64(min) | |
| b := randUint64n(uint64(max - min)) | |
| xor := a + b | |
| if xor < a { | |
| xor = ^uint64(0) | |
| } | |
| prefix := r.topicHashPrefix ^ xor | |
| var target common.Hash | |
| binary.BigEndian.PutUint64(target[0:8], prefix) | |
| globalRandRead(target[8:]) | |
| return target | |
| } | |
| // package rand provides a Read function in Go 1.6 and later, but | |
| // we can't use it yet because we still support Go 1.5. | |
| func globalRandRead(b []byte) { | |
| pos := 0 | |
| val := 0 | |
| for n := 0; n < len(b); n++ { | |
| if pos == 0 { | |
| val = rand.Int() | |
| pos = 7 | |
| } | |
| b[n] = byte(val) | |
| val >>= 8 | |
| pos-- | |
| } | |
| } | |
| func (r *topicRadius) chooseLookupBucket(a, b int) int { | |
| if a < 0 { | |
| a = 0 | |
| } | |
| if a > b { | |
| return -1 | |
| } | |
| c := 0 | |
| for i := a; i <= b; i++ { | |
| if i >= len(r.buckets) || r.buckets[i].weights[trNoAdjust] < maxNoAdjust { | |
| c++ | |
| } | |
| } | |
| if c == 0 { | |
| return -1 | |
| } | |
| rnd := randUint(uint32(c)) | |
| for i := a; i <= b; i++ { | |
| if i >= len(r.buckets) || r.buckets[i].weights[trNoAdjust] < maxNoAdjust { | |
| if rnd == 0 { | |
| return i | |
| } | |
| rnd-- | |
| } | |
| } | |
| panic(nil) // should never happen | |
| } | |
| func (r *topicRadius) needMoreLookups(a, b int, maxValue float64) bool { | |
| var max float64 | |
| if a < 0 { | |
| a = 0 | |
| } | |
| if b >= len(r.buckets) { | |
| b = len(r.buckets) - 1 | |
| if r.buckets[b].value > max { | |
| max = r.buckets[b].value | |
| } | |
| } | |
| if b >= a { | |
| for i := a; i <= b; i++ { | |
| if r.buckets[i].value > max { | |
| max = r.buckets[i].value | |
| } | |
| } | |
| } | |
| return maxValue-max < minPeakSize | |
| } | |
| func (r *topicRadius) recalcRadius() (radius uint64, radiusLookup int) { | |
| maxBucket := 0 | |
| maxValue := float64(0) | |
| now := mclock.Now() | |
| v := float64(0) | |
| for i := range r.buckets { | |
| r.buckets[i].update(now) | |
| v += r.buckets[i].weights[trOutside] - r.buckets[i].weights[trInside] | |
| r.buckets[i].value = v | |
| //fmt.Printf("%v %v | ", v, r.buckets[i].weights[trNoAdjust]) | |
| } | |
| //fmt.Println() | |
| slopeCross := -1 | |
| for i, b := range r.buckets { | |
| v := b.value | |
| if v < float64(i)*minSlope { | |
| slopeCross = i | |
| break | |
| } | |
| if v > maxValue { | |
| maxValue = v | |
| maxBucket = i + 1 | |
| } | |
| } | |
| minRadBucket := len(r.buckets) | |
| sum := float64(0) | |
| for minRadBucket > 0 && sum < minRightSum { | |
| minRadBucket-- | |
| b := r.buckets[minRadBucket] | |
| sum += b.weights[trInside] + b.weights[trOutside] | |
| } | |
| r.minRadius = uint64(math.Pow(2, 64-float64(minRadBucket)/radiusBucketsPerBit)) | |
| lookupLeft := -1 | |
| if r.needMoreLookups(0, maxBucket-lookupWidth-1, maxValue) { | |
| lookupLeft = r.chooseLookupBucket(maxBucket-lookupWidth, maxBucket-1) | |
| } | |
| lookupRight := -1 | |
| if slopeCross != maxBucket && (minRadBucket <= maxBucket || r.needMoreLookups(maxBucket+lookupWidth, len(r.buckets)-1, maxValue)) { | |
| for len(r.buckets) <= maxBucket+lookupWidth { | |
| r.buckets = append(r.buckets, topicRadiusBucket{lookupSent: make(map[common.Hash]mclock.AbsTime)}) | |
| } | |
| lookupRight = r.chooseLookupBucket(maxBucket, maxBucket+lookupWidth-1) | |
| } | |
| if lookupLeft == -1 { | |
| radiusLookup = lookupRight | |
| } else { | |
| if lookupRight == -1 { | |
| radiusLookup = lookupLeft | |
| } else { | |
| if randUint(2) == 0 { | |
| radiusLookup = lookupLeft | |
| } else { | |
| radiusLookup = lookupRight | |
| } | |
| } | |
| } | |
| //fmt.Println("mb", maxBucket, "sc", slopeCross, "mrb", minRadBucket, "ll", lookupLeft, "lr", lookupRight, "mv", maxValue) | |
| if radiusLookup == -1 { | |
| // no more radius lookups needed at the moment, return a radius | |
| r.converged = true | |
| rad := maxBucket | |
| if minRadBucket < rad { | |
| rad = minRadBucket | |
| } | |
| radius = ^uint64(0) | |
| if rad > 0 { | |
| radius = uint64(math.Pow(2, 64-float64(rad)/radiusBucketsPerBit)) | |
| } | |
| r.radius = radius | |
| } | |
| return | |
| } | |
| func (r *topicRadius) nextTarget(forceRegular bool) lookupInfo { | |
| if !forceRegular { | |
| _, radiusLookup := r.recalcRadius() | |
| if radiusLookup != -1 { | |
| target := r.targetForBucket(radiusLookup) | |
| r.buckets[radiusLookup].lookupSent[target] = mclock.Now() | |
| return lookupInfo{target: target, topic: r.topic, radiusLookup: true} | |
| } | |
| } | |
| radExt := r.radius / 2 | |
| if radExt > maxRadius-r.radius { | |
| radExt = maxRadius - r.radius | |
| } | |
| rnd := randUint64n(r.radius) + randUint64n(2*radExt) | |
| if rnd > radExt { | |
| rnd -= radExt | |
| } else { | |
| rnd = radExt - rnd | |
| } | |
| prefix := r.topicHashPrefix ^ rnd | |
| var target common.Hash | |
| binary.BigEndian.PutUint64(target[0:8], prefix) | |
| globalRandRead(target[8:]) | |
| return lookupInfo{target: target, topic: r.topic, radiusLookup: false} | |
| } | |
| func (r *topicRadius) adjustWithTicket(now mclock.AbsTime, targetHash common.Hash, t ticketRef) { | |
| wait := t.t.regTime[t.idx] - t.t.issueTime | |
| inside := float64(wait)/float64(targetWaitTime) - 0.5 | |
| if inside > 1 { | |
| inside = 1 | |
| } | |
| if inside < 0 { | |
| inside = 0 | |
| } | |
| r.adjust(now, targetHash, t.t.node.sha, inside) | |
| } | |
| func (r *topicRadius) adjust(now mclock.AbsTime, targetHash, addrHash common.Hash, inside float64) { | |
| bucket := r.getBucketIdx(addrHash) | |
| //fmt.Println("adjust", bucket, len(r.buckets), inside) | |
| if bucket >= len(r.buckets) { | |
| return | |
| } | |
| r.buckets[bucket].adjust(now, inside) | |
| delete(r.buckets[bucket].lookupSent, targetHash) | |
| } |