In [None]:
import pandas as pd
import requests
import os
from tqdm.notebook import tqdm
import math
from requests.exceptions import Timeout
import concurrent.futures
import threading
from concurrent.futures import ThreadPoolExecutor
import time
from typing import List, Tuple, Dict

In [None]:
# Module-wide lock shared by the helper functions below: it is held while a
# downloaded file is written to disk and while a processed file is deleted.
file_lock = threading.Lock()

def download_single_file(mirna: str, download_dir: str, timeout: int = 30) -> Tuple[str, bool, str]:
    """Fetch the result table for one miRNA and store it in ``download_dir``.

    The request goes to the ``moduleDownload.php`` endpoint on rnasysu.com with
    the miRNA name embedded in the ``value`` query field. Up to three attempts
    are made, sleeping 1 s and then 2 s between them.

    Args:
        mirna: miRNA identifier; any ``/`` is replaced by ``_`` in the file name.
        download_dir: Directory that receives ``<mirna>.xls``.
        timeout: Per-request timeout in seconds, passed to ``requests.get``.

    Returns:
        ``(filename, True, "")`` when the file was written, otherwise
        ``(mirna, False, error_message)``. No exception escapes this function.
    """
    try:
        url = f'https://rnasysu.com/encori/moduleDownload.php?source=agoClipRNA&type=xls&value=hg38;lncRNA;{mirna};1;0;0;1;None;all'

        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = requests.get(url, timeout=timeout)
                response.raise_for_status()
            except (requests.exceptions.RequestException, Timeout) as e:
                out_of_attempts = attempt == max_retries - 1
                if out_of_attempts:
                    raise e
                # Back off exponentially before the next attempt: 1 s, then 2 s.
                time.sleep(2 ** attempt)
            else:
                break

        safe_name = mirna.replace('/', '_')
        filename = f"{safe_name}.xls"
        target_path = os.path.join(download_dir, filename)

        with file_lock, open(target_path, 'wb') as out_file:
            out_file.write(response.content)

        return filename, True, ""

    except Exception as e:
        # Failures are reported through the return value so the caller can
        # collect them from worker threads.
        return mirna, False, str(e)

