In [1]:
import queue 
import logging

from __init__ import *
import snmcseq_utils
from snmcseq_utils import create_logger
from CEMBA_update_mysql import connect_sql

In [2]:
# Logger handle from the project helper. The cells below log through the
# `logging` module directly (logging.info), so this call presumably mainly
# configures logging output (timestamped lines appear in later outputs)
# — TODO confirm what create_logger sets up.
log = create_logger()

In [3]:
# get a clustering result: each cell's dataset and its cluster label from the
# chosen clustering column of the ensemble table
ens = 'Ens7'
ens_path = os.path.join(PATH_ENSEMBLES, ens)
# column of the ensemble table holding the cluster labels to analyse
cluster_col = 'cluster_mCHmCG_lv_npc50_k30'

# Table/column names cannot be bound as SQL parameters, so they are
# interpolated into the query; refuse anything that is not a plain identifier.
for identifier in (ens, cluster_col):
    if not identifier.isidentifier():
        raise ValueError("Unsafe SQL identifier: {!r}".format(identifier))

database = 'CEMBA'
engine = connect_sql(database)
# NOTE(review): RIGHT JOIN keeps ensemble rows that have no matching `cells`
# row (their cell_name would be NULL); confirm an INNER JOIN isn't intended.
sql = '''SELECT cell_name, dataset, {col}
        FROM cells
        RIGHT JOIN {ens}
        ON cells.cell_id = {ens}.cell_id'''.format(col=cluster_col, ens=ens)
df_cluster = pd.read_sql(sql, engine, index_col='cell_name')
df_cluster.columns = ['dataset', 'cluster']
print(df_cluster.shape)
df_cluster.head()

(4029, 2)


Unnamed: 0_level_0,dataset,cluster
cell_name,Unnamed: 1_level_1,Unnamed: 2_level_1
171213_CEMBA_mm_P56_P63_3C_MOp_CEMBA171206_3C_4_CEMBA171206_3C_5_A11_AD002_indexed,CEMBA_3C_171206,8
171213_CEMBA_mm_P56_P63_3C_MOp_CEMBA171206_3C_1_CEMBA171206_3C_3_G4_AD008_indexed,CEMBA_3C_171206,1
171213_CEMBA_mm_P56_P63_3C_MOp_CEMBA171206_3C_1_CEMBA171206_3C_3_C3_AD002_indexed,CEMBA_3C_171206,19
171213_CEMBA_mm_P56_P63_3C_MOp_CEMBA171206_3C_1_CEMBA171206_3C_3_H3_AD002_indexed,CEMBA_3C_171206,6
171213_CEMBA_mm_P56_P63_3C_MOp_CEMBA171206_3C_4_CEMBA171206_3C_5_B7_AD002_indexed,CEMBA_3C_171206,3


In [4]:
def encode_allc_chrom(chrom):
    """Give every chromosome an integer code so chromosomes sort in CEMBA order.

    Integer-like chromosomes ('1', '19', 7, ...) map to their integer value.
    The named chromosomes map to negative codes so they sort first:
    'L' -> -4, 'M' -> -3, 'X' -> -2, 'Y' -> -1.

    Args:
        chrom: chromosome name (str) or number.

    Returns:
        int: the chromosome code.

    Raises:
        KeyError: if `chrom` is neither integer-like nor one of L/M/X/Y.
    """
    trans_dict = {'L': -4,
                  'M': -3,
                  'X': -2,
                  'Y': -1,
                  }
    try:
        return int(chrom)
    except (TypeError, ValueError, OverflowError):
        # not numeric -> must be one of the named chromosomes
        pass
    try:
        return trans_dict[chrom]
    except KeyError:
        raise KeyError("Unknown chromosome: {!r}".format(chrom)) from None

def decode_allc_chrom(chrom_code):
    """Turn a CEMBA chromosome code back into a chromosome.

    Codes -4, -3, -2, -1 become 'L', 'M', 'X', 'Y'; any other code is
    returned as a plain int.
    """
    named_codes = {-4: 'L', -3: 'M', -2: 'X', -1: 'Y'}
    if chrom_code in named_codes:
        return named_codes[chrom_code]
    return int(chrom_code)

In [5]:
# def merge_allc(allc_paths, context='CG', chunksize=100000):
#     """Merge allc tables given the allc_files
#     Allc files are assumed to have CEMBA format (no header, all chromosomes in one file, and bgzipped)
#     """
#     iter_allcs = [snmcseq_utils.read_allc_CEMBA(allc_path, chunksize=chunksize)
#                for allc_path in allc_paths]
    
