Skip to content

Commit

Permalink
Merge pull request #97 from cryptoscope/untangle-statemat-from-replic…
Browse files Browse the repository at this point in the history
…ation

Untangle statemat from replication
  • Loading branch information
cryptix committed May 7, 2021
2 parents 5fa6829 + 2e3b795 commit 24f7fb5
Show file tree
Hide file tree
Showing 27 changed files with 255 additions and 199 deletions.
6 changes: 5 additions & 1 deletion ebt.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ func (s Note) MarshalJSON() ([]byte, error) {
if !s.Replicate {
return []byte("-1"), nil
}
i = s.Seq << 1
i = s.Seq
if i == -1 { // -1 is margarets way of saying "no msgs in this feed"
i = 0
}
i = i << 1 // times 2 (to make room for the rx bit)
if s.Receive {
i |= 0
} else {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
go.cryptoscope.co/librarian v0.2.1-0.20200604160012-d85e03a70e79
go.cryptoscope.co/luigi v0.3.6-0.20200131144242-3256b54e72c8
go.cryptoscope.co/margaret v0.1.7-0.20201027171711-332f00d22dd0
go.cryptoscope.co/muxrpc/v2 v2.0.3
go.cryptoscope.co/muxrpc/v2 v2.0.4
go.cryptoscope.co/netwrap v0.1.1
go.cryptoscope.co/secretstream v1.2.2
go.mindeco.de/ssb-gabbygrove v0.1.7-0.20200618115102-169cb68d2398
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,8 @@ go.cryptoscope.co/muxrpc/v2 v2.0.2 h1:UdlGHY+EEYZpJz5HWnWz0r34pULYxJHfFTeqLvv+7s
go.cryptoscope.co/muxrpc/v2 v2.0.2/go.mod h1:MgaeojIkWY3lLuoNw1mlMT3b3jiZwOj/fgsoGZp/VNA=
go.cryptoscope.co/muxrpc/v2 v2.0.3 h1:WpkWYFBIXKwxtxFStHpcYqaHfSyJxTcgdWIa0FgXdDM=
go.cryptoscope.co/muxrpc/v2 v2.0.3/go.mod h1:MgaeojIkWY3lLuoNw1mlMT3b3jiZwOj/fgsoGZp/VNA=
go.cryptoscope.co/muxrpc/v2 v2.0.4 h1:NLN//zPt9UKFelnPNBh3fefrQ/TFylCflPZhKiDtK3U=
go.cryptoscope.co/muxrpc/v2 v2.0.4/go.mod h1:MgaeojIkWY3lLuoNw1mlMT3b3jiZwOj/fgsoGZp/VNA=
go.cryptoscope.co/netwrap v0.1.0/go.mod h1:7zcYswCa4CT+ct54e9uH9+IIbYYETEMHKDNpzl8Ukew=
go.cryptoscope.co/netwrap v0.1.1 h1:JLzzGKEvrUrkKzu3iM0DhpHmt+L/gYqmpcf1lJMUyFs=
go.cryptoscope.co/netwrap v0.1.1/go.mod h1:7zcYswCa4CT+ct54e9uH9+IIbYYETEMHKDNpzl8Ukew=
Expand Down
3 changes: 2 additions & 1 deletion graph/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/dgraph-io/badger"
kitlog "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"go.cryptoscope.co/librarian"
libbadger "go.cryptoscope.co/librarian/badger"
"go.cryptoscope.co/margaret"
Expand Down Expand Up @@ -82,7 +83,7 @@ func (b *builder) indexUpdateFunc(ctx context.Context, seq margaret.Seq, val int
abs, ok := val.(refs.Message)
if !ok {
err := fmt.Errorf("graph/idx: invalid msg value %T", val)
b.log.Log("msg", "contact eval failed", "reason", err)
level.Warn(b.log).Log("msg", "contact eval failed", "reason", err)
return err
}

Expand Down
92 changes: 21 additions & 71 deletions internal/statematrix/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,14 @@ type StateMatrix struct {

self string // whoami

currSeq CurrentSequencer
wantList ssb.ReplicationLister

mu sync.Mutex
open currentFrontiers
}

// map[peer reference]frontier
type currentFrontiers map[string]ssb.NetworkFrontier

type CurrentSequencer interface {
CurrentSequence(*refs.FeedRef) (ssb.Note, error)
}

func New(base string, self *refs.FeedRef, wl ssb.ReplicationLister, cs CurrentSequencer) (*StateMatrix, error) {
func New(base string, self *refs.FeedRef) (*StateMatrix, error) {

os.MkdirAll(base, onlyOwnerPerms)

Expand All @@ -55,9 +48,6 @@ func New(base string, self *refs.FeedRef, wl ssb.ReplicationLister, cs CurrentSe

self: self.Ref(),

wantList: wl,
currSeq: cs,

open: make(currentFrontiers),
}

Expand All @@ -69,13 +59,12 @@ func New(base string, self *refs.FeedRef, wl ssb.ReplicationLister, cs CurrentSe
return &sm, nil
}

/*
func (sm *StateMatrix) Open(peer *refs.FeedRef) (ssb.NetworkFrontier, error) {
// Inspect returns the current frontier for the passed peer
func (sm *StateMatrix) Inspect(peer *refs.FeedRef) (ssb.NetworkFrontier, error) {
sm.mu.Lock()
defer sm.mu.Unlock()
return sm.loadFrontier(peer)
}
*/

func (sm *StateMatrix) StateFileName(peer *refs.FeedRef) (string, error) {
peerTfk, err := tfk.Encode(peer)
Expand Down Expand Up @@ -282,76 +271,40 @@ func (sm *StateMatrix) Changed(self, peer *refs.FeedRef) (ssb.NetworkFrontier, e
sm.mu.Lock()
defer sm.mu.Unlock()

var err error

selfNf, err := sm.loadFrontier(self)
if err != nil {
return nil, err
}

// no state yet
if len(selfNf) == 0 {
// use the replication lister and determine the stored feeds lengths
lister := sm.wantList.ReplicationList()
feeds, err := lister.List()
if err != nil {
return nil, fmt.Errorf("failed to get userlist: %w", err)
}

for i, feed := range feeds {
if feed.Algo != refs.RefAlgoFeedSSB1 {
// skip other formats (TODO: gg support)
continue
}

seq, err := sm.currSeq.CurrentSequence(feed)
if err != nil {
return nil, fmt.Errorf("failed to get sequence for entry %d: %w", i, err)
}
selfNf[feed.Ref()] = seq
}

selfNf[self.Ref()], err = sm.currSeq.CurrentSequence(self)
if err != nil {
return nil, fmt.Errorf("failed to get our sequence: %w", err)
}

sm.open[sm.self] = selfNf
err = sm.save(self)
if err != nil {
return nil, err
}
}

peerNf, err := sm.loadFrontier(peer)
if err != nil {
fmt.Println("ebt/warning: remote peer state loading error:", err)
return selfNf, nil
return nil, err
}

// just the ones that differ
changedFrontier := make(ssb.NetworkFrontier)
// calculate the subset of what self wants and peer wants to hear about
relevant := make(ssb.NetworkFrontier)

for myFeed, myNote := range selfNf {
theirNote, has := peerNf[myFeed]
for wantedFeed, myNote := range selfNf {
theirNote, has := peerNf[wantedFeed]
if !has && myNote.Receive {
// they don't have it, but tell them we want it
changedFrontier[myFeed] = myNote
relevant[wantedFeed] = myNote
continue
}

if !theirNote.Receive {
// they dont care about this feed
if !theirNote.Replicate {
continue
}

if myNote.Seq > theirNote.Seq {
// we have more then them, tell them how much
changedFrontier[myFeed] = myNote
if !theirNote.Receive && wantedFeed != peer.Ref() {
// they dont care about this feed
continue
}

relevant[wantedFeed] = myNote
}

return changedFrontier, nil
return relevant, nil
}

type ObservedFeed struct {
Expand Down Expand Up @@ -380,9 +333,7 @@ func (sm *StateMatrix) Update(who *refs.FeedRef, update ssb.NetworkFrontier) (ss
return current, nil
}

// Fill updates the current frontier state.
//
// It might be deprecated.
// Fill updates who's frontier state with a list of observed feeds.
func (sm *StateMatrix) Fill(who *refs.FeedRef, feeds []ObservedFeed) error {
sm.mu.Lock()
defer sm.mu.Unlock()
Expand All @@ -392,13 +343,12 @@ func (sm *StateMatrix) Fill(who *refs.FeedRef, feeds []ObservedFeed) error {
return err
}

for _, of := range feeds {

if of.Replicate {
nf[of.Feed.Ref()] = of.Note
for _, updatedFeed := range feeds {
if updatedFeed.Replicate {
nf[updatedFeed.Feed.Ref()] = updatedFeed.Note
} else {
// seq == -1 means drop it
delete(nf, of.Feed.Ref())
delete(nf, updatedFeed.Feed.Ref())
}
}

Expand Down
30 changes: 30 additions & 0 deletions internal/statematrix/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,33 @@ func testFeed(i int) *refs.FeedRef {
ID: k,
}
}

func TestChanged(t *testing.T) {
r := require.New(t)
os.RemoveAll("testrun")
os.Mkdir("testrun", 0700)
m, err := New("testrun/new", testFeed(0))
r.NoError(err)

// 0 has seen feed(1) up until 2
feeds := []ObservedFeed{
{Feed: testFeed(1), Note: ssb.Note{Replicate: true, Receive: true, Seq: 2}},
}
r.NoError(m.Fill(testFeed(0), feeds))

// feed(1) already has 25 tho
feeds = []ObservedFeed{
{Feed: testFeed(1), Note: ssb.Note{Replicate: true, Receive: true, Seq: 25}},
}
r.NoError(m.Fill(testFeed(1), feeds))

changed, err := m.Changed(testFeed(0), testFeed(1))
r.NoError(err)

// changed should have 1 as two still (to get just 3)
note, has := changed[testFeed(1).Ref()]
r.True(has, "changed doesnt have feed(1) (has %d entries)", len(changed))
r.Equal(int64(2), note.Seq)
r.True(note.Replicate)
r.True(note.Receive)
}
17 changes: 9 additions & 8 deletions multilogs/combined.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (slog *CombinedIndex) Pour(ctx context.Context, swv interface{}) error {
if !ok {
return fmt.Errorf("error casting seq wrapper. got type %T", swv)
}
seq := sw.Seq()
seq := sw.Seq() //received as

// todo: defer state save!?
err := persist.Save(slog.file, seq)
Expand Down Expand Up @@ -201,7 +201,8 @@ func (slog *CombinedIndex) Pour(ctx context.Context, swv interface{}) error {
return slog.update(seq.Seq(), msg)
}

func (slog *CombinedIndex) update(seq int64, msg refs.Message) error {
// update all the indexes with this new message which was stored as rxSeq (received sequence number)
func (slog *CombinedIndex) update(rxSeq int64, msg refs.Message) error {
// TODO[major/pgroups] fix storage and resumption
// err := slog.seqresolver.Append(seq, msg.Seq(), msg.Claimed(), msg.Received())
// if err != nil {
Expand All @@ -215,7 +216,7 @@ func (slog *CombinedIndex) update(seq int64, msg refs.Message) error {
if err != nil {
return fmt.Errorf("error opening sublog: %w", err)
}
_, err = authorLog.Append(margaret.BaseSeq(seq))
_, err = authorLog.Append(margaret.BaseSeq(rxSeq))
if err != nil {
return fmt.Errorf("error updating author sublog: %w", err)
}
Expand All @@ -237,7 +238,7 @@ func (slog *CombinedIndex) update(seq int64, msg refs.Message) error {
content := msg.ContentBytes()
// TODO: gabby grove
if content[0] != '{' { // assuming all other content is json objects
cleartext, err := slog.tryDecrypt(msg, seq)
cleartext, err := slog.tryDecrypt(msg, rxSeq)
if err != nil {
if err == errSkip {
// yes it's a boxed message but we can't read it (yet)
Expand Down Expand Up @@ -281,7 +282,7 @@ func (slog *CombinedIndex) update(seq int64, msg refs.Message) error {
if err != nil {
return err
}
_, err = sl.Append(seq)
_, err = sl.Append(rxSeq)
if err != nil {
return err
}
Expand All @@ -292,7 +293,7 @@ func (slog *CombinedIndex) update(seq int64, msg refs.Message) error {
return fmt.Errorf("error opening sublog: %w", err)
}

_, err = typedLog.Append(seq)
_, err = typedLog.Append(rxSeq)
if err != nil {
return fmt.Errorf("error updating byType sublog: %w", err)
}
Expand All @@ -304,7 +305,7 @@ func (slog *CombinedIndex) update(seq int64, msg refs.Message) error {
if err != nil {
return fmt.Errorf("error opening sublog: %w", err)
}
_, err = tangleLog.Append(seq)
_, err = tangleLog.Append(rxSeq)
if err != nil {
return fmt.Errorf("error updating v1 tangle sublog: %w", err)
}
Expand All @@ -319,7 +320,7 @@ func (slog *CombinedIndex) update(seq int64, msg refs.Message) error {
if err != nil {
return fmt.Errorf("error opening sublog: %w", err)
}
_, err = tangleLog.Append(seq)
_, err = tangleLog.Append(rxSeq)
if err != nil {
return fmt.Errorf("error updating v2 tangle sublog: %w", err)
}
Expand Down
9 changes: 5 additions & 4 deletions network/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,11 +307,12 @@ func (n *node) handleConnection(ctx context.Context, origConn net.Conn, isServer
h = hw(h)
}

pkr := muxrpc.NewPacker(conn)
filtered := level.NewFilter(n.log, level.AllowInfo())
edp := muxrpc.Handle(pkr, h,
connLogger := n.log
connLogger = level.NewFilter(connLogger, level.AllowInfo())

edp := muxrpc.Handle(muxrpc.NewPacker(conn), h,
muxrpc.WithContext(ctx),
muxrpc.WithLogger(filtered),
muxrpc.WithLogger(connLogger),
// _isServer_ defines _are we a server_.
// the muxrpc option asks are we _talking_ to a server > inverted
muxrpc.WithIsServer(!isServer))
Expand Down
1 change: 0 additions & 1 deletion network/tunnel_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ func (newConn handleNewConnection) HandleConnect(ctx context.Context, edp muxrpc
var yes bool
err = edp.Async(ctx, &yes, muxrpc.TypeJSON, muxrpc.Method{"tunnel", "isRoom"})
if err != nil || !yes {
level.Warn(peerLogger).Log("event", "not a room", "err", err)
return
}

Expand Down

0 comments on commit 24f7fb5

Please sign in to comment.