# DVS Test Procedures - Pointing

* VR.D.T.B.1.1.1 Dish Blind Pointing Test
* VR.D.T.B.1.1.3 Dish Tracking Stability Test
* VR.D.T.B.1.1.4 Dish Index Repeatability Test

Not currently: VR.D.T.B.1.1.2 Dish Relative Pointing Test

                                                              As on 11/02/2026

In [None]:
%matplotlib inline
import pylab as plt
import numpy as np

import katpoint
from dvs import util, tracking
from analysis import katsepnt, katselib

In [None]:
ant = "e117"

# From TBD-SetToWork-Tiltmeter. Needed in order to re-compute tiltx & y from raw voltages
katsepnt.TILT_ACTIVE_CAL[ant] = dict(
  cal_x=[0, 52.899999367456815, 0.14723491246946235, 45.555808433666215, 2.7837081554446876, 27.9],
  cal_y=[0, 53.44999973865004, -0.019984243482784728, 176.49884467633217, 5.479608686836212, 27.6]
)

In [None]:
# Filters are lists of (key, predicate) pairs; lists are combined elsewhere in this notebook with "+".


def _upto(max_speed, max_dynamic):
    """Build the filter pair accepting wind_speed <= max_speed and wind_dynamic <= max_dynamic."""
    return [('wind_speed', lambda w: w <= max_speed), ('wind_dynamic', lambda w: w <= max_dynamic)]


def _only(speed_band, dynamic_band):
    """Build a '*' filter: true if wind_speed OR wind_dynamic lies inside its (low, high] band."""
    (s_lo, s_hi), (d_lo, d_hi) = speed_band, dynamic_band
    return [('*', lambda x: (s_lo < x['wind_speed'] <= s_hi) | (d_lo < x['wind_dynamic'] <= d_hi))]


# NIGHT keeps sun_el <= 0, DAY keeps sun_el > 0
NIGHT = [("sun_el", lambda el: el <= 0)]
DAY = [("sun_el", lambda el: el > 0)]

# Environmental conditions as per SKA-TEL-DSH-0000005 rev 6B and later
_DYN_P, _DYN_S, _DYN_D = 3*0.7, 3*1.0, 3*1.4  # wind_dynamic limits per category
PRECISION = _upto(5, _DYN_P)
UPTO_STANDARD = _upto(7, _DYN_S)   # PRECISION+STANDARD
UPTO_DEGRADED = _upto(10, _DYN_D)  # PRECISION+STANDARD+DEGRADED
# NOTE(review): the ONLY_* bands are OR-ed, so a sample with e.g. wind_speed in (5,7] and
# wind_dynamic in (3.0,4.2] matches both ONLY_STANDARD and ONLY_DEGRADED - confirm this is intended.
ONLY_STANDARD = _only((5, 7), (_DYN_P, _DYN_S))
ONLY_DEGRADED = _only((7, 10), (_DYN_S, _DYN_D))

# All Sky Blind Pointing

Need complete all sky coverage, and as wide a range of environmental conditions as is feasible at the time of measurement!

In [None]:
# Tiltmeter disabled and/or not correctly calibrated?
katselib.ls_archive(f"StartTime:[2025-06-10T0:0:0Z TO 2025-08-01T23:0:0Z] AND Description:All Sky Pointing AND InstructionSet:*circular_pointing*" +
                    f" AND InstructionSet:\"scan-ant {ant}\"~1", min_duration=1000, fields=["CaptureBlockId","StartTime","CenterFrequency","Description","InstructionSet"], field_len=220);

In [None]:
# Tiltmeter enabled & calibrated
katselib.ls_archive(f"StartTime:[2025-08-01T23:0:0Z TO *] AND Description:All Sky Pointing AND InstructionSet:*circular_pointing*" +
                    f" AND InstructionSet:\"scan-ant {ant}\"~1", min_duration=1000, fields=["CaptureBlockId","StartTime","CenterFrequency","Description","InstructionSet"], field_len=220);

In [None]:
## These must all be with consistent FI angles!

# tiltmeter CORRECTIONS ON but ACU calibration WRONG
ku_cbids_ = []
s_cbids_ = []

# tiltmeter calibration uploaded and CORRECTIONS ACTIVE
ku_cbids = []
s_cbids = []

In [None]:
for cbid in ku_cbids:
    !ls -la ./l2_data/{cbid}*.csv

In [None]:
## Fit centroids
cbids = ku_cbids

for cbid in cbids[-1:]:
    ds = util.open_dataset(cbid, ref_ant=ant, cache_root="./l1_data")
    rfi_mask = util.load_rfi_static_mask("../catalogues/rfi_mask.txt", ds.freqs)
    rfi_mask |= (3000e6 < ds.freqs) & (ds.freqs <= 3500e6) # S-band features, may be sensitive to environment?
    rfi_mask |= (ds.channels < 10) | (len(ds.channels)-10 < ds.channels) # Tune this if necessary
    track_ants = ds.obs_params['track_ants'].split(',')
    for track_ant in track_ants:
        output_filepattern = "%s_%s_circular_pointing.csv" if len(track_ants)==1 else f"%s_%s_{track_ant}_circular_pointing.csv"
        try:
            tracking.reduce_pointing_scans(ds, ant, track_ant=track_ant, kind='cardioid',
                                           phased_up=np.max(ds.freqs)<4e9, # True only for B2 & S-band!
                                           scans="track", chans=~rfi_mask, strict=True,
                                           output_filepattern="./l2_data/"+output_filepattern, debug=False, verbose=True)
        except Exception as e:
            print("INFO: unable to process %s for %s_%s" % (cbid, ant, track_ant), e)
    if cbid>1760000000: # TODO: TEMPORARY HACK
        ds.del_cache()

In [None]:
# Confirm hardware status over the time span
data1, _ = katsepnt.load_apss_data([katsepnt.find_files(f"{cbid}_*l0_{ant}*.csv", "./l2_data")[0] for cbid in cbids])

# FI angles - directly from sensor database
katselib.plot_sensors(data1['timestamp'], [ant+"_dsm_indexerActualPosition"], interpolate="zero", figsize=(12,3))
# Tiltmeter corrections - directly from sensor database
katselib.plot_sensors(data1['timestamp'], [ant+"_dsm_tiltPointCorrEnabled"], interpolate="zero", figsize=(12,3))
# There are some glitches in tilt corrections reported - but probably LMC sensor propagation issue?
plt.figure(figsize=(12,3)); plt.grid(True)
plt.plot(data1['timestamp'], data1['tiltcorr_az'], '_')
plt.plot(data1['timestamp'], data1['tiltcorr_el'], '|')

### TBD-band (without tiltmeter)

Use this for data with tiltmeter disabled and / or no calibration available yet. Or simply to look at what the results would have been like if corrections were "un-done".


We use `update_tiltcorr='UNDO'` below to ensure whatever corrections the ACU reports it applied in real time, get removed from the data as we process it.

In [None]:
cbids = s_cbids_ + s_cbids # Always "trivial" to UNDO
print(cbids)

# Identify suitable filters to obtain un-biased sky cover
katsepnt.plot_skycover(cbids, ant, "./l2_data", filters=NIGHT+UPTO_STANDARD, figsize=(5,5))
katsepnt.plot_skycover(cbids, ant, "./l2_data", filters=NIGHT+PRECISION, figsize=(5,5))
katsepnt.plot_skycover(cbids, ant, "./l2_data", filters=PRECISION, figsize=(5,5))

In [None]:
# Current model, for reference
katsepnt.fit_pointingmodels(ant, cbids, root="./l2_data", update_tiltcorr='UNDO',
                            outlier_pct=100, as_is=False, statistic='MAV', P_fits=[[]], figs=None)
print()
# Identify best model
for P_fits in [[1,3,4,5,6,7,8,11],  [1,3,4,5,6,7,8,11, 13,14], [1,3,4,5,6,7,8,11, 13,14,17,18]]:
    for outlier_pct in [90,80,70,60,50,40]:
        katsepnt.fit_pointingmodels(ant, cbids, root="./l2_data", update_tiltcorr='UNDO',
                              filter0=[], filter1=NIGHT+PRECISION+[("snr_I",lambda s: 7<s)], outlier_pct=outlier_pct,
                              as_is=False, statistic='MAV', P_fits=P_fits, figs=None)
    print()

In [None]:
# Construct best fit model from the "sweet spot" from above (prefer outlier_pct < 70).
best_pm = katsepnt.fit_pointingmodels(ant, cbids, root="./l2_data", update_tiltcorr='UNDO',
                          filter0=[], filter1=NIGHT+PRECISION+[("snr_I",lambda s: 7<s)], outlier_pct=70,
                          as_is=False, statistic='MAV', P_fits=[1,3,4,5,6,7,8,11, 13,14], figs=[])[0]
