Skip to content

Commit

Permalink
Merge pull request #588 from openego/features/#553-motorized-individu…
Browse files Browse the repository at this point in the history
…al-travel

Features/#553 motorized individual travel
  • Loading branch information
nesnoj committed Jun 22, 2022
2 parents 7863805 + a5bbab0 commit ad64942
Show file tree
Hide file tree
Showing 7 changed files with 470 additions and 256 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ Added
`#699 <https://github.com/openego/eGon-data/issues/699>`_
* Introduce Sanity checks for eGon2035
`#382 <https://github.com/openego/eGon-data/issues/382>`_
* Add motorized individual travel
`#553 <https://github.com/openego/eGon-data/issues/553>`_

.. _PR #159: https://github.com/openego/eGon-data/pull/159
.. _PR #703: https://github.com/openego/eGon-data/pull/703
Expand Down Expand Up @@ -506,7 +508,7 @@ Bug Fixes
* Fix assignment of impedances (x) to etrago tables
`#710 <https://github.com/openego/eGon-data/issues/710>`_
* Fix country_code attribution of two gas buses
`#710 <https://github.com/openego/eGon-data/issues/744>`_
`#744 <https://github.com/openego/eGon-data/issues/744>`_
* Fix voronoi assignment for enclaves
`#734 <https://github.com/openego/eGon-data/issues/734>`_
* Set lengths of non-pipeline links to 0
Expand Down
5 changes: 3 additions & 2 deletions src/egon/data/datasets.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1056,10 +1056,10 @@ emobility_mit:
skiprows: 8
trips:
eGon2035:
file: "eGon2035_RS7_min2k_2022-05-13_120240_simbev_run.tar.gz"
file: "eGon2035_RS7_min2k_2022-06-01_175429_simbev_run.tar.gz"
file_metadata: "metadata_simbev_run.json"
eGon100RE:
file: "eGon100RE_RS7_min2k_2022-05-13_120301_simbev_run.tar.gz"
file: "eGon100RE_RS7_min2k_2022-06-01_175444_simbev_run.tar.gz"
file_metadata: "metadata_simbev_run.json"
scenario:
variation:
Expand All @@ -1068,3 +1068,4 @@ emobility_mit:
model_timeseries:
reduce_memory: True
export_results_to_csv: True
parallel_tasks: 10
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
import os
import tarfile

# ========== Register np datatypes with SQLA ==========
from airflow.operators.python_operator import PythonOperator
from psycopg2.extensions import AsIs, register_adapter
import numpy as np
import pandas as pd
Expand All @@ -100,16 +100,19 @@
COLUMNS_KBA,
DATA_BUNDLE_DIR,
DATASET_CFG,
MVGD_MIN_COUNT,
TESTMODE_OFF,
TRIP_COLUMN_MAPPING,
WORKING_DIR,
)
from egon.data.datasets.emobility.motorized_individual_travel.model_timeseries import (
generate_model_data_eGon100RE,
generate_model_data_eGon2035,
generate_model_data_bunch,
generate_model_data_eGon100RE_remaining,
generate_model_data_eGon2035_remaining,
)


# ========== Register np datatypes with SQLA ==========
def adapt_numpy_float64(numpy_float64):
    """Wrap a numpy float64 value in psycopg2's `AsIs` adapter.

    Parameters
    ----------
    numpy_float64 : numpy.float64
        Value to be adapted.

    Returns
    -------
    psycopg2.extensions.AsIs
        The wrapped value.

    Notes
    -----
    Presumably registered via `register_adapter` elsewhere in this
    module so numpy floats can be written to the database -- confirm
    against the registration call.
    """
    adapted_value = AsIs(numpy_float64)
    return adapted_value

Expand Down Expand Up @@ -151,8 +154,7 @@ def create_tables():

# Create dir for results, if it does not exist
result_dir = WORKING_DIR / Path("results")
if not os.path.exists(result_dir):
os.mkdir(result_dir)
result_dir.mkdir(exist_ok=True, parents=True)


def download_and_preprocess():
Expand Down Expand Up @@ -356,9 +358,62 @@ def import_csv(f):

