Skip to content

Commit

Permalink
Add get pagination (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
jabielecki committed Feb 27, 2024
1 parent fd255f7 commit 4bb3e0a
Showing 1 changed file with 132 additions and 10 deletions.
142 changes: 132 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
)

const DefaultMaxRetries int = 3
Expand All @@ -31,8 +32,7 @@ var SynchronousApiEndpoints = [...]string{
}

// Client is an HTTP Catalyst Center client.
// Use cc.NewClient to initiate a client.
// This will ensure proper cookie handling and processing of modifiers.
// Always use NewClient to construct it, otherwise requests will panic.
type Client struct {
// HttpClient is the *http.Client used for API requests.
HttpClient *http.Client
Expand All @@ -54,8 +54,10 @@ type Client struct {
BackoffDelayFactor float64
// Maximum async operations wait time
DefaultMaxAsyncWaitTime int
// Authentication mutex
// Authentication mutex ensures that API login is non-concurrent
AuthenticationMutex *sync.Mutex
readers chan int
writers chan int
}

// NewClient creates a new Catalyst Center HTTP client.
Expand Down Expand Up @@ -85,8 +87,30 @@ func NewClient(url, usr, pwd string, mods ...func(*Client)) (Client, error) {
BackoffDelayFactor: DefaultBackoffDelayFactor,
DefaultMaxAsyncWaitTime: DefaultDefaultMaxAsyncWaitTime,
AuthenticationMutex: &sync.Mutex{},
readers: make(chan int),
writers: make(chan int),
}

go func() {
var readerNum, writerNum int

for {
select {
case delta := <-client.readers:
readerNum += delta
for readerNum != 0 {
readerNum += <-client.readers
}

case delta := <-client.writers:
writerNum += delta
for writerNum != 0 {
writerNum += <-client.writers
}
}
}
}()

for _, mod := range mods {
mod(&client)
}
Expand Down Expand Up @@ -177,6 +201,15 @@ func (client *Client) Do(req Req) (Res, error) {

var res Res

defer func() {
log.Printf("[DEBUG] Exit from Do method: %s, %s", req.HttpReq.Method, req.HttpReq.URL)
}()

if req.HttpReq.Method == "DELETE" || req.HttpReq.Method == "POST" || req.HttpReq.Method == "PUT" {
client.writers <- +1
defer func() { client.writers <- -1 }()
}

for attempts := 0; ; attempts++ {
req.HttpReq.Body = io.NopCloser(bytes.NewBuffer(body))
if req.LogPayload {
Expand All @@ -189,7 +222,6 @@ func (client *Client) Do(req Req) (Res, error) {
if err != nil {
if ok := client.Backoff(attempts); !ok {
log.Printf("[ERROR] HTTP Connection error occured: %+v", err)
log.Printf("[DEBUG] Exit from Do method")
return Res{}, err
} else {
log.Printf("[ERROR] HTTP Connection failed: %s, retries: %v", err, attempts)
Expand All @@ -202,7 +234,6 @@ func (client *Client) Do(req Req) (Res, error) {
if err != nil {
if ok := client.Backoff(attempts); !ok {
log.Printf("[ERROR] Cannot decode response body: %+v", err)
log.Printf("[DEBUG] Exit from Do method")
return Res{}, err
} else {
log.Printf("[ERROR] Cannot decode response body: %s, retries: %v", err, attempts)
Expand All @@ -215,12 +246,10 @@ func (client *Client) Do(req Req) (Res, error) {
}

if httpRes.StatusCode >= 200 && httpRes.StatusCode <= 299 {
log.Printf("[DEBUG] Exit from Do method")
break
} else {
if ok := client.Backoff(attempts); !ok {
log.Printf("[ERROR] HTTP Request failed: StatusCode %v", httpRes.StatusCode)
log.Printf("[DEBUG] Exit from Do method")
return res, fmt.Errorf("HTTP Request failed: StatusCode %v", httpRes.StatusCode)
} else if httpRes.StatusCode == 429 {
retryAfter := httpRes.Header.Get("Retry-After")
Expand All @@ -240,7 +269,6 @@ func (client *Client) Do(req Req) (Res, error) {
continue
} else {
log.Printf("[ERROR] HTTP Request failed: StatusCode %v", httpRes.StatusCode)
log.Printf("[DEBUG] Exit from Do method")
return res, fmt.Errorf("HTTP Request failed: StatusCode %v", httpRes.StatusCode)
}
}
Expand Down Expand Up @@ -319,17 +347,111 @@ func (client *Client) WaitTask(req *Req, res *Res) (Res, error) {
return *res, nil
}

// Get makes a GET request and returns a GJSON result.
// Results will be the raw data structure as returned by Catalyst Center
var maxItems = 500

// Get makes a GET request and returns a gjson result.
// Before GET is issued, the func ensures that no writing (DELETE/POST/PUT) would run concurrently with it
// on the entire client (on any path).
//
// Results will be the raw data structure as returned by Catalyst Center, except when it contains an array named
// "response" with 500 items. In that case the func continues with more GET requests until it can return a concatenation
// of all the retrieved items from all the pages.
//
// With multiple GETs, the concurrency protection is uninterrupted from the first page until the last page.
// Protection from concurrent POST or DELETE helps against boundary items being shifted between the pages.
// Protection from concurrent PUT helps against items moving between pages, when sort becomes unstable due to
// modification of items.
// Unfortunately, the protection does not cover any requests from other clients/processes/systems.
func (client *Client) Get(path string, mods ...func(*Req)) (Res, error) {
client.readers <- +1
defer func() { client.readers <- -1 }()

offset := 1
gather := gatherer{}
gather.WriteByte('[')

for {
raw, err := client.get(pathWithOffset(path, offset), mods...)
if err != nil {
return raw, err
}

response := raw.Get("response")
if !response.Exists() {
return raw, err
}

if !response.IsArray() {
if offset != 1 {
return gjson.Parse("null"), fmt.Errorf("expected `response` to be an array, but got: %s", response.Type)
}
return raw, err
}

items := response.Array()

if len(items) != maxItems {
if offset == 1 {
return raw, err
} else {
gather.Grow(len(raw.Raw) - 1) // hot path optimization
gather.GatherJSON(items, ',')
gather.WriteByte(']')

s, err := sjson.SetRawBytes([]byte(raw.Raw), "response", gather.Bytes())
if err != nil {
return gjson.Parse("null"), err
}

log.Printf("[DEBUG] All GET pages combined: %s", s)

return gjson.ParseBytes(s), nil
}
}

gather.GatherJSON(items, ',')
offset += len(items)
}
}

// get is like Get but without pagination.
func (client *Client) get(path string, mods ...func(*Req)) (Res, error) {
req := client.NewReq("GET", path, nil, mods...)
err := client.Authenticate()
if err != nil {
return Res{}, err
}

return client.Do(req)
}

func pathWithOffset(path string, offset int) string {
if offset <= 1 {
return path
}

sep := "?"
if strings.Contains(path, sep) {
sep = "&"
}

return fmt.Sprintf("%s%soffset=%d", path, sep, offset)
}

type gatherer struct {
bytes.Buffer
}

// GatherJSON appends additional JSON bytes. It puts a separator character between items.
func (g *gatherer) GatherJSON(items []gjson.Result, separator byte) {
for _, item := range items {
if len(g.Bytes()) > 1 {
_ = g.WriteByte(separator)
}
_, _ = g.Write([]byte(item.Raw))
}
}

// Delete makes a DELETE request.
func (client *Client) Delete(path string, mods ...func(*Req)) (Res, error) {
req := client.NewReq("DELETE", path, nil, mods...)
Expand Down

0 comments on commit 4bb3e0a

Please sign in to comment.