# vec-segfault.py
# https://arxiv.org/pdf/1301.1071.pdf "Direct TSQR"
from parla.multiload import multiload_contexts as VECs
import os
import sys
import getopt
import concurrent.futures
import time
import queue
# Default values, can override with command line args
# NOTE(review): no command-line parsing is visible in this file, so these
# values are used exactly as written here - confirm before relying on overrides.
ROWS = 240000 # Must be >> COLS
COLS = 1000
BLOCK_SIZE = 3000 # Must be >= COLS
MAX_WORKERS = 11 # Must be < 12 due to limited VECs
THREADS_PER_WORKER = 6 # For OMP_NUM_THREADS
# Context used for the non-parallel numpy work; the __main__ block sets it to
# VECs[MAX_WORKERS].
main_VEC = None
# NOTE(review): never read or assigned again in the visible source.
locks = None
# Holds the indices of the VECs that are currently free: the __main__ block
# puts 0..MAX_WORKERS-1 into it, VEC_qr takes one index with get() and hands
# it back with put().
VEC_q = queue.Queue()
# Cut a matrix into consecutive groups of rows.
# Every group has block_size rows, except the last one, which may be shorter.
def make_blocked(A, block_size):
    """Split ``A`` row-wise and return ``(blocks, block_count)``.

    Args:
        A: object exposing ``shape[0]`` and row slicing (``A[lo:hi]``).
        block_size: number of rows per block.

    Returns:
        A tuple of the list of row slices, in order, and the number of blocks.
    """
    total_rows = A.shape[0]
    # Ceiling division: a trailing partial block still counts as a block.
    block_count = (total_rows + block_size - 1) // block_size
    blocks = [
        # The upper bound is exclusive and clamped to the end of the matrix.
        A[k * block_size:min((k + 1) * block_size, total_rows)]
        for k in range(block_count)
    ]
    return blocks, block_count
# Rebuild a single matrix from a list of row blocks
def unblock(A):
    """Concatenate the blocks in ``A`` after passing each through ``fixarr``.

    Args:
        A: iterable of blocks, in the order they should be stacked.

    Returns:
        The result of ``np.concatenate`` over the converted blocks.
    """
    pieces = list(map(fixarr, A))
    return np.concatenate(pieces)
def VEC_qr(A):
    """Factor one block with numpy's QR while holding a VEC taken from VEC_q.

    A free VEC index is taken from ``VEC_q`` (blocking until one is
    available), the factorization runs inside ``VECs[VEC_id]``, and the index
    is put back on the queue afterwards. A row of ``'|'`` markers is printed
    before and after the factorization, with ``'x'`` / ``'o'`` at the position
    of the VEC in use.

    Args:
        A: the block to factor; it is passed through ``fixarr`` before
            ``np.linalg.qr`` is called.

    Returns:
        The ``(Q, R)`` pair produced by ``np.linalg.qr``.
    """
    # Acquire lock: an index is only available to one caller at a time
    # because it is removed from the queue until it is put back below.
    VEC_id = VEC_q.get()
    try:
        status = ['|'] * MAX_WORKERS
        status[VEC_id] = 'x'
        print(status)

        with VECs[VEC_id]:
            Q, R = np.linalg.qr(fixarr(A))

        status = ['|'] * MAX_WORKERS
        status[VEC_id] = 'o'
        print(status)
    finally:
        # Release lock. Done in `finally` so that a failing factorization
        # does not permanently remove this VEC from the queue, which would
        # leave later callers blocked in VEC_q.get().
        VEC_q.task_done()
        VEC_q.put(VEC_id)
    return Q, R
def tsqr_blocked_multi(A, block_size, workers):
    """Run the per-block QR stage of blocked TSQR on a thread pool.

    ``A`` is split into row blocks, ``VEC_qr`` is mapped over the blocks on a
    ``ThreadPoolExecutor``, and the per-block factors are collected. The
    function stops after this first stage and returns nothing.

    Args:
        A: input matrix; ``A.shape[1]`` must not exceed ``block_size``.
        block_size: number of rows per block handed to ``make_blocked``.
        workers: ``max_workers`` for the thread pool.

    Returns:
        None.

    Raises:
        SystemExit: with status 1, after a message on stderr, when the matrix
            has more columns than ``block_size``.
    """
    # Validate the actual arguments (not the module-level defaults), and do
    # it before any worker threads are created.
    if A.shape[1] > block_size:
        print('Block size must be greater than or equal to the number of columns in the input matrix', file=sys.stderr)
        sys.exit(1)

    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        with main_VEC:
            A_blocked, _ = make_blocked(A, block_size)

        print('ENTERING PARALLEL SECTION')

        # Each thread gets a block from A_blocked to run numpy's built-in qr factorization on
        block_results = executor.map(VEC_qr, A_blocked)

        # Regroup results. Iterating the map result waits for every block
        # and re-raises any exception raised by a worker.
        with main_VEC:
            Q1 = []
            R1 = []
            for q_block, r_block in block_results:
                Q1.append(q_block)
                R1.append(r_block)

        print('FINISHED FIRST PARALLEL SECTION (segfault fixed if this prints)')
def fixarr(A):
    """Return ``A`` re-wrapped as an array built from its ``memoryview``.

    Args:
        A: any object that ``memoryview`` accepts.

    Returns:
        ``np.asarray`` applied to ``memoryview(A)``.
    """
    raw_buffer = memoryview(A)
    return np.asarray(raw_buffer)
if __name__ == "__main__":
    # Echo the configuration in use.
    print(f'ROWS={ROWS} COLS={COLS} BLOCK_SIZE={BLOCK_SIZE} MAX_WORKERS={MAX_WORKERS} THREADS_PER_WORKER={THREADS_PER_WORKER}')

    # Set up VEC's
    for i in range(MAX_WORKERS):
        # Limit thread count here
        VECs[i].setenv('OMP_NUM_THREADS', str(THREADS_PER_WORKER))
        # numpy is imported once inside every worker context.
        with VECs[i]:
            import numpy as np
        # Populate VEC queue
        VEC_q.put(i)

    # Reserve last context for single threaded stuff
    # Unlimited threads, don't setenv
    # (index MAX_WORKERS, so VECs must provide at least MAX_WORKERS + 1 contexts)
    with VECs[MAX_WORKERS]:
        import numpy as np
    main_VEC = VECs[MAX_WORKERS]

    with main_VEC:
        # Original matrix
        A = np.random.rand(ROWS, COLS)

    # Multithreaded blocked version with VECs.
    # tsqr_blocked_multi has no return statement, so its result is not
    # unpacked into Q, R: doing so raises TypeError once the run completes.
    tsqr_blocked_multi(A, BLOCK_SIZE, MAX_WORKERS)