In [1]:
import os, re,json, imp, unittest
import pandas as pd

print('First Directory at :', os.getcwd()[2:])

#Load Setting File
with open('setting.txt','r') as f:
    setting = json.load(f)
print('Load Setting OK')    

os.chdir(re.sub(os.getcwd().split('\\')[-1]+'$','',os.getcwd()))
print('Change Directory into :', os.getcwd()[2:])

First Directory at : \01_DEV_Function\Shared_Function\tests
Load Setting OK
Change Directory into : \01_DEV_Function\Shared_Function


In [2]:
#local_library = True
local_library = False

if local_library : 
    da_tran_SQL = imp.load_source('da_tran_SQL', 'py_topping/data_connection/database.py').da_tran_SQL
else :
    from py_topping.data_connection.database import da_tran_SQL
    
def help_test(df_in1, df_in2) :
    result = list()
    expect = list()
    for i in df_in1.columns :
        result.append(list(df_in1[i]))
        expect.append(list(df_in2[i]))
    return result, expect

In [10]:
test_mssql = setting['MSSQL']

class SQL_Test_class(unittest.TestCase):
    def setUp(self):
        self.sql = da_tran_SQL(sql_type = test_mssql['type'],
                                host_name = test_mssql['host'],
                                database_name = test_mssql['database'],
                                user = test_mssql['user'],
                                password = test_mssql['password'])
        self.df1 = pd.DataFrame({'col1' : [1,2,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        self.df2 = pd.DataFrame({'col1' : [4,5,6,7,8] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        self.table_name = 'unit_test_git'
        self.df1.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')

    def test_sp(self):
        sql_q = """ CREATE OR ALTER PROCEDURE unit_test_git_SP (@PARAM1 AS VARCHAR(100))
                    AS
                    BEGIN
                    SELECT * FROM {} WHERE col1 = @PARAM1 
                    END""".format(self.table_name)
        self.sql.engine.execute(sql_q)
        df_read = self.sql.read('unit_test_git_SP', SP = True , param = {'@PARAM1' : 1})
        expect_df = self.df1[self.df1['col1'] == 1]
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_read_sql(self):
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_whole(self):
        self.sql.dump_whole(self.df2, self.table_name)
        df_read = self.sql.read(self.table_name)
        expect_df = self.df2
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    
    def test_replace1(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace1_str(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace2(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,4,5,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_logic(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'],
                             math_logic = {'col1' : {'logic' : '>', 'value' : 1}})
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1[self.df1['col1'] <= 1].append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_logic_date(self) :
        df_date = pd.DataFrame({'col1' : [1,2,3,4,5], 
                                'date' : pd.date_range('2020-10-10','2020-10-14'), 'col2' :[1,1,1,1,1]})
        
        self.sql.dump_whole(df_date, self.table_name)
        expect_df = df_date[df_date['date'] <= '2020-10-12']
        df_date['col2'] = 3
        self.sql.dump_replace(df_in = df_date , table_name_in = self.table_name, list_key = ['date'],
                             math_logic = {'date' : {'logic' : '>', 'value' : '2020-10-12', 'type' : 'date'}})
        df_read = self.sql.read(self.table_name)
        expect_df = expect_df.append(df_date)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1_str(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new2(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'], debug = False)
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,4,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_null1(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({'col1' : [1,3,4,5,6,7,None] , 'col2' : [1,2,3,3,4,4,5] , 'col3' : [1,1,2,2,2,2,2]})
        result, expect = help_test(df_read.fillna(''), expect_df.fillna(''))
        self.assertEqual(result, expect)

    def test_replace_null2(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1','col2'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({ 'col1' : [1,None,3,4,4,5,6,7,None] , 'col2' : [1,1,2,2,3,3,4,4,5] 
                                  ,'col3' : [1,1,1,1,2,2,2,2,2]}).sort_values('col1').reset_index(drop = True)

        result, expect = help_test(df_read.fillna(''), expect_df.fillna(''))
        self.assertEqual(result, expect)

unittest.main(argv = ['first-arg-is-ignored'], exit = False)

Connection OK
Start Filter Existing data from df at  2021-01-04 16:25:09.973293


.

Dump data to  unit_test_git  End  2021-01-04 16:25:10.178930
Connection OK
Start Filter Existing data from df at  2021-01-04 16:25:11.189060
Dump data to  unit_test_git  End  2021-01-04 16:25:11.374782


.

Connection OK
Start Filter Existing data from df at  2021-01-04 16:25:12.329915


.

Dump data to  unit_test_git  End  2021-01-04 16:25:12.548477
Connection OK


.

Connection OK
Start delete old data at 2021-01-04 16:25:14.554478
Delete Last ['col1'] at 2021-01-04 16:25:14.627757


.

Dump data to  unit_test_git  End  2021-01-04 16:25:14.769542
Connection OK
Start delete old data at 2021-01-04 16:25:15.804165
Delete Last col1 at 2021-01-04 16:25:15.925193


.

Dump data to  unit_test_git  End  2021-01-04 16:25:16.085332
Connection OK
Start delete old data at 2021-01-04 16:25:17.122739
Delete Last ['col1', 'col2'] at 2021-01-04 16:25:17.212237


.

Dump data to  unit_test_git  End  2021-01-04 16:25:17.357826
Connection OK
Start delete old data at 2021-01-04 16:25:18.339848
Delete Last ['col1', 'col2', 'col3'] at 2021-01-04 16:25:18.418761


.

Dump data to  unit_test_git  End  2021-01-04 16:25:18.557211
Connection OK
Start delete old data at 2021-01-04 16:25:19.505111
Delete Last ['col1'] at 2021-01-04 16:25:19.587273


.

Dump data to  unit_test_git  End  2021-01-04 16:25:19.725089
Connection OK
Start Filter Existing data from df at  2021-01-04 16:25:20.699502
Dump data to  unit_test_git  End  2021-01-04 16:25:21.246401
Start delete old data at 2021-01-04 16:25:21.247398
Delete Last ['date'] at 2021-01-04 16:25:21.325049


.

Dump data to  unit_test_git  End  2021-01-04 16:25:21.453172
Connection OK
Start delete old data at 2021-01-04 16:25:22.975955
Delete Last ['col1'] at 2021-01-04 16:25:23.051017
delete from unit_test_git where ([col1] in ('4.0', '5.0', '6.0', '7.0') OR [col1] IS NULL)


.

Dump data to  unit_test_git  End  2021-01-04 16:25:23.182962
Connection OK
Start delete old data at 2021-01-04 16:25:24.768485
Delete Last ['col1', 'col2'] at 2021-01-04 16:25:24.846590
delete from unit_test_git where ([col1] in ('4.0', '5.0', '6.0', '7.0') OR [col1] IS NULL) and [col2] in ('3', '4', '5')


.

Dump data to  unit_test_git  End  2021-01-04 16:25:24.980947
Connection OK


.

Connection OK
Start Filter Existing data from df at  2021-01-04 16:25:27.006117


.

Dump data to  unit_test_git  End  2021-01-04 16:25:27.505316



----------------------------------------------------------------------
Ran 14 tests in 18.521s

OK


<unittest.main.TestProgram at 0x215845fc448>

In [4]:
test_mysql = setting['MYSQL']
    
class SQL_Test_class(unittest.TestCase):
    def setUp(self):
        self.sql = da_tran_SQL(sql_type = test_mysql['type'],
                                host_name = test_mysql['host'],
                                database_name = test_mysql['database'],
                                user = test_mysql['user'],
                                password = test_mysql['password'])
        self.df1 = pd.DataFrame({'col1' : [1,2,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        self.df2 = pd.DataFrame({'col1' : [4,5,6,7,8] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        self.table_name = 'unit_test_git'
        self.df1.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')

    def test_sp(self):
        sql_q = """ drop procedure if exists unit_test_git_SP;"""
        self.sql.engine.execute(sql_q)
        sql_q = """ CREATE PROCEDURE unit_test_git_SP
                    (PARAM1 VARCHAR(100))
                    BEGIN
                    SELECT * FROM {} WHERE col1 = PARAM1 ;
                    end;""".format(self.table_name)
        self.sql.engine.execute(sql_q)
        df_read = self.sql.read('unit_test_git_SP', SP = True , param = {'PARAM1' : 1})
        expect_df = self.df1[self.df1['col1'] == 1]
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)


    def test_read_sql(self):
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_whole(self):
        self.sql.dump_whole(self.df2, self.table_name)
        df_read = self.sql.read(self.table_name)
        expect_df = self.df2
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace1(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace1_str(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace2(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,4,5,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_logic(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'],
                             math_logic = {'col1' : {'logic' : '>', 'value' : 1}})
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1[self.df1['col1'] <= 1].append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_logic_date(self) :
        df_date = pd.DataFrame({'col1' : [1,2,3,4,5], 
                                'date' : pd.date_range('2020-10-10','2020-10-14'), 'col2' :[1,1,1,1,1]})
        self.sql.dump_whole(df_date, self.table_name)
        expect_df = df_date[df_date['date'] <= '2020-10-12']
        df_date['col2'] = 3
        self.sql.dump_replace(df_in = df_date , table_name_in = self.table_name, list_key = ['date'],
                             math_logic = {'date' : {'logic' : '>', 'value' : '2020-10-12', 'type' : 'date'}})
        df_read = self.sql.read(self.table_name)
        expect_df = expect_df.append(df_date)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1_str(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new2(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'], debug = False)
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,4,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)


unittest.main(argv = ['first-arg-is-ignored'], exit = False)

Connection OK
Start Filter Existing data from df at  2020-12-25 19:15:08.389932


.

Dump data to  unit_test_git  End  2020-12-25 19:15:08.618649
Connection OK
Start Filter Existing data from df at  2020-12-25 19:15:09.667258
Dump data to  unit_test_git  End  2020-12-25 19:15:09.861122


.

Connection OK
Start Filter Existing data from df at  2020-12-25 19:15:10.903196


.

Dump data to  unit_test_git  End  2020-12-25 19:15:11.150138
Connection OK


.

Connection OK
Start delete old data at 2020-12-25 19:15:13.149911
Delete Last ['col1'] at 2020-12-25 19:15:13.229989


.

Dump data to  unit_test_git  End  2020-12-25 19:15:13.378353
Connection OK
Start delete old data at 2020-12-25 19:15:14.539744
Delete Last col1 at 2020-12-25 19:15:14.628764


.

Dump data to  unit_test_git  End  2020-12-25 19:15:14.790377
Connection OK
Start delete old data at 2020-12-25 19:15:15.922171
Delete Last ['col1', 'col2'] at 2020-12-25 19:15:16.000898


.

Dump data to  unit_test_git  End  2020-12-25 19:15:16.165373
Connection OK
Start delete old data at 2020-12-25 19:15:17.547204
Delete Last ['col1', 'col2', 'col3'] at 2020-12-25 19:15:17.647090


.

Dump data to  unit_test_git  End  2020-12-25 19:15:17.816622
Connection OK
Start delete old data at 2020-12-25 19:15:19.008867
Delete Last ['col1'] at 2020-12-25 19:15:19.112133


.

Dump data to  unit_test_git  End  2020-12-25 19:15:19.278996
Connection OK
Start Filter Existing data from df at  2020-12-25 19:15:20.322145
Dump data to  unit_test_git  End  2020-12-25 19:15:20.786248
Start delete old data at 2020-12-25 19:15:20.787085
Delete Last ['date'] at 2020-12-25 19:15:20.876843


.

Dump data to  unit_test_git  End  2020-12-25 19:15:21.010820
Connection OK


.

Connection OK
Start Filter Existing data from df at  2020-12-25 19:15:23.096914


.

Dump data to  unit_test_git  End  2020-12-25 19:15:23.481714



----------------------------------------------------------------------
Ran 12 tests in 16.559s

OK


<unittest.main.TestProgram at 0x254ed629d48>

In [5]:
test_postgresql = setting['POSTGRESQL']

class SQL_Test_class(unittest.TestCase):
    def setUp(self):
        self.sql = da_tran_SQL(sql_type = test_postgresql['type'],
                                host_name = test_postgresql['host'],
                                database_name = test_postgresql['database'],
                                user = test_postgresql['user'],
                                password = test_postgresql['password'])
        self.df1 = pd.DataFrame({'col1' : [1,2,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        self.df2 = pd.DataFrame({'col1' : [4,5,6,7,8] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        self.table_name = 'unit_test_git'
        self.df1.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')

#     def test_sp(self):
#         sql_q = """ CREATE OR REPLACE PROCEDURE unit_test_git_SP (PARAM1 INT)
#                     RETURNS TABLE (col1 int, col2 int, col3 int)
#                     language plpgsql
#                     AS $$
#                     BEGIN
#                     RETURN QUERY(SELECT * FROM {} WHERE col1 = PARAM1) ;
#                     END ; $$""".format(self.table_name)
#         self.sql.engine.execute(sql_q)
#         df_read = self.sql.read('unit_test_git_SP', SP = True , param = {'PARAM1' : 1})
#         expect_df = self.df1[self.df1['col1'] == 1]
#         result, expect = help_test(df_read, expect_df)
#         self.assertEqual(result, expect)
        
    def test_read_sql(self):
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_whole(self):
        self.sql.dump_whole(self.df2, self.table_name)
        df_read = self.sql.read(self.table_name)
        expect_df = self.df2
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace1(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace1_str(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace2(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,4,5,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_logic(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'],
                             math_logic = {'col1' : {'logic' : '>', 'value' : 1}})
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1[self.df1['col1'] <= 1].append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_logic_date(self) :
        df_date = pd.DataFrame({'col1' : [1,2,3,4,5], 
                                'date' : pd.date_range('2020-10-10','2020-10-14'), 'col2' :[1,1,1,1,1]})
        
        self.sql.dump_whole(df_date, self.table_name)
        expect_df = df_date[df_date['date'] <= '2020-10-12']
        df_date['col2'] = 3
        self.sql.dump_replace(df_in = df_date , table_name_in = self.table_name, list_key = ['date'],
                             math_logic = {'date' : {'logic' : '>', 'value' : '2020-10-12', 'type' : 'date'}})
        df_read = self.sql.read(self.table_name)
        expect_df = expect_df.append(df_date)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1_str(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new2(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'], debug = False)
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,4,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
unittest.main(argv = ['first-arg-is-ignored'], exit = False)

Connection OK
Start Filter Existing data from df at  2020-12-25 19:15:56.990689
Dump data to  unit_test_git  End  2020-12-25 19:15:59.532019


.

Connection OK
Start Filter Existing data from df at  2020-12-25 19:16:15.373170
Dump data to  unit_test_git  End  2020-12-25 19:16:17.868010


.

Connection OK
Start Filter Existing data from df at  2020-12-25 19:16:32.926516
Dump data to  unit_test_git  End  2020-12-25 19:16:35.320071


.

Connection OK


.

Connection OK
Start delete old data at 2020-12-25 19:17:05.362168
Delete Last ['col1'] at 2020-12-25 19:17:06.212236
Dump data to  unit_test_git  End  2020-12-25 19:17:07.751064


.

Connection OK
Start delete old data at 2020-12-25 19:17:23.383242
Delete Last col1 at 2020-12-25 19:17:24.333670
Dump data to  unit_test_git  End  2020-12-25 19:17:26.005668


.

Connection OK
Start delete old data at 2020-12-25 19:17:41.759632
Delete Last ['col1', 'col2'] at 2020-12-25 19:17:42.587927
Dump data to  unit_test_git  End  2020-12-25 19:17:44.255969


.

Connection OK
Start delete old data at 2020-12-25 19:17:59.422066
Delete Last ['col1', 'col2', 'col3'] at 2020-12-25 19:18:00.215000
Dump data to  unit_test_git  End  2020-12-25 19:18:01.811771


.

Connection OK
Start delete old data at 2020-12-25 19:18:16.602997
Delete Last ['col1'] at 2020-12-25 19:18:17.387286
Dump data to  unit_test_git  End  2020-12-25 19:18:18.964344


.

Connection OK
Start Filter Existing data from df at  2020-12-25 19:18:33.741509
Dump data to  unit_test_git  End  2020-12-25 19:18:41.401895
Start delete old data at 2020-12-25 19:18:41.402894
Delete Last ['date'] at 2020-12-25 19:18:42.187532
Dump data to  unit_test_git  End  2020-12-25 19:18:43.767279


.

Connection OK
Start Filter Existing data from df at  2020-12-25 19:18:59.363655
Dump data to  unit_test_git  End  2020-12-25 19:19:07.436298


.
----------------------------------------------------------------------
Ran 11 tests in 224.815s

OK


<unittest.main.TestProgram at 0x254ed725088>