<a href="https://colab.research.google.com/github/ivan-spantree/istio.io/blob/master/apps/append-only-table-compaction-script/append_only_table_compaction_script/append_only_table_compaction_script.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Connect to PostgreSQL

In [None]:
import pandas as pd
import sqlalchemy as db
import json
import os
from sqlalchemy.orm import Session, close_all_sessions
from sqlalchemy.ext.automap import automap_base
from getpass import getpass

# Show every column and row when DataFrames are displayed in the notebook.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

# Connection settings. Each may be overridden by an environment variable of the
# same name (as the password is); otherwise the defaults below are used.
POSTGRES_HOST = os.getenv("POSTGRES_HOST", "db.qmyiliblwjiahsiuqnwr.supabase.co")
POSTGRES_PORT = int(os.getenv("POSTGRES_PORT", "5432"))
POSTGRES_USER = os.getenv("POSTGRES_USER", "postgres")
POSTGRES_DATABASE = os.getenv("POSTGRES_DATABASE", "postgres")

In [None]:
def getenv(varname, getpass_message=None):
    """Return the value of environment variable *varname*, prompting if unset.

    Args:
        varname: Name of the environment variable to read.
        getpass_message: Prompt shown by ``getpass`` when the variable is unset
            or empty; defaults to ``"Enter <varname>"``.

    Returns:
        The environment value, or the text typed at the (hidden) prompt.
    """
    # Read the variable the caller asked for (previously this was hard-wired
    # to POSTGRES_PASSWORD regardless of *varname*).
    value = os.getenv(varname)
    if not value:
        getpass_message = getpass_message or f"Enter {varname}"
        value = getpass(getpass_message)
    return value

# Password for the Postgres connection: taken from the environment when set,
# otherwise requested through a hidden interactive prompt.
POSTGRES_PASSWORD = getenv(
    'POSTGRES_PASSWORD',
    getpass_message='Enter Postgres Password',
)

Enter Postgres Password··········


In [None]:
# Build the URL structurally so the password is passed through verbatim;
# interpolating it into "postgresql://user:pass@host/..." breaks when the
# password contains characters such as "@", ":" or "/".
db_url = db.engine.URL.create(
    drivername="postgresql",
    username=POSTGRES_USER,
    password=POSTGRES_PASSWORD,
    host=POSTGRES_HOST,
    port=POSTGRES_PORT,
    database=POSTGRES_DATABASE,
)

# Discard any ORM sessions left over from a previous run of this cell.
close_all_sessions()

engine = db.create_engine(db_url, pool_timeout=3, connect_args={"connect_timeout": 10})
conn = engine.connect()

# Reflect the service_float_snapshot schema into automapped classes; the
# reflected Table objects are keyed "service_float_snapshot.<table>".
Base = automap_base()
Base.prepare(autoload_with=engine, schema="service_float_snapshot")
metadata = Base.metadata
# Additionally reflect the connection's default schema into the same MetaData
# (those keys carry no "schema." prefix).
metadata.reflect(bind=engine)

tables_list = list(metadata.tables.keys())

## Find IDs of Duplicate Rows in Raw Airbyte Tables



In [None]:
# Raw Airbyte tables are the schema-qualified tables whose own name starts
# with "_", e.g. "service_float_snapshot._airbyte_raw_people" -> "_airbyte_raw_people".
raw_tables_list = [
    name
    for _schema, sep, name in (table_name.partition(".") for table_name in tables_list)
    if sep and name.startswith("_")
]

# Float table name -> the key in each raw row's _airbyte_data JSON that
# identifies the record.
table_id_columns = {
    "accounts": "account_id",
    "clients": "client_id",
    "departments": "department_id",
    "logged_time": "logged_time_id",
    "milestones": "milestone_id",
    "people": "people_id",
    "projects": "project_id",
    "public_holidays": "id",
    "tasks": "task_id",
    "team_holidays": "holiday_id",
    "time_off": "timeoff_id",
    "time_off_types": "timeoff_type_id"
}
# "people_department" is not listed: it has a compound id of "_airbyte_people_hashid"
# and "airbyte_department_hashid" (NOTE(review): likely "_airbyte_department_hashid"; confirm).

In [None]:
# Float table name -> list of _airbyte_ab_id values of duplicate raw rows.
duplicate_raw_ids = {}

for table_name in raw_tables_list:
    # "_airbyte_raw_<float_table>" -> "<float_table>"
    float_table_equivalent = "_".join(table_name.split("_")[3:])

    # Only tables with a configured id column can be de-duplicated. This skips
    # "phases" and "statuses" (present in the raw Airbyte tables but empty and
    # absent as Float tables) and any other unlisted table, such as the
    # compound-id "people_department", instead of failing with a KeyError.
    id_column = table_id_columns.get(float_table_equivalent)
    if id_column is None:
        print(f"Skipping {table_name}: no id column configured")
        continue

    # Rows with the same record id AND identical payload share a partition;
    # the earliest-emitted row gets index 1 and every later copy is returned.
    query = db.text(f"""SELECT
                    _airbyte_ab_id
                    FROM (
                            SELECT
                                _airbyte_ab_id, row_number() OVER (
                                    PARTITION BY _airbyte_data ->> '{id_column}', md5(_airbyte_data::text)
                                    ORDER BY _airbyte_emitted_at ASC
                                ) AS duplicated_row_index
                            FROM
                                service_float_snapshot.{table_name}
                        ) sub
                    WHERE duplicated_row_index > 1""")

    df = pd.read_sql(query, conn)

    duplicate_raw_ids[float_table_equivalent] = df["_airbyte_ab_id"].tolist()

