In [1]:
import pyarrow.parquet as pq

# Open the training file and show the column names recorded in its schema
parquet_file = pq.ParquetFile('train.parquet')
train_schema = parquet_file.schema
print(train_schema.names)


['id', 'buildingblock1_smiles', 'buildingblock2_smiles', 'buildingblock3_smiles', 'molecule_smiles', 'protein_name', 'binds']


In [None]:
import dask.dataframe as dd

# Open the train and test splits as Dask dataframes
train_df = dd.read_parquet('train.parquet')
test_df = dd.read_parquet('test.parquet')

# Train row count: length of every partition, summed once computed
partition_lengths = train_df.map_partitions(len).compute()
total_rows = partition_lengths.sum()
print(f"Total number of rows: {total_rows}")

# Positives: sum of the 'binds' column (assumes it is a 0/1 flag)
binds_column = train_df['binds']
num_positive_bindings = binds_column.sum().compute()
print(f"Number of positive bindings: {num_positive_bindings}")

# Negatives: everything that is not a positive
num_negative_bindings = total_rows - num_positive_bindings
print(f"Number of negative bindings: {num_negative_bindings}")

# Class balance as percentages of the train rows
percent_positive = (num_positive_bindings / total_rows) * 100
percent_negative = (num_negative_bindings / total_rows) * 100
print(f"Percentage of positive bindings: {percent_positive:.2f}%")
print(f"Percentage of negative bindings: {percent_negative:.2f}%")

# Protein-name columns of each split, reused by the three counts below
train_protein_names = train_df['protein_name']
test_protein_names = test_df['protein_name']

# Distinct proteins in train
unique_proteins_train = train_protein_names.dropna().unique().compute()
total_unique_proteins_train = len(unique_proteins_train)
print(f"Total unique proteins in train dataset: {total_unique_proteins_train}")

# Distinct proteins in test
unique_proteins_test = test_protein_names.dropna().unique().compute()
total_unique_proteins_test = len(unique_proteins_test)
print(f"Total unique proteins in test dataset: {total_unique_proteins_test}")

# Distinct proteins across train and test together
combined_protein_names = dd.concat([train_protein_names, test_protein_names])
unique_proteins_all = combined_protein_names.dropna().unique().compute()
total_unique_proteins_all = len(unique_proteins_all)
print(f"Total unique proteins in both datasets: {total_unique_proteins_all}")

# Stack the three building-block columns of each split into one series per split
building_block_columns = [
    'buildingblock1_smiles',
    'buildingblock2_smiles',
    'buildingblock3_smiles',
]
train_building_blocks = dd.concat([train_df[column] for column in building_block_columns])
test_building_blocks = dd.concat([test_df[column] for column in building_block_columns])
all_building_blocks = dd.concat([train_building_blocks, test_building_blocks])

# Distinct building blocks over both splits
unique_building_blocks = all_building_blocks.dropna().unique().compute()
total_unique_building_blocks = len(unique_building_blocks)
print(f"Total unique building blocks (train and test): {total_unique_building_blocks}")

# Distinct full molecules over both splits (nulls dropped per split before stacking)
train_small_molecules = train_df['molecule_smiles'].dropna()
test_small_molecules = test_df['molecule_smiles'].dropna()
all_small_molecules = dd.concat([train_small_molecules, test_small_molecules])

unique_small_molecules = all_small_molecules.unique().compute()
total_unique_small_molecules = len(unique_small_molecules)
print(f"Total unique small molecules (train and test): {total_unique_small_molecules}")

In [None]:
import dask.dataframe as dd

def _row_count(frame):
    """Return the number of rows of a Dask dataframe (sum of partition lengths)."""
    return frame.map_partitions(len).compute().sum()


def _distinct_values(series):
    """Return the computed distinct non-null values of a Dask series."""
    return series.dropna().unique().compute()


# Open the train and test splits as Dask dataframes
train_df = dd.read_parquet('train.parquet')
test_df = dd.read_parquet('test.parquet')

# Row counts of both splits
total_rows = _row_count(train_df)
print(f"Total number of training rows: {total_rows}")

test_total_rows = _row_count(test_df)
print(f"Total number of test rows: {test_total_rows}")

# Positives: sum of the 'binds' column (assumes it is a 0/1 flag)
num_positive_bindings = train_df['binds'].sum().compute()
print(f"Number of positive bindings: {num_positive_bindings}")

# Negatives: the remaining train rows
num_negative_bindings = total_rows - num_positive_bindings
print(f"Number of negative bindings: {num_negative_bindings}")

# Class balance as percentages of the train rows
percent_positive = (num_positive_bindings / total_rows) * 100
percent_negative = (num_negative_bindings / total_rows) * 100
print(f"Percentage of positive bindings: {percent_positive:.2f}%")
print(f"Percentage of negative bindings: {percent_negative:.2f}%")

# Distinct proteins in train
unique_proteins_train = _distinct_values(train_df['protein_name'])
total_unique_proteins_train = len(unique_proteins_train)
print(f"Total unique proteins in train dataset: {total_unique_proteins_train}")