#     merged_cks = []
#     i = 0
#     ti = time.time()
#     while True:
#         print(".", end='')
        
#         i += 1
#         if i%5 == 0:
#             print(i)
#             print(time.time() - ti)
#             ti = time.time()
        
#         dfs_ck = []
#         empty = True
#         # iterate over all iterators once
#         for iter_allc in iter_allcs:
#             try:
#                 df_ck = next(iter_allc)
#                 df_ck = df_ck.loc[df_ck.context.isin(snmcseq_utils.get_expanded_context(context)), ['mc', 'c']]
#                 dfs_ck.append(df_ck)
#                 empty = False
#             except:
#                 pass

#         if empty:
#             break # end the while-loop 
#         else:
#             # merge and append merged chunk
#             merged_ck = pd.concat(dfs_ck).groupby(['chr', 'pos']).sum()
#             merged_cks.append(merged_ck)
#     return merged_cks 

def merge_allc_v2(allc_paths, context='CG', chunksize=100000):
    """Merge allc tables chunk-by-chunk, summing mc/c per (chromosome, position).

    Allc files are assumed to have CEMBA format (no header, all chromosomes in
    one file, and bgzipped). Each round reads the next `chunksize` rows from
    every file that still has data. It keeps rows whose context is in the
    expanded `context` set and sums `mc` and `c` per (chr_code, pos).

    Files are not aligned row-by-row, so the same site can appear in more than
    one returned chunk. The chunks must be merged again (e.g. with queue_merge)
    to obtain a single table.

    Args:
        allc_paths: paths of the allc files to merge.
        context: methylation context passed to get_expanded_context.
        chunksize: rows read per file per round.

    Returns:
        list of DataFrames indexed by (chr_code, pos) with columns ['mc', 'c'].
    """
    # loop-invariant; materialized in case the helper returns a one-shot iterable
    contexts = list(snmcseq_utils.get_expanded_context(context))
    iter_allcs = [snmcseq_utils.read_allc_CEMBA(allc_path, chunksize=chunksize, pindex=False)
                  for allc_path in allc_paths]

    merged_cks = []
    i = 0
    ti = time.time()
    while True:
        print(".", end='')

        i += 1
        if i % 5 == 0:
            print(i)
            print(time.time() - ti)
            ti = time.time()

        # read phase: take the next chunk from every file that still has data
        dfs_ck = []
        for iter_allc in iter_allcs:
            try:
                df_ck = next(iter_allc)
            except StopIteration:
                # this file is exhausted; others may still have rows.
                # Any other error (corrupt file, missing column) propagates
                # instead of silently dropping the rest of the file.
                continue
            dfs_ck.append(df_ck.loc[df_ck.context.isin(contexts),
                                    ['chr', 'pos', 'mc', 'c']])

        if not dfs_ck:  # every file is exhausted: end the while-loop
            break

        # concat and merge phase
        df_ck = pd.concat(dfs_ck)
        df_ck['chr_code'] = df_ck['chr'].apply(encode_allc_chrom)
        df_ck = df_ck.set_index(['chr_code', 'pos'])[['mc', 'c']]
        merged_cks.append(df_ck.groupby(['chr_code', 'pos']).sum())
    return merged_cks

