Skip to content

Commit

Permalink
feat: add a --wipe_user mode
Browse files Browse the repository at this point in the history
Deletes any pre-existing user data on Spanner before migrating. Only usable
in --user mode.

Also fixes parsing of the new gen_fxa_users.py output.

Closes #596
  • Loading branch information
pjenvey committed Apr 20, 2020
1 parent 0f6ebc9 commit 16058f2
Showing 1 changed file with 57 additions and 7 deletions.
64 changes: 57 additions & 7 deletions tools/user_migration/migrate_node.py
Expand Up @@ -21,6 +21,7 @@
import grpc
from mysql import connector
from google.cloud import spanner
from google.cloud.spanner_v1 import param_types
from google.api_core.exceptions import AlreadyExists, InvalidArgument
try:
from urllib.parse import urlparse
Expand Down Expand Up @@ -94,14 +95,14 @@ def __init__(self, users_file, args, report):
line = 0
for (uid, fxa_uid, fxa_kid) in csv.reader(
csv_file, delimiter="\t"):
if args.user:
if int(uid) not in args.user:
continue
line += 1
tick(line)
if uid == 'uid':
# skip the header row.
continue
if args.user:
if int(uid) not in args.user:
continue
try:
self.users[int(uid)] = (fxa_kid, fxa_uid)
except Exception as ex:
Expand Down Expand Up @@ -338,6 +339,42 @@ def move_user(databases, user_data, collections, fxa, bso_num, args, report):
bso.collection, bso.id""".format(bso_num)
unique_key_filter = set()

def spanner_transact_wipe_user(transaction, fxa_uid, fxa_kid, args):
    """Remove a user's existing `user_collections` rows within a transaction.

    First reads the (collection_id, name) pairs currently stored for the
    given fxa_uid/fxa_kid pair, then deletes the matching `user_collections`
    rows. When `args.dryrun` is set nothing is deleted. In both cases the
    collections that were found are logged at debug level.

    :param transaction: object providing `execute_sql` and `execute_update`
    :param fxa_uid: value bound to the `@fxa_uid` query parameter
    :param fxa_kid: value bound to the `@fxa_kid` query parameter
    :param args: options object; only `args.dryrun` is read here
    """
    # NOTE: the SQL text below is deliberately kept verbatim (including its
    # line layout) so the statements sent to the server are unchanged.
    matches = transaction.execute_sql(
        """
SELECT
uc.collection_id, c.name
FROM
user_collections as uc
LEFT JOIN
collections as c
ON
uc.collection_id = c.collection_id
WHERE
uc.fxa_uid = @fxa_uid
AND uc.fxa_kid = @fxa_kid
""",
        params={"fxa_uid": fxa_uid, "fxa_kid": fxa_kid},
        param_types={
            "fxa_uid": param_types.STRING,
            "fxa_kid": param_types.STRING,
        },
    )

    # Materialize the result set as (collection_id, name) pairs for logging.
    found = []
    for record in matches:
        found.append((record[0], record[1]))

    # Dry run: report what would have been removed and stop.
    if args.dryrun:
        logging.debug("Not wiping user, collections: {}".format(found))
        return

    logging.debug("Wiping user, collections: {}".format(found))
    transaction.execute_update(
        """
DELETE FROM
user_collections
WHERE
fxa_uid = @fxa_uid
AND fxa_kid = @fxa_kid
""",
        params={"fxa_uid": fxa_uid, "fxa_kid": fxa_kid},
        param_types={
            "fxa_uid": param_types.STRING,
            "fxa_kid": param_types.STRING,
        },
    )

def spanner_transact_uc(
transaction, data, fxa_uid, fxa_kid, args):
# user collections require a unique key.
Expand Down Expand Up @@ -457,6 +494,15 @@ def spanner_transact_bso(transaction, data, fxa_uid, fxa_kid, args):
logging.info(
"Moving {} items for user {} => {}:{}".format(
len(data), uid, fxa_uid, fxa_kid))

if args.wipe_user:
databases['spanner'].run_in_transaction(
spanner_transact_wipe_user,
fxa_uid,
fxa_kid,
args,
)

for bunch in divvy(data, args.chunk or 1000):
# Occasionally, there is a batch fail because a
# user collection is not found before a bso is written.
Expand Down Expand Up @@ -542,8 +588,6 @@ def get_users(args, databases, fxa, bso_num, report):
try:
(fxa_kid, fxa_uid) = fxa.get(uid)
users.append((uid, fxa_uid, fxa_kid))
if args.sort_users:
users.sort(key=lambda tup: tup[1])
except TypeError:
logging.error(
"User {} not found in "
Expand Down Expand Up @@ -650,6 +694,11 @@ def get_args():
type=str,
help="BSO#:userId[,userid,...] to move."
)
parser.add_argument(
'--wipe_user',
action="store_true",
help="delete any pre-existing --user data on spanner before the migration"
)
parser.add_argument(
'--bso_user_file',
default="bso_users_#_{}.lst".format(today),
Expand Down Expand Up @@ -713,22 +762,23 @@ def main():

if args.user:
args.user_percent = "1:100"
if args.user:
(bso, userid) = args.user.split(':')
args.start_bso = int(bso)
args.end_bso = int(bso)
user_list = []
for id in userid.split(','):
user_list.append(int(id))
args.user = user_list
elif args.wipe_user:
raise RuntimeError("--wipe_user requires --user")
for line in dsns:
dsn = urlparse(line.strip())
scheme = dsn.scheme
if 'mysql' in dsn.scheme:
scheme = 'mysql'
databases[scheme] = conf_db(dsn)
if not databases.get('mysql') or not databases.get('spanner'):
RuntimeError("Both mysql and spanner dsns must be specified")
raise RuntimeError("Both mysql and spanner dsns must be specified")
fxa_info = FXA_info(args.fxa_users_file, args, report)
collections = Collections(databases)
logging.info("Starting:")
Expand Down

0 comments on commit 16058f2

Please sign in to comment.