Skip to content
This repository has been archived by the owner on Aug 11, 2023. It is now read-only.

Commit

Permalink
UPDATE: v1.5
Browse files Browse the repository at this point in the history
  • Loading branch information
kainonly committed Oct 25, 2022
1 parent ce3afe8 commit d83e773
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"github.com/bytedance/sonic"
"github.com/nats-io/nats.go"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"time"
Expand Down Expand Up @@ -71,15 +72,28 @@ func (x *Transfer) Set(ctx context.Context, option Option) (err error) {
if _, err = x.KeyValue.Put(option.Key, b); err != nil {
return
}

coll := fmt.Sprintf(`%s_logs`, option.Key)
if err = x.Db.CreateCollection(ctx, coll, options.CreateCollection().
SetTimeSeriesOptions(
options.TimeSeries().
SetMetaField("metadata").
SetTimeField("timestamp"),
).SetExpireAfterSeconds(option.TTL)); err != nil {
var exists []*mongo.CollectionSpecification
if exists, err = x.Db.ListCollectionSpecifications(ctx, bson.M{"name": coll}); err != nil {
return
}

if len(exists) != 0 {
if exists[0].Type != "timeseries" {
return fmt.Errorf(`the [%s] collection already exists, but it must be a timeseries`, coll)
}
} else {
if err = x.Db.CreateCollection(ctx, coll, options.CreateCollection().
SetTimeSeriesOptions(
options.TimeSeries().
SetMetaField("metadata").
SetTimeField("timestamp"),
).SetExpireAfterSeconds(option.TTL)); err != nil {
return
}
}

name := fmt.Sprintf(`%s:logs:%s`, x.Namespace, option.Key)
subject := fmt.Sprintf(`%s.logs.%s`, x.Namespace, option.Key)
if _, err = x.JetStream.AddStream(&nats.StreamConfig{
Expand All @@ -93,6 +107,7 @@ func (x *Transfer) Set(ctx context.Context, option Option) (err error) {
}
return
}

return
}

Expand Down

0 comments on commit d83e773

Please sign in to comment.