Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 23 additions & 23 deletions tests/test_003_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,41 +77,41 @@ def test_autocommit_setter(db_connection):
db_connection.autocommit = True
cursor = db_connection.cursor()
# Make a transaction and check if it is autocommited
drop_table_if_exists(cursor, "pytest_test_autocommit")
drop_table_if_exists(cursor, "#pytest_test_autocommit")
try:
cursor.execute("CREATE TABLE pytest_test_autocommit (id INT PRIMARY KEY, value VARCHAR(50));")
cursor.execute("INSERT INTO pytest_test_autocommit (id, value) VALUES (1, 'test');")
cursor.execute("SELECT * FROM pytest_test_autocommit WHERE id = 1;")
cursor.execute("CREATE TABLE #pytest_test_autocommit (id INT PRIMARY KEY, value VARCHAR(50));")
cursor.execute("INSERT INTO #pytest_test_autocommit (id, value) VALUES (1, 'test');")
cursor.execute("SELECT * FROM #pytest_test_autocommit WHERE id = 1;")
result = cursor.fetchone()
assert result is not None, "Autocommit failed: No data found"
assert result[1] == 'test', "Autocommit failed: Incorrect data"
except Exception as e:
pytest.fail(f"Autocommit failed: {e}")
finally:
cursor.execute("DROP TABLE pytest_test_autocommit;")
cursor.execute("DROP TABLE #pytest_test_autocommit;")
db_connection.commit()
assert db_connection.autocommit is True, "Autocommit should be True"

db_connection.autocommit = False
cursor = db_connection.cursor()
# Make a transaction and check if it is not autocommited
drop_table_if_exists(cursor, "pytest_test_autocommit")
drop_table_if_exists(cursor, "#pytest_test_autocommit")
try:
cursor.execute("CREATE TABLE pytest_test_autocommit (id INT PRIMARY KEY, value VARCHAR(50));")
cursor.execute("INSERT INTO pytest_test_autocommit (id, value) VALUES (1, 'test');")
cursor.execute("SELECT * FROM pytest_test_autocommit WHERE id = 1;")
cursor.execute("CREATE TABLE #pytest_test_autocommit (id INT PRIMARY KEY, value VARCHAR(50));")
cursor.execute("INSERT INTO #pytest_test_autocommit (id, value) VALUES (1, 'test');")
cursor.execute("SELECT * FROM #pytest_test_autocommit WHERE id = 1;")
result = cursor.fetchone()
assert result is not None, "Autocommit failed: No data found"
assert result[1] == 'test', "Autocommit failed: Incorrect data"
db_connection.commit()
cursor.execute("SELECT * FROM pytest_test_autocommit WHERE id = 1;")
cursor.execute("SELECT * FROM #pytest_test_autocommit WHERE id = 1;")
result = cursor.fetchone()
assert result is not None, "Autocommit failed: No data found after commit"
assert result[1] == 'test', "Autocommit failed: Incorrect data after commit"
except Exception as e:
pytest.fail(f"Autocommit failed: {e}")
finally:
cursor.execute("DROP TABLE pytest_test_autocommit;")
cursor.execute("DROP TABLE #pytest_test_autocommit;")
db_connection.commit()

def test_set_autocommit(db_connection):
Expand All @@ -123,49 +123,49 @@ def test_set_autocommit(db_connection):
def test_commit(db_connection):
# Make a transaction and commit
cursor = db_connection.cursor()
drop_table_if_exists(cursor, "pytest_test_commit")
drop_table_if_exists(cursor, "#pytest_test_commit")
try:
cursor.execute("CREATE TABLE pytest_test_commit (id INT PRIMARY KEY, value VARCHAR(50));")
cursor.execute("INSERT INTO pytest_test_commit (id, value) VALUES (1, 'test');")
cursor.execute("CREATE TABLE #pytest_test_commit (id INT PRIMARY KEY, value VARCHAR(50));")
cursor.execute("INSERT INTO #pytest_test_commit (id, value) VALUES (1, 'test');")
db_connection.commit()
cursor.execute("SELECT * FROM pytest_test_commit WHERE id = 1;")
cursor.execute("SELECT * FROM #pytest_test_commit WHERE id = 1;")
result = cursor.fetchone()
assert result is not None, "Commit failed: No data found"
assert result[1] == 'test', "Commit failed: Incorrect data"
except Exception as e:
pytest.fail(f"Commit failed: {e}")
finally:
cursor.execute("DROP TABLE pytest_test_commit;")
cursor.execute("DROP TABLE #pytest_test_commit;")
db_connection.commit()

def test_rollback(db_connection):
# Make a transaction and rollback
cursor = db_connection.cursor()
drop_table_if_exists(cursor, "pytest_test_rollback")
drop_table_if_exists(cursor, "#pytest_test_rollback")
try:
# Create a table and insert data
cursor.execute("CREATE TABLE pytest_test_rollback (id INT PRIMARY KEY, value VARCHAR(50));")
cursor.execute("INSERT INTO pytest_test_rollback (id, value) VALUES (1, 'test');")
cursor.execute("CREATE TABLE #pytest_test_rollback (id INT PRIMARY KEY, value VARCHAR(50));")
cursor.execute("INSERT INTO #pytest_test_rollback (id, value) VALUES (1, 'test');")
db_connection.commit()

# Check if the data is present before rollback
cursor.execute("SELECT * FROM pytest_test_rollback WHERE id = 1;")
cursor.execute("SELECT * FROM #pytest_test_rollback WHERE id = 1;")
result = cursor.fetchone()
assert result is not None, "Rollback failed: No data found before rollback"
assert result[1] == 'test', "Rollback failed: Incorrect data"

# Insert data and rollback
cursor.execute("INSERT INTO pytest_test_rollback (id, value) VALUES (2, 'test');")
cursor.execute("INSERT INTO #pytest_test_rollback (id, value) VALUES (2, 'test');")
db_connection.rollback()

# Check if the data is not present after rollback
cursor.execute("SELECT * FROM pytest_test_rollback WHERE id = 2;")
cursor.execute("SELECT * FROM #pytest_test_rollback WHERE id = 2;")
result = cursor.fetchone()
assert result is None, "Rollback failed: Data found after rollback"
except Exception as e:
pytest.fail(f"Rollback failed: {e}")
finally:
cursor.execute("DROP TABLE pytest_test_rollback;")
cursor.execute("DROP TABLE #pytest_test_rollback;")
db_connection.commit()

def test_invalid_connection_string():
Expand Down
Loading