forked from colinmarc/hdfs
/
conf.go
98 lines (85 loc) · 2.34 KB
/
conf.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package hdfs
import (
"encoding/xml"
"errors"
"io/ioutil"
"net/url"
"os"
"path/filepath"
"strings"
)
// Property is the struct representation of hadoop configuration
// key value pair.
type Property struct {
Name string `xml:"name"`
Value string `xml:"value"`
}
type propertyList struct {
Property []Property `xml:"property"`
}
// HadoopConf represents a map of all the key value configutation
// pairs found in a user's hadoop configuration files.
type HadoopConf map[string]string
var errUnresolvedNamenode = errors.New("no namenode address in configuration")
// LoadHadoopConf returns a HadoopConf object that is key value map
// of all the hadoop conf properties, swallows errors reading xml
// and reading a non-existant file.
func LoadHadoopConf(inputPath string) HadoopConf {
var tryPaths []string
if inputPath != "" {
tryPaths = append(tryPaths, inputPath)
} else {
hadoopConfDir := os.Getenv("HADOOP_CONF_DIR")
hadoopHome := os.Getenv("HADOOP_HOME")
if hadoopConfDir != "" {
confHdfsPath := filepath.Join(hadoopConfDir, "hdfs-site.xml")
confCorePath := filepath.Join(hadoopConfDir, "core-site.xml")
tryPaths = append(tryPaths, confHdfsPath, confCorePath)
}
if hadoopHome != "" {
hdfsPath := filepath.Join(hadoopHome, "conf", "hdfs-site.xml")
corePath := filepath.Join(hadoopHome, "conf", "core-site.xml")
tryPaths = append(tryPaths, hdfsPath, corePath)
}
}
hadoopConf := make(HadoopConf)
for _, tryPath := range tryPaths {
pList := propertyList{}
f, err := ioutil.ReadFile(tryPath)
if err != nil {
continue
}
xmlErr := xml.Unmarshal(f, &pList)
if xmlErr != nil {
continue
}
for _, prop := range pList.Property {
hadoopConf[prop.Name] = prop.Value
}
}
return hadoopConf
}
// Namenodes returns a slice of deduplicated namenodes named in
// a user's hadoop configuration files or an error is there are no namenodes.
func (conf HadoopConf) Namenodes() ([]string, error) {
nns := make(map[string]bool)
for key, value := range conf {
if strings.Contains(key, "fs.defaultFS") {
nnUrl, _ := url.Parse(value)
nns[nnUrl.Host] = true
}
if strings.HasPrefix(key, "dfs.namenode.rpc-address") {
nns[value] = true
}
}
if len(nns) == 0 {
return nil, errUnresolvedNamenode
}
keys := make([]string, len(nns))
i := 0
for k, _ := range nns {
keys[i] = k
i++
}
return keys, nil
}