Migrate to pkgutil
ooprathamm committed Apr 6, 2024
1 parent fe8006d · commit b7d24b5
Showing 5 changed files with 39 additions and 45 deletions.
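
At a glance: every database loader stops building filesystem paths from `floss.qs.db.__file__` and instead asks `pkgutil.get_data()` for a package-relative resource, which also works when the package is imported from a zip archive. A minimal sketch of the new access pattern; the `None` guard is illustrative and not part of this commit (`get_data()` returns `Optional[bytes]`):

    import pkgutil

    # resolve a bundled data file relative to the floss.qs.db package,
    # rather than joining paths off __file__
    buf = pkgutil.get_data("floss.qs.db", "data/expert/capa.jsonl")
    if buf is None:
        # get_data() returns None when the package or resource is missing
        raise FileNotFoundError("data/expert/capa.jsonl not in floss.qs.db")
    print(f"loaded {len(buf.splitlines())} lines")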
15 changes: 7 additions & 8 deletions floss/qs/db/expert.py
@@ -1,12 +1,10 @@
 import re
-import pathlib
+import pkgutil
 from typing import Set, Dict, List, Tuple, Literal, Sequence
 from dataclasses import dataclass
 
 import msgspec
 
-import floss.qs.db
-
 
 class ExpertRule(msgspec.Struct):
     type: Literal["string", "substring", "regex"]
@@ -51,13 +49,13 @@ def query(self, s: str) -> Set[str]:
         return ret
 
     @classmethod
-    def from_file(cls, path: pathlib.Path) -> "ExpertStringDatabase":
+    def from_file(cls, package: str, resource: str) -> "ExpertStringDatabase":
         string_rules: Dict[str, ExpertRule] = {}
         substring_rules: List[ExpertRule] = []
         regex_rules: List[Tuple[ExpertRule, re.Pattern]] = []
 
         decoder = msgspec.json.Decoder(type=ExpertRule)
-        buf = path.read_bytes()
+        buf = pkgutil.get_data(package, resource)
         for line in buf.split(b"\n"):
             if not line:
                 continue
@@ -81,9 +79,10 @@ def from_file(cls, path: pathlib.Path) -> "ExpertStringDatabase":
             regex_rules=regex_rules,
         )
 
 
-DEFAULT_PATHS = (pathlib.Path(floss.qs.db.__file__).parent / "data" / "expert" / "capa.jsonl",)
+DEFAULT_FILENAMES = (
+    "capa.jsonl",
+)
 
 
 def get_default_databases() -> Sequence[ExpertStringDatabase]:
-    return [ExpertStringDatabase.from_file(path) for path in DEFAULT_PATHS]
+    return [ExpertStringDatabase.from_file("floss.qs.db", "data/expert/" + f) for f in DEFAULT_FILENAMES]
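
Usage sketch for the new `from_file` signature, assuming an installed `floss` package; the query string is illustrative:

    from floss.qs.db.expert import ExpertStringDatabase

    # the pathlib.Path argument becomes a (package, resource) pair
    db = ExpertStringDatabase.from_file("floss.qs.db", "data/expert/capa.jsonl")
    tags = db.query("This program cannot be run in DOS mode")
    print(tags)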
33 changes: 16 additions & 17 deletions floss/qs/db/gp.py
@@ -1,14 +1,13 @@
 import gzip
 import hashlib
-import pathlib
+import pkgutil
 import datetime
 from typing import Set, Dict, List, Literal, Optional, Sequence
 from collections import defaultdict
 from dataclasses import dataclass
 
 import msgspec
 
-import floss.qs.db
 
 Encoding = Literal["ascii"] | Literal["utf-16le"] | Literal["unknown"]
 # header | gap | overlay
@@ -57,13 +56,13 @@ def new_db(cls, note: Optional[str] = None):
         )
 
     @classmethod
-    def from_file(cls, path: pathlib.Path, compress: bool = True) -> "StringGlobalPrevalenceDatabase":
+    def from_file(cls, package: str, file: str, compress: bool = True) -> "StringGlobalPrevalenceDatabase":
         metadata_by_string: Dict[str, List[StringGlobalPrevalence]] = defaultdict(list)
 
         if compress:
-            lines = gzip.decompress(path.read_bytes()).split(b"\n")
+            lines = gzip.decompress(pkgutil.get_data(package, file)).split(b"\n")
         else:
-            lines = path.read_bytes().split(b"\n")
+            lines = pkgutil.get_data(package, file).split(b"\n")
 
         decoder = msgspec.json.Decoder(type=StringGlobalPrevalence)
         for line in lines[1:]:
@@ -112,10 +111,10 @@ def __contains__(self, other: bytes | str) -> bool:
             raise ValueError("other must be bytes or str")
 
     @classmethod
-    def from_file(cls, path: pathlib.Path) -> "StringHashDatabase":
+    def from_file(cls, package: str, file: str) -> "StringHashDatabase":
         string_hashes: Set[bytes] = set()
 
-        buf = path.read_bytes()
+        buf = pkgutil.get_data(package, file)
 
         for i in range(0, len(buf), 8):
             string_hashes.add(buf[i : i + 8])
@@ -125,19 +124,19 @@ def from_file(cls, path: pathlib.Path) -> "StringHashDatabase":
         )
 
 
-DEFAULT_PATHS = (
-    pathlib.Path(floss.qs.db.__file__).parent / "data" / "gp" / "gp.jsonl.gz",
-    pathlib.Path(floss.qs.db.__file__).parent / "data" / "gp" / "cwindb-native.jsonl.gz",
-    pathlib.Path(floss.qs.db.__file__).parent / "data" / "gp" / "cwindb-dotnet.jsonl.gz",
-    pathlib.Path(floss.qs.db.__file__).parent / "data" / "gp" / "xaa-hashes.bin",
-    pathlib.Path(floss.qs.db.__file__).parent / "data" / "gp" / "yaa-hashes.bin",
+DEFAULT_FILENAMES = (
+    "gp.jsonl.gz",
+    "cwindb-native.jsonl.gz",
+    "cwindb-dotnet.jsonl.gz",
+    "xaa-hashes.bin",
+    "yaa-hashes.bin",
 )
 
 
 def get_default_databases() -> Sequence[StringGlobalPrevalenceDatabase | StringHashDatabase]:
     return [
-        StringGlobalPrevalenceDatabase.from_file(path)
-        if path.name.endswith(".jsonl.gz")
-        else StringHashDatabase.from_file(path)
-        for path in DEFAULT_PATHS
+        StringGlobalPrevalenceDatabase.from_file("floss.qs.db", "data/gp/" + file)
+        if file.endswith(".jsonl.gz")
+        else StringHashDatabase.from_file("floss.qs.db", "data/gp/" + file)
+        for file in DEFAULT_FILENAMES
    ]
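
The compressed databases run through the same call, with `gzip.decompress()` inflating the returned bytes in memory; a sketch under the same layout assumptions (the `assert` guard is illustrative):

    import gzip
    import pkgutil

    raw = pkgutil.get_data("floss.qs.db", "data/gp/gp.jsonl.gz")
    assert raw is not None, "resource missing from installed package"
    lines = gzip.decompress(raw).split(b"\n")
    # the loader above skips lines[0], the database metadata header
    print(f"{len(lines) - 1} records")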
17 changes: 6 additions & 11 deletions floss/qs/db/oss.py
@@ -1,12 +1,10 @@
 import gzip
-import pathlib
+import pkgutil
 from typing import Dict, Sequence
 from dataclasses import dataclass
 
 import msgspec
 
-import floss.qs.db
-
 
 class OpenSourceString(msgspec.Struct):
     string: str
@@ -25,10 +23,10 @@ def __len__(self) -> int:
         return len(self.metadata_by_string)
 
     @classmethod
-    def from_file(cls, path: pathlib.Path) -> "OpenSourceStringDatabase":
+    def from_file(cls, package: str, resource: str) -> "OpenSourceStringDatabase":
         metadata_by_string: Dict[str, OpenSourceString] = {}
         decoder = msgspec.json.Decoder(type=OpenSourceString)
-        for line in gzip.decompress(path.read_bytes()).split(b"\n"):
+        for line in gzip.decompress(pkgutil.get_data(package, resource)).split(b"\n"):
             if not line:
                 continue
             s = decoder.decode(line)
@@ -57,10 +55,7 @@ def from_file(cls, path: pathlib.Path) -> "OpenSourceStringDatabase":
     "zlib.jsonl.gz",
 )
 
-DEFAULT_PATHS = tuple(
-    pathlib.Path(floss.qs.db.__file__).parent / "data" / "oss" / filename for filename in DEFAULT_FILENAMES
-) + (pathlib.Path(floss.qs.db.__file__).parent / "data" / "crt" / "msvc_v143.jsonl.gz",)
 
 
 def get_default_databases() -> Sequence[OpenSourceStringDatabase]:
-    return [OpenSourceStringDatabase.from_file(path) for path in DEFAULT_PATHS]
+    oss_databases = [OpenSourceStringDatabase.from_file("floss.qs.db", "data/oss/" + f) for f in DEFAULT_FILENAMES]
+    crt_database = [OpenSourceStringDatabase.from_file("floss.qs.db", "data/crt/msvc_v143.jsonl.gz")]
+    return oss_databases + crt_database
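
Usage sketch of the combined result, relying only on the `metadata_by_string` mapping shown above; the lookup string is an illustrative zlib banner:

    from floss.qs.db.oss import get_default_databases

    # all OSS databases plus the MSVC CRT database, each loaded via pkgutil
    for db in get_default_databases():
        meta = db.metadata_by_string.get("inflate 1.2.11 Copyright 1995-2017 Mark Adler")
        if meta is not None:
            print(meta.string)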
16 changes: 8 additions & 8 deletions floss/qs/db/winapi.py
@@ -1,10 +1,8 @@
 import gzip
-import pathlib
+import pkgutil
 from typing import Set, Sequence
 from dataclasses import dataclass
 
-import floss.qs.db
-
 
 @dataclass
 class WindowsApiStringDatabase:
@@ -15,25 +13,27 @@ def __len__(self) -> int:
         return len(self.dll_names) + len(self.api_names)
 
     @classmethod
-    def from_dir(cls, path: pathlib.Path) -> "WindowsApiStringDatabase":
+    def from_dir(cls, package: str, path: str) -> "WindowsApiStringDatabase":
         dll_names: Set[str] = set()
         api_names: Set[str] = set()
 
-        for line in gzip.decompress((path / "dlls.txt.gz").read_bytes()).decode("utf-8").splitlines():
+        for line in gzip.decompress(pkgutil.get_data(package, path + "/dlls.txt.gz")).decode("utf-8").splitlines():
             if not line:
                 continue
             dll_names.add(line)
 
-        for line in gzip.decompress((path / "apis.txt.gz").read_bytes()).decode("utf-8").splitlines():
+        for line in gzip.decompress(pkgutil.get_data(package, path + "/apis.txt.gz")).decode("utf-8").splitlines():
             if not line:
                 continue
             api_names.add(line)
 
         return cls(dll_names=dll_names, api_names=api_names)
 
 
-DEFAULT_PATHS = (pathlib.Path(floss.qs.db.__file__).parent / "data" / "winapi",)
+DEFAULT_PATHS = (
+    "data/winapi",
+)
 
 
 def get_default_databases() -> Sequence[WindowsApiStringDatabase]:
-    return [WindowsApiStringDatabase.from_dir(path) for path in DEFAULT_PATHS]
+    return [WindowsApiStringDatabase.from_dir("floss.qs.db", path) for path in DEFAULT_PATHS]
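
Note that resource names passed to `pkgutil.get_data()` are always `/`-separated, regardless of host OS, so the pathlib `/` operator becomes plain string concatenation here. An equivalent spelling with `posixpath.join` (not used by this commit) makes that explicit:

    import gzip
    import pkgutil
    import posixpath

    # resource paths use '/' on every platform, never os.sep
    resource = posixpath.join("data/winapi", "dlls.txt.gz")
    raw = pkgutil.get_data("floss.qs.db", resource)
    assert raw is not None  # illustrative guard; get_data() may return None
    dll_names = {line for line in gzip.decompress(raw).decode("utf-8").splitlines() if line}
    print(len(dll_names))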
3 changes: 2 additions & 1 deletion floss/qs/main.py
@@ -7,6 +7,7 @@
 import bisect
 import logging
 import pathlib
+import pkgutil
 import argparse
 import functools
 import itertools
@@ -476,7 +477,7 @@ def make_tagger(db, queryfn) -> Sequence[Tag]:
 
     # supplement code analysis with a database of junk code strings
     junk_db = StringGlobalPrevalenceDatabase.from_file(
-        pathlib.Path(floss.qs.db.__file__).parent / "data" / "gp" / "junk-code.jsonl.gz"
+        "floss.qs.db", "data/gp/junk-code.jsonl.gz"
     )
     ret.append(make_tagger(junk_db, query_code_string_database))
Expand Down
