Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'converg' into develop

  • Loading branch information...
commit 90df3b91cd5cb3a7bbab7d51eba3c324e9675005 2 parents ad8592c + 3095d39
Nikita Zhiltsov authored
Showing with 194 additions and 40 deletions.
  1. +134 −0 handythread.py
  2. +60 −40 rescal.py
134 handythread.py
View
@@ -0,0 +1,134 @@
+import sys
+import time
+import threading
+from itertools import izip, count
+
+def foreach(f,l,ARk, X, A, threads=4,return_=False):
+ """
+ Apply f to each element of l, in parallel
+ """
+
+ if threads>1:
+ iteratorlock = threading.Lock()
+ exceptions = []
+ if return_:
+ n = 0
+ d = {}
+ i = izip(count(),l.__iter__())
+ else:
+ i = l.__iter__()
+
+
+ def runall():
+ while True:
+ iteratorlock.acquire()
+ try:
+ try:
+ if exceptions:
+ return
+ v = i.next()
+ finally:
+ iteratorlock.release()
+ except StopIteration:
+ return
+ try:
+ if return_:
+ n,x = v
+ d[n] = f(x, ARk, X, A)
+ else:
+ f(v, ARk, X[i], A)
+ except:
+ e = sys.exc_info()
+ iteratorlock.acquire()
+ try:
+ exceptions.append(e)
+ finally:
+ iteratorlock.release()
+
+ threadlist = [threading.Thread(target=runall) for j in xrange(threads)]
+ for t in threadlist:
+ t.start()
+ for t in threadlist:
+ t.join()
+ if exceptions:
+ a, b, c = exceptions[0]
+ raise a, b, c
+ if return_:
+ r = d.items()
+ r.sort()
+ return [v for (n,v) in r]
+ else:
+ if return_:
+ return [f(v, ARk, X, A) for v in l]
+ else:
+ for v in l:
+ f(v, ARk, X, A)
+ return
+
+def parallel_map(f,l,ARk, X, A,threads=4):
+ return foreach(f,l,ARk, X, A, threads=threads,return_=True)
+
+def parallel_map2(f,l,ARki, A,threads=4):
+ return foreach2(f,l,ARki, A, threads=threads,return_=True)
+
+def foreach2(f,l,ARki, A, threads=4,return_=False):
+ """
+ Apply f to each element of l, in parallel
+ """
+
+ if threads>1:
+ iteratorlock = threading.Lock()
+ exceptions = []
+ if return_:
+ n = 0
+ d = {}
+ i = izip(count(),l.__iter__())
+ else:
+ i = l.__iter__()
+
+
+ def runall():
+ while True:
+ iteratorlock.acquire()
+ try:
+ try:
+ if exceptions:
+ return
+ v = i.next()
+ finally:
+ iteratorlock.release()
+ except StopIteration:
+ return
+ try:
+ if return_:
+ n,x = v
+ d[n] = f(x, ARki, A)
+ else:
+ f(v, ARki, A)
+ except:
+ e = sys.exc_info()
+ iteratorlock.acquire()
+ try:
+ exceptions.append(e)
+ finally:
+ iteratorlock.release()
+
+ threadlist = [threading.Thread(target=runall) for j in xrange(threads)]
+ for t in threadlist:
+ t.start()
+ for t in threadlist:
+ t.join()
+ if exceptions:
+ a, b, c = exceptions[0]
+ raise a, b, c
+ if return_:
+ r = d.items()
+ r.sort()
+ return [v for (n,v) in r]
+ else:
+ if return_:
+ return [f(v, ARki, A) for v in l]
+ else:
+ for v in l:
+ f(v, ARki, A)
+ return
100 rescal.py
View
@@ -11,6 +11,10 @@
import os
import fnmatch
import carray as ca
+import handythread
+import operator
+import itertools
+from multiprocessing import Pool, Process, Manager, Array
__version__ = "0.1"
__all__ = ['rescal', 'rescal_with_random_restarts']
@@ -23,6 +27,9 @@
logging.basicConfig(filename='rescal.log',filemode='w', level=logging.DEBUG)
_log = logging.getLogger('RESCAL')
+ARk = ca.zeros((1,1))
+Aglobal = ca.zeros((1,1))
+Xiglobal = ca.zeros((1,1))
def rescal_with_random_restarts(X, rank, restarts=10, **kwargs):
"""
@@ -82,6 +89,20 @@ def squareOfMatrix(M):
matrix[i,j] = dot(M[:,i], M[:,j])
return matrix
+def ARAtFunc(j, ARki, A):
+ """
+ Computes the j-th row of the matrix ARk * A^T
+ """
+ return dot(ARki, A[j,:])
+
+def fitNorm(i):
+ """
+ Computes the squared Frobenius norm of the i-th fitting matrix row
+ """
+ n, r = A.shape
+ ARAtValues = handythread.parallel_map2(ARAtFunc, range(n), ARk[i,:], Aglobal, threads=7)
+ return norm(Xiglobal.getrow(i).todense() - ARAtValues)**2
+
def rescal(X, rank, **kwargs):
"""
RESCAL
@@ -152,7 +173,6 @@ def rescal(X, rank, **kwargs):
# precompute norms of X
normX = [squareFrobeniusNormOfSparse(M) for M in X]
_log.debug('[Config] finished precomputing norms')
- Xflat = [M for M in X]
sumNormX = sum(normX)
# initialize A
@@ -181,15 +201,15 @@ def rescal(X, rank, **kwargs):
# R = __updateR(X, A, lmbda)
# compute factorization
- fit = fitchange = fitold = f = regularizedFit = 0
+ fit = fitchange = fitold = 0
exectimes = []
- Aold = None
- Rold = None
for iter in xrange(maxIter):
tic = time.clock()
A = __updateA(X, A, R, lmbda)
+ global Aglobal
+ Aglobal = A
if proj:
Q, A2 = qr(A)
X2 = __projectSlices(X, Q)
@@ -199,45 +219,45 @@ def rescal(X, rank, **kwargs):
# R = __updateR(X, A, lmbda)
# compute fit values
- if Aold != None and Rold != None:
- Rfit = 0
+ regularizedFit = 0
+ if lmbda != 0:
+ regRFit = 0
+ for i in range(len(R)):
+ regRFit += norm(R[i])**2
+ regularizedFit = lmbda*(norm(A)**2) + lmbda*regRFit
- if lmbda != 0:
- regRFit = 0
- for i in range(len(R)):
- Rfit += norm(R[i] - Rold[i])**2
- regRFit += norm(R[i])**2
- fit = norm(minus(A, Aold))**2 + Rfit
- regularizedFit = fit + lmbda*(norm(A)**2) + lmbda*regRFit
- else :
- for i in range(len(R)):
- Rfit += norm(R[i] - Rold[i])**2
- fit = norm(minus(A, Aold))**2 + Rfit
+ fit = 0
+ for i in range(len(R)):
+ global ARk
+ ARk = dotAsCArray(A, R[i])
+ global Xiglobal
+ Xiglobal = X[i]
+ p = Pool(4)
+ fits = p.map(fitNorm, range(n))
+ fit += sum(fits)
+ fit *= 0.5
+ fit += regularizedFit
+ fit /= sumNormX
- toc = time.clock()
- exectimes.append( toc - tic )
- fitchange = abs(fitold - fit)
- if lmbda != 0:
- _log.debug('[%3d] approxFit: %.5f | regularized fit: %.5f | approxFit delta: %7.1e | secs: %.5f' % (iter,
- fit, regularizedFit, fitchange, exectimes[-1]))
- else :
- _log.debug('[%3d] approxFit: %.5f | approxFit delta: %7.1e | secs: %.5f' % (iter,
- fit, fitchange, exectimes[-1]))
+ toc = time.clock()
+ exectimes.append( toc - tic )
+ fitchange = abs(fitold - fit)
+ if lmbda != 0:
+ _log.debug('[%3d] approxFit: %7.1e | regularized fit: %7.1e | approxFit delta: %7.1e | secs: %.5f' % (iter,
+ fit, regularizedFit, fitchange, exectimes[-1]))
+ else :
+ _log.debug('[%3d] approxFit: %7.1e | approxFit delta: %7.1e | secs: %.5f' % (iter,
+ fit, fitchange, exectimes[-1]))
- fitold = fit
- if iter > 1 and fitchange < conv:
- break
-
- Aold = A
- Rold = R
-
- return A, R, f, iter+1, array(exectimes)
+ fitold = fit
+# if iter > 1 and fitchange < conv:
+# break
+ return A, R, fit, iter+1, array(exectimes)
def __updateA(X, A, R, lmbda):
n, rank = A.shape
-# F = zeros((n, rank), dtype=np.float64)
- F = coo_matrix((n,rank), dtype=np.float64)
+ F = ca.zeros((n,rank))
E = zeros((rank, rank), dtype=np.float64)
AtA = squareOfMatrix(A)
@@ -278,20 +298,20 @@ def __projectSlices(X, Q):
numLatentComponents = args.latent
dim = 0
-with open('./data/entity-ids') as entityIds:
+with open('./data2/entity-ids') as entityIds:
for line in entityIds:
dim += 1
print 'The number of entities: %d' % dim
numSlices = 0
X = []
-for file in os.listdir('./data'):
+for file in os.listdir('./data2'):
if fnmatch.fnmatch(file, '*-rows'):
numSlices += 1
- row = loadtxt('./data/' + file, dtype=np.int32)
+ row = loadtxt('./data2/' + file, dtype=np.int32)
if row.size == 1:
row = np.atleast_1d(row)
- col = loadtxt('./data/' + file.replace("rows", "cols"), dtype=np.int32)
+ col = loadtxt('./data2/' + file.replace("rows", "cols"), dtype=np.int32)
if col.size == 1:
col = np.atleast_1d(col)
A = coo_matrix((ones(row.size),(row,col)), shape=(dim,dim), dtype=np.uint8)
Please sign in to comment.
Something went wrong with that request. Please try again.