# `etl_reduction` with *dask*
The input is a text file in long format.

## Extract
*dask* extracts the input in **parallel**

## Reduction
* select sequenced peptides
* select certain patients (train, test) according to input
* select peptides passing a certain threshold

## Load
* output the data as parquet files (in a separate folder)
* or output the data as a single csv file

In [2]:
import numpy as np
import pandas as pd
import os

import pyarrow as pa
import dask.dataframe as dd
import shutil

from set_path import *


In [5]:
class REDUCE:
    """Reduce a long-format raw export with dask and write the result to disk.

    The methods are meant to be called in this order::

        read_raw -> select_pep -> select_pat -> pep_freq
                 -> repartition -> export_parquet / export_csv

    Every reduction step replaces ``self.dfraw`` with a filtered (still lazy)
    dask dataframe.

    Parameters
    ----------
    raw : str
        Path of the raw delimited text file, read with ``dd.read_csv``.
    pep : str or None
        Path of an Excel sheet whose index holds peptide IDs and which has a
        ``Sequence`` column. A falsy value skips the peptide selection.
    pat : str or None
        Path of a CSV file whose index (first column) holds the ``ProbenID``
        values to keep. A falsy value skips the patient selection.
    output1 : str
        Directory for the parquet output. It is deleted and recreated by
        ``export_parquet``.
    output2 : str or None
        Path of the CSV file written by ``export_csv``.
    freq_thres : float, default 0.5
        Minimum fraction of patients in which a peptide must occur.
    fold_partition : int, default 5
        Factor by which ``repartition`` reduces the number of partitions.
    bsize : str, default "100MB"
        ``blocksize`` handed to ``dd.read_csv``.
    """

    def __init__(self, raw, pep, pat, output1, output2,
                 freq_thres=0.5, fold_partition=5, bsize="100MB"):
        self.raw = raw
        self.pep = pep
        self.pat = pat
        self.output1 = output1
        self.output2 = output2
        self.freq_thres = freq_thres
        self.fold_partition = fold_partition
        self.bsize = bsize
        # Set by select_pat(); stays None when no patient list is applied so
        # that pep_freq() can fall back to counting the patients in the data.
        self.npatients = None

    def read_raw(self):
        """Lazily read the four needed columns of the raw file into ``self.dfraw``."""
        self.dfraw = dd.read_csv(
            self.raw,
            encoding='latin1',
            blocksize=self.bsize,
            usecols=['Muster', 'Name', 'ProbenID', 'Amplitude'],
        )

    def select_pep(self):
        """Keep only rows whose ``Muster`` is a sequenced peptide.

        Does nothing when ``self.pep`` is falsy.
        """
        if not self.pep:
            return
        dfseq = pd.read_excel(self.pep, index_col=0)

        # peptides with a known sequence
        sequenced = dfseq[dfseq['Sequence'].notnull()].index.tolist()
        # translate the sheet's IDs into raw 'Muster' numbers: the first
        # character of each ID is replaced by the prefix '999'
        muster_ids = np.sort([int('999' + pep_id[1:]) for pep_id in sequenced])

        # filter only sequenced peptides from raw
        self.dfraw = self.dfraw[self.dfraw.Muster.isin(muster_ids)]

    def select_pat(self):
        """Keep only rows of the patients listed in ``self.pat``.

        Also stores the number of listed patients in ``self.npatients`` and
        prints it together with the number of remaining rows. Does nothing
        when ``self.pat`` is falsy.
        """
        if not self.pat:
            return
        dfpat = pd.read_csv(self.pat, index_col=0)
        # number of patients selected
        self.npatients = len(dfpat)
        patient_ids = np.sort(dfpat.index.tolist())
        # select patients
        self.dfraw = self.dfraw[self.dfraw.ProbenID.isin(patient_ids)]
        print('self.npatients', self.npatients)
        # NOTE: len() on a dask dataframe triggers a computation
        print('self.dfraw.shape', len(self.dfraw))

    def pep_freq(self):
        """Keep peptides present in at least ``freq_thres`` of the patients.

        The patient count is ``self.npatients`` when ``select_pat`` has set
        it; otherwise it is the number of distinct ``ProbenID`` values
        currently in ``self.dfraw``.
        """
        npatients = self.npatients
        if npatients is None:
            # No patient list was applied: count the patients in the data
            # instead of failing on a missing attribute.
            npatients = int(self.dfraw['ProbenID'].nunique().compute())
        min_count = self.freq_thres * npatients

        # rows per peptide; compute() turns it into a pandas Series, which is
        # what the map() below needs
        counts = self.dfraw.groupby('Muster')['ProbenID'].size().compute()

        # attach each row's peptide frequency to the raw data
        row_counts = self.dfraw['Muster'].map(counts)

        # keep rows whose peptide is frequent enough
        self.dfraw = self.dfraw[row_counts >= min_count]

    def repartition(self):
        """Repartition the dask dataframe into fewer chunks.

        The number of chunks is divided by ``fold_partition`` (default 5) so
        that the reduced data is again spread over reasonably sized chunks.
        At least one partition is always kept.
        """
        # Integer division yields 0 when there are fewer partitions than
        # fold_partition; clamp to 1 so repartition() gets a valid count.
        target = max(1, self.dfraw.npartitions // self.fold_partition)
        self.dfraw = self.dfraw.repartition(npartitions=target)

    def export_parquet(self):
        """Write ``self.dfraw`` as parquet files into ``self.output1``.

        An existing ``self.output1`` directory is removed first.
        """
        # pyarrow schema fixing the data type of every exported column
        schema = pa.schema([
            ("Muster", pa.int64()),
            ("Name", pa.int64()),
            ("ProbenID", pa.string()),
            ('Amplitude', pa.float64()),
        ])

        # if destination exists, empty it before creating new
        if os.path.exists(self.output1):
            shutil.rmtree(self.output1)
        os.makedirs(self.output1)
        # export to parquet files
        self.dfraw.to_parquet(
            self.output1,
            engine='pyarrow',
            schema=schema,
            append=False,
        )

    def export_csv(self):
        """Materialise ``self.dfraw`` in memory and write it to ``self.output2`` without the index."""
        self.dfraw.compute().to_csv(self.output2, index=False)
        
            


In [6]:
# --- extract -----------------------------------------------------------
# Configure the reducer: raw export, sequenced-peptide sheet, patient list,
# and the parquet destination folder (no CSV output is requested).
x1 = REDUCE(
    raw=f1 + 'qryMusterExportMFinderMF.txt',
    pep=f1 + 'ML1 02122020 semi-final.xlsx',
    pat=f2 + 'Tianlin_GFR_CKD_EPI_cleaned_20201005.csv',
    output1=f2 + 'reduced_all/',
    output2=None,
    bsize="100MB",
)
x1.read_raw()

# --- reduce ------------------------------------------------------------
x1.select_pep()   # sequenced peptides only
x1.select_pat()   # listed patients only
x1.pep_freq()     # peptides passing the frequency threshold

# --- load --------------------------------------------------------------
x1.repartition()
x1.export_parquet()

self.npatients 7939
self.dfraw.shape 5693027
