# linter.py
"""Defines the linter class."""
import logging
import os
import time
from typing import (
TYPE_CHECKING,
Any,
Iterable,
Iterator,
List,
Optional,
Sequence,
Set,
Tuple,
Type,
cast,
)
import pathspec
import regex
from tqdm import tqdm
from sqlfluff.core.config import ConfigLoader, FluffConfig, progress_bar_configuration
from sqlfluff.core.errors import (
SQLBaseError,
SQLFluffSkipFile,
SQLFluffUserError,
SQLLexError,
SQLLintError,
SQLParseError,
SQLTemplaterError,
)
from sqlfluff.core.helpers.file import get_encoding
from sqlfluff.core.linter.common import (
ParsedString,
ParsedVariant,
RenderedFile,
RuleTuple,
)
from sqlfluff.core.linter.fix import apply_fixes, compute_anchor_edit_info
from sqlfluff.core.linter.linted_dir import LintedDir
from sqlfluff.core.linter.linted_file import (
TMP_PRS_ERROR_TYPES,
FileTimings,
LintedFile,
)
from sqlfluff.core.linter.linting_result import LintingResult
from sqlfluff.core.parser import Lexer, Parser
from sqlfluff.core.parser.segments.base import BaseSegment, SourceFix
from sqlfluff.core.rules import BaseRule, RulePack, get_ruleset
from sqlfluff.core.rules.noqa import IgnoreMask
if TYPE_CHECKING: # pragma: no cover
from sqlfluff.core.dialects import Dialect
from sqlfluff.core.parser.segments.meta import MetaSegment
from sqlfluff.core.templaters import RawTemplater, TemplatedFile
WalkableType = Iterable[Tuple[str, Optional[List[str]], List[str]]]
RuleTimingsType = List[Tuple[str, str, float]]
# Instantiate the linter logger
linter_logger: logging.Logger = logging.getLogger("sqlfluff.linter")
class Linter:
"""The interface class to interact with the linter."""
# Default to allowing process parallelism
allow_process_parallelism = True
def __init__(
self,
config: Optional[FluffConfig] = None,
formatter: Any = None,
dialect: Optional[str] = None,
rules: Optional[List[str]] = None,
user_rules: Optional[List[Type[BaseRule]]] = None,
exclude_rules: Optional[List[str]] = None,
) -> None:
# Store the config object
self.config = FluffConfig.from_kwargs(
config=config,
dialect=dialect,
rules=rules,
exclude_rules=exclude_rules,
# Don't require a dialect to be provided yet. Defer this until we
# are actually linting something, since the directory we are linting
# from may provide additional configuration, including a dialect.
require_dialect=False,
)
# Get the dialect and templater
self.dialect: "Dialect" = cast("Dialect", self.config.get("dialect_obj"))
self.templater: "RawTemplater" = cast(
"RawTemplater", self.config.get("templater_obj")
)
# Store the formatter for output
self.formatter = formatter
# Store references to user rule classes
self.user_rules = user_rules or []
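    # Editor's note: a minimal usage sketch, not part of the original
    # module. It assumes a standard install where `Linter` is importable
    # from `sqlfluff.core`; the rule code is illustrative only.
    #
    #     from sqlfluff.core import Linter
    #
    #     linter = Linter(dialect="ansi", rules=["CP01"])
    #     linted = linter.lint_string("SELECT a from tbl\n")
    #     print(linted.get_violations())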
def get_rulepack(self, config: Optional[FluffConfig] = None) -> RulePack:
"""Get hold of a set of rules."""
rs = get_ruleset()
# Register any user rules
for rule in self.user_rules:
rs.register(rule)
cfg = config or self.config
return rs.get_rulepack(config=cfg)
def rule_tuples(self) -> List[RuleTuple]:
"""A simple pass through to access the rule tuples of the rule set."""
rs = self.get_rulepack()
return [
RuleTuple(rule.code, rule.name, rule.description, rule.groups, rule.aliases)
for rule in rs.rules
]
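    # Editor's note: an illustrative sketch of inspecting the available
    # rules (continuing the construction example above):
    #
    #     for rule in linter.rule_tuples():
    #         print(rule.code, rule.name, rule.description)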
# #### Static methods
# These are the building blocks of the linting process.
@staticmethod
def load_raw_file_and_config(
fname: str, root_config: FluffConfig
) -> Tuple[str, FluffConfig, str]:
"""Load a raw file and the associated config."""
file_config = root_config.make_child_from_path(fname)
config_encoding: str = file_config.get("encoding", default="autodetect")
encoding = get_encoding(fname=fname, config_encoding=config_encoding)
# Check file size before loading.
limit = file_config.get("large_file_skip_byte_limit")
if limit:
# Get the file size
file_size = os.path.getsize(fname)
if file_size > limit:
raise SQLFluffSkipFile(
f"Length of file {fname!r} is {file_size} bytes which is over "
f"the limit of {limit} bytes. Skipping to avoid parser lock. "
"Users can increase this limit in their config by setting the "
"'large_file_skip_byte_limit' value, or disable by setting it "
"to zero."
)
with open(fname, encoding=encoding, errors="backslashreplace") as target_file:
raw_file = target_file.read()
# Scan the raw file for config commands.
file_config.process_raw_file_for_config(raw_file, fname)
# Return the raw file and config
return raw_file, file_config, encoding
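    # Editor's note: a hedged sketch of loading a file together with its
    # nested config. The file path is hypothetical.
    #
    #     root_config = FluffConfig(overrides={"dialect": "ansi"})
    #     raw, file_config, encoding = Linter.load_raw_file_and_config(
    #         "models/my_model.sql", root_config
    #     )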
@staticmethod
def _normalise_newlines(string: str) -> str:
"""Normalise newlines to unix-style line endings."""
return regex.sub(r"\r\n|\r", "\n", string)
@staticmethod
def _lex_templated_file(
templated_file: "TemplatedFile", config: FluffConfig
) -> Tuple[Optional[Sequence[BaseSegment]], List[SQLLexError]]:
"""Lex a templated file."""
violations = []
linter_logger.info("LEXING RAW (%s)", templated_file.fname)
# Get the lexer
lexer = Lexer(config=config)
# Lex the file and log any problems
try:
segments, lex_vs = lexer.lex(templated_file)
# NOTE: There will always be segments, even if it's
# just an end of file marker.
assert segments, "The token sequence should never be empty."
# We might just get the violations as a list
violations += lex_vs
linter_logger.info("Lexed segments: %s", [seg.raw for seg in segments])
except SQLLexError as err: # pragma: no cover
linter_logger.info("LEXING FAILED! (%s): %s", templated_file.fname, err)
violations.append(err)
return None, violations
        # Check that we've got sensible indentation from the lexer.
        # We may need to suppress template-block indent linting if
        # it's a complicated file.
templating_blocks_indent = config.get("template_blocks_indent", "indentation")
if isinstance(templating_blocks_indent, str):
force_block_indent = templating_blocks_indent.lower().strip() == "force"
else:
force_block_indent = False
templating_blocks_indent = bool(templating_blocks_indent)
# If we're forcing it through we don't check.
if templating_blocks_indent and not force_block_indent:
indent_balance = sum(getattr(elem, "indent_val", 0) for elem in segments)
if indent_balance != 0: # pragma: no cover
linter_logger.debug(
"Indent balance test failed for %r. Template indents will not be "
"linted for this file.",
templated_file.fname,
)
# Don't enable the templating blocks.
templating_blocks_indent = False
# The file will have been lexed without config, so check all indents
# are enabled.
new_segments = []
for segment in segments:
if segment.is_meta:
meta_segment = cast("MetaSegment", segment)
if meta_segment.indent_val != 0:
# Don't allow it if we're not linting templating block indents.
if not templating_blocks_indent:
continue # pragma: no cover
new_segments.append(segment)
# Return new buffer
return new_segments, violations
@staticmethod
def _parse_tokens(
tokens: Sequence[BaseSegment],
config: FluffConfig,
fname: Optional[str] = None,
parse_statistics: bool = False,
) -> Tuple[Optional[BaseSegment], List[SQLParseError]]:
parser = Parser(config=config)
violations = []
# Parse the file and log any problems
try:
parsed: Optional[BaseSegment] = parser.parse(
# Regardless of how the sequence was passed in, we should
# coerce it to a tuple here, before we head deeper into
# the parsing process.
tuple(tokens),
fname=fname,
parse_statistics=parse_statistics,
)
except SQLParseError as err:
linter_logger.info("PARSING FAILED! : %s", err)
violations.append(err)
return None, violations
if parsed is None: # pragma: no cover
return None, violations
linter_logger.info("\n###\n#\n# {}\n#\n###".format("Parsed Tree:"))
linter_logger.info("\n" + parsed.stringify())
# We may succeed parsing, but still have unparsable segments. Extract them
# here.
for unparsable in parsed.iter_unparsables():
# No exception has been raised explicitly, but we still create one here
# so that we can use the common interface
assert unparsable.pos_marker
violations.append(
SQLParseError(
"Line {0[0]}, Position {0[1]}: Found unparsable section: "
"{1!r}".format(
unparsable.pos_marker.working_loc,
(
unparsable.raw
if len(unparsable.raw) < 40
else unparsable.raw[:40] + "..."
),
),
segment=unparsable,
)
)
linter_logger.info("Found unparsable segment...")
linter_logger.info(unparsable.stringify())
return parsed, violations
@staticmethod
def remove_templated_errors(
linting_errors: List[SQLBaseError],
) -> List[SQLBaseError]:
"""Filter a list of lint errors, removing those from the templated slices."""
# Filter out any linting errors in templated sections if relevant.
result: List[SQLBaseError] = []
for e in linting_errors:
if isinstance(e, SQLLintError):
assert e.segment.pos_marker
if (
# Is it in a literal section?
e.segment.pos_marker.is_literal()
# Is it a rule that is designed to work on templated sections?
or e.rule.targets_templated
):
result.append(e)
else:
# If it's another type, just keep it. (E.g. SQLParseError from
# malformed "noqa" comment).
result.append(e)
return result
@staticmethod
def _report_conflicting_fixes_same_anchor(message: str) -> None: # pragma: no cover
# This function exists primarily in order to let us monkeypatch it at
# runtime (replacing it with a function that raises an exception).
linter_logger.critical(message)
@staticmethod
def _warn_unfixable(code: str) -> None:
linter_logger.warning(
f"One fix for {code} not applied, it would re-cause the same error."
)
# ### Class Methods
# These compose the base static methods into useful recipes.
@classmethod
def parse_rendered(
cls,
rendered: RenderedFile,
parse_statistics: bool = False,
) -> ParsedString:
"""Parse a rendered file."""
tokens: Optional[Sequence[BaseSegment]]
parsed_variants: List[ParsedVariant] = []
_lexing_time = 0.0
_parsing_time = 0.0
for idx, variant in enumerate(rendered.templated_variants):
t0 = time.monotonic()
linter_logger.info("Parse Rendered. Lexing Variant %s", idx)
tokens, lex_errors = cls._lex_templated_file(variant, rendered.config)
t1 = time.monotonic()
linter_logger.info("Parse Rendered. Parsing Variant %s", idx)
if tokens:
parsed, parse_errors = cls._parse_tokens(
tokens,
rendered.config,
fname=rendered.fname,
parse_statistics=parse_statistics,
)
else: # pragma: no cover
parsed = None
parse_errors = []
_lt = t1 - t0
_pt = time.monotonic() - t1
linter_logger.info(
"Parse Rendered. Variant %s. Lex in %s. Parse in %s.", idx, _lt, _pt
)
parsed_variants.append(
ParsedVariant(
variant,
parsed,
lex_errors,
parse_errors,
)
)
_lexing_time += _lt
_parsing_time += _pt
time_dict = {
**rendered.time_dict,
"lexing": _lexing_time,
"parsing": _parsing_time,
}
return ParsedString(
parsed_variants=parsed_variants,
templating_violations=rendered.templater_violations,
time_dict=time_dict,
config=rendered.config,
fname=rendered.fname,
source_str=rendered.source_str,
)
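    # Editor's note: an illustrative render -> parse pipeline using the
    # building blocks above:
    #
    #     linter = Linter(dialect="ansi")
    #     rendered = linter.render_string(
    #         "SELECT 1", fname="<string>", config=linter.config, encoding="utf8"
    #     )
    #     parsed = Linter.parse_rendered(rendered)
    #     print(parsed.time_dict)  # templating/lexing/parsing timings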
@classmethod
def lint_fix_parsed(
cls,
tree: BaseSegment,
config: FluffConfig,
rule_pack: RulePack,
fix: bool = False,
fname: Optional[str] = None,
templated_file: Optional["TemplatedFile"] = None,
formatter: Any = None,
) -> Tuple[BaseSegment, List[SQLBaseError], Optional[IgnoreMask], RuleTimingsType]:
"""Lint and optionally fix a tree object."""
# Keep track of the linting errors on the very first linter pass. The
# list of issues output by "lint" and "fix" only includes issues present
# in the initial SQL code, EXCLUDING any issues that may be created by
# the fixes themselves.
initial_linting_errors = []
# A placeholder for the fixes we had on the previous loop
last_fixes = None
# Keep a set of previous versions to catch infinite loops.
previous_versions: Set[Tuple[str, Tuple["SourceFix", ...]]] = {(tree.raw, ())}
# Keep a buffer for recording rule timings.
rule_timings: RuleTimingsType = []
# If we are fixing then we want to loop up to the runaway_limit, otherwise just
# once for linting.
loop_limit = config.get("runaway_limit") if fix else 1
# Dispatch the output for the lint header
if formatter:
formatter.dispatch_lint_header(fname, sorted(rule_pack.codes()))
# Look for comment segments which might indicate lines to ignore.
if not config.get("disable_noqa"):
ignore_mask, ivs = IgnoreMask.from_tree(tree, rule_pack.reference_map)
initial_linting_errors += ivs
else:
ignore_mask = None
save_tree = tree
# There are two phases of rule running.
# 1. The main loop is for most rules. These rules are assumed to
# interact and cause a cascade of fixes requiring multiple passes.
# These are run the `runaway_limit` number of times (default 10).
# 2. The post loop is for post-processing rules, not expected to trigger
# any downstream rules, e.g. capitalization fixes. They are run on the
# first loop and then twice at the end (once to fix, and once again to
# check result of fixes), but not in the intervening loops.
phases = ["main"]
if fix:
phases.append("post")
for phase in phases:
if len(phases) > 1:
rules_this_phase = [
rule for rule in rule_pack.rules if rule.lint_phase == phase
]
else:
rules_this_phase = rule_pack.rules
for loop in range(loop_limit if phase == "main" else 2):
def is_first_linter_pass() -> bool:
return phase == phases[0] and loop == 0
# Additional newlines are to assist in scanning linting loops
# during debugging.
linter_logger.info(
f"\n\nEntering linter phase {phase}, "
f"loop {loop + 1}/{loop_limit}\n"
)
changed = False
if is_first_linter_pass():
# In order to compute initial_linting_errors correctly, need
# to run all rules on the first loop of the main phase.
rules_this_phase = rule_pack.rules
progress_bar_crawler = tqdm(
rules_this_phase,
desc="lint by rules",
leave=False,
disable=progress_bar_configuration.disable_progress_bar,
)
for crawler in progress_bar_crawler:
                    # Performance: After the first loop pass, skip rules that
                    # don't do fixes. Any results returned won't be seen by the
                    # user anyway (linting errors ADDED by rules changing the
                    # SQL are not reported back to the user - only the initial
                    # linting errors are), so there's no reason to run them.
if (
fix
and not is_first_linter_pass()
and not crawler.is_fix_compatible
):
continue
progress_bar_crawler.set_description(f"rule {crawler.code}")
t0 = time.monotonic()
                    # `fixes` is a list of LintFix objects. Each fix carries an
                    # edit type (e.g. "create_before", "create_after", "replace"
                    # or "delete"), an anchor segment to locate, and, for edits
                    # and creations, the new segments to apply at that anchor.
linting_errors, _, fixes, _ = crawler.crawl(
tree,
dialect=config.get("dialect_obj"),
fix=fix,
templated_file=templated_file,
ignore_mask=ignore_mask,
fname=fname,
config=config,
)
if is_first_linter_pass():
initial_linting_errors += linting_errors
if fix and fixes:
linter_logger.info(f"Applying Fixes [{crawler.code}]: {fixes}")
# Do some sanity checks on the fixes before applying.
anchor_info = compute_anchor_edit_info(fixes)
if any(
not info.is_valid for info in anchor_info.values()
): # pragma: no cover
message = (
f"Rule {crawler.code} returned conflicting "
"fixes with the same anchor. This is only "
"supported for create_before+create_after, so "
"the fixes will not be applied. "
)
for uuid, info in anchor_info.items():
if not info.is_valid:
message += f"\n{uuid}:"
for _fix in info.fixes:
message += f"\n {_fix}"
cls._report_conflicting_fixes_same_anchor(message)
for lint_result in linting_errors:
lint_result.fixes = []
elif fixes == last_fixes: # pragma: no cover
# If we generate the same fixes two times in a row,
# that means we're in a loop, and we want to stop.
# (Fixes should address issues, hence different
# and/or fewer fixes next time.)
cls._warn_unfixable(crawler.code)
else:
# This is the happy path. We have fixes, now we want to
# apply them.
last_fixes = fixes
new_tree, _, _, _valid = apply_fixes(
tree,
config.get("dialect_obj"),
crawler.code,
anchor_info,
)
                            # Check for infinite loops. We use a combination of
                            # the raw text of the fixed tree and its pending
                            # source fixes to detect repeated states.
loop_check_tuple = (
new_tree.raw,
tuple(new_tree.source_fixes),
)
if not _valid:
# The fixes result in an invalid file. Don't apply
# the fix and skip onward. Show a warning.
linter_logger.warning(
f"Fixes for {crawler.code} not applied, as it "
"would result in an unparsable file. Please "
"report this as a bug with a minimal query "
"which demonstrates this warning."
)
elif loop_check_tuple not in previous_versions:
# We've not seen this version of the file so
# far. Continue.
tree = new_tree
previous_versions.add(loop_check_tuple)
changed = True
continue
else:
# Applying these fixes took us back to a state
# which we've seen before. We're in a loop, so
# we want to stop.
cls._warn_unfixable(crawler.code)
# Record rule timing
rule_timings.append(
(crawler.code, crawler.name, time.monotonic() - t0)
)
if fix and not changed:
# We did not change the file. Either the file is clean (no
# fixes), or any fixes which are present will take us back
# to a previous state.
linter_logger.info(
f"Fix loop complete for {phase} phase. Stability "
f"achieved after {loop}/{loop_limit} loops."
)
break
else:
if fix:
# The linter loop hit the limit before reaching a stable point
# (i.e. free of lint errors). If this happens, it's usually
# because one or more rules produced fixes which did not address
# the original issue **or** created new issues.
linter_logger.warning(
f"Loop limit on fixes reached [{loop_limit}]."
)
# Discard any fixes for the linting errors, since they caused a
# loop. IMPORTANT: By doing this, we are telling SQLFluff that
# these linting errors are "unfixable". This is important,
# because when "sqlfluff fix" encounters unfixable lint errors,
# it exits with a "failure" exit code, which is exactly what we
# want in this situation. (Reason: Although this is more of an
# internal SQLFluff issue, users deserve to know about it,
                    # because it means their file(s) weren't fixed.)
for violation in initial_linting_errors:
if isinstance(violation, SQLLintError):
violation.fixes = []
# Return the original parse tree, before any fixes were applied.
# Reason: When the linter hits the loop limit, the file is often
# messy, e.g. some of the fixes were applied repeatedly, possibly
# other weird things. We don't want the user to see this junk!
return save_tree, initial_linting_errors, ignore_mask, rule_timings
if config.get("ignore_templated_areas", default=True):
initial_linting_errors = cls.remove_templated_errors(initial_linting_errors)
linter_logger.info("\n###\n#\n# {}\n#\n###".format("Fixed Tree:"))
linter_logger.info("\n" + tree.stringify())
return tree, initial_linting_errors, ignore_mask, rule_timings
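    # Editor's note: a hedged sketch of calling `lint_fix_parsed` directly.
    # Most callers should prefer `lint`/`fix` below. `tree` is assumed to be
    # a parsed root segment (e.g. obtained via `parse_string`).
    #
    #     rule_pack = linter.get_rulepack()
    #     tree, errors, ignore_mask, timings = Linter.lint_fix_parsed(
    #         tree, config=linter.config, rule_pack=rule_pack, fix=True
    #     )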
@classmethod
def lint_parsed(
cls,
parsed: ParsedString,
rule_pack: RulePack,
fix: bool = False,
formatter: Any = None,
encoding: str = "utf8",
) -> LintedFile:
"""Lint a ParsedString and return a LintedFile."""
violations = parsed.violations
time_dict = parsed.time_dict
tree: Optional[BaseSegment] = None
# TODO: Eventually enable linting of more than just the first variant.
if parsed.parsed_variants:
tree = parsed.parsed_variants[0].tree
variant = parsed.parsed_variants[0].templated_file
else:
variant = None
if tree:
t0 = time.monotonic()
linter_logger.info("LINTING (%s)", parsed.fname)
(
tree,
initial_linting_errors,
ignore_mask,
rule_timings,
) = cls.lint_fix_parsed(
tree,
config=parsed.config,
rule_pack=rule_pack,
fix=fix,
fname=parsed.fname,
templated_file=variant,
formatter=formatter,
)
# Update the timing dict
time_dict["linting"] = time.monotonic() - t0
# We're only going to return the *initial* errors, rather
# than any generated during the fixing cycle.
violations += initial_linting_errors
else:
ignore_mask = None
rule_timings = []
if not parsed.config.get("disable_noqa"):
# Templating and/or parsing have failed. Look for "noqa"
# comments (the normal path for identifying these comments
# requires access to the parse tree, and because of the failure,
# we don't have a parse tree).
ignore_mask, ignore_violations = IgnoreMask.from_source(
parsed.source_str,
[
lm
for lm in parsed.config.get("dialect_obj").lexer_matchers
if lm.name == "inline_comment"
][0],
rule_pack.reference_map,
)
violations += ignore_violations
# We process the ignore config here if appropriate
for violation in violations:
violation.ignore_if_in(parsed.config.get("ignore"))
violation.warning_if_in(parsed.config.get("warnings"))
linted_file = LintedFile(
parsed.fname,
# Deduplicate violations
LintedFile.deduplicate_in_source_space(violations),
FileTimings(time_dict, rule_timings),
tree,
ignore_mask=ignore_mask,
templated_file=variant,
encoding=encoding,
)
# This is the main command line output from linting.
if formatter:
formatter.dispatch_file_violations(
parsed.fname,
linted_file,
only_fixable=fix,
warn_unused_ignores=parsed.config.get("warn_unused_ignores"),
)
# Safety flag for unset dialects
if linted_file.get_violations(
fixable=True if fix else None, types=SQLParseError
):
if formatter: # pragma: no cover TODO?
formatter.dispatch_dialect_warning(parsed.config.get("dialect"))
return linted_file
@classmethod
def lint_rendered(
cls,
rendered: RenderedFile,
rule_pack: RulePack,
fix: bool = False,
formatter: Any = None,
) -> LintedFile:
"""Take a RenderedFile and return a LintedFile."""
parsed = cls.parse_rendered(rendered)
return cls.lint_parsed(
parsed,
rule_pack=rule_pack,
fix=fix,
formatter=formatter,
encoding=rendered.encoding,
)
# ### Instance Methods
# These are tied to a specific instance and so are not necessarily
# safe to use in parallel operations.
def render_string(
self, in_str: str, fname: str, config: FluffConfig, encoding: str
) -> RenderedFile:
"""Template the file."""
linter_logger.info("Rendering String [%s] (%s)", self.templater.name, fname)
# Start the templating timer
t0 = time.monotonic()
# Newlines are normalised to unix-style line endings (\n).
# The motivation is that Jinja normalises newlines during templating and
# we want consistent mapping between the raw and templated slices.
in_str = self._normalise_newlines(in_str)
# Since Linter.__init__() does not require a dialect to be specified,
# check for one now. (We're processing a string, not a file, so we're
# not going to pick up a .sqlfluff or other config file to provide a
# missing dialect at this point.)
config.verify_dialect_specified()
        if config.get("templater_obj") != self.templater:
linter_logger.warning(
(
f"Attempt to set templater to {config.get('templater_obj').name} "
f"failed. Using {self.templater.name} templater. Templater cannot "
"be set in a .sqlfluff file in a subdirectory of the current "
"working directory. It can be set in a .sqlfluff in the current "
"working directory. See Nesting section of the docs for more "
"details."
)
)
variant_limit = config.get("render_variant_limit")
        templated_variants: List["TemplatedFile"] = []
templater_violations: List[SQLTemplaterError] = []
try:
for variant, templater_errs in self.templater.process_with_variants(
in_str=in_str, fname=fname, config=config, formatter=self.formatter
):
if variant:
templated_variants.append(variant)
                # NOTE: We could very easily end up with duplicate errors between
                # different variants and this code doesn't currently do any
                # deduplication between them.
# TODO: Resolve potential duplicate templater violations between
# variants before we enable jinja variant linting by default.
templater_violations += templater_errs
if len(templated_variants) >= variant_limit:
# Stop if we hit the limit.
break
except SQLTemplaterError as templater_err:
# Fatal templating error. Capture it and don't generate a variant.
templater_violations.append(templater_err)
except SQLFluffSkipFile as skip_file_err: # pragma: no cover
linter_logger.warning(str(skip_file_err))
if not templated_variants:
linter_logger.info("TEMPLATING FAILED: %s", templater_violations)
linter_logger.info("Rendered %s variants", len(templated_variants))
# Record time
time_dict = {"templating": time.monotonic() - t0}
return RenderedFile(
templated_variants,
templater_violations,
config,
time_dict,
fname,
encoding,
in_str,
)
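    # Editor's note: an illustrative call. Note that newlines are
    # normalised before templating, so Windows-style input is fine:
    #
    #     rendered = linter.render_string(
    #         "SELECT 1\r\n", fname="<string>", config=linter.config, encoding="utf8"
    #     )
    #     assert rendered.source_str == "SELECT 1\n"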
def render_file(self, fname: str, root_config: FluffConfig) -> RenderedFile:
"""Load and render a file with relevant config."""
# Load the raw file.
raw_file, config, encoding = self.load_raw_file_and_config(fname, root_config)
# Render the file
return self.render_string(raw_file, fname, config, encoding)
def parse_string(
self,
in_str: str,
fname: str = "<string>",
config: Optional[FluffConfig] = None,
encoding: str = "utf-8",
parse_statistics: bool = False,
) -> ParsedString:
"""Parse a string."""
violations: List[SQLBaseError] = []
# Dispatch the output for the template header (including the config diff)
if self.formatter:
self.formatter.dispatch_template_header(fname, self.config, config)
# Just use the local config from here:
config = config or self.config
# Scan the raw file for config commands.
config.process_raw_file_for_config(in_str, fname)
rendered = self.render_string(in_str, fname, config, encoding)
violations += rendered.templater_violations
# Dispatch the output for the parse header
if self.formatter:
self.formatter.dispatch_parse_header(fname)
return self.parse_rendered(rendered, parse_statistics=parse_statistics)
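    # Editor's note: a minimal parse sketch. It assumes `ParsedString`
    # exposes the root segment of the first variant as `.tree`:
    #
    #     parsed = linter.parse_string("SELECT a FROM tbl")
    #     if parsed.tree:
    #         print(parsed.tree.stringify())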
def fix(
self,
tree: BaseSegment,
config: Optional[FluffConfig] = None,
fname: Optional[str] = None,
templated_file: Optional["TemplatedFile"] = None,
) -> Tuple[BaseSegment, List[SQLBaseError]]:
"""Return the fixed tree and violations from lintfix when we're fixing."""
config = config or self.config
rule_pack = self.get_rulepack(config=config)
fixed_tree, violations, _, _ = self.lint_fix_parsed(
tree,
config,
rule_pack,
fix=True,
fname=fname,
templated_file=templated_file,
formatter=self.formatter,
)
return fixed_tree, violations
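    # Editor's note: a hedged fix sketch. `templated_file` is omitted for
    # brevity; `parsed.tree` is assumed as in the parse example above.
    #
    #     parsed = linter.parse_string("select a from tbl\n")
    #     fixed_tree, violations = linter.fix(parsed.tree)
    #     print(fixed_tree.raw)  # the post-fix SQL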
def lint(
self,
tree: BaseSegment,
config: Optional[FluffConfig] = None,
fname: Optional[str] = None,
templated_file: Optional["TemplatedFile"] = None,
) -> List[SQLBaseError]:
"""Return just the violations from lintfix when we're only linting."""
config = config or self.config
rule_pack = self.get_rulepack(config=config)
_, violations, _, _ = self.lint_fix_parsed(
tree,
config,
rule_pack,
fix=False,
fname=fname,
templated_file=templated_file,
formatter=self.formatter,
)
return violations
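    # Editor's note: the lint-only counterpart (illustrative):
    #
    #     violations = linter.lint(parsed.tree)
    #     for violation in violations:
    #         print(violation)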
def lint_string(
self,
in_str: str = "",
fname: str = "<string input>",
fix: bool = False,
config: Optional[FluffConfig] = None,
encoding: str = "utf8",
) -> LintedFile:
"""Lint a string.
Returns:
:obj:`LintedFile`: an object representing that linted file.
"""
# Sort out config, defaulting to the built in config if no override
config = config or self.config
# Parse the string.
parsed = self.parse_string(
in_str=in_str,
fname=fname,
config=config,
)
# Get rules as appropriate
rule_pack = self.get_rulepack(config=config)
# Lint the file and return the LintedFile
return self.lint_parsed(
parsed,
rule_pack,
fix=fix,
formatter=self.formatter,
encoding=encoding,
)
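    # Editor's note: `lint_string` covers templating, parsing and linting
    # in one call; with `fix=True` the returned `LintedFile` also carries
    # the fixed tree (illustrative):
    #
    #     linted = linter.lint_string("SELECT a from tbl\n", fix=True)
    #     print(linted.get_violations(fixable=True))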
def paths_from_path(
self,
path: str,
ignore_file_name: str = ".sqlfluffignore",
ignore_non_existent_files: bool = False,
ignore_files: bool = True,
working_path: str = os.getcwd(),
) -> List[str]:
"""Return a set of sql file paths from a potentially more ambiguous path string.
Here we also deal with the .sqlfluffignore file if present.
When a path to a file to be linted is explicitly passed
we look for ignore files in all directories that are parents of the file,
up to the current directory.
If the current directory is not a parent of the file we only
look for an ignore file in the direct parent of the file.
"""
if not os.path.exists(path):
if ignore_non_existent_files:
return []
else:
raise SQLFluffUserError(
f"Specified path does not exist. Check it/they exist(s): {path}."
)
# Files referred to exactly are also ignored if
# matched, but we warn the users when that happens
is_exact_file = os.path.isfile(path)
path_walk: WalkableType
if is_exact_file:
# When the exact file to lint is passed, we
# fill path_walk with an input that follows
# the structure of `os.walk`:
# (root, directories, files)
dirpath = os.path.dirname(path)
files = [os.path.basename(path)]
path_walk = [(dirpath, None, files)]
else:
path_walk = list(os.walk(path))
ignore_file_paths = ConfigLoader.find_ignore_config_files(
path=path, working_path=working_path, ignore_file_name=ignore_file_name
)
# Add paths that could contain "ignore files"
# to the path_walk list
path_walk_ignore_file = [
(
os.path.dirname(ignore_file_path),
None,
# Only one possible file, since we only
# have one "ignore file name"
[os.path.basename(ignore_file_path)],
)
for ignore_file_path in ignore_file_paths
]
path_walk += path_walk_ignore_file
# If it's a directory then expand the path!
buffer = []
ignores = {}
for dirpath, _, filenames in path_walk:
for fname in filenames:
fpath = os.path.join(dirpath, fname)
# Handle potential .sqlfluffignore files
if ignore_files and fname == ignore_file_name:
with open(fpath) as fh:
spec = pathspec.PathSpec.from_lines("gitwildmatch", fh)
ignores[dirpath] = spec
# We don't need to process the ignore file any further
continue
# We won't purge files *here* because there's an edge case
# that the ignore file is processed after the sql file.
# Scan for remaining files
for ext in (
self.config.get("sql_file_exts", default=".sql").lower().split(",")
):
# is it a sql file?
if fname.lower().endswith(ext):
buffer.append(fpath)
if not ignore_files:
return sorted(buffer)
# Check the buffer for ignore items and normalise the rest.
# It's a set, so we can do natural deduplication.
filtered_buffer = set()
for fpath in buffer:
abs_fpath = os.path.abspath(fpath)
for ignore_base, ignore_spec in ignores.items():
abs_ignore_base = os.path.abspath(ignore_base)
if abs_fpath.startswith(
abs_ignore_base
+ (
""
if os.path.dirname(abs_ignore_base) == abs_ignore_base
else os.sep