### Gadi path set-up

In [1]:
import os
import pathlib

nci_user_name = "abc123"  # change this to your NCI user name
work_path = pathlib.Path("/g/data/nm05/workspace/") / nci_user_name

# Pre-workshop testing override: the two lines below replace the Gadi workspace
# path with a folder under the home directory and create that folder if needed.
# Comment them out to work in the Gadi workspace defined above.
work_path = pathlib.Path.home() / "aurora_test_folder"
work_path.mkdir(parents=True, exist_ok=True)

# Make the working folder the current directory; the processing list is
# written into this folder at the end of the notebook.
os.chdir(work_path)
target_folder = work_path

# Create a processing list for a Gadi qsub job

Here we count the runs in each Musgraves level_1 MTH5 file. Most stations have only a single run, and we restrict the qsub job to these.

In [2]:
import pandas as pd
from aurora.pipelines.run_summary import RunSummary

In [5]:
# Build the path to the Musgraves data one folder level at a time.
my80_path = pathlib.Path("/g/data/my80")
au_scope_mt_collection_path = my80_path / "AuScope_MT_collection"
auslamp_path = au_scope_mt_collection_path / "AuScope_AusLAMP"
musgraves_path = auslamp_path / "Musgraves_APY"
data_dir = musgraves_path
# Fail immediately if the data folder does not exist.
assert data_dir.exists()

In [9]:
def get_musgraves_availability_df(data_dir):
    """
    Recursively search data_dir for h5 files and tabulate them.

    Parameters
    ----------
    data_dir : pathlib.Path
        Folder to search.  Every file whose name matches "*h5" is listed.

    Returns
    -------
    df : pd.DataFrame
        One row per file with columns:
        level      - the character following "level_" in the file path
        territory  - the two characters following "Musgraves_APY/" in the file path
        station_id - the file name without its extension
        path       - the full path of the file
    """
    h5_paths = list(data_dir.rglob("*h5"))
    print(f"Found {len(h5_paths)} h5 files")

    # The level and territory labels are parsed from the path text, so every
    # file must sit below both a "Musgraves_APY/" and a "level_" folder.
    path_strings = [str(h5_path) for h5_path in h5_paths]
    table = {
        "level": [text.split("level_")[1][0] for text in path_strings],
        "territory": [text.split("Musgraves_APY/")[1][0:2] for text in path_strings],
        "station_id": [h5_path.stem for h5_path in h5_paths],
        "path": h5_paths,
    }

    return pd.DataFrame(data=table)

In [10]:
# Tabulate every h5 file found under data_dir, then display the table.
availability_df = get_musgraves_availability_df(data_dir)
availability_df

Found 184 h5 files


Unnamed: 0,level,territory,station_id,path
0,1,WA,WA67,/g/data/my80/AuScope_MT_collection/AuScope_Aus...
1,1,WA,WA66,/g/data/my80/AuScope_MT_collection/AuScope_Aus...
2,1,WA,WA74,/g/data/my80/AuScope_MT_collection/AuScope_Aus...
3,1,WA,WA60,/g/data/my80/AuScope_MT_collection/AuScope_Aus...
4,1,WA,WA44,/g/data/my80/AuScope_MT_collection/AuScope_Aus...
...,...,...,...,...
179,0,SA,SA26W-2,/g/data/my80/AuScope_MT_collection/AuScope_Aus...
180,0,SA,SA344-2,/g/data/my80/AuScope_MT_collection/AuScope_Aus...
181,0,SA,SA273,/g/data/my80/AuScope_MT_collection/AuScope_Aus...
182,0,SA,SA351,/g/data/my80/AuScope_MT_collection/AuScope_Aus...


We will use the "enrich_df" pattern: iterate over the rows of the dataframe and add run information to each row.

In [12]:
def enrich_row_with_run_info(row):
    """
    Add run-count information for one mth5 file to a row of the file table.

    Parameters
    ----------
    row : pd.Series
        One row of the file table.  Only the "path" entry is read; it is
        passed to RunSummary.from_mth5s as a one-element list.

    Returns
    -------
    enriched_row : pd.Series
        A copy of the input row with two entries set:
        num_runs  - the number of rows in the run summary dataframe
        run_names - the unique run_id values joined with commas

    """
    mth5_run_summary = RunSummary()
    mth5_run_summary.from_mth5s([row["path"], ])
    run_names = mth5_run_summary.df.run_id.unique().tolist()

    # Work on a copy: DataFrame.apply does not support functions that mutate
    # the object they are handed.
    enriched_row = row.copy()
    # Item assignment is used instead of attribute assignment
    # (row.num_runs = ...), which only updates an entry when that label is
    # already present and otherwise silently stores a plain Python attribute.
    enriched_row["num_runs"] = len(mth5_run_summary.df)
    enriched_row["run_names"] = ",".join(run_names)
    return enriched_row

