In [1]:
import numpy as np
import hail as hl
from hail import methods
import scipy as sp
import pandas as pd
from math import sqrt, pi

## CREATE GENETIC DATA (and clean/process/edit)

In [2]:
# Create genetic data and write to disk.
# overwrite=True keeps this cell re-runnable: without it the write fails when the
# output path already exists from a previous run.
bnm_mt = hl.balding_nichols_model(3, 100, 1000)
bnm_mt.write("balding_nichols_3_100_1000.mt", overwrite=True)

Initializing Hail with default parameters...
Running on Apache Spark version 2.4.1
SparkUI available at http://192.168.0.166:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.46-bf7b7f7082e1
LOGGING: writing to /Users/annamira/Documents/GitHub/hail/hail/python/hail/linalg/hail-20200724-1045-0.2.46-bf7b7f7082e1.log
2020-07-24 10:45:59 Hail: INFO: balding_nichols_model: generating genotypes for 3 populations, 100 samples, and 1000 variants...
2020-07-24 10:46:09 Hail: INFO: Coerced sorted dataset
2020-07-24 10:46:12 Hail: INFO: wrote matrix table with 1000 rows and 100 columns in 8 partitions to balding_nichols_3_100_1000.mt


In [3]:
# Read first MatrixTable and clean

# Each entry of the stored MatrixTable has a GT field: a call, i.e. an object that
# represents an individual's genotype call at a genomic locus.
mt = hl.read_matrix_table("balding_nichols_3_100_1000.mt")

# Replace GT with n_alt: the count of non-reference alleles in each call, cast to float64.
# After this step n_alt is the only entry field.
mt = mt.transmute_entries(n_alt = hl.float64(mt.GT.n_alt_alleles())) 

# Turn MatrixTable into Table: the entries are localized into the field "ent" and the
# column data into the field "sample".

ht = mt.localize_entries("ent", "sample")

----------------------------------------
Global fields:
    'bn': struct {
        n_populations: int32, 
        n_samples: int32, 
        n_variants: int32, 
        n_partitions: int32, 
        pop_dist: array<int32>, 
        fst: array<float64>, 
        mixture: bool
    }
----------------------------------------
Column fields:
    'sample_idx': int32
    'pop': int32
----------------------------------------
Row fields:
    'locus': locus<GRCh37>
    'alleles': array<str>
    'ancestral_af': float64
    'af': array<float64>
----------------------------------------
Entry fields:
    'n_alt': float64
----------------------------------------
Column key: ['sample_idx']
Row key: ['locus', 'alleles']
----------------------------------------


## Grouping and NDArray methods from Tim and Dan

In [None]:
# Functions for operating with Tables of ndarrays in Hail (from Tim)

from hail.expr import Expression, ExpressionException, \
    expr_float64, expr_call, expr_any, expr_numeric, expr_array, \
    expr_locus, \
    analyze, check_entry_indexed, check_row_indexed, \
    matrix_table_source, table_source

