Skip to content

Commit

Permalink
feat: latest ops requests
Browse files Browse the repository at this point in the history
* Add --hoard_limit to limit max number of records per user
* Add reason column to `failure_*.csv`
  • Loading branch information
jrconlin committed Apr 9, 2020
1 parent f3c9751 commit edd0017
Showing 1 changed file with 39 additions and 24 deletions.
63 changes: 39 additions & 24 deletions tools/user_migration/migrate_node.py
Expand Up @@ -45,9 +45,9 @@ def __init__(self, args):
def success(self, uid):
    """Record a successfully migrated user id for this bso table."""
    record = "\t".join((str(self.bso), str(uid)))
    self._success.write(record + "\n")

def fail(self, uid):
def fail(self, uid, reason=None):
    """Record a skipped user id for this bso table.

    An optional free-text reason is written as a third column; when no
    reason is given the column is left empty.
    """
    logging.debug("Skipping user {}".format(uid))
    record = "\t".join((str(self.bso), str(uid), str(reason or "")))
    self._failure.write(record + "\n")

def close(self):
self._success.close()
Expand Down Expand Up @@ -109,7 +109,7 @@ def __init__(self, fxa_csv_file, args, report):
"invalid client state: {}".format(
uid, client_state
))
report.fail(uid)
report.fail(uid, "bad client state")
continue
fxa_kid = self.format_key_id(
int(keys_changed_at or generation),
Expand All @@ -123,7 +123,7 @@ def __init__(self, fxa_csv_file, args, report):
logging.error(
"User {} Unexpected error".format(uid),
exc_info=ex)
report.fail(uid)
report.fail(uid, "unexpected error")
except Exception as ex:
logging.critical("Error in fxa file around line {}".format(
line), exc_info=ex)
Expand Down Expand Up @@ -419,22 +419,22 @@ def spanner_transact_bso(transaction, data, fxa_kid, fxa_uid, args):
sid,
]]

if not args.dryrun:
logging.debug(
"###bso{} {}".format(
bso_num,
dumper(bso_columns, bso_values)
)
)
transaction.insert(
'bsos',
columns=bso_columns,
values=bso_values
)
else:
logging.debug("not writing {} => {}".format(
bso_columns, bso_values))
count += 1
if not args.dryrun:
logging.debug(
"###bso{} {}".format(
bso_num,
dumper(bso_columns, bso_values)
)
)
transaction.insert(
'bsos',
columns=bso_columns,
values=bso_values
)
else:
logging.debug("not writing {} => {}".format(
bso_columns, bso_values))
return count

cursor = databases['mysql'].cursor()
Expand Down Expand Up @@ -465,6 +465,17 @@ def spanner_transact_bso(transaction, data, fxa_kid, fxa_uid, args):
logging.info("Skipped {} of {} rows for {}".format(
abort_count, col_count, abort_col
))
if args.hoard_limit and args.hoard_limit < len(data):
logging.warn(
"User {} => {}:{} has too many items: {} ".format(
uid, fxa_uid, fxa_kid, len(data)
)
)
report.fail(uid, "hoarder: {}".format(len(data)))
return count
logging.info(
"Moving {} items for user {} => {}:{}".format(
len(data), uid, fxa_uid, fxa_kid))
for bunch in divvy(data, args.chunk or 1000):
# Occasionally, there is a batch fail because a
# user collection is not found before a bso is written.
Expand Down Expand Up @@ -495,9 +506,9 @@ def spanner_transact_bso(transaction, data, fxa_kid, fxa_uid, args):
"User {} already imported fxa_uid:{} / fxa_kid:{}".format(
uid, fxa_uid, fxa_kid
))
report.fail(uid)
report.fail(uid, "exists")
except InvalidArgument as ex:
report.fail(uid)
report.fail(uid, "exists")
if "already inserted" in ex.args[0]:
logging.warn(
"User {} already imported fxa_uid:{} / fxa_kid:{}".format(
Expand All @@ -506,7 +517,7 @@ def spanner_transact_bso(transaction, data, fxa_kid, fxa_uid, args):
else:
raise
except Exception as ex:
report.fail(uid)
report.fail(uid, "unexpected batch error")
logging.error("Unexpected Batch failure: {}:{}".format(
fxa_uid, fxa_kid), exc_info=ex)
finally:
Expand Down Expand Up @@ -535,7 +546,7 @@ def get_users(args, databases, fxa, bso_num, report):
logging.error(
"User {} not found in "
"tokenserver data.".format(user))
report.fail(user)
report.fail(user, "not found")
else:
try:
sql = ("""select distinct userid from bso{}"""
Expand All @@ -550,7 +561,7 @@ def get_users(args, databases, fxa, bso_num, report):
(fxa_kid, fxa_uid) = fxa.get(user)
users.append((user, fxa_kid, fxa_uid))
except TypeError:
report.fail(user)
report.fail(user, "not found")
logging.error(
("User {} not found in "
"tokenserver data".format(user)))
Expand Down Expand Up @@ -695,6 +706,10 @@ def get_args():
'--ms_delay', type=int, default=0,
help="inject a sleep between writes to spanner as a throttle"
)
parser.add_argument(
'--hoard_limit', type=int, default=0,
help="reject any user with more than this count of records"
)
parser.add_argument(
'--success_file', default="success_{}.csv".format(pid),
help="File of successfully migrated userids"
Expand Down

0 comments on commit edd0017

Please sign in to comment.