### Workflow

- A new version is released
- We get a new version of sql-export json file
- We check if we have all the source tables in the DWH
- we create the models
- we run the models

In [4]:
import json 
import sqlparse
from sqlparse.sql import Identifier, IdentifierList, remove_quotes, Token, TokenList, Where
from sqlparse.tokens import Keyword, Name, Punctuation, String, Whitespace
from sqlparse.utils import imt
import pandas as pd
from flatten_dict import flatten
from pprint import pprint
import sql_metadata
import re
import os

In [5]:
## Files imported

## counter queries (sql+redis)

json_file_path = '/Users/mathieupeychet/Downloads/sql-export.json'


## foreign_key csv generated
foreign_key_df = pd.read_csv('/Users/mathieupeychet/Documents/foreign_keys.csv')

## To join an event to a ultimate_namespace_id, we have 3 potential standard tables to join
table_to_join = ['projects', 'namespaces', 'groups']

In [6]:
foreign_key_df.head(5)

Unnamed: 0,table_name,column_name,foreign_table_name,foreign_column_name
0,application_settings,push_rule_id,push_rules,id
1,application_settings,usage_stats_set_by_user_id,users,id
2,application_settings,instance_administrators_group_id,namespaces,id
3,application_settings,file_template_project_id,projects,id
4,application_settings,instance_administration_project_id,projects,id


In [7]:
with open(json_file_path) as f:
    data = json.load(f)
    
data['counts']['template_repositories']

'SELECT COUNT("projects"."id") FROM "projects" WHERE "projects"."namespace_id" IS NULLSELECT COUNT("projects"."id") FROM "projects" INNER JOIN namespaces ON projects.namespace_id = namespaces.custom_project_templates_group_id'

In [8]:
foreign_key_df.head(10)

Unnamed: 0,table_name,column_name,foreign_table_name,foreign_column_name
0,application_settings,push_rule_id,push_rules,id
1,application_settings,usage_stats_set_by_user_id,users,id
2,application_settings,instance_administrators_group_id,namespaces,id
3,application_settings,file_template_project_id,projects,id
4,application_settings,instance_administration_project_id,projects,id
5,application_settings,custom_project_templates_group_id,namespaces,id
6,approvals,merge_request_id,merge_requests,id
7,approver_groups,group_id,namespaces,id
8,boards,group_id,namespaces,id
9,boards,project_id,projects,id


In [9]:

def sql_queries_dict(json_file):
    ''' 
    function that transforms the sql-export.json file into a Python dict with only SQL batch counters
    '''
    with open(json_file) as f:
        data = json.load(f)

    from flatten_dict.reducer import make_reducer
    full_payload_dict = flatten(data, reducer=make_reducer(delimiter='.'))

    sql_queries_dict  = {}

    for (key, value) in full_payload_dict.items():
       # Check if key is even then add pair to new dictionary
       if isinstance(value, str) and str.startswith(value, 'SELECT') is True:
           sql_queries_dict[key] = value
    
    return sql_queries_dict

sql_queries_dict = sql_queries_dict(json_file_path)

In [10]:
sql_queries_dict

