Merge pull request #53 from predict-idlab/windows_bug

🚑 Fix windows bug

jvdd committed Dec 24, 2021
2 parents 8638d98 + fe8aafa commit 1b7d142
Showing 5 changed files with 45 additions and 14 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/test.yml
@@ -12,10 +12,11 @@ on:
jobs:
build:

runs-on: ubuntu-latest
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: ['windows-latest', 'macOS-latest', 'ubuntu-latest']
python-version: [3.7, 3.8, 3.9]

steps:
3 changes: 2 additions & 1 deletion tests/utils.py
@@ -4,8 +4,8 @@

import os
import pytest
import logging
import pandas as pd
import numpy as np

from typing import Union, Dict

@@ -29,6 +29,7 @@ def logging_file_path() -> str:
yield logging_path
# Cleanup after test
if os.path.exists(logging_path):
logging.shutdown()
os.remove(logging_path)


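For context, the cleanup pattern this hunk arrives at looks roughly like the sketch below; the fixture body is a paraphrase and the path literal is illustrative, not the repository's exact code.

import logging
import os

import pytest


@pytest.fixture
def logging_file_path() -> str:
    logging_path = "./test_logging.log"  # illustrative path, not the repo's
    yield logging_path
    # Cleanup after the test: logging.shutdown() flushes and closes all
    # handlers first, since Windows refuses to delete a file that a
    # FileHandler still holds open.
    if os.path.exists(logging_path):
        logging.shutdown()
        os.remove(logging_path)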
20 changes: 17 additions & 3 deletions tsflex/features/feature_collection.py
@@ -319,6 +319,13 @@ def calculate(
If n_jobs is either 0 or 1, the code will be executed sequentially without
creating a process pool. This is very useful when debugging, as the stack
trace will be more comprehensible.
.. note::
Multiprocessed execution is not supported on Windows. Even when
`n_jobs` is set > 1, the feature extraction will still be executed
sequentially.
Why we do not support multiprocessing on Windows is explained in this
issue: https://github.com/predict-idlab/tsflex/issues/51
.. tip::
It takes on avg. _300ms_ to schedule everything with
@@ -341,7 +348,7 @@ def calculate(
delete_logging_handlers(logger)
# Add logging handler (if path provided)
if logging_file_path:
add_logging_handler(logger, logging_file_path)
f_handler = add_logging_handler(logger, logging_file_path)

# Convert the data to a series_dict
series_dict: Dict[str, pd.Series] = {}
@@ -371,7 +378,9 @@
)
nb_stroll_funcs = self._get_stroll_feat_length()

if n_jobs is None:
if os.name == "nt": # On Windows no multiprocessing is supported, see https://github.com/predict-idlab/tsflex/issues/51
n_jobs = 1
elif n_jobs is None:
n_jobs = os.cpu_count()
n_jobs = min(n_jobs, nb_stroll_funcs)

@@ -388,7 +397,7 @@
with Pool(processes=n_jobs) as pool:
results = pool.imap_unordered(self._executor, range(nb_stroll_funcs))
if show_progress:
results = tqdm(results, total=self._get_stroll_feat_length())
results = tqdm(results, total=nb_stroll_funcs)
try:
calculated_feature_list = [f for f in results]
except:
@@ -399,6 +408,11 @@
pool.close()
pool.join()

# Close the file handler (this avoids PermissionError: [WinError 32])
if logging_file_path:
f_handler.close()
logger.removeHandler(f_handler)

if calculated_feature_list is None:
raise RuntimeError(
"Feature Extraction halted due to error while extracting one (or multiple) feature(s)! "
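Read together, the hunks above boil down to the following n_jobs resolution logic — a hypothetical standalone helper for illustration; tsflex performs these steps inline in calculate():

import os


def resolve_n_jobs(n_jobs, nb_stroll_funcs):
    # Hypothetical helper mirroring the inline logic of calculate().
    # Windows gets no multiprocessing (see issue #51), so execution is
    # forced to be sequential there, regardless of the requested n_jobs.
    if os.name == "nt":
        return 1
    if n_jobs is None:
        n_jobs = os.cpu_count()
    # Never spawn more workers than there are stroll functions to compute.
    return min(n_jobs, nb_stroll_funcs)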
11 changes: 10 additions & 1 deletion tsflex/processing/series_pipeline.py
@@ -187,7 +187,7 @@ def process(
delete_logging_handlers(logger)
# Add logging handler (if path provided)
if logging_file_path:
add_logging_handler(logger, logging_file_path)
f_handler = add_logging_handler(logger, logging_file_path)

# Convert the data to a series_dict
series_dict: Dict[str, pd.Series] = {}
@@ -210,12 +210,21 @@
output_keys.update(processed_dict.keys())
series_dict.update(processed_dict)
except Exception as e:
# Close the file handler (this avoids PermissionError: [WinError 32])
if logging_file_path:
f_handler.close()
logger.removeHandler(f_handler)
raise _ProcessingError(
"Error while processing function {}:\n {}".format(
processor.name, str(e)
)
) from e

# Close the file handler (this avoids PermissionError: [WinError 32])
if logging_file_path:
f_handler.close()
logger.removeHandler(f_handler)

if not return_all_series:
# Return just the output series
output_dict = {key: series_dict[str(key)] for key in output_keys}
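The close-and-remove steps appear twice in process() — once in the except branch and once on the happy path. The same guarantee can be expressed with a single try/finally; a minimal sketch, assuming a single handler and an illustrative log path, not the code this commit actually ships:

import logging

logger = logging.getLogger("tsflex")  # logger name is illustrative

f_handler = logging.FileHandler("processing.log", mode="w")  # illustrative path
logger.addHandler(f_handler)
try:
    logger.info("processing series ...")
finally:
    # Close and detach the handler on both the error and the happy path;
    # an open handler keeps the file locked on Windows, which is what
    # raises PermissionError: [WinError 32] when the file gets removed.
    f_handler.close()
    logger.removeHandler(f_handler)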
22 changes: 14 additions & 8 deletions tsflex/utils/logging.py
@@ -1,6 +1,6 @@
"""Utility functions for logging operations."""

__author__ = 'Jeroen Van Der Donckt'
__author__ = "Jeroen Van Der Donckt"

import logging
import warnings
@@ -22,7 +22,7 @@ def remove_inner_brackets(message: str) -> str:
-------
str:
A new message without any inner brackets.
"""
level = 0
new_message = ""
@@ -48,7 +48,7 @@ def delete_logging_handlers(logger: logging.Logger):
----------
logger : logging.Logger
The logger.
"""
if len(logger.handlers) > 1:
logger.handlers = [
@@ -57,7 +57,9 @@ def delete_logging_handlers(logger: logging.Logger):
assert len(logger.handlers) == 1, "Multiple logging StreamHandlers present!!"


def add_logging_handler(logger: logging.Logger, logging_file_path: Union[str, Path]):
def add_logging_handler(
logger: logging.Logger, logging_file_path: Union[str, Path]
) -> logging.FileHandler:
"""Add a logging file-handler to the logger.
Parameters
@@ -66,7 +68,12 @@ def add_logging_handler(logger: logging.Logger, logging_file_path: Union[str, Pa
The logger.
logging_file_path : Union[str, Path]
The file path for the file handler.
Returns
-------
logging.FileHandler
The file-handler that is added to the given logger.
"""
if not isinstance(logging_file_path, Path):
logging_file_path = Path(logging_file_path)
@@ -81,12 +88,11 @@ def add_logging_handler(logger: logging.Logger, logging_file_path: Union[str, Pa
open(logging_file_path, "w").close()
f_handler = logging.FileHandler(logging_file_path, mode="w")
f_handler.setFormatter(
logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
)
f_handler.setLevel(logging.INFO)
logger.addHandler(f_handler)
return f_handler


def logging_file_to_df(logging_file_path: str) -> pd.DataFrame:
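Since add_logging_handler now returns the FileHandler, a caller can dispose of it cleanly when done. A hypothetical usage sketch — the logger name and log path are made up:

import logging

from tsflex.utils.logging import add_logging_handler

logger = logging.getLogger("example")  # illustrative logger name
f_handler = add_logging_handler(logger, "features.log")
try:
    logger.info("feature extraction started")
finally:
    # Release the file handle so the log file can be moved or deleted,
    # also on Windows.
    f_handler.close()
    logger.removeHandler(f_handler)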
