Skip to content

Commit

Permalink
hadoop: add an option to disable session in java SDK (#3750)
Browse files Browse the repository at this point in the history
  • Loading branch information
davies committed Jun 6, 2023
1 parent 25a4624 commit 821b990
Show file tree
Hide file tree
Showing 14 changed files with 36 additions and 29 deletions.
2 changes: 1 addition & 1 deletion cmd/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func initForSvc(c *cli.Context, mp string, metaUrl string) (*vfs.Config, *fs.Fil
store := chunk.NewCachedStore(blob, *chunkConf, registerer)
registerMetaMsg(metaCli, store, chunkConf)

err = metaCli.NewSession()
err = metaCli.NewSession(true)
if err != nil {
logger.Fatalf("new session: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/mdtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func initForMdtest(c *cli.Context, mp string, metaUrl string) *fs.FileSystem {
store := chunk.NewCachedStore(blob, *chunkConf, registerer)
registerMetaMsg(m, store, chunkConf)

err = m.NewSession()
err = m.NewSession(true)
if err != nil {
logger.Fatalf("new session: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ func mount(c *cli.Context) error {
}

removePassword(addr)
err = metaCli.NewSession()
err = metaCli.NewSession(true)
if err != nil {
logger.Fatalf("new session: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func newJFS(endpoint, accessKey, secretKey, token string) (object.ObjectStorage,
chunkConf := getDefaultChunkConf(format)
store := chunk.NewCachedStore(blob, *chunkConf, nil)
registerMetaMsg(metaCli, store, chunkConf)
err = metaCli.NewSession()
err = metaCli.NewSession(false)
if err != nil {
return nil, fmt.Errorf("new session: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/fuse/fuse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func mount(url, mp string) {
Chunk: &chunkConf,
}

err = m.NewSession()
err = m.NewSession(true)
if err != nil {
log.Fatalf("new session: %s", err)
}
Expand Down
25 changes: 15 additions & 10 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,22 +470,24 @@ func (m *baseMeta) newSessionInfo() []byte {
return buf
}

func (m *baseMeta) NewSession() error {
func (m *baseMeta) NewSession(record bool) error {
go m.refresh()
if m.conf.ReadOnly {
logger.Infof("Create read-only session OK with version: %s", version.Version())
return nil
}

v, err := m.en.incrCounter("nextSession", 1)
if err != nil {
return fmt.Errorf("get session ID: %s", err)
}
m.sid = uint64(v)
if err = m.en.doNewSession(m.newSessionInfo()); err != nil {
return fmt.Errorf("create session: %s", err)
if record {
v, err := m.en.incrCounter("nextSession", 1)
if err != nil {
return fmt.Errorf("get session ID: %s", err)
}
m.sid = uint64(v)
if err = m.en.doNewSession(m.newSessionInfo()); err != nil {
return fmt.Errorf("create session: %s", err)
}
logger.Infof("Create session %d OK with version: %s", m.sid, version.Version())
}
logger.Infof("Create session %d OK with version: %s", m.sid, version.Version())

m.loadQuotas()
go m.en.flushStats()
Expand Down Expand Up @@ -536,7 +538,7 @@ func (m *baseMeta) refresh() {
m.sesMu.Unlock()
return
}
if !m.conf.ReadOnly && m.conf.Heartbeat > 0 {
if !m.conf.ReadOnly && m.conf.Heartbeat > 0 && m.sid > 0 {
if err := m.en.doRefreshSession(); err != nil {
logger.Errorf("Refresh session %d: %s", m.sid, err)
}
Expand Down Expand Up @@ -632,6 +634,9 @@ func (m *baseMeta) checkQuota(ctx Context, space, inodes int64, parents ...Ino)
}

func (m *baseMeta) loadQuotas() {
if !m.GetFormat().DirStats {
return
}
quotas, err := m.en.doLoadQuotas(Background)
if err == nil {
m.quotaMu.Lock()
Expand Down
8 changes: 4 additions & 4 deletions pkg/meta/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func testMetaClient(t *testing.T, m Meta) {
if format.Name != "test" {
t.Fatalf("load got volume name %s, expected %s", format.Name, "test")
}
if err = m.NewSession(); err != nil {
if err = m.NewSession(true); err != nil {
t.Fatalf("new session: %s", err)
}
defer m.CloseSession()
Expand Down Expand Up @@ -1343,7 +1343,7 @@ func testCopyFileRange(t *testing.T, m Meta) {
}

func testCloseSession(t *testing.T, m Meta) {
if err := m.NewSession(); err != nil {
if err := m.NewSession(true); err != nil {
t.Fatalf("new session: %s", err)
}

Expand Down Expand Up @@ -1628,7 +1628,7 @@ func testOpenCache(t *testing.T, m Meta) {

func testReadOnly(t *testing.T, m Meta) {
ctx := Background
if err := m.NewSession(); err != nil {
if err := m.NewSession(true); err != nil {
t.Fatalf("new session: %s", err)
}
defer m.CloseSession()
Expand Down Expand Up @@ -2472,7 +2472,7 @@ func checkEntry(t *testing.T, m Meta, srcEntry, dstEntry *Entry, dstParentIno In
}

func testQuota(t *testing.T, m Meta) {
if err := m.NewSession(); err != nil {
if err := m.NewSession(true); err != nil {
t.Fatalf("New session: %s", err)
}
defer m.CloseSession()
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ func benchmarkData(b *testing.B, m Meta) {

func benchmarkAll(b *testing.B, m Meta) {
_ = m.Init(&Format{Name: "benchmarkAll", DirStats: true}, true)
_ = m.NewSession()
_ = m.NewSession(false)
benchmarkDir(b, m)
benchmarkFile(b, m)
benchmarkXattr(b, m)
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ type Meta interface {
// Load loads the existing setting of a formatted volume from meta service.
Load(checkVersion bool) (*Format, error)
// NewSession creates a new client session.
NewSession() error
NewSession(record bool) error
// CloseSession does cleanup and close the session.
CloseSession() error
// GetSession retrieves information of session with sid
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -1413,7 +1413,7 @@ func (m *redisMeta) doUnlink(ctx Context, parent Ino, name string, attr *Attr, s
attr.Ctimensec = uint32(now.Nanosecond())
if trash == 0 {
attr.Nlink--
if _type == TypeFile && attr.Nlink == 0 {
if _type == TypeFile && attr.Nlink == 0 && m.sid > 0 {
opened = m.of.IsOpen(inode)
}
} else if attr.Parent > 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1393,7 +1393,7 @@ func (m *dbMeta) doUnlink(ctx Context, parent Ino, name string, attr *Attr, skip
n.Ctimensec = int16(now % 1e3)
if trash == 0 {
n.Nlink--
if n.Type == TypeFile && n.Nlink == 0 {
if n.Type == TypeFile && n.Nlink == 0 && m.sid > 0 {
opened = m.of.IsOpen(e.Inode)
}
} else if n.Parent > 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -1573,7 +1573,7 @@ func (m *kvMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst
} else {
if trash == 0 {
tattr.Nlink--
if dtyp == TypeFile && tattr.Nlink == 0 {
if dtyp == TypeFile && tattr.Nlink == 0 && m.sid > 0 {
opened = m.of.IsOpen(dino)
}
defer func() { m.of.InvalidateChunk(dino, invalidateAttrOnly) }()
Expand Down
11 changes: 6 additions & 5 deletions sdk/java/libjfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ type javaConf struct {
Bucket string `json:"bucket"`
StorageClass string `json:"storageClass"`
ReadOnly bool `json:"readOnly"`
NoSession bool `json:"noSession"`
NoBGJob bool `json:"noBGJob"`
OpenCache float64 `json:"openCache"`
BackupMeta int64 `json:"backupMeta"`
Expand Down Expand Up @@ -436,7 +437,7 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintp
metaConf.MaxDeletes = jConf.MaxDeletes
metaConf.SkipDirNlink = jConf.SkipDirNlink
metaConf.ReadOnly = jConf.ReadOnly
metaConf.NoBGJob = jConf.NoBGJob
metaConf.NoBGJob = jConf.NoBGJob || jConf.NoSession
metaConf.OpenCache = time.Duration(jConf.OpenCache * 1e9)
metaConf.Heartbeat = time.Second * time.Duration(jConf.Heartbeat)
m := meta.NewClient(jConf.MetaURL, metaConf)
Expand Down Expand Up @@ -533,12 +534,11 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintp
id := args[1].(uint64)
return vfs.Compact(chunkConf, store, slices, id)
})
err = m.NewSession()
err = m.NewSession(!jConf.NoSession)
if err != nil {
logger.Errorf("new session: %s", err)
return nil
}

m.OnReload(func(fmt *meta.Format) {
if jConf.UploadLimit > 0 {
fmt.UploadLimit = int64(jConf.UploadLimit)
Expand All @@ -548,6 +548,7 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintp
}
store.UpdateLimit(fmt.UploadLimit, fmt.DownloadLimit)
})

conf := &vfs.Config{
Meta: metaConf,
Format: *format,
Expand All @@ -559,10 +560,10 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintp
FastResolve: jConf.FastResolve,
BackupMeta: time.Second * time.Duration(jConf.BackupMeta),
}
if !jConf.ReadOnly && !jConf.NoBGJob && conf.BackupMeta > 0 {
if !jConf.ReadOnly && !jConf.NoSession && !jConf.NoBGJob && conf.BackupMeta > 0 {
go vfs.Backup(m, blob, conf.BackupMeta)
}
if !jConf.NoUsageReport {
if !jConf.NoUsageReport && !jConf.NoSession {
go usage.ReportUsage(m, "java-sdk "+version.Version())
}
jfs, err := fs.NewFileSystem(conf, m, store)
Expand Down
1 change: 1 addition & 0 deletions sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ public void initialize(URI uri, Configuration conf) throws IOException {
obj.put("bucket", getConf(conf, "bucket", ""));
obj.put("storageClass", getConf(conf, "storage-class", ""));
obj.put("readOnly", Boolean.valueOf(getConf(conf, "read-only", "false")));
obj.put("noSession", Boolean.valueOf(getConf(conf, "no-session", "false")));
obj.put("noBGJob", Boolean.valueOf(getConf(conf, "no-bgjob", "false")));
obj.put("cacheDir", getConf(conf, "cache-dir", "memory"));
obj.put("cacheSize", Integer.valueOf(getConf(conf, "cache-size", "100")));
Expand Down

0 comments on commit 821b990

Please sign in to comment.