protein bert uniref90 dataset #1

Open

rom1504 opened this issue Jul 12, 2021 · 4 comments

rom1504 commented Jul 12, 2021

(discussed in Discord)

After running the first step (create_uniref_db) of https://github.com/nadavbra/protein_bert, I got a 24GB file "uniref_proteins_and_annotations.db".
It seems it could be useful for generating sequences for this project, so I'm sharing the details here. The table schema:

CREATE TABLE "protein_annotations" (
    "index"    INTEGER,
    "tax_id"    REAL,
    "uniprot_name"    TEXT,
    "go_annotations"    TEXT,
    "flat_go_annotations"    TEXT,
    "n_go_annotations"    INTEGER,
    "complete_go_annotation_indices"    TEXT,
    "n_complete_go_annotations"    INTEGER
);

A sample looks like this:

index tax_id uniprot_name go_annotations flat_go_annotations n_go_annotations complete_go_annotation_indices n_complete_go_annotations
0 0 1.57204e+06 A0A5A9P0L4_9TELE {"GO Molecular Function": ["GO:0003755", "GO:0005524", "GO:0004672", "GO:0005509"], "GO Biological Process": [], "GO Cellular Component": []} ["GO:0003755", "GO:0004672", "GO:0005509", "GO:0005524"] 4 [2761, 3561, 4193, 4205] 4
1 1 648755 UPI0016133188 {"GO Molecular Function": [], "GO Biological Process": [], "GO Cellular Component": []} [] 0 [] 0
2 2 1.93059e+06 A0A410P257_9BACT {"GO Molecular Function": [], "GO Biological Process": [], "GO Cellular Component": []} [] 0 [] 0
3 3 519421 UPI0019403D63 {"GO Molecular Function": [], "GO Biological Process": [], "GO Cellular Component": []} [] 0 [] 0
4 4 72004 A0A6B0RPA5_9CETA {"GO Molecular Function": ["GO:0005524", "GO:0004672"], "GO Biological Process": [], "GO Cellular Component": []} ["GO:0004672", "GO:0005524"] 2 [3561, 4205] 2
5 5 375764 A0A672ZWI7_9TELE {"GO Molecular Function": [], "GO Biological Process": [], "GO Cellular Component": []} [] 0 [] 0
6 6 1.41558e+06 A0A6P7YNV3_9AMPH {"GO Molecular Function": ["GO:0005524", "GO:0004672"], "GO Biological Process": [], "GO Cellular Component": ["GO:0005886"]} ["GO:0004672", "GO:0005524", "GO:0005886"] 3 [3561, 4205, 4526] 3
7 7 240159 A0A4U5TZD8_COLLU {"GO Molecular Function": ["GO:0005524", "GO:0004672"], "GO Biological Process": [], "GO Cellular Component": ["GO:0016021", "GO:0005886"]} ["GO:0004672", "GO:0005524", "GO:0005886", "GO:0016021"] 4 [3561, 4205, 4526, 10019] 4
8 8 146911 UPI00074FFD9C {"GO Molecular Function": [], "GO Biological Process": [], "GO Cellular Component": []} [] 0 [] 0
9 9 260995 A0A6P8RG40_GEOSA {"GO Molecular Function": ["GO:0005524", "GO:0004672"], "GO Biological Process": [], "GO Cellular Component": ["GO:0005886"]} ["GO:0004672", "GO:0005524", "GO:0005886"] 3 [3561, 4205, 4526] 3
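
To peek at the table without dask, here is a minimal sketch using Python's built-in sqlite3 (assuming uniref_proteins_and_annotations.db is in the working directory):

import json
import sqlite3

con = sqlite3.connect("uniref_proteins_and_annotations.db")
cur = con.cursor()

# grab a few proteins that actually have GO annotations
cur.execute(
    "SELECT uniprot_name, flat_go_annotations "
    "FROM protein_annotations WHERE n_go_annotations > 0 LIMIT 5"
)
for uniprot_name, flat_go_annotations in cur.fetchall():
    # flat_go_annotations is stored as a JSON-encoded list of GO terms
    print(uniprot_name, json.loads(flat_go_annotations))

con.close()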
rom1504 (Author) commented Jul 16, 2021

I joined it with the fasta file (on the uniprot_name field, which is the RepID in the fasta headers).
I used this code:

import dask.dataframe as dd
from dask.distributed import Client
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

if __name__ == '__main__':
    client = Client()

    def read_db_save_to_parquet():
        # dump the sqlite annotations table to parquet via dask
        daskDF = dd.read_sql_table('protein_annotations', "sqlite:///uniref_proteins_and_annotations.db", index_col='index')
        daskDF.to_parquet('the-parquet', engine='pyarrow')

    def read_fasta_save_to_parquet():
        # abuse read_csv so that each ">"-delimited fasta record becomes one row
        fasta_df = dd.read_csv("uniref90.fasta", lineterminator=">", sep="$", header=None)
        def process_fasta_row(row):
            lines = row[0].split("\n")
            # the RepID is the last key=value field of the header line
            id = lines[0].split(" ")[-1].split("=")[-1]
            seq = "".join(lines[1:])
            return (id, seq)
        new_df = fasta_df.apply(process_fasta_row, axis=1, result_type='expand', meta={0: str, 1: str})
        new_df.columns = ['uniprot_id', 'seq']
        new_df.to_parquet('fasta_parquet', engine='pyarrow')

    def spark_join():
        # executor and driver settings, not sure if they're needed
        spark = SparkSession.builder \
            .config("spark.executor.cores", "16") \
            .config("spark.executor.memory", "16G") \
            .config("spark.driver.cores", "16") \
            .config("spark.driver.memory", "16G") \
            .master("local[16]").appName('spark-merge').getOrCreate()
        a = spark.read.parquet("the-parquet")
        b = spark.read.parquet("fasta_parquet")
        c = a.join(b, col("uniprot_name") == col("uniprot_id"))
        c.write.mode("overwrite").parquet("uniref90_with_annotations")

    # run the two dask functions first, then call spark_join() in a second run
    read_db_save_to_parquet()
    read_fasta_save_to_parquet()
    # spark_join()

(it's best to run the two dask functions first, then the spark code in a second run)
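
To illustrate the RepID extraction in process_fasta_row, here is a tiny standalone sketch on a made-up UniRef90-style record (the header text is illustrative; only the trailing RepID= field matters):

# made-up UniRef90 record: header line ending in RepID=..., followed by wrapped sequence lines
record = (
    "UniRef90_A0A5A9P0L4 hypothetical protein n=1 Tax=exampleSpecies TaxID=0 RepID=A0A5A9P0L4_9TELE\n"
    "MTKFTILLISLLFCIAHT\n"
    "CSASKWQHQQDSCRKQLQ"
)

lines = record.split("\n")
rep_id = lines[0].split(" ")[-1].split("=")[-1]  # "A0A5A9P0L4_9TELE" -> join key against uniprot_name
seq = "".join(lines[1:])                         # sequence lines concatenated back together
print(rep_id, seq)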

For reference, this would be the code to merge in dask (but it's too slow):

    def merge_without_indexing():
        db_df = dd.read_parquet('the-parquet', engine='pyarrow') 
        fasta_index_df = dd.read_parquet('fasta_parquet', engine='pyarrow')
        merged = db_df.merge(fasta_index_df, left_on=("uniprot_name"), right_on=("uniprot_id"))
        merged.to_parquet('merged', engine='pyarrow')

(I also tried to index and save beforehand, but that's also slow, taking hours)

I wanted to do it all in dask, but it turned out dask is slow at joining big collections compared to spark. Run times of this code:

  • 10min to do sql -> parquet in dask
  • 10 min to do fasta -> parquet in dask
  • 20min to do the join from both parquet in pyspark

The resulting collection has 135301051 records and looks like this:

Dask DataFrame Structure:
                 tax_id uniprot_name go_annotations flat_go_annotations n_go_annotations complete_go_annotation_indices n_complete_go_annotations index__1  index uniprot_id     seq
npartitions=200                                                                                                                                                                      
                float64       object         object              object            int64                         object                     int64    int64  int64     object  object
tax_id uniprot_name go_annotations flat_go_annotations n_go_annotations complete_go_annotation_indices n_complete_go_annotations index__1 index uniprot_id seq
0 654924 029L_FRG3G {"GO Molecular Function": ["GO:0016740"], "GO Biological Process": [], "GO Cellular Component": []} ["GO:0016740"] 1 [10611] 1 118799105 118799105 029L_FRG3G MRRMRSGFKHCAIPIDICRWEYILSPLILQDLQGPQQGGSVAVDVTVRCSVRFVHLPHYGGFNHGTVQRRVDPDDCRILRQLHIVLSLRLCLIDRDRL
1 3277 2SS_MATST {"GO Molecular Function": ["GO:0045735"], "GO Biological Process": [], "GO Cellular Component": ["GO:0005773", "GO:0033095"]} ["GO:0005773", "GO:0033095", "GO:0045735"] 3 [4428, 16459, 23615] 3 128842786 128842786 2SS_MATST DQASMQRASRLLHQCDLRPRDCARRSSERGQGERWRQQLRACDEDSEPRQQCCQNLQRISSQDRCRA
2 3847 2SS_SOYBN {"GO Molecular Function": [], "GO Biological Process": [], "GO Cellular Component": []} [] 0 [] 0 98266368 98266368 2SS_SOYBN MTKFTILLISLLFCIAHTCSASKWQHQQDSCRKQLQGVNLTPCEKHIMEKIQGRGDDDDDDDDDNHILRTMRGRINYIRRNEGKDEDEEEEGHMQKCCTEMSELRSPKCQCKALQKIMENQSEELEEKQKKKMEKELINLATMCRFGPMIQCDLSSDD
3 176652 374R_IIV6 {"GO Molecular Function": [], "GO Biological Process": [], "GO Cellular Component": []} [] 0 [] 0 95973209 95973209 374R_IIV6 MDIEFGNEYRTYGVGLGGYIEGMERGGVANRLNQMLMNPEEKFFSTLNLAFEKINDYRPLDANTRDLMADFATKMPHLSFKNATTFVLGCLASIKHKNNVLNKNEIKKIFSLLHHFKDTENISPSDVIRYAKFAMINNFYIEDLDINEEDDEEYGDEEYGDEEYE
4 9606 3BHS1_HUMAN {"GO Molecular Function": ["GO:0003854"], "GO Biological Process": ["GO:0006694"], "GO Cellular Component": ["GO:0110165"]} ["GO:0003854", "GO:0006694", "GO:0110165"] 3 [2848, 5268, 38813] 3 29137577 29137577 3BHS1_HUMAN MTGWSCLVTGAGGFLGQRIIRLLVKEKELKEIRVLDKAFGPELREEFSKLQNKTKLTVLEGDILDEPFLKRACQDVSVIIHTACIIDVFGVTHRESIMNVNVKGTQLLLEACVQASVPVFIYTSSIEVAGPNSYKEIIQNGHEEEPLENTWPAPYPHSKKLAEKAVLAANGWNLKNGGTLYTCALRPMYIYGEGSRFLSASINEALNNNGILSSVGKFSTVNPVYVGNVAWAHILALRALQDPKKAPSIRGQFYYISDDTPHQSYDNLNYTLSKEFGLRLDSRWSFPLSLMYWIGFLLEIVSFLLRPIYTYRPPFNRHIVTLSNSVFTFSYKKAQRDLAYKPLYSWEEAKQKTVEWVGSLVDRHKETLKSKTQ

Example code to read it in dask:

import dask.dataframe as dd
from dask.distributed import Client
client = Client()
merged = dd.read_parquet('uniref90_with_annotations', engine='pyarrow') 
merged.head()
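
A small follow-up sketch (purely illustrative, column names taken from the merged schema above) to keep only the annotated sequences:

# keep only proteins with at least one GO annotation
annotated = merged[merged.n_go_annotations > 0][['uniprot_id', 'seq', 'flat_go_annotations']]
print(annotated.head())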

rom1504 (Author) commented Jul 16, 2021

Here's a simple way to read parquet as a torch dataset:

import pyarrow.dataset as ds

from torch.utils.data import IterableDataset
from torch.utils.data import get_worker_info

class IterableManualParquetDataset(IterableDataset):
    def __init__(self, path, process_func, batch_size=64):
        super().__init__()
        self.dataset = ds.dataset(path)
        self.batch_size = batch_size
        self.process_func = process_func

    def __iter__(self):
        worker_info = get_worker_info()

        # Only divide up batches when using multiple worker processes
        if worker_info is not None:
            batches = list(self.dataset.to_batches(batch_size=self.batch_size))
            worker_load = len(batches) // worker_info.num_workers

            # If more workers than batches exist, some won't be used
            if worker_load == 0:
                if worker_info.id < len(batches): self.batches = [batches[worker_info.id]]
                else: return
            else:
                start = worker_load * worker_info.id
                # let the last worker take the remainder so no batches are dropped
                end = len(batches) if worker_info.id == worker_info.num_workers - 1 else start + worker_load
                self.batches = batches[start:end]
        else: self.batches = self.dataset.to_batches(batch_size=self.batch_size)

        # Process and yield each batch as a plain python dict
        for batch in self.batches:
            batch = batch.to_pydict()
            batch.update(self.process_func(batch))

            yield batch

a = IterableManualParquetDataset("uniref90_parquet/uniref90_with_annotations", lambda x: x, batch_size=64)
u = next(iter(a))

(adapted from https://github.com/KamWithK/PyParquetLoaders/blob/master/PyTorchLoader.py )
using pyarrow https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html
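
A hedged sketch of plugging this into a torch DataLoader with several workers (batch_size=None since the dataset already yields whole batches; add_lengths is a hypothetical process_func):

from torch.utils.data import DataLoader

def add_lengths(batch):
    # hypothetical process_func: add a column of sequence lengths to each batch dict
    return {"seq_len": [len(s) for s in batch["seq"]]}

dataset = IterableManualParquetDataset("uniref90_parquet/uniref90_with_annotations", add_lengths, batch_size=64)

# batch_size=None because the dataset already yields batches;
# each worker iterates over its own slice of the parquet record batches
loader = DataLoader(dataset, batch_size=None, num_workers=4)

for batch in loader:
    print(batch["uniprot_name"][:3], batch["seq_len"][:3])
    break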

The same could likely be done with tf.data if that's better for jax.
An alternative is https://github.com/uber/petastorm, but it's a bit more involved: it requires converting to their format first, and it's mainly useful for much bigger datasets (at least > 500GB I would say) stored in the cloud on a distributed file system.

rom1504 (Author) commented Jul 16, 2021

The dataset can be downloaded (it's 50GB) by running
!wget --recursive --no-parent -nd -P uniref90_with_annotations http://3080.rom1504.fr/uniref90_with_annotations/

example on colab https://colab.research.google.com/drive/1Zcns30b1H3IcxMJ-A-wQDF6pUcyNL5ei?usp=sharing

lucidrains (Owner) commented

works perfectly!
