diff --git a/README.md b/README.md index 47405f92..79c32165 100644 --- a/README.md +++ b/README.md @@ -138,6 +138,9 @@ woudc-data-registry instrument update --help #### Data Processing ```bash +# Gather the files from the ftp account +woudc-data-registry data gather /path/to/dir + # ingest directory of files (walks directory recursively) woudc-data-registry data ingest /path/to/dir diff --git a/default.env b/default.env index 5c5824f7..f8e4c6ea 100644 --- a/default.env +++ b/default.env @@ -56,3 +56,10 @@ export WDR_EMAIL_TO=email export WDR_EMAIL_CC=email export WDR_EMAIL_BCC=email export WDR_TEMPLATE_PATH=/path/to/etc/woudc-contributor-feedback.txt + +# FTP configuration +export WDR_FTP_HOST=host +export WDR_FTP_USER=user +export WDR_FTP_PASS= +export WDR_FTP_BASEDIR_INCOMING=path +export WDR_FTP_KEEP_FILES=True \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 5cafcf27..9e5f40a9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,9 @@ click elasticsearch +ftputil jsonschema pyyaml +rarfile requests sqlalchemy wheel diff --git a/woudc_data_registry/config.py b/woudc_data_registry/config.py index 4167404f..a18fbb3c 100644 --- a/woudc_data_registry/config.py +++ b/woudc_data_registry/config.py @@ -84,6 +84,14 @@ WDR_EMAIL_BCC = os.getenv('WDR_EMAIL_BCC') WDR_TEMPLATE_PATH = os.getenv('WDR_TEMPLATE_PATH') WDR_FILE_TRASH = os.getenv('WDR_FILE_TRASH') +WDR_FTP_HOST = os.getenv('WDR_FTP_HOST') +WDR_FTP_USER = os.getenv('WDR_FTP_USER') +WDR_FTP_PASS = os.getenv('WDR_FTP_PASS') +WDR_FTP_BASEDIR_INCOMING = os.getenv('WDR_FTP_BASEDIR_INCOMING') +WDR_FTP_SKIP_DIRS_INCOMING = os.getenv('WDR_FTP_SKIP_DIRS_INCOMING') +WDR_FTP_KEEP_FILES = os.getenv( + 'WDR_FTP_KEEP_FILES', + 'True').strip().lower() in ('true', '1', 'yes') if not WDR_SEARCH_INDEX_BASENAME: msg = 'WDR_SEARCH_INDEX_BASENAME was not set. 
from woudc_data_registry.util import (is_text_file, read_file,
                                      send_email, delete_file_from_record,
                                      gathering)


@click.command()
@click.argument('folder_path')
@click.pass_context
def gather(ctx, folder_path):
    """
    Gather all the files in a directory tree from the incoming FTP
    account into a newly-created local folder.

    :param ctx: click context
    :param folder_path: local destination folder (must not exist yet)
    """

    # Keep prompting until we get a destination that does not exist yet,
    # so a previous gathering run is never overwritten.
    while os.path.exists(folder_path):
        click.echo(f"Folder '{folder_path}' already exists.")
        folder_path = click.prompt(
            "Please provide a folder path that does not exist yet", type=str
        )
    # Folder doesn't exist, create it
    os.makedirs(folder_path)

    try:
        click.echo(f"Folder '{folder_path}' has been created successfully.")
        # Prefer the configured skip list (WDR_FTP_SKIP_DIRS_INCOMING);
        # fall back to the historical hard-coded set of incoming folders.
        skip_incoming_folders = config.WDR_FTP_SKIP_DIRS_INCOMING or (
            'woudcadmin,level-0,org1,org2,provisional,calibration,'
            'px-testing,px-testing2'
        )
        gathered_files = gathering(config.WDR_FTP_HOST, config.WDR_FTP_USER,
                                   config.WDR_FTP_PASS,
                                   config.WDR_FTP_BASEDIR_INCOMING,
                                   skip_incoming_folders,
                                   folder_path, config.WDR_FTP_KEEP_FILES)
        # gathering() returns the list of gathered file records, so report
        # its length rather than echoing the raw list object.
        click.echo(
            f"Gathered {len(gathered_files)} files from the FTP server."
        )
    except Exception as err:
        LOGGER.error('Unable to gather: %s', err)

    LOGGER.info("Done Gathering files")


