Skip to content

Commit

Permalink
rpk/config: Support named listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
David Castillo committed Jan 29, 2021
1 parent 4e1fb27 commit 2ff2279
Show file tree
Hide file tree
Showing 3 changed files with 283 additions and 220 deletions.
182 changes: 110 additions & 72 deletions src/go/rpk/pkg/cli/cmd/redpanda/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"path/filepath"
"regexp"
Expand All @@ -38,64 +39,64 @@ import (
)

type prestartConfig struct {
tuneEnabled bool
checkEnabled bool
tuneEnabled bool
checkEnabled bool
}

type seastarFlags struct {
memory string
lockMemory bool
reserveMemory string
hugepages string
cpuSet string
ioPropertiesFile string
ioProperties string
smp int
threadAffinity bool
numIoQueues int
maxIoRequests int
mbind bool
overprovisioned bool
memory string
lockMemory bool
reserveMemory string
hugepages string
cpuSet string
ioPropertiesFile string
ioProperties string
smp int
threadAffinity bool
numIoQueues int
maxIoRequests int
mbind bool
overprovisioned bool
}

const (
memoryFlag = "memory"
lockMemoryFlag = "lock-memory"
reserveMemoryFlag = "reserve-memory"
hugepagesFlag = "hugepages"
cpuSetFlag = "cpuset"
ioPropertiesFileFlag = "io-properties-file"
ioPropertiesFlag = "io-properties"
wellKnownIOFlag = "well-known-io"
smpFlag = "smp"
threadAffinityFlag = "thread-affinity"
numIoQueuesFlag = "num-io-queues"
maxIoRequestsFlag = "max-io-requests"
mbindFlag = "mbind"
overprovisionedFlag = "overprovisioned"
memoryFlag = "memory"
lockMemoryFlag = "lock-memory"
reserveMemoryFlag = "reserve-memory"
hugepagesFlag = "hugepages"
cpuSetFlag = "cpuset"
ioPropertiesFileFlag = "io-properties-file"
ioPropertiesFlag = "io-properties"
wellKnownIOFlag = "well-known-io"
smpFlag = "smp"
threadAffinityFlag = "thread-affinity"
numIoQueuesFlag = "num-io-queues"
maxIoRequestsFlag = "max-io-requests"
mbindFlag = "mbind"
overprovisionedFlag = "overprovisioned"
)

