Skip to content

Commit

Permalink
Update to latest DM stack
Browse files Browse the repository at this point in the history
Fix issue in Message where str was returned instead of dict
Fix issue in ingestd where dict was requested instead of str
Update log messages to indicate why things might have been configured incorrectly
Remove version.py, since it shouldn't have ever been checked in
  • Loading branch information
srp3rd committed Mar 18, 2024
1 parent af7a2e4 commit 795ea25
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 30 deletions.
2 changes: 1 addition & 1 deletion docker/ingestd.env
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CTRL_INGESTD_VERSION=1.1
CTRL_INGESTD_GIT_VERSION=tickets/DM-39319
LSST_STACK_VERSION=7-stack-lsst_distrib-w_2023_38
LSST_STACK_VERSION=7-stack-lsst_distrib-w_2024_11
CONFLUENT_KAFKA_VERSION=2.2.0
9 changes: 4 additions & 5 deletions etc/ingestd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ rses:
rucio_prefix: root://xrd1:1094//rucio
fs_prefix: file:///rucio/disks/xrd1/rucio
XRD2:
rucio_prefix: root://xrd2:1095//rucio
rucio_prefix: root://xrd2:1095//rucio/test
fs_prefix: file:///rucio/disks/xrd2/rucio
XRD3:
rucio_prefix: root://xrd3:1096//rucio/test
rucio_prefix: root://xrd3:1096//rucio
fs_prefix: file:///rucio/disks/xrd3/rucio
XRD3:
rucio_prefix: root://xrd3:1097//rucio/test
XRD4:
rucio_prefix: root://xrd4:1097//rucio
fs_prefix: file:///rucio/disks/xrd4/rucio


20 changes: 15 additions & 5 deletions python/lsst/ctrl/ingestd/ingestd.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ class IngestD:

def __init__(self):
if CTRL_INGESTD_CONFIG in os.environ:
config_file = os.environ[CTRL_INGESTD_CONFIG]
self.config_file = os.environ[CTRL_INGESTD_CONFIG]
else:
raise FileNotFoundError("CTRL_INGESTD_CONFIG is not set")

config = Config(config_file)
config = Config(self.config_file)
rse_dict = config.get_rses()
group_id = config.get_group_id()
brokers = config.get_brokers()
Expand Down Expand Up @@ -86,17 +86,27 @@ def process(self):
# and put the into a list
entries = []
for msg in msgs:
message = Message(msg)
try:
message = Message(msg)
except Exception as e:
logging.info(msg.value())
logging.info(e)
continue
rubin_butler = message.get_rubin_butler()
sidecar = message.get_rubin_sidecar_dict()
sidecar = message.get_rubin_sidecar_str()
logging.debug(f"{message=} {rubin_butler=} {sidecar=}")

if rubin_butler is None:
logging.warning("shouldn't have gotten this message: %s" % message)
continue

# Rewrite the Rucio URL to actual file location
file_to_ingest = self.mapper.rewrite(message.get_dst_rse(), message.get_dst_url())
dst_url = message.get_dst_url()
file_to_ingest = self.mapper.rewrite(message.get_dst_rse(), dst_url)

if file_to_ingest == dst_url:
logging.warn(f"failed to map {file_to_ingest}; check {self.config_file} for incorrect mapping")
continue

# create an object that's ingestible by the butler
# and add it to the list
Expand Down
1 change: 0 additions & 1 deletion python/lsst/ctrl/ingestd/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,5 @@ def rewrite(self, rse: str, url: str) -> str:
mapping_dict = self._rse_dict[rse]
rucio_prefix = mapping_dict["rucio_prefix"]
fs_prefix = mapping_dict["fs_prefix"]
LOGGER.debug(f"{rse=} {url=} --- {rucio_prefix=} {fs_prefix=}")
ret = url.replace(rucio_prefix, fs_prefix)
return ret
9 changes: 4 additions & 5 deletions python/lsst/ctrl/ingestd/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,11 @@ def get_rubin_butler(self) -> int:

def get_rubin_sidecar_dict(self) -> dict:
"""Getter to retrieve the 'sidecar' metadata as a dict"""
d = self.payload.get(RUBIN_SIDECAR, None)
s = self.get_rubin_sidecar_str()
d = json.loads(s)
return d

def get_rubin_sidecar_str(self) -> str:
"""Getter to retrieve the 'sidecar' metadata as a dict"""
d = self.get_rubin_sidecar_dict()
if d is None:
return None
return json.dumps(d)
s = self.payload.get(RUBIN_SIDECAR, None)
return s
13 changes: 0 additions & 13 deletions python/lsst/ctrl/ingestd/version.py

This file was deleted.

0 comments on commit 795ea25

Please sign in to comment.