# Cost Estimation for using MySQL and MSSQL Fivetran Connectors

## How fivetran pricing is structured
Fivetran structures their pricing based on the number of monthly active rows ingested (MAR). According to their documentation, MAR is defined as "the number of distinct primary keys synced from the source system to your destination in a given calendar month". If a primary key is not available, they create a hashed primary key to use instead.

If a row is synced more than once in a month, it is still only counted as one row. You don't pay multiple times for updates on the same row in the same month.

Initial syncs do not count towards monthly active rows. I.e. the initial full load which syncs historical data will not incur any costs. Most resyncs do not count towards monthly active cost, with the exception of automatic re-syncs of a table that was previously excluded, and for oracle connectors when a resync is triggered due to a table's SCN is no longer being available.

## How this notebook estimates cost
There are three Qlik tasks that we plan to move over to Fivetran:
- MYSQL_ORACLE_SNOWFLAKE
- MYSQL_RISK_SNOWFLAKE
- MYSQL_AIRFLOW_SNOWFLAKE

This script assumes that the earliest replication date in these tables is the date of the initial load. All these records are excluded, and the remaining rows are grouped by the month and year they were loaded and the number of rows counted. This is done for every table in the above tasks. The results are summed for every month in every year. The final result is a table with the total number of rows that have been ingested every month in a given year for the above tasks. This is used as an estimate for the additional monthly MAR that we would be charged for should we choose to migrate these tasks to qlik.


In [132]:
from api_utils.snowflake_api import Snowflake
import os
import pandas as pd
import matplotlib.pyplot as plt

def get_initial_sync_date(sf, conn, schema, table, date_column):


    initial_sync_query = f"select min({date_column}) as initial_sync_date from landing.{schema}.{table};"
    
    initial_sync_date = sf.execute_query(query=initial_sync_query, conn=conn).fetchone()[0]

    return initial_sync_date

def get_MAR(conn, initial_sync_date, schema, table, date_column):

    MAR_query = f"""
        select sum(rows_replicated), month, year from (
        select 
        case
            when date_trunc('SECOND', {date_column}) = 
            date_trunc('SECOND', to_timestamp('{initial_sync_date}')) then 0 else 1 
        end as rows_replicated,
        month({date_column}) as month,
        year({date_column}) as year
        from landing.{schema}.{table}
        )
        group by month, year;
    """

    cursor = conn.cursor()
    cursor.execute(MAR_query)
    df = cursor.fetch_pandas_all()

    return df

In [133]:
sf = Snowflake(
    user=os.getenv("SNOWFLAKE_USER"),
    account=os.getenv("SNOWFLAKE_ACCOUNT"),
    role=os.getenv("SNOWFLAKE_ROLE"),
    warehouse=os.getenv("SNOWFLAKE_WAREHOUSE")
)

conn = sf.get_connection()

schemas_tables_date_columns = {
    "risk": {
        "date_column": "replication_date",
        "tables": [
            "sam_list_excludedaddress",
            "sam_list_excludedcardholder",
            "sam_list_excludedcardnumber",
            "sam_list_merchantexclusion",
            "sam_ref_birulesmapping",
            "sam_ref_customerfields",
            "sam_ref_mismatchrule",
            "sam_ref_pickupfraudrule",
            "sam_ref_postfilterrule",
            "sam_ref_prefilterrule",
            "sam_ref_ruleprofile",
            "sam_ref_ruletype",
            "sam_ref_trxloadschedule",
            "sam_ref_velocityfields",
            "sam_ref_velocityrule"
        ]
    },
    "oracle": {
        "date_column": "replication_date",
        "tables": [
            "oracle_prod_acquirer",
            "oracle_prod_acquirer_bin",
            "oracle_prod_acquirer_bin_mapping",
            "oracle_prod_audit",
            "oracle_prod_country",
            "oracle_prod_country_override",
            "oracle_prod_criteria",
            "oracle_prod_currency",
            "oracle_prod_excluded_fee",
            "oracle_prod_fee",
            "oracle_prod_fee_acquirer_mapping",
            "oracle_prod_fee_action_mapping",
            "oracle_prod_fee_country_mapping",
            "oracle_prod_fee_criteria_mapping",
            "oracle_prod_fee_external_descriptor",
            "oracle_prod_fee_rate",
            "oracle_prod_fee_region_mapping",
            "oracle_prod_fee_scheme_mapping",
            "oracle_prod_global_acquirer_mapping",
            "oracle_prod_industry",
            "oracle_prod_lookup_list",
            "oracle_prod_lookup_list_value",
            "oracle_prod_mastercard_country",
            "oracle_prod_mastercard_public_holiday",
            "oracle_prod_merchant_category_code",
            "oracle_prod_schemaversions",
            "oracle_prod_scheme",
            "oracle_prod_settlement_currencies",
            "oracle_prod_user_account",
            "oracle_prod_visa_country",
            "oracle_prod_visa_public_holiday"

        ]
    },
    "airflow_db": {
        "date_column": "event_timestamp",
        "tables": [
            "ab_permission",
            "ab_permission_view",
            "ab_permission_view_role",
            "ab_register_user",
            "ab_role",
            "ab_user",
            "ab_user_role",
            "ab_view_menu",
            "alembic_version",
            "callback_request",
            "celery_tasksetmeta",
            "celery_taskmeta",
            "connection",
            "dag",
            "dag_code",
            "dag_pickle",
            "dag_run", 
            "dag_tag",
            "enabled_dags",
            "import_error",
            "job",
            "known_event",
            "known_event_type",
            "log",
            "log_template",
            "rendered_task_instance_fields",
            "sensor_instance",
            "serialized_dag",
            "session",
            "sla_miss",
            "slot_pool",
            "task_fail",
            "task_instance",
            "task_map",
            "task_reschedule",
            "variable",
            "xcom"
        ]
    },
}

