Bulk upsert in Mysql dialect using on_duplicate_key_update #9328
-
The doc explains in detail how to run an upsert on a single row in the MySQL dialect of SQLAlchemy 2.0. Is it possible to perform a bulk upsert? People have been asking on SO for some years. Thanks in advance for your expert guidance!
Replies: 4 comments 6 replies
-
The main "upsert" documentation is at https://docs.sqlalchemy.org/en/20/orm/queryguide/dml.html#orm-queryguide-upsert, which goes into detail on passing lists of values to the
-
Thanks @zzzeek for the pointer to the doc; I should have included that link in the OP. That doc gives an example of bulk upsert in SQLite and gives pointers to the MySQL dialect. Sorry to be repetitive, but as I mentioned in the OP, the MySQL dialect doc for upsert only shows how to handle a single row. I've studied the doc and SO posts for clues on bulk upsert using the MySQL dialect. I have not yet figured it out, and could not find any answer on SO that explains it either. I hope I'm not missing something totally obvious. Please give more hints, tips or suggestions here.
-
Thank you very much for the code! I extended it slightly to connect to a local MySQL server running version 8.0.27. Please note I'm using SQLAlchemy version 2.0.4. Unfortunately I do not see the desired upsert behavior. The first time I ran the script, it emitted this line:
And when I query the database, I see 4 new rows have appeared:
Then I ran the script a second time, checked the table and was surprised to see 8 rows:
Maybe I'm failing to grasp something important here? Here's the slightly extended code:
-
Thanks for educating me here, it works! I also revised my example code to add setup and demonstrate upsert. Please see below.

    import sqlalchemy as db
    import sqlalchemy.dialects.mysql as mysql
    from sqlalchemy import delete, select, String
    from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


    class Base(DeclarativeBase):
        pass


    class User(Base):
        __tablename__ = "foo"
        id: Mapped[int] = mapped_column(primary_key=True)
        name: Mapped[str] = mapped_column(String(30))


    engine = db.create_engine('mysql+mysqlconnector://USER-NAME-HERE:PASS-WORD-HERE@localhost/SCHEMA-NAME-HERE')
    conn = engine.connect()

    # setup step 0 - ensure the table exists
    Base.metadata.create_all(bind=engine)

    # setup step 1 - clean out rows with id 1..5
    del_stmt = delete(User).where(User.id.in_([1, 2, 3, 4, 5]))
    conn.execute(del_stmt)
    conn.commit()
    sel_stmt = select(User)
    users = list(conn.execute(sel_stmt))
    print(f'Table size after cleanout: {len(users)}')

    # setup step 2 - insert 4 rows
    ins_stmt = mysql.insert(User).values(
        [
            {"id": 1, "name": "x"},
            {"id": 2, "name": "y"},
            {"id": 3, "name": "w"},
            {"id": 4, "name": "z"},
        ]
    )
    conn.execute(ins_stmt)
    conn.commit()
    users = list(conn.execute(sel_stmt))
    print(f'Table size after insert: {len(users)}')

    # demonstrate upsert: ids 1..3 collide and are updated, id 5 is inserted
    ups_stmt = mysql.insert(User).values(
        [
            {"id": 1, "name": "xx"},
            {"id": 2, "name": "yy"},
            {"id": 3, "name": "ww"},
            {"id": 5, "name": "new"},
        ]
    )
    ups_stmt = ups_stmt.on_duplicate_key_update(name=ups_stmt.inserted.name)
    # if you want to see the compiled result
    # x = ups_stmt.compile(dialect=mysql.dialect())
    # print(x.string, x.construct_params())
    conn.execute(ups_stmt)
    conn.commit()
    users = list(conn.execute(sel_stmt))
    print(f'Table size after upsert: {len(users)}')
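
For anyone who wants to check the generated SQL without a running MySQL server, here is a minimal sketch along the lines of the commented-out compile step above. It is not taken from the thread: the Core `Table` named `foo` is a hypothetical stand-in for the mapped class, and `rows`, `update_cols` and `sql_text` are illustrative names. It also assumes you want every non-primary-key column overwritten on a key collision, which may not suit every table.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import mysql

metadata = MetaData()

# Hypothetical table mirroring the "foo" table used in this thread.
# The primary key matters: MySQL only takes the update path when an
# incoming row collides with a PRIMARY KEY or UNIQUE index.
foo = Table(
    "foo",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(30)),
)

# Several rows passed as a list of dicts make this a multi-row INSERT.
rows = [
    {"id": 1, "name": "xx"},
    {"id": 5, "name": "new"},
]

stmt = mysql.insert(foo).values(rows)

# Map each non-primary-key column to the value the INSERT would have written.
update_cols = {
    c.name: stmt.inserted[c.name] for c in foo.columns if not c.primary_key
}
stmt = stmt.on_duplicate_key_update(update_cols)

# Compile against the MySQL dialect only; no connection or driver is used.
compiled = stmt.compile(dialect=mysql.dialect())
sql_text = str(compiled)
print(sql_text)
print(compiled.params)
```

Printing the statement this way should make it easy to confirm that the `ON DUPLICATE KEY UPDATE` clause is present before sending anything to the database. If a second run still adds rows instead of updating them, it may be worth checking that the target table really has a primary key or unique index on the colliding column.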