In [None]:
# Sanity check: how many duplicate raw rows were found for logged_time.
logged_time_duplicate_ids = duplicate_raw_ids["logged_time"]
len(logged_time_duplicate_ids)

0

## Delete Duplicate Rows from Raw Tables

In [None]:
# Number of ids per DELETE statement; override via the DELETE_CHUNK_SIZE
# environment variable (converted to int so it can drive range()).
DELETE_CHUNK_SIZE = int(os.getenv("DELETE_CHUNK_SIZE", "1000"))

for float_table_name, ids_to_delete in duplicate_raw_ids.items():
    full_table_name = f"service_float_snapshot._airbyte_raw_{float_table_name}"
    table = metadata.tables[full_table_name]

    for start in range(0, len(ids_to_delete), DELETE_CHUNK_SIZE):
        ids_chunk = ids_to_delete[start:start + DELETE_CHUNK_SIZE]
        delete_statement = db.delete(table).where(table.c._airbyte_ab_id.in_(ids_chunk))

        try:
            result = conn.execute(delete_statement)
            conn.commit()
        except db.exc.SQLAlchemyError as e:
            # Best effort: undo this chunk's transaction and continue with the next.
            conn.rollback()
            print(f"Error deleting from {full_table_name}: {e}")
        else:
            # Report the real count; the final chunk is usually smaller than
            # DELETE_CHUNK_SIZE.
            print(f"Deleted {result.rowcount} rows from {full_table_name}")

## Delete Duplicate Rows from Float Tables

In [None]:
# Float table name -> its business columns (Airbyte bookkeeping columns,
# which start with "_", are excluded).
float_columns = {}
float_tables = table_id_columns.keys()

for float_table in float_tables:
    # A zero-row query is enough: only the result's column names are needed.
    empty_frame = pd.read_sql(
        db.text(f"""SELECT * FROM service_float_snapshot.{float_table} WHERE 1 = 0"""),
        conn,
    )
    float_columns[float_table] = [
        column for column in empty_frame.columns.values if column[0] != "_"
    ]

In [None]:
# Float table name -> list of _airbyte_ab_id values of duplicate rows.
duplicate_ids = {}

for float_table_name, float_columns_list in float_columns.items():
    full_table_name = f"service_float_snapshot.{float_table_name}"

    # With no business columns there is nothing to compare (and an empty
    # PARTITION BY is a syntax error), so record no duplicates.
    if not float_columns_list:
        duplicate_ids[float_table_name] = []
        continue

    # Double-quote every column so reserved words and mixed-case names are
    # taken literally by Postgres; "" escapes an embedded double quote.
    reformatted_columns_list = ", ".join(
        '"' + column_name.replace('"', '""') + '"'
        for column_name in float_columns_list
    )

    # Rows identical across all business columns share a partition; the
    # earliest-emitted row gets index 1 and every later copy is returned.
    query = db.text(f"""SELECT
                    _airbyte_ab_id
                    FROM (
                            SELECT
                                _airbyte_ab_id, row_number() OVER (
                                    PARTITION BY {reformatted_columns_list}
                                    ORDER BY _airbyte_emitted_at ASC
                                ) AS duplicated_row_index
                            FROM
                                {full_table_name}
                        ) sub
                    WHERE duplicated_row_index > 1""")

    df = pd.read_sql(query, conn)

    duplicate_ids[float_table_name] = df["_airbyte_ab_id"].tolist()

In [None]:
# Number of ids per DELETE statement; override via the DELETE_CHUNK_SIZE
# environment variable (converted to int so it can drive range()).
DELETE_CHUNK_SIZE = int(os.getenv("DELETE_CHUNK_SIZE", "1000"))

for float_table_name, ids_to_delete in duplicate_ids.items():
    full_table_name = f"service_float_snapshot.{float_table_name}"
    table = metadata.tables[full_table_name]

    for start in range(0, len(ids_to_delete), DELETE_CHUNK_SIZE):
        ids_chunk = ids_to_delete[start:start + DELETE_CHUNK_SIZE]
        delete_statement = db.delete(table).where(table.c._airbyte_ab_id.in_(ids_chunk))

        try:
            result = conn.execute(delete_statement)
            conn.commit()
        except db.exc.SQLAlchemyError as e:
            # Best effort: undo this chunk's transaction and continue with the next.
            conn.rollback()
            print(f"Error deleting from {full_table_name}: {e}")
        else:
            # Report the real count; the final chunk is usually smaller than
            # DELETE_CHUNK_SIZE.
            print(f"Deleted {result.rowcount} rows from {full_table_name}")