Skip to content
This repository has been archived by the owner on Oct 27, 2020. It is now read-only.

Commit

Permalink
initial refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
GRECO, FRANK committed Aug 15, 2017
1 parent ae3367c commit b96d214
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 82 deletions.
23 changes: 17 additions & 6 deletions server/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
package server

import (
"bytes"
"fmt"
"net"
"os"
"strings"
"time"

"github.com/Sirupsen/logrus"
Expand Down Expand Up @@ -74,10 +74,9 @@ func StartUDPServer() (e error) {
if err != nil {
return err
}

trafficData := strings.Split(string(buf[0:n]), ",")
spec.TrafficStore.AddTraffic(trafficData[0], trafficData[1], trafficData[2], time.Now())

if err := spec.TrafficStore.Set(string(buf[0:n])); err != nil {
logrus.Errorf("could not add traffic point to store: %s", err.Error())
}
}

}
Expand All @@ -88,7 +87,9 @@ func Emit(binding spec.APIKeyBinding, keyName string, currTime time.Time) {
for _, addr := range spec.KanaliEndpoints.Subsets[0].Addresses {

if os.Getenv("POD_IP") == addr.IP {
spec.TrafficStore.AddTraffic(binding.ObjectMeta.Namespace, binding.Spec.APIProxyName, keyName, currTime)
if err := spec.TrafficStore.Set(encodeKanaliGram(binding.ObjectMeta.Namespace, binding.Spec.APIProxyName, keyName, ",")); err != nil {
logrus.Errorf("could not add traffic point to store: %s", err.Error())
}
continue
}

Expand Down Expand Up @@ -121,3 +122,13 @@ func Emit(binding spec.APIKeyBinding, keyName string, currTime time.Time) {
}

}

func encodeKanaliGram(nSpace, pName, keyName, delimiter string) string {
var buffer bytes.Buffer
buffer.WriteString(nSpace)
buffer.WriteString(delimiter)
buffer.WriteString(pName)
buffer.WriteString(delimiter)
buffer.WriteString(keyName)
return buffer.String()
}
177 changes: 101 additions & 76 deletions spec/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,174 +21,199 @@
package spec

import (
"errors"
"fmt"
"strings"
"sync"
"time"
)

// TrafficFactory is factory that implements a data structure for Kanali traffic
type TrafficFactory map[string]map[string]map[string][]time.Time
type trafficByAPIKey map[string][]time.Time
type trafficByAPIProxy map[string]trafficByAPIKey
type trafficByNamespace map[string]trafficByAPIProxy

// TrafficStore holds all Kanali traffic. It should not be mutated directly!
var TrafficStore TrafficFactory
// TrafficFactory is factory that implements a concurrency safe store for Kanali traffic
type TrafficFactory struct {
mutex sync.RWMutex
trafficMap trafficByNamespace
}

// TrafficStore holds all API traffic that Kanali has discovered
// in a cluster. It should not be mutated directly!
var TrafficStore *TrafficFactory

func init() {
TrafficStore = map[string]map[string]map[string][]time.Time{}
TrafficStore = &TrafficFactory{sync.RWMutex{}, make(trafficByNamespace)}
}

// Clear will remove all entries from the traffic store
func (store TrafficFactory) Clear() {
for k := range store {
delete(store, k)
func (s *TrafficFactory) Clear() {
s.mutex.Lock()
defer s.mutex.Unlock()
for k := range s.trafficMap {
delete(s.trafficMap, k)
}
}

// AddTraffic will add a traffic point to the traffic store
func (store TrafficFactory) AddTraffic(namespace, proxyName, keyName string, currTime time.Time) {

if _, ok := store[namespace]; !ok {
store[namespace] = map[string]map[string][]time.Time{
proxyName: {
keyName: {currTime},
},
}
return
// Set takes a traffic point and either adds it to the store
func (s *TrafficFactory) Set(obj interface{}) error {
s.mutex.Lock()
defer s.mutex.Unlock()
kgram, ok := obj.(string)
if !ok {
return errors.New("parameter not of type string")
}

if _, ok := store[namespace][proxyName]; !ok {
store[namespace][proxyName] = map[string][]time.Time{
keyName: {currTime},
}
return
nSpace, pName, keyName, err := decodeKanaliGram(kgram, ",")
if err != nil {
return err
}

if _, ok := store[namespace][proxyName][keyName]; !ok {
store[namespace][proxyName][keyName] = []time.Time{currTime}
return
if _, ok := s.trafficMap[nSpace]; !ok {
s.trafficMap[nSpace] = make(trafficByAPIProxy)
}
if _, ok := s.trafficMap[nSpace][pName]; !ok {
s.trafficMap[nSpace][pName] = make(trafficByAPIKey)
}
if _, ok := s.trafficMap[nSpace][pName][keyName]; !ok {
s.trafficMap[nSpace][pName][keyName] = make([]time.Time, 1)
}
s.trafficMap[nSpace][pName][keyName] = append(s.trafficMap[nSpace][pName][keyName], time.Now())
return nil
}

store[namespace][proxyName][keyName] = append(store[namespace][proxyName][keyName], currTime)

// IsEmpty reports whether the traffic store is empty
func (s *TrafficFactory) IsEmpty() bool {
s.mutex.RLock()
defer s.mutex.RUnlock()
return len(s.trafficMap) == 0
}

// IsQuotaViolated will see whether a quota limit has been reached
func (store TrafficFactory) IsQuotaViolated(binding APIKeyBinding, keyName string) bool {

func (s *TrafficFactory) IsQuotaViolated(binding APIKeyBinding, keyName string) bool {
s.mutex.RLock()
defer s.mutex.RUnlock()
for _, key := range binding.Spec.Keys {

if key.Name != keyName {
continue
}

if key.Quota == 0 {
return false
}

if !store.hasTraffic(binding, keyName) {
result, err := s.Contains(binding, keyName)
if err != nil || !result {
return false
}

return len(store[binding.ObjectMeta.Namespace][binding.Spec.APIProxyName][keyName]) >= key.Quota

return len(s.trafficMap[binding.ObjectMeta.Namespace][binding.Spec.APIProxyName][keyName]) >= key.Quota
}

return true

}

// IsRateLimitViolated wee see whether a rate limit has been reached
func (store TrafficFactory) IsRateLimitViolated(binding APIKeyBinding, keyName string, currTime time.Time) bool {

func (s *TrafficFactory) IsRateLimitViolated(binding APIKeyBinding, keyName string, currTime time.Time) bool {
s.mutex.RLock()
defer s.mutex.RUnlock()
for _, key := range binding.Spec.Keys {

if key.Name != keyName {
continue
}

if key.Rate == nil {
return false
}

if key.Rate.Amount == 0 {
return false
}

if !store.hasTraffic(binding, keyName) {
result, err := s.Contains(binding, keyName)
if err != nil || !result {
return false
}

return getTrafficVolume(store[binding.ObjectMeta.Namespace][binding.Spec.APIProxyName][keyName], key.Rate.Unit, currTime, 0, len(store[binding.ObjectMeta.Namespace][binding.Spec.APIProxyName][keyName])) >= key.Rate.Amount

return getTrafficVolume(s.trafficMap[binding.ObjectMeta.Namespace][binding.Spec.APIProxyName][keyName], key.Rate.Unit, currTime, 0, len(s.trafficMap[binding.ObjectMeta.Namespace][binding.Spec.APIProxyName][keyName])) >= key.Rate.Amount
}

return true

}

func (store TrafficFactory) hasTraffic(binding APIKeyBinding, keyName string) bool {

if _, ok := store[binding.ObjectMeta.Namespace]; !ok {
return false
// Contains reports whether the traffic store has any traffic for a given proxy/name combination
func (s *TrafficFactory) Contains(params ...interface{}) (bool, error) {
s.mutex.RLock()
defer s.mutex.RUnlock()
if len(params) != 2 {
return false, errors.New("expecting two parameters")
}

if _, ok := store[binding.ObjectMeta.Namespace][binding.Spec.APIProxyName]; !ok {
return false
binding, ok := params[0].(APIKeyBinding)
if !ok {
return false, errors.New("expected the first parameter to be of type spec.APIKeyBinding")
}

if _, ok := store[binding.ObjectMeta.Namespace][binding.Spec.APIProxyName][keyName]; !ok {
return false
keyName, ok := params[1].(string)
if !ok {
return false, errors.New("expected the first parameter to be of type string")
}
if _, ok := s.trafficMap[binding.ObjectMeta.Namespace]; !ok {
return false, nil
}
if _, ok := s.trafficMap[binding.ObjectMeta.Namespace][binding.Spec.APIProxyName]; !ok {
return false, nil
}
if _, ok := s.trafficMap[binding.ObjectMeta.Namespace][binding.Spec.APIProxyName][keyName]; !ok {
return false, nil
}
return true, nil
}

return true
// Delete removes all traffic for a given namespace, proxy, and key combination
// TODO
func (s *TrafficFactory) Delete(obj interface{}) (interface{}, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
return nil, nil
}

// Get retrieves a set of traffic points for a unique namespace/proxy/key combination
// TODO
func (s *TrafficFactory) Get(params ...interface{}) (interface{}, error) {
s.mutex.RLock()
defer s.mutex.RUnlock()
return nil, nil
}

func getTrafficVolume(arr []time.Time, unit string, currTime time.Time, low, high int) int {

if arr == nil {
return 0
}
if high <= low {
return len(arr[low:])
}

mid := (low + high) / 2
tMinusOne, err := timeMinusOneUnit(currTime, unit)
if err != nil {
return len(arr)
}

if compareTime(tMinusOne, arr[mid]) < 0 {
return getTrafficVolume(arr, unit, currTime, 0, mid)
}

return getTrafficVolume(arr, unit, currTime, mid+1, high)

}

func timeMinusOneUnit(t time.Time, unit string) (time.Time, error) {

var newTime time.Time

d, err := time.ParseDuration(fmt.Sprintf("-1%s", strings.ToLower(string(unit[0]))))
if err != nil {
return newTime, err
}

return t.Add(d), nil

}

func compareTime(t1, t2 time.Time) int {

if t1.Equal(t2) {
return 0
}

if t1.Before(t2) {
return -1
}

return 1
}

func decodeKanaliGram(gram, delimiter string) (string, string, string, error) {
arr := strings.Split(gram, delimiter)
if len(arr) != 3 {
return "", "", "", errors.New("kgram must have 3")
}
return arr[0], arr[1], arr[2], nil
}

0 comments on commit b96d214

Please sign in to comment.