Skip to content

Commit

Permalink
Profile base to dev (#350)
Browse files Browse the repository at this point in the history
* preview to base (#333)

* Profile base bytesbuffer (#326)

* add WriteString to BytesBuffer

* Encode WriteString

* Profile base object reuse (#327)

* reuse bytesbuffer & stream

* Optimise StringMap Range (#328)

* Profile base object reuse (#332)

* reuse bytesbuffer & stream

* Profile preview to base (#349)

* Profile base add metric (#338)
* Optimise filter addMetric
* Profile base findcluster (#340)
* Profile base findcluster (#337)
* Profile base do acccesslog (#339)
* bugfix: BytesBuffer.Reset (#334)
* Access log (#336)
* Profile base do accesslog (#343)
 Accesslogv2 (#342)
* Profile base preview escape (#345)
* optimise Escape
* Profile base timenow (#344) (#347)
* Profile base do accesslog (#343)
* update: log/bytesbuffer (#341)
* bytesBuffer release
* Accesslogv2 (#346)
* update: log/bytesbuffer
  • Loading branch information
snail007 authored Dec 6, 2023
1 parent 890edb9 commit 3cf5d10
Show file tree
Hide file tree
Showing 23 changed files with 1,027 additions and 252 deletions.
51 changes: 35 additions & 16 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ var (
setAgentLock sync.Mutex
notFoundProviderCount int64 = 0
defaultInitClusterTimeout int64 = 10000 //ms
clusterSlicePool = sync.Pool{
New: func() interface{} {
return make([]serviceMapItem, 0, 5)
},
}
)

type Agent struct {
Expand Down Expand Up @@ -794,32 +799,28 @@ func (a *agentMessageHandler) Call(request motan.Request) (res motan.Response) {
}
return res
}
func (a *agentMessageHandler) matchRule(typ, cond, key string, data []serviceMapItem, f func(u *motan.URL) string) (foundClusters []serviceMapItem, err error) {
func (a *agentMessageHandler) fillMatch(typ, cond, key string, data []serviceMapItem, f func(u *motan.URL) string, match *[]serviceMapItem) error {
if cond == "" {
err = fmt.Errorf("empty %s is not supported", typ)
return
return fmt.Errorf("empty %s is not supported", typ)
}
for _, item := range data {
if f(item.url) == cond {
foundClusters = append(foundClusters, item)
*match = append(*match, item)
}
}
if len(foundClusters) == 0 {
err = fmt.Errorf("cluster not found. cluster:%s", key)
return
if len(*match) == 0 {
return fmt.Errorf("cluster not found. cluster:%s", key)
}
return
return nil
}
func (a *agentMessageHandler) findCluster(request motan.Request) (c *cluster.MotanCluster, key string, err error) {
service := request.GetServiceName()
group := request.GetAttachment(mpro.MGroup)
version := request.GetAttachment(mpro.MVersion)
protocol := request.GetAttachment(mpro.MProxyProtocol)
reqInfo := fmt.Sprintf("request information: {service: %s, group: %s, protocol: %s, version: %s}",
service, group, protocol, version)
serviceItemArrI, exists := a.agent.serviceMap.Load(service)
if !exists {
err = fmt.Errorf("cluster not found. cluster:%s, %s", service, reqInfo)
err = fmt.Errorf("cluster not found. cluster:%s, %s", service, getReqInfo(service, group, protocol, version))
return
}
search := []struct {
Expand All @@ -832,23 +833,31 @@ func (a *agentMessageHandler) findCluster(request motan.Request) (c *cluster.Mot
{"protocol", protocol, func(u *motan.URL) string { return u.Protocol }},
{"version", version, func(u *motan.URL) string { return u.GetParam(motan.VersionKey, "") }},
}
foundClusters := serviceItemArrI.([]serviceMapItem)
clusters := serviceItemArrI.([]serviceMapItem)
matched := clusterSlicePool.Get().([]serviceMapItem)
if cap(matched) < len(clusters) {
matched = make([]serviceMapItem, 0, len(clusters))
}
for i, rule := range search {
if i == 0 {
key = rule.cond
} else {
key += "_" + rule.cond
}
foundClusters, err = a.matchRule(rule.tip, rule.cond, key, foundClusters, rule.condFn)
err = a.fillMatch(rule.tip, rule.cond, key, clusters, rule.condFn, &matched)
if err != nil {
putBackClusterSlice(matched)
return
}
if len(foundClusters) == 1 {
c = foundClusters[0].cluster
if len(matched) == 1 {
c = matched[0].cluster
putBackClusterSlice(matched)
return
}
matched = matched[:0]
}
err = fmt.Errorf("less condition to select cluster, maybe this service belongs to multiple group, protocol, version; cluster: %s, %s", key, reqInfo)
err = fmt.Errorf("less condition to select cluster, maybe this service belongs to multiple group, protocol, version; cluster: %s, %s", key, getReqInfo(service, group, protocol, version))
putBackClusterSlice(matched)
return
}

Expand Down Expand Up @@ -1222,6 +1231,11 @@ func urlExist(url *motan.URL, urls map[string]*motan.URL) bool {
return false
}

func getReqInfo(service, group, protocol, version string) string {
return fmt.Sprintf("request information: {service: %s, group: %s, protocol: %s, version: %s}",
service, group, protocol, version)
}

func (a *Agent) SubscribeService(url *motan.URL) error {
if urlExist(url, a.Context.RefersURLs) {
return fmt.Errorf("url exist, ignore subscribe, url: %s", url.GetIdentity())
Expand Down Expand Up @@ -1251,3 +1265,8 @@ func (a *Agent) UnexportService(url *motan.URL) error {
}
return nil
}

func putBackClusterSlice(s []serviceMapItem) {
s = s[:0]
clusterSlicePool.Put(s)
}
59 changes: 56 additions & 3 deletions core/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@ import (
"encoding/binary"
"errors"
"io"
"math/rand"
"sync"
"time"
)

var (
maxReuseBufSize = 204800
discardRatio = 0.1
bytesBufferPool = sync.Pool{New: func() interface{} {
return new(BytesBuffer)
}}
)

// BytesBuffer is a variable-sized buffer of bytes with Read and Write methods.
Expand All @@ -26,10 +37,16 @@ func NewBytesBuffer(initsize int) *BytesBuffer {

// NewBytesBufferWithOrder create a empty BytesBuffer with initial size and byte order
func NewBytesBufferWithOrder(initsize int, order binary.ByteOrder) *BytesBuffer {
return &BytesBuffer{buf: make([]byte, initsize),
order: order,
temp: make([]byte, 8),
bb := AcquireBytesBuffer()
if bb.buf == nil {
bb.buf = make([]byte, initsize)
}
if bb.temp == nil {
bb.temp = make([]byte, 8)
}
bb.order = order

return bb
}

// CreateBytesBuffer create a BytesBuffer from data bytes
Expand Down Expand Up @@ -78,6 +95,16 @@ func (b *BytesBuffer) WriteByte(c byte) {
b.wpos++
}

// WriteString write a str string append the BytesBuffer, and the wpos will increase len(str)
func (b *BytesBuffer) WriteString(str string) {
l := len(str)
if len(b.buf) < b.wpos+l {
b.grow(l)
}
copy(b.buf[b.wpos:], str)
b.wpos += l
}

// Write write a byte array append the BytesBuffer, and the wpos will increase len(bytes)
func (b *BytesBuffer) Write(bytes []byte) {
l := len(bytes)
Expand Down Expand Up @@ -262,3 +289,29 @@ func (b *BytesBuffer) Remain() int { return b.wpos - b.rpos }
func (b *BytesBuffer) Len() int { return b.wpos - 0 }

func (b *BytesBuffer) Cap() int { return cap(b.buf) }

func hitDiscard() bool {
r := rand.New(rand.NewSource(time.Now().UnixNano())).Intn(100)
if float64(r)/100 < discardRatio {
return true
}
return false
}

func AcquireBytesBuffer() *BytesBuffer {
b := bytesBufferPool.Get()
if b == nil {
return &BytesBuffer{}
}
return b.(*BytesBuffer)
}

func ReleaseBytesBuffer(b *BytesBuffer) {
if b != nil {
//if cap(b.buf) > maxReuseBufSize && hitDiscard() {
// return
//}
b.Reset()
bytesBufferPool.Put(b)
}
}
4 changes: 4 additions & 0 deletions core/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,7 @@ const (
EUnkonwnMsg = 1003
EConvertMsg = 1004
)

const (
DefaultDecodeLength = 100
)
22 changes: 11 additions & 11 deletions core/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ func (m *StringMap) Store(key, value string) {
m.mu.Unlock()
}

func (m *StringMap) Reset() {
//TODO: 这个地方是否应该加锁呢?
m.mu.Lock()
for k := range m.innerMap {
delete(m.innerMap, k)
}
m.mu.Unlock()
}

func (m *StringMap) Delete(key string) {
m.mu.Lock()
delete(m.innerMap, key)
Expand All @@ -48,17 +57,8 @@ func (m *StringMap) LoadOrEmpty(key string) string {
// If f returns false, range stops the iteration
func (m *StringMap) Range(f func(k, v string) bool) {
m.mu.RLock()
keys := make([]string, 0, len(m.innerMap))
for k := range m.innerMap {
keys = append(keys, k)
}
m.mu.RUnlock()

for _, k := range keys {
v, ok := m.Load(k)
if !ok {
continue
}
defer m.mu.RUnlock()
for k, v := range m.innerMap {
if !f(k, v) {
break
}
Expand Down
Loading

0 comments on commit 3cf5d10

Please sign in to comment.