Collection was modified; enumeration operation may not execute in CometD.NetCore.Client.Transport.LongPollingTransport.GetResponseCallback function. #9

Open
KwizaKaneria opened this issue Jul 12, 2022 · 0 comments

We are currently using this library to listen to push topics so that we are notified whenever something changes in Salesforce. Our setup is as follows.

The code in Program.cs that registers the event bus and configures the Salesforce streaming client is:

public static void Main(string[] args)
{
    var builder = WebApplication.CreateBuilder(args);
    builder.Host.ConfigureServices((hostContext, services) =>
    {
        var salesforceConfiguration = services.BuildServiceProvider().GetRequiredService<IOptions>().Value;

        // Configure the Salesforce streaming client from our settings.
        services.AddResilientStreamingClient("", "", (conf) =>
        {
            conf.ClientId = salesforceConfiguration.ClientId;
            conf.ClientSecret = salesforceConfiguration.ClientSecret;
            conf.RefreshToken = salesforceConfiguration.RefreshToken;
            conf.LoginUrl = salesforceConfiguration.BaseUrl;
            conf.OAuthUri = Constants.OAuthUriConfig;
            conf.EventOrTopicUri = Constants.EventOrTopicConfig;
            conf.CometDUri = salesforceConfiguration.CometdUrl;
        });

        services.AddSingleton<IEventBus, EventBus>();
        // Hosted service that subscribes to the push topics on startup.
        services.AddHostedService<SalesforceEventBusHostedService>();
        services.AddTransient<IMessageListener, UpdatedListener>();
        services.AddTransient<IMessageListener, DeletedListener>();
    });

    var app = builder.Build();
    app.Run();
}

The code that subscribes to the events is:

public class SalesforceEventBusHostedService : IHostedService
{
    private readonly ILogger _logger;
    private readonly IEventBus _eventBus;
    private readonly ICacheService<ReplayIdDto> _cacheService;

    // Push topics that we created in Salesforce: (topic name, replay ID, listener type).
    // A replay ID of -1 is the default used when no replay ID is found in the cache.
    private readonly List<Tuple<string, int, Type>> _eventsMapping = new()
    {
        new Tuple<string, int, Type>("topic/Updated", -1, typeof(UpdatedListener)),
        new Tuple<string, int, Type>("topic/Deleted", -1, typeof(DeletedListener))
    };

    public SalesforceEventBusHostedService(ILogger<SalesforceEventBusHostedService> logger, IEventBus eventBus, ICacheService<ReplayIdDto> cacheService)
    {
        _logger = logger;
        _eventBus = eventBus;
        _cacheService = cacheService;
        _cacheService.CacheRegion = $"{Constants.MessageListenerCacheRegion}";
    }

    // Builds a PlatformEvent<eventType> via reflection and wraps it as the argument array for Invoke.
    private static object[] GetPlatformEventObject(string eventName, int replayId, Type eventType)
    {
        var platformEvent = Activator.CreateInstance(typeof(PlatformEvent<>).MakeGenericType(eventType));
        platformEvent?.GetType().GetProperty("Name")?.SetValue(platformEvent, eventName);
        platformEvent?.GetType().GetProperty("ReplayId")?.SetValue(platformEvent, replayId);
        return new[] { platformEvent };
    }

    // Calls IEventBus.Subscribe<eventType>(platformEvent) via reflection.
    private async Task SubscribeToEvent(string eventName, int replayId, Type eventType)
    {
        var platformEvent = GetPlatformEventObject(eventName, replayId, eventType);
        var subscribeMethod = typeof(IEventBus).GetMethod("Subscribe")?.MakeGenericMethod(eventType);
        await ((Task) subscribeMethod?.Invoke(_eventBus, platformEvent))!;
    }

    // Calls IEventBus.Unsubscribe<eventType>(platformEvent) via reflection.
    private async Task UnSubscribeToEvent(string eventName, int replayId, Type eventType)
    {
        var platformEvent = GetPlatformEventObject(eventName, replayId, eventType);
        var unsubscribeMethod = typeof(IEventBus).GetMethod("Unsubscribe")?.MakeGenericMethod(eventType);
        await ((Task) unsubscribeMethod?.Invoke(_eventBus, platformEvent))!;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation($"{nameof(SalesforceEventBusHostedService)} starting.");
        for (var i = 0; i < _eventsMapping.Count; i++)
        {
            var (originalEventName, originalReplayId, originalEventType) = _eventsMapping[i];
            // Prefer the replay ID stored in the cache; fall back to the default from the mapping.
            var replayIdFromCache = (await _cacheService.GetItemAsync(StreamingApiHelper.RemoveEventOrTopicPrefix(originalEventName)))?.ReplayId ?? originalReplayId;
            var (eventName, replayId, eventType) = _eventsMapping[i] = new Tuple<string, int, Type>(originalEventName, replayIdFromCache, originalEventType);
            await SubscribeToEvent(eventName, replayId, eventType);
        }
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation($"{nameof(SalesforceEventBusHostedService)} stopping.");
        foreach (var (eventName, replayId, eventType) in _eventsMapping)
        {
            await UnSubscribeToEvent(eventName, replayId, eventType);
        }
    }
}

However, we get the following exception:

System.InvalidOperationException: Collection was modified; enumeration operation may not execute.
   at System.Collections.ArrayList.ArrayListEnumeratorSimple.MoveNext()
   at System.Net.CookieCollection.IndexOf(Cookie cookie)
   at System.Net.CookieCollection.Add(Cookie cookie)
   at CometD.NetCore.Client.Transport.LongPollingTransport.GetResponseCallback(IAsyncResult asynchronousResult) in C:\projects\cometd-netcore\src\CometD.NetCore\Client\Transport\LongPollingTransport.cs:line 294

Can anyone please explain why this error occurs and whether there is something we should change on our side?
