/
package.scala
139 lines (120 loc) · 5.28 KB
/
package.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package io.iteratee
import cats.effect.Sync
import io.iteratee.internal.Step
import java.io.{
BufferedInputStream,
BufferedOutputStream,
BufferedReader,
BufferedWriter,
File,
FileInputStream,
FileOutputStream,
FileReader,
FileWriter,
InputStream,
InputStreamReader,
OutputStream,
OutputStreamWriter
}
import java.util.zip.{ ZipEntry, ZipFile }
import scala.Predef.genericArrayOps
import scala.collection.JavaConverters._
package object files {
/**
 * Enumerate the lines of `file`.
 *
 * The file is read through a `FileReader` wrapped in a `BufferedReader`. The
 * reader expression is handed to `enumerateLines` by name, so it is not
 * evaluated here.
 *
 * @param file the file to read lines from
 */
def readLines[F[_]](file: File)(implicit F: Sync[F]): Enumerator[F, String] = {
  // A `def` (not a `val`) so that constructing the reader stays deferred.
  def openReader: BufferedReader = new BufferedReader(new FileReader(file))

  enumerateLines[F](openReader)
}
/**
 * Enumerate the lines read from `stream`.
 *
 * The stream is wrapped in an `InputStreamReader` and a `BufferedReader`; the
 * wrapping expression is handed to `enumerateLines` by name.
 *
 * @param stream the input stream to read lines from
 */
def readLinesFromStream[F[_]](stream: InputStream)(implicit F: Sync[F]): Enumerator[F, String] = {
  // A `def` (not a `val`) so that constructing the reader stays deferred.
  def wrapStream: BufferedReader = new BufferedReader(new InputStreamReader(stream))

  enumerateLines[F](wrapStream)
}
/**
 * Enumerate the contents of `file` as byte-array chunks.
 *
 * The file is read through a `FileInputStream` wrapped in a
 * `BufferedInputStream`; the stream expression is handed to `enumerateBytes`
 * by name.
 *
 * @param file the file to read bytes from
 */
def readBytes[F[_]](file: File)(implicit F: Sync[F]): Enumerator[F, Array[Byte]] = {
  // A `def` (not a `val`) so that opening the file stays deferred.
  def openStream: InputStream = new BufferedInputStream(new FileInputStream(file))

  enumerateBytes[F](openStream)
}
/**
 * Enumerate the contents of `stream` as byte-array chunks.
 *
 * The stream is wrapped in a `BufferedInputStream`; the wrapping expression is
 * handed to `enumerateBytes` by name.
 *
 * @param stream the input stream to read bytes from
 */
def readBytesFromStream[F[_]](stream: InputStream)(implicit F: Sync[F]): Enumerator[F, Array[Byte]] = {
  // A `def` (not a `val`) so that creating the wrapper stays deferred.
  def buffered: InputStream = new BufferedInputStream(stream)

  enumerateBytes[F](buffered)
}
/**
 * Enumerate the entries of a zip archive, pairing each `ZipEntry` with an
 * `InputStream` for its contents.
 *
 * The `ZipFile` is opened inside `F.delay`, and `zipFile.close()` is attached
 * to the resulting enumerator through `ensure`.
 *
 * @param file the zip archive to read
 */
def readZipStreams[F[_]](file: File)(implicit F: Sync[F]): Enumerator[F, (ZipEntry, InputStream)] = {
  val openArchive: F[ZipFile] = F.delay(new ZipFile(file))

  Enumerator.liftM(openArchive).flatMap { archive =>
    val entries = new ZipFileEnumerator[F](archive, archive.entries.asScala)
    entries.ensure(F.delay(archive.close()))
  }
}
/**
 * Enumerate the direct children of `dir`.
 *
 * `File.listFiles` returns `null` rather than an array when the path cannot be
 * listed (for example when it is not a directory); that case yields an empty
 * enumerator.
 *
 * @param dir the directory whose entries are listed
 */
def listFiles[F[_]](dir: File)(implicit F: Sync[F]): Enumerator[F, File] =
  Enumerator.liftM(F.delay(Option(dir.listFiles))).flatMap {
    // `Option(...)` turns the Java `null` result into `None`.
    case None => Enumerator.empty[F, File]
    case Some(children) => Enumerator.enumVector(Vector(children: _*))
  }
/**
 * Enumerate the files under `dir`, descending into subdirectories.
 *
 * Each entry returned by `listFiles` is either expanded recursively (when
 * `isDirectory` is true) or emitted as a single element. Directories
 * themselves are not emitted.
 *
 * @param dir the root directory of the traversal
 */
def listFilesRec[F[_]](dir: File)(implicit F: Sync[F]): Enumerator[F, File] =
  listFiles[F](dir).flatMap { child =>
    if (child.isDirectory) listFilesRec[F](child)
    else Enumerator.enumOne(child)
  }
/**
 * An iteratee that writes each incoming string to `file`, followed by a line
 * separator (`BufferedWriter.newLine()`).
 *
 * The writer is created inside `F.delay`, and `close()` is attached to the
 * iteratee through `ensure`.
 *
 * @param file the file to write to
 */
def writeLines[F[_]](file: File)(implicit F: Sync[F]): Iteratee[F, String, Unit] = {
  val openWriter: F[BufferedWriter] = F.delay(new BufferedWriter(new FileWriter(file)))

  Iteratee.liftM(openWriter).flatMap { out =>
    val writeAll = Iteratee.foldM[F, String, Unit](()) { (_, line) =>
      F.delay {
        out.write(line)
        out.newLine()
      }
    }

    writeAll.ensure(F.delay(out.close()))
  }
}
/**
 * An iteratee that writes each incoming string to `stream`, followed by a line
 * separator (`BufferedWriter.newLine()`).
 *
 * The stream is wrapped in an `OutputStreamWriter` and a `BufferedWriter`
 * inside `F.delay`; the writer's `close()` is attached through `ensure`.
 *
 * @param stream the output stream to write to
 */
def writeLinesToStream[F[_]](stream: OutputStream)(implicit F: Sync[F]): Iteratee[F, String, Unit] = {
  val wrapStream: F[BufferedWriter] = F.delay(new BufferedWriter(new OutputStreamWriter(stream)))

  Iteratee.liftM(wrapStream).flatMap { out =>
    val writeAll = Iteratee.foldM[F, String, Unit](()) { (_, line) =>
      F.delay {
        out.write(line)
        out.newLine()
      }
    }

    writeAll.ensure(F.delay(out.close()))
  }
}
/**
 * An iteratee that writes each incoming byte array to `file`.
 *
 * The file is opened as a `FileOutputStream` wrapped in a
 * `BufferedOutputStream` inside `F.delay`; `close()` is attached to the
 * iteratee through `ensure`.
 *
 * @param file the file to write to
 */
def writeBytes[F[_]](file: File)(implicit F: Sync[F]): Iteratee[F, Array[Byte], Unit] = {
  val openStream: F[BufferedOutputStream] = F.delay(new BufferedOutputStream(new FileOutputStream(file)))

  Iteratee.liftM(openStream).flatMap { out =>
    val writeAll = Iteratee.foldM[F, Array[Byte], Unit](()) { (_, chunk) =>
      F.delay(out.write(chunk))
    }

    writeAll.ensure(F.delay(out.close()))
  }
}
/**
 * An iteratee that writes each incoming byte array to `stream`.
 *
 * The stream is wrapped in a `BufferedOutputStream` inside `F.delay`; the
 * wrapper's `close()` is attached to the iteratee through `ensure`.
 *
 * @param stream the output stream to write to
 */
def writeBytesToStream[F[_]](stream: OutputStream)(implicit F: Sync[F]): Iteratee[F, Array[Byte], Unit] = {
  val wrapStream: F[BufferedOutputStream] = F.delay(new BufferedOutputStream(stream))

  // `out` is the buffered wrapper; it is named distinctly from the `stream` parameter.
  Iteratee.liftM(wrapStream).flatMap { out =>
    val writeAll = Iteratee.foldM[F, Array[Byte], Unit](()) { (_, chunk) =>
      F.delay(out.write(chunk))
    }

    writeAll.ensure(F.delay(out.close()))
  }
}
/**
 * Build a line enumerator from a by-name reader expression.
 *
 * `reader` is evaluated once, inside `F.delay`; the resulting instance is both
 * the one read from and the one whose `close()` is attached through `ensure`.
 *
 * @param reader by-name expression producing the reader to consume
 */
private[this] def enumerateLines[F[_]](reader: => BufferedReader)(implicit F: Sync[F]): Enumerator[F, String] = {
  val acquire: F[BufferedReader] = F.delay(reader)

  Enumerator.liftM(acquire).flatMap { opened =>
    new LineEnumerator[F](opened).ensure(F.delay(opened.close()))
  }
}
/**
 * Build a byte-chunk enumerator from a by-name stream expression.
 *
 * `stream` is a by-name parameter, so every reference to it re-evaluates the
 * caller's expression (for `readBytes`, that means opening the file again).
 * It is therefore evaluated exactly once, inside `F.delay`, and only the
 * resulting instance is read from and closed.
 *
 * @param stream by-name expression producing the stream to consume
 */
private[this] def enumerateBytes[F[_]](stream: => InputStream)(implicit F: Sync[F]): Enumerator[F, Array[Byte]] =
  Enumerator.liftM(F.delay(stream)).flatMap { opened =>
    // Use `opened`, never `stream`, below: referring to `stream` here would
    // create additional streams that are never read and/or never closed.
    new ByteEnumerator[F](opened).ensure(F.delay(opened.close()))
  }
/**
 * Enumerator that feeds the lines of `reader` to a step, one line at a time.
 *
 * Enumeration stops when the step reports that it is done or when
 * `readLine()` returns `null` (end of input). This class does not close the
 * reader.
 */
private[this] final class LineEnumerator[F[_]](reader: BufferedReader)(implicit F: Sync[F])
    extends Enumerator[F, String] {
  final def apply[A](s: Step[F, String, A]): F[Step[F, String, A]] =
    if (s.isDone) F.pure(s)
    else
      // `Option(...)` maps the `null` end-of-input marker to `None`.
      F.flatMap(F.delay(Option(reader.readLine()))) {
        case None => F.pure(s)
        case Some(line) => F.flatMap(s.feedEl(line))(next => apply(next))
      }
}
/**
 * Enumerator that feeds the contents of `stream` to a step in chunks of at
 * most `bufferSize` bytes.
 *
 * A fresh array is allocated for every read. A short read is trimmed to the
 * number of bytes actually read; a full read passes the array on as is.
 * Enumeration stops when the step is done or when `read` returns -1. This
 * class does not close the stream.
 */
private[this] final class ByteEnumerator[F[_]](stream: InputStream, bufferSize: Int = 8192)(implicit F: Sync[F])
    extends Enumerator[F, Array[Byte]] {
  final def apply[A](s: Step[F, Array[Byte], A]): F[Step[F, Array[Byte], A]] =
    if (s.isDone) F.pure(s)
    else
      F.flatten(
        F.delay {
          val buffer = new Array[Byte](bufferSize)

          stream.read(buffer, 0, bufferSize) match {
            case -1 => F.pure(s)
            case count =>
              val chunk = if (count == bufferSize) buffer else buffer.slice(0, count)
              F.flatMap(s.feedEl(chunk))(next => apply(next))
          }
        }
      )
}
/**
 * Enumerator that walks `iterator` and feeds each entry to a step together
 * with the `InputStream` obtained from `zipFile.getInputStream(entry)`.
 *
 * Enumeration stops when the step is done or the iterator is exhausted. This
 * class closes neither the entry streams nor the zip file.
 */
private[this] final class ZipFileEnumerator[F[_]](zipFile: ZipFile, iterator: Iterator[ZipEntry])(implicit F: Sync[F])
    extends Enumerator[F, (ZipEntry, InputStream)] {
  final def apply[A](s: Step[F, (ZipEntry, InputStream), A]): F[Step[F, (ZipEntry, InputStream), A]] =
    if (s.isDone) F.pure(s)
    else
      F.flatten(
        F.delay {
          if (!iterator.hasNext) F.pure(s)
          else {
            val entry = iterator.next()
            val element = (entry, zipFile.getInputStream(entry))
            F.flatMap(s.feedEl(element))(next => apply(next))
          }
        }
      )
}
}