Skip to content
This repository was archived by the owner on Dec 14, 2020. It is now read-only.
This repository was archived by the owner on Dec 14, 2020. It is now read-only.

When setting linkcredit to 2 or higher, I cannot consume a large number of big data packets in the queue #191

@qiulian111

Description

@qiulian111

When the link credit is set to 2 or higher, I cannot consume a large number of large messages from the queue; it works fine when the link credit is set to 1.
Please take a look, thanks.
Here is my code:

package main

import (
	"context"
	"log"
	"strings"
	"time"

	"pack.ag/amqp" //version v0.12.4
)

// main runs the reproduction: it first publishes a batch of messages and then
// tries to consume them again.
func main() {
	const (
		// messageCount is how many messages are published before consuming.
		messageCount = 20
		// receiverCredit is the receiver's link credit: 1 works, 2 fails.
		receiverCredit = 2
	)

	sendData(messageCount)
	consumeData(receiverCredit)
}

// sendData publishes maxNum messages to the "ipc.test" address.
//
// Each message carries the same 5 MiB payload (a 20-byte pattern repeated
// 262144 times) and a 24-hour TTL, and each Send is bounded by a 5-second
// timeout.
//
// Failure to dial, open the session, or attach the sender link terminates the
// program via log.Fatalf, since nothing can be sent without them. A failed
// Send is logged and the loop moves on to the next message. The sender link
// and the connection are closed before the function returns.
func sendData(maxNum int) {
	log.Println("Sending...")

	client, err := amqp.Dial("amqp://192.168.8.6:5672", amqp.ConnSASLPlain("data_user", "jwi_gogogo"), amqp.ConnServerHostname("vhost:/"),
		amqp.ConnIdleTimeout(0), amqp.ConnMaxFrameSize(1*1024*1024))
	if err != nil {
		log.Fatalf("sendData: dialing broker: %v", err)
	}

	session, err := client.NewSession(
		amqp.SessionIncomingWindow(65535),
		amqp.SessionOutgoingWindow(65535),
	)
	if err != nil {
		// Best effort only: log.Fatalf exits the process right after.
		_ = client.Close()
		log.Fatalf("sendData: opening session: %v", err)
	}

	sender, err := session.NewSender(
		amqp.LinkTargetAddress("ipc.test"),
	)
	if err != nil {
		// Best effort only: log.Fatalf exits the process right after.
		_ = client.Close()
		log.Fatalf("sendData: attaching sender link: %v", err)
	}

	ctx := context.Background()
	baseStr := "abcdefghij1234567890"
	repeatCount := 5 * 1024 * 1024 / 20 // data size 5M
	dataBytes := []byte(strings.Repeat(baseStr, repeatCount))

	for i := 0; i < maxNum; i++ {
		myctx, cancel := context.WithTimeout(ctx, 5*time.Second)
		ampMsg := amqp.NewMessage(dataBytes)
		ampMsg.Header = &amqp.MessageHeader{TTL: 24 * time.Hour}
		err := sender.Send(myctx, ampMsg)
		cancel()
		if err != nil {
			// Keep going so one failed message does not abort the batch,
			// but make the loss visible.
			log.Printf("sendData: sending message %d of %d: %v", i+1, maxNum, err)
		}
	}

	// Detach the link and close the connection so they are not left open
	// for the remainder of the process.
	closeCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	if err := sender.Close(closeCtx); err != nil {
		log.Printf("sendData: closing sender link: %v", err)
	}
	cancel()
	if err := client.Close(); err != nil {
		log.Printf("sendData: closing connection: %v", err)
	}

	log.Println("done.")
}