def merge_allc_v3(allc_paths, context='CG', chunksize=100000, n_read=50, n_merge=20):
    """Merge allc tables chunk-by-chunk with a two-level merge per round.

    Allc files are assumed to have CEMBA format (no header, all chromosomes in
    one file, and bgzipped). Each round takes the next `chunksize` rows from
    every file that still has data and works in two stages:

    1. Files are processed in groups of `n_read`. Each group's chunks are
       filtered to the expanded `context` and summed per (chr_code, pos).
    2. The group results are combined with queue_merge, `n_merge` at a time,
       into one chunk for the round.

    Rounds repeat until every file is exhausted. Files are not aligned
    row-by-row, so a site can still appear in several returned chunks; merge
    them again to obtain a single table.

    Args:
        allc_paths: paths of the allc files to merge.
        context: methylation context passed to get_expanded_context.
        chunksize: rows read per file per round.
        n_read: files pre-merged together in the read phase (was fixed at 50).
        n_merge: frames merged per step in the second merge (was fixed at 20).

    Returns:
        list of DataFrames indexed by (chr_code, pos) with columns ['mc', 'c'].
    """
    # loop-invariant; materialized in case the helper returns a one-shot iterable
    contexts = list(snmcseq_utils.get_expanded_context(context))
    iter_allcs = [snmcseq_utils.read_allc_CEMBA(allc_path, chunksize=chunksize, pindex=False)
                  for allc_path in allc_paths]

    merged_cks = []
    i = 0
    ti = time.time()

    while True:  # one round = one chunk from every file; stop when all are exhausted
        print(".", end='')

        i += 1
        if i % 5 == 0:
            print(i)
            print(time.time() - ti)
            ti = time.time()

        # read phase: n_read files at a time, each group pre-merged into the queue
        q = queue.Queue()
        j = 0
        tj = time.time()
        for iter_allc_ck in snmcseq_utils.chunks(iter_allcs, n_read):
            print("-", end='')

            j += 1
            if j % 5 == 0:
                print(j)
                print(time.time() - tj)
                tj = time.time()

            dfs_ck = []
            for iter_allc in iter_allc_ck:
                try:
                    df_ck = next(iter_allc)
                except StopIteration:
                    # file exhausted; any other error propagates instead of
                    # silently dropping the rest of the file
                    continue
                dfs_ck.append(df_ck.loc[df_ck.context.isin(contexts),
                                        ['chr', 'pos', 'mc', 'c']])

            if dfs_ck:  # concat and merge this group
                df_ck = pd.concat(dfs_ck)
                df_ck['chr_code'] = df_ck['chr'].apply(encode_allc_chrom)
                df_ck = df_ck.set_index(['chr_code', 'pos'])[['mc', 'c']]
                q.put(df_ck.groupby(['chr_code', 'pos']).sum())

        if q.empty():  # nothing read from any file: end the while-loop
            break
        # second merge: collapse this round's group results into one chunk
        merged_cks.append(queue_merge(q, n_merge))

    return merged_cks


def queue_merge(q, n_chunk):
    """Merge every DataFrame in a queue into one by repeated batched group-sums.

    Repeatedly takes up to `n_chunk` frames off the front of `q`, sums them per
    (chr_code, pos), and puts the result at the back, until the queue is
    drained. The last batch is merged into the returned frame. `q` is empty
    afterwards.

    Arguments:
        q: a queue.Queue of DataFrames that have 'chr_code' and 'pos' as index
            levels or columns.
        n_chunk: frames merged per step. Must be >= 2, otherwise each step puts
            back as many frames as it took and the loop never ends.

    Returns:
        DataFrame grouped by (chr_code, pos) with the remaining columns summed.

    Raises:
        ValueError: if n_chunk < 2 or the queue is empty.
    """
    if n_chunk < 2:
        raise ValueError("n_chunk must be >= 2, got {}".format(n_chunk))
    if q.empty():
        raise ValueError("queue_merge: the queue is empty, nothing to merge")

    def take_batch():
        # get up to n_chunk frames out, stopping early if the queue runs dry
        return [q.get() for _ in range(n_chunk) if not q.empty()]

    i = 0
    dfs = take_batch()
    ti = time.time()
    while not q.empty():
        i += 1
        print('.', end='')
        if i % 10 == 0:
            print(i, time.time() - ti)
            ti = time.time()

        # merge this batch and put the partial result back at the end
        q.put(pd.concat(dfs).groupby(['chr_code', 'pos']).sum())
        dfs = take_batch()

    # final merge of the last batch
    return pd.concat(dfs).groupby(['chr_code', 'pos']).sum()

