consumer: deadlock after disconnection #131

Closed
zhangdaoling opened this issue Apr 7, 2015 · 2 comments

@zhangdaoling

I use go-nsq to consume messages. HandleMessage returns nil on success and an error when processing fails. I am not sure where the problem is; please help.

These are the steps to reproduce:
1: Set MaxAttempts = 2 and call ConnectToNSQLookupds with 127.0.0.2:4161 and 127.0.0.1:4161. 127.0.0.1 runs the real nsqlookupd; nothing is listening on 127.0.0.2.
2: Produce 1 message; HandleMessage returns nil and everything is OK.
3: Produce 1 message; HandleMessage returns an error and everything is OK.
4: After the message has exhausted MaxAttempts, kill nsqlookupd and nsqd, then restart nsqlookupd and nsqd.
5: Produce 1 message. The consumer never receives it, although the admin web UI shows that the message is stored in nsqd.
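Step 1 lists two nsqlookupd addresses, and from 18:03:25 onward the log shows the consumer querying them alternately, one per poll. A minimal sketch of that round-robin choice; the `lookupdRing` type is hypothetical and only models the alternation visible in the log, not go-nsq's internals.

```go
package main

import "fmt"

// lookupdRing is a hypothetical helper that hands out nsqlookupd addresses
// in round-robin order, one per poll. It assumes at least one address.
type lookupdRing struct {
	addrs []string
	next  int
}

// Next returns the address to query on this poll and advances the index,
// wrapping around at the end of the list.
func (r *lookupdRing) Next() string {
	addr := r.addrs[r.next]
	r.next = (r.next + 1) % len(r.addrs)
	return addr
}

func main() {
	// Same order as in the reporter's ConnectToNSQLookupds call.
	ring := &lookupdRing{addrs: []string{"127.0.0.2:4161", "127.0.0.1:4161"}}
	for i := 0; i < 4; i++ {
		fmt.Println("querying nsqlookupd", ring.Next())
	}
}
```

With one dead address, every other poll is wasted. In the log, the query made right after the disconnect goes to 127.0.0.2 and fails with "connection refused" (18:08:27), and nsqd is only rediscovered through 127.0.0.1 at 18:09:25.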

If HandleMessage always returns nil, everything works.
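The difference between the two paths is what go-nsq does with an error: the log shows a REQ after each handler error, and once the message's delivery count exceeds MaxAttempts it is finished without calling the handler ("attempted 3 times, giving up", then FIN at 18:07:17). A minimal sketch of that decision, assuming the rule is "give up when Attempts > MaxAttempts" and that a MaxAttempts of 0 disables the limit; the `decide` function and `action` type are hypothetical.

```go
package main

import "fmt"

// action is what the consumer does with a delivered message.
type action string

const (
	handle action = "call handler"
	giveUp action = "FIN without calling handler"
)

// decide is a hypothetical model of the max-attempts check. attempts is the
// 1-based delivery count carried by the message; maxAttempts is the configured
// limit. Assumption: a limit of 0 means "retry forever".
func decide(attempts, maxAttempts uint16) action {
	if maxAttempts > 0 && attempts > maxAttempts {
		return giveUp
	}
	return handle
}

func main() {
	// MaxAttempts = 2, as in step 1 of the report.
	for attempts := uint16(1); attempts <= 3; attempts++ {
		fmt.Printf("attempt %d: %s\n", attempts, decide(attempts, 2))
	}
}
```

With MaxAttempts = 2 this prints "call handler" for attempts 1 and 2 and the give-up action for attempt 3, matching the two handler errors (18:02:47 and 18:04:17) and the give-up at 18:07:17.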

This is the debug log:

2015/04/07 18:02:22 INF    1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:02:22 DBG    1 [test/ch] starting Handler
2015/04/07 18:02:22 INF    1 [test/ch] (zhangdl-pc:4150) connecting to nsqd
2015/04/07 18:02:22 DBG    1 [test/ch] (zhangdl-pc:4150) IDENTIFY response: &{MaxRdyCount:2500 TLSv1:false Deflate:false Snappy:false AuthRequired:false}
2015/04/07 18:02:22 DBG    1 [test/ch] (zhangdl-pc:4150) sending RDY 1 (0 remain from last RDY 0)
2015/04/07 18:02:22 DBG    1 [test/ch] (zhangdl-pc:4150) sending RDY 1 (0 remain from last RDY 1)
2015/04/07 18:02:22 DBG    1 [test/ch] (zhangdl-pc:4150) FIN 081fb125edf2e000
2015/04/07 18:02:47 DBG    1 [test/ch] (zhangdl-pc:4150) sending RDY 1 (0 remain from last RDY 1)
2015/04/07 18:02:47 ERR    1 [test/ch] Handler returned error (tsdb retrun err code: 400) for msg 081fb125edf2e001
2015/04/07 18:02:47 DBG    1 [test/ch] (zhangdl-pc:4150) REQ 081fb125edf2e001
2015/04/07 18:02:47 WRN    1 [test/ch] backing off for 2.0000 seconds (backoff level 1), setting all to RDY 0
2015/04/07 18:02:49 WRN    1 [test/ch] (zhangdl-pc:4150) backoff timeout expired, sending RDY 1
2015/04/07 18:02:52 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:03:22 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:03:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:03:52 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:04:17 ERR    1 [test/ch] Handler returned error (tsdb retrun err code: 400) for msg 081fb125edf2e001
2015/04/07 18:04:17 DBG    1 [test/ch] (zhangdl-pc:4150) REQ 081fb125edf2e001
2015/04/07 18:04:17 WRN    1 [test/ch] backing off for 4.0000 seconds (backoff level 2), setting all to RDY 0
2015/04/07 18:04:21 WRN    1 [test/ch] (zhangdl-pc:4150) backoff timeout expired, sending RDY 1
2015/04/07 18:04:22 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:04:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2015/04/07 18:04:52 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:05:22 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:05:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:05:52 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:06:22 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:06:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2015/04/07 18:06:52 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:07:17 WRN    1 [test/ch] msg 081fb125edf2e001 attempted 3 times, giving up
2015/04/07 18:07:17 DBG    1 [test/ch] (zhangdl-pc:4150) FIN 081fb125edf2e001
2015/04/07 18:07:17 WRN    1 [test/ch] backing off for 2.0000 seconds (backoff level 1), setting all to RDY 0
2015/04/07 18:07:19 WRN    1 [test/ch] (zhangdl-pc:4150) backoff timeout expired, sending RDY 1
2015/04/07 18:07:22 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:07:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:07:52 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:08:22 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:08:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2015/04/07 18:08:27 ERR    1 [test/ch] (zhangdl-pc:4150) IO error - EOF 
2015/04/07 18:08:27 INF    1 [test/ch] (zhangdl-pc:4150) beginning close
2015/04/07 18:08:27 INF    1 [test/ch] (zhangdl-pc:4150) readLoop exiting
2015/04/07 18:08:27 INF    1 [test/ch] (zhangdl-pc:4150) breaking out of writeLoop
2015/04/07 18:08:27 INF    1 [test/ch] (zhangdl-pc:4150) writeLoop exiting
2015/04/07 18:08:27 INF    1 [test/ch] (zhangdl-pc:4150) finished draining, cleanup exiting
2015/04/07 18:08:27 INF    1 [test/ch] (zhangdl-pc:4150) clean close complete
2015/04/07 18:08:27 WRN    1 [test/ch] there are 0 connections left alive
2015/04/07 18:08:27 INF    1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:08:27 ERR    1 [test/ch] error querying nsqlookupd (http://127.0.0.2:4161/lookup?topic=test) - Get http://127.0.0.2:4161/lookup?topic=test: dial tcp 127.0.0.2:4161: connection refused 
2015/04/07 18:09:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2015/04/07 18:09:25 INF    1 [test/ch] (zhangdl-pc:4150) connecting to nsqd
2015/04/07 18:09:25 DBG    1 [test/ch] (zhangdl-pc:4150) IDENTIFY response: &{MaxRdyCount:2500 TLSv1:false Deflate:false Snappy:false AuthRequired:false}
2015/04/07 18:09:55 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:10:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:10:25 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:10:55 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:11:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2015/04/07 18:11:25 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:11:55 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:12:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:12:25 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:12:55 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:13:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2015/04/07 18:13:25 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:13:55 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:14:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.2:4161/lookup?topic=test
2015/04/07 18:14:25 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:14:55 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received
2015/04/07 18:15:25 INF    1 [test/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2015/04/07 18:15:25 DBG    1 [test/ch] (zhangdl-pc:4150) heartbeat received

This is my code:

package main

import (
    "bytes"
    "config"
    "errors"
    "flag"
    "fmt"
    "github.com/bitly/go-nsq"
    "github.com/bitly/go-simplejson"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "plog"
    "strings"
    //"time"
)

type MyTestHandler struct {
    q        *nsq.Consumer
    tsdbAddr string
}

//handle msg
func (h *MyTestHandler) HandleMessage(message *nsq.Message) error {
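    // send the message body to OpenTSDB
    httpclient := &http.Client{}
    req, err := http.NewRequest("POST", h.tsdbAddr, bytes.NewBuffer(message.Body))
    if err != nil {
        // a malformed tsdbAddr would otherwise leave req nil and panic in Do
        return err
    }
    response, err := httpclient.Do(req)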
    if err != nil {
        plog.Errorf(" http_to_tsdb_err, MsgId: %s, TsAddr: %s, Attempts %d, err(%s)", message.ID, h.tsdbAddr, message.Attempts, err)
        return err
    }
    defer response.Body.Close()

    //check tsdb resp
    code := response.StatusCode
    if code != 204 && code != 200 {
        body, _ := ioutil.ReadAll(response.Body)
        respdata, err := simplejson.NewJson(body)
        if err != nil {
            plog.Errorf(" tsdb_resp_json_err, MsgId: %s, TsAddr: %s, Attempts %d, err(%s)", message.ID, h.tsdbAddr, message.Attempts, err)
            return err
        }
        respMessage, _ := respdata.Get("error").Get("message").String()
        respDetails, _ := respdata.Get("error").Get("details").String()
        err = errors.New(fmt.Sprintf("tsdb retrun err code: %d", code))
        plog.Errorf(" tsdb_resp_code_err, MsgId: %s, TsAddr: %s, Attempts %d, code: %d, message: %s, details: %s", message.ID, h.tsdbAddr, message.Attempts, code, respMessage, strings.Replace(respDetails, "\n", " ", -1))
        return err
    }

    plog.Infof(" success_to_tsdb, MsgId: %s, TsAddr: %s, Attempts %d", message.ID, h.tsdbAddr, message.Attempts)
    return nil
}

func main() {
    //get config
    var inputConfigFile = flag.String("c", "./xx.conf", "nsqlookupd ip and port")
    flag.Parse()
    proxyConfig, err := config.New(*inputConfigFile)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println(proxyConfig)
    fmt.Println("NsqlookupAddr: ", proxyConfig.NsqlookupAddr)
    fmt.Println("NsqTopic: ", proxyConfig.NsqTopic)
    fmt.Println("NsqChan: ", proxyConfig.NsqChan)
    fmt.Println("TsdbAddr: ", proxyConfig.TsdbAddr)
    fmt.Println("AccessLog: ", proxyConfig.AccessLog)
    fmt.Println("ErrorLog: ", proxyConfig.ErrorLog)
    fmt.Println("GoNsqLibLog: ", proxyConfig.GoNsqLibLog)
    fmt.Println("HandlerMaxRetry: ", proxyConfig.MaxAttempts)
    fmt.Println("MaxFlight: ", proxyConfig.MaxFlight)
    fmt.Println("HeartbeatInterval: ", proxyConfig.HeartbeatInterval)
    fmt.Println("ReadTimeout: ", proxyConfig.ReadTimeout)
    fmt.Println("LookupdPollInterval: ", proxyConfig.LookupdPollInterval)
    fmt.Println("RequeueFlag: ", proxyConfig.RequeueFlag)

    //open log
    plog.StartLog(proxyConfig.AccessLog, proxyConfig.ErrorLog)

    //check config
    if proxyConfig.MaxAttempts <= 0 {
        fmt.Printf("err: HandlerMaxRetry: %d <= 0", proxyConfig.MaxAttempts)
        return
    }
    if proxyConfig.MaxFlight <= 0 {
        fmt.Printf("err: MaxFlight: %d <= 0", proxyConfig.MaxFlight)
        return
    }
    if proxyConfig.ReadTimeout < proxyConfig.HeartbeatInterval {
        fmt.Printf("err: ReadTimeout %d < HeartbeatInterval %d", proxyConfig.ReadTimeout, proxyConfig.HeartbeatInterval)
        return
    }
    if proxyConfig.LookupdPollInterval < proxyConfig.HeartbeatInterval {
        fmt.Printf("err: LookupdPollInterval %d < HeartbeatInterval %d", proxyConfig.LookupdPollInterval, proxyConfig.HeartbeatInterval)
        return
    }

    //set config
    config := nsq.NewConfig()
    config.MaxInFlight = int(proxyConfig.MaxFlight)
    config.MaxAttempts = uint16(proxyConfig.MaxAttempts)
    //config.HeartbeatInterval = time.Duration(proxyConfig.HeartbeatInterval * 1000000)
    //config.ReadTimeout = time.Duration(proxyConfig.ReadTimeout * 1000000)
    //config.LookupdPollInterval = time.Duration(proxyConfig.LookupdPollInterval * 1000000)

    //consumer
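    q, err := nsq.NewConsumer(proxyConfig.NsqTopic, proxyConfig.NsqChan, config)
    if err != nil {
        // e.g. an invalid topic or channel name; q would be nil below
        fmt.Println(err)
        return
    }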
    //set nsq lib log
    nsqLibLog, err_log := os.OpenFile(proxyConfig.GoNsqLibLog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
    if err_log != nil {
        fmt.Println(err_log)
        return
    }
    q.SetLogger(log.New(nsqLibLog, "", log.Ldate|log.Ltime), 0)
    fmt.Println(config)
    //start
    h := &MyTestHandler{
        q:        q,
        tsdbAddr: "http://" + proxyConfig.TsdbAddr + "/api/put",
    }
    q.AddHandler(h)
    err = q.ConnectToNSQLookupds(proxyConfig.NsqlookupAddr)
    if err != nil {
        plog.Error("ConnectToNSQLookupd err:", err)
        os.Exit(-1)
    }

    <-q.StopChan
}
@jehiah
Member

jehiah commented Apr 7, 2015

@zhangdaoling thanks for your detailed writeup. Can you confirm that you are able to reproduce this reliably or not? Also can you share what go-nsq version (or commit) you are using?

If you are able to reproduce this, a copy of the stats from the nsqd /stats HTTP endpoint, taken after your client has stopped receiving new messages, would be helpful. That will help us understand the state of the connection from the nsqd side.
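The /stats output reports, per client, the RDY count nsqd believes the consumer has. The log hints at why that matters: after the reconnect at 18:09:25 there is an IDENTIFY response but no "sending RDY" line, unlike the initial connection at 18:02:22, and nsqd does not push messages to a client whose RDY count is 0. A sketch that scans the JSON stats for clients sitting at RDY 0 while their channel has queued messages; the field names (`topic_name`, `channel_name`, `depth`, `clients`, `hostname`, `ready_count`, `in_flight_count`) and the optional `{"data": ...}` envelope are assumptions about nsqd's JSON format, which has changed between versions.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
)

// Minimal subset of nsqd's JSON /stats output (field names assumed).
type clientStats struct {
	Hostname      string `json:"hostname"`
	ReadyCount    int64  `json:"ready_count"`
	InFlightCount int64  `json:"in_flight_count"`
}

type channelStats struct {
	ChannelName string        `json:"channel_name"`
	Depth       int64         `json:"depth"`
	Clients     []clientStats `json:"clients"`
}

type topicStats struct {
	TopicName string         `json:"topic_name"`
	Channels  []channelStats `json:"channels"`
}

// statsDoc accepts both an older {"data": {...}} envelope and an unwrapped body.
type statsDoc struct {
	Topics []topicStats `json:"topics"`
	Data   *struct {
		Topics []topicStats `json:"topics"`
	} `json:"data"`
}

// stalledClients lists clients whose channel has queued messages but which
// advertise RDY 0, i.e. nsqd is not allowed to push messages to them.
func stalledClients(data []byte) ([]string, error) {
	var doc statsDoc
	if err := json.Unmarshal(data, &doc); err != nil {
		return nil, err
	}
	topics := doc.Topics
	if doc.Data != nil {
		topics = doc.Data.Topics
	}
	var out []string
	for _, t := range topics {
		for _, ch := range t.Channels {
			for _, c := range ch.Clients {
				if ch.Depth > 0 && c.ReadyCount == 0 {
					out = append(out, fmt.Sprintf("%s/%s %s (depth %d, in_flight %d)",
						t.TopicName, ch.ChannelName, c.Hostname, ch.Depth, c.InFlightCount))
				}
			}
		}
	}
	return out, nil
}

func main() {
	// Usage (hypothetical file name):
	//   curl -s 'http://127.0.0.1:4151/stats?format=json' | go run stalled.go
	data, err := io.ReadAll(os.Stdin)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	stalled, err := stalledClients(data)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	for _, s := range stalled {
		fmt.Println("RDY 0 with queued messages:", s)
	}
}
```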

jehiah added the bug label Apr 7, 2015
mreiferson changed the title from "consumer can not get date after restart nsqdlookup and nsqd if HandlerMessage return err" to "consumer: deadlock after disconnection" Apr 7, 2015
@mreiferson
Member

See #132. Thanks for the report!
