Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concat data #26

Merged
merged 2 commits on May 2, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
98 changes: 96 additions & 2 deletions src/pypore/file_converter.py
Expand Up @@ -3,11 +3,16 @@

@author: `@parkin`_
"""
import os
import datetime
from filetypes import data_file
import numpy as np
import scipy.signal as sig
from pypore.i_o import get_reader_from_filename

import pypore.filetypes.data_file as df
from pypore.i_o.abstract_reader import AbstractReader


def convert_file(filename, output_filename=None):
"""
Expand All @@ -20,7 +25,7 @@ def convert_file(filename, output_filename=None):

if output_filename is None:
output_filename = filename.split('.')[0] + '.h5'

save_file = data_file.open_file(output_filename, mode='w', sample_rate=sample_rate, n_points=n_points)
blocks_to_get = 1
data = reader.get_next_blocks(blocks_to_get)[0]
Expand Down Expand Up @@ -80,11 +85,100 @@ def filter_file(filename, filter_frequency, out_sample_rate, output_filename=Non
if 0 < out_sample_rate < sample_rate:
n_out = int(np.ceil(n_points * out_sample_rate / sample_rate))
filtered = sig.resample(filtered, num=n_out)
final_sample_rate = sample_rate * (1.0*n_out/n_points)
final_sample_rate = sample_rate * (1.0 * n_out / n_points)

save_file = data_file.open_file(output_filename, mode='w', sample_rate=final_sample_rate, n_points=filtered.size)
save_file.root.data[:] = filtered[:]

save_file.flush()
save_file.close()
return output_filename


class SamplingRatesMismatchError(Exception):
    """Raised when files that are combined together do not share one sampling rate."""


def _get_reader(source):
    """
    Return ``(reader, owned)`` for an entry of the ``files`` list of :py:func:`concat_files`.

    ``owned`` is True when the reader was opened here from a file name, meaning the caller of this
    helper is responsible for closing it. Readers handed in by the user are returned as-is with
    ``owned`` False, so they are never closed on the user's behalf.
    """
    if isinstance(source, AbstractReader):
        return source, False
    return get_reader_from_filename(source), True


def concat_files(files, output_filename=None):
    """
    This function concatenates multiple files into one data file. All of the sampling rates of the original files
    must be the same.

    Readers that are opened here from file names are always closed again, even if an error occurs. Readers passed
    in by the caller are left open.

    :param list files: List of string file names OR
        :py:class:`Readers <pypore.i_o.abstract_reader.AbstractReader>`.
    :param output_filename: Optional file name for the resulting file. If omitted, the name is built from the base
        name of the first file (up to its first '.'), followed by '_concatenated_', a timestamp and '.h5'. Only the
        base name is used, so the default name carries no directory component.
    :returns: The file name of the concatenated data file.
    :raises: :py:exc:`ValueError` -- if the length of the files list is < 2.
    :raises: :py:exc:`SamplingRatesMismatchError <pypore.file_converter.SamplingRatesMismatchError>` -- if the sampling
        rates do not match in all of the files.

    >>> from pypore.i_o.data_file_reader import DataFileReader
    >>> concat_files(['file1.log', DataFileReader('dataFile.h5')], output_filename='concatenated.h5') # can pass strings or Readers
    """
    if len(files) < 2:
        raise ValueError("Minimum length of files list is 2.")

    sample_rate = None
    n = 0

    # First pass: get the total number of data points, and check that the sampling rates are equal.
    for index, source in enumerate(files):
        reader, owned = _get_reader(source)
        try:
            curr_sample_rate = reader.get_sample_rate()

            if index == 0:
                # The first file defines the reference sample rate and the default output name.
                sample_rate = curr_sample_rate
                if output_filename is None:
                    basename = os.path.basename(reader.get_filename())
                    output_filename = basename.split('.')[0] + '_concatenated_' + datetime.datetime.now().strftime(
                        "%Y%m%d_%H%M%S") + '.h5'

            if curr_sample_rate != sample_rate:
                raise SamplingRatesMismatchError(
                    "Sampling rates differ in files. Found {0} and {1}.".format(curr_sample_rate, sample_rate))

            # Use the same point count as the copy pass below, so the allocated size and the slices agree
            # and the data does not have to be loaded just to be counted.
            n += reader.get_points_per_channel_total()
        finally:
            if owned:
                reader.close()

    # Open a new data file.
    new_data_file = df.open_file(output_filename, mode='w', n_points=n, sample_rate=sample_rate)
    try:
        curr_i = 0

        # Second pass: copy each file's data into its slice of the output.
        for source in files:
            reader, owned = _get_reader(source)
            try:
                n_i = reader.get_points_per_channel_total()

                new_data_file.root.data[curr_i:curr_i + n_i] = reader.get_all_data()[0]

                curr_i += n_i
            finally:
                if owned:
                    reader.close()
    finally:
        # Close the output even if copying failed, so the file handle is not leaked.
        new_data_file.close()

    return output_filename

134 changes: 134 additions & 0 deletions src/pypore/tests/test_file_converter.py
Expand Up @@ -4,6 +4,7 @@

import unittest
import os
import datetime
from pypore.i_o import get_reader_from_filename
from pypore.file_converter import convert_file

Expand Down Expand Up @@ -225,6 +226,139 @@ def test_filtered_baseline(self, filename):
os.remove(out_filename)


from pypore.file_converter import concat_files
from pypore.file_converter import SamplingRatesMismatchError


class TestConcatFiles(unittest.TestCase):
    """Tests for :py:func:`pypore.file_converter.concat_files`."""

    def test_default_output_filename(self):
        """
        Tests that the default output filename is correctly generated from the input file names.
        """
        file_names = [tf.get_abs_path('chimera_1event_2levels.log'), tf.get_abs_path('chimera_1event.log')]

        output_filename = concat_files(file_names)

        try:
            self.assertTrue(os.path.exists(output_filename))

            # Check that it's saved in the current directory
            self.assertEqual(output_filename[0:len('chimera_1event')], 'chimera_1event')

            # Check the correct file extension
            self.assertEqual(output_filename[-len('.h5'):], '.h5')

            self.assertIn('_concatenated_', output_filename,
                          "Default output filename '{0}' should contain '_concatenated_'".format(output_filename))
        finally:
            # Remove the generated file even when an assertion above fails.
            if os.path.exists(output_filename):
                os.remove(output_filename)

    @_test_file_manager(DIRECTORY)
    def test_set_output_filename(self, filename):
        """
        Tests that an explicitly given output filename is used and returned.
        """
        file_names = [tf.get_abs_path('chimera_1event_2levels.log'), tf.get_abs_path('chimera_1event.log')]

        output_filename = concat_files(file_names, output_filename=filename)

        self.assertEqual(output_filename, filename)
        self.assertTrue(os.path.exists(output_filename))

    @_test_file_manager(DIRECTORY)
    def test_different_sample_rate_no_resample(self, filename):
        """
        Tests that an error is thrown if the files have different sample rates and we do not want to resample.
        """

        file_names = [tf.get_abs_path('heka_1.5s_mean5.32p_std2.76p.hkd'), tf.get_abs_path('chimera_1event.log')]

        self.assertRaises(SamplingRatesMismatchError, concat_files, file_names, output_filename=filename)

    def test_single_file_fail(self):
        """
        Tests that an exception is thrown if less than 2 file names are passed in the list.
        """
        file_names = ['hi.txt']

        self.assertRaises(ValueError, concat_files, file_names)
        self.assertRaises(ValueError, concat_files, [])

    @_test_file_manager(DIRECTORY)
    def test_original_files_unmodified(self, filename):
        """
        Tests that the sample rates and data of the input files are unchanged after concatenating them.
        """
        file1 = tf.get_abs_path('chimera_1event_2levels.log')
        file2 = tf.get_abs_path('chimera_1event.log')

        # Record the state of the inputs before concatenating.
        readers = []
        try:
            for name in (file1, file2):
                readers.append(get_reader_from_filename(name))
            reader1, reader2 = readers

            sample_rate1_orig = reader1.get_sample_rate()
            sample_rate2_orig = reader2.get_sample_rate()

            data1_orig = reader1.get_all_data()[0]
            data2_orig = reader2.get_all_data()[0]
        finally:
            for reader in readers:
                reader.close()

        concat_files([file1, file2], output_filename=filename)

        # Re-open the inputs (and the output) and compare against the recorded state.
        readers = []
        try:
            for name in (file1, file2, filename):
                readers.append(get_reader_from_filename(name))
            reader1, reader2 = readers[0], readers[1]

            sample_rate1_final = reader1.get_sample_rate()
            sample_rate2_final = reader2.get_sample_rate()

            self.assertEqual(sample_rate1_final, sample_rate1_orig,
                             "Sample rate changed. Was {0}, now {1}.".format(sample_rate1_orig, sample_rate1_final))
            self.assertEqual(sample_rate2_final, sample_rate2_orig,
                             "Sample rate changed. Was {0}, now {1}.".format(sample_rate2_orig, sample_rate2_final))

            data1 = reader1.get_all_data()[0]
            data2 = reader2.get_all_data()[0]

            np.testing.assert_array_equal(data1, data1_orig)
            np.testing.assert_array_equal(data2, data2_orig)
        finally:
            # Close every reader that was opened, even when an assertion fails.
            for reader in readers:
                reader.close()

    @_test_file_manager(DIRECTORY)
    def test_correct_data(self, filename):
        """
        Tests that the output has the inputs' sample rate and holds the first file's data followed by the second's.
        """
        file1 = tf.get_abs_path('chimera_1event_2levels.log')
        file2 = tf.get_abs_path('chimera_1event.log')

        concat_files([file1, file2], output_filename=filename)

        readers = []
        try:
            for name in (file1, file2, filename):
                readers.append(get_reader_from_filename(name))
            reader1, reader2, reader_out = readers

            sample_rate1 = reader1.get_sample_rate()
            sample_rate2 = reader2.get_sample_rate()
            sample_rate_out = reader_out.get_sample_rate()

            self.assertEqual(sample_rate_out, sample_rate1,
                             "Unexpected sample rate. Should be {0}, was {1}.".format(sample_rate1, sample_rate_out))
            self.assertEqual(sample_rate_out, sample_rate2,
                             "Unexpected sample rate. Should be {0}, was {1}.".format(sample_rate2, sample_rate_out))

            data1 = reader1.get_all_data()[0]
            data2 = reader2.get_all_data()[0]

            # Expected output: data1 immediately followed by data2.
            data_out_should_be = np.zeros(data1.size + data2.size)
            data_out_should_be[:data1.size] = data1[:]
            data_out_should_be[data1.size:] = data2[:]

            data_out = reader_out.get_all_data()[0]

            np.testing.assert_array_equal(data_out, data_out_should_be)
        finally:
            # Close every reader that was opened, even when an assertion fails.
            for reader in readers:
                reader.close()


# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()