In [6]:
def queue_merge_v2(dfs, n_chunk=20, n_stop=1):
    """Merge a list of DataFrames by rounds of batched group-sums.

    Each round splits the current list into consecutive batches of `n_chunk`
    frames and replaces every batch with its sum per (chr_code, pos). Rounds
    repeat until at most `n_stop` frames remain.

    Arguments:
        dfs: sized sequence of DataFrames with 'chr_code' and 'pos' as index
            levels or columns.
        n_chunk: frames merged per batch. Must be >= 2 so every round shrinks
            the list.
        n_stop: stop once at most this many frames remain. Must be >= 1, since
            a non-empty list can never shrink below one frame.

    Returns:
        list of merged DataFrames (length <= n_stop), or `dfs` itself if it is
        already short enough.

    Raises:
        ValueError: if n_chunk < 2 or n_stop < 1 (either would loop forever).
    """
    if n_chunk < 2:
        raise ValueError("n_chunk must be >= 2, got {}".format(n_chunk))
    if n_stop < 1:
        raise ValueError("n_stop must be >= 1, got {}".format(n_stop))

    tii = time.time()
    ti = time.time()

    tmp_lst_last = dfs
    rnd = 0
    while len(tmp_lst_last) > n_stop:  # one round per iteration
        rnd += 1
        logging.info("Queue merging (round {})...".format(rnd))
        batches = [tmp_lst_last[k:k + n_chunk]
                   for k in range(0, len(tmp_lst_last), n_chunk)]
        tmp_lst = []
        for i, dfs_ck in enumerate(batches):
            print('.', end='')
            if i % 10 == 0:
                print(i)
                print(time.time() - ti)
                ti = time.time()
            tmp_lst.append(pd.concat(dfs_ck).groupby(['chr_code', 'pos']).sum())

        tmp_lst_last = tmp_lst

    print(time.time() - tii)

    return tmp_lst_last

In [7]:
# Pool the allc files of one cluster and merge them chunk-by-chunk (CG context).
target_cluster = 1
chunksize = 100000

cluster_id = target_cluster
df_sub = df_cluster[df_cluster['cluster'] == cluster_id]
if df_sub.empty:
    # previously this fell through to print(tf-ti) with tf undefined
    raise ValueError("No cells assigned to cluster {}".format(cluster_id))

# join every component so str.format is never applied to PATH_DATASETS itself
allc_paths = [os.path.join(PATH_DATASETS, dataset, 'allc', 'allc_{}.tsv.bgz'.format(cell))
              for dataset, cell in zip(df_sub['dataset'], df_sub.index)]

n_files = len(allc_paths)
logging.info(n_files)

ti = time.time()
merged_cks = merge_allc_v3(allc_paths, context='CG', chunksize=chunksize)
tf = time.time()

print(tf - ti)

02/20/2018 06:02:46 PM 445


.-----5
12.997415542602539
----.-----5
14.274775266647339
----.-----5
14.670079946517944
----.-----5
14.147103309631348
----.5
124.58612942695618
-----5
14.771745681762695
----.-----5
12.060624122619629
----.-----5
13.992237091064453
----.-----5
13.870195150375366
----.-----5
13.898064374923706
----.10
155.1526336669922
-----5
14.324325323104858
----.-----5
13.865434408187866
----.-----5
13.748296022415161
----.-----5
13.688993453979492
----.-----5
13.487546920776367
----.15
156.7642138004303
-----5
13.585622310638428
----.-----5
13.444178581237793
----.-----5
13.55431604385376
----.-----5
13.388979196548462
----.-----5
13.470604658126831
----.20
153.9309093952179
-----5
13.13578724861145
----.-----5
13.614554405212402
----.-----5
13.62428903579712
----.-----5
13.610388994216919
----.-----5
11.996550559997559
----.25
152.98394656181335
-----5
13.11138653755188
----.-----5
13.175598621368408
----.-----5
14.013601064682007
----.-----5
11.543370008468628
----.-----5
13.631381750106812
---

----.-----5
12.377557039260864
----.235
146.2861683368683
-----5
12.604761123657227
----.-----5
12.381913423538208
----.-----5
12.725029706954956
----.-----5
12.447699546813965
----.-----5
12.673974752426147
----.240
144.22125816345215
-----5
10.577678442001343
----.-----5
12.57680368423462
----.-----5
12.548443794250488
----.-----5
10.969503402709961
----.-----5
12.7565598487854
----.245
142.19480991363525
-----5
12.415969610214233
----.-----5
12.379746198654175
----.-----5
10.717241048812866
----.-----5
12.399270296096802
----.-----5
12.258780241012573
----.250
140.53208756446838
-----5
12.358850717544556
----.-----5
12.495001792907715
----.-----5
12.23828673362732
----.-----5
12.2861487865448
----.-----5
12.329595804214478
----.255
144.50965309143066
-----5
12.268903732299805
----.-----5
12.250922918319702
----.-----5
10.700969457626343
----.-----5
10.44103479385376
----.-----5
12.335099697113037
----.260
132.27491283416748
-----5
11.36492371559143
----.-----5
12.262377500534058
---

