Skip to content

Commit

Permalink
Snowflake: Add implementation for CREATE TASK statement (sqlfluff#1597)
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe Hutter committed Oct 12, 2021
1 parent c61003a commit 66421a9
Show file tree
Hide file tree
Showing 3 changed files with 623 additions and 1 deletion.
91 changes: 90 additions & 1 deletion src/sqlfluff/dialects/dialect_snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@
"TERSE",
"TABULAR",
"UNSET",
"USER_TASK_TIMEOUT_MS",
"USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE",
"WAIT_FOR_COMPLETION",
"WAREHOUSE_SIZE",
]
Expand Down Expand Up @@ -449,6 +451,7 @@ class StatementSegment(ansi_dialect.get_segment("StatementSegment")): # type: i
insert=[
Ref("UseStatementSegment"),
Ref("CreateStatementSegment"),
Ref("CreateTaskSegment"),
Ref("CreateCloneStatementSegment"),
Ref("ShowStatementSegment"),
Ref("AlterUserSegment"),
Expand Down Expand Up @@ -1368,6 +1371,93 @@ class CreateTableStatementSegment(BaseSegment):
)


@snowflake_dialect.segment()
class CreateTaskSegment(BaseSegment):
"""A snowflake `CREATE TASK` statement.
https://docs.snowflake.com/en/sql-reference/sql/create-task.html
"""

type = "create_task_statement"

match_grammar = Sequence(
"CREATE",
Sequence("OR", "REPLACE", optional=True),
"TASK",
Sequence("IF", "NOT", "EXISTS", optional=True),
Ref("ObjectReferenceSegment"),
Indent,
Sequence(
"WAREHOUSE",
Ref("EqualsSegment"),
Ref("ObjectReferenceSegment"),
optional=True,
),
Sequence(
"SCHEDULE",
Ref("EqualsSegment"),
Ref("QuotedLiteralSegment"),
optional=True,
),
Sequence(
"ALLOW_OVERLAPPING_EXECUTION",
Ref("EqualsSegment"),
Ref("BooleanLiteralGrammar"),
optional=True,
),
Delimited(
Sequence(
Ref("ParameterNameSegment"),
Ref("EqualsSegment"),
OneOf(
Ref("BooleanLiteralGrammar"),
Ref("QuotedLiteralSegment"),
Ref("NumericLiteralSegment"),
),
),
delimiter=Ref("CommaSegment"),
optional=True,
),
Sequence(
"USER_TASK_TIMEOUT_MS",
Ref("EqualsSegment"),
Ref("NumericLiteralSegment"),
optional=True,
),
Sequence(
"USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE",
Ref("EqualsSegment"),
Ref("QuotedLiteralSegment"),
optional=True,
),
Sequence(
"COPY",
"GRANTS",
optional=True,
),
Ref("CreateStatementCommentSegment", optional=True),
Sequence(
"AFTER",
Ref("ObjectReferenceSegment"),
optional=True,
),
Dedent,
Sequence(
"WHEN",
Indent,
Ref("ExpressionSegment"),
Dedent,
optional=True
),
Sequence(
Ref.keyword("AS"),
Indent,
Ref("StatementSegment"),
Dedent,
),
)


@snowflake_dialect.segment()
class CreateStatementSegment(BaseSegment):
"""A snowflake `CREATE` statement.
Expand Down Expand Up @@ -1405,7 +1495,6 @@ class CreateStatementSegment(BaseSegment):
Sequence("FILE", "FORMAT"),
"STAGE",
"STREAM",
"TASK",
),
Sequence("IF", "NOT", "EXISTS", optional=True),
Ref("ObjectReferenceSegment"),
Expand Down
86 changes: 86 additions & 0 deletions test/fixtures/dialects/snowflake/snowflake_create_task.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
-- Examples from the documentation

CREATE TASK t1
SCHEDULE = 'USING CRON 0 9-17 * * SUN America/Los_Angeles'
TIMESTAMP_INPUT_FORMAT = 'YYYY-MM-DD HH24'
USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'
AS
INSERT INTO mytable(ts) VALUES(1);

CREATE TASK mytask_hour
WAREHOUSE = mywh
SCHEDULE = 'USING CRON 0 9-17 * * SUN America/Los_Angeles'
TIMESTAMP_INPUT_FORMAT = 'YYYY-MM-DD HH24'
AS
INSERT INTO mytable(ts) VALUES(1, 2, 3);

-- All possible optional clauses
CREATE OR REPLACE TASK IF NOT EXISTS t1
WAREHOUSE = mywh
SCHEDULE = 'USING CRON 0 9-17 * * SUN America/Los_Angeles'
ALLOW_OVERLAPPING_EXECUTION = TRUE
TIMESTAMP_INPUT_FORMAT = 'YYYY-MM-DD HH24'
USER_TASK_TIMEOUT_MS = 25
USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'
COPY GRANTS
COMMENT = 'Hello world'
AFTER dependency_task
AS
INSERT INTO mytable(ts) VALUES(1);

-- Only mandatory clauses
CREATE TASK t1
AS
INSERT INTO mytable(ts) VALUES(1);

-- Real life examples
CREATE OR REPLACE TASK insert_session
WAREHOUSE = eng_wh
SCHEDULE = 'USING CRON 45 6 * * * UTC'
AS
INSERT INTO sch.s_session
SELECT
*,
sum(break) OVER (PARTITION BY serial ORDER BY datetime) AS session_id
FROM
(
SELECT *
FROM base_table
)
;


CREATE OR REPLACE TASK update_session
WAREHOUSE = eng_wh
AFTER insert_session
AS
UPDATE sch.s_session
SET lag_datetime = v.lag_datetime, row_number = v.row_number
FROM
(
SELECT
*,
(
sum(break) OVER (PARTITION BY serial ORDER BY datetime)
) AS session_id
FROM
(
SELECT *
FROM derived_table
)
ORDER BY serial, datetime
) AS v
WHERE sch.s_session.event_id = v.event_id
;

CREATE OR REPLACE TASK sch.truncate_session
WAREHOUSE = eng_wh
AFTER sch.update_session
AS
CALL sch.session_agg_insert();

CREATE OR REPLACE TASK insert__agg
WAREHOUSE = eng_wh
SCHEDULE = 'USING CRON 15 7 2 * * UTC'
AS
CALL auto_device_insert();

0 comments on commit 66421a9

Please sign in to comment.