-
Notifications
You must be signed in to change notification settings - Fork 16
/
zip.jl
96 lines (86 loc) · 3.15 KB
/
zip.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
"""
    DiskZip{Is<:Tuple}

Zip-like iterator over the tuple of iterators stored in the field `is`.

Every iteration-interface method defined for `DiskZip` forwards to
`Iterators.Zip(dz.is)` (or to the type `Iterators.Zip{Is}` for the trait
functions), so a `DiskZip` yields the same tuples as `Iterators.Zip(dz.is)`.
"""
struct DiskZip{Is<:Tuple}
    is::Is
end

# Iteration protocol: delegate to the equivalent `Iterators.Zip`.
Base.iterate(dz::DiskZip) = Base.iterate(Iterators.Zip(dz.is))
Base.iterate(dz::DiskZip, i) = Base.iterate(Iterators.Zip(dz.is), i)
Base.first(dz::DiskZip) = Base.first(Iterators.Zip(dz.is))
Base.last(dz::DiskZip) = Base.last(Iterators.Zip(dz.is))
Base.length(dz::DiskZip) = Base.length(Iterators.Zip(dz.is))
Base.size(dz::DiskZip) = Base.size(Iterators.Zip(dz.is))

# `IteratorEltype` is forwarded from `Iterators.Zip` below, so `eltype` has to be
# forwarded as well. Without this method `eltype(dz)` falls back to `Any`, and
# anything that allocates from `eltype(dz)` ends up with an `Any` container.
Base.eltype(::Type{DiskZip{Is}}) where {Is<:Tuple} = eltype(Iterators.Zip{Is})

# Trait functions: report whatever `Iterators.Zip` reports for the same tuple type.
function Base.IteratorSize(::Type{DiskZip{Is}}) where {Is<:Tuple}
    return Base.IteratorSize(Iterators.Zip{Is})
end
function Base.IteratorEltype(::Type{DiskZip{Is}}) where {Is<:Tuple}
    return Base.IteratorEltype(Iterators.Zip{Is})
end
# Build a `DiskZip` from same-sized arrays.
# All arrays are rechunked to the chunks of the first `Chunked` argument, which
# forces the iteration order to be the same for every array. When no argument
# is `Chunked`, the arrays are zipped as they are.
# Throws `DimensionMismatch` if any array differs in size from the first one.
function DiskZip(As::AbstractArray...)
    all(A -> size(A) == size(first(As)), As) ||
        throw(DimensionMismatch("Arrays zipped with disk arrays must be the same size"))

    # Keep the chunks of the first `Chunked` array; once found, later arrays
    # are not inspected any more.
    pick(found, A) = (isnothing(found) && (haschunks(A) isa Chunked)) ? eachchunk(A) : found
    chunks = foldl(pick, As; init=nothing)

    # Nothing to align to: wrap the arrays unchanged.
    isnothing(chunks) && return DiskZip(As)

    return DiskZip(map(A -> RechunkedDiskArray(A, chunks), As))
end
# Catch-all constructor: for now only exact same-sized `AbstractArray`s can be
# zipped, so any other combination of arguments is rejected with an `ArgumentError`.
function DiskZip(As...)
    msg = "zip on disk arrays only works with other same-sized AbstractArray"
    throw(ArgumentError(msg))
end
# Collect a `DiskZip` into an array shaped like its first zipped array.
# The output is allocated with `similar` on the first array using `eltype(dz)`
# and filled by walking `eachindex` of that array in lockstep with the
# iteration of `dz`, so values land in the order in which `dz` produces them.
function Base.collect(dz::DiskZip)
    template = first(dz.is)
    out = similar(template, eltype(dz))
    step = iterate(dz)
    for I in eachindex(template)
        # `step` is a `(value, state)` pair
        out[I] = first(step)
        step = iterate(dz, last(step))
    end
    return out
end
# Shared error path for `zip` calls that mix a disk array with a non-array iterator.
function _zip_error()
    throw(ArgumentError("Cannot `zip` a disk array with an iterator"))
end
# Two disk arrays, optionally followed by more arrays: hand all of them to `DiskZip`.
function Base.zip(
    A1::AbstractDiskArray, A2::AbstractDiskArray, As::AbstractArray...
)
    return DiskZip(A1, A2, As...)
end
# A disk array followed by a regular array (and optionally more arrays).
# Both leading arguments must be forwarded: passing `A1` twice would zip the
# disk array with itself and silently drop `A2` from the result.
function Base.zip(A1::AbstractDiskArray, A2::AbstractArray, As::AbstractArray...)
    return DiskZip(A1, A2, As...)
end
# A regular array followed by a disk array (and optionally more arrays):
# hand all of them to `DiskZip`.
function Base.zip(
    A1::AbstractArray, A2::AbstractDiskArray, As::AbstractArray...
)
    return DiskZip(A1, A2, As...)
end
# Fallbacks for `zip` calls involving a disk array that did not match one of
# the all-`AbstractArray` methods: these always raise via `_zip_error`.
function Base.zip(::AbstractDiskArray, x, xs...)
    return _zip_error()
end
function Base.zip(x, ::AbstractDiskArray, xs...)
    return _zip_error()
end
# Covers the case where both leading arguments are disk arrays.
function Base.zip(::AbstractDiskArray, ::AbstractDiskArray, xs...)
    return _zip_error()
end
# Generate `Base.zip` methods for the type `t`:
#   * combinations of `t` with `t`, `AbstractArray` or `AbstractDiskArray`
#     (plus trailing `AbstractArray`s) construct a `DiskZip`;
#   * the remaining combinations involving `t` raise via `_zip_error`.
# `DiskZip` and `_zip_error` are interpolated as values, so the generated code
# does not need those names to be in scope at the call site.
# NOTE(review): no erroring method is generated for `t` paired with an
# `AbstractDiskArray` followed by non-array arguments — confirm whether that
# combination needs its own fallback.
macro implement_zip(t)
    T = esc(t)
    return quote
        # Array-only combinations are zipped through `DiskZip`.
        function Base.zip(A1::$T, A2::$T, As::AbstractArray...)
            return $DiskZip(A1, A2, As...)
        end
        function Base.zip(A1::$T, A2::AbstractArray, As::AbstractArray...)
            return $DiskZip(A1, A2, As...)
        end
        function Base.zip(A1::AbstractArray, A2::$T, As::AbstractArray...)
            return $DiskZip(A1, A2, As...)
        end
        function Base.zip(A1::AbstractDiskArray, A2::$T, As::AbstractArray...)
            return $DiskZip(A1, A2, As...)
        end
        function Base.zip(A1::$T, A2::AbstractDiskArray, As::AbstractArray...)
            return $DiskZip(A1, A2, As...)
        end
        # Anything else involving `t` is rejected.
        function Base.zip(::$T, x, xs...)
            return $_zip_error()
        end
        function Base.zip(x, ::$T, xs...)
            return $_zip_error()
        end
        function Base.zip(::$T, ::$T, xs...)
            return $_zip_error()
        end
    end
end