Skip to content

Commit

Permalink
Merge 8e3b4a3 into 8702c56
Browse files Browse the repository at this point in the history
  • Loading branch information
zouyx committed Jul 20, 2020
2 parents 8702c56 + 8e3b4a3 commit 66ce150
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 9 deletions.
4 changes: 4 additions & 0 deletions component/notify/change_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func (c *CustomChangeListener) OnChange(changeEvent *storage.ChangeEvent) {
Assert(c.t, storage.ADDED, Equal(changeEvent.Changes["key2"].ChangeType))
}

func (c *CustomChangeListener) OnNewestChange(configuration map[string]interface{}) {

}

func TestListenChangeEvent(t *testing.T) {
go buildNotifyResult(t)
group := sync.WaitGroup{}
Expand Down
17 changes: 16 additions & 1 deletion storage/change_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ func init() {
type ChangeListener interface {
//OnChange 增加变更监控
OnChange(event *ChangeEvent)

//OnNewestChange 监控最新变更
OnNewestChange(configuration map[string]interface{})
}

//config change type
Expand Down Expand Up @@ -67,14 +70,26 @@ func GetChangeListeners() *list.List {

//push config change event
func pushChangeEvent(event *ChangeEvent) {
pushChange(func(listener ChangeListener) {
go listener.OnChange(event)
})
}

func pushNewestChanges(configuration map[string]interface{}) {
pushChange(func(listener ChangeListener) {
go listener.OnNewestChange(configuration)
})
}

func pushChange(f func(ChangeListener)) {
// if channel is null ,mean no listener,don't need to push msg
if changeListeners == nil || changeListeners.Len() == 0 {
return
}

for i := changeListeners.Front(); i != nil; i = i.Next() {
listener := i.Value.(ChangeListener)
go listener.OnChange(event)
f(listener)
}
}

Expand Down
4 changes: 4 additions & 0 deletions storage/change_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ func (t *CustomChangeListener) OnChange(event *ChangeEvent) {
t.w.Done()
}

func (t *CustomChangeListener) OnNewestChange(configuration map[string]interface{}) {

}

func TestAddChangeListener(t *testing.T) {

AddChangeListener(nil)
Expand Down
18 changes: 13 additions & 5 deletions storage/event_dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

const (
fmtInvalidKey = "invalid key format for key %s"
fmtInvalidKey = "invalid key format for key %s"
)

var (
Expand Down Expand Up @@ -45,7 +45,9 @@ func UseEventDispatch() {
}

// RegisterListener 为某些key注释Listener
func RegisterListener(listener Listener, keys ...string) error {return eventDispatch.RegisterListener(listener, keys...)}
func RegisterListener(listener Listener, keys ...string) error {
return eventDispatch.RegisterListener(listener, keys...)
}

// RegisterListener 是为某些key注释Listener的方法
func (d *Dispatcher) RegisterListener(listenerObject Listener, keys ...string) error {
Expand Down Expand Up @@ -86,7 +88,9 @@ func invalidKey(key string) bool {
}

// UnRegisterListener 为某些key注释Listener
func UnRegisterListener(listenerObj Listener, keys ...string) error {return eventDispatch.UnRegisterListener(listenerObj, keys...)}
func UnRegisterListener(listenerObj Listener, keys ...string) error {
return eventDispatch.UnRegisterListener(listenerObj, keys...)
}

// UnRegisterListener 用于为某些key注释Listener
func (d *Dispatcher) UnRegisterListener(listenerObj Listener, keys ...string) error {
Expand Down Expand Up @@ -116,7 +120,7 @@ func (d *Dispatcher) UnRegisterListener(listenerObj Listener, keys ...string) er
}

//OnChange 实现Apollo的ChangeEvent处理
func (d *Dispatcher) OnChange(changeEvent *ChangeEvent){
func (d *Dispatcher) OnChange(changeEvent *ChangeEvent) {
if changeEvent == nil {
return
}
Expand All @@ -126,6 +130,10 @@ func (d *Dispatcher) OnChange(changeEvent *ChangeEvent){
}
}

func (d *Dispatcher) OnNewestChange(configuration map[string]interface{}) {

}

func (d *Dispatcher) dispatchEvent(eventKey string, event *ConfigChange) {
for regKey, listenerList := range d.listeners {
matched, err := regexp.MatchString(regKey, eventKey)
Expand All @@ -145,7 +153,7 @@ func (d *Dispatcher) dispatchEvent(eventKey string, event *ConfigChange) {
func convertToEvent(key string, event *ConfigChange) *Event {
e := &Event{
EventType: event.ChangeType,
Key: key,
Key: key,
}
switch event.ChangeType {
case ADDED:
Expand Down
10 changes: 7 additions & 3 deletions storage/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,16 @@ func UpdateApolloConfig(apolloConfig *env.ApolloConfig, isBackupConfig bool) {
log.Error("apolloConfig is null,can't update!")
return
}

//update apollo connection config
env.SetCurrentApolloConfig(apolloConfig.NamespaceName, &apolloConfig.ApolloConnConfig)

//get change list
changeList := UpdateApolloConfigCache(apolloConfig.Configurations, configCacheExpireTime, apolloConfig.NamespaceName)

//push all newest changes
pushNewestChanges(apolloConfig.Configurations)

if len(changeList) > 0 {
//create config change event base on change list
event := createConfigChangeEvent(changeList, apolloConfig.NamespaceName)
Expand All @@ -204,9 +211,6 @@ func UpdateApolloConfig(apolloConfig *env.ApolloConfig, isBackupConfig bool) {
pushChangeEvent(event)
}

//update apollo connection config
env.SetCurrentApolloConfig(apolloConfig.NamespaceName, &apolloConfig.ApolloConnConfig)

if isBackupConfig {
//write config file async
go extension.GetFileHandler().WriteConfigFile(apolloConfig, env.GetPlainAppConfig().GetBackupConfigPath())
Expand Down

0 comments on commit 66ce150

Please sign in to comment.