In [1]:
import torch

import numpy as np

# Encoding SNV data

## - Encoding version 6

In [2]:
def enc_v6(string_to_tok, tok_to_string, a0, a1, pos: int, length: int):
    
    a0_toks = np.zeros(length, dtype = np.int32)
    
    a1_toks = np.zeros(length, dtype = np.int32)
    
    a01_toks = np.zeros(length, dtype = np.int32)
    
    for i, (a,b) in enumerate(zip(a0, a1)):
        
        if len(a) == 1 & len(b) == 1:
            
            a_string = str(a)
            
            b_string = str(b)
            
            ab_string = a + "," + b
            
        elif len(a) > len(b):
            
            a_string = "longer"
            
            b_string = "shorter"
            
            ab_string = "mixed_indel"
            
        elif len(a) < len(b):
            
            a_string = "shorter"
            
            b_string = "longer"
            
            ab_string = "mixed_indel"
            
        elif (len(a) == len(b)) and len(a) > 1:
            
            a_string = "long_sub"
            
            b_string = "long_sub"
            
            ab_string = "mixed_long_sub"
            
        # Obtain token for 2 major alleles
            
        if a_string not in string_to_tok:
            
            string_to_tok[a_string] = pos
            
            tok_to_string[pos] = a_string
            
            a_tok = pos
            
            pos += 1
            
        else:
            
            a_tok = string_to_tok[a_string]
            
        # Obtain token for 2 minor alleles
            
        if b_string not in string_to_tok:
            
            string_to_tok[b_string] = pos
            
            tok_to_string[pos] = b_string
            
            b_tok = pos
            
            pos += 1
            
        else:
            
            b_tok = string_to_tok[b_string]
            
        # Obtain token for 1 major allele & 1 minor allele
            
        if ab_string not in string_to_tok:
            
            string_to_tok[ab_string] = pos
            
            tok_to_string[pos] = ab_string
            
            ab_tok = pos
            
            pos += 1
            
        else:
            
            ab_tok = string_to_tok[ab_string]
            
        a0_toks[i] = a_tok
        
        a1_toks[i] = b_tok
        
        a01_toks[i] = ab_tok
        
    return a0_toks, a1_toks, a01_toks, string_to_tok, tok_to_string

## - Encoding version 5

In [3]:
def enc_v5(string_to_tok, tok_to_string, a0, a1, pos: int, length: int):
    
    for i in [0, 1, 2]:
        
        string_to_tok[str(i)] = i
        
        tok_to_string[i] = str(i)
        
    a0_toks = np.array(np.repeat(0, len(a0)), dtype = np.int32)
    
    a1_toks = np.array(np.repeat(2, len(a1)), dtype = np.int32)
    
    a01_toks = np.array(np.repeat(1, len(a0)), dtype = np.int32)
    
    return a0_toks, a1_toks, a01_toks, string_to_tok, tok_to_string

## - Encoding version 4

In [4]:
def enc_v4(string_to_tok, tok_to_string, a0, a1, pos: int, length: int):
    
    a0_toks = np.zeros(length, dtype = np.int32)
    
    a1_toks = np.zeros(length, dtype = np.int32)
    
    a01_toks = np.zeros(length, dtype = np.int32)
    
    diff_lens = np.zeros(length, dtype = np.int32)
    
    for i, (a,b) in enumerate(zip(a0, a1)):
            
        a_string = str(a)
            
        b_string = str(b)
            
        if len(a) > 1:
            
            a_string = "seq"
            
        if len(b) > 1:
            
            b_string = "seq"
            
        ab_string = a_string + "," + b_string
        
        # Major allele a0
        
        if a_string not in string_to_tok:
            
            a_tok = pos
            
            string_to_tok[a_string] = pos
            
            tok_to_string[pos] = a_string
            
            pos += 1
            
        else:
            
            a_tok = string_to_tok[a_string]
            
        # Minor allele a1
            
        if b_string not in string_to_tok:
            
            b_tok = pos
            
            string_to_tok[b_string] = pos
            
            tok_to_string[pos] = b_string
            
            pos += 1
            
        else:
            
            b_tok = string_to_tok[b_string]
        
        # Major allele a0 & Minor Allele a1
        
        if ab_string not in string_to_tok:
            
            ab_tok = pos
            
            string_to_tok[ab_string] = pos
            
            tok_to_string[pos] = ab_string
            
            pos += 1
            
        else:
            
            ab_tok = string_to_tok[ab_string]
            
        # Add token
            
        a0_toks[i] = a_tok
        
        a1_toks[i] = b_tok
        
        a01_toks[i] = ab_tok
        
        diff_lens[i] = len(b) - len(a)
        
    return a0_toks, a1_toks, a01_toks, string_to_tok, tok_to_string

