Skip to content

Commit

Permalink
Add support for parsing comments in schemas: #2
Browse files: browse the repository at this point in the history
  • Loading branch information
thombashi committed Feb 6, 2022
1 parent febc543 commit 5378d7d
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 45 deletions.
1 change: 1 addition & 0 deletions sqliteschema/_const.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ class SchemaHeader:
NULL = "Null"
INDEX = "Index"
EXTRA = "Extra"
COMMENT = "Comment"
104 changes: 64 additions & 40 deletions sqliteschema/_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ class SQLiteSchemaExtractor:
_RE_UNIQUE = re.compile("UNIQUE", re.IGNORECASE)
_RE_AUTO_INC = re.compile("AUTOINCREMENT", re.IGNORECASE)

_RE_MULTI_LINE_COMMENT = re.compile(r"/\*.*?\*/", re.MULTILINE | re.DOTALL)
_RE_SINGLE_LINE_COMMENT = re.compile(r"[\s]+--.+", re.MULTILINE)
_RE_MULTI_LINE_COMMENT = re.compile(
rf"/\*(?P<{SchemaHeader.COMMENT}>.*?)\*/", re.MULTILINE | re.DOTALL
)
_RE_SINGLE_LINE_COMMENT = re.compile(rf"[\s]*--(?P<{SchemaHeader.COMMENT}>.+)", re.MULTILINE)

def __init__(self, database_source, max_workers: Optional[int] = None) -> None:
is_connection_required = True
Expand Down Expand Up @@ -267,7 +269,7 @@ def _extract_attr_constraints(self, schema: str) -> str:
return " ".join(schema_wo_name.split()[1:])

@stash_row_factory
def _fetch_attr_schema(self, table_name: str, schema_type: str) -> List[str]:
def _fetch_table_schema_text(self, table_name: str, schema_type: str) -> List[str]:
if table_name in SQLITE_SYSTEM_TABLES:
logger.debug(f"skip fetching sqlite system table: {table_name:s}")
return []
Expand All @@ -283,58 +285,50 @@ def _fetch_attr_schema(self, table_name: str, schema_type: str) -> List[str]:
),
self.global_debug_query,
)
error_message_format = "data not found in '{}' table"

try:
table_schema = result.fetchone()[0]
return result.fetchone()[0]
except TypeError:
raise DataNotFoundError(error_message_format.format(self._SQLITE_MASTER_TABLE_NAME))
raise DataNotFoundError(f"data not found in '{self._SQLITE_MASTER_TABLE_NAME}' table")

table_schema = self._RE_MULTI_LINE_COMMENT.sub("", table_schema)
table_schema = self._RE_SINGLE_LINE_COMMENT.sub("", table_schema)
descriptions = table_schema.split("(", maxsplit=1)[1].rsplit(")", maxsplit=1)[0].split(",")
raise RuntimeError("failed to fetch table schema")

return [attr.strip() for attr in descriptions if self._RE_FOREIGN_KEY.search(attr) is None]
def _parse_table_schema_text(self, table_name: str, table_schema_text: str):
index_query_list = self._fetch_index_schema(table_name)
table_metadata: List[Dict] = []

def _fetch_index_schema(self, table_name: str) -> List[str]:
self.__update_sqlite_master_db()
table_attr_text = table_schema_text.split("(", maxsplit=1)[1].rsplit(")", maxsplit=1)[0]
item_count = 0

result = self.__execute_sqlite_master(
"SELECT {:s} FROM {:s} WHERE {:s} AND {:s}".format(
"sql",
self._SQLITE_MASTER_TABLE_NAME,
"{:s} = '{:s}'".format("tbl_name", table_name),
"{:s} = '{:s}'".format("type", "index"),
),
self.global_debug_query,
)
for attr_item in re.split("[,\n]", table_attr_text):
attr_item = attr_item.strip()
if not attr_item:
continue

try:
return [
record[0] for record in result.fetchall() if typepy.is_not_empty_sequence(record[0])
]
except TypeError:
raise DataNotFoundError(f"index not found in '{table_name}'")
if self._RE_FOREIGN_KEY.search(attr_item) is not None:
continue

def __fetch_table_metadata(self, table_name: str) -> Mapping[str, List[Mapping[str, Any]]]:
index_query_list = self._fetch_index_schema(table_name)
metadata: Dict[str, List] = OrderedDict()
match = self._RE_MULTI_LINE_COMMENT.search(
attr_item
) or self._RE_SINGLE_LINE_COMMENT.search(attr_item)
comment = ""
if match:
comment = match.group(SchemaHeader.COMMENT).strip()

