In [1]:
import pandas as pd
### SQL Alchemy Dependencies ###
import sqlalchemy
from sqlalchemy.orm import sessionmaker, mapper
from sqlalchemy import create_engine, exc, text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.inspection import inspect
from sqlalchemy.orm.exc import UnmappedClassError

In [2]:
# Make sure the db is running in background
db_backend = 'mysql+pymysql://' #MySQL DB
'''
    Postgres(Psycopg2) = postgresql+psycopg2://
    MsSql(Microsoft SQL Server) = mssql+pymssql://
    For more db backend : https://docs.sqlalchemy.org/en/20/core/engines.html#backend-specific-urls
'''
db_server = 'localhost'
db_name = 'test_db'
db_username = 'secret_user' # Of course, the secret user
db_password = 'public_password' # Why not public?
db_params = f"{db_backend}{db_username}:{db_password}@{db_server}/{db_name}"
engine = sqlalchemy.create_engine(db_params)
Session = sessionmaker(bind=engine)
session = Session()
# session

In [3]:
# path = 'datas/insert_df_50k.csv'
path = 'datas/upsert_df_100k.csv'
csv_df = pd.read_csv(path)
non_primary_id = 'non_primary_id'
# non_primary_id list in file -> f_npk_list
f_npk_list = csv_df[non_primary_id].to_list()

'''
  Case here:
    1. There exist 2 different ids, may be varchar of one column(typecast according to your need).
    2. We will be selecting with tuple of non pk id to extract the actual pk of table, and non pk id.
    3. We will be playing with the DF, now.
    4. We can segregate the rows of data of DF, one to insert, another for update.
      i) Select id, non pk with tuple(non pk list).
      ii) Convert the series data of non pk to list.
      iii) Convert the series data of actual pk to list.
      iv) Mask the DF of csv data with non pk list.
        target_values = [int(_) for _ in db_pk_list]
        column_name = "non_pk"
        mask = csv_df[column_name].isin(target_values)
      v) Now, segregate with the help of mask,
        # Insert--Filtered DF ==> that doesn't exist in the database
        insert_df = csv_df[~mask]
        # Update--Dropped DF ==> that does exist in the database
        update_df = csv_df[mask]
    5. Never forget to make it more efficient.
        need_update_df = False
        need_insert_df = False
        if len(insert_df.index) > 0:
          need_insert_df = True
        if len(update_df.index) > 0:
          need_update_df = True
'''
%time

Wall time: 0 ns


In [4]:
csv_npk_list = tuple(f_npk_list)
table_name = 'test_table'
existing_data_query = text(f"SELECT {non_primary_id}, id FROM {table_name} WHERE {non_primary_id} IN {csv_npk_list}")
existing_rows = pd.read_sql(sql=existing_data_query, con=engine)
db_npk_list = existing_rows[non_primary_id].to_list()
db_id_list = existing_rows['id'].to_list()


In [5]:
# target_values = db_non_pk_list
target_values = [int(_) for _ in db_npk_list]
# Filter and remove rows with target values in the 'npk' column
column_name = non_primary_id
mask = csv_df[column_name].isin(target_values)
# Insert--Filtered DF ==> that doesn't exist in the database
insert_df = csv_df[~mask]
# Update--Dropped DF ==> that does exist in the database
update_df = csv_df[mask]
need_insert_df = False
need_update_df = False
if len(insert_df) > 0:
    need_insert_df = True
if len(update_df) > 0:
    need_update_df = True

In [6]:
# Processing the df data for the update method
update_list_of_dicts = {}
if need_update_df:
    non_primary_id_list = update_df[non_primary_id].to_list()
    update_non_primary_id_tuple = tuple(non_primary_id_list)
    existing_boid_query = text(f"SELECT id FROM {table_name} WHERE {non_primary_id} IN {update_non_primary_id_tuple}")
    existing_rows_id = pd.read_sql(sql=existing_data_query, con=engine)
    update_df['id'] = existing_rows_id['id'].to_list()
    # moving the pk to 0th index as the docs recommend
    # to place id as first value to map the existing data
    desired_index = 0
    column_to_move = 'id'
    column = update_df.pop(column_to_move)
    update_df.insert(desired_index, column_to_move, column)
    update_list_of_dicts = update_df.to_dict(orient='records')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  update_df['id'] = existing_rows_id['id'].to_list()



## Mapper Function For Alchemy

> Autoload the db  with engine and create a mapper to communicate with existing db

> If you are working with the Temporary table, or trying to create programattically, follow the guide here: https://docs.sqlalchemy.org/en/20/core/metadata.html#specifying-the-schema-name


In [7]:
Base = automap_base()
Base.prepare(autoload_with=engine)
# Make sure to update the table name
# nice_table_name >> you_are_nice
Nice_Table_Name = Base.classes.test_table
# You_Are_Nice = Base.classes.you_are_nice

# Let Me Insert Now

In [8]:
import time
start_time = time.time()
# print('start_time',start_time)
if need_insert_df:
    session = Session()
    from sqlalchemy import insert
    insert_list_of_dicts = insert_df.to_dict(orient='records')
    session.bulk_insert_mappings(
        mapper=Nice_Table_Name,
        mappings=insert_list_of_dicts
    )
    session.commit()
    session.close()
end_time = time.time()
execution_time = end_time - start_time
# print('end_time', end_time)
print(f"Execution time: {execution_time}")
# insert_df
print(len(insert_list_of_dicts))
# %time

Execution time: 2.5638320446014404
50000


# Let's be updated

In [9]:
# import time
start_time = time.time()
# print('start_time',start_time)
if need_update_df:
    session = Session()
    from sqlalchemy import update
    session.bulk_update_mappings(
        mapper=Nice_Table_Name,
        mappings=update_list_of_dicts
    )
    session.commit()
    session.close()
print(len(update_list_of_dicts))
%time
end_time = time.time()
execution_time = end_time - start_time
# print('end_time', end_time)
print(f"Execution time: {execution_time}")

50000
Wall time: 0 ns
Execution time: 15.232576131820679
