Skip to content

Commit

Permalink
User migration5 (#601)
Browse files Browse the repository at this point in the history
* bug: Fix typos in tick, string replacements
* f multi-thread gen_bso_users
* added `--start_bso`, `--end_bso` to `gen_bso_users.py`
* added `--bso_num` arg (same as `--start_bso=# --end_bso=#`) to `migrate_node.py`
* `gen_bso_users.py` takes same `bso_users_file` template as `migrate_node.py`
* f remove default value for BSO_Users.run bso_num
* f fix lock issue in gen_bso_users, trap for empty (`''`) client states in gen_fxa_users
* f make threading optional.
 There's a locking issue that appears to be inside of mysql itself.
 Turning threading off for now (can be run in parallel)
* f fix tick, threading flag
* f rename confusing args in gen_bso and gen_fxa
 gen_bso_users:
  `--bso_users_file` => `--output_file`
 gen_fxa_users:
  `--fxa_file` => `--users_file`
  `--fxa_users_file` => `--output_file`
* f more tick fixes
* f don't use threading on Report if threading isn't available.
* f make `--bso_users_file` / `--fxa_users_file` consistent
* `--bso_user_file` is now `--bso_users_file`

Issue #407
  • Loading branch information
jrconlin committed Apr 29, 2020
1 parent 0f25622 commit 8aaa449
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 37 deletions.
96 changes: 74 additions & 22 deletions tools/user_migration/gen_bso_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import argparse
import logging
import threading
import csv
import sys
import os
Expand All @@ -17,10 +18,10 @@

def tick(count):
mark = None
if not count % 100:
mark = "."
if not count % 1000:
if count % 1000 == 0:
mark = "|"
elif count % 100 == 0:
mark = "."
level = logging.getLogger().getEffectiveLevel()
if mark and level > logging.DEBUG:
print(mark, end='', flush=True)
Expand All @@ -32,16 +33,21 @@ class Report:
_failure = None
_success = None

def __init__(self, args):
def __init__(self, args, lock=None):
self._success_file = args.success_file
self._failure_file = args.failure_file
self._lock = lock

def success(self, uid):
    """Record a successfully processed user in the success report file.

    The success file is opened (in write mode) on the first call and kept
    open; each call writes one tab-separated ``bso``/``uid`` line.

    :param uid: id of the user to record.
    """
    # ``_lock`` is optional (it may be None); only synchronize when present.
    if self._lock:
        self._lock.acquire()
    try:
        if not self._success:
            self._success = open(self._success_file, "w")
        self._success.write("{}\t{}\n".format(self.bso, uid))
    finally:
        # Always hand the lock back: it was previously acquired and never
        # released, so the next caller would block forever.
        if self._lock:
            self._lock.release()

def fail(self, uid, reason=None):
if self._lock:
lock = self._lock.acquire()
if not self._failure:
self._failure = open(self._failure_file, "w")
logging.debug("Skipping user {}".format(uid))
Expand All @@ -64,8 +70,8 @@ class BSO_Users:

def __init__(self, args, report, dsn):
self.args = args
self.dsn = dsn
self.report = report
self.conf_mysql(dsn)
self.get_users(args)

def get_users(self, args):
Expand All @@ -78,29 +84,33 @@ def get_users(self, args):
):
if uid == "uid":
continue
self.tick(line)
tick(line)
logging.debug("Read: {} {}:{}".format(
uid, fxa_uid, fxa_kid))
self.users[int(uid)] = (fxa_uid, fxa_kid)
line += 1
print("")
except Exception as ex:
logging.error(
"Unexpected error",
exc_info=ex
)
self.report.fail(uid, "Unexpected error {}".format(ex))