----.-----5
3.9638497829437256
----.470
60.24924302101135
-----5
4.3468711376190186
----.-----5
3.526923656463623
----.-----5
3.8731791973114014
----.-----5
3.6518256664276123
----.-----5
4.079806804656982
----.475
54.468329191207886
-----5
3.730247974395752
----.-----5
3.2956817150115967
----.-----5
3.2633512020111084
----.-----5
3.9460489749908447
----.-----5
3.2126357555389404
----.480
51.49525332450867
-----5
3.925952911376953
----.-----5
3.453303337097168
----.-----5
3.411113977432251
----.-----5
3.0676989555358887
----.-----5
3.015063762664795
----.485
48.32122230529785
-----5
3.6233692169189453
----.-----5
3.6171865463256836
----.-----5
3.5427563190460205
----.-----5
3.2080607414245605
----.-----5
3.510894298553467
----.490
50.78571152687073
-----5
2.7289116382598877
----.-----5
2.6951956748962402
----.-----5
2.932361602783203
----.-----5
2.6874780654907227
----.-----5
3.2954297065734863
----.495
42.92735028266907
-----5
3.3346617221832275
----.-----5
3.3471248149871826
----.---

-----5
0.12801909446716309
----.-----5
0.12374401092529297
----.-----5
0.12295222282409668
----.-----5
0.122711181640625
----.-----5
0.12073135375976562
----.705
2.585526943206787
-----5
0.12362408638000488
----.-----5
0.12074565887451172
----.-----5
0.1202230453491211
----.-----5
0.12370800971984863
----.-----5
0.12061095237731934
----.710
2.2949464321136475
-----5
0.12084770202636719
----.-----5
0.12300610542297363
----.-----5
0.12261247634887695
----.-----5
0.12108826637268066
----.-----5
0.12285232543945312
----.715
2.2438628673553467
-----5
0.12674379348754883
----.-----5
0.11963772773742676
----.-----5
0.12440133094787598
----.-----5
0.12849187850952148
----.-----5
0.12521719932556152
----.720
2.2469451427459717
-----5
0.1261579990386963
----.-----5
0.12390613555908203
----.-----5
0.12454104423522949
----.-----5
0.12471818923950195
----.-----5
0.12601399421691895
----.725
2.248523712158203
-----5
0.12745952606201172
----.-----5
0.12401485443115234
----.-----5
0.12313532829284668


----.-----5
0.001577615737915039
----.-----5
0.0015697479248046875
----.-----5
0.0016031265258789062
----.-----5
0.0015566349029541016
----.925
0.9584159851074219
-----5
0.0015952587127685547
----.-----5
0.0015666484832763672
----.-----5
0.0015842914581298828
----.-----5
0.001604318618774414
----.-----5
0.0015866756439208984
----.930
0.9452927112579346
-----5
0.0015940666198730469
----.-----5
0.0015761852264404297
----.-----5
0.0015821456909179688
----.-----5
0.0015676021575927734
----.-----5
0.0015749931335449219
----.935
0.9547080993652344
-----5
0.0015935897827148438
----.-----5
0.001596212387084961
----.-----5
0.0015842914581298828
----.-----5
0.001584768295288086
----.-----5
0.0015788078308105469
----.940
0.948394775390625
-----5
0.0015726089477539062
----.-----5
0.0015845298767089844
----.-----5
0.001560211181640625
----.-----5
0.0015501976013183594
----.-----5
0.0016362667083740234
----.945
1.079643726348877
-----5
0.0015561580657958984
----.-----5
0.0015459060668945312
----.---

----.-----5
0.0015783309936523438
----.1135
0.37189364433288574
-----5
0.0015645027160644531
----.-----5
0.0016181468963623047
----.-----5
0.0016531944274902344
----.-----5
0.0019228458404541016
----.-----5
0.0019102096557617188
----.1140
0.3650178909301758
-----5
0.0019252300262451172
----.-----5
0.0018954277038574219
----.-----5
0.0019078254699707031
----.-----5
0.0019245147705078125
----.-----5
0.001920938491821289
----.1145
0.401200532913208
-----5
0.0018947124481201172
----.-----5
0.0019154548645019531
----.-----5
0.0021162033081054688
----.-----5
0.0019042491912841797
----.-----5
0.0019252300262451172
----.1150
0.381838321685791
-----5
0.0019040107727050781
----.-----5
0.0019211769104003906
----.-----5
0.0019276142120361328
----.-----5
0.0018970966339111328
----.-----5
0.0019202232360839844
----.1155
0.37987494468688965
-----5
0.0018830299377441406
----.-----5
0.0019156932830810547
----.-----5
0.0018892288208007812
----.-----5
0.0018939971923828125
----.-----5
0.00192117691040039

