In [11]:
# Prerequisites: the XDF file has been uploaded to [sciserver_xdf_path], and the
# stimulus files have been uploaded to their respective directories in
# [sciserver_stim_dir]/[dataset_name]/[stim_class]/.
# The XDFCasJobsInserter class below parses these uploaded files and inserts them into
# the CasJobs tables: sessions, session_eeg, image_stims, stim_timestamps.

from SciServer import CasJobs, Files, Authentication
import pandas                                # data analysis tools
import numpy as np                           # numerical tools
from datetime import datetime, timedelta     # date and timestamp tools
from pprint import pprint
import sys
import os
import datetime

# This notebook is not executed from the project directory, so the project root
# has to be on the import path for `from xdf.Python.xdf import load_xdf` to resolve.
# The path is appended only once, even if this cell is re-run.
project_path = '/home/idies/workspace/Storage/ncarey/persistent/PULSD/PsychoPy-pylsl-RSVP/'
if not any(entry == project_path for entry in sys.path):
    sys.path.append(project_path)

from xdf.Python.xdf import load_xdf



In [56]:

class XDFCasJobsInserter:
    """Parse one uploaded XDF session plus its stimulus files and insert them into CasJobs.

    Construction validates the on-disk layout, inserts any stimulus images not yet
    present into ``image_stims``, and loads the XDF file into two DataFrames,
    ``session_eeg_df`` and ``session_stim_timestamp_df``. The caller then invokes
    ``updateSessionsTable``, ``updateSessionEEGTable`` and ``updateStimTimestampsTable``
    to write the session row and those DataFrames.

    Expected stimulus layout: [stim_dir]/[dataset_name]/[stim_class]/[stim_files].
    """

    # Column order of the session_eeg table: session, sample timestamp, 16 EEG channels.
    EEG_COLUMNS = ['session_ID', 'timestamp', 'F3', 'Fz', 'F4', 'T7', 'C3', 'Cz',
                   'C4', 'T8', 'Cp3', 'Cp4', 'P3', 'Pz', 'P4', 'PO7', 'PO8', 'Oz']
    # Column order of the stim_timestamps table.
    STIM_TIMESTAMP_COLUMNS = ['session_ID', 'timestamp', 'stim_ID']
    # XDF stream names to read.
    # NOTE(review): the EEG stream name is the amplifier's device name and may
    # differ between recording setups.
    EEG_STREAM_NAME = 'g.USBamp-2'
    STIM_STREAM_NAME = 'stim_stream'

    def __init__(self, session_file, session_desc,
                 casjobs_context='MyDB',
                 xdf_dir='/home/idies/workspace/Storage/ncarey/persistent/PULSD/data_backup/',
                 stim_dir='/home/idies/workspace/Storage/ncarey/persistent/PULSD/stim_backup/',
                 subject_name="Nick Carey",
                 dataset_name="HiddenCube",
                 session_stim_length_ms=233.3,
                 session_datetime=None):
        """Prepare one session for insertion.

        Args:
            session_file: XDF file name, relative to ``xdf_dir``.
            session_desc: free-text description stored in ``sessions.session_desc``.
            casjobs_context: CasJobs context (database) used for every query.
            xdf_dir: directory containing ``session_file``.
            stim_dir: stimulus root; stimuli are read from ``stim_dir/dataset_name``.
            subject_name, dataset_name, session_stim_length_ms: stored in the sessions row.
            session_datetime: session timestamp; defaults to the moment this
                instance is created.

        Raises:
            FileNotFoundError: if the file/directory layout is invalid, or a stimulus
                marker in the XDF has no image_stims entry.
            ValueError: if the XDF file lacks a required stream.
        """
        self.casjobs_context = casjobs_context
        self.xdf_dir = xdf_dir
        self.subject_name = subject_name
        self.dataset_name = dataset_name
        self.session_stim_length_ms = session_stim_length_ms
        self.session_file = session_file
        self.session_desc = session_desc

        self.session_filepath = os.path.join(self.xdf_dir, self.session_file)
        self.stim_dir_path = os.path.join(stim_dir, self.dataset_name)
        self.stim_classes = []

        self.session_ID = self._next_id('sessions', 'session_ID')
        # Resolved per instance. A `datetime.datetime.now()` default would be evaluated
        # once at class definition and then shared by every session.
        self.session_datetime = datetime.datetime.now() if session_datetime is None else session_datetime

        # Image stims are inserted before the XDF is parsed because loadXDF maps
        # stimulus markers to the stim_IDs stored in image_stims.
        self.ensureFileStructure()
        self.updateImageStimsTable()
        self.loadXDF()

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _sql_quote(value):
        """Return ``str(value)`` escaped for use inside a single-quoted T-SQL literal."""
        return str(value).replace("'", "''")

    @staticmethod
    def _next_id_from_max(max_df):
        """Return one past the value in a ``SELECT MAX(...)`` result, or 0 if there is none.

        A NULL MAX (empty table) can come back as None/NaN or as a frame with
        no rows, so all of those mean "no IDs assigned yet".
        """
        if max_df is None or max_df.empty:
            return 0
        max_value = max_df.iloc[0, 0]
        if pandas.isnull(max_value):
            return 0
        return int(max_value) + 1

    def _next_id(self, table, column):
        """Query CasJobs for MAX(column) in table and return the next free ID."""
        max_df = CasJobs.executeQuery(sql='SELECT MAX({0}) from {1}'.format(column, table),
                                      context=self.casjobs_context)
        return self._next_id_from_max(max_df)

    def _find_streams(self, stream_list):
        """Map each required stream name to its ``(time_series, time_stamps)`` pair.

        If a name occurs more than once, the last occurrence wins.

        Raises:
            ValueError: naming every required stream that is absent.
        """
        wanted = (self.EEG_STREAM_NAME, self.STIM_STREAM_NAME)
        found = {}
        for stream in stream_list:
            name = stream['info']['name'][0]
            if name in wanted:
                found[name] = (stream['time_series'], stream['time_stamps'])
        missing = [name for name in wanted if name not in found]
        if missing:
            raise ValueError('{0} is missing required XDF stream(s): {1}'.format(
                self.session_filepath, ', '.join(missing)))
        return found

    def _build_eeg_df(self, eeg_time_stamps, eeg_time_series):
        """Return one session_eeg row per EEG sample: session_ID, timestamp, then each channel value."""
        rows = [[self.session_ID, stamp] + list(sample)
                for stamp, sample in zip(eeg_time_stamps, eeg_time_series)]
        return pandas.DataFrame(data=rows, index=range(len(rows)), columns=self.EEG_COLUMNS)

    def _build_stim_timestamp_df(self, stim_time_stamps, stim_time_series, stim_id_fp_df):
        """Return one stim_timestamps row per stimulus marker, resolving each marker to a stim_ID.

        Each marker's first element is a backslash-separated path; image_stims stores
        '/'-separated paths. Only the base file names are compared, and the first
        image_stims row with a matching name wins.

        Raises:
            FileNotFoundError: if a marker's file name has no image_stims entry.
        """
        # Built once: O(1) lookups instead of rescanning image_stims for every marker.
        stim_id_by_name = {}
        for stim_id, stim_path in zip(stim_id_fp_df['stim_ID'], stim_id_fp_df['stim_filepath']):
            stim_id_by_name.setdefault(stim_path.split('/')[-1], stim_id)

        rows = []
        for stamp, marker in zip(stim_time_stamps, stim_time_series):
            file_name = marker[0].split('\\')[-1]
            if file_name not in stim_id_by_name:
                raise FileNotFoundError('{0} did not have an entry in the image_stims table...'.format(file_name))
            rows.append([self.session_ID, stamp, stim_id_by_name[file_name]])
        return pandas.DataFrame(data=rows, index=range(len(rows)), columns=self.STIM_TIMESTAMP_COLUMNS)

    # ------------------------------------------------------------- public API

    def ensureFileStructure(self):
        """Validate the session file and stim directory layout; populate ``self.stim_classes``.

        Every hidden entry (name starting with '.') in the stim directory is ignored.

        Raises:
            FileNotFoundError: if the session file or stim directory is missing, no
                stim class subdirectory exists, or a non-hidden entry is not a directory.
        """
        if not os.path.isfile(self.session_filepath):
            raise FileNotFoundError("{0} file was not found! Check filepath and directory".format(self.session_filepath))
        if not os.path.isdir(self.stim_dir_path):
            raise FileNotFoundError("{0} directory was not found! Check that it exists".format(self.stim_dir_path))

        # Drop every hidden entry (e.g. .ipynb_checkpoints), not just the first one found.
        self.stim_classes = [name for name in os.listdir(self.stim_dir_path) if not name.startswith('.')]

        if len(self.stim_classes) < 1:
            raise FileNotFoundError('''In {0} directory, no subdirectory stim classes were found! 
                                    Check that youve uploaded the stim files in the correct structure, which is:
                                    [stim_directory]/[dataset_name]/[stim_classes]/[stim_files]'''.format(self.stim_dir_path))
        for stim_class in self.stim_classes:
            if not os.path.isdir(os.path.join(self.stim_dir_path, stim_class)):
                raise FileNotFoundError('''{0} is not a directory! {1} should only contain folders of stim files. 
                The folders correspond to the stim's class'''.format(os.path.join(self.stim_dir_path, stim_class), 
                                                                     self.stim_dir_path))

    def loadXDF(self):
        """Load the XDF file and build ``session_eeg_df`` and ``session_stim_timestamp_df``.

        Sets ``self.xdf`` to the raw ``load_xdf`` result; its first element is the stream list.
        In the EEG stream, each time_series row holds one value per channel. In the stim
        stream, each row's first element is the stimulus file path.

        Raises:
            ValueError: if a required stream is missing.
            FileNotFoundError: if a stimulus marker has no image_stims entry.
        """
        self.xdf = load_xdf(self.session_filepath, verbose=False)
        streams = self._find_streams(self.xdf[0])
        eeg_time_series, eeg_time_stamps = streams[self.EEG_STREAM_NAME]
        stim_time_series, stim_time_stamps = streams[self.STIM_STREAM_NAME]

        self.session_eeg_df = self._build_eeg_df(eeg_time_stamps, eeg_time_series)

        stim_id_fp_df = CasJobs.executeQuery(sql='SELECT stim_ID, stim_filepath from image_stims', context=self.casjobs_context)
        self.session_stim_timestamp_df = self._build_stim_timestamp_df(
            stim_time_stamps, stim_time_series, stim_id_fp_df)

    def updateSessionsTable(self):
        """Insert this session's row into the ``sessions`` table.

        String values are escaped for single quotes before being embedded in the SQL.
        TODO: handle duplicates. The sessions DDL declares session_filepath UNIQUE, so
        re-inserting the same file fails.
        """
        insert_query = '''INSERT INTO sessions
        (session_ID, session_datetime, session_filepath, session_desc, session_stim_length_ms, subject_name, dataset_name)
        VALUES
        ({0}, '{1}', '{2}', '{3}', {4}, '{5}', '{6}')'''.format(
            self.session_ID,
            self._sql_quote(self.session_datetime),
            self._sql_quote(self.session_filepath),
            self._sql_quote(self.session_desc),
            self.session_stim_length_ms,
            self._sql_quote(self.subject_name),
            self._sql_quote(self.dataset_name))

        CasJobs.executeQuery(sql=insert_query, context=self.casjobs_context)

    def updateSessionEEGTable(self):
        """Upload ``session_eeg_df`` to the ``session_eeg`` table."""
        CasJobs.uploadPandasDataFrameToTable(dataFrame=self.session_eeg_df, tableName='session_eeg', context=self.casjobs_context)

    def updateStimTimestampsTable(self):
        """Upload ``session_stim_timestamp_df`` to the ``stim_timestamps`` table."""
        CasJobs.uploadPandasDataFrameToTable(dataFrame=self.session_stim_timestamp_df, tableName='stim_timestamps', context=self.casjobs_context)

    # This insert is driven entirely by the directory structure, not by the XDF,
    # so it runs before loadXDF.
    # TODO: handle a duplicate-insert error instead of relying only on the pre-check.
    def updateImageStimsTable(self):
        """Insert each stimulus file under the stim class directories that image_stims lacks.

        New rows get consecutive stim_IDs starting one past the current MAX(stim_ID).
        Files whose full path is already in image_stims are skipped with a message.
        Hidden files (name starting with '.') are ignored.
        """
        cur_stim_ID = self._next_id('image_stims', 'stim_ID')

        insert_query_template = '''INSERT INTO image_stims 
        (stim_ID, dataset_name, stim_training_class, stim_desc, stim_filepath)
        VALUES
        ({0}, '{1}', '{2}', '{3}', '{4}')'''

        current_stim_filepaths = CasJobs.executeQuery(sql='select stim_filepath from image_stims', context=self.casjobs_context)
        # A set gives O(1) membership instead of scanning the whole column for every file.
        existing_paths = set(current_stim_filepaths['stim_filepath'])

        for cur_stim_class in self.stim_classes:
            class_dir = os.path.join(self.stim_dir_path, cur_stim_class)
            for stim_file in os.listdir(class_dir):
                if stim_file.startswith('.'):
                    continue  # e.g. .DS_Store / .ipynb_checkpoints are not stimuli
                cur_stim_path = os.path.join(class_dir, stim_file)
                if cur_stim_path in existing_paths:
                    print("Image Stim already in table, skipping")
                    continue
                insert_query = insert_query_template.format(
                    cur_stim_ID,
                    self._sql_quote(self.dataset_name),
                    self._sql_quote(cur_stim_class),
                    self._sql_quote(stim_file),  # stim_desc is the file name
                    self._sql_quote(cur_stim_path))
                CasJobs.executeQuery(sql=insert_query, context=self.casjobs_context)
                cur_stim_ID += 1
        



