In [1]:
# Connect to the PostgreSQL database with SQLAlchemy and load a table into a pandas DataFrame

import os
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine

# Connection settings. Each value can be overridden through an environment
# variable so that real credentials do not have to be stored in the notebook;
# the fallbacks are the values this notebook used originally.
postgresql_user = os.environ.get('POSTGRES_USER', 'postgres')
postgresql_password = os.environ.get('POSTGRES_PASSWORD', '1')
postgresql_host = os.environ.get('POSTGRES_HOST', 'localhost')
postgresql_port = os.environ.get('POSTGRES_PORT', '5433')
postgresql_dbname = os.environ.get('POSTGRES_DB', 'sales')

# URI format: postgresql+psycopg2://user:password@host:port/dbname
# The user name and password are URL-encoded so that special characters
# (for example '@', ':' or '/') cannot corrupt the URI.
database_uri = (
    f'postgresql+psycopg2://{quote_plus(postgresql_user)}:{quote_plus(postgresql_password)}'
    f'@{postgresql_host}:{postgresql_port}/{postgresql_dbname}'
)

# Create the SQLAlchemy engine
engine = create_engine(database_uri)

# Define the SQL query. The "--" turns "LIMIT 100;" into a SQL comment, so the
# whole table is read; remove the "--" to load only a 100-row sample.
query = 'SELECT * FROM migration.sales_data --LIMIT 100;'

# Use pandas to read the SQL query into a DataFrame (57.8 sec)
sales_data_df = pd.read_sql(query, engine)

# Display the DataFrame
sales_data_df


Exception ignored in: <bound method IPythonKernel._clean_thread_parent_frames of <ipykernel.ipkernel.IPythonKernel object at 0x000001F5B7BB2600>>
Traceback (most recent call last):
  File "c:\Projects\db_to_db_python\project_venv\Lib\site-packages\ipykernel\ipkernel.py", line 796, in _clean_thread_parent_frames
    active_threads = {thread.ident for thread in threading.enumerate()}
                                                 ^^^^^^^^^^^^^^^^^^^^^
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.12_3.12.2800.0_x64__qbz5n2kfra8p0\Lib\threading.py", line 1535, in enumerate
    def enumerate():
    
KeyboardInterrupt: 


KeyboardInterrupt: 

In [None]:
# Derive a "shipping_duration" column: the difference in days between ship_date and order_date

## Build a new dataframe (the source dataframe is left untouched) in which both
## date columns are converted to datetime and the day difference is added as
## the last column
updated_sales_data_df = sales_data_df.assign(
    order_date=lambda frame: pd.to_datetime(frame['order_date']),
    ship_date=lambda frame: pd.to_datetime(frame['ship_date']),
    shipping_duration=lambda frame: (frame['ship_date'] - frame['order_date']).dt.days,
)

## Show the updated dataframe (see the last column)
updated_sales_data_df

## Send back updated dataframe to Postgres to the table `updated_sales_data` (9 mins)
#updated_sales_data_df.to_sql('updated_sales_data', con=engine, schema='migration', if_exists='replace', index=False)

In [None]:
# Timing note: chunk_size=10000 with max_workers=4 took 4 min 30 sec
# (the executor below is currently configured with max_workers=6).

import concurrent.futures


def write_to_sql(chunk):
    """Append one DataFrame chunk to the PostgreSQL table migration.updated_sales_data.

    Args:
        chunk: DataFrame slice whose rows are appended to the table.

    Raises:
        Exception: whatever ``DataFrame.to_sql`` raises. Errors are deliberately
            not caught here so that they travel through the future and the
            submitting code can tell which chunks failed.
    """
    chunk.to_sql(name='updated_sales_data', con=engine, schema='migration', if_exists='append', index=False)


# Split DataFrame into chunks (number of rows per chunk)
chunk_size = 10000
max_workers = 6  # Adjust as needed
chunks = [
    updated_sales_data_df.iloc[start:start + chunk_size]
    for start in range(0, len(updated_sales_data_df), chunk_size)
]

