diff --git a/src/main/java/pubsub/PubSubApp.java b/src/main/java/pubsub/PubSubApp.java new file mode 100644 index 0000000..1b2057e --- /dev/null +++ b/src/main/java/pubsub/PubSubApp.java @@ -0,0 +1,139 @@ +package pubsub; + +import lombok.AllArgsConstructor; + +import java.util.*; +import java.util.concurrent.*; + +public class PubSubApp { + private static final int PUB_THREAD_COUNT = 4; + private static final int SUB_THREAD_COUNT = 3; + private static final String EXIT_PHRASE = "exit"; + + private static final BlockingQueue inputQueue = new LinkedBlockingQueue<>(); + private static final Scanner scanner = new Scanner(System.in); + private static final Object inputLock = new Object(); + private static volatile boolean shouldExit = false; + private static volatile boolean queueFinished = false; + + public static void main(String[] args) { + ExecutorService publisherThreadPool = Executors.newFixedThreadPool(PUB_THREAD_COUNT); + for (int i = 0; i < PUB_THREAD_COUNT; i++) { + publisherThreadPool.submit(new InputTask(i)); + } + + ExecutorService subscriberThreadPool = Executors.newFixedThreadPool(SUB_THREAD_COUNT); + for (int i = 0; i < SUB_THREAD_COUNT; i++) { + subscriberThreadPool.submit(new ConsumerTask(i)); + } + + publisherThreadPool.shutdown(); + try { + publisherThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + System.out.println("\nMain thread interrupted while waiting for input executor."); + } + + for (int i = 0; i < SUB_THREAD_COUNT; i++) { + try { + inputQueue.put(EXIT_PHRASE); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + subscriberThreadPool.shutdown(); + try { + subscriberThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + System.out.println("\nSubscriber pool shutdown interrupted."); + } + + System.out.println("\nProgram finished. Remaining queue items (if any):"); + inputQueue.forEach(System.out::println); + } + + @AllArgsConstructor + static class InputTask implements Runnable { + private final int id; + + @Override + public void run() { + while (!shouldExit) { + String input; + + synchronized (inputLock) { + if (shouldExit) break; + + System.out.print("\nThread " + id + " waiting for input: "); + try { + if (scanner.hasNextLine()) { + input = scanner.nextLine(); + } else { + break; + } + } catch (IllegalStateException | NoSuchElementException e) { + break; + } + + if (input.equalsIgnoreCase(EXIT_PHRASE)) { + System.out.println("\nThread " + id + " received exit command."); + shouldExit = true; + synchronized (inputLock) { + try { + scanner.close(); + } catch (Exception ignored) {} + } + break; + } + } + try { + inputQueue.put(input); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + + System.out.println("\nInput - Thread " + id + " exiting..."); + } + } + + @AllArgsConstructor + static class ConsumerTask implements Runnable { + private final int id; + + @Override + public void run() { + while (!queueFinished) { + try { + String item = inputQueue.take(); + + + if (EXIT_PHRASE.equals(item)) { + System.out.println("\nConsumer " + id + " received exit command. Exiting."); + break; + } + + System.out.println("\n[Message]: " + item + " (extracted by " + Thread.currentThread().getName() + ")"); + try { + Thread.sleep(1000); // Simulate processing + } catch (InterruptedException ignored) {} + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + + if (shouldExit && inputQueue.isEmpty()) { + queueFinished = true; + System.out.println("queueFinished is set true"); + break; + } + } + System.out.println("Consumer thread " + id + " exiting."); + } + } +} \ No newline at end of file diff --git a/src/main/java/threadpool/SimpleThreadPool.java b/src/main/java/threadpool/SimpleThreadPool.java new file mode 100644 index 0000000..1904d6e --- /dev/null +++ b/src/main/java/threadpool/SimpleThreadPool.java @@ -0,0 +1,63 @@ +package threadpool; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +public class SimpleThreadPool { + private final int THREAD_COUNT; + private final BlockingQueue poolQueue; + private final AtomicInteger currentThreadCount = new AtomicInteger(0); + private volatile boolean isShutdown = false; + + public SimpleThreadPool(int threadCount) { + this.THREAD_COUNT = threadCount; + this.poolQueue = new LinkedBlockingQueue<>(); + } + + public BlockingQueue getTaskQueue() { + return poolQueue; + } + + public void addToPoolQueue(Runnable task) { + if (isShutdown) { + throw new IllegalStateException("Thread pool is shutdown. Cannot accept new tasks."); + } + + poolQueue.offer(task); + + if (currentThreadCount.get() < THREAD_COUNT) { + createThread(); + } + } + + private synchronized void createThread() { + if (currentThreadCount.get() >= THREAD_COUNT) return; + + Thread worker = new Thread(() -> { + try { + while (!isShutdown || !poolQueue.isEmpty()) { + Runnable task = poolQueue.take(); + try { + task.run(); + } catch (Exception e) { + System.out.println("Task failed: " + e.getMessage()); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + System.out.println("currentThreadCount is " + currentThreadCount); + currentThreadCount.decrementAndGet(); + } + }); + + currentThreadCount.incrementAndGet(); + worker.start(); + } + + public void shutdown() { + isShutdown = true; + System.out.println("Thread pool shutting down..."); + } +} \ No newline at end of file diff --git a/src/main/resources/dummy-file b/src/main/resources/dummy-file new file mode 100644 index 0000000..e69de29 diff --git a/src/test/java/threadpool/SimpleThreadPoolTest.java b/src/test/java/threadpool/SimpleThreadPoolTest.java new file mode 100644 index 0000000..b280cdf --- /dev/null +++ b/src/test/java/threadpool/SimpleThreadPoolTest.java @@ -0,0 +1,31 @@ +package threadpool; + +public class SimpleThreadPoolTest { + public static void main(String[] args) throws InterruptedException { + int threadCount = 3; + int taskCount = 20; + + SimpleThreadPool pool = new SimpleThreadPool(threadCount); + + for (int i = 1; i <= taskCount; i++) { + int taskNumber = i; + pool.addToPoolQueue(() -> { + String threadName = Thread.currentThread().getName(); + System.out.println("Task " + taskNumber + " is running on " + threadName); + try { + Thread.sleep(500); // simulate work + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + System.out.println("Task " + taskNumber + " completed on " + threadName); + }); + + Thread.sleep(100); // Optional: delay to simulate real-world task submission + } + + // Wait for tasks to complete (not ideal but fine for this simple test) + Thread.sleep(5000); + + pool.shutdown(); + } +} diff --git a/src/test/java/threadpool/SimpleThreadPoolTests.java b/src/test/java/threadpool/SimpleThreadPoolTests.java new file mode 100644 index 0000000..c18ed79 --- /dev/null +++ b/src/test/java/threadpool/SimpleThreadPoolTests.java @@ -0,0 +1,65 @@ +package threadpool; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class SimpleThreadPoolTests { + + @Test + @DisplayName("Scenario 1: 2 threads, 20 tasks") + void testTwoThreadsTwentyTasks() throws InterruptedException { + runScenario(2, 20); + } + + @Test + @DisplayName("Scenario 2: 10 threads, 20 tasks") + void testTenThreadsTwentyTasks() throws InterruptedException { + runScenario(10, 20); + } + + @Test + @DisplayName("Scenario 3: 10 threads, 5 tasks") + void testTenThreadsFiveTasks() throws InterruptedException { + runScenario(10, 5); + } + + @Test + @DisplayName("Scenario 4: 1 thread, 1 task") + void testOneThreadOneTask() throws InterruptedException { + runScenario(1, 1); + } + + private void runScenario(int threadCount, int taskCount) throws InterruptedException { + System.out.printf("\n--- Running Scenario: %d threads, %d tasks ---\n", threadCount, taskCount); + + SimpleThreadPool pool = new SimpleThreadPool(threadCount); + + for (int i = 1; i <= taskCount; i++) { + final int taskNumber = i; + pool.addToPoolQueue(() -> { + String threadName = Thread.currentThread().getName(); + System.out.printf("Task %d is running on %s\n", taskNumber, threadName); + try { + Thread.sleep(300); // Simulate work + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + System.out.printf("Task %d completed on %s\n", taskNumber, threadName); + }); + + Thread.sleep(20); // Optional: staggered submission + } + + // Estimate total time for tasks to complete + int estimatedMs = (int) Math.ceil((taskCount * 300.0) / threadCount) + 500; + Thread.sleep(estimatedMs); + + assertThat(pool.getTaskQueue()) + .as("Task queue should be empty after all tasks have been processed") + .isEmpty(); + + pool.shutdown(); + } +}