Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[generator][python] Fixed race condition bugs; Prepared a bit for lau… #13991

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 3 additions & 20 deletions tools/python/airmaps/dags/build_coastline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from airmaps.instruments import settings
from airmaps.instruments import storage
from airmaps.instruments.utils import get_latest_filename
from airmaps.instruments.utils import make_rm_build_task
from airmaps.instruments.utils import put_current_date_in_filename
from airmaps.instruments.utils import rm_build
from maps_generator.generator import stages_declaration as sd
Expand Down Expand Up @@ -40,9 +39,7 @@
COASTLINE_STORAGE_PATH = f"{settings.STORAGE_PREFIX}/coasts"


def publish_coastline(**kwargs):
build_name = kwargs["ti"].xcom_pull(key="build_name")
env = Env(build_name=build_name)
def publish_coastline(env):
for name in (f"{WORLD_COASTS_NAME}.geom", f"{WORLD_COASTS_NAME}.rawgeom"):
coastline = put_current_date_in_filename(name)
latest = get_latest_filename(name)
Expand All @@ -58,7 +55,6 @@ def publish_coastline(**kwargs):
def build_coastline(**kwargs):
env = Env()
kwargs["ti"].xcom_push(key="build_name", value=env.build_name)

run_generation(
env,
(
Expand All @@ -68,26 +64,13 @@ def build_coastline(**kwargs):
),
)
env.finish()
publish_coastline(env)
rm_build(env)


BUILD_COASTLINE_TASK = PythonOperator(
task_id="Build_coastline_task",
provide_context=True,
python_callable=build_coastline,
on_failure_callback=lambda ctx: rm_build(**ctx),
dag=DAG,
)


PUBLISH_COASTLINE_TASK = PythonOperator(
task_id="Publish_coastline_task",
provide_context=True,
python_callable=publish_coastline,
dag=DAG,
)


RM_BUILD_TASK = make_rm_build_task(DAG)


BUILD_COASTLINE_TASK >> PUBLISH_COASTLINE_TASK >> RM_BUILD_TASK
99 changes: 75 additions & 24 deletions tools/python/airmaps/dags/build_maps.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
import logging
import os
from datetime import timedelta

import filelock
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

from airmaps.instruments import settings
from airmaps.instruments import storage
from airmaps.instruments.utils import make_rm_build_task
from airmaps.instruments.utils import rm_build
from airmaps.instruments.utils import run_generation_from_first_stage
from maps_generator.generator import stages_declaration as sd
from maps_generator.generator.env import Env
from maps_generator.generator.env import PathProvider
from maps_generator.generator.env import get_all_countries_list
from maps_generator.generator.status import Status
from maps_generator.maps_generator import run_generation

logger = logging.getLogger("airmaps")


MAPS_STORAGE_PATH = f"{settings.STORAGE_PREFIX}/maps"
MAPS_STORAGE_PATH = os.path.join(settings.STORAGE_PREFIX, "maps")


class MapsGenerationDAG(DAG):
Expand All @@ -39,23 +42,61 @@ def __init__(self, *args, **kwargs):
dag=self,
)

publish_maps_task = PythonOperator(
task_id="Publish_maps_task",
provide_context=True,
python_callable=MapsGenerationDAG.publish_maps,
dag=self,
)

rm_build_task = make_rm_build_task(self)

build_epilog_task >> publish_maps_task >> rm_build_task
for country in get_all_countries_list(PathProvider.borders_path()):
build_prolog_task >> self.make_mwm_operator(country) >> build_epilog_task

@staticmethod
def get_params(namespace="env", **kwargs):
return kwargs.get("params", {}).get(namespace, {})

@staticmethod
def publish_maps_build(env, subdir="temp_dir", **kwargs):
subdir = MapsGenerationDAG.get_params(namespace="storage", **kwargs)[subdir]
storage_path = os.path.join(MAPS_STORAGE_PATH, subdir)
storage.wd_publish(
env.paths.build_path, os.path.join(storage_path, env.build_name)
)

@staticmethod
def fetch_maps_build(env, subdir="temp_dir", **kwargs):
subdir = MapsGenerationDAG.get_params(namespace="storage", **kwargs)[subdir]
storage_path = os.path.join(MAPS_STORAGE_PATH, subdir)
storage.wd_fetch(
os.path.join(storage_path, env.build_name), env.paths.build_path
)

@staticmethod
def publish_map(env, country, subdir="temp_dir", **kwargs):
subdir = MapsGenerationDAG.get_params(namespace="storage", **kwargs)[subdir]
storage_path = os.path.join(MAPS_STORAGE_PATH, subdir)