# Use ThreadPoolExecutor to write chunks in parallel
failed_chunks = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    future_to_index = {}
    for index, chunk in enumerate(chunks):
        future = executor.submit(write_to_sql, chunk)
        future_to_index[future] = index
        if index == 0:
            # Let the first chunk finish on its own: with if_exists='append'
            # the table is created when it is missing, and several workers
            # starting at once could otherwise race to create it.
            concurrent.futures.wait([future])

    # as_completed yields every future once it is done; result() re-raises
    # any exception raised inside write_to_sql.
    for future in concurrent.futures.as_completed(future_to_index):
        chunk_index = future_to_index[future]
        try:
            future.result()
        except Exception as e:
            failed_chunks.append(chunk_index)
            print(f"An error occurred during database operation (chunk {chunk_index}): {e}")

# At this point all futures have finished; report whether every chunk was written
if failed_chunks:
    print(f"{len(failed_chunks)} of {len(chunks)} chunks failed to load: {sorted(failed_chunks)}")
else:
    print("All tasks are completed.")

In [None]:
# Connect to Snowflake with the Snowflake connector and read a query result into a pandas DataFrame

import os

import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

# Credentials can be supplied through environment variables so that they do not
# have to be stored in the notebook; the fallbacks are the original placeholders.
conn = snowflake.connector.connect(
    user=os.environ.get('SNOWFLAKE_USER', 'user'),
    password=os.environ.get('SNOWFLAKE_PASSWORD', 'password'),
    account=os.environ.get('SNOWFLAKE_ACCOUNT', 'snowflake_account'),
    warehouse='WH_SUPERSTORE',
    database='RAW',
    schema='SUPERSTORE')

# Statement that generates the result set.
sql = "select * from orders limit 100"

# Create a cursor, run the statement and fetch the result set as a pandas
# DataFrame. The cursor is closed even if the query fails; `conn` itself is
# left open here, so call conn.close() once it is no longer needed.
cur = conn.cursor()
try:
    cur.execute(sql)
    df = cur.fetch_pandas_all()
finally:
    cur.close()

df

In [None]:
# Migrate data from a PostgreSQL table to Snowflake using a pandas DataFrame as intermediary

import os
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

# PostgreSQL connection settings. Each value can be overridden through an
# environment variable so that real credentials do not have to be stored in
# the notebook; the fallbacks are the values this notebook used originally.
postgresql_user     = os.environ.get('POSTGRES_USER', 'postgres')
postgresql_password = os.environ.get('POSTGRES_PASSWORD', '1')
postgresql_host     = os.environ.get('POSTGRES_HOST', 'localhost')
postgresql_port     = os.environ.get('POSTGRES_PORT', '5433')
postgresql_dbname   = os.environ.get('POSTGRES_DB', 'sales')

# Create connection string for PostgreSQL. The user name and password are
# URL-encoded so that special characters cannot corrupt the URI.
database_uri = (
    f'postgresql+psycopg2://{quote_plus(postgresql_user)}:{quote_plus(postgresql_password)}'
    f'@{postgresql_host}:{postgresql_port}/{postgresql_dbname}'
)

# Create the SQLAlchemy engine
engine = create_engine(database_uri)

# Define the SQL query. The "--" turns "LIMIT 3000000;" into a SQL comment, so
# the whole table is read; remove the "--" to cap the number of rows.
query = 'SELECT * FROM migration.sales_data -- LIMIT 3000000;'

# Use pandas to read the SQL query into a DataFrame
sales_data_df = pd.read_sql(query, engine)
print("# of rows in sales_data_df = ", len(sales_data_df))

# Configure connection to Snowflake. Credentials and the target database can be
# supplied through environment variables; the fallbacks are the original
# placeholders (put your db name in place of <your_name>_MIGRATION).
snowflake_conn = snowflake.connector.connect(
    user=os.environ.get('SNOWFLAKE_USER', 'user'),
    password=os.environ.get('SNOWFLAKE_PASSWORD', 'password'),
    account=os.environ.get('SNOWFLAKE_ACCOUNT', 'snowflake_account'),
    warehouse='WH_SUPERSTORE',
    database=os.environ.get('SNOWFLAKE_DATABASE', '<your_name>_MIGRATION'),
    schema='RAW')

# Upload the DataFrame into the SALES_DATA4 table (created automatically when
# missing). The connection is closed afterwards even if the upload fails.
try:
    success, nchunks, nrows, _ = write_pandas(snowflake_conn, sales_data_df, 'SALES_DATA4', auto_create_table=True)
finally:
    snowflake_conn.close()

print(f'success = {success}, nchunks = {nchunks}, nrows = {nrows}')