#!/usr/bin/env python
import itertools as it, operator as op, functools as ft
import pathlib as pl, collections as cs, contextlib as cl
import os, sys, re, math, configparser, tempfile, time
def p_err(msg, *a, **k):
    """Print msg (plus any extra print() args/kwargs) to stderr, flushing right away."""
    print(msg, *a, **k, file=sys.stderr, flush=True)


# Config file used when no -c/--conf option is given on the command line.
conf_path_default = '~/.filetag.ini'
# Example configuration, printed by "--conf help".
# Raw string, so that regexp backslashes below are not treated as string escapes.
# Section names must match what main() reads: [paths] and [filter-regexps].
conf_example = r'''
;;;; This is an example INI file with specific sections/options.
;; For format description and examples, see:
;;   https://docs.python.org/3/library/configparser.html#supported-ini-file-structure
;; Inline comments are not allowed. Multiline values are same as multiple same-key values.

[paths]

;;;; scan - paths to process files in, assigning tags to them.
;; Paths can contain spaces. Symlinks are not followed.
;; Can be specified multiple times for multiple paths.
;; All files outside of these will be removed from db on update.
;; Use separate configs/databases for multiple sets of scanned paths.
scan = ~/src
scan = ~/dev

;;;; db - tag database path to use.
db = ~/.filetag.db

[filter-regexps]

;; Regexps in python "re" module format to match scanned paths to include/skip.
;; Directives can be used to selectively
;;  include/exclude files and skip scanning directories entirely.
;; All matched paths start with "/", which is scanned directory (from "scan=" above).
;; "include" directives override "skip" ones.

;; Scan this specific file from any ".git" directory.
include = /\.git/config$

;; And skip everything else in ".git" dirs
;; Note "." to match files in a dir, so that dir itself won't be skipped.
skip = /\.git/.

;; Skip these directories entirely - they won't be recursed into.
;; Slash at the end is the important part, as only directory paths end with "/".
skip = /\.(hg|bzr|redo)/

;; Skip files with specific extension(s).
;; Note that this won't match directories (no trailing "/").
skip = \.py[co]$

;; Skip specific directories in the root of scanned path(s).
skip = ^/(tmp|games)/

;; Example of using python regexp flags (i - case-insensitive match).
skip = (?i)/\.?svn(/|ignore)$

;; This will match both files and dirs starting with "venv-" anywhere
skip = /venv-
'''
# Tagging rules, compiled by FileTreeTagger.tag_map_compile().
#  - *_ext dicts map an extension regexp to space-separated tags;
#    each gets wrapped as "\.(<rx>)<*_ext_suff>$" (bin_ext case-insensitively).
#  - code_shebang extracts interpreter name ("bin" group) from the first line of a file,
#    which is then matched in full against code_bin regexps.
#  - code_path regexps are searched for anywhere in the file path.
tag_map = dict(

    # Useful binary formats
    bin_ext={
        # Images
        'jpe?g': 'image jpg', 'png': 'image png', 'webp': 'image webp',
        'gif': 'image gif', 'svgz?': 'image svg', 'ico': 'image ico',
        'xpm': 'image xpm', 'bmp': 'image bmp', 'tiff': 'image tiff', 'tga': 'image tga',
        # Was a duplicate 'xcf' key, which silently dropped the "image xcf" mapping
        'xcf': 'image xcf', 'psd': 'image psd',
        # Crypto
        'ghg': 'crypt ghg', 'gpg': 'crypt gpg',
        # Fonts
        'ttf|woff2?|eot|otf|pcf': 'font' },
    bin_ext_suff=r'(\.(bak|old|tmp|\d+))*',

    # Code formats
    code_ext={
        'py|tac': 'py', 'go': 'go', r'c(c|pp|xx|\+\+)?|hh?|lex|y(acc)?': 'c',
        r'js(o?n(\.txt)?)?|coffee': 'js', 'co?nf|cf|cfg|ini': 'conf',
        'unit|service|target|mount|desktop|rules': 'conf',
        '[sx]?htm(l[45]?)?|css|less|jade': 'html', 'x[ms]l|xsd|dbk': 'xml',
        'kml': 'kml', 'sgml|dtd': 'sgml',
        'patch|diff|pat': 'diff', r'(ba|z|k|c|fi)?sh|env|exheres-\d+|ebuild|initd?': 'sh',
        'sql': 'sql', 'p(l|m|erl|od)|al': 'perl', 'ph(p[s45t]?|tml)': 'php',
        '[cej]l|li?sp|rkt|sc[mh]|stk|ss': 'lisp', r'el(\.gz)?': 'el', 'ml': 'ml ocaml',
        'hs': 'haskell', 'rb': 'ruby', 'lua': 'lua', 'awk': 'awk', 'tcl': 'tcl', 'java': 'java',
        'mk?d|markdown': 'md', 're?st': 'rst', 'te?xt': 'txt', 'log': 'log',
        'rdf': 'rdf', 'xul': 'xul', 'po': 'po', 'csv': 'csv',
        'f(or)?': 'fortran', 'p(as)?': 'pascal', 'dpr': 'delphi', 'ad[abs]|ad[bs].dg': 'ada',
        'ya?ml': 'yaml', r'jso?n(\.txt)?': 'json', 'do': 'redo', 'm[k4c]|a[cm]|cmake': 'make' },
    # NOTE(review): this pattern was cut off in the source after "\d+" -
    #  closed here the same way as bin_ext_suff, check for lost alternatives.
    code_ext_suff=r'(\.(in|tpl|(src-)?bak|old|tmp|\d+))*',

    code_shebang=r'(?i)^#!((/usr/bin/env)?\s+)?(/.*/)?(?P<bin>\S+)',
    code_bin={
        # "*" is needed for plain "lua" to match, as these get anchored with ^...$
        r'lua[-\d.]*': 'lua', r'php\d?': 'php',
        r'j?ruby(\d\.\d)?|rbx': 'ruby',
        r'[jp]ython(\d(\.\d+)?)?': 'py',
        '[gnm]?awk': 'awk',
        r'(mini)?perl(\d(\.\d+)?)?': 'perl',
        'wishx?|tcl(sh)?': 'tcl',
        'scm|guile|clisp|racket|(sb)?cl|emacs': 'lisp',
        '([bdo]?a|t?c|k|z)?sh': 'sh' },

    code_path={
        '/(Makefile|CMakeLists.txt|Imakefile|makepp|configure)$': 'make',
        'rakefile': 'ruby', '/config$': 'conf', '/zsh/_[^/]+$': 'sh', 'patch': 'diff' } )
