In [1]:
import pandas as pd
import numpy as np
import csv
import re
import string
from itertools import product

In [2]:
# Absolute, machine-specific folder that holds the example exports used in this notebook.
_TEST_DATA_DIR = "C:\\Users\\Bartek\\OneDrive\\Documents\\Programming\\Python\\Test data\\"

# Exports in the 'plate' layout
plate_1 = _TEST_DATA_DIR + "plate1.csv"
plate_1_repeat = _TEST_DATA_DIR + "plate1_repeat.csv"
plate_2_1 = _TEST_DATA_DIR + "plate2_1.csv"
plate_2_2 = _TEST_DATA_DIR + "plate2_2.csv"
plate_2_3 = _TEST_DATA_DIR + "plate2_3.csv"
plate_2_repeat = _TEST_DATA_DIR + "plate2_repeat.csv"
plate_2_repeat_96 = _TEST_DATA_DIR + "plate2_repeat _96.csv"

# Exports in the 'list' layout
list_A = _TEST_DATA_DIR + "listA.csv"
list_A_repeat = _TEST_DATA_DIR + "listA_repeat.csv"
list_B = _TEST_DATA_DIR + "listB.csv"
list_B_repeat_end = _TEST_DATA_DIR + "listB_repeat _end.csv"
list_B_repeat_96 = _TEST_DATA_DIR + "listB_repeat _96.csv"
list_C = _TEST_DATA_DIR + "listC.csv"

In [3]:
class Error(Exception):
    """Common base class for the exceptions defined in this file."""


class DataReadInError(Error):
    """Signals a problem with reading in the raw data (as the name indicates)."""
    
    