print(best_pm)

In [None]:
# Discard 5% of measurements - may try SNR thresholding or outliers in specific environmental categories
S_all = katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='UNDO',
                                        filters=[], apply_pm=best_pm, metrics=["snr_I"], figs=None)
print("Full set: N=%d"%len(S_all.dgc), "MAE=%.2f"%np.mean(S_all.dgc), "SNR>=%.1f"%np.min(np.asarray(S_all.x)[:,0]))

discard_pct = 5
bad_TS = []

# Spread remaining outliers across env categories for best final test outcome
for filt,w in zip([PRECISION, ONLY_STANDARD], [2/3, 1/3]): # Tune weights!
    S = katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='UNDO',
                                        filters=filt + [("timestamp",lambda t: t not in bad_TS)],
                                        apply_pm=best_pm, figs=None)
    pct = discard_pct*w * len(S_all.TS)/len(S.TS)
    bad_TS.extend(S.TS[S.dgc > np.percentile(S.dgc, 100-pct)])
BAD_TS = [("timestamp",lambda t: t not in bad_TS)]

print("Discarding a total of %d outliers = %.1f%%" % (len(bad_TS), len(bad_TS)/len(S_all.TS)*100))

# Visual inspection of the datasets with minimal flagging, to check for consistency & issues
figs = [] # Collect figures to overplot the datasets
for cbid in sorted(cbids):
    S = katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv"], root="./l2_data", update_tiltcorr='UNDO',
                                        filters=BAD_TS, update_model=False, apply_pm=best_pm,
                                        metrics=["timestamp","azimuth","elevation","wind_speed","sun_el"], meshplot=[], figs=figs)

In [None]:
##### Check expected measurement accuracy
data, a = katsepnt.load_apss_data([katsepnt.find_files(f"{cbid}_*l0_{ant}*.csv", "./l2_data")[0] for cbid in cbids])
hpbw = 1.2*(3e8/np.mean(data['frequency']))/a.diameter * 180*60/np.pi # arcmin

figs = []
for filt in [[],[("snr_I",lambda s: 20<=s)]]:
    S = katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='UNDO',
            apply_pm=best_pm, filters=BAD_TS+filt, metrics=["snr_I"], meshplot=[], figs=figs)
    snr = np.asarray(S.x)[:,0] # snr_I = "beam_height_I" / "beam_height_I_std"
    print("HPBW ~ %.f arcmin, SNR ~ %.f => measurement accuracy ~ %.f arcsec " % (hpbw, np.median(snr), np.median(hpbw*60/snr)))
    print("This filter drops a total of %.1f%% samples" % ((1-len(S.TS)/len(S_all.TS))*100))

In [None]:
# Use the best fit model (don't fit a new one) to evaluate residuals with
_rng_ = lambda label,vals,pct=5,fmt="%.1f": f"%s {fmt}-{fmt}" % (label,np.percentile(vals,pct/2),np.percentile(vals,100-pct/2))

SNR = [("snr_I",lambda s: 20<=s)] # TODO: if this is >> snr0 then strictly need to update best fit model to represent this subset, but the numbers end up being close
for l,N_D in [("Night | SNR>20",NIGHT+SNR), ("Night & Day | SNR>20",[]+SNR), ("Night",NIGHT), ("Night & Day",[])]:
    print(l)
    figs, ecirc, rms = [], [], []
    # for OC in [UPTO_DEGRADED,UPTO_STANDARD,PRECISION,PRECISION+NIGHT]: # Interesting, but not what the test needs to report on
    for OC in [ONLY_DEGRADED,ONLY_STANDARD,PRECISION,PRECISION+NIGHT]:
        try:
            S = katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='UNDO',
                                                apply_pm=best_pm, filters=BAD_TS+OC+N_D,
                                                metrics=["azimuth","elevation","sun_el"], figs=figs)
            ecirc.append(np.percentile(S.dgc*2, 95))
            rms.append(katsepnt.rms(S.dgc))
            print(len(S.dgc), _rng_("az",S.a), _rng_("el",S.e), np.std(S.dxe), np.std(S.de), np.mean(S.dgc))
        except IndexError:
            ecirc.append(np.nan)
            rms.append(np.nan)
            print(np.nan)
    rms, ecirc = rms[::-1], ecirc[::-1]
    plt.figure(np.max(plt.get_fignums())-3).suptitle(f"{ant} All sky RMS:\n" +
          "Pnight %.1f | P %.1f | S %.1f | D %.1f arcsecRMS\n"%tuple(rms) +
          "Pnight %.1f | P %.1f | S %.1f | D %.1f arcsec95pct"%tuple(ecirc))
    print(np.transpose([["PRECISION+NIGHT","PRECISION","STANDARD","DEGRADED"],rms,ecirc]))

In [None]:
# Show the distribution of errors for a specific environmental category.
# One residuals plot is attempted per night/day/SNR selection; a selection that fails is reported and skipped.
# OC = UPTO_STANDARD # UPTO_STANDARD, UPTO_DEGRADED -- categories differ from the cell above
OC = PRECISION # PRECISION, ONLY_STANDARD, ONLY_DEGRADED -- should exactly match the numbers above (categories in specifications)
for l,N_D in [("Night | SNR>20",NIGHT+SNR), ("Night & Day | SNR>20",[]+SNR), ("Night",NIGHT), ("Day",DAY), ("Night & Day",[])]:
    l = l+" | P" # TODO: Update to match OC
    try:
        S = katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='UNDO',
                                            apply_pm=best_pm, filters=BAD_TS+OC+N_D, figs=None)
        katsepnt.plot_pointingresiduals(S.de, S.dxe, l, outlier_pct=100) # BAD_TS discards 5%(total), so use 100% here
    except Exception as e:
        # Still best-effort (presumably the selection can end up empty), but say which selection was
        # skipped and why. Unlike a bare "except", this no longer swallows a kernel interrupt.
        print("INFO: no residuals plotted for '%s':" % l, e)

In [None]:
## Summarise solar and wind influence

# It may be useful to apply a minor extra filter to remove extreme outliers from these plots
figs = []
Sd = katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='UNDO',
        apply_pm=best_pm, filters=BAD_TS+DAY, metrics=["wind_speed"], meshplot=[], figs=figs);
Sn = katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='UNDO',
        apply_pm=best_pm, filters=BAD_TS+NIGHT, metrics=["wind_speed"], meshplot=[], figs=figs);
_b_TS_ = list(Sd.TS[Sd.dgc > np.percentile(Sd.dgc, 98)]) + list(Sn.TS[Sn.dgc > np.percentile(Sn.dgc, 98)])
_BTS_ = [("timestamp",lambda t:t not in _b_TS_)]

katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='UNDO',
        apply_pm=best_pm, filters=_BTS_+BAD_TS, metrics=["wind_speed"], meshplot=[], figs=figs);

In [None]:
katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='UNDO',
        apply_pm=best_pm, filters=_BTS_+BAD_TS+DAY, # +_BTS_ if necessary
        metrics=["sun_bore_az","sun_bore_el", "tiltx","tilty"], meshplot=["sun_bore_az", "sun_bore_el"], figs=[]);

In [None]:
katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='UNDO',
        apply_pm=best_pm, filters=_BTS_+BAD_TS+NIGHT, # +_BTS_ if necessary
        metrics=["wind_bore_az","wind_speed"], meshplot=["elevation", "wind_bore_az"], figs=[]);

In [None]:
katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='UNDO',
        apply_pm=best_pm, filters=_BTS_+BAD_TS+NIGHT+[('wind_speed',lambda w:3<w)], # +_BTS_ if necessary
        metrics=["wind_bore_az"], meshplot=["elevation", "wind_bore_az"], figs=[]);

### TBD-band (with tiltmeter)

It is possible to combine datasets that were recorded with the same tiltmeter but either disabled or with incorrect calibration, with data recorded with correctly calibrated corrections applied in real time. For that use `update_tiltcorr='RECALC'` to update the data as we process it (i.e. no change to files). If you do this BEWARE:
* processing is quite slow, and
* you MUST specify the correct calibration table in `katsepnt.TILT_ACTIVE_CAL[ant]` (preferably at the top of notebook)

If all data was recorded with ACU correctly set-up then you may simply remove `update_tiltcorr='RECALC'` in all cases below.

In [None]:
cbids = ku_cbids_ + ku_cbids
print(cbids)

# Identify suitable filters to obtain un-biased sky cover
katsepnt.plot_skycover(cbids, ant, "./l2_data", filters=UPTO_STANDARD, figsize=(5,5))
katsepnt.plot_skycover(cbids, ant, "./l2_data", filters=DAY+UPTO_STANDARD, figsize=(5,5))
katsepnt.plot_skycover(cbids, ant, "./l2_data", filters=NIGHT+UPTO_STANDARD, figsize=(5,5))

