forked from starkit/wsynphot
-
Notifications
You must be signed in to change notification settings - Fork 1
/
cache_filters.py
195 lines (157 loc) · 6.77 KB
/
cache_filters.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import os, re
import numpy as np
import logging
# tqdm.autonotebook automatically chooses between console & notebook
from tqdm.autonotebook import tqdm
from astropy.io.votable import parse_single_table
from wsynphot.io.get_filter_data import (get_filter_index,
get_transmission_data)
from wsynphot.config import get_data_dir
# Directory (inside wsynphot's data directory) where SVO filter data is cached
CACHE_DIR = os.path.join(get_data_dir(), 'filters', 'SVO')
# exist_ok=True makes this safe when several processes import the module at
# the same time: a separate "exists?" check followed by makedirs() can fail
# if another process creates the directory in between.
os.makedirs(CACHE_DIR, exist_ok=True)

# Module-level logger used by the download helpers in this file
logger = logging.getLogger(__name__)
def cache_as_votable(table, file_path):
    """Caches the passed table on disk as a VOTable.

    The '.vot' extension is appended to ``file_path`` when it is missing and
    the parent directory is created if it does not exist yet. An already
    existing file at that location is overwritten.

    Parameters
    ----------
    table : astropy.table.Table
        Table to be cached
    file_path : str
        Path where VOTable is to be saved
    """
    if not file_path.endswith('.vot'):
        file_path += '.vot'

    # A bare file name (e.g. 'index.vot') has an empty dirname and
    # os.makedirs('') raises, so only create a directory when there is one.
    # exist_ok=True avoids failing when the directory already exists or is
    # created concurrently by another process.
    dir_path = os.path.dirname(file_path)
    if dir_path:
        os.makedirs(dir_path, exist_ok=True)

    # Write table as votable (overwrite for cases when file already exists)
    table.write(file_path, format='votable', overwrite=True)
def download_filter_data(cache_dir=CACHE_DIR):
    """Downloads the entire filter data (filter index and transmission data
    of each filter) locally on disk as cache.

    A failure to download one filter is logged and does not stop the
    remaining filters from being downloaded.

    Parameters
    ----------
    cache_dir : str, optional
        Path of the directory where downloaded data is to be cached
    """
    # Get filter index and cache it
    logger.info("Caching filter index ...")
    index_table = get_filter_index().to_table()
    cache_as_votable(index_table, os.path.join(cache_dir, 'index'))

    # Fetch filter_ids from index as an iterator decorated with progress bar
    logger.info("Caching transmission data ...")
    filter_ids_pbar = tqdm(index_table['filterID'], desc='Filter ID',
                           total=len(index_table))

    # Iterate over each filter_id and download transmission data
    for filter_id in filter_ids_pbar:
        # Decode only real byte strings. Calling .decode() unconditionally
        # raises AttributeError on str values and, being outside the try
        # block, would abort the whole download.
        # NOTE(review): presumably the column yields bytes or str depending
        # on the astropy version - confirm.
        if isinstance(filter_id, bytes):
            filter_id = filter_id.decode("utf-8")
        filter_ids_pbar.set_postfix_str(filter_id)
        try:
            download_transmission_data(filter_id, cache_dir)
        except Exception as e:
            # Deliberately broad: this is a best-effort bulk download, so one
            # bad filter is reported and the loop carries on.
            logger.error('Data for filter ID = {0} could not be downloaded '
                         'due to:\n{1}'.format(filter_id, e))
def download_transmission_data(filter_id, cache_dir=CACHE_DIR):
    """Downloads transmission data for the requested filter ID systematically
    on disk as cache (in facility/instrument/ directory).

    Parameters
    ----------
    filter_id : str
        Filter ID in either wsynphot format: 'facility/instrument/filter'
        or SVO format: 'facility/instrument.filter' (Can use '/' and '.'
        interchangeably as delimiters)
    cache_dir : str, optional
        Path of the directory where downloaded data is to be cached

    Raises
    ------
    ValueError
        If ``filter_id`` does not split into exactly three parts
    """
    # Raw string: '\.' in a normal string literal is an invalid escape
    # sequence, which newer Python versions warn about.
    parts = re.split(r'/|\.', filter_id)
    if len(parts) != 3:
        # Same exception type as the tuple-unpacking error this replaces,
        # but with a message that names the offending ID.
        raise ValueError(
            "Filter ID must have exactly three parts, given as "
            "'facility/instrument/filter' or 'facility/instrument.filter', "
            "but got: {0!r}".format(filter_id))
    facility, instrument, filter_name = parts

    # Convert filter_id in SVO format to get transmission data from SVO
    svo_filter_id = '{0}/{1}.{2}'.format(facility, instrument, filter_name)
    filter_table = get_transmission_data(svo_filter_id).to_table()

    # Stored as <cache_dir>/<facility>/<instrument>/<filter_name>.vot
    cache_as_votable(filter_table, os.path.join(cache_dir, facility,
                                                instrument, filter_name))