def read_file(file_path: str) -> Tuple[pd.DataFrame, bool]:
    """Parse one downloaded result file into a DataFrame.

    The file is read as UTF-8 text and parsed as tab-separated values; lines
    starting with ``#`` are treated as comments and blank lines are skipped.

    Args:
        file_path: Path of the downloaded file.

    Returns:
        A ``(df, is_no_results)`` tuple:

        * ``(empty DataFrame, True)`` when the file contains the text
          ``"No Available results."``.
        * ``(empty DataFrame, False)`` when the file has nothing to parse
          (empty or whitespace-only).
        * ``(parsed DataFrame, False)`` otherwise; the frame may still have
          zero rows if only a header line is present.

    Raises:
        Exception: Any read or parse failure, re-raised with the file path in
            the message and the original error attached as ``__cause__``.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read().strip()

        if "No Available results." in content:
            return pd.DataFrame(), True

        if not content:
            # Nothing to parse; pandas would raise EmptyDataError here.
            return pd.DataFrame(), False

        try:
            # Use pandas read_csv with tab delimiter
            df = pd.read_csv(file_path, sep='\t', comment='#', skip_blank_lines=True)
        except pd.errors.EmptyDataError:
            # Raised when pandas finds no columns to parse; report an empty
            # table rather than a hard failure.
            return pd.DataFrame(), False

        return df, False

    except Exception as e:
        raise Exception(f"Error reading file {file_path}: {str(e)}") from e

In [None]:
def process_batch_parallel(files_batch: List[str], download_dir: str, max_workers: int = 4) -> Tuple[pd.DataFrame, List[str], List[str]]:
    """Parse a batch of downloaded files in worker threads and collect lncRNA rows.

    Each file is read with ``read_file``, filtered to rows whose ``geneType``
    equals ``"lncRNA"``, reduced to the ``miRNAname`` and ``geneName`` columns,
    and then deleted from ``download_dir``. The file is deleted whether or not
    parsing succeeded, so failed files do not pile up in the download directory.

    Args:
        files_batch: File names (relative to ``download_dir``) to process.
        download_dir: Directory containing the downloaded files.
        max_workers: Size of the thread pool.

    Returns:
        A ``(data, no_results_files, error_files)`` tuple:

        * ``data``: Combined rows with columns ``Column_B`` (from
          ``miRNAname``) and ``Column_D`` (from ``geneName``); empty with those
          two columns when nothing was extracted.
        * ``no_results_files``: Files that ``read_file`` flagged as "no results".
        * ``error_files``: Every other file that produced no rows. This
          includes files that failed to parse, files missing the expected
          columns, and files without any ``lncRNA`` row.
    """
    no_results_files = []
    error_files = []
    processed_data = []
    required_columns = {'miRNAname', 'geneName', 'geneType'}

    def remove_downloaded(file_path: str) -> None:
        """Delete one downloaded file; report, but do not raise, OS errors."""
        try:
            with file_lock:
                os.remove(file_path)
        except FileNotFoundError:
            # Already gone (e.g. the download never produced it): nothing to do.
            pass
        except OSError as e:
            print(f"Could not delete {file_path}: {str(e)}")

    def process_single_file(filename: str) -> Tuple[pd.DataFrame, str, bool]:
        """Return ``(rows, filename, is_no_results)`` for one downloaded file."""
        file_path = None
        try:
            file_path = os.path.join(download_dir, filename)
            df, is_no_results = read_file(file_path)

            if is_no_results:
                return pd.DataFrame(), filename, True

            if not df.empty and required_columns.issubset(df.columns):
                lncrna_rows = df.loc[df['geneType'] == "lncRNA", ['miRNAname', 'geneName']]
                if not lncrna_rows.empty:
                    return lncrna_rows, filename, False

            return pd.DataFrame(), filename, False

        except Exception as e:
            print(f"Error processing {filename}: {str(e)}")
            return pd.DataFrame(), filename, False

        finally:
            # Clean up even when reading or filtering failed; otherwise the
            # file would be left behind in the download directory.
            if file_path is not None:
                remove_downloaded(file_path)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_single_file, filename) for filename in files_batch]

        for future in concurrent.futures.as_completed(futures):
            df, filename, is_no_results = future.result()

            if is_no_results:
                no_results_files.append(filename)
            elif not df.empty:
                processed_data.append(df)
            else:
                error_files.append(filename)

    if not processed_data:
        return pd.DataFrame(columns=['Column_B', 'Column_D']), no_results_files, error_files

    # Single concat over all per-file frames, then rename to the output schema.
    final_df = pd.concat(processed_data, ignore_index=True)
    final_df.columns = ['Column_B', 'Column_D']
    return final_df, no_results_files, error_files

In [None]:
def download_batch(mirnas: List[str], download_dir: str, max_workers: int = 8) -> Tuple[List[str], List[str], List[str]]:
    """Download the files for several miRNAs concurrently.

    One ``download_single_file`` task is submitted per miRNA and the outcomes
    are gathered in completion order, so the order of the returned lists is
    not tied to the order of ``mirnas``.

    Args:
        mirnas: miRNA identifiers to fetch.
        download_dir: Directory passed through to ``download_single_file``.
        max_workers: Size of the thread pool.

    Returns:
        ``(downloaded_files, not_downloaded, errors)`` where
        ``downloaded_files`` holds the names reported by successful downloads,
        ``not_downloaded`` holds the names reported by failed ones, and
        ``errors`` holds ``"<name>: <message>"`` strings for failures that
        carried a non-empty message.
    """
    succeeded = []
    failed = []
    failure_messages = []

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = [pool.submit(download_single_file, name, download_dir) for name in mirnas]

        for finished in concurrent.futures.as_completed(pending):
            label, ok, message = finished.result()
            if ok:
                succeeded.append(label)
                continue

            failed.append(label)
            if message:
                failure_messages.append(f"{label}: {message}")

    return succeeded, failed, failure_messages

def save_batch_results(batch_num: int, results: Dict, output_dir: str = 'output') -> None:
    """Write each non-empty entry of ``results`` to its own CSV file.

    Files are named ``<result_type>_batch_<batch_num>.csv`` inside
    ``output_dir``. DataFrame values are written as-is; any other non-empty
    value is written as a single ``Filename`` column.

    Emptiness is tested with ``DataFrame.empty`` for frames, because the truth
    value of a DataFrame is ambiguous and ``not df`` raises ``ValueError``.
    Each entry is saved independently, so a failure on one does not stop the
    remaining entries from being written.

    Args:
        batch_num: Batch number used in the output file names.
        results: Mapping of result type to a DataFrame or a list-like of names.
        output_dir: Destination directory; created if missing.

    Returns:
        None. Save errors are printed rather than raised.
    """
    os.makedirs(output_dir, exist_ok=True)

    for result_type, data in results.items():
        try:
            if isinstance(data, pd.DataFrame):
                if data.empty:
                    continue
                frame = data
            else:
                if not data:
                    continue
                frame = pd.DataFrame({'Filename': data})

            filename = os.path.join(output_dir, f'{result_type}_batch_{batch_num}.csv')
            frame.to_csv(filename, index=False)

        except Exception as e:
            print(f"Error saving batch {batch_num} results: {str(e)}")

In [None]:
def download_and_process_mirna(excel_path: str, batch_size: int = 50, download_workers: int = 8, process_workers: int = 4) -> Tuple[int, int, int, int]:
    """Download and process every miRNA listed in an Excel sheet, batch by batch.

    The unique, non-null values of the ``miRNA`` column are split into batches.
    Each batch is downloaded with ``download_batch``, parsed with
    ``process_batch_parallel`` and saved with ``save_batch_results``. After the
    last batch, the accumulated results are written to
    ``output/<name>_final.csv`` and the ``downloaded_files`` directory is
    removed if it is empty.

    Args:
        excel_path: Excel file containing a ``miRNA`` column.
        batch_size: Number of miRNAs handled per batch; must be at least 1.
        download_workers: Thread count for the download stage.
        process_workers: Thread count for the processing stage.

    Returns:
        ``(no_results_count, extracted_row_count, error_count,
        not_downloaded_count)``. ``error_count`` counts both download error
        messages and files reported as errors by the processing stage.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    # Validate before any I/O: 0 would divide by zero below, and a negative
    # value would silently yield zero batches.
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    # Read input file efficiently
    df = pd.read_excel(excel_path, usecols=['miRNA'])
    miRNAs = df['miRNA'].dropna().unique()

    # Create directories
    download_dir = 'downloaded_files'
    output_dir = 'output'
    for dir_name in (download_dir, output_dir):
        os.makedirs(dir_name, exist_ok=True)

    num_batches = math.ceil(len(miRNAs) / batch_size)
    list_keys = ('data_not_found', 'not_downloaded', 'error_files')
    collected = {key: [] for key in list_keys}
    extracted_frames = []

    with tqdm(total=num_batches, desc='Processing batches') as pbar:
        for batch_num in range(num_batches):
            start_idx = batch_num * batch_size
            # Slicing clamps at the end of the array, so no explicit min().
            batch_miRNAs = miRNAs[start_idx:start_idx + batch_size]

            # Download batch in parallel
            downloaded_files, not_downloaded, errors = download_batch(
                batch_miRNAs,
                download_dir,
                max_workers=download_workers
            )

            # Process downloaded files in parallel
            extracted_data, no_results, process_errors = process_batch_parallel(
                downloaded_files,
                download_dir,
                max_workers=process_workers
            )

            # Save batch results
            batch_results = {
                'data_not_found': no_results,
                'not_downloaded': not_downloaded,
                # Download entries are "<name>: <message>" strings, processing
                # entries are bare file names; both share this list.
                'error_files': errors + process_errors,
                'extracted_data': extracted_data
            }
            save_batch_results(batch_num + 1, batch_results)

            # Accumulate results
            for key in list_keys:
                collected[key].extend(batch_results[key])
            if not extracted_data.empty:
                extracted_frames.append(extracted_data)

            pbar.update(1)

    # Combine and save final results
    extracted_row_count = 0
    if extracted_frames:
        final_data = pd.concat(extracted_frames, ignore_index=True)
        final_data.to_csv(os.path.join(output_dir, 'extracted_data_final.csv'), index=False)
        extracted_row_count = len(final_data)

    for key in list_keys:
        if collected[key]:
            pd.DataFrame({'Filename': collected[key]}).to_csv(
                os.path.join(output_dir, f'{key}_final.csv'), index=False
            )

    # Clean up download directory (only when nothing was left behind)
    if os.path.exists(download_dir) and not os.listdir(download_dir):
        os.rmdir(download_dir)

    return (
        len(collected['data_not_found']),
        extracted_row_count,
        len(collected['error_files']),
        len(collected['not_downloaded'])
    )