Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 46 additions & 11 deletions ohmg/conf/management/commands/configure-services.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ def handle(self, *args, **options):
out_dir = Path(options["destination"])
out_dir.mkdir(exist_ok=True)

cs_path = Path(out_dir, "celery.service")
self._write_file(self.generate_celery_service(out_dir), cs_path)
cs1_path = Path(out_dir, "celery_main.service")
self._write_file(self.generate_celery_main_service(out_dir), cs1_path)

cs2_path = Path(out_dir, "celery_mosaic.service")
self._write_file(self.generate_celery_mosaic_service(out_dir), cs2_path)

ui_path = Path(out_dir, "uwsgi.ini")
self._write_file(self.generate_uwsgi_ini(), ui_path)
Expand All @@ -43,21 +46,25 @@ def handle(self, *args, **options):
print(f"""services created. to deploy, run the following commands:

# initial deployment (first time only)
sudo ln -sf {cs_path.absolute()} /etc/systemd/system
sudo ln -sf {cs1_path.absolute()} /etc/systemd/system
sudo ln -sf {cs2_path.absolute()} /etc/systemd/system
sudo ln -sf {us_path.absolute()} /etc/systemd/system
sudo systemctl daemon-reload
sudo systemctl enable celery
sudo systemctl enable celery_main
sudo systemctl enable celery_mosaic
sudo systemctl enable uwsgi
sudo systemctl start celery
sudo systemctl start celery_main
sudo systemctl start celery_mosaic
sudo systemctl start uwsgi

# reload services
# reload services after changes
sudo systemctl daemon-reload
sudo systemctl restart celery
sudo systemctl restart celery_main
sudo systemctl restart celery_mosaic
sudo systemctl restart uwsgi
""")

output_files = [cs_path, ui_path, us_path]
output_files = [cs1_path, cs2_path, ui_path, us_path]

if self.verbose:
print(f"~~~\noutput directory: {out_dir.absolute()}")
Expand Down Expand Up @@ -167,7 +174,7 @@ def generate_uwsgi_service(self, ini_file_path: Path):
"""
return file_content

def generate_celery_service(self, state_path: Path):
def generate_celery_main_service(self, state_path: Path):
log_dir = self._resolve_var("LOG_DIR", settings.LOG_DIR)

file_content = f"""[Unit]
Expand All @@ -179,15 +186,43 @@ def generate_celery_service(self, state_path: Path):
EnvironmentFile={settings.BASE_DIR}/.env
ExecStart={self.python_env}/celery \\
-A ohmg.conf.celery:app worker \\
-Q main,background \\
--without-gossip --without-mingle \\
-Ofair -B -E \\
--statedb={str(state_path.resolve())}/worker.state \\
--statedb={str(state_path.resolve())}/celery_main_worker.state \\
--schedule-filename={str(state_path.resolve())}/celerybeat-schedule \\
--loglevel=INFO \\
--logfile={log_dir}/celery.log \\
--logfile={log_dir}/celery_main.log \\
--concurrency=10 -n worker1@%h
Restart=always

[Install]
WantedBy=multi-user.target
"""

return file_content

def generate_celery_mosaic_service(self, state_path: Path):
log_dir = self._resolve_var("LOG_DIR", settings.LOG_DIR)

file_content = f"""[Unit]
Description=Celery
After=rabbitmq-server.service
Requires=rabbitmq-server.service

[Service]
EnvironmentFile={settings.BASE_DIR}/.env
ExecStart={self.python_env}/celery \\
-A ohmg.conf.celery:app worker \\
-Q mosaic \\
--without-gossip --without-mingle \\
-Ofair -B -E \\
--statedb={str(state_path.resolve())}/celery_mosaic_worker.state \\
--loglevel=INFO \\
--logfile={log_dir}/celery_mosaic.log \\
--concurrency=1 -n worker2@%h
Restart=always

[Install]
WantedBy=multi-user.target
"""
Expand Down
29 changes: 15 additions & 14 deletions ohmg/conf/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,24 +307,26 @@

# basic independent setup for Celery Exchange/Queue
DEFAULT_EXCHANGE = Exchange("default", type="topic")
# CELERY_TASK_QUEUES += (

CELERY_TASK_QUEUES = (
Queue("split", DEFAULT_EXCHANGE, routing_key="split", priority=0),
Queue("georeference", DEFAULT_EXCHANGE, routing_key="georeference", priority=0),
Queue("map", DEFAULT_EXCHANGE, routing_key="map", priority=0),
Queue("mosaic", DEFAULT_EXCHANGE, routing_key="mosaic", priority=0),
Queue("housekeeping", DEFAULT_EXCHANGE, routing_key="housekeeping", priority=0),
## these queues run on the same worker
Queue("main", DEFAULT_EXCHANGE, routing_key="main"),
Queue("background", DEFAULT_EXCHANGE, routing_key="background"),
## this queue runs on its own worker
Queue("mosaic", DEFAULT_EXCHANGE, routing_key="mosaic"),
)

CELERY_TASK_ROUTES = {
"ohmg.georeference.tasks.run_preparation_session": {"queue": "split"},
"ohmg.georeference.tasks.bulk_run_preparation_sessions": {"queue": "split"},
"ohmg.georeference.tasks.run_georeference_session": {"queue": "georeference"},
"ohmg.georeference.tasks.delete_stale_sessions": {"queue": "housekeeping"},
"ohmg.georeference.tasks.delete_preview_vrts": {"queue": "housekeeping"},
"ohmg.georeference.tasks.run_preparation_session": {"queue": "main"},
"ohmg.georeference.tasks.bulk_run_preparation_sessions": {"queue": "main"},
"ohmg.georeference.tasks.run_georeference_session": {"queue": "main"},
"ohmg.core.tasks.load_map_documents_as_task": {"queue": "main"},
"ohmg.core.tasks.load_document_file_as_task": {"queue": "main"},
"ohmg.georeference.tasks.delete_stale_sessions": {"queue": "background"},
"ohmg.georeference.tasks.delete_preview_vrts": {"queue": "background"},
"ohmg.georeference.tasks.cleanup_existing_tileset": {"queue": "background"},
"ohmg.georeference.tasks.create_mosaic_cog": {"queue": "mosaic"},
"ohmg.core.tasks.load_map_documents_as_task": {"queue": "map"},
"ohmg.core.tasks.load_document_file_as_task": {"queue": "map"},
"ohmg.georeference.tasks.create_mosaic_tileset": {"queue": "mosaic"},
}

# empty celery beat schedule of default GeoNode jobs
Expand Down Expand Up @@ -449,7 +451,6 @@
if DEBUG:
celery_log_level = "DEBUG"
LOGGING["loggers"]["ohmg"]["handlers"].append("console")
LOGGING["loggers"]["ohmg.georeference"]["handlers"].append("console")
else:
celery_log_level = "INFO"

Expand Down
1 change: 1 addition & 0 deletions ohmg/core/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class LayerSetAdmin(admin.ModelAdmin):
"layer_display_list",
"extent",
"multimask_extent",
"xyz_tiles_url",
)
search_fields = ("map__title",)
list_filter = ("category",)
Expand Down
18 changes: 18 additions & 0 deletions ohmg/core/migrations/0012_layerset_xyz_tiles_prefix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.2.27 on 2026-05-13 10:26

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('core', '0011_map_main_layer_ct'),
]

operations = [
migrations.AddField(
model_name='layerset',
name='xyz_tiles_prefix',
field=models.CharField(blank=True, max_length=200, null=True),
),
]
16 changes: 16 additions & 0 deletions ohmg/core/models/layerset.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ class Meta:
null=True,
blank=True,
)
xyz_tiles_prefix = models.CharField(
max_length=200,
blank=True,
null=True,
)
tilejson = models.JSONField(null=True, blank=True)

def __str__(self):
Expand All @@ -81,6 +86,17 @@ def layer_display_list(self):
def get_layers(self) -> Iterable["Layer"]:
return self.layer_set.all()

@property
def xyz_tiles_url(self):
if self.xyz_tiles_prefix:
if settings.ENABLE_S3_STORAGE:
base_url = f"{settings.AWS_S3_ENDPOINT_URL}/{settings.AWS_STORAGE_BUCKET_NAME}"
else:
base_url = f"{settings.SITEURL.rstrip('/')}{settings.MEDIA_URL}"
return f"{base_url.rstrip('/')}/{self.xyz_tiles_prefix}"
else:
return None

@cached_property
def centroid(self):
return Polygon.from_bbox(self.extent).centroid
Expand Down
12 changes: 12 additions & 0 deletions ohmg/core/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ def random_alnum(size=6):
return code


def get_boto3_s3_client():
import boto3

return boto3.client(
"s3",
region_name=settings.AWS_S3_REGION_NAME,
aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
endpoint_url=settings.AWS_S3_ENDPOINT_URL,
)


MONTH_CHOICES = [
(1, "JAN."),
(2, "FEB."),
Expand Down
17 changes: 15 additions & 2 deletions ohmg/georeference/management/commands/mosaic.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from ohmg.core.models import LayerSet
from ohmg.georeference.mosaicker import Mosaicker
from ohmg.georeference.tasks import create_mosaic_cog
from ohmg.georeference.tasks import create_mosaic_cog, create_mosaic_tileset


class Command(BaseCommand):
Expand All @@ -15,6 +15,7 @@ def add_arguments(self, parser):
"operation",
choices=[
"generate-cog",
"generate-tiles",
],
help="the operation to perform",
)
Expand All @@ -37,6 +38,10 @@ def add_arguments(self, parser):
"--background",
action="store_true",
)
parser.add_argument(
"--multiprocessing",
action="store_true",
)

def handle(self, *args, **options):
options = Namespace(**options)
Expand All @@ -48,10 +53,18 @@ def handle(self, *args, **options):
map__identifier=options.mapid, category__slug=options.category
)

m = Mosaicker()

if options.operation == "generate-tiles":
if options.background:
create_mosaic_tileset.delay(ls.pk)
else:
m.generate_xyz_tiles(ls, use_multiprocessing=options.multiprocessing)
m.cleanup_files()

if options.operation == "generate-cog":
if options.background:
create_mosaic_cog.delay(ls.pk)
else:
m = Mosaicker()
m.generate_cog(ls)
m.cleanup_files()
8 changes: 5 additions & 3 deletions ohmg/georeference/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ def run(self):
## regardless of whether there was an old layer or not, overwrite
## the file with the newly georeferenced tif.
session_ct = GeorefSession.objects.filter(reg2=self.reg2).exclude(pk=self.pk).count()
file_name = f"{layer.slug}__{random_alnum(6)}_{str(session_ct).zfill(2)}.tif"
file_name = f"{layer.slug}__{random_alnum()}_{str(session_ct).zfill(2)}.tif"

with open(g.cog, "rb") as openf:
layer.file.save(file_name, File(openf))
Expand Down Expand Up @@ -793,8 +793,10 @@ class Meta:

def __str__(self):
return (
f"{self.session} --> {self.target._meta.object_name} ({self.target} {self.target_id})"
) if self.target else f"{self.session} --> (no target)"
(f"{self.session} --> {self.target._meta.object_name} ({self.target} {self.target_id})")
if self.target
else f"{self.session} --> (no target)"
)

def extend(self):
self.expiration += timedelta(seconds=settings.GEOREFERENCE_SESSION_LENGTH)
Expand Down
46 changes: 41 additions & 5 deletions ohmg/georeference/mosaicker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from ohmg.core.utils import random_alnum

from .georeferencer import Georeferencer, VRTHandler
from .tasks import cleanup_existing_tileset
from .utils import make_xyz_tiles, make_xyz_tiles_with_multiprocessing

gdal.SetConfigOption("GDAL_NUM_THREADS", "ALL_CPUS")
gdal.SetConfigOption("GDAL_TIFF_INTERNAL_MASK", "YES")
Expand Down Expand Up @@ -62,9 +64,11 @@ def generate_mosaic_vrt(self, layerset) -> VRTHandler:
print(layer_name)
try:
layer = Layer.objects.get(slug=layer_name, region__document__map=layerset.map)
except Layer.MultipleObjectsReturned as e:
print("this layer slug matched multiple layers in this map: cancelling mosaic process")
except Exception as e:
except Layer.MultipleObjectsReturned:
print(
"this layer slug matched multiple layers in this map: cancelling mosaic process"
)
except Exception as e:
raise e

if not layer.file:
Expand Down Expand Up @@ -124,7 +128,7 @@ def generate_cog(self, layerset: LayerSet):

existing_file_name = layerset.mosaic_geotiff.name if layerset.mosaic_geotiff else None

file_name = f"{layerset.map.identifier}-{layerset.category.slug}__{datetime.now().strftime('%Y-%m-%d')}__{random_alnum(6)}.tif"
file_name = f"{layerset.map.identifier}-{layerset.category.slug}__{datetime.now().strftime('%Y-%m-%d')}__{random_alnum()}.tif"

with open(self.cog, "rb") as f:
layerset.mosaic_geotiff.save(file_name, File(f))
Expand All @@ -137,6 +141,38 @@ def generate_cog(self, layerset: LayerSet):

print(f"completed - elapsed time: {datetime.now() - start}")

def generate_xyz_tiles(
self,
layerset: LayerSet,
min_zoom: int = 13,
max_zoom: int = 20,
use_multiprocessing: bool = False,
):
if layerset.mosaic_geotiff:
in_path = f"/vsicurl/{layerset.mosaic_cog_url}"
else:
self.generate_mosaic_vrt(layerset)
in_path = self.mosaic_vrt.get_path()

prefix = f"tiles/{layerset.map.identifier}/{layerset.category.slug}/{random_alnum()}"
logger.info(f"creating new tileset {prefix}")
logger.info(f"source dataset: {in_path}")

if use_multiprocessing:
make_xyz_tiles_with_multiprocessing(
in_path, prefix, min_zoom=min_zoom, max_zoom=max_zoom
)
else:
make_xyz_tiles(in_path, prefix, min_zoom=min_zoom, max_zoom=max_zoom)

existing_tileset_prefix = layerset.xyz_tiles_prefix
layerset.xyz_tiles_prefix = prefix
layerset.save()

## clean up existing tileset
if existing_tileset_prefix:
cleanup_existing_tileset.delay(existing_tileset_prefix)

def generate_mosaic_json(self, layerset, trim_all=False):
"""DEPRECATED: Currently, MosaicJSON is not used anywhere in the app."""
from cogeo_mosaic.backends import MosaicBackend
Expand Down Expand Up @@ -188,7 +224,7 @@ def read_trim_feature_cache(file_path):
cached_feature = None
write_trim_feature_cache(feature, feat_cache_path)

unique_id = random_alnum(6)
unique_id = random_alnum()
trim_vrt_path = in_path.replace(".tif", f"_{unique_id}_trim.vrt")
out_path = trim_vrt_path.replace(".vrt", ".tif")

Expand Down
Loading