# Phase 1i: Grabbing Metadata

```
Parameters
----------
save_dir: str
    Path to directory for saving outputs in.
cache_dir: str
    Path to directory for cached objects in.
metadata_db: str
    Path to csv or tsv containing metadata.
dr_strain: str
    Name of dominant resident (DR) lineage.
voi_strains: list of strs
    Names of Variant Of Interest (VOI) lineages.
sub_vars_dict: dict {str: list of strs}
    Dictionary defining sub lineages of dr_strain and voi_strains.
root_strain_names: list of strs
    IDs of sequences to be used as root.
sample_id_field: str
    Name of field in metadata_db containing sequence IDs.
collection_date_field: str
    Name of field in metadata_db containing collection dates of sequences. Should be format YYYY-MM-DD.
lineage_field: str
    Name of field in metadata_db containing the lineage each sequence belongs to.
metadata_dtypes: str
    Optional, can be an empty string, None or 'None'. Path to json defining pandas data types for metadata_db.
data_filter: str
    Optional, can be an empty string, None or 'None'. Additional filter applied to metadata_db when selecting
    sequences and metadata to be used in the pipeline. Must conform to the [pandas query syntax](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.query.html), see further [example](https://www.slingacademy.com/article/pandas-working-with-the-dataframe-query-method-5-examples/).
data_prep_threads: int
    Number of dask workers to use when loading metadata_db.
```
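
To illustrate the `data_filter` syntax, here is a minimal sketch that applies a filter string to a small pandas dataframe. The dataframe `toy_metadata` and all of its values are made up for demonstration; only the filter string mirrors the style used in this notebook.

```python
import pandas as pd

# Toy metadata table; the rows and values are invented for illustration.
toy_metadata = pd.DataFrame({
    'strain': ['seq1', 'seq2', 'seq3'],
    'country': ['Canada', 'Canada', 'France'],
    'coverage': [0.99, 0.80, 0.95],
})

# A filter string written in the same style as data_filter.
example_filter = 'country == "Canada" and coverage >= 0.925'

# Only rows satisfying both conditions are kept.
filtered = toy_metadata.query(example_filter)
print(filtered['strain'].to_list())
```

Only `seq1` passes both conditions here, so it is the only ID printed.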


In [None]:
save_dir = 'runs_of_pipeline/2025-02-05'
cache_dir = 'cache'
metadata_db = None
dr_strain = ''
voi_strains = []
sub_vars_dict = {}
root_strain_names = None
sample_id_field = 'strain'
collection_date_field = 'date'
lineage_field = 'pango_lineage'
metadata_dtypes = None
data_filter = 'country == "Canada" and coverage >= 0.925'
data_prep_threads = 8

Create a folder for each strain. Folders for VOI strains have the prefix 'VOI_'. The folder for the dominant resident strain has the prefix 'DR_'.

In [None]:
import os
all_strains = [dr_strain] + voi_strains
xml_set_directories = {}
for strain in all_strains:
    if strain in voi_strains:
        xml_set_directories[strain] = save_dir + '/VOI_' + strain
    elif strain == dr_strain:
        xml_set_directories[strain] = save_dir + '/DR_' + strain
    else:
        xml_set_directories[strain] = save_dir + '/' + strain

# Create each directory, skipping any that already exist.
for path in xml_set_directories.values():
    os.makedirs(path, exist_ok=True)

To load all of the PHAC COVID sequence metadata we need a dask dataframe and a dask client.

In [None]:
from dask.distributed import Client
import dask.dataframe as dd
client = Client(n_workers=data_prep_threads)
client

Dask dataframes often throw errors unless you tell dask the data types of the columns. This is because dask loads dataframes lazily, in batches. Dask infers the data type of each column from the first X rows. If a column's data type differs after row X, an error can be thrown.

In [None]:
import json
if metadata_dtypes is None or metadata_dtypes in ['None', '']:
    dtypes = None
else:
    # Load the column data types from the json file.
    with open(metadata_dtypes, 'r') as dtypes_json:
        dtypes = json.load(dtypes_json)


if metadata_db.endswith('.tsv'):
    delimiter = '\t'
elif metadata_db.endswith('.csv'):
    delimiter = ','
else:
    raise TypeError(f"metadata_db must be a csv or tsv file, ending with the appropriate file extension. Value given is {metadata_db}")
metadata_all_ddf = dd.read_csv(metadata_db,
                               sep=delimiter,
                               dtype=dtypes,
                               parse_dates=[collection_date_field])


You may get an error when running the cell above. The error message suggests entries you should add to the dtypes dictionary in the metadata_dtypes json file.
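
As a rough guide, a `metadata_dtypes` json file is a mapping from column names to pandas data type names. The sketch below writes and reads back such a file. The dtype choices and the file name `example_dtypes.json` are assumptions for illustration; use the columns and types named in the error message dask gives you.

```python
import json
import os
import tempfile

# Hypothetical column-to-dtype mapping, for illustration only.
example_dtypes = {
    'strain': 'object',
    'pango_lineage': 'object',
    'coverage': 'float64',
}

# Write the mapping to a json file (a temporary directory is used here).
example_dtypes_path = os.path.join(tempfile.mkdtemp(), 'example_dtypes.json')
with open(example_dtypes_path, 'w') as fp:
    json.dump(example_dtypes, fp, indent=4)

# Reading the file back gives a dictionary suitable for the dtype argument.
with open(example_dtypes_path) as fp:
    loaded_dtypes = json.load(fp)
print(loaded_dtypes)
```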

## Filtering to Selected Strains
### Metadata

Create the filter mask.

In [None]:
# Quote every sub-lineage name so it can be embedded in the query string.
strain_sub_variants = [f"'{item}'" for sub_list in sub_vars_dict.values() for item in sub_list]

strain_filter = f"`{lineage_field}` in ({', '.join(strain_sub_variants)})"

# Combine the lineage filter with the optional user-supplied filter.
if data_filter is None or data_filter in ['None', '']:
    overall_filter = strain_filter
else:
    overall_filter = f"({data_filter}) & {strain_filter}"

selected_metadata = metadata_all_ddf.query(overall_filter)
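
To make the resulting query string concrete, the sketch below builds the lineage filter from a small dictionary. The lineage names in `example_sub_vars` are placeholders, not real pipeline inputs.

```python
# Placeholder lineage names, for illustration only.
example_sub_vars = {'A': ['A.1', 'A.2'], 'B': ['B.1']}
example_lineage_field = 'pango_lineage'

# Flatten the dictionary values and wrap each name in single quotes.
quoted = [f"'{item}'" for sub_list in example_sub_vars.values() for item in sub_list]

# Backticks around the field name let the query refer to columns whose
# names are not valid Python identifiers.
example_strain_filter = f"`{example_lineage_field}` in ({', '.join(quoted)})"
print(example_strain_filter)
# `pango_lineage` in ('A.1', 'A.2', 'B.1')
```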

Apply the filter to compute a new pandas dataframe. This can take 2-15 mins, as dask is opening the dataframe in chunks and performing our selection.

In [None]:
#papermill_description=Collecting_VOIs_&_DR_metadata
# Convert to pandas dataframe.
selected_metadata = selected_metadata.compute()

We need the root metadata as well

In [None]:
#papermill_description=Collecting_Root_metadata
root_metadata = metadata_all_ddf[metadata_all_ddf[sample_id_field].isin(root_strain_names)].compute()
root_metadata.to_csv(save_dir +  '/root_metadata.csv', index=False)

**Remember to shut down the dask client when finished.**

In [None]:
client.shutdown()

In [None]:
# Use lineage_field rather than a hard-coded column name.
selected_metadata[lineage_field].value_counts()

Various packages used in this pipeline (e.g. BEAST) need dates to be expressed as decimal years.

In [None]:
from beast_pype.date_utilities import date_to_decimal

selected_metadata['year_decimal'] = selected_metadata[collection_date_field].map(date_to_decimal)
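
For readers unfamiliar with decimal years, the sketch below shows one common way to convert a date. The helper `to_decimal_year` is hypothetical and written only for illustration. It is not the implementation of `beast_pype.date_utilities.date_to_decimal`, which may differ in detail.

```python
from datetime import datetime


def to_decimal_year(date):
    """Return the year plus the fraction of that year elapsed at `date`."""
    start_of_year = datetime(date.year, 1, 1)
    start_of_next_year = datetime(date.year + 1, 1, 1)
    # Dividing by the true year length handles leap years correctly.
    elapsed = (date - start_of_year).total_seconds()
    year_length = (start_of_next_year - start_of_year).total_seconds()
    return date.year + elapsed / year_length


print(to_decimal_year(datetime(2025, 1, 1)))      # 2025.0
print(to_decimal_year(datetime(2025, 7, 2, 12)))  # 2025.5
```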

Save all metadata for each strain and create some strain specific dictionaries.

In [None]:
import pandas as pd
strain_metadata_dict = {}
for strain, sub_vars_list in sub_vars_dict.items(): 
    strain_metadata = selected_metadata[selected_metadata[lineage_field].isin(sub_vars_list)]
    strain_metadata_dict[strain] = strain_metadata
    # Save the metadata for this strain on its own.
    strain_metadata.to_csv(f'{xml_set_directories[strain]}/metadata.csv', index=False)
    # Save the metadata for this strain together with the root metadata.
    strain_and_root_metadata = pd.concat([root_metadata, strain_metadata])
    strain_and_root_metadata.to_csv(f'{xml_set_directories[strain]}/metadata_with_root.csv', index=False)


available_samples = {strain: len(metadata) for strain, metadata in strain_metadata_dict.items()}

## Setting up IDs.txt files for Phase 2. 

These are simply files of sequence IDs (one per line).
First, let's do this for all sequences.

In [None]:
cached_ids_with_root = cache_dir + '/all_IDs.txt'
ids = selected_metadata[sample_id_field]
ids_with_root = pd.Series(root_strain_names + ids.to_list())
ids_with_root.to_csv(cached_ids_with_root, sep='\t', index=False, header=False)

Then for each strain.

In [None]:
for strain, strain_metadata in strain_metadata_dict.items():
    file_name = xml_set_directories[strain] + '/strain'
    ids = strain_metadata[sample_id_field]
    ids.to_csv(file_name + '_IDs.txt', sep='\t', index=False, header=False)
    ids_with_root = pd.Series(root_strain_names + ids.to_list())
    ids_with_root.to_csv(file_name + '_with_root_IDs.txt', sep='\t', index=False, header=False)
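
The IDs files written above contain one sequence ID per line, with no header. As a quick check of that format, the sketch below writes a small series in the same way and reads it back. The IDs and the file name `example_IDs.txt` are made up for illustration.

```python
import os
import tempfile

import pandas as pd

# Invented sequence IDs, for illustration only.
example_ids = pd.Series(['seq1', 'seq2', 'seq3'])

# Write the IDs with the same to_csv options used in this notebook.
example_ids_path = os.path.join(tempfile.mkdtemp(), 'example_IDs.txt')
example_ids.to_csv(example_ids_path, sep='\t', index=False, header=False)

# Each line of the file is a single ID.
with open(example_ids_path) as fp:
    ids_read_back = fp.read().splitlines()
print(ids_read_back)
```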

# Saving information to pass on to the next phases

In [None]:
pipeline_run_info = {'xml set directories': xml_set_directories,
                     'strain sub-variants': sub_vars_dict,
                     'available samples': available_samples,
                     'root strain names': root_strain_names,
                     }

# The with statement closes the file automatically.
with open(save_dir + '/pipeline_run_info.json', 'w') as fp:
    json.dump(pipeline_run_info, fp, sort_keys=True, indent=4)