class TagDB:
    """SQLite-backed store of (tag, path, mtime) rows, one row per tag of each path.

    Connection is opened lazily on first use, unless lazy=False is passed.
    Can be used as a context manager, which closes the connection on exit.
    """

    # No migrations needed here, as db gets rebuilt on each run
    _db, _db_schema = None, '''
        create table if not exists paths (
            tag text not null, path text not null, mtime real not null );
        create index if not exists paths_tag on paths (tag);'''

    def __init__(self, path, lock_timeout=60, lazy=True):
        """Store connection parameters for database at path.

        lock_timeout is passed to sqlite3.connect() as its "timeout" value.
        """
        import sqlite3
        self._sqlite, self._ts_activity = sqlite3, 0
        self._db_kws = dict( database=path,
            isolation_level='IMMEDIATE', timeout=lock_timeout )
        if not lazy: self._db_init()

    def close(self, inactive_timeout=None):
        """Close db connection, if it is open.

        With inactive_timeout set (seconds), does nothing
        if db cursor was requested more recently than that.
        """
        if ( inactive_timeout is not None
                and (time.monotonic() - self._ts_activity) < inactive_timeout ): return
        if self._db:
            self._db.close() # release file handle and locks, not just the reference
            self._db = None

    def __enter__(self): return self
    def __exit__(self, *err): self.close()

    def _db_init(self):
        """Open db connection and create table/index, if these are missing."""
        self._db = self._sqlite.connect(**self._db_kws)
        with self._db_cursor() as c:
            for stmt in self._db_schema.split(';'):
                if stmt.strip(): c.execute(stmt)

    @cl.contextmanager
    def _db_cursor(self):
        """Context manager yielding a cursor within connection's commit/rollback context."""
        self._ts_activity = time.monotonic()
        if not self._db: self._db_init()
        with self._db as conn, cl.closing(conn.cursor()) as c: yield c

    @cl.contextmanager
    def tagger_context(self):
        """Context manager yielding tag_func(tag, path, mtime), which inserts one row.

        All inserts made via that function share one cursor/transaction context.
        """
        with self._db_cursor() as c:
            tag_func = lambda tag, path, mtime: c.execute(
                'insert into paths (tag, path, mtime) values (?, ?, ?)', (tag, path, mtime) )
            yield tag_func

    def lookup_tags(self, tagsets):
        """Return iterator of paths matching any of tagsets, latest mtime first.

        Each non-empty element of tagsets is a collection of tags,
        all of which must be assigned to a path for it to match ("and" logic),
        with different elements combined via "or". Each path is returned once.
        If there are no non-empty tagsets, all stored paths are returned.
        """
        q_where, q_args = list(), list()
        for tag_group in filter(None, tagsets):
            tags = list(dict.fromkeys(tag_group))
            # Tags are stored one per row, so "tag = a and tag = b" can never match -
            #  instead pick paths which have rows for every tag in the group.
            q_where.append(
                'path in ( select path from paths'
                f' where tag in ({", ".join("?" * len(tags))})'
                ' group by path having count(distinct tag) = ? )' )
            q_args.extend(tags)
            q_args.append(len(tags))
        q_where = ('where ' + ' or '.join(q_where)) if q_where else ''
        with self._db_cursor() as c:
            c.execute( 'select distinct path, mtime from paths'
                f' {q_where} order by mtime desc, path', q_args )
            return (row[0] for row in (c.fetchall() or list()))

    def lookup_paths(self, paths):
        """Return list of (path, set-of-tags) tuples for specified paths, ordered by path.

        Paths without any stored tags are not included in the result.
        If paths is empty, all stored paths are returned.
        """
        q_where, q_args = list(), list()
        for p_str in paths:
            q_where.append('path = ?')
            q_args.append(p_str)
        q_where = ('where ' + ' or '.join(q_where)) if q_where else ''
        with self._db_cursor() as c:
            c.execute(f'select path, tag from paths {q_where} order by path', q_args)
            return list(
                (p_str, set(map(op.itemgetter(1), p_tags)))
                for p_str, p_tags in it.groupby(c.fetchall() or list(), key=op.itemgetter(0)) )
