Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akimowkirill threads #202

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.idea
**/*.iml
**/target/
104 changes: 63 additions & 41 deletions L7_threads/src/main/java/ru/track/prefork/Client.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
package ru.track.prefork;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Scanner;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import ru.track.prefork.protocol.JavaSerializationProtocol;
import java.io.*;
import java.net.Socket;
import java.util.Scanner;

import ru.track.prefork.protocol.Message;
import ru.track.prefork.protocol.Protocol;
import ru.track.prefork.protocol.JavaSerializationProtocol;
import ru.track.prefork.protocol.ProtocolException;


/**
*
*/
public class Client {

static Logger log = LoggerFactory.getLogger(Client.class);
static Logger log = LoggerFactory.getLogger(Server.class);

private int port;
private String host;
Expand All @@ -31,44 +32,65 @@ public Client(int port, String host, Protocol<Message> protocol) {
this.protocol = protocol;
}

public void loop() throws Exception {
Socket socket = new Socket(host, port);

final OutputStream out = socket.getOutputStream();
final InputStream in = socket.getInputStream();

Scanner scanner = new Scanner(System.in);
Thread scannerThread = new Thread(() -> {
try {
while (true) {
String line = scanner.nextLine();
Message msg = new Message(System.currentTimeMillis(), line);
msg.username = "Dima";
out.write(protocol.encode(msg));
out.flush();

public void Connect() throws IOException {
Socket socket = null;

try {
socket = new Socket(host, port);

final InputStream in = socket.getInputStream();
final OutputStream os = socket.getOutputStream();

//new thread, that read from console and send data to server
Scanner scan = new Scanner(System.in);
Thread scannerThread = new Thread(() -> {
try {
while (true) {
String line = scan.nextLine();
Message msg = new Message(System.currentTimeMillis(), line);
msg.username = "KIRILL"; //there can be smth another, for instant, socket.getInetAddress()
os.write(protocol.encode(msg));
os.flush();
}
} catch (IOException | ProtocolException e) {
e.printStackTrace();
}

});

scannerThread.start();


byte[] buf = new byte[1024];
while (true) {
int nRead = in.read(buf);
if (nRead != -1) {
System.out.println(protocol.decode(buf));
} else {
log.error("Connection failed");
return;
}
} catch (IOException e) {
e.printStackTrace();
} catch (ProtocolException e) {
e.printStackTrace();
}
});
scannerThread.start();

byte[] buf = new byte[1024];
while (true) {
int nRead = in.read(buf);
if (nRead != -1) {
protocol.decode(buf);
} else {
log.error("Connection failed");
return;
}

} catch (IOException | ProtocolException e) {
e.printStackTrace();
} finally {
IOUtils.closeQuietly(socket);
}

}


public static void main(String[] args) throws Exception {
Client client = new Client(9000, "localhost", new JavaSerializationProtocol());
client.loop();
Client myclient = new Client(9000, "localhost", new JavaSerializationProtocol());
try {
myclient.Connect();
} catch (IOException e) {
e.printStackTrace();
}


}
}

}
145 changes: 108 additions & 37 deletions L7_threads/src/main/java/ru/track/prefork/Server.java
Original file line number Diff line number Diff line change
@@ -1,77 +1,143 @@

package ru.track.prefork;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.net.InetAddress;

import java.net.ServerSocket;
import java.net.Socket;
import java.util.Scanner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import ru.track.prefork.protocol.JavaSerializationProtocol;
import ru.track.prefork.protocol.Message;
import ru.track.prefork.protocol.Protocol;
import ru.track.prefork.protocol.Message;
import ru.track.prefork.protocol.ProtocolException;
import ru.track.prefork.protocol.JavaSerializationProtocol;

import static java.lang.Thread.sleep;

/**
* - multithreaded +
* - atomic counter +
* - setName() +
* - thread -> Worker +
* - save threads
* - broadcast (fail-safe)
*
*
*/


public class Server {
static Logger log = LoggerFactory.getLogger(Server.class);

private int port;
private AtomicLong serverCounter = new AtomicLong(0);
private Protocol<Message> protocol;

private ConcurrentMap<Long, Worker> workerMap;
private ConcurrentMap<Long, Worker> workerMap; //providing additional atomic putIfAbsent, remove, and replace methods.


public Server(int port, Protocol<Message> protocol) {
this.port = port;
this.protocol = protocol;
workerMap = new ConcurrentHashMap<>();
}

public void serve() throws Exception {
ServerSocket serverSocket = new ServerSocket(port, 10, InetAddress.getByName("localhost"));

public void serve() throws IOException {

ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port, 10, InetAddress.getByName("localhost"));
} catch (IOException e) {
e.printStackTrace();
}


//begin of the console

Scanner scan = new Scanner(System.in);

Runnable consoleScanner = () -> {

while (true) {

String str = scan.nextLine().toLowerCase();

if (str.equals("list")) {
if (!workerMap.isEmpty()) {
System.out.println("List of connections:");
for (Worker i : workerMap.values()) {
System.out.println(i.getName());
}
} else System.out.println("List of connections is empty");
} else if (str.startsWith("drop ")) {
Long id = Long.parseLong(str.substring(5));
if (workerMap.get(id) != null) {
Message dropNotificationMsg = new Message(0, "Your connection was deactivated");
workerMap.get(id).send(dropNotificationMsg);
try {
workerMap.get(id).socket.close();// method close() вынести наружу
} catch (IOException e) {
System.err.println("Socket was not closed correctly");
e.printStackTrace();
}
workerMap.remove(id);
System.out.println("Dropped successfully");


} else {
System.err.println("Drop: error\nClient does not exist");
}

} else {
System.out.println("Unknown command");
}

}
};
new Thread(consoleScanner).start();

//end of the console


while (true) {
log.info("on select...");
final Socket socket = serverSocket.accept();
final long workerId = serverCounter.getAndIncrement();
Worker worker = new Worker(socket, protocol, workerId);
workerMap.put(workerId, worker);
worker.start();
try {
log.info("serving...");

final Socket socket = serverSocket.accept(); //Listens for a connection to be made to this socket and accepts it.
final long workerId = serverCounter.getAndIncrement();
//limit for number of clients

Worker worker = new Worker(socket, protocol, workerId);
workerMap.put(workerId, worker);
worker.start();

} catch (Exception e) {
e.printStackTrace();
}
}
}

class Worker extends Thread {
private long id;

@NotNull
private Socket socket;

@NotNull
private Protocol<Message> protocol;

@NotNull
private OutputStream out;
OutputStream out;


public Worker(@NotNull Socket socket, @NotNull Protocol<Message> protocol, long id) throws Exception {
this.socket = socket;
this.id = id;
this.protocol = protocol;
//Changes the name of this thread to be equal to the argument
setName(String.format("Client[%d]@%s:%d", id, socket.getInetAddress(), socket.getPort()));

out = socket.getOutputStream();
Expand All @@ -84,31 +150,34 @@ public void run() {
handleSocket(socket);
} catch (Exception e) {
workerMap.remove(id);
// throw new RuntimeException(e);
System.err.println("Socket closed");
//throw new RuntimeException(e);
}
}

private void send(Message message) {
try {
out.write(protocol.encode(message));
out.flush();
} catch (IOException e) {
e.printStackTrace();
} catch (ProtocolException e) {
} catch (IOException | ProtocolException e) {
e.printStackTrace();
}
}

private void handleSocket(Socket socket) throws IOException, ProtocolException {

private void handleSocket(Socket socket) throws IOException, ProtocolException {
final InputStream in = socket.getInputStream();
byte[] buf = new byte[1024];
while (true) {
int nRead = in.read(buf);
if (nRead != -1) {
Message fromClient = protocol.decode(buf);
fromClient.text = ">" + fromClient.text;
workerMap.forEach((aLong, worker) -> worker.send(fromClient));
Message fromClient = protocol.decode(buf); //decode because of this is server
fromClient.data = String.format("Client@%s:%d>", socket.getInetAddress(), socket.getPort())
+ fromClient.data;
workerMap.forEach((aLong, worker) -> {
if (worker.id != id)
worker.send(fromClient);
}); //broadcast
} else {
log.error("Connection failed");
return;
Expand All @@ -117,8 +186,10 @@ private void handleSocket(Socket socket) throws IOException, ProtocolException {
}
}

public static void main(String[] args) throws Exception {
Server server = new Server(9000, new JavaSerializationProtocol());
server.serve();
public static void main(String[] args) throws IOException {
Server myserv = new Server(9000, new JavaSerializationProtocol());
myserv.serve();
}
}


}
Loading