## - Encoding version 3

In [5]:
def enc_v3(string_to_tok, tok_to_string, a0, a1, pos: int, length: int):
    
    a0_toks = np.zeros(length, dtype = np.int32)
    
    a1_toks = np.zeros(length, dtype = np.int32)
    
    a01_toks = np.zeros(length, dtype = np.int32)
    
    for i, (a,b) in enumerate(zip(a0, a1)):
        
        a_string = str(a)
        
        b_string = str(b)
        
        if len(a) > 1:
            
            a_string = a_string[0] + "I"
            
        if len(b) > 1:
            
            b_string = b_string[0] + "I"
            
        if len(str(a)) <= len(str(b)):
            
            ab_string = a_string + "," + b_string
            
        elif len(str(a)) > len(str(b)):
            
            ab_string = a_string + "," + "del"
            
        # Major allele a0
        
        if a_string not in string_to_tok:
            
            a_tok = pos
            
            string_to_tok[a_string] = pos
            
            tok_to_string[pos] = a_string
            
            pos += 1
            
        else:
            
            a_tok = string_to_tok[a_string]
            
        # Minor allele a1
        
        if b_string not in string_to_tok:
            
            b_tok = pos
            
            string_to_tok[b_string] = pos
            
            tok_to_string[pos] = b_string
            
            pos += 1
            
        else:
            
            b_tok = string_to_tok[b_string]
            
        # Major allele a0 & Minor allele a1
        
        if ab_string not in string_to_tok:
            
            ab_tok = pos
            
            string_to_tok[ab_string] = pos
            
            tok_to_string[pos] = ab_string
            
            pos += 1
            
        else:
            
            ab_tok = string_to_tok[ab_string]
            
        a0_toks[i] = a_tok
        
        a1_toks[i] = b_tok
        
        a01_toks[i] = ab_tok
        
    return a0_toks, a1_toks, a01_toks, string_to_tok, tok_to_string

## - Encoding version 2

In [6]:
def enc_v2(string_to_tok, tok_to_string, a0, a1, pos: int, length: int):
    
    a0_toks = np.zeros(length, dtype = np.int32)
    
    a1_toks = np.zeros(length, dtype = np.int32)
    
    a01_toks = np.zeros(length, dtype = np.int32)
    
    # Encoding a0
    
    for i, string in enumerate(a0):
        
        if len(string) > 1:
            
            string = string[0] + "I"
            
        if string not in string_to_tok:
            
            string_to_tok[string] = pos
            
            tok_to_string[pos] = string
            
            a_tok = pos
            
            pos += 1
        
        else:
            
            a_tok = string_to_tok[string]
            
        a0_toks[i] = a_tok
        
    # Encoding a1
    
    for i in range(len(a1)):
        
        if len(a1[i]) == len(a0[i]):
            
            if len(a1[i]) > 1:
                
                string = a1[i] + "I"
                
            else:
                
                string = a1[i]
                
        elif len(a1[i]) > len(a0[i]):
            
            string = "ins"
            
        elif len(a1[i]) < len(a0[i]):
            
            string = "del"
            
        if string not in string_to_tok:
            
            string_to_tok[string] = pos
            
            tok_to_string[pos] = string
            
            b_tok = pos
            
            pos += 1
            
        else:
            
            b_tok = string_to_tok[string]
            
        a1_toks[i] = b_tok
        
    # Encoding a01
        
    for i, (a,b) in enumerate(zip(a0, a1)):
        
        a = str(a)
        
        b = str(b)
        
        if len(a) > 1:
            
            a = a + "I"
            
        if len(b) > 1:
            
            b = b + "I"
        
        if len(a) == len(b):
            
            string = a + b
            
        elif len(a) > len(b):
            
            string = a + "," + "del"
        
        elif len(a) < len(b):
            
            string = a + "," + "ins"
            
        if string not in string_to_tok:
            
            string_to_tok[string] = pos
            
            tok_to_string[pos] = string
            
            ab_tok = pos
            
            pos += 1
            
        else:
            
            ab_tok = string_to_tok[string]
            
        a01_toks[i] = ab_tok
        
    return a0_toks, a1_toks, a01_toks, string_to_tok, tok_to_string
    

