In [1]:
import os, re,json, imp, unittest
import pandas as pd

print('First Directory at :', os.getcwd()[2:])

#Load Setting File
with open('setting.txt','r') as f:
    setting = json.load(f)
print('Load Setting OK')    

os.chdir(re.sub(os.getcwd().split('\\')[-1]+'$','',os.getcwd()))
print('Change Directory into :', os.getcwd()[2:])

First Directory at : \01_DEV_Function\Shared_Function\tests
Load Setting OK
Change Directory into : \01_DEV_Function\Shared_Function


In [2]:
local_library = True
#local_library = False

if local_library : 
    da_tran_SQL = imp.load_source('da_tran_SQL', 'py_topping/data_connection/database.py').da_tran_SQL
else :
    from py_topping.data_connection.database import da_tran_SQL
    
def help_test(df_in1, df_in2) :
    result = list()
    expect = list()
    for i in df_in1.columns :
        result_list = list(df_in1[i])
        expect_list = list(df_in2[i])
        result_list.sort()
        expect_list.sort()
        result.append(result_list)
        expect.append(expect_list)
    return result, expect

# MS SQL

## Default MSSQL

In [3]:
test_mssql = setting['MSSQL']

class SQL_Test_class(unittest.TestCase):
    def setUp(self):
        self.sql = da_tran_SQL(sql_type = test_mssql['type'],
                                host_name = test_mssql['host'],
                                database_name = test_mssql['database'],
                                user = test_mssql['user'],
                                password = test_mssql['password'])
        self.df1 = pd.DataFrame({'col1' : [1,2,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        self.df2 = pd.DataFrame({'col1' : [4,5,6,7,8] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        self.table_name = 'unit_test_git'
        self.df1.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')

    def test_sp(self):
        sql_q = """ CREATE OR ALTER PROCEDURE unit_test_git_SP (@PARAM1 AS VARCHAR(100))
                    AS
                    BEGIN
                    SELECT * FROM {} WHERE col1 = @PARAM1 
                    END""".format(self.table_name)
        self.sql.engine.execute(sql_q)
        df_read = self.sql.read('unit_test_git_SP', SP = True , param = {'@PARAM1' : 1})
        expect_df = self.df1[self.df1['col1'] == 1]
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_read_sql(self):
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_whole(self):
        self.sql.dump_whole(self.df2, self.table_name)
        df_read = self.sql.read(self.table_name)
        expect_df = self.df2
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    
    def test_replace1(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace1_str(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace2(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,4,5,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_logic(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'],
                             math_logic = {'col1' : {'logic' : '>', 'value' : 1}})
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1[self.df1['col1'] <= 1].append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_logic_date(self) :
        df_date = pd.DataFrame({'col1' : [1,2,3,4,5], 
                                'date' : pd.date_range('2020-10-10','2020-10-14'), 'col2' :[1,1,1,1,1]})
        
        self.sql.dump_whole(df_date, self.table_name)
        expect_df = df_date[df_date['date'] <= '2020-10-12']
        df_date['col2'] = 3
        self.sql.dump_replace(df_in = df_date , table_name_in = self.table_name, list_key = ['date'],
                             math_logic = {'date' : {'logic' : '>', 'value' : '2020-10-12', 'type' : 'date'}})
        df_read = self.sql.read(self.table_name)
        expect_df = expect_df.append(df_date)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1_str(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new2(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'], debug = False)
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,4,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_null1(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({'col1' : [1,3,4,5,6,7,None] , 'col2' : [1,2,3,3,4,4,5] , 'col3' : [1,1,2,2,2,2,2]})
        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

    def test_replace_null2(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1','col2'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({ 'col1' : [1,None,3,4,4,5,6,7,None] , 'col2' : [1,1,2,2,3,3,4,4,5] 
                                  ,'col3' : [1,1,1,1,2,2,2,2,2]}).sort_values('col1').reset_index(drop = True)

        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

unittest.main(argv = ['first-arg-is-ignored'], exit = False)

  module = __import__("pymssql")


Connection OK
Start Filter Existing data from df at  2021-02-15 09:12:20.910024


.

Dump data to  unit_test_git  End  2021-02-15 09:12:21.498853
Connection OK
Start Filter Existing data from df at  2021-02-15 09:12:22.364060
Dump data to  unit_test_git  End  2021-02-15 09:12:22.530670


.

Connection OK
Start Filter Existing data from df at  2021-02-15 09:12:23.372549
Dump data to  unit_test_git  End  2021-02-15 09:12:23.549934


.

Connection OK


.

Connection OK
Start delete old data at 2021-02-15 09:12:25.259006
Delete Last ['col1'] at 2021-02-15 09:12:25.341159


.

Dump data to  unit_test_git  End  2021-02-15 09:12:25.461086
Connection OK
Start delete old data at 2021-02-15 09:12:26.335785
Delete Last col1 at 2021-02-15 09:12:26.417047


.

Dump data to  unit_test_git  End  2021-02-15 09:12:26.543981
Connection OK
Start delete old data at 2021-02-15 09:12:27.400922
Delete Last ['col1', 'col2'] at 2021-02-15 09:12:27.468301
Dump data to  unit_test_git  End  2021-02-15 09:12:27.590883


.

Connection OK
Start delete old data at 2021-02-15 09:12:28.454797
Delete Last ['col1', 'col2', 'col3'] at 2021-02-15 09:12:28.532575
Dump data to  unit_test_git  End  2021-02-15 09:12:28.656116


.

Connection OK
Start delete old data at 2021-02-15 09:12:29.529231
Delete Last ['col1'] at 2021-02-15 09:12:29.596659
Dump data to  unit_test_git  End  2021-02-15 09:12:29.718297


.

Connection OK
Drop Existing Table at  2021-02-15 09:12:30.569674
Dump data to  unit_test_git  End  2021-02-15 09:12:30.826441
Start delete old data at 2021-02-15 09:12:30.832220
Delete Last ['date'] at 2021-02-15 09:12:30.902840
Dump data to  unit_test_git  End  2021-02-15 09:12:31.022058


.

Connection OK
Start delete old data at 2021-02-15 09:12:32.705293
Delete Last ['col1'] at 2021-02-15 09:12:32.782234
delete from unit_test_git where ([col1] in ('4.0', '5.0', '6.0', '7.0') OR [col1] IS NULL)
Dump data to  unit_test_git  End  2021-02-15 09:12:32.905920


.

Connection OK
Start delete old data at 2021-02-15 09:12:34.298221
Delete Last ['col1', 'col2'] at 2021-02-15 09:12:34.372549
delete from unit_test_git where ([col1] in ('4.0', '5.0', '6.0', '7.0') OR [col1] IS NULL) and [col2] in ('3', '4', '5')


.

Dump data to  unit_test_git  End  2021-02-15 09:12:34.505056
Connection OK


.

Connection OK
Drop Existing Table at  2021-02-15 09:12:36.379193


.

Dump data to  unit_test_git  End  2021-02-15 09:12:36.623480



----------------------------------------------------------------------
Ran 14 tests in 17.268s

OK


<unittest.main.TestProgram at 0x1954173cd08>

# Parallel MSSQL

In [4]:
test_mssql = setting['MSSQL']

class SQL_Test_class(unittest.TestCase):
    def setUp(self):
        self.sql = da_tran_SQL(sql_type = test_mssql['type'],
                                host_name = test_mssql['host'],
                                database_name = test_mssql['database'],
                                user = test_mssql['user'],
                                password = test_mssql['password'],
                                partition_size = 2, 
                                parallel_dump = True, 
                                max_parallel = 2)
        
        self.df1 = pd.DataFrame({'col1' : [1,2,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        self.df2 = pd.DataFrame({'col1' : [4,5,6,7,8] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        self.table_name = 'unit_test_git'
        self.df1.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.parallel_dump = True
        self.sql.max_parallel = 2
        self.sql.partition_size = 2

    def test_sp(self):
        sql_q = """ CREATE OR ALTER PROCEDURE unit_test_git_SP (@PARAM1 AS VARCHAR(100))
                    AS
                    BEGIN
                    SELECT * FROM {} WHERE col1 = @PARAM1 
                    END""".format(self.table_name)
        self.sql.engine.execute(sql_q)
        df_read = self.sql.read('unit_test_git_SP', SP = True , param = {'@PARAM1' : 1})
        expect_df = self.df1[self.df1['col1'] == 1]
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_read_sql(self):
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_whole(self):
        self.sql.dump_whole(self.df2, self.table_name)
        df_read = self.sql.read(self.table_name)
        expect_df = self.df2
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    
    def test_replace1(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace1_str(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace2(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,4,5,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_logic(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'],
                             math_logic = {'col1' : {'logic' : '>', 'value' : 1}})
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1[self.df1['col1'] <= 1].append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_logic_date(self) :
        df_date = pd.DataFrame({'col1' : [1,2,3,4,5], 
                                'date' : pd.date_range('2020-10-10','2020-10-14'), 'col2' :[1,1,1,1,1]})
        self.sql.dump_whole(df_date, self.table_name)
        expect_df = df_date[df_date['date'] <= '2020-10-12']
        df_date['col2'] = 3
        self.sql.dump_replace(df_in = df_date , table_name_in = self.table_name, list_key = ['date'],
                             math_logic = {'date' : {'logic' : '>', 'value' : '2020-10-12', 'type' : 'date'}},debug = True)
        df_read = self.sql.read(self.table_name)
        expect_df = expect_df.append(df_date)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1_str(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new2(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'], debug = False)
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,4,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_null1(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({'col1' : [1,3,4,5,6,7,None] , 'col2' : [1,2,3,3,4,4,5] , 'col3' : [1,1,2,2,2,2,2]})
        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

    def test_replace_null2(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1','col2'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({ 'col1' : [1,None,3,4,4,5,6,7,None] , 'col2' : [1,1,2,2,3,3,4,4,5] 
                                  ,'col3' : [1,1,1,1,2,2,2,2,2]}).sort_values('col1').reset_index(drop = True)

        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

unittest.main(argv = ['first-arg-is-ignored'], exit = False)

Connection OK
Start Filter Existing data from df at  2021-02-14 14:53:51.247034


.

Dump data to  unit_test_git  End  2021-02-14 14:53:51.529358
Connection OK
Start Filter Existing data from df at  2021-02-14 14:53:52.620901


.

Dump data to  unit_test_git  End  2021-02-14 14:53:52.858577
Connection OK
Start Filter Existing data from df at  2021-02-14 14:53:53.634519


.

Dump data to  unit_test_git  End  2021-02-14 14:53:53.903256
Connection OK


.

Connection OK
Start delete old data at 2021-02-14 14:53:55.428292
Delete Last ['col1'] at 2021-02-14 14:53:55.488974


.

Dump data to  unit_test_git  End  2021-02-14 14:53:55.796165
Connection OK
Start delete old data at 2021-02-14 14:53:56.544219
Delete Last col1 at 2021-02-14 14:53:56.603062


.

Dump data to  unit_test_git  End  2021-02-14 14:53:56.906251
Connection OK
Start delete old data at 2021-02-14 14:53:57.707660
Delete Last ['col1', 'col2'] at 2021-02-14 14:53:57.772043


.

Dump data to  unit_test_git  End  2021-02-14 14:53:58.092077
Connection OK
Start delete old data at 2021-02-14 14:53:58.839437
Delete Last ['col1', 'col2', 'col3'] at 2021-02-14 14:53:58.903798


.

Dump data to  unit_test_git  End  2021-02-14 14:53:59.205648
Connection OK
Start delete old data at 2021-02-14 14:53:59.940069
Delete Last ['col1'] at 2021-02-14 14:54:00.007207


.

Dump data to  unit_test_git  End  2021-02-14 14:54:00.303290
Connection OK
Drop Existing Table at  2021-02-14 14:54:01.056700
Dump data to  unit_test_git  End  2021-02-14 14:54:01.434540
Start delete old data at 2021-02-14 14:54:01.435561
Delete Last ['date'] at 2021-02-14 14:54:01.493647
delete from unit_test_git where CAST([date] AS date) > '2020-10-12'


.

Dump data to  unit_test_git  End  2021-02-14 14:54:01.716384
Connection OK
Start delete old data at 2021-02-14 14:54:02.926976
Delete Last ['col1'] at 2021-02-14 14:54:02.991396
delete from unit_test_git where ([col1] in ('4.0', '5.0', '6.0', '7.0') OR [col1] IS NULL)


.

Dump data to  unit_test_git  End  2021-02-14 14:54:03.294535
Connection OK
Start delete old data at 2021-02-14 14:54:04.526810
Delete Last ['col1', 'col2'] at 2021-02-14 14:54:04.593078
delete from unit_test_git where ([col1] in ('4.0', '5.0', '6.0', '7.0') OR [col1] IS NULL) and [col2] in ('3', '4', '5')


.

Dump data to  unit_test_git  End  2021-02-14 14:54:04.901860
Connection OK


.

Connection OK
Drop Existing Table at  2021-02-14 14:54:06.427828


.

Dump data to  unit_test_git  End  2021-02-14 14:54:06.767403



----------------------------------------------------------------------
Ran 14 tests in 16.378s

OK


<unittest.main.TestProgram at 0x23d81437048>

## ODBC MSSQL

In [5]:
test_mssql = setting['MSSQL']

class SQL_Test_class(unittest.TestCase):
    def setUp(self):
        self.sql = da_tran_SQL(sql_type = test_mssql['type'],
                                host_name = test_mssql['host'],
                                database_name = test_mssql['database'],
                                user = test_mssql['user'],
                                password = test_mssql['password'],
                                driver = 'pyodbc', parameter = 'driver=SQL+Server')
        self.df1 = pd.DataFrame({'col1' : [1,2,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        self.df2 = pd.DataFrame({'col1' : [4,5,6,7,8] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        self.table_name = 'unit_test_git'
        self.df1.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')

    def test_sp(self):
        sql_q = """ CREATE OR ALTER PROCEDURE unit_test_git_SP (@PARAM1 AS VARCHAR(100))
                    AS
                    BEGIN
                    SELECT * FROM {} WHERE col1 = @PARAM1 
                    END""".format(self.table_name)
        self.sql.engine.execute(sql_q)
        df_read = self.sql.read('unit_test_git_SP', SP = True , param = {'@PARAM1' : 1})
        expect_df = self.df1[self.df1['col1'] == 1]
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_read_sql(self):
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_whole(self):
        self.sql.dump_whole(self.df2, self.table_name)
        df_read = self.sql.read(self.table_name)
        expect_df = self.df2
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    
    def test_replace1(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace1_str(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace2(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,4,5,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_logic(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'],
                             math_logic = {'col1' : {'logic' : '>', 'value' : 1}})
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1[self.df1['col1'] <= 1].append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_logic_date(self) :
        df_date = pd.DataFrame({'col1' : [1,2,3,4,5], 
                                'date' : pd.date_range('2020-10-10','2020-10-14'), 'col2' :[1,1,1,1,1]})
        
        self.sql.dump_whole(df_date, self.table_name)
        expect_df = df_date[df_date['date'] <= '2020-10-12']
        df_date['col2'] = 3
        self.sql.dump_replace(df_in = df_date , table_name_in = self.table_name, list_key = ['date'],
                             math_logic = {'date' : {'logic' : '>', 'value' : '2020-10-12', 'type' : 'date'}})
        df_read = self.sql.read(self.table_name)
        expect_df = expect_df.append(df_date)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1_str(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new2(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'], debug = False)
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,4,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_null1(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({'col1' : [1,3,4,5,6,7,None] , 'col2' : [1,2,3,3,4,4,5] , 'col3' : [1,1,2,2,2,2,2]})
        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

    def test_replace_null2(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1','col2'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({ 'col1' : [1,None,3,4,4,5,6,7,None] , 'col2' : [1,1,2,2,3,3,4,4,5] 
                                  ,'col3' : [1,1,1,1,2,2,2,2,2]}).sort_values('col1').reset_index(drop = True)

        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

unittest.main(argv = ['first-arg-is-ignored'], exit = False)

Connection OK


.

Start Filter Existing data from df at  2021-02-14 14:54:08.088232
Dump data to  unit_test_git  End  2021-02-14 14:54:08.196334
Connection OK


.

Start Filter Existing data from df at  2021-02-14 14:54:08.833346
Dump data to  unit_test_git  End  2021-02-14 14:54:08.935442
Connection OK


.

Start Filter Existing data from df at  2021-02-14 14:54:09.592322
Dump data to  unit_test_git  End  2021-02-14 14:54:09.697160
Connection OK


.

Connection OK


.

Start delete old data at 2021-02-14 14:54:10.835086
Delete Last ['col1'] at 2021-02-14 14:54:10.863778
Dump data to  unit_test_git  End  2021-02-14 14:54:10.938793
Connection OK


.

Start delete old data at 2021-02-14 14:54:11.636191
Delete Last col1 at 2021-02-14 14:54:11.671571
Dump data to  unit_test_git  End  2021-02-14 14:54:11.745375
Connection OK


.

Start delete old data at 2021-02-14 14:54:12.392656
Delete Last ['col1', 'col2'] at 2021-02-14 14:54:12.421787
Dump data to  unit_test_git  End  2021-02-14 14:54:12.497664
Connection OK


.

Start delete old data at 2021-02-14 14:54:13.182736
Delete Last ['col1', 'col2', 'col3'] at 2021-02-14 14:54:13.214268
Dump data to  unit_test_git  End  2021-02-14 14:54:13.285243
Connection OK


.

Start delete old data at 2021-02-14 14:54:13.850000
Delete Last ['col1'] at 2021-02-14 14:54:13.876715
Dump data to  unit_test_git  End  2021-02-14 14:54:13.950667
Connection OK
Drop Existing Table at  2021-02-14 14:54:14.512812
Dump data to  unit_test_git  End  2021-02-14 14:54:14.640547
Start delete old data at 2021-02-14 14:54:14.641417
Delete Last ['date'] at 2021-02-14 14:54:14.668346


.

Dump data to  unit_test_git  End  2021-02-14 14:54:14.745693
Connection OK


.

Start delete old data at 2021-02-14 14:54:15.752044
Delete Last ['col1'] at 2021-02-14 14:54:15.779968
delete from unit_test_git where ([col1] in ('4.0', '5.0', '6.0', '7.0') OR [col1] IS NULL)
Dump data to  unit_test_git  End  2021-02-14 14:54:15.855781
Connection OK


.

Start delete old data at 2021-02-14 14:54:16.897725
Delete Last ['col1', 'col2'] at 2021-02-14 14:54:16.933802
delete from unit_test_git where ([col1] in ('4.0', '5.0', '6.0', '7.0') OR [col1] IS NULL) and [col2] in ('3', '4', '5')
Dump data to  unit_test_git  End  2021-02-14 14:54:17.005197
Connection OK


.

Connection OK


.

Drop Existing Table at  2021-02-14 14:54:18.185205
Dump data to  unit_test_git  End  2021-02-14 14:54:18.312102



----------------------------------------------------------------------
Ran 14 tests in 11.453s

OK


<unittest.main.TestProgram at 0x23d8137ba08>

# MYSQL
## Default MYSQL

In [6]:
test_mysql = setting['MYSQL']
    
class SQL_Test_class(unittest.TestCase):
    def setUp(self):
        self.sql = da_tran_SQL(sql_type = test_mysql['type'],
                                host_name = test_mysql['host'],
                                database_name = test_mysql['database'],
                                user = test_mysql['user'],
                                password = test_mysql['password'])
        self.df1 = pd.DataFrame({'col1' : [1,2,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        self.df2 = pd.DataFrame({'col1' : [4,5,6,7,8] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        self.table_name = 'unit_test_git'
        self.df1.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')

    def test_sp(self):
        sql_q = """ drop procedure if exists unit_test_git_SP;"""
        self.sql.engine.execute(sql_q)
        sql_q = """ CREATE PROCEDURE unit_test_git_SP
                    (PARAM1 VARCHAR(100))
                    BEGIN
                    SELECT * FROM {} WHERE col1 = PARAM1 ;
                    end;""".format(self.table_name)
        self.sql.engine.execute(sql_q)
        df_read = self.sql.read('unit_test_git_SP', SP = True , param = {'PARAM1' : 1})
        expect_df = self.df1[self.df1['col1'] == 1]
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)


    def test_read_sql(self):
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_whole(self):
        self.sql.dump_whole(self.df2, self.table_name)
        df_read = self.sql.read(self.table_name)
        expect_df = self.df2
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace1(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace1_str(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace2(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,4,5,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_logic(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'],
                             math_logic = {'col1' : {'logic' : '>', 'value' : 1}})
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1[self.df1['col1'] <= 1].append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_logic_date(self) :
        df_date = pd.DataFrame({'col1' : [1,2,3,4,5], 
                                'date' : pd.date_range('2020-10-10','2020-10-14'), 'col2' :[1,1,1,1,1]})
        self.sql.dump_whole(df_date, self.table_name)
        expect_df = df_date[df_date['date'] <= '2020-10-12']
        df_date['col2'] = 3
        self.sql.dump_replace(df_in = df_date , table_name_in = self.table_name, list_key = ['date'],
                             math_logic = {'date' : {'logic' : '>', 'value' : '2020-10-12', 'type' : 'date'}})
        df_read = self.sql.read(self.table_name)
        expect_df = expect_df.append(df_date)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1_str(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new2(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'], debug = False)
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,4,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_null1(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({'col1' : [1,3,4,5,6,7,None] , 'col2' : [1,2,3,3,4,4,5] , 'col3' : [1,1,2,2,2,2,2]})
        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

    def test_replace_null2(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1','col2'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({ 'col1' : [1,None,3,4,4,5,6,7,None] , 'col2' : [1,1,2,2,3,3,4,4,5] 
                                  ,'col3' : [1,1,1,1,2,2,2,2,2]}).sort_values('col1').reset_index(drop = True)

        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

unittest.main(argv = ['first-arg-is-ignored'], exit = False)

Connection OK
Start Filter Existing data from df at  2021-02-14 14:54:19.405455
Dump data to  unit_test_git  End  2021-02-14 14:54:19.576619


.

Connection OK
Start Filter Existing data from df at  2021-02-14 14:54:20.622512
Dump data to  unit_test_git  End  2021-02-14 14:54:20.799995


.

Connection OK
Start Filter Existing data from df at  2021-02-14 14:54:21.659393
Dump data to  unit_test_git  End  2021-02-14 14:54:21.854384


.

Connection OK


.

Connection OK
Start delete old data at 2021-02-14 14:54:23.635212
Delete Last ['col1'] at 2021-02-14 14:54:23.711015


.

Dump data to  unit_test_git  End  2021-02-14 14:54:23.835199
Connection OK
Start delete old data at 2021-02-14 14:54:24.776137
Delete Last col1 at 2021-02-14 14:54:24.853629


.

Dump data to  unit_test_git  End  2021-02-14 14:54:24.982528
Connection OK
Start delete old data at 2021-02-14 14:54:25.944169
Delete Last ['col1', 'col2'] at 2021-02-14 14:54:26.036883


.

Dump data to  unit_test_git  End  2021-02-14 14:54:26.181189
Connection OK
Start delete old data at 2021-02-14 14:54:27.044308
Delete Last ['col1', 'col2', 'col3'] at 2021-02-14 14:54:27.121181
Dump data to  unit_test_git  End  2021-02-14 14:54:27.243671


.

Connection OK
Start delete old data at 2021-02-14 14:54:28.103976
Delete Last ['col1'] at 2021-02-14 14:54:28.176772
Dump data to  unit_test_git  End  2021-02-14 14:54:28.298247


.

Connection OK
Drop Existing Table at  2021-02-14 14:54:29.247451
Dump data to  unit_test_git  End  2021-02-14 14:54:29.577532
Start delete old data at 2021-02-14 14:54:29.578564
Delete Last ['date'] at 2021-02-14 14:54:29.662951


.

Dump data to  unit_test_git  End  2021-02-14 14:54:29.807313
Connection OK
Start delete old data at 2021-02-14 14:54:31.111919
Delete Last ['col1'] at 2021-02-14 14:54:31.191025
delete from unit_test_git where (col1 in ('4.0', '5.0', '6.0', '7.0') OR col1 IS NULL)


.

Dump data to  unit_test_git  End  2021-02-14 14:54:31.322924
Connection OK
Start delete old data at 2021-02-14 14:54:32.623331
Delete Last ['col1', 'col2'] at 2021-02-14 14:54:32.704434
delete from unit_test_git where (col1 in ('4.0', '5.0', '6.0', '7.0') OR col1 IS NULL) and col2 in ('3', '4', '5')


.

Dump data to  unit_test_git  End  2021-02-14 14:54:32.839068
Connection OK


.

Connection OK
Drop Existing Table at  2021-02-14 14:54:34.732462


.

Dump data to  unit_test_git  End  2021-02-14 14:54:35.040777



----------------------------------------------------------------------
Ran 14 tests in 16.661s

OK


<unittest.main.TestProgram at 0x23d813c64c8>

## Parallel MYSQL

In [7]:
test_mysql = setting['MYSQL']
    
class SQL_Test_class(unittest.TestCase):
    def setUp(self):
        self.sql = da_tran_SQL(sql_type = test_mysql['type'],
                                host_name = test_mysql['host'],
                                database_name = test_mysql['database'],
                                user = test_mysql['user'],
                                password = test_mysql['password'],
                                partition_size = 2, 
                                parallel_dump = True, 
                                max_parallel = 2)
        
        self.df1 = pd.DataFrame({'col1' : [1,2,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        self.df2 = pd.DataFrame({'col1' : [4,5,6,7,8] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        self.table_name = 'unit_test_git'
        self.df1.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')

    def test_sp(self):
        sql_q = """ drop procedure if exists unit_test_git_SP;"""
        self.sql.engine.execute(sql_q)
        sql_q = """ CREATE PROCEDURE unit_test_git_SP
                    (PARAM1 VARCHAR(100))
                    BEGIN
                    SELECT * FROM {} WHERE col1 = PARAM1 ;
                    end;""".format(self.table_name)
        self.sql.engine.execute(sql_q)
        df_read = self.sql.read('unit_test_git_SP', SP = True , param = {'PARAM1' : 1})
        expect_df = self.df1[self.df1['col1'] == 1]
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)


    def test_read_sql(self):
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_whole(self):
        self.sql.dump_whole(self.df2, self.table_name)
        df_read = self.sql.read(self.table_name)
        expect_df = self.df2
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace1(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace1_str(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace2(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,4,5,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_logic(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'],
                             math_logic = {'col1' : {'logic' : '>', 'value' : 1}})
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1[self.df1['col1'] <= 1].append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_logic_date(self) :
        df_date = pd.DataFrame({'col1' : [1,2,3,4,5], 
                                'date' : pd.date_range('2020-10-10','2020-10-14'), 'col2' :[1,1,1,1,1]})
        self.sql.dump_whole(df_date, self.table_name)
        expect_df = df_date[df_date['date'] <= '2020-10-12']
        df_date['col2'] = 3
        self.sql.dump_replace(df_in = df_date , table_name_in = self.table_name, list_key = ['date'],
                             math_logic = {'date' : {'logic' : '>', 'value' : '2020-10-12', 'type' : 'date'}})
        df_read = self.sql.read(self.table_name)
        expect_df = expect_df.append(df_date)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1_str(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new2(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'], debug = False)
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,4,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_null1(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({'col1' : [1,3,4,5,6,7,None] , 'col2' : [1,2,3,3,4,4,5] , 'col3' : [1,1,2,2,2,2,2]})
        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

    def test_replace_null2(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1','col2'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({ 'col1' : [1,None,3,4,4,5,6,7,None] , 'col2' : [1,1,2,2,3,3,4,4,5] 
                                  ,'col3' : [1,1,1,1,2,2,2,2,2]}).sort_values('col1').reset_index(drop = True)

        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

unittest.main(argv = ['first-arg-is-ignored'], exit = False)

Connection OK
Start Filter Existing data from df at  2021-02-14 14:54:35.956993


.

Dump data to  unit_test_git  End  2021-02-14 14:54:36.240897
Connection OK
Start Filter Existing data from df at  2021-02-14 14:54:37.105877


.

Dump data to  unit_test_git  End  2021-02-14 14:54:37.387468
Connection OK
Start Filter Existing data from df at  2021-02-14 14:54:38.256365


.

Dump data to  unit_test_git  End  2021-02-14 14:54:38.554946
Connection OK


.

Connection OK
Start delete old data at 2021-02-14 14:54:40.449409
Delete Last ['col1'] at 2021-02-14 14:54:40.532066


.

Dump data to  unit_test_git  End  2021-02-14 14:54:40.920738
Connection OK
Start delete old data at 2021-02-14 14:54:41.788213
Delete Last col1 at 2021-02-14 14:54:41.866476


.

Dump data to  unit_test_git  End  2021-02-14 14:54:42.242255
Connection OK
Start delete old data at 2021-02-14 14:54:43.116570
Delete Last ['col1', 'col2'] at 2021-02-14 14:54:43.190635


.

Dump data to  unit_test_git  End  2021-02-14 14:54:43.546584
Connection OK
Start delete old data at 2021-02-14 14:54:44.506258
Delete Last ['col1', 'col2', 'col3'] at 2021-02-14 14:54:44.587352


.

Dump data to  unit_test_git  End  2021-02-14 14:54:44.982324
Connection OK
Start delete old data at 2021-02-14 14:54:45.862232
Delete Last ['col1'] at 2021-02-14 14:54:45.933555


.

Dump data to  unit_test_git  End  2021-02-14 14:54:46.297092
Connection OK
Drop Existing Table at  2021-02-14 14:54:47.187656
Dump data to  unit_test_git  End  2021-02-14 14:54:47.674630
Start delete old data at 2021-02-14 14:54:47.675628
Delete Last ['date'] at 2021-02-14 14:54:47.750381


.

Dump data to  unit_test_git  End  2021-02-14 14:54:48.019238
Connection OK
Start delete old data at 2021-02-14 14:54:49.312326
Delete Last ['col1'] at 2021-02-14 14:54:49.382000
delete from unit_test_git where (col1 in ('4.0', '5.0', '6.0', '7.0') OR col1 IS NULL)


.

Dump data to  unit_test_git  End  2021-02-14 14:54:49.756779
Connection OK
Start delete old data at 2021-02-14 14:54:51.067290
Delete Last ['col1', 'col2'] at 2021-02-14 14:54:51.140083
delete from unit_test_git where (col1 in ('4.0', '5.0', '6.0', '7.0') OR col1 IS NULL) and col2 in ('3', '4', '5')


.

Dump data to  unit_test_git  End  2021-02-14 14:54:51.507437
Connection OK


.

Connection OK
Drop Existing Table at  2021-02-14 14:54:54.081583


.

Dump data to  unit_test_git  End  2021-02-14 14:54:54.539423



----------------------------------------------------------------------
Ran 14 tests in 19.434s

OK


<unittest.main.TestProgram at 0x23d8187aac8>

# PostgreSQL
## Default PostgreSQL

In [8]:
test_postgresql = setting['POSTGRESQL']

class SQL_Test_class(unittest.TestCase):
    def setUp(self):
        self.sql = da_tran_SQL(sql_type = test_postgresql['type'],
                                host_name = test_postgresql['host'],
                                database_name = test_postgresql['database'],
                                user = test_postgresql['user'],
                                password = test_postgresql['password'])
        self.df1 = pd.DataFrame({'col1' : [1,2,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        self.df2 = pd.DataFrame({'col1' : [4,5,6,7,8] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        self.table_name = 'unit_test_git'
        self.df1.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')

#     def test_sp(self):
#         sql_q = """ CREATE OR REPLACE PROCEDURE unit_test_git_SP (PARAM1 INT)
#                     RETURNS TABLE (col1 int, col2 int, col3 int)
#                     language plpgsql
#                     AS $$
#                     BEGIN
#                     RETURN QUERY(SELECT * FROM {} WHERE col1 = PARAM1) ;
#                     END ; $$""".format(self.table_name)
#         self.sql.engine.execute(sql_q)
#         df_read = self.sql.read('unit_test_git_SP', SP = True , param = {'PARAM1' : 1})
#         expect_df = self.df1[self.df1['col1'] == 1]
#         result, expect = help_test(df_read, expect_df)
#         self.assertEqual(result, expect)
        
    def test_read_sql(self):
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_whole(self):
        self.sql.dump_whole(self.df2, self.table_name)
        df_read = self.sql.read(self.table_name)
        expect_df = self.df2
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace1(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace1_str(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace2(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,4,5,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_logic(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'],
                             math_logic = {'col1' : {'logic' : '>', 'value' : 1}})
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1[self.df1['col1'] <= 1].append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_logic_date(self) :
        df_date = pd.DataFrame({'col1' : [1,2,3,4,5], 
                                'date' : pd.date_range('2020-10-10','2020-10-14'), 'col2' :[1,1,1,1,1]})
        
        self.sql.dump_whole(df_date, self.table_name)
        expect_df = df_date[df_date['date'] <= '2020-10-12']
        df_date['col2'] = 3
        self.sql.dump_replace(df_in = df_date , table_name_in = self.table_name, list_key = ['date'],
                             math_logic = {'date' : {'logic' : '>', 'value' : '2020-10-12', 'type' : 'date'}})
        df_read = self.sql.read(self.table_name)
        expect_df = expect_df.append(df_date)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1_str(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new2(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'], debug = False)
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,4,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_null1(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({'col1' : [1,3,4,5,6,7,None] , 'col2' : [1,2,3,3,4,4,5] , 'col3' : [1,1,2,2,2,2,2]})
        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

    def test_replace_null2(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1','col2'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({ 'col1' : [1,None,3,4,4,5,6,7,None] , 'col2' : [1,1,2,2,3,3,4,4,5] 
                                  ,'col3' : [1,1,1,1,2,2,2,2,2]}).sort_values('col1').reset_index(drop = True)

        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)
        
unittest.main(argv = ['first-arg-is-ignored'], exit = False)

Connection OK
Start Filter Existing data from df at  2021-02-14 14:55:08.603252
Dump data to  unit_test_git  End  2021-02-14 14:55:10.945409


.

Connection OK
Start Filter Existing data from df at  2021-02-14 14:55:27.238356
Dump data to  unit_test_git  End  2021-02-14 14:55:29.871032


.

Connection OK
Start Filter Existing data from df at  2021-02-14 14:55:45.448302
Dump data to  unit_test_git  End  2021-02-14 14:55:47.949492


.

Connection OK


.

Connection OK
Start delete old data at 2021-02-14 14:56:18.947684
Delete Last ['col1'] at 2021-02-14 14:56:19.788753
Dump data to  unit_test_git  End  2021-02-14 14:56:21.474375


.

Connection OK
Start delete old data at 2021-02-14 14:56:37.617579
Delete Last col1 at 2021-02-14 14:56:38.483963
Dump data to  unit_test_git  End  2021-02-14 14:56:40.213427


.

Connection OK
Start delete old data at 2021-02-14 14:56:56.795291
Delete Last ['col1', 'col2'] at 2021-02-14 14:56:57.668672
Dump data to  unit_test_git  End  2021-02-14 14:56:59.410994


.

Connection OK
Start delete old data at 2021-02-14 14:57:14.475294
Delete Last ['col1', 'col2', 'col3'] at 2021-02-14 14:57:15.277113
Dump data to  unit_test_git  End  2021-02-14 14:57:16.878972


.

Connection OK
Start delete old data at 2021-02-14 14:57:31.699506
Delete Last ['col1'] at 2021-02-14 14:57:32.490628
Dump data to  unit_test_git  End  2021-02-14 14:57:34.075226


.

Connection OK
Drop Existing Table at  2021-02-14 14:57:50.181743
Dump data to  unit_test_git  End  2021-02-14 14:57:54.128101
Start delete old data at 2021-02-14 14:57:54.128819
Delete Last ['date'] at 2021-02-14 14:57:54.994837
Dump data to  unit_test_git  End  2021-02-14 14:57:56.724319


.

Connection OK
Start delete old data at 2021-02-14 14:58:22.700743
Delete Last ['col1'] at 2021-02-14 14:58:23.584809
delete from unit_test_git where ("col1" in ('4.0', '5.0', '6.0', '7.0') OR "col1" IS NULL)
Dump data to  unit_test_git  End  2021-02-14 14:58:25.332049


.

Connection OK
Start delete old data at 2021-02-14 14:58:49.142830
Delete Last ['col1', 'col2'] at 2021-02-14 14:58:49.938954
delete from unit_test_git where ("col1" in ('4.0', '5.0', '6.0', '7.0') OR "col1" IS NULL) and "col2" in ('3', '4', '5')
Dump data to  unit_test_git  End  2021-02-14 14:58:51.534110


.

Connection OK
Drop Existing Table at  2021-02-14 14:59:06.639481
Dump data to  unit_test_git  End  2021-02-14 14:59:09.887266


.
----------------------------------------------------------------------
Ran 13 tests in 256.039s

OK


<unittest.main.TestProgram at 0x23d813d8908>

## Parallel PostgreSQL

In [9]:
test_postgresql = setting['POSTGRESQL']

class SQL_Test_class(unittest.TestCase):
    def setUp(self):
        self.sql = da_tran_SQL(sql_type = test_postgresql['type'],
                                host_name = test_postgresql['host'],
                                database_name = test_postgresql['database'],
                                user = test_postgresql['user'],
                                password = test_postgresql['password'],
                                partition_size = 2, 
                                parallel_dump = True, 
                                max_parallel = 2)
        
        self.df1 = pd.DataFrame({'col1' : [1,2,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        self.df2 = pd.DataFrame({'col1' : [4,5,6,7,8] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        self.table_name = 'unit_test_git'
        self.df1.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')

#     def test_sp(self):
#         sql_q = """ CREATE OR REPLACE PROCEDURE unit_test_git_SP (PARAM1 INT)
#                     RETURNS TABLE (col1 int, col2 int, col3 int)
#                     language plpgsql
#                     AS $$
#                     BEGIN
#                     RETURN QUERY(SELECT * FROM {} WHERE col1 = PARAM1) ;
#                     END ; $$""".format(self.table_name)
#         self.sql.engine.execute(sql_q)
#         df_read = self.sql.read('unit_test_git_SP', SP = True , param = {'PARAM1' : 1})
#         expect_df = self.df1[self.df1['col1'] == 1]
#         result, expect = help_test(df_read, expect_df)
#         self.assertEqual(result, expect)
        
    def test_read_sql(self):
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_whole(self):
        self.sql.dump_whole(self.df2, self.table_name)
        df_read = self.sql.read(self.table_name)
        expect_df = self.df2
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace1(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace1_str(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace2(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,4,5,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_logic(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'],
                             math_logic = {'col1' : {'logic' : '>', 'value' : 1}})
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1[self.df1['col1'] <= 1].append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_logic_date(self) :
        df_date = pd.DataFrame({'col1' : [1,2,3,4,5], 
                                'date' : pd.date_range('2020-10-10','2020-10-14'), 'col2' :[1,1,1,1,1]})
        
        self.sql.dump_whole(df_date, self.table_name)
        expect_df = df_date[df_date['date'] <= '2020-10-12']
        df_date['col2'] = 3
        self.sql.dump_replace(df_in = df_date , table_name_in = self.table_name, list_key = ['date'],
                             math_logic = {'date' : {'logic' : '>', 'value' : '2020-10-12', 'type' : 'date'}})
        df_read = self.sql.read(self.table_name)
        expect_df = expect_df.append(df_date)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1_str(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new2(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'], debug = False)
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,4,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_null1(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({'col1' : [1,3,4,5,6,7,None] , 'col2' : [1,2,3,3,4,4,5] , 'col3' : [1,1,2,2,2,2,2]})
        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

    def test_replace_null2(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1','col2'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({ 'col1' : [1,None,3,4,4,5,6,7,None] , 'col2' : [1,1,2,2,3,3,4,4,5] 
                                  ,'col3' : [1,1,1,1,2,2,2,2,2]}).sort_values('col1').reset_index(drop = True)

        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)
        
unittest.main(argv = ['first-arg-is-ignored'], exit = False)

Connection OK
Start Filter Existing data from df at  2021-02-14 14:59:25.444774
Dump data to  unit_test_git  End  2021-02-14 14:59:29.531721


.

Connection OK
Start Filter Existing data from df at  2021-02-14 14:59:45.764521
Dump data to  unit_test_git  End  2021-02-14 14:59:50.151597


.

Connection OK
Start Filter Existing data from df at  2021-02-14 15:00:05.385239
Dump data to  unit_test_git  End  2021-02-14 15:00:09.543679


.

Connection OK


.

Connection OK
Start delete old data at 2021-02-14 15:00:40.005706
Delete Last ['col1'] at 2021-02-14 15:00:40.788196
Dump data to  unit_test_git  End  2021-02-14 15:00:45.833526


.

Connection OK
Start delete old data at 2021-02-14 15:01:02.180117
Delete Last col1 at 2021-02-14 15:01:03.058915
Dump data to  unit_test_git  End  2021-02-14 15:01:08.152599


.

Connection OK
Start delete old data at 2021-02-14 15:01:24.303231
Delete Last ['col1', 'col2'] at 2021-02-14 15:01:25.168537
Dump data to  unit_test_git  End  2021-02-14 15:01:30.082880


.

Connection OK
Start delete old data at 2021-02-14 15:01:44.793317
Delete Last ['col1', 'col2', 'col3'] at 2021-02-14 15:01:45.577979
Dump data to  unit_test_git  End  2021-02-14 15:01:50.694584


.

Connection OK
Start delete old data at 2021-02-14 15:02:06.424635
Delete Last ['col1'] at 2021-02-14 15:02:07.266814
Dump data to  unit_test_git  End  2021-02-14 15:02:12.154083


.

Connection OK
Drop Existing Table at  2021-02-14 15:02:26.987967
Dump data to  unit_test_git  End  2021-02-14 15:02:32.628701
Start delete old data at 2021-02-14 15:02:32.630682
Delete Last ['date'] at 2021-02-14 15:02:33.441268
Dump data to  unit_test_git  End  2021-02-14 15:02:36.659381


.

Connection OK
Start delete old data at 2021-02-14 15:03:02.538767
Delete Last ['col1'] at 2021-02-14 15:03:03.412570
delete from unit_test_git where ("col1" in ('4.0', '5.0', '6.0', '7.0') OR "col1" IS NULL)
Dump data to  unit_test_git  End  2021-02-14 15:03:08.443159


.

Connection OK
Start delete old data at 2021-02-14 15:03:34.111094
Delete Last ['col1', 'col2'] at 2021-02-14 15:03:34.980243
delete from unit_test_git where ("col1" in ('4.0', '5.0', '6.0', '7.0') OR "col1" IS NULL) and "col2" in ('3', '4', '5')
Dump data to  unit_test_git  End  2021-02-14 15:03:40.128906


.

Connection OK
Drop Existing Table at  2021-02-14 15:03:55.933371
Dump data to  unit_test_git  End  2021-02-14 15:04:01.622354


.
----------------------------------------------------------------------
Ran 13 tests in 291.708s

OK


<unittest.main.TestProgram at 0x23d822dc208>

# SQLite
## Default SQLite : No Datetime in SQLite

In [10]:
class SQL_Test_class(unittest.TestCase):
    def setUp(self):
        self.sql = da_tran_SQL(sql_type = 'sqlite', 
                    host_name = 'test.db', 
                    database_name = '', 
                    user = '', 
                    password = '' )
        self.df1 = pd.DataFrame({'col1' : [1,2,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        self.df2 = pd.DataFrame({'col1' : [4,5,6,7,8] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        self.table_name = 'unit_test_git'
        self.df1.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        
    def test_read_sql(self):
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_whole(self):
        self.sql.dump_whole(self.df2, self.table_name)
        df_read = self.sql.read(self.table_name)
        expect_df = self.df2
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    
    def test_replace1(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace1_str(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,3,3,4,4,5] , 'col3' : [1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace2(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,4,5,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,2,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_replace_logic(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'],
                             math_logic = {'col1' : {'logic' : '>', 'value' : 1}})
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1[self.df1['col1'] <= 1].append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace3(self) :
        self.sql.dump_replace(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2','col3'])
        df_read = self.sql.read(self.table_name)
        expect_df = self.df1.append(self.df2)
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1_str(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = 'col1')
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new1(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1'])
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,6,7,8] , 'col2' : [1,1,2,2,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)

    def test_new2(self) :
        self.sql.dump_new(df_in = self.df2 , table_name_in = self.table_name, list_key = ['col1','col2'], debug = False)
        df_read = self.sql.read(self.table_name)
        expect_df = pd.DataFrame({'col1' : [1,2,3,4,5,4,6,7,8] , 'col2' : [1,1,2,2,3,3,4,4,5] , 'col3' : [1,1,1,1,1,2,2,2,2]})
        result, expect = help_test(df_read, expect_df)
        self.assertEqual(result, expect)
        
    def test_replace_null1(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({'col1' : [1,3,4,5,6,7,None] , 'col2' : [1,2,3,3,4,4,5] , 'col3' : [1,1,2,2,2,2,2]})
        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

    def test_replace_null2(self) :
        df1_n = pd.DataFrame({'col1' : [1,None,3,4,5] , 'col2' : [1,1,2,2,3] , 'col3' : [1,1,1,1,1]})
        df2_n = pd.DataFrame({'col1' : [4,5,6,7,None] , 'col2' : [3,3,4,4,5] , 'col3' : [2,2,2,2,2]})
        df1_n.to_sql(self.table_name, con = self.sql.engine, index = False, if_exists = 'replace')
        self.sql.dump_replace(df_in = df2_n , table_name_in = self.table_name, list_key = ['col1','col2'] , debug = True)
        df_read = self.sql.read(self.table_name).sort_values('col1').reset_index(drop = True)
        expect_df = pd.DataFrame({ 'col1' : [1,None,3,4,4,5,6,7,None] , 'col2' : [1,1,2,2,3,3,4,4,5] 
                                  ,'col3' : [1,1,1,1,2,2,2,2,2]}).sort_values('col1').reset_index(drop = True)

        result, expect = help_test(df_read.fillna(99), expect_df.fillna(99))
        self.assertEqual(result, expect)

unittest.main(argv = ['first-arg-is-ignored'], exit = False)

.....

Connection OK
Start Filter Existing data from df at  2021-02-14 15:04:02.587290
Dump data to  unit_test_git  End  2021-02-14 15:04:02.598843
Connection OK
Start Filter Existing data from df at  2021-02-14 15:04:02.625774
Dump data to  unit_test_git  End  2021-02-14 15:04:02.633919
Connection OK
Start Filter Existing data from df at  2021-02-14 15:04:02.663667
Dump data to  unit_test_git  End  2021-02-14 15:04:02.674805
Connection OK
Connection OK
Start delete old data at 2021-02-14 15:04:02.726944
Delete Last ['col1'] at 2021-02-14 15:04:02.730428
Dump data to  unit_test_git  End  2021-02-14 15:04:02.735423
Connection OK
Start delete old data at 2021-02-14 15:04:02.762081


.....

Delete Last col1 at 2021-02-14 15:04:02.766071
Dump data to  unit_test_git  End  2021-02-14 15:04:02.771057
Connection OK
Start delete old data at 2021-02-14 15:04:02.798322
Delete Last ['col1', 'col2'] at 2021-02-14 15:04:02.802350
Dump data to  unit_test_git  End  2021-02-14 15:04:02.809327
Connection OK
Start delete old data at 2021-02-14 15:04:02.838256
Delete Last ['col1', 'col2', 'col3'] at 2021-02-14 15:04:02.840210
Dump data to  unit_test_git  End  2021-02-14 15:04:02.845196
Connection OK
Start delete old data at 2021-02-14 15:04:02.870639
Delete Last ['col1'] at 2021-02-14 15:04:02.875627
Dump data to  unit_test_git  End  2021-02-14 15:04:02.880641
Connection OK
Start delete old data at 2021-02-14 15:04:02.921169
Delete Last ['col1'] at 2021-02-14 15:04:02.925158
delete from unit_test_git where ([col1] in ('4.0', '5.0', '6.0', '7.0') OR [col1] IS NULL)
Dump data to  unit_test_git  End  2021-02-14 15:04:02.929718
Connection OK


..

Start delete old data at 2021-02-14 15:04:02.971679
Delete Last ['col1', 'col2'] at 2021-02-14 15:04:02.975668
delete from unit_test_git where ([col1] in ('4.0', '5.0', '6.0', '7.0') OR [col1] IS NULL) and [col2] in ('3', '4', '5')
Dump data to  unit_test_git  End  2021-02-14 15:04:02.981652
Connection OK
Drop Existing Table at  2021-02-14 15:04:03.010574
Dump data to  unit_test_git  End  2021-02-14 15:04:03.025536



----------------------------------------------------------------------
Ran 12 tests in 0.509s

OK


<unittest.main.TestProgram at 0x23d8170b6c8>