Merge pull request #1484 from macrosynergy/bugfix/jpmaqs_download
Magnus167 committed Feb 22, 2024
2 parents 42b944d + 791bf7c commit 4f2c63f
Showing 3 changed files with 58 additions and 12 deletions.
9 changes: 4 additions & 5 deletions macrosynergy/download/dataquery.py
@@ -964,8 +964,6 @@ def _download(
             **kwargs,
         )
 
-        final_output = self._chain_download_outputs(download_outputs)
-
         if len(failed_batches) > 0:
             flat_failed_batches: List[str] = list(
                 itertools.chain.from_iterable(failed_batches)
@@ -986,11 +984,12 @@ def _download(
                 *args,
                 **kwargs,
             )
+            download_outputs.extend(retried_output)
 
-            # extend retried output
-            final_output = self._chain_download_outputs([final_output, retried_output])
+        if retry_counter == 0:
+            return self._chain_download_outputs(download_outputs)
 
-        return final_output
+        return download_outputs
 
     def download_data(
         self,
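
A note on the dataquery.py change above: _download previously flattened the batch outputs into final_output before the retry block and then re-chained [final_output, retried_output] on every retry; it now keeps the raw per-batch outputs, lets recursive retries extend that list, and flattens it exactly once at the outermost call (retry_counter == 0). Below is a minimal, self-contained sketch of that control flow; fetch, the failure handling, and MAX_RETRIES are hypothetical stand-ins, not the package's API.

import itertools
from typing import Any, Dict, List

MAX_RETRIES = 3  # illustrative cap, not the package's setting


def fetch(batch: List[str]) -> List[Dict[str, Any]]:
    # Hypothetical stand-in for one DataQuery request; "BAD" expressions fail.
    return [{"expression": e} for e in batch if e != "BAD"]


def download(batches: List[List[str]], retry_counter: int = 0) -> List[Any]:
    # One raw entry per batch; nothing is flattened inside the retry loop.
    outputs: List[Any] = [fetch(b) for b in batches]
    failed = [[e for e in b if e == "BAD"] for b in batches if "BAD" in b]
    if failed and retry_counter < MAX_RETRIES:
        # The recursive call returns unflattened outputs, so they extend the
        # accumulator instead of being chained a second time.
        outputs.extend(download(failed, retry_counter + 1))
    if retry_counter == 0:
        # Flatten exactly once, at the outermost call.
        return list(itertools.chain.from_iterable(outputs))
    return outputs


print(download([["EXPR_A", "EXPR_B"], ["EXPR_C", "BAD"]]))
# [{'expression': 'EXPR_A'}, {'expression': 'EXPR_B'}, {'expression': 'EXPR_C'}]
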
60 changes: 53 additions & 7 deletions macrosynergy/download/jpmaqs.py
@@ -331,13 +331,12 @@ def validate_downloaded_df(
 
     if expr_missing:
         log_str = (
-            f"Some expressions are missing from the downloaded data."
-            " Check logger output for complete list. \n"
-            f"{len(expr_missing)} out of {len(expr_expected)} expressions are "
-            "missing."
-            f"To download the catalogue of all available expressions and filter the"
-            " unavailable expressions, set `get_catalogue=True` in the "
-            " call to `JPMaQSDownload.download()`."
+            f"Some expressions are missing from the downloaded data. "
+            "Check logger output for complete list.\n"
+            f"{len(expr_missing)} out of {len(expr_expected)} expressions are missing. "
+            f"To download the catalogue of all available expressions and filter the "
+            "unavailable expressions, set `get_catalogue=True` in the "
+            "call to `JPMaQSDownload.download()`."
         )
 
         logger.info(log_str)
@@ -360,6 +359,34 @@ def validate_downloaded_df(
         else data_df.index.unique()
     )
     dates_missing = list(set(dates_expected) - set(found_dates))
+    log_str = (
+        "The expressions in the downloaded data are not a subset of the expected expressions."
+        " Missing expressions: {missing_exprs}"
+    )
+    err_statement = (
+        "The expressions in the downloaded data are not a subset of the "
+        "expected expressions."
+    )
+    check_exprs = set()
+    if isinstance(data_df, QuantamentalDataFrame):
+        found_metrics = list(
+            set(data_df.columns) - set(QuantamentalDataFrame.IndexCols)
+        )
+        for col in QuantamentalDataFrame.IndexCols:
+            if not len(data_df[col].unique()) > 0:
+                raise InvalidDataframeError(f"Column {col} is empty.")
+
+        check_exprs = construct_expressions(
+            tickers=(data_df["cid"] + "_" + data_df["xcat"]).unique(),
+            metrics=found_metrics,
+        )
+
+    else:
+        check_exprs = data_df.columns.tolist()
+
+    missing_exprs = set(check_exprs) - set(found_expressions)
+    if len(missing_exprs) > 0:
+        logger.critical(log_str.format(missing_exprs=missing_exprs))
+
     if len(dates_missing) > 0:
         log_str = (
@@ -659,7 +686,16 @@ def _chain_download_outputs(
             return concat_column_dfs(df_list=download_outputs)
 
         if isinstance(download_outputs[0][0], (dict, QuantamentalDataFrame)):
+            logger.debug(f"Chaining {len(download_outputs)} outputs.")
+            _ch_types = list(
+                itertools.chain.from_iterable(
+                    [list(map(type, x)) for x in download_outputs]
+                )
+            )
+            logger.debug(f"Object types in the downloaded data: {_ch_types}")
+
             download_outputs = list(itertools.chain.from_iterable(download_outputs))
+
             if isinstance(download_outputs[0], dict):
                 return download_outputs
             if isinstance(download_outputs[0], QuantamentalDataFrame):
@@ -702,6 +738,12 @@ def _fetch_timeseries(
             ts_list = concat_column_dfs(
                 df_list=[timeseries_to_column(ts) for ts in ts_list]
             )
+        logger.debug(f"Downloaded data for {len(ts_list)} expressions.")
+        logger.debug(f"Unavailable expressions: {self.unavailable_expressions}")
+
+        downloaded_types = list(set(map(type, ts_list)))
+        logger.debug(f"Object types in the downloaded data: {downloaded_types}")
+
         return ts_list
 
     def download_data(self, *args, **kwargs):
@@ -882,6 +924,10 @@ def download(
             verbose=True,
         ):
             raise InvalidDataframeError("Downloaded data is invalid.")
+
+        if dataframe_format == "qdf":
+            assert isinstance(data, QuantamentalDataFrame)
+
         return data
 
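
A note on the validate_downloaded_df change above: when the downloaded data arrives as a QuantamentalDataFrame, the expected expressions are rebuilt from the cid/xcat tickers and the non-index (metric) columns and compared with the expressions actually found, logging any shortfall. The sketch below mirrors that completeness check; the DB(JPMAQS,<ticker>,<metric>) string format and the local helpers are illustrative assumptions rather than imports from the package.

from typing import List, Set

import pandas as pd

IDX_COLS = ["real_date", "cid", "xcat"]  # index columns of a quantamental frame


def construct_exprs(tickers: List[str], metrics: List[str]) -> List[str]:
    # Assumed JPMaQS expression format: DB(JPMAQS,<ticker>,<metric>)
    return [f"DB(JPMAQS,{t},{m})" for t in tickers for m in metrics]


def missing_expressions(df: pd.DataFrame, found: Set[str]) -> Set[str]:
    # Metrics are whatever columns remain once the index columns are removed.
    metrics = [c for c in df.columns if c not in IDX_COLS]
    tickers = (df["cid"] + "_" + df["xcat"]).unique().tolist()
    return set(construct_exprs(tickers, metrics)) - found


df = pd.DataFrame(
    {
        "real_date": pd.to_datetime(["2024-02-22"]),
        "cid": ["USD"],
        "xcat": ["EQXR_NSA"],
        "value": [1.0],
    }
)
print(missing_expressions(df, found={"DB(JPMAQS,USD_EQXR_NSA,value)"}))  # set()
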
1 change: 1 addition & 0 deletions macrosynergy/management/types.py
@@ -68,6 +68,7 @@ def __instancecheck__(cls, instance):
         result = result and not isinstance(instance.columns, pd.MultiIndex)
         result = result and all([col in instance.columns for col in IDX_COLS])
         result = result and len(instance.columns) > len(IDX_COLS)
+        result = result and len(instance.columns) == len(set(instance.columns))
 
         correct_date_type: bool = (
             instance["real_date"].dtype == "datetime64[ns]"
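
A note on the types.py change above: the added line makes __instancecheck__ reject frames with duplicated column labels, which would otherwise pass the "all index columns present" and "more columns than index columns" checks. A small stand-alone illustration of the guard (dtype checks omitted) follows.

import pandas as pd

IDX_COLS = ["real_date", "cid", "xcat"]


def looks_like_qdf(df: pd.DataFrame) -> bool:
    # Stand-in for the column checks in __instancecheck__, not the metaclass itself.
    ok = isinstance(df, pd.DataFrame)
    ok = ok and not isinstance(df.columns, pd.MultiIndex)
    ok = ok and all(col in df.columns for col in IDX_COLS)
    ok = ok and len(df.columns) > len(IDX_COLS)
    # The new guard: no duplicated column labels.
    ok = ok and len(df.columns) == len(set(df.columns))
    return ok


good = pd.DataFrame(columns=IDX_COLS + ["value"])
dupes = pd.DataFrame(columns=IDX_COLS + ["value", "value"])
print(looks_like_qdf(good), looks_like_qdf(dupes))  # True False
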
