forked from bccp/nbodykit
-
Notifications
You must be signed in to change notification settings - Fork 1
/
test_species.py
126 lines (95 loc) · 3.99 KB
/
test_species.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from runtests.mpi import MPITest
from nbodykit.lab import *
from nbodykit import setup_logging
from numpy.testing import assert_array_equal, assert_allclose
import pytest
# Module-level call: runs once, when this test module is imported.
# Presumably enables nbodykit's logging output for the tests below --
# confirm against the documentation of nbodykit.setup_logging.
setup_logging()
@MPITest([1])
def test_boxsize_nmesh(comm):
    """Check which ``BoxSize``/``Nmesh`` combinations ``to_mesh`` accepts.

    Three calls are made on a two-species catalog:

    1. ``Nmesh`` given, ``BoxSize`` omitted -- must succeed.
    2. neither given -- must raise ``ValueError``.
    3. ``Nmesh`` given after ``attrs['data.BoxSize']`` was rescaled --
       must raise ``ValueError``.

    Parameters
    ----------
    comm :
        communicator passed in by the test harness (presumably by the
        ``MPITest`` decorator); forwarded to each ``UniformCatalog``
    """
    # the catalog: both species are generated with BoxSize=512
    source1 = UniformCatalog(nbar=3e-5, BoxSize=512., seed=42, comm=comm)
    source2 = UniformCatalog(nbar=3e-5, BoxSize=512., seed=84, comm=comm)
    cat = MultipleSpeciesCatalog(['data', 'randoms'], source1, source2)

    # this should work (BoxSize is not passed, so it has to be inferred);
    # only the absence of an exception matters, so the result is discarded
    cat.to_mesh(Nmesh=32)

    # this should not work (no Nmesh to infer)
    with pytest.raises(ValueError):
        cat.to_mesh()

    # this should not work either: rescale the BoxSize stored for 'data'
    # (presumably so it no longer matches the 'randoms' species)
    cat.attrs['data.BoxSize'] *= 10.
    with pytest.raises(ValueError):
        cat.to_mesh(Nmesh=32)
@MPITest([1, 4])
def test_getitem(comm):
    """Check that indexing a multi-species mesh by name gives that species.

    For every species name, ``mesh[name].source`` must be the very same
    object (identity, not equality) as ``cat[name]``.

    Parameters
    ----------
    comm :
        communicator passed in by the test harness (presumably by the
        ``MPITest`` decorator); forwarded to each ``UniformCatalog``
    """
    # the catalog
    source1 = UniformCatalog(nbar=3e-5, BoxSize=512., seed=42, comm=comm)
    source2 = UniformCatalog(nbar=3e-5, BoxSize=512., seed=84, comm=comm)
    cat = MultipleSpeciesCatalog(['data', 'randoms'], source1, source2)

    # the mesh
    mesh = cat.to_mesh(Nmesh=32, BoxSize=512)

    # only the species name is needed: the check is made against cat[name],
    # not against the UniformCatalog objects used to build the catalog
    for name in ('data', 'randoms'):
        submesh = mesh[name]
        assert submesh.source is cat[name]
@MPITest([1, 4])
def test_compute(comm):
    """Compare the combined density field with the per-species fields.

    Each species is painted on its own, multiplied by its
    ``attrs['num_per_cell']`` ("un-normalized"), and the two results are
    summed and divided by the sum of both ``num_per_cell`` values.  The
    field painted from the multi-species mesh must agree with that to
    ``atol=1e-5``.

    Parameters
    ----------
    comm :
        communicator passed in by the test harness (presumably by the
        ``MPITest`` decorator); forwarded to each ``UniformCatalog``
    """
    mesh_kws = dict(Nmesh=32, BoxSize=512)

    # the two species and the catalog combining them
    data = UniformCatalog(nbar=3e-5, BoxSize=512., seed=42, comm=comm)
    randoms = UniformCatalog(nbar=3e-5, BoxSize=512., seed=84, comm=comm)
    cat = MultipleSpeciesCatalog(['data', 'randoms'], data, randoms)

    # one combined mesh plus one mesh per species, all on the same grid
    combined_mesh = cat.to_mesh(**mesh_kws)
    data_mesh = data.to_mesh(**mesh_kws)
    randoms_mesh = randoms.to_mesh(**mesh_kws)

    # paint each species separately
    data_field = data_mesh.to_real_field()
    randoms_field = randoms_mesh.to_real_field()

    # un-normalize each field in place and accumulate the normalization
    total_per_cell = 0
    for field in (data_field, randoms_field):
        per_cell = field.attrs['num_per_cell']
        field[:] *= per_cell
        total_per_cell += per_cell

    # the combined density field
    combined = combined_mesh.to_real_field()

    # must be the same
    expected = (data_field.value + randoms_field.value) / total_per_cell
    assert_allclose(combined.value, expected, atol=1e-5)
