From 02bab62a4344b1f2a97264a8960743202f9f6ccd Mon Sep 17 00:00:00 2001 From: kbabushkin Date: Mon, 15 Apr 2024 13:04:18 -0700 Subject: [PATCH] simple filtering --- docker-compose.yaml | 4 + golibs/datetime/date.go | 3 +- golibs/ulidutils/ulidutils.go | 6 + pkg/api/rest/rest.go | 34 ++-- pkg/intervals/intervals.go | 4 +- pkg/ql/datetime.go | 1 + pkg/ql/intervalbuilder.go | 16 +- pkg/storage/buntdb/buntdb.go | 4 +- pkg/storage/logfs/locallog.go | 313 +++++++++++++++++++++-------- pkg/storage/logfs/locallog_test.go | 64 ++++++ pkg/storage/storage.go | 4 +- 11 files changed, 340 insertions(+), 113 deletions(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index b6c9510..104e3f7 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -11,6 +11,8 @@ services: ports: - 50051:50051 - 8080:8080 + volumes: + - fs:/app/slogs healthcheck: test: [ "CMD", "/bin/grpc_health_probe", "-addr=:50051" ] interval: 10s @@ -34,5 +36,7 @@ services: retries: 3 volumes: + fs: + driver: local db: driver: local diff --git a/golibs/datetime/date.go b/golibs/datetime/date.go index c0383d0..05fd84f 100644 --- a/golibs/datetime/date.go +++ b/golibs/datetime/date.go @@ -107,6 +107,7 @@ var ( "YYYY/M/DD", // yyyy-mm-ddThh + "YYYY-MM-DDTHH:mm:ss.SSSZZZZZ", "YYYY-MM-DDTHH:mm:ss.SSSZZZZ", "YYYY-MM-DDTHH:mm:ss.SSSZ", "YYYY-MM-DDTHH:mm:ssZZZZZ", @@ -161,7 +162,7 @@ var ( {"m", "4", "\\d{1,2}"}, {"ss", "05", "\\d{2}"}, {"s", "5", "\\d{1,2}"}, - {".SSS", ".999999999", ".\\d{3,}"}, + {".SSS", ".999999999", ".\\d+"}, {"P", "PM", "(?:am|AM|pm|PM)"}, {"ZZZZZ", "-07:00", "[+-][0-9]{2}:[0-9]{2}"}, {"ZZZZ", "-0700", "[+-][0-9]{4}"}, diff --git a/golibs/ulidutils/ulidutils.go b/golibs/ulidutils/ulidutils.go index 503f6ca..0b7664f 100644 --- a/golibs/ulidutils/ulidutils.go +++ b/golibs/ulidutils/ulidutils.go @@ -11,6 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package ulidutils import ( @@ -20,6 +21,11 @@ import ( "github.com/oklog/ulid/v2" ) +var ( + ZeroULID = ulid.ULID{} + MaxULID, _ = ulid.Parse("7ZZZZZZZZZZZZZZZZZZZZZZZZZ") +) + // New returns new ulid.ULID. func New() ulid.ULID { return ulid.Make() diff --git a/pkg/api/rest/rest.go b/pkg/api/rest/rest.go index f30f44c..88ad023 100644 --- a/pkg/api/rest/rest.go +++ b/pkg/api/rest/rest.go @@ -50,11 +50,11 @@ func (r *Rest) RegisterEPs(g *gin.Engine) error { func (r *Rest) CreateLog(c *gin.Context) { var rReq restapi.CreateLogRequest - if r.errorRespnse(c, BindAppJson(c, &rReq), "") { + if r.errorResponse(c, BindAppJson(c, &rReq), "") { return } sLog, err := r.svc.CreateLog(c, &solaris.Log{Tags: rReq.Tags}) - if r.errorRespnse(c, err, "") { + if r.errorResponse(c, err, "") { return } c.JSON(http.StatusCreated, logToRest(sLog)) @@ -62,11 +62,11 @@ func (r *Rest) CreateLog(c *gin.Context) { func (r *Rest) UpdateLog(c *gin.Context, logId restapi.LogId) { var rReq restapi.UpdateLogRequest - if r.errorRespnse(c, BindAppJson(c, &rReq), "") { + if r.errorResponse(c, BindAppJson(c, &rReq), "") { return } sLog, err := r.svc.UpdateLog(c, &solaris.Log{ID: logId, Tags: rReq.Tags}) - if r.errorRespnse(c, err, "") { + if r.errorResponse(c, err, "") { return } c.JSON(http.StatusOK, logToRest(sLog)) @@ -74,11 +74,11 @@ func (r *Rest) UpdateLog(c *gin.Context, logId restapi.LogId) { func (r *Rest) DeleteLogs(c *gin.Context) { var rReq restapi.DeleteLogsRequest - if r.errorRespnse(c, BindAppJson(c, &rReq), "") { + if r.errorResponse(c, BindAppJson(c, &rReq), "") { return } sRes, err := r.svc.DeleteLogs(c, &solaris.DeleteLogsRequest{Condition: rReq.FilterCondition}) - if r.errorRespnse(c, err, "") { + if r.errorResponse(c, err, "") { return } c.JSON(http.StatusOK, restapi.DeleteLogsResponse{Deleted: len(sRes.DeletedIDs)}) @@ -91,7 +91,7 @@ func (r *Rest) QueryLogs(c *gin.Context, params restapi.QueryLogsParams) { sReq.PageID = cast.String(params.FromPageId, "") sRes, err := r.svc.QueryLogs(c, sReq) - if r.errorRespnse(c, err, "") { + if r.errorResponse(c, err, "") { return } rRes := restapi.QueryLogsResult{Items: logsToRest(sRes.Logs), Total: int(sRes.Total)} @@ -103,14 +103,14 @@ func (r *Rest) QueryLogs(c *gin.Context, params restapi.QueryLogsParams) { func (r *Rest) CreateRecords(c *gin.Context, logId restapi.LogId) { var rReq restapi.CreateRecordsRequest - if r.errorRespnse(c, BindAppJson(c, &rReq), "") { + if r.errorResponse(c, BindAppJson(c, &rReq), "") { return } sReq := new(solaris.AppendRecordsRequest) sReq.LogID = logId sReq.Records = createRecsToSvc(rReq.Records) sRes, err := r.svc.AppendRecords(c, sReq) - if r.errorRespnse(c, err, "") { + if r.errorResponse(c, err, "") { return } c.JSON(http.StatusCreated, restapi.CreateRecordsResponse{Added: int(sRes.Added)}) @@ -126,16 +126,14 @@ func (r *Rest) QueryRecords(c *gin.Context, params restapi.QueryRecordsParams) { sReq.Limit = int64(cast.Int(params.Limit, 0)) sResQ, err := r.svc.QueryRecords(c, sReq) - if r.errorRespnse(c, err, "") { + if r.errorResponse(c, err, "") { return } - // TODO: uncomment after svc.CountRecords is implemented - //sResC, err := r.svc.CountRecords(c, sReq) - //if r.errorRespnse(c, err, "") { - // return - //} - //rRes := restapi.QueryRecordsResult{Items: recsToRest(sResQ.Records), Total: int(sResC.Total)} - rRes := restapi.QueryRecordsResult{Items: recsToRest(sResQ.Records), Total: len(sResQ.Records)} + sResC, err := r.svc.CountRecords(c, sReq) + if r.errorResponse(c, err, "") { + return + } + rRes := restapi.QueryRecordsResult{Items: recsToRest(sResQ.Records), Total: int(sResC.Count)} if len(sResQ.NextPageID) > 0 { rRes.NextPageId = cast.Ptr(sResQ.NextPageID) } @@ -146,7 +144,7 @@ func (r *Rest) Ping(c *gin.Context) { c.String(http.StatusOK, "pong") } -func (r *Rest) errorRespnse(c *gin.Context, err error, msg string) bool { +func (r *Rest) errorResponse(c *gin.Context, err error, msg string) bool { if err == nil { return false } diff --git a/pkg/intervals/intervals.go b/pkg/intervals/intervals.go index bd52994..a9ead33 100644 --- a/pkg/intervals/intervals.go +++ b/pkg/intervals/intervals.go @@ -206,8 +206,8 @@ func (b Basis[T]) Union(i1, i2 Interval[T]) (Interval[T], bool) { } // After returns true if the L border of the i1 interval -// is lesser than the R border of the i2 interval. -// NOTE: The `(L=1, R=3)` and `(L=2, R=4)` intervals are +// is greater than the R border of the i2 interval. +// NOTE: The `(L=2, R=4)` and `(L=1, R=3)` intervals are // considered to have an intersection (see Intersection description) // and the `(L=2, R=4).After.((L=1, R=3))` call returns false. func (b Basis[T]) After(i1, i2 Interval[T]) bool { diff --git a/pkg/ql/datetime.go b/pkg/ql/datetime.go index 0c639c7..dc4c636 100644 --- a/pkg/ql/datetime.go +++ b/pkg/ql/datetime.go @@ -67,6 +67,7 @@ var dateTimeParser = datetime.NewParser([]string{ "YYYY/M/DD", // yyyy-mm-ddThh + "YYYY-MM-DDTHH:mm:ss.SSSZZZZZ", "YYYY-MM-DDTHH:mm:ss.SSSZZZZ", "YYYY-MM-DDTHH:mm:ss.SSSZ", "YYYY-MM-DDTHH:mm:ssZZZZZ", diff --git a/pkg/ql/intervalbuilder.go b/pkg/ql/intervalbuilder.go index 3b9362a..92c5863 100644 --- a/pkg/ql/intervalbuilder.go +++ b/pkg/ql/intervalbuilder.go @@ -46,6 +46,7 @@ func NewParamIntervalBuilder[T, K any](basis intervals.Basis[T], dialect Dialect } // Build returns a list of intervals built from the AST expression. +// Returned intervals are sorted by the L border. func (ib *ParamIntervalBuilder[T, K]) Build(expr *Expression) ([]intervals.Interval[T], error) { var res []intervals.Interval[T] for _, or := range expr.Or { @@ -57,8 +58,7 @@ func (ib *ParamIntervalBuilder[T, K]) Build(expr *Expression) ([]intervals.Inter res = append(res, tt...) } } - res = ib.union(res) - return res, nil + return ib.union(res), nil } func (ib *ParamIntervalBuilder[T, K]) buildOR(or *OrCondition) ([]intervals.Interval[T], error) { @@ -105,9 +105,6 @@ func (ib *ParamIntervalBuilder[T, K]) buildXCond(and *XCondition) ([]intervals.I func (ib *ParamIntervalBuilder[T, K]) buildCond(cond *Condition) ([]intervals.Interval[T], error) { // param1 p1 := cond.FirstParam - if p1.Name(false) != ib.param { // skip not the param we look for - return nil, nil - } dp1, ok := ib.dialect[p1.ID()] if !ok { return nil, fmt.Errorf("the parameter %s must be known: %w", p1.Name(false), errors.ErrInvalid) @@ -118,15 +115,15 @@ func (ib *ParamIntervalBuilder[T, K]) buildCond(cond *Condition) ([]intervals.In if dp1.Flags&PfNop != 0 { return nil, fmt.Errorf("the parameter %s must allow operation (%s): %w", p1.Name(false), cond.Op, errors.ErrInvalid) } + if p1.Name(false) != ib.param { // skip not the param we look for + return nil, nil + } // param2 p2 := cond.SecondParam if p2 == nil { return nil, fmt.Errorf("the second parameter must be specified for the parameter %s and the operation %q: %w", p1.Name(false), cond.Op, errors.ErrInvalid) } - if p2.Const == nil { // skip not a constant param - return nil, nil - } dp2, ok := ib.dialect[p2.ID()] if !ok { return nil, fmt.Errorf("the second parameter %s must be known: %w", p2.Name(false), errors.ErrInvalid) @@ -137,6 +134,9 @@ func (ib *ParamIntervalBuilder[T, K]) buildCond(cond *Condition) ([]intervals.In if dp2.Flags&PfNop != 0 { return nil, fmt.Errorf("the second parameter %s must allow operation (%s): %w", p2.Name(false), cond.Op, errors.ErrInvalid) } + if p2.Const == nil { // skip not a constant param + return nil, nil + } // operation if !ib.ops[cond.Op] { // skip not the ops we look for diff --git a/pkg/storage/buntdb/buntdb.go b/pkg/storage/buntdb/buntdb.go index bf83622..1038772 100644 --- a/pkg/storage/buntdb/buntdb.go +++ b/pkg/storage/buntdb/buntdb.go @@ -58,8 +58,6 @@ type ( } ) -const maxULID = "7ZZZZZZZZZZZZZZZZZZZZZZZZZ" - // NewStorage creates new logs meta storage based on BuntDB func NewStorage(cfg Config) *Storage { return &Storage{cfg: &cfg} @@ -381,7 +379,7 @@ func (s *Storage) queryLogsByCondition(ctx context.Context, qr storage.QueryLogs tx := mustBeginTx(s.db, false) defer mustRollback(tx) - if err = tx.AscendRange("", logKey(qr.Page), logKey(maxULID), iter); err != nil { + if err = tx.AscendRange("", logKey(qr.Page), logKey(ulidutils.MaxULID.String()), iter); err != nil { return nil, fmt.Errorf("iteration failed: %w", err) } if iterErr != nil { diff --git a/pkg/storage/logfs/locallog.go b/pkg/storage/logfs/locallog.go index 7b8a636..8e8d85f 100644 --- a/pkg/storage/logfs/locallog.go +++ b/pkg/storage/logfs/locallog.go @@ -27,9 +27,13 @@ import ( "github.com/solarisdb/solaris/golibs/errors" "github.com/solarisdb/solaris/golibs/logging" "github.com/solarisdb/solaris/golibs/ulidutils" + "github.com/solarisdb/solaris/pkg/intervals" + "github.com/solarisdb/solaris/pkg/ql" "github.com/solarisdb/solaris/pkg/storage" "github.com/solarisdb/solaris/pkg/storage/chunkfs" "google.golang.org/protobuf/types/known/timestamppb" + "strings" + "time" ) type ( @@ -68,6 +72,11 @@ type ( // RecordsCount is the number of records stored in the chunk RecordsCount int `json:"recordsCount"` } + + idRange struct { + start ulid.ULID + end ulid.ULID + } ) const ( @@ -79,6 +88,11 @@ const ( var _ storage.Log = (*localLog)(nil) +var ( + tiBasis = intervals.BasisTime + tiBuilder = ql.NewParamIntervalBuilder(tiBasis, ql.RecordsCondValueDialect, "ctime", ql.OpsAll) +) + // NewLocalLog creates the new localLog object for the cfg provided func NewLocalLog(cfg Config) *localLog { l := new(localLog) @@ -217,59 +231,69 @@ func (l *localLog) QueryRecords(ctx context.Context, request storage.QueryRecord if err != nil { return nil, false, err } - if len(cis) == 0 { return nil, false, nil } - var idx int + var fromIdx int inc := 1 if request.Descending { inc = -1 - idx = len(cis) - 1 + fromIdx = len(cis) - 1 } + var sid ulid.ULID - var empty ulid.ULID if request.StartID != "" { - if err := sid.UnmarshalText(cast.StringToByteArray(request.StartID)); err != nil { + if err = sid.UnmarshalText(cast.StringToByteArray(request.StartID)); err != nil { l.logger.Warnf("could not unmarshal startID=%s: %v", request.StartID, err) return nil, false, fmt.Errorf("wrong startID=%q: %w", request.StartID, errors.ErrInvalid) } - if request.Descending { - idx = sort.Search(len(cis), func(i int) bool { + fromIdx = sort.Search(len(cis), func(i int) bool { return cis[i].Min.Compare(sid) > 0 }) - idx-- + fromIdx-- inc = -1 } else { - idx = sort.Search(len(cis), func(i int) bool { + fromIdx = sort.Search(len(cis), func(i int) bool { return cis[i].Max.Compare(sid) >= 0 }) } } + tis, err := getIntervals(request.Condition) + if err != nil { + return nil, false, err + } + if len(request.Condition) > 0 && len(tis) == 0 { + return nil, false, nil + } + limit := int(request.Limit) if limit > l.cfg.MaxRecordsLimit { limit = l.cfg.MaxRecordsLimit } totalSize := 0 - res := []*solaris.Record{} - for idx >= 0 && idx < len(cis) && limit > len(res) { + + var res []*solaris.Record + for idx := fromIdx; idx >= 0 && idx < len(cis) && limit > len(res); idx += inc { ci := cis[idx] - srecs, err := l.readRecords(ctx, lid, ci, request.Descending, sid, limit-len(res), &totalSize) + idRanges := getRanges(tis, ci) + if len(request.Condition) > 0 && len(idRanges) == 0 { + continue + } + srecs, err := l.readRecords(ctx, lid, ci, request.Descending, considerSIDAndDesc(idRanges, sid, request.Descending), limit-len(res), &totalSize) if err != nil { return nil, false, err } res = append(res, srecs...) - idx += inc - sid = empty + sid = ulidutils.ZeroULID } return res, len(res) >= limit || totalSize >= l.cfg.MaxBunchSize, nil } -// CountRecords count total number for records in the log and number of records after (before) specified record ID -// Returned values are (total, count, error) +// CountRecords count total number for records in the log and number of records after (before) +// specified record ID which match the request condition. Returned values are (total, count, error). func (l *localLog) CountRecords(ctx context.Context, request storage.QueryRecordsRequest) (uint64, uint64, error) { lid := request.LogID @@ -288,65 +312,69 @@ func (l *localLog) CountRecords(ctx context.Context, request storage.QueryRecord if err != nil { return 0, 0, err } - if len(cis) == 0 { return 0, 0, nil } - var total uint64 - var count uint64 + var initIdx int + var fromIdx int - var initialIdx int - var idx int inc := 1 if request.Descending { inc = -1 - idx = len(cis) - 1 - initialIdx = len(cis) - 1 + initIdx = len(cis) - 1 + fromIdx = len(cis) - 1 } + var sid ulid.ULID - // Search for first record if start id specified if request.StartID != "" { - if err := sid.UnmarshalText(cast.StringToByteArray(request.StartID)); err != nil { + if err = sid.UnmarshalText(cast.StringToByteArray(request.StartID)); err != nil { l.logger.Warnf("could not unmarshal startID=%s: %v", request.StartID, err) return 0, 0, fmt.Errorf("wrong startID=%q: %w", request.StartID, errors.ErrInvalid) } - if request.Descending { - idx = sort.Search(len(cis), func(i int) bool { + fromIdx = sort.Search(len(cis), func(i int) bool { return cis[i].Min.Compare(sid) > 0 }) - idx-- + fromIdx-- } else { - idx = sort.Search(len(cis), func(i int) bool { + fromIdx = sort.Search(len(cis), func(i int) bool { return cis[i].Max.Compare(sid) >= 0 }) } + } - // If idx found - we found an element and need to select how many record we have - if idx >= 0 && idx < len(cis) { - total = uint64(cis[idx].RecordsCount) - numRecs, err := l.countRecords(ctx, cis[idx], request.Descending, sid) - if err != nil { - return 0, 0, nil - } - count += numRecs + tis, err := getIntervals(request.Condition) + if err != nil { + return 0, 0, err + } + if len(request.Condition) > 0 && len(tis) == 0 { + return 0, 0, nil + } - // Calculate total of non-matching - for i := initialIdx; i != idx; i += inc { - total += uint64(cis[i].RecordsCount) - } + var total uint64 + var count uint64 - idx += inc + for idx := initIdx; idx >= 0 && idx < len(cis); idx += inc { + ci := cis[idx] + total += uint64(ci.RecordsCount) + if (request.Descending && idx <= fromIdx) || (!request.Descending && idx >= fromIdx) { + idRanges := getRanges(tis, ci) + if len(request.Condition) > 0 && len(idRanges) == 0 { + continue + } + recCnt := uint64(ci.RecordsCount) + if sid.Compare(ulidutils.ZeroULID) != 0 || len(idRanges) > 0 { + recCnt, err = l.countRecords(ctx, ci, request.Descending, considerSIDAndDesc(idRanges, sid, request.Descending)) + if err != nil { + return 0, 0, nil + } + } + count += recCnt + sid = ulidutils.ZeroULID } } - // Calculate total of matching records - for ; idx >= 0 && idx < len(cis); idx += inc { - l := uint64(cis[idx].RecordsCount) - count += l - total += l - } return total, count, nil } @@ -354,47 +382,50 @@ func (l *localLog) readRecords( ctx context.Context, lid string, ci ChunkInfo, - descending bool, - sid ulid.ULID, + desc bool, + idRanges []idRange, limit int, - totalSize *int, -) ([]*solaris.Record, error) { + totalSize *int) ([]*solaris.Record, error) { rc, err := l.ChnkProvider.GetOpenedChunk(ctx, ci.ID, false) if err != nil { return nil, err } defer l.ChnkProvider.ReleaseChunk(&rc) - cr, err := rc.Value().OpenChunkReader(descending) + cr, err := rc.Value().OpenChunkReader(desc) if err != nil { return nil, err } defer cr.Close() - var empty ulid.ULID - if sid.Compare(empty) != 0 { - cr.SetStartID(sid) - } - res := []*solaris.Record{} - for cr.HasNext() && len(res) < limit && *totalSize < l.cfg.MaxBunchSize { - ur, _ := cr.Next() - r := new(solaris.Record) - r.ID = ur.ID.String() - r.LogID = lid - r.Payload = make([]byte, len(ur.UnsafePayload)) - copy(r.Payload, ur.UnsafePayload) - *totalSize += len(ur.UnsafePayload) - r.CreatedAt = timestamppb.New(ulid.Time(ur.ID.Time())) - res = append(res, r) + var res []*solaris.Record + for _, ir := range idRanges { + if ir.start.Compare(ulidutils.ZeroULID) != 0 { + cr.SetStartID(ir.start) + } + for cr.HasNext() && len(res) < limit && *totalSize < l.cfg.MaxBunchSize { + ur, _ := cr.Next() + if ir.end.Compare(ulidutils.ZeroULID) != 0 && + ((desc && ur.ID.Compare(ir.end) < 0) || (!desc && ur.ID.Compare(ir.end) > 0)) { + break + } + r := new(solaris.Record) + r.ID = ur.ID.String() + r.LogID = lid + r.Payload = make([]byte, len(ur.UnsafePayload)) + copy(r.Payload, ur.UnsafePayload) + r.CreatedAt = timestamppb.New(ulid.Time(ur.ID.Time())) + *totalSize += len(ur.UnsafePayload) + res = append(res, r) + } } - return res, nil } func (l *localLog) countRecords(ctx context.Context, ci ChunkInfo, - descending bool, - sid ulid.ULID) (uint64, error) { + desc bool, + idRanges []idRange) (uint64, error) { rc, err := l.ChnkProvider.GetOpenedChunk(ctx, ci.ID, false) if err != nil { @@ -402,22 +433,146 @@ func (l *localLog) countRecords(ctx context.Context, } defer l.ChnkProvider.ReleaseChunk(&rc) - cr, err := rc.Value().OpenChunkReader(descending) + cr, err := rc.Value().OpenChunkReader(desc) if err != nil { return 0, err } defer cr.Close() var count uint64 - var empty ulid.ULID - if sid.Compare(empty) != 0 { - cr.SetStartID(sid) + for _, ir := range idRanges { + if ir.start.Compare(ulidutils.ZeroULID) != 0 { + cr.SetStartID(ir.start) + } + for cr.HasNext() { + ur, _ := cr.Next() + if ir.end.Compare(ulidutils.ZeroULID) != 0 && + ((desc && ur.ID.Compare(ir.end) < 0) || (!desc && ur.ID.Compare(ir.end) > 0)) { + break + } + count++ + } } + return count, nil +} - for cr.HasNext() { - cr.Next() - count++ +func getIntervals(cond string) ([]intervals.Interval[time.Time], error) { + if len(strings.TrimSpace(cond)) == 0 { + return nil, nil + } + expr, err := ql.Parse(cond) + if err != nil { + return nil, err } + tis, err := tiBuilder.Build(expr) + if err != nil { + return nil, err + } + return tis, nil +} - return count, nil +func getRanges(tis []intervals.Interval[time.Time], ci ChunkInfo) []idRange { + cti := tiBasis.Closed(ulid.Time(ci.Min.Time()), ulid.Time(ci.Max.Time())) + var ranges []idRange + for _, ti := range tis { + if ri, ok := tiBasis.Intersect(cti, ti); ok { + ranges = append(ranges, toRange(ri)) + } + } + return ranges +} + +func toRange(ti intervals.Interval[time.Time]) idRange { + if ti.IsClosed() { + return idRange{start: minULIDForTime(ti.L), end: maxULIDForTime(ti.R)} + } else if ti.IsOpenL() { + return idRange{start: minULIDForTime(ti.L.Add(time.Millisecond)), end: maxULIDForTime(ti.R)} + } else if ti.IsOpenR() { + return idRange{start: minULIDForTime(ti.L), end: maxULIDForTime(ti.R.Add(-time.Millisecond))} + } else { // open + return idRange{start: minULIDForTime(ti.L.Add(time.Millisecond)), end: maxULIDForTime(ti.R.Add(-time.Millisecond))} + } +} + +func minULIDForTime(t time.Time) ulid.ULID { + var id ulid.ULID + _ = id.SetTime(uint64(t.UnixMilli())) + return id +} + +func maxULIDForTime(t time.Time) ulid.ULID { + maxBytes := make([]byte, 10) + for i := 0; i < len(maxBytes); i++ { + maxBytes[i] = 0xff + } + var id ulid.ULID + _ = id.SetTime(uint64(t.UnixMilli())) + _ = id.SetEntropy(maxBytes) + return id +} + +func considerSIDAndDesc(irs []idRange, sid ulid.ULID, desc bool) []idRange { + if len(irs) == 0 { + return []idRange{{start: sid}} + } + if sid.Compare(ulidutils.ZeroULID) == 0 { + if desc { + return reverseRanges(irs) + } + return irs + } + if desc { + irs = reverseRanges(irs) + if sid.Compare(irs[0].start) >= 0 { + return irs + } + if sid.Compare(irs[len(irs)-1].end) < 0 { + return nil + } + for i, r := range irs { + if r.end.Compare(sid) <= 0 { + if r.start.Compare(sid) > 0 { + irs[i].start = sid + } + irs = irs[i:] + break + } + } + return irs + } + if sid.Compare(irs[0].start) <= 0 { + return irs + } + if sid.Compare(irs[len(irs)-1].end) > 0 { + return nil + } + for i, r := range irs { + if r.end.Compare(sid) >= 0 { + if r.start.Compare(sid) < 0 { + irs[i].start = sid + } + irs = irs[i:] + break + } + } + return irs +} + +func reverseRanges(irs []idRange) []idRange { + if len(irs) == 0 { + return irs + } + l := 0 + r := len(irs) - 1 + for l < r { + irs[l].start, irs[l].end = irs[l].end, irs[l].start + irs[r].start, irs[r].end = irs[r].end, irs[r].start + irs[l], irs[r] = irs[r], irs[l] + l++ + r-- + } + if len(irs)&1 != 0 { + irs[l].start, irs[l].end = irs[l].end, irs[l].start + } + return irs } diff --git a/pkg/storage/logfs/locallog_test.go b/pkg/storage/logfs/locallog_test.go index bca96dc..54c90df 100644 --- a/pkg/storage/logfs/locallog_test.go +++ b/pkg/storage/logfs/locallog_test.go @@ -18,10 +18,12 @@ import ( "context" rand2 "crypto/rand" "fmt" + "github.com/oklog/ulid/v2" "math/rand" "os" "sync" "testing" + "time" "github.com/solarisdb/solaris/api/gen/solaris/v1" "github.com/solarisdb/solaris/golibs/container" @@ -263,6 +265,68 @@ func TestCountRecords_SingleChunk(t *testing.T) { assert.Equal(t, uint64(3), count) } +func TestQueryCountRecordsWithCondition(t *testing.T) { + p, ll := setupTestDB(t) + defer p.Close() + defer ll.Shutdown() + + var recs []*solaris.Record + for i := 0; i < 10; i++ { + recs = append(recs, generateRecords(1, 100)...) + res, err := ll.AppendRecords(context.Background(), &solaris.AppendRecordsRequest{Records: recs[len(recs)-1:], LogID: "l1"}) + assert.NoError(t, err) + assert.Equal(t, int64(1), res.Added) + time.Sleep(time.Millisecond) // ULIDs have time in millis + } + + startIDAsc := recs[3].ID + startIDDesc := recs[6].ID + + geID, _ := ulid.Parse(recs[2].ID) + leID, _ := ulid.Parse(recs[7].ID) + exID, _ := ulid.Parse(recs[5].ID) + + geTime := ulid.Time(geID.Time()) + leTime := ulid.Time(leID.Time()) + exTime := ulid.Time(exID.Time()) + + cond := fmt.Sprintf("ctime >= '%s' and ctime <= '%s' and ctime != '%s'", + geTime.Format(time.RFC3339Nano), leTime.Format(time.RFC3339Nano), exTime.Format(time.RFC3339Nano)) + + // query + records, more, err := ll.QueryRecords(context.Background(), storage.QueryRecordsRequest{LogID: "l1", StartID: startIDAsc, Condition: cond, Limit: 10}) + require.NoError(t, err) + require.Len(t, records, 4) + require.False(t, more) + + records, more, err = ll.QueryRecords(context.Background(), storage.QueryRecordsRequest{LogID: "l1", StartID: startIDDesc, Condition: cond, Limit: 10, Descending: true}) + require.NoError(t, err) + require.Len(t, records, 4) + require.False(t, more) + + // query limit + records, more, err = ll.QueryRecords(context.Background(), storage.QueryRecordsRequest{LogID: "l1", StartID: startIDAsc, Condition: cond, Limit: 4}) + require.NoError(t, err) + require.Len(t, records, 4) + require.True(t, more) + + records, more, err = ll.QueryRecords(context.Background(), storage.QueryRecordsRequest{LogID: "l1", StartID: startIDDesc, Condition: cond, Limit: 4, Descending: true}) + require.NoError(t, err) + require.Len(t, records, 4) + require.True(t, more) + + // count + total, count, err := ll.CountRecords(context.Background(), storage.QueryRecordsRequest{LogID: "l1", StartID: startIDAsc, Condition: cond}) + assert.NoError(t, err) + assert.Equal(t, uint64(4), count) + assert.Equal(t, uint64(10), total) + + total, count, err = ll.CountRecords(context.Background(), storage.QueryRecordsRequest{LogID: "l1", StartID: startIDDesc, Condition: cond, Descending: true}) + assert.NoError(t, err) + assert.Equal(t, uint64(4), count) + assert.Equal(t, uint64(10), total) +} + func TestCountRecords_ManyChunks(t *testing.T) { p, ll := setupTestDB(t) ll.cfg.MaxRecordsLimit = 100 diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 0a054ce..05f5564 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -63,8 +63,8 @@ type ( // QueryRecords allows to retrieve records by the request. The function returns the selected records and the flag, // that more records potentially available for the read QueryRecords(ctx context.Context, request QueryRecordsRequest) ([]*solaris.Record, bool, error) - // CountRecords count total number for records in the log and number of records after (before) specified record ID - // Returned values are (total, count, error) + // CountRecords count total number for records in the log and number of records after (before) + // specified record ID which match the request condition. Returned values are (total, count, error). CountRecords(ctx context.Context, request QueryRecordsRequest) (uint64, uint64, error) }