Skip to content

Commit

Permalink
ADD: publish/consume throttler API
Browse files Browse the repository at this point in the history
  • Loading branch information
git-hulk committed May 31, 2020
1 parent cdc3db4 commit 9308de4
Show file tree
Hide file tree
Showing 13 changed files with 733 additions and 19 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ $(PROGRAM):
test:
- cd scripts/redis && docker-compose up -d && cd ../..
@LMSTFY_TEST_CONFIG=`pwd`/scripts/test-conf.toml sh scripts/run-test.sh
@cp coverage.out build/
@ cp coverage.out build/
- cd scripts/redis && docker-compose down && cd ../..

coverage:
Expand Down
3 changes: 2 additions & 1 deletion auth/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"strings"
"sync"

"github.com/go-redis/redis"

"github.com/bitleak/lmstfy/config"
"github.com/bitleak/lmstfy/engine"
"github.com/bitleak/lmstfy/helper"
"github.com/go-redis/redis"
)

const TokenPrefix = "tk"
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/oklog/ulid v0.3.0
github.com/pelletier/go-toml v1.7.0 // indirect
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/common v0.9.1 // indirect
github.com/prometheus/common v0.9.1
github.com/prometheus/procfs v0.0.11 // indirect
github.com/sirupsen/logrus v1.4.2
github.com/spf13/afero v1.2.2 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ github.com/BurntSushi/toml v0.3.0/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a h1:BtpsbiV638WQZwhA98cEZw2BsbnQJrbd0BI7tsy0W1c=
github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
Expand Down Expand Up @@ -184,6 +186,7 @@ golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/airbrake/gobrake.v2 v2.0.9 h1:7z2uVWwn7oVeeugY1DtlPAy5H+KYgB1KeKTnqjNatLo=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
Expand Down
128 changes: 120 additions & 8 deletions server/handlers/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@ import (
"math"
"net/http"
"net/http/pprof"
"strconv"
"strings"

"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"

"github.com/bitleak/lmstfy/auth"
"github.com/bitleak/lmstfy/engine"
"github.com/bitleak/lmstfy/server/middleware"
"github.com/bitleak/lmstfy/throttler"
"github.com/bitleak/lmstfy/uuid"
"github.com/bitleak/lmstfy/version"
"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
)

// GET /metrics
Expand Down Expand Up @@ -100,25 +103,134 @@ func NewToken(c *gin.Context) {

// DELETE /token/:namespace/:token
func DeleteToken(c *gin.Context) {
logger := GetHTTPLogger(c)
tm := auth.GetTokenManager()
if err := tm.Delete(c.Query("pool"), c.Param("namespace"), c.Param("token")); err != nil {
pool, token := parseToken(c.Param("token"))
namespace := c.Param("namespace")
if err := tm.Delete(pool, c.Param("namespace"), token); err != nil {
if err == auth.ErrPoolNotExist {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
} else {
logger := GetHTTPLogger(c)
logger.WithFields(logrus.Fields{
"pool": c.Query("pool"),
"namespace": c.Param("namespace"),
"token": c.Param("token"),
"pool": pool,
"namespace": namespace,
"token": token,
"err": err,
}).Error("Failed to delete the token")
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
}
return
}
if err := throttler.GetThrottler().Delete(pool, namespace, token); err != nil {
logger.WithFields(logrus.Fields{
"pool": c.Query("pool"),
"namespace": c.Param("namespace"),
"token": c.Param("token"),
"err": err,
}).Error("Failed to delete the token's limiter")
}
c.Status(http.StatusNoContent)
}

// GET /limiters
func ListLimiters(c *gin.Context) {
forceUpdate, _ := strconv.ParseBool(c.Query("force_update"))
c.JSON(http.StatusOK, throttler.GetThrottler().GetAll(forceUpdate))
}

// POST /token/:namespace/:token/limit
func AddLimiter(c *gin.Context) {
logger := GetHTTPLogger(c)
var limiter throttler.Limiter
if err := c.BindJSON(&limiter); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err})
return
}
if limiter.Read == 0 && limiter.Write == 0 {
c.JSON(http.StatusBadRequest, gin.H{"limiter": limiter})
return
}
if limiter.Interval == 0 {
c.JSON(http.StatusBadRequest, gin.H{"error": "interval should be > 0"})
return
}
pool, token := parseToken(c.Param("token"))
namespace := c.Param("namespace")
err := throttler.GetThrottler().Add(pool, namespace, token, &limiter)
if err != nil {
logger.WithFields(logrus.Fields{
"token": c.Param("token"),
"limiter": limiter,
"err": err,
}).Error("Failed to add the token's limiter")
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}