# Only groups by rows, NOT COLUMNS
def matrix_table_to_table_of_ndarrays(field, group_size, tmp_path = '/tmp/nd_table.ht'):
    """Group the rows of a MatrixTable into a Table of 2-D ndarrays.

    Rows are numbered with ``add_index`` and rows whose index falls in the same
    ``idx // group_size`` bucket are stacked into one ndarray. Only rows are
    grouped, never columns. The last group holds fewer rows when the number of
    rows is not a multiple of `group_size`.

    The returned table has two fields: 'row_group_number' and 'ndarray'.

    Examples
    --------
    >>> ht = matrix_table_to_table_of_ndarrays(mt.GT.n_alt_alleles(), 100)

    Parameters
    ----------
    field
        Entry-indexed expression of a MatrixTable; its values fill the ndarrays.
    group_size
        Number of MatrixTable rows stacked into each ndarray.
    tmp_path
        Path the resulting table is checkpointed to (overwritten if it exists).

    Returns
    -------
    The checkpointed Table, grouped by 'row_group_number', with one ndarray per
    group.
    """
    mt = matrix_table_source('matrix_table_to_table_of_ndarrays/x', field)
    mt = mt.select_entries(x = field)
    ht = mt.localize_entries(entries_array_field_name='entries')
    # now ht.entries is an array of structs with one field, x

    # we'll also want to mean-impute/variance-normalize/etc here
    ht = ht.select(xs = ht.entries.map(lambda e: e['x']))
    # now ht.xs is an array of the values of `field`, one per column

    # now need to produce groups of G
    ht = ht.add_index()

    # hl.agg.collect does not guarantee element order, so carry the row index along
    # and sort by it; otherwise rows inside a group may come back permuted.
    indexed_rows = hl.agg.collect(hl.struct(idx=ht.idx, xs=ht.xs))
    ordered_rows = hl.sorted(indexed_rows, key=lambda r: r.idx).map(lambda r: r.xs)

    ht = ht.group_by(row_group_number= hl.int32(ht.idx // group_size)) \
        .aggregate(ndarray=hl.nd.array(ordered_rows))
    # may require a .T on ndarray

    return ht.checkpoint(tmp_path, overwrite=True)

def chunk_ndarray(a, group_size):
    """Chunks a 2-D ndarray along the first axis in chunks of `group_size`.

    Every row of `a` ends up in exactly one chunk: when the number of rows is not
    a multiple of `group_size`, the final chunk holds the remaining rows instead
    of being dropped. This matches the grouping by ``idx // group_size`` used in
    `matrix_table_to_table_of_ndarrays`.

    Parameters
    ----------
    a
        2-D array with a ``shape`` attribute that supports ``a[start:end, :]`` slicing.
    group_size
        Maximum number of rows per chunk; must be a positive integer.

    Returns
    -------
    List of slices of `a`, in row order.

    Raises
    ------
    ValueError
        If `group_size` is not positive.
    """
    if group_size <= 0:
        raise ValueError(f"group_size must be positive, got {group_size}")
    n_rows = a.shape[0]
    return [a[start:start + group_size, :] for start in range(0, n_rows, group_size)]


# Concatenate the ndarrays with a blocked Table
def concatBlocked(A):
    """Stack the blocks of a row-blocked Table into a single-block Table.

    Collects the 'ndarray' field of `A` locally, concatenates the blocks along
    axis 0 in the order returned by ``collect()``, and wraps the result in a new
    Table via `ndarray_to_table`.

    The shape check is done on the local NumPy result, which avoids collecting
    the new table again. It sums the row counts of the blocks, so a shorter
    final block is accepted.

    Parameters
    ----------
    A
        Table with an 'ndarray' field holding 2-D blocks with equal column counts.

    Returns
    -------
    Table with one row whose 'ndarray' is the stacked matrix.
    """
    blocks = A.ndarray.collect()
    big_mat = np.concatenate(blocks, axis=0)

    expected_rows = sum(block.shape[0] for block in blocks)
    assert big_mat.shape == (expected_rows, blocks[0].shape[1])

    return ndarray_to_table([big_mat])

# Takes an ndarray that has been split into chunks/groups and converts it to a
# Hail Table with the subarray chunks as entries.
# The input should always be a list of the chunks;
# if the array is not chunked at all, pass [arr].
# def ndarray_to_table(chunked_arr, num_blocks):
#     ht = hl.utils.range_table(num_blocks)
#     structs = [hl.struct(row_group_number = idx, ndarray = block) 
#                for idx, block in enumerate(chunked_arr)] # this line has a bug
#     ht = hl.Table.parallelize(structs) # or this line
#     ht = ht.key_by('row_group_number') # or this line
#     return ht


def ndarray_to_table(chunked_arr):
    """Build a Hail Table with one row per chunk.

    Each row carries the chunk's position in `chunked_arr` as 'row_group_number'
    and the chunk itself as 'ndarray'; the table is keyed by 'row_group_number'.
    """
    rows = []
    for group_number, chunk in enumerate(chunked_arr):
        rows.append(hl.struct(row_group_number=group_number, ndarray=chunk))
    return hl.Table.parallelize(rows).key_by('row_group_number')



# function to multiply two blocks, given the two blocks
# returns struct in form of array but not ndarray, includes the shape in the struct
# to change the result product directly back into a ndarray we need to use from_column_major
def block_product(left, right):
    """Multiply two blocks and return the product as a flat array plus its shape.

    The result is a struct with fields 'shape' (the shape of ``left @ right``) and
    'block' (the product's elements as a flat array in column-major order, not an
    ndarray). Use ``hl.nd.from_column_major(block, shape)`` to rebuild the ndarray.
    """
    prod = left @ right
    rows, cols = prod.shape
    flat_positions = hl.range(hl.int(rows * cols))
    # flat position p maps to (row p % rows, column p // rows), i.e. column-major order
    column_major = flat_positions.map(lambda pos: prod[pos % rows, pos // rows])
    return hl.struct(shape=prod.shape, block=column_major)

# takes in output of block_product
def block_aggregate(prod):
    """Aggregator expression that sums the outputs of `block_product` element-wise.

    The flat 'block' arrays are summed with ``hl.agg.array_sum`` and the shape is
    taken from one record via ``hl.agg.take``; the sum is then rebuilt into an
    ndarray with ``hl.nd.from_column_major``.
    """
    summed_block = hl.agg.array_sum(prod.block)
    any_shape = hl.agg.take(prod.shape, 1)[0]
    return hl.nd.from_column_major(summed_block, any_shape)

# returns flat array
def to_column_major(ndarray):
    """Flatten a 2-D Hail ndarray into a flat array in column-major order."""
    rows, cols = ndarray.shape
    flat_positions = hl.range(hl.int(rows * cols))
    # flat position p maps to (row p % rows, column p // rows)
    return flat_positions.map(lambda pos: ndarray[pos % rows, pos // rows])

# hl.nd.from_column_major(thing.the_sum, thing.the_shape)

## Blanczos Algorithm

In [7]:
def makeData(model_input, group_size):
    """Generate Balding-Nichols test data and return it as a row-blocked Table.

    Parameters
    ----------
    model_input
        Positional arguments for ``hl.balding_nichols_model``; the call below
        passes (populations, samples, variants).
    group_size
        Number of MatrixTable rows per ndarray block.

    Returns
    -------
    Table produced by `matrix_table_to_table_of_ndarrays` from the float64
    alternate-allele counts, checkpointed at '/tmp/test_table.ht'.
    """
    mt = hl.balding_nichols_model(*model_input)
    # overwrite=True so re-running the cell does not fail on the existing path
    mt.write("balding_nichols_test.mt", overwrite=True)
    mt = hl.read_matrix_table("balding_nichols_test.mt")
    mt = mt.transmute_entries(n_alt = hl.float64(mt.GT.n_alt_alleles()))
    return matrix_table_to_table_of_ndarrays(mt.n_alt, group_size, tmp_path='/tmp/test_table.ht')

data = makeData((3, 100, 1000), 4)


2020-07-24 10:46:12 Hail: INFO: balding_nichols_model: generating genotypes for 3 populations, 100 samples, and 1000 variants...
2020-07-24 10:46:13 Hail: INFO: Coerced sorted dataset
2020-07-24 10:46:14 Hail: INFO: wrote matrix table with 1000 rows and 100 columns in 8 partitions to balding_nichols_test.mt
2020-07-24 10:46:17 Hail: INFO: Ordering unsorted dataset with network shuffle
2020-07-24 10:46:18 Hail: INFO: wrote table with 250 rows in 8 partitions to /tmp/test_table.ht


In [75]:
# n = number of samples (columns), m = number of variants (rows) of the data generated above
(n, m) = (100, 1000)
# NOTE(review): k looks like the target rank and l = k + 2 the oversampled sketch width — confirm
k = 50
l = k + 2
# NOTE(review): q is presumably the number of power iterations; processH expects (q+1)*l columns
q = 0

# Use a seeded generator so the random test matrix G (n x l), and everything
# derived from it, is reproducible across kernel restarts.
rng = np.random.RandomState(0)
G = hl.nd.array(rng.normal(0, 1, (n, l)))

In [77]:
# Algorithm step: multiplying H0 = A @ G

# METHOD
# Multiply a row-blocked matrix by a local non-blocked matrix
# First step of algorithm

# usage:
# assumes blocks in blocked matrix are named ndarray
def matmul_rowblocked_nonblocked(A, B):
    """Multiply every block of a row-blocked Table by one local matrix.

    `B` is attached to `A` as a temporary global, each row's 'ndarray' is
    replaced by ``ndarray @ B`` (computed through `block_product` and rebuilt
    with ``hl.nd.from_column_major``), and the temporary global is dropped again.

    Returns a Table whose 'ndarray' field holds the per-block products.
    """
    with_rhs = A.annotate_globals(mat=B)
    flat_product = block_product(with_rhs.ndarray, with_rhs.mat)
    with_prod = with_rhs.annotate(prod=flat_product)
    rebuilt = with_prod.annotate(
        ndarray=hl.nd.from_column_major(with_prod.prod.block, with_prod.prod.shape))
    # keep only the product blocks, then remove the helper global
    return rebuilt.select('ndarray').drop('mat')

H0 = matmul_rowblocked_nonblocked(data, G)

# Algorithm step: intermediate operation of multiplying At @ (A @ G) = At @ H0

# METHOD
# Multiply a column-blocked matrix by a row-blocked matrix
# as a block-matrix multiplication, then sum the per-block products
# Second step of algorithm

# usage:
# pass in matrix A normally, blocked in rows - this specifically expects A to need to be transposed
# assumes blocks in blocked matrix are named ndarray
def matmul_colblocked_rowblocked(A, B):
    """Compute A^T @ B for two row-blocked Tables as a sum of block products.

    Each block of `A` is transposed and multiplied by the block of `B` with the
    same 'row_group_number' (`B` is indexed by that field, so it must be keyed by
    it). The per-block products are then summed with `block_aggregate`.

    The previous version computed this sum and discarded it, returning the
    unsummed per-block products instead.

    Parameters
    ----------
    A
        Row-blocked Table with fields 'row_group_number' and 'ndarray'; passed
        untransposed.
    B
        Row-blocked Table keyed by 'row_group_number' with an 'ndarray' field.

    Returns
    -------
    Table with a single row (row_group_number 0) whose 'ndarray' is the summed
    product, so callers can keep treating the result as a blocked Table.
    """
    temp = A.transmute(prod = block_product(A.ndarray.transpose(), B[A.row_group_number].ndarray))
    # sum over all row groups: A^T @ B = sum_i A_i^T @ B_i
    summed = temp.aggregate(block_aggregate(temp.prod))
    return ndarray_to_table([summed])

G1 = matmul_colblocked_rowblocked(data, H0)


def matmul_nonblocked_rowblocked(B, A):
    """Placeholder, not implemented: the body does nothing and returns None."""
    pass


In [108]:
# Algorithm step: perform QR decomposition of Hq and compute T = Q^T @ A

# METHOD
# Perform QR decomposition of a row-blocked matrix
# Third and fourth step of algorithm

def processH(H, A):
    """QR-decompose the row-blocked matrix H and compute T from Q and A.

    Relies on the notebook-level globals `m`, `q` and `l` for the shape checks.

    Parameters
    ----------
    H
        Row-blocked Table whose stacked blocks form an (m, (q+1)*l) matrix.
    A
        Row-blocked Table; Q is split into as many row blocks as `A` has rows.

    Returns
    -------
    (T, blocked_Q_table): the result of
    ``matmul_colblocked_rowblocked(blocked_Q_table, A)`` and the row-blocked
    Table holding Q.
    """
    # perform QR decomposition on unblocked version of H
    mat_H = concatBlocked(H)
    arr_H = mat_H.ndarray.collect()[0]
    assert (arr_H.shape == (m, (q+1)*l))
    Q, _ = np.linalg.qr(arr_H)  # the R factor is not needed
    assert(Q.shape == (m, (q+1)*l))

    # block Q's rows into the same number of blocks that A has
    num_blocks = A.count()
    # Q is a local NumPy array, so its shape is read directly (no Hail evaluation needed)
    group_size_Q = Q.shape[0] // num_blocks
    assert group_size_Q * num_blocks == m
    blocked_Q_table = ndarray_to_table(chunk_ndarray(Q, group_size_Q))

    T = matmul_colblocked_rowblocked(blocked_Q_table, A)
    return T, blocked_Q_table

In [114]:
# Algorithm step: compute SVD of T such that T = USW^T

# METHOD
# 5th step of algorithm

def processT(T):
    """Compute the reduced SVD of the matrix stored in the blocked Table T.

    The blocks of `T` are stacked with `concatBlocked` and collected locally; the
    collected value is passed straight to NumPy without a round trip through
    ``hl.eval``.

    Returns
    -------
    (U, S, W) exactly as returned by ``np.linalg.svd(..., full_matrices=False)``.
    Note that NumPy's third return value is ``vh``, so `W` here is the transposed
    factor (T = U @ diag(S) @ W).
    """
    mat_T = concatBlocked(T)
    arr_T = mat_T.ndarray.collect()[0]
    U, S, W = np.linalg.svd(arr_T, full_matrices=False)
    return U, S, W

# Algorithm step: multiply V = Q @ W

# METHOD
# 6th step of algorithm and last step

def computeV(U, S, W, Q):
    """Multiply the row-blocked Q by the local matrix W.

    Parameters
    ----------
    U, S
        Passed through unchanged.
    W
        Local matrix multiplied onto every block of `Q`.
    Q
        Row-blocked Table with an 'ndarray' field.

    Returns
    -------
    (U, S, V) where V is the row-blocked Table of ``Q_block @ W`` products.

    NOTE(review): `W` as produced by `processT` is NumPy's ``vh``; confirm that
    Q @ W is the intended product for this algorithm step.
    """
    # the unused Table previously built from W has been removed
    V = matmul_rowblocked_nonblocked(Q, W)
    return U, S, V

In [116]:
T, Q = processH(H0, data)
T.describe()
# processT stacks T itself, so no separate concatBlocked(T) call is needed here
u, s, w = processT(T)
# computeV returns the triple (U, S, V); unpack it so V is the product table, not a tuple
u, s, V = computeV(u, s, w, Q)

(1000, 52) (1000, 52)


2020-07-24 15:13:36 Hail: INFO: Coerced sorted dataset


----------------------------------------
Global fields:
    None
----------------------------------------
Row fields:
    'row_group_number': int32 
    'ndarray': ndarray<float64, 2> 
----------------------------------------
Key: ['row_group_number']
----------------------------------------


2020-07-24 15:13:37 Hail: INFO: Coerced sorted dataset
2020-07-24 15:14:12 Hail: INFO: Coerced sorted dataset


In [51]:
# debugging

# NOTE(review): the right-hand operand was missing in the saved cell (a syntax error).
# 250, the number of row blocks written for `data`, is assumed because it matches the
# recorded output `True` — confirm.
52 * 250 == l * 250

True

In [None]:
# Scratch: multiply each row block of `data` by G directly with `@`.
data.describe()
G.describe()
#ht.transmute(ent = ht.ent.map(lambda x: x.n_alt))
# Attach G as a global field so row expressions can reference it
AG = data.annotate_globals(G = G)
AG.describe()
# Per-row product of the block with G
AG = AG.annotate(prod = (AG.ndarray @ AG.G))


AG.describe()
AG.show()
# NOTE(review): AG.prod is the result of an ndarray `@`; confirm that hl.agg.array_agg
# accepts it — this cell has no recorded execution.
matmul_product = AG.aggregate(hl.agg.array_agg(lambda element: hl.agg.sum(element), AG.prod))

#(hl.agg.array_sum([data.ndarray @ G]))

## Practice Scraps - linalg operations on small data, experimenting with Hail

In [None]:
# annotate_{rows, cols, entries}
# can annotate without aggregating but can also do an aggregation that is called an annotation?

# mt.annotate_rows(sum_of_ef1_by_row=hl.agg.sum(mt.ef1))
# Aggregate along each row of entries to create a new row annotation. Can
# reference column and entry fields in aggregations.

# mt.annotate_cols(sum_of_ef1_by_col=hl.agg.sum(mt.ef1))
# Aggregate along each column of entries to create a new col annotation.
# Can reference row and entry fields in aggregations

# need map-like aggregator to create "new" MatrixTable that is a product

In [69]:
# Practice doing a matrix multiplication and a transpose

# Make some python ndarrays: a is 5x6 and b is 6x4, so a @ b is defined
a = np.arange(30).reshape((5, 6))
b = np.arange(24).reshape((6, 4))

# Make Hail ndarrays
matrix_5_6 = hl.nd.array(a)
matrix_6_4 = hl.nd.array(b)

# Make pandas DataFrames (not yet MatrixTables): the first row supplies the column
# labels, the first column supplies the index, and the rest is the data
dfA = pd.DataFrame(data=a[1:,1:], index=a[1:,0], columns=a[0,1:])
dfB = pd.DataFrame(data=b[1:,1:], index=b[1:,0], columns=b[0,1:])


In [73]:
# dfA
# tableA = hl.Table.from_pandas(dfA)
# tableA.show()
# mtA = tableA.to_matrix_table(row_key=['1'], col_key=['2'])
# mtA.describe()
# new_htA = matrix_table_to_table_of_ndarrays(mtA.1, 2, PATH???)
# tableA.group_by(row_group_number=tableA.idx // 2)

# Stack three copies of `a` along axis 0 (rows) to see how np.concatenate joins blocks
print(np.concatenate([a, a, a], axis=0))

[[ 0  1  2  3  4  5]
 [ 6  7  8  9 10 11]
 [12 13 14 15 16 17]
 [18 19 20 21 22 23]
 [24 25 26 27 28 29]
 [ 0  1  2  3  4  5]
 [ 6  7  8  9 10 11]
 [12 13 14 15 16 17]
 [18 19 20 21 22 23]
 [24 25 26 27 28 29]
 [ 0  1  2  3  4  5]
 [ 6  7  8  9 10 11]
 [12 13 14 15 16 17]
 [18 19 20 21 22 23]
 [24 25 26 27 28 29]]