{'active_user_count': 'SELECT COUNT("users"."id") FROM "users" WHERE ("users"."state" IN (\'active\')) AND ("users"."user_type" IS NULL OR "users"."user_type" IN (NULL, 6, 4))',
 'counts.assignee_lists': 'SELECT COUNT("lists"."id") FROM "lists" WHERE "lists"."list_type" = 3',
 'counts.boards': 'SELECT COUNT("boards"."id") FROM "boards"',
 'counts.ci_builds': 'SELECT COUNT("ci_builds"."id") FROM "ci_builds" WHERE "ci_builds"."type" = \'Ci::Build\'',
 'counts.ci_internal_pipelines': 'SELECT COUNT("ci_pipelines"."id") FROM "ci_pipelines" WHERE ("ci_pipelines"."source" IN (1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13) OR "ci_pipelines"."source" IS NULL)',
 'counts.ci_external_pipelines': 'SELECT COUNT("ci_pipelines"."id") FROM "ci_pipelines" WHERE "ci_pipelines"."source" = 6',
 'counts.ci_pipeline_config_auto_devops': 'SELECT COUNT("ci_pipelines"."id") FROM "ci_pipelines" WHERE "ci_pipelines"."config_source" = 2',
 'counts.ci_pipeline_config_repository': 'SELECT COUNT("ci_pipelines"."id") FROM "

In [11]:
test = 'SELECT COUNT("ci_pipelines".{:start=>1, :finish=>465}) FROM ci_pipelines WHERE "ci_pipelines"."config_source" = 1'

test= re.sub('{:start=>[0-9]{1,}, :finish=>[0-9]{1,}}', 'id', test)
test

'SELECT COUNT("ci_pipelines".id) FROM ci_pipelines WHERE "ci_pipelines"."config_source" = 1'

In [12]:
import yaml

path = os.getcwd()
print(path)

gitlab_dotcom_source_relative_path = '/models/sources/gitlab_dotcom/sources.yml'

gitlab_dotcom_source_path = os.path.join(path, gitlab_dotcom_source_relative_path)
gitlab_dotcom_source_path

source_yml = yaml.safe_load(open('/Users/mathieupeychet/repos/analytics/transform/snowflake-dbt/models/sources/gitlab_dotcom/sources.yml'))
gitlab_dotcom_tables = source_yml["sources"][0]['tables']

dwh_tables = []
for table in gitlab_dotcom_tables:
    dwh_tables.append(table['name'])


/Users/mathieupeychet/repos/analytics/transform/snowflake-dbt


In [13]:
needed_tables = []
source_replace = """{{ source('gitlab_dotcom', '{}') }}"""
model_replacement = """ {{ ref('gitlab_dotcom_{}_dedupe_source') }}"""

for key, value in sql_queries_dict.items():
    sql_value = sqlparse.parse(value)[0]
    ## get the table which is queried in the FROM statement
    queried_tables_list = sql_metadata.get_query_tables(value)
    for table in queried_tables_list:
        if table not in needed_tables:
            needed_tables.append(table)
print(needed_tables)
print(len(needed_tables))

tables_to_dedup = needed_tables

table_format = """
  SELECT *
  FROM {{{{ source('gitlab_dotcom', '{}') }}}}
  {{% if is_incremental() %}}

  WHERE updated_at >= (SELECT MAX(updated_at) FROM {{this}})

  {{% endif %}}
  QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY updated_at DESC) = 1
"""

tables_to_add = []
for dedup in tables_to_dedup:
    if dedup in dwh_tables:
        formatted = table_format.format(dedup)
        subdirectory= 'models/sources/gitlab_dotcom/dedupe'
        try:
            os.makedirs(subdirectory)
        except Exception:
            pass
        model_name = 'gitlab_dotcom_{}_dedupe_source.sql'.format(dedup)
        wr = open(os.path.join(subdirectory, model_name), 'w')
        wr.write(formatted)
        wr.close()
        for dname, dirs, files in os.walk('/Users/mathieupeychet/repos/analytics/transform/snowflake-dbt/models/sources/gitlab_dotcom/renamed'):
            for fname in files:
                fpath = os.path.join(dname, fname)
                with open(fpath) as f:
                    s = f.read()
                s = s.replace(source_replace.format(dedup), model_replacement.format(dedup))
                s = s.replace(""" {{ ref('gitlab_dotcom_{}') }}""".format(dedup), model_replacement.format(dedup))
                with open(fpath, "w") as f:
                    f.write(s)
    else:
        tables_to_add.append(dedup)
        
print(tables_to_add)

['users', 'lists', 'boards', 'ci_builds', 'ci_pipelines', 'ci_runners', 'ci_triggers', 'ci_pipeline_schedules', 'project_auto_devops', 'keys', 'deployments', 'environments', 'clusters', 'cluster_providers_aws', 'cluster_providers_gcp', 'clusters_applications_helm', 'clusters_applications_ingress', 'clusters_applications_cert_managers', 'clusters_applications_crossplane', 'clusters_applications_prometheus', 'clusters_applications_runners', 'clusters_applications_knative', 'clusters_applications_elastic_stacks', 'clusters_applications_jupyter', 'clusters_applications_cilium', 'cluster_agents', 'cluster_agent_tokens', 'grafana_integrations', 'namespaces', 'issues', 'sentry_issues', 'zoom_meetings', 'alert_management_alerts', 'issues_self_managed_prometheus_alert_events', 'issues_prometheus_alert_events', 'label_links', 'labels', 'lfs_objects', 'milestones', 'packages_packages', 'pages_domains', 'pool_repositories', 'projects', 'project_features', 'project_tracing_settings', 'project_error

In [14]:
gitlab_com_manifest = '/Users/mathieupeychet/repos/analytics/extract/postgres_pipeline/manifests/gitlab_com_db_manifest.yaml'

gitlab_com_manifest_dict = yaml.safe_load(open(gitlab_com_manifest))
gitlab_com_manifest_dict_tables = gitlab_com_manifest_dict['tables']

print(gitlab_com_manifest_dict)
for k in gitlab_com_manifest_dict_tables['issues']:
    print(k)


TypeError: 'NoneType' object is not subscriptable

In [96]:
column_df.head(10)
column_df = column_df.sort_values(by=['ORDINAL_POSITION'])
column_df

Unnamed: 0,COLUMN_ID,COLUMN_NAME,DATA_TYPE,ORDINAL_POSITION,TABLE_NAME,TABLE_SCHEMA,_UPLOADED_AT,_TASK_INSTANCE
215,geo_nodes_idpublic,id,integer,1,geo_nodes,public,1.612762e+09,gitlab_com_db_sync__gitlab-com-columns-db-scd_...
39,jira_tracker_data_idpublic,id,bigint,1,jira_tracker_data,public,1.612762e+09,gitlab_com_db_sync__gitlab-com-columns-db-scd_...
40,zoom_meetings_idpublic,id,bigint,1,zoom_meetings,public,1.612762e+09,gitlab_com_db_sync__gitlab-com-columns-db-scd_...
221,cluster_agent_tokens_idpublic,id,bigint,1,cluster_agent_tokens,public,1.612762e+09,gitlab_com_db_sync__gitlab-com-columns-db-scd_...
220,project_tracing_settings_idpublic,id,bigint,1,project_tracing_settings,public,1.612762e+09,gitlab_com_db_sync__gitlab-com-columns-db-scd_...
219,ldap_group_links_idpublic,id,integer,1,ldap_group_links,public,1.612762e+09,gitlab_com_db_sync__gitlab-com-columns-db-scd_...
113,jira_imports_idpublic,id,bigint,1,jira_imports,public,1.612762e+09,gitlab_com_db_sync__gitlab-com-columns-db-scd_...
211,issues_prometheus_alert_events_issue_idpublic,issue_id,bigint,1,issues_prometheus_alert_events,public,1.612762e+09,gitlab_com_db_sync__gitlab-com-columns-db-scd_...
60,status_page_published_incidents_idpublic,id,bigint,1,status_page_published_incidents,public,1.612762e+09,gitlab_com_db_sync__gitlab-com-columns-db-scd_...
202,project_incident_management_settings_project_i...,project_id,integer,1,project_incident_management_settings,public,1.612762e+09,gitlab_com_db_sync__gitlab-com-columns-db-scd_...


In [100]:
missing_tables_columns = '/Users/mathieupeychet/Documents/usage_pings/result.csv'

def table_query(table):
    column_df = pd.read_csv(missing_tables_columns)
    column_df = column_df.sort_values(by=['ORDINAL_POSITION'])
    column_list = list(column_df[column_df['TABLE_NAME'] == table]['COLUMN_NAME'])
    
    yaml_table_query = ''
    for index, column in enumerate(column_list):
        if index == 0:
            formatted_column = 'SELECT {}'.format(column)
            yaml_table_query = formatted_column
        else:
            formatted_column = ', {}'.format(column)
            yaml_table_query = '\n'.join([yaml_table_query, formatted_column])

    yaml_table_query = '\n'.join([yaml_table_query, 'FROM {}'.format(table)])

    if 'updated_at' in column_list:
        where_statement = """WHERE updated_at BETWEEN '{EXECUTION_DATE}'::timestamp - interval '{HOURS} hours'
  AND '{EXECUTION_DATE}'::timestamp"""
        yaml_table_query = '\n'.join([yaml_table_query, where_statement])
    elif 'created_at' in column_list:
        where_statement = """WHERE created_at BETWEEN '{EXECUTION_DATE}'::timestamp - interval '{HOURS} hours'
  AND '{EXECUTION_DATE}'::timestamp"""
        yaml_table_query = '\n'.join([yaml_table_query, where_statement])
    return yaml_table_query

In [63]:
import yaml
import collections

class MyDumper(yaml.Dumper):  # your force-indent dumper

    def increase_indent(self, flow=False, indentless=False):
        return super(MyDumper, self).increase_indent(flow, False)

class QuotedString(str):  # just subclass the built-in str
    pass

def quoted_scalar(dumper, data):  # a representer to force quotations on scalars
    return dumper.represent_scalar('tag:yaml.org,2002:str', data, style="'")

# add the QuotedString custom type with a forced quotation representer to your dumper
MyDumper.add_representer(QuotedString, quoted_scalar)

table_dict = {
    'new_tables': 
    {
        'issues': 
        {
            'import_db': 'GITLAB_DB',
            'import_query': ,
            'export_schema': "'gitlab_com'",
            'export_table': QuotedString('testnew'),
            'export_schema': 'id'
        }
    }
}
sdump = yaml.dump(table_dict, Dumper=MyDumper, default_flow_style=False, sort_keys=False)

with open(gitlab_com_manifest, "a") as fo:
    fo.write(sdump)

SyntaxError: invalid syntax (<ipython-input-63-bddcb1a0906a>, line 24)

In [103]:
# good one
# TODO:
### need to fix the SQL query that looks a bit ugly still
import yaml
from collections import OrderedDict

class quoted(str):
    pass

def quoted_presenter(dumper, data):
    return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='"')
yaml.add_representer(quoted, quoted_presenter)

class literal(str):
    pass

def literal_presenter(dumper, data):
    return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
yaml.add_representer(literal, literal_presenter)

def ordered_dict_presenter(dumper, data):
    return dumper.represent_dict(data.items())
yaml.add_representer(OrderedDict, ordered_dict_presenter)

test = {}
final_test = {}

for table in tables_to_add:
    # create OrderedDict
    yaml_table_query = table_query(table)
    d = OrderedDict(import_db='GITLAB_DB', import_query=literal(yaml_table_query), export_schema=quoted("gitlab_com"), export_table=table, export_table_primary_key='id')
    test[table] = d
final_test['tables'] = test
sdump = yaml.dump(final_test)

print(sdump)
with open(gitlab_com_manifest, "a") as fo:
    fo.write(sdump)

tables:
  '@type_caster':
    import_db: GITLAB_DB
    import_query: |2-

      FROM @type_caster
    export_schema: "gitlab_com"
    export_table: '@type_caster'
    export_table_primary_key: id
  alert_management_http_integrations:
    import_db: GITLAB_DB
    import_query: |-
      SELECT id
      , created_at
      , updated_at
      , project_id
      , active
      , endpoint_identifier
      , name
      , payload_example
      , payload_attribute_mapping
      FROM alert_management_http_integrations
      WHERE updated_at BETWEEN '{EXECUTION_DATE}'::timestamp - interval '{HOURS} hours'
        AND '{EXECUTION_DATE}'::timestamp
    export_schema: "gitlab_com"
    export_table: alert_management_http_integrations
    export_table_primary_key: id
  analytics_cycle_analytics_group_stages:
    import_db: GITLAB_DB
    import_query: |-
      SELECT id
      , created_at
      , updated_at
      , relative_position
      , start_event_identifier
      , end_event_identifier
      , gro

In [17]:
import sys
import ruamel.yaml
from ruamel.yaml import YAML


yaml=YAML(typ='safe')   # default, if not specfied, is 'rt' (round-trip)
gitlab_com_manifest_dict = yaml.load(open(gitlab_com_manifest))
gitlab_com_manifest_dict_tables = gitlab_com_manifest_dict

yaml.dump(gitlab_com_manifest_dict, sys.stdout)

null
...


In [None]:
  issues:
    import_db: GITLAB_DB
    import_query: >
    additional_filtering: AND created_at NOT IN ( '0001-01-01 00:00:00+00', '1000-01-01 00:00:00+00', '10000-01-01 00:00:00+00')
    export_schema: 'gitlab_com'
    export_table: 'issues'
    export_table_primary_key: id
    dbt_source_model: true
    dbt_snapshots: true

In [90]:
with open(gitlab_com_manifest,'w') as yamlfile:
    yaml.safe_dump(gitlab_com_manifest_dict, yamlfile) # Also note the safe_dump

SELECT
    , next_run_at
    , cadence
    , created_at
    , older_than
    , updated_at
    , name_regex
    , keep_n
    , enabled
    , project_id
    , name_regex_keep
SELECT
    , next_run_at
    , cadence
    , created_at
    , older_than
    , updated_at
    , name_regex
    , keep_n
    , enabled
    , project_id
    , name_regex_keep
WHERE updated_at BETWEEN '{EXECUTION_DATE}'::timestamp - interval '{HOURS} hours'
          AND '{EXECUTION_DATE}'::timestamp


In [None]:
import os
source_replace = """{{ source('gitlab_dotcom', '{}') }}"""
model_replacement = """ {{ ref('gitlab_dotcom_{}') }}"""

for dname, dirs, files in os.walk('/Users/mathieupeychet/repos/analytics/transform/snowflake-dbt/models/sources/gitlab_dotcom/renamed'):
    for fname in files:
        fpath = os.path.join(dname, fname)
        with open(fpath) as f:
            s = f.read()
        s = s.replace(source_replace.format(table), model_replacement.format(table))
        with open(fpath, "w") as f:
            f.write(s)

In [64]:
def create_join_mapping_df(sql_queries_dict):
    '''
        The functoin returns a dataframe with the following columns
        - counter: name of the counter, which is the item key in the dictionary passed
        - sql_query: query run to calculate the counter, item value in the dictionary passed as argument
        - table_name: name of the table in the FROM statement of the sql_query
        - foreingn_table_name: 
        - foreign_column_name:
    '''
    final_join_mapping_df = pd.DataFrame()
    for key, value in sql_queries_dict.items():
        sql_value = sqlparse.parse(value)[0]
        ## get the table which is queried in the FROM statement
        queried_tables_list = sql_metadata.get_query_tables(value)
        value = value.replace('"', "")
        
        value= re.sub('{:start=>[0-9]{1,}, :finish=>[0-9]{1,}}', 'id', value)
        # if projects is just do the join on projects
        if 'projects' in queried_tables_list:
            potential_joins = foreign_key_df[(foreign_key_df['table_name'] == 'projects') & (foreign_key_df['foreign_table_name'] == 'namespaces')]
            potential_joins = potential_joins.drop_duplicates()
            table_to_append = potential_joins[potential_joins.table_name == 'projects']
            table_to_append["counter"] = key
            table_to_append["sql_query"] = value
            final_join_mapping_df = final_join_mapping_df.append(table_to_append, ignore_index=True)   
        else:
            for index, queried_table in enumerate(queried_tables_list):

            ## 
                potential_joins = foreign_key_df[(foreign_key_df['table_name'] == queried_table) & (foreign_key_df['foreign_table_name'].isin(table_to_join))]
                potential_joins = potential_joins.drop_duplicates()
                
                if potential_joins[potential_joins.foreign_table_name == 'projects'].empty is False:
                    table_to_append = potential_joins[potential_joins.foreign_table_name == 'projects']
                    table_to_append["counter"] = key
                    table_to_append["sql_query"] = value
                    final_join_mapping_df = final_join_mapping_df.append(table_to_append, ignore_index=True)
                elif potential_joins[potential_joins.foreign_table_name == 'groups'].empty is False:
                    table_to_append = potential_joins[potential_joins.foreign_table_name == 'groups']
                    table_to_append["counter"] = key
                    table_to_append["sql_query"] = value
                    final_join_mapping_df = final_join_mapping_df.append(table_to_append, ignore_index=True)
                elif potential_joins[potential_joins.foreign_table_name == 'namespaces'].empty is False:
                    table_to_append = potential_joins[potential_joins.foreign_table_name == 'namespaces']
                    table_to_append["counter"] = key
                    table_to_append["sql_query"] = value
                    final_join_mapping_df = final_join_mapping_df.append(table_to_append, ignore_index=True)
                if table_to_append.empty is False:
                    break
                
        
    return final_join_mapping_df

fj_df = create_join_mapping_df(sql_queries_dict)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy


In [68]:
fj_df[fj_df.counter == 'counts.service_desk_issues']

Unnamed: 0,table_name,column_name,foreign_table_name,foreign_column_name,counter,sql_query
334,projects,namespace_id,namespaces,id,counts.service_desk_issues,SELECT COUNT(issues.id) FROM issues WHERE issu...


In [18]:
def add_counter_name_as_column(counter_row):
    sql_query_parsed = sqlparse.parse(counter_row['sql_query'])
    token_list = sql_query_parsed[0].tokens
    from_index = 0
    for index, token in enumerate(token_list):
        #Token.Keyword.DML
        if (token.is_keyword and str(token) == 'SELECT') is True:
            select_index = index
            break
    for index, token in enumerate(token_list):
        if (token.is_keyword and str(token) == 'FROM') is True:
            from_index = index
            break
    token_list_with_counter_name = token_list[:]
    token_list_with_counter_name.insert(from_index - 1, " AS counter_value, TO_DATE(CURRENT_DATE) AS run_day  ")
    token_list_with_counter_name.insert(select_index + 1, " '" + counter_row['counter'] + "' AS counter_name, ")
    enhanced_query_list = [str(token) for token in token_list_with_counter_name]
    enhanced_query = ''.join(enhanced_query_list)
    
    return enhanced_query

In [50]:
def add_join_to_namespaces(counter_row):
    sql_query_parsed = sqlparse.parse(counter_row['sql_query'])
    token_list_with_counter_name = sql_query_parsed[0].tokens
    where_index = 0
    select_index = 0
    from_index = 0
    for index, token in enumerate(token_list_with_counter_name):        #Token.Keyword.DML
        if (token.is_keyword and str(token) == 'SELECT') is True:
            select_index = index
        if (token.is_keyword and str(token) == 'FROM') is True:
            from_index = index
        if isinstance(token, Where) is True:
            where_index = index
            break

    new_tok_list = token_list_with_counter_name[:]
    
    join_to_insert = ''
    if counter_row.table_name != 'users' and counter_row.table_name != 'namespaces':
        join_to_insert = ' LEFT JOIN ' + counter_row.foreign_table_name + ' ON ' + counter_row.foreign_table_name + '.' + counter_row.foreign_column_name + ' = ' + counter_row.table_name + '.' + counter_row.column_name + ' '
    if counter_row.foreign_table_name == 'projects'  and counter_row.table_name != 'namespaces':
        join_to_insert += ' LEFT JOIN namespaces ON projects.namespace_id = namespaces.id'
        
    join_to_insert  = join_to_insert + " LEFT JOIN {{ref('gitlab_dotcom_namespaces_xf')}} AS namespaces_xf ON namespaces.id = namespaces_xf.namespace_id "
    if where_index > 0:
        token_list_with_counter_name.insert(where_index + 1 , ' GROUP BY 1')
        token_list_with_counter_name.insert(where_index, join_to_insert)
    else:
        token_list_with_counter_name.append(join_to_insert)
        token_list_with_counter_name.append(' GROUP BY 1')

    token_list_with_counter_name.insert(from_index - 1, " AS counter_value ")
    if select_index >= 0:
        token_list_with_counter_name.insert(select_index + 1, ' namespaces_xf.namespace_id, TO_DATE(CURRENT_DATE) AS run_day, ')

    enhanced_query_list = [str(token) for token in token_list_with_counter_name]
    enhanced_query = ''.join(enhanced_query_list)
    
    return enhanced_query


In [20]:
def write_sql_model_files(counter_row, sql_query_column, counter_name_column, subdirectory, suffix=''):
    #subdirectory= 'models/workspaces/test_usage_pings/'
    try:
        os.makedirs(subdirectory)
    except Exception:
        pass
    model_name = counter_row[counter_name_column].replace('.',  '_') + suffix + '.sql'
    wr = open(os.path.join(subdirectory, model_name), 'w')
    wr.write(counter_row[sql_query_column])
    wr.close()

In [40]:
def rename_query_tables(query):
    keyword_to_look_at = [            
                'FROM',
                "JOIN",
                "INNER JOIN",
                "FULL JOIN",
                "FULL OUTER JOIN",
                "LEFT JOIN",
                "RIGHT JOIN",
                "LEFT OUTER JOIN",
                "RIGHT OUTER JOIN",
    ]


    parsed = sqlparse.parse(query)
    tokens = list(TokenList(parsed[0].tokens).flatten())

    keyword_token_index = -1
    while keyword_token_index != 0:
        keyword_token_index = 0
        for index, token in enumerate(tokens):
            if str(token) in keyword_to_look_at:
                keyword_token_index = index
                i = 1
                while tokens[index + i].ttype is Whitespace:
                    i += 1
                next_token = tokens[index + i]
                if str(next_token).startswith('{') is False:
                    tokens.insert(keyword_token_index + i, "{{ref('gitlab_dotcom_" + str(next_token) + "_dedupe_source')}} AS " )
                    tokens = [str(token) for token in tokens]
                    token_query = ''.join(tokens)
                    parsed = sqlparse.parse(token_query)
                    tokens = list(TokenList(parsed[0].tokens).flatten())
                    print(next_token.ttype)
                    break
                else:
                    keyword_token_index = 0
            if keyword_token_index > 0:
                break
    return token_query
        
        

In [None]:
def add_last_join_to_namespace_xf(sql_query):
    sql_query_parsed = sqlparse.parse(sql_query)
    token_list_with_counter_name = sql_query_parsed[0].tokens
    where_index = 0
    select_index = 0
    from_index = 0
    for index, token in enumerate(token_list_with_counter_name):        #Token.Keyword.DML
        if isinstance(token, Where) is True:
            where_index = index
            break

In [51]:
for index, row in fj_df.iterrows():
    fj_df.loc[index,'global_counter_query'] = add_counter_name_as_column(row)
    fj_df.loc[index,'counter_per_namespace_query'] = add_join_to_namespaces(row)
    tables = sql_metadata.get_query_tables(fj_df.loc[index,'global_counter_query'])
    
    are_all_tables_in_analytics = True
    for table in tables:
        file_name = 'gitlab_dotcom_{}_dedupe_source.sql'.format(table)
        result = []
        path = os.getcwd()
        model_path = os.path.join(dirname, 'models')
        for root, dirs, files in os.walk(model_path):
            if file_name in files:
                result.append(os.path.join(root, file_name))
                print(file_name)
                are_all_tables_in_analytics = True
                break
            else:
                are_all_tables_in_analytics = False
        if are_all_tables_in_analytics == False:
            break
    fj_df.loc[index,'are_all_tables_in_analytics'] = str(are_all_tables_in_analytics)
    fj_df.loc[index,'global_counter_query'] = rename_query_tables(row['global_counter_query'])
    fj_df.loc[index,'counter_per_namespace_query'] = rename_query_tables(row['counter_per_namespace_query'])
for index, row in fj_df.iterrows():
    if fj_df.loc[index,'are_all_tables_in_analytics'] == 'True':
        write_sql_model_files(row, 'global_counter_query', 'counter', 'models/workspaces/test_usage_pings/')
        write_sql_model_files(row, 'counter_per_namespace_query', 'counter', 'models/workspaces/workspace_usage_pings_namespaces/',suffix='_namespaces')

Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name

Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name

Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name

gitlab_dotcom_projects_dedupe_source.sql
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Text.Whitespace.Newline
Token.Text.Whitespace.Newline
Token.Text.Whitespace.Newline
Token.Name
Token.Name
Token.Text.Whitespace.Newline
Token.Text.Whitespace.Newline
Token.Text.Whitespace.Newline
gitlab_dotcom_projects_dedupe_source.sql
Token.Name
Token.Name
Token.Name
gitlab_dotcom_projects_dedupe_source.sql
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
gitlab_dotcom_projects_dedupe_source.sql
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Name
Token.Keyword
Token.Keyword
Token.Name
Token.Name
Token.Name
Token.Name
gitlab_dotcom_projects_dedupe_source.sql
Token.Name
Token.Name
Toke

In [99]:
tables_to_dedup = needed_tables

table_format = """
  SELECT *
  FROM {{{{ source('gitlab_dotcom', '{}') }}}}
  QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY updated_at DESC) = 1
"""
for dedup in tables_to_dedup:
    formatted = table_format.format(dedup)
    subdirectory= 'models/sources/gitlab_dotcom/dedupe'
    try:
        os.makedirs(subdirectory)
    except Exception:
        pass
    model_name = 'gitlab_dotcom_{}_dedupe_source.sql'.format(dedup)
    wr = open(os.path.join(subdirectory, model_name), 'w')
    wr.write(formatted)
    wr.close()

In [43]:
for index, token in enumerate(tok_list):
    #Token.Keyword.DML
    if (token.is_keyword and str(token) == 'SELECT') is True:
        select_index = index
    if isinstance(token, Where) is True:
        where_index = index

new_tok_list = tok_list[:]
new_tok_list.insert(where_index + 1 , ' GROUP BY 1')
new_tok_list.insert(where_index, ' LEFT JOIN projects ON services.id = projects.project_id ')
new_tok_list.insert(select_index + 1, ' namespace_id,')

tok_list_str = [str(token) for token in new_tok_list]
query_stringed = ''.join(tok_list_str)
query_parsed = sqlparse.parse(query_stringed)
tok_list_transformed = query_parsed[0].tokens

query_stringed
test_formatting = sqlparse.format(query_stringed)
test_formatting



NameError: name 'tok_list' is not defined

In [44]:
def _update_table_names(
    tables: List[str], tokens: List[sqlparse.sql.Token], index: int, last_keyword: str
) -> List[str]:
    """
    Return new table names matching database.table or database.schema.table notation
    :type tables list[str]
    :type tokens list[sqlparse.sql.Token]
    :type index int
    :type last_keyword str
    :rtype: list[str]
    """

    token = tokens[index]
    last_token = tokens[index - 1].value.upper() if index > 0 else None
    next_token = tokens[index + 1].value.upper() if index + 1 < len(tokens) else None

    if (
        last_keyword
        in [
            "FROM",
            "JOIN",
            "INNER JOIN",
            "FULL JOIN",
            "FULL OUTER JOIN",
            "LEFT JOIN",
            "RIGHT JOIN",
            "LEFT OUTER JOIN",
            "RIGHT OUTER JOIN",
            "INTO",
            "UPDATE",
            "TABLE",
        ]
        and last_token not in ["AS"]
        and token.value not in ["AS", "SELECT"]
    ):
        if last_token == "." and next_token != ".":
            # we have database.table notation example
            table_name = "{}.{}".format(tokens[index - 2], tokens[index])
            if len(tables) > 0:
                tables[-1] = table_name
            else:
                tables.append(table_name)

        schema_notation_match = (Name, ".", Name, ".", Name)
        schema_notation_tokens = (
            (
                tokens[index - 4].ttype,
                tokens[index - 3].value,
                tokens[index - 2].ttype,
                tokens[index - 1].value,
                tokens[index].ttype,
            )
            if len(tokens) > 4
            else None
        )
        if schema_notation_tokens == schema_notation_match:
            # we have database.schema.table notation example
            table_name = "{}.{}.{}".format(
                tokens[index - 4], tokens[index - 2], tokens[index]
            )
            if len(tables) > 0:
                tables[-1] = table_name
            else:
                tables.append(table_name)
        elif tokens[index - 1].value.upper() not in [",", last_keyword]:
            # it's not a list of tables, e.g. SELECT * FROM foo, bar
            # hence, it can be the case of alias without AS, e.g. SELECT * FROM foo bar
            pass
        else:
            table_name = str(token.value.strip("`"))
            tables.append(table_name)

    return tables


NameError: name 'List' is not defined

In [None]:
sql_test = 'SELECT test FROM "services" LEFT JOIN "users" ON users.id = services.id WHERE users.id = 5'

sql_test = sql_test.replace('"', "")
#print(sql_test)
parsed = sqlparse.parse(sql_test)
tokens = TokenList(parsed[0].tokens).flatten()
# print([(token.value, token.ttype) for token in tokens])

test = [token for token in tokens if token.ttype is not Whitespace]

table_syntax_keywords = [
    # SELECT queries
    "FROM",
    "WHERE",
    "JOIN",
    "INNER JOIN",
    "FULL JOIN",
    "FULL OUTER JOIN",
    "LEFT OUTER JOIN",
    "RIGHT OUTER JOIN",
    "LEFT JOIN",
    "RIGHT JOIN",
    "ON",
    # INSERT queries
    "INTO",
    "VALUES",
    # UPDATE queries
    "UPDATE",
    "SET",
    # Hive queries
    "TABLE",  # INSERT TABLE
]

tables = []
last_keyword = None
last_token=None

print(test)
for index, token in enumerate(test):
    print(token.ttype is Name)
    #print([token, token.ttype, last_token, last_keyword, token.is_keyword, index])
    if token.is_keyword and token.value.upper() == 'WHERE':
        # keep the name of the last keyword, the next one can be a table name
        where_index = index
        print(where_index)
    elif (
        token.is_keyword
        and str(token).upper() == "SELECT"
    ):
        # reset the last_keyword for "INSERT INTO SELECT" and "INSERT TABLE SELECT" queries
        last_keyword = None
        select_index = index
        print(2)
    elif token.ttype is Name or token.is_keyword:
        tables.append(str(token))
        print(3)
        
print(tables)
tables[0] = 'replace'
tables

In [None]:
for index, row in final_join.iterrows():
   print(row['table_name'])

In [None]:
for index, row in final_join_mapping_df.iterrows():
    if row.table_name != 'users':
        join_to_insert = 'LEFT JOIN ' + row.foreign_table_name + ' ON ' + row.foreign_table_name + '.' + row.foreign_column_name + ' = ' + row.table_name + '.' + row.column_name
        if row.foreign_table_name == 'projects':
            join_to_insert += ' LEFT JOIN namespaces ON projects.namespace_id = namespaces.id '
        print(join_to_insert)
    parsed_sql_query = sqlparse.parse(row.sql_query)[0]
    parsed_join_to_insert = sqlparse.parse(join_to_insert)
    parsed_sql_query.insert_before(-1, parsed_join_to_insert[0])
    print(parsed_sql_query)
    print(row.counter)
        


In [None]:
## verify it is a where clause

parsed = sqlparse.parse(value)
where = parsed[0][-1]
print(where)
where_clause = False
for i in where.tokens:
    if str(i).find('WHERE') >= 0:
        where_clause = True
    
for i in parsed[0].tokens:
    try:
        for j in i.tokens:

            if str(j) == '"services"':
                print(j)
    except:
        pass
    
idx, _ = parsed[0].token_next_by(m=(Identifier, 'WHERE'))
print(idx)

In [None]:
import json 
import sqlparse
from sqlparse.sql import Identifier, IdentifierList, remove_quotes, Token, TokenList, Where
from sqlparse.tokens import Keyword, Name, Punctuation, String, Whitespace
from sqlparse.utils import imt
import pandas as pd

sql_first = sql_queries_dict["counts.issues"]

parsed = sqlparse.parse(sql_first)[0]
test_list = []

print(parsed.get_name())

parsed._pprint_tree()
where_statment = parsed[-1]
select_statement = parsed

for x in select_statement:
    test_list.append(str(x))
    
test_list

left_join = ' LEFT JOIN projects ON test.project_id = projects.id'
test_list.append(left_join)
test_list

In [None]:

table_to_look = 'issues'
table_to_join = ['projects', 'namespaces', 'groups']
foreign_key_df.head(20)

potential_joins = foreign_key_df[(foreign_key_df['table_name'] == table_to_look) & (foreign_key_df['foreign_table_name'].isin(table_to_join))]

for index, row in final_join.iterrows():
    if row.table_name != 'users':
        print('LEFT JOIN ' + row.foreign_table_name + ' ON ' + row.foreign_table_name + '.' + row.foreign_column_name + ' = ' + row.table_name + '.' + row.column_name)
        if row.foreign_table_name == 'projects':
            print('LEFT JOIN namespaces ON projects.namespace_id = namespaces.id')
    print(row.table_name)
    print(row.counter)

    
group_by_statement = 'GROUP BY 1'
print(group_by_statement)


In [None]:
isinstance(where_statment, Where)

In [None]:
idx, _ = parsed.token_next_by(m=(Keyword, "BETWEEN"))
print(idx)
if idx is not None:
    _, token = parsed.token_next(idx=idx)
    if token:
        if isinstance(token, IdentifierList):
            # In case of "LIMIT <offset>, <limit>", find comma and extract
            # first succeeding non-whitespace token
            idx, _ = token.token_next_by(m=(sqlparse.tokens.Punctuation, ","))
            _, token = token.token_next(idx=idx)
        if token and token.ttype == sqlparse.tokens.Literal.Number.Integer:
            print(int(token.value))

where = next(token for token in parsed.tokens if isinstance(token, Where))


In [None]:
value = 'SELECT test FROM "services" LEFT JOIN "users" ON users.id = services.id WHERE users.id = 5'

elements = sqlparse.parse(value)
tok_list = elements[0].tokens

isinstance(tok_list[-1], Where)

tok_list

## add group by at the end
## add joins (what if there is a namespace, group or project table already)
## add namespace_id


In [None]:
for index, token in enumerate(tok_list):
    #Token.Keyword.DML
    if (token.is_keyword and str(token) == 'SELECT') is True:
        select_index = index
    if isinstance(token, Where) is True:
        where_index = index

new_tok_list = tok_list[:]
new_tok_list.insert(where_index + 1 , ' GROUP BY 1')
new_tok_list.insert(where_index, ' LEFT JOIN projects ON services.id = projects.project_id ')
new_tok_list.insert(select_index + 1, ' namespace_id,')

tok_list_str = [str(token) for token in new_tok_list]
query_stringed = ''.join(tok_list_str)
query_parsed = sqlparse.parse(query_stringed)
tok_list_transformed = query_parsed[0].tokens

query_stringed
test_formatting = sqlparse.format(query_stringed)
test_formatting

wr = open('test.sql', 'w')
wr.write(test_formatting)
wr.close()