In [8]:
# Push every per-round partial result into a FIFO queue, then collapse it
# n_chunk frames at a time into a single table.
# (queue_merge_v2(merged_cks, n_chunk) is the list-based alternative.)
n_chunk = 20

q = queue.Queue()
for partial in merged_cks:
    q.put(partial)
print(q.qsize())

tii = time.time()
df_final = queue_merge(q, n_chunk)
print(time.time() - tii)

1325
..........10 145.99336695671082
..........20 204.6451759338379
..........30 94.36031723022461
..........40 10.09069275856018
..........50 1.2101869583129883
..........60 0.338071346282959
.........637.6791107654572


In [67]:
# check df_final: chromosome codes present, in order of first appearance.
# Index.unique keeps appearance order and avoids an O(n*k) Python scan
# over tens of millions of index entries.
unique_chr = df_final.index.get_level_values(0).unique().tolist()
print(unique_chr)


[-4, -3, -2, -1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]


Unnamed: 0_level_0,Unnamed: 1_level_0,mc,c
chr_code,pos,Unnamed: 2_level_1,Unnamed: 3_level_1
-1,10487025,1,1
-1,10487026,2,2
-1,10487802,0,1
-1,10487949,1,1
-1,10488096,1,1


In [12]:
# Manual merge round 1: sum each consecutive group of n_chunk frames from
# merged_cks per (chr_code, pos).
n_chunk = 20
tii = time.time()
ti = time.time()
merged_cks_tmp = []
for batch_no, batch in enumerate(snmcseq_utils.chunks(merged_cks, n_chunk)):
    print('.', end='')
    if not batch_no % 10:
        print(batch_no)
        print(time.time() - ti)
        ti = time.time()
    merged_cks_tmp.append(pd.concat(batch).groupby(['chr_code', 'pos']).sum())

print(time.time() - tii)

.0
0.0002837181091308594
..........10
277.4595060348511
..........20
362.69828057289124
..........30
145.60496425628662
..........40
7.962611436843872
..........50
1.256485939025879
..........60
0.3831515312194824
......795.6369676589966


In [18]:
# Manual merge round 2: same batched group-sum applied to the round-1 output.
n_chunk = 20
tii = time.time()
ti = time.time()
merged_cks_tmp2 = []
for k, group in enumerate(snmcseq_utils.chunks(merged_cks_tmp, n_chunk)):
    print('.', end='')
    if k % 10 == 0:
        print(k)
        print(time.time() - ti)
        ti = time.time()
    summed = pd.concat(group).groupby(['chr_code', 'pos']).sum()
    merged_cks_tmp2.append(summed)

print(time.time() - tii)

.0
0.00023221969604492188
...173.8031063079834


In [24]:
# Manual merge round 3: batched group-sum of the round-2 output.
n_chunk = 20
tii = time.time()
ti = time.time()
merged_cks_tmp3 = []
for step, frames in enumerate(snmcseq_utils.chunks(merged_cks_tmp2, n_chunk)):
    print('.', end='')
    if step % 10 == 0:
        print(step)
        print(time.time() - ti)
        ti = time.time()
    merged_cks_tmp3.append(pd.concat(frames).groupby(['chr_code', 'pos']).sum())

print(time.time() - tii)

.0
0.000335693359375
64.8678126335144


In [47]:
# Rows held at each merge stage, relative to the number of reference CG sites.
m = 21867837 * 2  # all cg sites (presumably both strands — TODO confirm)
stages = [merged_cks, merged_cks_tmp, merged_cks_tmp2, merged_cks_tmp3]
a, b, c, d = ([frame.shape[0] for frame in stage] for stage in stages)

for stage, sizes in zip(stages, (a, b, c, d)):
    print(len(stage))
    print(sum(sizes))
    print(sum(sizes)/m)

print(df_final.shape[0]/m)

1325
719458627
16.45015524397772
67
344304454
7.8723939180633185
4
62549829
1.4301786911984025
1
40098430
0.9168357620371873
0.9168357620371873


In [10]:
ti = time.time()

