In [4]:
import os
import psycopg2
import pandas as pd
import numpy as np

In [25]:
def loadDataAndCreateColumns(file, STUDYNAME):
    '''
    This function loads a csv file and parses the column types to generate parsed dataframe and a a dictionary of PostgreSQL data typ.e.
    
    Parameters
    ---------- 
    file : str, path to the csv file. 
    '''
    print(file)
    df = pd.read_csv(file, sep=None, engine = 'python')
    iterator = pd.read_csv(file, sep=None, engine = 'python', iterator=True) # To check the csv formating
    inferredDelimiter = iterator._engine.data.dialect.delimiter 
    print(f'File delimiter is {inferredDelimiter}\n') # Prints the csv delimiter
    df.rename(columns={list(df)[0]: df.columns[0].replace("\ufeff", "")}, inplace = True) # Removes '\ufeff' from the first column name
    df.columns = df.columns.str.replace("&", "and").str.replace("-", "_") # Replaces some characters due to incompatibility with SQL naming schemes
    
    df['uniqueIdentifier'] = df.id.apply(lambda r: f'{STUDYNAME}_{r}') # Find the column named 'id' and creates a new one as a unique identifier by adding the study name
    
    dtypeDict = dict()
    for c in df.columns:
       
        msk = df[c].notna()
        if any(df.loc[msk, c].astype(str).str.match('[0-9]{1,5}-([0][1-9]|[1][0-2])-([0][1-9]|[1-2][0-9]|[3][0-1])')):
            dtypeDict[c] = 'date'
        else:
            df.loc[msk, c] = pd.to_numeric(df.loc[msk, c].astype(str).str.replace(',', '.'), errors="ignore") # Convert strings to numeric
            if df[c].dtype in (np.int64, np.int32, int, float, bool): # Preparing for parsing check
                if df[c].dtype in (np.int32, np.int64) or df[msk][c].apply(float.is_integer).all(): # If after NaN removal all the floats end in .0, the column dtype is int
                    df.loc[msk, c] = df.loc[msk, c].astype(int)
                    if set(df[c].dropna().unique()) == {0, 1}: # If they have only 0 and 1 as values, the column is bool
                        df[c] = df[c].dropna().astype(bool)
                        dtypeDict[c] = 'bool'
                    else:
                        dtypeDict[c] = 'int'
                else:
                    dtypeDict[c] = 'float' # If not every number ends in .0, then dtype is float
            else:
                dtypeDict[c] = "text" # If none of those conditions are met, the dtype is object (SQL dtype)
        for k, v in dtypeDict.items():
            if v == 'int':
                df[k] = df[k].astype(str).str.replace('\.0$', '') # Remove the .0 from the integers
                df = df.replace('nan', np.nan) # Reconvert the NaN to the proper dtype
             
    return df, dtypeDict



def createTable(columnDict, TABLENAME):
    '''
    Creates an SQL table by reading a dictionary of columns:dtype.

    Parameters
    ----------
    columnDict : dict, dictionary of columns with corresponding data type.
    TABLENAME : str, name of the table.
    ''' 
    columns = "(" + ",\n".join([f"{k} {v}" for k,v in columnDict.items()]) + ")" # Creating the columns for the table
    
    conn = psycopg2.connect("host=localhost dbname=test user=postgres password=test123")
    cur = conn.cursor()
    cur.execute(f"CREATE TABLE {TABLENAME} \n {columns}")
    conn.commit()
    cur.close()
    conn.close()

    
def addData(file, TABLENAME):

    conn = psycopg2.connect("host=localhost dbname=test user=postgres password=test123")
    cur = conn.cursor()
    with open(file, 'r') as f:
        next(f) # Skip the header row.
        cur.copy_from(f, TABLENAME.lower(), sep=';', null ="")
    conn.commit()
    cur.close()
    conn.close()
    

def dropTable(TABLENAME):
    conn = psycopg2.connect("host=localhost dbname=test user=postgres password=test123")
    cur = conn.cursor()
    cur.execute(f'DROP TABLE {TABLENAME.lower()};')
    conn.commit()
    cur.close()
    conn.close()

In [24]:
dataPath = '../csvTest'
file = [os.path.join(f'{dataPath}/{d}/{f}') for d in os.listdir(f'{dataPath}') if os.path.isdir(f'{dataPath}/{d}') for f in os.listdir(f'{dataPath}/{d}') if '.csv' in f]

for f in file:
    STUDYNAME = f.split('/')[2]
    TABLENAME = f.split('/')[3].removesuffix('.csv')

    conn = psycopg2.connect("host=localhost dbname=test user=postgres password=test123")
    cur = conn.cursor()
    cur.execute(f"""SELECT to_regclass('public.{TABLENAME.lower()}');""")
    tableCheck = cur.fetchone()[0]
    cur.close()
    conn.close()
    
    if TABLENAME.lower() == tableCheck:
        dropTable(TABLENAME)
         
        df, dtDict = loadDataAndCreateColumns(f, STUDYNAME) 
        endFile = f'{f[:-4]}_autogenerated.csv' 
        df.to_csv(endFile, index = False, sep = ';')

        createTable(dtDict, TABLENAME)
        addData(endFile, TABLENAME)
        os.remove(endFile)
    else:
        try:
            df, dtDict = loadDataAndCreateColumns(f, STUDYNAME)
        except:
            print(f'The formatting for table {f} is not right. Ignoring it and continuing.')
            continue
        endFile = f'{f[:-4]}_autogenerated.csv' 
        df.to_csv(endFile, index = False, sep = ';')

        createTable(dtDict, TABLENAME)
        addData(endFile, TABLENAME)
        
        os.remove(endFile)

File delimiter is ,



  df[k] = df[k].astype(str).str.replace('\.0$', '') # Remove the .0 from the integers


File delimiter is ,

File delimiter is ,

File delimiter is ;

The formatting for table ../csvTest/testStudy/gripTest.csv is not right. Ignoring it and continuing.
File delimiter is ;

File delimiter is ;

File delimiter is ;

