In [69]:
from typing import Any, Dict, Union
import pandas as pd
import logging
import re
import sqlfluff
import requests
import ruamel.yaml
import psycopg2
from bs4 import BeautifulSoup
from markdown import markdown
import datetime
from urllib.parse import unquote
from typing import Any, Dict, Iterator, List, Union
import json
from pgsanity.pgsanity import check_string
import os
from dbt.cli.main import dbtRunner, dbtRunnerResult
import smtplib
import ssl
from itertools import compress
import numpy as np
from pathlib import Path
from jinja2 import Template
with open(DBT_PROJECT_DIR + "create_model.txt", "r") as f:
    MODEL_TEMPLATE = Template(f.read())

In [63]:
MATERIALIZATION_MAPPING = {1: "table", 2: "view", 3: "incremental", 4: "ephemereal"}
SUPERSET_USERNAME = "superset"
SUPERSET_PASSWORD = "superset"
SUPERSET_HOST = "http://34.82.185.252:30007/"
DATABASE_USERNAME = "fdp"
DATABASE_PASSWORD = "fdp"
DATABASE_HOST = "34.82.185.252"
DATABASE_PORT = 30005
DATABASE_NAME = "financial_data"
QUERY_SCHEMA="financial_query"
QUERY_TABLE="query"
MANIFEST_PATH="/home/vu/Desktop/Projects/Thesis/financial/dbt/target/manifest.json"
EMAIL_PORT = 465
SMTP = "smtp.gmail.com"
EMAIL_SENDER = "catvu113@gmail.com"
EMAIL_PASSWORD = "xhtzakhmnsbufufy"
USER_MODEL_PATH = "/home/vu/Desktop/Projects/Thesis/financial/dbt/models/user"
DBT_PROJECT_DIR = "/home/vu/Desktop/Projects/Thesis/financial/dbt/"
DATABASE_ID = 1
SUPERSET_ID = 34
USER_SCHEMA = "user"
SERVING_SCHEMA="marts"
context = ssl.create_default_context()
SMTP = smtplib.SMTP_SSL("smtp.gmail.com", 465, context=context)
SST_DATABASE_NAME = "FDP Reader"

In [4]:
class SupersetDBTSessionConnector:
    """A class for accessing the Superset API in an easy way."""

    def __init__(self):
        """Instantiates the class.

        ''access_token'' will be instantiated via enviromental variable
        If ``access_token`` is None, attempts to obtain it using ``refresh_token``.

        Args:
            api_url: Base API URL of a Superset instance, e.g. https://my-superset/api/v1.
            access_token: Access token to use for accessing protected endpoints of the Superset
                API. Can be automatically obtained if ``refresh_token`` is not None.
            refresh_token: Refresh token to use for obtaining or refreshing the ``access_token``.
                If None, no refresh will be done.
        """
        self.__url = SUPERSET_HOST
        self.__api_url = self.__url + "api/v1/"

        self.__session = requests.session()

        self.__username = SUPERSET_USERNAME
        self.__password = SUPERSET_PASSWORD

        self.__refresh_session()

    def __refresh_session(self):
        logging.info("Refreshing session")

        soup = BeautifulSoup(self.__session.post(self.__url + "login").text, "html.parser")
        self.__csrf_token = soup.find("input", {"id": "csrf_token"})["value"]  # type: ignore

        data = {
            "username": self.__username,
            "password": self.__password,
            "provider": "db",
            "refresh": True,
        }
        headers = {
            # 'Authorization': 'Bearer {}'.format(self.____access_token),
            "x-csrftoken": self.__csrf_token,
        }
        response = self.__session.post(self.__url + "login", json=data, headers=headers)  # type: ignore
        return True

    def request(self, method, endpoint, **request_kwargs):
        """Executes a request against the Superset API.

        Args:
            method: HTTP method to use.
            endpoint: Endpoint to use.
            **request_kwargs: Any ``requests.request`` arguments to use.

        Returns:
            A dictionary containing response body parsed from JSON.

        Raises:
            HTTPError: There is an HTTP error (detected by ``requests.Response.raise_for_status``)
                even after retrying with a fresh session.
        """

        logging.info("About to %s execute request for endpoint %s", method, endpoint)

        url = self.__api_url + endpoint
        csrf_headers = {
            # 'Authorization': 'Bearer {}'.format(self.__access_token),
            "x-csrftoken": self.__csrf_token,
        }

        res = self.__session.request(method, url, headers=csrf_headers, **request_kwargs)  # type: ignore

        logging.info("Request finished with status: %d", res.status_code)

        if res.status_code == 401 and res.json().get("msg") == "Token has expired" and self.__refresh_session():
            logging.info(f"Retrying {method} request for {url} %s with refreshed session")
            res = self.__session.request(method, url, headers=csrf_headers, **request_kwargs)  # type: ignore

            logging.info("Request finished with status: %d", res.status_code)

        if (
            res.status_code == 400
            and res.json()["message"] == "400 Bad Request: The CSRF session token is missing."
            and self.__refresh_session()
        ):
            logging.info(f"Retrying {method} request for {url} %s with refreshed session")
            res = self.__session.request(method, url, headers=csrf_headers, **request_kwargs)  # type: ignore
            logging.info(f"Request finished with status: {res.status_code}")
        res.raise_for_status()
        return res.json()

In [53]:
superset = SupersetDBTSessionConnector()

if Path(MANIFEST_PATH).is_file():
    with open(MANIFEST_PATH) as f:
        dbt_manifest = json.load(f)
else:
    raise Exception("No manifest found at path")

dbt_tables = get_tables_from_dbt(dbt_manifest, None)
serving_dbt_models = [
    dbt_tables[table] for table in dbt_tables if dbt_tables[table]["schema"] in (SERVING_SCHEMA, USER_SCHEMA)
]
datasources = []
for model in serving_dbt_models:
    datasources.append(
        {
            "database_name": SST_DATABASE_NAME,
            "datasource_name": model["name"],
            "datasource_type": "table",
            "schema": model["schema"],
        }
    )
datasources = {"datasources": datasources}
superset.request("POST", "cachekey/invalidate", json=datasources)


{}

In [70]:
df, succeeded = get_records()

# if df.empty:
#     logging.info("Early stopping because no records")
#     return "Early stopping because no records"

for filename in os.listdir(USER_MODEL_PATH):
    # If file is not present in list
    if filename not in succeeded:
        # Get full path of file and remove it
        full_file_path = os.path.join(USER_MODEL_PATH, filename)
        if os.path.isfile(full_file_path):
            os.remove(full_file_path)

dbt = dbtRunner()
cli_args = [
    "parse",
    "--project-dir",
    DBT_PROJECT_DIR,
]
res: dbtRunnerResult = dbt.invoke(cli_args)
if not res.success:
    raise Exception("Unable to parse project.")
# Get dagster execution time, see: https://stackoverflow.com/questions/75099470/getting-current-execution-date-in-a-task-or-asset-in-dagster
EXEC_TIME = datetime.datetime.today().strftime("%d/%m/%Y_%H:%M:%S")
# raise Exception(DBT_PROJECT_DIR, MANIFEST_PATH, USER_MODEL_PATH)
# Get all schema names in project
# Either this or defined schema name available to the user before
with open('target/manifest.json') as f:
    dbt_manifest = json.load(f)
    dbt_tables = get_tables_from_dbt(dbt_manifest, None)

# Getting the dbt tables keys
dbt_tables_names = list(dbt_tables.keys())
mapped = map(lambda x: x.startswith((SERVING_SCHEMA, USER_SCHEMA)), dbt_tables_names)
mask = list(mapped)

dbt_tables_reporting = list(compress(dbt_tables_names, mask))

dbt_names_aliases = [dbt_tables[table]["name"] for table in dbt_tables] + [
    dbt_tables[table]["alias"] for table in dbt_tables
]  # Name and aliases wo schema