func NewStartCommand(
fs afero.Fs, mgr config.Manager, launcher rp.Launcher,
) *cobra.Command {
prestartCfg := prestartConfig{}
var (
configFile string
nodeID uint
seeds []string
kafkaAddr string
rpcAddr string
advertisedKafka []string
advertisedRPC string
installDirFlag string
timeout time.Duration
wellKnownIo string
configFile string
nodeID uint
seeds []string
kafkaAddr string
rpcAddr string
advertisedKafka []string
advertisedRPC string
installDirFlag string
timeout time.Duration
wellKnownIo string
)
sFlags := seastarFlags{}

command := &cobra.Command{
Use: "start",
Short: "Start redpanda",
Use: "start",
Short: "Start redpanda",
RunE: func(ccmd *cobra.Command, args []string) error {
conf, err := mgr.FindOrGenerate(configFile)
if err != nil {
Expand Down Expand Up @@ -126,7 +127,7 @@ func NewStartCommand(
kafkaAddr,
os.Getenv("REDPANDA_KAFKA_ADDRESS"),
)
kafkaApi, err := parseAddress(
kafkaApi, err := parseNamedAddress(
kafkaAddr,
config.Default().Redpanda.KafkaApi.Port,
)
Expand Down Expand Up @@ -161,7 +162,7 @@ func NewStartCommand(
",",
),
)
advKafkaApi, err := parseAddresses(
advKafkaApi, err := parseNamedAddresses(
advertisedKafka,
config.Default().Redpanda.KafkaApi.Port,
)
Expand Down Expand Up @@ -343,19 +344,19 @@ func NewStartCommand(

func flagsMap(sFlags seastarFlags) map[string]interface{} {
return map[string]interface{}{
memoryFlag: sFlags.memory,
lockMemoryFlag: sFlags.lockMemory,
reserveMemoryFlag: sFlags.reserveMemory,
ioPropertiesFileFlag: sFlags.ioPropertiesFile,
ioPropertiesFlag: sFlags.ioProperties,
cpuSetFlag: sFlags.cpuSet,
smpFlag: sFlags.smp,
hugepagesFlag: sFlags.hugepages,
threadAffinityFlag: sFlags.threadAffinity,
numIoQueuesFlag: sFlags.numIoQueues,
maxIoRequestsFlag: sFlags.maxIoRequests,
mbindFlag: sFlags.mbind,
overprovisionedFlag: sFlags.overprovisioned,
memoryFlag: sFlags.memory,
lockMemoryFlag: sFlags.lockMemory,
reserveMemoryFlag: sFlags.reserveMemory,
ioPropertiesFileFlag: sFlags.ioPropertiesFile,
ioPropertiesFlag: sFlags.ioProperties,
cpuSetFlag: sFlags.cpuSet,
smpFlag: sFlags.smp,
hugepagesFlag: sFlags.hugepages,
threadAffinityFlag: sFlags.threadAffinity,
numIoQueuesFlag: sFlags.numIoQueues,
maxIoRequestsFlag: sFlags.maxIoRequests,
mbindFlag: sFlags.mbind,
overprovisionedFlag: sFlags.overprovisioned,
}
}

Expand Down Expand Up @@ -451,8 +452,8 @@ func buildRedpandaFlags(
finalFlags[n] = fmt.Sprint(v)
}
return &rp.RedpandaArgs{
ConfigFilePath: conf.ConfigFile,
SeastarFlags: finalFlags,
ConfigFilePath: conf.ConfigFile,
SeastarFlags: finalFlags,
}, nil
}

Expand Down Expand Up @@ -556,9 +557,9 @@ func tuneAll(
tuner := tunerFactory.CreateTuner(tunerName, params)
supported, reason := tuner.CheckIfSupported()
payload := api.TunerPayload{
Name: tunerName,
Enabled: enabled,
Supported: supported,
Name: tunerName,
Enabled: enabled,
Supported: supported,
}
if !enabled {
log.Infof("Skipping disabled tuner %s", tunerName)
Expand Down Expand Up @@ -607,9 +608,9 @@ func check(
}
for _, result := range results {
payload := api.CheckPayload{
Name: result.Desc,
Current: result.Current,
Required: result.Required,
Name: result.Desc,
Current: result.Current,
Required: result.Required,
}
if result.Err != nil {
payload.ErrorMsg = result.Err.Error()
Expand Down Expand Up @@ -714,8 +715,8 @@ func parseAddress(addr string, defaultPort int) (*config.SocketAddress, error) {
if len(hostPort) != 2 {
// It's just a hostname with no port. Use the default port.
return &config.SocketAddress{
Address: strings.Trim(hostPort[0], " "),
Port: defaultPort,
Address: strings.Trim(hostPort[0], " "),
Port: defaultPort,
}, nil
}
// It's a host:port combo.
Expand All @@ -724,17 +725,54 @@ func parseAddress(addr string, defaultPort int) (*config.SocketAddress, error) {
return nil, fmt.Errorf("Port must be an int")
}
return &config.SocketAddress{
Address: host,
Port: port,
Address: host,
Port: port,
}, nil
}

func parseAddresses(
func parseNamedAddress(addr string, defaultPort int) (*config.NamedSocketAddress, error) {
scheme, hostname, port, err := parseURL(addr)
if err != nil {
return nil, err
}
if port == 0 {
port = defaultPort
}
return &config.NamedSocketAddress{
SocketAddress: config.SocketAddress{
Address: hostname,
Port: port,
},
Name: scheme,
}, nil
}

func parseURL(addr string) (scheme, hostname string, port int, err error) {
root := "//"
if !strings.Contains(addr, root) {
addr = fmt.Sprintf("%s%s", root, addr)
}
u, err := url.Parse(addr)
if err != nil {
return "", "", 0, err
}
if u.Port() != "" {
port, err = strconv.Atoi(u.Port())
if err != nil {
return "", "", 0, err
}
}
scheme = u.Scheme
hostname = u.Hostname()
return scheme, hostname, port, nil
}

func parseNamedAddresses(
addrs []string, defaultPort int,
) ([]config.SocketAddress, error) {
as := make([]config.SocketAddress, 0, len(addrs))
) ([]config.NamedSocketAddress, error) {
as := make([]config.NamedSocketAddress, 0, len(addrs))
for _, addr := range addrs {
a, err := parseAddress(addr, defaultPort)
a, err := parseNamedAddress(addr, defaultPort)
if err != nil {
return nil, err
}
Expand Down
97 changes: 51 additions & 46 deletions src/go/rpk/pkg/config/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,71 +12,76 @@ package config
import "path"

type Config struct {
NodeUuid string `yaml:"node_uuid,omitempty" mapstructure:"node_uuid,omitempty" json:"nodeUuid"`
Organization string `yaml:"organization,omitempty" mapstructure:"organization,omitempty" json:"organization"`
LicenseKey string `yaml:"license_key,omitempty" mapstructure:"license_key,omitempty" json:"licenseKey"`
ClusterId string `yaml:"cluster_id,omitempty" mapstructure:"cluster_id,omitempty" json:"clusterId"`
ConfigFile string `yaml:"config_file" mapstructure:"config_file" json:"configFile"`
Redpanda RedpandaConfig `yaml:"redpanda" mapstructure:"redpanda" json:"redpanda"`
Rpk RpkConfig `yaml:"rpk" mapstructure:"rpk" json:"rpk"`
NodeUuid string `yaml:"node_uuid,omitempty" mapstructure:"node_uuid,omitempty" json:"nodeUuid"`
Organization string `yaml:"organization,omitempty" mapstructure:"organization,omitempty" json:"organization"`
LicenseKey string `yaml:"license_key,omitempty" mapstructure:"license_key,omitempty" json:"licenseKey"`
ClusterId string `yaml:"cluster_id,omitempty" mapstructure:"cluster_id,omitempty" json:"clusterId"`
ConfigFile string `yaml:"config_file" mapstructure:"config_file" json:"configFile"`
Redpanda RedpandaConfig `yaml:"redpanda" mapstructure:"redpanda" json:"redpanda"`
Rpk RpkConfig `yaml:"rpk" mapstructure:"rpk" json:"rpk"`
}

type RedpandaConfig struct {
Directory string `yaml:"data_directory" mapstructure:"data_directory" json:"dataDirectory"`
RPCServer SocketAddress `yaml:"rpc_server" mapstructure:"rpc_server" json:"rpcServer"`
AdvertisedRPCAPI *SocketAddress `yaml:"advertised_rpc_api,omitempty" mapstructure:"advertised_rpc_api,omitempty" json:"advertisedRpcApi,omitempty"`
KafkaApi SocketAddress `yaml:"kafka_api" mapstructure:"kafka_api" json:"kafkaApi"`
AdvertisedKafkaApi []SocketAddress `yaml:"advertised_kafka_api,omitempty" mapstructure:"advertised_kafka_api,omitempty" json:"advertisedKafkaApi,omitempty"`
KafkaApiTLS ServerTLS `yaml:"kafka_api_tls,omitempty" mapstructure:"kafka_api_tls,omitempty" json:"kafkaApiTls"`
AdminApi SocketAddress `yaml:"admin" mapstructure:"admin" json:"admin"`
Id int `yaml:"node_id" mapstructure:"node_id" json:"id"`
SeedServers []SeedServer `yaml:"seed_servers" mapstructure:"seed_servers" json:"seedServers"`
DeveloperMode bool `yaml:"developer_mode" mapstructure:"developer_mode" json:"developerMode"`
Directory string `yaml:"data_directory" mapstructure:"data_directory" json:"dataDirectory"`
RPCServer SocketAddress `yaml:"rpc_server" mapstructure:"rpc_server" json:"rpcServer"`
AdvertisedRPCAPI *SocketAddress `yaml:"advertised_rpc_api,omitempty" mapstructure:"advertised_rpc_api,omitempty" json:"advertisedRpcApi,omitempty"`
KafkaApi NamedSocketAddress `yaml:"kafka_api" mapstructure:"kafka_api" json:"kafkaApi"`
AdvertisedKafkaApi []NamedSocketAddress `yaml:"advertised_kafka_api,omitempty" mapstructure:"advertised_kafka_api,omitempty" json:"advertisedKafkaApi,omitempty"`
KafkaApiTLS ServerTLS `yaml:"kafka_api_tls,omitempty" mapstructure:"kafka_api_tls,omitempty" json:"kafkaApiTls"`
AdminApi SocketAddress `yaml:"admin" mapstructure:"admin" json:"admin"`
Id int `yaml:"node_id" mapstructure:"node_id" json:"id"`
SeedServers []SeedServer `yaml:"seed_servers" mapstructure:"seed_servers" json:"seedServers"`
DeveloperMode bool `yaml:"developer_mode" mapstructure:"developer_mode" json:"developerMode"`
}

type SeedServer struct {
Host SocketAddress `yaml:"host" mapstructure:"host" json:"host"`
}

type SocketAddress struct {
Address string `yaml:"address" mapstructure:"address" json:"address"`
Port int `yaml:"port" mapstructure:"port" json:"port"`
Address string `yaml:"address" mapstructure:"address" json:"address"`
Port int `yaml:"port" mapstructure:"port" json:"port"`
}

type NamedSocketAddress struct {
SocketAddress `yaml:",inline" mapstructure:",squash"`
Name string `yaml:"name,omitempty" mapstructure:"name,omitempty" json:"name,omitempty"`
}

type TLS struct {
KeyFile string `yaml:"key_file,omitempty" mapstructure:"key_file,omitempty" json:"keyFile"`
CertFile string `yaml:"cert_file,omitempty" mapstructure:"cert_file,omitempty" json:"certFile"`
TruststoreFile string `yaml:"truststore_file,omitempty" mapstructure:"truststore_file,omitempty" json:"truststoreFile"`
KeyFile string `yaml:"key_file,omitempty" mapstructure:"key_file,omitempty" json:"keyFile"`
CertFile string `yaml:"cert_file,omitempty" mapstructure:"cert_file,omitempty" json:"certFile"`
TruststoreFile string `yaml:"truststore_file,omitempty" mapstructure:"truststore_file,omitempty" json:"truststoreFile"`
}

type ServerTLS struct {
KeyFile string `yaml:"key_file,omitempty" mapstructure:"key_file,omitempty" json:"keyFile"`
CertFile string `yaml:"cert_file,omitempty" mapstructure:"cert_file,omitempty" json:"certFile"`
TruststoreFile string `yaml:"truststore_file,omitempty" mapstructure:"truststore_file,omitempty" json:"truststoreFile"`
Enabled bool `yaml:"enabled,omitempty" mapstructure:"enabled,omitempty" json:"enabled"`
KeyFile string `yaml:"key_file,omitempty" mapstructure:"key_file,omitempty" json:"keyFile"`
CertFile string `yaml:"cert_file,omitempty" mapstructure:"cert_file,omitempty" json:"certFile"`
TruststoreFile string `yaml:"truststore_file,omitempty" mapstructure:"truststore_file,omitempty" json:"truststoreFile"`
Enabled bool `yaml:"enabled,omitempty" mapstructure:"enabled,omitempty" json:"enabled"`
}

type RpkConfig struct {
TLS TLS `yaml:"tls,omitempty" mapstructure:"tls,omitempty" json:"tls"`
AdditionalStartFlags []string `yaml:"additional_start_flags,omitempty" mapstructure:"additional_start_flags,omitempty" json:"additionalStartFlags"`
EnableUsageStats bool `yaml:"enable_usage_stats" mapstructure:"enable_usage_stats" json:"enableUsageStats"`
TuneNetwork bool `yaml:"tune_network" mapstructure:"tune_network" json:"tuneNetwork"`
TuneDiskScheduler bool `yaml:"tune_disk_scheduler" mapstructure:"tune_disk_scheduler" json:"tuneDiskScheduler"`
TuneNomerges bool `yaml:"tune_disk_nomerges" mapstructure:"tune_disk_nomerges" json:"tuneNomerges"`
TuneDiskWriteCache bool `yaml:"tune_disk_write_cache" mapstructure:"tune_disk_write_cache" json:"tuneDiskWriteCache"`
TuneDiskIrq bool `yaml:"tune_disk_irq" mapstructure:"tune_disk_irq" json:"tuneDiskIrq"`
TuneFstrim bool `yaml:"tune_fstrim" mapstructure:"tune_fstrim" json:"tuneFstrim"`
TuneCpu bool `yaml:"tune_cpu" mapstructure:"tune_cpu" json:"tuneCpu"`
TuneAioEvents bool `yaml:"tune_aio_events" mapstructure:"tune_aio_events" json:"tuneAioEvents"`
TuneClocksource bool `yaml:"tune_clocksource" mapstructure:"tune_clocksource" json:"tuneClocksource"`
TuneSwappiness bool `yaml:"tune_swappiness" mapstructure:"tune_swappiness" json:"tuneSwappiness"`
TuneTransparentHugePages bool `yaml:"tune_transparent_hugepages" mapstructure:"tune_transparent_hugepages" json:"tuneTransparentHugePages"`
EnableMemoryLocking bool `yaml:"enable_memory_locking" mapstructure:"enable_memory_locking" json:"enableMemoryLocking"`
TuneCoredump bool `yaml:"tune_coredump" mapstructure:"tune_coredump" json:"tuneCoredump"`
CoredumpDir string `yaml:"coredump_dir,omitempty" mapstructure:"coredump_dir,omitempty" json:"coredumpDir"`
WellKnownIo string `yaml:"well_known_io,omitempty" mapstructure:"well_known_io,omitempty" json:"wellKnownIo"`
Overprovisioned bool `yaml:"overprovisioned" mapstructure:"overprovisioned" json:"overprovisioned"`
SMP *int `yaml:"smp,omitempty" mapstructure:"smp,omitempty" json:"smp,omitempty"`
TLS TLS `yaml:"tls,omitempty" mapstructure:"tls,omitempty" json:"tls"`
AdditionalStartFlags []string `yaml:"additional_start_flags,omitempty" mapstructure:"additional_start_flags,omitempty" json:"additionalStartFlags"`
EnableUsageStats bool `yaml:"enable_usage_stats" mapstructure:"enable_usage_stats" json:"enableUsageStats"`
TuneNetwork bool `yaml:"tune_network" mapstructure:"tune_network" json:"tuneNetwork"`
TuneDiskScheduler bool `yaml:"tune_disk_scheduler" mapstructure:"tune_disk_scheduler" json:"tuneDiskScheduler"`
TuneNomerges bool `yaml:"tune_disk_nomerges" mapstructure:"tune_disk_nomerges" json:"tuneNomerges"`
TuneDiskWriteCache bool `yaml:"tune_disk_write_cache" mapstructure:"tune_disk_write_cache" json:"tuneDiskWriteCache"`
TuneDiskIrq bool `yaml:"tune_disk_irq" mapstructure:"tune_disk_irq" json:"tuneDiskIrq"`
TuneFstrim bool `yaml:"tune_fstrim" mapstructure:"tune_fstrim" json:"tuneFstrim"`
TuneCpu bool `yaml:"tune_cpu" mapstructure:"tune_cpu" json:"tuneCpu"`
TuneAioEvents bool `yaml:"tune_aio_events" mapstructure:"tune_aio_events" json:"tuneAioEvents"`
TuneClocksource bool `yaml:"tune_clocksource" mapstructure:"tune_clocksource" json:"tuneClocksource"`
TuneSwappiness bool `yaml:"tune_swappiness" mapstructure:"tune_swappiness" json:"tuneSwappiness"`
TuneTransparentHugePages bool `yaml:"tune_transparent_hugepages" mapstructure:"tune_transparent_hugepages" json:"tuneTransparentHugePages"`
EnableMemoryLocking bool `yaml:"enable_memory_locking" mapstructure:"enable_memory_locking" json:"enableMemoryLocking"`
TuneCoredump bool `yaml:"tune_coredump" mapstructure:"tune_coredump" json:"tuneCoredump"`
CoredumpDir string `yaml:"coredump_dir,omitempty" mapstructure:"coredump_dir,omitempty" json:"coredumpDir"`
WellKnownIo string `yaml:"well_known_io,omitempty" mapstructure:"well_known_io,omitempty" json:"wellKnownIo"`
Overprovisioned bool `yaml:"overprovisioned" mapstructure:"overprovisioned" json:"overprovisioned"`
SMP *int `yaml:"smp,omitempty" mapstructure:"smp,omitempty" json:"smp,omitempty"`
}

func (conf *Config) PIDFile() string {
Expand Down
Loading

0 comments on commit 2ff2279

Please sign in to comment.