In [None]:
# Current model, for reference
katsepnt.fit_pointingmodels(ant, cbids, root="./l2_data", update_tiltcorr='RECALC',
                            outlier_pct=100, as_is=False, statistic='MAV', P_fits=[[]], figs=None)
print()
# Identify best model
for P_fits in [[1,3,4,5,6,7,8,11],  [1,3,4,5,6,7,8,11, 13,14], [3,4,5,6,7,8,11, 13,14], [3,4,5,6,7,8,11, 13,14,17,18]]:
    for outlier_pct in [90,80,70,60,50,40]:
        katsepnt.fit_pointingmodels(ant, cbids, root="./l2_data", update_tiltcorr='RECALC',
                              filter0=[], filter1=UPTO_STANDARD+[("snr_I",lambda s: 7<s)], outlier_pct=outlier_pct,
                              as_is=False, statistic='MAV', P_fits=P_fits, figs=None)
    print()

In [None]:
# Construct best fit model from the "sweet spot" from above (prefer outlier_pct < 70).
best_pm = katsepnt.fit_pointingmodels(ant, cbids, root="./l2_data", update_tiltcorr='RECALC',
                          filter0=[], filter1=UPTO_STANDARD+[("snr_I",lambda s: 7<s)], outlier_pct=60,
                          as_is=False, statistic='MAV', P_fits=[1,3,4,5,6,7,8,11, 13,14], figs=[])[0]
print(best_pm)

In [None]:
# With tiltmeter enabled we expect P5=P6 ~ 0, night & day
# -> P5,P6 are stable and O(5arcsec) - is that perhaps the "deflection of vertical" (difference between gravity & geodetic normal)?

In [None]:
# Discard 5% of measurements - may try SNR thresholding or outliers in specific environmental categories
S_all = katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='RECALC',
                                        filters=[], apply_pm=best_pm, metrics=["snr_I"], figs=None)
print("Full set: N=%d"%len(S_all.dgc), "MAE=%.2f"%np.mean(S_all.dgc), "SNR>=%.1f"%np.min(np.asarray(S_all.x)[:,0]))

discard_pct = 5
bad_TS = []

# Spread remaining outliers across env categories for best final test outcome
for filt,w in zip([PRECISION, ONLY_STANDARD], [2/3, 1/3]): # Tune weights!
    S = katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='RECALC',
                                        filters=filt + [("timestamp",lambda t: t not in bad_TS)],
                                        apply_pm=best_pm, figs=None)
    pct = discard_pct*w * len(S_all.TS)/len(S.TS)
    bad_TS.extend(S.TS[S.dgc > np.percentile(S.dgc, 100-pct)])
BAD_TS = [("timestamp",lambda t: t not in bad_TS)]

print("Discarding a total of %d outliers = %.1f%%" % (len(bad_TS), len(bad_TS)/len(S_all.TS)*100))

# Visual inspection of the datasets with minimal flagging, to check for consistency & issues
figs = [] # Collect figures to overplot the datasets
for cbid in sorted(cbids):
    S = katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv"], root="./l2_data", update_tiltcorr='RECALC',
                                        filters=BAD_TS, update_model=False, apply_pm=best_pm,
                                        metrics=["timestamp","azimuth","elevation","wind_speed","sun_el"], meshplot=[], figs=figs)

In [None]:
## Check expected measurement accuracy
data, a = katsepnt.load_apss_data([katsepnt.find_files(f"{cbid}_*l0_{ant}*.csv", "./l2_data")[0] for cbid in cbids])
hpbw = 1.2*(3e8/np.mean(data['frequency']))/a.diameter * 180*60/np.pi # arcmin

figs = []
for filt in [[],[("snr_I",lambda s: 20<=s)]]:
    S = katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='RECALC',
            apply_pm=best_pm, filters=BAD_TS+filt, metrics=["snr_I"], meshplot=[], figs=figs)
    snr = np.asarray(S.x)[:,0] # snr_I = "beam_height_I" / "beam_height_I_std"
    print("HPBW ~ %.f arcmin, SNR ~ %.f => measurement accuracy ~ %.f arcsec " % (hpbw, np.median(snr), np.median(hpbw*60/snr)))
    print("This filter drops a total of %.1f%% samples" % ((1-len(S.TS)/len(S_all.TS))*100))

In [None]:
# Use the best fit model (don't fit a new one) to evaluate residuals with
_rng_ = lambda label,vals,pct=5,fmt="%.1f": f"%s {fmt}-{fmt}" % (label,np.percentile(vals,pct/2),np.percentile(vals,100-pct/2))

SNR = [("snr_I",lambda s: 20<=s)] # TODO: if this is >> snr0 then strictly need to update best fit model to represent this subset, but the numbers end up being close
for l,N_D in [("Night | SNR>20",NIGHT+SNR), ("Night & Day | SNR>20",[]+SNR), ("Night",NIGHT), ("Night & Day",[])]:
    print(l)
    figs, ecirc, rms = [], [], []
    # for OC in [UPTO_DEGRADED,UPTO_STANDARD,PRECISION,PRECISION+NIGHT]: # Interesting, but not what the test needs to report on
    for OC in [ONLY_DEGRADED,ONLY_STANDARD,PRECISION,PRECISION+NIGHT]:
        try:
            S = katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='RECALC',
                                                apply_pm=best_pm, filters=BAD_TS+OC+N_D,
                                                metrics=["azimuth","elevation","sun_el"], figs=figs)
            ecirc.append(np.percentile(S.dgc*2, 95))
            rms.append(katsepnt.rms(S.dgc))
            print(len(S.dgc), _rng_("az",S.a), _rng_("el",S.e), np.std(S.dxe), np.std(S.de), np.mean(S.dgc))
        except IndexError:
            ecirc.append(np.nan)
            rms.append(np.nan)
            print(np.nan)
    rms, ecirc = rms[::-1], ecirc[::-1]
    plt.figure(np.max(plt.get_fignums())-3).suptitle(f"{ant} All sky RMS:\n" +
          "Pnight %.1f | P %.1f | S %.1f | D %.1f arcsecRMS\n"%tuple(rms) +
          "Pnight %.1f | P %.1f | S %.1f | D %.1f arcsec95pct"%tuple(ecirc))
    print(np.transpose([["PRECISION+NIGHT","PRECISION","STANDARD","DEGRADED"],rms,ecirc]))

In [None]:
# Show the distribution of errors for a specific environmental category.
# One residuals plot is attempted per night/day/SNR selection; a selection that fails is reported and skipped.
# OC = UPTO_STANDARD # UPTO_STANDARD, UPTO_DEGRADED -- categories differ from the cell above
OC = PRECISION # PRECISION, ONLY_STANDARD, ONLY_DEGRADED -- should exactly match the numbers above (categories in specifications)
for l,N_D in [("Night | SNR>20",NIGHT+SNR), ("Night & Day | SNR>20",[]+SNR), ("Night",NIGHT), ("Day",DAY), ("Night & Day",[])]:
    l = l+" | P" # TODO: Update to match OC
    try:
        S = katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='RECALC',
                                            apply_pm=best_pm, filters=BAD_TS+OC+N_D, figs=None)
        katsepnt.plot_pointingresiduals(S.de, S.dxe, l, outlier_pct=100) # BAD_TS discards 5%(total), so use 100% here
    except Exception as e:
        # Still best-effort (presumably the selection can end up empty), but say which selection was
        # skipped and why. Unlike a bare "except", this no longer swallows a kernel interrupt
        # (RECALC processing is slow, so interrupting it is a realistic need).
        print("INFO: no residuals plotted for '%s':" % l, e)

In [None]:
## Summarise solar and wind influence

# It may be useful to apply a minor extra filter to remove extreme outliers from these plots
figs = []
Sd = katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='RECALC',
        apply_pm=best_pm, filters=BAD_TS+DAY, metrics=["wind_speed"], meshplot=[], figs=figs);
Sn = katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='RECALC',
        apply_pm=best_pm, filters=BAD_TS+NIGHT, metrics=["wind_speed"], meshplot=[], figs=figs);
_b_TS_ = list(Sd.TS[Sd.dgc > np.percentile(Sd.dgc, 98)]) + list(Sn.TS[Sn.dgc > np.percentile(Sn.dgc, 98)])
_BTS_ = [("timestamp",lambda t:t not in _b_TS_)]

katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='RECALC',
        apply_pm=best_pm, filters=_BTS_+BAD_TS, metrics=["wind_speed"], meshplot=[], figs=figs);