# Distinct proteins in test
unique_proteins_test = _distinct_values(test_df['protein_name'])
total_unique_proteins_test = len(unique_proteins_test)
print(f"Total unique proteins in test dataset: {total_unique_proteins_test}")

# Distinct proteins across both splits, listed by name as well as counted
unique_proteins_all = _distinct_values(
    dd.concat([train_df['protein_name'], test_df['protein_name']])
)
total_unique_proteins_all = len(unique_proteins_all)
print(f"Total unique proteins in both datasets: {total_unique_proteins_all}")
print(f"Unique proteins in both datasets: {unique_proteins_all.values}")

# Stack the three building-block columns of each split, then both splits together
train_building_blocks = dd.concat([
    train_df[f'buildingblock{position}_smiles'] for position in (1, 2, 3)
])
test_building_blocks = dd.concat([
    test_df[f'buildingblock{position}_smiles'] for position in (1, 2, 3)
])
all_building_blocks = dd.concat([train_building_blocks, test_building_blocks])

# Distinct building blocks over both splits
unique_building_blocks = _distinct_values(all_building_blocks)
total_unique_building_blocks = len(unique_building_blocks)
print(f"Total unique building blocks (train and test): {total_unique_building_blocks}")

# Distinct full molecules over both splits (nulls dropped per split before stacking)
train_small_molecules = train_df['molecule_smiles'].dropna()
test_small_molecules = test_df['molecule_smiles'].dropna()
all_small_molecules = dd.concat([train_small_molecules, test_small_molecules])

unique_small_molecules = all_small_molecules.unique().compute()
total_unique_small_molecules = len(unique_small_molecules)
print(f"Total unique small molecules (train and test): {total_unique_small_molecules}")

In [2]:
import os
import pandas as pd
from tqdm import tqdm
from joblib import Parallel, delayed
import pyarrow.parquet as pq
import pyarrow as pa

# Default value of process_large_parquet's `batch_size` parameter
BATCH_SIZE = 1_000_000

# Protein names whose rows process_chunk splits into binds / non-binds files
proteins = [
    'HSA',
    'sEH',
    'BRD4',
]

def process_chunk(df_chunk, output_dir, chunk_id):
    """Split one chunk by protein and bind label, writing each non-empty part to disk.

    For every name in the module-level ``proteins`` list, the rows of
    ``df_chunk`` with that ``protein_name`` are divided into ``binds == 1``
    and ``binds == 0``. Each non-empty part is saved as a snappy-compressed
    Parquet file in ``output_dir`` whose name carries the protein, the label
    and ``chunk_id``. Nothing is returned.
    """
    def _save_if_any(frame, path):
        # Empty selections produce no file at all
        if frame.empty:
            return
        frame.to_parquet(path, engine='pyarrow', compression='snappy')

    for protein in proteins:
        # Rows of this chunk that belong to the current protein
        protein_rows = df_chunk[df_chunk['protein_name'] == protein]

        # Partition those rows on the bind label
        binders = protein_rows[protein_rows['binds'] == 1]
        non_binders = protein_rows[protein_rows['binds'] == 0]

        # chunk_id in the file name keeps files from different chunks apart
        _save_if_any(binders, f"{output_dir}/{protein}_binds_chunk_{chunk_id}.parquet")
        _save_if_any(non_binders, f"{output_dir}/{protein}_non_binds_chunk_{chunk_id}.parquet")

