Skip to content

Commit

Permalink
Updated
Browse files Browse the repository at this point in the history
  • Loading branch information
pateash committed Jun 5, 2024
1 parent 2e76aad commit 81db5c0
Show file tree
Hide file tree
Showing 10 changed files with 173 additions and 58 deletions.
8 changes: 0 additions & 8 deletions src/main/java/live/ashish/cpjava/interview/Main.java

This file was deleted.

21 changes: 0 additions & 21 deletions src/main/java/live/ashish/cpjava/interview/Mather.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package live.ashish.cpjava.systemdesign.genericreaderwriter;

import live.ashish.cpjava.systemdesign.genericreaderwriter.interfaces.Writer;
import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.concurrent.BlockingQueue;

@Data
@AllArgsConstructor
public class KafkaWriter implements Writer, Runnable {
BlockingQueue<Message> queue;

//
@Override
public void writes() throws InterruptedException {
// reading continously from a queue and writing messages to sink
while (true) {
// get a message from queue
Message message = queue.take();// this will wait for any message
System.out.println("\n[WRITER] Got message: " + message);
// writes into kafka
}
}

@Override
public void run() {
try {
writes();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package live.ashish.cpjava.systemdesign.genericreaderwriter;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/*
* Write an application which consumes data from any source and writes to any desired sink.
The interface should be open for any kind of source or sink.
For the scope of this interview, you can consider the source to be a continuous stream of files being added to a local directory like resources/ or tmp/.
You can assume the sink to be an event based system like PubSub/Kafka etc. For the scope of this interview, you can consider the sink to be an in-memory queue.
Focus on getting a working solution with decent abstractions and failure handling by the end of this interview.
+=====+
Source => file/sftp/s3 etc.
Target => messaging system /kafka/pubsub etc.
Data size (input) => Array of characters ( in memory )
data output => same
* Input => TextInputreader ( line delimited ) hello world
Output => ever message should be line delimited ( define by reader )
* */
public class Main {

public static void main(String[] args) {
String path="/Users/ashishpatel/pateash/cpjava/src/main/resources/tmp";
BlockingQueue<Message> queue = new ArrayBlockingQueue<>(100);
new Thread(new TextReader(queue, path)).start();
new Thread(new KafkaWriter(queue)).start();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package live.ashish.cpjava.systemdesign.genericreaderwriter;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data @AllArgsConstructor
public class Message {
String line;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package live.ashish.cpjava.systemdesign.genericreaderwriter;

import live.ashish.cpjava.systemdesign.genericreaderwriter.interfaces.Reader;
import lombok.AllArgsConstructor;
import lombok.Data;

import java.io.*;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Data
@AllArgsConstructor
public class TextReader implements Reader, Runnable {
BlockingQueue<Message> queue;
String path;
static long tsOffset = 0;// epochtime

@Override
public void reads() throws IOException {
// reading continously from a file and writing messages to queue
while (true) {
File filepath = new File(path);
// FileFilter fileFilter
Stream<File> fileStream = Arrays.stream(filepath.listFiles()); // assumption sorted by modifiedts
List<File> filesToProcess = fileStream.filter(file -> file.lastModified() > tsOffset).collect(Collectors.toList());
for (File f : filesToProcess) {
// processing this file and read it add to message
BufferedReader bufferedReader = new BufferedReader(new FileReader(f));
String line;
while ((line = bufferedReader.readLine()) != null) {
// tsOffset = f.lastModified();
try {
Message message = new Message(line);
System.out.println("\n[READER] Sending message: " + message);
queue.put(message); // waits until a message available
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
tsOffset = f.lastModified(); // this can lead to duplicates in case of in bw file reboots
}
}
}

@Override
public void run() {
try {
reads();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package live.ashish.cpjava.systemdesign.genericreaderwriter.interfaces;

import java.io.FileNotFoundException;
import java.io.IOException;

public interface Reader {

void reads() throws IOException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package live.ashish.cpjava.systemdesign.genericreaderwriter.interfaces;

public interface Writer {
void writes() throws InterruptedException;
}
28 changes: 28 additions & 0 deletions src/main/resources/tmp/test.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
hello world
how are you?
hello world
how are you?
hello world
how are you?
hello world
how are you?
hello world
how are you?
hello world
how are you?
hello world
how are you?
hello world
how are you?
hello world
how are you?
hello world
how are you?
hello world
how are you?
hello world
how are you?
hello world
how are you?
hello world
how are you?
29 changes: 0 additions & 29 deletions src/test/java/live/ashish/cpjava/interview/MatherTest.java

This file was deleted.

0 comments on commit 81db5c0

Please sign in to comment.