-
Notifications
You must be signed in to change notification settings - Fork 118
/
shell.py
executable file
·3010 lines (2684 loc) · 119 KB
/
shell.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
import sys
import apsw
import shlex
import os
import csv
import re
import textwrap
import time
import codecs
import base64
if sys.platform=="win32":
_win_colour=False
try:
import colorama
colorama.init()
del colorama
_win_colour=True
except: # there are several failure reasons, ignore them all
pass
class Shell(object):
"""Implements a SQLite shell
:param stdin: Where to read input from (default sys.stdin)
:param stdout: Where to send output (default sys.stdout)
:param stderr: Where to send errors (default sys.stderr)
:param encoding: Default encoding for files opened/created by the
Shell. If you want stdin/out/err to use a particular encoding
then you need to provide them `already configured <http://docs.python.org/library/codecs.html#codecs.open>`__ that way.
:param args: This should be program arguments only (ie if
passing in sys.argv do not include sys.argv[0] which is the
program name. You can also pass in None and then call
:meth:`process_args` if you want to catch any errors
in handling the arguments yourself.
:param db: A existing :class:`Connection` you wish to use
The commands and behaviour are modelled after the `interactive
shell <https://sqlite.org/sqlite.html>`__ that is part of
SQLite.
You can inherit from this class to embed in your own code and user
interface. Internally everything is handled as unicode.
Conversions only happen at the point of input or output which you
can override in your own code.
This implementation fixes a number of bugs/quirks present in the
sqlite shell. Its control-C handling is also friendlier. Some
examples of issues not present in this implementation:
* https://sqlite.org/src/info/c25aab7e7e
* https://sqlite.org/src/info/7b61b6c6ce
* https://sqlite.org/src/info/ee19e690ec
* https://sqlite.org/src/info/2466653295
Errors and diagnostics are only ever sent to error output
(self.stderr) and never to the regular output (self.stdout). This
means using shell output is always easy and consistent.
Shell commands begin with a dot (eg .help). They are implemented
as a method named after the command (eg command_help). The method
is passed one parameter which is the list of arguments to the
command.
Output modes are implemented by functions named after the mode (eg
output_column).
When you request help the help information is automatically
generated from the docstrings for the command and output
functions.
You should not use a Shell object concurrently from multiple
threads. It is one huge set of state information which would
become inconsistent if used simultaneously, and then give baffling
errors. It is safe to call methods one at a time from different
threads. ie it doesn't care what thread calls methods as long as
you don't call more than one concurrently.
"""
class Error(Exception):
"""Class raised on errors. The expectation is that the error
will be displayed by the shell as text so there are no
specific subclasses as the distinctions between different
types of errors doesn't matter."""
pass
def __init__(self, stdin=None, stdout=None, stderr=None, encoding="utf8", args=None, db=None):
"""Create instance, set defaults and do argument processing."""
super(Shell, self).__init__()
# The parameter doc has to be in main class doc as sphinx
# ignores any described here
self.exceptions=False
self.history_file="~/.sqlite_history"
self._db=None
self.dbfilename=None
if db:
self.db=db, db.filename
else:
self.db=None, None
self.prompt= "sqlite> "
self.moreprompt=" ..> "
self.separator="|"
self.bail=False
self.echo=False
self.timer=False
self.header=False
self.nullvalue=""
self.output=self.output_list
self._output_table=self._fmt_sql_identifier("table")
self.widths=[]
# do we truncate output in list mode? (explain doesn't, regular does)
self.truncate=True
# a stack of previous outputs. turning on explain saves previous, off restores
self._output_stack=[]
# other stuff
self.set_encoding(encoding)
if stdin is None: stdin=sys.stdin
if stdout is None: stdout=sys.stdout
if stderr is None: stderr=sys.stderr
self.stdin=stdin
self.stdout=stdout
self._original_stdout=stdout
self.stderr=stderr
# we don't become interactive until the command line args are
# successfully parsed and acted upon
self.interactive=None
# current colouring object
self.command_colour() # set to default
self._using_readline=False
self._input_stack=[]
self.input_line_number=0
self.push_input()
self.push_output()
self._input_descriptions=[]
if args:
try:
self.process_args(args)
except:
if len(self._input_descriptions):
self._input_descriptions.append("Processing command line arguments")
self.handle_exception()
raise
if self.interactive is None:
self.interactive=getattr(self.stdin, "isatty", False) and self.stdin.isatty() and getattr(self.stdout, "isatty", False) and self.stdout.isatty()
def _ensure_db(self):
"The database isn't opened until first use. This function ensures it is now open."
if not self._db:
if not self.dbfilename:
self.dbfilename=":memory:"
self._db=apsw.Connection(self.dbfilename, flags=apsw.SQLITE_OPEN_URI | apsw.SQLITE_OPEN_READWRITE | apsw.SQLITE_OPEN_CREATE)
return self._db
def _set_db(self, newv):
"Sets the open database (or None) and filename"
(db, dbfilename)=newv
if self._db:
self._db.close(True)
self._db=None
self._db=db
self.dbfilename=dbfilename
db=property(_ensure_db, _set_db, None, "The current :class:`Connection`")
def process_args(self, args):
"""Process command line options specified in args. It is safe to
call this multiple times. We try to be compatible with SQLite shell
argument parsing.
:param args: A list of string options. Do not include the
program as args[0]
:returns: A tuple of (databasefilename, initfiles,
sqlncommands). This is provided for informational purposes
only - they have already been acted upon. An example use
is that the SQLite shell does not enter the main interactive
loop if any sql/commands were provided.
The first non-option is the database file name. Each
remaining non-option is treated as a complete input (ie it
isn't joined with others looking for a trailing semi-colon).
The SQLite shell uses single dash in front of options. We
allow both single and double dashes. When an unrecognized
argument is encountered then
:meth:`process_unknown_args` is called.
"""
# we don't use optparse as we need to use single dashes for
# options - all hand parsed
if not args:
return None, [], []
# are options still valid?
options=True
# have we seen the database name?
havedbname=False
# List of init files to read
inits=[]
# List of sql/dot commands
sqls=[]
while args:
if not options or not args[0].startswith("-"):
options=False
if not havedbname:
# grab new database
self.db=None, args[0]
havedbname=True
else:
sqls.append(args[0])
args=args[1:]
continue
# remove initial single or double dash
args[0]=args[0][1:]
if args[0].startswith("-"):
args[0]=args[0][1:]
if args[0]=="init":
if len(args)<2:
raise self.Error("You need to specify a filename after -init")
inits.append(args[1])
args=args[2:]
continue
if args[0]=="header" or args[0]=="noheader":
self.header=args[0]=="header"
args=args[1:]
continue
if args[0] in ("echo", "bail", "interactive"):
setattr(self, args[0], True)
args=args[1:]
continue
if args[0]=="batch":
self.interactive=False
args=args[1:]
continue
if args[0] in ("separator", "nullvalue", "encoding"):
if len(args)<2:
raise self.Error("You need to specify a value after -"+args[0])
getattr(self, "command_"+args[0])([args[1]])
args=args[2:]
continue
if args[0]=="version":
self.write(self.stdout, apsw.sqlitelibversion()+"\n")
# A pretty gnarly thing to do
sys.exit(0)
if args[0]=="help":
self.write(self.stderr, self.usage())
sys.exit(0)
if args[0] in ("no-colour", "no-color", "nocolour", "nocolor"):
self.colour_scheme="off"
self._out_colour()
args=args[1:]
continue
# only remaining known args are output modes
if getattr(self, "output_"+args[0], None):
self.command_mode(args[:1])
args=args[1:]
continue
newargs=self.process_unknown_args(args)
if newargs is None:
raise self.Error("Unrecognized argument '"+args[0]+"'")
args=newargs
for f in inits:
self.command_read([f])
for s in sqls:
self.process_complete_line(s)
return self.dbfilename, inits, sqls
def process_unknown_args(self, args):
"""This is called when :meth:`process_args` encounters an
argument it doesn't understand. Override this method if you
want to be able to understand additional command line arguments.
:param args: A list of the remaining arguments. The initial one will
have had the leading dashes removed (eg if it was --foo on the command
line then args[0] will be "foo"
:returns: None if you don't recognize the argument either. Otherwise
return the list of remaining arguments after you have processed
yours.
"""
return None
def usage(self):
"Returns the usage message. Make sure it is newline terminated"
msg="""
Usage: program [OPTIONS] FILENAME [SQL|CMD] [SQL|CMD]...
FILENAME is the name of a SQLite database. A new database is
created if the file does not exist.
OPTIONS include:
-init filename read/process named file
-echo print commands before execution
-[no]header turn headers on or off
-bail stop after hitting an error
-interactive force interactive I/O
-batch force batch I/O
-column set output mode to 'column'
-csv set output mode to 'csv'
-html set output mode to 'html'
-line set output mode to 'line'
-list set output mode to 'list'
-python set output mode to 'python'
-separator 'x' set output field separator (|)
-nullvalue 'text' set text string for NULL values
-version show SQLite version
-encoding 'name' the encoding to use for files
opened via .import, .read & .output
-nocolour disables colour output to screen
"""
return msg.lstrip()
###
### Value formatting routines. They take a value and return a
### text formatting of them. Mostly used by the various output's
### but also by random other pieces of code.
###
_binary_type = eval(("buffer", "bytes") [sys.version_info>=(3,0)])
_basestring = eval(("basestring", "str") [sys.version_info>=(3,0)])
# bytes that are ok in C strings - no need for quoting
_printable=[ord(x) for x in
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789~!@#$%^&*()`_-+={}[]:;,.<>/?|"
]
def _fmt_c_string(self, v):
"Format as a C string including surrounding double quotes"
if isinstance(v, self._basestring):
op=['"']
for c in v:
if c=="\\":
op.append("\\\\")
elif c=="\r":
op.append("\\r")
elif c=="\n":
op.append("\\n")
elif c=="\t":
op.append("\\t")
elif ord(c) not in self._printable:
op.append("\\"+c)
else:
op.append(c)
op.append('"')
return "".join(op)
elif v is None:
return '"'+self.nullvalue+'"'
elif isinstance(v, self._binary_type):
if sys.version_info<(3,0):
o=lambda x: ord(x)
fromc=lambda x: x
else:
o=lambda x: x
fromc=lambda x: chr(x)
res=['"']
for c in v:
if o(c) in self._printable:
res.append(fromc(c))
else:
res.append("\\x%02X" % (o(c),))
res.append('"')
return "".join(res)
else:
# number of some kind
return '"%s"' % (v,)
def _fmt_html_col(self, v):
"Format as HTML (mainly escaping &/</>"
return self._fmt_text_col(v).\
replace("&", "&"). \
replace(">", ">"). \
replace("<", "<"). \
replace("'", "'"). \
replace('"', """)
def _fmt_json_value(self, v):
"Format a value."
if isinstance(v, self._basestring):
# we assume utf8 so only some characters need to be escaed
op=['"']
for c in v:
if c=="\\":
op.append("\\\\")
elif c=="\r":
op.append("\\r")
elif c=="\n":
op.append("\\n")
elif c=="\t":
op.append("\\t")
elif c=="/": # yes you have to escape forward slash for some reason
op.append("\\/")
elif c=='"':
op.append("\\"+c)
elif c=="\\b":
op.append("\\b")
elif c=="\\f":
op.append("\\f")
else:
# It isn't clear when \u sequences *must* be used.
# Assuming not needed due to utf8 output which
# corresponds to what rfc4627 implies.
op.append(c)
op.append('"')
return "".join(op)
elif v is None:
return 'null'
elif isinstance(v, self._binary_type):
if sys.version_info<(3,0):
o=base64.encodestring(v)
else:
o=base64.encodebytes(v).decode("ascii")
if o[-1]=="\n":
o=o[:-1]
return '"'+o+'"'
else:
# number of some kind
return '%s' % (v,)
def _fmt_python(self, v):
"Format as python literal"
if v is None:
return "None"
elif isinstance(v, self._basestring):
return repr(v)
elif isinstance(v, self._binary_type):
if sys.version_info<(3,0):
res=["buffer(\""]
for i in v:
if ord(i) in self._printable:
res.append(i)
else:
res.append("\\x%02X" % (ord(i),))
res.append("\")")
return "".join(res)
else:
res=['b"']
for i in v:
if i in self._printable:
res.append(chr(i))
else:
res.append("\\x%02X" % (i,))
res.append('"')
return "".join(res)
else:
return "%s" % (v,)
def _fmt_sql_identifier(self, v):
"Return the identifier quoted in SQL syntax if needed (eg table and column names)"
if not len(v): # yes sqlite does allow zero length identifiers
return '""'
nonalnum=re.sub("[A-Za-z_0-9]+", "", v)
if len(nonalnum)==0:
if v.upper() not in self._sqlite_reserved:
# Ok providing it doesn't start with a digit
if v[0] not in "0123456789":
return v
# double quote it unless there are any double quotes in it
if '"' in nonalnum:
return "[%s]" % (v,)
return '"%s"' % (v,)
def _fmt_text_col(self, v):
"Regular text formatting"
if v is None:
return self.nullvalue
elif isinstance(v, self._basestring):
return v
elif isinstance(v, self._binary_type):
# sqlite gives back raw bytes!
return "<Binary data>"
else:
return "%s" % (v,)
###
### The various output routines. They are always called with the
### header irrespective of the setting allowing for some per query
### setup. (see output_column for example). The doc strings are
### used to generate help.
###
def output_column(self, header, line):
"""
Items left aligned in space padded columns. They are
truncated if they do not fit. If the width hasn't been
specified for a column then 10 is used unless the column name
(header) is longer in which case that width is used. Use the
.width command to change column sizes.
"""
# as an optimization we calculate self._actualwidths which is
# reset for each query
if header:
def gw(n):
if n<len(self.widths) and self.widths[n]!=0:
return self.widths[n]
# if width is not present or 0 then autosize
text=self._fmt_text_col(line[n])
return max(len(text), 10)
widths=[gw(i) for i in range(len(line))]
if self.truncate:
self._actualwidths=["%"+("-%d.%ds", "%d.%ds")[w<0]%(abs(w), abs(w)) for w in widths]
else:
self._actualwidths=["%"+("-%ds", "%ds")[w<0]%(abs(w),) for w in widths]
if self.header:
# output the headers
c=self.colour
cols=[c.header+(self._actualwidths[i] % (self._fmt_text_col(line[i]),))+c.header_ for i in range(len(line))]
# sqlite shell uses two spaces between columns
self.write(self.stdout, " ".join(cols)+"\n")
if c is self._colours["off"]:
self.output_column(False, ["-"*abs(widths[i]) for i in range(len(widths))])
return
cols=[self.colour.colour_value(line[i], self._actualwidths[i] % (self._fmt_text_col(line[i]),)) for i in range(len(line))]
# sqlite shell uses two spaces between columns
self.write(self.stdout, " ".join(cols)+"\n")
output_columns=output_column
def output_csv(self, header, line):
"""
Items in csv format (comma separated). Use tabs mode for tab
separated. You can use the .separator command to use a
different one after switching mode. A separator of comma uses
double quotes for quoting while other separators do not do any
quoting. The Python csv library used for this only supports
single character separators.
"""
# we use self._csv for the work, setup when header is
# supplied. _csv is a tuple of a StringIO and the csv.writer
# instance.
# Sigh
if sys.version_info<(3,0):
fixdata=lambda x: x.encode("utf8")
else:
fixdata=lambda x: x
if header:
if sys.version_info<(3,0):
import StringIO as io
else:
import io
s=io.StringIO()
kwargs={}
if self.separator==",":
kwargs["dialect"]="excel"
elif self.separator=="\t":
kwargs["dialect"]="excel-tab"
else:
kwargs["quoting"]=csv.QUOTE_NONE
kwargs["delimiter"]=fixdata(self.separator)
kwargs["doublequote"]=False
# csv module is bug ridden junk - I already say no
# quoting so it still looks for the quotechar and then
# gets upset that it can't be quoted. Which bit of no
# quoting was ambiguous?
kwargs["quotechar"]="\x00"
writer=csv.writer(s, **kwargs)
self._csv=(s, writer)
if self.header:
self.output_csv(None, line)
return
if header is None:
c=self.colour
line=[c.header+fixdata(self._fmt_text_col(l))+c.header_ for l in line]
else:
fmt=lambda x: self.colour.colour_value(x, fixdata(self._fmt_text_col(x)))
line=[fmt(l) for l in line]
self._csv[1].writerow(line)
t=self._csv[0].getvalue()
if sys.version_info<(3,0):
t=t.decode("utf8")
# csv lib always does DOS eol
assert(t.endswith("\r\n"))
t=t[:-2]
# should not be other eol irregularities
assert(not t.endswith("\r") and not t.endswith("\n"))
self.write(self.stdout, t+"\n")
self._csv[0].truncate(0)
self._csv[0].seek(0)
def output_html(self, header, line):
"HTML table style"
if header:
if not self.header:
return
fmt=lambda x: self.colour.header+self._fmt_html_col(x)+self.colour.header_
else:
fmt=lambda x: self.colour.colour_value(x, self._fmt_html_col(x))
line=[fmt(l) for l in line]
out=["<TR>"]
for l in line:
out.append(("<TD>","<TH>")[header])
out.append(l)
out.append(("</TD>\n","</TH>\n")[header])
out.append("</TR>\n")
self.write(self.stdout, "".join(out))
def output_insert(self, header, line):
"""
Lines as SQL insert statements. The table name is "table"
unless you specified a different one as the second parameter
to the .mode command.
"""
if header:
return
fmt=lambda x: self.colour.colour_value(x, apsw.format_sql_value(x))
out="INSERT INTO "+self._output_table+" VALUES("+",".join([fmt(l) for l in line])+");\n"
self.write(self.stdout, out)
def output_json(self, header, line):
"""
Each line as a JSON object with a trailing comma. Blobs are
output as base64 encoded strings. You should be using UTF8
output encoding.
"""
if header:
self._output_json_cols=line
return
fmt=lambda x: self.colour.colour_value(x, self._fmt_json_value(x))
out=["%s: %s" % (self._fmt_json_value(k), fmt(line[i])) for i,k in enumerate(self._output_json_cols)]
self.write(self.stdout, "{ "+", ".join(out)+"},\n")
def output_line(self, header, line):
"""
One value per line in the form 'column = value' with a blank
line between rows.
"""
if header:
w=5
for l in line:
if len(l)>w:
w=len(l)
self._line_info=(w, line)
return
fmt=lambda x: self.colour.colour_value(x, self._fmt_text_col(x))
w=self._line_info[0]
for i in range(len(line)):
self.write(self.stdout, "%*s = %s\n" % (w, self._line_info[1][i], fmt(line[i])))
self.write(self.stdout, "\n")
output_lines=output_line
def output_list(self, header, line):
"All items on one line with separator"
if header:
if not self.header:
return
c=self.colour
fmt=lambda x: c.header+x+c.header_
else:
fmt=lambda x: self.colour.colour_value(x, self._fmt_text_col(x))
self.write(self.stdout, self.separator.join([fmt(x) for x in line])+"\n")
def output_python(self, header, line):
"Tuples in Python source form for each row"
if header:
if not self.header:
return
c=self.colour
fmt=lambda x: c.header+self._fmt_python(x)+c.header_
else:
fmt=lambda x: self.colour.colour_value(x, self._fmt_python(x))
self.write(self.stdout, '('+", ".join([fmt(l) for l in line])+"),\n")
def output_tcl(self, header, line):
"Outputs TCL/C style strings using current separator"
# In theory you could paste the output into your source ...
if header:
if not self.header:
return
c=self.colour
fmt=lambda x: c.header+self._fmt_c_string(x)+c.header_
else:
fmt=lambda x: self.colour.colour_value(x, self._fmt_c_string(x))
self.write(self.stdout, self.separator.join([fmt(l) for l in line])+"\n")
def _output_summary(self, summary):
# internal routine to output a summary line or two
self.write(self.stdout, self.colour.summary+summary+self.colour.summary_)
###
### Various routines
###
def cmdloop(self, intro=None):
"""Runs the main interactive command loop.
:param intro: Initial text banner to display instead of the
default. Make sure you newline terminate it.
"""
if intro is None:
intro="""
SQLite version %s (APSW %s)
Enter ".help" for instructions
Enter SQL statements terminated with a ";"
""" % (apsw.sqlitelibversion(), apsw.apswversion())
intro=intro.lstrip()
if self.interactive and intro:
if sys.version_info<(3,0):
intro=unicode(intro)
c=self.colour
self.write(self.stdout, c.intro+intro+c.intro_)
using_readline=False
try:
if self.interactive and self.stdin is sys.stdin:
import readline
old_completer=readline.get_completer()
readline.set_completer(self.complete)
readline.parse_and_bind("tab: complete")
using_readline=True
try:
readline.read_history_file(os.path.expanduser(self.history_file))
except:
# We only expect IOError here but if the history
# file does not exist and this code has been
# compiled into the module it is possible to get
# an IOError that doesn't match the IOError from
# Python parse time resulting in an IOError
# exception being raised. Consequently we just
# catch all exceptions.
pass
except ImportError:
pass
try:
while True:
self._input_descriptions=[]
if using_readline:
# we drop completion cache because it contains
# table and column names which could have changed
# with last executed SQL
self._completion_cache=None
self._using_readline=True
try:
command=self.getcompleteline()
if command is None: # EOF
return
self.process_complete_line(command)
except:
self._append_input_description()
try:
self.handle_exception()
except UnicodeDecodeError:
self.handle_exception()
finally:
if using_readline:
readline.set_completer(old_completer)
readline.set_history_length(256)
readline.write_history_file(os.path.expanduser(self.history_file))
def handle_exception(self):
"""Handles the current exception, printing a message to stderr as appropriate.
It will reraise the exception if necessary (eg if bail is true)"""
eclass,eval,etb=sys.exc_info() # py2&3 compatible way of doing this
if isinstance(eval, SystemExit):
eval._handle_exception_saw_this=True
raise
self._out_colour()
self.write(self.stderr, self.colour.error)
if isinstance(eval, KeyboardInterrupt):
self.handle_interrupt()
text="Interrupted"
else:
text=str(eval)
if not text.endswith("\n"):
text=text+"\n"
if len(self._input_descriptions):
for i in range(len(self._input_descriptions)):
if i==0:
pref="At "
else:
pref=" "*i+"From "
self.write(self.stderr, pref+self._input_descriptions[i]+"\n")
self.write(self.stderr, text)
if self.exceptions:
stack=[]
while etb:
stack.append(etb.tb_frame)
etb = etb.tb_next
for frame in stack:
self.write(self.stderr, "\nFrame %s in %s at line %d\n" %
(frame.f_code.co_name, frame.f_code.co_filename,
frame.f_lineno))
vars=list(frame.f_locals.items())
vars.sort()
for k,v in vars:
try:
v=repr(v)[:80]
except:
v="<Unable to convert to string>"
self.write(self.stderr, "%10s = %s\n" % (k,v))
self.write(self.stderr, "\n%s: %s\n" % (eclass, repr(eval)))
self.write(self.stderr, self.colour.error_)
eval._handle_exception_saw_this=True
if self.bail:
raise
def process_sql(self, sql, bindings=None, internal=False, summary=None):
"""Processes SQL text consisting of one or more statements
:param sql: SQL to execute
:param bindings: bindings for the *sql*
:param internal: If True then this is an internal execution
(eg the .tables or .database command). When executing
internal sql timings are not shown nor is the SQL echoed.
:param summary: If not None then should be a tuple of two
items. If the ``sql`` returns any data then the first item
is printed before the first row, and the second item is
printed after the last row. An example usage is the .find
command which shows table names.
"""
cur=self.db.cursor()
# we need to know when each new statement is executed
state={'newsql': True, 'timing': None}
def et(cur, sql, bindings):
state['newsql']=True
# if time reporting, do so now
if not internal and self.timer:
if state['timing']:
self.display_timing(state['timing'], self.get_resource_usage())
# print statement if echo is on
if not internal and self.echo:
# ? should we strip leading and trailing whitespace? backslash quote stuff?
if bindings:
self.write(self.stderr, "%s [%s]\n" % (sql, bindings))
else:
self.write(self.stderr, sql+"\n")
# save resource from beginning of command (ie don't include echo time above)
if not internal and self.timer:
state['timing']=self.get_resource_usage()
return True
cur.setexectrace(et)
# processing loop
try:
for row in cur.execute(sql, bindings):
if state['newsql']:
# summary line?
if summary:
self._output_summary(summary[0])
# output a header always
cols=[h for h,d in cur.getdescription()]
self.output(True, cols)
state['newsql']=False
self.output(False, row)
if not state['newsql'] and summary:
self._output_summary(summary[1])
except:
# If echo is on and the sql to execute is a syntax error
# then the exec tracer won't have seen it so it won't be
# printed and the user will be wondering exactly what sql
# had the error. We look in the traceback and deduce if
# the error was happening in a prepare or not. Also we
# need to ignore the case where SQLITE_SCHEMA happened and
# a reprepare is being done since the exec tracer will
# have been called in that situation.
if not internal and self.echo:
tb=sys.exc_info()[2]
last=None
while tb:
last=tb.tb_frame
tb=tb.tb_next
if last and last.f_code.co_name=="sqlite3_prepare" \
and last.f_code.co_filename.endswith("statementcache.c") \
and "sql" in last.f_locals:
self.write(self.stderr, last.f_locals["sql"]+"\n")
raise
if not internal and self.timer:
self.display_timing(state['timing'], self.get_resource_usage())
def process_command(self, cmd):
"""Processes a dot command. It is split into parts using the
`shlex.split
<http://docs.python.org/library/shlex.html#shlex.split>`__
function which is roughly the same method used by Unix/POSIX
shells.
"""
if self.echo:
self.write(self.stderr, cmd+"\n")
# broken with unicode on Python 2!!!
if sys.version_info<(3,0):
cmd=cmd.encode("utf8")
cmd=[c.decode("utf8") for c in shlex.split(cmd)]
else:
cmd=shlex.split(cmd)
assert cmd[0][0]=="."
cmd[0]=cmd[0][1:]
fn=getattr(self, "command_"+cmd[0], None)
if not fn:
raise self.Error("Unknown command \"%s\". Enter \".help\" for help" % (cmd[0],))
res=fn(cmd[1:])
###
### Commands start here
###
def _boolean_command(self, name, cmd):
"Parse and verify boolean parameter"
if len(cmd)!=1 or cmd[0].lower() not in ("on", "off"):
raise self.Error(name+" expected ON or OFF")
return cmd[0].lower()=="on"
# Note that doc text is used for generating help output.
def command_backup(self, cmd):
"""backup ?DB? FILE: Backup DB (default "main") to FILE
Copies the contents of the current database to FILE
overwriting whatever was in FILE. If you have attached databases
then you can specify their name instead of the default of "main".
The backup is done at the page level - SQLite copies the pages
as is. There is no round trip through SQL code.
"""
dbname="main"
if len(cmd)==1:
fname=cmd[0]
elif len(cmd)==2:
dbname=cmd[0]
fname=cmd[1]
else:
raise self.Error("Backup takes one or two parameters")
out=apsw.Connection(fname)
b=out.backup("main", self.db, dbname)
try:
while not b.done:
b.step()
finally:
b.finish()
out.close()
def command_bail(self, cmd):
"""bail ON|OFF: Stop after hitting an error (default OFF)
If an error is encountered while processing commands or SQL
then exit. (Note this is different than SQLite shell which
only exits for errors in SQL.)
"""
self.bail=self._boolean_command("bail", cmd)
def command_colour(self, cmd=[]):
"""colour SCHEME: Selects a colour scheme
Residents of both countries that have not adopted the metric
system may also spell this command without a 'u'. If using a
colour terminal in interactive mode then output is
automatically coloured to make it more readable. Use 'off' to
turn off colour, and no name or 'default' for the default.
"""
if len(cmd)>1:
raise self.Error("Too many colour schemes")
c=cmd and cmd[0] or "default"
if c not in self._colours:
raise self.Error("No such colour scheme: "+c)
self.colour_scheme=c
self._out_colour()
command_color=command_colour
def command_databases(self, cmd):