def merge_and_sample_non_binds(output_dir, protein, random_state=None):
    """Merge a protein's temporary chunk files and pair each bind with two random non-binds.

    Looks in ``output_dir`` for files named ``{protein}_binds_chunk_*`` and
    ``{protein}_non_binds_chunk_*``, builds one row per bind holding the bind's
    ``id``, ``molecule_smiles`` and ``protein_name`` plus the ``molecule_smiles``
    of two distinct randomly chosen non-binds, writes the rows to
    ``{output_dir}/{protein}_cleaned.parquet`` and deletes the chunk files.

    The number of rows written is ``min(#binds, #non_binds // 2)``: binds are
    taken in merge order and pairing stops once non-binds run out.

    Args:
        output_dir: Directory holding the temporary chunk files; the cleaned
            file is written here too.
        protein: Protein name used as the file-name prefix.
        random_state: Optional seed (or NumPy random state) forwarded to the
            non-bind sampling. ``None`` keeps the selection unseeded.

    Returns:
        None. When either the binds or the non-binds files are missing the
        function returns early without writing or deleting anything.
    """
    # Sort the listing so the merge order does not depend on how the
    # filesystem happens to enumerate the directory.
    entries = sorted(os.listdir(output_dir))
    binds_files = [
        os.path.join(output_dir, name)
        for name in entries
        if name.startswith(f"{protein}_binds_chunk_")
    ]
    non_binds_files = [
        os.path.join(output_dir, name)
        for name in entries
        if name.startswith(f"{protein}_non_binds_chunk_")
    ]

    if not binds_files or not non_binds_files:
        return None

    # Only the columns that end up in the output are loaded.
    df_binds = pd.concat(
        [
            pd.read_parquet(path, columns=['id', 'molecule_smiles', 'protein_name'])
            for path in binds_files
        ],
        ignore_index=True,
    )

    # Non-binds vastly outnumber binds in practice, so keep just their SMILES.
    non_bind_smiles = pd.concat(
        [pd.read_parquet(path, columns=['molecule_smiles']) for path in non_binds_files],
        ignore_index=True,
    )['molecule_smiles']

    # Each bind consumes two non-binds; an odd leftover non-bind is unused.
    n_triplets = min(len(df_binds), len(non_bind_smiles) // 2)

    # Drawing 2 * n_triplets rows without replacement is equivalent to
    # shuffling everything and taking the first 2 * n_triplets rows.
    sampled_smiles = non_bind_smiles.sample(
        n=2 * n_triplets, random_state=random_state
    ).to_numpy()

    paired_binds = df_binds.iloc[:n_triplets]
    result_df = pd.DataFrame({
        'id': paired_binds['id'].to_numpy(),
        'smiles_binds': paired_binds['molecule_smiles'].to_numpy(),
        # Consecutive sampled rows (2k, 2k + 1) belong to bind k.
        'smiles_non_binds_1': sampled_smiles[0::2],
        'smiles_non_binds_2': sampled_smiles[1::2],
        'protein_name': paired_binds['protein_name'].to_numpy(),
    })

    # Write to a single file for this protein
    output_file = f"{output_dir}/{protein}_cleaned.parquet"
    result_df.to_parquet(output_file, engine='pyarrow', compression='snappy')

    # The chunk files are only removed once the merged file has been written.
    for path in binds_files + non_binds_files:
        os.remove(path)

def process_large_parquet(input_file, output_dir, batch_size=BATCH_SIZE):
    """Split a large Parquet file by protein and bind label, then build the cleaned triplet file.

    Steps:
      1. Every row group of ``input_file`` is handed to ``process_chunk`` in
         parallel, which writes per-protein binds / non-binds chunk files
         into ``output_dir``.
      2. ``merge_and_sample_non_binds`` is run for each name in the
         module-level ``proteins`` list, producing ``{protein}_cleaned.parquet``.
      3. The per-protein files are concatenated into
         ``{output_dir}/cleaned_train.parquet`` and then removed.

    Args:
        input_file: Path (or file-like object) accepted by ``pq.ParquetFile``.
        output_dir: Directory for temporary and final files; created if missing.
        batch_size: Currently unused - chunking follows the row groups of the
            input file. Kept so existing callers keep working.

    Raises:
        FileNotFoundError: If no protein produced a cleaned file to merge.
    """
    os.makedirs(output_dir, exist_ok=True)

    parquet_file = pq.ParquetFile(input_file)
    n_row_groups = parquet_file.num_row_groups

    # Only these columns are used by process_chunk and the later merge step;
    # skipping the rest cuts the memory and pickling cost of every chunk.
    needed_columns = ['id', 'molecule_smiles', 'protein_name', 'binds']

    # A generator (not a list) lets joblib pull row groups as workers free up,
    # so the whole dataset is never materialised in memory at once. The
    # progress bar therefore tracks dispatch of chunks to the workers.
    tasks = (
        delayed(process_chunk)(
            parquet_file.read_row_group(i, columns=needed_columns).to_pandas(),
            output_dir,
            i,
        )
        for i in tqdm(range(n_row_groups), total=n_row_groups, dynamic_ncols=True)
    )
    Parallel(n_jobs=-1)(tasks)

    # Merge and sample non-binds for each protein
    for protein in proteins:
        merge_and_sample_non_binds(output_dir, protein)

    # merge_and_sample_non_binds writes nothing for a protein that lacks
    # binds or non-binds, so only merge the files that actually exist.
    cleaned_files = [
        path
        for path in (f"{output_dir}/{protein}_cleaned.parquet" for protein in proteins)
        if os.path.exists(path)
    ]
    if not cleaned_files:
        raise FileNotFoundError(
            f"No cleaned per-protein files were produced in {output_dir!r}"
        )

    # ignore_index avoids carrying duplicated per-protein row labels into the output.
    df_final = pd.concat(
        [pd.read_parquet(path) for path in cleaned_files],
        ignore_index=True,
    )
    df_final.to_parquet(f"{output_dir}/cleaned_train.parquet", engine='pyarrow', compression='snappy')

    # Clean up intermediate files
    for path in cleaned_files:
        os.remove(path)

# Source file to split, and the directory that receives temporary and final files
input_parquet = "train.parquet"
output_directory = "."

# Kick off chunk processing, per-protein merging and the final concatenation
process_large_parquet(input_file=input_parquet, output_dir=output_directory)


100%|█████████████████████████████████████████| 282/282 [00:33<00:00,  8.30it/s]


KeyboardInterrupt: 