Skip to content

Commit

Permalink
Merge pull request #24 from lenskit/feature/ii-knn-test
Browse files Browse the repository at this point in the history
Improve II testability and performance
  • Loading branch information
mdekstrand committed Aug 7, 2018
2 parents 7d705b4 + 0a44671 commit 03bbbb5
Show file tree
Hide file tree
Showing 10 changed files with 384 additions and 119 deletions.
3 changes: 2 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ jobs:
working_directory: ~/lkpy
environment:
LK_PROCESS_COUNT: 2
USE_OPENMP: intel

steps:
- checkout
Expand All @@ -241,7 +242,7 @@ jobs:
source ~/miniconda/bin/activate base
conda config --set always_yes yes --set changeps1 no
conda update -q --all
conda install -q pandas scipy cython pytables pytest pytest-arraydiff clang_osx-64
conda install -q pandas scipy cython pytables pytest pytest-arraydiff clang_osx-64 llvm-openmp
conda clean -tp
- save_cache:
Expand Down
12 changes: 12 additions & 0 deletions lenskit/_cy_util.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
cdef struct AccHeap:
int nmax
int size
int* keys
double* values

cdef AccHeap* ah_create(double* values, int nmax) nogil
cdef void ah_free(AccHeap* heap) nogil
cdef void ah_add(AccHeap* heap, int key) nogil
cdef int ah_remove(AccHeap* heap) nogil

cdef void zero(double* vals, int n) nogil
122 changes: 122 additions & 0 deletions lenskit/_cy_util.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import numpy as np
cimport numpy as np
cimport scipy.linalg.cython_blas as blas

from libc.stdlib cimport malloc, free

cdef struct AccHeap:
int nmax
int size
int* keys
double* values

cdef AccHeap* ah_create(double* values, int nmax) nogil:
"""
Create an accumulation heap with a limited size.
Args:
values: the values (not owned by this heap)
nmax: the maximum number of keys to retain
"""
cdef AccHeap* heap = <AccHeap*> malloc(sizeof(AccHeap))
heap.nmax = nmax
heap.size = 0
heap.values = values
heap.keys = <int*> malloc(sizeof(int) * (nmax + 1))
return heap


cdef void ah_free(AccHeap* heap) nogil:
free(heap.keys)
free(heap)

cdef void ah_add(AccHeap* heap, int key) nogil:
heap.keys[heap.size] = key
ind_upheap(heap.size, heap.keys, heap.values)
if heap.size < heap.nmax:
heap.size = heap.size + 1
else:
# we are at capacity, we need to drop the smallest value
heap.keys[0] = heap.keys[heap.size]
ind_downheap(0, heap.size, heap.keys, heap.values)

cdef int ah_remove(AccHeap* heap) nogil:
cdef int top = heap.keys[0]
if heap.size == 0:
return -1

heap.keys[0] = heap.keys[heap.size - 1]
heap.size = heap.size - 1
if heap.size > 0:
ind_downheap(0, heap.size, heap.keys, heap.values)
return top


cdef class Accumulator:
cdef np.float_t[::1] values
cdef AccHeap* heap

def __cinit__(self, np.float_t[::1] values, int nmax):
self.values = values
if values.shape[0] > 0:
self.heap = ah_create(&values[0], nmax)
else:
self.heap = ah_create(NULL, nmax)

def __dealloc__(self):
ah_free(self.heap)

def __len__(self):
return self.heap.size

cpdef add(self, int key):
if key < 0 or key >= self.values.shape[0]:
raise IndexError()
ah_add(self.heap, key)

cpdef int peek(self):
if self.heap.size > 0:
return self.heap.keys[0]
else:
return -1

cpdef int remove(self):
return ah_remove(self.heap)


cdef void ind_upheap(int pos, int* keys, double* values) nogil:
cdef int current, parent, kt
current = pos
parent = (current - 1) // 2
while current > 0 and values[keys[parent]] > values[keys[current]]:
# swap up
kt = keys[parent]
keys[parent] = keys[current]
keys[current] = kt
current = parent
parent = (current - 1) // 2

cdef void ind_downheap(int pos, int len, int* keys, double* values) nogil:
cdef int left, right, min, kt
min = pos
left = 2*pos + 1
right = 2*pos + 2
if left < len and values[keys[left]] < values[keys[min]]:
min = left
if right < len and values[keys[right]] < values[keys[min]]:
min = right
if min != pos:
kt = keys[min]
keys[min] = keys[pos]
keys[pos] = kt
ind_downheap(min, len, keys, values)


cdef void zero(double* vals, int n) nogil:
cdef int i
for i in range(n):
vals[i] = 0

cpdef zero_buf(double[::1] buf):
if buf.shape[0] > 0:
zero(&buf[0], buf.shape[0])
110 changes: 57 additions & 53 deletions lenskit/algorithms/_item_knn.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import array
import pandas as pd
import numpy as np
cimport numpy as np
from numpy cimport math as npm
from cython.parallel cimport parallel, prange, threadid
from libc.stdlib cimport malloc, free, realloc, abort, calloc
from libc.math cimport isnan
from libc.math cimport isnan, fabs
import logging

from lenskit cimport _cy_util as lku

