diff --git a/.coveragerc b/.coveragerc
new file mode 100644
index 0000000..9add2b0
--- /dev/null
+++ b/.coveragerc
@@ -0,0 +1,28 @@
+[run]
+branch = True
+data_file = var/coverage
+omit =
+    tests/*
+
+[report]
+; Regexes for lines to exclude from consideration
+exclude_also =
+    ; Don't complain about missing debug-only code:
+    def __repr__
+    if self\.debug
+
+    ; Don't complain if tests don't hit defensive assertion code:
+    raise AssertionError
+    raise NotImplementedError
+
+    ; Don't complain if non-runnable code isn't run:
+    if 0:
+    if __name__ == .__main__.:
+
+    ; Don't complain about abstract methods, they aren't run:
+    @(abc\.)?abstractmethod
+
+ignore_errors = True
+
+[html]
+directory = var/coverage_html_report
\ No newline at end of file
diff --git a/.github/5578703.png b/.github/5578703.png
new file mode 100644
index 0000000..8340985
Binary files /dev/null and b/.github/5578703.png differ
diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
new file mode 100644
index 0000000..63e85e5
--- /dev/null
+++ b/.github/workflows/tests.yaml
@@ -0,0 +1,26 @@
+name: Python data-flow Tests
+
+on: [ push ]
+
+jobs:
+  tests:
+
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        python-version: [ "3.10", "3.11", "3.12" ]
+
+    steps:
+      - uses: actions/checkout@v4
+      - name: Set up Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v5
+        with:
+          python-version: ${{ matrix.python-version }}
+      - name: Display Python version
+        run: python -c "import sys; print(sys.version)"
+      - name: Install modules
+        run: pip install -r requirements.txt && pip install -r requirements.dev.txt
+      - name: Tests
+        run: PYTHONPATH=. pytest --cov=mysiar_data_flow --cov-report term
+      - name: Lint
+        run: pflake8 mysiar_data_flow/ tests/
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..7899471
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,166 @@
+### Python template
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+cover/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+.pybuilder/
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+# For a library or package, you might want to ignore these files since the code is
+# intended to run in multiple environments; otherwise, check them in:
+# .python-version
+
+# pipenv
+# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+# However, in case of collaboration, if having platform-specific dependencies or dependencies
+# having no cross-platform support, pipenv may install dependencies that don't work, or not
+# install all needed dependencies.
+#Pipfile.lock
+
+# poetry
+# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
+# This is especially recommended for binary packages to ensure reproducibility, and is more
+# commonly ignored for libraries.
+# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
+#poetry.lock
+
+# pdm
+# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
+#pdm.lock
+# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
+# in version control.
+# https://pdm.fming.dev/#use-with-ide
+.pdm.toml
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# pytype static type analyzer
+.pytype/
+
+# Cython debug symbols
+cython_debug/
+
+# PyCharm
+# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
+# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
+# and can be added to the global gitignore or merged into this file. For a more nuclear
+# option (not recommended) you can uncomment the following to ignore the entire idea folder.
+#.idea/
+
+
+# data_flow
+!mysiar_data_flow/lib
+tests/data/*.csv
\ No newline at end of file
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..b13ad83
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,27 @@
+venv::
+	rm -rf venv
+	python -m venv venv
+	venv/bin/pip install -U pip
+
+pip::
+	venv/bin/pip install -r requirements.txt
+	venv/bin/pip install -r requirements.dev.txt
+
+tests::
+	PYTHONPATH=. venv/bin/pytest --cov=mysiar_data_flow --cov-report html --cov-report term -rP tests/ -vvv
+
+lint::
+	venv/bin/pflake8 mysiar_data_flow/ tests/
+
+build::
+	rm -rf dist
+	venv/bin/poetry build
+
+
+upload-test::
+	$(MAKE) build
+	venv/bin/python -m twine upload -u $${PYPI_USER} -p $${PYPI_PASS_TEST} --verbose --repository testpypi dist/*
+
+upload::
+	$(MAKE) build
+	. venv/bin/activate && python -m twine upload -u $${PYPI_USER} -p $${PYPI_PASS} --verbose dist/*
diff --git a/README.md b/README.md
index 28941a2..2d36127 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,36 @@
 # DataFlow
+![tests](https://github.com/mysiar-org/python-data-flow/actions/workflows/tests.yaml/badge.svg)
+[![Python 3.10](https://img.shields.io/badge/python-3.10-blue.svg)](https://www.python.org/downloads/release/python-3100/)
+[![Python 3.11](https://img.shields.io/badge/python-3.11-blue.svg)](https://www.python.org/downloads/release/python-3110/)
+[![Python 3.12](https://img.shields.io/badge/python-3.12-blue.svg)](https://www.python.org/downloads/release/python-3120/)
+
 
 library to manipulate data
 
+## Installation instructions
+
+```sh
+pip install mysiar-data-flow
+```
+
 ## DataFlow.DataFrame
-work in progress
+
+### Usage
+For now, see [mysiar_data_flow/data_flow.py](mysiar_data_flow/data_flow.py) for the full interface.
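+
+A minimal sketch of the fluent interface (the file names below are placeholders):
+
+```python
+from mysiar_data_flow import DataFlow
+from mysiar_data_flow.lib import FileType, Operator
+
+# in_memory=False keeps the frame in a temporary parquet/feather file instead of RAM
+data = DataFlow().DataFrame(in_memory=False, file_type=FileType.parquet).from_csv("input.csv")
+data.columns_rename(columns_mapping={"Year": "year"})
+data.filter_on_column(column="year", operator=Operator.Gt, value=2018)
+data.to_parquet("output.parquet")
+```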
+
+
+
+![work in progress](.github/5578703.png)
diff --git a/mysiar_data_flow/__init__.py b/mysiar_data_flow/__init__.py
new file mode 100644
index 0000000..b98e3ed
--- /dev/null
+++ b/mysiar_data_flow/__init__.py
@@ -0,0 +1 @@
+from .data_flow import DataFlow
diff --git a/mysiar_data_flow/data_flow.py b/mysiar_data_flow/data_flow.py
new file mode 100644
index 0000000..fc1994a
--- /dev/null
+++ b/mysiar_data_flow/data_flow.py
@@ -0,0 +1,226 @@
+import os
+import tempfile
+from typing import Any
+
+import fireducks.pandas as fd
+import pandas as pd
+import polars as pl
+from pyarrow import feather
+
+from mysiar_data_flow.lib import FileType, Operator
+from mysiar_data_flow.lib.data_columns import (
+    data_get_columns,
+    data_delete_columns,
+    data_rename_columns,
+    data_select_columns,
+    data_filter_on_column,
+)
+from mysiar_data_flow.lib.data_from import (
+    from_csv_2_file,
+    from_feather_2_file,
+    from_parquet_2_file,
+    from_json_2_file,
+    from_hdf_2_file,
+)
+from mysiar_data_flow.lib.data_to import (
+    to_csv_from_file,
+    to_feather_from_file,
+    to_parquet_from_file,
+    to_json_from_file,
+    to_hdf_from_file,
+)
+from mysiar_data_flow.lib.fireducks import from_fireducks_2_file, to_fireducks_from_file
+from mysiar_data_flow.lib.pandas import from_pandas_2_file
+from mysiar_data_flow.lib.tools import generate_temporary_filename, delete_file
+
+
+class DataFlow:
+    class DataFrame:
+        __in_memory: bool
+        __file_type: FileType
+        __data: fd.DataFrame = None
+        __filename: str = None
+
+        def __init__(self, in_memory: bool = True, file_type: FileType = FileType.parquet, tmp_file: str = None):
+            self.__in_memory = in_memory
+            self.__file_type = file_type
+            if not in_memory and tmp_file is not None:
+                self.__filename = tmp_file
+            if not in_memory and tmp_file is None:
+                self.__filename = os.path.join(tempfile.gettempdir(), generate_temporary_filename(ext=file_type.name))
+
+        def __del__(self):
+            if not self.__in_memory:
+                delete_file(self.__filename)
+
+        def from_fireducks(self, df: fd.DataFrame):
+            if self.__in_memory:
+                self.__data = df
+            else:
+                from_fireducks_2_file(df=df, tmp_filename=self.__filename, file_type=self.__file_type)
+            return self
+
+        def to_fireducks(self) -> fd.DataFrame:
+            if self.__in_memory:
+                return self.__data
+            else:
+                return to_fireducks_from_file(tmp_filename=self.__filename, file_type=self.__file_type)
+
+        def from_pandas(self, df: pd.DataFrame):
+            if self.__in_memory:
+                self.__data = fd.from_pandas(df)
+            else:
+                from_pandas_2_file(df=df, tmp_filename=self.__filename, file_type=self.__file_type)
+            return self
+
+        def to_pandas(self) -> pd.DataFrame:
+            if self.__in_memory:
+                return self.__data.to_pandas()
+            else:
+                return to_fireducks_from_file(tmp_filename=self.__filename, file_type=self.__file_type).to_pandas()
+
+        def from_polars(self, df: pl.DataFrame):
+            if self.__in_memory:
+                self.__data = fd.from_pandas(df.to_pandas())
+            else:
+                from_pandas_2_file(df=df.to_pandas(), tmp_filename=self.__filename, file_type=self.__file_type)
+            return self
+
+        def to_polars(self) -> pl.DataFrame:
+            if self.__in_memory:
+                return pl.from_pandas(self.__data.to_pandas())
+            else:
+                return pl.from_pandas(
+                    to_fireducks_from_file(tmp_filename=self.__filename, file_type=self.__file_type).to_pandas()
+                )
+
+        def from_csv(self, filename: str):
+            if self.__in_memory:
+                self.__data = fd.read_csv(filename)
+            else:
+                from_csv_2_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
+            return self
+
+        def to_csv(self, filename: str, index=False):
+            if self.__in_memory:
+                self.__data.to_csv(filename, index=index)
+            else:
+                to_csv_from_file(
+                    filename=filename, tmp_filename=self.__filename, file_type=self.__file_type, index=index
+                )
+            return self
+
+        def from_feather(self, filename: str):
+            if self.__in_memory:
+                self.__data = fd.from_pandas(feather.read_feather(filename))
+            else:
+                from_feather_2_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
+            return self
+
+        def to_feather(self, filename: str):
+            if self.__in_memory:
+                self.__data.to_feather(filename)
+            else:
+                to_feather_from_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
+            return self
+
+        def from_parquet(self, filename: str):
+            if self.__in_memory:
+                self.__data = fd.read_parquet(filename)
+            else:
+                from_parquet_2_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
+            return self
+
+        def to_parquet(self, filename: str):
+            if self.__in_memory:
+                self.__data.to_parquet(filename)
+            else:
+                to_parquet_from_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
+            return self
+
+        def from_json(self, filename: str):
+            if self.__in_memory:
+                self.__data = fd.read_json(filename)
+            else:
+                from_json_2_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
+            return self
+
+        def to_json(self, filename: str):
+            if self.__in_memory:
+                self.__data.to_json(filename)
+            else:
+                to_json_from_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
+            return self
+
+        def from_hdf(self, filename: str):
+            if self.__in_memory:
+                self.__data = fd.read_hdf(filename)
+            else:
+                from_hdf_2_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type)
+            return self
+
+        def to_hdf(self, filename: str, key: str = "key"):
+            if self.__in_memory:
+                self.__data.to_hdf(path_or_buf=filename, key=key)
+            else:
+                to_hdf_from_file(filename=filename, tmp_filename=self.__filename, file_type=self.__file_type, key=key)
+            return self
+
+        def columns(self) -> list:
+            if self.__in_memory:
+                return self.__data.columns.to_list()
+            else:
+                return data_get_columns(tmp_filename=self.__filename, file_type=self.__file_type)
+
+        def columns_delete(self, columns: list):
+            if self.__in_memory:
+                self.__data.drop(columns=columns, inplace=True)
+            else:
+                data_delete_columns(tmp_filename=self.__filename, file_type=self.__file_type, columns=columns)
+
+            return self
+
+        def columns_rename(self, columns_mapping: dict):
+            if self.__in_memory:
+                self.__data.rename(columns=columns_mapping, inplace=True)
+            else:
+                data_rename_columns(
+                    tmp_filename=self.__filename,
+                    file_type=self.__file_type,
+                    columns_mapping=columns_mapping,
+                )
+            return self
+
+        def columns_select(self, columns: list):
+            if self.__in_memory:
+                self.__data = self.__data[columns]
+            else:
+                data_select_columns(tmp_filename=self.__filename, file_type=self.__file_type, columns=columns)
+            return self
+
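+        # Keep only rows where `column <operator> value` holds; the file-backed
+        # branch applies the same comparison through data_filter_on_column.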
+        def filter_on_column(self, column: str, value: Any, operator: Operator):
+            if self.__in_memory:
+                match operator:
+                    case Operator.Eq:
+                        self.__data = self.__data[self.__data[column] == value]
+                    case Operator.Gte:
+                        self.__data = self.__data[self.__data[column] >= value]
+                    case Operator.Lte:
+                        self.__data = self.__data[self.__data[column] <= value]
+                    case Operator.Gt:
+                        self.__data = self.__data[self.__data[column] > value]
+                    case Operator.Lt:
+                        self.__data = self.__data[self.__data[column] < value]
+                    case Operator.Ne:
+                        self.__data = self.__data[self.__data[column] != value]
+            else:
+                data_filter_on_column(
+                    tmp_filename=self.__filename,
+                    file_type=self.__file_type,
+                    column=column,
+                    value=value,
+                    operator=operator,
+                )
+            return self
diff --git a/mysiar_data_flow/lib/FileType.py b/mysiar_data_flow/lib/FileType.py
new file mode 100644
index 0000000..d97eae8
--- /dev/null
+++ b/mysiar_data_flow/lib/FileType.py
@@ -0,0 +1,6 @@
+from enum import Enum
+
+
+class FileType(Enum):
+    feather = "feather"
+    parquet = "parquet"
diff --git a/mysiar_data_flow/lib/Operator.py b/mysiar_data_flow/lib/Operator.py
new file mode 100644
index 0000000..7abaa22
--- /dev/null
+++ b/mysiar_data_flow/lib/Operator.py
@@ -0,0 +1,10 @@
+from enum import Enum
+
+
+class Operator(Enum):
+    Eq = "=="
+    Gt = ">"
+    Lt = "<"
+    Gte = ">="
+    Lte = "<="
+    Ne = "!="
diff --git a/mysiar_data_flow/lib/__init__.py b/mysiar_data_flow/lib/__init__.py
new file mode 100644
index 0000000..b656cc7
--- /dev/null
+++ b/mysiar_data_flow/lib/__init__.py
@@ -0,0 +1,2 @@
+from .FileType import FileType
+from .Operator import Operator
diff --git a/mysiar_data_flow/lib/data_columns.py b/mysiar_data_flow/lib/data_columns.py
new file mode 100644
index 0000000..ae149b8
--- /dev/null
+++ b/mysiar_data_flow/lib/data_columns.py
@@ -0,0 +1,84 @@
+from typing import Any
+
+import fireducks.pandas as fd
+
+from mysiar_data_flow.lib.FileType import FileType
+from mysiar_data_flow.lib.Operator import Operator
+
+
+def data_get_columns(tmp_filename: str, file_type: FileType) -> list:
+    match file_type:
+        case FileType.parquet:
+            return fd.read_parquet(tmp_filename).columns.to_list()
+        case FileType.feather:
+            return fd.read_feather(tmp_filename).columns.to_list()
+        case _:
+            raise ValueError(f"File type not implemented: {file_type} !")
+
+
+def data_delete_columns(tmp_filename: str, file_type: FileType, columns: list) -> None:
+    match file_type:
+        case FileType.parquet:
+            data = fd.read_parquet(tmp_filename)
+            data.drop(columns=columns, inplace=True)
+            data.to_parquet(tmp_filename)
+        case FileType.feather:
+            data = fd.read_feather(tmp_filename)
+            data.drop(columns=columns, inplace=True)
+            data.to_feather(tmp_filename)
+        case _:
+            raise ValueError(f"File type not implemented: {file_type} !")
+
+
+def data_rename_columns(tmp_filename: str, file_type: FileType, columns_mapping: dict) -> None:
+    match file_type:
+        case FileType.parquet:
+            fd.read_parquet(tmp_filename).rename(columns=columns_mapping).to_parquet(tmp_filename)
+        case FileType.feather:
+            fd.read_feather(tmp_filename).rename(columns=columns_mapping).to_feather(tmp_filename)
+        case _:
+            raise ValueError(f"File type not implemented: {file_type} !")
+
+
+def data_select_columns(tmp_filename: str, file_type: FileType, columns: list) -> None:
+    match file_type:
+        case FileType.parquet:
+            data = fd.read_parquet(tmp_filename)[columns]
+            data.to_parquet(tmp_filename)
+        case FileType.feather:
+            data = fd.read_feather(tmp_filename)[columns]
+            data.to_feather(tmp_filename)
+        case _:
+            raise ValueError(f"File type not implemented: {file_type} !")
+
+
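+# File-backed operations follow one pattern: read the temporary file, apply the
+# same pandas-style operation used in memory, then write the result back.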
+def data_filter_on_column(tmp_filename: str, file_type: FileType, column: str, value: Any, operator: Operator) -> None:
+    match file_type:
+        case FileType.parquet:
+            data = fd.read_parquet(tmp_filename)
+        case FileType.feather:
+            data = fd.read_feather(tmp_filename)
+        case _:
+            raise ValueError(f"File type not implemented: {file_type} !")
+
+    match operator:
+        case Operator.Eq:
+            data = data[data[column] == value]
+        case Operator.Gte:
+            data = data[data[column] >= value]
+        case Operator.Lte:
+            data = data[data[column] <= value]
+        case Operator.Gt:
+            data = data[data[column] > value]
+        case Operator.Lt:
+            data = data[data[column] < value]
+        case Operator.Ne:
+            data = data[data[column] != value]
+
+    match file_type:
+        case FileType.parquet:
+            data.to_parquet(tmp_filename)
+        case FileType.feather:
+            data.to_feather(tmp_filename)
diff --git a/mysiar_data_flow/lib/data_from.py b/mysiar_data_flow/lib/data_from.py
new file mode 100644
index 0000000..d7ecf09
--- /dev/null
+++ b/mysiar_data_flow/lib/data_from.py
@@ -0,0 +1,55 @@
+import fireducks.pandas as fd
+from pyarrow import feather
+
+from mysiar_data_flow.lib.FileType import FileType
+
+
+def from_csv_2_file(filename: str, tmp_filename: str, file_type: FileType) -> None:
+    match file_type:
+        case FileType.parquet:
+            fd.read_csv(filename).to_parquet(tmp_filename)
+        case FileType.feather:
+            fd.read_csv(filename).to_feather(tmp_filename)
+        case _:
+            raise ValueError(f"File type not implemented: {file_type} !")
+
+
+def from_feather_2_file(filename: str, tmp_filename: str, file_type: FileType) -> None:
+    match file_type:
+        case FileType.parquet:
+            fd.from_pandas(feather.read_feather(filename)).to_parquet(tmp_filename)
+        case FileType.feather:
+            fd.from_pandas(feather.read_feather(filename)).to_feather(tmp_filename)
+        case _:
+            raise ValueError(f"File type not implemented: {file_type} !")
+
+
+def from_parquet_2_file(filename: str, tmp_filename: str, file_type: FileType) -> None:
+    match file_type:
+        case FileType.parquet:
+            fd.read_parquet(filename).to_parquet(tmp_filename)
+        case FileType.feather:
+            fd.read_parquet(filename).to_feather(tmp_filename)
+        case _:
+            raise ValueError(f"File type not implemented: {file_type} !")
+
+
+def from_json_2_file(filename: str, tmp_filename: str, file_type: FileType) -> None:
+    match file_type:
+        case FileType.parquet:
+            fd.read_json(filename).to_parquet(tmp_filename)
+        case FileType.feather:
+            fd.read_json(filename).to_feather(tmp_filename)
+        case _:
+            raise ValueError(f"File type not implemented: {file_type} !")
+
+
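+# HDF5 round-trips additionally require the `tables` package (listed in requirements.txt).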
+def from_hdf_2_file(filename: str, tmp_filename: str, file_type: FileType) -> None:
+    match file_type:
+        case FileType.parquet:
+            fd.read_hdf(filename).to_parquet(tmp_filename)
+        case FileType.feather:
+            fd.read_hdf(filename).to_feather(tmp_filename)
+        case _:
+            raise ValueError(f"File type not implemented: {file_type} !")
diff --git a/mysiar_data_flow/lib/data_to.py b/mysiar_data_flow/lib/data_to.py
new file mode 100644
index 0000000..f73a941
--- /dev/null
+++ b/mysiar_data_flow/lib/data_to.py
@@ -0,0 +1,53 @@
+import fireducks.pandas as fd
+
+from mysiar_data_flow.lib.FileType import FileType
+
+
+def to_csv_from_file(filename: str, tmp_filename: str, file_type: FileType, index: bool = False) -> None:
+    match file_type:
+        case FileType.parquet:
+            fd.read_parquet(tmp_filename).to_csv(filename, index=index)
+        case FileType.feather:
+            fd.read_feather(tmp_filename).to_csv(filename, index=index)
+        case _:
+            raise ValueError(f"File type not implemented: {file_type} !")
+
+
+def to_feather_from_file(filename: str, tmp_filename: str, file_type: FileType) -> None:
+    match file_type:
+        case FileType.parquet:
+            fd.read_parquet(tmp_filename).to_feather(filename)
+        case FileType.feather:
+            fd.read_feather(tmp_filename).to_feather(filename)
+        case _:
+            raise ValueError(f"File type not implemented: {file_type} !")
+
+
+def to_parquet_from_file(filename: str, tmp_filename: str, file_type: FileType) -> None:
+    match file_type:
+        case FileType.parquet:
+            fd.read_parquet(tmp_filename).to_parquet(filename)
+        case FileType.feather:
+            fd.read_feather(tmp_filename).to_parquet(filename)
+        case _:
+            raise ValueError(f"File type not implemented: {file_type} !")
+
+
+def to_json_from_file(filename: str, tmp_filename: str, file_type: FileType) -> None:
+    match file_type:
+        case FileType.parquet:
+            fd.read_parquet(tmp_filename).to_json(filename)
+        case FileType.feather:
+            fd.read_feather(tmp_filename).to_json(filename)
+        case _:
+            raise ValueError(f"File type not implemented: {file_type} !")
+
+
+def to_hdf_from_file(filename: str, tmp_filename: str, file_type: FileType, key: str = "key") -> None:
+    match file_type:
+        case FileType.parquet:
+            fd.read_parquet(tmp_filename).to_hdf(path_or_buf=filename, key=key)
+        case FileType.feather:
+            fd.read_feather(tmp_filename).to_hdf(path_or_buf=filename, key=key)
+        case _:
+            raise ValueError(f"File type not implemented: {file_type} !")
diff --git a/mysiar_data_flow/lib/fireducks.py b/mysiar_data_flow/lib/fireducks.py
new file mode 100644
index 0000000..8c7dd75
--- /dev/null
+++ b/mysiar_data_flow/lib/fireducks.py
@@ -0,0 +1,23 @@
+import fireducks.pandas as fd
+
+from mysiar_data_flow.lib.FileType import FileType
+
+
+def from_fireducks_2_file(df: fd.DataFrame, tmp_filename: str, file_type: FileType) -> None:
+    match file_type:
+        case FileType.parquet:
+            df.to_parquet(tmp_filename)
+        case FileType.feather:
+            df.to_feather(tmp_filename)
+        case _:
+            raise ValueError(f"File type not implemented: {file_type} !")
+
+
+def to_fireducks_from_file(tmp_filename: str, file_type: FileType) -> fd.DataFrame:
+    match file_type:
+        case FileType.parquet:
+            return fd.read_parquet(tmp_filename)
+        case FileType.feather:
+            return fd.read_feather(tmp_filename)
+        case _:
+            raise ValueError(f"File type not implemented: {file_type} !")
diff --git a/mysiar_data_flow/lib/pandas.py b/mysiar_data_flow/lib/pandas.py
new file mode 100644
index 0000000..e6eb975
--- /dev/null
+++ b/mysiar_data_flow/lib/pandas.py
@@ -0,0 +1,14 @@
+import fireducks.pandas as fd
+import pandas as pd
+
+from mysiar_data_flow.lib.FileType import FileType
+
+
+def from_pandas_2_file(df: pd.DataFrame, tmp_filename: str, file_type: FileType) -> None:
+    match file_type:
+        case FileType.parquet:
+            fd.from_pandas(df).to_parquet(tmp_filename)
+        case FileType.feather:
+            fd.from_pandas(df).to_feather(tmp_filename)
+        case _:
+            raise ValueError(f"File type not implemented: {file_type} !")
diff --git a/mysiar_data_flow/lib/tools.py b/mysiar_data_flow/lib/tools.py
new file mode 100644
index 0000000..6582844
--- /dev/null
+++ b/mysiar_data_flow/lib/tools.py
@@ -0,0 +1,14 @@
+import os
+import uuid
+
+
+def delete_file(filename: str) -> bool:
+    if os.path.exists(filename):
+        os.remove(filename)
+        return True
+    return False
+
+
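+# Two chained UUID4s make temporary-filename collisions practically impossible.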
+def generate_temporary_filename(ext: str = "tmp") -> str:
+    return f"{uuid.uuid4()}-{uuid.uuid4()}.{ext}"
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..c35d749
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,69 @@
+[tool.poetry]
+name = "mysiar-data-flow"
+version = "0.0.1rc2"
+readme = "README.md"
+description = "Python data manipulation library"
+authors = ["Piotr Synowiec "]
+maintainers = ["Piotr Synowiec "]
+keywords = ["dataframe", "pandas", "polars", "fireducks"]
+classifiers = [
+    "Intended Audience :: Developers",
+    "Programming Language :: Python :: 3 :: Only",
+    "Programming Language :: Python :: 3.10",
+    "Programming Language :: Python :: 3.11",
+    "Programming Language :: Python :: 3.12",
+    "Topic :: Software Development :: Libraries",
+    "Topic :: Utilities"
+]
+
+[tool.poetry.dependencies]
+python = ">=3.10,<3.13"
+fireducks = "*"
+tables = "*"
+pyarrow = "*"
+pandas = "*"
+polars = "*"
+black = { version = "*", optional = true }
+flake8 = { version = "*", optional = true }
+pyproject-flake8 = { version = "*", optional = true }
+pytest = { version = "*", optional = true }
+pytest-cov = { version = "*", optional = true }
+poetry = { version = "*", optional = true }
+
+
+[tool.poetry.urls]
+Repository = "https://github.com/mysiar-org/python-data-flow"
+Issues = "https://github.com/mysiar-org/python-data-flow/issues"
+#Changelog=
+
+[tool.poetry.extras]
+all = [
+
+]
+dev = [
+    "black",
+    "flake8",
+    "pyproject-flake8",
+    "pytest",
+    "pytest-cov",
+    "poetry",
+]
+
+#[tool.poetry.packages]
+#packages = [
+#    { include = "data_flow" }
+#]
+
+[build-system]
+requires = ["poetry-core>=1.0.0"]
+build-backend = "poetry.core.masonry.api"
+
+
+
+
+[tool.black]
+line-length = 120
+
+[tool.flake8]
+max-line-length = 120
+exclude = [".git", "venv", "__pycache__", "__init__.py", "build", "dist"]
\ No newline at end of file
diff --git a/requirements.dev.txt b/requirements.dev.txt
new file mode 100644
index 0000000..9440ad0
--- /dev/null
+++ b/requirements.dev.txt
@@ -0,0 +1,7 @@
+black
+flake8
+pyproject-flake8
+pytest
+pytest-cov
+poetry
+twine
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..7d3d4f8
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,5 @@
+fireducks
+tables
+pyarrow
+pandas
+polars
\ No newline at end of file
diff --git a/tests/BaseTestCase.py b/tests/BaseTestCase.py
new file mode 100644
index 0000000..0c39f4e
--- /dev/null
+++ b/tests/BaseTestCase.py
@@ -0,0 +1,102 @@
+import unittest
+from typing import Callable
+from zipfile import ZipFile
+
+import pandas as pd
+
+from mysiar_data_flow import DataFlow
+from mysiar_data_flow.lib import Operator
+
+
+class BaseTestCase(unittest.TestCase):
+    def setUp(self):
+        ZipFile(self.ZIP_FILE).extractall("./tests/data")
+
+    ZIP_FILE = "./tests/data/annual-enterprise-survey-2023-financial-year-provisional.zip"
+    CSV_FILE = "./tests/data/annual-enterprise-survey-2023-financial-year-provisional.csv"
+    TEST_FEATHER_FILE = "/tmp/data-flow.feather"
+    TEST_PARQUET_FILE = "/tmp/data-flow.parquet"
+    TEST_CSV_FILE = "/tmp/data-flow.csv"
+    TEST_JSON_FILE = "/tmp/data-flow.json"
+    TEST_HDF_FILE = "/tmp/data-flow.h5"
+
+    def assertPandasEqual(self, df1: pd.DataFrame, df2: pd.DataFrame):
+        self.assertTrue(df1.equals(df2), "Pandas DataFrames are not equal !")
+
+    def all(self, function: Callable):
+        self._sequence(data=function())
+        self._filter_Eq(data=function())
+        self._filter_Gte(data=function())
+        self._filter_Lte(data=function())
+        self._filter_Gt(data=function())
+        self._filter_Lt(data=function())
+        self._filter_Ne(data=function())
+
+    # @count_assertions
+    def _sequence(self, data: DataFlow.DataFrame) -> None:
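+        # End-to-end column API check: count, delete, rename, select, and a
+        # polars round-trip that must reproduce the original CSV contents.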
+        self.assertPandasEqual(data.to_pandas(), DataFlow().DataFrame().from_csv(self.CSV_FILE).to_pandas())
+        polars = data.to_polars()
+
+        self.assertEqual(10, len(data.columns()))
+
+        data.columns_delete(
+            [
+                "Industry_aggregation_NZSIOC",
+                "Industry_code_NZSIOC",
+                "Industry_name_NZSIOC",
+                "Industry_code_ANZSIC06",
+                "Variable_code",
+                "Variable_name",
+                "Variable_category",
+            ]
+        )
+
+        self.assertEqual(3, len(data.columns()))
+        self.assertListEqual(["Year", "Units", "Value"], data.columns())
+
+        data.columns_rename(columns_mapping={"Year": "_year_", "Units": "_units_"})
+        self.assertListEqual(["_year_", "_units_", "Value"], data.columns())
+
+        data.columns_select(columns=["_year_"])
+        self.assertListEqual(["_year_"], data.columns())
+
+        self.assertPandasEqual(
+            DataFlow().DataFrame().from_polars(polars).to_pandas(),
+            DataFlow().DataFrame().from_csv(self.CSV_FILE).to_pandas(),
+        )
+
+    def _filter_Eq(self, data: DataFlow.DataFrame) -> None:
+        data.filter_on_column(column="Year", operator=Operator.Eq, value=2018)
+        self.assertListEqual([2018], list(data.to_pandas().Year.unique()))
+
+    def _filter_Gte(self, data: DataFlow.DataFrame) -> None:
+        data.filter_on_column(column="Year", operator=Operator.Gte, value=2018)
+        result = data.to_pandas().Year.unique().tolist()
+        result.sort()
+        self.assertListEqual([2018, 2019, 2020, 2021, 2022, 2023], result)
+
+    def _filter_Lte(self, data: DataFlow.DataFrame) -> None:
+        data.filter_on_column(column="Year", operator=Operator.Lte, value=2018)
+        result = data.to_pandas().Year.unique().tolist()
+        result.sort()
+        self.assertListEqual([2013, 2014, 2015, 2016, 2017, 2018], result)
+
+    def _filter_Gt(self, data: DataFlow.DataFrame) -> None:
+        data.filter_on_column(column="Year", operator=Operator.Gt, value=2018)
+        result = data.to_pandas().Year.unique().tolist()
+        result.sort()
+        self.assertListEqual([2019, 2020, 2021, 2022, 2023], result)
+
+    def _filter_Lt(self, data: DataFlow.DataFrame) -> None:
+        data.filter_on_column(column="Year", operator=Operator.Lt, value=2018)
+        result = data.to_pandas().Year.unique().tolist()
+        result.sort()
+        self.assertListEqual([2013, 2014, 2015, 2016, 2017], result)
+
+    def _filter_Ne(self, data: DataFlow.DataFrame) -> None:
+        data.filter_on_column(column="Year", operator=Operator.Ne, value=2018)
+        result = data.to_pandas().Year.unique().tolist()
+        result.sort()
+        self.assertListEqual([2013, 2014, 2015, 2016, 2017, 2019, 2020, 2021, 2022, 2023], result)
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/data/annual-enterprise-survey-2023-financial-year-provisional.zip b/tests/data/annual-enterprise-survey-2023-financial-year-provisional.zip
new file mode 100644
index 0000000..6915e6e
Binary files /dev/null and b/tests/data/annual-enterprise-survey-2023-financial-year-provisional.zip differ
diff --git a/tests/test_base_test_case.py b/tests/test_base_test_case.py
new file mode 100644
index 0000000..3bde473
--- /dev/null
+++ b/tests/test_base_test_case.py
@@ -0,0 +1,22 @@
+import unittest
+
+import pandas as pd
+
+from tests.BaseTestCase import BaseTestCase
+
+
+class BaseTestCaseTestCase(BaseTestCase):
+    def test_assert_pandas_equal(self):
+        df1 = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
+        df2 = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
+        df3 = pd.DataFrame({"Name": ["Tom", "nick", "krish", "jack"], "Age": [20, 21, 19, 18]})
+
+        self.assertPandasEqual(df1, df2)
+
+        with self.assertRaises(AssertionError) as context:
+            self.assertPandasEqual(df1, df3)
+        self.assertEqual(str(context.exception), "False is not true : Pandas DataFrames are not equal !")
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/test_data_flow_csv.py b/tests/test_data_flow_csv.py
new file mode 100644
index 0000000..b83ed07
--- /dev/null
+++ b/tests/test_data_flow_csv.py
@@ -0,0 +1,35 @@
+import unittest
+
+from mysiar_data_flow import DataFlow
+from mysiar_data_flow.lib import FileType
+from mysiar_data_flow.lib.tools import delete_file
+from tests.BaseTestCase import BaseTestCase
+
+
+class DataFlowCSVTestCase(BaseTestCase):
+    def setUp(self):
+        super().setUp()
+        delete_file(self.TEST_CSV_FILE)
+        DataFlow().DataFrame().from_csv(self.CSV_FILE).to_csv(self.TEST_CSV_FILE)
+
+    def test_memory(self):
+        self.all(self.__memory)
+
+    def test_parquet(self):
+        self.all(self.__parquet)
+
+    def test_feather(self):
+        self.all(self.__feather)
+
+    def __memory(self) -> DataFlow.DataFrame:
+        return DataFlow().DataFrame().from_csv(self.TEST_CSV_FILE)
+
+    def __parquet(self) -> DataFlow.DataFrame:
+        return DataFlow().DataFrame(in_memory=False).from_csv(self.TEST_CSV_FILE)
+
+    def __feather(self) -> DataFlow.DataFrame:
+        return DataFlow().DataFrame(in_memory=False, file_type=FileType.feather).from_csv(self.TEST_CSV_FILE)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/test_data_flow_feather.py b/tests/test_data_flow_feather.py
new file mode 100644
index 0000000..95ab8c5
--- /dev/null
+++ b/tests/test_data_flow_feather.py
@@ -0,0 +1,35 @@
+import unittest
+
+from mysiar_data_flow import DataFlow
+from mysiar_data_flow.lib import FileType
+from mysiar_data_flow.lib.tools import delete_file
+from tests.BaseTestCase import BaseTestCase
+
+
+class DataFlowFeatherTestCase(BaseTestCase):
+    def setUp(self):
+        super().setUp()
+        delete_file(self.TEST_FEATHER_FILE)
+        DataFlow().DataFrame().from_csv(self.CSV_FILE).to_feather(self.TEST_FEATHER_FILE)
+
+    def test_memory(self):
+        self.all(self.__memory)
+
+    def test_parquet(self):
+        self.all(self.__parquet)
+
+    def test_feather(self):
+        self.all(self.__feather)
+
+    def __memory(self) -> DataFlow.DataFrame:
+        return DataFlow().DataFrame().from_feather(self.TEST_FEATHER_FILE)
+
+    def __parquet(self) -> DataFlow.DataFrame:
+        return DataFlow().DataFrame(in_memory=False).from_feather(self.TEST_FEATHER_FILE)
+
+    def __feather(self) -> DataFlow.DataFrame:
+        return DataFlow().DataFrame(in_memory=False, file_type=FileType.feather).from_feather(self.TEST_FEATHER_FILE)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/test_data_flow_hdf.py b/tests/test_data_flow_hdf.py
new file mode 100644
index 0000000..76ac185
--- /dev/null
+++ b/tests/test_data_flow_hdf.py
@@ -0,0 +1,35 @@
+import unittest
+
+from mysiar_data_flow import DataFlow
+from mysiar_data_flow.lib import FileType
+from mysiar_data_flow.lib.tools import delete_file
+from tests.BaseTestCase import BaseTestCase
+
+
+class DataFlowHdfTestCase(BaseTestCase):
+    def setUp(self):
+        super().setUp()
+        delete_file(self.TEST_HDF_FILE)
+        DataFlow().DataFrame().from_csv(self.CSV_FILE).to_hdf(self.TEST_HDF_FILE)
+
+    def test_memory(self):
+        self.all(self.__memory)
+
+    def test_parquet(self):
+        self.all(self.__parquet)
+
+    def test_feather(self):
+        self.all(self.__feather)
+
+    def __memory(self) -> DataFlow.DataFrame:
+        return DataFlow().DataFrame().from_hdf(self.TEST_HDF_FILE)
+
+    def __parquet(self) -> DataFlow.DataFrame:
+        return DataFlow().DataFrame(in_memory=False).from_hdf(self.TEST_HDF_FILE)
+
+    def __feather(self) -> DataFlow.DataFrame:
+        return DataFlow().DataFrame(in_memory=False, file_type=FileType.feather).from_hdf(self.TEST_HDF_FILE)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/test_data_flow_json.py b/tests/test_data_flow_json.py
new file mode 100644
index 0000000..b70c37f
--- /dev/null
+++ b/tests/test_data_flow_json.py
@@ -0,0 +1,35 @@
+import unittest
+
+from mysiar_data_flow import DataFlow
+from mysiar_data_flow.lib import FileType
+from mysiar_data_flow.lib.tools import delete_file
+from tests.BaseTestCase import BaseTestCase
+
+
+class DataFlowJsonTestCase(BaseTestCase):
+    def setUp(self):
+        super().setUp()
+        delete_file(self.TEST_JSON_FILE)
+        DataFlow().DataFrame().from_csv(self.CSV_FILE).to_json(self.TEST_JSON_FILE)
+
+    def test_memory(self):
+        self.all(self.__memory)
+
+    def test_parquet(self):
+        self.all(self.__parquet)
+
+    def test_feather(self):
+        self.all(self.__feather)
+
+    def __memory(self) -> DataFlow.DataFrame:
+        return DataFlow().DataFrame().from_json(self.TEST_JSON_FILE)
+
+    def __parquet(self) -> DataFlow.DataFrame:
+        return DataFlow().DataFrame(in_memory=False).from_json(self.TEST_JSON_FILE)
+
+    def __feather(self) -> DataFlow.DataFrame:
+        return DataFlow().DataFrame(in_memory=False, file_type=FileType.feather).from_json(self.TEST_JSON_FILE)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/test_data_flow_parquet.py b/tests/test_data_flow_parquet.py
new file mode 100644
index 0000000..d7bc73d
--- /dev/null
+++ b/tests/test_data_flow_parquet.py
@@ -0,0 +1,35 @@
+import unittest
+
+from mysiar_data_flow import DataFlow
+from mysiar_data_flow.lib import FileType
+from mysiar_data_flow.lib.tools import delete_file
+from tests.BaseTestCase import BaseTestCase
+
+
+class DataFlowParquetTestCase(BaseTestCase):
+    def setUp(self):
+        super().setUp()
+        delete_file(self.TEST_PARQUET_FILE)
+        DataFlow().DataFrame().from_csv(self.CSV_FILE).to_parquet(self.TEST_PARQUET_FILE)
+
+    def test_memory(self):
+        self.all(self.__memory)
+
+    def test_parquet(self):
+        self.all(self.__parquet)
+
+    def test_feather(self):
+        self.all(self.__feather)
+
+    def __memory(self) -> DataFlow.DataFrame:
+        return DataFlow().DataFrame().from_parquet(self.TEST_PARQUET_FILE)
+
+    def __parquet(self) -> DataFlow.DataFrame:
+        return DataFlow().DataFrame(in_memory=False).from_parquet(self.TEST_PARQUET_FILE)
+
+    def __feather(self) -> DataFlow.DataFrame:
+        return DataFlow().DataFrame(in_memory=False, file_type=FileType.feather).from_parquet(self.TEST_PARQUET_FILE)
+
+
+if __name__ == "__main__":
+    unittest.main()