## - Encoding version 1

In [7]:
def enc_v2(string_to_tok, tok_to_string, a0, a1, pos: int, length: int):
    
    a0_toks = np.zeros(length, dtype = np.int32)
    
    a1_toks = np.zeros(length, dtype = np.int32)
    
    a01_toks = np.zeros(length, dtype = np.int32)
    
    # Encoding a0
    
    for i, string in enumerate(a0):
        
        if string not in string_to_tok:
            
            string_to_tok[string] = pos
            
            tok_to_string[pos] = string
            
            tok = pos
            
            pos += 1
            
        else:
            
            tok = string_to_tok[string]
            
        a0_toks[i] = tok
        
    # Encoding a1
        
    for i, string in enumerate(a1):
        
        if string not in string_to_tok:
            
            string_to_tok[string] = pos
            
            tok_to_string[pos] = string
            
            tok = pos
            
            pos += 1
            
        else:
            
            tok = string_to_tok[string]
            
        a1_toks[i] = tok
        
    # Encoding a01
    
    for i, (a,b) in enumerate(zip(a0, a1)):
        
        string = str(a) + ',' + str(b)
        
        if string not in string_to_tok:
            
            string_to_tok[string] = pos
            
            tok_to_string[pos] = string
            
            tok = pos
            
            pos += 1
            
        else:
            
            tok = string_to_tok[string]
            
        a01_toks[i] = tok
        
    return a0_toks, a1_toks, a01_toks, string_to_tok, tok_to_string
                        

## Obtain the token matrix for SNV data

