## Copying data from position_geography for a particular team_instance_id

In [None]:
# Source table: position_geography  ->  destination table: hg_position_geo
import psycopg2
from psycopg2.extras import execute_batch
import pandas as pd
import numpy as np

In [1]:
import os

# Connection settings for the PostgreSQL database.
# Each value can be overridden with an environment variable of the same name;
# the literals below are only fallbacks so existing runs behave as before.
# NOTE(review): credentials are committed in source here -- consider dropping
# the DB_USER / DB_PASS fallbacks and supplying them via the environment only.
DB_NAME = os.environ.get("DB_NAME", "next_gen")
DB_USER = os.environ.get("DB_USER", "sde")
DB_PASS = os.environ.get("DB_PASS", "sde")
DB_HOST = os.environ.get("DB_HOST", "salesiqgen2.cygagau4oro0.us-west-2.rds.amazonaws.com")
# Kept as a string, exactly like the original constant.
DB_PORT = os.environ.get("DB_PORT", "5432")

In [2]:
def get_connection():
    """Open a psycopg2 connection using the module-level DB_* settings.

    Returns:
        The connection object, or None when connecting fails (the error is
        printed rather than raised).
    """
    try:
        settings = {
            "dbname": DB_NAME,
            "user": DB_USER,
            "password": DB_PASS,
            "host": DB_HOST,
            "port": DB_PORT,
        }
        return psycopg2.connect(**settings)
    except Exception as err:
        # Best effort: report the failure and let the caller deal with None.
        print(f"Unable to connect to the database: {err}")
        return None


In [None]:
# cur = conn.cursor()

In [None]:
def check_team_instance_id(conn, team_instance_id):
    """Report whether position_geography has a row for ``team_instance_id``.

    Args:
        conn: Open database connection whose ``cursor()`` works as a context manager.
        team_instance_id: Value matched against the ``team_instance_id`` column.

    Returns:
        The first column of the ``SELECT EXISTS`` result row, or False when
        the lookup raises (the error is printed, not propagated).
    """
    exists_sql = "SELECT EXISTS (SELECT 1 FROM position_geography WHERE team_instance_id = %s)"
    try:
        with conn.cursor() as cursor:
            cursor.execute(exists_sql, (team_instance_id,))
            found = cursor.fetchone()[0]
    except Exception as err:
        print(f"Error checking team_instance_id: {err}")
        return False
    return found

In [None]:
def fetch_rows(conn, team_instance_id):
    """Fetch every position_geography row for ``team_instance_id``.

    Args:
        conn: Open database connection whose ``cursor()`` works as a context manager.
        team_instance_id: Value matched against the ``team_instance_id`` column.

    Returns:
        Whatever ``fetchall()`` yields for the query, or an empty list when
        the query raises (the error is printed, not propagated).
    """
    select_sql = "SELECT * FROM position_geography WHERE team_instance_id = %s"
    try:
        with conn.cursor() as cursor:
            cursor.execute(select_sql, (team_instance_id,))
            matched = cursor.fetchall()
    except Exception as err:
        print(f"Error fetching rows: {err}")
        return []
    return matched

In [None]:
# Open the connection used by the copy steps below.
# get_connection() returns None (after printing the error) if connecting failed.
conn = get_connection()

In [None]:
# Sample value (presumably a valid team_instance_id -- confirm): a1h8W00000Ok8KXQAZ
# Ask the user which team instance's rows should be copied.
team_instance_id = input("Enter team_instance_id: ")