In [61]:
data_dir = '/home/idies/workspace/Storage/ncarey/persistent/PULSD/data_backup/'

# Insert every non-hidden file in data_dir as a new session: its sessions row,
# its EEG samples, and its stimulus timestamps (in that order).
for xdf in os.listdir(data_dir):
    if not xdf.startswith('.'):
        inserter = XDFCasJobsInserter(session_file=xdf, session_desc=xdf, xdf_dir=data_dir)
        for push in (inserter.updateSessionsTable,
                     inserter.updateSessionEEGTable,
                     inserter.updateStimTimestampsTable):
            push()

Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim already in table, skipping
Image Stim a

In [None]:
# FOR REFERENCE: DDL of the four CasJobs tables this notebook writes to.
# These strings are not executed in this notebook. They are listed in a valid
# creation order: image_stims and sessions first, because session_eeg and
# stim_timestamps declare FOREIGN KEYs that REFERENCE them.

create_stim_table = '''
CREATE TABLE image_stims (
  stim_ID int NOT NULL,
  dataset_name varchar(50),
  stim_training_class varchar(20),
  stim_desc varchar(255),
  stim_filepath varchar(255),
  CONSTRAINT pk_stim_ID PRIMARY KEY (stim_ID),
  CONSTRAINT ak_stim_filepath UNIQUE (stim_filepath)
);

'''

