In [0]:
import psycopg2
from psycopg2.extras import execute_values
import os


In [0]:


# Connection settings for the target PostgreSQL server.
# jdbc_url is consumed by the Spark JDBC writer; username/password are shared
# by the Spark writer and the psycopg2 connection.
jdbc_url = "jdbc:postgresql://kishoremannava-pgserver.postgres.database.azure.com:5432/postgres"
# Credentials may be supplied through the PGUSER / PGPASSWORD environment
# variables so the real secret does not have to live in the notebook. The
# literals are only fallbacks that preserve the previous behaviour when the
# variables are unset.
# NOTE(review): prefer a secret store over the fallback password literal.
username = os.environ.get("PGUSER", "mypgadmin")
password = os.environ.get("PGPASSWORD", "YourSecurePassword123!")

def get_connection():
    """Open a psycopg2 connection to the target PostgreSQL database.

    The session is configured as read-write with autocommit enabled before
    the connection is handed back.

    Returns:
        The configured psycopg2 connection. The caller is responsible for
        closing it.
    """
    connect_args = {
        "dbname": "postgres",
        "user": username,
        "password": password,
        "host": "kishoremannava-pgserver.postgres.database.azure.com",
        "port": "5432",
    }
    pg_conn = psycopg2.connect(**connect_args)
    pg_conn.set_session(readonly=False, autocommit=True)
    return pg_conn

def drop_tables(cursor, table_names, suffix):
    """Drop the ``<table>_<suffix>`` table for every name in ``table_names``.

    Each statement is printed before it is executed.

    Args:
        cursor: Object exposing ``execute(sql)``.
        table_names: Iterable of base table names.
        suffix: Value appended after an underscore to form each table name.
    """
    for base_name in table_names:
        print(f"Dropping iteration {base_name}")
        statement = "DROP TABLE IF EXISTS {}_{} CASCADE".format(base_name, suffix)
        print(statement)
        cursor.execute(statement)

def copy_table_structure(table_names, source_schema, suffix):
    """Write an empty ``<table>_<suffix>`` table over JDBC for each source table.

    For every name, a zero-row query (``WHERE 1=0``) is run against
    ``<source_schema>.<table>`` and the resulting frame is saved through the
    JDBC writer with mode ``overwrite``.

    Args:
        table_names: Iterable of source table names.
        source_schema: Qualifier placed in front of each table name in the query.
        suffix: Value appended after an underscore to form the target table name.
    """
    for base_name in table_names:
        empty_frame = spark.sql(f"SELECT * FROM {source_schema}.{base_name} WHERE 1=0")
        target_table = f"{base_name}_{suffix}"
        jdbc_writer = (
            empty_frame.write.format("jdbc")
            .option("url", jdbc_url)
            .option("user", username)
            .option("password", password)
            .option("dbtable", target_table)
            .mode("overwrite")
        )
        jdbc_writer.save()

def set_tables_unlogged(cursor, table_names, suffix):
    """Issue ``ALTER TABLE ... SET UNLOGGED`` for every ``<table>_<suffix>``.

    Each statement is printed before it is executed.

    Args:
        cursor: Object exposing ``execute(sql)``.
        table_names: Iterable of base table names.
        suffix: Value appended after an underscore to form each table name.
    """
    for base_name in table_names:
        statement = "ALTER TABLE {}_{} SET UNLOGGED".format(base_name, suffix)
        print(statement)
        cursor.execute(statement)

def copy_csv_to_postgres_table(suffix, file_path, table_name):
    """Load one CSV file into ``<table_name>_<suffix>`` via ``COPY ... FROM STDIN``.

    A dedicated connection is opened for the load and is closed afterwards
    whether or not the copy succeeds. The COPY statement is printed before it
    is run.

    Args:
        suffix: Value appended after an underscore to form the target table name.
        file_path: Path of the CSV file to stream; opened in text mode.
        table_name: Base name of the target table.
    """
    pg_conn = get_connection()
    try:
        target_table = "{}_{}".format(table_name, suffix)
        with open(file_path, 'r') as csv_handle:
            copy_cursor = pg_conn.cursor()
            statement = "COPY " + target_table + " FROM STDIN WITH CSV HEADER"
            print(statement)
            copy_cursor.copy_expert(sql=statement, file=csv_handle)
    finally:
        pg_conn.close()

def optimize_tables(cursor, suffix):
    """Index and analyze one key column on each suffixed table.

    For every ``table -> column`` pair below, an index named
    ``idx_<column>_<suffix>`` is created on ``<table>_<suffix>`` if it does
    not exist yet, and the column is then analyzed.

    ``CREATE INDEX IF NOT EXISTS`` replaces the earlier ``pg_indexes`` lookup:
    that lookup had no schema filter, so a same-named table and index in a
    different schema would suppress creation here.

    Args:
        cursor: Object exposing ``execute(sql)``.
        suffix: Value appended after an underscore to table and index names.
    """
    # NOTE(review): the recorded run output lists date_dim and ship_mode
    # tables, but neither has an entry here - confirm whether that is intended.
    tables_columns = {
        "call_center": "cc_call_center_sk",
        "catalog_page": "cp_catalog_page_sk",
        "catalog_returns": "cr_item_sk",
        "customer": "c_customer_sk",
        "customer_address": "ca_address_sk",
        "customer_demographics": "cd_demo_sk",
        "household_demographics": "hd_demo_sk",
        "income_band": "ib_income_band_sk",
        "inventory": "inv_item_sk",
        "item": "i_item_sk",
        "promotion": "p_promo_sk",
        "reason": "r_reason_sk",
        "store": "s_store_sk",
        "store_returns": "sr_item_sk",
        "store_sales": "ss_item_sk",
        "time_dim": "t_time_sk",
        "warehouse": "w_warehouse_sk",
        "web_page": "wp_web_page_sk",
        "web_returns": "wr_item_sk",
        "web_sales": "ws_item_sk",
        "catalog_sales": "cs_item_sk",
        "web_site": "web_site_sk"
    }
    for table, column in tables_columns.items():
        target_table = f"{table}_{suffix}"
        index_name = f"idx_{column}_{suffix}"
        cursor.execute(
            f"CREATE INDEX IF NOT EXISTS {index_name} ON {target_table} ({column})"
        )
        cursor.execute(f"ANALYZE {target_table} ({column})")

