-
Notifications
You must be signed in to change notification settings - Fork 91
/
dbf2sqlite
executable file
·126 lines (100 loc) · 3.06 KB
/
dbf2sqlite
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
dbf2sqlite - convert dbf files into sqlite database
Ole Martin Bjørndalen
University of Tromsø
Todo:
- -v --verbose option
- handle existing table (-f option?)
- primary key option? (make first column primary key)
- create only option?
- insert only option?
- options to select columns to insert?
"""
import sys
import argparse
import sqlite3
import traceback
from dbfread import DBF
# Lookup table from the one-character DBF field type code (``field.type``)
# to the column type written into the ``create table`` statement.
# Codes that are not listed here are treated as TEXT by add_table().
typemap = {
    "F": "FLOAT",
    "L": "BOOLEAN",
    "I": "INTEGER",
    "C": "TEXT",
    "N": "REAL",  # REAL because the value can be an integer or a float
    "M": "TEXT",
    "D": "DATE",
    "T": "DATETIME",
    "0": "INTEGER",
}
def _quote_identifier(name):
    """Return *name* as a double-quoted SQL identifier.

    Embedded double quotes are doubled so that a name containing '"'
    cannot terminate the identifier early and corrupt the statement.
    """
    return '"{}"'.format(name.replace('"', '""'))


def add_table(cursor, table):
    """Add a dbase table to an open sqlite database.

    An existing table with the same name is dropped first, then the
    table is recreated and every record is inserted.

    cursor -- sqlite3 cursor used to execute the statements.
    table  -- table object providing ``name``, ``fields`` (objects with
              ``name`` and ``type``), ``field_names`` and iteration over
              records that expose ``values()``; main() passes a
              ``dbfread.DBF`` instance.

    Values are bound positionally, so each record must yield its values
    in the same order as ``table.field_names``.
    """
    table_name = _quote_identifier(table.name)

    cursor.execute('drop table if exists {}'.format(table_name))

    # Field type codes missing from typemap fall back to TEXT.
    field_types = {field.name: typemap.get(field.type, 'TEXT')
                   for field in table.fields}

    #
    # Create the table
    #
    defs = ', '.join('{} {}'.format(_quote_identifier(name),
                                    field_types[name])
                     for name in table.field_names)
    cursor.execute('create table {} ({})'.format(table_name, defs))

    #
    # Create data rows
    #
    # Use positional "?" placeholders: the values are supplied as a
    # sequence, and binding a sequence to named (":name") placeholders
    # is deprecated since Python 3.12 and an error from Python 3.14.
    placeholders = ', '.join(['?'] * len(table.field_names))
    sql = 'insert into {} values ({})'.format(table_name, placeholders)
    cursor.executemany(sql, (list(rec.values()) for rec in table))
def parse_args(argv=None):
    """Parse the command line and return the resulting namespace.

    argv -- optional list of argument strings. When None (the default)
            argparse reads the arguments from ``sys.argv[1:]``.

    The returned namespace has the attributes ``output_file``,
    ``encoding``, ``char_decode_errors`` and ``tables`` (a list with at
    least one entry).
    """
    # argparse generates the usage line itself, so the description only
    # says what the tool does. (The optparse-style "%prog" placeholder
    # is not expanded by argparse and would be printed literally.)
    parser = argparse.ArgumentParser(
        description='Convert dbf files into an sqlite database.')
    arg = parser.add_argument

    arg('-o', '--output-file',
        action='store',
        dest='output_file',
        default=None,
        help='sqlite database to write to '
             '(default is to print schema to stdout)')

    arg('-e', '--encoding',
        action='store',
        dest='encoding',
        default=None,
        help='character encoding in DBF file')

    arg('--char-decode-errors',
        action='store',
        dest='char_decode_errors',
        default='strict',
        help='how to handle decode errors (see pydoc bytes.decode)')

    arg('tables',
        metavar='TABLE',
        nargs='+',
        help='tables to add to sqlite database')

    return parser.parse_args(argv)
def main():
    """Convert the DBF files named on the command line.

    Every table is added to the sqlite database given with
    -o/--output-file. Without that option an in-memory database is used
    and its SQL dump is printed to stdout instead.

    Exits with an error message if a table cannot be decoded with the
    selected character encoding.
    """
    args = parse_args()

    conn = sqlite3.connect(args.output_file or ':memory:')
    # Make sure the connection is released on every path, including the
    # sys.exit() below.
    try:
        cursor = conn.cursor()

        for table_file in args.tables:
            try:
                add_table(cursor, DBF(table_file,
                                      lowernames=True,
                                      encoding=args.encoding,
                                      char_decode_errors=args.char_decode_errors))
            except UnicodeDecodeError:
                traceback.print_exc()
                sys.exit('Please use --encoding or --char-decode-errors.')

        conn.commit()

        #
        # Dump SQL schema and data to stdout if no
        # database file was specified.
        #
        # This currently only works in Python 3,
        # since Python 2 somehow defaults to 'ascii'
        # encoding.
        #
        if not args.output_file:
            for line in conn.iterdump():
                print(line)
    finally:
        conn.close()
# Run the conversion only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()