class PathFilter:
    """Include/skip check for scanned paths, using lists of compiled regexps."""

    def __init__(self, res_inc, res_skip):
        """res_inc and res_skip are iterables of compiled regexp objects."""
        self.res_inc, self.res_skip = res_inc, res_skip

    def check(self, fn):
        """Return True if path fn should be processed, False if it should be skipped.

        Include-regexps are checked first and override skip-regexps.
        Path that matches neither list is processed.
        """
        if any(rx.search(fn) for rx in self.res_inc): return True
        if any(rx.search(fn) for rx in self.res_skip): return False
        return True
class FileTreeTagger:
    """Walks directory trees and assigns tags to files there, based on tag_map rules."""

    def __init__(self, tag_func, tag_map, p_filter, verbose=False):
        """Set up the tagger.

        tag_func(tag, path_str, mtime) is called for every tag assigned to a file.
        tag_map is a dict of rules, as expected by tag_map_compile().
        p_filter is an object with check(path) method, returning False for paths to skip.
        verbose enables printing a line to stdout for each processed file.
        """
        self.tag_func, self.p_filter, self.verbose = tag_func, p_filter, verbose
        self.tags = self.tag_map_compile(tag_map)

    @staticmethod
    def tag_map_compile(tag_map):
        """Compile regexps in tag_map dict, returning class with its keys as attributes.

        String values are compiled as-is, dict values (regexp -> space-separated tags)
        become lists of (compiled-regexp, tag-list) tuples, where regexps under
        code_ext/bin_ext/code_bin keys get extra anchoring and suffix patterns.
        Passed tag_map is not modified.
        """
        tag_map = tag_map.copy()
        code_ext_suff, bin_ext_suff = (
            tag_map.pop(k) for k in ['code_ext_suff', 'bin_ext_suff'] )
        compiled = dict()
        for k, v in tag_map.items():
            if isinstance(v, str): v = re.compile(v)
            elif isinstance(v, dict):
                taggers = list()
                for rx, tags in v.items():
                    if k == 'code_ext': rx = rf'\.({rx}){code_ext_suff}$'
                    elif k == 'bin_ext': rx = rf'(?i)\.({rx}){bin_ext_suff}$'
                    elif k == 'code_bin': rx = rf'^{rx}$'
                    taggers.append((re.compile(rx), tags.split()))
                v = taggers
            compiled[k] = v
        return type('TagMap', (object,), compiled)

    def process_path(self, p):
        """Tag all files under directory p (pathlib.Path), except for filtered-out ones.

        Paths passed to the filter are relative to resolved p and start with "/",
        with directory paths also having "/" at the end.
        Directories rejected by the filter are not descended into.
        """
        p, ts0 = str(p.resolve()), time.monotonic()
        p_len = len(p)
        for root, dirs, files in os.walk(p):
            if not root.startswith(p): raise RuntimeError(p, root)
            fn_root, root = root[p_len:] + '/', pl.Path(root)
            # In-place slice assignment tells os.walk which dirs to skip,
            #  without skipping checks for entries after removed ones, like "del" in a loop does
            dirs[:] = [d for d in dirs if self.p_filter.check(f'{fn_root}{d}/')]
            for fn in files:
                fn, p_file = fn_root + fn, root / fn
                if not self.p_filter.check(fn): continue
                tagset = self.tag_file(p_file)
                if self.verbose: print(f'{time.monotonic()-ts0:.3f} {len(tagset)} {fn.lstrip("/")}')

    def tag_file(self, p):
        """Pick tags for file p (pathlib.Path), pass these to tag_func, return them as a set.

        Checks are tried in order, until any of them produces tags:
        code extension, binary extension, shebang line, path regexps.
        mtime of 0 is stored for files that cannot be stat()'ed.
        """
        p_str, tagset = str(p), set()
        try: p_mtime = p.stat().st_mtime
        except OSError: p_mtime = 0
        for rx, tags in self.tags.code_ext: # code file extension
            if rx.search(p_str): tagset.update(tags)
        if not tagset:
            for rx, tags in self.tags.bin_ext: # bin file extension
                if rx.search(p_str): tagset.update(tags)
        if not tagset: # code shebang
            try:
                with p.open('rb', 255) as src:
                    if hasattr(os, 'posix_fadvise'): # not available on all platforms
                        os.posix_fadvise(src.fileno(), 0, 256, os.POSIX_FADV_NOREUSE)
                    shebang = src.readline(200).decode().strip()
            except (OSError, UnicodeDecodeError): shebang = ''
            if m := self.tags.code_shebang.search(shebang):
                bin_name = m.group('bin')
                for rx, tags in self.tags.code_bin:
                    if rx.search(bin_name): tagset.update(tags)
        if not tagset: # code filenames/paths
            for rx, tags in self.tags.code_path:
                if rx.search(p_str): tagset.update(tags)
        if tagset: # commit path tags to db
            for tag in tagset: self.tag_func(tag, p_str, p_mtime)
        return tagset
