Skip to content

Commit

Permalink
Added test to verify a consumer gets both messages from a subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
phatboyg committed Mar 27, 2012
1 parent 10b7dd2 commit 3dad8f2
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 89 deletions.
143 changes: 74 additions & 69 deletions src/MassTransit.Reactive.Tests/BasicExample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,87 +12,92 @@
// specific language governing permissions and limitations under the License.
namespace MassTransit.Reactive.Tests
{
using System;
using System.Reactive.Linq;
using Magnum.Extensions;
using Magnum.TestFramework;
using NUnit.Framework;
using MassTransit.TestFramework;
using System;
using System.Reactive.Linq;
using Magnum.Extensions;
using Magnum.TestFramework;
using NUnit.Framework;
using TestFramework;

[Scenario, Explicit("Fails from command-line build, don't know why, so I'm removing it from the build")]
public class BasicExample :
Given_a_standalone_service_bus
{
[Given]
public void A_reactive_query_is_observing_a_bus_message()
{
_observable = LocalBus.AsObservable<PingMessage>();
[Scenario, Explicit("Fails from command-line build, don't know why, so I'm removing it from the build")]
public class BasicExample :
Given_a_standalone_service_bus
{
[Given]
public void A_reactive_query_is_observing_a_bus_message()
{
_observable = LocalBus.AsObservable<PingMessage>();

_thatJustHappened = new Future<PingMessage>();
_subscription = _observable.Subscribe(m => _thatJustHappened.Complete(m));
_thatJustHappened = new Future<PingMessage>();
_subscription = _observable.Subscribe(m => _thatJustHappened.Complete(m));

LocalBus.Publish(new PingMessage());
}
LocalBus.Publish(new PingMessage());
}

IObservable<PingMessage> _observable;
Future<PingMessage> _thatJustHappened;
IDisposable _subscription;
IObservable<PingMessage> _observable;
Future<PingMessage> _thatJustHappened;
IDisposable _subscription;

[Finally]
public void Finally()
{
_subscription.Dispose();
}
[Finally]
public void Finally()
{
_subscription.Dispose();
}

[Then]
public void The_message_should_be_observed()
{
Assert.IsNotNull(_observable.Timeout(8.Seconds()).Take(1).Single());
[Then]
public void The_message_should_be_observed()
{
Assert.IsNotNull(_observable.Timeout(8.Seconds()).Take(1).Single());

_thatJustHappened.WaitUntilCompleted(8.Seconds()).ShouldBeTrue();
}
}
[Serializable]
public class PingMessage :
IEquatable<PingMessage>,
CorrelatedBy<Guid>
{
private Guid _id = new Guid("D62C9B1C-8E31-4D54-ADD7-C624D56085A4");
_thatJustHappened.WaitUntilCompleted(8.Seconds()).ShouldBeTrue();
}
}

public PingMessage()
{
}
[Serializable]
public class PingMessage :
IEquatable<PingMessage>,
CorrelatedBy<Guid>
{
Guid _id = new Guid("D62C9B1C-8E31-4D54-ADD7-C624D56085A4");

public PingMessage(Guid id)
{
_id = id;
}
public PingMessage()
{
}

public Guid CorrelationId
{
get { return _id; }
set { _id = value; }
}
public PingMessage(Guid id)
{
_id = id;
}

public bool Equals(PingMessage obj)
{
if (ReferenceEquals(null, obj)) return false;
if (ReferenceEquals(this, obj)) return true;
return obj._id.Equals(_id);
}
public Guid CorrelationId
{
get { return _id; }
set { _id = value; }
}

public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj)) return false;
if (ReferenceEquals(this, obj)) return true;
if (obj.GetType() != typeof(PingMessage)) return false;
return Equals((PingMessage)obj);
}
public bool Equals(PingMessage obj)
{
if (ReferenceEquals(null, obj))
return false;
if (ReferenceEquals(this, obj))
return true;
return obj._id.Equals(_id);
}

public override int GetHashCode()
{
return _id.GetHashCode();
}
}
public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj))
return false;
if (ReferenceEquals(this, obj))
return true;
if (obj.GetType() != typeof(PingMessage))
return false;
return Equals((PingMessage)obj);
}