def run(self):
def run(self, bso_num):
connection = self.conf_mysql(self.dsn)
out_users = []
logging.info("Fetching users from BSO dbinto {}".format(
self.args.bso_users_file,
bso_file = self.args.output_file
bso_file = bso_file.replace("#", str(bso_num))
logging.info("Fetching users from BSO db into {}".format(
bso_file,
))
output_file = open(self.args.bso_users_file, "w")
output_file = open(bso_file, "w")
try:
cursor = self.connection.cursor()
cursor = connection.cursor()
sql = ("""select userid, count(*) as count from bso{}"""
""" group by userid order by userid""".format(
self.args.bso_num))
bso_num))
if self.args.user_range:
(offset, limit) = self.args.user_range.split(':')
sql = "{} limit {} offset {}".format(
Expand All @@ -125,6 +135,7 @@ def run(self):
("User {} not found in "
"tokenserver data".format(uid)))
if self.args.sort_users:
logging.info("Sorting users...")
out_users.sort(key=lambda tup: tup[1])
# Take a block of percentage of the users.
logging.info("Writing out {} users".format(len(out_users)))
Expand All @@ -135,21 +146,30 @@ def run(self):
uid, fxa_uid, fxa_kid
))
tick(line)
finally:
line += 1
output_file.flush()
print("")
except connector.errors.ProgrammingError as ex:
logging.error(ex)
output_file.close()
os.unlink(bso_file)
except Exception as e:
logging.error("### Exception {}:{}", exc_info=e)
output_file.close()
os.unlink(bso_file)
finally:
cursor.close()

def conf_mysql(self, dsn):
"""create a connection to the original storage system """
logging.debug("Configuring MYSQL: {}".format(dsn))
self.connection = connector.connect(
return connector.connect(
user=dsn.username,
password=dsn.password,
host=dsn.hostname,
port=dsn.port or 3306,
database=dsn.path[1:]
)
return self.connection


def get_args():
Expand All @@ -159,15 +179,26 @@ def get_args():
parser.add_argument(
'--dsns', default="move_dsns.lst",
help="file of new line separated DSNs")
parser.add_argument(
'--start_bso',
default=0,
help="Start of BSO range (default 0)"
)
parser.add_argument(
'--end_bso',
default=19,
help="End of BSO range inclusive (default 19)"
)
parser.add_argument(
'--bso_num',
default="0",
help="BSO database number"
type=int,
default=0,
help="Only read from this bso (default num)"
)
parser.add_argument(
'--bso_users_file',
default="bso_users_{}_{}.lst".format(
0, datetime.now().strftime("%Y_%m_%d")),
'--output_file',
default="bso_users_#_{}.lst".format(
datetime.now().strftime("%Y_%m_%d")),
help="List of BSO users."
)
parser.add_argument(
Expand Down Expand Up @@ -205,10 +236,16 @@ def get_args():
default="fxa_users_{}.lst".format(datetime.now().strftime("%Y_%m_%d")),
help="List of pre-generated FxA users."
)
parser.add_argument(
'--threading',
action="store_true",
help="use threading"
)
return parser.parse_args()


def main():
threads = []
args = get_args()
log_level = logging.INFO
if args.quiet:
Expand All @@ -219,7 +256,12 @@ def main():
stream=sys.stdout,
level=log_level,
)
report = Report(args)
if args.bso_num is not None:
args.start_bso = args.end_bso = args.bso_num
locker = None
if args.threading:
locker = threading.Lock()
report = Report(args, locker)
dsns = open(args.dsns).readlines()
db_dsn = None
for line in dsns:
Expand All @@ -231,7 +273,17 @@ def main():
RuntimeError("mysql dsn must be specified")

bso = BSO_Users(args, report, db_dsn)
bso.run()
# threading is currently in process.
if args.threading:
for bso_num in range(int(args.start_bso), int(args.end_bso) + 1):
t = threading.Thread(target=bso.run, args=(bso_num,))
threads.append(t)
t.start()
else:
bso.run(args.bso_num)

for thread in threads:
thread.join()


if __name__ == "__main__":
Expand Down
38 changes: 26 additions & 12 deletions tools/user_migration/gen_fxa_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@

def tick(count):
mark = None
if not count % 100:
mark = "."
if not count % 1000:
if count % 1000 == 0:
mark = "|"
level = logging.getLogger().getEffectiveLevel
elif count % 100 == 0:
mark = "."
level = logging.getLogger().getEffectiveLevel()
if mark and level > logging.DEBUG:
print(mark, end='', flush=True)

