In [1]:
def test_integral_query():
    """Regression test for searching, sorting and filtering with ``IntegralQuery``.

    The test runs a 10 degree cone search around (83.6333, 22.0144) deg (ICRS)
    and an object-name search for "Cyg X-1", and compares result shapes and
    selected rows against hard-coded reference values.

    NOTE(review): the reference counts are presumably tied to a specific
    snapshot of the underlying catalogue -- confirm whether they need to be
    refreshed when the catalogue grows.

    Raises
    ------
    AssertionError
        If any result deviates from the recorded reference values. Each
        message includes the value that was actually observed.
    """
    from astropy.coordinates import SkyCoord
    import numpy as np
    from datetime import datetime
    import pandas as pd
    from IntegralQuery import SearchQuery, IntegralQuery, Filter, Range

    # --- Cone search, filtered to pointings, coordinates returned ------------
    position = SkyCoord(83.6333, 22.0144, frame="icrs", unit="deg")
    search_query = SearchQuery(position=position, radius="10 degree")

    cat = IntegralQuery(search_query)
    pointing_filter = Filter(SCW_TYPE="POINTING")
    scw_ids = cat.apply_filter(pointing_filter, return_coordinates=True,
                               remove_duplicates=False)

    assert scw_ids.shape == (8087, 4), \
        "The number of pointings found is not what we expect: got shape {}".format(scw_ids.shape)

    # Floats are compared exactly on purpose: the values are expected to be
    # read back unchanged, not recomputed.
    expected_row = np.array(['003900020030', 83.635201, 22.012722,
                             datetime(2003, 2, 7, 7, 56, 16)], dtype=object)
    assert np.array_equal(scw_ids[3], expected_row), \
        "A content of scw_ids is not what we expect: got {!r}".format(scw_ids[3])

    # --- Sorting --------------------------------------------------------------
    cat.sort_by("DEC_X")

    # dtype=object is spelled out (as for expected_row above) so the mixed
    # str/int/float/Timestamp row is never coerced to a common dtype.
    expected_sorted_row = np.array(
        ['115900130010', 1, 'POINTING', 82.235039, 16.914862,
         pd.Timestamp('2012-04-10 11:09:23'), pd.Timestamp('2012-04-10 11:44:04'),
         '88602500001', 'CALIBRATION', 'PUBLIC', 'PUBLIC', 2053.0, 2038.0,
         2051.0, 2053.0, 2053.0, 2053.0, 2052.0, 81747968.0, 316.022],
        dtype=object)
    actual_sorted_row = np.array(cat.table.iloc[1114])
    assert np.array_equal(actual_sorted_row, expected_sorted_row), \
        "Sort_by did not produce the correct table: row 1114 is {!r}".format(actual_sorted_row)

    # --- Multi-column filter with open and closed ranges -----------------------
    range_filter = Filter(RA_X=Range(78., None),
                          DEC_X=Range(None, 30.),
                          TIME=Range(min_val='2009-11-04T00:05:23', max_val='2015-09-04T00:05:23'),
                          OBS_TYPE="CALIBRATION",
                          PS="PUBLIC",
                          PI_NAME="PUBLIC",
                          GOOD_SPI=Range(None, 500))

    cat.sort_by("DSIZE")

    scw_ids = cat.apply_filter(range_filter, return_coordinates=False,
                               remove_duplicates=True)

    # Shape and content are asserted separately so a failure says which one broke.
    assert scw_ids.shape == (1511,), \
        "Filter gave wrong results: got shape {}".format(scw_ids.shape)
    assert scw_ids[1000] == '132700450021', \
        "Filter gave wrong results: entry 1000 is {!r}".format(scw_ids[1000])

    # --- Object-name search with an empty filter -------------------------------
    search_query = SearchQuery(object_name="Cyg X-1")
    cat = IntegralQuery(search_query)
    scw_ids = cat.apply_filter(Filter(), return_coordinates=False,
                               remove_duplicates=True)

    assert scw_ids.shape == (19151,), \
        "Object Query gave wrong number of pointings: got shape {}".format(scw_ids.shape)

    # --- SCW_ID / SCW_VER combination that is expected to match nothing --------
    id_version_filter = Filter(SCW_ID='004000100010',
                               SCW_VER=2)

    scw_ids = cat.apply_filter(id_version_filter, return_coordinates=False,
                               remove_duplicates=True)

    assert scw_ids.shape == (0,), \
        "SCW_ID or SCW_VER filter gave wrong result: got shape {}".format(scw_ids.shape)
    
                                
# Execute the test as soon as this cell runs; a failed assertion surfaces as the cell's error output.
test_integral_query()



