Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hadoop: add an option to disable session in java SDK #3750

Merged
merged 5 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func initForSvc(c *cli.Context, mp string, metaUrl string) (*vfs.Config, *fs.Fil
store := chunk.NewCachedStore(blob, *chunkConf, registerer)
registerMetaMsg(metaCli, store, chunkConf)

err = metaCli.NewSession()
err = metaCli.NewSession(true)
if err != nil {
logger.Fatalf("new session: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/mdtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func initForMdtest(c *cli.Context, mp string, metaUrl string) *fs.FileSystem {
store := chunk.NewCachedStore(blob, *chunkConf, registerer)
registerMetaMsg(m, store, chunkConf)

err = m.NewSession()
err = m.NewSession(true)
if err != nil {
logger.Fatalf("new session: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ func mount(c *cli.Context) error {
}

removePassword(addr)
err = metaCli.NewSession()
err = metaCli.NewSession(true)
if err != nil {
logger.Fatalf("new session: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func newJFS(endpoint, accessKey, secretKey, token string) (object.ObjectStorage,
chunkConf := getDefaultChunkConf(format)
store := chunk.NewCachedStore(blob, *chunkConf, nil)
registerMetaMsg(metaCli, store, chunkConf)
err = metaCli.NewSession()
err = metaCli.NewSession(false)
if err != nil {
return nil, fmt.Errorf("new session: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/fuse/fuse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func mount(url, mp string) {
Chunk: &chunkConf,
}

err = m.NewSession()
err = m.NewSession(true)
if err != nil {
log.Fatalf("new session: %s", err)
}
Expand Down
25 changes: 15 additions & 10 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,22 +470,24 @@ func (m *baseMeta) newSessionInfo() []byte {
return buf
}

func (m *baseMeta) NewSession() error {
func (m *baseMeta) NewSession(record bool) error {
go m.refresh()
if m.conf.ReadOnly {
logger.Infof("Create read-only session OK with version: %s", version.Version())
return nil
}

v, err := m.en.incrCounter("nextSession", 1)
if err != nil {
return fmt.Errorf("get session ID: %s", err)
}
m.sid = uint64(v)
if err = m.en.doNewSession(m.newSessionInfo()); err != nil {
return fmt.Errorf("create session: %s", err)
if record {
v, err := m.en.incrCounter("nextSession", 1)
if err != nil {
return fmt.Errorf("get session ID: %s", err)
}
m.sid = uint64(v)
if err = m.en.doNewSession(m.newSessionInfo()); err != nil {
return fmt.Errorf("create session: %s", err)
}
logger.Infof("Create session %d OK with version: %s", m.sid, version.Version())
}
logger.Infof("Create session %d OK with version: %s", m.sid, version.Version())

m.loadQuotas()
go m.en.flushStats()
Expand Down Expand Up @@ -536,7 +538,7 @@ func (m *baseMeta) refresh() {
m.sesMu.Unlock()
return
}
if !m.conf.ReadOnly && m.conf.Heartbeat > 0 {
if !m.conf.ReadOnly && m.conf.Heartbeat > 0 && m.sid > 0 {
if err := m.en.doRefreshSession(); err != nil {
logger.Errorf("Refresh session %d: %s", m.sid, err)
}
Expand Down Expand Up @@ -632,6 +634,9 @@ func (m *baseMeta) checkQuota(ctx Context, space, inodes int64, parents ...Ino)
}

func (m *baseMeta) loadQuotas() {
if !m.GetFormat().DirStats {
return
}
quotas, err := m.en.doLoadQuotas(Background)
if err == nil {
m.quotaMu.Lock()
Expand Down
8 changes: 4 additions & 4 deletions pkg/meta/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func testMetaClient(t *testing.T, m Meta) {
if format.Name != "test" {
t.Fatalf("load got volume name %s, expected %s", format.Name, "test")
}
if err = m.NewSession(); err != nil {
if err = m.NewSession(true); err != nil {
t.Fatalf("new session: %s", err)
}
defer m.CloseSession()
Expand Down Expand Up @@ -1343,7 +1343,7 @@ func testCopyFileRange(t *testing.T, m Meta) {
}

func testCloseSession(t *testing.T, m Meta) {
if err := m.NewSession(); err != nil {
if err := m.NewSession(true); err != nil {
t.Fatalf("new session: %s", err)
}

Expand Down Expand Up @@ -1628,7 +1628,7 @@ func testOpenCache(t *testing.T, m Meta) {

func testReadOnly(t *testing.T, m Meta) {
ctx := Background
if err := m.NewSession(); err != nil {
if err := m.NewSession(true); err != nil {
t.Fatalf("new session: %s", err)
}
defer m.CloseSession()
Expand Down Expand Up @@ -2472,7 +2472,7 @@ func checkEntry(t *testing.T, m Meta, srcEntry, dstEntry *Entry, dstParentIno In
}

func testQuota(t *testing.T, m Meta) {
if err := m.NewSession(); err != nil {
if err := m.NewSession(true); err != nil {
t.Fatalf("New session: %s", err)
}
defer m.CloseSession()
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ func benchmarkData(b *testing.B, m Meta) {

func benchmarkAll(b *testing.B, m Meta) {
_ = m.Init(&Format{Name: "benchmarkAll", DirStats: true}, true)
_ = m.NewSession()
_ = m.NewSession(false)
benchmarkDir(b, m)
benchmarkFile(b, m)
benchmarkXattr(b, m)
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ type Meta interface {
// Load loads the existing setting of a formatted volume from meta service.
Load(checkVersion bool) (*Format, error)
// NewSession creates a new client session.
NewSession() error
NewSession(record bool) error
// CloseSession does cleanup and close the session.
CloseSession() error
// GetSession retrieves information of session with sid
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -1413,7 +1413,7 @@ func (m *redisMeta) doUnlink(ctx Context, parent Ino, name string, attr *Attr, s
attr.Ctimensec = uint32(now.Nanosecond())
if trash == 0 {
attr.Nlink--
if _type == TypeFile && attr.Nlink == 0 {
if _type == TypeFile && attr.Nlink == 0 && m.sid > 0 {
opened = m.of.IsOpen(inode)
}
} else if attr.Parent > 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1393,7 +1393,7 @@ func (m *dbMeta) doUnlink(ctx Context, parent Ino, name string, attr *Attr, skip
n.Ctimensec = int16(now % 1e3)
if trash == 0 {
n.Nlink--
if n.Type == TypeFile && n.Nlink == 0 {
if n.Type == TypeFile && n.Nlink == 0 && m.sid > 0 {
opened = m.of.IsOpen(e.Inode)
}
} else if n.Parent > 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -1573,7 +1573,7 @@ func (m *kvMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst
} else {
if trash == 0 {
tattr.Nlink--
if dtyp == TypeFile && tattr.Nlink == 0 {
if dtyp == TypeFile && tattr.Nlink == 0 && m.sid > 0 {
opened = m.of.IsOpen(dino)
}
defer func() { m.of.InvalidateChunk(dino, invalidateAttrOnly) }()
Expand Down
11 changes: 6 additions & 5 deletions sdk/java/libjfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ type javaConf struct {
Bucket string `json:"bucket"`
StorageClass string `json:"storageClass"`
ReadOnly bool `json:"readOnly"`
NoSession bool `json:"noSession"`
NoBGJob bool `json:"noBGJob"`
OpenCache float64 `json:"openCache"`
BackupMeta int64 `json:"backupMeta"`
Expand Down Expand Up @@ -436,7 +437,7 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintp
metaConf.MaxDeletes = jConf.MaxDeletes
metaConf.SkipDirNlink = jConf.SkipDirNlink
metaConf.ReadOnly = jConf.ReadOnly
metaConf.NoBGJob = jConf.NoBGJob
metaConf.NoBGJob = jConf.NoBGJob || jConf.NoSession
metaConf.OpenCache = time.Duration(jConf.OpenCache * 1e9)
metaConf.Heartbeat = time.Second * time.Duration(jConf.Heartbeat)
m := meta.NewClient(jConf.MetaURL, metaConf)
Expand Down Expand Up @@ -533,12 +534,11 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintp
id := args[1].(uint64)
return vfs.Compact(chunkConf, store, slices, id)
})
err = m.NewSession()
err = m.NewSession(!jConf.NoSession)
if err != nil {
logger.Errorf("new session: %s", err)
return nil
}

m.OnReload(func(fmt *meta.Format) {
if jConf.UploadLimit > 0 {
fmt.UploadLimit = int64(jConf.UploadLimit)
Expand All @@ -548,6 +548,7 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintp
}
store.UpdateLimit(fmt.UploadLimit, fmt.DownloadLimit)
})

conf := &vfs.Config{
Meta: metaConf,
Format: *format,
Expand All @@ -559,10 +560,10 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintp
FastResolve: jConf.FastResolve,
BackupMeta: time.Second * time.Duration(jConf.BackupMeta),
}
if !jConf.ReadOnly && !jConf.NoBGJob && conf.BackupMeta > 0 {
if !jConf.ReadOnly && !jConf.NoSession && !jConf.NoBGJob && conf.BackupMeta > 0 {
go vfs.Backup(m, blob, conf.BackupMeta)
}
if !jConf.NoUsageReport {
if !jConf.NoUsageReport && !jConf.NoSession {
go usage.ReportUsage(m, "java-sdk "+version.Version())
}
jfs, err := fs.NewFileSystem(conf, m, store)
Expand Down
1 change: 1 addition & 0 deletions sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ public void initialize(URI uri, Configuration conf) throws IOException {
obj.put("bucket", getConf(conf, "bucket", ""));
obj.put("storageClass", getConf(conf, "storage-class", ""));
obj.put("readOnly", Boolean.valueOf(getConf(conf, "read-only", "false")));
obj.put("noSession", Boolean.valueOf(getConf(conf, "no-session", "false")));
obj.put("noBGJob", Boolean.valueOf(getConf(conf, "no-bgjob", "false")));
obj.put("cacheDir", getConf(conf, "cache-dir", "memory"));
obj.put("cacheSize", Integer.valueOf(getConf(conf, "cache-size", "100")));
Expand Down