Skip to content

Commit

Permalink
feat: Support hdfs configuration replication (#2680)
Browse files Browse the repository at this point in the history
  • Loading branch information
shjwudp committed Sep 7, 2022
1 parent 92eea36 commit e9088ac
Showing 1 changed file with 13 additions and 4 deletions.
17 changes: 13 additions & 4 deletions pkg/object/hdfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"sort"
"strings"
"time"
"strconv"

"github.com/colinmarc/hdfs/v2"
"github.com/colinmarc/hdfs/v2/hadoopconf"
Expand All @@ -43,6 +44,7 @@ type hdfsclient struct {
DefaultObjectStorage
addr string
c *hdfs.Client
dfsReplication int
}

func (h *hdfsclient) String() string {
Expand Down Expand Up @@ -128,20 +130,20 @@ func (h *hdfsclient) Put(key string, in io.Reader) error {
return h.c.MkdirAll(path, os.FileMode(0755))
}
tmp := filepath.Join(filepath.Dir(path), fmt.Sprintf(".%s.tmp.%d", filepath.Base(path), rand.Int()))
f, err := h.c.CreateFile(tmp, 3, 128<<20, 0755)
f, err := h.c.CreateFile(tmp, h.dfsReplication, 128<<20, 0755)
defer func() { _ = h.c.Remove(tmp) }()
if err != nil {
if pe, ok := err.(*os.PathError); ok && pe.Err == os.ErrNotExist {
_ = h.c.MkdirAll(filepath.Dir(path), 0755)
f, err = h.c.CreateFile(tmp, 3, 128<<20, 0755)
f, err = h.c.CreateFile(tmp, h.dfsReplication, 128<<20, 0755)
}
if pe, ok := err.(*os.PathError); ok {
if remoteErr, ok := pe.Err.(hdfs.Error); ok && remoteErr.Exception() == abcException {
pe.Err = os.ErrExist
}
if pe.Err == os.ErrExist {
_ = h.c.Remove(tmp)
f, err = h.c.CreateFile(tmp, 3, 128<<20, 0755)
f, err = h.c.CreateFile(tmp, h.dfsReplication, 128<<20, 0755)
}
}
if err != nil {
Expand Down Expand Up @@ -353,7 +355,14 @@ func newHDFS(addr, username, sk, token string) (ObjectStorage, error) {
supergroup = os.Getenv("HADOOP_SUPER_GROUP")
}

return &hdfsclient{addr: addr, c: c}, nil
var replication int = 3
if replication_conf, found := conf["dfs.replication"]; found {
if x, err := strconv.Atoi(replication_conf); err == nil {
replication = x
}
}

return &hdfsclient{addr: addr, c: c, dfsReplication: replication}, nil
}

func init() {
Expand Down

0 comments on commit e9088ac

Please sign in to comment.