@MPITest([1, 4])
def test_actions_compensated(comm):
    """Check that a compensated multi-species mesh has a non-empty ``actions``.

    Two uniform species (``nbar=1`` in a box of side 111, same seed) are
    given ``Weight`` 1.0 and 0.1, combined, and converted to a mesh with
    ``compensated=True``; ``mesh.actions`` must then contain at least one
    entry.

    NOTE(review): the original comment here said the test only fails when
    there are enough particles to trigger the second loop of the interlaced
    painter.  This test never passes ``interlaced=True``, so that remark
    looks copied from ``test_paint_interlaced`` -- confirm whether the
    particle count matters for this test.

    Parameters
    ----------
    comm :
        communicator passed in by the test harness (presumably by the
        ``MPITest`` decorator); forwarded to each ``UniformCatalog``
    """
    # build the two species; they differ only in the constant Weight column
    species = []
    for weight in (1.0, 0.1):
        source = UniformCatalog(nbar=1.0, BoxSize=111, seed=111, comm=comm)
        source['Weight'] = weight
        species.append(source)

    # the catalog
    cat = MultipleSpeciesCatalog(['data', 'randoms'], *species)

    # the mesh
    mesh = cat.to_mesh(Nmesh=32, compensated=True)

    assert len(mesh.actions) > 0
@MPITest([1, 4])
def test_paint_interlaced(comm):
    """Compare interlaced painting of the combined and per-species meshes.

    The test case fails only if there are enough particles to trigger the
    second loop of the interlaced painter; the parameters used here
    (``nbar=1`` in a box of side 111) will do it.

    Each per-species field must have ``cmean()`` equal to 1, as must the
    combined field returned by ``mesh.compute()``.  The combined field must
    also match the sum of the un-normalized per-species fields divided by
    the sum of their ``num_per_cell`` values, to ``atol=1e-5``.

    Parameters
    ----------
    comm :
        communicator passed in by the test harness (presumably by the
        ``MPITest`` decorator); forwarded to each ``UniformCatalog``
    """
    mesh_kws = dict(Nmesh=32, interlaced=True)

    # the two species (same seed, different constant weights) and the catalog
    data = UniformCatalog(nbar=1.0, BoxSize=111, seed=111, comm=comm)
    randoms = UniformCatalog(nbar=1.0, BoxSize=111, seed=111, comm=comm)
    data['Weight'] = 1.0
    randoms['Weight'] = 0.1
    cat = MultipleSpeciesCatalog(['data', 'randoms'], data, randoms)

    # the meshes
    combined_mesh = cat.to_mesh(**mesh_kws)
    data_mesh = data.to_mesh(**mesh_kws)
    randoms_mesh = randoms.to_mesh(**mesh_kws)

    # paint each species separately
    fields = (data_mesh.to_real_field(), randoms_mesh.to_real_field())

    # each per-species field is normalized to unit mean
    for field in fields:
        assert_allclose(field.cmean(), 1.0)

    # un-normalize each field in place and accumulate the normalization
    total_per_cell = 0
    for field in fields:
        per_cell = field.attrs['num_per_cell']
        field[:] *= per_cell
        total_per_cell += per_cell

    # the combined density field, obtained through compute()
    combined = combined_mesh.compute()
    assert_allclose(combined.cmean(), 1.0)

    # must be the same
    expected = (fields[0].value + fields[1].value) / total_per_cell
    assert_allclose(combined.value, expected, atol=1e-5)