Initiating login request with your identity provider. A browser window should have opened for you to complete the login. If you can't see it, check existing browser windows, or your OS settings. Press CTRL+C to abort and try again...


In [134]:
for schema, info in schemas_tables_date_columns.items():
    date_column = info["date_column"]
    tables = info["tables"]
    df = None

    for table in tables:

        initial_sync_date = get_initial_sync_date(
            sf=sf,
            conn=conn,
            schema=schema,
            table=table,
            date_column=date_column
        )
        
        if df is None:
            df = get_MAR(
                conn=conn,
                initial_sync_date=initial_sync_date,
                schema=schema,
                table=table,
                date_column=date_column
            )
        else:
            new_df = get_MAR(
                conn=conn,
                initial_sync_date=initial_sync_date,
                schema=schema,
                table=table,
                date_column=date_column
            )

            # concatenate old and new
            df = pd.concat([df, new_df], ignore_index=True)
conn.close()

df

Unnamed: 0,SUM(ROWS_REPLICATED),MONTH,YEAR
0,0,10,2022
1,24,12,2022
2,54,10,2022
3,30,11,2022
4,7,1,2023
...,...,...,...
90,82,1,2023
91,13681,1,2023
92,91873,11,2022
93,95204,12,2022


In [135]:
df.rename({
    "SUM(ROWS_REPLICATED)": "qlikRowsReplicated",
    "MONTH": "Month",
    "YEAR": "Year"
}, axis="columns", inplace=True)

df

Unnamed: 0,qlikRowsReplicated,Month,Year
0,0,10,2022
1,24,12,2022
2,54,10,2022
3,30,11,2022
4,7,1,2023
...,...,...,...
90,82,1,2023
91,13681,1,2023
92,91873,11,2022
93,95204,12,2022


In [136]:
grouped_df = df.groupby(["Month", "Year"]).sum().sort_values(["Year"])
grouped_df

Unnamed: 0_level_0,Unnamed: 1_level_0,qlikRowsReplicated
Month,Year,Unnamed: 2_level_1
9,2022,10280859
10,2022,8276546
11,2022,3065514
12,2022,3471388
1,2023,527951


In [137]:
grouped_df["currentMAR"] = [38575043, 29475645, 21696586, 21167446, 4720203]
grouped_df["currentDollarsPerMillionMAR"] = [130, 133, 158, 161, 323]
grouped_df["currentMonthlyCost"] = grouped_df.apply(lambda row: (row.currentMAR/10**6)*row.currentDollarsPerMillionMAR, axis=1)
grouped_df["projectedMAR"] = grouped_df.apply(lambda row: row.qlikRowsReplicated+row.currentMAR, axis=1)

grouped_df

Unnamed: 0_level_0,Unnamed: 1_level_0,qlikRowsReplicated,currentMAR,currentDollarsPerMillionMAR,currentMonthlyCost,projectedMAR
Month,Year,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
9,2022,10280859,38575043,130,5014.75559,48855902.0
10,2022,8276546,29475645,133,3920.260785,37752191.0
11,2022,3065514,21696586,158,3428.060588,24762100.0
12,2022,3471388,21167446,161,3407.958806,24638834.0
1,2023,527951,4720203,323,1524.625569,5248154.0


In [138]:
grouped_df["projectedDollarsPerMillionMAR"] = [130, 130, 147, 147, 311]
grouped_df["newEstimatedMonthlyCost"] = grouped_df.apply(lambda row: (row.projectedMAR/10**6)*row.projectedDollarsPerMillionMAR, axis=1)


grouped_df["projectedAdditionalSpend"] = grouped_df.apply(lambda row: abs(row.newEstimatedMonthlyCost-row.currentMonthlyCost), axis=1)
grouped_df

Unnamed: 0_level_0,Unnamed: 1_level_0,qlikRowsReplicated,currentMAR,currentDollarsPerMillionMAR,currentMonthlyCost,projectedMAR,projectedDollarsPerMillionMAR,newEstimatedMonthlyCost,projectedAdditionalSpend
Month,Year,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
9,2022,10280859,38575043,130,5014.75559,48855902.0,130,6351.26726,1336.51167
10,2022,8276546,29475645,133,3920.260785,37752191.0,130,4907.78483,987.524045
11,2022,3065514,21696586,158,3428.060588,24762100.0,147,3640.0287,211.968112
12,2022,3471388,21167446,161,3407.958806,24638834.0,147,3621.908598,213.949792
1,2023,527951,4720203,323,1524.625569,5248154.0,311,1632.175894,107.550325
