Skip to content

Commit

Permalink
Messages target specific clients. Added reset command from client
Browse files Browse the repository at this point in the history
  • Loading branch information
wdlindmeier committed Aug 7, 2013
1 parent f41f07b commit 395b16c
Show file tree
Hide file tree
Showing 2 changed files with 195 additions and 109 deletions.
208 changes: 118 additions & 90 deletions Most-Pixels-Ever-Server/src/mpe/server/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;

public class Connection extends Thread {

Expand Down Expand Up @@ -58,104 +60,128 @@ void read(String msg) {
String[] tokens = msg.split("\\|");

switch(startsWith){
// For Starting Up

case 'A':
if (parent.verbose) {
System.out.println("Raw receive: " + msg);
}
clientID = Integer.parseInt(tokens[1]);

// Async Client Joined
case 'A':
if (parent.verbose) {
System.out.println("Raw receive: " + msg);
}
clientID = Integer.parseInt(tokens[1]);

// We are asynchronous
isAsynch = true;

// Let's see if we want to get messages
if (tokens.length > 2) {
try {
asynchReceive = Boolean.parseBoolean(tokens[2]);
} catch (Exception e) {
System.out.println("Malformed boolean for synch receive");
}
}
// We are asynchronous
isAsynch = true;

System.out.println("Connecting asynch client " + clientID + " receiver: " + asynchReceive);
// Let's see if we want to get messages
if (tokens.length > 3) {
try {
asynchReceive = Boolean.parseBoolean(tokens[3]);
} catch (Exception e) {
System.out.println("Malformed boolean for synch receive");
}
}

System.out.println("Connecting asynch client " + clientID + " receiver: " + asynchReceive);

// I don't think we need to bother keep a reference to this object
// unless it needs to receive messages, ah but let's do it anyway
parent.addConnection(this);
// I don't think we need to bother keep a reference to this object
// unless it needs to receive messages, ah but let's do it anyway
parent.addConnection(this);

break;
case 'S':
break;

// Sync client joined
case 'S':

// Making the decision that if the server is paused and there is a new connection
// It starts again
/*if (parent.paused) {
parent.paused = false;
}*/
// Making the decision that if the server is paused and there is a new connection
// It starts again
/*if (parent.paused) {
parent.paused = false;
}*/

if (parent.verbose) {
System.out.println("Raw receive: " + msg);
}
clientID = Integer.parseInt(tokens[1]);

parent.addConnection(this);

System.out.println("Connecting synch client " + clientID);
int total = parent.totalConnections();

// We should only wait the *first* time if we are told to wait for everyone
// otherwise we can just reset if someone has disconnected and reconnected
if (parent.waitForAll && !parent.allConnected) {
parent.allConnected = (total == parent.numRequiredClients);
if (parent.allConnected) {
parent.triggerFrame(false);
}
} else {
parent.triggerFrame(true);
}

break;
//is it receiving a "done"?
case 'D':
if (!parent.waitForAll || parent.allConnected) {
clientID = Integer.parseInt(tokens[1]);
int fc = Integer.parseInt(tokens[2]);
if (parent.verbose) {
System.out.println("Client: " + clientID + " says done with: " + fc + " server count: " + parent.frameCount);
}
if (fc == parent.frameCount) {
parent.setReady(clientID);
}
}
break;
case 'T':
if (parent.verbose) {
System.out.println("Adding message to next frameEvent: " + clientID + "," + tokens[1]);
}
parent.newMessage = true;
parent.message += clientID + "," + tokens[1] + "|";
break;


case 'P':
parent.paused = !parent.paused;
if (parent.verbose) {
if (parent.paused){
System.out.println("Client " + clientID + " has asked to pause.");
} else {
System.out.println("Client " + clientID + " has asked to unpause.");

}
}
// If we are unpausing and everyone is ready go ahead onto next frame
// This should probably be refactored and combined more nicely with what happens in setReady()
if (!parent.paused && parent.isReady()) {
parent.frameCount++;
parent.triggerFrame(false);
}
break;
if (parent.verbose) {
System.out.println("Raw receive: " + msg);
}
clientID = Integer.parseInt(tokens[1]);

parent.addConnection(this);

System.out.println("Connecting synch client " + clientID);
int total = parent.totalConnections();

// We should only wait the *first* time if we are told to wait for everyone
// otherwise we can just reset if someone has disconnected and reconnected
if (parent.waitForAll && !parent.allConnected) {
parent.allConnected = (total == parent.numRequiredClients);
if (parent.allConnected) {
parent.triggerFrame(false);
}
} else {
parent.triggerFrame(true);
}

break;

// Done rendering
case 'D':
if (!parent.waitForAll || parent.allConnected) {
clientID = Integer.parseInt(tokens[1]);
int fc = Integer.parseInt(tokens[2]);
if (parent.verbose) {
System.out.println("Client: " + clientID + " says done with: " + fc + " server count: " + parent.frameCount);
}
if (fc == parent.frameCount) {
parent.setReady(clientID);
}
}
break;

// DaTa message was received
case 'T':
if (parent.verbose) {
System.out.println("Adding message to next frameEvent: " + clientID + "," + tokens[1]);
}
String message = tokens[1];
ArrayList<Integer> toClientIDs = new ArrayList<Integer>();
if (tokens.length > 2) {
// Add the ints
String clientIDs = tokens[2];
if (clientIDs.length() > 0) {
ArrayList<String> ids = new ArrayList<String>(Arrays.asList(clientIDs.split(",")));
for(String id : ids) {
toClientIDs.add(Integer.parseInt(id));
}
}
} else {
toClientIDs = parent.receivingClientIDs();
}
parent.addMessage(message, clientID, toClientIDs);
break;

// Pause
case 'P':
parent.paused = !parent.paused;
if (parent.verbose) {
if (parent.paused){
System.out.println("Client " + clientID + " has asked to pause.");
} else {
System.out.println("Client " + clientID + " has asked to unpause.");

}
}
// If we are unpausing and everyone is ready go ahead onto next frame
// This should probably be refactored and combined more nicely with what happens in setReady()
if (!parent.paused && parent.isReady()) {
parent.frameCount++;
parent.triggerFrame(false);
}
break;

// Reset
case 'R':
parent.paused = false;
parent.triggerFrame(true);
break;

}
}

Expand All @@ -182,7 +208,6 @@ public void run() {
public void killMe(){
System.out.println("Removing Connection " + clientID);
parent.killConnection(clientID);

if (parent.allDisconected()) {
parent.resetFrameCount();
} else {
Expand All @@ -202,6 +227,9 @@ public void send(String msg) {
msg+="\n";
dos.write(msg.getBytes());
dos.flush();
} catch (java.net.SocketException e) {
// If we've got a broken pipe, remove the connection
parent.killConnection(clientID);
} catch (IOException e) {
e.printStackTrace();
}
Expand Down
96 changes: 77 additions & 19 deletions Most-Pixels-Ever-Server/src/mpe/server/MPEServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ public class MPEServer {
private HashMap<Integer,Connection> connectionlookup = new HashMap<Integer,Connection>();
private ArrayList<Connection> synchconnections = new ArrayList<Connection>();
private ArrayList<Connection> asynchconnections = new ArrayList<Connection>();
private HashMap<Integer,ArrayList<String>> messages = new HashMap<Integer,ArrayList<String>>();

private int port;
int frameCount = 0;
private long before;

// Server will add a message to the frameEvent
public boolean newMessage = false;
public String message = "";
// public boolean newMessage = false;
// public String message = "";

public boolean dataload = false;

Expand Down Expand Up @@ -94,6 +95,36 @@ public void run() {
System.out.println("Zoinks!" + e);
}
}

public synchronized void addMessage(String message, Integer fromClientId, ArrayList<Integer> toClientIDs)
{
String formattedMessage = fromClientId + "," + message;
for (Integer cID : toClientIDs)
{
ArrayList<String> clientMessages = messages.get(cID);
if (clientMessages == null)
{
clientMessages = new ArrayList<String>();
messages.put(cID, clientMessages);
}
clientMessages.add(formattedMessage);
}
}

public ArrayList<Integer> receivingClientIDs()
{
ArrayList<Integer> clientIDs = new ArrayList<Integer>();
// Add all client IDs to the message
for (Connection conn : synchconnections) {
clientIDs.add(conn.clientID);
}
for (Connection conn : asynchconnections) {
if (conn.asynchReceive) {
clientIDs.add(conn.clientID);
}
}
return clientIDs;
}

// Synchronize?!!!
public synchronized void triggerFrame(boolean reset) {
Expand Down Expand Up @@ -122,10 +153,37 @@ public synchronized void triggerFrame(boolean reset) {
String send = "G|"+frameCount;
// Adding a data message to the frameEvent
// substring removes the ":" at the end.
if (newMessage) send += "|" + message.substring(0, message.length()-1);
if (messages.size() > 0)
{
ArrayList<Integer> rcvClientIDs = receivingClientIDs();
for (Integer clientID : rcvClientIDs) {
String clientSend = send;
ArrayList<String> messageBodies = messages.get(clientID);
if (messageBodies != null) {
for (String message : messageBodies) {
clientSend += "|" + message;
}
}
Connection c = connectionlookup.get(clientID);
// Make sure the client should receive messages
if (c != null){
c.send(clientSend);
}
}
messages.clear();
}
else
{
sendAll(send);
}
/*
if (newMessage && message.length() > 0) {
send += "|" + message.substring(0, message.length()-1);
}
newMessage = false;
message = "";
sendAll(send);
*/
}

// After frame is triggered all connections should be set to "unready"
Expand All @@ -152,16 +210,14 @@ public synchronized void sendAll(String msg){
if (verbose) {
System.out.println("Sending to " + synchconnections.size() + " sync clients, " + howmany + " async clients: " + msg);
}

for (Connection conn : synchconnections) {
conn.send(msg);
}

for (Connection conn : asynchconnections) {
if (conn.asynchReceive) {
conn.send(msg);
}
}

ArrayList<Integer> rcvClientIDs = receivingClientIDs();
for (Integer clientID : rcvClientIDs) {
Connection c = connectionlookup.get(clientID);
if (c != null){
c.send(msg);
}
}
}

public void killConnection(int i){
Expand All @@ -177,11 +233,15 @@ boolean allDisconected(){
return true;
} else return false;
}

void resetFrameCount(){
frameCount = 0;
newMessage = false;
message = "";
print ("resetting frame count.");
//newMessage = false;
//message = "";
messages.clear();
if (verbose) {
System.out.println("resetting frame count.");
}
}

public void killServer() {
Expand Down Expand Up @@ -356,13 +416,11 @@ public synchronized boolean isReady() {
}

public void addConnection(Connection c) {
// TODO Account for asynch connections
if (c.isAsynch) {
asynchconnections.add(c);
connectionlookup.put(c.clientID,c);
} else {
synchconnections.add(c);
connectionlookup.put(c.clientID,c);
}
connectionlookup.put(c.clientID,c);
}
}

0 comments on commit 395b16c

Please sign in to comment.