Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ woudc-data-registry instrument update --help
#### Data Processing

```bash
# gather files from the FTP account into a new local directory
woudc-data-registry data gather /path/to/dir

# ingest directory of files (walks directory recursively)
woudc-data-registry data ingest /path/to/dir

Expand Down
7 changes: 7 additions & 0 deletions default.env
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,10 @@ export WDR_EMAIL_TO=email
export WDR_EMAIL_CC=email
export WDR_EMAIL_BCC=email
export WDR_TEMPLATE_PATH=/path/to/etc/woudc-contributor-feedback.txt

# FTP configuration
export WDR_FTP_HOST=host
export WDR_FTP_USER=user
export WDR_FTP_PASS=<secret>
export WDR_FTP_BASEDIR_INCOMING=path
export WDR_FTP_KEEP_FILES=True
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
click
elasticsearch
ftputil
jsonschema
pyyaml
rarfile
requests
sqlalchemy
wheel
Expand Down
8 changes: 8 additions & 0 deletions woudc_data_registry/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@
WDR_EMAIL_BCC = os.getenv('WDR_EMAIL_BCC')
WDR_TEMPLATE_PATH = os.getenv('WDR_TEMPLATE_PATH')
WDR_FILE_TRASH = os.getenv('WDR_FILE_TRASH')
# FTP settings, read from the environment. os.getenv() returns None for any
# variable that is not set, so these may be None at import time.
WDR_FTP_HOST = os.getenv('WDR_FTP_HOST')
WDR_FTP_USER = os.getenv('WDR_FTP_USER')
WDR_FTP_PASS = os.getenv('WDR_FTP_PASS')
# Remote directory that the gather step walks.
WDR_FTP_BASEDIR_INCOMING = os.getenv('WDR_FTP_BASEDIR_INCOMING')
# NOTE(review): presumably a list of remote directories to skip; the expected
# format is not established here -- confirm against the consumer.
WDR_FTP_SKIP_DIRS_INCOMING = os.getenv('WDR_FTP_SKIP_DIRS_INCOMING')
# Boolean flag: defaults to 'True' when unset; after stripping whitespace and
# lower-casing, only 'true', '1' and 'yes' are treated as True.
WDR_FTP_KEEP_FILES = os.getenv(
'WDR_FTP_KEEP_FILES',
'True').strip().lower() in ('true', '1', 'yes')

if not WDR_SEARCH_INDEX_BASENAME:
msg = 'WDR_SEARCH_INDEX_BASENAME was not set. \
Expand Down
35 changes: 34 additions & 1 deletion woudc_data_registry/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@

from woudc_data_registry import config
from woudc_data_registry.util import (is_text_file, read_file,
send_email, delete_file_from_record)
send_email, delete_file_from_record,
gathering)


from woudc_data_registry.processing import Process
Expand Down Expand Up @@ -341,8 +342,40 @@ def delete_record(ctx, file_path):
LOGGER.info("Done deleting record")


@click.command()
@click.argument('folder_path')
@click.pass_context
def gather(ctx, folder_path):
    """Gather all the files in a directory tree"""
    # The docstring above doubles as the click help text, so it is kept short.
    #
    # folder_path is the local destination for the download. It must not
    # exist yet: the user is prompted until a fresh path is supplied, and the
    # directory is then created before anything is fetched.
    while os.path.exists(folder_path):
        click.echo(f"Folder '{folder_path}' already exists.")
        folder_path = click.prompt(
            "Please provide a folder path that does not exist yet", type=str
        )
    # Folder doesn't exist, create it
    os.makedirs(folder_path)

    try:
        click.echo(f"Folder '{folder_path}' has been created successfully.")
        # Passed as a tuple of exact names rather than one comma-joined
        # string: a membership test against a string is a substring test and
        # would also skip directories such as 'org' or 'testing'.
        # NOTE(review): config.WDR_FTP_SKIP_DIRS_INCOMING exists but is not
        # consulted here -- confirm whether it should override this list.
        skip_incoming_folders = (
            'woudcadmin', 'level-0', 'org1', 'org2', 'provisional',
            'calibration', 'px-testing', 'px-testing2',
        )
        files_gathered = gathering(config.WDR_FTP_HOST, config.WDR_FTP_USER,
                                   config.WDR_FTP_PASS,
                                   config.WDR_FTP_BASEDIR_INCOMING,
                                   skip_incoming_folders,
                                   folder_path, config.WDR_FTP_KEEP_FILES)
        # gathering() returns a list of file records; report how many there
        # are instead of dumping the whole list to the terminal.
        click.echo(
            f"Gathered {len(files_gathered)} files from the FTP server.")
    except Exception as err:
        # Top-level CLI boundary: log the failure instead of a traceback.
        LOGGER.error('Unable to gather: %s', err)

    LOGGER.info("Done Gathering files")


# Attach the data-processing commands defined above to the `data` command
# object. Commands registered with name= are exposed under that explicit
# (hyphenated) name; the others are registered under their own command name.
data.add_command(ingest)
data.add_command(verify)
data.add_command(generate_emails, name='generate-emails')
data.add_command(send_feedback, name='send-feedback')
data.add_command(delete_record, name='delete-record')
data.add_command(gather)
181 changes: 181 additions & 0 deletions woudc_data_registry/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@
import os
import shutil
import smtplib

import tarfile
import rarfile
import zipfile

import ftputil

from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from woudc_data_registry.registry import Registry
Expand All @@ -59,6 +66,180 @@
RFC3339_DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'


def gathering(host, username, password, basedir_incoming, skip_dirs,
              px_basedir, keep_files):
    """gather candidate files from incoming FTP

    Walks ``basedir_incoming`` on the FTP server, downloads every file found
    to ``px_basedir`` (the remote path is appended to it), and unpacks any
    downloaded tar/rar/zip archive next to where it was saved.

    :param host: FTP server host name
    :param username: FTP account user name
    :param password: FTP account password
    :param basedir_incoming: remote directory to walk
    :param skip_dirs: directory names to prune from the walk; either an
                      iterable of names or one comma-separated string
                      (``None`` skips nothing)
    :param px_basedir: local directory the files are downloaded into
    :param keep_files: if false, each remote file is removed from the FTP
                       server after it has been downloaded
    :returns: `list` of `dict` records with the keys ``contributor``,
              ``ipath``, ``ipath_ftp_abs``, ``mtime``, ``fpath`` and
              ``decomp``; successfully unpacked archives are replaced by
              records for the files extracted from them
    """

    # Normalise to a set of exact names. Testing ``d not in skip_dirs``
    # against a plain string is a substring test, which would also prune
    # unrelated directories such as 'org' or 'testing'.
    if skip_dirs is None:
        skip_names = frozenset()
    elif isinstance(skip_dirs, str):
        skip_names = frozenset(
            part.strip() for part in skip_dirs.split(',') if part.strip())
    else:
        skip_names = frozenset(skip_dirs)

    gathered = []
    gathered_files_to_remove = []

    with ftputil.FTPHost(host, username, password) as ftp:
        LOGGER.info('Connected to %s', host)
        LOGGER.info('Skip directories: %s', skip_dirs)
        for root, dirs, files in ftp.walk(basedir_incoming, topdown=True):
            LOGGER.debug('Removing skip directories: %s', skip_dirs)
            # In-place assignment so the top-down walk does not descend
            # into the pruned directories.
            dirs[:] = [d for d in dirs if d not in skip_names]
            for name in files:
                ipath = '%s/%s' % (root, name)
                # Index 8 of a stat result is st_mtime.
                mtime = datetime.fromtimestamp(ftp.stat(ipath)[8])
                fpath = '%s%s%s' % (px_basedir, os.sep, ipath)
                ipath_ftp_abs = '%s%s' % (host, ipath)

                LOGGER.info('Adding file %s to processing queue',
                            ipath_ftp_abs)
                gathered.append({
                    'contributor': ipath.lstrip('/').split('/')[0],
                    'ipath': ipath,
                    'ipath_ftp_abs': ipath_ftp_abs,
                    'mtime': mtime,
                    'fpath': os.path.normpath(fpath),
                    'decomp': False
                })

        LOGGER.info('%d files in processing queue', len(gathered))
        # Iterate over a snapshot: records for extracted files are appended
        # to ``gathered`` below and must not be processed as FTP files.
        for gathered_file in list(gathered):
            LOGGER.info('Inspecting gathered file %s', gathered_file)
            LOGGER.info('File passed in')
            if not ftp.path.isfile(gathered_file['ipath']):
                LOGGER.info('FTP file %s could not be downloaded',
                            gathered_file['ipath_ftp_abs'])
                continue

            comm_fpath = gathered_file['fpath']
            comm_dirname = os.path.dirname(comm_fpath)
            try:
                # exist_ok: several files normally share one directory, which
                # previously produced a warning for every file but the first.
                os.makedirs(comm_dirname, exist_ok=True)
            except OSError as err:
                LOGGER.warning('Local directory %s not created: %s',
                               comm_dirname, err)

            ftp.download(gathered_file['ipath'], comm_fpath)

            LOGGER.info('Downloaded FTP file %s to %s',
                        gathered_file['ipath_ftp_abs'], comm_fpath)

            # handle compressed files here
            if (tarfile.is_tarfile(comm_fpath)
                    or rarfile.is_rarfile(comm_fpath)
                    or zipfile.is_zipfile(comm_fpath)):
                LOGGER.info('Decompressing file: %s', comm_fpath)

                # decompress
                try:
                    extracted = decompress(comm_fpath)
                except Exception as err:
                    LOGGER.error('Unable to decompress %s: %s',
                                 comm_fpath, err)
                    # The archive stays in the result and on the FTP server.
                    continue
                LOGGER.info('Decompressed files: %s', ', '.join(extracted))

                for fil in extracted:
                    fpath = os.path.join(comm_dirname, fil)
                    if os.path.isfile(fpath):
                        gathered.append({
                            'contributor': gathered_file['contributor'],
                            'ipath': fpath,
                            'ipath_ftp_abs': gathered_file['ipath_ftp_abs'],
                            'mtime': gathered_file['mtime'],
                            'fpath': os.path.normpath(fpath),
                            'decomp': True
                        })
                # the archive itself is replaced by its extracted files
                gathered_files_to_remove.append(gathered_file)

            if not keep_files:
                LOGGER.info('Removing file %s', gathered_file['ipath'])
                ftp.remove(gathered_file['ipath'])

    LOGGER.info('%s archive files gathered to remove',
                len(gathered_files_to_remove))
    # Drop the archive records by identity in one pass.
    archive_ids = {id(entry) for entry in gathered_files_to_remove}
    gathered = [entry for entry in gathered if id(entry) not in archive_ids]
    for gftr in gathered_files_to_remove:
        LOGGER.info('Removed %s from gathered list', gftr)
    LOGGER.info('%s files gathered', len(gathered))

    return gathered


def decompress(ipath):
    """decompress compressed files

    Extracts the members of a tar, rar or zip archive into the directory
    that contains the archive. Member paths are flattened to their base
    name, so nothing is written outside that directory. The archive is
    deleted afterwards only if every member was extracted without error.

    :param ipath: path to the archive file
    :returns: `list` of base names of the members that extraction was
              attempted for; empty if ``ipath`` is not a supported archive
    """

    file_list = []
    success = True
    out_dir = os.path.dirname(ipath)

    LOGGER.debug('ipath: %s', ipath)

    # Each archive is opened in a ``with`` block so its handle is closed
    # before the archive file is unlinked below.
    if tarfile.is_tarfile(ipath):
        with tarfile.open(ipath) as tar:
            for item in tar:
                if not item.isfile():
                    # directories, links and device nodes are not data files
                    LOGGER.info('item %s is not a regular file, skipping',
                                item.name)
                    continue
                try:
                    item.name = os.path.basename(item.name)
                    file_list.append(item.name)
                    tar.extract(item, out_dir)
                except Exception as err:
                    success = False
                    LOGGER.error('Unable to decompress from tar %s: %s',
                                 item.name, err)

    elif rarfile.is_rarfile(ipath):
        with rarfile.RarFile(ipath) as rar:
            for item in rar.infolist():
                item_filename = os.path.basename(item.filename)
                if not item_filename:  # nothing left after flattening, skip
                    LOGGER.info('item %s is a directory, skipping',
                                item.filename)
                    continue
                try:
                    item.filename = item_filename
                    file_list.append(item.filename)
                    rar.extract(item, out_dir)
                except Exception as err:
                    success = False
                    LOGGER.error('Unable to decompress from rar %s: %s',
                                 item.filename, err)

    elif zipfile.is_zipfile(ipath):
        with zipfile.ZipFile(ipath) as zipf:
            for item in zipf.infolist():
                if item.filename.endswith('/'):  # filename is dir, skip
                    LOGGER.info('item %s is a directory, skipping',
                                item.filename)
                    continue
                try:
                    item_filename = os.path.basename(item.filename)
                    file_list.append(item_filename)
                    data = zipf.read(item.filename)
                    # os.path.join keeps the target relative when ipath has
                    # no directory part (dirname + os.sep pointed at '/').
                    filename = os.path.join(out_dir, item_filename)
                    LOGGER.info('Filename: %s', filename)
                    with open(filename, 'wb') as ff:
                        ff.write(data)
                except Exception as err:
                    success = False
                    LOGGER.error('Unable to decompress from zip %s: %s',
                                 item.filename, err)
    else:
        LOGGER.warning(
            'File %s is not a compressed file and will not be decompressed.',
            ipath)
        # Not an archive: leave the file untouched instead of deleting it.
        return file_list

    if success:  # delete the archive file
        LOGGER.info('Deleting archive file: %s', ipath)
        try:
            os.unlink(ipath)
        except OSError as err:
            LOGGER.error(err)
    else:
        LOGGER.info('Decompress failed, not deleting %s', ipath)

    return file_list


def send_email(message, subject, from_email_address, to_email_addresses,
host, port, cc_addresses=None, bcc_addresses=None, secure=False,
from_email_password=None):
Expand Down