def run_iterations(iterations, table_names, source_schema,
                   csv_root="/Volumes/kishoremannava/default/tpcds/tpcds_sf100_csv"):
    """Create and load a full set of suffixed tables once per iteration.

    For each suffix in ``1..iterations`` this creates the empty target
    tables, switches them to UNLOGGED, and then copies every ``.csv`` file
    found under ``<csv_root>/<table_name>`` into ``<table_name>_<suffix>``.
    The per-file copies are submitted through ``DataFrame.foreach``.

    Args:
        iterations: Number of suffixed copies to build; suffixes start at 1.
        table_names: Iterable of source table names. The name ``"_sqldf"``
            is ignored.
        source_schema: Qualifier used when reading the source tables.
        csv_root: Directory holding one sub-directory of CSV files per table.
            Defaults to the location that was previously hard-coded.

    Raises:
        OSError: If a table's CSV directory cannot be listed.
    """
    # Exclude "_sqldf" once, so the structure copy and the UNLOGGED switch
    # skip it as well - previously only the CSV load did.
    load_tables = [name for name in table_names if name != "_sqldf"]

    def copy_task(row):
        # Runs once per task row; each call opens its own connection.
        copy_csv_to_postgres_table(row['suffix'], row['file_path'], row['table_name'])

    for suffix in range(1, iterations + 1):
        connection = get_connection()
        try:
            # The cursor is created inside the outer try so the connection is
            # still closed if cursor creation itself fails.
            cursor = connection.cursor()
            try:
                print(f"Running iteration {suffix}")
                copy_table_structure(load_tables, source_schema, suffix)
                set_tables_unlogged(cursor, load_tables, suffix)

                for table_name in load_tables:
                    table_path = f"{csv_root}/{table_name}"
                    print("tempFilePath:", table_path)
                    csv_files = [
                        os.path.join(table_path, entry)
                        for entry in os.listdir(table_path)
                        if entry.endswith(".csv")
                    ]
                    if not csv_files:
                        # createDataFrame cannot infer a schema from an empty
                        # list when only column names are given, so skip.
                        print(f"No CSV files found in {table_path}; skipping")
                        continue

                    tasks = [(suffix, file_path, table_name) for file_path in csv_files]
                    tasks_df = spark.createDataFrame(tasks, ["suffix", "file_path", "table_name"])
                    tasks_df.foreach(copy_task)
            finally:
                cursor.close()
        finally:
            connection.close()

In [0]:
# Source catalog and schema, combined into the qualifier used by the queries.
catalog = "kishoremannava"
schema = "tpcds_sf100_delta"
source_schema = f"{catalog}.{schema}"

# List the tables in the source schema and keep only their names.
unity_catalog_tables = spark.sql(f"SHOW TABLES IN {source_schema}")
table_names = [
    listed.tableName
    for listed in unity_catalog_tables.select("tableName").collect()
]

# Build five suffixed copies (suffixes 1 through 5) of every listed table.
run_iterations(5, table_names, source_schema)

Running iteration 1
ALTER TABLE call_center_1 SET UNLOGGED
ALTER TABLE catalog_page_1 SET UNLOGGED
ALTER TABLE catalog_returns_1 SET UNLOGGED
ALTER TABLE catalog_sales_1 SET UNLOGGED
ALTER TABLE customer_1 SET UNLOGGED
ALTER TABLE customer_address_1 SET UNLOGGED
ALTER TABLE customer_demographics_1 SET UNLOGGED
ALTER TABLE date_dim_1 SET UNLOGGED
ALTER TABLE household_demographics_1 SET UNLOGGED
ALTER TABLE income_band_1 SET UNLOGGED
ALTER TABLE inventory_1 SET UNLOGGED
ALTER TABLE item_1 SET UNLOGGED
ALTER TABLE promotion_1 SET UNLOGGED
ALTER TABLE reason_1 SET UNLOGGED
ALTER TABLE ship_mode_1 SET UNLOGGED
ALTER TABLE store_1 SET UNLOGGED
ALTER TABLE store_returns_1 SET UNLOGGED
ALTER TABLE store_sales_1 SET UNLOGGED
ALTER TABLE time_dim_1 SET UNLOGGED
ALTER TABLE warehouse_1 SET UNLOGGED
ALTER TABLE web_page_1 SET UNLOGGED
ALTER TABLE web_returns_1 SET UNLOGGED
ALTER TABLE web_sales_1 SET UNLOGGED
ALTER TABLE web_site_1 SET UNLOGGED
tempFilePath: /Volumes/kishoremannava/default/tpcds/