{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib\n",
"import matplotlib.pyplot as plt\n",
"matplotlib.rcParams['figure.figsize'] = (17,13)\n",
"%matplotlib inline"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import dill\n",
"from collections import defaultdict\n",
"from sklearn import metrics"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Hierarchical partitions as lists of lists\n",
"\n",
"A hierarchical partition can be represented by nested list. However, not every construction of nested list constitutes a hierarchical partition. In a hierarchical partition a list either contain other list or either contain elements. It cannot contain both at the same time. So, for example, the following is a well formed hierarchical partition\n",
"\n",
"[[1],[2,3]]\n",
"\n",
"while this below other is not (pay attention to element \"1\")\n",
"\n",
"[1,[2,3]]\n",
"\n",
"because the root contains element \"1\" and list [2,3]."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def check(hp):\n",
" \"\"\"Checks if a hierarchical partition represented by nested lists is well formed or not. Namely, it ensures that each list either contain other list or elements, but not lists and elements at the same time.\n",
" \n",
" Examples:\n",
" >>> hp=[[1],[2,3]]\n",
" >>> check(hp)\n",
" True\n",
" >>> hp=[1,[2,3]]\n",
" >>> check(hp)\n",
" False\n",
" \"\"\"\n",
" flag=None\n",
" for chp in hp:\n",
" if isinstance(chp,list):\n",
" if flag is None:\n",
" flag=True\n",
" elif not flag:\n",
" return False\n",
" if not check(chp):\n",
" return False \n",
" else:\n",
" if flag is None:\n",
" flag=False\n",
" elif flag:\n",
" return False\n",
" return True"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"hp=[[1],[2,3]]\n",
"check(hp)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"hp=[1,[2,3]]\n",
"check(hp)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Flattenator"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def flattenator(newick):\n",
" \"\"\"Takes a hierarchical partition represented by nested lists and return a list of all its elements.\n",
" \n",
" Example\n",
" >>> hp = [[3, 4, 5, 6], [[0], [1, 2]], [[7], [8, 9]]]\n",
" >>> sorted(flattenator(hp))\n",
" [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]\n",
" \"\"\"\n",
" for e in newick:\n",
" if isinstance(e,list):\n",
" for ee in flattenator(e):\n",
" yield ee\n",
" else:\n",
" yield e"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"hp = [[3, 4, 5, 6], [[0], [1, 2]], [[7], [8, 9]]]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"sorted(flattenator(hp))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Shuffling hierarchical partitions"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def replicate_hierarchical_partition(hp,d):\n",
" \"\"\"Replicates a hierarchical partition hp into a new one whose elements are interachanged by others as specified by the list or dictionary d.\"\"\"\n",
" if isinstance(hp,list):\n",
" return [replicate_hierarchical_partition(chp,d) for chp in hp]\n",
" else:\n",
" return d[hp]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"d=list(range(10))\n",
"d"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"np.random.shuffle(d)\n",
"for i in range(len(d)):\n",
" print(i,\"->\",d[i])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"hp=[[[[5], [[[7, 6]]]]], [[2], [0]], [[[[9, 4]]], [3, 8]], [1]]\n",
"replicate_hierarchical_partition(hp,d)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Running mean and std"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class RunningMeanStd:\n",
" \"\"\"To compute mean, variance, etc. online as values are obtained without the need to store them. \n",
" This method is also numerically stable.\n",
" See Donald Knuth’s Art of Computer Programming, Vol 2, page 232, 3rd edition.\"\"\"\n",
" def __init__(self):\n",
" self._M=0.\n",
" self._S=0.\n",
" self._n=0\n",
" def push(self,x):\n",
" self._n+=1\n",
" oldM=self._M\n",
" self._M=self._M+(x-self._M)/float(self._n)\n",
" self._S=self._S+(x-oldM)*(x-self._M)\n",
" def mean(self):\n",
" if self._n>0:\n",
" return self._M\n",
" return None\n",
" def variance(self):\n",
" if self._n>1:\n",
" return self._S/float(self._n-1)\n",
" return None\n",
" def std(self):\n",
" v=self.variance()\n",
" if v is None:\n",
" return None\n",
" return np.sqrt(v)\n",
" def sem(self):\n",
" if self._n>1:\n",
" return self.std()/np.sqrt(self._n)\n",
" return None\n",
" def rel_err(self,tol_std=.05):\n",
" assert tol_std>0.\n",
" if self._n>1:\n",
" return self.sem()/(abs(self.mean())+tol_std)\n",
" return None\n",
" def clear(self):\n",
" self._M=0.\n",
" self._S=0.\n",
" self._n=0\n",
" def __len__(self):\n",
" return self._n\n",
" def __repr__(self):\n",
" return \"RunningMeanStd{n=\"+str(len(self))+\" mean=\"+str(self.mean())+\" std=\"+str(self.std())+\" sem=\"+str(self.sem())+\"}\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"runms=RunningMeanStd()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"runms"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"for i in range(10):\n",
" x=np.random.random()\n",
" runms.push(x)\n",
" print(i,x,runms)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Hierarchical Mutual Information"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def xlnx(x):\n",
" \"\"\"Returns x*log(x) for x > 0 or returns 0 otherwise.\"\"\"\n",
" if x <= 0.:\n",
" return 0.\n",
" return x*np.log(x)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def HMI(Ut,Us):\n",
" \"\"\"\n",
" Computes the hierarchical mutual information between two hierarchical partitions.\n",
" \n",
" Returns\n",
" n_ts,HMI(Ut,Us) : where n_ts is the number of common elements between the hierarchical partitions Ut and Us.\n",
" \n",
" NOTE: We label by u,v the children of t,s respectively.\n",
" \n",
" Examples\n",
" >>>\"\"\"\n",
" if isinstance(Ut[0],list):\n",
" if isinstance(Us[0],list):\n",
" # Ut and Us are both internal nodes since they contain other lists.\n",
" n_ts=0.\n",
" H_uv=0.\n",
" H_us=0.\n",
" H_tv=0.\n",
" mean_I_ts=0.0\n",
" n_tv=defaultdict(float) \n",
" for Uu in Ut:\n",
" n_us=0.\n",
" for v,Uv in enumerate(Us):\n",
" n_uv,I_uv=HMI(Uu,Uv)\n",
" n_ts+=n_uv\n",
" n_tv[v]+=n_uv\n",
" n_us+=n_uv \n",
" H_uv+=xlnx(n_uv)\n",
" mean_I_ts+=n_uv*I_uv\n",
" H_us+=xlnx(n_us)\n",
" for _n_tv in n_tv.values():\n",
" H_tv+=xlnx(_n_tv)\n",
" if n_ts>0.:\n",
" local_I_ts=np.log(n_ts)-(H_us+H_tv-H_uv)/n_ts\n",
" mean_I_ts=mean_I_ts/n_ts\n",
" I_ts=local_I_ts+mean_I_ts\n",
" #print(\"... Ut =\",Ut,\"Us =\",Us,\"n_ts =\",n_ts,\"I_ts =\",I_ts,\"local_I_ts =\",local_I_ts,\"mean_I_ts =\",mean_I_ts)\n",
" return n_ts,I_ts\n",
" else:\n",
" #print(\"... Ut =\",Ut,\"Us =\",Us,\"n_ts =\",0.0,\"I_ts =\",0.0)\n",
" return 0.,0.\n",
" else:\n",
" # Ut is internal node and Us is leaf\n",
" return len(set(flattenator(Ut))&set(Us)),0.\n",
" else:\n",
" if isinstance(Us,list):\n",
" # Ut is leaf and Us internal node\n",
" return len(set(flattenator(Us))&set(Ut)),0. \n",
" else:\n",
" # Both Ut and Us are leaves\n",
" return len(set(Ut)&set(Us)),0."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# TEST\n",
"# Two random partitions (pay attention! these are non hierarchical)\n",
"p1 = [[19, 18, 5],[14, 16, 3],[7],[10, 8],[1, 17, 9, 4, 6, 15],[2, 13, 11],[12, 0]]\n",
"p2 = [[12, 9],[4, 2, 0, 7],[16],[5],[8, 3, 1, 14],[11, 6, 10],[18, 17, 19],[13, 15]]\n",
"HMI(p1,p2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# TEST\n",
"# Now with two hierarchical partitions\n",
"hp1 = [[23],[[[[[[16], [17]]]]]],[[12], [22, 13]],[5],[7],[24],[[[9], [[14, 2]]], [[[[[[27], [3]]]]]]],[20, 29, 18],[4],[26, 15],[[10], [21, 25]],[11],[[0, 28], [1], [6]],[19, 8]]\n",
"hp2 = [[[[0, 25], [24]], [6], [11, 28], [8]],[[[19], [[[[21], [4], [[[[[22, 7]]]]]]]]], [5]],[[3], [10, 23, 14]],[[27, 1, 16, 13, 18, 26, 9], [[[[15], [[[[[[12, 17]]]]]]]], [2, 20]], [29]]]\n",
"HMI(hp1,hp2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def HH(hp):\n",
" \"\"\"Returns the hierarchical entropy of a hierarchical partition.\n",
" \n",
" Note: this is not the most efficient implementation.\"\"\"\n",
" return HMI(hp,hp)[1]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def HJH(hp1,hp2):\n",
" \"\"\"Returns the hierarchical joint entropy between two hierarchical partitions.\"\"\"\n",
" return HH(hp1)+HH(hp2)-HMI(hp1,hp2)[1]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def mean_arit(x,y):\n",
" return .5*(x+y)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def mean_geom(x,y):\n",
" return np.sqrt(x*y)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def mean_max(x,y):\n",
" return max(x,y)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def mean_min(x,y):\n",
" return min(x,y)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def NHMI(hp1,hp2,generalized_mean=mean_arit):\n",
" \"\"\"Returns the normalized hierarchical mutual information.\n",
" \n",
" By default, it uses the arithmetic mean for normalization. However, another generalized mean can be provided if desired.\"\"\"\n",
" gm = generalized_mean(HH(hp1),HH(hp2))\n",
" if gm > 0.:\n",
" return HMI(hp1,hp2)[1]/gm\n",
" return 0."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def HCH(hp1,hp2):\n",
" \"\"\"Returns the hierarchical conditional entropy HCH(hp1|hp2).\"\"\"\n",
" return HJH(hp1,hp2)-HH(hp2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def HVI(hp1,hp2):\n",
" \"\"\"Returns the hierarchical variation of information.\"\"\"\n",
" return HH(hp1)+HH(hp2)-2.0*HMI(hp1,hp2)[1]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def diff_HVI(t,s,r):\n",
" \"\"\"Returns the HVI difference associated triangular inequality.\"\"\"\n",
" return HVI(t,s)+HVI(s,r)-HVI(t,r)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def EHMI(hp1,hp2,min_num_shufflings=36,rel_err_tol=0.01,verbose=0):\n",
" \"\"\"Returns the expected hierarchical mutual information using the permutation model as the reference null model.\n",
" When verbose=0, then the function works silently. For verbose=1,2,3 some information is shown as the process run. The larger the value, the more information.\"\"\"\n",
" assert min_num_shufflings>0\n",
" runms=RunningMeanStd() \n",
" rel_err=2.*rel_err_tol\n",
" d=sorted(flattenator(hp1))\n",
" while rel_err>rel_err_tol or len(runms)<min_num_shufflings:\n",
" np.random.shuffle(d)\n",
" rhp1=replicate_hierarchical_partition(hp1,d)\n",
" HMI_sample=HMI(rhp1,hp2)[1]\n",
" runms.push(HMI(rhp1,hp2)[1]) \n",
" if len(runms)>1:\n",
" rel_err=runms.rel_err() \n",
" if verbose>0:\n",
" if verbose==1:\n",
" print(HMI_sample)\n",
" elif verbose==2:\n",
" print(HMI_sample,runms.mean(),rel_err)\n",
" elif verbose==3:\n",
" print(HMI_sample,runms.mean(),rel_err) \n",
" print(rhp1)\n",
" return runms"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# TEST\n",
"# Now with two hierarchical partitions\n",
"hp1 = [[23],[[[[[[16], [17]]]]]],[[12], [22, 13]],[5],[7],[24],[[[9], [[14, 2]]], [[[[[[27], [3]]]]]]],[20, 29, 18],[4],[26, 15],[[10], [21, 25]],[11],[[0, 28], [1], [6]],[19, 8]]\n",
"hp2 = [[[[0, 25], [24]], [6], [11, 28], [8]],[[[19], [[[[21], [4], [[[[[22, 7]]]]]]]]], [5]],[[3], [10, 23, 14]],[[27, 1, 16, 13, 18, 26, 9], [[[[15], [[[[[[12, 17]]]]]]]], [2, 20]], [29]]]\n",
"HMI(hp1,hp2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"EHMI(hp1,hp2,verbose=2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def AHMI(hp1,hp2,generalized_mean=mean_max):\n",
" \"\"\"Returns the adjusted hierarchical mutual information.\"\"\"\n",
" _EHMI = EHMI(hp1,hp2).mean()\n",
" deno = generalized_mean(HH(hp1),HH(hp2))-_EHMI\n",
" if deno>0.:\n",
" return (HMI(hp1,hp2)[1]-_EHMI)/deno\n",
" return 0."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# TEST\n",
"# Now with two hierarchical partitions\n",
"hp1 = [[23],[[[[[[16], [17]]]]]],[[12], [22, 13]],[5],[7],[24],[[[9], [[14, 2]]], [[[[[[27], [3]]]]]]],[20, 29, 18],[4],[26, 15],[[10], [21, 25]],[11],[[0, 28], [1], [6]],[19, 8]]\n",
"hp2 = [[[[0, 25], [24]], [6], [11, 28], [8]],[[[19], [[[[21], [4], [[[[[22, 7]]]]]]]]], [5]],[[3], [10, 23, 14]],[[27, 1, 16, 13, 18, 26, 9], [[[[15], [[[[[[12, 17]]]]]]]], [2, 20]], [29]]]\n",
"AHMI(hp1,hp2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Generate random partitions and hierarchical partitions"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def random_partition(elements):\n",
" \"\"\"Generates a random partition of the set of <<elements>>\n",
" \n",
" Examples\n",
" >>> random_partition(list(range(10)))\n",
" [[4, 7, 6], [5, 1, 3, 9, 2, 8], [0]]\n",
" \"\"\"\n",
" \n",
" num_splitters=np.random.randint(len(elements))\n",
" if num_splitters==0:\n",
" return elements\n",
" proto_partition=[-1]*num_splitters+elements\n",
" np.random.shuffle(proto_partition)\n",
" i=0\n",
" while proto_partition[i]==-1:\n",
" i+=1\n",
" j=len(proto_partition)-1\n",
" while proto_partition[j]==-1:\n",
" j-=1\n",
" rnd_partition = [[]]\n",
" for e in proto_partition[i:j+1]:\n",
" if e != -1:\n",
" rnd_partition[-1].append(e)\n",
" else:\n",
" if len(rnd_partition[-1])>0:\n",
" rnd_partition.append([])\n",
" #if len(rnd_partition[-1])==0:\n",
" # rnd_partition.pop()\n",
" return rnd_partition"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# TEST\n",
"random_partition(list(range(10)))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# TEST for the generator of random partitions\n",
"for i in range(100):\n",
" assert len(list(flattenator(random_partition(list(range(20))))))==20"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def random_hierarchical_partition(elements):\n",
" \"\"\"Generates a random hierarchical partition of the set of <<elements>>\n",
" \n",
" Examples\n",
" >>> random_hierarchical_partition(list(range(10)))\n",
" [[[[5], [[[7, 6]]]]], [[2], [0]], [[[[9, 4]]], [3, 8]], [1]]\n",
" \"\"\"\n",
" partition=random_partition(elements)\n",
" if not isinstance(partition[0],list):\n",
" return elements\n",
" hp=[]\n",
" for part in partition:\n",
" if len(part)>0:\n",
" chp=random_hierarchical_partition(part)\n",
" hp.append(chp)\n",
" if len(hp)>0:\n",
" return hp\n",
" return elements"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# TEST\n",
"random_hierarchical_partition(list(range(10)))"
]
},
{
"cell_type": "raw",
"metadata": {},
"source": [
"# DEPRECATED\n",
"# TEST the generator of hierarchical random partitions and the HMI against the old implementation of the computation of the HMI.\n",
"\n",
"for n in range(1,201):\n",
" hp1=random_partition(list(range(n)))\n",
" hp2=random_partition(list(range(n))) \n",
" #hp1=random_hierarchical_partition(list(range(n)))\n",
" #hp2=random_hierarchical_partition(list(range(n)))\n",
" #print(hp1)\n",
" #print(hp2)\n",
" #print(hit.HMI(hp1,hp2))\n",
" #print(HMI(hp1,hp2))\n",
" assert check(hp1)\n",
" assert check(hp2)\n",
" assert abs(hit.HMI(hp1,hp2)-HMI(hp1,hp2)[1])<0.000001"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# TEST\n",
"HMI(hp1,hp2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Partial shuffling of hierarchical partitions"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def partial_shuffling_hierarchical_partition(hp,k,verbose=0):\n",
" \"\"\"It shuffle k randomly chosen elements of the hierarchical partition hp.\"\"\"\n",
" d=np.array(sorted(flattenator(hp)),dtype=np.int32)\n",
" #if verbose>0:\n",
" # print(d) \n",
" rd=d.copy()\n",
" np.random.shuffle(rd)\n",
" for ii in range(k-1):\n",
" i=rd[ii]\n",
" j=rd[(ii+1)%k]\n",
" #if verbose>1:\n",
" # print(i,j)\n",
" d[i],d[j]=d[j],d[i]\n",
" if verbose>0:\n",
" print(d)\n",
" return replicate_hierarchical_partition(hp,d)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# TEST\n",
"hp = [[23],[[[[[[16], [17]]]]]],[[12], [22, 13]],[5],[7],[24],[[[9], [[14, 2]]], [[[[[[27], [3]]]]]]],[20, 29, 18],[4],[26, 15],[[10], [21, 25]],[11],[[0, 28], [1], [6]],[19, 8]]\n",
"rhp=partial_shuffling_hierarchical_partition(hp,30,verbose=2)\n",
"print(hp)\n",
"print(rhp)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"NHMI(hp,rhp)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"HMI(hp,rhp)[1]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"AHMI(hp,rhp)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Compare AHMI against \"exact\" computation in the non-hierarchical case implemented in scikit-learn."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# TEST\n",
"metrics.cluster.adjusted_mutual_info_score([0, 0, 1, 1], [0, 0, 1, 1])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def partition_into_labels(p):\n",
" d={}\n",
" for i,c in enumerate(p):\n",
" for e in c:\n",
" d[e]=i\n",
" l=np.array([0]*len(d))\n",
" for e,i in d.items():\n",
" l[e]=i\n",
" return l"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"p1=[[3, 8, 0],[2],[16, 14],[19, 18],[11],[12, 1, 6, 5, 13, 4, 17, 7, 15, 9, 10]]\n",
"p2=[[4, 13],[19],[11, 16, 18],[1, 6, 8, 7, 2],[17, 5, 9, 15, 12, 14],[0, 10, 3]]\n",
"l1=partition_into_labels(p1)\n",
"l2=partition_into_labels(p2)\n",
"l1,l2"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### First test the HMI"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"HMI(p1,p2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"metrics.cluster.mutual_info_score(l1,l2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Secondly test the EHMI"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"runms=EHMI(p1,p2)\n",
"runms"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# This way doesn't work. We must implement the way below...\n",
"#metrics.cluster.expected_mutual_information(l1,l2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# We must \"extend\" scikit-leearn with our implementation of an interface to metrics.cluster.expected_mutual_information(contingency, n_samples)\n",
"# https://github.com/scikit-learn/scikit-learn/blob/1495f6924/sklearn/metrics/cluster/supervised.py#L656\n",
"def sklearn_expected_mutual_info_score(labels_true, labels_pred,\n",
" average_method='warn'):\n",
" \"\"\"Expected Mutual Information between two clusterings.\n",
" Expected Mutual Information (EMI) averages the Mutual\n",
" Information (MI) score accounting for chance. \n",
" This metric is independent of the absolute values of the labels:\n",
" a permutation of the class or cluster label values won't change the\n",
" score value in any way.\n",
" This metric is furthermore symmetric: switching ``label_true`` with\n",
" ``label_pred`` will return the same score value. This can be useful to\n",
" measure the agreement of two independent label assignments strategies\n",
" on the same dataset when the real ground truth is not known.\n",
" Read more in the :ref:`User Guide <mutual_info_score>`.\n",
" Parameters\n",
" ----------\n",
" labels_true : int array, shape = [n_samples]\n",
" A clustering of the data into disjoint subsets.\n",
" labels_pred : array, shape = [n_samples]\n",
" A clustering of the data into disjoint subsets.\n",
" average_method : string, optional (default: 'warn')\n",
" How to compute the normalizer in the denominator. Possible options\n",
" are 'min', 'geometric', 'arithmetic', and 'max'.\n",
" If 'warn', 'max' will be used. The default will change to\n",
" 'arithmetic' in version 0.22.\n",
" .. versionadded:: 0.20\n",
" Returns\n",
" -------\n",
" emi: float\n",
" See also\n",
" --------\n",
" adjusted_mutual_info_score: Mutual Information adjusted by change\n",
" mutual_info_score: Mutual Information (not adjusted for chance)\n",
" Examples\n",
" --------\n",
" References\n",
" ----------\n",
" .. [1] `Vinh, Epps, and Bailey, (2010). Information Theoretic Measures for\n",
" Clusterings Comparison: Variants, Properties, Normalization and\n",
" Correction for Chance, JMLR\n",
" <http://jmlr.csail.mit.edu/papers/volume11/vinh10a/vinh10a.pdf>`_\n",
" .. [2] `Wikipedia entry for the Adjusted Mutual Information\n",
" <https://en.wikipedia.org/wiki/Adjusted_Mutual_Information>`_\n",
" \"\"\"\n",
" def check_clusterings(labels_true, labels_pred):\n",
" \"\"\"Check that the labels arrays are 1D and of same dimension.\n",
" Parameters\n",
" ----------\n",
" labels_true : int array, shape = [n_samples]\n",
" The true labels\n",
" labels_pred : int array, shape = [n_samples]\n",
" The predicted labels\n",
" \"\"\"\n",
" labels_true = np.asarray(labels_true)\n",
" labels_pred = np.asarray(labels_pred)\n",
"\n",
" # input checks\n",
" if labels_true.ndim != 1:\n",
" raise ValueError(\n",
" \"labels_true must be 1D: shape is %r\" % (labels_true.shape,))\n",
" if labels_pred.ndim != 1:\n",
" raise ValueError(\n",
" \"labels_pred must be 1D: shape is %r\" % (labels_pred.shape,))\n",
" if labels_true.shape != labels_pred.shape:\n",
" raise ValueError(\n",
" \"labels_true and labels_pred must have same size, got %d and %d\"\n",
" % (labels_true.shape[0], labels_pred.shape[0]))\n",
" return labels_true, labels_pred \n",
" #if average_method == 'warn':\n",
" # warnings.warn(\"The behavior of AMI will change in version 0.22. \"\n",
" # \"To match the behavior of 'v_measure_score', AMI will \"\n",
" # \"use average_method='arithmetic' by default.\",\n",
" # FutureWarning)\n",
" # average_method = 'max'\n",
" labels_true, labels_pred = check_clusterings(labels_true, labels_pred)\n",
" n_samples = labels_true.shape[0]\n",
" classes = np.unique(labels_true)\n",
" clusters = np.unique(labels_pred)\n",
" # Special limit cases: no clustering since the data is not split.\n",
" # This is a perfect match hence return 1.0.\n",
" if (classes.shape[0] == clusters.shape[0] == 1 or\n",
" classes.shape[0] == clusters.shape[0] == 0):\n",
" return 1.0\n",
" contingency = metrics.cluster.contingency_matrix(labels_true, labels_pred, sparse=True)\n",
" #contingency = contingency.astype(np.float64,**_astype_copy_false(contingency))\n",
" # Calculate the expected value for the mutual information\n",
" emi = metrics.cluster.expected_mutual_information(contingency, n_samples)\n",
" return emi"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"skl_emi=sklearn_expected_mutual_info_score(l1,l2)\n",
"skl_emi"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"skl_emi-runms.mean()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Systematic TEST\n",
"print(\"The printed values should be small, say of order 0.01 or below.\\n\")\n",
"elements=list(range(10))\n",
"for sample in range(10):\n",
" p1=random_partition(elements)\n",
" p2=random_partition(elements)\n",
" if not isinstance(p1[0],list):\n",
" p1=[p1]\n",
" if not isinstance(p2[0],list):\n",
" p2=[p2] \n",
" print(p1)\n",
" print(p2)\n",
" runms=EHMI(p1,p2)\n",
" l1=partition_into_labels(p1)\n",
" l2=partition_into_labels(p2)\n",
" skl_emi=sklearn_expected_mutual_info_score(l1,l2)\n",
" print(skl_emi-runms.mean())\n",
" print(\"\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Generate all hierarchical partitions of size $n$"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def copy(t):\n",
" \"\"\"Returns a copy a hierarchical partition <<t>>\"\"\"\n",
" if isinstance(t,list):\n",
" return [copy(c) for c in t]\n",
" else:\n",
" return t"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# TEST\n",
"t=[[[[5], [1]], [0]], [[2, 8], [7, 9, 3, 4], [6]]]\n",
"t == copy(t)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def prunned_copy(t,e):\n",
" \"\"\"Returns tt,aa where tt is a replica of t excluding the branch e of t. aa is the copied ancestor of e that exists in tt. In this way, if we like to replace subtree e by say subtree ee, then we can insert ee in aa to obtain a modifed copy tt of t where e is replaced by ee.\"\"\"\n",
" if isinstance(t,list):\n",
" a = None\n",
" b = []\n",
" for c in t:\n",
" if c is not e:\n",
" cc,aa = prunned_copy(c,e)\n",
" b.append(cc)\n",
" if aa is not None:\n",
" a=aa\n",
" else:\n",
" a = b\n",
" return b,a\n",
" else:\n",
" return t,None"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# TESTS\n",
"t=[[[[5], [1]], [0]], [[2, 8], [7, 9, 3, 4], [6]]]\n",
"e=t[0][0][0]\n",
"b,a = prunned_copy(t,e)\n",
"print(e)\n",
"print(b)\n",
"print(a)\n",
"ee=copy(e)\n",
"ee.append('*')\n",
"a.append(ee)\n",
"print(a)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def replace(t,e,ee):\n",
" \"\"\"Assuming t, e and ee are hierarchical partitions represented by nested lists, this function replaces the branch e of t with another brach ee.\"\"\"\n",
" if e is t:\n",
" return ee\n",
" tt,aa = prunned_copy(t,e)\n",
" if aa is not None:\n",
" aa.append(ee)\n",
" return tt\n",
" else:\n",
" return None"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# TEST\n",
"t=[[[[5], [1]], [0]], [[2, 8], [7, 9, 3, 4], [6]]]\n",
"e=t[0][0][0]#[0]\n",
"ee=['*']\n",
"#ee='*'\n",
"tt=replace(t,e,ee)\n",
"print(t)\n",
"print(e)\n",
"print(ee)\n",
"print(tt)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# TEST\n",
"t=[[[[5], [1]], [0]], [[2, 8], [7, 9, 3, 4], [6]]]\n",
"e=t#[0][0][0]#[0]\n",
"ee=['*']\n",
"#ee='*'\n",
"tt=replace(t,e,ee)\n",
"print(t)\n",
"print(e)\n",
"print(ee)\n",
"print(tt)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def depth_first_search(t):\n",
" \"\"\"Performs a depth first search exploration of a hierarchical partition t represented by nested lists. It is an iterator, yielding each visited node e of t.\"\"\"\n",
" if isinstance(t,list):\n",
" yield t \n",
" for c in t:\n",
" for cc in depth_first_search(c):\n",
" yield cc"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# TEST\n",
"t=[[[[5], [1]], [0]], [[2, 8], [7, 9, 3, 4], [6]]]\n",
"for c in depth_first_search(t):\n",
" print(c)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# TEST\n",
"t=[[1]]\n",
"for c in depth_first_search(t):\n",
" print(c)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def generate_hierarchical_partitions(n):\n",
" \"\"\"Generates all hierarchical partitions of the n elements 0,1,2,....,n-1. The generated hierarchical partitions are yielded as nested lists.\"\"\"\n",
" if n==1:\n",
" yield [0]\n",
" else:\n",
" for t in generate_hierarchical_partitions(n-1):\n",
" #if n==4:\n",
" # print(\"# t =\",t)\n",
" for c in depth_first_search(t):\n",
" if isinstance(c[0],list):\n",
" # c is non-leaf, so change with type B\n",
" cc=list(c)\n",
" cc.append([n-1])\n",
" #print(\"#.B cc =\",cc)\n",
" yield replace(t,c,cc) # B \n",
" else:\n",
" # c is leaf, so change with type A\n",
" cc=list(c)\n",
" cc.append(n-1)\n",
" #print(\"#.A cc =\",cc)\n",
" yield replace(t,c,cc)\n",
" # change with type C\n",
" cc=[c,[n-1]]\n",
" #print(\"#.C cc =\",cc)\n",
" yield replace(t,c,cc) # C "
]
},
{
"cell_type": "raw",
"metadata": {},
"source": [
"# TEST, WARNING, SLOW, UNCOMMENT TO RUN.\n",
"for t in generate_hierarchical_partitions(7):\n",
" print(t)"
]
},
{
"cell_type": "raw",
"metadata": {},
"source": [
"# Jus to check, visually, the case of n=3.\n",
"1 [0, 1, 2]\n",
"2 [[0, 1], [2]]\n",
"5 [[0], [1], [2]]\n",
"6 [[[0], [1]], [2]]\n",
"3 [[1], [0, 2]]\n",
"7 [[1], [[0], [2]]]\n",
"4 [[0], [1, 2]]\n",
"8 [[0], [[1], [2]]]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def hp_list_to_tuples(hp):\n",
" \"\"\"Converts a hierarchical partition written in terms of lists, into a hierarchical partition written in terms of tuples. In this way, hierarchical partitions can be hashed to be included as elements of sets.\"\"\"\n",
" if isinstance(hp,list):\n",
" return tuple([hp_list_to_tuples(c) for c in hp])\n",
" return hp"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"hp_list_to_tuples([[0], [[1], [2]]])"
]
},
{
"cell_type": "raw",
"metadata": {},
"source": [
"# TEST, WARNING, SLOW, UNCOMMENT TO RUN.\n",
"# Lets check there are no repetitions.\n",
"n=6\n",
"results = []\n",
"for i,t in enumerate(generate_hierarchical_partitions(n)):\n",
" assert check(t)\n",
" print(i+1,t)\n",
" #for tt in results:\n",
" # assert tt != t\n",
" results.append(hp_list_to_tuples(t))\n",
"assert len(results) == len(set(results))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Distance $d_n$"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Naive implementation"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ln2d2=0.5*np.log(2.0)\n",
"def d_n(t1,t2,n):\n",
" \"\"\"Computes the distance metric associated to the HVI given by\n",
" d_n(T,S)=1-exp(-n(ln(2)/2)V(T,S))\n",
" \"\"\"\n",
" return 1.0-np.exp(-n*ln2d2*HVI(t1,t2))"
]
},
{
"cell_type": "raw",
"metadata": {},
"source": [
"# TEST, WARNING, SLOW, UNCOMMENT TO RUN.\n",
"# Shows that the triangular inequality is passed by the distance d_n.\n",
"n=4\n",
"for i1,t1 in enumerate(generate_hierarchical_partitions(n)):\n",
" for i2,t2 in enumerate(generate_hierarchical_partitions(n)):\n",
" for i3,t3 in enumerate(generate_hierarchical_partitions(n)):\n",
" HVI_12=HVI(t1,t2)\n",
" HVI_23=HVI(t2,t3)\n",
" HVI_13=HVI(t1,t3)\n",
" dHVI=HVI_12+HVI_23-HVI_13\n",
" print(i1,i2,i3,t1,t2,t3,dHVI)\n",
" assert(dHVI>=-1.0e-5)"
]
},
{
"cell_type": "raw",
"metadata": {},
"source": [
"# TEST, WARNING, SLOW, UNCOMMENT TO RUN.\n",
"# Shows that the triangular inequality fails for the HVI.\n",
"n=4\n",
"for i1,t1 in enumerate(generate_hierarchical_partitions(n)):\n",
" for i2,t2 in enumerate(generate_hierarchical_partitions(n)):\n",
" for i3,t3 in enumerate(generate_hierarchical_partitions(n)):\n",
" d12=d_n(t1,t2,n)\n",
" d23=d_n(t2,t3,n)\n",
" d13=d_n(t1,t3,n)\n",
" dd=d12+d23-d13\n",
" assert(dd>=0.)\n",
" print(i1,i2,i3,t1,t2,t3,dd)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Fast implementation"
]
},
{
"cell_type": "raw",
"metadata": {},
"source": [
"# TEST, WARNING, SLOW, UNCOMMENT TO RUN.\n",
"# Let's check that the d_n >= 0 for all \n",
"\n",
"n=4\n",
"list_t = []\n",
"for t in generate_hierarchical_partitions(n):\n",
" list_t.append(t)\n",
"\n",
"fast_HVI=np.zeros((len(list_t),len(list_t)))\n",
"for i2,t2 in enumerate(list_t):\n",
" for i1,t1 in enumerate(list_t):\n",
" fast_HVI[i1,i2]=HVI(t1,t2)\n",
"\n",
"ln2d2=0.5*np.log(2.0)\n",
"def fast_d_n(_HVI,n):\n",
" \"\"\"Computes the distance metric associated to the HVI given by\n",
" d_n(T,S)=1-exp(-n(ln(2)/2)V(T,S))\n",
" \"\"\"\n",
" return 1.0-np.exp(-n*ln2d2*_HVI)\n",
"\n",
"for i3 in range(len(list_t)):\n",
" for i2 in range(len(list_t)):\n",
" for i1 in range(i3+1):\n",
" HVI_12=fast_HVI[i1,i2]\n",
" HVI_23=fast_HVI[i2,i3]\n",
" HVI_13=fast_HVI[i1,i3]\n",
" d12=fast_d_n(HVI_12,n)\n",
" d23=fast_d_n(HVI_23,n)\n",
" d13=fast_d_n(HVI_13,n)\n",
" dd=d12+d23-d13\n",
" dHVI=HVI_12+HVI_23-HVI_13\n",
" assert(dd>=-1.0e-5)\n",
" #assert(dHVI>=-1.0e-5)\n",
" print(i1,i2,i3,\"|\",d12,d23,d13,\"|\",dd,dHVI)"
]
}
],
"metadata": {
"@webio": {
"lastCommId": null,
"lastKernelId": null
},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Task
Add the hierarchical mutual information distance metric from Perotti et al. 2015 to this package, to allow comparison between pairs of phylogenetic trees.
Format
An R implementation would be fine; R + C++ with Rcpp would also work.
Please follow the structure of existing functions where possible. Output should be in bits, i.e. using base 2 logarithms.
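Note that the notebook's HMI below is computed with natural logarithms (nats), so a port has to either switch to base-2 logarithms throughout or rescale the final value. A minimal sketch of the rescaling, in Python for consistency with the notebook (the requested deliverable is R):

```python
import numpy as np

def nats_to_bits(value_nats):
    """Convert an information value from nats (natural log) to bits (base-2 log)."""
    return value_nats / np.log(2.0)

# ln(2) nats is exactly 1 bit; 1 nat is 1/ln(2) ~ 1.4427 bits.
print(nats_to_bits(np.log(2.0)))  # 1.0
```

Equivalently, every `np.log` in the HMI recursion can be replaced by `np.log2`, which avoids the final division.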
Output required
HierarchicalMutualInfoDist(tree1, tree2), where each tree is an object of class "phylo".
Documentation regenerated via devtools::document().
Background
Papers detailing the algo:
Juan Ignacio Perotti, Claudio Juan Tessone, and Guido Caldarelli
Phys. Rev. E 92, 062825 – Published 22 December 2015
Zhao Yang, Juan I. Perotti, and Claudio J. Tessone
Phys. Rev. E 96, 052311 – Published 14 November 2017
Juan Ignacio Perotti, Nahuel Almeira, and Fabio Saracco
Phys. Rev. E 101, 062148 – Published 30 June 2020.
Notebook with an example implementation. (Check that recursion is properly implemented – not guaranteed correct.)
https://github.com/jipphysics/hit/blob/master/hit.ipynb
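As a reference point for the R port, here is a minimal, self-contained Python sketch of the pieces `HierarchicalMutualInfoDist` would need. It mirrors the notebook's HMI recursion but uses base-2 logarithms so output is in bits. The names (`hmi`, `hvi`, `hmi_dist`) and the nested-list tree representation are illustrative only, not an existing API, and conversion from "phylo" objects is not covered here:

```python
import numpy as np

def _flatten(hp):
    """Yield every element of a nested-list hierarchical partition."""
    for e in hp:
        if isinstance(e, list):
            yield from _flatten(e)
        else:
            yield e

def _xlog2x(x):
    """x*log2(x), with the convention 0*log2(0) = 0."""
    return x * np.log2(x) if x > 0.0 else 0.0

def hmi(Ut, Us):
    """Hierarchical mutual information, in bits, between two nested-list
    hierarchical partitions. Returns (n_ts, I_ts), where n_ts is the
    number of shared elements."""
    if isinstance(Ut[0], list) and isinstance(Us[0], list):
        # Both nodes are internal: accumulate child overlaps and entropies.
        n_ts = H_uv = H_us = mean_I = 0.0
        n_tv = {}
        for Uu in Ut:
            n_us = 0.0
            for v, Uv in enumerate(Us):
                n_uv, I_uv = hmi(Uu, Uv)
                n_ts += n_uv
                n_tv[v] = n_tv.get(v, 0.0) + n_uv
                n_us += n_uv
                H_uv += _xlog2x(n_uv)
                mean_I += n_uv * I_uv
            H_us += _xlog2x(n_us)
        H_tv = sum(_xlog2x(x) for x in n_tv.values())
        if n_ts > 0.0:
            return n_ts, np.log2(n_ts) - (H_us + H_tv - H_uv) / n_ts + mean_I / n_ts
        return 0.0, 0.0
    # If either node is a leaf cluster, only the overlap matters; it
    # contributes no information at deeper levels.
    return float(len(set(_flatten(Ut)) & set(_flatten(Us)))), 0.0

def hvi(t, s):
    """Hierarchical variation of information, in bits."""
    return hmi(t, t)[1] + hmi(s, s)[1] - 2.0 * hmi(t, s)[1]

def hmi_dist(t, s, n):
    """Bounded distance from the notebook: 1 - exp(-n*(ln2/2)*HVI(t,s))."""
    return 1.0 - np.exp(-n * 0.5 * np.log(2.0) * hvi(t, s))

# Two identical flat partitions of {0,1,2,3}: distance 0, HMI = 1 bit.
p = [[0, 1], [2, 3]]
n_shared, info = hmi(p, p)
print(n_shared, info)       # 4 shared elements, 1 bit of HMI
print(hmi_dist(p, p, n=4))  # identical trees -> distance 0.0
```

The leaf/internal convention matches the notebook's `check()`: a node holds either sublists or elements, never both, so testing the first child's type suffices to classify a node.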