diff --git a/multilogs/combined.go b/multilogs/combined.go index 1f062b8f..c845e061 100644 --- a/multilogs/combined.go +++ b/multilogs/combined.go @@ -11,15 +11,14 @@ import ( "os" "path/filepath" "sync" - "time" "github.com/keks/persist" "go.cryptoscope.co/librarian" "go.cryptoscope.co/margaret" "go.cryptoscope.co/margaret/multilog" "go.cryptoscope.co/margaret/multilog/roaring" - "go.cryptoscope.co/ssb" + "go.cryptoscope.co/ssb" "go.cryptoscope.co/ssb/internal/statematrix" "go.cryptoscope.co/ssb/internal/storedrefs" "go.cryptoscope.co/ssb/message/multimsg" @@ -38,7 +37,7 @@ func NewCombinedIndex( box *private.Manager, self *refs.FeedRef, rxlog margaret.Log, - res *repo.SequenceResolver, + // res *repo.SequenceResolver, // TODO[major/pgroups] fix storage and resumption u, p, bt, tan *roaring.MultiLog, oh multilog.MultiLog, sm *statematrix.StateMatrix, @@ -67,8 +66,9 @@ func NewCombinedIndex( ebtState: sm, + // TODO[major/pgroups] fix storage and resumption // timestamp sorting - seqresolver: res, + // seqresolver: res, // groups reindexing rxlog: rxlog, @@ -95,7 +95,8 @@ type CombinedIndex struct { orderdHelper multilog.MultiLog - seqresolver *repo.SequenceResolver + // TODO[major/pgroups] fix storage and resumption + // seqresolver *repo.SequenceResolver ebtState *statematrix.StateMatrix @@ -182,10 +183,11 @@ func (slog *CombinedIndex) Pour(ctx context.Context, swv interface{}) error { if isNulled, ok := v.(error); ok { if margaret.IsErrNulled(isNulled) { - err = slog.seqresolver.Append(seq.Seq(), 0, time.Now(), time.Now()) - if err != nil { - return fmt.Errorf("error updating sequence resolver (nulled message): %w", err) - } + // TODO[major/pgroups] fix storage and resumption + // err = slog.seqresolver.Append(seq.Seq(), 0, time.Now(), time.Now()) + // if err != nil { + // return fmt.Errorf("error updating sequence resolver (nulled message): %w", err) + // } return nil } return isNulled @@ -200,10 +202,11 @@ func (slog *CombinedIndex) Pour(ctx context.Context, swv interface{}) error { } func (slog *CombinedIndex) update(seq int64, msg refs.Message) error { - err := slog.seqresolver.Append(seq, msg.Seq(), msg.Claimed(), msg.Received()) - if err != nil { - return fmt.Errorf("error updating sequence resolver: %w", err) - } + // TODO[major/pgroups] fix storage and resumption + // err := slog.seqresolver.Append(seq, msg.Seq(), msg.Claimed(), msg.Received()) + // if err != nil { + // return fmt.Errorf("error updating sequence resolver: %w", err) + // } author := msg.Author() @@ -345,10 +348,11 @@ func (slog *CombinedIndex) QuerySpec() margaret.QuerySpec { seq = margaret.SeqEmpty } - if resN := slog.seqresolver.Seq() - 1; resN != seq.Seq() { - err := fmt.Errorf("combined idx (has:%d, will: %d)", resN, seq.Seq()) - return margaret.ErrorQuerySpec(err) - } + // TODO[major/pgroups] fix storage and resumption + // if resN := slog.seqresolver.Seq() - 1; resN != seq.Seq() { + // err := fmt.Errorf("combined idx (has:%d, will: %d)", resN, seq.Seq()) + // return margaret.ErrorQuerySpec(err) + // } return margaret.MergeQuerySpec( margaret.Gt(seq), diff --git a/plugins/rawread/logt.go b/plugins/rawread/logt.go index 1500fc4d..818c2c42 100644 --- a/plugins/rawread/logt.go +++ b/plugins/rawread/logt.go @@ -46,7 +46,8 @@ func NewByTypePlugin( ml *roaring.MultiLog, pl *roaring.MultiLog, pm *private.Manager, - res *repo.SequenceResolver, + // TODO[major/pgroups] fix storage and resumption + // res *repo.SequenceResolver, isSelf ssb.Authorizer, ) ssb.Plugin { plug := &Plugin{ @@ -57,7 +58,8 @@ func NewByTypePlugin( unboxer: pm, - res: res, + // TODO[major/pgroups] fix storage and resumption + // res: res, isSelf: isSelf, } @@ -121,7 +123,7 @@ func (g Plugin) HandleSource(ctx context.Context, req *muxrpc.Request, w *muxrpc snk = newSinkCounter(&cnt, snk) idxAddr := librarian.Addr("string:" + qry.Type) - if qry.Live { + if true { // TODO[major/pgroups] fix storage and resumption if qry.Private { return fmt.Errorf("TODO: fix live && private") } @@ -130,7 +132,9 @@ func (g Plugin) HandleSource(ctx context.Context, req *muxrpc.Request, w *muxrpc return fmt.Errorf("failed to load typed log: %w", err) } - src, err := mutil.Indirect(g.rxlog, typed).Query(margaret.Limit(int(qry.Limit)), margaret.Live(qry.Live)) + src, err := mutil.Indirect(g.rxlog, typed).Query( + margaret.Limit(int(qry.Limit)), + margaret.Live(qry.Live)) if err != nil { return fmt.Errorf("logT: failed to qry tipe: %w", err) } @@ -146,6 +150,9 @@ func (g Plugin) HandleSource(ctx context.Context, req *muxrpc.Request, w *muxrpc return snk.Close() } + // TODO[major/pgroups] fix storage and resumption + // TODO: fix seq resolver filling hiccups + return snk.Close() /* TODO: i'm skipping a fairly big refactor here to find out what works first. ideallly the live and not-live code would just be the same, somehow shoving it into Query(...). diff --git a/sbot/new.go b/sbot/new.go index 5ae61741..60472a45 100644 --- a/sbot/new.go +++ b/sbot/new.go @@ -196,11 +196,12 @@ func initSbot(s *Sbot) (*Sbot, error) { s.closers.addCloser(sm) s.ebtState = sm - s.SeqResolver, err = repo.NewSequenceResolver(r) - if err != nil { - return nil, fmt.Errorf("error opening sequence resolver: %w", err) - } - s.closers.addCloser(s.SeqResolver) + // TODO[major/pgroups] fix storage and resumption + // s.SeqResolver, err = repo.NewSequenceResolver(r) + // if err != nil { + // return nil, fmt.Errorf("error opening sequence resolver: %w", err) + // } + // s.closers.addCloser(s.SeqResolver) // default multilogs var mlogs = []struct { @@ -279,7 +280,7 @@ func initSbot(s *Sbot) (*Sbot, error) { s.Groups, s.KeyPair.Id, s.ReceiveLog, - s.SeqResolver, + // s.SeqResolver, // TODO[major/pgroups] fix storage and resumption s.Users, s.Private, s.ByType, @@ -585,12 +586,13 @@ func initSbot(s *Sbot) (*Sbot, error) { s.ByType, s.Private, s.Groups, - s.SeqResolver, + // s.SeqResolver, // TODO[major/pgroups] fix storage and resumption sc)) s.master.Register(rawread.NewSequenceStream(s.ReceiveLog)) - s.master.Register(rawread.NewRXLog(s.ReceiveLog)) // createLogStream - s.master.Register(rawread.NewSortedStream(s.info, s.ReceiveLog, s.SeqResolver)) // createLogStream - s.master.Register(hist) // createHistoryStream + s.master.Register(rawread.NewRXLog(s.ReceiveLog)) // createLogStream + // TODO[major/pgroups] fix storage and resumption + // s.master.Register(rawread.NewSortedStream(s.info, s.ReceiveLog, s.SeqResolver)) + s.master.Register(hist) // createHistoryStream s.master.Register(replicate.NewPlug(s.Users)) diff --git a/sbot/options.go b/sbot/options.go index b60f7265..8b7aca87 100644 --- a/sbot/options.go +++ b/sbot/options.go @@ -92,7 +92,8 @@ type Sbot struct { ReceiveLog multimsg.AlterableLog // the stream of messages as they arrived - SeqResolver *repo.SequenceResolver + // TODO[major/pgroups] fix storage and resumption + // SeqResolver *repo.SequenceResolver PublishLog ssb.Publisher signHMACsecret *[32]byte