# Notes

This notebook creates subhalo ID and particle ID lists (matchlists) for snapshots of IllustrisTNG simulations.

These can be loaded and used to cross-match particles with subhalos when calculating impact factors to galaxies.

The code is updated from: 

raven:/u/cwalker/Illustris_Zhang_Method/Create_Particle_Subhalo_Matchlists_3.ipynb

using:

raven:/u/cwalker/Illustris_Zhang_Method/test_artale_vs_yt_field_load.ipynb

- It builds on previous work by creating lists for any desired simulation.
- It uses illustris_python instead of yt to load the simulation particle ID list.
- It loads particle ID data and creates subhalo IDs in chunks, and stores the matchlists in chunks to accommodate larger simulations (e.g. TNG300-1).
- As output for larger simulations is huge, it now stores matchlists to /ptmp/cwalker/Illustris_FRB_Project/Sim_Matchlists/
- It is the version of the code which is parallelised and tested in prior notebooks.
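
A minimal sketch of the chunking described in the list above, assuming each chunk is recorded as an inclusive [first, last] pair of particle indices, as in the indices array built in the "Create matchlists" cell. The function name chunk_bounds is hypothetical and is not part of the notebook's code.

```python
def chunk_bounds(n_particles, nchunks):
    """Return inclusive [first, last] index pairs covering n_particles.

    Hypothetical helper: produces nchunks equal-sized chunks, plus one
    shorter chunk holding the remainder if n_particles is not divisible
    by nchunks.
    """
    parts_per_chunk = n_particles // nchunks
    remainder = n_particles % nchunks
    bounds = [[i * parts_per_chunk, (i + 1) * parts_per_chunk - 1]
              for i in range(nchunks)]
    if remainder != 0:
        start = nchunks * parts_per_chunk
        bounds.append([start, start + remainder - 1])
    return bounds


# toy case: 10 particles in 3 whole chunks plus a remainder chunk
bounds = chunk_bounds(10, 3)
# the bounds are inclusive, so each chunk holds last - first + 1 particles
sizes = [last - first + 1 for first, last in bounds]
```

Because the bounds are inclusive, a slice that loads a chunk has to end at last + 1 to pick up the final particle.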

#NOTES

- NOTE 10/03/22: in the code, I feed to_ID into pID2shID(). I think this is correct.
- #this would mean I am feeding it the INDICES of the particle IDs, not the particle IDs themselves.
- #this would mean I need to update the documentation of this function.
- #this is tested below by feeding it AllPartList instead of to_ID, which always returns -1.
    
- #NOTE: you can run this for TNG300-1 in parts and save them in parts. you would then have to amend 
- #the loading in the pipe creation code to parse through each part separately.
    
- #NOTE: If you run both the chunked and non-chunked versions of this code for a single snapshot
- #for a small simulation, you can perform the test and see the output is the same.
    
- #NOTE 17/03/22: sometimes if you use too many cores the multiprocessing version hangs after completion.
- #Don't know how to fix this yet.
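
To illustrate the NOTE of 10/03/22, here is a rough sketch of how an index-based lookup could work. It is not the implementation of pID2shID() from charlie_TNG_tools: the function index_to_subhalo_id is hypothetical, and it assumes a single particle type, with subhalo s owning the contiguous range of snapshot array indices starting at offsets[s] and spanning lengths[s] entries.

```python
import numpy as np


def index_to_subhalo_id(indices, offsets, lengths):
    """Map snapshot array indices to subhalo IDs (-1 = not in any subhalo).

    Hypothetical stand-in, assuming offsets is sorted in ascending order and
    subhalo s owns indices offsets[s] .. offsets[s] + lengths[s] - 1.
    """
    indices = np.asarray(indices)
    offsets = np.asarray(offsets)
    lengths = np.asarray(lengths)
    # last subhalo whose offset is <= the index (-1 if there is none)
    candidate = np.searchsorted(offsets, indices, side='right') - 1
    safe = np.clip(candidate, 0, None)
    # the index must also fall before the end of that subhalo's range
    inside = (candidate >= 0) & (indices < offsets[safe] + lengths[safe])
    return np.where(inside, safe, -1)


# toy catalogue: three subhalos, with unbound particles in between
offsets = [0, 5, 12]
lengths = [5, 3, 2]
from_indices = index_to_subhalo_id([0, 4, 5, 7, 8, 11, 12, 13, 14], offsets, lengths)
# raw particle IDs are large arbitrary numbers, not positions in the array
from_raw_ids = index_to_subhalo_id([100000000123, 100000004567], offsets, lengths)
```

Under these assumptions the lookup only makes sense for array indices. Raw particle IDs fall outside every range and come back as -1, which is consistent with the behaviour described in the note.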

# Imports

In [1]:
import os
import yt
import h5py
import random

import numpy as np
import multiprocessing as m
import illustris_python as il

from contextlib import closing
from multiprocessing import Pool

from matplotlib import pyplot as plt
from charlie_TNG_tools import pID2shID




# Functions

In [None]:
def unwrap_matchlist_package(package):
    """
    Helper function for making simulation matchlists in parallel using multiprocessing.
    Unpacks the set of data necessary for parsing the simulation and making matchlists
    for certain chunks. Then makes those chunks with process_matchlist_chunk().
    
    
    INPUTS:
    
    package : a list containing the input data, which are nine arguments in the following order:
    
        nchunks_total   : [int] the total number of chunks the simulation will be split into to create
                        matchlist covering the entire simulation
        chunkIDs        : [array of ints] the id numbers of the chunks of simulation to be processed
                        on this cpu
        indices         : [array] array of particle indices to load for each whole chunk. shape of 
                        (nchunks_total,2)
        simPath         : [str] location of sim file to be loaded as hdf5 File containing all the particle IDs
                        subhalo IDs will be created for
        partIDs_loc     : [str] location of the (partType0) particle IDs for this simulation snapshot
        basePath        : [str] base path to the simulation data
        snapshot_number : [int] the snapshot of the simulation to process
        offsetFile      : [str] location of the offset file for this simulation and snapshot
        subdir_name     : [str] location of subdirectory to store output matchlist chunk in
    
    RETURNS:
    
    output of process_matchlist_chunk()
    
    """
    
    verbose=True
    
    #unwrap the package to feed to process_matchlist_chunk()
    nchunks_total   = package[0]
    if verbose==True:
        print('Verbose mode check: nchunks_total = {0}'.format(nchunks_total))
    chunkIDs        = package[1]
    if verbose==True:
        print('Verbose mode check: chunkIDs = {0}'.format(chunkIDs))
    indices         = package[2]
    if verbose==True:
        print('Verbose mode check: indices = {0}'.format(indices))
    simPath         = package[3]
    if verbose==True:
        print('Verbose mode check: simPath = {0}'.format(simPath))
    partIDs_loc     = package[4]
    if verbose==True:
        print('Verbose mode check: partIDs_loc = {0}'.format(partIDs_loc))
    basePath        = package[5]
    if verbose==True:
        print('Verbose mode check: basePath = {0}'.format(basePath))
    snapshot_number = package[6]
    if verbose==True:
        print('Verbose mode check: snapshot_number = {0}'.format(snapshot_number))
    offsetFile      = package[7]
    if verbose==True:
        print('Verbose mode check: offsetFile = {0}'.format(offsetFile))
    subdir_name     = package[8]
    if verbose==True:
        print('Verbose mode check: subdir_name = {0}'.format(subdir_name))
    
    #run process_matchlist_chunk() and hand its return value back to the pool
    return process_matchlist_chunk(nchunks_total,chunkIDs,indices,simPath,partIDs_loc,basePath,snapshot_number,offsetFile,subdir_name)

def process_matchlist_chunk(nchunks_total,
                            chunkIDs,
                            indices,
                            simPath,
                            partIDs_loc,
                            basePath,
                            snapshot_number,
                            offsetFile,
                            subdir_name):
    """

    For a simulation snapshot split into nchunks_total chunks, creates the chunks listed in chunkIDs
    of the simulation particle/subhalo ID matchlist on a single cpu. It is fed by unwrap_matchlist_package().
    
    
    INPUTS:
    
    
    nchunks_total   : [int] the total number of chunks the simulation will be split into to create
                    matchlist covering the entire simulation
    chunkIDs        : [array of ints] the id numbers of the chunks of simulation to be processed
                    on this cpu
    indices         : [array] array of particle indices to load for each whole chunk. shape of 
                    (nchunks_total,2)
    simPath         : [str] location of sim file to be loaded as hdf5 File containing all the particle IDs  
                    subhalo IDs will be created for
    partIDs_loc     : [str] location of the (partType0) particle IDs for this simulation snapshot
    basePath        : [str] base path to the simulation data
    snapshot_number : [int] the snapshot of the simulation to process
    offsetFile      : [str] location of the offset file for this simulation and snapshot
    subdir_name     : [str] location of subdirectory to store output matchlist chunk in
    
    RETURNS
    
    1 (must return something so multiprocessing is guaranteed to terminate after process is finished)
    
    """
    
    #print(simPath)

    #get all gas particle IDs in the simulation snapshot
    with h5py.File(simPath,'r') as f:

        print(f)
        
        #for the chunks of the simulation to be processed by this CPU...
        for i in range(len(chunkIDs)):
            
            ########################################
            #CREATE THE CHUNK PARTICLE ID MATCHLIST#
            ########################################
            
            #select the chunk ID to be processed
            chunk_id = chunkIDs[i] 

            #extract the indices within this chunk which will be loaded
            print('loading chunk {0}'.format(chunk_id))
            chunk_first_idx = indices[chunk_id][0]
            chunk_last_idx = indices[chunk_id][-1]
            print('chunk indices: ({0},{1})'.format(chunk_first_idx,chunk_last_idx))

            #load only the particles in this chunk. The indices are inclusive, so the slice
            #must end at chunk_last_idx+1 to match the length of to_ID below
            allparts = np.copy(f[partIDs_loc][chunk_first_idx:chunk_last_idx+1])

            #get number of particles in this chunk
            nparts = allparts.shape[0] 
            print('chunk loaded.')

            #create a list version of every particle ID in this chunk
            AllPartList = allparts.tolist()
            print('Particle list created')

            #######################################
            #CREATE THE CHUNK SUBHALO ID MATCHLIST#
            #######################################

            #choose which particles to ID
            to_ID = np.arange(chunk_first_idx,chunk_last_idx+1) #replace with the indices we want to load    

            #choose the particle type (note: 0 is gas, and it should always be gas, but perhaps this
            #should not be hard coded...)
            partType = 0 #gas

            #choose subhalo fields to load
            subhaloFields = ['SubhaloFlag',
                             'SubhaloPos',
                             'SubhaloHalfmassRad',
                             'SubhaloHalfmassRadType',
                             'SubhaloLenType']

            #load subhalo catalog
            subhalos=il.groupcat.loadSubhalos(basePath,snapshot_number,fields=subhaloFields) 
            subhalos.keys()

            #get subhalo offset file for matching particle and subhalo IDs
            with h5py.File(offsetFile,'r') as f2:
                SnapOffsetsSubhalo= np.copy(f2['/Subhalo/SnapByType'])
            print('offsetfile loaded')

            #get subhalo lengths for all gas particles
            SubhaloLenType = np.copy(subhalos['SubhaloLenType'])

            #create array of subhaloIDs for every gas particle
            AllShIDList = pID2shID(to_ID,partType,SubhaloLenType,SnapOffsetsSubhalo)
            print('ShIDList created')

            #NOTE: you can run this for TNG300-1 in parts and save them in parts. you would then have to amend the loading
            #in the pipe creation code to parse through each part separately.
            #uniquestest_chunked.append(np.unique(AllShIDList).tolist())
            
            ############################
            #SAVE BOTH MATCHLIST CHUNKS#
            ############################
            
            np.save(os.path.join(subdir_name,'PartList_Snap{0}_Chunk{1:04d}.npy'.format(snapshot_number,chunk_id)),AllPartList)
            np.save(os.path.join(subdir_name,'ShIDList_Snap{0}_Chunk{1:04d}.npy'.format(snapshot_number,chunk_id)),AllShIDList)
            print('saved')
    return 1
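
As noted at the top of the notebook, code that reads the matchlists back has to go through the saved parts one at a time. A minimal sketch of such a reader follows, assuming the PartList_Snap..._Chunk....npy and ShIDList_Snap..._Chunk....npy file names written by process_matchlist_chunk(). The function iter_matchlist_chunks is hypothetical and is not part of the pipe creation code, and the demonstration uses toy data in a temporary directory.

```python
import glob
import os
import tempfile

import numpy as np


def iter_matchlist_chunks(subdir_name, snapshot_number):
    """Yield (particle IDs, subhalo IDs) arrays one chunk at a time, in chunk order."""
    pattern = os.path.join(subdir_name,
                           'PartList_Snap{0}_Chunk*.npy'.format(snapshot_number))
    # zero-padded chunk numbers sort correctly as strings
    for part_file in sorted(glob.glob(pattern)):
        folder, name = os.path.split(part_file)
        shid_file = os.path.join(folder, name.replace('PartList', 'ShIDList', 1))
        yield np.load(part_file), np.load(shid_file)


# toy demonstration: two small chunks written to a temporary directory
toy_chunks = [([11, 12, 13], [0, 0, -1]), ([14, 15], [1, -1])]
with tempfile.TemporaryDirectory() as tmp:
    for chunk_id, (parts, shids) in enumerate(toy_chunks):
        np.save(os.path.join(tmp, 'PartList_Snap25_Chunk{0:04d}.npy'.format(chunk_id)), parts)
        np.save(os.path.join(tmp, 'ShIDList_Snap25_Chunk{0:04d}.npy'.format(chunk_id)), shids)
    chunks = list(iter_matchlist_chunks(tmp, 25))

# concatenating the parts recovers the full, non-chunked lists
all_parts = np.concatenate([parts for parts, shids in chunks])
all_shids = np.concatenate([shids for parts, shids in chunks])
```

For a small simulation, concatenating the parts in this way gives something to compare against the output of the non-chunked version of the code.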

# Initialise

In [None]:
#simulation to create lists for
sim_to_use = 'TNG300-1'

#snapshots to create lists for
snap_list = [25]#[99,91,84,78,72,67,59,50,40,33,25,21,17]

#base path to data
basePath = '/virgo/simulations/IllustrisTNG/{0}/output/'.format(sim_to_use)

#path to simulation hdf5 file
simPath = '/virgo/simulations/IllustrisTNG/{0}/simulation.hdf5'.format(sim_to_use)

#check these exist
print('Testing whether basePath and simPath exist...')
print('basePath exists = {0}'.format(os.path.exists(basePath)))
print('simPath exists = {0},\n{1}'.format(os.path.exists(simPath),simPath))

nchunks=1000 #the desired number of chunks (minus remainder) to process the particle data in

#identify number of available cores on the system
ncpus = m.cpu_count()
print('Maximum of {0} cores available to use.'.format(ncpus))

cpus_to_use = 8
print('Desired number of cores to use: {0}'.format(cpus_to_use))



# Create directories to store matchlists

In [None]:
#check to see if a top directory exists
#topdir_name = '/u/cwalker/Illustris_Zhang_Method/Sim_Matchlists/'
topdir_name = '/ptmp/cwalker/Illustris_FRB_Project/Sim_Matchlists/' #edit 14/03/2022: put in /ptmp/ now due to size for large simulations
topdir_check = os.path.exists(topdir_name)

#if it exists, print
if topdir_check == True:
    print('Top directory ({0}) already exists.'.format(topdir_name))
    
#else, create it.
elif topdir_check == False:
    print('Creating top directory for matchlists at {0}...'.format(topdir_name))
    os.mkdir(topdir_name)
    print('{0} created.'.format(topdir_name))
    
    
#check to see if subdirectory for particular simulation matchlist exists
subdir_name = topdir_name+'Matchlist_dir_{0}'.format(sim_to_use)
subdir_check = os.path.exists(subdir_name)

#if it exists, print
if subdir_check == True:
    print('Directory to hold {0} matchlist ({1}) exists.'.format(sim_to_use,subdir_name))

#else, create it
elif subdir_check == False:
    print('Creating subdirectory {0}...'.format(subdir_name))
    os.mkdir(subdir_name)
    print('{0} created.'.format(subdir_name))

# Create matchlists

In [None]:
#loop over snapshots
for snapshot_number in snap_list:
    
    uniquestest_chunked = [] #test array to record all unique subhalo IDs to test against the non-chunked version
    
    print('Creating for {0} snap {1}'.format(sim_to_use,snapshot_number))
    
    #########################
    #SNAPSHOT DATA LOCATIONS#
    #########################

    #snapshot numbers are zero-padded to three digits, so this also works for snapshots below 10
    offsetFile = '/virgo/simulations/IllustrisTNG/{0}/postprocessing/offsets/offsets_{1:03d}.hdf5'.format(sim_to_use,snapshot_number)
    data_loc = '/virgo/simulations/IllustrisTNG/{0}/output/snapdir_{1:03d}/snap_{1:03d}.0.hdf5'.format(sim_to_use,snapshot_number)
    partIDs_loc = '/Snapshots/{0}/PartType0/ParticleIDs'.format(snapshot_number)
    
    print('Processing snapshot {0} at:\n{1}\n with offset file:\n{2}\n and particle IDs file loc:\n{3}'.format(snapshot_number,data_loc,offsetFile,partIDs_loc))

    
    #########################
    #CREATE PARTICLE ID LIST#
    #########################
    
    
    #get all gas particle IDs in snapshot
    with h5py.File(simPath,'r') as f:
        
        
        #you can load smaller chunks of this at a time! Exploit this.
        
        
        simparts = f[partIDs_loc].shape[0] #get the total number of gas (PartType0) particles in this snapshot
        
        ########################
        ########################
        ##CHUNKING INFORMATION##
        ########################
        ########################
        
        #######################################################################################
        #the number of chunks of data to load the particle IDs in according to desired nchunks#
        #######################################################################################
        
        print('calculating chunking information for {0} particles over {1} desired chunks...'.format(simparts,
                                                                                                     nchunks))
        
        nchunks_whole = nchunks #number of whole chunks to process
        parts_per_chunk = simparts//nchunks #number of particles in a whole chunk which will be processed
        remainder = simparts%nchunks #number of particles in the remainder once whole chunks are processed.
        if remainder!=0: #if there is any remainder...
            nchunks_partial = 1 #one extra chunk will be processed, which is not the same size as the others
        else: #if there is no remainder...
            nchunks_partial = 0 #no extra chunks will be processed.
        nchunks_total = nchunks_whole + nchunks_partial #the total number of chunks (plus any remainder) to process
        
        print('will process simulation in {0} chunks. {1} chunks of {2} particles, and {3} chunks of {4}.'.format(nchunks_total,
                                                                                                                 nchunks_whole,
                                                                                                                  parts_per_chunk,
                                                                                                                  nchunks_partial,
                                                                                                                  remainder))
        ###############################################################################                                                                                                    
        #how this loading will be distributed over desired number of cores cpus_to_use#  
        ###############################################################################

        #the number of cores to use at once (cpus_to_use) is set in the Initialise cell

        #calculate the number of chunks each core processes in the full core run,
        #in which all cpus_to_use cores load their chunks simultaneously
        n_full_core = nchunks_total//cpus_to_use
        print('Chunks per core in the full core run: {0}'.format(n_full_core))
        

        #calculate the number of leftover chunks once the full core run is complete.
        #each leftover chunk is processed on its own core, all at once, in a partial core run
        n_partial_core = nchunks_total%cpus_to_use

        print('To parse these {0} chunks over {1} cpus, {1} cpus will load data simultaneously.\n\
              This will happen {2} times. The remaining data needs {3} cpus.\n\
              These will be loaded simultaneously.'.format(nchunks_total,
                                                           cpus_to_use,
                                                           n_full_core,
                                                           n_partial_core))
        
        ###################################
        #indices of chunks (and remainder)#
        ###################################
        
        #calculate the inclusive [first,last] particle indices to load for each whole chunk
        indices = [[i*parts_per_chunk,(i*parts_per_chunk)+(parts_per_chunk-1)] for i in range(nchunks_whole)]
        #append the particle indices for the remainder chunk, if there is one
        if remainder!=0:
            indices.append([nchunks*(parts_per_chunk),(nchunks*parts_per_chunk)-1+(remainder)])
        indices=np.array(indices)
        print('particles will be loaded according to indices array of shape {0}:\n\
        {1}'.format(indices.shape,
                    indices))
        
    ###################                   
    ###################                                                                                                          
    ##PROCESSING DATA##                                                                                                          
    ###################
    ###################

    if n_partial_core == 0: #if there is no remainder to process after full core runs

        print('No partial cores to process')

        ####################################################
        #map of which indices will be processed on each cpu#
        ####################################################

        cpu_map = np.arange(n_full_core*cpus_to_use).reshape(cpus_to_use,n_full_core) 
        print('CPU map: {0}'.format(cpu_map))

        ############################################################################
        #package of data to be passed to an unwrapper to unwrap for multiprocessing#
        ############################################################################

        package = [(nchunks_total,
                    cpu_map[i],
                    indices,
                    simPath,
                    partIDs_loc,
                    basePath,
                    snapshot_number,
                    offsetFile,
                    subdir_name) for i in range(cpus_to_use)]
        #print('package: {0}'.format(package))

        #########################
        #the multiprocessing bit#
        #########################

        with closing(Pool(cpus_to_use)) as p: #invoke multiprocessing
            run = p.map(unwrap_matchlist_package,package,chunksize=1) #run the multiprocessing
        p.terminate() #terminate after completion
        p.join() #even more terminating

    elif n_partial_core > 0: #else if there is remainder to process after full core runs

        print('Partial cores to process')

        #####################################################
        #maps of which indices will be processed on each cpu#
        #####################################################

        cpu_map_a = np.arange(n_full_core*cpus_to_use).reshape(cpus_to_use,n_full_core)
        cpu_map_b = np.arange(n_full_core*cpus_to_use,nchunks_total).reshape(n_partial_core,1)
        print('CPU map a : {0}'.format(cpu_map_a))
        print('CPU map b : {0}'.format(cpu_map_b))
        print(indices[cpu_map_b])

        #############################################################################
        #packages of data to be passed to an unwrapper to unwrap for multiprocessing#
        #############################################################################

        package_a = [(nchunks_total,
                      cpu_map_a[i],
                      indices,
                      simPath,
                      partIDs_loc,
                      basePath,
                      snapshot_number,
                      offsetFile,
                      subdir_name) for i in range(cpus_to_use)]
        package_b = [(nchunks_total,
                      cpu_map_b[i],
                      indices,
                      simPath,
                      partIDs_loc,
                      basePath,
                      snapshot_number,
                      offsetFile,
                      subdir_name) for i in range(n_partial_core)]
        #print('package a: {0}'.format(package_a))
        #print('package b: {0}'.format(package_b))

        #########################
        #the multiprocessing bit#
        #########################

        print('full core')
        with closing(Pool(cpus_to_use)) as p: #invoke multiprocessing
            run = p.map(unwrap_matchlist_package,package_a,chunksize=1) #run the multiprocessing
        p.terminate() #terminate after completion
        p.join() #even more terminating

        print('partial core')
        with closing(Pool(n_partial_core)) as p: #invoke multiprocessing
            run = p.map(unwrap_matchlist_package,package_b,chunksize=1) #run the multiprocessing
        p.terminate() #terminate after completion
        p.join() #even more terminating
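
The cell above handles leftover chunks with a second, smaller pool. One possible simplification, sketched here as an assumption rather than as what the notebook does, is to split the chunk IDs into nearly equal groups with np.array_split, so that a single pool covers every chunk. The function name assign_chunks is hypothetical.

```python
import numpy as np


def assign_chunks(nchunks_total, n_workers):
    """Split chunk IDs 0..nchunks_total-1 into n_workers contiguous groups.

    Hypothetical helper: group sizes differ by at most one, so no separate
    pass is needed for leftover chunks.
    """
    return np.array_split(np.arange(nchunks_total), n_workers)


# toy case matching 1000 whole chunks plus one remainder chunk on 8 cores
groups = assign_chunks(1001, 8)
group_sizes = [len(g) for g in groups]
```

Each group could then take the place of cpu_map[i] as the chunkIDs entry of a package tuple. This sketch has not been tested against the hang described in the NOTE of 17/03/22.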