class FA:
    """Holds raw data read from a csv export together with the per-repeat data frames built from it.

    Instances are normally created through the read_in_plate or read_in_list class methods, which parse
    the csv file and hand the result over as the 'info' argument. The processed data is stored in the
    'data_dict' attribute; that attribute is only set when data_type is 'plate' or 'list'.

    :param csv_file: A csv file containing raw data.
    :param type: str
    :param info: Read in parameters ('plate' data) or parsed data frames ('list' data) as produced by read_in_plate or read_in_list.
    :param type: tuple
    :param data_type: General layout of the exported raw data, either 'plate' or 'list'.
    :param type: str
    :param wells: Number of wells on the plate ('96' or '384').
    :param type: str """

    # (number of rows, number of columns) of a plate, keyed by the number of wells given as a string
    plate_dimensions = {'96': (8, 12), '384': (16, 24)}

    def __init__(self, csv_file, info, data_type, wells):
        self.csv_file = csv_file
        self.info = info
        self.data_type = data_type
        self.wells = wells
        if data_type == 'plate':
            self.data_dict = self.process_plate(self.csv_file, self.info, self.wells)
        elif data_type == 'list':
            self.data_dict = self.process_list(self.info)

    @classmethod
    def read_in_plate(cls, csv_file, data_type, wells):

        """ Iterates over the raw data file to find the line numbers at which the metadata table and raw data table begin for
        each channel and repeat. Calculates the length of those tables. Finds the G-factor. Determines the format of data (plate 1 or 2).

        :param csv_file: Raw data file in csv format.
        :param type: str
        :param data_type: Passed on unchanged to the constructor.
        :param type: str
        :param wells: Number of wells on the plate, passed on unchanged to the constructor.
        :param type: str
        :return: An FA instance whose 'info' is a tuple of (list of read in parameter tuples, data format string, G-factor).
        :raises DataReadInError: If no raw data table or no G-factor is found in the file. """

        with open(csv_file) as file:
            all_data_lines = list(csv.reader(file, delimiter=','))   # read the csv file and cast it into a list containing all lines

        blanks = np.array([index for index, item in enumerate(all_data_lines) if item == []])   # indices of all blank rows
        values = []   # one (skiprows, nrows, skiprows_meta) tuple per data table
        data_format = None
        g_factor = None

        for index, item in enumerate(all_data_lines):   # iterate over each line of the csv file
            if item == []:
                continue

            if re.findall(r"Plate information", item[0]) == ['Plate information']:
                skiprows_meta = index + 1   # the metadata table starts on the line after 'Plate information'

                if re.search(r'Results for', all_data_lines[index + 9][0]) is not None:
                    # plate2: a 'Results for' line precedes the data table, so the table starts one line later
                    skiprows = index + 10
                    end_of_data = blanks[blanks > skiprows].min()   # smallest blank index after the beginning of the data table
                    values.append((skiprows, end_of_data - skiprows, skiprows_meta))
                    data_format = 'plate2'

                elif re.findall(r"Formula", all_data_lines[index + 1][10]) != ['Formula']:
                    # plate1: tables whose metadata header has 'Formula' in the 11th column are skipped
                    skiprows = index + 9
                    end_of_data = blanks[blanks > skiprows].min()
                    values.append((skiprows, end_of_data - skiprows + 1, skiprows_meta))
                    data_format = 'plate1'

            if len(item) > 1 and re.findall(r"G-factor", item[0]) == ["G-factor"]:
                g_factor = float(item[4])

        if not values or data_format is None:
            raise DataReadInError(f"No 'Plate information' data table was found in {csv_file}")
        if g_factor is None:
            raise DataReadInError(f"No G-factor was found in {csv_file}")

        return cls(csv_file, (values, data_format, g_factor), data_type, wells)


    def process_plate(self, csv_file, values, wells):

        """ Iterates over the raw data file and creates data frames for the data and metadata for each channel, converts them into
        a 384 or 96 by 1 format and adds them into a dictionary.

        :param csv file: Raw data file in csv format.
        :param type: str
        :param values: A tuple whose first item is a list of read in parameter tuples (one per data table) and whose second item is the data format string.
        :param type: tuple
        :param wells: Number of wells on the plate.
        :param type: str
        :return: A dictionary containing a dictionary for each repeat containing the metadata df and a dictionary with s and p channel dfs.
        :raises DataReadInError: If the data format is neither 'plate1' nor 'plate2'. """

        table_params, data_format = values[0], values[1]
        if data_format not in ('plate1', 'plate2'):
            raise DataReadInError(f"Unknown plate data format: {data_format!r}")

        n_rows, n_cols = self.plate_dimensions[wells]
        row_letters = list(string.ascii_uppercase)[:n_rows]   # letters for the data table
        col_numbers = [str(number) for number in range(1, n_cols + 1)]   # numbers for the data table
        well_ids = [f'{row}{col}' for row, col in product(row_letters, col_numbers)]   # well IDs for the data table

        data_frames = {}   # dictionary to store the data frames

        # tables alternate between the p and s channel: even positions are p, odd positions are s of the same repeat
        for position, (skiprows, nrows, skiprows_meta) in enumerate(table_params):
            repeat_key = f'repeat_{position // 2 + 1}'

            if data_format == 'plate1':   # raw data table does not have row and column names so 'names' parameter passed to omit the last column
                raw_data = pd.read_csv(csv_file, sep=',', names=col_numbers, index_col=False, engine='python', skiprows=skiprows, nrows=nrows, encoding='utf-8')
            else:   # plate2: raw data table has row and column names, so 'index_col' must be 0
                raw_data = pd.read_csv(csv_file, sep=',', index_col=0, engine='python', skiprows=skiprows, nrows=nrows, encoding='utf-8')
                raw_data.drop(raw_data.columns[-1], axis=1, inplace=True)   # delete the last column because it is empty

            # reshape the plate-shaped table into a (number of wells) by 1 array
            data_to_array = np.reshape(raw_data.to_numpy(), (int(wells), 1))

            if position % 2 == 0:
                # metadata table is a single row; the measurement time is converted into a datetime object
                metadata = pd.read_csv(csv_file, sep=',', engine='python', skiprows=skiprows_meta, nrows=1, encoding='utf-8').astype({'Measurement date': 'datetime64[ns]'})
                new_data = pd.DataFrame(data=data_to_array, index=well_ids, columns=['p'])
                # the 's' entry holds an empty string until the matching s channel table is processed
                data_frames[repeat_key] = {'metadata': metadata, 'data': {'p': new_data, 's': ''}}
            else:
                new_data = pd.DataFrame(data=data_to_array, index=well_ids, columns=['s'])
                data_frames[repeat_key]['data']['s'] = new_data

        return data_frames

    @classmethod
    def read_in_list(cls, csv_file, data_type, wells):

        """ Iterates over the csv file to find the line numbers at which the metadata table and raw data table
        begin. Creates two pandas data frames: for the raw data and metadata.

        :param csv_file: Raw data file in csv format.
        :param type: str
        :param data_type: Passed on unchanged to the constructor.
        :param type: str
        :param wells: Number of wells on the plate, passed on unchanged to the constructor.
        :param type: str
        :return: An FA instance whose 'info' is a tuple of (raw data df, metadata df, data format string, G-factor).
        :raises DataReadInError: If the metadata table, the data table, its end or the G-factor cannot be found. """

        with open(csv_file) as file:
            all_data_lines = list(csv.reader(file, delimiter=','))   # read the csv file and cast it into a list

        # start with a value greater than the total number of lines so that the end-of-table check
        # below cannot trigger before the beginning of the data table has been found
        skiprows = len(all_data_lines) + 1
        blanks = np.array([index for index, item in enumerate(all_data_lines) if item == []])   # indices of all blank rows

        skiprows_meta = None
        nrows_meta = None
        data_format = None
        g_factor = None
        end_of_data = None

        # iterate over all lines to find the beginning of the data table ('skiprows') and determine the format of data i.e. list A, B, or C
        for index, item in enumerate(all_data_lines):
            if item == []:
                # the first blank row after the beginning of the data table marks its end
                if index > skiprows:
                    end_of_data = index
                    break
                continue

            if len(item) == 1 and re.findall(r"Plate information", item[0]) == ["Plate information"]:
                skiprows_meta = index + 1
                end_of_metadata = blanks[blanks > skiprows_meta].min()   # smallest blank index after the beginning of metadata
                nrows_meta = end_of_metadata - skiprows_meta - 1   # length of the metadata table

            if len(item) >= 2:
                if re.findall(r"PlateNumber", item[0]) == ['PlateNumber'] and re.findall(r"PlateRepeat", item[1]) == ['PlateRepeat']:
                    skiprows = index - 1
                    data_format = 'listA'

                if re.findall(r"Plate", item[0]) == ['Plate'] and re.findall(r"Barcode", item[1]) == ['Barcode']:
                    skiprows = index
                    data_format = 'listB'

                if re.findall(r"Plate", item[0]) == ['Plate'] and re.findall(r"Well", item[1]) == ['Well']:
                    skiprows = index
                    data_format = 'listC'

            if re.findall(r"G-factor", item[0]) == ["G-factor"]:   # find the g factor
                g_factor = float(item[4])

        if skiprows_meta is None:
            raise DataReadInError(f"No 'Plate information' metadata table was found in {csv_file}")
        if data_format is None:
            raise DataReadInError(f"No raw data table header was found in {csv_file}")
        if end_of_data is None:
            raise DataReadInError(f"No blank row marking the end of the data table was found in {csv_file}")
        if g_factor is None:
            raise DataReadInError(f"No G-factor was found before the end of the data table in {csv_file}")

        nrows = end_of_data - skiprows - 1   # calculate the length of data table

        raw_data = pd.read_csv(csv_file, sep=',', engine='python', skiprows=skiprows, nrows=nrows, encoding='utf-8')
        metadata = pd.read_csv(csv_file, sep=',', engine='python', skiprows=skiprows_meta, nrows=nrows_meta, encoding='utf-8')

        return cls(csv_file, (raw_data, metadata, data_format, g_factor), data_type, wells)


    def process_list(self, raw_data):

        """Extracts the data for each channel and repeat from the raw data table and adds to a dictionary.

        The 'Well' column of the raw data df is modified in place (A01 -> A1).

        :param raw_data: A tuple containing data frames for raw data, metadata and a string representing type of list (A, B, or C).
        :param type: tuple
        :return: A dictionary containing a dictionary for each repeat containing the metadata df and a dictionary with s and p channel dfs.
        :raises DataReadInError: If the list type is not 'listA', 'listB' or 'listC'. """

        table, metadata, list_format = raw_data[0], raw_data[1], raw_data[2]
        if list_format not in ('listA', 'listB', 'listC'):
            raise DataReadInError(f"Unknown list data format: {list_format!r}")

        data_frames = {}   # dictionary to store data frames
        repeats = list(metadata['Repeat'].to_numpy())   # list of repeats based on the metadata table

        # remove the '0' from middle position of well numbers (A01 -> A1)
        table['Well'] = table['Well'].apply(lambda x: x[0] + x[2] if x[1] == '0' else x)

        for index, repeat in enumerate(repeats):   # iterate over the repeats
            if list_format == 'listA':
                # rows belonging to the current repeat only
                repeat_rows = table.groupby(table.PlateRepeat).get_group(repeat)
                p_raw = repeat_rows.iloc[::3][['Well', 'Signal']]   # p channel: each third row starting from the first row
                s_raw = repeat_rows.iloc[1::3][['Well', 'Signal']]   # s channel: each third row starting from the second row
            else:
                # listB and listC: the column naming is different for the first repeat, i.e. just 'Signal', then it's 'Signal.1', 'Signal.2', etc.
                if repeat == 1:
                    p_column, s_column = 'Signal', f'Signal.{repeat}'
                else:
                    p_column, s_column = f'Signal.{repeat + index - 1}', f'Signal.{repeat + index}'
                p_raw = table[['Well', p_column]]
                s_raw = table[['Well', s_column]]

            # set row indices as the well numbers and rename the signal column to 'p' or 's';
            # done on new objects because in-place 'set_axis' is not available in pandas 2.0 and later
            p_raw = p_raw.set_index('Well')
            p_raw.columns = ['p']
            s_raw = s_raw.set_index('Well')
            s_raw.columns = ['s']

            # metadata row relevant for the current repeat, with date and time converted into a datetime object
            meta = metadata.iloc[[repeat - 1]].astype({'Measurement date': 'datetime64[ns]'})
            data_frames[f'repeat_{repeat}'] = {'metadata': meta, 'data': {'p': p_raw, 's': s_raw}}

        return data_frames

