Skip to content

Commit

Permalink
Merge pull request #199 from prjemian/126-update_available
Browse files Browse the repository at this point in the history
spec: update_available property
  • Loading branch information
prjemian committed Aug 17, 2019
2 parents 4f8e469 + 7b40288 commit d3f6984
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 61 deletions.
92 changes: 80 additions & 12 deletions src/spec2nexus/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@
and the internal routines will take care of all that is necessary
to read and interpret the information.
.. autosummary::
~is_spec_file
~is_spec_file_with_header
~SpecDataFile
~SpecDataFileHeader
~SpecDataFileScan
.. -----------------------------------------------------------------------------------------
old documentation
-----------------------------------------------------------------------------------------
Expand Down Expand Up @@ -203,7 +212,22 @@ def is_spec_file_with_header(filename):
class SpecDataFile(object):

"""
contents of a spec data file
contents of a SPEC data file
.. autosummary::
~update_available
~dissect_file
~read
~getScan
~getScanNumbers
~getScanNumbersChronological
~getMinScanNumber
~getMaxScanNumber
~getFirstScanNumber
~getLastScanNumber
~getScanCommands
"""

fileName = ''
Expand All @@ -217,16 +241,33 @@ def __init__(self, filename):
self.headers = []
self.scans = OrderedDict()
self.readOK = -1
if not os.path.exists(filename):
raise SpecDataFileNotFound('file does not exist: ' + str(filename))
if not is_spec_file(filename):
raise NotASpecDataFile('not a SPEC data file: ' + str(filename))
self.fileName = filename

self.read()
self.last_scan = None
self.mtime = 0
self.num_lines = 0

if filename is not None:
if not os.path.exists(filename):
raise SpecDataFileNotFound(
'file does not exist: ' + str(filename))
if not is_spec_file(filename):
raise NotASpecDataFile(
'not a SPEC data file: ' + str(filename))
self.fileName = filename

self.read()

def __str__(self):
return self.fileName or 'None'

@property
def update_available(self):
"""
Has the file been updated since the last time it was read?
Reference file modification time is stored *after*
file is read in :meth:`read()` method.
"""
return self.mtime != os.path.getmtime(self.fileName)

