Skip to content

Commit

Permalink
[FIX] read_partpositions() actually replaces the one in base_read mod…
Browse files Browse the repository at this point in the history
…ule.
  • Loading branch information
FrancescAlted committed Dec 14, 2017
1 parent 8974c81 commit 26ff566
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 206 deletions.
2 changes: 0 additions & 2 deletions doc/source/reflexible.rst
Expand Up @@ -10,8 +10,6 @@ The reflexible API
.. automodule:: reflexible
:members:

.. autofunction:: reflexible.read_part_positions

.. autofunction:: reflexible.scripts.create_ncfile.create_ncfile

.. autofunction:: reflexible.scripts.fprun.main
Expand Down
1 change: 0 additions & 1 deletion reflexible/__init__.py
Expand Up @@ -109,7 +109,6 @@ def print_versions():
from .scripts.create_ncfile import create_ncfile
from .tests.all import test
from .utils import Structure, CacheDict, curtain_agltoasl, data_range
from .part_positions import read_part_positions

from .plotting import (
plot_sensitivity, plot_at_level, plot_totalcolumn, plot_trajectory,
Expand Down
138 changes: 79 additions & 59 deletions reflexible/base_read.py
Expand Up @@ -31,14 +31,12 @@
ID: $Id$: $Rev$
"""
# builtin imports
import os
import datetime as dt

# Dependencies:
# Numpy
import numpy as np
import pandas as pd
import bcolz


from .data_structures import Trajectory

Expand Down Expand Up @@ -208,91 +206,113 @@ def read_trajectories(H, trajfile='trajectories.txt', ncluster=5,
return RelTraj


# The amount of records that are read from disk in a single shot. Keep this as
# small as possible.
CHUNKSIZE = 10 * 1000


def read_partpositions(filename, nspec, xlon0, ylat0, dx, dy, dataframe=False, header=None):
"""Read the particle positions in filename.
def get_quantized_ctable(dtype, cparams, quantize=None, expectedlen=None):
"""Return a ctable with the quantize filter enabled for floating point cols."""
columns, names = [], []
for fname, ftype in dtype.descr:
names.append(fname)
if 'f' in ftype:
cparams2 = bcolz.cparams(clevel=cparams.clevel, cname=cparams.cname, quantize=quantize)
columns.append(bcolz.zeros(0, dtype=ftype, cparams=cparams2, expectedlen=expectedlen))
else:
columns.append(bcolz.zeros(0, dtype=ftype, cparams=cparams, expectedlen=expectedlen))
return bcolz.ctable(columns=columns, names=names)


def read_partpositions(filename, nspec, ctable=True, clevel=5, cname="lz4", quantize=None):
"""Read the particle positions in `filename`.
This function strives to use as less memory as possible; for this, a
bcolz ctable container is used for holding the data. Besides to be compressed
in-memory, its chunked nature makes a natural fit for data that needs to
be appended because it does not need expensive memory resize operations.
NOTE: This code reads directly from un UNFORMATTED SEQUENTIAL data Fortran
file so care has been taken to skip the record length at the beginning and
the end of every record. See:
http://stackoverflow.com/questions/8751185/fortran-unformatted-file-format
Parameters
----------
filename : string
the file name of the particle raw files
The file name of the particle raw data
nspec : int
number of species
xlon0 : float
the longitude
xlat0 : float
the latitude
dx : float
the cell size (x)
dy : float
the cell size (u)
dataframe : bool
Return a pandas DataFrame
number of species in particle raw data
ctable : bool
Return a bcolz ctable container. If not, a numpy structured array is returned instead.
clevel : int
Compression level for the ctable container
cname : string
Codec name for the ctable container. Can be 'blosclz', 'lz4', 'zlib' or 'zstd'.
quantize : int
Quantize data to improve (lossy) compression. Data is quantized using
np.around(scale*data)/scale, where scale is 2**bits, and bits is
determined from the quantize value. For example, if quantize=1, bits
will be 4. 0 means that the quantization is disabled.
Returns
-------
structured_numpy_array OR pandas DataFrame
"""
CHUNKSIZE = 50 * 1000
ctable object OR structured_numpy_array
Returning a ctable is preferred because it is used internally so it does not require to be
converted to other formats, so it is faster and uses less memory.
xmass_dtype = [('xmass_%d' % (i+1), 'f4') for i in range(nspec)]
#note age is calculated from itramem by adding itimein
Note: Passing a `quantize` param > 0 can increase the compression ratio of the ctable
container, but it may also slow down the reading speed significantly.
"""

xmass_dtype = [('xmass_%d' % (i + 1), 'f4') for i in range(nspec)]
# note age is calculated from itramem by adding itimein
out_fields = [
('npoint', 'i4'), ('xtra1', 'f4'), ('ytra1', 'f4'), ('ztra1', 'f4'),
('itramem', 'i4'), ('topo', 'f4'), ('pvi', 'f4'), ('qvi', 'f4'),
('rhoi', 'f4'), ('hmixi', 'f4'), ('tri', 'f4'), ('tti', 'f4')] + xmass_dtype
('npoint', 'i4'), ('xtra1', 'f4'), ('ytra1', 'f4'), ('ztra1', 'f4'),
('itramem', 'i4'), ('topo', 'f4'), ('pvi', 'f4'), ('qvi', 'f4'),
('rhoi', 'f4'), ('hmixi', 'f4'), ('tri', 'f4'), ('tti', 'f4')] + xmass_dtype
raw_fields = [('begin_recsize', 'i4')] + out_fields + [('end_recsize', 'i4')]
rectype = np.dtype(out_fields)
raw_rectype = np.dtype(raw_fields)
recsize = raw_rectype.itemsize

out = np.empty(CHUNKSIZE, dtype=rectype)
chunk = np.empty(CHUNKSIZE, dtype=raw_rectype)
chunk.flags.writeable = True
cparams = bcolz.cparams(clevel=clevel, cname=cname)
if quantize is not None and quantize > 0:
out = get_quantized_ctable(raw_rectype, cparams=cparams, quantize=quantize, expectedlen=int(1e6))
else:
out = bcolz.zeros(0, dtype=raw_rectype, cparams=cparams, expectedlen=int(1e6))

numparticlecount = 0
with open(filename, "rb", buffering=1) as f:
# The timein value is at the beginning of the file
reclen = np.ndarray(shape=1, buffer=f.read(4), dtype="i4")[0]
reclen = np.ndarray(shape=(1,), buffer=f.read(4), dtype="i4")[0]
assert reclen == 4
itimein = np.ndarray(shape=1, buffer=f.read(4), dtype="i4")
reclen = np.ndarray(shape=1, buffer=f.read(4), dtype="i4")[0]
itimein = np.ndarray(shape=(1,), buffer=f.read(4), dtype="i4")
reclen = np.ndarray(shape=(1,), buffer=f.read(4), dtype="i4")[0]
assert reclen == 4
i = 0
nrec = 0
while True:
# Try to read a complete chunk
data = f.read(CHUNKSIZE * recsize)
read_records = int(len(data) / recsize) # the actual number of records read
chunk = chunk[:read_records]
chunk.data[:] = data
# Recompute xtra1 and ytra1 fields for the recently added chunk
#chunk['xtra1'][:] = (chunk['xtra1'] - xlon0) / dx
#chunk['ytra1'][:] = (chunk['ytra1'] - ylat0) / dy
#chunk['age'][:] = (chunk['itramem'] + itimein)
chunk = np.ndarray(shape=(read_records,), buffer=data, dtype=raw_rectype)
# Add the chunk to the out array
out[i:i+read_records] = chunk
i += read_records
out.append(chunk[:read_records])
nrec += read_records
if read_records < CHUNKSIZE:
# We reached the end of the file
break
else:
# Enlarge out by CHUNKSIZE more elements
out = np.resize(out, out.size + CHUNKSIZE)

# Truncate at the max length (last row is always a sentinel, so remove it)
out = np.resize(out, i - 1)

if dataframe:
df = pd.DataFrame(out)
#df['age'] = df['itramem'] + itimein
df['age'] = (df['itramem'] - (df['npoint']-1) * 10800) + itimein
if header:
month = header.releasetimes[0].month
df['month'] = pd.Series(len(df) * [month])
return df
return out

# Truncate at the max length (last row is always a sentinel, so remove it)
out.trim(1)
# Remove the first and last columns
out.delcol("begin_recsize")
out.delcol("end_recsize")

if ctable:
return out
else:
return out[:]


def read_shortpositions(filename, nspec, xlon0, ylat0, dx, dy, dataframe=False, header=None):
"""Read the particle positions in filename.
Expand Down
144 changes: 0 additions & 144 deletions reflexible/part_positions.py

This file was deleted.

0 comments on commit 26ff566

Please sign in to comment.