import numpy as np

from mpi4py import MPI


# Map numpy types to the corresponding MPI types.
_typemap = {np.float32: MPI.FLOAT,
            np.float64: MPI.DOUBLE,
            np.complex128: MPI.COMPLEX16}


def ceildiv(x, y):
    """Ceiling integer division."""
    return (int(x) - 1) // int(y) + 1


def pid_remap(p, p0, P):
    """Remap process index `p` so that process `p0` maps to index 0."""
    return (p + P - p0) % P


def num_c_blocks(N, B):
    """Number of complete blocks globally."""
    return N // B


def num_blocks(N, B):
    """Total number of blocks globally."""
    return ceildiv(N, B)


def num_c_lblocks(N, B, p, P):
    """Number of complete blocks locally."""
    nbc = num_c_blocks(N, B)
    return nbc // P + (1 if (nbc % P) > p else 0)


def num_lblocks(N, B, p, P):
    """Total number of local blocks."""
    nb = num_blocks(N, B)
    return nb // P + (1 if (nb % P) > p else 0)


def partial_last_block(N, B, p, P):
    """Is the last local block partial?"""
    return (N % B > 0) and ((num_c_blocks(N, B) % P) == p)


def num_rstride(N, B, stride):
    """Length of a block-strided row."""
    return num_blocks(N, B) * stride


#size_t stride_page(size_t B, size_t itemsize) {
#
#    size_t pl;
#
#    pl = (size_t) sysconf (_SC_PAGESIZE) / itemsize;
#    return ceildiv(B, pl) * pl;
#}


#size_t num_rpage(size_t N, size_t B, size_t itemsize) {
#    return num_rstride(N, B, stride_page(B, itemsize));
#}


def numrc(N, B, p, P):
    """The number of rows/columns of the global array local to the process.
    """
    # Number of complete blocks owned by the process.
    nbp = num_c_lblocks(N, B, p, P)

    # Number of entries of complete blocks owned by the process.
    n = nbp * B

    # If this process owns an incomplete block, then add the number of entries.
    if partial_last_block(N, B, p, P):
        n += N % B

    return n
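

# Illustrative worked example (not part of the original module): with N = 10
# rows, block size B = 3 and P = 2 processes, the blocks [0,1,2], [3,4,5],
# [6,7,8], [9] are dealt out cyclically, so
#   numrc(10, 3, 0, 2) == 6   (process 0 owns blocks 0 and 2)
#   numrc(10, 3, 1, 2) == 4   (process 1 owns block 1 and the partial block 3)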


def indices_rc(N, B, p, P):
    """The indices of the global array local to the process.
    """
    nt = numrc(N, B, p, P)
    nb = num_c_lblocks(N, B, p, P)

    ind = np.zeros(nt, dtype=np.int64)

    # Indices covered by the complete local blocks.
    ind[:(nb * B)] = ((np.arange(nb)[:, np.newaxis] * P + p) * B +
                      np.arange(B)[np.newaxis, :]).flatten()

    # Indices covered by a trailing partial block (if any).
    if nb * B < nt:
        ind[(nb * B):] = (nb * P + p) * B + np.arange(nt - nb * B)

    return ind
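

# Continuing the illustrative example above (N = 10, B = 3, P = 2):
#   indices_rc(10, 3, 0, 2) -> [0, 1, 2, 6, 7, 8]
#   indices_rc(10, 3, 1, 2) -> [3, 4, 5, 9]
# i.e. each process gets the global indices of the rows/columns it owns.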


def mpi_readmatrix(fname, comm, gshape, dtype, blocksize, process_grid,
                   order='F', displacement=0):
    """Distribute a block cyclic matrix read from a file.

    Uses MPI-IO to do all the work.

    Parameters
    ----------
    fname : string
        Name of the file to read.
    comm : mpi4py.MPI.Comm
        MPI communicator to use. Must match the one used by BLACS (if using
        Scalapack).
    gshape : (nrows, ncols)
        Shape of the global matrix.
    dtype : numpy dtype
        Data type of the matrix elements. Must be one of the types in
        `_typemap`.
    blocksize : (blockrows, blockcols)
        Blocking size for the distribution.
    process_grid : (prows, pcols)
        The shape of the process grid. Its total size must equal
        comm.Get_size(), and it must match the BLACS grid (if using
        Scalapack).
    order : 'F' or 'C', optional
        Whether the matrix on disk is Fortran (column major) 'F' or C (row
        major) 'C' ordered. Defaults to Fortran ordered.
    displacement : integer, optional
        Use a displacement from the start of the file. That is, ignore the
        first `displacement` bytes.

    Returns
    -------
    local_array : np.ndarray
        The section of the array local to this process.
    """
    if dtype not in _typemap:
        raise Exception("Unsupported type.")

    # Get the corresponding MPI type.
    mpitype = _typemap[dtype]

    # Sort out F, C ordering.
    if order not in ['F', 'C']:
        raise Exception("Order must be 'F' (Fortran) or 'C'")
    # TODO: check what we need to do here for loading C ordered files.
    mpiorder = MPI.ORDER_FORTRAN if order == 'F' else MPI.ORDER_C

    # Get MPI process info.
    rank = comm.Get_rank()
    size = comm.Get_size()

    # Check the process grid shape.
    if size != process_grid[0] * process_grid[1]:
        raise Exception("MPI size does not match process grid.")

    # Create the distributed array view.
    darr = mpitype.Create_darray(size, rank, gshape,
                                 [MPI.DISTRIBUTE_CYCLIC, MPI.DISTRIBUTE_CYCLIC],
                                 blocksize, process_grid, mpiorder)
    darr.Commit()

    # Get the shape of the local segment.
    process_position = [rank // process_grid[1], rank % process_grid[1]]
    lshape = list(map(numrc, gshape, blocksize, process_position, process_grid))

    # Check that the datatype size is consistent with the local shape.
    if lshape[0] * lshape[1] != darr.Get_size() // mpitype.Get_size():
        raise Exception("Strange mismatch in local shape size.")

    # Create the local array.
    local_array = np.empty(lshape, dtype=dtype, order=order)

    # Open the file, and read in the local segments.
    f = MPI.File.Open(comm, fname, MPI.MODE_RDONLY)
    f.Set_view(displacement, mpitype, darr, "native")
    f.Read_all(local_array)
    f.Close()

    return local_array
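

# A minimal usage sketch (illustrative only; the file name, shape and grid
# below are made up). Launched with e.g. `mpirun -n 4 python yourscript.py`:
#
#     comm = MPI.COMM_WORLD
#     # 1000 x 1000 float64 matrix, 32 x 32 blocks on a 2 x 2 process grid.
#     local = mpi_readmatrix("matrix.dat", comm, (1000, 1000), np.float64,
#                            (32, 32), (2, 2), order='F')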


def mpi_writematrix(fname, local_array, comm, gshape, dtype,
                    blocksize, process_grid, order='F', displacement=0):
    """Write a block cyclic distributed matrix to a file.

    Uses MPI-IO to do all the work.

    Parameters
    ----------
    fname : string
        Name of the file to write.
    local_array : np.ndarray
        The local section of the array to write.
    comm : mpi4py.MPI.Comm
        MPI communicator to use. Must match the one used by BLACS (if using
        Scalapack).
    gshape : (nrows, ncols)
        Shape of the global matrix.
    dtype : numpy dtype
        Data type of the matrix elements. Must be one of the types in
        `_typemap`.
    blocksize : (blockrows, blockcols)
        Blocking size for the distribution.
    process_grid : (prows, pcols)
        The shape of the process grid. Its total size must equal
        comm.Get_size(), and it must match the BLACS grid (if using
        Scalapack).
    order : 'F' or 'C', optional
        Whether the matrix on disk is Fortran (column major) 'F' or C (row
        major) 'C' ordered. Defaults to Fortran ordered.
    displacement : integer, optional
        Use a displacement from the start of the file. That is, ignore the
        first `displacement` bytes.
    """
    if dtype not in _typemap:
        raise Exception("Unsupported type.")

    # Get the corresponding MPI type.
    mpitype = _typemap[dtype]

    # Sort out F, C ordering.
    if order not in ['F', 'C']:
        raise Exception("Order must be 'F' (Fortran) or 'C'")
    # TODO: check what we need to do here for writing C ordered files.
    mpiorder = MPI.ORDER_FORTRAN if order == 'F' else MPI.ORDER_C

    # Get MPI process info.
    rank = comm.Get_rank()
    size = comm.Get_size()

    # Check the process grid shape.
    if size != process_grid[0] * process_grid[1]:
        raise Exception("MPI size does not match process grid.")

    # Create the distributed array view.
    darr = mpitype.Create_darray(size, rank, gshape,
                                 [MPI.DISTRIBUTE_CYCLIC, MPI.DISTRIBUTE_CYCLIC],
                                 blocksize, process_grid, mpiorder)
    darr.Commit()

    # Check that the local array size is consistent with the array description.
    if local_array.size != darr.Get_size() // mpitype.Get_size():
        raise Exception("Local array size is not consistent with array description.")

    # Length of file required for the write (in bytes).
    filelength = displacement + gshape[0] * gshape[1] * mpitype.Get_size()
    print(filelength, darr.Get_size())

    # Open the file, and write out the local segments.
    f = MPI.File.Open(comm, fname, MPI.MODE_RDWR | MPI.MODE_CREATE)

    # Preallocate to ensure the file is long enough for writing.
    f.Preallocate(filelength)

    f.Set_view(displacement, mpitype, darr, "native")
    f.Write_all(local_array)
    f.Close()
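

# A companion usage sketch (illustrative; the file name and sizes are made up),
# continuing the read example above:
#
#     # Write the local block cyclic section back out, then read it again.
#     mpi_writematrix("matrix_out.dat", local, comm, (1000, 1000), np.float64,
#                     (32, 32), (2, 2), order='F')
#     local2 = mpi_readmatrix("matrix_out.dat", comm, (1000, 1000), np.float64,
#                             (32, 32), (2, 2), order='F')
#     assert np.array_equal(local, local2)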


def bc_matrix_forward(src, blocksize, process, process_grid, order='F', dest=None):
    """Scatter the global array `src` into its local block cyclic section (unfinished)."""
    gshape = src.shape

    lshape = list(map(numrc, gshape, blocksize, process, process_grid))

    if dest is None:
        dest = np.empty(lshape, dtype=src.dtype, order=order)

    cblocks = list(map(num_c_blocks, gshape, blocksize))
    lcblocks = list(map(num_c_lblocks, gshape, blocksize, process, process_grid))
    partial = list(map(partial_last_block, gshape, blocksize, process, process_grid))

    clen = cblocks[0] * blocksize[0], cblocks[1] * blocksize[1]

    # Split the global array into complete blocks (q1), partial column blocks
    # (q2), partial row blocks (q3) and the corner block (q4).
    q1 = src[:clen[0], :clen[1]].reshape((blocksize[0], cblocks[0], blocksize[1], cblocks[1]))
    q2 = src[:clen[0], clen[1]:].reshape((blocksize[0], cblocks[0], -1))
    q3 = src[clen[0]:, :clen[1]].reshape((-1, blocksize[1], cblocks[1]))
    q4 = src[clen[0]:, clen[1]:].reshape((gshape[0] - clen[0], gshape[1] - clen[1]))

    #q1[:, process[0]::process_grid[0], :, process[1]::process_grid[1]].reshape()