Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions src/main/java/com/pusher/client/channel/impl/ChannelManager.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.pusher.client.channel.impl;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.google.gson.Gson;
import com.pusher.client.AuthorizationFailureException;
Expand All @@ -20,7 +20,8 @@
public class ChannelManager implements ConnectionEventListener {

private static final Gson GSON = new Gson();
private final Map<String, InternalChannel> channelNameToChannelMap = new HashMap<String, InternalChannel>();
private final Map<String, InternalChannel> channelNameToChannelMap = new ConcurrentHashMap<String, InternalChannel>();

private final Factory factory;
private InternalConnection connection;

Expand Down Expand Up @@ -70,8 +71,7 @@ public void setConnection(final InternalConnection connection) {
connection.bind(ConnectionState.CONNECTED, this);
}

public void subscribeTo(final InternalChannel channel, final ChannelEventListener listener,
final String... eventNames) {
public void subscribeTo(final InternalChannel channel, final ChannelEventListener listener, final String... eventNames) {

validateArgumentsAndBindEvents(channel, listener, eventNames);
channelNameToChannelMap.put(channel.getName(), channel);
Expand All @@ -88,7 +88,6 @@ public void unsubscribeFrom(final String channelName) {
if (channel == null) {
return;
}

if (connection.getState() == ConnectionState.CONNECTED) {
sendUnsubscribeMessage(channel);
}
Expand Down Expand Up @@ -116,8 +115,7 @@ public void onMessage(final String event, final String wholeMessage) {
public void onConnectionStateChange(final ConnectionStateChange change) {

if (change.getCurrentState() == ConnectionState.CONNECTED) {

for (final InternalChannel channel : channelNameToChannelMap.values()) {
for(final InternalChannel channel : channelNameToChannelMap.values()){
sendOrQueueSubscribeMessage(channel);
}
}
Expand Down Expand Up @@ -181,8 +179,7 @@ public void run() {
}
}

private void validateArgumentsAndBindEvents(final InternalChannel channel, final ChannelEventListener listener,
final String... eventNames) {
private void validateArgumentsAndBindEvents(final InternalChannel channel, final ChannelEventListener listener, final String... eventNames) {

if (channel == null) {
throw new IllegalArgumentException("Cannot subscribe to a null channel");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.pusher.client.connection.ConnectionStateChange;
import com.pusher.client.connection.impl.InternalConnection;
import com.pusher.client.util.Factory;
import java.util.concurrent.Executors;

@RunWith(MockitoJUnitRunner.class)
public class ChannelManagerTest {
Expand Down Expand Up @@ -325,7 +326,7 @@ public void testGetChannelFromString(){

@Test
public void testGetNonExistentChannelFromString(){
Channel channel = channelManager.getChannel("woot");
Channel channel = channelManager.getChannel("woot");
assertNull(channel);
}

Expand Down Expand Up @@ -369,4 +370,25 @@ public void testGetNonExistentPresenceChannel(){
PresenceChannel channel = channelManager.getPresenceChannel("presence-yolo");
assertNull(channel);
}

@Test
public void testConcurrentModificationExceptionDoesNotHappenWhenConnectionIsEstablished() {
for(int i = 0; i<1000; i++) {
channelManager.subscribeTo(new ChannelImpl("channel" + i, factory), null);
}

Runnable removeChannels = new Runnable() {
@Override
public void run() {
System.out.println("Start unsubscribe");
for(int i=900; i<1000; i++){
channelManager.unsubscribeFrom("channel"+i);
}
System.out.println("end unsubscribe");
}
};
Executors.newSingleThreadExecutor().submit(removeChannels);

channelManager.onConnectionStateChange(new ConnectionStateChange(ConnectionState.CONNECTING, ConnectionState.CONNECTED));
}
}