Skip to content

Commit

Permalink
disable timestamp sorter
Browse files Browse the repository at this point in the history
see #61 (comment)
for context.

The mentioned bug holds back development quite a bit... every crash
leads to the seqmaps beeing out of sync since they are just hold in ram
and not persisted until a CLEAN shutdown of the server.
  • Loading branch information
cryptix committed Feb 4, 2021
1 parent 15d616d commit ace3e87
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 32 deletions.
38 changes: 21 additions & 17 deletions multilogs/combined.go
Expand Up @@ -11,15 +11,14 @@ import (
"os"
"path/filepath"
"sync"
"time"

"github.com/keks/persist"
"go.cryptoscope.co/librarian"
"go.cryptoscope.co/margaret"
"go.cryptoscope.co/margaret/multilog"
"go.cryptoscope.co/margaret/multilog/roaring"
"go.cryptoscope.co/ssb"

"go.cryptoscope.co/ssb"
"go.cryptoscope.co/ssb/internal/statematrix"
"go.cryptoscope.co/ssb/internal/storedrefs"
"go.cryptoscope.co/ssb/message/multimsg"
Expand All @@ -38,7 +37,7 @@ func NewCombinedIndex(
box *private.Manager,
self *refs.FeedRef,
rxlog margaret.Log,
res *repo.SequenceResolver,
// res *repo.SequenceResolver, // TODO[major/pgroups] fix storage and resumption
u, p, bt, tan *roaring.MultiLog,
oh multilog.MultiLog,
sm *statematrix.StateMatrix,
Expand Down Expand Up @@ -67,8 +66,9 @@ func NewCombinedIndex(

ebtState: sm,

// TODO[major/pgroups] fix storage and resumption
// timestamp sorting
seqresolver: res,
// seqresolver: res,

// groups reindexing
rxlog: rxlog,
Expand All @@ -95,7 +95,8 @@ type CombinedIndex struct {

orderdHelper multilog.MultiLog

seqresolver *repo.SequenceResolver
// TODO[major/pgroups] fix storage and resumption
// seqresolver *repo.SequenceResolver

ebtState *statematrix.StateMatrix

Expand Down Expand Up @@ -182,10 +183,11 @@ func (slog *CombinedIndex) Pour(ctx context.Context, swv interface{}) error {

if isNulled, ok := v.(error); ok {
if margaret.IsErrNulled(isNulled) {
err = slog.seqresolver.Append(seq.Seq(), 0, time.Now(), time.Now())
if err != nil {
return fmt.Errorf("error updating sequence resolver (nulled message): %w", err)
}
// TODO[major/pgroups] fix storage and resumption
// err = slog.seqresolver.Append(seq.Seq(), 0, time.Now(), time.Now())
// if err != nil {
// return fmt.Errorf("error updating sequence resolver (nulled message): %w", err)
// }
return nil
}
return isNulled
Expand All @@ -200,10 +202,11 @@ func (slog *CombinedIndex) Pour(ctx context.Context, swv interface{}) error {
}

func (slog *CombinedIndex) update(seq int64, msg refs.Message) error {
err := slog.seqresolver.Append(seq, msg.Seq(), msg.Claimed(), msg.Received())
if err != nil {
return fmt.Errorf("error updating sequence resolver: %w", err)
}
// TODO[major/pgroups] fix storage and resumption
// err := slog.seqresolver.Append(seq, msg.Seq(), msg.Claimed(), msg.Received())
// if err != nil {
// return fmt.Errorf("error updating sequence resolver: %w", err)
// }

author := msg.Author()

Expand Down Expand Up @@ -345,10 +348,11 @@ func (slog *CombinedIndex) QuerySpec() margaret.QuerySpec {
seq = margaret.SeqEmpty
}

if resN := slog.seqresolver.Seq() - 1; resN != seq.Seq() {
err := fmt.Errorf("combined idx (has:%d, will: %d)", resN, seq.Seq())
return margaret.ErrorQuerySpec(err)
}
// TODO[major/pgroups] fix storage and resumption
// if resN := slog.seqresolver.Seq() - 1; resN != seq.Seq() {
// err := fmt.Errorf("combined idx (has:%d, will: %d)", resN, seq.Seq())
// return margaret.ErrorQuerySpec(err)
// }

return margaret.MergeQuerySpec(
margaret.Gt(seq),
Expand Down
15 changes: 11 additions & 4 deletions plugins/rawread/logt.go
Expand Up @@ -46,7 +46,8 @@ func NewByTypePlugin(
ml *roaring.MultiLog,
pl *roaring.MultiLog,
pm *private.Manager,
res *repo.SequenceResolver,
// TODO[major/pgroups] fix storage and resumption
// res *repo.SequenceResolver,
isSelf ssb.Authorizer,
) ssb.Plugin {
plug := &Plugin{
Expand All @@ -57,7 +58,8 @@ func NewByTypePlugin(

unboxer: pm,

res: res,
// TODO[major/pgroups] fix storage and resumption
// res: res,

isSelf: isSelf,
}
Expand Down Expand Up @@ -121,7 +123,7 @@ func (g Plugin) HandleSource(ctx context.Context, req *muxrpc.Request, w *muxrpc
snk = newSinkCounter(&cnt, snk)

idxAddr := librarian.Addr("string:" + qry.Type)
if qry.Live {
if true { // TODO[major/pgroups] fix storage and resumption
if qry.Private {
return fmt.Errorf("TODO: fix live && private")
}
Expand All @@ -130,7 +132,9 @@ func (g Plugin) HandleSource(ctx context.Context, req *muxrpc.Request, w *muxrpc
return fmt.Errorf("failed to load typed log: %w", err)
}

src, err := mutil.Indirect(g.rxlog, typed).Query(margaret.Limit(int(qry.Limit)), margaret.Live(qry.Live))
src, err := mutil.Indirect(g.rxlog, typed).Query(
margaret.Limit(int(qry.Limit)),
margaret.Live(qry.Live))
if err != nil {
return fmt.Errorf("logT: failed to qry tipe: %w", err)
}
Expand All @@ -146,6 +150,9 @@ func (g Plugin) HandleSource(ctx context.Context, req *muxrpc.Request, w *muxrpc

return snk.Close()
}
// TODO[major/pgroups] fix storage and resumption
// TODO: fix seq resolver filling hiccups
return snk.Close()

/* TODO: i'm skipping a fairly big refactor here to find out what works first.
ideallly the live and not-live code would just be the same, somehow shoving it into Query(...).
Expand Down
22 changes: 12 additions & 10 deletions sbot/new.go
Expand Up @@ -196,11 +196,12 @@ func initSbot(s *Sbot) (*Sbot, error) {
s.closers.addCloser(sm)
s.ebtState = sm

s.SeqResolver, err = repo.NewSequenceResolver(r)
if err != nil {
return nil, fmt.Errorf("error opening sequence resolver: %w", err)
}
s.closers.addCloser(s.SeqResolver)
// TODO[major/pgroups] fix storage and resumption
// s.SeqResolver, err = repo.NewSequenceResolver(r)
// if err != nil {
// return nil, fmt.Errorf("error opening sequence resolver: %w", err)
// }
// s.closers.addCloser(s.SeqResolver)

// default multilogs
var mlogs = []struct {
Expand Down Expand Up @@ -279,7 +280,7 @@ func initSbot(s *Sbot) (*Sbot, error) {
s.Groups,
s.KeyPair.Id,
s.ReceiveLog,
s.SeqResolver,
// s.SeqResolver, // TODO[major/pgroups] fix storage and resumption
s.Users,
s.Private,
s.ByType,
Expand Down Expand Up @@ -585,12 +586,13 @@ func initSbot(s *Sbot) (*Sbot, error) {
s.ByType,
s.Private,
s.Groups,
s.SeqResolver,
// s.SeqResolver, // TODO[major/pgroups] fix storage and resumption
sc))
s.master.Register(rawread.NewSequenceStream(s.ReceiveLog))
s.master.Register(rawread.NewRXLog(s.ReceiveLog)) // createLogStream
s.master.Register(rawread.NewSortedStream(s.info, s.ReceiveLog, s.SeqResolver)) // createLogStream
s.master.Register(hist) // createHistoryStream
s.master.Register(rawread.NewRXLog(s.ReceiveLog)) // createLogStream
// TODO[major/pgroups] fix storage and resumption
// s.master.Register(rawread.NewSortedStream(s.info, s.ReceiveLog, s.SeqResolver))
s.master.Register(hist) // createHistoryStream

s.master.Register(replicate.NewPlug(s.Users))

Expand Down
3 changes: 2 additions & 1 deletion sbot/options.go
Expand Up @@ -92,7 +92,8 @@ type Sbot struct {

ReceiveLog multimsg.AlterableLog // the stream of messages as they arrived

SeqResolver *repo.SequenceResolver
// TODO[major/pgroups] fix storage and resumption
// SeqResolver *repo.SequenceResolver

PublishLog ssb.Publisher
signHMACsecret *[32]byte
Expand Down

0 comments on commit ace3e87

Please sign in to comment.