-
Notifications
You must be signed in to change notification settings - Fork 76
/
ascii.py
99 lines (64 loc) · 2.59 KB
/
ascii.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""
ascii
=====
A simple ascii file reader for pynbody
"""
# for py2.5
import os
import numpy as np
from .. import chunk, family, units
from . import SimSnap
_max_buf = 1024 * 512
class AsciiSnap(SimSnap):
def _setup_slices(self, num_lines, take=None):
disk_family_slice = {family.dm: slice(0,num_lines)}
self._load_control = chunk.LoadControl(
disk_family_slice, _max_buf, take)
self._family_slice = self._load_control.mem_family_slice
self._num_particles = self._load_control.mem_num_particles
def __init__(self, filename, take=None, header=None):
super().__init__()
num_particles = 0
self._header = header
with open(filename) as f:
for l in f:
if not header:
header = l
else:
num_particles+=1
self._loadable_keys = header.split()
self._filename = filename
self._file_units_system = [units.Unit("G"), units.Unit("kpc"), units.Unit("Msol")]
self._setup_slices(num_particles,take=take)
self._decorate()
def loadable_keys(self, fam=None):
return self._loadable_keys
def _load_arrays(self, array_name_list):
with open(self._filename) as f:
if not self._header: f.readline()
rs=[]
for array_name in array_name_list:
self._create_array(array_name, ndim=1)
rs = [self[array_name] for array_name in array_name_list]
cols = [self._loadable_keys.index(array_name) for array_name in array_name_list]
ncols = len(self._loadable_keys)
buf_shape = _max_buf
b = np.empty(buf_shape)
for readlen, buf_index, mem_index in self._load_control.iterate([family.dm],[family.dm]):
b = np.fromfile(f, count=readlen*ncols, sep=' ').reshape((-1,ncols))
if mem_index is not None:
for r,col in zip(rs,cols):
r[mem_index] = b[buf_index,col]
def _load_array(self, array_name, fam=None):
if fam is not None:
raise OSError("Arrays only loadable at snapshot, not family level")
ars = [array_name]
if array_name not in self._loadable_keys:
ars = self._array_name_ND_to_1D(array_name)
for array_name_i in ars:
if array_name_i not in self._loadable_keys:
raise OSError("No such array on disk")
self._load_arrays(ars)
@staticmethod
def _can_load(f):
return os.path.exists(f) and f.endswith('.txt')