Skip to content

Commit

Permalink
Merge pull request #14 from mbilal92/main
Browse files Browse the repository at this point in the history
FIX: concurrent map read and map write for `globalChunkTbl`, added mutex for globalChunkTbl read/write
  • Loading branch information
thakurajayL committed Nov 8, 2023
2 parents 2a78831 + 2c2ef2a commit 1354cc3
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 18 deletions.
4 changes: 2 additions & 2 deletions drsm/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func (d *Drsm) ReleaseInt32ID(id int32) error {
}

func (d *Drsm) FindOwnerInt32ID(id int32) (*PodId, error) {
mutex.Lock()
defer mutex.Unlock()
d.globalChunkTblMutex.Lock()
defer d.globalChunkTblMutex.Unlock()
chunkId := id >> 10
chunk, found := d.globalChunkTbl[chunkId]
if found == true {
Expand Down
2 changes: 2 additions & 0 deletions drsm/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ func (d *Drsm) GetNewChunk() (*chunk, error) {
for {
for {
cn = rand.Int31n(d.chunkIdRange)
d.globalChunkTblMutex.Lock()
_, found := d.globalChunkTbl[cn]
d.globalChunkTblMutex.Unlock()
if found == true {
continue
}
Expand Down
2 changes: 2 additions & 0 deletions drsm/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ func (d *Drsm) podDownDetected() {
// Given Pod find out current Chunks owned by this POD
pd := d.podMap[p]
for k, _ := range pd.podChunks {
d.globalChunkTblMutex.Lock()
c, found := d.globalChunkTbl[k]
d.globalChunkTblMutex.Unlock()
logger.AppLog.Debugf("Found : %v chunk : %v ", found, c)
go c.claimChunk(d)
}
Expand Down
34 changes: 18 additions & 16 deletions drsm/drsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,23 @@ type podData struct {
}

type Drsm struct {
mu sync.Mutex
sharedPoolName string
clientId PodId
db DbInfo
mode DrsmMode
resIdSize int32
localChunkTbl map[int32]*chunk // chunkid to chunk
globalChunkTbl map[int32]*chunk // chunkid to chunk
podMap map[string]*podData // podId to podData
podDown chan string
scanChunks map[int32]*chunk
chunkIdRange int32
resourceValidCb func(int32) bool
ipModule ipam.Ipamer
prefix map[string]*ipam.Prefix
mongo *MongoDBLibrary.MongoClient
mu sync.Mutex
sharedPoolName string
clientId PodId
db DbInfo
mode DrsmMode
resIdSize int32
localChunkTbl map[int32]*chunk // chunkid to chunk
globalChunkTbl map[int32]*chunk // chunkid to chunk
podMap map[string]*podData // podId to podData
podDown chan string
scanChunks map[int32]*chunk
chunkIdRange int32
resourceValidCb func(int32) bool
ipModule ipam.Ipamer
prefix map[string]*ipam.Prefix
mongo *MongoDBLibrary.MongoClient
globalChunkTblMutex sync.Mutex
}

func (d *Drsm) DeletePod(podInstance string) {
Expand All @@ -87,6 +88,7 @@ func (d *Drsm) ConstuctDrsm(opt *Options) {
d.podMap = make(map[string]*podData)
d.podDown = make(chan string, 10)
d.scanChunks = make(map[int32]*chunk)
d.globalChunkTblMutex = sync.Mutex{}
t := time.Now().UnixNano()
rand.Seed(t)
d.initIpam(opt)
Expand Down
5 changes: 5 additions & 0 deletions drsm/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ func iterateChangeStream(d *Drsm, routineCtx context.Context, stream *mongo.Chan
// looks like chunk owner getting change
owner := s.Update.UpdFields.PodId
c := getChunIdFromDocId(s.DId.Id)
d.globalChunkTblMutex.Lock()
cp := d.globalChunkTbl[c]
d.globalChunkTblMutex.Unlock()
// TODO update IP address as well.
cp.Owner.PodName = owner
cp.Owner.PodIp = s.Update.UpdFields.PodIp
Expand Down Expand Up @@ -255,7 +257,10 @@ func (d *Drsm) addChunk(full *FullStream) {
c.resourceValidCb = d.resourceValidCb

pod.podChunks[cid] = c

d.globalChunkTblMutex.Lock()
d.globalChunkTbl[cid] = c
d.globalChunkTblMutex.Unlock()

logger.AppLog.Infof("Chunk id %v, podChunks %v ", cid, pod.podChunks)
}
Expand Down

0 comments on commit 1354cc3

Please sign in to comment.