c.JSON(http.StatusCreated, gin.H{"limiter": limiter})
}

// DELETE /token/:namespace/:token/limit
func DeleteLimiter(c *gin.Context) {
logger := GetHTTPLogger(c)
pool, token := parseToken(c.Param("token"))
namespace := c.Param("namespace")
err := throttler.GetThrottler().Delete(pool, namespace, token)
if err != nil {
logger.WithFields(logrus.Fields{
"token": c.Param("token"),
"err": err,
}).Error("Failed to delete the token's limiter")
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"msg": "success"})
}

// GET /token/:namespace/:token/limit
func GetLimiter(c *gin.Context) {
pool, token := parseToken(c.Param("token"))
namespace := c.Param("namespace")
limiter := throttler.GetThrottler().Get(pool, namespace, token)
if limiter == nil {
c.JSON(http.StatusNotFound, nil)
} else {
c.JSON(http.StatusOK, limiter)
}
}

// PUT /token/:namespace/:token/limit
func SetLimiter(c *gin.Context) {
logger := GetHTTPLogger(c)
var limiter throttler.Limiter
if err := c.BindJSON(&limiter); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err})
return
}
if limiter.Read == 0 && limiter.Write == 0 {
c.JSON(http.StatusBadRequest, gin.H{"limiter": limiter})
return
}
if limiter.Interval == 0 {
c.JSON(http.StatusBadRequest, gin.H{"error": "interval should be > 0"})
return
}

pool, token := parseToken(c.Param("token"))
namespace := c.Param("namespace")
err := throttler.GetThrottler().Set(pool, namespace, token, &limiter)
if err != nil {
logger.WithFields(logrus.Fields{
"token": c.Param("token"),
"limiter": limiter,
"err": err,
}).Error("Failed to set the token's limiter")
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"limiter": limiter})
}

