In [19]:
import goodpoints

%load_ext cython

In [123]:
%%cython

import warnings
warnings.filterwarnings("ignore")


"""sobolev kernel smoothness functionality.

Cython implementation of functions involving sobolev kernel smoothness evaluation.
"""
import numpy as np
cimport numpy as np
# "cimport" is used to import special compile-time information
# about the numpy module (this is stored in a file numpy.pxd which is
# currently part of the Cython distribution).
cimport cython
from libc.math cimport sqrt, log, exp, cos, pi
from libc.stdlib cimport rand, RAND_MAX, srand
from posix.time cimport clock_gettime, timespec, CLOCK_REALTIME
from libc.stdio cimport printf
# It's necessary to call "import_array" if you use any part of the
# numpy PyArray_* API. From Cython 3, accessing attributes like
# ".shape" on a typed Numpy array use this API. 
np.import_array()

'''
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% Sobolev Kernel Functionality %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
'''

@cython.boundscheck(False) # turn off bounds-checking for this function
@cython.wraparound(False)  # turn off negative index wrapping for this function
@cython.initializedcheck(False) # turn off memoryview initialization checks for this function
@cython.cdivision(True) # Disable C-division checks for this function
cdef double single_sobolev_kernel_two_points(const double[:] X1,
                                       const double[:] X2,
                                       const long s) nogil:
    """
    Computes a single Sobolev kernel k(X1-X2, s) 
    between two points X1 and X2
    
    Args:
      X1: array of size d
      X2: array of size d
      s: sobolev kernel smoothness
    """
    cdef long d = X1.shape[0]
    cdef double x, ans
    cdef long j
    
    cdef double pi_factor

    # Compute the squared Euclidean distance between X1 and X2
    if s == 1:
        pi_factor = pi ** 2
    if s == 2:
        pi_factor = pi ** 4
    if s == 3:
        pi_factor = pi ** 6

    ans = 1.
    for j in range(d):
        x = X1[j]-X2[j]
        if x <0:
            x += 1
        if s == 1:
            ans *= (1. + 2. * pi_factor * ((x ** 2) - x + 1. / 6.))
        if s == 2:
            ans = ans * ( 1. - pi_factor * 2. / 3. * \
                ((x ** 4) - 2. * (x ** 3) + (x ** 2) - 1. / 30.))
        if s == 3:
            ans = ans * ( 1 + pi_factor * 4. / 45. * ((x**6) - 3 * (x**5) + \
                    5. / 2. * (x**4) - (x ** 2) / 2. + 1. / 42.))
    return(ans)

@cython.boundscheck(False) # turn off bounds-checking for this function
@cython.wraparound(False)  # turn off negative index wrapping for this function
@cython.initializedcheck(False) # turn off memoryview initialization checks for this function
@cython.cdivision(True) # Disable C-division checks for this function
cdef double sobolev_kernel_two_points(const double[:] X1,
                                       const double[:] X2,
                                       const long[:] s) nogil:
    """
    Computes a sum of Sobolev kernels sum_j k(X1-X2, s_j) 
    between two points X1 and X2
    
    Args:
      X1: array of size d
      X2: array of size d
      s: array of sobolev kernel smoothness
    """
    
    cdef long d = X1.shape[0]
    cdef long num_kernels = s.shape[0]
    
    cdef long j
    cdef double kernel_sum
    
    # Compute the kernel sum
    kernel_sum = single_sobolev_kernel_two_points(X1, X2, s[0])
    for j in range(1, num_kernels):
        kernel_sum += single_sobolev_kernel_two_points(X1, X2, s[j])
    return(kernel_sum)

@cython.boundscheck(False) # turn off bounds-checking for this function
@cython.wraparound(False)  # turn off negative index wrapping for this function
@cython.initializedcheck(False) # turn off memoryview initialization checks for this function
@cython.cdivision(True) # Disable C-division checks for this function
cpdef void sobolev_kernel(const double[:,:] X1,
                           const double[:,:] X2,
                           const long s,
                           double[:,:] K) nogil:
    """
    Computes the Sobolev kernel matrix between each rows of X1 and each rows of X2
    and stores in K
    
    Args:
      X1: Matrix of size (n1,d)
      X2: Matrix of size (n2,d)
      s: sobolev kernel smoothness
      K: Matrix of size (n1,n2) to store kernel matrix
    """
    
    cdef long n1 = X1.shape[0]
    cdef long n2 = X2.shape[0]
    cdef long d = X1.shape[1]
    
    cdef long i, j
    
    
    for i in range(n1):
        for j in range(n2):
            K[i,j] = single_sobolev_kernel_two_points(X1[i], X2[j], s)
                