IF OPENMP:
from openmp cimport omp_get_thread_num, omp_get_num_threads
ELSE:
Expand Down Expand Up @@ -121,83 +124,46 @@ cdef void tr_add_all(ThreadState* self, int item, size_t nitems,

self.size = self.size + 1

cdef void ind_upheap(int pos, int len, int* keys, double* values) nogil:
cdef int current, parent, kt
current = pos
parent = (current - 1) // 2
while current > 0 and values[keys[parent]] > values[keys[current]]:
# swap up
kt = keys[parent]
keys[parent] = keys[current]
keys[current] = kt
current = parent
parent = (current - 1) // 2

cdef void ind_downheap(int pos, int len, int* keys, double* values) nogil:
cdef int left, right, min, kt
min = pos
left = 2*pos + 1
right = 2*pos + 2
if left < len and values[keys[left]] < values[keys[min]]:
min = left
if right < len and values[keys[right]] < values[keys[min]]:
min = right
if min != pos:
kt = keys[min]
keys[min] = keys[pos]
keys[pos] = kt
ind_downheap(min, len, keys, values)

cdef void tr_add_nitems(ThreadState* self, int item, size_t nitems,
double threshold, int nmax) nogil:
cdef int* keys
cdef int j, kn, ki
cdef np.int64_t nbr
keys = <int*> calloc(nmax + 1, sizeof(int))
cdef lku.AccHeap* acc = lku.ah_create(self.work, nmax)

tr_ensure_capacity(self, self.size + nmax)

kn = 0
for j in range(nitems):
if self.work[j] < threshold: continue

ki = kn
keys[ki] = j
ind_upheap(ki, kn, keys, self.work)

if kn < nmax:
kn = kn + 1
else:
# we just added the (nmax+1)th thing
# drop the smallest of our nmax largest
keys[0] = keys[kn]
ind_downheap(0, kn, keys, self.work)

lku.ah_add(acc, j)

# now that we have the heap built, let us unheap!
while kn > 0:
nbr = keys[0]
while acc.size > 0:
nbr = lku.ah_remove(acc)
self.items[self.size] = item
self.nbrs[self.size] = nbr
self.sims[self.size] = self.work[nbr]
self.size = self.size + 1
keys[0] = keys[kn - 1]
kn = kn - 1
if kn > 0:
ind_downheap(0, kn, keys, self.work)

free(keys)
lku.ah_free(acc)


cdef dict tr_results(ThreadState* self):
cdef np.npy_intp size = self.size
cdef np.ndarray items, nbrs, sims
items = np.asarray(<np.int32_t[:self.size]> self.items)
nbrs = np.asarray(<np.int32_t[:self.size]> self.nbrs)
sims = np.asarray(<np.float_t[:self.size]> self.sims)
items = np.empty(size, dtype=np.int32)
nbrs = np.empty(size, dtype=np.int32)
sims = np.empty(size, dtype=np.float_)
items[:] = <np.int32_t[:self.size]> self.items
nbrs[:] = <np.int32_t[:self.size]> self.nbrs
sims[:] = <np.float_t[:self.size]> self.sims
# items = np.PyArray_SimpleNewFromData(1, &size, np.NPY_INT32, self.items)
# nbrs = np.PyArray_SimpleNewFromData(1, &size, np.NPY_INT32, self.nbrs)
# sims = np.PyArray_SimpleNewFromData(1, &size, np.NPY_DOUBLE, self.sims)
return {'item': items.copy(), 'neighbor': nbrs.copy(), 'similarity': sims.copy()}
return {'item': items, 'neighbor': nbrs, 'similarity': sims}


cpdef sim_matrix(BuildContext context, double threshold, int nnbrs):
Expand Down Expand Up @@ -227,13 +193,13 @@ cpdef sim_matrix(BuildContext context, double threshold, int nnbrs):
return pd.concat([pd.DataFrame(d) for d in neighborhoods],
ignore_index=True)


cdef void train_row(int item, ThreadState* tres, BuildContext context,
double threshold, int nnbrs) nogil:
cdef int j, u, uidx, iidx, nbr, urp
cdef double ur = 0

for j in range(context.n_items):
tres.work[j] = 0
lku.zero(tres.work, context.n_items)

for uidx in range(context.r_iptrs[item], context.r_iptrs[item+1]):
u = context.r_users[uidx]
Expand All @@ -260,3 +226,41 @@ cdef void train_row(int item, ThreadState* tres, BuildContext context,
tr_add_nitems(tres, item, context.n_items, threshold, nnbrs)
else:
tr_add_all(tres, item, context.n_items, threshold)


cpdef void predict(matrix, int nitems, int min_nbrs, int max_nbrs,
np.float_t[:] ratings,
np.int64_t[:] targets,
np.float_t[:] scores):
cdef int[:] indptr = matrix.indptr
cdef int[:] indices = matrix.indices
cdef double[:] similarity = matrix.data
cdef int i, j, iidx, rptr, rend, nidx, nnbrs
cdef double num, denom

with nogil:
for i in range(targets.shape[0]):
iidx = targets[i]
rptr = indptr[iidx]
rend = indptr[iidx + 1]

num = 0
denom = 0
nnbrs = 0

for j in range(rptr, rend):
nidx = indices[j]
if isnan(ratings[nidx]):
continue

nnbrs = nnbrs + 1
num = num + ratings[nidx] * similarity[j]
denom = denom + fabs(similarity[j])

if max_nbrs > 0 and nnbrs >= max_nbrs:
break

if nnbrs < min_nbrs:
break

scores[iidx] = num / denom

0 comments on commit 03bbbb5

Please sign in to comment.