Skip to content

Commit

Permalink
gh-77617: Add sqlite3 command-line interface (#95026)
Browse files Browse the repository at this point in the history
Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
  • Loading branch information
erlend-aasland and serhiy-storchaka committed Aug 1, 2022
1 parent 1e6b635 commit bc7c7cd
Show file tree
Hide file tree
Showing 5 changed files with 281 additions and 0 deletions.
20 changes: 20 additions & 0 deletions Doc/library/sqlite3.rst
Expand Up @@ -1442,6 +1442,26 @@ and you can let the ``sqlite3`` module convert SQLite types to
Python types via :ref:`converters <sqlite3-converters>`.


.. _sqlite3-cli:

Command-line interface
^^^^^^^^^^^^^^^^^^^^^^

The ``sqlite3`` module can be invoked as a script
in order to provide a simple SQLite shell.
Type ``.quit`` or CTRL-D to exit the shell.

.. program:: python -m sqlite3 [-h] [-v] [filename] [sql]

.. option:: -h, --help
Print CLI help.

.. option:: -v, --version
Print underlying SQLite library version.

.. versionadded:: 3.12


.. _sqlite3-howtos:

How-to guides
Expand Down
7 changes: 7 additions & 0 deletions Doc/whatsnew/3.12.rst
Expand Up @@ -112,6 +112,13 @@ os
(Contributed by Kumar Aditya in :gh:`93312`.)


sqlite3
-------

* Add a :ref:`command-line interface <sqlite3-cli>`.
(Contributed by Erlend E. Aasland in :gh:`77617`.)


Optimizations
=============

Expand Down
97 changes: 97 additions & 0 deletions Lib/sqlite3/__main__.py
@@ -0,0 +1,97 @@
import sqlite3
import sys

from argparse import ArgumentParser
from code import InteractiveConsole
from textwrap import dedent


def execute(c, sql, suppress_errors=True):
try:
for row in c.execute(sql):
print(row)
except sqlite3.Error as e:
tp = type(e).__name__
try:
print(f"{tp} ({e.sqlite_errorname}): {e}", file=sys.stderr)
except AttributeError:
print(f"{tp}: {e}", file=sys.stderr)
if not suppress_errors:
sys.exit(1)


class SqliteInteractiveConsole(InteractiveConsole):

def __init__(self, connection):
super().__init__()
self._con = connection
self._cur = connection.cursor()

def runsource(self, source, filename="<input>", symbol="single"):
match source:
case ".version":
print(f"{sqlite3.sqlite_version}")
case ".help":
print("Enter SQL code and press enter.")
case ".quit":
sys.exit(0)
case _:
if not sqlite3.complete_statement(source):
return True
execute(self._cur, source)
return False


def main():
parser = ArgumentParser(
description="Python sqlite3 CLI",
prog="python -m sqlite3",
)
parser.add_argument(
"filename", type=str, default=":memory:", nargs="?",
help=(
"SQLite database to open (defaults to ':memory:'). "
"A new database is created if the file does not previously exist."
),
)
parser.add_argument(
"sql", type=str, nargs="?",
help=(
"An SQL query to execute. "
"Any returned rows are printed to stdout."
),
)
parser.add_argument(
"-v", "--version", action="version",
version=f"SQLite version {sqlite3.sqlite_version}",
help="Print underlying SQLite library version",
)
args = parser.parse_args()

if args.filename == ":memory:":
db_name = "a transient in-memory database"
else:
db_name = repr(args.filename)

banner = dedent(f"""
sqlite3 shell, running on SQLite version {sqlite3.sqlite_version}
Connected to {db_name}
Each command will be run using execute() on the cursor.
Type ".help" for more information; type ".quit" or CTRL-D to quit.
""").strip()
sys.ps1 = "sqlite> "
sys.ps2 = " ... "

con = sqlite3.connect(args.filename, isolation_level=None)
try:
if args.sql:
execute(con, args.sql, suppress_errors=False)
else:
console = SqliteInteractiveConsole(con)
console.interact(banner, exitmsg="")
finally:
con.close()


main()
155 changes: 155 additions & 0 deletions Lib/test/test_sqlite3/test_cli.py
@@ -0,0 +1,155 @@
"""sqlite3 CLI tests."""

import sqlite3 as sqlite
import subprocess
import sys
import unittest

from test.support import SHORT_TIMEOUT, requires_subprocess
from test.support.os_helper import TESTFN, unlink


@requires_subprocess()
class CommandLineInterface(unittest.TestCase):

def _do_test(self, *args, expect_success=True):
with subprocess.Popen(
[sys.executable, "-Xutf8", "-m", "sqlite3", *args],
encoding="utf-8",
bufsize=0,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
) as proc:
proc.wait()
if expect_success == bool(proc.returncode):
self.fail("".join(proc.stderr))
stdout = proc.stdout.read()
stderr = proc.stderr.read()
if expect_success:
self.assertEqual(stderr, "")
else:
self.assertEqual(stdout, "")
return stdout, stderr

def expect_success(self, *args):
out, _ = self._do_test(*args)
return out

def expect_failure(self, *args):
_, err = self._do_test(*args, expect_success=False)
return err

def test_cli_help(self):
out = self.expect_success("-h")
self.assertIn("usage: python -m sqlite3", out)

def test_cli_version(self):
out = self.expect_success("-v")
self.assertIn(sqlite.sqlite_version, out)

def test_cli_execute_sql(self):
out = self.expect_success(":memory:", "select 1")
self.assertIn("(1,)", out)

def test_cli_execute_too_much_sql(self):
stderr = self.expect_failure(":memory:", "select 1; select 2")
err = "ProgrammingError: You can only execute one statement at a time"
self.assertIn(err, stderr)

def test_cli_execute_incomplete_sql(self):
stderr = self.expect_failure(":memory:", "sel")
self.assertIn("OperationalError (SQLITE_ERROR)", stderr)

def test_cli_on_disk_db(self):
self.addCleanup(unlink, TESTFN)
out = self.expect_success(TESTFN, "create table t(t)")
self.assertEqual(out, "")
out = self.expect_success(TESTFN, "select count(t) from t")
self.assertIn("(0,)", out)


@requires_subprocess()
class InteractiveSession(unittest.TestCase):
TIMEOUT = SHORT_TIMEOUT / 10.
MEMORY_DB_MSG = "Connected to a transient in-memory database"
PS1 = "sqlite> "
PS2 = "... "

def start_cli(self, *args):
return subprocess.Popen(
[sys.executable, "-Xutf8", "-m", "sqlite3", *args],
encoding="utf-8",
bufsize=0,
stdin=subprocess.PIPE,
# Note: the banner is printed to stderr, the prompt to stdout.
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)

def expect_success(self, proc):
proc.wait()
if proc.returncode:
self.fail("".join(proc.stderr))

def test_interact(self):
with self.start_cli() as proc:
out, err = proc.communicate(timeout=self.TIMEOUT)
self.assertIn(self.MEMORY_DB_MSG, err)
self.assertIn(self.PS1, out)
self.expect_success(proc)

def test_interact_quit(self):
with self.start_cli() as proc:
out, err = proc.communicate(input=".quit", timeout=self.TIMEOUT)
self.assertIn(self.MEMORY_DB_MSG, err)
self.assertIn(self.PS1, out)
self.expect_success(proc)

def test_interact_version(self):
with self.start_cli() as proc:
out, err = proc.communicate(input=".version", timeout=self.TIMEOUT)
self.assertIn(self.MEMORY_DB_MSG, err)
self.assertIn(sqlite.sqlite_version, out)
self.expect_success(proc)

def test_interact_valid_sql(self):
with self.start_cli() as proc:
out, err = proc.communicate(input="select 1;",
timeout=self.TIMEOUT)
self.assertIn(self.MEMORY_DB_MSG, err)
self.assertIn("(1,)", out)
self.expect_success(proc)

def test_interact_valid_multiline_sql(self):
with self.start_cli() as proc:
out, err = proc.communicate(input="select 1\n;",
timeout=self.TIMEOUT)
self.assertIn(self.MEMORY_DB_MSG, err)
self.assertIn(self.PS2, out)
self.assertIn("(1,)", out)
self.expect_success(proc)

def test_interact_invalid_sql(self):
with self.start_cli() as proc:
out, err = proc.communicate(input="sel;", timeout=self.TIMEOUT)
self.assertIn(self.MEMORY_DB_MSG, err)
self.assertIn("OperationalError (SQLITE_ERROR)", err)
self.expect_success(proc)

def test_interact_on_disk_file(self):
self.addCleanup(unlink, TESTFN)
with self.start_cli(TESTFN) as proc:
out, err = proc.communicate(input="create table t(t);",
timeout=self.TIMEOUT)
self.assertIn(TESTFN, err)
self.assertIn(self.PS1, out)
self.expect_success(proc)
with self.start_cli(TESTFN, "select count(t) from t") as proc:
out = proc.stdout.read()
err = proc.stderr.read()
self.assertIn("(0,)", out)
self.expect_success(proc)


if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,2 @@
Add :mod:`sqlite3` :ref:`command-line interface <sqlite3-cli>`.
Patch by Erlend Aasland.

0 comments on commit bc7c7cd

Please sign in to comment.