-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathfdb_to_sqlite.py
207 lines (168 loc) · 5.87 KB
/
fdb_to_sqlite.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
"""Module for converting a FDB database to a SQLite database"""
import argparse
import os
import sqlite3
import struct
from collections import OrderedDict
# There seems to be no difference between 4 and 8, but just in case there is I'm keeping that type info
SQLITE_TYPE = {}
SQLITE_TYPE[0] = "none"
SQLITE_TYPE[1] = "int32"
SQLITE_TYPE[3] = "real"
SQLITE_TYPE[4] = "text_4"
SQLITE_TYPE[5] = "int_bool"
SQLITE_TYPE[6] = "int64"
SQLITE_TYPE[8] = "text_8"
def pointer_scope(func):
"""The FDB format has a lot of pointers to structures, so this decorator automatically reads the pointer, seeks to the pointer position, calls the function, and seeks back."""
def wrapper(self, *args, **kwargs):
pointer = kwargs.get("pointer")
if pointer == None:
pointer = self._read_int32()
else:
del kwargs["pointer"]
if pointer == -1:
return
current_pos = self.fdb.tell()
self.fdb.seek(pointer)
result = func(self, *args, **kwargs)
self.fdb.seek(current_pos)
return result
return wrapper
# I'm using a class for this to save things like the fdb and the sqlite without using globals
class convert:
def __init__(self, in_file, out_file=None, add_link_info=False):
self.add_link_info = add_link_info
if out_file == None:
out_file = os.path.splitext(os.path.basename(in_file))[0] + ".sqlite"
if os.path.exists(out_file):
os.remove(out_file)
self.fdb = open(in_file, "rb")
self.sqlite = sqlite3.connect(out_file)
self._read()
print("-"*79)
print("Finished converting database!")
print("Converted file is at: "+out_file)
print("-"*79)
self.sqlite.commit()
self.sqlite.close()
self.fdb.close()
def _read(self):
number_of_tables = self._read_int32()
self._read_tables(number_of_tables)
@pointer_scope
def _read_tables(self, number_of_tables):
for table_struct_index in range(number_of_tables):
table_name, number_of_columns = self._read_column_header()
print("[%2i%%] Reading table %s" % (table_struct_index*100//number_of_tables, table_name))
self._read_row_header(table_name, number_of_columns)
@pointer_scope
def _read_column_header(self):
number_of_columns = self._read_int32()
table_name = self._read_string()
columns = self._read_columns(number_of_columns)
sql = "create table if not exists '%s' (%s)" % \
(table_name, ", ".join(["'%s' %s" % (col, SQLITE_TYPE[columns[col]]) for col in columns]))
self.sqlite.execute(sql)
return table_name, len(columns)
@pointer_scope
def _read_columns(self, number_of_columns):
columns = OrderedDict()
for _ in range(number_of_columns):
data_type = self._read_int32()
name = self._read_string()
columns[name] = data_type
if self.add_link_info:
columns["_linked_from"] = 1
columns["_does_link"] = 5
columns["_invalid"] = 5
return columns
@pointer_scope
def _read_row_header(self, table_name, number_of_columns):
number_of_allocated_rows = self._read_int32()
if number_of_allocated_rows != 0:
assert number_of_allocated_rows & (number_of_allocated_rows - 1) == 0 # assert power of 2 allocation size
self.sqlite.executemany("insert into '%s' values (%s)" % (table_name, ", ".join(["?"] * number_of_columns)), self._read_rows(number_of_allocated_rows, number_of_columns))
@pointer_scope
def _read_rows(self, number_of_allocated_rows, number_of_columns):
rowid = 0
percent_read = -1 # -1 so 0% is displayed as new
for row in range(number_of_allocated_rows):
new_percent_read = row*100//number_of_allocated_rows
if new_percent_read > percent_read:
percent_read = new_percent_read
print("[%2i%%] Reading rows" % percent_read, end="\r")
row_pointer = self._read_int32()
if row_pointer == -1:
if self.add_link_info:
yield (None,) * (number_of_columns-1) + (True,) # invalid row
rowid += 1
else:
linked_rows, rowid = self._read_row(rowid, pointer=row_pointer)
for values in linked_rows:
yield values
@pointer_scope
def _read_row(self, rowid):
rows = []
linked_from = None
while True:
row_values = self._read_row_info()
linked = self._read_int32()
if self.add_link_info:
row_values.append(linked_from)
row_values.append(linked != -1)
row_values.append(False) # valid row
rows.append(row_values)
rowid += 1
if linked == -1:
break
self.fdb.seek(linked)
linked_from = rowid
return rows, rowid
@pointer_scope
def _read_row_info(self):
number_of_columns = self._read_int32()
return self._read_row_values(number_of_columns)
@pointer_scope
def _read_row_values(self, number_of_columns):
values = []
for _ in range(number_of_columns):
data_type = self._read_int32()
if data_type == 0:
assert self.fdb.read(4) == b"\0\0\0\0"
value = None
elif data_type == 1:
value = self._read_int32()
elif data_type == 3:
value = struct.unpack("f", self.fdb.read(4))[0]
elif data_type in (4, 8):
value = self._read_string()
elif data_type == 5:
value = struct.unpack("?xxx", self.fdb.read(4))[0]
elif data_type == 6:
value = self._read_int64()
else:
raise NotImplementedError(data_type)
values.append(value)
return values
def _read_int32(self):
return struct.unpack("i", self.fdb.read(4))[0]
@pointer_scope
def _read_string(self):
str_bytes = bytearray()
while True:
byte = self.fdb.read(1)
if byte == b"\0":
break
str_bytes += byte
return str_bytes.decode("latin1")
@pointer_scope
def _read_int64(self):
return struct.unpack("q", self.fdb.read(8))[0]
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("fdb_path")
parser.add_argument("--sqlite_path")
parser.add_argument("--add_link_info", action="store_true")
args = parser.parse_args()
convert(args.fdb_path, args.sqlite_path, args.add_link_info)