New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#3649 - Add ZStream.fromTcpSocketServer #3677
#3649 - Add ZStream.fromTcpSocketServer #3677
Conversation
8a969f1
to
ead4739
Compare
@regis-leray Nice work. This is looking good. Can we switch to using |
ead4739
to
f107be1
Compare
@iravid thanks for the the tips asynchronous, i tested locally it works. missing test and it will be ready to merge |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, this looks great.
Left a few more requests. Tests can go in ZStreamPlatformSpecificSpec.scala
.
f107be1
to
bdd2b7c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a few minor suggestions while this is still open.
bdd2b7c
to
b390785
Compare
def fromSocketServer( | ||
port: Int, | ||
host: Option[String] = None | ||
): ZStream[Blocking, Throwable, UManaged[Connection]] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think about using ZManaged#scope to pull the manageds into the stream itself?
This leads to a far more ergonomic signature and is still resource safe as all connections will be closed when the stream is closed.
Something like this https://github.com/zio/zio-keeper/blob/master/keeper/src/main/scala/zio/keeper/transport/tcp.scala
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I think that'd be good. ZStream[Connection]
is indeed nicer than ZStream[Managed[Connection]]
. What do you think @regis-leray?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
We just need a test or two after this @regis-leray and this should be good to go. |
3b0ac92
to
ac218e8
Compare
@iravid sorry for the delay. I provided some test. |
ac218e8
to
849ebef
Compare
@iravid i added some javadoc |
9fcc4d8
to
dd1bd2a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One last tweak then good to merge. Thanks @regis-leray
} | ||
) | ||
} | ||
} >>= (ZStream.managed(_)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is subtly wrong, because it means that as soon as you progress to the next connection, the previous connection is closed. Instead, let's use ZManaged.scope
here:
for {
server <- ZStream.managed(...)
registerConnection <- ZStream.managed(ZManaged.scope)
conn <- ZStream.repeatEffect {
IO.effectAsync(...).flatMap(managedConn => registerConnection(managedConn).map(_._2))
}
} yield conn
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the tip ! i was not aware of this ZManaged.scope
dd1bd2a
to
d9a0956
Compare
Great work @regis-leray, thank you! |
closes #3649