## ingest
Trying to figure out how to run butler.ingest() - ticket https://jira.lsstcorp.org/browse/DM-41032https://jira.lsstcorp.org/browse/DM-41032 - 

We need to ingest the data manually for the raws and they are going to a different destination, which we'll need to look up the URI to.

They are going here: /sdf/data/rubin/lsstdata/offline/instrument/

We'll then need to grab the remaining path to be:
butler.getURI(dataset_ref)

The transfer should be able to be performed using lsst.resources.ResourcePath.transfer_from. After that, the ingestion should then be handled using butler.ingest() using the existing DatasetRef from /repo/embargo and the new (absolute) destination path, using transfer mode direct.

In [1]:
import os

In [2]:
import astropy.time
import lsst
import lsst.daf.butler
from lsst.daf.butler import Butler, Timespan

In [3]:
from lsst.daf.butler import CollectionType

In [4]:
butler = Butler('/repo/embargo')
registry = butler.registry

In [6]:
dest_butler = Butler('/home/r/rnevin/scratch', writeable=True)
dest_registry = dest_butler.registry

In [7]:
datasettype = 'raw'
collections = 'LATISS/raw/all'

In [8]:
now = astropy.time.Time.now().tai
embargo_hours = 80.0
embargo_period = astropy.time.TimeDelta(
        embargo_hours * 3600.0, format="sec"
    )
timespan_embargo = Timespan(now - embargo_period, None)
# timespan_embargo = Timespan(now - embargo_period, now)

In [9]:
for dt in sorted(registry.queryDatasetTypes(datasettype)):
    print(dt)

DatasetType('raw', {band, instrument, detector, physical_filter, exposure}, Exposure)


In [10]:
# for i, dt in enumerate(registry.queryDimensionRecords('exposure',datasets=datasettype, collections=collections,
#                                              )):
#     print(dt)
#     if i > 1:
#         break

In [10]:
dataId = {"instrument": "LATISS"}
outside_embargo = [
        dt.id
        for dt in registry.queryDimensionRecords(
            "exposure",
            dataId=dataId,
            datasets=datasettype,
            collections=collections,
            where="NOT exposure.timespan OVERLAPS\
                                                    timespan_embargo",
            bind={"timespan_embargo": timespan_embargo},
        )
    ][0:2]
print(outside_embargo)

[2022083100004, 2022083100005]


In [11]:
# Query the DataIds after embargo period
datasetRefs = registry.queryDatasets(
    datasettype,
    dataId=dataId,
    collections=collections,
    where="exposure.id IN (exposure_ids)",
    bind={"exposure_ids": outside_embargo},
)#.expanded()

In [12]:
for ref in datasetRefs:
    print(ref)

raw@{instrument: 'LATISS', detector: 0, exposure: 2022083100004, ...} [sc=Exposure] (run=LATISS/raw/all id=cfd59ff4-4991-5093-b499-b3aff2d2089c)
raw@{instrument: 'LATISS', detector: 0, exposure: 2022083100005, ...} [sc=Exposure] (run=LATISS/raw/all id=d8975b39-6873-53e0-aa7f-d8781ceaaec7)


In [17]:
dest_prefix = '/home/r/rnevin/dest_uri' 
dest_uri = lsst.resources.ResourcePath(dest_prefix)

In [18]:
dest_uri

ResourcePath("file:///home/r/rnevin/dest_uri/")

In [19]:
source_uri = butler.get_many_uris(datasetRefs)
#source_uri = butler.getURI(datasetRefs)
print(source_uri)

{DatasetRef(DatasetType('raw', {band, instrument, detector, physical_filter, exposure}, Exposure), {instrument: 'LATISS', detector: 0, exposure: 2022083100004, ...}, run='LATISS/raw/all', id=cfd59ff4-4991-5093-b499-b3aff2d2089c): DatasetRefURIs(ResourcePath("s3://rubin-summit/LATISS/20220831/AT_O_20220831_000004/AT_O_20220831_000004_R00_S00.fits"), {}), DatasetRef(DatasetType('raw', {band, instrument, detector, physical_filter, exposure}, Exposure), {instrument: 'LATISS', detector: 0, exposure: 2022083100005, ...}, run='LATISS/raw/all', id=d8975b39-6873-53e0-aa7f-d8781ceaaec7): DatasetRefURIs(ResourcePath("s3://rubin-summit/LATISS/20220831/AT_O_20220831_000005/AT_O_20220831_000005_R00_S00.fits"), {})}


In [27]:
filedataset_list = []
for (key, value) in source_uri.items():
    # print(key)
    # print(value.count)
    # print(value.index)
    source_path_uri = value[0]
    print(source_path_uri)
    print(source_path_uri.exists())
    # source_path = value[0].path.strip('/')
    source_path = source_path_uri.relative_to(value[0].root_uri())
    print(source_path)
    print(dest_uri)
    new_dest_uri = dest_uri.join(source_path)
    print(new_dest_uri)
    
    # transfer 
    new_dest_uri.transfer_from(source_path_uri, transfer='copy')
    
    # make filedatasets for ingest
    filedataset_list.append(lsst.daf.butler.FileDataset(new_dest_uri, key))
    

s3://rubin-summit/LATISS/20220831/AT_O_20220831_000004/AT_O_20220831_000004_R00_S00.fits
True
LATISS/20220831/AT_O_20220831_000004/AT_O_20220831_000004_R00_S00.fits
file:///home/r/rnevin/dest_uri/
file:///home/r/rnevin/dest_uri/LATISS/20220831/AT_O_20220831_000004/AT_O_20220831_000004_R00_S00.fits
s3://rubin-summit/LATISS/20220831/AT_O_20220831_000005/AT_O_20220831_000005_R00_S00.fits
True
LATISS/20220831/AT_O_20220831_000005/AT_O_20220831_000005_R00_S00.fits
file:///home/r/rnevin/dest_uri/
file:///home/r/rnevin/dest_uri/LATISS/20220831/AT_O_20220831_000005/AT_O_20220831_000005_R00_S00.fits


In [28]:
filedataset_list

[FileDataset(refs=[DatasetRef(DatasetType('raw', {band, instrument, detector, physical_filter, exposure}, Exposure), {instrument: 'LATISS', detector: 0, exposure: 2022083100004, ...}, run='LATISS/raw/all', id=cfd59ff4-4991-5093-b499-b3aff2d2089c)], path=ResourcePath("file:///home/r/rnevin/dest_uri/LATISS/20220831/AT_O_20220831_000004/AT_O_20220831_000004_R00_S00.fits"), formatter=None),
 FileDataset(refs=[DatasetRef(DatasetType('raw', {band, instrument, detector, physical_filter, exposure}, Exposure), {instrument: 'LATISS', detector: 0, exposure: 2022083100005, ...}, run='LATISS/raw/all', id=d8975b39-6873-53e0-aa7f-d8781ceaaec7)], path=ResourcePath("file:///home/r/rnevin/dest_uri/LATISS/20220831/AT_O_20220831_000005/AT_O_20220831_000005_R00_S00.fits"), formatter=None)]

In [29]:
# register datatype to the destination
frombutler_datasettype = registry.getDatasetType('raw')
dest_registry.registerDatasetType(frombutler_datasettype)

False

In [23]:
collection_chain = registry.getCollectionSummary(collections)

In [24]:
collection_chain

CollectionSummary(dataset_types=NamedValueSet({DatasetType('raw', {band, instrument, detector, physical_filter, exposure}, Exposure)}), governors={'instrument': {'LATISS'}})

In [25]:
dest_registry.registerCollection('LATISS/raw/all', CollectionType(1))

True

In [None]:
# before we can run ingest, we need to register everything:
# this code is taken from: https://github.com/lsst/daf_butler/blob/1e3d68c2a155215c755c62404ea5fdd1de110740/python/lsst/daf/butler/direct_butler.py#L1920-L1949

# Check to see if the dataset type in the source butler has
# the same definition in the target butler and register missing
# ones if requested. Registration must happen outside a transaction.
newly_registered_dataset_types = set()
for datasetType in ["raw"]:
    if register_dataset_types:
        # Let this raise immediately if inconsistent. Continuing
        # on to find additional inconsistent dataset types
        # might result in additional unwanted dataset types being
        # registered.
        if self._registry.registerDatasetType(datasetType):
            newly_registered_dataset_types.add(datasetType)
    else:
        # If the dataset type is missing, let it fail immediately.
        target_dataset_type = self.get_dataset_type(datasetType.name)
        if target_dataset_type != datasetType:
            raise ConflictingDefinitionError(
                "Source butler dataset type differs from definition"
                f" in target butler: {datasetType} !="
                f" {target_dataset_type}"
            )
if newly_registered_dataset_types:
    # We may have registered some even if there were inconsistencies
    # but should let people know (or else remove them again).
    _LOG.verbose(
        "Registered the following dataset types in the target Butler: %s",
        ", ".join(d.name for d in newly_registered_dataset_types),
    )
else:
    _LOG.verbose("All required dataset types are known to the target Butler")

In [None]:
# and this is from https://github.com/lsst/daf_butler/blob/1e3d68c2a155215c755c62404ea5fdd1de110740/python/lsst/daf/butler/direct_butler.py#L1951-L1975
dimension_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)
if transfer_dimensions:
    # Collect all the dimension records for these refs.
    # All dimensions are to be copied but the list of valid dimensions
    # come from this butler's universe.
    elements = frozenset(
        element
        for element in self.dimensions.getStaticElements()
        if element.hasTable() and element.viewOf is None
    )
    dataIds = {ref.dataId for ref in source_refs}
    # This logic comes from saveDataIds.
    for dataId in dataIds:
        # Need an expanded record, if not expanded that we need a full
        # butler with registry (allow mocks with registry too).
        if not dataId.hasRecords():
            if registry := getattr(source_butler, "registry", None):
                dataId = registry.expandDataId(dataId)
            else:
                raise TypeError("Input butler needs to be a full butler to expand DataId.")
        # If this butler doesn't know about a dimension in the source
        # butler things will break later.
        for record in dataId.records.values():
            if record is not None and record.definition in elements:
                dimension_records[record.definition].setdefault(record.dataId, record)


frozenset({DatabaseDimension(exposure), DatabaseDimension(subfilter), DatabaseDimension(patch), DatabaseDimensionCombination(visit_system_membership), DatabaseDimension(detector), DatabaseDimension(tract), GovernorDimension(instrument), DatabaseDimension(physical_filter), DatabaseDimensionCombination(visit_detector_region), GovernorDimension(skymap), DatabaseDimension(visit), DatabaseDimensionCombination(visit_definition), DatabaseDimension(visit_system)})


In [41]:
# re-defining stuff to align with the butler code:
source_refs = datasetRefs
source_butler = butler

from collections import Counter, defaultdict
from lsst.daf.butler import (
    DataCoordinate,
    DataId,
    DataIdValue,
    Dimension,
    DimensionElement,
    DimensionRecord,
    DimensionUniverse,
)

dimension_records: dict[DimensionElement, dict[DataCoordinate, DimensionRecord]] = defaultdict(dict)
print(dimension_records)

elements = frozenset(
                element
                for element in dest_butler.dimensions.getStaticElements()
                if element.hasTable() and element.viewOf is None
            )
print(elements)

dataIds = {ref.dataId for ref in source_refs}
print('dataIds', dataIds)

# This logic comes from saveDataIds.
for dataId in dataIds:
    # Need an expanded record, if not expanded that we need a full
    # butler with registry (allow mocks with registry too).
    if not dataId.hasRecords():
        if registry := getattr(source_butler, "registry", None):
            dataId = registry.expandDataId(dataId)
        else:
            raise TypeError("Input butler needs to be a full butler to expand DataId.")
    # If this butler doesn't know about a dimension in the source
    # butler things will break later.
    for record in dataId.records.values():
        if record is not None and record.definition in elements:
            dimension_records[record.definition].setdefault(record.dataId, record)