def load_filter_index(cache_dir=CACHE_DIR):
    """Reads the cached filter index from disk and returns it as a pandas
    dataframe.

    Parameters
    ----------
    cache_dir : str, optional
        Path of the directory holding the cached filter data; the index is
        looked up there as 'index.vot'

    Returns
    -------
    pandas.core.frame.DataFrame
        Filter index loaded as a dataframe

    Raises
    ------
    IOError
        If no 'index.vot' file exists in ``cache_dir``
    """
    index_path = os.path.join(cache_dir, 'index.vot')

    # Happy path first: the index votable is present in the cache
    if os.path.exists(index_path):
        return df_from_votable(index_path)

    # Otherwise point the user to the function that creates the cache
    message = ('Filter index does not exist in the cache directory: '
               '{0}\nMake sure you have already downloaded filter data by using '
               'download_filter_data()').format(cache_dir)
    raise IOError(message)
def load_transmission_data(filter_id, cache_dir=CACHE_DIR):
    """Loads transmission data for requested Filter ID from the cached filter
    data present on disk as a pandas dataframe.

    Parameters
    ----------
    filter_id : str
        Filter ID in either wsynphot format: 'facility/instrument/filter'
        or SVO format: 'facility/instrument.filter' (Can use '/' and '.'
        interchangeably as delimiters)
    cache_dir : str, optional
        Path of the directory where downloaded data is to be cached

    Returns
    -------
    pandas.core.frame.DataFrame
        Filter's transmission data loaded as a dataframe

    Raises
    ------
    ValueError
        If ``filter_id`` does not split into exactly three parts, or if it
        is not listed in the cached filter index
    IOError
        If the filter is listed in the cached index but its transmission
        data file is missing (also raised by ``load_filter_index`` when the
        index itself is missing)
    """
    # Raw string: '\.' in a normal string literal is an invalid escape
    # sequence, which newer Python versions warn about.
    parts = re.split(r'/|\.', filter_id)
    if len(parts) != 3:
        # Same exception type as the tuple-unpacking error this replaces,
        # but with a message that names the offending ID.
        raise ValueError(
            "Filter ID must have exactly three parts, given as "
            "'facility/instrument/filter' or 'facility/instrument.filter', "
            "but got: {0!r}".format(filter_id))
    facility, instrument, filter_name = parts

    transmission_data_loc = os.path.join(cache_dir, facility, instrument,
                                         '{0}.vot'.format(filter_name))

    # When no such filter votable is present
    if not os.path.exists(transmission_data_loc):
        index = load_filter_index(cache_dir)

        # Check whether filter_id is present in index, to tell apart an
        # incomplete cache from an unknown filter
        svo_filter_id = '{0}/{1}.{2}'.format(facility, instrument, filter_name)
        if svo_filter_id in index['filterID'].values:
            raise IOError('Requested filter ID: {0} exists in index, but its '
                'transmission data is missing in the cache directory: {1}\n'
                'Make sure you have downloaded complete filter data by using '
                'download_filter_data(). Or if you specifically want to '
                'download transmission data for only requested filter ID, '
                'use download_transmission_data()'.format(filter_id,
                    cache_dir))
        else:
            raise ValueError('Requested filter ID: {0} does not '
                'exists'.format(filter_id))

    return df_from_votable(transmission_data_loc)
def df_from_votable(votable_path):
    """Reads a VOTable file from disk and returns its content as a pandas
    dataframe with byte strings converted to literal strings.

    Parameters
    ----------
    votable_path : str
        Location of the VOTable file to read. This is "not" a general
        purpose function, so the VOTable must be properly formatted.

    Returns
    -------
    pandas.core.frame.DataFrame
        Parsed data as a dataframe
    """
    # VOTable file -> astropy table -> pandas dataframe
    votable = parse_single_table(votable_path)
    dataframe = votable.to_table().to_pandas()

    # Clean up byte strings before handing the dataframe to the caller
    return byte_to_literal_strings(dataframe)
def byte_to_literal_strings(dataframe):
    """Converts byte strings (if any) present in passed dataframe to literal
    strings and returns an improved dataframe.

    Parameters
    ----------
    dataframe : pandas.core.frame.DataFrame
        Dataframe whose object columns may contain byte strings. It is
        modified in place.

    Returns
    -------
    pandas.core.frame.DataFrame
        The same dataframe object, with every ``bytes`` value in its object
        columns decoded as UTF-8. Values of any other type are left as is.
    """
    def _decode(value):
        # Only real byte strings are decoded; a blanket .str.decode() would
        # turn values that are already str into NaN.
        return value.decode('utf-8') if isinstance(value, bytes) else value

    # Builtin ``object`` is used because the ``np.object`` alias was removed
    # from NumPy. An empty selection simply skips the loop.
    for col in dataframe.select_dtypes(include=[object]).columns:
        dataframe[col] = dataframe[col].map(_decode)

    return dataframe