Skip to content

Commit

Permalink
release 1.368.23092
Browse files Browse the repository at this point in the history
  • Loading branch information
klahnakoski committed Apr 2, 2023
2 parents 4561a24 + 13c3e9d commit 74b14a4
Show file tree
Hide file tree
Showing 18 changed files with 412 additions and 790 deletions.
29 changes: 16 additions & 13 deletions mo_streams/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from mo_streams.function_factory import it
from mo_streams.object_stream import ObjectStream, ERROR, WARNING, NONE
from mo_streams.string_stream import StringStream
from mo_streams.type_utils import Typer, CallableTyper
from mo_streams.type_utils import Typer, CallableTyper, StreamTyper, LazyTyper


def stream(value):
Expand All @@ -29,11 +29,7 @@ def stream(value):
if not kv:
return EmptyStream()
_, example = kv
return ObjectStream(
((v, {"key": k}) for k, v in value.items()),
Typer(example=example),
JxType(key=JX_TEXT),
)
return ObjectStream(((v, {"key": k}) for k, v in value.items()), Typer(example=example), JxType(key=JX_TEXT),)
elif isinstance(value, bytes):
return ByteStream(Reader(iter([value])))
elif isinstance(value, str):
Expand All @@ -43,18 +39,18 @@ def stream(value):
elif isinstance(value, Stream):
return value
elif isinstance(value, type(range(1))):
return ObjectStream(
((v, {}) for v in value), Typer(example=value.stop), JxType()
)
return ObjectStream(((v, {}) for v in value), Typer(example=value.stop), JxType())
elif is_finite(value):
example = first(value)

def read_from_list():
for v in value:
yield v, {}

return ObjectStream(read_from_list(), Typer(example=example), JxType())
elif is_many(value):
example = first(value)

def read():
yield example, {}
for v in value:
Expand All @@ -66,10 +62,17 @@ def read():


ANNOTATIONS = {
(str, "encode"): CallableTyper(python_type=bytes),
(File_usingStream, "content"): CallableTyper(python_type=ByteStream),
(File, "content"): CallableTyper(python_type=ByteStream),
(ByteStream, "utf8"): CallableTyper(python_type=StringStream),
(str, "encode"): CallableTyper(return_type=bytes),
(File_usingStream, "content"): CallableTyper(return_type=ByteStream),
(File, "content"): CallableTyper(return_type=ByteStream),
(ByteStream, "utf8"): CallableTyper(return_type=StringStream),
(StringStream, "lines"): CallableTyper(return_type=StreamTyper(
member_type=Typer(python_type=str), _schema=JxType()
)),
(ByteStream, "lines"): CallableTyper(return_type=StreamTyper(
member_type=Typer(python_type=str), _schema=JxType()
)),
(ObjectStream, "map"): CallableTyper(return_type=StreamTyper(member_type=LazyTyper(), _schema=JxType())),
}

export("mo_streams.object_stream", stream)
Expand Down
11 changes: 4 additions & 7 deletions mo_streams/byte_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
)


DEBUG = True
DEBUG = False


class ByteStream(Stream):
Expand All @@ -46,8 +46,7 @@ def read():
with ZipFile(reader, mode="r") as archive:
for info in archive.filelist:
yield File_usingStream(
info.filename,
lambda: ByteStream(archive.open(info.filename, "r")),
info.filename, lambda: ByteStream(archive.open(info.filename, "r")),
), {"name": info.filename}
finally:
reader.close()
Expand Down Expand Up @@ -78,9 +77,7 @@ def file(info):
# directories
return File_usingStream(info.name, lambda: None)
else:
return File_usingStream(
info.name, lambda: ByteStream(tf.extractfile(info))
)
return File_usingStream(info.name, lambda: ByteStream(tf.extractfile(info)))

def read():
try:
Expand Down Expand Up @@ -108,7 +105,7 @@ def read():
return StringStream(read())

def lines(self):
return self.utf8.lines
return self.utf8().lines()

def chunk(self, size=8192):
return ObjectStream(chunk_bytes(self.reader, size), b"", bytes, {}, JxType())
Expand Down
1 change: 0 additions & 1 deletion mo_streams/empty_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@


class EmptyStream(Stream):

def to_dict(self):
return {}

Expand Down
Loading

0 comments on commit 74b14a4

Please sign in to comment.