public override int GetHashCode()
{
return _id.GetHashCode();
}
}
}
1 change: 1 addition & 0 deletions src/MassTransit.Tests/MassTransit.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@
<Compile Include="Serialization\SerializationException_Specs.cs" />
<Compile Include="Services\HealthMonitoring\HealthClient_Integration_Specs.cs" />
<Compile Include="Services\Timeout\TimeoutService_Specs.cs" />
<Compile Include="Subscriptions\Consumer_Specs.cs" />
<Compile Include="Subscriptions\DynamicAndStaticSubscriptions_Specs.cs" />
<Compile Include="Subscriptions\NewSubscriptionSystem_Specs.cs" />
<Compile Include="Subscriptions\MultipleSubscribers_Specs.cs" />
Expand Down
40 changes: 20 additions & 20 deletions src/MassTransit.Tests/PublishSubscribe_Specs.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2007-2010 The Apache Software Foundation.
// Copyright 2007-2011 Chris Patterson, Dru Sellers, Travis Smith, et. al.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
Expand All @@ -12,26 +12,26 @@
// specific language governing permissions and limitations under the License.
namespace MassTransit.Tests
{
using Magnum.Extensions;
using Messages;
using NUnit.Framework;
using TestConsumers;
using TextFixtures;
using Magnum.Extensions;
using Messages;
using NUnit.Framework;
using TestConsumers;
using TextFixtures;

[TestFixture]
public class PublishSubscribe_Specs :
LoopbackTestFixture
{
[Test]
public void A_simple_bus_should_be_able_to_subscribe_and_publish()
{
TestMessageConsumer<PingMessage> consumer = new TestMessageConsumer<PingMessage>();
LocalBus.SubscribeInstance(consumer);
[TestFixture]
public class PublishSubscribe_Specs :
LoopbackTestFixture
{
[Test]
public void A_simple_bus_should_be_able_to_subscribe_and_publish()
{
var consumer = new TestMessageConsumer<PingMessage>();
LocalBus.SubscribeInstance(consumer);

PingMessage message = new PingMessage();
LocalBus.Publish(message);
var message = new PingMessage();
LocalBus.Publish(message);

consumer.ShouldHaveReceivedMessage(message, 500.Milliseconds());
}
}
consumer.ShouldHaveReceivedMessage(message, 8.Seconds());
}
}
}
164 changes: 164 additions & 0 deletions src/MassTransit.Tests/Subscriptions/Consumer_Specs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright 2007-2011 Chris Patterson, Dru Sellers, Travis Smith, et. al.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
namespace MassTransit.Tests.Subscriptions
{
using System;
using BusConfigurators;
using Magnum.Extensions;
using NUnit.Framework;
using TestConsumers;
using TestFramework;
using TextFixtures;

[TestFixture]
public class A_consumer_with_two_message_contracts :
LoopbackLocalAndRemoteTestFixture
{
protected override void ConfigureRemoteBus(ServiceBusConfigurator configurator)
{
base.ConfigureRemoteBus(configurator);

configurator.Subscribe(x => { x.Consumer<ConsumerOfAAndB>(); });
}

protected override void EstablishContext()
{
base.EstablishContext();

LocalBus.ShouldHaveSubscriptionFor<A>();
LocalBus.ShouldHaveSubscriptionFor<B>();
}

class ConsumerOfAAndB :
Consumes<A>.All,
Consumes<B>.All
{
readonly TestConsumerBase<A> _consumerA;
readonly TestConsumerBase<B> _consumerB;

public ConsumerOfAAndB()
{
_consumerA = new TestConsumerBase<A>();
_consumerB = new TestConsumerBase<B>();
}

public TestConsumerBase<A> ConsumerA
{
get { return _consumerA; }
}

public TestConsumerBase<B> ConsumerB
{
get { return _consumerB; }
}

public void Consume(A message)
{
_consumerA.Consume(message);
}

public void Consume(B message)
{
_consumerB.Consume(message);
}
}

class A
{
public A()
{
Value = Guid.NewGuid();
}

public Guid Value { get; private set; }

public bool Equals(A other)
{
if (ReferenceEquals(null, other))
return false;
if (ReferenceEquals(this, other))
return true;
return other.Value.Equals(Value);
}

public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj))
return false;
if (ReferenceEquals(this, obj))
return true;
if (obj.GetType() != typeof(A))
return false;
return Equals((A)obj);
}

public override int GetHashCode()
{
return Value.GetHashCode();
}
}

class B
{
public B()
{
Value = Guid.NewGuid();
}

public Guid Value { get; private set; }

public bool Equals(B other)
{
if (ReferenceEquals(null, other))
return false;
if (ReferenceEquals(this, other))
return true;
return other.Value.Equals(Value);
}

public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj))
return false;
if (ReferenceEquals(this, obj))
return true;
if (obj.GetType() != typeof(B))
return false;
return Equals((B)obj);
}

public override int GetHashCode()
{
return Value.GetHashCode();
}
}

[Test]
public void Should_have_received_a()
{
var message = new A();
LocalBus.Publish(message);

TestConsumerBase<A>.OnlyOneShouldHaveReceivedMessage(message, 8.Seconds());
}

[Test]
public void Should_have_received_b()
{
var message = new B();
RemoteBus.Publish(message);

TestConsumerBase<B>.OnlyOneShouldHaveReceivedMessage(message, 8.Seconds());
}
}
}

0 comments on commit 3dad8f2

Please sign in to comment.