if table_name in self.fetch_view_names():
# can not extract metadata from views
return {}
if table_metadata and comment:
table_metadata[item_count - 1][SchemaHeader.COMMENT] = comment
continue

for attr_schema in self._fetch_attr_schema(table_name, "table"):
values: Dict[str, Any] = OrderedDict()
attr_name = self._extract_attr_name(attr_schema)
attr_name = self._extract_attr_name(attr_item)
re_index = re.compile(re.escape(attr_name))

values[SchemaHeader.ATTR_NAME] = attr_name
values[SchemaHeader.INDEX] = False
values[SchemaHeader.DATA_TYPE] = self._extract_attr_type(attr_schema)
values[SchemaHeader.DATA_TYPE] = self._extract_attr_type(attr_item)

try:
constraint = self._extract_attr_constraints(attr_schema)
constraint = self._extract_attr_constraints(attr_item)
except IndexError:
continue

Expand All @@ -354,10 +348,40 @@ def __fetch_table_metadata(self, table_name: str) -> Mapping[str, List[Mapping[s

values[SchemaHeader.EXTRA] = ", ".join(self.__extract_extra(constraint))

metadata.setdefault(table_name, []).append(values)
table_metadata.append(values)
item_count += 1

if not metadata:
pass
return table_metadata

def _fetch_index_schema(self, table_name: str) -> List[str]:
self.__update_sqlite_master_db()

result = self.__execute_sqlite_master(
"SELECT {:s} FROM {:s} WHERE {:s} AND {:s}".format(
"sql",
self._SQLITE_MASTER_TABLE_NAME,
"{:s} = '{:s}'".format("tbl_name", table_name),
"{:s} = '{:s}'".format("type", "index"),
),
self.global_debug_query,
)

try:
return [
record[0] for record in result.fetchall() if typepy.is_not_empty_sequence(record[0])
]
except TypeError:
raise DataNotFoundError(f"index not found in '{table_name}'")

def __fetch_table_metadata(self, table_name: str) -> Mapping[str, List[Mapping[str, Any]]]:
metadata: Dict[str, List] = OrderedDict()

if table_name in self.fetch_view_names():
# can not extract metadata from views
return {}

table_schema_text = self._fetch_table_schema_text(table_name, "table")
metadata[table_name] = self._parse_table_schema_text(table_name, table_schema_text)

return metadata

Expand Down
4 changes: 2 additions & 2 deletions test/test_dumps.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@


def patch_attr(self, table_name, schema_type):
return ["'Primary Key ID' INTEGER PRIMARY KEY", "'AA BB CC' TEXT"]
return "CREATE TABLE testschema('Primary Key ID' INTEGER PRIMARY KEY, 'AA BB CC' TEXT);"


class Test_dumps:
Expand Down Expand Up @@ -190,7 +190,7 @@ def test_normal_inc_verbositty(self, database_path):
assert output == expected

def test_normal_get_table_schema_w_space(self, monkeypatch, database_path):
monkeypatch.setattr(self.EXTRACTOR_CLASS, "_fetch_attr_schema", patch_attr)
monkeypatch.setattr(self.EXTRACTOR_CLASS, "_fetch_table_schema_text", patch_attr)

extractor = self.EXTRACTOR_CLASS(database_path)
expected = dedent(
Expand Down
9 changes: 6 additions & 3 deletions test/test_schema_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ def test_normal_w_comments(self, database_path):
"Null": "YES",
"Key": "",
"Default": "NULL",
"Extra": ""
"Extra": "",
"Comment": "Very important comment"
},
{
"Field": "b",
Expand All @@ -246,7 +247,8 @@ def test_normal_w_comments(self, database_path):
"Null": "NO",
"Key": "",
"Default": "",
"Extra": ""
"Extra": "",
"Comment": "Another important comment"
},
{
"Field": "c",
Expand All @@ -255,7 +257,8 @@ def test_normal_w_comments(self, database_path):
"Null": "YES",
"Key": "",
"Default": "NULL",
"Extra": ""
"Extra": "",
"Comment": "block comment"
},
{
"Field": "d",
Expand Down

0 comments on commit 5378d7d

Please sign in to comment.