class ConfDict(cs.UserDict):
    """Dict that merges list values: assigning a list to an existing key extends its value."""

    def __setitem__(self, k, v):
        # Anything but list-for-existing-key is a regular assignment
        if not (isinstance(v, list) and k in self):
            return super().__setitem__(k, v)
        self[k].extend(v)
def _set_nice(spec, error):
    """Apply "[prio:][io-class[.io-level]]" priority spec to the current process.

    error(msg) is called for invalid values and is expected to not return,
    same as argparse parser.error() does. OSError is raised if ioprio_set(2) call fails.
    """
    import ctypes as ct
    nice, ionice = (spec + ':').split(':', 1)
    # Single value without ":" is a nice prio if it is an integer, and ionice class otherwise
    if not ionice and not re.fullmatch(r'[-+]?\d+', nice): nice, ionice = None, nice
    if nice:
        try: nice = int(nice)
        except ValueError: error(f'--nice prio value must be an integer: {nice!r}')
        os.setpriority(os.PRIO_PROCESS, os.getpid(), nice)
    if not ionice: return

    # grep -r ioprio_set /usr/share/gdb/syscalls/
    machine = os.uname().machine
    try: scid = dict(x86_64=251, armv7l=314, aarch64=30)[machine]
    except KeyError: error(f'--nice ionice is not supported on arch: {machine}')
    io_class, _, io_level = ionice.rstrip(':').partition('.')
    try: io_class = dict(rt=1, be=2, idle=3)[io_class.lower()]
    except KeyError: error('--nice ionice class must be one of: rt, be, idle')
    if io_class == 3: io_level = 0 # level value is not used with "idle" class here
    else:
        try: io_level = int(io_level or 0)
        except ValueError: io_level = -1
        if not 0 <= io_level <= 7: error('--nice ionice prio level must be in 0-7 range')
    err = ct.CDLL('', use_errno=True).syscall(
        scid, 1, os.getpid(), (io_class << 13) | io_level )
    if err != 0: raise OSError(e := ct.get_errno(), f'ionice_set failed - {os.strerror(e)}')