In [None]:
def copy_rows(conn, rows):
    """Bulk-insert ``rows`` into hg_position_geo and commit.

    The rows are inserted as fetched. They are deliberately not passed through
    a pandas DataFrame: that round trip turns NULLs in numeric columns into
    NaN and promotes nullable integer columns to float, so the copy would no
    longer match the source.

    Args:
        conn: Open database connection (used for the cursor, commit and rollback).
        rows: Sequence of row tuples; each must hold one value per entry of
            ``columns`` below, in that order. Assumes the caller's
            ``SELECT *`` returns columns in this order -- TODO confirm
            against the table definition.

    Returns:
        None. On any error the transaction is rolled back and the error is
        printed instead of being raised.
    """
    columns = [
        "id", "zip", "name", "zip_type", "state", "center_x", "center_y",
        "x_min", "x_max", "y_min", "y_max", "related_zip", "shape",
        "doughnut_zips", "neighbour_zips",
        "level1_code", "level1_name", "level2_code", "level2_name",
        "level3_code", "level3_name", "level4_code", "level4_name",
        "level5_code", "level5_name", "level6_code", "level6_name",
        "level7_code", "level7_name", "level8_code", "level8_name",
        "level9_code", "level9_name", "level10_code", "level10_name",
        "team_name", "team_instance_code", "team_type",
        "boundary_refresh_flag", "level1_rgb", "level2_rgb", "level3_rgb",
        "level4_rgb", "level5_rgb", "st_areas", "st_lengths",
        "proposed_position_code", "previous_position_code", "status",
        "metric_data", "team_instance_id", "salesforce_org_id",
        "salesforce_record_id", "origin", "event_id", "county_name",
        "account_metric_data", "account_count", "geography_zip_type",
        "nearest_neighbor", "is_simulation", "original_metric_data",
    ]

    def convert_value(value):
        # Rows coming straight from the driver are already native Python
        # values; unwrap numpy scalars only in case a caller passes them.
        if isinstance(value, np.generic):
            return value.item()
        return value

    try:
        # Reject rows whose width does not match the column list (the
        # DataFrame constructor used to enforce this).
        expected = len(columns)
        for row in rows:
            if len(row) != expected:
                raise ValueError(
                    f"{expected} columns expected, row has {len(row)} values"
                )

        records_list = [tuple(map(convert_value, row)) for row in rows]

        insert_query = f"""
            INSERT INTO hg_position_geo ({", ".join(columns)})
            VALUES ({", ".join(["%s"] * len(columns))})
        """

        # Bulk insert using execute_batch
        with conn.cursor() as cur:
            execute_batch(cur, insert_query, records_list)
        conn.commit()
        print("Rows copied to hg_position_geo successfully.")

    except Exception as e:
        conn.rollback()
        print(f"Error copying rows: {e}")

In [None]:
# Copy the rows for the requested team instance, then release the connection.
# get_connection() returns None when connecting failed; without this guard the
# script reported the id as missing and then crashed on conn.close().
if conn is None:
    print("No database connection available; nothing was copied.")
else:
    try:
        if check_team_instance_id(conn, team_instance_id):
            rows = fetch_rows(conn, team_instance_id)
            if rows:
                copy_rows(conn, rows)
            else:
                print(f"No rows found for team_instance_id {team_instance_id}.")
        else:
            print(f"team_instance_id {team_instance_id} does not exist in position_geography.")
    finally:
        # Close the connection even if one of the steps above raises.
        conn.close()

## Populating data from `hg_position_geo` into `hg_positions`

In [3]:
import psycopg2
from psycopg2 import pool
import json
import pandas as pd
import numpy as np
import time
from psycopg2.extras import execute_batch
from concurrent.futures import ThreadPoolExecutor

In [4]:
# NOTE(review): `conn` is rebound in the processing cell below without this
# connection being used or closed in between -- confirm whether it is still needed.
conn = get_connection()

In [10]:
# Table the aggregation queries read from (src) and table the aggregated rows
# are inserted into (dest). Both names are interpolated directly into SQL text.
src="hg_position_geo"
dest="hg_positions"

In [15]:
import time
import psycopg2
from psycopg2 import pool
from psycopg2.extras import execute_batch
import pandas as pd
import numpy as np
import json
from concurrent.futures import ThreadPoolExecutor

import os

# Wall-clock start, reported at the end of the script.
start_time = time.time()

# Connection settings. Each value can be overridden with an environment
# variable of the same name; the literals are fallbacks so existing runs
# behave as before.
# NOTE(review): credentials are committed in source here -- consider dropping
# the DB_USER / DB_PASS fallbacks and supplying them via the environment only.
DB_NAME = os.environ.get("DB_NAME", "next_gen")
DB_USER = os.environ.get("DB_USER", "sde")
DB_PASS = os.environ.get("DB_PASS", "sde")
DB_HOST = os.environ.get("DB_HOST", "salesiqgen2.cygagau4oro0.us-west-2.rds.amazonaws.com")
DB_PORT = os.environ.get("DB_PORT", "5432")

# Create a connection pool (1 to 10 connections) shared by the main flow and
# the worker threads.
try:
    connection_pool = psycopg2.pool.ThreadedConnectionPool(
        1, 10, database=DB_NAME, user=DB_USER, password=DB_PASS, host=DB_HOST, port=DB_PORT
    )
except Exception as e:
    print(f"Connection pool creation failed: {e}")
    # Nothing below can run without a pool; stop here instead of failing
    # later with an unrelated NameError on `connection_pool`.
    raise
else:
    print("Connection pool created successfully")

# Function to find max level
def find_max_level(cur, table=None):
    """Return the deepest hierarchy level that holds data in ``table``.

    Levels are probed in order starting at 1. Probing stops at the first
    ``level<N>_code`` column that either does not exist in the table or has
    no non-NULL values. Stopping on a missing column matters when every
    level column is populated: the next probe would otherwise reference a
    column that does not exist and raise.

    Args:
        cur: Open database cursor used for the probe queries.
        table: Table to inspect; defaults to the module-level ``src``.

    Returns:
        The highest populated level number, or 0 when ``level1_code`` is
        missing or entirely NULL.
    """
    if table is None:
        table = src

    # Zero-row query: run only to learn the table's column names.
    cur.execute(f"SELECT * FROM {table} LIMIT 0")
    available = {column[0] for column in cur.description}

    max_level = 0
    level = 1
    while f"level{level}_code" in available:
        code_column = f"level{level}_code"
        cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {code_column} IS NOT NULL")
        count = cur.fetchone()[0]
        if count == 0:
            break
        max_level = level
        level += 1
    return max_level