data.add_command(ingest)
data.add_command(verify)
data.add_command(generate_emails, name='generate-emails')
data.add_command(send_feedback, name='send-feedback')
data.add_command(delete_record, name='delete-record')
data.add_command(gather)
import tarfile
import rarfile
import zipfile

import ftputil


def gathering(host, username, password, basedir_incoming, skip_dirs,
              px_basedir, keep_files):
    """
    Gather candidate files from the incoming FTP account.

    Walks ``basedir_incoming`` on the FTP server, downloads every file
    into ``px_basedir`` (mirroring the remote layout) and transparently
    decompresses tar/rar/zip archives, queueing their members in place
    of the archive itself.

    :param host: FTP host name
    :param username: FTP user name
    :param password: FTP password
    :param basedir_incoming: remote base directory to walk
    :param skip_dirs: comma-separated string (or iterable) of directory
                      names to skip while walking
    :param px_basedir: local base directory to download files into
    :param keep_files: if False, successfully downloaded files are
                       removed from the FTP server
    :returns: list of dicts describing each gathered file
    """

    gathered = []
    gathered_files_to_remove = []

    # Testing `d not in skip_dirs` against the raw comma-separated string
    # would do substring matching (a directory named 'org' would wrongly
    # match 'org1,org2'), so build a proper set of names first.
    if isinstance(skip_dirs, str):
        skip_names = {d.strip() for d in skip_dirs.split(',') if d.strip()}
    else:
        skip_names = set(skip_dirs)

    with ftputil.FTPHost(host, username, password) as ftp:
        LOGGER.info('Connected to %s', host)
        LOGGER.info('Skip directories: %s', skip_dirs)

        for root, dirs, files in ftp.walk(basedir_incoming, topdown=True):
            LOGGER.debug('Removing skip directories: %s', skip_dirs)
            # prune in place so ftp.walk does not descend into them
            dirs[:] = [d for d in dirs if d not in skip_names]
            for name in files:
                ipath = '%s/%s' % (root, name)
                # remote modification time of the file
                mtime = datetime.fromtimestamp(ftp.stat(ipath).st_mtime)
                fpath = '%s%s%s' % (px_basedir, os.sep, ipath)
                ipath_ftp_abs = '%s%s' % (host, ipath)

                LOGGER.info('Adding file %s to processing queue',
                            ipath_ftp_abs)
                gathered.append({
                    # first path segment identifies the contributor
                    'contributor': ipath.lstrip('/').split('/')[0],
                    'ipath': ipath,
                    'ipath_ftp_abs': ipath_ftp_abs,
                    'mtime': mtime,
                    'fpath': os.path.normpath(fpath),
                    'decomp': False
                })

        LOGGER.info('%d files in processing queue', len(gathered))

        # NOTE: entries for decompressed archive members are appended to
        # `gathered` while it is being iterated; the loop visits them too
        # but skips them because their 'decomp' flag is True.
        for gathered_file in gathered:
            LOGGER.info('Inspecting gathered file %s', gathered_file)
            if gathered_file['ipath'] is None or gathered_file['decomp']:
                continue
            LOGGER.info('File passed in')
            if not ftp.path.isfile(gathered_file['ipath']):
                LOGGER.info('FTP file %s could not be downloaded',
                            gathered_file['ipath_ftp_abs'])
                continue

            try:
                # exist_ok: several files share the same parent directory
                os.makedirs(os.path.dirname(gathered_file['fpath']),
                            exist_ok=True)
            except OSError as err:
                LOGGER.warning('Local directory %s not created: %s',
                               gathered_file['fpath'], err)

            ftp.download(gathered_file['ipath'], gathered_file['fpath'])
            LOGGER.info('Downloaded FTP file %s to %s',
                        gathered_file['ipath_ftp_abs'],
                        gathered_file['fpath'])

            # handle compressed files here
            if any([tarfile.is_tarfile(gathered_file['fpath']),
                    rarfile.is_rarfile(gathered_file['fpath']),
                    zipfile.is_zipfile(gathered_file['fpath'])]):
                LOGGER.info('Decompressing file: %s', gathered_file['fpath'])
                comm_ipath_ftp_abs = gathered_file['ipath_ftp_abs']
                comm_mtime = gathered_file['mtime']
                comm_dirname = os.path.dirname(gathered_file['fpath'])

                # decompress; a failed archive stays in the queue untouched
                try:
                    members = decompress(gathered_file['fpath'])
                except Exception as err:
                    LOGGER.error('Unable to decompress %s: %s',
                                 gathered_file['fpath'], err)
                    continue
                LOGGER.info('Decompressed files: %s', ', '.join(members))

                for member in members:
                    member_path = os.path.join(comm_dirname, member)
                    if os.path.isfile(member_path):
                        gathered.append({
                            'contributor': gathered_file[
                                'ipath'].lstrip('/').split('/')[0],
                            'ipath': member_path,
                            'ipath_ftp_abs': comm_ipath_ftp_abs,
                            'mtime': comm_mtime,
                            'fpath': os.path.normpath(member_path),
                            'decomp': True
                        })
                # the archive itself is replaced by its members in the
                # queue, so mark it for removal from the gathered list
                gathered_files_to_remove.append(gathered_file)

            if not keep_files:
                # remove every successfully downloaded file from the
                # server when the account is not keeping them
                LOGGER.info('Removing file %s', gathered_file['ipath'])
                ftp.remove(gathered_file['ipath'])

        LOGGER.info('%s archive files gathered to remove',
                    len(gathered_files_to_remove))
        for gftr in gathered_files_to_remove:
            gathered.remove(gftr)
            LOGGER.info('Removed %s from gathered list', gftr)
        LOGGER.info('%s files gathered', len(gathered))

    return gathered