ignore_paths = {
os.path.normpath(env.paths.draft_path),
os.path.normpath(env.paths.generation_borders_path),
}

def publish(path):
rel_path = path.replace(env.paths.build_path, "")[1:]
dest = os.path.join(storage_path, env.build_name, rel_path)
storage.wd_publish(path, dest)

def find_and_publish_files_for_country(path):
for root, dirs, files in os.walk(path):
if os.path.normpath(root) in ignore_paths:
continue

for dir in dirs:
if dir.startswith(country):
publish(os.path.join(root, dir))
else:
find_and_publish_files_for_country(os.path.join(root, dir))

for file in files:
if file.startswith(country):
publish(os.path.join(root, file))

find_and_publish_files_for_country(env.paths.build_path)

@staticmethod
def build_prolog(**kwargs):
params = MapsGenerationDAG.get_params(**kwargs)
Expand All @@ -71,14 +112,30 @@ def build_prolog(**kwargs):
sd.StageDownloadDescriptions(),
),
)
env.clean()
MapsGenerationDAG.publish_maps_build(env, **kwargs)
rm_build(env)

@staticmethod
def make_build_mwm_func(country):
def build_mwm(**kwargs):
build_name = kwargs["ti"].xcom_pull(key="build_name")
params = MapsGenerationDAG.get_params(**kwargs)
params.update({"build_name": build_name, "countries": [country,]})
env = Env(**params)

lock_file = os.path.join(
PathProvider.tmp_dir(), f"{build_name}-{__name__}-download.lock"
)
status_name = os.path.join(
PathProvider.tmp_dir(), f"{build_name}-{__name__}-download.status"
)
with filelock.FileLock(lock_file):
env = Env(**params)
s = Status(status_name)
if not s.is_finished():
MapsGenerationDAG.fetch_maps_build(env, **kwargs)
s.finish()

# We need to check that mwm.tmp exists. This is needed if we want to
# build mwms from only a part of the planet.
tmp_mwm_name = env.get_tmp_mwm_names()
Expand All @@ -88,6 +145,7 @@ def build_mwm(**kwargs):
return

run_generation_from_first_stage(env, (sd.StageMwm(),), build_lock=False)
MapsGenerationDAG.publish_map(env, country, **kwargs)

return build_mwm

Expand All @@ -97,6 +155,7 @@ def build_epilog(**kwargs):
params = MapsGenerationDAG.get_params(**kwargs)
params.update({"build_name": build_name})
env = Env(**params)
MapsGenerationDAG.fetch_maps_build(env, **kwargs)
run_generation_from_first_stage(
env,
(
Expand All @@ -108,16 +167,8 @@ def build_epilog(**kwargs):
),
)
env.finish()

@staticmethod
def publish_maps(**kwargs):
build_name = kwargs["ti"].xcom_pull(key="build_name")
params = MapsGenerationDAG.get_params(**kwargs)
params.update({"build_name": build_name})
env = Env(**params)
subdir = MapsGenerationDAG.get_params(namespace="storage", **kwargs)["subdir"]
storage_path = f"{MAPS_STORAGE_PATH}/{subdir}"
storage.wd_publish(env.paths.mwm_path, f"{storage_path}/{env.mwm_version}/")
MapsGenerationDAG.publish_maps_build(env, subdir="mwm_dir", **kwargs)
rm_build(env)

def make_mwm_operator(self, country):
normalized_name = "__".join(country.lower().split())
Expand All @@ -129,7 +180,7 @@ def make_mwm_operator(self, country):
)


PARAMS = {"storage": {"subdir": "open_source"}}
PARAMS = {"storage": {"mwm_dir": "open_source", "temp_dir": "temp"}}
if settings.DEBUG:
PARAMS["env"] = {
# The planet file in debug mode does not contain Russia_Moscow territory.
Expand Down
28 changes: 6 additions & 22 deletions tools/python/airmaps/dags/update_planet.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import os
from datetime import timedelta

from airflow import DAG
Expand All @@ -7,7 +8,7 @@

from airmaps.instruments import settings
from airmaps.instruments import storage
from airmaps.instruments.utils import make_rm_build_task
from airmaps.instruments.utils import rm_build
from maps_generator.generator import stages_declaration as sd
from maps_generator.generator.env import Env
from maps_generator.maps_generator import run_generation
Expand All @@ -33,13 +34,14 @@
)


PLANET_STORAGE_PATH = f"{settings.STORAGE_PREFIX}/planet_regular/planet-latest.o5m"
PLANET_STORAGE_PATH = os.path.join(
settings.STORAGE_PREFIX, "planet_regular", "planet-latest.o5m"
)


