Refactored psrfits conversion. Now much faster and it folds much more data.
commit 3c644cf83bd196ff19be92b544693cac12174ab1 1 parent aaaefb8
Kiyoshi Masui authored
Showing with 108 additions and 44 deletions.
  1. +1 −0  input/km/psrfits.ini
  2. +107 −44 scripts/psrfits_to_sdfits.py
1  input/km/psrfits.ini
@@ -9,3 +9,4 @@ scans = (30,)
#scans = tuple(range(10, 153, 8))
output_root = '/home/scratch/kmasui/tmp/'
time_bins_to_average = 128
+cal_fold_time = 30
151 scripts/psrfits_to_sdfits.py
@@ -20,6 +20,7 @@
import glob
import subprocess
import warnings
+import time as tm
import numpy.ma as ma
import scipy as sp
@@ -57,7 +58,15 @@
# power of 2 and less than 2048.
"time_bins_to_average" : 1,
# Scans to not convert.
- "blacklist" : []
+ "blacklist" : [],
+ # Approximate amount of data to fold to find the cal phase.
+ # Each section of data this long will be folded and the cal
+ # phase independently solved for. In seconds.
+ # This should be shorter than the time it takes for guppi and the
+ # noise cal to change relative phase significantly. Empirically
+ # this seems to be about a minute for guppi samples of 0.001024s
+ # and cal period of 64 times this.
+ "cal_fold_time" : 30
}
prefix = ''
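
As a quick sanity check on the numbers quoted in the new comment (a minimal sketch in plain Python; the values come from the comment itself, not from new measurements):

    # Guppi sample time and cal period quoted in the comment above.
    sample_time = 0.001024             # seconds per guppi sample
    cal_period = 64 * sample_time      # 0.065536 s, matching the hardcoded
                                       # default used when CAL_FREQ is zero
    cal_fold_time = 30                 # seconds of data folded per phase fit
    print(cal_period)                  # 0.065536
    print(cal_fold_time / cal_period)  # ~457.8 cal periods per fold section

So each 30 s fold section averages over a few hundred cal periods, well within the roughly one minute of relative phase stability claimed above.
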
@@ -336,12 +345,37 @@ def processfile(self, scan, Pipe) :
Data.set_field('CRVAL4', [1,2,3,4], ('pol',), '1I')
else :
raise ce.DataError("Unsupported polarizations.")
- Data.set_field('OBSERVER', psrmain['OBSERVER'], (), '32A')
if get_cal :
Data.set_field('CAL', ['T', 'F'], ('cal',), '1A')
else :
Data.set_field('CAL', ['F'], ('cal',), '1A')
Data.verify()
+
+ # We will divide the data into chunks of params["cal_fold_time"]
+ # seconds and solve for the cal phase in each one. Figure out how
+ # many records go in each chunk.
+ # Figure out the number of time bins in a cal period.
+ if get_cal:
+ try :
+ cal_period = 1.0/psrmain["CAL_FREQ"]
+ except ZeroDivisionError :
+ # Default value that we usually use.
+ cal_period = 0.065536
+ n_bins_cal = cal_period/sample_time
+ if n_bins_cal%1.0 > 0.00001 :
+ raise NotImplementedError("Need an integer number of "
+ "samples per cal period.")
+ n_bins_cal = int(round(n_bins_cal))
+ if n_bins_ave%n_bins_cal != 0 :
+ raise ValueError("You should rebin on a multiple of "
+ "the cal period. Change parameter "
+ "time_bins_to_average.")
+ total_time = (n_records - start_record)*ntime*sample_time
+ n_divisions = total_time/params['cal_fold_time']
+ n_divisions = int(round(n_divisions))
+ n_records_div = float(n_records - start_record)/n_divisions
+ division_starts = [int(round(start_record + n_records_div*ii))
+ for ii in range(n_divisions+1)]
# Loop over the records and modify the data.
if self.feedback > 1 :
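
For concreteness, here is how the new bookkeeping partitions a scan. The record count and dimensions below are invented for illustration; the formulas are exactly the ones added above:

    # Hypothetical scan: 200 records of 2048 samples at the usual guppi rate.
    n_records, start_record = 200, 0
    ntime, sample_time = 2048, 0.001024
    cal_fold_time = 30

    total_time = (n_records - start_record) * ntime * sample_time  # ~419.4 s
    n_divisions = int(round(total_time / cal_fold_time))           # 14
    n_records_div = float(n_records - start_record) / n_divisions  # ~14.29
    division_starts = [int(round(start_record + n_records_div * ii))
                       for ii in range(n_divisions + 1)]
    print(division_starts)  # [0, 14, 29, 43, ..., 186, 200]

Each of the 14 divisions is then folded once and its cal phase solved once, instead of once per record as in the old code.
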
@@ -353,52 +387,73 @@ def processfile(self, scan, Pipe) :
if self.feedback > 1 :
print jj,
sys.stdout.flush()
+ # Do one pass through the current division of the data to find the
+ # folding offsets.
+ if get_cal and jj in division_starts :
+ div_num = division_starts.index(jj)
+ # Loop over the data and accumulate it into one array profile.
+ record_profile = sp.zeros((ntime, nfreq), dtype=float)
+ for kk in range(division_starts[div_num],
+ division_starts[div_num+1]):
+ record_tmp = psrdata[kk]
+ # Data comes from the telescope as weird unsigned and signed
+ # ints. Handle carefully. Only interested in Stokes I.
+ tmp_data = record_tmp["DATA"]
+ tmp_data.shape = (ntime, npol, nfreq)
+ # The 'I' polarization should already be correctly formatted
+ # as a uint8.
+ tmp_data = tmp_data[:,0,:]
+ # There are actually time independent and frequency dependent
+ # scalings that are ignored here. They shouldn't affect
+ # the shape of the profile much.
+ record_profile += tmp_data
+ # `get_cal_mask` expects 3D data.
+ record_profile.shape = (ntime, 1, nfreq)
+ # Try to get the profile of this division. If we fail, set the
+ # phase info to None, which causes the phase to be solved for
+ # on a record-by-record basis.
+ try :
+ cal_phase_info = get_cal_mask(record_profile, n_bins_cal)
+ except ce.DataError :
+ cal_phase_info = None
# Get the data field.
data = record["DATA"]
# Convert the Data to the proper shape and type.
data = format_data(data, ntime, npol, nfreq)
- # Now get the scale and offsets for the data and apply them.
- scls = sp.array(record["DAT_SCL"], dtype=sp.float32)
- scls.shape = (1, npol, nfreq)
- offs = sp.array(record["DAT_OFFS"], dtype=sp.float32)
- offs.shape = (1, npol, nfreq)
if get_cal :
- # Figure out the number of time bins in a cal period.
- try :
- cal_period = 1.0/psrmain["CAL_FREQ"]
- except ZeroDivisionError :
- # Default value that we usually use.
- cal_period = 0.065536
- n_bins_cal = cal_period/sample_time
- if n_bins_cal%1.0 > 0.00001 :
- raise NotImplementedError("Need an integer number of "
- "samples per cal periode.")
- n_bins_cal = int(round(n_bins_cal))
- if n_bins_ave%n_bins_cal != 0 :
- raise ValueError("You should rebin on a multiple of "
- "the cal period. Change parameter "
- "time_bins_to_average.")
- # We can speed up the separating of the cal by rebinning in
- # time first.
- tmp_n_bins_ave = n_bins_ave//n_bins_cal
- data.shape = (n_bins, tmp_n_bins_ave, n_bins_cal, npol,
- nfreq)
+ data = separate_cal(data, n_bins_cal, cal_phase_info)
+
+ # Down sample from the cal period to the final number of time
+ # bins.
+ data.shape = (n_bins, ntime//n_bins_cal//n_bins, npol,
+ ncal, nfreq)
# Use mean not median due to discretization.
- data = scls*sp.mean(data, 1) + offs
- data.shape = (n_bins*n_bins_cal, npol, nfreq)
- # Finally devided into cal on and cal off.
- data = separate_cal(data, n_bins_cal)
+ data = sp.mean(data, 1)
else :
# Just rebin in time and add a length 1 cal index.
- data.shape = (n_bins, n_bins_ave, npol, nfreq)
+ data.shape = (n_bins, n_bins_ave, npol, ncal, nfreq)
data = sp.mean(data, 1)
- data = scls*data + offs
- data.shape = (n_bins, npol, ncal, nfreq)
+ # Now get the scale and offsets for the data and apply them.
+ scls = sp.array(record["DAT_SCL"], dtype=sp.float32)
+ scls.shape = (1, npol, 1, nfreq)
+ offs = sp.array(record["DAT_OFFS"], dtype=sp.float32)
+ offs.shape = (1, npol, 1, nfreq)
+ data = scls*data + offs
+
# Now figure out the timing information.
time = start_seconds + record["OFFS_SUB"]
time = time + (sp.arange(-n_bins/2.0 + 0.5, n_bins/2.0 + 0.5)
* resampled_time)
+ # Make sure that there are no time gaps between records.
+ # This is necessary for the cal separation to be valid.
+ if jj > start_record:
+ record_time = record["OFFS_SUB"] - last_offs_sub
+ if not sp.allclose(record_time, n_bins*resampled_time):
+ msg = "Time gaps between records"
+ raise ce.DataError(msg)
+ last_offs_sub = record["OFFS_SUB"]
+
# Make sure that our pointing data covers the guppi data time
# range within the 0.1 second accuracy.
if ((max(time) - ant_time_range[1] > 0.) or
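
Both branches above rebin with the usual reshape-then-mean idiom: fold the averaging axis out of the time axis and average over it. A self-contained sketch with invented dimensions (numpy standing in for the script's scipy-as-sp):

    import numpy as np

    ntime, npol, nfreq = 2048, 4, 32   # invented dimensions
    n_bins_ave = 128                   # time_bins_to_average
    n_bins = ntime // n_bins_ave       # 16 output time bins
    data = np.random.rand(ntime, npol, nfreq).astype(np.float32)

    # Fold the averaging axis out of the time axis, then average over it.
    rebinned = data.reshape(n_bins, n_bins_ave, npol, nfreq).mean(axis=1)
    assert rebinned.shape == (n_bins, npol, nfreq)

Moving the DAT_SCL/DAT_OFFS application to after the averaging is safe because the scale and offset are constant within a record, so scls*mean(data) + offs equals mean(scls*data + offs), and applying them once to the small rebinned array saves arithmetic on the full-rate data.
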
@@ -439,10 +494,6 @@ def processfile(self, scan, Pipe) :
if self.feedback > 1 :
print
# Send Data back on the pipe.
- #XXX
- print
- print Data.data[0, 0, 0, 0], Data.data[20, 0, 1, 400],
- print Data.data[40, 2, 1, 300], Data.data[170, 1, 0, 200]
Pipe.send(Data)
# These functions are not unit tested, simply made functions for convenience.
@@ -584,7 +635,7 @@ def get_cal_mask(data, n_bins_cal) :
return first_on, n_blank
-def separate_cal(data, n_bins_cal) :
+def separate_cal(data, n_bins_cal, cal_mask=None) :
"""Function separates data into cal_on and cal off.
No Guarantee that data argument remains unchanged."""
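
The new optional cal_mask argument lets a phase solution computed once per division be reused for every record in it. A toy illustration of the reuse pattern (the mask-fitting logic here is invented; the real fit is done by get_cal_mask, which returns the same (first_on, n_blank) pair):

    import numpy as np

    def toy_cal_mask(profile, n_bins_cal):
        # Invented stand-in for get_cal_mask: find the bin where the
        # folded cal profile switches on.
        folded = profile.reshape(-1, n_bins_cal).mean(0)
        first_on = int(np.argmax(folded > folded.mean()))
        return first_on, 1

    n_bins_cal = 8
    t = np.arange(64)
    profile = ((t % n_bins_cal) < 4).astype(float)  # square-wave cal signal
    cal_phase_info = toy_cal_mask(profile, n_bins_cal)
    print(cal_phase_info)  # (0, 1)

The solved tuple is then passed as separate_cal(data, n_bins_cal, cal_phase_info) for each record in the division; passing None (the default) keeps the old record-by-record behaviour.
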
@@ -592,11 +643,14 @@ def separate_cal(data, n_bins_cal) :
# Allocate memory for output
ntime, npol, nfreq = data.shape
n_bins_after_cal = ntime//n_bins_cal
- out_data = sp.empty((n_bins_after_cal, npol, 2, nfreq), dtype=sp.float32)
+ out_data = sp.zeros((n_bins_after_cal, npol, 2, nfreq), dtype=sp.float32)
# Get the phase offset of the cal.
try :
- first_on, n_blank = get_cal_mask(data, n_bins_cal)
+ if cal_mask is None:
+ first_on, n_blank = get_cal_mask(data, n_bins_cal)
+ else :
+ first_on, n_blank = cal_mask
except ce.DataError :
print "Discarded record due to bad profile. "
out_data[:] = float('nan')
@@ -627,8 +681,16 @@ def separate_cal(data, n_bins_cal) :
# Find cal on and cal off averages. Always use mean not median due to
# discretization noise.
- out_data[:,:,0,:] = sp.mean(data[:,on_mask,:,:], 1)
- out_data[:,:,1,:] = sp.mean(data[:,off_mask,:,:], 1)
+ # This loop is much faster than the built-in numpy mean() for some
+ # reason.
+ for ii in range(n_bins_cal) :
+ if on_mask[ii]:
+ out_data[:,:,0,:] += data[:,ii,:,:]
+ elif off_mask[ii]:
+ out_data[:,:,1,:] += data[:,ii,:,:]
+ out_data[:,:,0,:] /= sp.sum(on_mask)
+ out_data[:,:,1,:] /= sp.sum(off_mask)
+
return out_data
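
The speed claim is plausible: boolean-mask indexing such as data[:, on_mask, :, :] materializes a fancy-indexed copy before reducing, while the explicit loop accumulates contiguous slices in place. A self-contained comparison with invented shapes and hand-built masks:

    import numpy as np

    n_bins_after_cal, n_bins_cal, npol, nfreq = 16, 64, 4, 256
    data = np.random.rand(n_bins_after_cal, n_bins_cal, npol, nfreq)
    on_mask = np.zeros(n_bins_cal, dtype=bool)
    off_mask = np.zeros(n_bins_cal, dtype=bool)
    on_mask[1:31] = True    # cal on, with transition bins blanked
    off_mask[33:63] = True  # cal off

    # Old path: fancy-indexed copy, then mean.
    mean_on = data[:, on_mask, :, :].mean(axis=1)

    # New path: accumulate slices, divide once.
    acc_on = np.zeros((n_bins_after_cal, npol, nfreq))
    for ii in range(n_bins_cal):
        if on_mask[ii]:
            acc_on += data[:, ii, :, :]
    acc_on /= on_mask.sum()

    assert np.allclose(mean_on, acc_on)
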
@@ -918,7 +980,8 @@ def process_session(self, session) :
converter_params = {"guppi_input_end" : "_0001.fits",
"combine_map_scans" : True,
"partition_cal" : True,
- "time_bins_to_average" : 128
+ "time_bins_to_average" : 128,
+ "cal_fold_time" : 30
}
# The parameters that are actually assigned dynamically.
converter_params["scans"] = tuple(scans_to_convert)