Skip to content

Commit

Permalink
rip out replication lister from ebt state matrix store
Browse files Browse the repository at this point in the history
updates #45
  • Loading branch information
cryptix committed May 7, 2021
1 parent 5fa6829 commit 895fc19
Show file tree
Hide file tree
Showing 21 changed files with 226 additions and 140 deletions.
6 changes: 5 additions & 1 deletion ebt.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ func (s Note) MarshalJSON() ([]byte, error) {
if !s.Replicate {
return []byte("-1"), nil
}
i = s.Seq << 1
i = s.Seq
if i == -1 { // -1 is margarets way of saying "no msgs in this feed"
i = 0
}
i = i << 1 // times 2 (to make room for the rx bit)
if s.Receive {
i |= 0
} else {
Expand Down
3 changes: 2 additions & 1 deletion graph/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/dgraph-io/badger"
kitlog "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"go.cryptoscope.co/librarian"
libbadger "go.cryptoscope.co/librarian/badger"
"go.cryptoscope.co/margaret"
Expand Down Expand Up @@ -82,7 +83,7 @@ func (b *builder) indexUpdateFunc(ctx context.Context, seq margaret.Seq, val int
abs, ok := val.(refs.Message)
if !ok {
err := fmt.Errorf("graph/idx: invalid msg value %T", val)
b.log.Log("msg", "contact eval failed", "reason", err)
level.Warn(b.log).Log("msg", "contact eval failed", "reason", err)
return err
}

Expand Down
92 changes: 21 additions & 71 deletions internal/statematrix/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,14 @@ type StateMatrix struct {

self string // whoami

currSeq CurrentSequencer
wantList ssb.ReplicationLister

mu sync.Mutex
open currentFrontiers
}

// map[peer reference]frontier
type currentFrontiers map[string]ssb.NetworkFrontier

type CurrentSequencer interface {
CurrentSequence(*refs.FeedRef) (ssb.Note, error)
}

func New(base string, self *refs.FeedRef, wl ssb.ReplicationLister, cs CurrentSequencer) (*StateMatrix, error) {
func New(base string, self *refs.FeedRef) (*StateMatrix, error) {

os.MkdirAll(base, onlyOwnerPerms)

Expand All @@ -55,9 +48,6 @@ func New(base string, self *refs.FeedRef, wl ssb.ReplicationLister, cs CurrentSe

self: self.Ref(),

wantList: wl,
currSeq: cs,

open: make(currentFrontiers),
}

Expand All @@ -69,13 +59,12 @@ func New(base string, self *refs.FeedRef, wl ssb.ReplicationLister, cs CurrentSe
return &sm, nil
}

/*
func (sm *StateMatrix) Open(peer *refs.FeedRef) (ssb.NetworkFrontier, error) {
// Inspect returns the current frontier for the passed peer
func (sm *StateMatrix) Inspect(peer *refs.FeedRef) (ssb.NetworkFrontier, error) {
sm.mu.Lock()
defer sm.mu.Unlock()
return sm.loadFrontier(peer)
}
*/

func (sm *StateMatrix) StateFileName(peer *refs.FeedRef) (string, error) {
peerTfk, err := tfk.Encode(peer)
Expand Down Expand Up @@ -282,76 +271,40 @@ func (sm *StateMatrix) Changed(self, peer *refs.FeedRef) (ssb.NetworkFrontier, e
sm.mu.Lock()
defer sm.mu.Unlock()

var err error

selfNf, err := sm.loadFrontier(self)
if err != nil {
return nil, err
}

// no state yet
if len(selfNf) == 0 {
// use the replication lister and determine the stored feeds lengths
lister := sm.wantList.ReplicationList()
feeds, err := lister.List()
if err != nil {
return nil, fmt.Errorf("failed to get userlist: %w", err)
}

for i, feed := range feeds {
if feed.Algo != refs.RefAlgoFeedSSB1 {
// skip other formats (TODO: gg support)
continue
}

seq, err := sm.currSeq.CurrentSequence(feed)
if err != nil {
return nil, fmt.Errorf("failed to get sequence for entry %d: %w", i, err)
}
selfNf[feed.Ref()] = seq
}

selfNf[self.Ref()], err = sm.currSeq.CurrentSequence(self)
if err != nil {
return nil, fmt.Errorf("failed to get our sequence: %w", err)
}

sm.open[sm.self] = selfNf
err = sm.save(self)
if err != nil {
return nil, err
}
}

peerNf, err := sm.loadFrontier(peer)
if err != nil {
fmt.Println("ebt/warning: remote peer state loading error:", err)
return selfNf, nil
return nil, err
}

// just the ones that differ
changedFrontier := make(ssb.NetworkFrontier)
// calculate the subset of what self wants and peer wants to hear about
relevant := make(ssb.NetworkFrontier)

for myFeed, myNote := range selfNf {
theirNote, has := peerNf[myFeed]
for wantedFeed, myNote := range selfNf {
theirNote, has := peerNf[wantedFeed]
if !has && myNote.Receive {
// they don't have it, but tell them we want it
changedFrontier[myFeed] = myNote
relevant[wantedFeed] = myNote
continue
}

if !theirNote.Receive {
// they dont care about this feed
if !theirNote.Replicate {
continue
}

if myNote.Seq > theirNote.Seq {
// we have more then them, tell them how much
changedFrontier[myFeed] = myNote
if !theirNote.Receive && wantedFeed != peer.Ref() {
// they dont care about this feed
continue
}

relevant[wantedFeed] = myNote
}

return changedFrontier, nil
return relevant, nil
}

type ObservedFeed struct {
Expand Down Expand Up @@ -380,9 +333,7 @@ func (sm *StateMatrix) Update(who *refs.FeedRef, update ssb.NetworkFrontier) (ss
return current, nil
}

// Fill updates the current frontier state.
//
// It might be deprecated.
// Fill updates who's frontier state with a list of observed feeds.
func (sm *StateMatrix) Fill(who *refs.FeedRef, feeds []ObservedFeed) error {
sm.mu.Lock()
defer sm.mu.Unlock()
Expand All @@ -392,13 +343,12 @@ func (sm *StateMatrix) Fill(who *refs.FeedRef, feeds []ObservedFeed) error {
return err
}

for _, of := range feeds {

if of.Replicate {
nf[of.Feed.Ref()] = of.Note
for _, updatedFeed := range feeds {
if updatedFeed.Replicate {
nf[updatedFeed.Feed.Ref()] = updatedFeed.Note
} else {
// seq == -1 means drop it
delete(nf, of.Feed.Ref())
delete(nf, updatedFeed.Feed.Ref())
}
}

Expand Down
30 changes: 30 additions & 0 deletions internal/statematrix/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,33 @@ func testFeed(i int) *refs.FeedRef {
ID: k,
}
}

func TestChanged(t *testing.T) {
r := require.New(t)
os.RemoveAll("testrun")
os.Mkdir("testrun", 0700)
m, err := New("testrun/new", testFeed(0))
r.NoError(err)

// 0 has seen feed(1) up until 2
feeds := []ObservedFeed{
{Feed: testFeed(1), Note: ssb.Note{Replicate: true, Receive: true, Seq: 2}},
}
r.NoError(m.Fill(testFeed(0), feeds))

// feed(1) already has 25 tho
feeds = []ObservedFeed{
{Feed: testFeed(1), Note: ssb.Note{Replicate: true, Receive: true, Seq: 25}},
}
r.NoError(m.Fill(testFeed(1), feeds))

changed, err := m.Changed(testFeed(0), testFeed(1))
r.NoError(err)

// changed should have 1 as two still (to get just 3)
note, has := changed[testFeed(1).Ref()]
r.True(has, "changed doesnt have feed(1) (has %d entries)", len(changed))
r.Equal(int64(2), note.Seq)
r.True(note.Replicate)
r.True(note.Receive)
}
17 changes: 9 additions & 8 deletions multilogs/combined.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (slog *CombinedIndex) Pour(ctx context.Context, swv interface{}) error {
if !ok {
return fmt.Errorf("error casting seq wrapper. got type %T", swv)
}
seq := sw.Seq()
seq := sw.Seq() //received as

// todo: defer state save!?
err := persist.Save(slog.file, seq)
Expand Down Expand Up @@ -201,7 +201,8 @@ func (slog *CombinedIndex) Pour(ctx context.Context, swv interface{}) error {
return slog.update(seq.Seq(), msg)
}

func (slog *CombinedIndex) update(seq int64, msg refs.Message) error {
// update all the indexes with this new message which was stored as rxSeq (received sequence number)
func (slog *CombinedIndex) update(rxSeq int64, msg refs.Message) error {
// TODO[major/pgroups] fix storage and resumption
// err := slog.seqresolver.Append(seq, msg.Seq(), msg.Claimed(), msg.Received())
// if err != nil {
Expand All @@ -215,7 +216,7 @@ func (slog *CombinedIndex) update(seq int64, msg refs.Message) error {
if err != nil {
return fmt.Errorf("error opening sublog: %w", err)
}
_, err = authorLog.Append(margaret.BaseSeq(seq))
_, err = authorLog.Append(margaret.BaseSeq(rxSeq))
if err != nil {
return fmt.Errorf("error updating author sublog: %w", err)
}
Expand All @@ -237,7 +238,7 @@ func (slog *CombinedIndex) update(seq int64, msg refs.Message) error {
content := msg.ContentBytes()
// TODO: gabby grove
if content[0] != '{' { // assuming all other content is json objects
cleartext, err := slog.tryDecrypt(msg, seq)
cleartext, err := slog.tryDecrypt(msg, rxSeq)
if err != nil {
if err == errSkip {
// yes it's a boxed message but we can't read it (yet)
Expand Down Expand Up @@ -281,7 +282,7 @@ func (slog *CombinedIndex) update(seq int64, msg refs.Message) error {
if err != nil {
return err
}
_, err = sl.Append(seq)
_, err = sl.Append(rxSeq)
if err != nil {
return err
}
Expand All @@ -292,7 +293,7 @@ func (slog *CombinedIndex) update(seq int64, msg refs.Message) error {
return fmt.Errorf("error opening sublog: %w", err)
}

_, err = typedLog.Append(seq)
_, err = typedLog.Append(rxSeq)
if err != nil {
return fmt.Errorf("error updating byType sublog: %w", err)
}
Expand All @@ -304,7 +305,7 @@ func (slog *CombinedIndex) update(seq int64, msg refs.Message) error {
if err != nil {
return fmt.Errorf("error opening sublog: %w", err)
}
_, err = tangleLog.Append(seq)
_, err = tangleLog.Append(rxSeq)
if err != nil {
return fmt.Errorf("error updating v1 tangle sublog: %w", err)
}
Expand All @@ -319,7 +320,7 @@ func (slog *CombinedIndex) update(seq int64, msg refs.Message) error {
if err != nil {
return fmt.Errorf("error opening sublog: %w", err)
}
_, err = tangleLog.Append(seq)
_, err = tangleLog.Append(rxSeq)
if err != nil {
return fmt.Errorf("error updating v2 tangle sublog: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/ebt/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type MUXRPCHandler struct {
}

func (h *MUXRPCHandler) check(err error) {
if err != nil {
if err != nil && !muxrpc.IsSinkClosed(err) {
level.Error(h.info).Log("error", err)
}
}
Expand Down
3 changes: 3 additions & 0 deletions plugins/ebt/plug.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
package ebt

import (
"sync"

"github.com/cryptix/go/logging"
"go.cryptoscope.co/margaret"
"go.cryptoscope.co/margaret/multilog"
Expand Down Expand Up @@ -39,6 +41,7 @@ func NewPlug(
verify: v,

Sessions: Sessions{
mu: new(sync.Mutex),
open: make(map[string]*session),

waitingFor: make(map[string]chan<- struct{}),
Expand Down
2 changes: 1 addition & 1 deletion plugins/ebt/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (s *session) Unubscribe(feed *refs.FeedRef) {
}

type Sessions struct {
mu sync.Mutex
mu *sync.Mutex
open map[string]*session
// to be able to correctly trigger fallback on the server we need to be able to wait for incoming sessions
waitingFor map[string]chan<- struct{}
Expand Down
4 changes: 2 additions & 2 deletions plugins/gossip/feed_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (m *FeedManager) CreateStreamHistory(
}

default:
return fmt.Errorf("unsupported feed format.")
return fmt.Errorf("unsupported feed format")
}

sent := 0
Expand All @@ -293,7 +293,7 @@ func (m *FeedManager) CreateStreamHistory(
m.sysCtr.With("event", "gossiptx").Add(float64(sent))
} else {
if sent > 0 {
level.Debug(feedLogger).Log("event", "gossiptx", "n", sent)
level.Debug(feedLogger).Log("event", "gossiptx", "n", sent, "starting", arg.Seq)
}
}

Expand Down
2 changes: 1 addition & 1 deletion private/groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func TestGroupsManualDecrypt(t *testing.T) {
t.Log("reply:", reply.ShortRef())

// reconnect to get the reply
edp, has := srh.Network.GetEndpointFor(tal.KeyPair.Id)
edp, has := srh.Network.GetEndpointFor(*tal.KeyPair.Id)
r.True(has)
edp.Terminate()
time.Sleep(1 * time.Second)
Expand Down
3 changes: 2 additions & 1 deletion sbot/feeds_gabby_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"go.cryptoscope.co/ssb/message/multimsg"
)

// TODO: need to rework EBT for gabby grove support
func XTestFeedsGabbySync(t *testing.T) {
defer leakcheck.Check(t)
r := require.New(t)
Expand All @@ -52,6 +51,7 @@ func XTestFeedsGabbySync(t *testing.T) {
WithInfo(log.With(mainLog, "unit", "ali")),
WithRepoPath(filepath.Join("testrun", t.Name(), "ali")),
WithListenAddr(":0"),
DisableEBT(true), // no multi-format support yet
)
r.NoError(err)

Expand All @@ -70,6 +70,7 @@ func XTestFeedsGabbySync(t *testing.T) {
WithInfo(log.With(mainLog, "unit", "bob")),
WithRepoPath(filepath.Join("testrun", t.Name(), "bob")),
WithListenAddr(":0"),
DisableEBT(true), // no multi-format support yet
)
r.NoError(err)

Expand Down
Loading

0 comments on commit 895fc19

Please sign in to comment.