Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions src/main/java/pubsub/PubSubApp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package pubsub;

import lombok.AllArgsConstructor;

import java.util.*;
import java.util.concurrent.*;

public class PubSubApp {
private static final int PUB_THREAD_COUNT = 4;
private static final int SUB_THREAD_COUNT = 3;
private static final String EXIT_PHRASE = "exit";

private static final BlockingQueue<String> inputQueue = new LinkedBlockingQueue<>();
private static final Scanner scanner = new Scanner(System.in);
private static final Object inputLock = new Object();
private static volatile boolean shouldExit = false;
private static volatile boolean queueFinished = false;

public static void main(String[] args) {
ExecutorService publisherThreadPool = Executors.newFixedThreadPool(PUB_THREAD_COUNT);
for (int i = 0; i < PUB_THREAD_COUNT; i++) {
publisherThreadPool.submit(new InputTask(i));
}

ExecutorService subscriberThreadPool = Executors.newFixedThreadPool(SUB_THREAD_COUNT);
for (int i = 0; i < SUB_THREAD_COUNT; i++) {
subscriberThreadPool.submit(new ConsumerTask(i));
}

publisherThreadPool.shutdown();
try {
publisherThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("\nMain thread interrupted while waiting for input executor.");
}

for (int i = 0; i < SUB_THREAD_COUNT; i++) {
try {
inputQueue.put(EXIT_PHRASE);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

subscriberThreadPool.shutdown();
try {
subscriberThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("\nSubscriber pool shutdown interrupted.");
}

System.out.println("\nProgram finished. Remaining queue items (if any):");
inputQueue.forEach(System.out::println);
}

@AllArgsConstructor
static class InputTask implements Runnable {
private final int id;

@Override
public void run() {
while (!shouldExit) {
String input;

synchronized (inputLock) {
if (shouldExit) break;

System.out.print("\nThread " + id + " waiting for input: ");
try {
if (scanner.hasNextLine()) {
input = scanner.nextLine();
} else {
break;
}
} catch (IllegalStateException | NoSuchElementException e) {
break;
}

if (input.equalsIgnoreCase(EXIT_PHRASE)) {
System.out.println("\nThread " + id + " received exit command.");
shouldExit = true;
synchronized (inputLock) {
try {
scanner.close();
} catch (Exception ignored) {}
}
break;
}
}
try {
inputQueue.put(input);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}

System.out.println("\nInput - Thread " + id + " exiting...");
}
}

@AllArgsConstructor
static class ConsumerTask implements Runnable {
private final int id;

@Override
public void run() {
while (!queueFinished) {
try {
String item = inputQueue.take();


if (EXIT_PHRASE.equals(item)) {
System.out.println("\nConsumer " + id + " received exit command. Exiting.");
break;
}

System.out.println("\n[Message]: " + item + " (extracted by " + Thread.currentThread().getName() + ")");
try {
Thread.sleep(1000); // Simulate processing
} catch (InterruptedException ignored) {}

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}

if (shouldExit && inputQueue.isEmpty()) {
queueFinished = true;
System.out.println("queueFinished is set true");
break;
}
}
System.out.println("Consumer thread " + id + " exiting.");
}
}
}
63 changes: 63 additions & 0 deletions src/main/java/threadpool/SimpleThreadPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package threadpool;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class SimpleThreadPool {
private final int THREAD_COUNT;
private final BlockingQueue<Runnable> poolQueue;
private final AtomicInteger currentThreadCount = new AtomicInteger(0);
private volatile boolean isShutdown = false;

public SimpleThreadPool(int threadCount) {
this.THREAD_COUNT = threadCount;
this.poolQueue = new LinkedBlockingQueue<>();
}

public BlockingQueue<Runnable> getTaskQueue() {
return poolQueue;
}

public void addToPoolQueue(Runnable task) {
if (isShutdown) {
throw new IllegalStateException("Thread pool is shutdown. Cannot accept new tasks.");
}

poolQueue.offer(task);

if (currentThreadCount.get() < THREAD_COUNT) {
createThread();
}
}

private synchronized void createThread() {
if (currentThreadCount.get() >= THREAD_COUNT) return;

Thread worker = new Thread(() -> {
try {
while (!isShutdown || !poolQueue.isEmpty()) {
Runnable task = poolQueue.take();
try {
task.run();
} catch (Exception e) {
System.out.println("Task failed: " + e.getMessage());
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
System.out.println("currentThreadCount is " + currentThreadCount);
currentThreadCount.decrementAndGet();
}
});

currentThreadCount.incrementAndGet();
worker.start();
}

public void shutdown() {
isShutdown = true;
System.out.println("Thread pool shutting down...");
}
}
Empty file added src/main/resources/dummy-file
Empty file.
31 changes: 31 additions & 0 deletions src/test/java/threadpool/SimpleThreadPoolTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package threadpool;

public class SimpleThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
int threadCount = 3;
int taskCount = 20;

SimpleThreadPool pool = new SimpleThreadPool(threadCount);

for (int i = 1; i <= taskCount; i++) {
int taskNumber = i;
pool.addToPoolQueue(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("Task " + taskNumber + " is running on " + threadName);
try {
Thread.sleep(500); // simulate work
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Task " + taskNumber + " completed on " + threadName);
});

Thread.sleep(100); // Optional: delay to simulate real-world task submission
}

// Wait for tasks to complete (not ideal but fine for this simple test)
Thread.sleep(5000);

pool.shutdown();
}
}
65 changes: 65 additions & 0 deletions src/test/java/threadpool/SimpleThreadPoolTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package threadpool;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class SimpleThreadPoolTests {

@Test
@DisplayName("Scenario 1: 2 threads, 20 tasks")
void testTwoThreadsTwentyTasks() throws InterruptedException {
runScenario(2, 20);
}

@Test
@DisplayName("Scenario 2: 10 threads, 20 tasks")
void testTenThreadsTwentyTasks() throws InterruptedException {
runScenario(10, 20);
}

@Test
@DisplayName("Scenario 3: 10 threads, 5 tasks")
void testTenThreadsFiveTasks() throws InterruptedException {
runScenario(10, 5);
}

@Test
@DisplayName("Scenario 4: 1 thread, 1 task")
void testOneThreadOneTask() throws InterruptedException {
runScenario(1, 1);
}

private void runScenario(int threadCount, int taskCount) throws InterruptedException {
System.out.printf("\n--- Running Scenario: %d threads, %d tasks ---\n", threadCount, taskCount);

SimpleThreadPool pool = new SimpleThreadPool(threadCount);

for (int i = 1; i <= taskCount; i++) {
final int taskNumber = i;
pool.addToPoolQueue(() -> {
String threadName = Thread.currentThread().getName();
System.out.printf("Task %d is running on %s\n", taskNumber, threadName);
try {
Thread.sleep(300); // Simulate work
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.printf("Task %d completed on %s\n", taskNumber, threadName);
});

Thread.sleep(20); // Optional: staggered submission
}

// Estimate total time for tasks to complete
int estimatedMs = (int) Math.ceil((taskCount * 300.0) / threadCount) + 500;
Thread.sleep(estimatedMs);

assertThat(pool.getTaskQueue())
.as("Task queue should be empty after all tasks have been processed")
.isEmpty();

pool.shutdown();
}
}