Skip to content
This repository has been archived by the owner on Oct 29, 2020. It is now read-only.

Commit

Permalink
Merge pull request #66 from NeuroDataDesign/sandhya
Browse files Browse the repository at this point in the history
MDMR
  • Loading branch information
tpsatish95 committed Dec 8, 2018
2 parents aa90c0f + 1a9ba84 commit 0daab65
Show file tree
Hide file tree
Showing 7 changed files with 1,398 additions and 0 deletions.
70 changes: 70 additions & 0 deletions demos/MDMR.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import sys\n",
"module_path = \"C:\\\\Users\\\\sunda\\\\Desktop\\\\AAA FA18 JHU\\\\NDD1\\\\gitscr\\\\mgcpy\"\n",
"if module_path not in sys.path:\n",
" sys.path.append(module_path)\n",
" \n",
"import numpy as np\n",
"from mgcpy.independence_tests.mdmr.mdmr import MDMR\n",
"from mgcpy.independence_tests.mdmr.mdmrfunctions import compute_distance_matrix"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[25.03876688]\n"
]
}
],
"source": [
"X = np.genfromtxt('../mgcpy/independence_tests/unit_tests/mdmr/data/X_mdmr.csv', delimiter=\",\")\n",
"Y = np.genfromtxt('../mgcpy/independence_tests/unit_tests/mdmr/data/Y_mdmr.csv', delimiter=\",\")\n",
"\n",
"mdmr = MDMR(compute_distance_matrix)\n",
"a, _ = mdmr.test_statistic(X, Y)\n",
"print(a)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
1 change: 1 addition & 0 deletions mgcpy/independence_tests/mdmr/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .mdmr import MDMR
175 changes: 175 additions & 0 deletions mgcpy/independence_tests/mdmr/mdmr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
"""
**Main MDMR Independence Test Module**
"""
from mgcpy.independence_tests.abstract_class import IndependenceTest

import copy
from mgcpy.independence_tests.mdmr.mdmrfunctions import *
#from mdmrfunctions import *

class MDMR(IndependenceTest):
def __init__(self, compute_distance_matrix):
'''
:param compute_distance_matrix: a function to compute the pairwise distance matrix, given a data matrix
:type compute_distance_matrix: ``FunctionType`` or ``callable()``
'''
IndependenceTest.__init__(self, compute_distance_matrix)
self.which_test = "mdmr"

def get_name(self):
return self.which_test

def test_statistic(self, matrix_X, matrix_Y, permutations = 0, individual = 0, disttype = 'cityblock'):
"""
Computes MDMR Pseudo-F statistic between two datasets.
- It first takes the distance matrix of Y (by )
- Next it regresses X into a portion due to Y and a portion due to residual
- The p-value is for the null hypothesis that the variable of X is not correlated with Y's distance matrix
:param data_matrix_X: (optional, default picked from class attr) is interpreted as:
- a ``[n*d]`` data matrix, a square matrix with n samples in d dimensions
:type data_matrix_X: 2D `numpy.array`
:param data_matrix_Y: (optional, default picked from class attr) is interpreted as:
- a ``[n*d]`` data matrix, a square matrix with n samples in d dimensions
:type data_matrix_Y: 2D `numpy.array`
:parameter 'individual':
-integer, `0` or `1`
with value `0` tests the entire X matrix (default)
with value `1` tests the entire X matrix and then each predictor variable individually
:return: with individual = `0`, returns 1 values, with individual = `1` returns 2 values, containing:
-the test statistic of the entire X matrix
-for individual = 1, an array with the variable of X in the first column,
the test statistic in the second, and the permutation p-value in the third (which here will always be 1)
:rtype: list
"""
X = matrix_X
Y = matrix_Y

#calculate distance matrix of Y
D = self.compute_distance_matrix(Y, disttype)
D = scp.distance.squareform(D)
a = D.shape[0]**2
D = D.reshape((a,1))

predictors = np.arange(X.shape[1])
predsingle = X.shape[1]
check_rank(X)

#check number of subjects compatible
subjects = X.shape[0]
if subjects != np.sqrt(D.shape[0]):
raise Exception("# of subjects incompatible between X and D")

X = np.hstack((np.ones((X.shape[0], 1)), X))
predictors = np.array(predictors)
predictors += 1

# Gower Center the distance matrix of Y
Gs = gower_center_many(D)

m2 = float(X.shape[1] - predictors.shape[0])
nm = float(subjects - X.shape[1])

#form permutation indexes
permutation_indexes = np.zeros((permutations + 1, subjects), dtype=np.int)
permutation_indexes[0, :] = range(subjects)
for i in range(1, permutations + 1):
permutation_indexes[i,:] = np.random.permutation(subjects)

H2perms = gen_H2_perms(X, predictors, permutation_indexes)
IHperms = gen_IH_perms(X, predictors, permutation_indexes)

#Calculate test statistic
F_perms = calc_ftest(H2perms, IHperms, Gs, m2, nm)

#Calculate p-value
p_vals = None
if permutations > 0:
p_vals = fperms_to_pvals(F_perms)
F_permtotal = F_perms[0, :]
pvaltotal = p_vals
self.test_statistic_ = F_permtotal
if individual == 0:
return self.test_statistic_, self.test_statistic_metadata_

#code for individual test
if individual == 1:
results = np.zeros((predsingle,3))
for predictors in range(1, predsingle+1):
predictors = np.array([predictors])

Gs = gower_center_many(D)

m2 = float(X.shape[1] - predictors.shape[0])
nm = float(subjects - X.shape[1])

permutation_indexes = np.zeros((permutations + 1, subjects), dtype=np.int)
permutation_indexes[0, :] = range(subjects)
for i in range(1, permutations + 1):
permutation_indexes[i,:] = np.random.permutation(subjects)

H2perms = gen_H2_perms(X, predictors, permutation_indexes)
IHperms = gen_IH_perms(X, predictors, permutation_indexes)

F_perms = calc_ftest(H2perms, IHperms, Gs, m2, nm)

p_vals = None
if permutations > 0:
p_vals = fperms_to_pvals(F_perms)
results[predictors-1,0] = predictors
results[predictors-1,1] = F_perms[0, :]
results[predictors-1,2] = p_vals


return F_permtotal, results

def p_value(self, matrix_X, matrix_Y, replication_factor=1000):
"""
Tests independence between two datasets using MGC and permutation test.
:param matrix_X: is interpreted as:
- a ``[n*d]`` data matrix, a square matrix with ``n`` samples in ``d`` dimensions
:type matrix_X: 2D `numpy.array`
:param matrix_Y: is interpreted as:
- a ``[n*d]`` data matrix, a square matrix with ``n`` samples in ``d`` dimensions
:type matrix_Y: 2D `numpy.array`
:param replication_factor: specifies the number of replications to use for
the permutation test. Defaults to ``1000``.
:type replication_factor: integer
:return: returns a list of two items,that contains:
- :p_value: P-value of MGC
- :p_value_metadata:
:rtype: list
"""
return super(MDMR, self).p_value(matrix_X, matrix_Y)

def ind_p_value(self, matrix_X, matrix_Y, permutations = 1000, individual = 1, disttype = 'cityblock'):
"""
Individual predictor variable p-values calculation
:param matrix_X: is interpreted as:
- a ``[n*d]`` data matrix, a square matrix with ``n`` samples in ``d`` dimensions
:type matrix_X: 2D `numpy.array`
:param matrix_Y: is interpreted as:
- a ``[n*d]`` data matrix, a square matrix with ``n`` samples in ``d`` dimensions
:type matrix_Y: 2D `numpy.array`
"""
results = self.test_statistic(matrix_X, matrix_Y, permutations, individual)[1]
return results
117 changes: 117 additions & 0 deletions mgcpy/independence_tests/mdmr/mdmrfunctions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# -*- coding: utf-8 -*-
"""
Created on Thu Oct 25 16:09:14 2018
@author: sunda
"""
import numpy as np
import scipy.spatial as scp

DTYPE = np.float64
ITYPE = np.int32

def check_rank(X):
"""
This function checks if X is rank deficient.
"""
k = X.shape[1]
rank = np.linalg.matrix_rank(X)
if rank < k:
raise Exception("matrix is rank deficient (rank %i vs cols %i)" % (rank, k))

def compute_distance_matrix(X, disttype):
D = scp.distance.pdist(X, disttype)
return D

def hatify(X):
"""
Returns the "hat" matrix.
"""
return X.dot(np.linalg.inv(X.T.dot(X))).dot(X.T)

def gower_center(Y):
"""
Computes Gower's centered similarity matrix.
"""
n = Y.shape[0]
I = np.eye(n,n)
uno = np.ones((n, 1))

A = -0.5 * (Y ** 2)
C = I - (1.0 / n) * uno.dot(uno.T)
G = C.dot(A).dot(C)

return G

def gower_center_many(Ys):
"""
Gower centers each matrix in the input.
"""
observations = int(np.sqrt(Ys.shape[0]))
tests = Ys.shape[1]
Gs = np.zeros_like(Ys)

for i in range(tests):
# print(type(observations))
D = Ys[:, i].reshape(observations, observations)
Gs[:, i] = gower_center(D).flatten()

return Gs


def gen_H2_perms(X, predictors, permutation_indexes):
"""
Return H2 for each permutation of X indices, where H2 is the hat matrix
minus the hat matrix of the untested columns.
"""
permutations, observations = permutation_indexes.shape
variables = X.shape[1]

covariates = [i for i in range(variables) if i not in predictors]
H2_permutations = np.zeros((observations ** 2, permutations))
for i in range(permutations):
perm_X = X[permutation_indexes[i]]
H2 = hatify(perm_X) - hatify(perm_X[:, covariates])
H2_permutations[:, i] = H2.flatten()

return H2_permutations


def gen_IH_perms(X, predictors, permutation_indexes):
"""
Return I-H where H is the hat matrix and I is the identity matrix.
The function calculates this correctly for multiple column tests.
"""
permutations, observations = permutation_indexes.shape
I = np.eye(observations, observations)

IH_permutations = np.zeros((observations ** 2, permutations))
for i in range(permutations):
IH = I - hatify(X[permutation_indexes[i, :]])
IH_permutations[:,i] = IH.flatten()

return IH_permutations


def calc_ftest(Hs, IHs, Gs, m2, nm):
"""
This function calculates the pseudo-F statistic.
"""
N = Hs.T.dot(Gs)
D = IHs.T.dot(Gs)
F = (N / m2) / (D / nm)
return F


def fperms_to_pvals(F_perms):
"""
This function calculates the permutation p-value from the test statistics of all permutations.
"""
permutations, tests = F_perms.shape
permutations -= 1
pvals = np.zeros(tests)
for i in range(tests):
j = (F_perms[1:, i] >= F_perms[0, i]).sum().astype('float')
pvals[i] = (j+1) / (permutations+1)
return pvals
Loading

0 comments on commit 0daab65

Please sign in to comment.