def update_planet(**kwargs):
env = Env()
kwargs["ti"].xcom_push(key="build_name", value=env.build_name)

if settings.DEBUG:
env.add_skipped_stage(sd.StageUpdatePlanet)

Expand All @@ -52,13 +54,9 @@ def update_planet(**kwargs):
),
)
env.finish()


def publish_planet(**kwargs):
build_name = kwargs["ti"].xcom_pull(key="build_name")
env = Env(build_name=build_name)
storage.wd_publish(env.paths.planet_o5m, PLANET_STORAGE_PATH)
storage.wd_publish(md5_ext(env.paths.planet_o5m), md5_ext(PLANET_STORAGE_PATH))
rm_build(env)


UPDATE_PLANET_TASK = PythonOperator(
Expand All @@ -67,17 +65,3 @@ def publish_planet(**kwargs):
python_callable=update_planet,
dag=DAG,
)


PUBLISH_PLANET_TASK = PythonOperator(
task_id="Publish_planet_task",
provide_context=True,
python_callable=publish_planet,
dag=DAG,
)


RM_BUILD_TASK = make_rm_build_task(DAG)


UPDATE_PLANET_TASK >> PUBLISH_PLANET_TASK >> RM_BUILD_TASK
26 changes: 21 additions & 5 deletions tools/python/airmaps/instruments/storage.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
import os

import webdav.client as wc
from webdav3.client import Client
from webdav3.urn import Urn

from airmaps.instruments import settings

Expand All @@ -15,13 +17,27 @@

def wd_fetch(src, dst):
logger.info(f"Fetch form {src} to {dst} with options {WD_OPTIONS}.")
client = wc.Client(WD_OPTIONS)
client = Client(WD_OPTIONS)
client.download_sync(src, dst)


def wd_publish(src, dst):
logger.info(f"Publish form {src} to {dst} with options {WD_OPTIONS}.")
client = wc.Client(WD_OPTIONS)
tmp = f"{dst[:-1]}__/" if dst[-1] == "/" else f"{dst}__"
if os.path.isdir(src):
dst += Urn.separate

dst = Urn(dst)
tmp = f"{dst.path()}__"
if dst.is_dir():
tmp = f"{dst.path()[:-1]}__{Urn.separate}"

parent = dst.parent()
path = Urn.separate
client = Client(WD_OPTIONS)
for dir in str(parent).split(Urn.separate):
if not client.check(path):
client.mkdir(path)
path += f"{dir}{Urn.separate}"

client.upload_sync(local_path=src, remote_path=tmp)
client.move(remote_path_from=tmp, remote_path_to=dst)
client.move(remote_path_from=tmp, remote_path_to=dst.path(), overwrite=True)
19 changes: 5 additions & 14 deletions tools/python/airmaps/instruments/utils.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import logging
import os
import shutil
from datetime import datetime
from typing import Iterable

from airflow.operators.python_operator import PythonOperator

from maps_generator.generator.env import Env
from maps_generator.generator.stages import Stage
from maps_generator.generator.stages import get_stage_name
from maps_generator.maps_generator import run_generation

logger = logging.getLogger("airmaps")


def put_current_date_in_filename(filename):
path, name = os.path.split(filename)
Expand All @@ -26,21 +27,11 @@ def get_latest_filename(filename, prefix=""):
return os.path.join(path, ".".join(parts))


def rm_build(**kwargs):
build_name = kwargs["ti"].xcom_pull(key="build_name")
env = Env(build_name=build_name)
def rm_build(env):
logger.info(f"Build {env.build_path} will be removed.")
shutil.rmtree(env.build_path)


def make_rm_build_task(dag):
return PythonOperator(
task_id="Rm_build_task",
provide_context=True,
python_callable=rm_build,
dag=dag,
)


def run_generation_from_first_stage(
env: Env, stages: Iterable[Stage], build_lock: bool = True
):
Expand Down
2 changes: 1 addition & 1 deletion tools/python/airmaps/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ omim-maps_generator
apache-airflow [postgres]==1.10.10
psycopg2-binary==2.8.4
cryptography==2.8
webdavclient==1.0.8
git+https://github.com/maksimandrianov/webdav-client-python-3.git@andrianov
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

нельзя ли это положить в github.com/mapsme ?

3 changes: 2 additions & 1 deletion tools/python/airmaps/requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
apache-airflow [postgres]==1.10.10
psycopg2-binary==2.8.4
cryptography==2.8
webdavclient==1.0.8
git+https://github.com/maksimandrianov/webdav-client-python-3.git@andrianov
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

нельзя ли это положить в github.com/mapsme ?


Loading