
Conversation

Gsantomaggio
Member

  • Fixes: Incorrect endpoint for reconnection #133
  • Remove the PublishingIdStrategy interface. It is not necessary; the idea was to create
    a generic interface to get the publishingId.
  • Use Interlocked for _publishingId in ReliableProducer to make it thread-safe
  • Add GetLastPublishingId to the Producer class
  • Use GetLastPublishingId in the ReliableProducer class (see the sketch below)

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
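
For context, the last three points roughly amount to the following. This is only an illustrative sketch, not the PR code, and it assumes GetLastPublishingId returns a Task<ulong>:

using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Stream.Client;

// Illustrative only: keep a thread-safe publishing id and restore it from the
// broker after a (re)connection, which is the idea behind the ReliableProducer change.
class PublishingIdTracker
{
    private long _publishingId; // stored as long so the classic Interlocked overloads apply

    // After (re)connecting: GetLastPublishingId() asks the broker for the last id
    // persisted for this producer reference (the method this PR adds to Producer).
    public async Task RestoreAsync(Producer producer)
    {
        var last = await producer.GetLastPublishingId();
        Interlocked.Exchange(ref _publishingId, (long)last);
    }

    // For every outgoing message: atomically take the next publishing id.
    public ulong Next() => (ulong)Interlocked.Increment(ref _publishingId);
}
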

@Gsantomaggio
Member Author

@ricsiLT Can you please try it?

@codecov

codecov bot commented Jun 11, 2022

Codecov Report

Merging #134 (7e33d49) into main (a7fbb67) will increase coverage by 0.18%.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##             main     #134      +/-   ##
==========================================
+ Coverage   91.52%   91.71%   +0.18%     
==========================================
  Files          77       77              
  Lines        5828     5815      -13     
  Branches      358      358              
==========================================
- Hits         5334     5333       -1     
+ Misses        406      394      -12     
  Partials       88       88              
Impacted Files                                        | Coverage Δ
------------------------------------------------------|------------------------------
RabbitMQ.Stream.Client/Client.cs                      | 91.66% <100.00%> (+1.69%) ⬆️
RabbitMQ.Stream.Client/Producer.cs                    | 75.56% <100.00%> (+0.86%) ⬆️
...abbitMQ.Stream.Client/Reliable/ReliableProducer.cs | 84.12% <100.00%> (-2.08%) ⬇️
RabbitMQ.Stream.Client/StreamSystem.cs                | 93.52% <100.00%> (ø)
Tests/ProducerSystemTests.cs                          | 100.00% <100.00%> (ø)
RabbitMQ.Stream.Client/HeartBeatRequest.cs            | 100.00% <0.00%> (+100.00%) ⬆️

Legend:
Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@ricsiLT
Contributor

ricsiLT commented Jun 13, 2022

On it :)

@ricsiLT
Contributor

ricsiLT commented Jun 13, 2022

ERROR: Error during initialization: System.TimeoutException: The operation has timed out.
   at RabbitMQ.Stream.Client.ManualResetValueTaskSource`1.System.Threading.Tasks.Sources.IValueTaskSource<T>.GetResult(Int16 token)
   at RabbitMQ.Stream.Client.Client.Request[TIn,TOut](Func`2 request, Nullable`1 timeout)
   at RabbitMQ.Stream.Client.Client.Request[TIn,TOut](Func`2 request, Nullable`1 timeout)
   at RabbitMQ.Stream.Client.Client.QueryPublisherSequence(String publisherRef, String stream)
   at RabbitMQ.Stream.Client.Producer.GetLastPublishingId()
   at RabbitMQ.Stream.Client.Reliable.ReliableProducer.GetNewReliable(Boolean boot)

Result of testing :(

It did raise exceptions on server side tho. If it's of any importance, exceptions were raised on nodes that are not the leader.

Also, weirdly, heartbeat was not set to anything on both locator and producer connections.

@Gsantomaggio
Member Author

Gsantomaggio commented Jun 13, 2022

It did raise exceptions on server side tho.

What error?

it did raise exceptions on server side tho. If it's of any importance, exceptions were raised on nodes that are not the leader.

That's weird

@Gsantomaggio
Member Author

Gsantomaggio commented Jun 13, 2022

Also, weirdly, heartbeat was not set to anything on both locator and producer connections.

[Screenshots: management UI connection details showing the heartbeat value set on the locator and producer connections]

The heartbeat is set correctly.

My setup is:
Client --> Envoy proxy --> 3-node RabbitMQ 3.10.1 cluster

Connection configuration:

Console.WriteLine("Reliable .NET Producer");

// 192.168.56.10:5553 is the Envoy proxy address
var addressResolver = new AddressResolver(IPEndPoint.Parse("192.168.56.10:5553"));
var config = new StreamSystemConfig()
{
    Heartbeat = TimeSpan.FromSeconds(30),
    AddressResolver = addressResolver,
    UserName = "test",
    Password = "test",
    ClientProvidedName = "my-locator-connection",
    Endpoints = new List<EndPoint>() { addressResolver.EndPoint }
};
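
For completeness, the locator is then created from this config; a minimal sketch, where "my-stream" is only a placeholder stream name:

// Create the StreamSystem (locator connection) from the config above;
// with the AddressResolver set, lookups are resolved through the proxy address.
var system = await StreamSystem.Create(config);

// Declare a stream to publish to ("my-stream" is only a placeholder name).
await system.CreateStream(new StreamSpec("my-stream"));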

@Gsantomaggio
Member Author

RabbitMQ 3.9.12 is a bit old. I would suggest updating to the latest 3.9.x or, better, 3.10.x. We fixed some issues and made some improvements on the stream part.

@ricsiLT
Contributor

ricsiLT commented Jun 13, 2022

Ye, will do upgrade in a few hours/tomorrow :(

@Gsantomaggio
Member Author

FYI: if you want, you can find me on https://rabbitmq-slack.herokuapp.com/ for faster interaction.

@ricsiLT
Contributor

ricsiLT commented Jun 13, 2022

Another issue we see is doubling of connections, so to speak. Both seem to be alive.

[Screenshot: management UI showing the duplicated producer connections]

(In this case the publisher reference is env-proj-GUID, which doesn't make much sense since the reference would be fresh every time; hopefully that doesn't change anything. We will move to GUID-less references later.)

Again, tomorrow we should have our shiny new cluster with RHEL8/RMQ3.10.x so I can see whether that's the only issue.

@lukebakken lukebakken added this to the 1.0.0-rc.4 milestone Jun 15, 2022
@lukebakken lukebakken self-requested a review June 15, 2022 13:38
@lukebakken
Contributor

@ricsiLT I'd like to test this myself. Would you mind sharing your reproduction steps for this PR and for #133?

@Gsantomaggio
Member Author

@lukebakken it is enough to configure a cluster behind a load balancer.
I am using Vagrant with 2 RabbitMQ nodes and Envoy as the proxy.

# -*- mode: ruby -*-
# vi: set ft=ruby :

# All Vagrant configuration is done below. The "2" in Vagrant.configure
# configures the configuration version (we support older styles for
# backwards compatibility). Please don't change it unless you know what
# you're doing.

BOX_IMAGE = "ubuntu/bionic64"
NODE_COUNT = 3

Vagrant.configure("2") do |config|
  config.vm.define "node0" do |subconfig|
    subconfig.vm.box = BOX_IMAGE
    subconfig.vm.hostname = "node0"
    subconfig.vm.network :private_network, ip: "192.168.56.10"
  end

  (1..NODE_COUNT).each do |i|
    config.vm.define "node#{i}" do |subconfig|
      subconfig.vm.box = BOX_IMAGE
      subconfig.vm.hostname = "node#{i}"
      subconfig.vm.network :private_network, ip: "192.168.56.#{i + 10}"
    end
  end

  # Add the node host entries on all machines
  config.vm.provision "shell", inline: <<-SHELL
    sudo echo "192.168.56.11 node1" >> /etc/hosts
    sudo echo "192.168.56.12 node2" >> /etc/hosts
    sudo echo "192.168.56.10 node0" >> /etc/hosts
  SHELL
end

Envoy configuration:

admin: {"accessLogPath":"/dev/null","address":{"socketAddress":{"address":"0.0.0.0","portValue":9901}}}

static_resources:
  listeners:
    - name: stream-ingress
      per_connection_buffer_limit_bytes: 4096
      address:
        socket_address:
          address: 0.0.0.0
          port_value: 5553
      filter_chains:
        - filters:
            - name: envoy.filters.network.tcp_proxy
              typed_config:
                "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
                stat_prefix: ingress
                cluster: stream-cluster
    - name: management-ui
      address:
        socket_address:
          address: 0.0.0.0
          port_value: 15673
      filter_chains:
        - filters:
            - name: envoy.filters.network.http_connection_manager
              typed_config:
                "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
                codec_type: auto
                stat_prefix: ingress_http
                route_config:
                  name: local_route
                  virtual_hosts:
                    - name: service
                      domains:
                        - "*"
                      routes:
                        - match:
                            prefix: "/"
                          route:
                            cluster: management-cluster
                http_filters:
                  - name: envoy.filters.http.router
                    typed_config: { }
  clusters:
    - name: stream-cluster
      connect_timeout: 0.25s
      type: STRICT_DNS
      lb_policy: ROUND_ROBIN
      load_assignment:
        cluster_name: stream
        endpoints:
          - lb_endpoints:
              - endpoint:
                  address:
                    socket_address:
                      address: node0
                      port_value: 5552
              - endpoint:
                  address:
                    socket_address:
                      address: node1
                      port_value: 5552
              - endpoint:
                  address:
                    socket_address:
                      address: node2
                      port_value: 5552
    - name: management-cluster
      connect_timeout: 0.25s
      type: STRICT_DNS
      lb_policy: ROUND_ROBIN
      load_assignment:
        cluster_name: management1
        endpoints:
          - lb_endpoints:
              - endpoint:
                  address:
                    socket_address:
                      address: node0
                      port_value: 15672
              - endpoint:
                  address:
                    socket_address:
                      address: node1
                      port_value: 15672
              - endpoint:
                  address:
                    socket_address:
                      address: node2
                      port_value: 15672

Then kill the locator connection and the consumer/producer connections.
You need to use the reliable producer / consumer.
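
For reference, the client side of the reproduction roughly looks like the following. This is only a sketch: the factory and method names (CreateReliableProducer, Send) and the config fields are assumptions based on the Reliable API in this PR, and the stream name and reference are placeholders.

// Assumed Reliable API names; adjust to the actual ReliableProducer API.
var reliableProducer = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig()
{
    StreamSystem = system,       // StreamSystem created against the proxy endpoint
    Stream = "my-stream",        // placeholder stream name
    Reference = "my-reference",  // producer reference used to query the last publishing id
    ConfirmationHandler = confirmation =>
    {
        // Log confirmations while the connections are being killed through the proxy.
        Console.WriteLine($"confirmation: {confirmation.Status}");
        return Task.CompletedTask;
    }
});

// Keep publishing while killing the locator and producer connections, so the
// reconnection path (GetLastPublishingId -> QueryPublisherSequence) is exercised.
for (var i = 0; i < 10_000; i++)
{
    await reliableProducer.Send(new Message(Encoding.UTF8.GetBytes($"message {i}")));
    await Task.Delay(100);
}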

@Gsantomaggio
Member Author

@ricsiLT did you have a chance to test?

@ricsiLT
Contributor

ricsiLT commented Jun 20, 2022

Will come back with results today, sorry for the lag :(

@Gsantomaggio Gsantomaggio merged commit 332edf8 into main Jun 21, 2022
@Gsantomaggio Gsantomaggio deleted the reconnect_locator branch June 21, 2022 07:19
@Gsantomaggio
Member Author

Merged per conversation with @ricsiLT! Thanks for your help!