create_sessions_table = '''
CREATE TABLE sessions (
  session_ID int NOT NULL,
  session_datetime datetime2 NOT NULL,
  session_filepath varchar(255),
  session_desc varchar(255),
  session_stim_length_ms float,
  subject_name varchar(50),
  dataset_name varchar(50),
  CONSTRAINT pk_session_ID PRIMARY KEY (session_ID),
  CONSTRAINT ak_session_filepath UNIQUE (session_filepath)
);
'''

create_session_eeg_table = '''
CREATE TABLE session_eeg (
  session_ID int NOT NULL,
  timestamp float NOT NULL,
  F3 float,
  Fz float,
  F4 float,
  T7 float,
  C3 float,
  Cz float,
  C4 float,
  T8 float,
  Cp3 float,
  Cp4 float,
  P3 float,
  Pz float,
  P4 float,
  PO7 float,
  PO8 float,
  Oz float,
  CONSTRAINT pk_session_eeg_timestamp PRIMARY KEY (session_ID, timestamp),
  CONSTRAINT fk_session_ID FOREIGN KEY (session_ID)
    REFERENCES sessions (session_ID)
    ON DELETE CASCADE
    ON UPDATE CASCADE
);
'''

create_stim_timestamps_table = '''
CREATE TABLE stim_timestamps (
  session_ID int NOT NULL,
  timestamp float NOT NULL,
  stim_ID int NOT NULL,
  CONSTRAINT pk_session_stim_timestamp PRIMARY KEY (session_ID, timestamp, stim_ID),
  CONSTRAINT fk_session_ID_stimtimeref FOREIGN KEY (session_ID)
    REFERENCES sessions (session_ID)
    ON DELETE CASCADE
    ON UPDATE CASCADE,
  CONSTRAINT fk_stim_ID FOREIGN KEY (stim_ID)
    REFERENCES image_stims (stim_ID)
    ON DELETE CASCADE
    ON UPDATE CASCADE
);
'''