// GET /version
func Version(c *gin.Context) {
c.IndentedJSON(http.StatusOK, gin.H{
Expand Down
109 changes: 109 additions & 0 deletions server/handlers/admin_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package handlers_test

import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/url"
"testing"

"github.com/bitleak/lmstfy/auth"
"github.com/bitleak/lmstfy/server/handlers"
"github.com/bitleak/lmstfy/throttler"
"github.com/bitleak/lmstfy/uuid"
)

Expand Down Expand Up @@ -65,3 +68,109 @@ func TestDeleteToken(t *testing.T) {
t.Fatal("Expected token to be deleted")
}
}

func getLimiter(namespace, token string) (*throttler.Limiter, error) {
// force update limiters
throttler.GetThrottler().GetAll(true)
targetUrl := fmt.Sprintf("http://localhost/token/%s/%s/limit", namespace, token)
req, err := http.NewRequest("GET", targetUrl, nil)
if err != nil {
return nil, fmt.Errorf("failed to create the request, err: %s", err.Error())
}
c, e, resp := ginTest(req)
e.GET("/token/:namespace/:token/limit", handlers.GetLimiter)
e.HandleContext(c)
if resp.Code != http.StatusOK && resp.Code != http.StatusNotFound {
return nil, fmt.Errorf("http code expected %d, but got %d", http.StatusOK, resp.Code)
}
if resp.Code == http.StatusOK {
var limiter throttler.Limiter
json.Unmarshal(resp.Body.Bytes(), &limiter)
return &limiter, nil
}
return nil, nil
}

func addTokenLimit(namespace, token, limitStr string) error {
targetUrl := fmt.Sprintf("http://localhost/token/%s/%s/limit", namespace, token)
req, err := http.NewRequest("POST", targetUrl, bytes.NewReader([]byte(limitStr)))
if err != nil {
return fmt.Errorf("failed to create the request, err: %s", err.Error())
}
c, e, resp := ginTest(req)
e.POST("/token/:namespace/:token/limit", handlers.AddLimiter)
e.HandleContext(c)
if resp.Code != http.StatusCreated {
return fmt.Errorf("http code expected %d, but got %d", http.StatusOK, resp.Code)
}
return nil
}

func TestAddTokenLimiter(t *testing.T) {
limitStr := "{\"read\": 100, \"write\": 100, \"interval\":100}"
namespace := "ns-token"
tk := auth.GetTokenManager()
token, _ := tk.New("", "ns-token", uuid.GenUniqueID(), "token limiter")
if err := addTokenLimit(namespace, token, limitStr); err != nil {
t.Fatal(err)
}
limiter, err := getLimiter(namespace, token)
if err != nil {
t.Fatal(err.Error())
}
if limiter.Read != 100 && limiter.Write != 100 && limiter.Interval != 100 {
t.Fatalf("Invaild limiter's value, %v", limiter)
}
if err := addTokenLimit(namespace, token, limitStr); err == nil {
t.Fatal("duplicate token error was expected")
}
}

func TestSetTokenLimiter(t *testing.T) {
tk := auth.GetTokenManager()
token, _ := tk.New("", "ns-token", uuid.GenUniqueID(), "token limiter")
targetUrl := fmt.Sprintf("http://localhost/token/ns-token/%s/limit", token)
limitStr := "{\"read\": 100, \"write\": 100, \"interval\":100}"
req, err := http.NewRequest("PUT", targetUrl, bytes.NewReader([]byte(limitStr)))
if err != nil {
t.Fatalf("Failed to create the request, err: %s", err.Error())
}
c, e, resp := ginTest(req)
e.PUT("/token/:namespace/:token/limit", handlers.SetLimiter)
e.HandleContext(c)
if resp.Code != http.StatusOK {
t.Logf(resp.Body.String())
t.Fatalf("Failed to add the limit to the token, err: %v", err)
}
}

func TestDeleteTokenLimiter(t *testing.T) {
limitStr := "{\"read\": 100, \"write\": 100, \"interval\":100}"
namespace := "ns-token"
tk := auth.GetTokenManager()
token, _ := tk.New("", "ns-token", uuid.GenUniqueID(), "token limiter")
if err := addTokenLimit(namespace, token, limitStr); err != nil {
t.Fatal(err)
}

targetUrl := fmt.Sprintf("http://localhost/token/%s/%s/limit", namespace, token)
req, err := http.NewRequest("DELETE", targetUrl, nil)
if err != nil {
t.Fatalf("Failed to create the request, err: %s", err.Error())
}
c, e, resp := ginTest(req)
e.DELETE("/token/:namespace/:token/limit", handlers.DeleteLimiter)
e.HandleContext(c)
if resp.Code != http.StatusOK {
t.Logf(resp.Body.String())
t.Fatalf("Failed to add the limit to the token, err: %v", err)
}

limiter, err := getLimiter("ns-token", token)
if err != nil {
t.Fatal(err.Error())
}
if limiter != nil {
t.Fatal("the token's limiter was expected to be deleted")
}
}
3 changes: 2 additions & 1 deletion server/handlers/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"

"github.com/bitleak/lmstfy/auth"
"github.com/bitleak/lmstfy/config"
"github.com/bitleak/lmstfy/engine"
"github.com/gin-gonic/gin"
)
Expand All @@ -26,7 +27,7 @@ func parseToken(rawToken string) (pool, token string) {
if len(splits) == 2 {
return splits[0], splits[1]
}
return "default", rawToken
return config.DefaultPoolName, rawToken
}

func SetupQueueEngine(c *gin.Context) {
Expand Down
5 changes: 5 additions & 0 deletions server/handlers/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
redis_engine "github.com/bitleak/lmstfy/engine/redis"
"github.com/bitleak/lmstfy/helper"
"github.com/bitleak/lmstfy/server/handlers"
"github.com/bitleak/lmstfy/throttler"

"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -55,6 +57,9 @@ func setup() {
if err := auth.Setup(CONF); err != nil {
panic(fmt.Sprintf("Failed to setup auth module: %s", err))
}
if err := throttler.Setup(&CONF.AdminRedis, logger); err != nil {
panic(fmt.Sprintf("Failed to setup throttler module: %s", err))
}
handlers.SetupParamDefaults(CONF)
handlers.Setup(logger)
for _, poolConf := range CONF.Pool {
Expand Down

0 comments on commit 9308de4

Please sign in to comment.