Skip to content

Commit

Permalink
Using worker pool
Browse files Browse the repository at this point in the history
  • Loading branch information
linxGnu committed Nov 18, 2019
1 parent 40e1b09 commit c2f7fa8
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 31 deletions.
29 changes: 5 additions & 24 deletions filer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@ import (
"io"
"net/http"
"net/url"

workerpool "github.com/linxGnu/gumble/worker-pool"
)

// Filer client
type Filer struct {
base *url.URL
client *httpClient
workers *workerpool.Pool
base *url.URL
client *httpClient
}

// FilerUploadResult upload result which responsed from filer server. According to https://github.com/chrislusf/seaweedfs/wiki/Filer-Server-API.
Expand All @@ -27,23 +24,7 @@ type FilerUploadResult struct {

// NewFiler new filer with filer server's url
func NewFiler(u string, client *http.Client) (f *Filer, err error) {
base, err := parseURI(u)
if err != nil {
return
}

workers := createWorkerPool()

f = &Filer{
base: base,
client: newHTTPClient(client, workers),
workers: workers,
}

// start underlying workers
f.workers.Start()

return
return newFiler(u, newHTTPClient(client))
}

func newFiler(u string, client *httpClient) (f *Filer, err error) {
Expand All @@ -66,8 +47,8 @@ var dirHeader = map[string]string{

// Close underlying daemons.
func (f *Filer) Close() (err error) {
if f.workers != nil {
f.workers.Stop()
if f.client != nil {
err = f.client.Close()
}
return
}
Expand Down
10 changes: 8 additions & 2 deletions http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,20 @@ type httpClient struct {
workers *workerpool.Pool
}

func newHTTPClient(client *http.Client, workers *workerpool.Pool) *httpClient {
func newHTTPClient(client *http.Client) *httpClient {
c := &httpClient{
client: client,
workers: workers,
workers: createWorkerPool(),
}
c.workers.Start()
return c
}

func (c *httpClient) Close() (err error) {
c.workers.Stop()
return
}

func (c *httpClient) get(base *url.URL, path string, args url.Values, header map[string]string) (body []byte, statusCode int, err error) {
req, err := http.NewRequest(http.MethodGet, encodeURI(*base, path, args), nil)
if err == nil {
Expand Down
14 changes: 9 additions & 5 deletions seaweed.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,10 @@ func NewSeaweed(masterURL string, filers []string, chunkSize int64, client *http
return
}

workers := createWorkerPool()

c = &Seaweed{
master: u,
client: newHTTPClient(client, workers),
client: newHTTPClient(client),
chunkSize: chunkSize,
workers: workers,
}

if len(filers) > 0 {
Expand All @@ -116,21 +113,28 @@ func NewSeaweed(masterURL string, filers []string, chunkSize int64, client *http
var filer *Filer
filer, err = newFiler(filers[i], c.client)
if err != nil {
_ = c.Close()
return
}
c.filers = append(c.filers, filer)
}
}

// start underlying workers
c.workers = createWorkerPool()
c.workers.Start()

return
}

// Close underlying daemons.
func (c *Seaweed) Close() (err error) {
c.workers.Stop()
if c.workers != nil {
c.workers.Stop()
}
if c.client != nil {
err = c.client.Close()
}
return
}

Expand Down

0 comments on commit c2f7fa8

Please sign in to comment.