Expand Down Expand Up @@ -60,16 +60,17 @@ class FxA_Generate:

def __init__(self, args, report):
logging.info("Processing token file: {} into {}".format(
args.fxa_file,
args.fxa_users_file,
args.users_file,
args.output_file,
))
output_file = open(args.fxa_users_file, "w")
output_file = open(args.output_file, "w")
output_file.write("uid\tfxa_uid\tfxa_kid\n")
if not os.path.isfile(args.fxa_file):
raise IOError("{} not found".format(args.fxa_file))
with open(args.fxa_file) as csv_file:
if not os.path.isfile(args.users_file):
raise IOError("{} not found".format(args.users_file))
with open(args.users_file) as csv_file:
try:
line = 0
success = 0
for (uid, email, generation,
keys_changed_at, client_state) in csv.reader(
csv_file, delimiter="\t"):
Expand All @@ -95,6 +96,16 @@ def __init__(self, args, report):
"user {} has no k_c_a or "
"generation value".format(
uid))
# trap for actually blank values
if client_state is None or client_state == '':
logging.error(
"User {} "
"has an invalid, empty client state".format(
uid
)
)
report.fail(uid, "invalid client state")
continue
try:
client_state = binascii.unhexlify(client_state)
except binascii.Error:
Expand All @@ -115,6 +126,7 @@ def __init__(self, args, report):
output_file.write(
"{}\t{}\t{}\n".format(
uid, fxa_uid, fxa_kid))
success += 1
except Exception as ex:
logging.error(
"User {} Unexpected error".format(uid),
Expand All @@ -123,6 +135,8 @@ def __init__(self, args, report):
except Exception as ex:
logging.critical("Error in fxa file around line {}".format(
line), exc_info=ex)
print("")
logging.info("Processed {} users, {} successful".format(line, success))

# The following two functions are taken from browserid.utils
def encode_bytes_b64(self, value):
Expand All @@ -140,12 +154,12 @@ def get_args():
parser = argparse.ArgumentParser(
description="Generate FxA user id info")
parser.add_argument(
'--fxa_file',
'--users_file',
default="users.csv",
help="FXA User info in CSV format (default users.csv)"
)
parser.add_argument(
'--fxa_users_file',
'--output_file',
default="fxa_users_{}.lst".format(datetime.now().strftime("%Y_%m_%d")),
help="List of FxA users."
)
Expand Down
13 changes: 10 additions & 3 deletions tools/user_migration/migrate_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,8 +595,8 @@ def get_users(args, databases, fxa, bso_num, report):
report.fail(uid, "not found")
else:
try:
bso_user_file = args.bso_user_file.replace('#', bso_num)
with open(bso_user_file) as bso_file:
bso_users_file = args.bso_users_file.replace('#', str(bso_num))
with open(bso_users_file) as bso_file:
line = 0
for row in csv.reader(
bso_file, delimiter="\t"
Expand Down Expand Up @@ -683,6 +683,11 @@ def get_args():
type=int, default=19,
help="last BSO database to dump (default: 19)"
)
parser.add_argument(
'--bso_num',
type=int,
help="only move this bso (equivalent to start_bso == end_bso)"
)
parser.add_argument(
'--write_chunk',
dest="chunk",
Expand All @@ -700,7 +705,7 @@ def get_args():
help="delete any pre-existing --user data on spanner before the migration"
)
parser.add_argument(
'--bso_user_file',
'--bso_users_file',
default="bso_users_#_{}.lst".format(today),
help="name of the generated BSO user file. "
"(Will use bso number for `#` if present; "
Expand Down Expand Up @@ -771,6 +776,8 @@ def main():
args.user = user_list
elif args.wipe_user:
raise RuntimeError("--wipe_user requires --user")
if args.bso_num:
args.start_bso = args.end_bso = args.bso_num
for line in dsns:
dsn = urlparse(line.strip())
scheme = dsn.scheme
Expand Down

0 comments on commit 8aaa449

Please sign in to comment.