def _read_file_(self, spec_file_name):
"""Reads a spec data file"""
Expand Down Expand Up @@ -260,11 +301,12 @@ def dissect_file(self):
text with one of the above control lines at its start
"""
buf = self._read_file_(self.fileName)
buf = self._read_file_(self.fileName).splitlines()
self.num_lines = len(buf)

sections, block = [], []

for _line_num, text in enumerate(buf.splitlines()):
for _line_num, text in enumerate(buf):
if len(text.strip()) > 0:
f = text.split()[0]
if len(f) == 2 and f in ("#E", "#F", "#S"):
Expand Down Expand Up @@ -299,6 +341,9 @@ def read(self):
# fix any missing parts
if not hasattr(self, "specFile"):
self.specFile = self.fileName

self.last_scan = self.getLastScanNumber()
self.mtime = os.path.getmtime(self.fileName)

def getScan(self, scan_number=0):
"""return the scan number indicated, None if not found"""
Expand Down Expand Up @@ -358,7 +403,18 @@ def getScanCommands(self, scan_list=None):


class SpecDataFileHeader(object):
"""contents of a spec data file header (#E) section"""
"""
contents of a spec data file header (#E) section
.. autosummary::
~get_macro_name
~interpret
~addPostProcessor
~addH5writer
~getLatestScan
"""

def __init__(self, buf, parent = None):
#----------- initialize the instance variables
Expand Down Expand Up @@ -430,7 +486,19 @@ def getLatestScan(self):
#-------------------------------------------------------------------------------------------

class SpecDataFileScan(object):
"""contents of a spec data file scan (#S) section"""
"""
contents of a spec data file scan (#S) section
.. autosummary::
~get_macro_name
~interpret
~add_interpreter_comment
~get_interpreter_comments
~addPostProcessor
~addH5writer
"""

def __init__(self, header, buf, parent=None):
self.parent = parent # instance of SpecDataFile
Expand Down
150 changes: 101 additions & 49 deletions tests/test_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
# The full license is in the file LICENSE.txt, distributed with this software.
#-----------------------------------------------------------------------------

import os
import shutil
import sys
import tempfile
import time
import unittest
import os, sys

_test_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
_path = os.path.abspath(os.path.join(_test_path, 'src'))
Expand All @@ -25,70 +29,68 @@


class Test(unittest.TestCase):

def setUp(self):
self.basepath = os.path.join(_path, 'spec2nexus')
self.datapath = os.path.join(self.basepath, 'data')

# def tearDown(self):
# pass

# def testName(self):
# pass

def abs_data_fname(self, fname):
return os.path.join(self.datapath, fname)
return os.path.join(_path, 'spec2nexus', 'data', fname)

def test_strip_first_word(self):
self.assertEqual(utils.strip_first_word('one two three'), 'two three')

def test_isSpecFileThis(self):
self.assertFalse(spec.is_spec_file('this_does_not_exist'))
self.assertFalse(spec.is_spec_file(self.basepath))
self.assertFalse(spec.is_spec_file(os.path.join(_path, 'spec2nexus')))
self.assertFalse(spec.is_spec_file(__file__))
self.assertTrue( spec.is_spec_file(self.abs_data_fname('APS_spec_data.dat')))
self.assertFalse(spec.is_spec_file_with_header('file does not exist'))
self.assertTrue( spec.is_spec_file_with_header(self.abs_data_fname('APS_spec_data.dat')))
self.assertFalse(spec.is_spec_file_with_header(self.abs_data_fname('spec_from_spock.spc')))

def is_spec_file(self, fname):
return spec.is_spec_file(self.abs_data_fname(fname))

def test_isSpecFile(self):
'''test all the known data files to see if they are SPEC'''
self.assertTrue( self.is_spec_file('33bm_spec.dat'))
self.assertTrue( self.is_spec_file('33id_spec.dat'))
self.assertTrue( self.is_spec_file('APS_spec_data.dat'))
self.assertTrue( self.is_spec_file('CdSe'))
self.assertTrue( self.is_spec_file('lmn40.spe'))
self.assertTrue( self.is_spec_file('YSZ011_ALDITO_Fe2O3_planar_fired_1.spc'))
self.assertFalse(self.is_spec_file('uxml')) # directory
self.assertFalse(self.is_spec_file('README.txt')) # text file
self.assertFalse(self.is_spec_file('33bm_spec.hdf5'))
self.assertFalse(self.is_spec_file('33id_spec.hdf5'))
self.assertFalse(self.is_spec_file('APS_spec_data.hdf5'))
self.assertFalse(self.is_spec_file('compression.h5'))
self.assertFalse(self.is_spec_file('Data_Q.h5'))
self.assertFalse(self.is_spec_file('lmn40.hdf5'))
self.assertFalse(self.is_spec_file('writer_1_3.h5'))

def cannot_find_spec_data_file(self):
spec.SpecDataFile('cannot_find_this_file')

def not_a_spec_data_file(self):
spec.SpecDataFile(__file__)
files = {
"33bm_spec.dat": True,
"33id_spec.dat": True,
"APS_spec_data.dat": True,
"CdSe": True,
"lmn40.spe": True,
"YSZ011_ALDITO_Fe2O3_planar_fired_1.spc": True,
"uxml": False, # directory
"README.txt": False, # text file
"33bm_spec.hdf5": False,
"33id_spec.hdf5": False,
"APS_spec_data.hdf5": False,
"compression.h5": False,
"Data_Q.h5": False,
"lmn40.hdf5": False,
"writer_1_3.h5": False,
}
for item, expected in files.items():
self.assertEqual(self.is_spec_file(item), expected, item)

def test_custom_exceptions(self):
self.assertRaises(Exception, spec.SpecDataFileNotFound())
self.assertRaises(Exception, spec.SpecDataFileCouldNotOpen())
self.assertRaises(Exception, spec.DuplicateSpecScanNumber())
self.assertRaises(Exception, spec.NotASpecDataFile())
self.assertRaises(Exception, spec.UnknownSpecFilePart())

def spec_data_file(self):
spec.SpecDataFile(self.abs_data_fname('03_05_UImg.dat'))
with self.assertRaises(IOError):
raise spec.SpecDataFileNotFound()
with self.assertRaises(IOError):
raise spec.SpecDataFileCouldNotOpen()
with self.assertRaises(Exception):
raise spec.DuplicateSpecScanNumber()
with self.assertRaises(Exception):
raise spec.NotASpecDataFile()
with self.assertRaises(Exception):
raise spec.UnknownSpecFilePart()

def test_file_initial_exceptions(self):
self.assertRaises(TypeError, spec.SpecDataFile)
self.assertRaises(spec.SpecDataFileNotFound, self.cannot_find_spec_data_file)
self.assertRaises(spec.NotASpecDataFile, self.not_a_spec_data_file)
with self.assertRaises(TypeError):
spec.SpecDataFile()
with self.assertRaises(spec.SpecDataFileNotFound):
spec.SpecDataFile('cannot_find_this_file')
with self.assertRaises(spec.SpecDataFileNotFound):
spec.SpecDataFile(self.abs_data_fname('03_05_UImg.dat'))
with self.assertRaises(spec.NotASpecDataFile):
spec.SpecDataFile(__file__)

def test_33bm_spec(self):
fname = self.abs_data_fname('33bm_spec.dat')
Expand Down Expand Up @@ -315,7 +317,7 @@ def test_extra_control_line_content__issue109(self):
scan = sfile.getScan(scan_number)
self.assertTrue(scan is not None)
self.assertEqual(scan.T, "0", "received expected count time")
self.assertTrue("Seco nds" in scan.data, "found counting base")
self.assertIn("Seco nds", scan.data, "found counting base")
self.assertEqual(
scan.data["Seco nds"][0],
1,
Expand All @@ -334,25 +336,75 @@ def test_extra_control_line_content__issue109(self):
"400000",
"received expected monitor count")
self.assertTrue(hasattr(scan, 'MCA'), "MCA found")
self.assertTrue("ROI" in scan.MCA, "MCA ROI found")
self.assertIn("ROI", scan.MCA, "MCA ROI found")
roi_dict = scan.MCA["ROI"]
key = "FeKa(mca1 R1)"
self.assertTrue(key in roi_dict, "MCA ROI config found")
self.assertIn(key, roi_dict, "MCA ROI config found")
roi = roi_dict[key]
self.assertEqual(roi["first_chan"], 377, "MCA ROI first channel")
self.assertEqual(roi["last_chan"], 413, "MCA ROI last channel")

self.assertTrue(key in scan.data, "MCA ROI data found")
self.assertIn(key, scan.data, "MCA ROI data found")
self.assertEqual(
len(scan.data[key]),
61,
"embedded comment not part of data")

def test_str(self):
specFile = os.path.join(
os.path.dirname(__file__),
'data',
'issue109_data.txt')
sdf = spec.SpecDataFile(specFile)
self.assertEqual(str(sdf), sdf.fileName)
sdf = spec.SpecDataFile(None)
self.assertEqual(str(sdf), 'None')


class TestFileUpdate(unittest.TestCase):

def setUp(self):
self.data_file = tempfile.NamedTemporaryFile(
suffix='.dat', delete=False)
self.data_file.close()

def tearDown(self):
os.remove(self.data_file.name)

def test_update_available(self):
# test the mtime function first
# and setup the modifiable SPEC data file
self.assertTrue(os.path.exists(self.data_file.name))
mt0 = os.path.getmtime(self.data_file.name)
time.sleep(0.02) # at least a clock tick (1/60 s)
shutil.copy(
os.path.join(_test_path, "tests", "data", "issue82_data.txt"),
self.data_file.name)
mt1 = os.path.getmtime(self.data_file.name)
self.assertGreater(mt1, mt0)

# test the ``update_available`` property
sdf = spec.SpecDataFile(self.data_file.name)
self.assertFalse(sdf.update_available)
self.assertEqual(sdf.num_lines, 164)
self.assertEqual(sdf.last_scan, sdf.getLastScanNumber())
self.assertEqual(sdf.last_scan, '17')

# update the file with a trivial edit
with open(self.data_file.name, "a") as fp:
fp.write("\n#C comment\n")
time.sleep(0.02) # at least a clock tick (1/60 s)

mt2 = os.path.getmtime(self.data_file.name)
self.assertGreater(mt2, mt1)
self.assertTrue(sdf.update_available)


def suite(*args, **kw):
test_suite = unittest.TestSuite()
test_list = [
Test,
TestFileUpdate,
]
for test_case in test_list:
test_suite.addTest(unittest.makeSuite(test_case))
Expand Down

0 comments on commit d3f6984

Please sign in to comment.