In [1]:
import os, re,json, imp, unittest
import pandas as pd

print('First Directory at :', os.getcwd()[2:])

#Load Setting File
with open('setting.txt','r') as f:
    setting = json.load(f)
print('Load Setting OK')    

os.chdir(re.sub(os.getcwd().split('\\')[-1]+'$','',os.getcwd()))
print('Change Directory into :', os.getcwd()[2:])

First Directory at : \01_DEV_Function\Shared_Function\tests
Load Setting OK
Change Directory into : \01_DEV_Function\Shared_Function


In [2]:
local_library = True
#local_library = False

if local_library : 
    da_tran_SQL = imp.load_source('da_tran_SQL', 'py_topping/data_connection/database.py').da_tran_SQL
else :
    from py_topping.data_connection.database import da_tran_SQL
    
def help_test(df_in1, df_in2) :
    result = list()
    expect = list()
    for i in df_in1.columns :
        result_list = list(df_in1[i])
        expect_list = list(df_in2[i])
        result_list.sort()
        expect_list.sort()
        result.append(result_list)
        expect.append(expect_list)
    return result, expect

# MS SQL

## Default MSSQL

In [3]:
test_mssql = setting['MSSQL']

class SQL_Test_class(unittest.TestCase):
    def setUp(self):
        self.sql = da_tran_SQL(sql_type = test_mssql['type'],
                                host_name = test_mssql['host'],
                                database_name = test_mssql['database'],
                                user = test_mssql['user'],
                                password = test_mssql['password'])
        self.df1 = pd.DataFrame({'col1' : [1,2,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        self.df2 = pd.DataFrame({'col1' : [4,5,6,7,8] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        self.table_name = 'unit_test_git'
        self.df1.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')

    def test_sp(self):
        sql_q = """ CREATE OR ALTER PROCEDURE unit_test_git_SP (@PARAM1 AS VARCHAR(100))
                    AS
                    BEGIN
                    SELECT * FROM {} WHERE col1 = @PARAM1 
                    END""".format(self.table_name)
        self.sql.engine.execute(sql_q)
        df_read = self.sql.read('unit_test_git_SP', SP = True , param = {'@PARAM1' : 1})
        expect_df = self.df1[self.df1['col1'] == 1]
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_read_sql(self):
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_whole(self):
        self.sql.dump_whole(self.df2, self.table_name)
        df_read = self.sql.read(self.table_name)
        expect_df = self.df2
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    
    def test_replace1(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace1_str(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace2(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,4,5,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_logic(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'],
                             math_logic = {'col1' : {'logic' : '>', 'value' : 1}})
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1[self.df1['col1'] <= 1].append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_logic_date(self) :
        df_date = pd.DataFrame({'col1' : [1,2,3,4,5], 
                                'date' : pd.date_range('2020-10-10','2020-10-14'), 'col2' :[1,1,1,1,1]})
        
        self.sql.dump_whole(df_date, self.table_name)
        expect_df = df_date[df_date['date'] <= '2020-10-12']
        df_date['col2'] = 3
        self.sql.dump_replace(df_in = df_date , table_name_in = self.table_name, list_key = ['date'],
                             math_logic = {'date' : {'logic' : '>', 'value' : '2020-10-12', 'type' : 'date'}})
        df_read = self.sql.read(self.table_name)
        expect_df = expect_df.append(df_date)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1_str(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new2(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'], debug = False)
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,4,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_null1(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({'col1' : [1,3,4,5,6,7,None] , 'col2' : [1,2,3,3,4,4,5] , 'col3' : [1,1,2,2,2,2,2]})
        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

    def test_replace_null2(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1','col2'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({ 'col1' : [1,None,3,4,4,5,6,7,None] , 'col2' : [1,1,2,2,3,3,4,4,5] 
                                  ,'col3' : [1,1,1,1,2,2,2,2,2]}).sort_values('col1').reset_index(drop = True)

        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

unittest.main(argv = ['first-arg-is-ignored'], exit = False)

  module = __import__("pymssql")


Connection OK
Start Filter Existing data from df at  2021-05-09 16:32:07.251883


.

Dump data to  unit_test_git  End  2021-05-09 16:32:07.786915
Connection OK
Start Filter Existing data from df at  2021-05-09 16:32:08.740224
Dump data to  unit_test_git  End  2021-05-09 16:32:08.918324


.

Connection OK
Start Filter Existing data from df at  2021-05-09 16:32:09.871560
Dump data to  unit_test_git  End  2021-05-09 16:32:10.052508


.

Connection OK


.

Connection OK
Start delete old data at 2021-05-09 16:32:11.885813
Delete Last ['col1'] at 2021-05-09 16:32:11.955854


.

Dump data to  unit_test_git  End  2021-05-09 16:32:12.092481
Connection OK
Start delete old data at 2021-05-09 16:32:13.023882
Delete Last col1 at 2021-05-09 16:32:13.101025


.

Dump data to  unit_test_git  End  2021-05-09 16:32:13.235310
Connection OK
Start delete old data at 2021-05-09 16:32:14.142877
Delete Last ['col1', 'col2'] at 2021-05-09 16:32:14.225886


.

Dump data to  unit_test_git  End  2021-05-09 16:32:14.381412
Connection OK
Start delete old data at 2021-05-09 16:32:15.309337
Delete Last ['col1', 'col2', 'col3'] at 2021-05-09 16:32:15.391832


.

Dump data to  unit_test_git  End  2021-05-09 16:32:15.538751
Connection OK
Start delete old data at 2021-05-09 16:32:16.508438
Delete Last ['col1'] at 2021-05-09 16:32:16.588337


.

Dump data to  unit_test_git  End  2021-05-09 16:32:16.722724
Connection OK
Drop Existing Table at  2021-05-09 16:32:17.925783
Dump data to  unit_test_git  End  2021-05-09 16:32:18.194624
Start delete old data at 2021-05-09 16:32:18.195626
Delete Last ['date'] at 2021-05-09 16:32:18.267834


.

Dump data to  unit_test_git  End  2021-05-09 16:32:18.395852
Connection OK
Start delete old data at 2021-05-09 16:32:19.898630
Delete Last ['col1'] at 2021-05-09 16:32:19.974999
DELETE FROM [unit_test_git] where ([col1] in ('4.0', '5.0', '6.0', '7.0') OR [col1] IS NULL)


.

Dump data to  unit_test_git  End  2021-05-09 16:32:20.101934
Connection OK
Start delete old data at 2021-05-09 16:32:21.595974
Delete Last ['col1', 'col2'] at 2021-05-09 16:32:21.673668
DELETE FROM [unit_test_git] where ([col1] in ('4.0', '5.0', '6.0', '7.0') OR [col1] IS NULL) and [col2] in ('3', '4', '5')


.

Dump data to  unit_test_git  End  2021-05-09 16:32:21.809413
Connection OK


.

Connection OK
Drop Existing Table at  2021-05-09 16:32:23.818289


.

Dump data to  unit_test_git  End  2021-05-09 16:32:24.090450



----------------------------------------------------------------------
Ran 14 tests in 23.476s

OK


<unittest.main.TestProgram at 0x1d4fc4554c8>

# Parallel MSSQL

In [4]:
test_mssql = setting['MSSQL']

class SQL_Test_class(unittest.TestCase):
    def setUp(self):
        self.sql = da_tran_SQL(sql_type = test_mssql['type'],
                                host_name = test_mssql['host'],
                                database_name = test_mssql['database'],
                                user = test_mssql['user'],
                                password = test_mssql['password'],
                                partition_size = 2, 
                                parallel_dump = True, 
                                max_parallel = 2)
        
        self.df1 = pd.DataFrame({'col1' : [1,2,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        self.df2 = pd.DataFrame({'col1' : [4,5,6,7,8] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        self.table_name = 'unit_test_git'
        self.df1.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.parallel_dump = True
        self.sql.max_parallel = 2
        self.sql.partition_size = 2

    def test_sp(self):
        sql_q = """ CREATE OR ALTER PROCEDURE unit_test_git_SP (@PARAM1 AS VARCHAR(100))
                    AS
                    BEGIN
                    SELECT * FROM {} WHERE col1 = @PARAM1 
                    END""".format(self.table_name)
        self.sql.engine.execute(sql_q)
        df_read = self.sql.read('unit_test_git_SP', SP = True , param = {'@PARAM1' : 1})
        expect_df = self.df1[self.df1['col1'] == 1]
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_read_sql(self):
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_whole(self):
        self.sql.dump_whole(self.df2, self.table_name)
        df_read = self.sql.read(self.table_name)
        expect_df = self.df2
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    
    def test_replace1(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace1_str(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace2(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,4,5,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_logic(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'],
                             math_logic = {'col1' : {'logic' : '>', 'value' : 1}})
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1[self.df1['col1'] <= 1].append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_logic_date(self) :
        df_date = pd.DataFrame({'col1' : [1,2,3,4,5], 
                                'date' : pd.date_range('2020-10-10','2020-10-14'), 'col2' :[1,1,1,1,1]})
        self.sql.dump_whole(df_date, self.table_name)
        expect_df = df_date[df_date['date'] <= '2020-10-12']
        df_date['col2'] = 3
        self.sql.dump_replace(df_in = df_date , table_name_in = self.table_name, list_key = ['date'],
                             math_logic = {'date' : {'logic' : '>', 'value' : '2020-10-12', 'type' : 'date'}},debug = True)
        df_read = self.sql.read(self.table_name)
        expect_df = expect_df.append(df_date)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1_str(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new2(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'], debug = False)
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,4,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_null1(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({'col1' : [1,3,4,5,6,7,None] , 'col2' : [1,2,3,3,4,4,5] , 'col3' : [1,1,2,2,2,2,2]})
        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

    def test_replace_null2(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1','col2'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({ 'col1' : [1,None,3,4,4,5,6,7,None] , 'col2' : [1,1,2,2,3,3,4,4,5] 
                                  ,'col3' : [1,1,1,1,2,2,2,2,2]}).sort_values('col1').reset_index(drop = True)

        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

unittest.main(argv = ['first-arg-is-ignored'], exit = False)

Connection OK
Start Filter Existing data from df at  2021-05-09 16:32:25.132061


.

Dump data to  unit_test_git  End  2021-05-09 16:32:25.447143
Connection OK
Start Filter Existing data from df at  2021-05-09 16:32:26.437345


.

Dump data to  unit_test_git  End  2021-05-09 16:32:26.732596
Connection OK
Start Filter Existing data from df at  2021-05-09 16:32:27.727393


.

Dump data to  unit_test_git  End  2021-05-09 16:32:28.028698
Connection OK


.

Connection OK
Start delete old data at 2021-05-09 16:32:30.235603
Delete Last ['col1'] at 2021-05-09 16:32:30.332610


.

Dump data to  unit_test_git  End  2021-05-09 16:32:30.766800
Connection OK
Start delete old data at 2021-05-09 16:32:31.697545
Delete Last col1 at 2021-05-09 16:32:31.781668


.

Dump data to  unit_test_git  End  2021-05-09 16:32:32.151576
Connection OK
Start delete old data at 2021-05-09 16:32:33.085621
Delete Last ['col1', 'col2'] at 2021-05-09 16:32:33.172078


.

Dump data to  unit_test_git  End  2021-05-09 16:32:33.563232
Connection OK
Start delete old data at 2021-05-09 16:32:34.822695
Delete Last ['col1', 'col2', 'col3'] at 2021-05-09 16:32:34.907019


.

Dump data to  unit_test_git  End  2021-05-09 16:32:35.264033
Connection OK
Start delete old data at 2021-05-09 16:32:36.197730
Delete Last ['col1'] at 2021-05-09 16:32:36.269537


.

Dump data to  unit_test_git  End  2021-05-09 16:32:36.670332
Connection OK
Drop Existing Table at  2021-05-09 16:32:37.636704
Dump data to  unit_test_git  End  2021-05-09 16:32:38.082186
Start delete old data at 2021-05-09 16:32:38.083220
Delete Last ['date'] at 2021-05-09 16:32:38.160467
DELETE FROM [unit_test_git] where CAST([date] AS date) > '2020-10-12'


.

Dump data to  unit_test_git  End  2021-05-09 16:32:38.419836
Connection OK
Start delete old data at 2021-05-09 16:32:39.909250
Delete Last ['col1'] at 2021-05-09 16:32:39.990101
DELETE FROM [unit_test_git] where ([col1] in ('4.0', '5.0', '6.0', '7.0') OR [col1] IS NULL)


.

Dump data to  unit_test_git  End  2021-05-09 16:32:40.381776
Connection OK
Start delete old data at 2021-05-09 16:32:41.841747
Delete Last ['col1', 'col2'] at 2021-05-09 16:32:41.917523
DELETE FROM [unit_test_git] where ([col1] in ('4.0', '5.0', '6.0', '7.0') OR [col1] IS NULL) and [col2] in ('3', '4', '5')


.

Dump data to  unit_test_git  End  2021-05-09 16:32:42.297758
Connection OK


.

Connection OK
Drop Existing Table at  2021-05-09 16:32:44.200839


.

Dump data to  unit_test_git  End  2021-05-09 16:32:44.663932



----------------------------------------------------------------------
Ran 14 tests in 20.521s

OK


<unittest.main.TestProgram at 0x1d4fc719148>

## ODBC MSSQL

In [5]:
test_mssql = setting['MSSQL']

class SQL_Test_class(unittest.TestCase):
    def setUp(self):
        self.sql = da_tran_SQL(sql_type = test_mssql['type'],
                                host_name = test_mssql['host'],
                                database_name = test_mssql['database'],
                                user = test_mssql['user'],
                                password = test_mssql['password'],
                                driver = 'pyodbc', parameter = 'driver=SQL+Server')
        self.df1 = pd.DataFrame({'col1' : [1,2,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        self.df2 = pd.DataFrame({'col1' : [4,5,6,7,8] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        self.table_name = 'unit_test_git'
        self.df1.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')

    def test_sp(self):
        sql_q = """ CREATE OR ALTER PROCEDURE unit_test_git_SP (@PARAM1 AS VARCHAR(100))
                    AS
                    BEGIN
                    SELECT * FROM {} WHERE col1 = @PARAM1 
                    END""".format(self.table_name)
        self.sql.engine.execute(sql_q)
        df_read = self.sql.read('unit_test_git_SP', SP = True , param = {'@PARAM1' : 1})
        expect_df = self.df1[self.df1['col1'] == 1]
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_read_sql(self):
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_whole(self):
        self.sql.dump_whole(self.df2, self.table_name)
        df_read = self.sql.read(self.table_name)
        expect_df = self.df2
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    
    def test_replace1(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace1_str(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace2(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,4,5,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_logic(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'],
                             math_logic = {'col1' : {'logic' : '>', 'value' : 1}})
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1[self.df1['col1'] <= 1].append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_logic_date(self) :
        df_date = pd.DataFrame({'col1' : [1,2,3,4,5], 
                                'date' : pd.date_range('2020-10-10','2020-10-14'), 'col2' :[1,1,1,1,1]})
        
        self.sql.dump_whole(df_date, self.table_name)
        expect_df = df_date[df_date['date'] <= '2020-10-12']
        df_date['col2'] = 3
        self.sql.dump_replace(df_in = df_date , table_name_in = self.table_name, list_key = ['date'],
                             math_logic = {'date' : {'logic' : '>', 'value' : '2020-10-12', 'type' : 'date'}})
        df_read = self.sql.read(self.table_name)
        expect_df = expect_df.append(df_date)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1_str(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new2(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'], debug = False)
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,4,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_null1(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({'col1' : [1,3,4,5,6,7,None] , 'col2' : [1,2,3,3,4,4,5] , 'col3' : [1,1,2,2,2,2,2]})
        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

    def test_replace_null2(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1','col2'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({ 'col1' : [1,None,3,4,4,5,6,7,None] , 'col2' : [1,1,2,2,3,3,4,4,5] 
                                  ,'col3' : [1,1,1,1,2,2,2,2,2]}).sort_values('col1').reset_index(drop = True)

        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

unittest.main(argv = ['first-arg-is-ignored'], exit = False)

Connection OK


.

Start Filter Existing data from df at  2021-05-09 16:32:48.442719
Dump data to  unit_test_git  End  2021-05-09 16:32:48.569990
Connection OK


.

Start Filter Existing data from df at  2021-05-09 16:32:49.389020
Dump data to  unit_test_git  End  2021-05-09 16:32:49.514683
Connection OK


.

Start Filter Existing data from df at  2021-05-09 16:32:50.254901
Dump data to  unit_test_git  End  2021-05-09 16:32:50.384985
Connection OK


.

Connection OK


.

Start delete old data at 2021-05-09 16:32:51.940832
Delete Last ['col1'] at 2021-05-09 16:32:51.978586
Dump data to  unit_test_git  End  2021-05-09 16:32:52.077511
Connection OK


.

Start delete old data at 2021-05-09 16:32:52.857472
Delete Last col1 at 2021-05-09 16:32:52.887833
Dump data to  unit_test_git  End  2021-05-09 16:32:52.973725
Connection OK


.

Start delete old data at 2021-05-09 16:32:53.840317
Delete Last ['col1', 'col2'] at 2021-05-09 16:32:53.874870
Dump data to  unit_test_git  End  2021-05-09 16:32:53.965505
Connection OK


.

Start delete old data at 2021-05-09 16:32:54.641508
Delete Last ['col1', 'col2', 'col3'] at 2021-05-09 16:32:54.678553
Dump data to  unit_test_git  End  2021-05-09 16:32:54.764134
Connection OK


.

Start delete old data at 2021-05-09 16:32:55.740710
Delete Last ['col1'] at 2021-05-09 16:32:55.773914
Dump data to  unit_test_git  End  2021-05-09 16:32:55.862787
Connection OK
Drop Existing Table at  2021-05-09 16:32:56.579146
Dump data to  unit_test_git  End  2021-05-09 16:32:56.734741
Start delete old data at 2021-05-09 16:32:56.735686
Delete Last ['date'] at 2021-05-09 16:32:56.771040


.

Dump data to  unit_test_git  End  2021-05-09 16:32:56.854425
Connection OK


.

Start delete old data at 2021-05-09 16:32:58.154020
Delete Last ['col1'] at 2021-05-09 16:32:58.195592
DELETE FROM [unit_test_git] where ([col1] in ('4.0', '5.0', '6.0', '7.0') OR [col1] IS NULL)
Dump data to  unit_test_git  End  2021-05-09 16:32:58.292625
Connection OK


.

Start delete old data at 2021-05-09 16:32:59.549855
Delete Last ['col1', 'col2'] at 2021-05-09 16:32:59.589728
DELETE FROM [unit_test_git] where ([col1] in ('4.0', '5.0', '6.0', '7.0') OR [col1] IS NULL) and [col2] in ('3', '4', '5')
Dump data to  unit_test_git  End  2021-05-09 16:32:59.677046
Connection OK


.

Connection OK


.

Drop Existing Table at  2021-05-09 16:33:01.111973
Dump data to  unit_test_git  End  2021-05-09 16:33:01.268957



----------------------------------------------------------------------
Ran 14 tests in 16.535s

OK


<unittest.main.TestProgram at 0x1d4fc42ea48>

# MYSQL
## Default MYSQL

In [6]:
test_mysql = setting['MYSQL']
    
class SQL_Test_class(unittest.TestCase):
    def setUp(self):
        self.sql = da_tran_SQL(sql_type = test_mysql['type'],
                                host_name = test_mysql['host'],
                                database_name = test_mysql['database'],
                                user = test_mysql['user'],
                                password = test_mysql['password'])
        self.df1 = pd.DataFrame({'col1' : [1,2,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        self.df2 = pd.DataFrame({'col1' : [4,5,6,7,8] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        self.table_name = 'unit_test_git'
        self.df1.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')

    def test_sp(self):
        sql_q = """ drop procedure if exists unit_test_git_SP;"""
        self.sql.engine.execute(sql_q)
        sql_q = """ CREATE PROCEDURE unit_test_git_SP
                    (PARAM1 VARCHAR(100))
                    BEGIN
                    SELECT * FROM {} WHERE col1 = PARAM1 ;
                    end;""".format(self.table_name)
        self.sql.engine.execute(sql_q)
        df_read = self.sql.read('unit_test_git_SP', SP = True , param = {'PARAM1' : 1})
        expect_df = self.df1[self.df1['col1'] == 1]
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)


    def test_read_sql(self):
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_whole(self):
        self.sql.dump_whole(self.df2, self.table_name)
        df_read = self.sql.read(self.table_name)
        expect_df = self.df2
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace1(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace1_str(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace2(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,4,5,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_logic(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'],
                             math_logic = {'col1' : {'logic' : '>', 'value' : 1}})
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1[self.df1['col1'] <= 1].append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_logic_date(self) :
        df_date = pd.DataFrame({'col1' : [1,2,3,4,5], 
                                'date' : pd.date_range('2020-10-10','2020-10-14'), 'col2' :[1,1,1,1,1]})
        self.sql.dump_whole(df_date, self.table_name)
        expect_df = df_date[df_date['date'] <= '2020-10-12']
        df_date['col2'] = 3
        self.sql.dump_replace(df_in = df_date , table_name_in = self.table_name, list_key = ['date'],
                             math_logic = {'date' : {'logic' : '>', 'value' : '2020-10-12', 'type' : 'date'}})
        df_read = self.sql.read(self.table_name)
        expect_df = expect_df.append(df_date)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1_str(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new2(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'], debug = False)
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,4,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_null1(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({'col1' : [1,3,4,5,6,7,None] , 'col2' : [1,2,3,3,4,4,5] , 'col3' : [1,1,2,2,2,2,2]})
        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

    def test_replace_null2(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1','col2'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({ 'col1' : [1,None,3,4,4,5,6,7,None] , 'col2' : [1,1,2,2,3,3,4,4,5] 
                                  ,'col3' : [1,1,1,1,2,2,2,2,2]}).sort_values('col1').reset_index(drop = True)

        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

unittest.main(argv = ['first-arg-is-ignored'], exit = False)

Connection OK
Start Filter Existing data from df at  2021-05-09 16:33:02.599914
Dump data to  unit_test_git  End  2021-05-09 16:33:02.798593


.

Connection OK
Start Filter Existing data from df at  2021-05-09 16:33:03.903249
Dump data to  unit_test_git  End  2021-05-09 16:33:04.101085


.

Connection OK
Start Filter Existing data from df at  2021-05-09 16:33:05.057359


.

Dump data to  unit_test_git  End  2021-05-09 16:33:05.266428
Connection OK


.

Connection OK
Start delete old data at 2021-05-09 16:33:08.039418
Delete Last ['col1'] at 2021-05-09 16:33:08.129532


.

Dump data to  unit_test_git  End  2021-05-09 16:33:08.270772
Connection OK
Start delete old data at 2021-05-09 16:33:09.286303
Delete Last col1 at 2021-05-09 16:33:09.378472


.

Dump data to  unit_test_git  End  2021-05-09 16:33:09.535532
Connection OK
Start delete old data at 2021-05-09 16:33:10.664142
Delete Last ['col1', 'col2'] at 2021-05-09 16:33:10.757861


.

Dump data to  unit_test_git  End  2021-05-09 16:33:10.908866
Connection OK
Start delete old data at 2021-05-09 16:33:11.776112
Delete Last ['col1', 'col2', 'col3'] at 2021-05-09 16:33:11.847357


.

Dump data to  unit_test_git  End  2021-05-09 16:33:11.981402
Connection OK
Start delete old data at 2021-05-09 16:33:12.970393
Delete Last ['col1'] at 2021-05-09 16:33:13.056176


.

Dump data to  unit_test_git  End  2021-05-09 16:33:13.201814
Connection OK
Drop Existing Table at  2021-05-09 16:33:14.041775
Dump data to  unit_test_git  End  2021-05-09 16:33:14.328017
Start delete old data at 2021-05-09 16:33:14.329822
Delete Last ['date'] at 2021-05-09 16:33:14.401280
Dump data to  unit_test_git  End  2021-05-09 16:33:14.523103


.

Connection OK
Start delete old data at 2021-05-09 16:33:15.879452
Delete Last ['col1'] at 2021-05-09 16:33:15.956578
DELETE FROM unit_test_git where (col1 in ('4.0', '5.0', '6.0', '7.0') OR col1 IS NULL)


.

Dump data to  unit_test_git  End  2021-05-09 16:33:16.087479
Connection OK
Start delete old data at 2021-05-09 16:33:17.566514
Delete Last ['col1', 'col2'] at 2021-05-09 16:33:17.656244
DELETE FROM unit_test_git where (col1 in ('4.0', '5.0', '6.0', '7.0') OR col1 IS NULL) and col2 in ('3', '4', '5')


.

Dump data to  unit_test_git  End  2021-05-09 16:33:17.796313
Connection OK


.

Connection OK
Drop Existing Table at  2021-05-09 16:33:20.156343


.

Dump data to  unit_test_git  End  2021-05-09 16:33:20.502102



----------------------------------------------------------------------
Ran 14 tests in 19.155s

OK


<unittest.main.TestProgram at 0x1d4fc748f88>

## Parallel MYSQL

In [7]:
test_mysql = setting['MYSQL']
    
class SQL_Test_class(unittest.TestCase):
    def setUp(self):
        self.sql = da_tran_SQL(sql_type = test_mysql['type'],
                                host_name = test_mysql['host'],
                                database_name = test_mysql['database'],
                                user = test_mysql['user'],
                                password = test_mysql['password'],
                                partition_size = 2, 
                                parallel_dump = True, 
                                max_parallel = 2)
        
        self.df1 = pd.DataFrame({'col1' : [1,2,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        self.df2 = pd.DataFrame({'col1' : [4,5,6,7,8] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        self.table_name = 'unit_test_git'
        self.df1.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')

    def test_sp(self):
        sql_q = """ drop procedure if exists unit_test_git_SP;"""
        self.sql.engine.execute(sql_q)
        sql_q = """ CREATE PROCEDURE unit_test_git_SP
                    (PARAM1 VARCHAR(100))
                    BEGIN
                    SELECT * FROM {} WHERE col1 = PARAM1 ;
                    end;""".format(self.table_name)
        self.sql.engine.execute(sql_q)
        df_read = self.sql.read('unit_test_git_SP', SP = True , param = {'PARAM1' : 1})
        expect_df = self.df1[self.df1['col1'] == 1]
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)


    def test_read_sql(self):
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_whole(self):
        self.sql.dump_whole(self.df2, self.table_name)
        df_read = self.sql.read(self.table_name)
        expect_df = self.df2
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace1(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace1_str(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace2(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,4,5,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_logic(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'],
                             math_logic = {'col1' : {'logic' : '>', 'value' : 1}})
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1[self.df1['col1'] <= 1].append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_logic_date(self) :
        df_date = pd.DataFrame({'col1' : [1,2,3,4,5], 
                                'date' : pd.date_range('2020-10-10','2020-10-14'), 'col2' :[1,1,1,1,1]})
        self.sql.dump_whole(df_date, self.table_name)
        expect_df = df_date[df_date['date'] <= '2020-10-12']
        df_date['col2'] = 3
        self.sql.dump_replace(df_in = df_date , table_name_in = self.table_name, list_key = ['date'],
                             math_logic = {'date' : {'logic' : '>', 'value' : '2020-10-12', 'type' : 'date'}})
        df_read = self.sql.read(self.table_name)
        expect_df = expect_df.append(df_date)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1_str(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new2(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'], debug = False)
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,4,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_null1(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({'col1' : [1,3,4,5,6,7,None] , 'col2' : [1,2,3,3,4,4,5] , 'col3' : [1,1,2,2,2,2,2]})
        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

    def test_replace_null2(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1','col2'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({ 'col1' : [1,None,3,4,4,5,6,7,None] , 'col2' : [1,1,2,2,3,3,4,4,5] 
                                  ,'col3' : [1,1,1,1,2,2,2,2,2]}).sort_values('col1').reset_index(drop = True)

        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

unittest.main(argv = ['first-arg-is-ignored'], exit = False)

Connection OK
Start Filter Existing data from df at  2021-05-09 16:33:21.683011


.

Dump data to  unit_test_git  End  2021-05-09 16:33:22.002360
Connection OK
Start Filter Existing data from df at  2021-05-09 16:33:23.044502


.

Dump data to  unit_test_git  End  2021-05-09 16:33:23.362046
Connection OK
Start Filter Existing data from df at  2021-05-09 16:33:24.418409


.

Dump data to  unit_test_git  End  2021-05-09 16:33:24.720868
Connection OK


.

Connection OK
Start delete old data at 2021-05-09 16:33:26.593082
Delete Last ['col1'] at 2021-05-09 16:33:26.662347


.

Dump data to  unit_test_git  End  2021-05-09 16:33:27.039634
Connection OK
Start delete old data at 2021-05-09 16:33:27.938687
Delete Last col1 at 2021-05-09 16:33:28.011193


.

Dump data to  unit_test_git  End  2021-05-09 16:33:28.432791
Connection OK
Start delete old data at 2021-05-09 16:33:29.336433
Delete Last ['col1', 'col2'] at 2021-05-09 16:33:29.410882


.

Dump data to  unit_test_git  End  2021-05-09 16:33:29.810429
Connection OK
Start delete old data at 2021-05-09 16:33:30.788969
Delete Last ['col1', 'col2', 'col3'] at 2021-05-09 16:33:30.873158


.

Dump data to  unit_test_git  End  2021-05-09 16:33:31.270606
Connection OK
Start delete old data at 2021-05-09 16:33:32.302923
Delete Last ['col1'] at 2021-05-09 16:33:32.412773


.

Dump data to  unit_test_git  End  2021-05-09 16:33:32.834472
Connection OK
Drop Existing Table at  2021-05-09 16:33:33.845140
Dump data to  unit_test_git  End  2021-05-09 16:33:34.331955
Start delete old data at 2021-05-09 16:33:34.332961
Delete Last ['date'] at 2021-05-09 16:33:34.405023


.

Dump data to  unit_test_git  End  2021-05-09 16:33:34.699009
Connection OK
Start delete old data at 2021-05-09 16:33:36.018300
Delete Last ['col1'] at 2021-05-09 16:33:36.098153
DELETE FROM unit_test_git where (col1 in ('4.0', '5.0', '6.0', '7.0') OR col1 IS NULL)


.

Dump data to  unit_test_git  End  2021-05-09 16:33:36.485897
Connection OK
Start delete old data at 2021-05-09 16:33:37.820626
Delete Last ['col1', 'col2'] at 2021-05-09 16:33:37.893735
DELETE FROM unit_test_git where (col1 in ('4.0', '5.0', '6.0', '7.0') OR col1 IS NULL) and col2 in ('3', '4', '5')


.

Dump data to  unit_test_git  End  2021-05-09 16:33:38.279172
Connection OK


.

Connection OK
Drop Existing Table at  2021-05-09 16:33:40.491269


.

Dump data to  unit_test_git  End  2021-05-09 16:33:41.031152



----------------------------------------------------------------------
Ran 14 tests in 20.482s

OK


<unittest.main.TestProgram at 0x1d4fc2f7c08>

# PostgreSQL
## Default PostgreSQL

In [8]:
test_postgresql = setting['POSTGRESQL']

class SQL_Test_class(unittest.TestCase):
    def setUp(self):
        self.sql = da_tran_SQL(sql_type = test_postgresql['type'],
                                host_name = test_postgresql['host'],
                                database_name = test_postgresql['database'],
                                user = test_postgresql['user'],
                                password = test_postgresql['password'])
        self.df1 = pd.DataFrame({'col1' : [1,2,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        self.df2 = pd.DataFrame({'col1' : [4,5,6,7,8] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        self.table_name = 'unit_test_git'
        self.df1.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')

#     def test_sp(self):
#         sql_q = """ CREATE OR REPLACE PROCEDURE unit_test_git_SP (PARAM1 INT)
#                     RETURNS TABLE (col1 int, col2 int, col3 int)
#                     language plpgsql
#                     AS $$
#                     BEGIN
#                     RETURN QUERY(SELECT * FROM {} WHERE col1 = PARAM1) ;
#                     END ; $$""".format(self.table_name)
#         self.sql.engine.execute(sql_q)
#         df_read = self.sql.read('unit_test_git_SP', SP = True , param = {'PARAM1' : 1})
#         expect_df = self.df1[self.df1['col1'] == 1]
#         result, expect = help_test(df_read, expect_df)
#         self.assertEqual(result, expect)
        
    def test_read_sql(self):
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_whole(self):
        self.sql.dump_whole(self.df2, self.table_name)
        df_read = self.sql.read(self.table_name)
        expect_df = self.df2
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace1(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace1_str(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace2(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,4,5,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_logic(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'],
                             math_logic = {'col1' : {'logic' : '>', 'value' : 1}})
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1[self.df1['col1'] <= 1].append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_logic_date(self) :
        df_date = pd.DataFrame({'col1' : [1,2,3,4,5], 
                                'date' : pd.date_range('2020-10-10','2020-10-14'), 'col2' :[1,1,1,1,1]})
        
        self.sql.dump_whole(df_date, self.table_name)
        expect_df = df_date[df_date['date'] <= '2020-10-12']
        df_date['col2'] = 3
        self.sql.dump_replace(df_in = df_date , table_name_in = self.table_name, list_key = ['date'],
                             math_logic = {'date' : {'logic' : '>', 'value' : '2020-10-12', 'type' : 'date'}})
        df_read = self.sql.read(self.table_name)
        expect_df = expect_df.append(df_date)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1_str(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new2(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'], debug = False)
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,4,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_null1(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({'col1' : [1,3,4,5,6,7,None] , 'col2' : [1,2,3,3,4,4,5] , 'col3' : [1,1,2,2,2,2,2]})
        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

    def test_replace_null2(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1','col2'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({ 'col1' : [1,None,3,4,4,5,6,7,None] , 'col2' : [1,1,2,2,3,3,4,4,5] 
                                  ,'col3' : [1,1,1,1,2,2,2,2,2]}).sort_values('col1').reset_index(drop = True)

        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)
        
unittest.main(argv = ['first-arg-is-ignored'], exit = False)

Connection OK
Start Filter Existing data from df at  2021-05-09 16:34:00.316341
Dump data to  unit_test_git  End  2021-05-09 16:34:02.900893


.

Connection OK
Start Filter Existing data from df at  2021-05-09 16:34:19.443975
Dump data to  unit_test_git  End  2021-05-09 16:34:21.962341


.

Connection OK
Start Filter Existing data from df at  2021-05-09 16:34:38.595584
Dump data to  unit_test_git  End  2021-05-09 16:34:41.130989


.

Connection OK


.

Connection OK
Start delete old data at 2021-05-09 16:35:13.973038
Delete Last ['col1'] at 2021-05-09 16:35:14.820196
Dump data to  unit_test_git  End  2021-05-09 16:35:16.515967


.

Connection OK
Start delete old data at 2021-05-09 16:35:32.149423
Delete Last col1 at 2021-05-09 16:35:32.983679
Dump data to  unit_test_git  End  2021-05-09 16:35:34.659341


.

Connection OK
Start delete old data at 2021-05-09 16:35:49.757166
Delete Last ['col1', 'col2'] at 2021-05-09 16:35:50.559808
Dump data to  unit_test_git  End  2021-05-09 16:35:52.172295


.

Connection OK
Start delete old data at 2021-05-09 16:36:08.090516
Delete Last ['col1', 'col2', 'col3'] at 2021-05-09 16:36:08.911569
Dump data to  unit_test_git  End  2021-05-09 16:36:10.556093


.

Connection OK
Start delete old data at 2021-05-09 16:36:25.500420
Delete Last ['col1'] at 2021-05-09 16:36:26.294108
Dump data to  unit_test_git  End  2021-05-09 16:36:27.886648


.

Connection OK
Drop Existing Table at  2021-05-09 16:36:43.252162
Dump data to  unit_test_git  End  2021-05-09 16:36:46.544485
Start delete old data at 2021-05-09 16:36:46.545522
Delete Last ['date'] at 2021-05-09 16:36:47.370065
Dump data to  unit_test_git  End  2021-05-09 16:36:49.006188


.

Connection OK
Start delete old data at 2021-05-09 16:37:13.278232
Delete Last ['col1'] at 2021-05-09 16:37:14.089954
DELETE FROM "unit_test_git" where ("col1" in ('4.0', '5.0', '6.0', '7.0') OR "col1" IS NULL)
Dump data to  unit_test_git  End  2021-05-09 16:37:15.717771


.

Connection OK
Start delete old data at 2021-05-09 16:37:40.565335
Delete Last ['col1', 'col2'] at 2021-05-09 16:37:41.401204
DELETE FROM "unit_test_git" where ("col1" in ('4.0', '5.0', '6.0', '7.0') OR "col1" IS NULL) and "col2" in ('3', '4', '5')
Dump data to  unit_test_git  End  2021-05-09 16:37:43.070481


.

Connection OK
Drop Existing Table at  2021-05-09 16:38:00.348459
Dump data to  unit_test_git  End  2021-05-09 16:38:03.964938


.
----------------------------------------------------------------------
Ran 13 tests in 263.309s

OK


<unittest.main.TestProgram at 0x1d4fc544648>

## Parallel PostgreSQL

In [9]:
test_postgresql = setting['POSTGRESQL']

class SQL_Test_class(unittest.TestCase):
    def setUp(self):
        self.sql = da_tran_SQL(sql_type = test_postgresql['type'],
                                host_name = test_postgresql['host'],
                                database_name = test_postgresql['database'],
                                user = test_postgresql['user'],
                                password = test_postgresql['password'],
                                partition_size = 2, 
                                parallel_dump = True, 
                                max_parallel = 2)
        
        self.df1 = pd.DataFrame({'col1' : [1,2,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        self.df2 = pd.DataFrame({'col1' : [4,5,6,7,8] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        self.table_name = 'unit_test_git'
        self.df1.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')

#     def test_sp(self):
#         sql_q = """ CREATE OR REPLACE PROCEDURE unit_test_git_SP (PARAM1 INT)
#                     RETURNS TABLE (col1 int, col2 int, col3 int)
#                     language plpgsql
#                     AS $$
#                     BEGIN
#                     RETURN QUERY(SELECT * FROM {} WHERE col1 = PARAM1) ;
#                     END ; $$""".format(self.table_name)
#         self.sql.engine.execute(sql_q)
#         df_read = self.sql.read('unit_test_git_SP', SP = True , param = {'PARAM1' : 1})
#         expect_df = self.df1[self.df1['col1'] == 1]
#         result, expect = help_test(df_read, expect_df)
#         self.assertEqual(result, expect)
        
    def test_read_sql(self):
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_whole(self):
        self.sql.dump_whole(self.df2, self.table_name)
        df_read = self.sql.read(self.table_name)
        expect_df = self.df2
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace1(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace1_str(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace2(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,4,5,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_logic(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'],
                             math_logic = {'col1' : {'logic' : '>', 'value' : 1}})
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1[self.df1['col1'] <= 1].append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_logic_date(self) :
        df_date = pd.DataFrame({'col1' : [1,2,3,4,5], 
                                'date' : pd.date_range('2020-10-10','2020-10-14'), 'col2' :[1,1,1,1,1]})
        
        self.sql.dump_whole(df_date, self.table_name)
        expect_df = df_date[df_date['date'] <= '2020-10-12']
        df_date['col2'] = 3
        self.sql.dump_replace(df_in = df_date , table_name_in = self.table_name, list_key = ['date'],
                             math_logic = {'date' : {'logic' : '>', 'value' : '2020-10-12', 'type' : 'date'}})
        df_read = self.sql.read(self.table_name)
        expect_df = expect_df.append(df_date)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1_str(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new2(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'], debug = False)
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,4,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_null1(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({'col1' : [1,3,4,5,6,7,None] , 'col2' : [1,2,3,3,4,4,5] , 'col3' : [1,1,2,2,2,2,2]})
        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

    def test_replace_null2(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1','col2'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({ 'col1' : [1,None,3,4,4,5,6,7,None] , 'col2' : [1,1,2,2,3,3,4,4,5] 
                                  ,'col3' : [1,1,1,1,2,2,2,2,2]}).sort_values('col1').reset_index(drop = True)

        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)
        
unittest.main(argv = ['first-arg-is-ignored'], exit = False)

Connection OK
Start Filter Existing data from df at  2021-05-09 16:38:20.960896
Dump data to  unit_test_git  End  2021-05-09 16:38:25.376006


.

Connection OK
Start Filter Existing data from df at  2021-05-09 16:38:42.627041
Dump data to  unit_test_git  End  2021-05-09 16:38:47.074026


.

Connection OK
Start Filter Existing data from df at  2021-05-09 16:39:03.665870
Dump data to  unit_test_git  End  2021-05-09 16:39:07.909443


.

Connection OK


.

Connection OK
Start delete old data at 2021-05-09 16:39:39.348325
Delete Last ['col1'] at 2021-05-09 16:39:40.200887
Dump data to  unit_test_git  End  2021-05-09 16:39:45.286635


.

Connection OK
Start delete old data at 2021-05-09 16:40:01.855268
Delete Last col1 at 2021-05-09 16:40:02.744869
Dump data to  unit_test_git  End  2021-05-09 16:40:07.981273


.

Connection OK
Start delete old data at 2021-05-09 16:40:24.100596
Delete Last ['col1', 'col2'] at 2021-05-09 16:40:24.961356
Dump data to  unit_test_git  End  2021-05-09 16:40:30.185362


.

Connection OK
Start delete old data at 2021-05-09 16:40:46.303624
Delete Last ['col1', 'col2', 'col3'] at 2021-05-09 16:40:47.163799
Dump data to  unit_test_git  End  2021-05-09 16:40:52.333297


.

Connection OK
Start delete old data at 2021-05-09 16:41:08.457593
Delete Last ['col1'] at 2021-05-09 16:41:09.287422
Dump data to  unit_test_git  End  2021-05-09 16:41:14.448570


.

Connection OK
Drop Existing Table at  2021-05-09 16:41:30.767177
Dump data to  unit_test_git  End  2021-05-09 16:41:36.616276
Start delete old data at 2021-05-09 16:41:36.618271
Delete Last ['date'] at 2021-05-09 16:41:37.492933
Dump data to  unit_test_git  End  2021-05-09 16:41:40.914860


.

Connection OK
Start delete old data at 2021-05-09 16:42:06.577577
Delete Last ['col1'] at 2021-05-09 16:42:07.449323
DELETE FROM "unit_test_git" where ("col1" in ('4.0', '5.0', '6.0', '7.0') OR "col1" IS NULL)
Dump data to  unit_test_git  End  2021-05-09 16:42:12.427350


.

Connection OK
Start delete old data at 2021-05-09 16:42:37.774360
Delete Last ['col1', 'col2'] at 2021-05-09 16:42:38.630517
DELETE FROM "unit_test_git" where ("col1" in ('4.0', '5.0', '6.0', '7.0') OR "col1" IS NULL) and "col2" in ('3', '4', '5')
Dump data to  unit_test_git  End  2021-05-09 16:42:43.708439


.

Connection OK
Drop Existing Table at  2021-05-09 16:42:58.783113
Dump data to  unit_test_git  End  2021-05-09 16:43:04.373376


.
----------------------------------------------------------------------
Ran 13 tests in 300.162s

OK


<unittest.main.TestProgram at 0x1d4fc72b8c8>

# SQLite
## Default SQLite : No Datetime in SQLite

In [10]:
class SQL_Test_class(unittest.TestCase):
    def setUp(self):
        self.sql = da_tran_SQL(sql_type = 'sqlite', 
                    host_name = 'test.db', 
                    database_name = '', 
                    user = '', 
                    password = '' )
        self.df1 = pd.DataFrame({'col1' : [1,2,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        self.df2 = pd.DataFrame({'col1' : [4,5,6,7,8] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        self.table_name = 'unit_test_git'
        self.df1.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        
    def test_read_sql(self):
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_whole(self):
        self.sql.dump_whole(self.df2, self.table_name)
        df_read = self.sql.read(self.table_name)
        expect_df = self.df2
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    
    def test_replace1(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace1_str(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace2(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,4,5,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_logic(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'],
                             math_logic = {'col1' : {'logic' : '>', 'value' : 1}})
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1[self.df1['col1'] <= 1].append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1_str(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new2(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'], debug = False)
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,4,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_null1(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({'col1' : [1,3,4,5,6,7,None] , 'col2' : [1,2,3,3,4,4,5] , 'col3' : [1,1,2,2,2,2,2]})
        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

    def test_replace_null2(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1','col2'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({ 'col1' : [1,None,3,4,4,5,6,7,None] , 'col2' : [1,1,2,2,3,3,4,4,5] 
                                  ,'col3' : [1,1,1,1,2,2,2,2,2]}).sort_values('col1').reset_index(drop = True)

        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

unittest.main(argv = ['first-arg-is-ignored'], exit = False)

.....

Connection OK
Start Filter Existing data from df at  2021-05-09 16:43:05.334766
Dump data to  unit_test_git  End  2021-05-09 16:43:05.346696
Connection OK
Start Filter Existing data from df at  2021-05-09 16:43:05.377641
Dump data to  unit_test_git  End  2021-05-09 16:43:05.387585
Connection OK
Start Filter Existing data from df at  2021-05-09 16:43:05.418539
Dump data to  unit_test_git  End  2021-05-09 16:43:05.430507
Connection OK
Connection OK
Start delete old data at 2021-05-09 16:43:05.481335
Delete Last ['col1'] at 2021-05-09 16:43:05.485324
Dump data to  unit_test_git  End  2021-05-09 16:43:05.491309
Connection OK


.....

Start delete old data at 2021-05-09 16:43:05.517246
Delete Last col1 at 2021-05-09 16:43:05.521117
Dump data to  unit_test_git  End  2021-05-09 16:43:05.528115
Connection OK
Start delete old data at 2021-05-09 16:43:05.559033
Delete Last ['col1', 'col2'] at 2021-05-09 16:43:05.564035
Dump data to  unit_test_git  End  2021-05-09 16:43:05.569977
Connection OK
Start delete old data at 2021-05-09 16:43:05.594865
Delete Last ['col1', 'col2', 'col3'] at 2021-05-09 16:43:05.595862
Dump data to  unit_test_git  End  2021-05-09 16:43:05.601885
Connection OK
Start delete old data at 2021-05-09 16:43:05.628128
Delete Last ['col1'] at 2021-05-09 16:43:05.632117
Dump data to  unit_test_git  End  2021-05-09 16:43:05.638145
Connection OK
Start delete old data at 2021-05-09 16:43:05.681024
Delete Last ['col1'] at 2021-05-09 16:43:05.685032
DELETE FROM [unit_test_git] where ([col1] in ('4.0', '5.0', '6.0', '7.0') OR [col1] IS NULL)
Dump data to  unit_test_git  End  2021-05-09 16:43:05.690992
Connection 

..

Start delete old data at 2021-05-09 16:43:05.737029
Delete Last ['col1', 'col2'] at 2021-05-09 16:43:05.740978
DELETE FROM [unit_test_git] where ([col1] in ('4.0', '5.0', '6.0', '7.0') OR [col1] IS NULL) and [col2] in ('3', '4', '5')
Dump data to  unit_test_git  End  2021-05-09 16:43:05.746975
Connection OK
Drop Existing Table at  2021-05-09 16:43:05.773248
Dump data to  unit_test_git  End  2021-05-09 16:43:05.786018



----------------------------------------------------------------------
Ran 12 tests in 0.538s

OK


<unittest.main.TestProgram at 0x1d4fd2f2d08>