Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Newer
Older
100644 649 lines (489 sloc) 22.39 kb
c3f6b46 @mkleehammer Import from Subversion 2.0.63; reworked versioning
authored
1 #!/usr/bin/python
2
3 usage="""\
4 usage: %prog [options] filename
5
6 Unit tests for Microsoft Access
7
8 These run using the version from the 'build' directory, not the version
9 installed into the Python directories. You must run python setup.py build
10 before running the tests.
11
12 To run, pass the filename of an Access database on the command line:
13
14 accesstests test.accdb
15
16 An empty Access 2000 database (empty.mdb) and an empty Access 2007 database
17 (empty.accdb), are provided.
18
19 To run a single test, use the -t option:
20
21 accesstests test.accdb -t unicode_null
22
23 If you want to report an error, it would be helpful to include the driver information
24 by using the verbose flag and redirecting the output to a file:
25
26 accesstests test.accdb -v >& results.txt
27
28 You can pass the verbose flag twice for more verbose output:
29
30 accesstests test.accdb -vv
31 """
32
33 # Access SQL data types: http://msdn2.microsoft.com/en-us/library/bb208866.aspx
34
35 import sys, os, re
36 import unittest
37 from decimal import Decimal
38 from datetime import datetime, date, time
39 from os.path import abspath
40 from testutils import *
41
42 CNXNSTRING = None
43
44 _TESTSTR = '0123456789-abcdefghijklmnopqrstuvwxyz-'
45
46 def _generate_test_string(length):
47 """
48 Returns a string of composed of `seed` to make a string `length` characters long.
49
50 To enhance performance, there are 3 ways data is read, based on the length of the value, so most data types are
51 tested with 3 lengths. This function helps us generate the test data.
52
53 We use a recognizable data set instead of a single character to make it less likely that "overlap" errors will
54 be hidden and to help us manually identify where a break occurs.
55 """
56 if length <= len(_TESTSTR):
57 return _TESTSTR[:length]
58
59 c = (length + len(_TESTSTR)-1) / len(_TESTSTR)
60 v = _TESTSTR * c
61 return v[:length]
62
63
64 class AccessTestCase(unittest.TestCase):
65
66 SMALL_FENCEPOST_SIZES = [ 0, 1, 254, 255 ] # text fields <= 255
67 LARGE_FENCEPOST_SIZES = [ 256, 270, 304, 508, 510, 511, 512, 1023, 1024, 2047, 2048, 4000, 4095, 4096, 4097, 10 * 1024, 20 * 1024 ]
68
69 ANSI_FENCEPOSTS = [ _generate_test_string(size) for size in SMALL_FENCEPOST_SIZES ]
70 UNICODE_FENCEPOSTS = [ unicode(s) for s in ANSI_FENCEPOSTS ]
71 IMAGE_FENCEPOSTS = ANSI_FENCEPOSTS + [ _generate_test_string(size) for size in LARGE_FENCEPOST_SIZES ]
72
73 def __init__(self, method_name):
74 unittest.TestCase.__init__(self, method_name)
75
76 def setUp(self):
77 self.cnxn = pyodbc.connect(CNXNSTRING)
78 self.cursor = self.cnxn.cursor()
79
80 for i in range(3):
81 try:
82 self.cursor.execute("drop table t%d" % i)
83 self.cnxn.commit()
84 except:
85 pass
86
87 self.cnxn.rollback()
88
89 def tearDown(self):
90 try:
91 self.cursor.close()
92 self.cnxn.close()
93 except:
94 # If we've already closed the cursor or connection, exceptions are thrown.
95 pass
96
97 def test_multiple_bindings(self):
98 "More than one bind and select on a cursor"
99 self.cursor.execute("create table t1(n int)")
100 self.cursor.execute("insert into t1 values (?)", 1)
101 self.cursor.execute("insert into t1 values (?)", 2)
102 self.cursor.execute("insert into t1 values (?)", 3)
103 for i in range(3):
104 self.cursor.execute("select n from t1 where n < ?", 10)
105 self.cursor.execute("select n from t1 where n < 3")
106
107
108 def test_different_bindings(self):
109 self.cursor.execute("create table t1(n int)")
110 self.cursor.execute("create table t2(d datetime)")
111 self.cursor.execute("insert into t1 values (?)", 1)
112 self.cursor.execute("insert into t2 values (?)", datetime.now())
113
114 def test_datasources(self):
115 p = pyodbc.dataSources()
116 self.assert_(isinstance(p, dict))
117
118 def test_getinfo_string(self):
119 value = self.cnxn.getinfo(pyodbc.SQL_CATALOG_NAME_SEPARATOR)
120 self.assert_(isinstance(value, str))
121
122 def test_getinfo_bool(self):
123 value = self.cnxn.getinfo(pyodbc.SQL_ACCESSIBLE_TABLES)
124 self.assert_(isinstance(value, bool))
125
126 def test_getinfo_int(self):
127 value = self.cnxn.getinfo(pyodbc.SQL_DEFAULT_TXN_ISOLATION)
128 self.assert_(isinstance(value, (int, long)))
129
130 def test_getinfo_smallint(self):
131 value = self.cnxn.getinfo(pyodbc.SQL_CONCAT_NULL_BEHAVIOR)
132 self.assert_(isinstance(value, int))
133
134 def _test_strtype(self, sqltype, value, colsize=None):
135 """
136 The implementation for string, Unicode, and binary tests.
137 """
0e212c8 Made accesstests.py work with Python 2.4
Michael Kleehammer authored
138 assert colsize is None or (value is None or colsize >= len(value)), 'colsize=%s value=%s' % (colsize, (value is None) and 'none' or len(value))
c3f6b46 @mkleehammer Import from Subversion 2.0.63; reworked versioning
authored
139
140 if colsize:
141 sql = "create table t1(n1 int not null, s1 %s(%s), s2 %s(%s))" % (sqltype, colsize, sqltype, colsize)
142 else:
143 sql = "create table t1(n1 int not null, s1 %s, s2 %s)" % (sqltype, sqltype)
144
145 self.cursor.execute(sql)
146 self.cursor.execute("insert into t1 values(1, ?, ?)", (value, value))
147 row = self.cursor.execute("select s1, s2 from t1").fetchone()
148
149 # Access only uses Unicode, but strings might have been passed in to see if they can be written. When we read
150 # them back, they'll be unicode, so compare our results to a Unicode version of `value`.
151 if type(value) is str:
152 value = unicode(value)
153
154 for i in range(2):
155 v = row[i]
156
157 self.assertEqual(type(v), type(value))
158
159 if value is not None:
160 self.assertEqual(len(v), len(value))
161
162 self.assertEqual(v, value)
163
164 #
165 # unicode
166 #
167
168 def test_unicode_null(self):
169 self._test_strtype('varchar', None, 255)
170
171 # Generate a test for each fencepost size: test_varchar_0, etc.
172 def _maketest(value):
173 def t(self):
174 self._test_strtype('varchar', value, len(value))
175 t.__doc__ = 'unicode %s' % len(value)
176 return t
177 for value in UNICODE_FENCEPOSTS:
178 locals()['test_unicode_%s' % len(value)] = _maketest(value)
179
180 #
181 # ansi -> varchar
182 #
183
184 # Access only stores Unicode text but it should accept ASCII text.
185
186 # Generate a test for each fencepost size: test_varchar_0, etc.
187 def _maketest(value):
188 def t(self):
189 self._test_strtype('varchar', value, len(value))
190 t.__doc__ = 'ansi %s' % len(value)
191 return t
192 for value in ANSI_FENCEPOSTS:
193 locals()['test_ansivarchar_%s' % len(value)] = _maketest(value)
194
195 #
196 # binary
197 #
198
199 # Generate a test for each fencepost size: test_varchar_0, etc.
200 def _maketest(value):
201 def t(self):
202 self._test_strtype('varbinary', buffer(value), len(value))
203 t.__doc__ = 'binary %s' % len(value)
204 return t
205 for value in ANSI_FENCEPOSTS:
206 locals()['test_binary_%s' % len(value)] = _maketest(value)
207
208
209 #
210 # image
211 #
212
213 def test_null_image(self):
214 self._test_strtype('image', None)
215
216 # Generate a test for each fencepost size: test_varchar_0, etc.
217 def _maketest(value):
218 def t(self):
219 self._test_strtype('image', buffer(value))
220 t.__doc__ = 'image %s' % len(value)
221 return t
222 for value in IMAGE_FENCEPOSTS:
223 locals()['test_image_%s' % len(value)] = _maketest(value)
224
225 #
226 # memo
227 #
228
229 def test_null_memo(self):
230 self._test_strtype('memo', None)
231
232 # Generate a test for each fencepost size: test_varchar_0, etc.
233 def _maketest(value):
234 def t(self):
235 self._test_strtype('memo', unicode(value))
236 t.__doc__ = 'Unicode to memo %s' % len(value)
237 return t
238 for value in IMAGE_FENCEPOSTS:
239 locals()['test_memo_%s' % len(value)] = _maketest(value)
240
241 # ansi -> memo
242 def _maketest(value):
243 def t(self):
244 self._test_strtype('memo', value)
245 t.__doc__ = 'ANSI to memo %s' % len(value)
246 return t
247 for value in IMAGE_FENCEPOSTS:
248 locals()['test_ansimemo_%s' % len(value)] = _maketest(value)
249
250 def test_subquery_params(self):
251 """Ensure parameter markers work in a subquery"""
252 self.cursor.execute("create table t1(id integer, s varchar(20))")
253 self.cursor.execute("insert into t1 values (?,?)", 1, 'test')
254 row = self.cursor.execute("""
255 select x.id
256 from (
257 select id
258 from t1
259 where s = ?
260 and id between ? and ?
261 ) x
262 """, 'test', 1, 10).fetchone()
263 self.assertNotEqual(row, None)
264 self.assertEqual(row[0], 1)
265
266 def _exec(self):
267 self.cursor.execute(self.sql)
268
269 def test_close_cnxn(self):
270 """Make sure using a Cursor after closing its connection doesn't crash."""
271
272 self.cursor.execute("create table t1(id integer, s varchar(20))")
273 self.cursor.execute("insert into t1 values (?,?)", 1, 'test')
274 self.cursor.execute("select * from t1")
275
276 self.cnxn.close()
277
278 # Now that the connection is closed, we expect an exception. (If the code attempts to use
279 # the HSTMT, we'll get an access violation instead.)
280 self.sql = "select * from t1"
281 self.assertRaises(pyodbc.ProgrammingError, self._exec)
282
283
284 def test_unicode_query(self):
285 self.cursor.execute(u"select 1")
286
287 def test_negative_row_index(self):
288 self.cursor.execute("create table t1(s varchar(20))")
289 self.cursor.execute("insert into t1 values(?)", "1")
290 row = self.cursor.execute("select * from t1").fetchone()
291 self.assertEquals(row[0], "1")
292 self.assertEquals(row[-1], "1")
293
294 def test_version(self):
295 self.assertEquals(3, len(pyodbc.version.split('.'))) # 1.3.1 etc.
296
297 #
298 # date, time, datetime
299 #
300
301 def test_datetime(self):
302 value = datetime(2007, 1, 15, 3, 4, 5)
303
304 self.cursor.execute("create table t1(dt datetime)")
305 self.cursor.execute("insert into t1 values (?)", value)
306
307 result = self.cursor.execute("select dt from t1").fetchone()[0]
308 self.assertEquals(value, result)
309
310 #
311 # ints and floats
312 #
313
314 def test_int(self):
315 value = 1234
316 self.cursor.execute("create table t1(n int)")
317 self.cursor.execute("insert into t1 values (?)", value)
318 result = self.cursor.execute("select n from t1").fetchone()[0]
319 self.assertEquals(result, value)
320
321 def test_negative_int(self):
322 value = -1
323 self.cursor.execute("create table t1(n int)")
324 self.cursor.execute("insert into t1 values (?)", value)
325 result = self.cursor.execute("select n from t1").fetchone()[0]
326 self.assertEquals(result, value)
327
328 def test_smallint(self):
329 value = 32767
330 self.cursor.execute("create table t1(n smallint)")
331 self.cursor.execute("insert into t1 values (?)", value)
332 result = self.cursor.execute("select n from t1").fetchone()[0]
333 self.assertEquals(result, value)
334
335 def test_real(self):
336 value = 1234.5
337 self.cursor.execute("create table t1(n real)")
338 self.cursor.execute("insert into t1 values (?)", value)
339 result = self.cursor.execute("select n from t1").fetchone()[0]
340 self.assertEquals(result, value)
341
342 def test_negative_real(self):
343 value = -200.5
344 self.cursor.execute("create table t1(n real)")
345 self.cursor.execute("insert into t1 values (?)", value)
346 result = self.cursor.execute("select n from t1").fetchone()[0]
347 self.assertEqual(value, result)
348
349 def test_float(self):
350 value = 1234.567
351 self.cursor.execute("create table t1(n float)")
352 self.cursor.execute("insert into t1 values (?)", value)
353 result = self.cursor.execute("select n from t1").fetchone()[0]
354 self.assertEquals(result, value)
355
356 def test_negative_float(self):
357 value = -200.5
358 self.cursor.execute("create table t1(n float)")
359 self.cursor.execute("insert into t1 values (?)", value)
360 result = self.cursor.execute("select n from t1").fetchone()[0]
361 self.assertEqual(value, result)
362
363 def test_tinyint(self):
364 self.cursor.execute("create table t1(n tinyint)")
365 value = 10
366 self.cursor.execute("insert into t1 values (?)", value)
367 result = self.cursor.execute("select n from t1").fetchone()[0]
368 self.assertEqual(type(result), type(value))
369 self.assertEqual(value, result)
370
371 #
372 # decimal & money
373 #
374
375 def test_decimal(self):
376 value = Decimal('12345.6789')
377 self.cursor.execute("create table t1(n numeric(10,4))")
378 self.cursor.execute("insert into t1 values(?)", value)
379 v = self.cursor.execute("select n from t1").fetchone()[0]
380 self.assertEqual(type(v), Decimal)
381 self.assertEqual(v, value)
382
383 def test_money(self):
384 self.cursor.execute("create table t1(n money)")
385 value = Decimal('1234.45')
386 self.cursor.execute("insert into t1 values (?)", value)
387 result = self.cursor.execute("select n from t1").fetchone()[0]
388 self.assertEqual(type(result), type(value))
389 self.assertEqual(value, result)
390
391 def test_negative_decimal_scale(self):
392 value = Decimal('-10.0010')
393 self.cursor.execute("create table t1(d numeric(19,4))")
394 self.cursor.execute("insert into t1 values(?)", value)
395 v = self.cursor.execute("select * from t1").fetchone()[0]
396 self.assertEqual(type(v), Decimal)
397 self.assertEqual(v, value)
398
399 #
400 # bit
401 #
402
403 def test_bit(self):
404 self.cursor.execute("create table t1(b bit)")
405
406 value = True
407 self.cursor.execute("insert into t1 values (?)", value)
408 result = self.cursor.execute("select b from t1").fetchone()[0]
409 self.assertEqual(type(result), bool)
410 self.assertEqual(value, result)
411
412 def test_bit_null(self):
413 self.cursor.execute("create table t1(b bit)")
414
415 value = None
416 self.cursor.execute("insert into t1 values (?)", value)
417 result = self.cursor.execute("select b from t1").fetchone()[0]
418 self.assertEqual(type(result), bool)
419 self.assertEqual(False, result)
420
421 def test_guid(self):
422 # REVIEW: Python doesn't (yet) have a UUID type so the value is returned as a string. Access, however, only
423 # really supports Unicode. For now, we'll have to live with this difference. All strings in Python 3.x will
424 # be Unicode -- pyodbc 3.x will have different defaults.
425 value = "de2ac9c6-8676-4b0b-b8a6-217a8580cbee"
426 self.cursor.execute("create table t1(g1 uniqueidentifier)")
427 self.cursor.execute("insert into t1 values (?)", value)
428 v = self.cursor.execute("select * from t1").fetchone()[0]
429 self.assertEqual(type(v), type(value))
430 self.assertEqual(len(v), len(value))
431
432
433 #
434 # rowcount
435 #
436
437 def test_rowcount_delete(self):
438 self.assertEquals(self.cursor.rowcount, -1)
439 self.cursor.execute("create table t1(i int)")
440 count = 4
441 for i in range(count):
442 self.cursor.execute("insert into t1 values (?)", i)
443 self.cursor.execute("delete from t1")
444 self.assertEquals(self.cursor.rowcount, count)
445
446 def test_rowcount_nodata(self):
447 """
448 This represents a different code path than a delete that deleted something.
449
450 The return value is SQL_NO_DATA and code after it was causing an error. We could use SQL_NO_DATA to step over
451 the code that errors out and drop down to the same SQLRowCount code. On the other hand, we could hardcode a
452 zero return value.
453 """
454 self.cursor.execute("create table t1(i int)")
455 # This is a different code path internally.
456 self.cursor.execute("delete from t1")
457 self.assertEquals(self.cursor.rowcount, 0)
458
459 def test_rowcount_select(self):
460 """
461 Ensure Cursor.rowcount is set properly after a select statement.
462
463 pyodbc calls SQLRowCount after each execute and sets Cursor.rowcount, but SQL Server 2005 returns -1 after a
464 select statement, so we'll test for that behavior. This is valid behavior according to the DB API
465 specification, but people don't seem to like it.
466 """
467 self.cursor.execute("create table t1(i int)")
468 count = 4
469 for i in range(count):
470 self.cursor.execute("insert into t1 values (?)", i)
471 self.cursor.execute("select * from t1")
472 self.assertEquals(self.cursor.rowcount, -1)
473
474 rows = self.cursor.fetchall()
475 self.assertEquals(len(rows), count)
476 self.assertEquals(self.cursor.rowcount, -1)
477
478 def test_rowcount_reset(self):
479 "Ensure rowcount is reset to -1"
480
481 self.cursor.execute("create table t1(i int)")
482 count = 4
483 for i in range(count):
484 self.cursor.execute("insert into t1 values (?)", i)
485 self.assertEquals(self.cursor.rowcount, 1)
486
487 self.cursor.execute("create table t2(i int)")
488 self.assertEquals(self.cursor.rowcount, -1)
489
490 #
491 # Misc
492 #
493
494 def test_lower_case(self):
495 "Ensure pyodbc.lowercase forces returned column names to lowercase."
496
497 # Has to be set before creating the cursor, so we must recreate self.cursor.
498
499 pyodbc.lowercase = True
500 self.cursor = self.cnxn.cursor()
501
502 self.cursor.execute("create table t1(Abc int, dEf int)")
503 self.cursor.execute("select * from t1")
504
505 names = [ t[0] for t in self.cursor.description ]
506 names.sort()
507
508 self.assertEquals(names, [ "abc", "def" ])
509
510 # Put it back so other tests don't fail.
511 pyodbc.lowercase = False
512
513 def test_row_description(self):
514 """
515 Ensure Cursor.description is accessible as Row.cursor_description.
516 """
517 self.cursor = self.cnxn.cursor()
518 self.cursor.execute("create table t1(a int, b char(3))")
519 self.cnxn.commit()
520 self.cursor.execute("insert into t1 values(1, 'abc')")
521
522 row = self.cursor.execute("select * from t1").fetchone()
523 self.assertEquals(self.cursor.description, row.cursor_description)
524
525
526 def test_executemany(self):
527 self.cursor.execute("create table t1(a int, b varchar(10))")
528
529 params = [ (i, str(i)) for i in range(1, 6) ]
530
531 self.cursor.executemany("insert into t1(a, b) values (?,?)", params)
532
533 count = self.cursor.execute("select count(*) from t1").fetchone()[0]
534 self.assertEqual(count, len(params))
535
536 self.cursor.execute("select a, b from t1 order by a")
537 rows = self.cursor.fetchall()
538 self.assertEqual(count, len(rows))
539
540 for param, row in zip(params, rows):
541 self.assertEqual(param[0], row[0])
542 self.assertEqual(param[1], row[1])
543
544
545 def test_executemany_failure(self):
546 """
547 Ensure that an exception is raised if one query in an executemany fails.
548 """
549 self.cursor.execute("create table t1(a int, b varchar(10))")
550
551 params = [ (1, 'good'),
552 ('error', 'not an int'),
553 (3, 'good') ]
554
555 self.failUnlessRaises(pyodbc.Error, self.cursor.executemany, "insert into t1(a, b) value (?, ?)", params)
556
557
558 def test_row_slicing(self):
559 self.cursor.execute("create table t1(a int, b int, c int, d int)");
560 self.cursor.execute("insert into t1 values(1,2,3,4)")
561
562 row = self.cursor.execute("select * from t1").fetchone()
563
564 result = row[:]
565 self.failUnless(result is row)
566
567 result = row[:-1]
568 self.assertEqual(result, (1,2,3))
569
570 result = row[0:4]
571 self.failUnless(result is row)
572
573
574 def test_row_repr(self):
575 self.cursor.execute("create table t1(a int, b int, c int, d int)");
576 self.cursor.execute("insert into t1 values(1,2,3,4)")
577
578 row = self.cursor.execute("select * from t1").fetchone()
579
580 result = str(row)
581 self.assertEqual(result, "(1, 2, 3, 4)")
582
583 result = str(row[:-1])
584 self.assertEqual(result, "(1, 2, 3)")
585
586 result = str(row[:1])
587 self.assertEqual(result, "(1,)")
588
589
590 def test_concatenation(self):
591 v2 = u'0123456789' * 25
592 v3 = u'9876543210' * 25
593 value = v2 + 'x' + v3
594
595 self.cursor.execute("create table t1(c2 varchar(250), c3 varchar(250))")
596 self.cursor.execute("insert into t1(c2, c3) values (?,?)", v2, v3)
597
598 row = self.cursor.execute("select c2 + 'x' + c3 from t1").fetchone()
599
600 self.assertEqual(row[0], value)
601
602
603 def test_autocommit(self):
604 self.assertEqual(self.cnxn.autocommit, False)
605
606 othercnxn = pyodbc.connect(CNXNSTRING, autocommit=True)
607 self.assertEqual(othercnxn.autocommit, True)
608
609 othercnxn.autocommit = False
610 self.assertEqual(othercnxn.autocommit, False)
611
612
613 def main():
614 from optparse import OptionParser
615 parser = OptionParser(usage=usage)
616 parser.add_option("-v", "--verbose", action="count", help="Increment test verbosity (can be used multiple times)")
617 parser.add_option("-d", "--debug", action="store_true", default=False, help="Print debugging items")
618 parser.add_option("-t", "--test", help="Run only the named test")
619
620 (options, args) = parser.parse_args()
621
622 if len(args) != 1:
623 parser.error('dbfile argument required')
624
625 if args[0].endswith('.accdb'):
626 driver = 'Microsoft Access Driver (*.mdb, *.accdb)'
627 else:
628 driver = 'Microsoft Access Driver (*.mdb)'
629
630 global CNXNSTRING
631 CNXNSTRING = 'DRIVER={%s};DBQ=%s;ExtendedAnsiSQL=1' % (driver, abspath(args[0]))
632
633 cnxn = pyodbc.connect(CNXNSTRING)
634 print_library_info(cnxn)
635 cnxn.close()
636
637 suite = load_tests(AccessTestCase, options.test)
638
639 testRunner = unittest.TextTestRunner(verbosity=options.verbose)
640 result = testRunner.run(suite)
641
642
643 if __name__ == '__main__':
644
645 # Add the build directory to the path so we're testing the latest build, not the installed version.
646 add_to_path()
647 import pyodbc
648 main()
Something went wrong with that request. Please try again.