// consumeData receives messages from the "ipc.test" address until Receive
// fails, logging a running count after each message.
//
// linkCredit is passed to amqp.LinkCredit when the receiver link is attached.
//
// A background goroutine sends an empty message to the "connHeartbeat"
// address every 30 seconds on the same session to keep the connection alive.
//
// Failure to dial, open the session, or attach the receiver link terminates
// the program via log.Fatalf. When Receive fails, the heartbeat is stopped,
// the receiver link and connection are closed on a best-effort basis, and the
// program terminates via log.Fatal with the Receive error. This function
// therefore never returns normally.
func consumeData(linkCredit uint32) {
	log.Println("consuming...")
	receiveCount := 0

	client, err := amqp.Dial("amqp://192.168.8.6:5672", amqp.ConnSASLPlain("data_user", "jwi_gogogo"), amqp.ConnServerHostname("vhost:/"),
		amqp.ConnIdleTimeout(0), amqp.ConnMaxFrameSize(1*1024*1024))
	if err != nil {
		log.Fatalf("consumeData: dialing broker: %v", err)
	}

	session, err := client.NewSession(
		amqp.SessionIncomingWindow(65535),
		amqp.SessionOutgoingWindow(65535),
	)
	if err != nil {
		// Best effort only: log.Fatalf exits the process right after.
		_ = client.Close()
		log.Fatalf("consumeData: opening session: %v", err)
	}

	receiver, err := session.NewReceiver(
		amqp.LinkSourceAddress("ipc.test"),
		amqp.LinkCredit(linkCredit),
		amqp.LinkSenderSettle(amqp.ModeSettled),
		amqp.LinkBatching(false),
		amqp.LinkBatchMaxAge(5*time.Second),
	)
	if err != nil {
		// Best effort only: log.Fatalf exits the process right after.
		_ = client.Close()
		log.Fatalf("consumeData: attaching receiver link: %v", err)
	}

	bctx := context.Background()

	// keep connection alive
	hbCtx, stopHeartbeat := context.WithCancel(bctx)
	keepConnAlive := func() {
		sender, err := session.NewSender(
			amqp.LinkTargetAddress("connHeartbeat"),
		)
		if err != nil {
			// Without a sender there is nothing to send on; report the
			// failure instead of dereferencing a nil sender.
			log.Printf("consumeData: attaching heartbeat sender: %v", err)
			return
		}
		dataBytes := []byte("")
		for {
			myctx, cancel := context.WithTimeout(hbCtx, 5*time.Second)
			ampMsg := amqp.NewMessage(dataBytes)
			ampMsg.Header = &amqp.MessageHeader{TTL: 30 * time.Second}
			err := sender.Send(myctx, ampMsg)
			cancel()
			// Do not report the error caused by our own shutdown.
			if err != nil && hbCtx.Err() == nil {
				log.Printf("consumeData: sending heartbeat: %v", err)
			}

			select {
			case <-hbCtx.Done():
				return
			case <-time.After(30 * time.Second):
			}
		}
	}
	go keepConnAlive()

	// Receive until the first error. The error is kept so cleanup can run
	// before the program exits: log.Fatal calls os.Exit, which skips deferred
	// functions, so a deferred Close would never execute.
	var recvErr error
	for {
		msg, err := receiver.Receive(bctx)
		if err != nil {
			recvErr = err
			break
		}
		if err := msg.Accept(); err != nil {
			log.Printf("consumeData: accepting message: %v", err)
		}
		receiveCount++
		log.Println("count:", receiveCount)
	}

	stopHeartbeat()
	rctx, cancel := context.WithTimeout(bctx, 3*time.Second)
	// Best effort: after a receive failure the link or connection may
	// already be unusable, and the receive error is the one worth reporting.
	_ = receiver.Close(rctx)
	cancel()
	_ = client.Close()

	log.Fatal(recvErr)
}

And here is the log:

2019/11/25 14:26:56 Sending...
14:26:56.988790 TX: Begin{RemoteChannel: 0, NextOutgoingID: 0, IncomingWindow: 65535, OutgoingWindow: 65535, HandleMax: 4294967295, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
14:26:56.994790 RX: Begin{RemoteChannel: 0, NextOutgoingID: 0, IncomingWindow: 65535, OutgoingWindow: 65535, HandleMax: 4294967295, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
14:26:56.994790 TX: Attach{Name: qDe58i39hyBwvT0w5BGi9FN6iACMkQdEXtdyD1R9HOIDrhqfiflRww, Handle: 0, Role: Sender, SenderSettleMode: , ReceiverSettleMode: , Source: , Target: source{Address: ipc.test, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
14:26:56.998790 RX(Session): Attach{Name: qDe58i39hyBwvT0w5BGi9FN6iACMkQdEXtdyD1R9HOIDrhqfiflRww, Handle: 0, Role: Receiver, SenderSettleMode: , ReceiverSettleMode: , Source: , Target: source{Address: ipc.test, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
14:26:56.999791 RX(Session): Flow{NextIncomingID: 0, IncomingWindow: 65535, NextOutgoingID: 0, OutgoingWindow: 65535, Handle: 0, DeliveryCount: , LinkCredit: 65536, Available: , Drain: false, Echo: false, Properties: map[]}
14:26:57.492819 RX(Session): Disposition{Role: Receiver, First: 1, Last: 1, Settled: true, State: Accepted, Batchable: false}
14:26:57.960846 RX(Session): Disposition{Role: Receiver, First: 2, Last: 2, Settled: true, State: Accepted, Batchable: false}
14:26:58.444873 RX(Session): Disposition{Role: Receiver, First: 3, Last: 3, Settled: true, State: Accepted, Batchable: false}
... (Disposition frames for deliveries 4-19 omitted)
14:27:06.419329 RX(Session): Disposition{Role: Receiver, First: 20, Last: 20, Settled: true, State: Accepted, Batchable: false}
2019/11/25 14:27:06 done.
2019/11/25 14:27:06 consuming...
14:27:06.429330 TX: Begin{RemoteChannel: 0, NextOutgoingID: 0, IncomingWindow: 65535, OutgoingWindow: 65535, HandleMax: 4294967295, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
14:27:06.434330 RX: Begin{RemoteChannel: 0, NextOutgoingID: 0, IncomingWindow: 65535, OutgoingWindow: 65535, HandleMax: 4294967295, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
14:27:06.434330 TX: Attach{Name: 47e76vhclAoTfoVXAnIFK7nbORkpK3XcPNk5N-gyNxv-doVJKjb2aA, Handle: 0, Role: Receiver, SenderSettleMode: settled, ReceiverSettleMode: , Source: source{Address: ipc.test, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: Outcomes: [], Capabilities: []}, Target: , Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
14:27:06.438330 RX(Session): Attach{Name: 47e76vhclAoTfoVXAnIFK7nbORkpK3XcPNk5N-gyNxv-doVJKjb2aA, Handle: 0, Role: Sender, SenderSettleMode: settled, ReceiverSettleMode: , Source: source{Address: ipc.test, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], DistributionMode: , Filter: map[], DefaultOutcome: ReleasedOutcomes: [amqp:accepted:list amqp:rejected:list amqp:released:list], Capabilities: []}, Target: , Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
14:27:06.438330 TX: Attach{Name: 2TXU9juZrYMuyPCfQK7EpNaQvzoKfrWfsBGvwaXL7rjXzU-0fZLshg, Handle: 1, Role: Sender, SenderSettleMode: , ReceiverSettleMode: , Source: , Target: source{Address: connHeartbeat, Durable: 0, ExpiryPolicy: , Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
14:27:06.438330 TX(Session): Flow{NextIncomingID: 0, IncomingWindow: 65535, NextOutgoingID: 0, OutgoingWindow: 65535, Handle: 0, DeliveryCount: 0, LinkCredit: 2, Available: , Drain: false, Echo: false, Properties: map[]}
14:27:06.441331 RX(Session): Attach{Name: 2TXU9juZrYMuyPCfQK7EpNaQvzoKfrWfsBGvwaXL7rjXzU-0fZLshg, Handle: 1, Role: Receiver, SenderSettleMode: , ReceiverSettleMode: , Source: , Target: source{Address: connHeartbeat, Durable: 0, ExpiryPolicy: session-end, Timeout: 0, Dynamic: false, DynamicNodeProperties: map[], Capabilities: []}, Unsettled: map[], IncompleteUnsettled: false, InitialDeliveryCount: 0, MaxMessageSize: 0, OfferedCapabilities: [], DesiredCapabilities: [], Properties: map[]}
14:27:06.441331 RX(Session): Flow{NextIncomingID: 0, IncomingWindow: 65535, NextOutgoingID: 0, OutgoingWindow: 65535, Handle: 1, DeliveryCount: , LinkCredit: 65536, Available: , Drain: false, Echo: false, Properties: map[]}
14:27:06.441331 RX(Session): Flow{NextIncomingID: 0, IncomingWindow: 65535, NextOutgoingID: 0, OutgoingWindow: 65535, Handle: 0, DeliveryCount: 0, LinkCredit: 2, Available: 20, Drain: false, Echo: false, Properties: map[]}
14:27:07.246377 RX(Session): Transfer{Handle: 0, DeliveryID: 0, DeliveryTag: "\x00\x00\x00\x00\x00\x00\x00\x01", MessageFormat: 0, Settled: true, More: true, ReceiverSettleMode: , State: , Resume: false, Aborted: false, Batchable: false, Payload [size]: 1048542}
14:27:11.936645 RX(Session): Transfer{Handle: 0, DeliveryID: 0, DeliveryTag: "\x00\x00\x00\x00\x00\x00\x00\x01", MessageFormat: 0, Settled: true, More: true, ReceiverSettleMode: , State: , Resume: false, Aborted: false, Batchable: false, Payload [size]: 1048542}
14:27:20.626142 RX(Session): Transfer{Handle: 0, DeliveryID: 0, DeliveryTag: "\x00\x00\x00\x00\x00\x00\x00\x01", MessageFormat: 0, Settled: true, More: true, ReceiverSettleMode: , State: , Resume: false, Aborted: false, Batchable: false, Payload [size]: 1048542}
14:27:32.458819 RX(Session): Transfer{Handle: 0, DeliveryID: 0, DeliveryTag: "\x00\x00\x00\x00\x00\x00\x00\x01", MessageFormat: 0, Settled: true, More: true, ReceiverSettleMode: , State: , Resume: false, Aborted: false, Batchable: false, Payload [size]: 1048542}
14:27:46.957648 RX(Session): Transfer{Handle: 0, DeliveryID: 0, DeliveryTag: "\x00\x00\x00\x00\x00\x00\x00\x01", MessageFormat: 0, Settled: true, More: true, ReceiverSettleMode: , State: , Resume: false, Aborted: false, Batchable: false, Payload [size]: 1048542}
14:27:46.959648 RX(Session): Transfer{Handle: 0, DeliveryID: 0, DeliveryTag: "\x00\x00\x00\x00\x00\x00\x00\x01", MessageFormat: 0, Settled: true, More: false, ReceiverSettleMode: , State: , Resume: false, Aborted: false, Batchable: false, Payload [size]: 222}
2019/11/25 14:27:46 count: 1
14:27:46.998650 TX(Session): Flow{NextIncomingID: 6, IncomingWindow: 65535, NextOutgoingID: 2, OutgoingWindow: 65535, Handle: 0, DeliveryCount: 1, LinkCredit: 2, Available: , Drain: false, Echo: false, Properties: map[]}
2019/11/25 14:27:59 read tcp 192.168.4.125:61393->192.168.8.6:5672: wsarecv: An existing connection was forcibly closed by the remote host.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions