In [4]:
from tiled.client import from_uri, from_profile
from tiled.structures.table import TableStructure
from tiled.structures.data_source import DataSource
from tiled.structures.core import StructureFamily
from tpx3utils import extract_fpaths_from_sid, raw_to_sorted_df, raw_df_to_cluster_df, add_centroid_cols
from tqdm import tqdm
import tiled as td
import os
import multiprocessing
import sys
import time
# record which interpreter and when this run happened (useful provenance for the log below)
print(sys.executable, time.ctime(), sep="\n")

/srv/conda/envs/notebook/bin/python
Fri Mar 29 14:34:42 2024


In [10]:
# client objects: connect to the Tiled server once and reuse that single client
# for both the raw and processed CHX trees (avoids a second connection/handshake).
TILED_URI = 'https://tiled.nsls2.bnl.gov'
chx_client = from_uri(TILED_URI, 'dask')['chx']
db_raw = chx_client['raw']
db_proc = chx_client['processed']

# function that uses filename to obtain both uncentroided and centroided dataframes
def source_dfs(filename):
    """Build the uncentroided and centroided tables for one raw file.

    Parameters
    ----------
    filename : str
        Path to a raw TPX3 data file, passed unchanged to ``raw_to_sorted_df``.

    Returns
    -------
    tuple
        ``(uncentroided, centroided)``. The first item is the output of
        ``raw_to_sorted_df``. The second is that same frame run through
        ``raw_df_to_cluster_df`` and then ``add_centroid_cols``.
    """
    sorted_events = raw_to_sorted_df(filename)
    clustered = raw_df_to_cluster_df(sorted_events)
    return sorted_events, add_centroid_cols(clustered)
            
# multiprocessing function for uploading each partition in parallel
def upload_partition(args, max_retries=5, retry_delay=2.0):
    """Process one raw file and write it as one partition of both tables.

    Designed to be mapped over by ``multiprocessing.Pool``, so all per-task
    inputs arrive packed in a single ``args`` sequence.

    Parameters
    ----------
    args : sequence
        ``(partition_num, file_path, uncent_node, cent_node)``:
        - the partition index;
        - the local path of the raw file;
        - the Tiled table nodes that receive the uncentroided and centroided
          frames.
    max_retries : int, optional
        Maximum attempts per table write (at least 1 is always made). Tiled
        writes occasionally fail transiently, so each write is retried.
    retry_delay : float, optional
        Seconds to sleep before the first retry; doubled after every failure.

    Raises
    ------
    Exception
        The last error from ``write_partition``, re-raised when a write still
        fails after ``max_retries`` attempts. Raising makes the failure surface
        in the parent process instead of hanging the worker.
    """
    partition_num, file_path, uncent_node, cent_node = args
    max_retries = max(1, int(max_retries))

    # A missing file would otherwise leave this partition silently empty.
    if not os.path.exists(file_path):
        print(f"partition {partition_num}: {file_path} not found, skipped")
        return

    uncent_df, cent_df = source_dfs(file_path)

    # Retry each write on its own, so a failure on the centroided table does
    # not re-upload the uncentroided partition that already succeeded.
    for node, frame in ((uncent_node, uncent_df), (cent_node, cent_df)):
        delay = retry_delay
        for attempt in range(1, max_retries + 1):
            try:
                node.write_partition(frame, partition_num)
                break
            except Exception as e:
                if attempt == max_retries:
                    raise
                print(f"partition {partition_num}: write failed "
                      f"(attempt {attempt}/{max_retries}): {e}")
                time.sleep(delay)
                delay *= 2
            
            
# defines a new container in tiled and writes all data from run object
def insert_to_tiled(container, run, processes=None):
    """Create a processed container for one raw run and upload every partition.

    Partition 0 is processed in this process, because its dataframes double as
    the example that defines both table schemas. Partitions ``1 .. num_img-1``
    are then uploaded in parallel by ``upload_partition``, one raw file per
    partition.

    Parameters
    ----------
    container : Tiled container client
        Parent under which the per-scan container is created, e.g. ``db_proc``.
    run : Tiled run client
        Raw run, e.g. ``db_raw[sid]``. Its ``primary`` stream must expose the
        descriptor configuration and ``tpx3_files_raw_filepaths``.
    processes : int, optional
        Pool size. Defaults to ``cpu_count() - 1``, but never less than 1.

    Raises
    ------
    ValueError
        If the descriptor reports no images, or more images than there are raw
        file paths. Both checks run before anything is created in Tiled.
    """
    # Fetch the primary stream once instead of re-requesting it per lookup.
    primary = run['primary']
    descriptor = primary.metadata['descriptors'][0]
    num_img = descriptor['configuration']['tpx3']['data']['tpx3_cam_num_images']
    uid = descriptor['run_start']
    raw_file_paths = primary['data']['tpx3_files_raw_filepaths'][0].compute()

    if num_img < 1 or num_img > len(raw_file_paths):
        raise ValueError(
            f"run {uid}: tpx3_cam_num_images={num_img} but "
            f"{len(raw_file_paths)} raw file paths are available")

    # NOTE(review): [5:] drops a 5-character prefix from every stored path
    # (presumably a "file:" scheme) — confirm against the raw catalog.
    local_paths = [raw_file_paths[i][5:] for i in range(num_img)]

    # TableStructure needs an example frame, so partition 0 is built up front.
    uncent_df, cent_df = source_dfs(local_paths[0])
    uncent_structure = TableStructure.from_pandas(uncent_df)
    cent_structure = TableStructure.from_pandas(cent_df)
    uncent_structure.npartitions = num_img
    cent_structure.npartitions = num_img

    # TODO: the "test_mp14_" key is for testing purposes; the intended production call is
    # container.create_container(key=run.start['uid'], metadata={"raw_uid": run.start['uid'], "raw_sid": run.start['scan_id']})
    scan_container = container.create_container(
        key='test_mp14_{}'.format(uid),
        metadata={"raw_uid": uid, "raw_sid": run.metadata['start']['scan_id']},
    )

    # create cent and uncent table nodes and write partition 0 into each
    uncent_node = scan_container.new(
        "table",
        [DataSource(structure=uncent_structure, structure_family=StructureFamily.table)],
        key="uncent",
    )
    cent_node = scan_container.new(
        "table",
        [DataSource(structure=cent_structure, structure_family=StructureFamily.table)],
        key="cent",
    )
    uncent_node.write_partition(uncent_df, 0)
    cent_node.write_partition(cent_df, 0)

    # one packed argument list per remaining partition
    args = [[i, local_paths[i], uncent_node, cent_node] for i in range(1, num_img)]
    if not args:
        return

    if processes is None:
        # cpu_count() - 1 is 0 on a single-core host, and Pool(processes=0) raises.
        processes = max(1, multiprocessing.cpu_count() - 1)

    with multiprocessing.Pool(processes=processes) as pool:
        # imap_unordered yields as each upload finishes, so the bar counts
        # completed partitions. Wrapping the *input* of pool.map only tracked
        # task dispatch. Worker exceptions are re-raised here while iterating.
        for _ in tqdm(pool.imap_unordered(upload_partition, args), total=len(args)):
            pass

# take in a list of sids and upload them all to tiled
def sids_to_tiled(sids):
    """Upload each scan in ``sids`` to the processed tree, one scan at a time.

    Every scan id is looked up in ``db_raw``. The resulting run is passed to
    ``insert_to_tiled`` with ``db_proc`` as the destination container.
    """
    for scan_id in sids:
        raw_run = db_raw[scan_id]
        insert_to_tiled(db_proc, raw_run)

In [3]:
# demo: run the full upload for a single scan id
DEMO_SID = 143210
insert_to_tiled(db_proc, db_raw[DEMO_SID])

100%|██████████| 1199/1199 [20:19<00:00,  1.02s/it] 
