Skip to content

Commit

Permalink
updating keepalive and chunk table with new entry 'nfInstanceId'
Browse files Browse the repository at this point in the history
  • Loading branch information
vthiruveedula committed Jan 9, 2023
1 parent cd1a13d commit c1415d9
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 17 deletions.
6 changes: 4 additions & 2 deletions drsm/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ type DbInfo struct {
}

type PodId struct {
PodName string `bson:"podName,omitempty" json:"podName,omitempty"`
PodIp string `bson:"podIp,omitempty" json:"podIp,omitempty"`
PodName string `bson:"podName,omitempty" json:"podName,omitempty"`
PodInstance string `bson:"podInstance,omitempty" json:"podName,omitempty"`
PodIp string `bson:"podIp,omitempty" json:"podIp,omitempty"`
}

type DrsmMode int
Expand All @@ -45,6 +46,7 @@ type DrsmInterface interface {
ReleaseIp(pool, ip string) error
CreateIpPool(poolName string, ipPool string) error
DeleteIpPool(poolName string) error
DeletePod(string)
}

func InitDRSM(sharedPoolName string, myid PodId, db DbInfo, opt *Options) (DrsmInterface, error) {
Expand Down
2 changes: 1 addition & 1 deletion drsm/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (d *Drsm) GetNewChunk() (*chunk, error) {
// Let's confirm if this gets updated in DB
docId := fmt.Sprintf("chunkid-%d", cn)
filter := bson.M{"_id": docId}
update := bson.M{"_id": docId, "chunkId": docId, "type": "chunk", "podId": d.clientId.PodName, "podIp": d.clientId.PodIp}
update := bson.M{"_id": docId, "type": "chunk", "podId": d.clientId.PodName, "podInstance": d.clientId.PodInstance, "podIp": d.clientId.PodIp}
inserted := d.mongo.RestfulAPIPostOnly(d.sharedPoolName, filter, update)
if inserted != true {
log.Printf("Adding chunk %v failed. Retry again ", cn)
Expand Down
2 changes: 1 addition & 1 deletion drsm/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (c *chunk) claimChunk(d *Drsm) {
// try to claim. If success then notification will update owner.
logger.AppLog.Debugf("claimChunk started")
docId := fmt.Sprintf("chunkid-%d", c.Id)
update := bson.M{"_id": docId, "type": "chunk", "podId": d.clientId.PodName, "podIp": d.clientId.PodIp}
update := bson.M{"_id": docId, "type": "chunk", "podId": d.clientId.PodName, "podInstance": d.clientId.PodInstance, "podIp": d.clientId.PodIp}
filter := bson.M{"_id": docId, "podId": c.Owner.PodName}
updated := d.mongo.RestfulAPIPutOnly(d.sharedPoolName, filter, update)
if updated == nil {
Expand Down
7 changes: 7 additions & 0 deletions drsm/drsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/omec-project/util/logger"
MongoDBLibrary "github.com/omec-project/util/mongoapi"
ipam "github.com/thakurajayL/go-ipam"
"go.mongodb.org/mongo-driver/bson"
)

type chunkState int
Expand Down Expand Up @@ -62,6 +63,12 @@ type Drsm struct {
mongo *MongoDBLibrary.MongoClient
}

func (d *Drsm) DeletePod(podInstance string) {
filter := bson.M{"type": "keepalive", "podInstance": podInstance}
d.mongo.RestfulAPIDeleteMany(d.sharedPoolName, filter)
logger.AppLog.Infoln("Deleted PodId from DB: ", podInstance)
}

func (d *Drsm) ConstuctDrsm(opt *Options) {
if opt != nil {
d.mode = opt.Mode
Expand Down
29 changes: 16 additions & 13 deletions drsm/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,24 @@ import (
)

type UpdatedFields struct {
ExpireAt time.Time `bson:"expireAt,omitempty"`
PodId string `bson:"podId,omitempty"`
PodIp string `bson:"podIp,omitempty"`
ExpireAt time.Time `bson:"expireAt,omitempty"`
PodId string `bson:"podId,omitempty"`
PodIp string `bson:"podIp,omitempty"`
PodInstance string `bson:"podInstance,omitempty"`
}

type UpdatedDesc struct {
UpdFields UpdatedFields `bson:"updatedFields,omitempty"`
}

type FullStream struct {
Id string `bson:"_id"`
ChunkId string `bson:"chunkId"`
PodId string `bson:"podId,omitempty"`
PodIp string `bson:"podIp,omitempty"`
ExpireAt time.Time `bson:"expireAt,omitempty"`
Type string `bson:"type,omitempty"`
Id string `bson:"_id"`
ChunkId string `bson:"chunkId"`
PodId string `bson:"podId,omitempty"`
PodIp string `bson:"podIp,omitempty"`
PodInstance string `bson:"podInstance,omitempty"`
ExpireAt time.Time `bson:"expireAt,omitempty"`
Type string `bson:"type,omitempty"`
}

type DocKey struct {
Expand Down Expand Up @@ -159,6 +161,7 @@ func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.Chan
// TODO update IP address as well.
cp.Owner.PodName = owner
cp.Owner.PodIp = s.Update.UpdFields.PodIp
cp.Owner.PodInstance = s.Update.UpdFields.PodInstance
podD := d.podMap[owner]
podD.podChunks[c] = cp // add chunk to pod
logger.AppLog.Infof("Stream(Update): pod to chunk map %v ", podD.podChunks)
Expand Down Expand Up @@ -198,11 +201,11 @@ func (d *Drsm) punchLiveness() {

timein := time.Now().Local().Add(time.Second * 20)

update := bson.D{{"_id", d.clientId.PodName}, {"type", "keepalive"}, {"podIp", d.clientId.PodIp}, {"podId", d.clientId.PodName}, {"expireAt", timein}}
update := bson.D{{"_id", d.clientId.PodName}, {"type", "keepalive"}, {"podIp", d.clientId.PodIp}, {"podId", d.clientId.PodName}, {"podInstance", d.clientId.PodInstance}, {"expireAt", timein}}

_, err := d.mongo.PutOneCustomDataStructure(d.sharedPoolName, filter, update)
if err != nil {
logger.AppLog.Errorln("put data failed : ", err)
logger.AppLog.Errorln("put data failed : ", err)
// TODO : should we panic ?
continue
}
Expand Down Expand Up @@ -247,7 +250,7 @@ func (d *Drsm) addChunk(full *FullStream) {
}
logger.AppLog.Infof("received Chunk Doc: %v", full)
cid := getChunIdFromDocId(did)
o := PodId{PodName: full.PodId, PodIp: full.PodIp}
o := PodId{PodName: full.PodId, PodInstance: full.PodInstance, PodIp: full.PodIp}
c := &chunk{Id: cid, Owner: o}
c.resourceValidCb = d.resourceValidCb

Expand All @@ -258,7 +261,7 @@ func (d *Drsm) addChunk(full *FullStream) {
}

func (d *Drsm) addPod(full *FullStream) *podData {
podI := PodId{PodName: full.PodId, PodIp: full.PodIp}
podI := PodId{PodName: full.PodId, PodInstance: full.PodInstance, PodIp: full.PodIp}
pod := &podData{PodId: podI}
pod.podChunks = make(map[int32]*chunk)
d.podMap[full.PodId] = pod
Expand Down

0 comments on commit c1415d9

Please sign in to comment.