Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor towards sinks and sources other than CSVs #1

Merged
merged 3 commits into from
May 3, 2022

Conversation

tlazaro
Copy link
Collaborator

@tlazaro tlazaro commented Apr 21, 2022

I wanted to be able to read from sources that were not CSV. The Row[A] is the only conduit for encoding and decoding an A and that won't necessarily work for other sources.

Tried to do minimal changes, left CSV as the implementation in most places, didn't change the signature of Event.source but added one that can take a PipeCodec[A] instead.

The PipeCodec[A] interface is not well thought out, I hacked it so I could replace the CSV hardcoded implementation but it will need changes.

@tlazaro tlazaro requested a review from johnynek April 21, 2022 18:29
@johnynek
Copy link
Owner

We need to set up CI.

Also I could just make this OSS as a reference implementation of the talk I gave. Really no reason to be private.

@tlazaro
Copy link
Collaborator Author

tlazaro commented Apr 21, 2022

I can add CI, do you have an example in mind I can copy it from? Otherwise I can figure it out.

@johnynek
Copy link
Owner

I think this plugin basically solves all the problems:

https://github.com/typelevel/sbt-typelevel

But it does require a bit of setup.

Copy link
Owner

@johnynek johnynek left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some nits, you can take or leave then merge.

* @param name a unique name for a given source. To run, this name has to be connected to an input path
* @param validator the validator to check each input value and extract the timestamp
*/
def genericSource[A: PipeCodec](name: String, validator: Validator[A]): Event.Source[A] =
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think this should be called def source and the one above should be called def csvSource or something.

* @tparam A
*/
trait PipeCodec[A] {
def pipe[F[_]: RaiseThrowable: Sync]: Pipe[F, Byte, A]
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd probably call this def decode or something.

def csv[A: Row](skipHeader: Boolean = true): PipeCodec[A] =
new CsvCodec[A](skipHeader)

def json[A: Codec](): PipeCodec[A] = new JsonCodec[A]()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: def json[A: Codec]: PipeCodec[A] = ... no need for ()

@johnynek johnynek mentioned this pull request Apr 22, 2022
@tlazaro
Copy link
Collaborator Author

tlazaro commented May 3, 2022

Made the fixes and rebased to #4

@johnynek
Copy link
Owner

johnynek commented May 3, 2022

looks like we need to reformat.

Also the scala 2.13 version seems really old. We should probably look at that.

@tlazaro
Copy link
Collaborator Author

tlazaro commented May 3, 2022

Sure, after this is done I will check the Scala and dependencies versions.

@tlazaro tlazaro merged commit 142a774 into oscar/frdcandle May 3, 2022
@tlazaro tlazaro deleted the tlazaro/pipe-codec branch May 3, 2022 20:30
tlazaro added a commit that referenced this pull request Sep 11, 2022
Refactor towards sinks and sources other than CSVs
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants