# For generating demo data for a run of `beditor`

# Input mutations

In [None]:
import logging
from pathlib import Path 
import pandas as pd
from roux.lib.io import to_table,to_dict
logging.basicConfig(level=logging.INFO)

## Input parameters

In [None]:
## parameters
# number of demo genes to select (passed as `n` to `_to_gids_demo`)
n=2
# type of the mutations; later cells compare it against 'base' and 'protein'
mutation_format=None# 'protein'
# scanning mode; later cells test for None or for a prefix of 'pos' / 'reg'
mutation_scan=None #'residues' # region
# path of the configuration file written at the end of the notebook;
# a later cell derives it from `species_name`, `mutation_format` and `mutation_scan`
output_path=None
## genome
# species and release are passed to `get_annots` and `download_genome`
species_name=None
release=None
# or
# paths of local genome files; passed to `get_annots` and stored in the configuration
gtf_path=None
transcript_path=None
protein_path=None
# if this is still None near the end of the notebook, it is obtained from `download_genome`
genome_path=None
## constants
# step between consecutive scanned positions (bases and residues)
scanning_interval=2
# NOTE(review): presumably an overwrite flag; confirm where it is consumed
force=False

In [None]:
def _to_gids_demo(
    annots,
    n,
    ):
    from roux.lib.set import get_alt
    gids=[]
    strand='+'
    for g in annots.genes():
        if n==len(gids):
            break
        if g.is_protein_coding and not g.contig in ['MT','MITO']:
            if g.strand == strand:
                for t in g.transcripts:
                    if (t.is_protein_coding and t.contains_start_codon and t.contains_stop_codon):
                        if t.strand == strand:
                            print(g)
                            gids.append(g.id)
                            strand=strand=get_alt(['+','-'],g.strand)
    return gids
def _get_base_pos_demo(
    annots,
    gene_ids,
    n=2,
    ):
    dfs=[]
    for gid in gene_ids:
        g=annots.gene_by_id(gid)    
        t=g.transcripts[0]
        dfs.append(
            pd.DataFrame({
            'chrom':[t.contig]*n,
            "start":list(range(t.start,(t.start)+(scanning_interval*n),scanning_interval)),
        }))
    return pd.concat(dfs,axis=0)
# Without a species name, fall back to the demo genome files under `inputs/`.
# Only the paths that are still unset are filled in, so that paths provided
# as parameters are not overwritten.
if species_name is None:
    if gtf_path is None:
        gtf_path='inputs/ann.gtf'
    if genome_path is None:
        genome_path='inputs/dna.fa'
    if transcript_path is None:
        transcript_path="inputs/RNA.fa"
    if protein_path is None:
        protein_path="inputs/Protein.fa"
# Derive the path of the configuration file from the run parameters, unless
# one was provided explicitly.
if output_path is None:
    output_path=f"inputs{'_'+species_name if species_name is not None else ''}/mutations/{mutation_format}/{mutation_scan}.yml"

In [None]:
# ## inferred output variables
# output_dir_path=str(Path(output_path).parent)+'/'
# Path(output_dir_path).parent.mkdir(parents=True, exist_ok=True)
# logging.info(f"Output directory: {output_dir_path}")

# The table of mutations shares the location and stem of `output_path`,
# with the suffix replaced by `.tsv`.
mutations_path=Path(output_path).with_suffix('.tsv').as_posix()

## Genome annotations

In [None]:
from beditor.lib.utils import get_annots
# Annotations object used by the cells below to look up genes, transcripts and
# protein sequences. Both the species/release pair and the local file paths are
# passed; presumably `get_annots` uses whichever is set -- confirm.
annots=get_annots(
    species_name=species_name,
    release=release,
    gtf_path=gtf_path,
    transcript_path=transcript_path,
    protein_path=protein_path,
    reference_name='assembly',
    annotation_name='source',
    # **kws_Genome,
    )

In [None]:
# IDs of the `n` demo genes that the mutations are generated for.
gene_ids=_to_gids_demo(
    annots=annots,
    n=n,
    )
logging.info(gene_ids)

## Bases 

### Mutations

In [None]:
# Placeholder: no demo table is built in this cell for base mutations without scanning.
if mutation_format=='base' and mutation_scan is None:
    ## C->T
    pass

### Scanning

#### At positions

