Merge pull request #277 from openfisca/275-support-parquet-file-format
Support parquet file format and add batch and filter_by options
clallemand committed Mar 27, 2024
2 parents f78ef85 + 654e274 commit ceb5ad8
Showing 19 changed files with 885 additions and 206 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -98,3 +98,7 @@ docs/_build/
/tags
tags
.venv

# Tests files
*.parquet
test_*.json
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,15 @@
# Changelog

# 2.1.0 [#277](https://github.com/openfisca/openfisca-survey-manager/pull/277)

* New features
  - Support the Parquet file format:
    - If a Parquet file is provided to `build-collection`, it is not converted to HDF5 but added to the collection as is.
    - It is possible to provide a folder containing many files: each file will be used as a separate table.
  - Run a simulation on a part of the input datasets (for the moment this works only for two-entity simulations), as illustrated in the sketch below:
    - with a batch size option
    - with a filter_by option
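
A minimal sketch of the idea behind the batch and filter options, using pyarrow directly rather than the survey-manager API (the `households.parquet` file and the `region` column are hypothetical):

```python
import pyarrow.parquet as pq

# Read a table in fixed-size batches instead of loading it whole.
parquet_file = pq.ParquetFile("households.parquet")
for batch in parquet_file.iter_batches(batch_size = 10_000):
    chunk = batch.to_pandas()  # process 10 000 rows at a time

# Keep only the rows matching a predicate at read time (filter_by-like behaviour).
filtered = pq.read_table("households.parquet", filters = [("region", "=", "11")]).to_pandas()
```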

# 2.0.10 [#285](https://github.com/openfisca/openfisca-survey-manager/pull/285)

* Technical changes
25 changes: 25 additions & 0 deletions CONTRIBUTING.md
@@ -22,6 +22,31 @@ In case of breaking changes, you **must** give details about what features were

> You must also provide guidelines to help users adapt their code to be compatible with the new version of the package.
## Debug tests in VSCode

To launch the debugger in VSCode, you need to create a `launch.json` file in the `.vscode` folder at the root of the project. The content of the file should be the following:

```json
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python debug Pytest",
            "type": "python",
            "request": "launch",
            "module": "pytest",
            "args": ["${file}"],
            "console": "integratedTerminal",
            "env": {
                "CI": "1"
            }
        }
    ]
}
```

If you get a `ModuleNotFoundError`, create your virtual environment in the `.venv` folder at the root of the project and install the requirements.

## Advertising changes

5 changes: 5 additions & 0 deletions Makefile
@@ -5,6 +5,11 @@ uninstall:

clean:
rm -rf build dist
rm -f openfisca_survey_manager/tests/data_files/config.ini
rm -f openfisca_survey_manager/tests/data_files/test_parquet_collection.json
rm -rf openfisca_survey_manager/tests/data_files/test_multiple_parquet_collection
rm -rf openfisca_survey_manager/tests/data_files/test_parquet_collection
rm -rf openfisca_survey_manager/tests/data_files/test_random_generator.json
find . -name '*.pyc' -exec rm \{\} \;

deps:
15 changes: 11 additions & 4 deletions README.md
@@ -15,8 +15,8 @@
This repository contains the Survey-Manager module, to work with OpenFisca and survey data.

It provides two main features:
* A Python API to access data in [Hierarchical Data Format](https://en.wikipedia.org/wiki/Hierarchical_Data_Format) (HDF).
* A script that transforms SAS, Stata, SPSS, and CSV data files to HDF data files, along with some metadata so they can be used by the Python API.
* A Python API to access data in [Hierarchical Data Format](https://en.wikipedia.org/wiki/Hierarchical_Data_Format) (HDF) or [Parquet](https://parquet.apache.org/).
* A script that transforms Parquet, SAS, Stata, SPSS, and CSV data files to HDF data files, along with some metadata so they can be used by the Python API. If the format is Parquet, it is kept as is.

> For France survey data, you might find useful information on the next steps in [openfisca-france-data](https://github.com/openfisca/openfisca-france-data) repository.
@@ -27,8 +27,8 @@ It provides two main features:
Ce dépôt contient le module Survey-Manager. Il facilite l'usage d'OpenFisca avec des données d'enquête.

Il fournit deux fonctionnalités principales:
* Une API Python permettant l'accès à des données au format [Hierarchical Data Format](https://fr.wikipedia.org/wiki/Hierarchical_Data_Format) (HDF).
* Un script qui tranforme les fichiers de données aux formats SAS, Stata, SPSS, and CSV data files en fichiers de données au format HDF, avec quelques metadonnées leur permettant d'être utilisés par l'API Python.
* Une API Python permettant l'accès à des données au format [Hierarchical Data Format](https://fr.wikipedia.org/wiki/Hierarchical_Data_Format) (HDF) ou [Parquet](https://parquet.apache.org/).
* Un script qui transforme les fichiers de données aux formats Parquet, SAS, Stata, SPSS et CSV en fichiers de données au format HDF, avec quelques métadonnées leur permettant d'être utilisés par l'API Python. Si le format est Parquet, il est conservé tel quel.

> Si vous disposez de données d'enquête sur la France, le dépôt [openfisca-france-data](https://github.com/openfisca/openfisca-france-data) pourrait être utile à vos prochaines étapes de traitement.
@@ -230,6 +230,13 @@ build-collection -p /another/path -c housing_survey -s 2014 -d -m -v
It should work. If it doesn't, please do not hesitate to [open an issue](https://github.com/openfisca/openfisca-survey-manager/issues/new).

### Parquet files

Parquet files can be used as input files. They will not be converted to HDF5. As a Parquet file can only contain one table, we add a `"parquet_file"` key to each table in a survey. This key contains the path to the Parquet file, or to the folder containing the many Parquet files that make up the same table.

If you use a folder, you have to name your files with the following pattern: `some_name_-<number>.parquet`, and keep only the files belonging to the same table in that folder.

If a single file contains the whole table, you can have several files for different tables in the same folder.
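
As an illustration of the expected layout (the survey, table, and column names below are hypothetical), the input files could be produced with pandas like this:

```python
import os

import pandas as pd

households = pd.DataFrame({"household_id": [1, 2, 3], "rent": [500.0, 750.0, 620.0]})
persons = pd.DataFrame({"person_id": [1, 2, 3, 4], "household_id": [1, 1, 2, 3]})

data_dir = "raw_data/housing_survey_2024"

# One table split into several chunks: one folder per table,
# with files named following the `some_name_-<number>.parquet` pattern.
os.makedirs(os.path.join(data_dir, "household"), exist_ok = True)
households.iloc[:2].to_parquet(os.path.join(data_dir, "household", "household_-1.parquet"))
households.iloc[2:].to_parquet(os.path.join(data_dir, "household", "household_-2.parquet"))

# One table entirely contained in a single file: several such files
# for different tables can live in the same folder.
persons.to_parquet(os.path.join(data_dir, "person.parquet"))
```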

## Development

2 changes: 1 addition & 1 deletion openfisca_survey_manager/__init__.py
@@ -21,7 +21,7 @@
default_config_files_directory = None


# Hack for uising with france-data on a CI or locally
# Hack for using with france-data on a CI or locally
try:
import openfisca_france_data
france_data_location = Path(openfisca_france_data.__file__).parent.parent
27 changes: 20 additions & 7 deletions openfisca_survey_manager/input_dataframe_generator.py
@@ -176,7 +176,8 @@ def randomly_init_variable(tax_benefit_system, input_dataframe_by_entity, variab


def set_table_in_survey(input_dataframe, entity, period, collection, survey_name, survey_label = None,
table_label = None, table_name = None, config_files_directory = default_config_files_directory):
table_label = None, table_name = None, config_files_directory = default_config_files_directory,
source_format = None, parquet_file = None):
period = periods.period(period)
if table_name is None:
table_name = entity + '_' + str(period)
@@ -199,6 +200,9 @@ def set_table_in_survey(input_dataframe, entity, period, collection, survey_name
name = collection,
config_files_directory = data_dir,
)
except FileNotFoundError as e:
log.warning(f"set_table_in_survey FileNotFoundError : {e}")
survey_collection = SurveyCollection(name = collection, config_files_directory=config_files_directory)

try:
survey = survey_collection.get_survey(survey_name)
@@ -210,17 +214,26 @@ def set_table_in_survey(input_dataframe, entity, period, collection, survey_name
survey_collection = survey_collection,
)

if survey.hdf5_file_path is None:
if survey.hdf5_file_path is None and survey.parquet_file_path is None:
config = survey.survey_collection.config
directory_path = config.get("data", "output_directory")
if not os.path.isdir(directory_path):
log.warning("{} who should be the HDF5 data directory does not exist: we create the directory".format(
log.warning("{} who should be the data directory does not exist: we create the directory".format(
directory_path))
os.makedirs(directory_path)
survey.hdf5_file_path = os.path.join(directory_path, survey.name + '.h5')

assert survey.hdf5_file_path is not None
survey.insert_table(label = table_label, name = table_name, dataframe = input_dataframe)
if source_format is None:
survey.hdf5_file_path = os.path.join(directory_path, survey.name + '.h5')
elif source_format == "parquet":
survey.parquet_file_path = os.path.join(directory_path, survey.name)
if not os.path.isdir(survey.parquet_file_path):
log.warning("{} who should be the parquet data directory does not exist: we create the directory".format(
survey.parquet_file_path))
os.makedirs(survey.parquet_file_path)

assert (survey.hdf5_file_path is not None) or (survey.parquet_file_path is not None)
if source_format == "parquet" and parquet_file is None:
parquet_file = os.path.join(directory_path, survey.name + '.parquet')
survey.insert_table(label = table_label, name = table_name, dataframe = input_dataframe, parquet_file = parquet_file)
# If a survey with the same name exists it will be overwritten
survey_collection.surveys = [
kept_survey for kept_survey in survey_collection.surveys if kept_survey.name != survey_name
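
For reference, a minimal sketch of how the extended `set_table_in_survey` signature above might be called (the collection, survey, and column names are hypothetical, not part of this change):

```python
import pandas as pd

from openfisca_survey_manager.input_dataframe_generator import set_table_in_survey

input_dataframe = pd.DataFrame({"household_id": [1, 2], "rent": [500.0, 750.0]})

set_table_in_survey(
    input_dataframe,
    entity = "household",
    period = "2024",
    collection = "fake_collection",
    survey_name = "fake_survey_2024",
    source_format = "parquet",  # new parameter: keep the table as Parquet rather than converting to HDF5
    )
```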
28 changes: 24 additions & 4 deletions openfisca_survey_manager/scripts/build_collection.py
@@ -12,7 +12,7 @@
import pdb
import shutil
import sys

import re

from openfisca_survey_manager.survey_collections import SurveyCollection
from openfisca_survey_manager.surveys import Survey
Expand All @@ -22,13 +22,15 @@
log = logging.getLogger(app_name)


def add_survey_to_collection(survey_name = None, survey_collection = None, sas_files = None, stata_files = None, csv_files = None):
def add_survey_to_collection(survey_name = None, survey_collection = None, sas_files = None, stata_files = None, csv_files = None, parquet_files = None):
if sas_files is None:
sas_files = []
if stata_files is None:
stata_files = []
if csv_files is None:
csv_files = []
if parquet_files is None:
parquet_files = []

assert survey_collection is not None
overwrite = True
@@ -44,6 +46,7 @@ def add_survey_to_collection(survey_name = None, survey_collection = None, sas_f
csv_files = csv_files,
sas_files = sas_files,
stata_files = stata_files,
parquet_files = parquet_files,
survey_collection = survey_collection,
)
else:
@@ -53,6 +56,7 @@
"csv_files": csv_files,
"sas_files": sas_files,
"stata_files": stata_files,
"parquet_files": parquet_files,
})
survey_collection.surveys = [
kept_survey for kept_survey in survey_collection.surveys if kept_survey.name != survey_name
@@ -65,6 +69,7 @@ def create_data_file_by_format(directory_path = None):
stata_files = []
sas_files = []
csv_files = []
parquet_files = []

for root, _subdirs, files in os.walk(directory_path):
for file_name in files:
@@ -78,7 +83,14 @@
if os.path.basename(file_name).endswith(".sas7bdat"):
log.info("Found sas file {}".format(file_path))
sas_files.append(file_path)
return {'csv': csv_files, 'stata': stata_files, 'sas': sas_files}
if os.path.basename(file_name).endswith(".parquet"):
log.info("Found parquet file {}".format(file_path))
relative = file_name[file_name.find(directory_path):]
if ("/" in relative or "\\" in relative) and re.match(r".*-\d$", file_name):
# Keep only the folder name if parquet files are in subfolders and name contains "-<number>"
file_path = os.path.dirname(file_name)
parquet_files.append(file_path)
return {'csv': csv_files, 'stata': stata_files, 'sas': sas_files, 'parquet': parquet_files}


def build_survey_collection(
@@ -111,12 +123,14 @@

data_file_by_format = create_data_file_by_format(data_directory_path)
survey_name = '{}_{}'.format(collection_name, survey_suffix)
# Save the original files list in the survey collection
add_survey_to_collection(
survey_name = survey_name,
survey_collection = survey_collection,
csv_files = data_file_by_format.get('csv'),
sas_files = data_file_by_format.get('sas'),
stata_files = data_file_by_format.get('stata'),
parquet_files = data_file_by_format.get('parquet'),
)

valid_source_format = [
@@ -134,12 +148,18 @@
os.mkdir(collections_directory)
collection_json_path = os.path.join(collections_directory, "{}.json".format(collection_name))
survey_collection.dump(json_file_path = collection_json_path)
surveys = [survey for survey in survey_collection.surveys if survey.name.endswith(str(survey_suffix))]
surveys = []
for survey in survey_collection.surveys:
if survey.name.endswith(str(survey_suffix)) and survey.name.startswith(collection_name):
surveys.append(survey)
survey_collection.fill_hdf(source_format = source_format, surveys = surveys, overwrite = replace_data)
return survey_collection


def check_template_config_files(config_files_directory: str):
"""
Create template config files if they do not exist.
"""
raw_data_ini_path = os.path.join(config_files_directory, 'raw_data.ini')
config_ini_path = os.path.join(config_files_directory, 'config.ini')
raw_data_template_ini_path = os.path.join(config_files_directory, 'raw_data_template.ini')
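
For reference, a minimal sketch of the dictionary returned by the updated `create_data_file_by_format` helper (the directory path is hypothetical):

```python
from openfisca_survey_manager.scripts.build_collection import create_data_file_by_format

data_file_by_format = create_data_file_by_format("/path/to/raw_data/housing_survey_2014")
print(data_file_by_format["parquet"])  # parquet files, or folders of chunked parquet files
print(data_file_by_format["csv"], data_file_by_format["stata"], data_file_by_format["sas"])
```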