In [None]:
katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='RECALC',
        apply_pm=best_pm, filters=_BTS_+BAD_TS+DAY, # +_BTS_ if necessary
        metrics=["sun_bore_az","sun_bore_el", "tiltx","tilty"], meshplot=["sun_bore_az", "sun_bore_el"], figs=[]);

In [None]:
katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='RECALC',
        apply_pm=best_pm, filters=_BTS_+BAD_TS+NIGHT, # +_BTS_ if necessary
        metrics=["wind_bore_az","wind_speed"], meshplot=["elevation", "wind_bore_az"], figs=[]);

In [None]:
# Night-time residuals vs wind direction, restricted to wind_speed > 3.
# This cell belongs to the "with tiltmeter" analysis, so it must use 'RECALC' like every other
# call in this section (it previously used 'UNDO', left over from the "without tiltmeter" section).
katsepnt.eval_pointingstability([f"{cbid}*_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='RECALC',
        apply_pm=best_pm, filters=_BTS_+BAD_TS+NIGHT+[('wind_speed',lambda w:3<w)], # +_BTS_ if necessary
        metrics=["wind_bore_az"], meshplot=["elevation", "wind_bore_az"], figs=[]);

# Tracking Stability
Aim to have "# windows > 20" for each category, for robust conclusions.


In [None]:
# Tiltmeter disabled and/or not correctly calibrated?
# Trailing ";" keeps the notebook from echoing the call's return value, as in the other listing cells.
katselib.ls_archive("StartTime:[2025-06-10T0:0:0Z TO 2025-08-01T23:0:0Z] AND Description:Tracking AND InstructionSet:*circular_pointing*" +
                    f" AND InstructionSet:\"scan-ant {ant}\"~1", min_duration=1000, fields=["CaptureBlockId","StartTime","CenterFrequency","Description","InstructionSet"], field_len=220);

In [None]:
# Tiltmeter enabled & calibrated
katselib.ls_archive("StartTime:[2025-08-01T23:0:0Z TO *] AND Description:Tracking AND InstructionSet:*circular_pointing*" +
                    f" AND InstructionSet:\"scan-ant {ant}\"~1", min_duration=1000, fields=["CaptureBlockId","StartTime","CenterFrequency","Description","InstructionSet"], field_len=220);

In [None]:
# New tiltmeter fitted, active BUT NOT CALIBRATED
# NOTE: these queries use `ant`, the antenna configured at the top of this notebook; the previous
# `ant1` is not defined anywhere in the visible notebook and would raise NameError.
katselib.ls_archive("StartTime:[2025-12-08T0:0:0Z TO 2026-01-30T17:0:0Z] AND Description:Tracking AND InstructionSet:*circular_pointing*" +
                    f" AND InstructionSet:\"scan-ant {ant}\"~1", min_duration=1000, fields=["CaptureBlockId","StartTime","CenterFrequency","Description","InstructionSet"], field_len=220);
print()
# Tilt corrections active
katselib.ls_archive("StartTime:[2026-01-30T17:0:0Z TO *] AND Description:Tracking AND InstructionSet:*circular_pointing*" +
                    f" AND InstructionSet:\"scan-ant {ant}\"~1", min_duration=1000, fields=["CaptureBlockId","StartTime","CenterFrequency","Description","InstructionSet"], field_len=220);

In [None]:
# Tiltmeter disabled
s_cbids_tk_ = []

# Tiltmeter fitted, calibrated and active
s_cbids_tk = []
ku_cbids_tk = []

In [None]:
for cbid in s_cbids_tk:
    !ls -ls ./l2_data/{cbid}*

In [None]:
## Fit centroids
cbids = s_cbids_tk

for cbid in cbids[-1:]:
    ds = util.open_dataset(cbid, ref_ant=ant, cache_root="./l1_data")
    rfi_mask = util.load_rfi_static_mask("../catalogues/rfi_mask.txt", ds.freqs)
    rfi_mask |= (ds.channels<10) | (len(ds.channels)-10 < ds.channels)
    track_ants = ds.obs_params['track_ants'].split(',')
    for track_ant in track_ants:
        output_filepattern = "%s_%s_circular_pointing.csv" if len(track_ants)==1 else f"%s_%s_{track_ant}_circular_pointing.csv"
        try:
            tracking.reduce_pointing_scans(ds, ant, track_ant=track_ant, kind='epicycle',
                                           phased_up=np.max(ds.freqs)<4e9, # True only for B2 & S-band!
                                           scans="track", chans=~rfi_mask, strict=False,
                                           output_filepattern="./l2_data/"+output_filepattern, debug=False, verbose=True)
        except Exception as e: # Likely not correct track_ant
            print("INFO: unable to process %s for %s_%s" % (cbid, ant, track_ant), e)
    if cbid>1760000000: # TODO: TEMPORARY HACK
        ds.del_cache()

In [None]:
# Confirm hardware status over the time span
data1, _ = katsepnt.load_apss_data([katsepnt.find_files(f"{cbid}_*l0_{ant}*.csv", "./l2_data")[0] for cbid in cbids])

# FI angles - directly from sensor database
katselib.plot_sensors(data1['timestamp'], [ant+"_dsm_indexerActualPosition"], interpolate="zero", figsize=(12,3))
# Tiltmeter corrections - directly from sensor database
katselib.plot_sensors(data1['timestamp'], [ant+"_dsm_tiltPointCorrEnabled"], interpolate="zero", figsize=(12,3))
# There are some glitches in tilt corrections reported - but probably LMC sensor propagation issue?
plt.figure(figsize=(12,3)); plt.grid(True)
plt.plot(data1['timestamp'], data1['tiltcorr_az'], '_')
plt.plot(data1['timestamp'], data1['tiltcorr_el'], '|')

### TBD-band (without tiltmeter)

Use this for data with tiltmeter disabled and / or no calibration available yet. Or simply to look at what the results would have been like if corrections were "un-done".

We use `update_tiltcorr='UNDO'` below to ensure whatever corrections the ACU reports it applied in real time, get removed from the data as we process it.

In [None]:
# Solution copied from "All Sky Pointing: TBD-band (no tiltmeter)" in this notebook
BEST_PM = katpoint.PointingModel("0:05:05.6 0 -0:00:45.0 -0:01:17.4 0:00:33.1 -0:00:06.3 -0:12:15.6 0:00:44.1 0 0 0:00:34.8 0 -0:00:01.4 -0:00:03.0")

In [None]:
cbids = s_cbids_tk_ + s_cbids_tk
print(cbids)

# Identify suitable filters to obtain un-biased sky cover
katsepnt.plot_skycover(cbids, ant, "./l2_data", filters=DAY, figsize=(5,5))
katsepnt.plot_skycover(cbids, ant, "./l2_data", filters=NIGHT, figsize=(5,5))

In [None]:
# Pathological points?
# Something is wrong in two timestamp intervals; BAD_TS keeps only samples outside both of them.
BAD_TS = [('timestamp',lambda t:not ((1769955000<t<1769955800) or (1769984000<t<1769985000)))]

# Evaluate twice on the same figures: first unfiltered, then with the bad intervals removed
figs = []
for filt_tk in ([], BAD_TS):
    katsepnt.eval_pointingstability([f"{cbid}_*l0_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='UNDO',
                                    filters=filt_tk, apply_pm=BEST_PM, metrics=["timestamp"], figs=figs)

In [None]:
# Update the model?
# Starting from Ku-band model with tiltcorr active, we must update at least P4 & P7

# Current "best_pm", for reference
katsepnt.fit_pointingmodels(ant, cbids, root="./l2_data", update_tiltcorr='UNDO',
                            filter0=BAD_TS, outlier_pct=100, as_is=False, statistic='MAV', P_fits=[[]], apply_pm=BEST_PM, figs=None)
print()
# Identify best model
for P_fit in ([4,7], [4,7,8,11], # Results show no advantage of adding 8,11
              [4,7, 13,14], [1,4,7, 13,14], # Results show significant advantage with 13,14, and no advantage of adding 1
              [4,7, 17,18], [4,7, 13,14,17,18], # Results show no significant advantage with 17,18
              [1,4,7, 13,14,17,18]): # Final check shows no advantage from 1
    for outlier_pct in [90,80,70,60,50,40,30]:
        katsepnt.fit_pointingmodels(ant, cbids, root="./l2_data", update_tiltcorr='UNDO',
                              filter0=BAD_TS, filter1=UPTO_STANDARD+[("snr_I",lambda s: 7<s)], outlier_pct=outlier_pct,
                              as_is=False, statistic='MAV', P_fits=P_fit, apply_pm=katpoint.PointingModel(BEST_PM.values()[:11]), figs=None)
    print()

In [None]:
# Construct best fit model from the "sweet spot" from above (prefer outlier_pct < 70).
best_pm = katsepnt.fit_pointingmodels(ant, cbids, root="./l2_data", update_tiltcorr='UNDO',
                          filter0=BAD_TS, filter1=UPTO_STANDARD+[("snr_I",lambda s: 7<s)], outlier_pct=70,
                          as_is=False, statistic='MAV', P_fits=[4,7, 13,14], apply_pm=katpoint.PointingModel(BEST_PM.values()[:11]), figs=[])[0]
print(best_pm)

In [None]:
# Overview of all measurements, with best pointing model
# SNR threshold to eliminate some obvious outliers; the second pass shows how many points it removes
SNR = [('snr_I',lambda snr: snr>21)]

# Evaluate twice on the same figures: first with BAD_TS only, then with the SNR threshold added
figs = []
for filt in ([], SNR):
    katsepnt.eval_pointingstability([f"{cbid}_*l0_{ant}*.csv" for cbid in cbids], root="./l2_data", update_tiltcorr='UNDO',
                                    filters=BAD_TS+filt, apply_pm=best_pm,
                                    metrics=["timestamp","azimuth","elevation","sun_el","snr_I"], figs=figs)

In [None]:
# Confirm range of SNR
data0, a = katsepnt.load_apss_data([katsepnt.find_files(f"{cbid}_*l0_{ant}*.csv", "./l2_data")[0] for cbid in cbids], filters=UPTO_DEGRADED)
data1, a = katsepnt.load_apss_data([katsepnt.find_files(f"{cbid}_*l0_{ant}*.csv", "./l2_data")[0] for cbid in cbids], filters=UPTO_DEGRADED+BAD_TS)
data2, a = katsepnt.load_apss_data([katsepnt.find_files(f"{cbid}_*l0_{ant}*.csv", "./l2_data")[0] for cbid in cbids], filters=UPTO_DEGRADED+BAD_TS+SNR)

snr = data0['snr_I']
print("SNR: [%.1f(1pct %.1f), <%.1f>, %.1f]" % (np.min(snr), np.percentile(snr, 1), np.percentile(snr, 50), np.max(snr)))

hpbw = 1.2*(3e8/data0['frequency'][0])/a.diameter * 180*3600/np.pi # arcsec

ME = hpbw/np.median(snr)
print("With median SNR of %.1f (~%.f''), reasonable '1 sigma' pass thresholds are:" % (np.median(snr), ME))
_ = (np.r_[2.3,4.6,9.2]**2 + ME**2)**.5
print("RMS&meas", _ )
print("4x(RMS&meas)", 4*_)


# Confirm how many points the filters remove
discarded = set(data0['timestamp']) - set(data1['timestamp'])
print("\nDiscarded points for BAD_TS:", len(discarded))
print(" ".join(map(str, sorted(discarded))))
discarded = set(data1['timestamp'])-set(data2['timestamp'])-discarded
print("\nDiscarded points for SNR (excl. BAD_TS):", len(discarded))
print(" ".join(map(str, sorted(discarded))))

In [None]:
# Overview of all measurements, with best pointing model
# Two passes over the same filtered data: first with the list of per-point metrics,
# then with metrics=["target"] and an empty meshplot list.
for view_kwargs in (
    dict(metrics=["timestamp","azimuth","elevation","sun_el","wind_speed"], figs=[]),
    dict(metrics=["target"], meshplot=[], figs=[]),
):
    csv_patterns = [f"{cbid}_*l0_{ant}*.csv" for cbid in cbids]
    katsepnt.eval_pointingstability(
        csv_patterns,
        root="./l2_data",
        update_tiltcorr='UNDO',
        filters=BAD_TS+SNR,
        apply_pm=best_pm,
        **view_kwargs
    )

In [None]:
# Overview of NIGHT & DAY separately to visualise sun & wind effects
# Each pass pairs a sun-elevation filter with the mesh plot axes of interest.
# Alternative per-pass metrics that can be enabled instead of metrics=[]:
#   NIGHT: metrics=["wind_bore_az","wind_speed"]
#   DAY:   metrics=["sun_bore_az", "sun_bore_el"]
for sun_filter, mesh_axes in (
    (NIGHT, ["elevation", "wind_bore_az"]),
    (DAY, ["sun_bore_az", "sun_bore_el"]),
):
    csv_patterns = [f"{cbid}*_{ant}*.csv" for cbid in cbids]
    katsepnt.eval_pointingstability(
        csv_patterns,
        root="./l2_data",
        update_tiltcorr='UNDO',
        filters=BAD_TS+SNR+sun_filter,
        apply_pm=best_pm,
        meshplot=mesh_axes,
        figs=[],
        metrics=[]
    )

In [None]:
# Check for correlated factors

In [None]:
# Correlated-factor check, NIGHT only: corr_metrics are wind_dynamic and snr_I.
csv_patterns = [f"{cbid}_*l0_{ant}*.csv" for cbid in cbids]
katsepnt.eval_trackingstability(
    csv_patterns,
    "./l2_data",
    update_tiltcorr='UNDO',
    reference=slice(0,2),
    filters=BAD_TS+SNR+NIGHT,
    apply_pm=best_pm,
    strict=False,
    corr_metrics=['wind_dynamic','snr_I']
);

In [None]:
# Correlated-factor check, DAY only: corr_metrics are wind_dynamic and sun_el.
csv_patterns = [f"{cbid}_*l0_{ant}*.csv" for cbid in cbids]
katsepnt.eval_trackingstability(
    csv_patterns,
    "./l2_data",
    update_tiltcorr='UNDO',
    reference=slice(0,2),
    filters=BAD_TS+SNR+DAY,
    apply_pm=best_pm,
    strict=False,
    corr_metrics=['wind_dynamic','sun_el']
);

In [None]:
# Same as above, just vs snr_I
csv_patterns = [f"{cbid}_*l0_{ant}*.csv" for cbid in cbids]
katsepnt.eval_trackingstability(
    csv_patterns,
    "./l2_data",
    update_tiltcorr='UNDO',
    reference=slice(0,2),
    filters=BAD_TS+SNR+DAY,
    apply_pm=best_pm,
    strict=False,
    corr_metrics=['wind_dynamic','snr_I']
)
# Drop figure number 1 so that it is not shown again.
plt.close(1);

In [None]:
# Assess tracking performance with only minimum filters:
# Each environmental category is paired with its pass limits [max RMS, max diameter].
categories = [(PRECISION+NIGHT,[3.4,14]), (PRECISION,[3.4,14]), (ONLY_STANDARD,[5.2,21]), (ONLY_DEGRADED,[9.5,38])]
for ref in [slice(0,1), slice(0,2), slice(0,4), 'mean']:
    print(ref)
    for filt, pass_lim in categories:
        csv_patterns = [f"{cbid}_*l0_{ant}*.csv" for cbid in cbids]
        # strict=False: Allow 1 out-of-category point
        *_, diam, rms = katsepnt.eval_trackingstability(csv_patterns, "./l2_data", update_tiltcorr='UNDO',
                                                        reference=ref, filters=BAD_TS+SNR+filt,
                                                        apply_pm=best_pm, strict=False)
        # Only the numbers are wanted here, not the figures.
        plt.close('all')
        if len(diam) == 0:
            print("N=0")
            continue
        # Discard at most 5% of windows
        keep = rms <= np.percentile(rms, 95)
        diam, rms = diam[keep], rms[keep]
        rms_limit, diam_limit = pass_lim
        n_pass = np.count_nonzero((rms <= rms_limit) & (diam <= diam_limit))
        pass_pct = n_pass/len(rms) * 100
        summary = (len(rms), np.mean(rms), np.max(rms), np.mean(diam), np.max(diam), pass_pct)
        print("N=%d: GC RMS <%.1f>,%.1f], 95pct diameter <%.1f>,%.1f] = (pass %.f%%)" % summary)

In [None]:
# Tracking stability once per environmental category, keeping the figures this time.
for filt in (PRECISION+NIGHT, PRECISION, ONLY_STANDARD, ONLY_DEGRADED):
    csv_patterns = [f"{cbid}_*l0_{ant}*.csv" for cbid in cbids]
    # strict=False: Allow 1 out-of-category point
    *_, diam, rms = katsepnt.eval_trackingstability(
        csv_patterns,
        "./l2_data",
        update_tiltcorr='UNDO',
        reference=slice(0,2),
        filters=BAD_TS+SNR+filt,
        apply_pm=best_pm,
        strict=False
    )

### TBD-band (with tiltmeter)

It is possible to combine datasets that were recorded with the same tiltmeter, but with its corrections either disabled or incorrectly calibrated, with data recorded with correctly calibrated corrections applied in real time. To do that, use `update_tiltcorr='RECALC'` to update the data as we process it (i.e. no change to files). If you do this BEWARE:
* processing is quite slow, and
* you MUST specify the correct calibration table in `katsepnt.TILT_ACTIVE_CAL[ant]` (preferably at the top of notebook)

If all data was recorded with ACU correctly set-up then you may simply remove `update_tiltcorr='RECALC'` in all cases below.

In [None]:
# Solution copied from "All Sky Pointing: TBD-band (no tiltmeter)" in this notebook
# The 14 space-separated coefficients are split over several literals for readability;
# adjacent literals concatenate into the single model string.
BEST_PM = katpoint.PointingModel(
    "0:05:05.6 0 -0:00:45.0 -0:01:17.4 "
    "0:00:33.1 -0:00:06.3 -0:12:15.6 0:00:44.1 "
    "0 0 0:00:34.8 0 "
    "-0:00:01.4 -0:00:03.0"
)

In [None]:
# Combine both lists of capture block IDs for this section.
cbids = s_cbids_tk_ + s_cbids_tk
print(cbids)

# Identify suitable filters to obtain un-biased sky cover
skycover_args = (cbids, ant, "./l2_data")
katsepnt.plot_skycover(*skycover_args, filters=DAY, figsize=(5,5))
katsepnt.plot_skycover(*skycover_args, filters=NIGHT, figsize=(5,5))

In [None]:
# Pathological points?
figs = []
no_filter = []
# Something wrong in these intervals, so reject timestamps that fall inside either of them
exclude_bad_intervals = [
    ('timestamp', lambda t: not ((1769955000<t<1769955800) or (1769984000<t<1769985000))),
]
# Evaluate without and then with the timestamp filter; both passes share `figs`.
for filt_tk in (no_filter, exclude_bad_intervals):
    csv_patterns = [f"{cbid}_*l0_{ant}*.csv" for cbid in cbids]
    katsepnt.eval_pointingstability(
        csv_patterns,
        root="./l2_data",
        update_tiltcorr='RECALC',
        filters=filt_tk,
        apply_pm=BEST_PM,
        metrics=["timestamp"],
        figs=figs
    )
# The filter of the last pass becomes the bad-timestamp filter for the cells that follow.
BAD_TS = filt_tk

In [None]:
# Update the model?
# Starting from Ku-band model with tiltcorr active, we must update at least P4 & P7

# Current "best_pm", for reference
katsepnt.fit_pointingmodels(
    ant, cbids, root="./l2_data", update_tiltcorr='RECALC',
    filter0=BAD_TS, outlier_pct=100, as_is=False, statistic='MAV',
    P_fits=[[]], apply_pm=BEST_PM, figs=None
)
print()

# Identify best model
candidate_fits = (
    [4,7], [4,7,8,11],                  # Results show no advantage of adding 8,11
    [4,7, 13,14], [1,4,7, 13,14],       # Results show significant advantage with 13,14, and no advantage of adding 1
    [4,7, 17,18], [4,7, 13,14,17,18],   # Results show no significant advantage with 17,18
    [1,4,7, 13,14,17,18],               # Final check shows no advantage from 1
)
for P_fit in candidate_fits:
    # Sweep outlier_pct over 90, 80, ..., 30 for this candidate.
    for outlier_pct in range(90, 20, -10):
        katsepnt.fit_pointingmodels(
            ant, cbids, root="./l2_data", update_tiltcorr='RECALC',
            filter0=BAD_TS,
            filter1=UPTO_STANDARD+[("snr_I",lambda s: 7<s)],
            outlier_pct=outlier_pct,
            as_is=False, statistic='MAV', P_fits=P_fit,
            # Start from only the first 11 values of BEST_PM.
            apply_pm=katpoint.PointingModel(BEST_PM.values()[:11]),
            figs=None
        )
    print()

In [None]:
# Construct best fit model from the "sweet spot" from above (prefer outlier_pct < 70).
fitted_models = katsepnt.fit_pointingmodels(
    ant, cbids, root="./l2_data", update_tiltcorr='RECALC',
    filter0=BAD_TS,
    filter1=UPTO_STANDARD+[("snr_I",lambda s: 7<s)],
    outlier_pct=70,
    as_is=False, statistic='MAV',
    P_fits=[4,7, 13,14],
    apply_pm=katpoint.PointingModel(BEST_PM.values()[:11]),
    figs=[]
)
# Only the first element of the result is kept as the model.
best_pm = fitted_models[0]
print(best_pm)

In [None]:
# Overview of all measurements, with best pointing model
# Add SNR threshold to eliminate some obvious outliers and confirm how many points that removes
figs = []
no_cut = []
snr_cut = [('snr_I', lambda snr: snr>21)]
# Evaluate without and then with the SNR cut; both passes share `figs`.
for filt in (no_cut, snr_cut):
    csv_patterns = [f"{cbid}_*l0_{ant}*.csv" for cbid in cbids]
    katsepnt.eval_pointingstability(
        csv_patterns,
        root="./l2_data",
        update_tiltcorr='RECALC',
        filters=BAD_TS+filt,
        apply_pm=best_pm,
        metrics=["timestamp","azimuth","elevation","sun_el","snr_I"],
        figs=figs
    )
# The filter of the last pass becomes the SNR filter for the cells that follow.
SNR = filt

In [None]:
# Confirm range of SNR
# Resolve the first matching CSV per capture block once: the three loads below read
# the same files and differ only in the filters applied.
apss_files = [katsepnt.find_files(f"{cbid}_*l0_{ant}*.csv", "./l2_data")[0] for cbid in cbids]
data0, a = katsepnt.load_apss_data(apss_files, filters=UPTO_DEGRADED)
data1, a = katsepnt.load_apss_data(apss_files, filters=UPTO_DEGRADED+BAD_TS)
data2, a = katsepnt.load_apss_data(apss_files, filters=UPTO_DEGRADED+BAD_TS+SNR)

# SNR statistics are taken from the least-filtered data set (data0).
snr = data0['snr_I']
snr_median = np.median(snr)
print("SNR: [%.1f(1pct %.1f), <%.1f>, %.1f]" % (np.min(snr), np.percentile(snr, 1), np.percentile(snr, 50), np.max(snr)))

# 1.2 * wavelength / diameter, converted from radians to arcsec.
# NOTE(review): assumes data0['frequency'] is in Hz and a.diameter in metres - confirm.
hpbw = 1.2*(3e8/data0['frequency'][0])/a.diameter * 180*3600/np.pi # arcsec

# Beam width scaled down by the median SNR; added in quadrature to the base values below.
ME = hpbw/snr_median
print("With median SNR of %.1f (~%.f''), reasonable '1 sigma' pass thresholds are:" % (snr_median, ME))
_ = (np.r_[2.3,4.6,9.2]**2 + ME**2)**.5
print("RMS&meas", _ )
print("4x(RMS&meas)", 4*_)


# Confirm how many points the filters remove
discarded = set(data0['timestamp']) - set(data1['timestamp'])
print("\nDiscarded points for BAD_TS:", len(discarded))
print(" ".join(map(str, sorted(discarded))))
# Points removed by BAD_TS are absent from data1 already, so this difference
# contains only the points removed by the SNR filter.
discarded = set(data1['timestamp']) - set(data2['timestamp'])
print("\nDiscarded points for SNR (excl. BAD_TS):", len(discarded))
print(" ".join(map(str, sorted(discarded))))

In [None]:
# Overview of all measurements, with best pointing model
# Two passes over the same filtered data: first with the list of per-point metrics,
# then with metrics=["target"] and an empty meshplot list.
for view_kwargs in (
    dict(metrics=["timestamp","azimuth","elevation","sun_el","wind_speed"], figs=[]),
    dict(metrics=["target"], meshplot=[], figs=[]),
):
    csv_patterns = [f"{cbid}_*l0_{ant}*.csv" for cbid in cbids]
    katsepnt.eval_pointingstability(
        csv_patterns,
        root="./l2_data",
        update_tiltcorr='RECALC',
        filters=BAD_TS+SNR,
        apply_pm=best_pm,
        **view_kwargs
    )

In [None]:
# Overview of NIGHT & DAY separately to visualise sun & wind effects
# Each pass pairs a sun-elevation filter with the mesh plot axes of interest.
# Alternative per-pass metrics that can be enabled instead of metrics=[]:
#   NIGHT: metrics=["wind_bore_az","wind_speed"]
#   DAY:   metrics=["sun_bore_az", "sun_bore_el"]
for sun_filter, mesh_axes in (
    (NIGHT, ["elevation", "wind_bore_az"]),
    (DAY, ["sun_bore_az", "sun_bore_el"]),
):
    csv_patterns = [f"{cbid}*_{ant}*.csv" for cbid in cbids]
    katsepnt.eval_pointingstability(
        csv_patterns,
        root="./l2_data",
        update_tiltcorr='RECALC',
        filters=BAD_TS+SNR+sun_filter,
        apply_pm=best_pm,
        meshplot=mesh_axes,
        figs=[],
        metrics=[]
    )

In [None]:
# Check for correlated factors

In [None]:
# Correlated-factor check, NIGHT only: corr_metrics are wind_dynamic and snr_I.
csv_patterns = [f"{cbid}_*l0_{ant}*.csv" for cbid in cbids]
katsepnt.eval_trackingstability(
    csv_patterns,
    "./l2_data",
    update_tiltcorr='RECALC',
    reference=slice(0,2),
    filters=BAD_TS+SNR+NIGHT,
    apply_pm=best_pm,
    strict=False,
    corr_metrics=['wind_dynamic','snr_I']
);

In [None]:
# Correlated-factor check, DAY only: corr_metrics are wind_dynamic and sun_el.
csv_patterns = [f"{cbid}_*l0_{ant}*.csv" for cbid in cbids]
katsepnt.eval_trackingstability(
    csv_patterns,
    "./l2_data",
    update_tiltcorr='RECALC',
    reference=slice(0,2),
    filters=BAD_TS+SNR+DAY,
    apply_pm=best_pm,
    strict=False,
    corr_metrics=['wind_dynamic','sun_el']
);

In [None]:
# Same as above, just vs snr_I
csv_patterns = [f"{cbid}_*l0_{ant}*.csv" for cbid in cbids]
katsepnt.eval_trackingstability(
    csv_patterns,
    "./l2_data",
    update_tiltcorr='RECALC',
    reference=slice(0,2),
    filters=BAD_TS+SNR+DAY,
    apply_pm=best_pm,
    strict=False,
    corr_metrics=['wind_dynamic','snr_I']
)
# Drop figure number 1 so that it is not shown again.
plt.close(1);

In [None]:
# Assess tracking performance with only minimum filters:
# Each environmental category is paired with its pass limits [max RMS, max diameter].
categories = [(PRECISION+NIGHT,[3.4,14]), (PRECISION,[3.4,14]), (ONLY_STANDARD,[5.2,21]), (ONLY_DEGRADED,[9.5,38])]
for ref in [slice(0,1), slice(0,2), slice(0,4), 'mean']:
    print(ref)
    for filt, pass_lim in categories:
        csv_patterns = [f"{cbid}_*l0_{ant}*.csv" for cbid in cbids]
        # strict=False: Allow 1 out-of-category point
        *_, diam, rms = katsepnt.eval_trackingstability(csv_patterns, "./l2_data", update_tiltcorr='RECALC',
                                                        reference=ref, filters=BAD_TS+SNR+filt,
                                                        apply_pm=best_pm, strict=False)
        # Only the numbers are wanted here, not the figures.
        plt.close('all')
        if len(diam) == 0:
            print("N=0")
            continue
        # Discard at most 5% of windows
        keep = rms <= np.percentile(rms, 95)
        diam, rms = diam[keep], rms[keep]
        rms_limit, diam_limit = pass_lim
        n_pass = np.count_nonzero((rms <= rms_limit) & (diam <= diam_limit))
        pass_pct = n_pass/len(rms) * 100
        summary = (len(rms), np.mean(rms), np.max(rms), np.mean(diam), np.max(diam), pass_pct)
        print("N=%d: GC RMS <%.1f>,%.1f], 95pct diameter <%.1f>,%.1f] = (pass %.f%%)" % summary)

In [None]:
# Tracking stability once per environmental category, keeping the figures this time.
for filt in (PRECISION+NIGHT, PRECISION, ONLY_STANDARD, ONLY_DEGRADED):
    csv_patterns = [f"{cbid}_*l0_{ant}*.csv" for cbid in cbids]
    # strict=False: Allow 1 out-of-category point
    *_, diam, rms = katsepnt.eval_trackingstability(
        csv_patterns,
        "./l2_data",
        update_tiltcorr='RECALC',
        reference=slice(0,2),
        filters=BAD_TS+SNR+filt,
        apply_pm=best_pm,
        strict=False
    )

# Feed Indexer Stability

In [None]:
# Search the archive for "Index" circular_pointing observations that scanned with `ant`.
archive_query = (
    "StartTime:[2025-02-22T0:0:0Z TO *] AND Description:Index AND InstructionSet:*circular_pointing*"
    f" AND InstructionSet:\"scan-ant {ant}\"~1"
)
archive_fields = ["CaptureBlockId","StartTime","CenterFrequency","Description","InstructionSet","Targets"]
katselib.ls_archive(archive_query, min_duration=1000, fields=archive_fields, field_len=140);
# The logs nicely show the indexing changes for the cycles, e.g.
# http://10.97.8.4:8081/tailtask/20250224-0016/progress

In [None]:
# Feed indexer measurement sets: one entry per capture block, giving its ID and the
# (low, high) frequency window in MHz as centre -/+ half width.
s_ms_idx = [
    {"cbid": 1764962879, "freq_MHz": (3400.8 - .5, 3400.8 + .5)},  # COSMOS 2539
]
ku_ms_idx = [
    {"cbid": 1765047350, "freq_MHz": (12500.25 - .3, 12500.25 + .3)},  # EUTELSAT 21B
]

In [None]:
# Check which datasets have not yet been reduced
for ms in s_ms_idx+ku_ms_idx:
    !ls -l ./l2_data/{ms['cbid']}*

In [None]:
# Fit centroids
ms_idx = ku_ms_idx
cbids = [entry['cbid'] for entry in ms_idx]

for ms in ms_idx:
    ds = util.open_dataset(ms['cbid'], ref_ant=ant, cache_root="./l1_data")
    track_ants = ds.obs_params['track_ants'].split(',')
    several_track_ants = len(track_ants) != 1
    for track_ant in track_ants:
        # The tracking antenna is only added to the output name when there are several.
        if several_track_ants:
            output_filepattern = f"%s_%s_{track_ant}_circular_pointing.csv"
        else:
            output_filepattern = "%s_%s_circular_pointing.csv"
        try:
            tracking.reduce_pointing_scans(
                ds, ant,
                track_ant=track_ant,
                kind=ms.get('kind', 'cardioid'),
                scans="track",
                freq_MHz=ms['freq_MHz'],
                strict=False,
                output_filepattern='./l2_data/'+output_filepattern,
                debug=False,
                verbose=True
            )
        except Exception as e: # Likely not correct track_ant
            # Best effort: report the failure and carry on with the next track_ant.
            print("INFO: unable to process %s for %s_%s" % (ms['cbid'], ant, track_ant), e)
    # TEMPORARY: the cache is only deleted for capture block IDs above this value
    if ms['cbid'] > 1760000000:
        ds.del_cache()

In [None]:
# Confirm hardware status over the time span
csv_files = [katsepnt.find_files(f"{cbid}_*l0_{ant}*.csv", "./l2_data")[0] for cbid in cbids]
data1, _ = katsepnt.load_apss_data(csv_files)

# Directly from sensor database:
#   indexerActualPosition   - FI angles
#   tiltPointCorrEnabled    - Tiltmeter corrections
for sensor_suffix in ("_dsm_indexerActualPosition", "_dsm_tiltPointCorrEnabled"):
    katselib.plot_sensors(data1['timestamp'], [ant+sensor_suffix], interpolate="zero", figsize=(12,3))

# There are some glitches in tilt corrections reported - but probably LMC sensor propagation issue?
plt.figure(figsize=(12,3))
plt.grid(True)
# Tilt corrections as recorded in the data: '_' markers for az, '|' markers for el.
plt.plot(data1['timestamp'], data1['tiltcorr_az'], '_')
plt.plot(data1['timestamp'], data1['tiltcorr_el'], '|')

In [None]:
# Eyeball the raw centroids, check time of day, wind speed & outliers!
figs = []
csv_patterns = [f"{cbid}_*{ant}*.csv" for cbid in cbids]
overview_metrics = ["timestamp","sun_el","elevation","wind_speed","wind_dynamic","snr_I"]
# To avoid "spurious outliers" due to ACU keeps turning tilt corr on, we look for outliers with UNDO...
katsepnt.eval_pointingstability(
    csv_patterns,
    root="./l2_data",
    update_tiltcorr='UNDO',
    metrics=overview_metrics,
    meshplot=[],
    figs=figs
);

### With tiltmeter

It is possible to combine datasets that were recorded with the same tiltmeter, but with its corrections either disabled or incorrectly calibrated, with data recorded with correctly calibrated corrections applied in real time. To do that, use `update_tiltcorr='RECALC'` to update the data as we process it (i.e. no change to files). If you do this BEWARE:
* processing is quite slow, and
* you MUST specify the correct calibration table in `katsepnt.TILT_ACTIVE_CAL[ant]` (preferably at the top of notebook)

If all data was recorded with ACU correctly set-up then you may simply remove `update_tiltcorr='RECALC'` in all cases below.

In [None]:
# Measurement sets to evaluate, in this order: Ku-band, then S-band.
mss = [ku_ms_idx, s_ms_idx]

# NOTE: despite the name, this filter is an SNR cut.
# SNR shown below to eliminate (1) outlier
BAD_TS = [("snr_I", lambda snr: 7 < snr)]

In [None]:
# Eyeball the measurement sets separately, without tilt corrections
for ms_idx in mss:
    cbids = [entry['cbid'] for entry in ms_idx]
    print(cbids)
    # One evaluation (with debug=1) per capture block.
    for cbid in cbids:
        stability = katsepnt.eval_indexerstability(
            f"{cbid}_*{ant}*.csv",
            "./l2_data",
            update_tiltcorr='UNDO',
            filters=BAD_TS,
            debug=1
        )
        TS, el, fi, diff_dxe, diff_de, diff_sdxe, diff_sde = stability

In [None]:
# Repeat above but tilt corrections RECALC
for ms_idx in mss:
    cbids = [entry['cbid'] for entry in ms_idx]
    print(cbids)
    # One evaluation (with debug=1) per capture block.
    for cbid in cbids:
        stability = katsepnt.eval_indexerstability(
            f"{cbid}_*{ant}*.csv",
            "./l2_data",
            update_tiltcorr='RECALC',
            filters=BAD_TS,
            debug=1
        )
        TS, el, fi, diff_dxe, diff_de, diff_sdxe, diff_sde = stability

In [None]:
# Check impact of STANDARD & DEGRADED conditions
# First over the individual points of all measurement sets taken together ...
cbids = [entry['cbid'] for entry in np.concatenate(mss)]
S_all = katsepnt.eval_pointingstability([f"{cbid}_*{ant}*.csv" for cbid in cbids],
                                        root="./l2_data", filters=[], figs=None)
S_s = katsepnt.eval_pointingstability([f"{cbid}_*{ant}*.csv" for cbid in cbids],
                                      root="./l2_data", filters=UPTO_STANDARD, figs=None)
S_d = katsepnt.eval_pointingstability([f"{cbid}_*{ant}*.csv" for cbid in cbids],
                                      root="./l2_data", filters=UPTO_DEGRADED, figs=None)
# Fractions of the unfiltered points that survive each environmental filter.
frac_degraded = len(S_d.TS)/len(S_all.TS)
frac_standard = len(S_s.TS)/len(S_all.TS)
print("Individual points UPTO_DEGRADED ~", frac_degraded, ", UPTO_STANDARD ~", frac_standard)

# ... then per measurement set, using the indexer stability results.
for ms_idx in mss:
    cbids = [entry['cbid'] for entry in ms_idx]
    TS, *_ = katsepnt.eval_indexerstability([f"{cbid}_*{ant}*.csv" for cbid in cbids],
                                            "./l2_data", filters=BAD_TS)
    TSud, *_ = katsepnt.eval_indexerstability([f"{cbid}_*{ant}*.csv" for cbid in cbids],
                                              "./l2_data", filters=BAD_TS+UPTO_DEGRADED)
    TSus, *_ = katsepnt.eval_indexerstability([f"{cbid}_*{ant}*.csv" for cbid in cbids],
                                              "./l2_data", filters=BAD_TS+UPTO_STANDARD)
    frac_degraded = len(TSud)/len(TS)
    frac_standard = len(TSus)/len(TS)
    print(cbids, "UPTO_DEGRADED ~", frac_degraded, ", UPTO_STANDARD ~", frac_standard)

In [None]:
# Check detailed impact of environment filtering
# Visit every capture block of every measurement set. The list is built here from `mss`
# instead of reusing the notebook-global `cbids`, which after a loop over `mss` only
# holds the capture blocks of the last measurement set.
all_cbids = [ms['cbid'] for ms_idx in mss for ms in ms_idx]
for cbid in all_cbids:
    # Baseline: only the BAD_TS filter.
    TS, el, fi, diff_dxe, diff_de, diff_sdxe, diff_sde = katsepnt.eval_indexerstability(f"{cbid}_*{ant}*.csv", "./l2_data",
                                                                update_tiltcorr='RECALC', filters=BAD_TS, debug=1)
    # Close the most recently created figure.
    plt.close(plt.get_fignums()[-1])
    # Same data with the UPTO_STANDARD environmental filter added, for comparison.
    TS, el, fi, diff_dxe, diff_de, diff_sdxe, diff_sde = katsepnt.eval_indexerstability(f"{cbid}_*{ant}*.csv", "./l2_data",
                                                                update_tiltcorr='RECALC', filters=BAD_TS+UPTO_STANDARD, debug=1)
    plt.close(plt.get_fignums()[-1])
# -> UPTO_STANDARD: Entire cycles are discarded, TODO need more sophisticated algorithm to work around that!
# -> UPTO_DEGRADED: Only first Ku-band dataset affected, slightly worse so don't apply filter

In [None]:
# Summarise all Ku-band measurements & all S-band measurements separately, without tilt corrections
req_limit = 9  # 100% of req
for ms_idx in mss:
    cbids = [entry['cbid'] for entry in ms_idx]
    csv_patterns = [f"{cbid}_*{ant}*.csv" for cbid in cbids]
    stability = katsepnt.eval_indexerstability(csv_patterns, "./l2_data",
                                               update_tiltcorr='UNDO', filters=BAD_TS)
    TS, el, fi, diff_dxe, diff_de, diff_sdxe, diff_sde = stability
    # Red lines at +/- the requirement, spanning the current x range of the active axes.
    x_lo, x_hi = plt.xlim()
    plt.hlines(req_limit*np.r_[1,-1], x_lo, x_hi, 'r')

    # 95th percentile of the absolute differences, ignoring NaNs.
    radius_xe = np.nanpercentile(np.abs(diff_dxe), 95)
    radius_e = np.nanpercentile(np.abs(diff_de), 95)
    print("95pct radii: xe~%.1f, e~%.1f''" % (radius_xe, radius_e))

In [None]:
# Repeat above but tilt corrections RECALC
req_limit = 9  # 100% of req
for ms_idx in mss:
    cbids = [entry['cbid'] for entry in ms_idx]
    csv_patterns = [f"{cbid}_*{ant}*.csv" for cbid in cbids]
    stability = katsepnt.eval_indexerstability(csv_patterns, "./l2_data",
                                               update_tiltcorr='RECALC', filters=BAD_TS)
    TS, el, fi, diff_dxe, diff_de, diff_sdxe, diff_sde = stability
    # Red lines at +/- the requirement, spanning the current x range of the active axes.
    x_lo, x_hi = plt.xlim()
    plt.hlines(req_limit*np.r_[1,-1], x_lo, x_hi, 'r')

    # 95th percentile of the absolute differences, ignoring NaNs.
    radius_xe = np.nanpercentile(np.abs(diff_dxe), 95)
    radius_e = np.nanpercentile(np.abs(diff_de), 95)
    print("95pct radii: xe~%.1f, e~%.1f''" % (radius_xe, radius_e))

In [None]:
# Over elevation
# One panel per measurement set, stacked vertically with shared axes.
# squeeze=False makes plt.subplots always return a 2-D array of Axes, so this also
# works when `mss` holds a single measurement set (otherwise a bare Axes is returned
# and zip() below would fail).
n_sets = len(mss)
axs = plt.subplots(n_sets, 1, sharex=True, sharey=True, figsize=(12, 3*n_sets), squeeze=False)[1][:, 0]
for ax,ms_idx in zip(axs,mss):
    cbids = [_['cbid'] for _ in ms_idx]
    TS, el, fi, diff_dxe, diff_de, diff_sdxe, diff_sde = katsepnt.eval_indexerstability([f"{cbid}_*{ant}*.csv" for cbid in cbids], "./l2_data",
                                                            update_tiltcorr='RECALC', filters=BAD_TS)
    # Close the most recently created figure; the results are re-plotted on `ax` instead.
    plt.close(plt.get_fignums()[-1])

    ax.set_title("Pointing offsets from repeated Feed Indexer cycling @ FI ~ %.1fdeg" % np.mean(fi))
    ax.plot(el, diff_dxe, 'C0_', label="xEl")
    ax.plot(el, diff_de, 'C1|', label="El")
    ax.legend(); ax.grid(True)
    ax.set_ylabel(r"$\bar{\theta}_i-\bar{\theta}_j$ [arcsec]")
# The x axis is shared: label only the bottom panel and reuse its limits for every panel.
axs[-1].set_xlabel("Elevation [deg]"); xlim = axs[-1].get_xlim()
for ax in axs:
    ax.hlines(9*np.r_[1,-1], *xlim, 'r') # 100% of req