def decompress(ipath):
    """
    Decompress a tar, rar or zip archive in place.

    Archive members are extracted — flattened to their basenames — into
    the directory containing ``ipath``; when every member extracts
    successfully the archive file itself is deleted.

    :param ipath: path to the archive file
    :returns: list of extracted member file names
    """

    file_list = []
    success = True

    LOGGER.debug('ipath: %s', ipath)

    if tarfile.is_tarfile(ipath):
        # context manager ensures the archive handle is closed
        with tarfile.open(ipath) as tar:
            for item in tar:
                try:
                    # flatten member paths to their basename to avoid
                    # nested (or traversal) paths on extraction
                    item.name = os.path.basename(item.name)
                    file_list.append(item.name)
                    tar.extract(item, os.path.dirname(ipath))
                except Exception as err:
                    success = False
                    LOGGER.error('Unable to decompress from tar %s: %s',
                                 item.name, err)
    elif rarfile.is_rarfile(ipath):
        with rarfile.RarFile(ipath) as rar:
            for item in rar.infolist():
                try:
                    item.filename = os.path.basename(item.filename)
                    file_list.append(item.filename)
                    rar.extract(item, os.path.dirname(ipath))
                except Exception as err:
                    success = False
                    LOGGER.error('Unable to decompress from rar %s: %s',
                                 item.filename, err)
    elif zipfile.is_zipfile(ipath):
        with zipfile.ZipFile(ipath) as zipf:
            for item in zipf.infolist():
                if item.filename.endswith('/'):  # filename is dir, skip
                    LOGGER.info('item %s is a directory, skipping',
                                item.filename)
                    continue
                try:
                    item_filename = os.path.basename(item.filename)
                    file_list.append(item_filename)
                    # read + write manually so the member lands flat in
                    # the archive's directory (extract() would recreate
                    # the internal directory structure)
                    data = zipf.read(item.filename)
                    filename = '%s%s%s' % (os.path.dirname(ipath),
                                           os.sep, item_filename)
                    LOGGER.info('Filename: %s', filename)
                    with open(filename, 'wb') as ff:
                        ff.write(data)
                except Exception as err:
                    success = False
                    LOGGER.error('Unable to decompress from zip %s: %s',
                                 item.filename, err)
    else:
        # not an archive: warn and return WITHOUT falling through to the
        # deletion below, which would otherwise unlink a plain file
        msg = ('File %s is not a compressed file and will not be '
               'decompressed.' % ipath)
        LOGGER.warning(msg)
        return file_list

    if success:  # delete the archive only when every member extracted
        LOGGER.info('Deleting archive file: %s', ipath)
        try:
            os.unlink(ipath)
        except Exception as err:
            LOGGER.error(err)
    else:
        LOGGER.info('Decompress failed, not deleting %s', ipath)

    return file_list