In [4]:
# Read the 'listA_repeat' export in the 'list' layout for a 384-well plate.
my_data = FA.read_in_list(list_A_repeat, 'list', '384')

In [5]:
# Display the s channel data frame of the first repeat.
my_data.data_dict['repeat_1']['data']['s']

Unnamed: 0_level_0,s
Well,Unnamed: 1_level_1
A1,20469296.0
A2,29296716.0
A3,18210982.0
A4,23159988.0
A5,24960618.0
...,...
K20,26376527.0
K21,11347544.0
K22,8781580.0
K23,14211858.0


In [6]:
# Read the 'plate1_repeat' export in the 'plate' layout for a 384-well plate.
my_data2 = FA.read_in_plate(plate_1_repeat, 'plate', '384')

In [7]:
# Display the metadata data frame of the first repeat.
my_data2.data_dict['repeat_1']['metadata']

Unnamed: 0,Plate,Repeat,Barcode,Measured height,Chamber temperature at start,Chamber temperature at end,Humidity at start,Humidity at end,Ambient temperature at start,Ambient temperature at end,Group,Label,ScanX,ScanY,Measinfo,Kinetics,Measurement date,Unnamed: 17
0,1,1,,14.4,18.98,18.8,61.7,61.5,18.98,18.9,1,Copy of Kris FP Fluorescein anisotropy(1),0,0,De=1st Ex=Top Em=Top Wdw=N/A (15),0,2020-11-17 13:33:43,


In [8]:
# Display the p channel data frame of the second repeat.
my_data2.data_dict['repeat_2']['data']['p']

Unnamed: 0,p
A1,18964719.0
A2,27435568.0
A3,16237095.0
A4,21291729.0
A5,22591167.0
...,...
P20,
P21,
P22,
P23,