In [5]:
def _assert_every_pointing_clustered_once(cq, label):
    """Assert that the clusters of ``cq`` contain every pointing index exactly once.

    Parameters
    ----------
    cq : object
        Must expose ``clusters`` (a mapping whose values are iterables of
        clusters with ``indices`` and ``num_pointings``) and ``_num_pointings``.
    label : str
        Scenario name included in the assertion messages.

    Raises
    ------
    AssertionError
        If an index in ``range(cq._num_pointings)`` is in no cluster, or if
        the summed cluster sizes differ from ``cq._num_pointings``.
    """
    clustered = set()
    total_in_clusters = 0
    for clusters in cq.clusters.values():
        for cluster in clusters:
            # In-place update avoids rebuilding the set for every cluster.
            clustered.update(cluster.indices)
            total_in_clusters += cluster.num_pointings

    missing = [i for i in range(cq._num_pointings) if i not in clustered]

    assert missing == [], \
        "Not all pointings were clustered ({}): {} missing, first few {}".format(
            label, len(missing), missing[:10])

    # With full coverage established above, a differing total means that at
    # least one pointing was counted in more than one cluster.
    assert cq._num_pointings == total_in_clusters, \
        "Some pointings are in multiple clusters ({}): {} pointings, {} cluster members".format(
            label, cq._num_pointings, total_in_clusters)


def test_clustering():
    """Regression test for ``ClusteredQuery`` on the "Cyg X-1" pointings.

    Two configurations are run, each after ``np.random.seed(0)``. For each
    one the test checks the number of clusters per key against reference
    values, that every pointing is clustered exactly once, and two selected
    entries of ``get_clustered_scw_ids()``.

    NOTE(review): reseeding the global NumPy RNG assumes ``ClusteredQuery``
    draws from ``np.random`` -- confirm. The meaning of the positional
    arguments is not visible here; they are passed through unchanged.

    Raises
    ------
    AssertionError
        If any check fails. Each message names the scenario and reports the
        observed value.
    """
    import numpy as np
    from IntegralQuery import SearchQuery, IntegralQuery, Filter
    from IntegralPointingClustering import ClusteredQuery

    search_query = SearchQuery(object_name="Cyg X-1")
    cat = IntegralQuery(search_query)
    pointing_filter = Filter(SCW_TYPE="POINTING")
    scw_ids = cat.apply_filter(pointing_filter, return_coordinates=True,
                               remove_duplicates=True)

    # --- Scenario 1: fixed cluster size (5, 5) --------------------------------
    scenario = "scenario 1, cluster_size_range=(5, 5)"
    np.random.seed(0)

    cq = ClusteredQuery(scw_ids, 1/6, 1/15, 1, 0.3,
                        cluster_size_range=(5, 5),
                        cluster_size_preference_threshold=(),
                        failed_improvements_max=2,
                        suboptimal_cluster_size=2,
                        close_suboptimal_cluster_size=3,
                        track_performance=False)

    # cq.clusters is indexed with keys 1..5 here.
    region_sizes = [len(cq.clusters[i + 1]) for i in range(5)]

    assert region_sizes == [440, 123, 56, 73, 1710], \
        "Cluster Sizes are not what we expect ({}): got {}".format(scenario, region_sizes)

    _assert_every_pointing_clustered_once(cq, scenario)

    clustered_ids = cq.get_clustered_scw_ids()
    observed = (clustered_ids[1][0], clustered_ids[5][-1])

    assert observed == (['001130001060'],
                        ['250300020010',
                         '250300090010',
                         '250300100010',
                         '250300110010',
                         '250300120010']), \
        "The clusters are not what we expect ({}): got {!r}".format(scenario, observed)

    # --- Scenario 2: variable cluster size (3, 6) with preference thresholds ---
    scenario = "scenario 2, cluster_size_range=(3, 6)"
    np.random.seed(0)

    cq = ClusteredQuery(scw_ids, 0, 1/15, 1, 0.3, 6,
                        cluster_size_range=(3, 6),
                        cluster_size_preference_threshold=(10., 8., 3.),
                        failed_improvements_max=2,
                        suboptimal_cluster_size=3,
                        close_suboptimal_cluster_size=3,
                        track_performance=False)

    # cq.clusters is indexed with keys 1..6 here.
    region_sizes = [len(cq.clusters[i + 1]) for i in range(6)]

    assert region_sizes == [390, 136, 81, 97, 93, 1323], \
        "Cluster Sizes are not what we expect ({}): got {}".format(scenario, region_sizes)

    _assert_every_pointing_clustered_once(cq, scenario)

    clustered_ids = cq.get_clustered_scw_ids()
    observed = (clustered_ids[1][-1], clustered_ids[6][1])

    assert observed == (['240100460010'],
                        ['001700110010',
                         '001900050010',
                         '001900060010',
                         '001900070010',
                         '001900130010',
                         '001900140010']), \
        "The clusters are not what we expect ({}): got {!r}".format(scenario, observed)
    
    
# Execute the test as soon as this cell runs; a failed assertion surfaces as the cell's error output.
test_clustering()

AssertionError: Cluster Sizes are not what we expect