In [1]:
import pyarrow as pa
import pyarrow.feather as feather
import pyarrow.parquet as pq
import pyarrow.compute as pc
import numpy as np
from pyarrow import csv
from pathlib import Path
from rich.progress import track

In [2]:
# List the cached Parquet and Arrow outputs for the CHO UniProt tryptic library.
!ls -l /data/aiomics/massspec_cache/uniprot/cho/*.parquet
!ls -l /data/aiomics/massspec_cache/uniprot/cho/*.arrow

-rw-rw-r--. 1 djs10 645div   257977993 Apr 20 11:51 /data/aiomics/massspec_cache/uniprot/cho/cho_uniprot_tryptic_2.parquet
-rw-rw-r--. 1 djs10 645div   283786246 Apr 20 12:19 /data/aiomics/massspec_cache/uniprot/cho/cho_uniprot_tryptic_2_tmt.parquet
-rw-r--r--. 1 djs10 645div 12269375854 May 25 14:13 /data/aiomics/massspec_cache/uniprot/cho/predicted_cho_uniprot_tryptic_2_0.parquet
-rw-r--r--. 1 djs10 645div 12573970866 May 25 14:17 /data/aiomics/massspec_cache/uniprot/cho/predicted_cho_uniprot_tryptic_2_1.parquet
-rw-r--r--. 1 djs10 645div 12844369481 May 25 14:21 /data/aiomics/massspec_cache/uniprot/cho/predicted_cho_uniprot_tryptic_2_2.parquet
-rw-r--r--. 1 djs10 645div 12310726608 May 25 14:26 /data/aiomics/massspec_cache/uniprot/cho/predicted_cho_uniprot_tryptic_2_3.parquet
-rw-r--r--. 1 djs10 645div 11927726552 May 25 14:30 /data/aiomics/massspec_cache/uniprot/cho/predicted_cho_uniprot_tryptic_2_4.parquet
-rw-r--r--. 1 djs10 645div  9185134284 May 25 14:33 /data/aiomics/massspec_

In [None]:
# Convert every .arrow (Feather/IPC) file in the search directory to a .parquet
# file with the same stem, written next to it.
arrow_dir = Path('/data/search_asms2023/cho')
# Materialise (and sort) the glob: rich's track() can only show real progress
# when it knows the total, and sorting makes the processing order deterministic.
arrow_paths = sorted(arrow_dir.glob('*.arrow'))
# `arrow_path` (not `input`) so the builtin input() is not shadowed.
for arrow_path in track(arrow_paths, description="Converting to Parquet"):
    table = feather.read_table(arrow_path)
    pq.write_table(table, arrow_path.with_suffix('.parquet'))

In [None]:
# Peek at one shard of the predicted library. The shard is ~12 GB on disk
# (see the listing above), and read_table loads all of it into memory.
table = pq.read_table(
    source="/data/aiomics/massspec_cache/uniprot/cho/predicted_cho_uniprot_tryptic_2_0.parquet"
)
table

In [2]:
# Load the query spectra that will be searched against the predicted library.
query_table = pq.read_table(source="/home/djs10/asms2023/test.parquet")
query_table

pyarrow.Table
id: uint64
charge: int8
ev: double
instrument: string
instrument_type: string
instrument_model: string
ion_mode: string
ionization: string
name: string
synonyms: string
scan: string
nce: double
collision_energy: double
retention_time: double
collision_gas: string
insource_voltage: int64
sample_inlet: string
intensity: large_list<item: double>
  child 0, item: double
stddev: large_list<item: double>
  child 0, item: double
product_massinfo: struct<tolerance: double, tolerance_type: dictionary<values=string, indices=int32, ordered=0>, mass_type: dictionary<values=string, indices=int32, ordered=0>, neutral_loss: string, neutral_loss_charge: int64, evenly_spaced: bool>
  child 0, tolerance: double
  child 1, tolerance_type: dictionary<values=string, indices=int32, ordered=0>
  child 2, mass_type: dictionary<values=string, indices=int32, ordered=0>
  child 3, neutral_loss: string
  child 4, neutral_loss_charge: int64
  child 5, evenly_spaced: bool
mz: large_list<item: double>


In [4]:
# Number of query spectra.
query_table.num_rows

143044

In [3]:
# Drop every query spectrum whose `mod_names` list contains the TMT modification id.
# NOTE(review): 737 is presumably the Unimod accession for TMT6plex; confirm against
# how `mod_names` was encoded.
TMT_MOD_ID = 737

mods = query_table["mod_names"].combine_chunks()
# Vectorised "does row i contain TMT_MOD_ID?":
# 1. Flatten all list values and compare them to the id. Nulls count as "no match".
# 2. Map each matching value back to the row (parent list) it came from.
flat_hit = pc.fill_null(pc.equal(pc.list_flatten(mods), TMT_MOD_ID), False)
tmt_rows = pc.filter(pc.list_parent_indices(mods), flat_hit)
has_tmt = np.zeros(len(mods), dtype=bool)
has_tmt[tmt_rows.to_numpy()] = True
# Null or empty lists have no flattened values, so those rows are kept.
no_tmt_table = query_table.filter(pa.array(~has_tmt))
len(no_tmt_table)

116548

In [12]:
# Read the list of CHO names: one value per row, tab-delimited, no header row.
# The single column is named explicitly, so the first line is kept as data.
cho_names = csv.read_csv(
    '/home/djs10/asms2023/cho_names_uniq.txt',
    read_options=csv.ReadOptions(column_names=['name']),
    parse_options=csv.ParseOptions(delimiter='\t'),
)
cho_names.num_rows

110374

In [18]:
# Keep only the non-TMT spectra whose `name` appears in the CHO name list.
# Print the kept and dropped counts, then show the number of rows kept.
cho_set = set(cho_names['name'].to_pylist())
cho_mask = [spectrum_name in cho_set for spectrum_name in no_tmt_table['name'].to_pylist()]
n_cho = sum(cho_mask)
print(n_cho, len(cho_mask) - n_cho)
cho_table = no_tmt_table.filter(cho_mask)
len(cho_table)

8819 107729


8819

In [19]:

# Persist the CHO-only, non-TMT query subset.
pq.write_table(cho_table, where="/home/djs10/asms2023/test_filtered.parquet")

In [3]:
# Windows-only: cmd.exe `dir` on drive D:. This fails on the Linux host, where
# .arrow files with the same names are read from /data/aiomics/massspec_cache/uniprot/cho.
!dir d:\nist\asms2023\library\*.arrow

 Volume in drive D is Data
 Volume Serial Number is 3E64-D3CD

 Directory of d:\nist\asms2023\library

04/25/2023  08:27 PM    29,249,339,394 predicted_cho_uniprot_tryptic_2_0.arrow
04/25/2023  09:10 PM    29,769,728,626 predicted_cho_uniprot_tryptic_2_1.arrow
04/25/2023  08:26 PM    30,145,136,186 predicted_cho_uniprot_tryptic_2_2.arrow
04/25/2023  08:24 PM    29,257,910,130 predicted_cho_uniprot_tryptic_2_3.arrow
04/25/2023  08:00 PM    29,026,097,490 predicted_cho_uniprot_tryptic_2_4.arrow
04/25/2023  12:27 PM    21,739,170,266 predicted_cho_uniprot_tryptic_2_5.arrow
               6 File(s) 169,187,382,092 bytes
               0 Dir(s)  1,764,818,427,904 bytes free


In [2]:
# Load the six predicted-library .arrow parts and drop their 'starts'/'stops' columns.
# Sort each part by precursor_mz, then concatenate the parts.
# The result is only sorted *within* each part ("semi-sorted").
dir_name = "/data/aiomics/massspec_cache/uniprot/cho"  # Windows copy: "D:/nist/asms2023/library/"
tables = []
for part in range(6):
    fname = f"{dir_name}/predicted_cho_uniprot_tryptic_2_{part}.arrow"
    print(f"reading {fname}...", end="")
    with pa.memory_map(fname, 'rb') as source:
        part_table = pa.ipc.open_file(source).read_all()
    tables.append(part_table.drop_columns(['starts', 'stops']).sort_by('precursor_mz'))
    print("done.")
big_table = pa.concat_tables(tables)

reading /data/aiomics/massspec_cache/uniprot/cho/predicted_cho_uniprot_tryptic_2_0.arrow...done.
reading /data/aiomics/massspec_cache/uniprot/cho/predicted_cho_uniprot_tryptic_2_1.arrow...done.
reading /data/aiomics/massspec_cache/uniprot/cho/predicted_cho_uniprot_tryptic_2_2.arrow...done.
reading /data/aiomics/massspec_cache/uniprot/cho/predicted_cho_uniprot_tryptic_2_3.arrow...done.
reading /data/aiomics/massspec_cache/uniprot/cho/predicted_cho_uniprot_tryptic_2_4.arrow...done.
reading /data/aiomics/massspec_cache/uniprot/cho/predicted_cho_uniprot_tryptic_2_5.arrow...done.


In [3]:
# Write the semi-sorted table as a single Feather (V2) file.
# Does it compress by default? Per the pyarrow.feather.write_feather docs,
# compression=None (the default) means LZ4 for V2 files when pyarrow was built
# with LZ4; otherwise the file is written uncompressed.
dir_name = "/data/aiomics/massspec_cache/uniprot/cho/no_ss"  # Windows copy: "D:/nist/asms2023/library/no_ss"
fname = f"{dir_name}/predicted_cho_uniprot_tryptic_2_semi_sorted.feather"
feather.write_feather(big_table, fname)

In [2]:
# Reload the semi-sorted Feather file. Feather V2 is the Arrow IPC file format,
# so pa.ipc.open_file can read it.
# NOTE: if the file was written with write_feather's default (LZ4) compression,
# read_all() has to decompress every buffer into RAM. The memory map then gives
# no zero-copy benefit, unlike the uncompressed .arrow parts.
dir_name = "/data/aiomics/massspec_cache/uniprot/cho/no_ss"  # Windows copy: "D:/nist/asms2023/library/no_ss"
fname = f"{dir_name}/predicted_cho_uniprot_tryptic_2_semi_sorted.feather"
with pa.memory_map(fname, 'rb') as source:
    reader = pa.ipc.open_file(source)
    big_table = reader.read_all()
big_table.num_rows

17113904

In [7]:
# Report the table's total buffer size in GiB and its (rows, columns) shape.
# Only a cell's last expression is displayed, so the size must be printed
# explicitly; a bare expression before the last line would be discarded.
size_gib = big_table.get_total_buffer_size() / 2**30
print(f"{size_gib:.2f} GiB")
big_table.shape

(17113904, 38)

In [3]:
# Rows in the first record batch (the batch size used when the file was written).
len(big_table.to_batches()[0])

65536

In [4]:
# Number of record batches the table is split into.
batches = big_table.to_batches()
len(batches)

263

In [5]:
# Indices that put the whole table in ascending precursor_mz order.
# The indices are computed on the single key column only; no rows are moved yet.
sortidx = pc.array_sort_indices(big_table.column('precursor_mz'), order='ascending')
sortidx

<pyarrow.lib.ChunkedArray object at 0x7f4aa3612980>
[
  [
    15000000,
    15000001,
    15000002,
    15000003,
    15000004,
    ...
    5999999,
    17113900,
    17113901,
    17113902,
    17113903
  ]
]

In [21]:
# Start offset of the final 5000-row slice of the sort index.
# range supports indexing directly, so no list needs to be built.
range(0, len(sortidx), 5000)[-1]

17110000

In [6]:
# Sanity check: slice() clamps at the end of the array, so the final slice is
# shorter than the requested length. Its start is derived here rather than
# hard-coding an offset copied from an earlier cell's output.
probe_size = 5000
last_start = range(0, len(sortidx), probe_size)[-1]
len(sortidx.slice(last_start, probe_size))

3904

In [7]:
# Stream the rows into an Arrow IPC file in globally sorted order,
# taking one batch_size slice of the sort index at a time.
# A single whole-table take is noted in this notebook as crashing.
# NOTE(review): Table.take on multi-chunk columns may concatenate each column's
# chunks on every call (this table has many batches). If this loop is slow,
# check the installed pyarrow's take implementation.
batch_size = 65536
dir_name = "/data/aiomics/massspec_cache/uniprot/cho/no_ss"  # Windows copy: "D:/nist/asms2023/library/no_ss"
fname = f"{dir_name}/predicted_cho_uniprot_tryptic_2.arrow"
with pa.OSFile(fname, 'wb') as sink, pa.ipc.new_file(sink, big_table.schema) as writer:
    for offset in range(0, len(sortidx), batch_size):
        batch = big_table.take(sortidx.slice(offset, batch_size))
        writer.write(batch)
        print(f"Wrote batch {offset}:{offset+len(batch)}")

Wrote batch 0:65536
Wrote batch 65536:131072
Wrote batch 131072:196608
Wrote batch 196608:262144
Wrote batch 262144:327680
Wrote batch 327680:393216
Wrote batch 393216:458752
Wrote batch 458752:524288
Wrote batch 524288:589824
Wrote batch 589824:655360
Wrote batch 655360:720896
Wrote batch 720896:786432
Wrote batch 786432:851968
Wrote batch 851968:917504
Wrote batch 917504:983040
Wrote batch 983040:1048576
Wrote batch 1048576:1114112
Wrote batch 1114112:1179648
Wrote batch 1179648:1245184
Wrote batch 1245184:1310720
Wrote batch 1310720:1376256
Wrote batch 1376256:1441792
Wrote batch 1441792:1507328
Wrote batch 1507328:1572864
Wrote batch 1572864:1638400
Wrote batch 1638400:1703936
Wrote batch 1703936:1769472
Wrote batch 1769472:1835008
Wrote batch 1835008:1900544
Wrote batch 1900544:1966080
Wrote batch 1966080:2031616
Wrote batch 2031616:2097152
Wrote batch 2097152:2162688
Wrote batch 2162688:2228224
Wrote batch 2228224:2293760
Wrote batch 2293760:2359296
Wrote batch 2359296:2424832
Wr

In [4]:
# Attempt a single whole-table take in globally sorted order.
# Even though the data fits in memory, this crashes, so it is disabled by
# default to keep "Restart & Run All" working. The batched writer earlier in
# the notebook is the approach that succeeds.
# The result is named sorted_table (not `sorted`) so the builtin sorted() is not shadowed.
RUN_FULL_TAKE = False
if RUN_FULL_TAKE:
    sorted_table = big_table.take(sortidx)
    print(sorted_table)

: 

: 

In [8]:
# Convert the globally sorted Arrow IPC file to Parquet, with at most 10,000 rows per row group.
# The IPC file is opened through a memory map so read_all() can reference the file's pages.
dir_name = "/data/aiomics/massspec_cache/uniprot/cho/no_ss"
fname = f"{dir_name}/predicted_cho_uniprot_tryptic_2.arrow"
outname = f"{dir_name}/predicted_cho_uniprot_tryptic_2.parquet"
with pa.memory_map(fname, 'rb') as source:
    reader = pa.ipc.open_file(source)
    table = reader.read_all()
pq.write_table(table, outname, row_group_size=10000)