@cython.boundscheck(False) # turn off bounds-checking for this function
@cython.wraparound(False)  # turn off negative index wrapping for this function
@cython.initializedcheck(False) # turn off memoryview initialization checks for this function
@cython.cdivision(True) # Disable C-division checks for this function
cpdef void sobolev_kernel_same(const double[:,:] X1,
                                const long s,
                                double[:,:] K) nogil:
    """
    Computes the sobolev kernel smoothness matrix between each pair of rows in X1
    and stores in K
    
    Args:
      X1: Matrix of size (n1,d)
      s: sobolev kernel smoothness
      K: Empty matrix of size (n1,n1) to store kernel matrix
    """
    
    cdef long n1 = X1.shape[0]
    cdef long d = X1.shape[1]
    
    cdef long i, j
    
    
    for i in range(n1):
        for j in range(i+1):
            K[i,j] = single_sobolev_kernel_two_points(X1[i], X1[j], s)
            if j<i:
                K[j,i] = K[i,j]
            
@cython.boundscheck(False) # turn off bounds-checking for this function
@cython.wraparound(False)  # turn off negative index wrapping for this function
@cython.initializedcheck(False) # turn off memoryview initialization checks for this function
@cython.cdivision(True) # Disable C-division checks for this function
cpdef void sobolev_kernel_by_row(const double[:] X1,
                                  const double[:,:] X2,
                                  const long s,
                                  double[:] K) nogil:
    """
    Computes the sobolev kernel matrix between X1 and each row of X2
    and stores in K
    
    Args:
      X1: Vector of size d
      X2: Matrix of size (n2,d)
      s: sobolev kernel smoothness
      K: Vector of size n2 to store kernel values
    """
    
    cdef long n2 = X2.shape[0]
    cdef long d = X2.shape[1]
    
    cdef long i
    
    for i in range(n2):
        K[i] = single_sobolev_kernel_two_points(X1, X2[i], s)
   
@cython.boundscheck(False) # turn off bounds-checking for this function
@cython.wraparound(False)  # turn off negative index wrapping for this function
@cython.initializedcheck(False) # turn off memoryview initialization checks for this function
@cython.cdivision(True) # Disable C-division checks for this function
cpdef double sum_sobolev_kernel(const double[:,:] X1,
                                 const double[:,:] X2,
                                 const long s) nogil:
    """
    Returns the sum of sobolev kernel evaluations between each row of X1 
    and each row of X2
    
    Args:
      X1: Matrix of size (n1,d)
      X2: Matrix of size (n2,d)
      s: sobolev kernel smoothness
    """
    
    cdef double total_sum = 0
    cdef long n1 = X1.shape[0]
    cdef long n2 = X2.shape[0]
    cdef long d = X1.shape[1]
    
    cdef long i, j
    
    for i in range(n1):
        for j in range(n2):
            total_sum += single_sobolev_kernel_two_points(X1[i], X2[j], s)
            
    return(total_sum)

@cython.boundscheck(False) # turn off bounds-checking for this function
@cython.wraparound(False)  # turn off negative index wrapping for this function
@cython.initializedcheck(False) # turn off memoryview initialization checks for this function
@cython.cdivision(True) # Disable C-division checks for this function
cpdef double sum_sobolev_kernel_same(const double[:,:] X1,
                                      const long s) nogil:
    """
    Returns the sum of sobolev kernel evaluations between each pair of 
    rows of X1
    
    Args:
      X1: Matrix of size (n1,d)
      s: sobolev kernel smoothness
    """
    
    cdef double total_sum = 0
    cdef long n1 = X1.shape[0]
    cdef long d = X1.shape[1]
    
    cdef long i, j
    
    for i in range(n1):
        for j in range(i+1):
            if j < i:    
                total_sum += 2*single_sobolev_kernel_two_points(X1[i], X1[j], s)
            else:
                total_sum += single_sobolev_kernel_two_points(X1[i], X1[j], s)
            
    return(total_sum)        
        
@cython.boundscheck(False) # turn off bounds-checking for this function
@cython.wraparound(False)  # turn off negative index wrapping for this function
@cython.initializedcheck(False) # turn off memoryview initialization checks for this function
@cython.cdivision(True) # Disable C-division checks for this function
cpdef double sum_sobolev_kernel_linear_eval(const double[:,:] X1,
                                             const double[:,:] X2,
                                             const long s) nogil:
    """
    Computes the sum of sobolev kernel smoothness of order s evaluations between the 
    i-th row of X1 and the i-th row of X2; they need to have same number of rows
    
    Args:
      X1: Matrix of size (n,d)
      X2: Matrix of size (n,d)
      s: sobolev kernel smoothness
    """
    
    cdef double total_sum = 0
    cdef long n = X1.shape[0]
    cdef long d = X1.shape[1]
    
    cdef long i
    
    for i in range(n):
        total_sum += single_sobolev_kernel_two_points(X1[i], X2[i], s)
            
    return(total_sum)