In [8]:
def get_token_matrix(geno, encoding : int = 2):
    
    a0 = geno.a0.values
    
    a1 = geno.a1.values
    
    string_to_token = {}
    
    token_to_string = {}
    
    pos = 0
    
    for special_token in ['nan', 'del', 'ins']:
        
        string_to_token[special_token] = pos
        
        token_to_string[pos] = special_token
        
        pos += 1
        
        geno_length = geno.shape[0]
        
        geno_size = geno.shape[1]
        
    if encoding == 1:
        
        a0_toks, a1_toks, a01_toks, string_to_tok, tok_to_string = enc_v1(string_to_token, token_to_string,
                                                                      a0, a1, pos, geno_size)
    
    elif encoding == 2:
        
        a0_toks, a1_toks, a01_toks, string_to_tok, tok_to_string = enc_v2(string_to_token, token_to_string,
                                                                      a0, a1, pos, geno_size)
    
    elif encoding == 3:
        
        a0_toks, a1_toks, a01_toks, string_to_tok, tok_to_string = enc_v3(string_to_token, token_to_string,
                                                                      a0, a1, pos, geno_size)
    
    elif encoding == 4:
        
        a0_toks, a1_toks, a01_toks, string_to_tok, tok_to_string = enc_v4(string_to_token, token_to_string,
                                                                      a0, a1, pos, geno_size)
    
    elif encoding == 5:
        
        a0_toks, a1_toks, a01_toks, string_to_tok, tok_to_string = enc_v5(string_to_token, token_to_string,
                                                                      a0, a1, pos, geno_size)
    
    elif encoding == 6:
        
        a0_toks, a1_toks, a01_toks, string_to_tok, tok_to_string = enc_v6(string_to_token, token_to_string,
                                                                      a0, a1, pos, geno_size)
    
    # Initialize token matrix for further processing
    
    batch_size = 1024
    
    token_matrix = np.zeros(shape = (geno_length, geno_size), dtype = np.int32)
    
    diff_alleles_matrix = np.zeros(shape = (geno_length, geno_size), dtype = np.bool_)
    
    non_ref_alleles_matrix = np.zeros(shape = (geno_length, geno_size), dtype = np.bool_)
    
    num_batches = np.ceil(geno_length / batch_size)
    
    for num_batch in range(int(num_batches)):
        
        geno_mat = np.array(geno[num_batch * batch_size : (num_batch + 1) * batch_size].values, dtype = np.int32)
        
        for num_row in range(geno_mat.shape[0]):
            
            # Make sure row index does not exceed the last row
            
            actual_row_num = num_batch * batch_size + num_row
            
            if actual_row_num < geno_length:
            
                for num_col in range(geno_size):
                
                    alleles = geno_mat[num_row, num_col]

                    if encoding != 4:
                
                        if alleles == 0:
                    
                            token = a0_toks[num_col]
                        
                        elif alleles == 1:
                        
                            token = a01_toks[num_col]
                        
                        elif alleles == 2:
                        
                            token = a1_toks[num_col]
                        
                        else:
                        
                            token = string_to_tok['nan']
                        
                    elif encoding == 4:
                    
                        if alleles == "0":
                        
                            token = a0_toks[num_col]
                        
                            diff_alleles_matrix[actual_row_num, num_col] = 0
                        
                            non_ref_alleles_matrix[actual_row_num, num_col] = 0
                        
                        elif alleles == "1":
                        
                            token = a01_toks[num_col]
                        
                            diff_alleles_matrix[actual_row_num, num_col] = 1
                        
                            non_ref_alleles_matrix[actual_row_num, num_col] = 1
                        
                        elif alleles == "2":
                        
                            token = a1_toks[num_col]
                        
                            diff_alleles_matrix[actual_row_num, num_col] = 0
                        
                            non_ref_alleles_matrix[actual_row_num, num_col] = 1
                     
                        else:
                        
                            token = string_to_tok['nan']
                        
                    token_matrix[actual_row_num, num_col] = token
                
    token_matrix = torch.from_numpy(token_matrix)
    
    if encoding == 4:
        
        return token_matrix, diff_alleles_matrix, non_ref_alleles_matrix, diff_lens, string_to_tok, tok_to_string, len(string_to_tok)
    
    else:
        
        return token_matrix, string_to_tok, tok_to_string, len(string_to_tok)             

In [9]:
from pandas_plink import read_plink1_bin

In [10]:
bed_file = "test_logical.bed"
    
bim_file = "test_logical.bim"
    
fam_file = "test_logical.fam"
    
# Load in genomic files containing SNV data
    
test_geno = read_plink1_bin(bed_file, bim_file, fam_file)

Mapping files: 100%|█████████████████████████████████████████████████████████████████████| 3/3 [00:00<00:00, 66.71it/s]


In [11]:
a, b, c, d = get_token_matrix(test_geno, encoding = 5)

In [12]:
a

tensor([[1, 1, 2,  ..., 0, 2, 2],
        [2, 2, 0,  ..., 1, 1, 0],
        [1, 1, 1,  ..., 2, 1, 0],
        ...,
        [0, 2, 2,  ..., 1, 1, 2],
        [2, 2, 0,  ..., 2, 2, 1],
        [1, 0, 1,  ..., 0, 2, 1]], dtype=torch.int32)

In [13]:
b

{'nan': 0, 'del': 1, 'ins': 2, '0': 0, '1': 1, '2': 2}

In [14]:
c

{0: '0', 1: '1', 2: '2'}

In [15]:
d

6