class MotorizedIndividualTravel(Dataset):
def __init__(self, dependencies):
def generate_model_data_tasks(scenario_name):
    """Dynamically generate tasks for model data creation.

    The goal is to speed up the creation of model timeseries. However,
    the exact number of parallel tasks cannot be determined during the
    DAG building as the number of grid districts (MVGD) is calculated
    within another pipeline task.

    Approach: assuming that at least `MVGD_MIN_COUNT` grid districts
    exist, this range is split into `parallel_tasks` equally sized
    bunches, each of which is processed by its own task. The remainder
    is handled by the scenario-specific
    `generate_model_data_<scenario>_remaining` task, which is added to
    the returned set as well.

    The number of parallel tasks is defined via parameter
    `parallel_tasks` in the dataset config `datasets.yml` (defaults to
    1 if not set).

    Parameters
    ----------
    scenario_name : str
        Scenario name

    Returns
    -------
    set
        The tasks: one `PythonOperator` per bunch (calling
        `generate_model_data_bunch` with the scenario name and the
        bunch as keyword arguments) plus, for the scenarios "eGon2035"
        and "eGon100RE", the matching `*_remaining` callable.

    Raises
    ------
    ValueError
        If `parallel_tasks` is smaller than 1 or larger than
        `MVGD_MIN_COUNT`, as no non-empty bunches could be built.
    """
    parallel_tasks = DATASET_CFG["model_timeseries"].get(
        "parallel_tasks", 1
    )
    # Fail early with a clear message instead of a ZeroDivisionError
    # (parallel_tasks == 0) or an IndexError on an empty bunch
    # (parallel_tasks > MVGD_MIN_COUNT) further below.
    if not 1 <= parallel_tasks <= MVGD_MIN_COUNT:
        raise ValueError(
            f"Parameter 'parallel_tasks' must be between 1 and "
            f"{MVGD_MIN_COUNT}, got {parallel_tasks}."
        )
    mvgd_bunch_size = MVGD_MIN_COUNT // parallel_tasks
    # NOTE(review): if MVGD_MIN_COUNT is not divisible by
    # parallel_tasks, the bunches end before MVGD_MIN_COUNT -- confirm
    # that the `*_remaining` tasks also cover this gap.

    tasks = set()
    for task_no in range(parallel_tasks):
        bunch = range(
            task_no * mvgd_bunch_size, (task_no + 1) * mvgd_bunch_size
        )
        tasks.add(
            PythonOperator(
                task_id=(
                    f"generate_model_data_"
                    f"{scenario_name}_"
                    f"bunch{bunch[0]}-{bunch[-1]}"
                ),
                python_callable=generate_model_data_bunch,
                op_kwargs={
                    "scenario_name": scenario_name,
                    "bunch": bunch,
                },
            )
        )

    # Add the scenario-specific task for the grid districts that are
    # not covered by the bunches above.
    if scenario_name == "eGon2035":
        tasks.add(generate_model_data_eGon2035_remaining)
    elif scenario_name == "eGon100RE":
        tasks.add(generate_model_data_eGon100RE_remaining)
    return tasks

super().__init__(
name="MotorizedIndividualTravel",
version="0.0.0.dev",
version="0.0.1",
dependencies=dependencies,
tasks=(
create_tables,
Expand All @@ -367,6 +422,9 @@ def __init__(self, dependencies):
(extract_trip_file, write_evs_trips_to_db),
},
allocate_evs_to_grid_districts,
{generate_model_data_eGon2035, generate_model_data_eGon100RE},
{
*generate_model_data_tasks(scenario_name="eGon2035"),
*generate_model_data_tasks(scenario_name="eGon100RE"),
},
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
"drive_end_timesteps": "drive_end",
"consumption_kWh": "consumption",
}
# Assumed minimum number of MV grid districts (MVGD): 3700 if
# TESTMODE_OFF is truthy, 150 otherwise. Used to split the model data
# generation into bunches of grid districts that are processed by
# parallel tasks.
MVGD_MIN_COUNT = 3700 if TESTMODE_OFF else 150


def read_kba_data():
Expand Down

0 comments on commit ad64942

Please sign in to comment.