# Main processing: aggregate the rows of `src` per hierarchy level and insert
# the aggregated rows into `dest`.
# Both names are pre-bound so the cleanup in `finally` never touches an
# unbound or stale object when acquiring the connection fails.
conn = None
cur = None
try:
    # Get a connection from the pool
    conn = connection_pool.getconn()
    cur = conn.cursor()

    # Find the maximum level
    max_level = find_max_level(cur)

    # Get the metric keys
    temp_query = f"""SELECT DISTINCT jsonb_object_keys(CAST(metric_data AS jsonb)) AS metric_key FROM {src}"""
    cur.execute(temp_query)
    metric_keys = [row[0] for row in cur.fetchall()]

    def quote_literal(text):
        # Metric keys come from stored data and are spliced into SQL string
        # literals, so embedded single quotes have to be doubled.
        # Assumes standard_conforming_strings is on (backslashes are literal)
        # -- TODO confirm for this server.
        return "'" + text.replace("'", "''") + "'"

    # Prepare metric data items for the SQL query: one summed value per key
    metric_data_items = ', '.join(
        f"{quote_literal(metric_key)}, "
        f"SUM(COALESCE((metric_data::jsonb->>{quote_literal(metric_key)})::float, 0))"
        for metric_key in metric_keys
    )

    # Function to execute a single query on its own pooled connection
    def execute_query(query):
        conn = connection_pool.getconn()
        try:
            with conn.cursor() as cur:
                cur.execute(query)
                return cur.fetchall()
        finally:
            connection_pool.putconn(conn)

    queries = []
    for level in range(1, max_level + 1):
        code_column = f"level{level}_code"
        code_name = f"level{level}_name"
        if level < max_level:
            parent_expr = (
                f"(SELECT level{level + 1}_code FROM {src} AS sub_src "
                f"WHERE sub_src.{code_column} = {src}.{code_column} LIMIT 1)"
            )
        else:
            # The column above the top level is either entirely NULL or does
            # not exist (find_max_level stopped there), so the parent is NULL.
            # Referencing the column would fail when it is absent.
            parent_expr = "NULL"
        queries.append(f"""
            SELECT 
                {code_name} AS name, 
                jsonb_build_object(
                    {metric_data_items}
                ) AS metric_data,
                ST_Union(shape) AS shape,
                {level} AS hierarchy_level,
                {parent_expr} AS parent_position_code,
                {code_column} AS client_position_code,
                {code_column} AS client_territory_code,
                MAX(salesforce_org_id) AS salesforce_org_id,
                MAX(salesforce_record_id) AS salesforce_record_id
            FROM {src}
            GROUP BY {code_column}, {code_name}
        """)

    # Run the per-level aggregations concurrently
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(execute_query, queries))

    # Flatten the results
    results = [item for sublist in results for item in sublist]

    # Define the columns for the DataFrame (same order as the SELECT list)
    columns = ["name", "metric_data", "shape","hierarchy_level","parent_position_code","client_position_code","client_territory_code","salesforce_org_id", "salesforce_record_id"]

    # Create a DataFrame from the results
    df = pd.DataFrame(results, columns=columns)

    # Convert metric_data to JSON string
    df["metric_data"] = df["metric_data"].apply(lambda x: json.dumps(x))

    # Convert DataFrame to list of tuples with native Python types
    def convert_value(value):
        if isinstance(value, (np.int64, np.float64, np.bool_)):
            return value.item()
        return value

    records_list = [tuple(map(convert_value, row)) for row in df.to_records(index=False)]

    # Define the insert query
    insert_query = f"""
        INSERT INTO {dest} ({", ".join(df.columns)})
        VALUES ({", ".join(["%s"] * len(df.columns))})
    """

    # Execute the bulk insert
    execute_batch(cur, insert_query, records_list)

    # Commit the transaction
    conn.commit()

except Exception as e:
    print(f"An error occurred: {e}")
    # Discard any partial work before the connection goes back to the pool.
    if conn is not None:
        conn.rollback()

finally:
    # Close the cursor and return the connection to the pool
    if cur is not None:
        cur.close()
    if conn is not None:
        connection_pool.putconn(conn)

    # Close all connections in the pool
    connection_pool.closeall()

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Script completed in {elapsed_time:.2f} seconds")


Connection pool created successfully
Script completed in 59.66 seconds