def main(args=None):
    """Command-line entry point - update tag database and/or run lookups in it.

    args is a list of command-line arguments, with sys.argv[1:] used if it is None.
    With -u/--update, db is built in a temporary file next to configured db path,
    which replaces the old db only after all scanned paths were processed without errors.
    """
    import argparse, textwrap
    dd = lambda text: (textwrap.dedent(text).strip('\n') + '\n').replace('\t', ' ')

    parser = argparse.ArgumentParser(
        usage='%(prog)s [options] [-u|--update] [ tags... ]',
        description='Query or update file tagging database.')
    parser.add_argument('tags', nargs='*', help=dd('''
        For default tag-lookup mode - list files for specified tag(s).
        "+" will combine tags via "and" logic, spaces and multiple args are "or".
        Use DNF logic calculator to convert complex human-readable expressions
         to simplified ones for use here.
        Returned paths are ordered by stored modification time values.'''))
    parser.add_argument('-c', '--conf',
        action='append', metavar='path', default=list(),
        help=dd(f'''
            Path to configuration file, specifying db file and paths to process.
            Use "--conf help" to print example config with all supported options.
            Can be specified multiple times, with values in later files overriding earlier ones.
            Default config path, if none are specified: {conf_path_default}'''))

    group = parser.add_argument_group('Update tags mode')
    group.add_argument('-u', '--update', action='store_true',
        help='Build/update tag database for all configured paths.')
    group.add_argument('-v', '--verbose', action='store_true',
        help='Print all files being processed and precise timestamps for benchmarking.')
    group.add_argument('--nice', metavar='[prio:][io-class[.io-level]]', default='15:idle',
        help=dd('''
            Set "nice" and/or "ionice" (CFQ I/O) priorities.
            "nice" prio value, if specified, must be
             in -20-20 range, where lower = higher prio, and base=0.
            "ionice" value should be in class[.level] format, where
             "class" is one of [rt, be, idle] and "level" in 0-7 range (0=highest prio).
            See setpriority(2) / ioprio_set(2) for more info. Default: %(default)s'''))

    group = parser.add_argument_group('Query/lookup options')
    group.add_argument('-e', '--existing', action='store_true',
        help='Only print paths that are currently accessible for stat().')
    group.add_argument('-0', '-z', '--print0', action='store_true',
        help='Output NUL-separated (\\x00 byte) paths instead of newline-separated.')
    group.add_argument('-p', '--path',
        action='store_true', help='Print db tags for specified path(s).')

    opts = parser.parse_args(sys.argv[1:] if args is None else args)

    if 'help' in opts.conf: return print(conf_example.strip())
    # ConfDict merges repeated keys into newline-separated multi-values
    conf = configparser.ConfigParser(strict=False, dict_type=ConfDict, interpolation=None)
    conf.read([pl.Path(p).expanduser() for p in (opts.conf or [conf_path_default])])
    conf_list, conf_value = lambda v: re.split(r'\n+', v), lambda v: conf_list(v)[-1]
    try:
        conf_paths = conf['paths']
        p_db = pl.Path(conf_value(conf_paths['db'])).expanduser()
    except KeyError as err: parser.error(f'Missing section/option in config file(s): {err}')
    try: conf_filter = conf['filter-regexps']
    except KeyError: conf_filter = dict()
    # List and not a generator - it gets checked for emptiness and iterated more than once
    p_scan_list = [ pl.Path(p).expanduser()
        for p in conf_list(conf_paths.get('scan', '')) if p.strip() ]
    path_filter = PathFilter(*(
        (list(map(re.compile, conf_list(conf_filter[k]))) if k in conf_filter else [])
        for k in ['include', 'skip'] ))

    if opts.update:
        if opts.nice is not None: _set_nice(opts.nice, parser.error)
        if not p_scan_list: parser.error('No paths to scan found in config file')
        with tempfile.NamedTemporaryFile(
            dir=p_db.parent, prefix=f'{p_db.name}.', delete=False ) as tmp:
                p_db_tmp = pl.Path(tmp.name)
        try:
            with TagDB(p_db_tmp) as db, db.tagger_context() as tag_func:
                tagger = FileTreeTagger(tag_func, tag_map, path_filter, verbose=opts.verbose)
                for p in p_scan_list: tagger.process_path(p)
        except BaseException:
            p_db_tmp.unlink(missing_ok=True) # do not leave partial db files around
            raise
        # Only replace old db after new one is committed and closed
        p_db_tmp.rename(p_db)

    if opts.tags:
        print_func = lambda *line: print(*line, end='\n' if not opts.print0 else '\0')
        try:
            with TagDB(p_db) as db:
                if not opts.path:
                    tagsets = list(set(ts.split('+')) for ts in ' '.join(opts.tags).split())
                    for p_str in db.lookup_tags(tagsets):
                        if opts.existing and not os.path.exists(p_str): continue
                        print_func(p_str)
                else:
                    paths = set()
                    for p_str in opts.tags:
                        p = pl.Path(p_str).resolve()
                        if opts.existing and not p.exists(): continue
                        paths.add(str(p))
                    # Empty lookup would return all paths in db, hence the check
                    for p_str, tags in (db.lookup_paths(paths) if paths else list()):
                        print_func(p_str, '::', ' '.join(sorted(tags)))
                        paths.discard(p_str)
                    if paths: # check if tags are missing due to filtering lists
                        for p_str in sorted(paths):
                            for p_scan in p_scan_list:
                                p = str(p_scan.resolve())
                                if not p_str.startswith(f'{p}/'): continue
                                p_str = p_str[len(p):]
                                break
                            else: continue # can still be a symlink, but whatever
                            p_chunks = p_str.split('/')
                            for n in range(1, len(p_chunks)):
                                p_dir = '/'.join(p_chunks[:n]) + '/'
                                if not path_filter.check(p_dir):
                                    p_err( f'{p_str}: parent dir {p_dir!r}'
                                        f' is ignored due to filtering (scan root: {p_scan})' )
                                    break
                            else:
                                if not path_filter.check(p_str):
                                    p_err( f'{p_str}: path ignored'
                                        f' due to filtering (scan root: {p_scan})' )
        except BrokenPipeError: return
# Run main() when executed as a script, using its return value as an exit status.
if __name__ == '__main__': sys.exit(main())