Skip to content

Commit

Permalink
Merge cbda852 into 0d229e9
Browse files Browse the repository at this point in the history
  • Loading branch information
nexustar authored Aug 9, 2023
2 parents 0d229e9 + cbda852 commit b853058
Show file tree
Hide file tree
Showing 19 changed files with 143 additions and 23 deletions.
2 changes: 2 additions & 0 deletions components/dm/spec/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (c *DMMasterComponent) Instances() []Instance {
ManageHost: s.ManageHost,
Port: s.Port,
SSHP: s.SSHPort,
Source: s.GetSource(),

Ports: []int{
s.Port,
Expand Down Expand Up @@ -284,6 +285,7 @@ func (c *DMWorkerComponent) Instances() []Instance {
ManageHost: s.ManageHost,
Port: s.Port,
SSHP: s.SSHPort,
Source: s.GetSource(),

Ports: []int{
s.Port,
Expand Down
18 changes: 18 additions & 0 deletions components/dm/spec/topology_dm.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ type MasterSpec struct {
DeployDir string `yaml:"deploy_dir,omitempty"`
DataDir string `yaml:"data_dir,omitempty"`
LogDir string `yaml:"log_dir,omitempty"`
Source string `yaml:"source,omitempty" validate:"source:editable"`
NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"`
Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"`
ResourceControl ResourceControl `yaml:"resource_control,omitempty" validate:"resource_control:editable"`
Expand Down Expand Up @@ -202,6 +203,14 @@ func (s *MasterSpec) GetAdvertisePeerURL(enableTLS bool) string {
return fmt.Sprintf("%s://%s", scheme, utils.JoinHostPort(s.Host, s.PeerPort))
}

// GetSource returns source to download the component
func (s *MasterSpec) GetSource() string {
if s.Source == "" {
return ComponentDMMaster
}
return s.Source
}

// WorkerSpec represents the Master topology specification in topology.yaml
type WorkerSpec struct {
Host string `yaml:"host"`
Expand All @@ -216,6 +225,7 @@ type WorkerSpec struct {
DeployDir string `yaml:"deploy_dir,omitempty"`
DataDir string `yaml:"data_dir,omitempty"`
LogDir string `yaml:"log_dir,omitempty"`
Source string `yaml:"source,omitempty" validate:"source:editable"`
NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"`
Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"`
ResourceControl ResourceControl `yaml:"resource_control,omitempty" validate:"resource_control:editable"`
Expand Down Expand Up @@ -272,6 +282,14 @@ func (s *WorkerSpec) IgnoreMonitorAgent() bool {
return s.IgnoreExporter
}

// GetSource returns source to download the component
func (s *WorkerSpec) GetSource() string {
if s.Source == "" {
return ComponentDMWorker
}
return s.Source
}

// UnmarshalYAML sets default values when unmarshaling the topology file
func (s *Specification) UnmarshalYAML(unmarshal func(any) error) error {
type topology Specification
Expand Down
10 changes: 5 additions & 5 deletions pkg/cluster/manager/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func buildScaleOutTask(
tb = tb.DeploySpark(inst, sparkVer.String(), srcPath, deployDir)
default:
tb.CopyComponent(
inst.ComponentName(),
inst.ComponentSource(),
inst.OS(),
inst.Arch(),
version,
Expand Down Expand Up @@ -688,7 +688,7 @@ func buildDownloadCompTasks(
var tasks []*task.StepDisplay
uniqueTaskList := set.NewStringSet()
topo.IterInstance(func(inst spec.Instance) {
key := fmt.Sprintf("%s-%s-%s", inst.ComponentName(), inst.OS(), inst.Arch())
key := fmt.Sprintf("%s-%s-%s", inst.ComponentSource(), inst.OS(), inst.Arch())
if found := uniqueTaskList.Exist(key); !found {
uniqueTaskList.Insert(key)

Expand All @@ -698,13 +698,13 @@ func buildDownloadCompTasks(
// download spark as dependency of tispark
tasks = append(tasks, buildDownloadSparkTask(inst, logger, gOpt))
} else {
version = bindVersion(inst.ComponentName(), clusterVersion)
version = bindVersion(inst.ComponentSource(), clusterVersion)
}

t := task.NewBuilder(logger).
Download(inst.ComponentName(), inst.OS(), inst.Arch(), version).
Download(inst.ComponentSource(), inst.OS(), inst.Arch(), version).
BuildAsStep(fmt.Sprintf(" - Download %s:%s (%s/%s)",
inst.ComponentName(), version, inst.OS(), inst.Arch()))
inst.ComponentSource(), version, inst.OS(), inst.Arch()))
tasks = append(tasks, t)
}
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/cluster/manager/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (m *Manager) Deploy(

// Deploy components to remote
topo.IterInstance(func(inst spec.Instance) {
version := m.bindVersion(inst.ComponentName(), clusterVersion)
version := m.bindVersion(inst.ComponentSource(), clusterVersion)
deployDir := spec.Abs(globalOptions.User, inst.DeployDir())
// data dir would be empty for components which don't need it
dataDirs := spec.MultiDirAbs(globalOptions.User, inst.DataDir())
Expand Down Expand Up @@ -285,7 +285,7 @@ func (m *Manager) Deploy(
t = t.DeploySpark(inst, sparkVer.String(), "" /* default srcPath */, deployDir)
default:
t = t.CopyComponent(
inst.ComponentName(),
inst.ComponentSource(),
inst.OS(),
inst.Arch(),
version,
Expand Down
10 changes: 5 additions & 5 deletions pkg/cluster/manager/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,14 @@ Do you want to continue? [y/N]:`,
}
}

version := m.bindVersion(inst.ComponentName(), clusterVersion)
version := m.bindVersion(inst.ComponentSource(), clusterVersion)

// Download component from repository
key := fmt.Sprintf("%s-%s-%s-%s", compName, version, inst.OS(), inst.Arch())
if _, found := uniqueComps[key]; !found {
uniqueComps[key] = struct{}{}
t := task.NewBuilder(m.logger).
Download(inst.ComponentName(), inst.OS(), inst.Arch(), version).
Download(inst.ComponentSource(), inst.OS(), inst.Arch(), version).
Build()
downloadCompTasks = append(downloadCompTasks, t)
}
Expand All @@ -129,7 +129,7 @@ Do you want to continue? [y/N]:`,
switch inst.ComponentName() {
case spec.ComponentPrometheus, spec.ComponentGrafana, spec.ComponentAlertmanager:
tb.CopyComponent(
inst.ComponentName(),
inst.ComponentSource(),
inst.OS(),
inst.Arch(),
version,
Expand All @@ -142,7 +142,7 @@ Do you want to continue? [y/N]:`,
}

// backup files of the old version
tb = tb.BackupComponent(inst.ComponentName(), base.Version, inst.GetManageHost(), deployDir)
tb = tb.BackupComponent(inst.ComponentSource(), base.Version, inst.GetManageHost(), deployDir)

if deployerInstance, ok := inst.(DeployerInstance); ok {
deployerInstance.Deploy(tb, "", deployDir, version, name, clusterVersion)
Expand All @@ -161,7 +161,7 @@ Do you want to continue? [y/N]:`,
tb = tb.DeploySpark(inst, sparkVer.String(), "" /* default srcPath */, deployDir)
default:
tb = tb.CopyComponent(
inst.ComponentName(),
inst.ComponentSource(),
inst.OS(),
inst.Arch(),
version,
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/spec/alertmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (i *AlertManagerInstance) InitConfig(
if err := i.TransferLocalConfigFile(ctx, e, configPath, dst); err != nil {
return err
}
return checkConfig(ctx, e, i.ComponentName(), clusterVersion, i.OS(), i.Arch(), i.ComponentName()+".yml", paths, nil)
return checkConfig(ctx, e, i.ComponentName(), i.ComponentSource(), clusterVersion, i.OS(), i.Arch(), i.ComponentName()+".yml", paths, nil)
}

// ScaleConfig deploy temporary config on scaling
Expand Down
10 changes: 10 additions & 0 deletions pkg/cluster/spec/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type CDCSpec struct {
GCTTL int64 `yaml:"gc-ttl,omitempty" validate:"gc-ttl:editable"`
TZ string `yaml:"tz,omitempty" validate:"tz:editable"`
TiCDCClusterID string `yaml:"ticdc_cluster_id"`
Source string `yaml:"source,omitempty" validate:"source:editable"`
NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"`
Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"`
ResourceControl meta.ResourceControl `yaml:"resource_control,omitempty" validate:"resource_control:editable"`
Expand Down Expand Up @@ -91,6 +92,14 @@ func (s *CDCSpec) IgnoreMonitorAgent() bool {
return s.IgnoreExporter
}

// GetSource returns source to download the component
func (s *CDCSpec) GetSource() string {
if s.Source == "" {
return ComponentCDC
}
return s.Source
}

// CDCComponent represents CDC component.
type CDCComponent struct{ Topology *Specification }

Expand All @@ -116,6 +125,7 @@ func (c *CDCComponent) Instances() []Instance {
ManageHost: s.ManageHost,
Port: s.Port,
SSHP: s.SSHPort,
Source: s.GetSource(),

Ports: []int{
s.Port,
Expand Down
12 changes: 11 additions & 1 deletion pkg/cluster/spec/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type DashboardSpec struct {
DeployDir string `yaml:"deploy_dir,omitempty"`
DataDir string `yaml:"data_dir,omitempty"`
LogDir string `yaml:"log_dir,omitempty"`
Source string `yaml:"source,omitempty" validate:"source:editable"`
NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"`
Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"`
ResourceControl meta.ResourceControl `yaml:"resource_control,omitempty" validate:"resource_control:editable"`
Expand Down Expand Up @@ -94,6 +95,14 @@ func (s *DashboardSpec) IgnoreMonitorAgent() bool {
return s.IgnoreExporter
}

// GetSource returns source to download the component
func (s *DashboardSpec) GetSource() string {
if s.Source == "" {
return ComponentDashboard
}
return s.Source
}

// DashboardComponent represents Drainer component.
type DashboardComponent struct{ Topology *Specification }

Expand All @@ -119,6 +128,7 @@ func (c *DashboardComponent) Instances() []Instance {
ManageHost: s.ManageHost,
Port: s.Port,
SSHP: s.SSHPort,
Source: s.Source,

Ports: []int{
s.Port,
Expand Down Expand Up @@ -214,7 +224,7 @@ func (i *DashboardInstance) InitConfig(
return err
}

return checkConfig(ctx, e, i.ComponentName(), clusterVersion, i.OS(), i.Arch(), i.ComponentName()+".toml", paths, nil)
return checkConfig(ctx, e, i.ComponentName(), i.ComponentSource(), clusterVersion, i.OS(), i.Arch(), i.ComponentName()+".toml", paths, nil)
}

// setTLSConfig set TLS Config to support enable/disable TLS
Expand Down
12 changes: 11 additions & 1 deletion pkg/cluster/spec/drainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type DrainerSpec struct {
LogDir string `yaml:"log_dir,omitempty"`
CommitTS *int64 `yaml:"commit_ts,omitempty" validate:"commit_ts:editable"` // do not use it anymore, exist for compatibility
Offline bool `yaml:"offline,omitempty"`
Source string `yaml:"source,omitempty" validate:"source:editable"`
NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"`
Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"`
ResourceControl meta.ResourceControl `yaml:"resource_control,omitempty" validate:"resource_control:editable"`
Expand Down Expand Up @@ -112,6 +113,14 @@ func (s *DrainerSpec) IgnoreMonitorAgent() bool {
return s.IgnoreExporter
}

// GetSource returns source to download the component
func (s *DrainerSpec) GetSource() string {
if s.Source == "" {
return ComponentDrainer
}
return s.Source
}

// DrainerComponent represents Drainer component.
type DrainerComponent struct{ Topology *Specification }

Expand All @@ -137,6 +146,7 @@ func (c *DrainerComponent) Instances() []Instance {
ManageHost: s.ManageHost,
Port: s.Port,
SSHP: s.SSHPort,
Source: s.GetSource(),

Ports: []int{
s.Port,
Expand Down Expand Up @@ -264,7 +274,7 @@ func (i *DrainerInstance) InitConfig(
return err
}

return checkConfig(ctx, e, i.ComponentName(), clusterVersion, i.OS(), i.Arch(), i.ComponentName()+".toml", paths, nil)
return checkConfig(ctx, e, i.ComponentName(), i.ComponentSource(), clusterVersion, i.OS(), i.Arch(), i.ComponentName()+".toml", paths, nil)
}

// setTLSConfig set TLS Config to support enable/disable TLS
Expand Down
10 changes: 10 additions & 0 deletions pkg/cluster/spec/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type Instance interface {
ScaleConfig(ctx context.Context, e ctxt.Executor, topo Topology, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths) error
PrepareStart(ctx context.Context, tlsCfg *tls.Config) error
ComponentName() string
ComponentSource() string
InstanceName() string
ServiceName() string
ResourceControl() meta.ResourceControl
Expand Down Expand Up @@ -142,6 +143,7 @@ type BaseInstance struct {
ListenHost string
Port int
SSHP int
Source string

Ports []int
Dirs []string
Expand Down Expand Up @@ -302,6 +304,14 @@ func (i *BaseInstance) ComponentName() string {
return i.Name
}

// ComponentSource implements Instance interface
func (i *BaseInstance) ComponentSource() string {
if i.Source == "" {
return i.Name
}
return i.Source
}

// InstanceName implements Instance interface
func (i *BaseInstance) InstanceName() string {
if i.Port > 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/spec/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func (i *MonitorInstance) InitConfig(
return err
}

return checkConfig(ctx, e, i.ComponentName(), clusterVersion, i.OS(), i.Arch(), i.ComponentName()+".yml", paths, nil)
return checkConfig(ctx, e, i.ComponentName(), i.ComponentSource(), clusterVersion, i.OS(), i.Arch(), i.ComponentName()+".yml", paths, nil)
}

// setTLSConfig set TLS Config to support enable/disable TLS
Expand Down
12 changes: 11 additions & 1 deletion pkg/cluster/spec/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type PDSpec struct {
DeployDir string `yaml:"deploy_dir,omitempty"`
DataDir string `yaml:"data_dir,omitempty"`
LogDir string `yaml:"log_dir,omitempty"`
Source string `yaml:"source,omitempty" validate:"source:editable"`
NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"`
Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"`
ResourceControl meta.ResourceControl `yaml:"resource_control,omitempty" validate:"resource_control:editable"`
Expand Down Expand Up @@ -137,6 +138,14 @@ func (s *PDSpec) GetAdvertisePeerURL(enableTLS bool) string {
return fmt.Sprintf("%s://%s", scheme, utils.JoinHostPort(s.Host, s.PeerPort))
}

// GetSource returns source to download the component
func (s *PDSpec) GetSource() string {
if s.Source == "" {
return ComponentPD
}
return s.Source
}

// PDComponent represents PD component.
type PDComponent struct{ Topology *Specification }

Expand Down Expand Up @@ -165,6 +174,7 @@ func (c *PDComponent) Instances() []Instance {
ListenHost: s.ListenHost,
Port: s.ClientPort,
SSHP: s.SSHPort,
Source: s.GetSource(),

Ports: []int{
s.ClientPort,
Expand Down Expand Up @@ -273,7 +283,7 @@ func (i *PDInstance) InitConfig(
return err
}

return checkConfig(ctx, e, i.ComponentName(), clusterVersion, i.OS(), i.Arch(), i.ComponentName()+".toml", paths, nil)
return checkConfig(ctx, e, i.ComponentName(), i.ComponentSource(), clusterVersion, i.OS(), i.Arch(), i.ComponentName()+".toml", paths, nil)
}

// setTLSConfig set TLS Config to support enable/disable TLS
Expand Down
10 changes: 10 additions & 0 deletions pkg/cluster/spec/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type PumpSpec struct {
DataDir string `yaml:"data_dir,omitempty"`
LogDir string `yaml:"log_dir,omitempty"`
Offline bool `yaml:"offline,omitempty"`
Source string `yaml:"source,omitempty" validate:"source:editable"`
NumaNode string `yaml:"numa_node,omitempty" validate:"numa_node:editable"`
Config map[string]any `yaml:"config,omitempty" validate:"config:ignore"`
ResourceControl meta.ResourceControl `yaml:"resource_control,omitempty" validate:"resource_control:editable"`
Expand Down Expand Up @@ -111,6 +112,14 @@ func (s *PumpSpec) IgnoreMonitorAgent() bool {
return s.IgnoreExporter
}

// GetSource returns source to download the component
func (s *PumpSpec) GetSource() string {
if s.Source == "" {
return ComponentPump
}
return s.Source
}

// PumpComponent represents Pump component.
type PumpComponent struct{ Topology *Specification }

Expand All @@ -136,6 +145,7 @@ func (c *PumpComponent) Instances() []Instance {
ManageHost: s.ManageHost,
Port: s.Port,
SSHP: s.SSHPort,
Source: s.GetSource(),

Ports: []int{
s.Port,
Expand Down
Loading

0 comments on commit b853058

Please sign in to comment.