status = {}  # Status of preliminary checking
for i in df.index:
    # Check name validity
    name_validation = is_valid_table_name(df.loc[i]["name"])
    if not name_validation:
        status[i] = "Invalid name"
        df.loc[i, "success"] = False
        continue
    name_unique = is_unique_table_name(df.loc[i]["name"], dbt_names_aliases)  # check aliases and name
    if not name_unique:
        status[i] = "Model name is duplicated with another existing model"
        df.loc[i, "success"] = False
        continue
    # Check syntax
    query_string = df.loc[i]["query_string"]
    query_string = query_string + ";" if query_string[-1] != ";" else query_string
    validation = check_string(query_string)
    if not validation[0]:
        df.loc[i, "success"] = False
        status[i] = "Invalid query: {error}".format(error=validation[1])
        continue
    # Check multi-query
    parsed = sqlfluff.parse(query_string, "postgres")["file"]
    if type(parsed) == list:
        df.loc[i, "success"] = False
        status[i] = "Multiple statement"
        continue
    # Check select statements
    # if list(statement_list[0]["statement"].keys())[0] != "select_statement":
    #     df.loc[i, "success"] = False
    #     status[i] = ("Query is not 'SELECT'")
    #     continue
    # Check tables and add model ref
    ref_tables, processed_status = get_ref(df.loc[i, "query_string"], dbt_tables, parsed, dbt_tables_reporting)
    if processed_status != "Success":
        df.loc[i, "success"] = False
        status[i] = processed_status
        continue
    model_path = USER_MODEL_PATH + "/{name}.sql".format(name=df.loc[i, "name"])
    if os.path.exists(model_path):
        status[i] = "Model name is duplicated with another in processing batch"
        df.loc[i, "success"] = False
        continue
    with open(model_path, "w+") as f:
        template_output = MODEL_TEMPLATE.render(
            materialization=MATERIALIZATION_MAPPING[df.loc[i, "materialization"]],
            desc=df.loc[i, "description"],
            user_id=str(df.loc[i, "user_id"]),
            exec_time=EXEC_TIME,
            schema=USER_SCHEMA,
            refs=ref_tables,
            query=df.loc[i, "query_string"],
        )
        f.write(template_output)
        logging.info("Wrote model {name} contents".format(name=df.loc[i, "name"]))
        f.close()
    status[i] = "Success"
# Get Emails from API
superset = SupersetDBTSessionConnector()
users = set(df["user_id"].to_list())
email_list = get_emails(superset, users)
email_dict = {key: value for element_dict in email_list for key, value in element_dict.items()}

SMTP.login(EMAIL_SENDER, EMAIL_PASSWORD)

for i in df.index:
    # Check Success
    if df.loc[i, "success"] == False:
        message = get_mail_content(df.loc[i, "name"], df.loc[i, "query_string"], status[i])
        # Add checked
        df.loc[i, "checked"] = True
        SMTP.sendmail(EMAIL_SENDER, email_dict[str(df.loc[i, "user_id"])], message)

# If every record is unsuccesful, terminate script early
# if not df["success"].any():
#     update_records(df)
#     logging.info("Early stopping because no successful records")
#     return "Early stopping because no successful records"

sst_datasets = get_physical_datasets_from_superset(superset, DATABASE_ID)
sst_user_tables = [table["name"] for table in sst_datasets if table["schema"] == USER_SCHEMA]

# initialize
dbt = dbtRunner()

# create CLI args as a list of strings
cli_args = [
    "run",
    "--project-dir",
    DBT_PROJECT_DIR,
    "--select",
    "tag:{exec_time}".format(exec_time=EXEC_TIME),
]

# run the command
res: dbtRunnerResult = dbt.invoke(cli_args)

# inspect the results
for r in res.result:
    logging.info(f"dbt run result: {r.node.name}: {r.status}")
# Map df index to result
dbt_res_df_map = {}

for i in df.index:
    for r in res.result:
        if r.node.name == df.loc[i, "name"]:
            dbt_res_df_map[i] = r
            break

for i in df.index:
    # Check Success
    if df.loc[i, "success"] is not False:
        if dbt_res_df_map[i].status == "success" and df.loc[i, "name"] not in sst_user_tables:
            df.loc[i, "success"] = True
            rison_request = "/dataset/"
            # Data to be written
            dictionary = {
                # Parameter database
                "database": DATABASE_ID,
                "schema": USER_SCHEMA,
                "table_name": df.loc[i, "name"],
                "owners": [int(df.loc[i, "user_id"]), SUPERSET_ID],
            }
            # Serializing json
            json_object = json.dumps(dictionary)
            response = superset.request("POST", rison_request, json=dictionary)

            message = get_mail_content(df.loc[i, "name"], df.loc[i, "query_string"], "dbt success")

        else:
            df.loc[i, "success"] = False
            message = get_mail_content(
                df.loc[i, "name"], df.loc[i, "query_string"], "dbt fail", dbt_res_df_map[i].message
            )
        # Add checked
        df.loc[i, "checked"] = True

        SMTP.sendmail(EMAIL_SENDER, email_dict[str(df.loc[i, "user_id"])], message)

SMTP.quit()
# Delete unsucessful model
for i in df.index:
    # Check Success
    if not df.loc[i, "success"]:
        full_file_path = os.path.join(USER_MODEL_PATH, "{name}.sql".format(name=df.loc[i, "name"]))
        if os.path.isfile(full_file_path):
            os.remove(full_file_path)

update_records(df)

DEBUG:file_log:[0m18:35:01.230275 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'start', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f056c6d88e0>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f056c86b1c0>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f056392d2a0>]}


