consumer: sometimes got heartbeat but not message #206

Closed
slayercat opened this issue Mar 27, 2017 · 20 comments
@slayercat

slayercat commented Mar 27, 2017

1. env

nsqd --version
nsqd v1.0.0-compat (built w/go1.8)

go-nsq version
commit b9762cd
Tue Feb 14 16:13:23 2017 -0800

# go version
go version go1.8 linux/amd64

2. What I see

  • The program runs, but the channel is not created according to http://127.0.0.1:4151/stats.
  • This happens only sometimes, and always at program start. When it happens, the program never receives any message (I waited 2 days).
  • Once the program has started successfully, I have not been able to reproduce it.
  • I subscribe to several topics. When it happens on one topic, the other topics may be unaffected.
  • With go-nsq debug logging on, the program prints [d.c2.cp#ephemeral/detect#ephemeral] (127.0.0.3:4150) heartbeat received, but it does not seem to receive any other nsq message.
[root@18-190 ptd]# curl -XGET http://127.0.0.1:4151/stats; 
nsqd v1.0.0-compat (built w/go1.8)
start_time 2017-03-27T19:07:02+08:00
uptime 45m7.260003591s

Health: OK

   [.raw.raw#ephemeral] depth: 0     be-depth: 0     msgs: 6656     e2e%: 
      [processor#ephemeral      ] depth: 0     be-depth: 0     inflt: 0    def: 0    re-q: 0     timeout: 0     msgs: 6656     e2e%: 
        [V2 18-190               ] state: 3 inflt: 0    rdy: 1    fin: 6656     re-q: 0        msgs: 6656     connected: 8s

   [d.c2.cp#ephemeral] depth: 10000 be-depth: 0     msgs: 791227   e2e%: 

   [d.c2.cp.done#ephemeral] depth: 0     be-depth: 0     msgs: 0        e2e%: 
      [processor#ephemeral      ] depth: 0     be-depth: 0     inflt: 0    def: 0    re-q: 0     timeout: 0     msgs: 0        e2e%: 
        [V2 18-190               ] state: 3 inflt: 0    rdy: 1    fin: 0        re-q: 0        msgs: 0        connected: 8s

   [d.c2.domain#ephemeral] depth: 10000 be-depth: 0     msgs: 208218   e2e%: 

...

Code snippet

func nsqSubscribe(addr string, topic string, channel string, hdlr nsq.HandlerFunc) error {
    consumer, err := nsq.NewConsumer(topic, channel, nsq.NewConfig())
    if err != nil {
        print("new consumer error: ", err.Error(), "\n")
        time.Sleep(1 * time.Second) // wait 1s
        panic(err)
    }
    consumer.AddHandler(hdlr)
    err = consumer.ConnectToNSQD(addr)
    if err != nil {
        print("connect nsqd error: ", err.Error(), "\n")
        time.Sleep(1 * time.Second) // wait 1s
        panic(err)
    }
    // Block until the consumer stops, then treat that as fatal.
    <-consumer.StopChan
    panic("nsq conn dead topic=" + topic + " channel=" + channel)
}

func main() {
    producer, err := nsq.NewProducer(nsqConf.Local.Addr, nsq.NewConfig())
    if err != nil {
        panic(err)
    }
    go func() {
        detector := new(c2.C2Sdk)

        if detectorEnabled {
            // Goroutine-local err, so the goroutines do not race on the outer variable.
            if err := detector.Init(); err != nil {
                fmt.Println("Failed to init c2")
                panic(err)
            }
        }
        nsqSubscribe(nsqConf.Local.Addr, "d.c2.cp#ephemeral", "detect#ephemeral",
            nsq.HandlerFunc(func(message *nsq.Message) error {
                return handler_scan(message, detector,
                    producer, unmarshal_url, scan_url,
                    "d.c2.cp.done#ephemeral")
            }))
    }()
    go func() {
        detector := new(c2.C2Sdk)

        if detectorEnabled {
            if err := detector.Init(); err != nil {
                fmt.Println("Failed to init c2")
                panic(err)
            }
        }
        nsqSubscribe(nsqConf.Local.Addr, "d.c2.url#ephemeral", "detect#ephemeral",
            nsq.HandlerFunc(func(message *nsq.Message) error {
                return handler_scan(message, detector,
                    producer, unmarshal_url, scan_url,
                    "d.c2.url.done#ephemeral")
            }))
    }()

    .....

}

3. How to reproduce

  • Point the program at a busy nsqd server.
  • Start and restart it several times (5-10 times). If it does not reproduce, retry.
@slayercat
Author

I did some digging and found that redistributeRDY() always returns at:

if !atomic.CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) {
    return
}
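
For readers unfamiliar with this guard, here is a minimal sketch of how `atomic.CompareAndSwapInt32(&flag, 1, 0)` behaves. The names `tryRedistribute` and `needRedistribute` are illustrative only and are not go-nsq identifiers. The point is that the guard returns early whenever the flag is 0, so hitting that return on every tick is what you would expect if nothing has requested a redistribution.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// tryRedistribute mirrors the shape of the guard quoted above: it reports
// true only when the flag is currently 1, and resets it to 0 in the same
// atomic step. (Hypothetical helper, not part of go-nsq.)
func tryRedistribute(flag *int32) bool {
	return atomic.CompareAndSwapInt32(flag, 1, 0)
}

func main() {
	var needRedistribute int32 // zero value: no redistribution requested

	fmt.Println(tryRedistribute(&needRedistribute)) // false: flag is 0

	atomic.StoreInt32(&needRedistribute, 1)         // something requests a redistribution
	fmt.Println(tryRedistribute(&needRedistribute)) // true: flag was 1, now reset to 0
	fmt.Println(tryRedistribute(&needRedistribute)) // false: flag is 0 again
}
```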

@slayercat
Author

slayercat commented Mar 27, 2017

Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] rdyLoop redistributeRDY
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] in func rdyLoop redistributeRDY
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: conns!=0
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 1111
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 2222
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) not true
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    4 [d.c2.cp#ephemeral/detect#ephemeral] rdyLoop redistributeRDY
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    4 [d.c2.cp#ephemeral/detect#ephemeral] in func rdyLoop redistributeRDY
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    4 [d.c2.cp#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: conns!=0
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    4 [d.c2.cp#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 1111
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    4 [d.c2.cp#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 2222
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    4 [d.c2.cp#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) not true
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    3 [d.c2.domain#ephemeral/detect#ephemeral] rdyLoop redistributeRDY
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    3 [d.c2.domain#ephemeral/detect#ephemeral] in func rdyLoop redistributeRDY
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    3 [d.c2.domain#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: conns!=0
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    3 [d.c2.domain#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 1111
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    3 [d.c2.domain#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 2222
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    3 [d.c2.domain#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) not true
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    5 [d.c2.url#ephemeral/detect#ephemeral] rdyLoop redistributeRDY
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    5 [d.c2.url#ephemeral/detect#ephemeral] in func rdyLoop redistributeRDY
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    5 [d.c2.url#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: conns!=0
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    5 [d.c2.url#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 1111
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    5 [d.c2.url#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 2222
Mar 27 20:35:43 18-190 c2detector[23104]: 2017/03/27 20:35:43 INF    5 [d.c2.url#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) not true
Mar 27 20:35:48 18-190 c2detector[23104]: 2017/03/27 20:35:48 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] rdyLoop redistributeRDY
Mar 27 20:35:48 18-190 c2detector[23104]: 2017/03/27 20:35:48 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] in func rdyLoop redistributeRDY
Mar 27 20:35:48 18-190 c2detector[23104]: 2017/03/27 20:35:48 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: conns!=0
Mar 27 20:35:48 18-190 c2detector[23104]: 2017/03/27 20:35:48 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 1111
Mar 27 20:35:48 18-190 c2detector[23104]: 2017/03/27 20:35:48 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 2222
Mar 27 20:35:48 18-190 c2detector[23104]: 2017/03/27 20:35:48 INF    2 [d.c2.ip#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) not true

func (r *Consumer) redistributeRDY() {
    r.log(LogLevelInfo, "in func rdyLoop redistributeRDY")
    if r.inBackoffTimeout() {
        r.log(LogLevelInfo, "rdyLoop redistributeRDY: inBackoffTimeout")
        return
    }    


    // if an external heuristic set needRDYRedistributed we want to wait 
    // until we can actually redistribute to proceed
    conns := r.conns()
    if len(conns) == 0 {
        r.log(LogLevelInfo, "rdyLoop redistributeRDY: conns=0")
        return
    }    
    r.log(LogLevelInfo, "rdyLoop redistributeRDY: conns!=0")

    maxInFlight := r.getMaxInFlight()
    if len(conns) > int(maxInFlight) {
        r.log(LogLevelDebug, "redistributing RDY state (%d conns > %d max_in_flight)",
            len(conns), maxInFlight)
        atomic.StoreInt32(&r.needRDYRedistributed, 1)
    }    
    r.log(LogLevelInfo, "rdyLoop redistributeRDY: 1111")

    if r.inBackoff() && len(conns) > 1 {
        r.log(LogLevelDebug, "redistributing RDY state (in backoff and %d conns > 1)", len(conns))
        atomic.StoreInt32(&r.needRDYRedistributed, 1)
    }    
    r.log(LogLevelInfo, "rdyLoop redistributeRDY: 2222")

    if !atomic.CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) { 
        r.log(LogLevelInfo, "rdyLoop redistributeRDY: CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) not true")
        return
    }    

    r.log(LogLevelInfo, "rdyLoop redistributeRDY: 3333")

@slayercat
Author

r.needRDYRedistributed is 0

@slayercat
Author

    maxInFlight := r.getMaxInFlight()
    r.log(LogLevelInfo, "count max in fight=%d   count conns= %d",int(maxInFlight), len(conns))
    if len(conns) > int(maxInFlight) {
        r.log(LogLevelDebug, "redistributing RDY state (%d conns > %d max_in_flight)",
            len(conns), maxInFlight)
        atomic.StoreInt32(&r.needRDYRedistributed, 1)
    }    
    r.log(LogLevelInfo, "rdyLoop redistributeRDY: 1111")

    r.log(LogLevelInfo, "in backoff? %s", r.inBackoff())
    if r.inBackoff() && len(conns) > 1 {
        r.log(LogLevelDebug, "redistributing RDY state (in backoff and %d conns > 1)", len(conns))
        atomic.StoreInt32(&r.needRDYRedistributed, 1)
    }    
    r.log(LogLevelInfo, "rdyLoop redistributeRDY: 2222")
Mar 27 20:57:58 18-190 c2detector[29336]: 2017/03/27 20:57:58 INF    4 [d.c2.url#ephemeral/detect#ephemeral] rdyLoop redistributeRDY
Mar 27 20:57:58 18-190 c2detector[29336]: 2017/03/27 20:57:58 INF    4 [d.c2.url#ephemeral/detect#ephemeral] in func rdyLoop redistributeRDY
Mar 27 20:57:58 18-190 c2detector[29336]: 2017/03/27 20:57:58 INF    4 [d.c2.url#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: conns!=0
Mar 27 20:57:58 18-190 c2detector[29336]: 2017/03/27 20:57:58 INF    4 [d.c2.url#ephemeral/detect#ephemeral] count max in fight=1   count conns= 1
Mar 27 20:57:58 18-190 c2detector[29336]: 2017/03/27 20:57:58 INF    4 [d.c2.url#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 1111
Mar 27 20:57:58 18-190 c2detector[29336]: 2017/03/27 20:57:58 INF    4 [d.c2.url#ephemeral/detect#ephemeral] in backoff? %!s(bool=false)
Mar 27 20:57:58 18-190 c2detector[29336]: 2017/03/27 20:57:58 INF    4 [d.c2.url#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: 2222
Mar 27 20:57:58 18-190 c2detector[29336]: 2017/03/27 20:57:58 INF    4 [d.c2.url#ephemeral/detect#ephemeral] rdyLoop redistributeRDY: CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) not true --> r.needRDYRedistributed0

@slayercat
Author

It seems any reproducted channel never calls updateRDY at all. Digging into why.

@ploxiln
Member

ploxiln commented Mar 27, 2017

What do you mean by "reproducted"?

@slayercat
Author

Sorry about that, @ploxiln. I mean that when this issue appears, the associated consumer never calls the function updateRDY. I'm still digging into it.

@slayercat
Author

When I made the nsqd log verbose, I saw the channel being created. But when I curl http://127.0.0.1:4151/stats, the target channel does not exist, and the go-nsq consumer does not get any message either.

Mar 28 19:19:13 18-190 nsqd[935]: [nsqd] 2017/03/28 19:19:13.084582 TOPIC(d.c2.cp#ephemeral): new channel(detect#ephemeral)
Mar 28 19:19:13 18-190 nsqd[935]: [nsqd] 2017/03/28 19:19:13.189118 TOPIC(d.c2.cp#ephemeral): created


[root@18-190 ptd]# date;curl -s -XGET http://127.0.0.1:4151/stats
Tue Mar 28 19:25:18 CST 2017
nsqd v1.0.0-compat (built w/go1.8)
start_time 2017-03-27T19:07:02+08:00
uptime 24h18m16.376911107s

Health: OK

   [.raw.raw#ephemeral] depth: 0     be-depth: 0     msgs: 534      e2e%: 
      [processor#ephemeral      ] depth: 0     be-depth: 0     inflt: 0    def: 0    re-q: 0     timeout: 0     msgs: 534      e2e%: 
        [V2 18-190               ] state: 3 inflt: 0    rdy: 1    fin: 534      re-q: 0        msgs: 534      connected: 7s

   [d.c2.cp#ephemeral] depth: 10000 be-depth: 0     msgs: 41524    e2e%: 

   [d.c2.cp.done#ephemeral] depth: 0     be-depth: 0     msgs: 0        e2e%: 
      [processor#ephemeral      ] depth: 0     be-depth: 0     inflt: 0    def: 0    re-q: 0     timeout: 0     msgs: 0        e2e%: 
        [V2 18-190               ] state: 3 inflt: 0    rdy: 1    fin: 0        re-q: 0        msgs: 0        connected: 7s


@slayercat
Author



[root@18-190 ptd]# netstat -anop|grep c2
tcp 0 0 127.0.0.1:38372 127.0.0.3:4150 ESTABLISHED 21601/c2detector off (0.00/0/0)
tcp 0 0 127.0.0.1:38370 127.0.0.3:4150 ESTABLISHED 21601/c2detector off (0.00/0/0)
tcp 0 0 127.0.0.1:38368 127.0.0.3:4150 ESTABLISHED 21601/c2detector off (0.00/0/0)
tcp 0 0 127.0.0.1:38374 127.0.0.3:4150 ESTABLISHED 21601/c2detector off (0.00/0/0)
unix 3 [ ] STREAM CONNECTED 360495996 21601/c2detector
[root@18-190 ptd]# cat nsq.log |grep 38368
Mar 28 20:49:17 18-190 nsqd[16733]: [nsqd] 2017/03/28 20:49:17.838368 [127.0.0.1:53812] state rdy: 1 inflt: 0
Mar 28 20:51:47 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:51:47.325215 [127.0.0.1:38368] state rdy: 1 inflt: 0
Mar 28 20:51:47 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:51:47.325381 PROTOCOL(V2): [127.0.0.1:38368] [NOP]
Mar 28 20:51:47 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:51:47.325390 Exec client=127.0.0.1:38368
Mar 28 20:52:17 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:52:17.325252 [127.0.0.1:38368] state rdy: 1 inflt: 0
Mar 28 20:52:17 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:52:17.325391 PROTOCOL(V2): [127.0.0.1:38368] [NOP]
Mar 28 20:52:17 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:52:17.325399 Exec client=127.0.0.1:38368
Mar 28 20:52:47 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:52:47.325343 [127.0.0.1:38368] state rdy: 1 inflt: 0
Mar 28 20:52:47 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:52:47.326081 PROTOCOL(V2): [127.0.0.1:38368] [NOP]
Mar 28 20:52:47 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:52:47.326090 Exec client=127.0.0.1:38368
Mar 28 20:53:17 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:53:17.325381 [127.0.0.1:38368] state rdy: 1 inflt: 0
Mar 28 20:53:17 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:53:17.325625 PROTOCOL(V2): [127.0.0.1:38368] [NOP]
Mar 28 20:53:17 18-190 nsqd[21189]: [nsqd] 2017/03/28 20:53:17.325634 Exec client=127.0.0.1:38368

When nsqd receives the NOP command, the connection state is stateSubscribed.

@slayercat
Author

It seems every connection registers itself in channel.clients via func (c *Channel) AddClient(clientID int64, client Consumer), even when the issue appears.

But when I print channel.clients in nsqd, the queue is gone.

[root@18-190 ]# netstat -anop|grep c2
tcp        0      0 127.0.0.1:50312         127.0.0.3:4150          ESTABLISHED 12338/c2detector     off (0.00/0/0)
tcp        0      0 127.0.0.1:50314         127.0.0.3:4150          ESTABLISHED 12338/c2detector     off (0.00/0/0)
tcp        0      0 127.0.0.1:50308         127.0.0.3:4150          ESTABLISHED 12338/c2detector     off (0.00/0/0)
tcp        0      0 127.0.0.1:50310         127.0.0.3:4150          ESTABLISHED 12338/c2detector     off (0.00/0/0)


[root@18-190 ]# cat shit |grep 312
CHANNEL(detect#ephemeral): topic d.c2.ip#ephemeral add client 127.0.0.1:50312 (id=37) not exists. done
[root@18-190 ]# cat shit |grep 314
CHANNEL(detect#ephemeral): topic d.c2.domain#ephemeral add client 127.0.0.1:50314 (id=38) not exists. done
[root@18-190 ]# cat shit |grep 308
CHANNEL(detect#ephemeral): topic d.c2.cp#ephemeral add client 127.0.0.1:50308 (id=35) not exists. done
[root@18-190 ]# cat shit |grep 310
CHANNEL(detect#ephemeral): topic d.c2.url#ephemeral add client 127.0.0.1:50310 (id=36) not exists. done

[root@18-190 ptd]# grep 50308 nsq.log  -C 4
Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.344281 PROTOCOL(V2): [127.0.0.1:50362] [PUB d.c2.ip#ephemeral]
Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.344418 PROTOCOL(V2): [127.0.0.1:50362] [PUB d.c2.cp#ephemeral]
Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.344534 PROTOCOL(V2): [127.0.0.1:50362] [PUB d.c2.cp#ephemeral]
Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.344651 PROTOCOL(V2): [127.0.0.1:50362] [PUB d.c2.domain#ephemeral]
Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.458219 [127.0.0.1:50308] state rdy:    1 inflt:    0
Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.458315 PROTOCOL(V2): [127.0.0.1:50308] [NOP]
Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.458333 NOP: client=127.0.0.1:50308(id=%!d(string=18-190)), status=3
Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.458344 NOP: topicname=d.c2.url#ephemeral. clients=127.0.0.1:50310

^^^^^^

Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.464984 [127.0.0.1:50310] state rdy:    1 inflt:    0
Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.465069 PROTOCOL(V2): [127.0.0.1:50310] [NOP]
Mar 28 22:22:00 18-190 nsqd[12110]: [nsqd] 2017/03/28 22:22:00.465079 NOP: client=127.0.0.1:50310(id=%!d(string=18-190)), status=3


@ploxiln
Member

ploxiln commented Mar 28, 2017

This sounds like a race in creating ephemeral channels - like the channel is cleaned-up before the client is fully connected (because ephemeral channels with no connections are removed). (But I haven't really investigated myself yet.)

@slayercat
Author

thanks @ploxiln .

I printed the address of Channel.clients, and it seems to differ between Channel.AddClient and protocolV2.NOP.

So it is very likely what you said.

CHANNEL(detect#ephemeral): topic d.c2.url#ephemeral add client 127.0.0.1:53128 (id=27) not exists. done. clientsaddr=0xc8200aa870 
CHANNEL(detect#ephemeral): topic d.c2.domain#ephemeral add client 127.0.0.1:53130 (id=28) not exists. done. clientsaddr=0xc82019a2d0 
CHANNEL(detect#ephemeral): topic d.c2.cp#ephemeral add client 127.0.0.1:53134 (id=30) not exists. done. clientsaddr=0xc8209c63f0 
CHANNEL(detect#ephemeral): topic d.c2.ip#ephemeral add client 127.0.0.1:53132 (id=29) not exists. done. clientsaddr=0xc8204fb290 

[root@18-190 ptd]# grep '0xc8204fb290' nsq.log|tail -3
Mar 29 10:47:00 18-190 nsqd[27330]: [nsqd] 2017/03/29 10:47:00.101482 NOP: topicname=d.c2.ip#ephemeral. clientsaddr=0xc8204fb290
Mar 29 10:47:00 18-190 nsqd[27330]: [nsqd] 2017/03/29 10:47:00.114756 NOP: topicname=d.c2.ip#ephemeral. clientsaddr=0xc8204fb290
Mar 29 10:47:00 18-190 nsqd[27330]: [nsqd] 2017/03/29 10:47:00.115416 NOP: topicname=d.c2.ip#ephemeral. clientsaddr=0xc8204fb290
[root@18-190 ptd]# grep '0xc8209c63f0' nsq.log|tail -3
[root@18-190 ptd]# grep '0xc82019a2d0' nsq.log|tail -3
[root@18-190 ptd]# grep '0xc8200aa870' nsq.log|tail -3

@mreiferson
Member

mreiferson commented Mar 29, 2017

Seems like specifically a race in nsqd when an ephemeral channel's last client disconnects and the cleanup process is triggered and simultaneously a client begins connecting and subscribing.
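
The interleaving described here can be replayed deterministically with a toy model. This is only a sketch under stated assumptions: the `topic` and `channel` types and the `getChannel`, `deleteChannel`, and `simulateRace` functions below are simplified stand-ins written for illustration, not nsqd's real types, and real nsqd does these steps concurrently with locking that is omitted here.

```go
package main

import "fmt"

// channel and topic are simplified stand-ins for nsqd's types (assumption).
type channel struct {
	name    string
	clients map[int64]string
}

type topic struct {
	channels map[string]*channel
}

// getChannel returns the named channel, creating it if it does not exist.
func (t *topic) getChannel(name string) *channel {
	if c, ok := t.channels[name]; ok {
		return c
	}
	c := &channel{name: name, clients: make(map[int64]string)}
	t.channels[name] = c
	return c
}

// deleteChannel models the ephemeral cleanup: the channel is dropped from the topic.
func (t *topic) deleteChannel(name string) {
	delete(t.channels, name)
}

// simulateRace replays the suspected ordering step by step and returns how
// many clients sit on the orphaned channel and how many channels the topic
// still lists (which is what a stats view would show).
func simulateRace() (orphanClients, visibleChannels int) {
	t := &topic{channels: make(map[string]*channel)}
	old := t.getChannel("detect#ephemeral")
	old.clients[1] = "old consumer"

	// Step 1: the last client disconnects; cleanup is scheduled but has not run yet.
	delete(old.clients, 1)

	// Step 2: a new client looks up the channel and gets the one about to be deleted.
	c := t.getChannel("detect#ephemeral")

	// Step 3: the delayed cleanup runs and removes the channel from the topic.
	t.deleteChannel("detect#ephemeral")

	// Step 4: the new client registers itself on the now-orphaned channel.
	c.clients[2] = "new consumer"

	return len(c.clients), len(t.channels)
}

func main() {
	orphan, visible := simulateRace()
	fmt.Printf("clients on orphaned channel: %d, channels visible in topic: %d\n", orphan, visible)
}
```

In this model the new client is registered on a channel that the topic no longer references, which would be consistent with the connection staying alive on heartbeats while the channel is missing from /stats.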

@mreiferson
Member

@slayercat thanks for the report — at this point it seems like a bug in nsqd and we should migrate this issue over to that repo.

@slayercat
Author

slayercat commented Mar 29, 2017

thanks, @mreiferson .

Is there any workaround here, like masking the cleanup process?

@mreiferson
Member

I suspect that the fix in nsqd will be to somehow lock around cleanup so that a new client cannot be added until completion.
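
One possible shape of such a fix, as a rough sketch only: mark a channel as exiting under its lock when its last client leaves, refuse new clients once that flag is set, and have the subscriber retry against a fresh channel. Everything below (`channel`, `topic`, `addClient`, `removeClient`, `subscribe`, `errExiting`, `demo`) is hypothetical and is not the actual nsqd change. In particular, letting the subscriber drop the stale entry itself is a simplification chosen so the retry loop terminates without sleeping.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errExiting = errors.New("channel is exiting")

type channel struct {
	mu      sync.Mutex
	exiting bool
	clients map[int64]struct{}
}

// addClient refuses new clients once cleanup of this channel has started.
func (c *channel) addClient(id int64) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.exiting {
		return errExiting
	}
	c.clients[id] = struct{}{}
	return nil
}

// removeClient drops a client and marks the channel as exiting when it empties.
func (c *channel) removeClient(id int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.clients, id)
	if len(c.clients) == 0 {
		c.exiting = true
	}
}

type topic struct {
	mu       sync.Mutex
	channels map[string]*channel
}

func (t *topic) getChannel(name string) *channel {
	t.mu.Lock()
	defer t.mu.Unlock()
	if c, ok := t.channels[name]; ok {
		return c
	}
	c := &channel{clients: make(map[int64]struct{})}
	t.channels[name] = c
	return c
}

// deleteChannel removes the entry only if it still points at the given channel.
func (t *topic) deleteChannel(name string, c *channel) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.channels[name] == c {
		delete(t.channels, name)
	}
}

// subscribe retries until the client lands on a channel that is not exiting.
func (t *topic) subscribe(name string, id int64) *channel {
	for {
		c := t.getChannel(name)
		if err := c.addClient(id); err == nil {
			return c
		}
		t.deleteChannel(name, c) // stale channel: drop it and retry with a fresh one
	}
}

// demo replays the problematic ordering against the guarded model.
func demo() (sameChannel bool, visibleClients int) {
	t := &topic{channels: make(map[string]*channel)}
	old := t.subscribe("detect#ephemeral", 1)
	old.removeClient(1)                         // last client leaves, cleanup still pending
	fresh := t.subscribe("detect#ephemeral", 2) // new client arrives before cleanup runs
	t.deleteChannel("detect#ephemeral", old)    // delayed cleanup is now a no-op
	return fresh == old, len(t.getChannel("detect#ephemeral").clients)
}

func main() {
	same, n := demo()
	fmt.Println("reused exiting channel:", same, "clients visible:", n)
}
```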

@slayercat
Author

slayercat commented Mar 29, 2017

I masked the delete callback of the channel and topic, and it seems to work for me.

// RemoveClient removes a client from the Channel's client list
func (c *Channel) RemoveClient(clientID int64) {
    c.Lock()
    defer c.Unlock()

    _, ok := c.clients[clientID]
    if !ok {
        return
    }   
    delete(c.clients, clientID)

    if len(c.clients) == 0 && c.ephemeral == true {
        //go c.deleter.Do(func() { c.deleteCallback(c) })
        ////////mask for https://github.com/nsqio/go-nsq/issues/206
    }   
}
// DeleteExistingChannel removes a channel from the topic only if it exists
func (t *Topic) DeleteExistingChannel(channelName string) error {
    t.Lock()
    channel, ok := t.channelMap[channelName]
    if !ok {
        t.Unlock()
        return errors.New("channel does not exist")
    }
    delete(t.channelMap, channelName)
    // not defered so that we can continue while the channel async closes
    numChannels := len(t.channelMap)
    t.Unlock()

    t.ctx.nsqd.logf("TOPIC(%s): deleting channel %s", t.name, channel.name)

    // delete empties the channel before closing
    // (so that we dont leave any messages around)
    channel.Delete()

    // update messagePump state
    select {
    case t.channelUpdateChan <- 1:
    case <-t.exitChan:
    }   

    if numChannels == 0 && t.ephemeral == true {
        //go t.deleter.Do(func() { t.deleteCallback(t) })
       ////////mask for https://github.com/nsqio/go-nsq/issues/206
    }   

    return nil 
}

@ploxiln
Member

ploxiln commented Mar 29, 2017

Your workaround is equivalent to using a normal non-ephemeral topic and channel. Usually that's what you really want anyway :)
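
As a small illustration of that suggestion: nsqd treats topic and channel names ending in `#ephemeral` as ephemeral, so switching to normal ones amounts to dropping the suffix from the names passed to the consumer. The helpers `isEphemeral` and `durableName` below are hypothetical names for this sketch and are not part of go-nsq.

```go
package main

import (
	"fmt"
	"strings"
)

// ephemeralSuffix is the naming convention nsqd uses for ephemeral topics and channels.
const ephemeralSuffix = "#ephemeral"

// isEphemeral reports whether a topic or channel name is ephemeral.
func isEphemeral(name string) bool {
	return strings.HasSuffix(name, ephemeralSuffix)
}

// durableName returns the non-ephemeral equivalent of a name.
func durableName(name string) string {
	return strings.TrimSuffix(name, ephemeralSuffix)
}

func main() {
	for _, name := range []string{"d.c2.cp#ephemeral", "detect#ephemeral"} {
		fmt.Println(name, isEphemeral(name), durableName(name))
	}
}
```

With the names from the report, this would mean subscribing to topic `d.c2.cp` on channel `detect`.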

@slayercat
Author

thanks, @ploxiln

Ephemeral channels don't write messages to disk, so there is some difference :)

I'll try to fix the issue when my schedule allows.

thank you again.

@mreiferson
Member

continue in nsqio/nsq#883 please, thanks!

@mreiferson mreiferson added the bug label Apr 15, 2017
slayercat pushed a commit to slayercat/nsq that referenced this issue May 2, 2017
…ent.

It's possible for a client reconnecting quickly and subscribed to an ephemeral channel to race with nsqd's cleanup of said ephemeral channel, as documented in nsqio/go-nsq#206.

Fixes nsqio#883