# read allcg (reference table of CG sites) and encode chromosomes the same way
# the merged table is encoded
path_allcg = os.path.join(PATH_REFERENCES, 'Genome/mm10_all_cg.tsv')
allcg = pd.read_table(path_allcg, dtype={'chr': object})
allcg['chr_code'] = allcg['chr'].apply(encode_allc_chrom)

# merged counts with (chr_code, pos) as plain columns
# (previously `res` was never defined, so the merge below raised NameError)
res = df_final.reset_index()

# merge on chr and pos to attach chr/strand/context from the reference
res_final = pd.merge(res, allcg, on=['chr_code', 'pos'], how='left')
# select columns; keep mouse chromosomes plus Y and M. `.copy()` so the column
# added below goes into an independent frame, not a view of the merge result.
res_final = res_final.loc[res_final['chr'].isin(snmcseq_utils.get_mouse_chromosomes() + ['Y', 'M']),
                          ['chr', 'pos', 'strand', 'context', 'mc', 'c']].copy()
res_final['methylated'] = 1

print(time.time() - ti)

# results (shapes only; the merged table has tens of millions of rows)
print(res.shape)
print(res_final.shape)
res_final.head()

37.858304500579834


In [95]:
# Write the annotated merged table as a headerless TSV (missing values as 'NA')
# and report how long it took.
out_file = 'res_test_merge_allc.tsv'
ti = time.time()
res_final.to_csv(out_file, sep='\t', index=False, header=False, na_rep='NA')
print(time.time() - ti)

186.42249250411987


In [None]:
# test junhao's idea: accumulate each allc's mc/c directly onto the reference
# CG table (allcg), one row at a time.
n_test_files = 2  # only a couple of files: the per-row .loc update is slow

# reset counters; index by (chr, pos) only once so re-running the cell works
# (a second set_index(['chr', 'pos']) would fail because 'chr' is no longer a column)
allcg['mc'] = 0
allcg['c'] = 0
if not isinstance(allcg.index, pd.MultiIndex):
    allcg = allcg.set_index(['chr', 'pos'])

# allcs of one cluster
cluster_id = 1
df_sub = df_cluster[df_cluster['cluster'] == cluster_id]
allc_paths = [os.path.join(PATH_DATASETS, dataset, 'allc', 'allc_{}.tsv.bgz'.format(cell))
              for dataset, cell in zip(df_sub['dataset'], df_sub.index)]

n_files = len(allc_paths)
print(n_files)

t_start = time.time()
for allc_path in allc_paths[:n_test_files]:
    ti = time.time()
    df = snmcseq_utils.read_allc_CEMBA(allc_path, pindex=False)
    tj = time.time()
    for j, (idx, row) in enumerate(df.iterrows()):
        if j % 1000 == 0:
            print(time.time() - tj)
            tj = time.time()
        try:
            allcg.loc[(row.chr, row.pos), 'mc'] += row.mc
            allcg.loc[(row.chr, row.pos), 'c'] += row.c
        except KeyError:
            # site absent from the reference table: skip it.
            # NOTE(review): a KeyError also occurs if df's `chr` dtype differs
            # from allcg's (read as str) — confirm numeric chromosomes match.
            pass
    print(time.time() - ti)
tf = time.time()
print(tf - t_start)  # total time over all files

445
3.307274103164673
70.49065470695496
28.78847312927246
19.948750019073486
23.349807262420654
22.84055733680725
16.53987169265747


In [16]:
# Row-count ratios against 33804630 (presumably the merged CG-site count of
# an earlier run — TODO confirm where these constants came from).
ref_rows = 33804630
for n_rows in (54040047, 75457653, 80554541, 719458627):
    print(n_rows/ref_rows)
print(ref_rows/(21867837*2))
# TODO: group by chromosome (splitting & combining)


1.5985989788972694
2.2321691732759685
2.382944022756646
21.28284282360138
0.7729303542915561


In [8]:
# Inspect the first allc file of the selected cluster with the reader's
# default arguments.
first_allc_path = allc_paths[0]
df = snmcseq_utils.read_allc_CEMBA(first_allc_path)
print(df.shape)
df.head()

  mask |= (ar1 == a)


(47614230, 5)


Unnamed: 0_level_0,Unnamed: 1_level_0,strand,context,mc,c,methylated
chr,pos,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
L,15,+,CGG,0,1,1
L,23,+,CGC,0,1,1
L,25,+,CTA,0,1,1
L,42,+,CCG,0,1,1
L,43,+,CGG,0,1,1