[0m11:35:01  Running with dbt=1.5.1


INFO:stdout_log:[0m11:35:01  Running with dbt=1.5.1
INFO:file_log:

[0m18:35:01.232916 [info ] [MainThread]: Running with dbt=1.5.1
DEBUG:file_log:[0m18:35:01.240733 [debug] [MainThread]: running dbt with arguments {'printer_width': '80', 'indirect_selection': 'eager', 'write_json': 'True', 'log_cache_events': 'False', 'partial_parse': 'True', 'cache_selected_only': 'False', 'profiles_dir': '/home/vu/.dbt', 'fail_fast': 'False', 'version_check': 'True', 'log_path': '/home/vu/Desktop/Projects/Thesis/financial/dbt/logs', 'debug': 'False', 'warn_error': 'None', 'use_colors': 'True', 'use_experimental_parser': 'False', 'no_print': 'None', 'quiet': 'False', 'log_format': 'default', 'introspect': 'True', 'warn_error_options': 'WarnErrorOptions(include=[], exclude=[])', 'static_parser': 'True', 'target_path': 'None', 'send_anonymous_usage_stats': 'True'}
DEBUG:file_log:[0m18:35:01.284703 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'project_id', 'label': 'f61cc454-61

[0m11:35:01  Performance info: target/perf_info.json


INFO:stdout_log:[0m11:35:01  Performance info: target/perf_info.json
INFO:file_log:[0m18:35:01.640745 [info ] [MainThread]: Performance info: target/perf_info.json
DEBUG:file_log:[0m18:35:01.943399 [debug] [MainThread]: Command `cli parse` succeeded at 18:35:01.942995 after 0.72 seconds
DEBUG:file_log:[0m18:35:01.945565 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'end', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f056c6f7880>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f05637df190>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f0563d58160>]}
DEBUG:file_log:[0m18:35:01.948197 [debug] [MainThread]: Flushing usage events
DEBUG:file_log:[0m18:35:09.353751 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'start', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f056c9ed6c0>,

[0m11:35:09  Running with dbt=1.5.1


INFO:stdout_log:[0m11:35:09  Running with dbt=1.5.1
INFO:file_log:

[0m18:35:09.356057 [info ] [MainThread]: Running with dbt=1.5.1
DEBUG:file_log:[0m18:35:09.360814 [debug] [MainThread]: running dbt with arguments {'printer_width': '80', 'indirect_selection': 'eager', 'write_json': 'True', 'log_cache_events': 'False', 'partial_parse': 'True', 'cache_selected_only': 'False', 'profiles_dir': '/home/vu/.dbt', 'fail_fast': 'False', 'version_check': 'True', 'log_path': '/home/vu/Desktop/Projects/Thesis/financial/dbt/logs', 'debug': 'False', 'warn_error': 'None', 'use_colors': 'True', 'use_experimental_parser': 'False', 'no_print': 'None', 'quiet': 'False', 'log_format': 'default', 'introspect': 'True', 'warn_error_options': 'WarnErrorOptions(include=[], exclude=[])', 'static_parser': 'True', 'target_path': 'None', 'send_anonymous_usage_stats': 'True'}
DEBUG:file_log:[0m18:35:09.405928 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'project_id', 'label': 'b9056fb1-78

[0m11:35:09  Found 32 models, 58 tests, 6 snapshots, 0 analyses, 760 macros, 0 operations, 0 seed files, 12 sources, 1 exposure, 0 metrics, 0 groups


INFO:stdout_log:[0m11:35:09  Found 32 models, 58 tests, 6 snapshots, 0 analyses, 760 macros, 0 operations, 0 seed files, 12 sources, 1 exposure, 0 metrics, 0 groups
INFO:file_log:[0m18:35:09.852087 [info ] [MainThread]: Found 32 models, 58 tests, 6 snapshots, 0 analyses, 760 macros, 0 operations, 0 seed files, 12 sources, 1 exposure, 0 metrics, 0 groups
DEBUG:file_log:[0m18:35:09.857068 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'runnable_timing', 'label': 'b9056fb1-785d-43c0-a145-a3c42b9f014f', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f05a03cb580>]}


[0m11:35:09  


INFO:stdout_log:[0m11:35:09  
INFO:file_log:[0m18:35:09.863250 [info ] [MainThread]: 
DEBUG:file_log:[0m18:35:09.868617 [debug] [MainThread]: Acquiring new postgres connection 'master'
DEBUG:file_log:[0m18:35:09.872451 [debug] [ThreadPool]: Acquiring new postgres connection 'list_financial_data'
DEBUG:file_log:[0m18:35:09.902382 [debug] [ThreadPool]: Using postgres connection "list_financial_data"
DEBUG:file_log:[0m18:35:09.910240 [debug] [ThreadPool]: On list_financial_data: /* {"app": "dbt", "dbt_version": "1.5.1", "profile_name": "dbt_financial", "target_name": "dev_cloud", "connection_name": "list_financial_data"} */

    select distinct nspname from pg_namespace
  
DEBUG:file_log:[0m18:35:09.914329 [debug] [ThreadPool]: Opening a new connection, currently in state init
DEBUG:file_log:[0m18:35:12.153724 [debug] [ThreadPool]: SQL status: SELECT 23 in 2.0 seconds
DEBUG:file_log:[0m18:35:12.157800 [debug] [ThreadPool]: On list_financial_data: Close
DEBUG:file_log:[0m18:35:12

[0m11:35:20  Concurrency: 4 threads (target='dev_cloud')


INFO:stdout_log:[0m11:35:20  Concurrency: 4 threads (target='dev_cloud')
INFO:file_log:[0m18:35:20.690206 [info ] [MainThread]: Concurrency: 4 threads (target='dev_cloud')


[0m11:35:20  


INFO:stdout_log:[0m11:35:20  
INFO:file_log:[0m18:35:20.694394 [info ] [MainThread]: 
DEBUG:file_log:[0m18:35:20.707133 [debug] [Thread-6 (]: Began running node model.dbt_financial.sda


[0m11:35:20  1 of 1 START sql table model user.sda .......................................... [RUN]


INFO:stdout_log:[0m11:35:20  1 of 1 START sql table model user.sda .......................................... [RUN]
INFO:file_log:[0m18:35:20.709786 [info ] [Thread-6 (]: 1 of 1 START sql table model user.sda .......................................... [RUN]
DEBUG:file_log:[0m18:35:20.717329 [debug] [Thread-6 (]: Re-using an available connection from the pool (formerly list_financial_data_snapshots, now model.dbt_financial.sda)
DEBUG:file_log:[0m18:35:20.719512 [debug] [Thread-6 (]: Began compiling node model.dbt_financial.sda
DEBUG:file_log:[0m18:35:20.729019 [debug] [Thread-6 (]: Writing injected SQL for node "model.dbt_financial.sda"
DEBUG:file_log:[0m18:35:20.733397 [debug] [Thread-6 (]: Timing info for model.dbt_financial.sda (compile): 18:35:20.721290 => 18:35:20.732806
DEBUG:file_log:[0m18:35:20.735772 [debug] [Thread-6 (]: Began executing node model.dbt_financial.sda
DEBUG:file_log:[0m18:35:20.831073 [debug] [Thread-6 (]: Writing runtime sql for node "model.dbt_financial

[0m11:35:24  1 of 1 OK created sql table model user.sda ..................................... [[32mSELECT 5[0m in 3.99s]


INFO:stdout_log:[0m11:35:24  1 of 1 OK created sql table model user.sda ..................................... [[32mSELECT 5[0m in 3.99s]
INFO:file_log:[0m18:35:24.712478 [info ] [Thread-6 (]: 1 of 1 OK created sql table model user.sda ..................................... [[32mSELECT 5[0m in 3.99s]
DEBUG:file_log:[0m18:35:24.718212 [debug] [Thread-6 (]: Finished running node model.dbt_financial.sda
DEBUG:file_log:[0m18:35:24.723510 [debug] [MainThread]: Using postgres connection "master"
DEBUG:file_log:[0m18:35:24.725744 [debug] [MainThread]: On master: BEGIN
DEBUG:file_log:[0m18:35:24.728261 [debug] [MainThread]: Opening a new connection, currently in state closed
DEBUG:file_log:[0m18:35:27.006083 [debug] [MainThread]: SQL status: BEGIN in 2.0 seconds
DEBUG:file_log:[0m18:35:27.008207 [debug] [MainThread]: On master: COMMIT
DEBUG:file_log:[0m18:35:27.010247 [debug] [MainThread]: Using postgres connection "master"
DEBUG:file_log:[0m18:35:27.012387 [debug] [MainThread]: On

[0m11:35:27  


INFO:stdout_log:[0m11:35:27  
INFO:file_log:[0m18:35:27.331547 [info ] [MainThread]: 


[0m11:35:27  Finished running 1 table model in 0 hours 0 minutes and 17.46 seconds (17.46s).


INFO:stdout_log:[0m11:35:27  Finished running 1 table model in 0 hours 0 minutes and 17.46 seconds (17.46s).
INFO:file_log:[0m18:35:27.343097 [info ] [MainThread]: Finished running 1 table model in 0 hours 0 minutes and 17.46 seconds (17.46s).
DEBUG:file_log:[0m18:35:27.350104 [debug] [MainThread]: Command end result


[0m11:35:27  


INFO:stdout_log:[0m11:35:27  
INFO:file_log:[0m18:35:27.411133 [info ] [MainThread]: 


[0m11:35:27  [32mCompleted successfully[0m


INFO:stdout_log:[0m11:35:27  [32mCompleted successfully[0m
INFO:file_log:[0m18:35:27.416686 [info ] [MainThread]: [32mCompleted successfully[0m


[0m11:35:27  


INFO:stdout_log:[0m11:35:27  
INFO:file_log:[0m18:35:27.422582 [info ] [MainThread]: 


[0m11:35:27  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1


INFO:stdout_log:[0m11:35:27  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
INFO:file_log:[0m18:35:27.429810 [info ] [MainThread]: Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
DEBUG:file_log:[0m18:35:27.436760 [debug] [MainThread]: Command `cli run` succeeded at 18:35:27.436365 after 18.09 seconds
DEBUG:file_log:[0m18:35:27.438982 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'end', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f0562330610>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f0561e63730>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x7f05a03c8d00>]}
DEBUG:file_log:[0m18:35:27.441645 [debug] [MainThread]: Flushing usage events


In [71]:

update_records(df)

In [66]:
sst_datasets

[{'id': 126,
  'name': 'dim_organization',
  'schema': 'marts',
  'database': 'FDP Reader',
  'dataset_id': 126,
  'key': 'marts.dim_organization',
  'table': ['marts.dim_organization']},
 {'id': 160,
  'name': 'fact_price_history',
  'schema': 'marts',
  'database': 'FDP Reader',
  'dataset_id': 160,
  'key': 'marts.fact_price_history',
  'table': ['marts.fact_price_history']},
 {'id': 164,
  'name': 'fact_bollinger',
  'schema': 'marts',
  'database': 'FDP Reader',
  'dataset_id': 164,
  'key': 'marts.fact_bollinger',
  'table': ['marts.fact_bollinger']},
 {'id': 165,
  'name': 'fact_industry_health_rating',
  'schema': 'marts',
  'database': 'FDP Reader',
  'dataset_id': 165,
  'key': 'marts.fact_industry_health_rating',
  'table': ['marts.fact_industry_health_rating']},
 {'id': 166,
  'name': 'fact_cash_flow',
  'schema': 'marts',
  'database': 'FDP Reader',
  'dataset_id': 166,
  'key': 'marts.fact_cash_flow',
  'table': ['marts.fact_cash_flow']},
 {'id': 167,
  'name': 'fact_valu

In [36]:
a = ['marts.fact_bollinger', 'marts.fact_bov', 'marts.fact_mfi', 'marts.fact_business_operation_rating', 'marts.fact_valuation_rating', 'marts.fact_business_model_rating', 'marts.fact_industry_health_rating', 'marts.fact_general_rating', 'marts.fact_financial_health_rating', 'marts.fact_price_history', 'marts.fact_cash_flow', 'marts.fact_stock_intraday', 'marts.fact_balance_sheet', 'marts.fact_income_statement', 'marts.dim_organization']


In [32]:
get_tables_from_sql("select * from marts.balance_sheet", "postgres")

['marts.balance_sheet']

In [49]:
original_query="select * from marts.dim_balance_sheet"
parsed_result = sqlfluff.parse(original_query)
dbt_tables_names = list(dbt_tables.keys())
mapped = map(lambda x: x.startswith((SERVING_SCHEMA, USER_SCHEMA)), dbt_tables_names)
mask = list(mapped)

serving_tables_names = list(compress(dbt_tables_names, mask))

In [35]:
set(['marts.balance_sheet']).difference(a)

{'marts.balance_sheet'}

In [50]:
fixed_query = str(original_query)
table_names = set(get_tables_from_sql(fixed_query, dialect="postgres", sql_parsed=parsed_result))
fixed_query = sqlfluff.fix(fixed_query, dialect="postgres")
dbt_set = set(serving_tables_names)
if not table_names.issubset(dbt_set):  # serving_tables_names include schema
    print("Tables referenced out of serving schemas")
# Put tables in subqueries
final_tables = tuple(table_names.intersection(serving_tables_names))  # Filter out


Tables referenced out of serving schemas


In [51]:
table_names

{'marts.dim_balance_sheet'}

In [52]:
dbt_set

{'marts.dim_organization',
 'marts.fact_balance_sheet',
 'marts.fact_bollinger',
 'marts.fact_bov',
 'marts.fact_business_model_rating',
 'marts.fact_business_operation_rating',
 'marts.fact_cash_flow',
 'marts.fact_financial_health_rating',
 'marts.fact_general_rating',
 'marts.fact_income_statement',
 'marts.fact_industry_health_rating',
 'marts.fact_mfi',
 'marts.fact_price_history',
 'marts.fact_stock_intraday',
 'marts.fact_valuation_rating'}

In [46]:
tables_raw = list(get_json_segment(parsed_result, "table_reference"))  # type: ignore
tables_cleaned = []  # With schema
for table_ref in tables_raw:
    if isinstance(table_ref, list):
        table_ref_identifier = []
        # Get last 2 "naked_identifier"
        for dictionary in table_ref[::-1]:
            if "naked_identifier" in dictionary:
                table_ref_identifier.append(dictionary["naked_identifier"])
                if len(table_ref_identifier) == 2:
                    tables_cleaned.append(".".join(table_ref_identifier[::-1]))
                    break
    if isinstance(table_ref, dict):
        tables_cleaned.append(table_ref["naked_identifier"])

In [47]:
tables_raw

[[{'naked_identifier': 'marts'},
  {'dot': '.'},
  {'naked_identifier': 'balance_sheet'}]]

In [24]:
df

(   id name                           query_string  user_id  materialization  \
 0  39    a  SELECT * from marts.dim_balance_sheet        1                1   
 
   description                insert_time  checked success  
 0             2023-07-30 08:43:18.845008    False    None  ,
 [])

In [27]:
entries_to_update=str(tuple(zip(df.checked, df.success, df.id))).replace("None", "Null")[1:-1]

In [28]:
if entries_to_update[-1]==",": entries_to_update=entries_to_update[:-1]

In [29]:
entries_to_update

'(False, Null, 39)'

In [5]:
dbt = dbtRunner()
cli_args = [
    "parse",
    "--project-dir",
    DBT_PROJECT_DIR,
]
res: dbtRunnerResult = dbt.invoke(cli_args)

[0m08:47:07  Running with dbt=1.5.1


[0m08:47:08  Unable to do partial parsing because profile has changed
[0m08:47:13  Performance info: target/perf_info.json


In [6]:
import os
current_path = os.getcwd()

In [7]:
current_path

'/home/vu/Desktop/Projects/Thesis/financial/dagster/financial/superset_utils'

In [72]:
res.success

True

In [None]:
force": true,
  "thumb_size": [
    0
  ],
  "window_size": [
    0
  ]

In [None]:
superset.request("GET",f'chart/1/cache_screenshot/?q={{"page":{page_number},"page_size":100}}')

In [51]:
superset = SupersetDBTSessionConnector()
datasources = {
    "datasources": [
        {
            "database_name": "FDP Reader",
            "datasource_name": "marts.fact_price_history",
            "datasource_type": "table",
            "schema": "marts",
        }
    ]
}
superset.request("POST", "cachekey/invalidate", json=datasources)


{}

In [20]:




#### ADD ALL OF THE END POINT USED TO CSRF EXEMPT LIST TO RUN PARALLELY
#### ONLY USE SESSION FOR SEQUENTIAL RUNNING SCRIPTS


class SupersetDBTSessionConnector:
    """A class for accessing the Superset API in an easy way."""

    def __init__(self):
        """Instantiates the class.

        ''access_token'' will be instantiated via enviromental variable
        If ``access_token`` is None, attempts to obtain it using ``refresh_token``.

        Args:
            api_url: Base API URL of a Superset instance, e.g. https://my-superset/api/v1.
            access_token: Access token to use for accessing protected endpoints of the Superset
                API. Can be automatically obtained if ``refresh_token`` is not None.
            refresh_token: Refresh token to use for obtaining or refreshing the ``access_token``.
                If None, no refresh will be done.
        """
        self.__url = SUPERSET_HOST
        self.__api_url = self.__url + "api/v1/"

        self.__session = requests.session()

        self.__username = SUPERSET_USERNAME
        self.__password = SUPERSET_PASSWORD

        self.__refresh_session()

    def __refresh_session(self):
        logging.info("Refreshing session")

        soup = BeautifulSoup(self.__session.post(self.__url + "login").text, "html.parser")
        self.__csrf_token = soup.find("input", {"id": "csrf_token"})["value"]  # type: ignore

        data = {
            "username": self.__username,
            "password": self.__password,
            "provider": "db",
            "refresh": True,
        }
        headers = {
            # 'Authorization': 'Bearer {}'.format(self.____access_token),
            "x-csrftoken": self.__csrf_token,
        }
        response = self.__session.post(self.__url + "login", json=data, headers=headers)  # type: ignore
        return True

    def request(self, method, endpoint, **request_kwargs):
        """Executes a request against the Superset API.

        Args:
            method: HTTP method to use.
            endpoint: Endpoint to use.
            **request_kwargs: Any ``requests.request`` arguments to use.

        Returns:
            A dictionary containing response body parsed from JSON.

        Raises:
            HTTPError: There is an HTTP error (detected by ``requests.Response.raise_for_status``)
                even after retrying with a fresh session.
        """

        logging.info("About to %s execute request for endpoint %s", method, endpoint)

        url = self.__api_url + endpoint
        csrf_headers = {
            # 'Authorization': 'Bearer {}'.format(self.__access_token),
            "x-csrftoken": self.__csrf_token,
        }

        res = self.__session.request(method, url, headers=csrf_headers, **request_kwargs)  # type: ignore

        logging.info("Request finished with status: %d", res.status_code)

        if res.status_code == 401 and res.json().get("msg") == "Token has expired" and self.__refresh_session():
            logging.info(f"Retrying {method} request for {url} %s with refreshed session")
            res = self.__session.request(method, url, headers=csrf_headers, **request_kwargs)  # type: ignore

            logging.info("Request finished with status: %d", res.status_code)

        if (
            res.status_code == 400
            and res.json()["message"] == "400 Bad Request: The CSRF session token is missing."
            and self.__refresh_session()
        ):
            logging.info(f"Retrying {method} request for {url} %s with refreshed session")
            res = self.__session.request(method, url, headers=csrf_headers, **request_kwargs)  # type: ignore
            logging.info(f"Request finished with status: {res.status_code}")
        res.raise_for_status()
        return res.json()


def get_tables_from_dbt(dbt_manifest, dbt_db_name):
    tables = {}
    for table_type in ["nodes"]:
        manifest_subset = dbt_manifest[table_type]

        for table_key_long in manifest_subset:
            table = manifest_subset[table_key_long]
            name = table["name"]
            schema = table["schema"]
            database = table["database"]
            description = table["description"]
            alias = table["alias"]
            source = table["unique_id"].split(".")[-2]
            table_key = schema + "." + alias  # Key will be alias, not name
            columns = table["columns"]

            if dbt_db_name is None or database == dbt_db_name:
                # fail if it breaks uniqueness constraint
                assert table_key not in tables, (
                    f"Table {table_key} is a duplicate name (schema + table) across databases. "
                    "This would result in incorrect matching between Superset and dbt. "
                    "To fix this, remove duplicates or add ``dbt_db_name``."
                )
                tables[table_key] = {
                    "name": name,
                    "schema": schema,
                    "database": database,
                    "type": table_type[:-1],
                    "ref": f"ref('{name}')" if table_type == "nodes" else f"source('{source}', '{name}')",
                    "user": None,
                    "columns": columns,
                    "description": description,
                    "alias": alias,
                }
            if schema == "user":
                tables[table_key]["user"] = table["tags"][0]

    assert tables, "Manifest is empty!"

    return tables


def get_physical_datasets_from_superset(superset: SupersetDBTSessionConnector, superset_db_id):
    logging.info("Getting physical datasets from Superset.")
    page_number = 0
    datasets = []
    datasets_keys = set()
    while True:
        logging.info("Getting page %d.", page_number + 1)
        rison_request = f"dataset/?q=(page_size:100,page:{page_number},order_column:changed_on_delta_humanized,order_direction:asc,filters:!((col:table_name,opr:nct,value:archived),(col:sql,opr:dataset_is_null_or_empty,value:true)))"
        res = superset.request("GET", rison_request)
        result = res["result"]
        if result:
            for r in result:
                name = r["table_name"]
                schema = r["schema"]
                database_name = r["database"]["database_name"]
                dataset_id = r["id"]
                database_id = r["database"]["id"]
                dataset_key = f"{schema}.{name}"  # same format as in dashboards

                kind = r["kind"]
                if kind == "physical" and (superset_db_id is None or database_id == superset_db_id):
                    dataset_id = r["id"]

                    name = r["table_name"]
                    schema = r["schema"]
                    dataset_key = f"{schema}.{name}"  # used as unique identifier

                    dataset_dict = {
                        "id": dataset_id,
                        "name": name,
                        "schema": schema,
                        "database": database_name,
                        "dataset_id": dataset_id,
                        "key": dataset_key,
                        "table": [dataset_key],
                    }

                    # fail if it breaks uniqueness constraint
                    assert dataset_key not in datasets_keys, (
                        f"Dataset {dataset_key} is a duplicate name (schema + table) "
                        "across databases. "
                        "This would result in incorrect matching between Superset and dbt. "
                        "To fix this, remove duplicates or add the ``superset_db_id`` argument."
                    )

                    datasets_keys.add(dataset_key)
                    datasets.append(dataset_dict)

            page_number += 1
        else:
            break

    return datasets


def get_tables_from_sql_simple(sql):
    """
    (Superset) Fallback SQL parsing using regular expressions to get tables names.
    """
    sql = re.sub(r"(--.*)|(#.*)", "", sql)
    sql = re.sub(r"\s+", " ", sql).lower()
    sql = re.sub(r"(/\*(.|\n)*\*/)", "", sql)

    regex = re.compile(r"\b(from|join)\b\s+(\"?(\w+)\"?(\.))?\"?(\w+)\"?\b")
    tables_match = regex.findall(sql)
    tables = [
        table[2] + "." + table[4] if table[2] != "" else table[4] for table in tables_match if table[4] != "unnest"
    ]

    tables = list(set(tables))

    return tables


def get_tables_from_sql(sql, dialect, sql_parsed=None):
    """
    (Superset) SQL parsing using sqlfluff to get clean tables names.
    If sqlfluff parsing fails it runs the above regex parsing func.
    Returns a tables list.
    """
    try:
        if not sql_parsed:
            sql_parsed = sqlfluff.parse(sql, dialect=dialect)
        tables_raw = list(get_json_segment(sql_parsed, "table_reference"))  # type: ignore
        tables_cleaned = []  # With schema
        for table_ref in tables_raw:
            if isinstance(table_ref, list):
                table_ref_identifier = []
                # Get last 2 "naked_identifier"
                for dictionary in table_ref[::-1]:
                    if "naked_identifier" in dictionary:
                        table_ref_identifier.append(dictionary["naked_identifier"])
                        if len(table_ref_identifier) == 2:
                            tables_cleaned.append(".".join(table_ref_identifier[::-1]))
                            break
            if isinstance(table_ref, dict):
                tables_cleaned.append(table_ref["naked_identifier"])
    except (
        sqlfluff.core.errors.SQLParseError,  # type: ignore
        sqlfluff.core.errors.SQLLexError,  # type: ignore
        sqlfluff.core.errors.SQLFluffUserError,  # type: ignore
        sqlfluff.api.simple.APIParsingError,  # type: ignore
    ) as e:  # type: ignore
        logging.warning(
            "Parsing SQL through sqlfluff failed. "
            "Let me attempt this via regular expressions at least and "
            "check the problematic query and error below.\n%s",
            sql,
            exc_info=e,
        )
        tables_cleaned = get_tables_from_sql_simple(sql)

    return tables_cleaned


def get_json_segment(
    parse_result: Dict[str, Any], segment_type: str
) -> Iterator[Union[str, Dict[str, Any], List[Dict[str, Any]]]]:
    """Recursively search JSON parse result for specified segment type.

    Args:
        parse_result (Dict[str, Any]): JSON parse result from `sqlfluff.fix`.
        segment_type (str): The segment type to search for.

    Yields:
        Iterator[Union[str, Dict[str, Any], List[Dict[str, Any]]]]:
        Retrieves children of specified segment type as either a string for a raw
        segment or as JSON or an array of JSON for non-raw segments.
    """
    for k, v in parse_result.items():
        if k == segment_type:
            yield v
        elif isinstance(v, dict):
            yield from get_json_segment(v, segment_type)
        elif isinstance(v, list):
            for s in v:
                yield from get_json_segment(s, segment_type)


def get_dashboards_from_superset(superset: SupersetDBTSessionConnector, superset_db_id, user_id):
    """
    This function gets
    1. Get dashboards id list from Superset iterating on the pages of the url
    2. Get a dashboard detail information :
        title, owner, url, unique datasets names

    Returns dashboards, dashboards_datasets
    """

    logging.info("Getting published dashboards from Superset.")
    page_number = 0
    dashboards_id = []
    while True:
        logging.info("Getting page %d.", page_number + 1)
        res = superset.request("GET", f'/dashboard/?q={{"page":{page_number},"page_size":100}}')
        result = res["result"]
        if result:
            for r in result:
                if r["published"] and r["created_by"]["id"] == user_id:
                    dashboards_id.append(r["id"])
            page_number += 1
        else:
            break

    assert len(dashboards_id) > 0, "There are no dashboards in Superset!"

    logging.info("There are %d published dashboards in Superset.", len(dashboards_id))

    dashboards = []
    dashboards_datasets_w_db = set()
    for i, d in enumerate(dashboards_id):
        logging.info("Getting info for dashboard %d/%d.", i + 1, len(dashboards_id))
        res = superset.request("GET", f"/dashboard/{d}")
        result = res["result"]

        dashboard_id = result["id"]
        title = result["dashboard_title"]
        url = superset.url + "/superset/dashboard/" + str(dashboard_id)
        owner_name = result["owners"][0]["first_name"] + " " + result["owners"][0]["last_name"]

        # take unique dataset names, formatted as "[database].[schema].[table]" by Superset
        res_table_names = superset.request("GET", f"/dashboard/{d}/datasets")
        result_table_names = res_table_names["result"]

        testing = []
        for i in range(0, len(result_table_names)):
            testing.append(result_table_names[i]["name"])

        # datasets_raw = list(set(result['table_names'].split(', ')))
        datasets_raw = testing

        # parse dataset names into parts
        datasets_parsed = [dataset[1:-1].split("].[", maxsplit=2) for dataset in datasets_raw]
        datasets_parsed = [
            [dataset[0], "None", dataset[1]]  # add None in the middle
            if len(dataset) == 2
            else dataset  # if missing the schema
            for dataset in datasets_parsed
        ]

        # put them all back together to get "database.schema.table"
        datasets_w_db = [".".join(dataset) for dataset in datasets_parsed]

        dbt_project_name = "your_dbt_project."
        datasets_w_db = [dbt_project_name + sub for sub in testing]

        dashboards_datasets_w_db.update(datasets_w_db)

        # skip database, i.e. first item, to get only "schema.table"
        datasets_wo_db = [".".join(dataset[1:]) for dataset in datasets_parsed]

        datasets_wo_db = testing
        dashboard = {
            "id": dashboard_id,
            "title": title,
            "url": url,
            "owner_name": owner_name,
            "owner_email": "",  # required for dbt to accept owner_name but not in response
            "datasets": datasets_wo_db,  # add in "schema.table" format
        }
        dashboards.append(dashboard)
    # test if unique when database disregarded
    # loop to get the name of duplicated dataset and work with unique set of datasets w db
    dashboards_datasets = set()
    for dataset_w_db in dashboards_datasets_w_db:
        dataset = ".".join(dataset_w_db.split(".")[1:])  # similar logic as just a bit above

        # fail if it breaks uniqueness constraint and not limited to one database
        assert dataset not in dashboards_datasets or superset_db_id is not None, (
            f"Dataset {dataset} is a duplicate name (schema + table) across databases. "
            "This would result in incorrect matching between Superset and dbt. "
            "To fix this, remove duplicates or add ``superset_db_id``."
        )

        dashboards_datasets.add(dataset)

    return dashboards, dashboards_datasets


def get_datasets_from_superset_dbt_refs(
    superset: SupersetDBTSessionConnector, dashboards_datasets, dbt_tables, sql_dialect, superset_db_id
):
    """
    Returns datasets (dict) containing table info and dbt references
    """

    logging.info("Getting datasets info from Superset.")
    page_number = 0
    datasets = {}
    while True:
        logging.info("Getting page %d.", page_number + 1)
        res = superset.request("GET", f'/dataset/?q={{"page":{page_number},"page_size":100}}')
        result = res["result"]
        if result:
            for r in result:
                name = r["table_name"]
                schema = r["schema"]
                database_name = r["database"]["database_name"]
                database_id = r["database"]["id"]

                dataset_key = f"{schema}.{name}"  # same format as in dashboards

                # only add datasets that are in dashboards, optionally limit to one database
                if dataset_key in dashboards_datasets and (superset_db_id is None or database_id == superset_db_id):
                    kind = r["kind"]
                    if kind == "virtual":  # built on custom sql
                        sql = r["sql"]
                        tables = get_tables_from_sql(sql, sql_dialect)
                        tables = [table if "." in table else f"{schema}.{table}" for table in tables]
                    else:  # built on tables
                        tables = [dataset_key]
                    dbt_refs = [dbt_tables[table]["ref"] for table in tables if table in dbt_tables]

                    datasets[dataset_key] = {
                        "name": name,
                        "schema": schema,
                        "database": database_name,
                        "kind": kind,
                        "tables": tables,
                        "dbt_refs": dbt_refs,
                    }
            page_number += 1
        else:
            break

    return datasets


def refresh_columns_in_superset(superset: SupersetDBTSessionConnector, dataset_id):
    logging.info("Refreshing columns in Superset.")
    superset.request("PUT", f"/dataset/{dataset_id}/refresh")


def add_sst_dataset_metadata(superset: SupersetDBTSessionConnector, dataset_id, sst_dataset_key, dbt_tables):
    logging.info("Refreshing columns in Superset.")
    body = {
        "extra": '{"certification": \n  {"certified_by": "Data Analytics Team", \n   "details": "This table is the source of truth." \n    \n  }\n}',
        "description": dbt_tables[sst_dataset_key]["description"],
        "owners": [SUPERSET_ID],
    }
    if dbt_tables[sst_dataset_key]["user"]:
        body["owners"].append(dbt_tables[sst_dataset_key]["user"])
    superset.request("PUT", f"/dataset/{dataset_id}", json=body)


def add_superset_columns(superset: SupersetDBTSessionConnector, dataset):
    logging.info("Pulling fresh columns info from Superset.")
    res = superset.request("GET", f"/dataset/{dataset['id']}")
    columns = res["result"]["columns"]
    dataset["columns"] = columns
    return dataset


def convert_markdown_to_plain_text(md_string):
    """Converts a markdown string to plaintext.

    The following solution is used:
    https://gist.github.com/lorey/eb15a7f3338f959a78cc3661fbc255fe
    """

    # md -> html -> text since BeautifulSoup can extract text cleanly
    html = markdown(md_string)

    # remove code snippets
    html = re.sub(r"<pre>(.*?)</pre>", " ", html)
    html = re.sub(r"<code>(.*?)</code >", " ", html)

    # extract text
    soup = BeautifulSoup(html, "html.parser")
    text = "".join(soup.findAll(text=True))

    # make one line
    single_line = re.sub(r"\s+", " ", text)

    # make fixes
    single_line = re.sub("→", "->", single_line)
    single_line = re.sub("<null>", '"null"', single_line)

    return single_line


def merge_columns_info(dataset, tables):
    logging.info("Merging columns info from Superset and manifest.json file.")

    key = dataset["key"]
    sst_columns = dataset["columns"]
    dbt_columns = tables.get(key, {}).get("columns", {})
    columns_new = []
    for sst_column in sst_columns:
        # add the mandatory field
        column_new = {"column_name": sst_column["column_name"]}

        # add optional fields only if not already None, otherwise it would error out
        for field in [
            "expression",
            "filterable",
            "groupby",
            "python_date_format",
            "verbose_name",
            "type",
            "is_dttm",
            "is_active",
        ]:
            if sst_column[field] is not None:
                column_new[field] = sst_column[field]

        # add description
        if (
            sst_column["column_name"] in dbt_columns
            and "description" in dbt_columns[sst_column["column_name"]]
            and sst_column["expression"] == ""
        ):  # database columns
            description = dbt_columns[sst_column["column_name"]]["description"]
            description = convert_markdown_to_plain_text(description)
        else:  # if cant find in dbt
            description = sst_column["description"]
        column_new["description"] = description

        columns_new.append(column_new)

    dataset["columns_new"] = columns_new

    return dataset


def put_columns_to_superset(superset: SupersetDBTSessionConnector, dataset):
    logging.info("Putting new columns info with descriptions back into Superset.")
    body = {"columns": dataset["columns_new"]}
    superset.request("PUT", f"/dataset/{dataset['id']}?override_columns=true", json=body)


def merge_dashboards_with_datasets(dashboards, datasets):
    for dashboard in dashboards:
        refs = set()
        for dataset in dashboard["datasets"]:
            if dataset in datasets:
                refs.update(datasets[dataset]["dbt_refs"])
        refs = list(sorted(refs))

        dashboard["refs"] = refs

    return dashboards


def get_exposures_dict(dashboards, exposures):
    dashboards.sort(key=lambda dashboard: dashboard["id"])
    titles = [dashboard["title"] for dashboard in dashboards]
    # fail if it breaks uniqueness constraint for exposure names
    assert len(set(titles)) == len(titles), "There are duplicate dashboard names!"

    exposures_orig = {exposure["url"]: exposure for exposure in exposures}
    exposures_dict = [
        {
            "name": f"superset__{dashboard['title']}",
            "type": "dashboard",
            "url": dashboard["url"],
            "description": exposures_orig.get(dashboard["url"], {}).get("description", ""),
            "depends_on": dashboard["refs"],
            "owner": {"name": dashboard["owner_name"], "email": dashboard["owner_email"]},
        }
        for dashboard in dashboards
    ]

    return exposures_dict


class YamlFormatted(ruamel.yaml.YAML):
    def __init__(self):
        super(YamlFormatted, self).__init__()
        self.default_flow_style = False
        self.allow_unicode = True
        self.encoding = "utf-8"
        self.block_seq_indent = 2
        self.indent = 4
        self.emitter.alt_null = "''"


# Create Query


def is_valid_table_name(table_name):
    """
    Checks if the given string is a valid table name in PostgreSQL.

    Args:
        table_name: The string to check.

    Returns:
        True if the string is a valid table name, False otherwise.
    """

    # The regular expression to match a valid table name.
    regex = re.compile(r"^[a-zA-Z0-9_]{1,63}$")

    # Check if the string matches the regular expression.
    if regex.match(table_name):
        return True
    else:
        return False


def is_unique_table_name(table_name, dbt_tables):
    """
    Checks if the given string is a valid table name in PostgreSQL and dbt.

    Args:
        table_name: The string to check.
        dbt_tables: Dict of get_dbt_tables
    Returns:
        True if the string is a valid table name, False otherwise.
    """

    # The regular expression to match a valid table name.
    regex = re.compile(r"^[a-zA-Z0-9_]{1,63}$")

    # Check if the string matches the regular expression.
    if table_name not in dbt_tables:
        return True
    else:
        return False


def get_ref(original_query, dbt_tables, parsed_result, dbt_tables_names):
    """
    Returns content of a user-created dbt model file w/o config.

    Args:
        original_query: Query needed processing
        dbt_tables: Dict of dicts obtained by get_tables_from_dbt.
        schema_names: List of serving schema names.

    Returns:
        ref_tables: list of models that is referenced in the query
    """
    # original_query = original_query[:-1] if original_query[-1] == ";" else original_query # Maybe unneeded since not wrapping with
    # Access table names
    fixed_query = str(original_query)
    table_names = set(get_tables_from_sql(fixed_query, dialect="postgres", sql_parsed=parsed_result))
    fixed_query = sqlfluff.fix(fixed_query, dialect="postgres")
    if len(table_names.difference(dbt_tables_names)) > 0:  # dbt_tables_names include schema
        return None, "Tables referenced out of serving schemas"
    # Put tables in subqueries
    final_tables = tuple(table_names.intersection(dbt_tables_names))  # Filter out

    if len(final_tables) == 0:
        return None, "No tables referenced in dbt projects"

    return [dbt_tables[table]["name"] for table in final_tables], "Success"


def get_records():
    # Query records
    try:
        connection = psycopg2.connect(
            user=DATABASE_USERNAME,
            password=DATABASE_PASSWORD,
            host=DATABASE_HOST,
            port=DATABASE_PORT,
            database=DATABASE_NAME,
        )
        cursor = connection.cursor()
        postgreSQL_select_Query = f"select * from {QUERY_SCHEMA}.{QUERY_TABLE} where checked = False"

        logging.info(f"Executing query to fetch records: {postgreSQL_select_Query}")
        cursor.execute(postgreSQL_select_Query)
        query_columns = [
            "id",
            "name",
            "query_string",
            "user_id",
            "materialization",
            "description",
            "insert_time",
            "checked",
            "success",
        ]
        df = pd.DataFrame(cursor.fetchall(), columns=query_columns)

        postgreSQL_select_Query = f"select name from {QUERY_SCHEMA}.{QUERY_TABLE} where success = True"

        logging.info(f"Executing query to fetch records: {postgreSQL_select_Query}")
        cursor.execute(postgreSQL_select_Query)

        succeeded = cursor.fetchall()

    finally:
        # closing database connection.
        if connection:
            cursor.close()
            connection.close()
            logging.info("PostgreSQL connection is closed")
    return df, succeeded


def update_records(df):
    entries_to_update = str(tuple(zip(df.checked, df.success, df.id))).replace("None", "Null")[1:-1]
    try:
        connection = psycopg2.connect(
            user=DATABASE_USERNAME,
            password=DATABASE_PASSWORD,
            host=DATABASE_HOST,
            port=DATABASE_PORT,
            database=DATABASE_NAME,
        )
        cursor = connection.cursor()
        update_sql_query = f"""UPDATE {QUERY_SCHEMA}.{QUERY_TABLE} q 
                                SET success = v.success,
                                    checked = v.checked

                                FROM (VALUES {entries_to_update}) AS v (checked, success, id)
                                WHERE q.id = v.id;"""
        logging.info(f"Executing query to update records: {update_sql_query}")
        cursor.execute(update_sql_query)

    finally:
        # closing database connection.
        if connection:
            cursor.close()
            connection.close()
            logging.info("PostgreSQL connection is closed")


def get_emails(superset, user_ids):
    url = unquote(f"/security/get_email/?q={list(user_ids)}")
    res = superset.request("GET", url)
    return res["emails"]


def get_mail_content(name, sql, status, dbt_reason=None):
    if status == "dbt success":
        message = """\
Subject: Superset Model Creation

Your Model {name} was successfully created. 

SQL:{sql}
        """.format(
            sql=sql, name=name
        )

    elif status == "dbt fail":
        message = """\
Subject: Superset Model Creation

Your Model {name} was unsuccessfully created during dbt's run, please contact the administrator.

Reason:
{reason}

SQL:
{sql}
        """.format(
            reason=dbt_reason, sql=sql, name=name
        )
    else:
        message = """\
Subject: Superset Model Creation

Your Model {name} was unsuccessfully created.

Reason:
{reason}

SQL:
{sql}
        """.format(
            reason=status, sql=sql, name=name
        )
    return message


In [11]:
df = get_records()

select * from financial_query.query where checked = False
PostgreSQL connection is closed


In [12]:
df

Unnamed: 0,query_string,materialization,user_id,description,insert_time,name,checked,success
0,SELECT * from marts.dim_balance_sheet,2,1,,2023-07-24 09:37:06.123662,asd,False,
1,SELECT * from dim_bollinger,2,1,,2023-07-23 08:11:58.188941,unique_2,False,
2,SELECT * from marts.dim_bollinger,2,1,asdad,2023-07-23 08:12:43.670466,unique_3,False,
3,SELECT * from marts.dim_balance_sheet,2,1,asdasdwa,2023-07-23 09:37:33.026336,unique_4,False,


In [72]:
entries_to_update = str(tuple(zip(df.name, df.user_id, df.checked, df.success))).replace("None", "Null")[1:-1]
print("entries")
print(entries_to_update)

entries
('a', 1, True, False), ('sda', 1, True, True)


In [74]:
update_records(df)


In [75]:

entries_to_update = str(tuple(zip(df.checked, df.success, df.id))).replace("None", "Null")[1:-1]
connection = psycopg2.connect(
    user=DATABASE_USERNAME,
    password=DATABASE_PASSWORD,
    host=DATABASE_HOST,
    port=DATABASE_PORT,
    database=DATABASE_NAME,
)
cursor = connection.cursor()
update_sql_query = f"""UPDATE {QUERY_SCHEMA}.{QUERY_TABLE} q 
                        SET success = v.success,
                            checked = v.checked

                        FROM (VALUES {entries_to_update}) AS v (checked, success, id)
                        WHERE q.id = v.id;"""
logging.info(f"Executing query to update records: {update_sql_query}")
cursor.execute(update_sql_query)

if connection:
    cursor.close()
    connection.close()
    logging.info("PostgreSQL connection is closed")

In [77]:
print(update_sql_query)

UPDATE financial_query.query q 
                            SET success = v.success,
                                checked = v.checked

                            FROM (VALUES (True, False, 39), (True, True, 40)) AS v (checked, success, id)
                            WHERE q.id = v.id;


In [74]:
for i in dbt_res_df_map.keys():
    print(i)

4


In [80]:
for i in df.index:
    for r in res.result:
        print(f"{r.node.name}: {r.status}")
        print(r.node.name == df.loc[i, "name"])
# Map df index to result
dbt_res_df_map = {}

for i in df.index:
    for r in res.result:
        if r.node.name == df.loc[i, "name"]:
            dbt_res_df_map[i] = r
            break

unique_1: success
False
tetst: success
False
unique_1: success
False
tetst: success
False
unique_1: success
False
tetst: success
False
unique_1: success
False
tetst: success
True
unique_1: success
True
tetst: success
False


In [81]:
dbt_res_df_map

{3: RunResult(status=<RunStatus.Success: 'success'>, timing=[TimingInfo(name='compile', started_at=datetime.datetime(2023, 7, 23, 6, 49, 39, 256404), completed_at=datetime.datetime(2023, 7, 23, 6, 49, 39, 279021)), TimingInfo(name='execute', started_at=datetime.datetime(2023, 7, 23, 6, 49, 39, 283493), completed_at=datetime.datetime(2023, 7, 23, 6, 49, 42, 346239))], thread_id='Thread-40 (worker)', execution_time=3.102137327194214, adapter_response={'_message': 'SELECT 1979', 'code': 'SELECT', 'rows_affected': 1979}, message='SELECT 1979', failures=None, node=ModelNode(database='financial_data', schema='financial_user', name='tetst', resource_type=<NodeType.Model: 'model'>, package_name='dbt_financial', path='user/tetst.sql', original_file_path='models/user/tetst.sql', unique_id='model.dbt_financial.tetst', fqn=['dbt_financial', 'user', 'tetst'], alias='tetst', checksum=FileHash(name='sha256', checksum='8e23edf47bc6221ab2224b44712988691722cb47b9151c36229d1e70454a867a'), config=NodeConf

In [55]:
dbt_res_df_map

{3: RunResult(status=<RunStatus.Success: 'success'>, timing=[TimingInfo(name='compile', started_at=datetime.datetime(2023, 7, 23, 6, 36, 18, 918788), completed_at=datetime.datetime(2023, 7, 23, 6, 36, 18, 941524)), TimingInfo(name='execute', started_at=datetime.datetime(2023, 7, 23, 6, 36, 18, 946818), completed_at=datetime.datetime(2023, 7, 23, 6, 36, 22, 629960))], thread_id='Thread-26 (worker)', execution_time=3.7253305912017822, adapter_response={'_message': 'SELECT 1979', 'code': 'SELECT', 'rows_affected': 1979}, message='SELECT 1979', failures=None, node=ModelNode(database='financial_data', schema='financial_user', name='tetst', resource_type=<NodeType.Model: 'model'>, package_name='dbt_financial', path='user/tetst.sql', original_file_path='models/user/tetst.sql', unique_id='model.dbt_financial.tetst', fqn=['dbt_financial', 'user', 'tetst'], alias='tetst', checksum=FileHash(name='sha256', checksum='446ff4b5dcaab790e15e72b1b78d8cec877be517e2f2d157d7fcb5c8c82ed6c1'), config=NodeCon

In [61]:
dbt_res_df_map[3].status=="success"

True

In [49]:
for i in df.index:
    # Check Success
    if not df.loc[i, "success"]:
        model_path = USER_MODEL_PATH + "/{name}.sql".format(name=df.loc[i, "name"])
        if os.path.exists(model_path):
            os.remove(model_path)

In [47]:
df

Unnamed: 0,query_string,materialization,user_id,description,insert_time,name,success,checked
0,SELECT * from marts.dim_price_history,1,1,test description,2023-07-15 05:40:36.283000,test_query_hgffhgf,False,True
1,SELECT * from dim_price_history,2,1,test_desc,2023-07-15 06:40:36.283000,test_query_2_gftyf,False,True
2,SELECT * from dim_price_history,1,1,test_desc,2023-07-20 14:13:51.968099,test_query_123,False,True
3,SELECT * from marts.dim_price_history,1,1,ffgfs,2023-07-22 09:47:39.173512,tetst,False,True
4,SELECT * from marts.dim_price_history,2,1,asd,2023-07-22 09:47:39.173000,unique_1,False,True


In [35]:
res.result

RunExecutionResult(results=[RunResult(status=<RunStatus.Error: 'error'>, timing=[], thread_id='Thread-5 (worker)', execution_time=2.9518167972564697, adapter_response={}, message='Database Error in model tetst (models/user/tetst.sql)\n  syntax error at or near ")"\n  LINE 14:   );\n             ^\n  compiled Code at target/run/dbt_financial/models/user/tetst.sql', failures=None, node=ModelNode(database='financial_data', schema='financial_user', name='tetst', resource_type=<NodeType.Model: 'model'>, package_name='dbt_financial', path='user/tetst.sql', original_file_path='models/user/tetst.sql', unique_id='model.dbt_financial.tetst', fqn=['dbt_financial', 'user', 'tetst'], alias='tetst', checksum=FileHash(name='sha256', checksum='0914864f78b4327814b8b537d6a6a724dfb24533730814a7129d43b775b9f5ed'), config=NodeConfig(_extra={'name': 'tetst', 'description': 'ffgfs'}, enabled=True, alias=None, schema='financial_user', database=None, tags=['1', 'user_created', '23/07/2023_13:27:18'], meta={}, 

In [32]:
print(add_materialization(df.loc[i], "", EXEC_TIME))


{{ config(
    materialized='table',
    name='tetst',
    description='ffgfs',
    tags = ['1','user_created','23/07/2023_13:22:24'],
    schema = 'financial_user'
) }}


In [24]:
table_names = set(get_tables_from_sql("SELECT * from marts.dim_price_history", dialect="postgres"))

In [25]:
table_names.difference(dbt_tables_reporting)

set()

In [None]:
"""
UPDATE financial_query.query q 
                                SET success = v.success,
                                    checked = v.checked

                                FROM (values ('test_query_hgffhgf', 1, True, False), ('test_query_2_gftyf', 1, True, False), ('test_query_123', 1, True, False), ('tetst', 1, True, False)) AS v (name, user_id, checked, success)
                                WHERE q.user_id = v.user_id 
                                AND q.name = v.name;
"""

In [7]:
email_dict

{'1': 'catvu113@gmail.com'}

In [10]:
email_dict[df.loc[i, "user_id"]]

KeyError: 1

In [9]:
entries_to_update = str(tuple(zip(df.name, df.user_id, df.checked, df.success))).replace("None", "Null")[1:-1]
print(entries_to_update)

('test_query_hgffhgf', 1, False, False), ('test_query_2_gftyf', 1, False, False), ('test_query_123', 1, Null, False), ('tetst', 1, Null, False)


In [7]:
update_sql_query = f"""UPDATE {QUERY_SCHEMA}.{QUERY_TABLE} q 
                        SET success = v.success,
                            checked = v.checked

                        FROM (values {entries_to_update}) AS v (name, user_id, checked, success)
                        WHERE q.user_id = v.user_id 
                        AND q.name = v.name;"""
print(update_sql_query)

UPDATE financial_query.query q 
                        SET success = v.success,
                            checked = v.checked

                        FROM (values ('test_query_hgffhgf', 1, True, False), ('test_query_2_gftyf', 1, True, False), ('test_query_123', 1, True, False), ('tetst', 1, True, False), ('unique_1', 1, True, False)) AS v (name, user_id, checked, success)
                        WHERE q.user_id = v.user_id 
                        AND q.name = v.name;


In [9]:
df

Unnamed: 0,query_string,materialization,user_id,description,insert_time,name,checked,success
0,SELECT * from dim_bollinger,2,1,,2023-07-23 08:11:58.188941,unique_2,False,
1,SELECT * from marts.dim_bollinger,2,1,asdad,2023-07-23 08:12:43.670466,unique_3,False,


select * from financial_query.query where checked = False
PostgreSQL connection is closed
unique_2
True
True
Wrote model unique_2 contents
unique_3
True
True
Wrote model unique_3 contents
entries
('unique_2', 1, False, Null), ('unique_3', 1, False, Null)
UPDATE financial_query.query q 
                                SET success = v.success,
                                    checked = v.checked

                                FROM (values ('unique_2', 1, False, Null), ('unique_3', 1, False, Null)) AS v (name, user_id, checked, success)
                                WHERE q.user_id = v.user_id 
                                AND q.name = v.name;
Error while updating data in PostgreSQL column "success" is of type boolean but expression is of type text
LINE 2:                                 SET success = v.success,
                                                      ^
HINT:  You will need to rewrite or cast the expression.

PostgreSQL connection is closed


InFailedSqlTransaction: current transaction is aborted, commands ignored until end of transaction block


In [21]:
EXEC_TIME

'23/07/2023_15:28:01'

In [11]:
    # initialize
    dbt = dbtRunner()

    # create CLI args as a list of strings
    cli_args = [
        "run",
        "--project-dir",
        DBT_PROJECT_DIR,
        "--select",
        "tag:{exec_time}".format(exec_time=EXEC_TIME),
    ]

    # run the command
    res: dbtRunnerResult = dbt.invoke(cli_args)

[0m08:23:04  Running with dbt=1.5.1
[0m08:23:04  Found 34 models, 73 tests, 6 snapshots, 0 analyses, 760 macros, 0 operations, 0 seed files, 12 sources, 1 exposure, 0 metrics, 0 groups
[0m08:23:04  
[0m08:23:15  Concurrency: 4 threads (target='dev_cloud')
[0m08:23:15  
[0m08:23:15  1 of 2 START sql view model financial_user.unique_2 ............................ [RUN]
[0m08:23:15  2 of 2 START sql view model financial_user.unique_3 ............................ [RUN]
[0m08:23:18  1 of 2 ERROR creating sql view model financial_user.unique_2 ................... [[31mERROR[0m in 3.17s]
[0m08:23:19  2 of 2 OK created sql view model financial_user.unique_3 ....................... [[32mCREATE VIEW[0m in 4.27s]
[0m08:23:21  
[0m08:23:21  Finished running 2 view models in 0 hours 0 minutes and 16.71 seconds (16.71s).
[0m08:23:21  
[0m08:23:21  
[0m08:23:21  [33mDatabase Error in model unique_2 (models/user/unique_2.sql)[0m
[0m08:23:21    relation "dim_bollinger" does not exi

In [96]:
sql = """
SELECT * from dim_bollinger
"""
parsed=sqlfluff.parse(sql)

In [101]:
partially_model, processed_status = get_ref(sql, dbt_tables, parsed, dbt_tables_reporting)

In [102]:
partially_model

"\n    -- depends_on: {{ref('dim_bollinger')}}\n    \nSELECT * from dim_bollinger\n"

In [52]:
status

['Tables referenced out of serving schemas', 'Success']

In [70]:
rison_request="/dataset"
dictionary = {
    # Parameter database
    "database": DATABASE_ID,
    "schema": USER_SCHEMA,
    "table_name": df.loc[i, "name"],
    "owners": [int(df.loc[i, "user_id"]), SUPERSET_ID],
}
# Serializing json
json_object = json.dumps(dictionary)
response = superset.request("POST", rison_request, json=dictionary)

HTTPError: 422 Client Error: UNPROCESSABLE ENTITY for url: http://34.82.185.252:30007/api/v1/dataset/

In [73]:
response

{'id': 113,
 'result': {'database': 1,
  'owners': [1, 34],
  'schema': 'financial_user',
  'table_name': 'unique_4'}}

In [57]:
rison_request="/dataset"
dictionary = {
    # Parameter database
    "database": DATABASE_ID,
    "schema": USER_SCHEMA,
    "table_name": "unique_3",
    "owners": [int(df.loc[i, "user_id"]), SUPERSET_ID],
}
# Serializing json
json_object = json.dumps(dictionary)
response = superset.request("POST", rison_request, json=dictionary)

In [64]:
dbt = dbtRunner()
cli_args = ["parse","--project-dir",
    DBT_PROJECT_DIR,]
res = dbt.invoke(cli_args)

[0m09:48:04  Running with dbt=1.5.1
[0m09:48:05  Performance info: target/perf_info.json


In [31]:
df

Unnamed: 0,query_string,materialization,user_id,description,insert_time,name,checked,success
0,SELECT * from dim_bollinger,2,1,,2023-07-23 08:11:58.188941,unique_2,False,
1,SELECT * from marts.dim_bollinger,2,1,asdad,2023-07-23 08:12:43.670466,unique_3,False,


In [28]:
for r in res.result:
    print(f"{r.node.name}: {r.status}")

unique_2: error
unique_3: success