@cython.boundscheck(False) # turn off bounds-checking for this function
@cython.wraparound(False)  # turn off negative index wrapping for this function
@cython.initializedcheck(False) # turn off memoryview initialization checks for this function
@cython.cdivision(True) # Disable C-division checks for this function
cpdef double biased_sqMMD_sobolev(const double[:,:] X1,
                                   const double[:,:] X2,
                                   const long s) nogil:
    """
    Computes the biased quadratic squared MMD estimator for the sobolev kernel smoothness of order s from samples X1 and X2
    
    Args:
      X1: Matrix of size (n1,d)
      X2: Matrix of size (n12,d)
      s: sobolev kernel smoothness
    """
    
    cdef long n1 = X1.shape[0]
    cdef long n2 = X2.shape[0]
    cdef long d = X1.shape[1]
    
    cdef double first_term = sum_sobolev_kernel(X1,X1,s)
    cdef double second_term = sum_sobolev_kernel(X1,X2,s)
    cdef double third_term = sum_sobolev_kernel(X2,X2,s)
    cdef double bsqMMD = first_term/(n1*n1) - 2*second_term/(n1*n2) + third_term/(n2*n2)
            
    return(bsqMMD)

@cython.boundscheck(False) # turn off bounds-checking for this function
@cython.wraparound(False)  # turn off negative index wrapping for this function
@cython.initializedcheck(False) # turn off memoryview initialization checks for this function
@cython.cdivision(True) # Disable C-division checks for this function
cpdef double unbiased_sqMMD_sobolev(const double[:,:] X1,
                                     const double[:,:] X2,
                                     const long s) nogil:
    """
    Computes the unbiased quadratic squared MMD estimator for the sobolev kernel smoothness of order s from samples X1 and X2
    
    Args:
      X1: Matrix of size (n1,d)
      X2: Matrix of size (n2,d)
      s: sobolev kernel smoothness
    """
    
    cdef long n1 = X1.shape[0]
    cdef long n2 = X2.shape[0]
    cdef long d = X1.shape[1]
    
    cdef double first_term = sum_sobolev_kernel_same(X1,s)
    cdef double second_term = sum_sobolev_kernel(X1,X2,s)
    cdef double third_term = sum_sobolev_kernel_same(X2,s)
    cdef double extra_terms_first = sum_sobolev_kernel_linear_eval(X1,X1,s)
    cdef double extra_terms_third = sum_sobolev_kernel_linear_eval(X2,X2,s)
    cdef double usqMMD = (first_term-extra_terms_first)/(n1*(n1-1)) - 2*second_term/(n1*n2) + (third_term-extra_terms_third)/(n2*(n2-1))
            
    return(usqMMD)
'''unit tests'''
d = 2
X1 = np.ones(d)
X2 = 0.5*np.ones(d)
X = np.vstack((X1, X2))
n = X.shape[0]
K = np.zeros((n, n))

s = 3
ans = single_sobolev_kernel_two_points(X1, X2, s)
x =  X1-X2
x = np.maximum(0,x) + (1+np.minimum(0, x)) * (x<0)
ans2 =  np.prod(1 + np.pi**6 * 4. / 45. * ((x**6) - 3 * (x**5) + \
                    5. / 2. * (x**4) - (x ** 2) / 2. + 1. / 42.), axis=-1)
print(x, ans, ans2, ans==ans2)

sobolev_kernel_same(X, s, K)

print(K)

K2 = np.zeros(n)
sobolev_kernel_by_row(X[0], X, s, K2)
print(np.all(K[0] == K2))

print(np.isclose(sum_sobolev_kernel(X, X, s), np.sum(K)))

print(np.isclose(sum_sobolev_kernel_same(X, s), np.sum(K)))

print(np.isclose(sum_sobolev_kernel_linear_eval(X, X, s), np.sum(np.diag(K))))

print(f'Biased mmd, {biased_sqMMD_sobolev(X, X, s)}, Unbiased mmd {unbiased_sqMMD_sobolev(X, X, s)}')



In file included from /Users/raaz.rsk/.ipython/cython/_cython_magic_fe53002b6f2d02e2c8fdb4f8b8102306.c:774:
In file included from /Users/raaz.rsk/Library/Python/3.8/lib/python/site-packages/numpy/core/include/numpy/arrayobject.h:5:
In file included from /Users/raaz.rsk/Library/Python/3.8/lib/python/site-packages/numpy/core/include/numpy/ndarrayobject.h:12:
In file included from /Users/raaz.rsk/Library/Python/3.8/lib/python/site-packages/numpy/core/include/numpy/ndarraytypes.h:1960:
 ^
  0, /*tp_print*/
  ^
/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/include/python3.8/cpython/object.h:260:5: note: 'tp_print' has been explicitly marked deprecated here
    Py_DEPRECATED(3.8) int (*tp_print)(PyObject *, FILE *, int);
    ^
/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/include/python3.8/pyport.h:515:54: note: expanded from macro 'Py_DEPRECATED'
#define Py_DEPRECATED(VERSION_UNUSED) __attribute__((__deprecated_

[0.5 0.5] 0.9430394490405203 0.9430394490405203 True
[[9.20931987 0.94303945]
 [0.94303945 9.20931987]]
True
True
True
True
Biased mmd, 0.0, Unbiased mmd -8.266280421968856