In [13]:
def count_mth5_runs(df):
    """
    Add run-count columns to a table of mth5 files.

    Parameters
    ----------
    df : pd.DataFrame
        This is a list of the files, one row per file.  Each row is passed to
        enrich_row_with_run_info.  The input dataframe is not modified.

    Returns
    -------
    enriched_df : pd.DataFrame
        Same as input but with two new columns, "num_runs" and "run_names".

    """
    # assign() returns a new dataframe, so the caller's dataframe keeps its
    # original columns instead of silently gaining placeholder ones.
    seeded_df = df.assign(num_runs=0, run_names="")
    if seeded_df.empty:
        # No rows to enrich; return the empty frame with the new columns.
        return seeded_df

    enriched_df = seeded_df.apply(enrich_row_with_run_info, axis=1)

    return enriched_df


In [14]:
# Add the num_runs and run_names columns by building a RunSummary for each listed file.
enriched_df = count_mth5_runs(availability_df)

[1m2023-11-17T11:43:46.955275+1100 | INFO | mth5.mth5 | close_mth5 | Flushing and closing /g/data/my80/AuScope_MT_collection/AuScope_AusLAMP/Musgraves_APY/WA/level_1/Concatenated_Resampled_Rotated_Time_Series_MTH5/WA67.h5[0m
[1m2023-11-17T11:43:47.039534+1100 | INFO | mth5.mth5 | close_mth5 | Flushing and closing /g/data/my80/AuScope_MT_collection/AuScope_AusLAMP/Musgraves_APY/WA/level_1/Concatenated_Resampled_Rotated_Time_Series_MTH5/WA66.h5[0m
[1m2023-11-17T11:43:47.297852+1100 | INFO | mth5.mth5 | close_mth5 | Flushing and closing /g/data/my80/AuScope_MT_collection/AuScope_AusLAMP/Musgraves_APY/WA/level_1/Concatenated_Resampled_Rotated_Time_Series_MTH5/WA74.h5[0m
[1m2023-11-17T11:43:47.725269+1100 | INFO | mth5.mth5 | close_mth5 | Flushing and closing /g/data/my80/AuScope_MT_collection/AuScope_AusLAMP/Musgraves_APY/WA/level_1/Concatenated_Resampled_Rotated_Time_Series_MTH5/WA60.h5[0m
[1m2023-11-17T11:43:48.368509+1100 | INFO | mth5.mth5 | close_mth5 | Flushing and closing /

In [15]:
# Display the table with the added num_runs and run_names columns.
enriched_df

Unnamed: 0,level,territory,station_id,path,num_runs,run_names
0,1,WA,WA67,/g/data/my80/AuScope_MT_collection/AuScope_Aus...,1,1
1,1,WA,WA66,/g/data/my80/AuScope_MT_collection/AuScope_Aus...,1,1
2,1,WA,WA74,/g/data/my80/AuScope_MT_collection/AuScope_Aus...,1,1
3,1,WA,WA60,/g/data/my80/AuScope_MT_collection/AuScope_Aus...,1,1
4,1,WA,WA44,/g/data/my80/AuScope_MT_collection/AuScope_Aus...,1,1
...,...,...,...,...,...,...
179,0,SA,SA26W-2,/g/data/my80/AuScope_MT_collection/AuScope_Aus...,1,1
180,0,SA,SA344-2,/g/data/my80/AuScope_MT_collection/AuScope_Aus...,1,1
181,0,SA,SA273,/g/data/my80/AuScope_MT_collection/AuScope_Aus...,1,1
182,0,SA,SA351,/g/data/my80/AuScope_MT_collection/AuScope_Aus...,1,1


# Check how many runs are in each HDF5 file

In [16]:
# Distinct values of the per-file run count.
enriched_df.num_runs.unique()

array([1, 4, 3])

In [17]:
# Distinct comma-joined run-name strings across all files.
enriched_df.run_names.unique()

array(['1', '1,2,3,4', '1,2,3'], dtype=object)

In [18]:
# Keep only the rows whose level label is the string "1".
level_1_df = enriched_df[enriched_df.level=="1"]

In [19]:
# Display the level_1 subset.
level_1_df

Unnamed: 0,level,territory,station_id,path,num_runs,run_names
0,1,WA,WA67,/g/data/my80/AuScope_MT_collection/AuScope_Aus...,1,1
1,1,WA,WA66,/g/data/my80/AuScope_MT_collection/AuScope_Aus...,1,1
2,1,WA,WA74,/g/data/my80/AuScope_MT_collection/AuScope_Aus...,1,1
3,1,WA,WA60,/g/data/my80/AuScope_MT_collection/AuScope_Aus...,1,1
4,1,WA,WA44,/g/data/my80/AuScope_MT_collection/AuScope_Aus...,1,1
...,...,...,...,...,...,...
130,1,SA,SA26W-2,/g/data/my80/AuScope_MT_collection/AuScope_Aus...,1,1
131,1,SA,SA344-2,/g/data/my80/AuScope_MT_collection/AuScope_Aus...,1,1
132,1,SA,SA273,/g/data/my80/AuScope_MT_collection/AuScope_Aus...,1,1
133,1,SA,SA351,/g/data/my80/AuScope_MT_collection/AuScope_Aus...,1,1


In [20]:
# Distinct run counts within the level_1 subset.
level_1_df.num_runs.unique()

array([1, 4, 3])

In [21]:
# Distinct run-name strings within the level_1 subset.
level_1_df.run_names.unique()

array(['1', '1,2,3,4', '1,2,3'], dtype=object)

In [None]:
# Print every row untruncated so that the multi-run stations can be read off the table.
print(level_1_df.to_string())

    level territory station_id                                                                                                                                    path  num_runs run_names
0       1        WA       WA67     /g/data/my80/AuScope_MT_collection/AuScope_AusLAMP/Musgraves_APY/WA/level_1/Concatenated_Resampled_Rotated_Time_Series_MTH5/WA67.h5         1         1
1       1        WA       WA66     /g/data/my80/AuScope_MT_collection/AuScope_AusLAMP/Musgraves_APY/WA/level_1/Concatenated_Resampled_Rotated_Time_Series_MTH5/WA66.h5         1         1
2       1        WA       WA74     /g/data/my80/AuScope_MT_collection/AuScope_AusLAMP/Musgraves_APY/WA/level_1/Concatenated_Resampled_Rotated_Time_Series_MTH5/WA74.h5         1         1
3       1        WA       WA60     /g/data/my80/AuScope_MT_collection/AuScope_AusLAMP/Musgraves_APY/WA/level_1/Concatenated_Resampled_Rotated_Time_Series_MTH5/WA60.h5         1         1
4       1        WA       WA44     /g/data/my80/AuScope_MT_collec

### In conclusion, only a handful of stations have multiple runs:
SA299, SA246, SA324-2

Dropping these for now

In [25]:
# Start the processing list from a deep copy so that enriched_df itself is left unchanged.
processing_list_df = enriched_df.copy(deep=True)

In [26]:
# Keep only the files that contain exactly one run, reporting the row count
# before and after, then renumber the remaining rows from zero.
print(f"Processing list has {len(processing_list_df)} rows before restricting to one-run cases")
processing_list_df = processing_list_df[processing_list_df.num_runs == 1]
print(f"Processing list has {len(processing_list_df)} rows after restricting to one-run cases")
processing_list_df = processing_list_df.reset_index(drop=True)

Processing list has 184 rows before restricting to one-run cases
Processing list has 178 rows after restricting to one-run cases


#### Restrict to level_1 data

In [28]:
# The level label is stored as a string, so compare against "1" rather than the integer 1.
processing_list_df = processing_list_df[processing_list_df.level=="1"]
print(f"Processing list has {len(processing_list_df)} rows in level_1")


Processing list has 89 rows in level_1


In [4]:
# Write the processing list to a CSV file in the working folder (row index omitted)
# and print the absolute path of that file.
out_csv = work_path.joinpath("l1_processing_list.csv")
processing_list_df.to_csv(out_csv, index=False)
print(out_csv.absolute())

The above path can be used to get a jobs list for a qsub job.