In [None]:
# Demo table for scanning at base positions: the `start` column of the
# generated positions is renamed to `pos`.
# `mutation_scan` defaults to None, so rule that out before calling `.startswith` on it.
if mutation_format=='base' and mutation_scan is not None and mutation_scan.startswith('pos'):
    df1=_get_base_pos_demo(annots,gene_ids=gene_ids).rename(columns={'start':'pos'})
    print(df1.head(1))

#### Within regions

In [None]:
# Demo table for scanning within regions: every generated start position is
# extended into a region by adding an `end` column equal to `start` + 10.
# `mutation_scan` defaults to None, so rule that out before calling `.startswith` on it.
if mutation_format=='base' and mutation_scan is not None and mutation_scan.startswith('reg'):
    df1=_get_base_pos_demo(annots,gene_ids=gene_ids).assign(end=lambda df: df['start']+10)
    print(df1.head(1))

## Protein

### Get protein IDs

In [None]:
from beditor.lib.utils import to_pid

# Map every demo gene to a protein ID and drop the genes without one.
if mutation_format=='protein':
    df1=pd.DataFrame({'gene id':gene_ids})
    df1=df1.assign(**{
        'protein id':df1['gene id'].apply(lambda gene_id: to_pid(annots,gene_id)),
    })
    # `.log` is a dataframe accessor that is not defined in this notebook
    # (presumably registered by `roux`) -- confirm
    df1=df1.log.dropna()
    print(df1.head())

### Mutations

In [None]:
# Placeholder: without scanning, the table of protein IDs built above is used as is.
if mutation_format=='protein' and mutation_scan is None:
    pass

### Scanning

#### At residue positions

In [None]:
# Demo table for scanning at residue positions: one row per scanned residue,
# covering each protein from position 1 to its length in steps of `scanning_interval`.
# `mutation_scan` defaults to None, so rule that out before calling `.startswith` on it.
if mutation_format=='protein' and mutation_scan is not None and mutation_scan.startswith('pos'):
    df1=(df1
        .assign(**{
            'aa pos':lambda df: df['protein id'].apply(lambda x: list(range(1,len(annots.protein_sequence(x))+1,scanning_interval))),
        })
        # one row per position
        .explode('aa pos')
        )
    print(df1.head(1))

#### Within regions of protein

In [None]:
# Demo table for scanning within protein regions: each region spans from the
# first residue to half of the length of the protein (integer division).
# `mutation_scan` defaults to None, so rule that out before calling `.startswith` on it.
if mutation_format=='protein' and mutation_scan is not None and mutation_scan.startswith('reg'):
    df1=df1.assign(**{
        'aa start':1,
        'aa end':lambda df: df['protein id'].apply(lambda x: len(annots.transcript_by_protein_id(x).protein_sequence)//2),
    })
    print(df1.head(1))

## Output 
### Mutations

In [None]:
# Save the demo mutations built in the cells above; `df1` exists only if one of those cells assigned it.
to_table(df1,mutations_path)

### Config

In [None]:
import yaml
# Template of the run configuration: keys left empty are parsed as None and
# are filled in further below.
cfg_base=yaml.safe_load("""input_path:
output_path:
method:
search_window:
## for species registered in pyensembl
species_name:
release:
## for non-registered species
genome_path:
gtf_path:
transcript_path:
protein_path:

not_be:
threads:
force: False
verbose: False
test: False""")
# cfg_base

In [None]:
# No local genome was provided: obtain it for the given species and release.
if genome_path is None:
    from beditor.lib.io import download_genome
    genome_path=download_genome(
        species=species_name,
        ensembl_release=release,
        # honour the `force` parameter of the notebook (False by default)
        # instead of a hard-coded value
        force=force,
        verbose=True,
        )

In [None]:
# Fill a copy of the template with the inputs of this demo run; the template
# itself is left untouched.
cfg=dict(cfg_base)
cfg.update({
    'input_path':mutations_path,
    'method':'Cas12a-BE',
    'species_name':species_name,
    'release':release,
    'genome_path':genome_path,
    'gtf_path':gtf_path,
    'transcript_path':transcript_path,
    'protein_path':protein_path,
    # cfg['output_dir_path']=str(Path(output_path).with_suffix(''))+f'/{get_datetime()}/'
    'output_dir_path':f"outputs{'_'+species_name if species_name is not None else ''}/{mutation_format}/{mutation_scan}/",
    'not_be':False,
})

cfg

In [None]:
# Save the configuration, nested under the `run` key, to `output_path`.
to_dict({'run':cfg},output_path)