defaultdict(<class 'dict'>, {})
frozenset({DatabaseDimension(exposure), DatabaseDimension(subfilter), DatabaseDimension(patch), DatabaseDimensionCombination(visit_system_membership), DatabaseDimension(detector), DatabaseDimension(tract), GovernorDimension(instrument), DatabaseDimension(physical_filter), DatabaseDimensionCombination(visit_detector_region), GovernorDimension(skymap), DatabaseDimension(visit), DatabaseDimensionCombination(visit_definition), DatabaseDimension(visit_system)})
dataIds {{instrument: 'LATISS', detector: 0, exposure: 2022083100005, ...}, {instrument: 'LATISS', detector: 0, exposure: 2022083100004, ...}}


In [43]:
print(dimension_records)

defaultdict(<class 'dict'>, {GovernorDimension(instrument): {{instrument: 'LATISS'}: instrument.RecordClass(name='LATISS', visit_max=6050123199999, visit_system=2, exposure_max=6050123199999, detector_max=1, class_name='lsst.obs.lsst.Latiss')}, DatabaseDimension(detector): {{instrument: 'LATISS', detector: 0}: detector.RecordClass(instrument='LATISS', id=0, full_name='RXX_S00', name_in_raft='RXX_S00', raft=None, purpose='SCIENCE')}, DatabaseDimension(physical_filter): {{instrument: 'LATISS', physical_filter: 'unknown~unknown'}: physical_filter.RecordClass(instrument='LATISS', name='unknown~unknown', band='unknown')}, DatabaseDimension(exposure): {{instrument: 'LATISS', exposure: 2022083100005}: exposure.RecordClass(instrument='LATISS', id=2022083100005, physical_filter='unknown~unknown', obs_id='AT_O_20220831_000005', exposure_time=0.0, dark_time=0.011363, observation_type='bias', observation_reason='bias', day_obs=20220831, seq_num=5, seq_start=5, seq_end=5, group_name='2022083100005'

In [42]:
# ingest to the destination butler
dest_butler.ingest(*filedataset_list, transfer = 'direct')

DataIdValueError: Could not fetch record for required dimension instrument via keys {'instrument': 'LATISS', 'detector': 0, 'exposure': 2022083100004}.

In [30]:
# test dimensionrecord
print(dest_registry)

results = dest_registry.queryDimensionRecords( 'exposure',
                                                 collections='LATISS/raw/all',
                                                 datasets='raw')
results = list( set(results) )
print('results', results)

<lsst.daf.butler._registry_shim.RegistryShim object at 0x7f9931f2ac50>
results []


In [None]:
#https://pipelines.lsst.io/v/daily/py-api/lsst.daf.butler.Registry.html#lsst.daf.butler.Registry.queryDimensionRecords

In [31]:
dest_registry.syncDimensionData()


TypeError: RegistryShim.syncDimensionData() missing 2 required positional arguments: 'element' and 'row'

Use datasetRefs to look up the URI

In [None]:

dest_uri.join(LATISS/20220831/AT_O_20220831_000004/AT_O_20220831_000004_R00_S00.fits)
# dest_prefix = '/sdf/data/rubin/lsstdata/offline/instrument/'
# dest_uri = dest_prefix + 'LATISS'
# dest = Butler(dest_prefix, writeable=True)

#dest_registry = dest.registry

In [98]:
#for i, (key, value) in enumerate(my_dict.items()):
for (key, value) in source_uri.items():
    print('key', key,'value', value)
    print(value.as_local())
    
STOP
print(source_uri.keys())
print(source_uri[0].exists())

key raw@{instrument: 'LATISS', detector: 0, exposure: 2022083100004, ...} [sc=Exposure] (run=LATISS/raw/all id=cfd59ff4-4991-5093-b499-b3aff2d2089c) value DatasetRefURIs(ResourcePath("s3://rubin-summit/LATISS/20220831/AT_O_20220831_000004/AT_O_20220831_000004_R00_S00.fits"), {})


AttributeError: 'DatasetRefURIs' object has no attribute 'as_local'

The below cell is failing because transfer_from is expecting a dict object from source_uri that has an as_local() element. When we obtain the source_uri list, there is an empty element for the component URI, but I'm not sure if this is the root of the issue.


In [103]:

# we need dest_uri to not be a string
# and to actually be 
dest_uri.transfer_from(source_uri, transfer='copy')



AttributeError: 'dict' object has no attribute 'as_local'

In [125]:
# next do ingest
# The ingestion should then be handled using butler.ingest() using the existing DatasetRef from /repo/embargo and the new (absolute) destination path, using transfer mode direct.
# https://pipelines.lsst.io/py-api/lsst.daf.butler.Butler.html#lsst.daf.butler.Butler.ingest
dest.ingest(datasetRefs)#, mode = 'direct')

AttributeError: 'ParentDatasetQueryResults' object has no attribute 'refs'

In [65]:
uri_list = []
for i,ref in enumerate(datasetRefs):
    #print(ref.dataId.full)
    uri = butler.getURI(ref)
    print(type(uri), uri)
    uri_list.append(uri)
print(uri_list)
    

<class 'lsst.resources.s3.S3ResourcePath'> s3://rubin-summit/LATISS/20220831/AT_O_20220831_000004/AT_O_20220831_000004_R00_S00.fits
<class 'lsst.resources.s3.S3ResourcePath'> s3://rubin-summit/LATISS/20220831/AT_O_20220831_000005/AT_O_20220831_000005_R00_S00.fits
[ResourcePath("s3://rubin-summit/LATISS/20220831/AT_O_20220831_000004/AT_O_20220831_000004_R00_S00.fits"), ResourcePath("s3://rubin-summit/LATISS/20220831/AT_O_20220831_000005/AT_O_20220831_000005_R00_S00.fits")]


In [47]:
dest_prefix = '/home/r/rnevin/scratch'
dest = Butler(dest_prefix, writeable=True)
dest_registry = dest.registry

In [68]:
import lsst
#uri = "s3://rubin-summit/LATISS/20220831/AT_O_20220831_000005/AT_O_20220831_000005_R00_S00.fits"
uri_src = lsst.resources.ResourcePath.transfer_from('/repo/embargo/' + uri_list, transfer='copy')
print(uri_src)

# next do ingest
# The ingestion should then be handled using butler.ingest() using the existing DatasetRef from /repo/embargo and the new (absolute) destination path, using transfer mode direct.
# https://pipelines.lsst.io/py-api/lsst.daf.butler.Butler.html#lsst.daf.butler.Butler.ingest
dest.ingest(uri_src, mode = 'direct')

TypeError: can only concatenate str (not "list") to str

In [None]:
#butler = Butler('/repo/embargo')
#registry = butler.registry

dest_prefix = '/sdf/data/rubin/lsstdata/offline/instrument/'

dest = Butler(dest_prefix + uri, writeable=True)
dest_registry = dest.registry

lsst.resources.ResourcePath.transfer_from(src, transfer)

In [None]:
dest_prefix = '/sdf/data/rubin/lsstdata/offline/instrument/'

dest = Butler(dest_prefix + uri, writeable=True)
dest_registry = dest.registry

for i,ref in enumerate(datasetRefs):
    #print(ref.dataId.full)
    uri = butler.getURI(ref)
    print(uri)
    # first do transfer_from
    # https://pipelines.lsst.io/v/weekly/py-api/lsst.resources.ResourcePath.html
    # actually, returns an URI with an updated final component of the source
    uri_src = lsst.resources.ResourcePath.transfer_from('/repo/embargo/'+uri)

    # next do ingest
    # The ingestion should then be handled using butler.ingest() using the existing DatasetRef from /repo/embargo and the new (absolute) destination path, using transfer mode direct.
    # https://pipelines.lsst.io/py-api/lsst.daf.butler.Butler.html#lsst.daf.butler.Butler.ingest
    dest.ingest(uri_src, mode = 'direct')

In [None]:
out = dest.transfer_from(
    butler,
    source_refs=datasetRefs,
    transfer="copy",
    skip_missing=True,
    register_dataset_types=True,
    transfer_dimensions=True,
)