Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

the http connection keep alive fail #1289

Closed
cxwshawn opened this issue Jan 26, 2015 · 3 comments
Closed

the http connection keep alive fail #1289

cxwshawn opened this issue Jan 26, 2015 · 3 comments

Comments

@cxwshawn
Copy link

Hey guys, I want to improve the HttpOutput upload speed, so I modified the code in http_output.go:
// Run drains the output runner's input channel using o.SendProcs primary
// workers plus o.AuxSendProcs auxiliary workers. Each worker encodes a pack
// with or.Encode and hands the encoded bytes to o.request.
//
// It returns an error immediately if the runner has no encoder. Otherwise it
// blocks until every worker has observed the input channel being closed and
// then returns nil.
//
// Auxiliary workers sleep 100ms after each attempted request unless the
// local hour (time.Now().Hour()) is between 1 and 7 inclusive.
func (o *HttpOutput) Run(or pipeline.OutputRunner, h pipeline.PluginHelper) (err error) {
	if or.Encoder() == nil {
		return errors.New("Encoder must be specified.")
	}

	inChan := or.InChan()
	procs := o.SendProcs + o.AuxSendProcs
	// Buffered to procs so that no worker blocks while signalling completion.
	stopCh := make(chan bool, procs)

	// send encodes one pack and delivers it, reporting whether a request was
	// actually attempted. All state is local to the call: the workers run
	// concurrently, so sharing the encode result or error between them (as
	// function-level variables would) is a data race.
	send := func(pack *pipeline.PipelinePack) bool {
		outBytes, e := or.Encode(pack)
		if e != nil {
			pack.Recycle()
			or.LogError(e)
			return false
		}
		if outBytes == nil {
			pack.Recycle()
			return false
		}
		if e, errcode := o.request(or, outBytes); e != nil {
			or.LogError(e)
			if errcode == -1 {
				// An error code of -1 puts the pack back on the router's
				// input channel instead of recycling it.
				// NOTE(review): presumably -1 marks a retryable failure;
				// confirm against o.request.
				h.PipelineConfig().Router().InChan() <- pack
				return true
			}
		}
		pack.Recycle()
		return true
	}

	// worker consumes packs until inChan is closed, then signals stopCh.
	// When throttle is set, it pauses after each attempted request outside
	// hours 1-7.
	worker := func(throttle bool) {
		for pack := range inChan {
			hour := 0
			if throttle {
				// Sampled before sending, so the decision reflects the time
				// the pack was picked up.
				hour = time.Now().Hour()
			}
			attempted := send(pack)
			if throttle && attempted && (hour < 1 || hour > 7) {
				time.Sleep(time.Millisecond * 100)
			}
		}
		stopCh <- true
	}

	for i := 0; i < o.SendProcs; i++ {
		go worker(false)
	}
	for i := 0; i < o.AuxSendProcs; i++ {
		go worker(true)
	}

	// Wait for every worker to finish before returning.
	for i := 0; i < procs; i++ {
		<-stopCh
	}

	// Previous single-goroutine implementation, kept for reference:
	// for pack := range inChan {
	//  // tob, _ := json.Marshal(pack.Message)
	//  // fmt.Println(string(tob))
	//  outBytes, e = or.Encode(pack)
	//  pack.Recycle()
	//  if e != nil {
	//      or.LogError(e)
	//      continue
	//  }
	//  if outBytes == nil {
	//      continue
	//  }
	//  if e = o.request(or, outBytes); e != nil {
	//      or.LogError(e)
	//  }
	// }

	return nil
}

But when I traced the transmission, I found that the local port of the http_output connection changes all the time, so I suspect keep-alive is not working. Is there a problem with this code? I have checked the client.go code, and it says:
// The Client's Transport typically has internal state (cached TCP
// connections), so Clients should be reused instead of created as
// needed. Clients are safe for concurrent use by multiple goroutines.

So I can't find what's wrong with the code. Can anyone help me?

@ghost
Copy link

ghost commented Jan 26, 2015

I'm not sure, but I think the connection isn't reused because o.request() doesn't consume the response body for 200-class responses. Could you try modifying o.request like so, please?

// Line 140, plugins/http/http_output.go.
if resp, err = o.client.Do(req); err != nil {
    // ...
}
// Deferred calls run in last-in-first-out order, so the io.Copy below drains
// the body before Close runs.
// NOTE(review): the elided error branch above must return; otherwise resp may
// be nil here and resp.Body would panic.
defer resp.Body.Close()
defer io.Copy(ioutil.Discard, resp.Body) // Always consume the response body.
if resp.StatusCode >= 400 {
    // ...
}
return

@cxwshawn
Copy link
Author

Hi kitcambridge, thanks for your answer, but I have already tried this and it didn't work.

@cxwshawn
Copy link
Author

All right, I found the solution; for reference, see golang/go#6785.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant