Downloads files from EUMETSAT using HTTP.

Set the environment variable EUMETSAT_PASSWORD to your password :)

In [None]:
import requests
from bs4 import BeautifulSoup
import os
import math
import subprocess
import pandas as pd
import queue
import threading
import logging
import sys

from eumetsat import get_filesize_megabytes
from consts import PATH

In [None]:
# Locations for downloaded files, log files, and the record of completed downloads.
DESTINATION_DIR = os.path.join(PATH, 'auto_downloads')
LOG_DIR = os.path.join(PATH, 'logs/download')
DOWNLOADED_FILES = os.path.join(DESTINATION_DIR, 'downloaded_files.csv')
# Number of download threads started further down.
NUM_WORKER_THREADS = 24

for directory in [DESTINATION_DIR, LOG_DIR]:
    # exist_ok=True avoids the check-then-create race of testing
    # os.path.exists() first, and is a no-op if the directory is already there.
    os.makedirs(directory, exist_ok=True)

## Logging
# When True, log records are also written to stdout as well as to LOG_FILENAME.
STREAM_HANDLER = True
LOG_FILENAME = os.path.join(LOG_DIR, 'eumetsat_download.log')

In [None]:
# Logger for this notebook; records everything from DEBUG upwards.
log = logging.getLogger('eumetsat_download')
log.setLevel(logging.DEBUG)

# Every record carries the thread name so that output from the concurrent
# download workers can be told apart.
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(threadName)s - %(levelname)s - %(message)s')

# Build the handler list first, then install it in one go.  Assigning to
# `log.handlers` replaces whatever handlers were attached before.
handlers = [logging.FileHandler(filename=LOG_FILENAME, mode='a')]
if STREAM_HANDLER:
    handlers.append(logging.StreamHandler(sys.stdout))
for handler in handlers:
    handler.setFormatter(formatter)
log.handlers = handlers

# Re-parent the HTTP libraries' loggers onto our logger so that their
# records propagate to the handlers configured above.
for library_logger_name in ('urllib3', 'requests'):
    library_logger = logging.getLogger(library_logger_name)
    library_logger.parent = log
    library_logger.propagate = True

In [None]:
def walk_dir(base_url):
    """Recursively search a directory listing page for downloadable files.

    Fetches `base_url`, follows every link ending in 'index.htm' as a
    subdirectory, and collects every link ending in '.tar' or '.pdf'.

    Args:
        base_url: URL of the directory page to scan.

    Returns:
        dict mapping each file's full URL to its size in megabytes for
        '.tar' files (as listed on the page), or to None for '.pdf' files.
    """
    page = requests.get(base_url)
    soup = BeautifulSoup(page.text, 'html.parser')

    # Filesizes for the .tar files are available on the eumetsat website.  These filesizes
    # can be identified by the 'align=right' attribute.
    file_sizes_mb = [
        float(file_size.text) for file_size in soup.find_all(align='right')]

    # Loop through all the links on the web page to find the files.
    files = {}
    file_size_i = 0  # index of the next unused entry in file_sizes_mb
    for link in soup.find_all('a'):
        href = link.get('href')
        if href is None:
            # An <a> tag without an href attribute (e.g. a named anchor)
            # is not a link to anything; skip it instead of crashing.
            continue
        href = os.path.join(base_url, href.replace('./', ''))
        if href.endswith('index.htm'):
            # Recursively get files from subdirectories on eumetsat's website
            files.update(walk_dir(href.replace('index.htm', '')))
        elif href.endswith('.tar'):
            # The n-th .tar link on the page is paired with the n-th size.
            files[href] = file_sizes_mb[file_size_i]
            file_size_i += 1
        elif href.endswith('.pdf'):
            # No size is recorded for PDFs.
            files[href] = None

    return files

In [None]:
# The password is read from the environment so it is never stored in the notebook.
password = os.environ['EUMETSAT_PASSWORD']
URL_TEMPLATE = "http://jack_kelly:{}@archive.eumetsat.int/umarf/onlinedownload/jack_kelly/"
url = URL_TEMPLATE.format(password)
# Log a masked copy of the URL: the real one embeds the password, and log
# records are written to the log file (and to stdout if enabled).
log.info('Searching for files to download from %s', URL_TEMPLATE.format('****'))
files = walk_dir(url)
# Series indexed by file URL; values are sizes in MB (None for PDFs).
files = pd.Series(files, name='filesize_MB')
log.info('Found %d files on EUMETSAT website', len(files))

In [None]:
# Filter out files which have already been downloaded.
# DOWNLOADED_FILES holds one remote URL per line, appended as each download completes.
if os.path.exists(DOWNLOADED_FILES):
    with open(DOWNLOADED_FILES, 'r') as fh:
        downloaded_files = [filename.strip() for filename in fh]
else:
    downloaded_files = []

# Select with a boolean mask instead of indexing the Series with a set:
# pandas deprecated set indexers and pandas 2.0 rejects them with a TypeError.
# The mask also keeps the files in their original order.
files_to_download = files[~files.index.isin(downloaded_files)]
log.info('%d files still to download.', len(files_to_download))

In [None]:
# Work queue consumed by the download workers: one entry per outstanding
# file, holding its remote URL and its expected size in MB.
filename_queue = queue.Queue()
for remote_url, size_mb in files_to_download.items():
    queue_entry = dict(filename=remote_url, filesize_mb=size_mb)
    filename_queue.put(queue_entry)

# Notebook cell output: number of files queued.
filename_queue.qsize()

In [None]:
def download_file(url, destination_dir='.'):
    """Download `url` into `destination_dir` using wget.

    Any existing local file with the same base name is deleted first, so
    the download always starts from scratch.  wget's own output goes to
    '<base filename>.log' inside LOG_DIR.

    Args:
        url: Remote URL to fetch.
        destination_dir: Local directory to save into; created if missing.

    Returns:
        Full local path the file is saved to.  The path is returned whether
        or not wget succeeded; a non-zero wget exit code is only logged.
    """
    # wget is used instead of a pure-Python approach (such as the one at
    # https://stackoverflow.com/a/16696317/732596) because the pure-Python
    # approach uses far more CPU resources than wget.
    os.makedirs(destination_dir, exist_ok=True)
    base_filename = os.path.basename(url)
    full_local_filename = os.path.join(destination_dir, base_filename)
    if os.path.exists(full_local_filename):
        # Remove any previous (possibly partial) download before fetching again.
        os.remove(full_local_filename)
    wget_log_filename = os.path.join(LOG_DIR, base_filename + '.log')
    result = subprocess.run([
        'wget',
        '--output-file={}'.format(wget_log_filename),
        '--no-verbose',  # don't make the log file too verbose
        '--tries=16',
        # '--continue',  # doesn't work - maybe EUMETSAT aren't providing a 'size' header?
        # '--unlink',    # remove file before clobber - doesn't appear to work
        '--timeout=300',
        '--directory-prefix={}'.format(destination_dir),
        url])
    if result.returncode != 0:
        # Don't raise: callers decide what to do (e.g. retry on size mismatch).
        log.warning(
            'wget exited with code %d for %s; see %s',
            result.returncode, base_filename, wget_log_filename)
    return full_local_filename


class DownloadFileSizeMisMatch(Exception):
    """Raised when a downloaded file's size still differs from the expected
    size after every retry has been used up."""


def download_and_check_size(url, destination_dir, expected_filesize_mb, retries=5):
    """Download `url`, retrying until the local file size matches the expected size.

    Args:
        url: Remote URL to fetch.
        destination_dir: Local directory to save into.
        expected_filesize_mb: Expected size of the file in megabytes.
        retries: Maximum number of download attempts.

    Returns:
        Full local path of the downloaded file once its size matches.

    Raises:
        DownloadFileSizeMisMatch: if no attempt produced a file whose size
            is within tolerance of `expected_filesize_mb`.
    """
    # Sizes are compared with a small absolute tolerance rather than exactly.
    MATCH_TOLERANCE_MEGABYTES = 0.01
    for retry in range(retries):
        full_local_filename = download_file(url, destination_dir)
        local_filesize_mb = get_filesize_megabytes(full_local_filename)
        if math.isclose(local_filesize_mb, expected_filesize_mb, abs_tol=MATCH_TOLERANCE_MEGABYTES):
            return full_local_filename
        # Logger.warn() is a deprecated alias; warning() is the supported name.
        log.warning(
            'Filesize mismatch!  Try %d of %d.  Expected %.2f MB, got %.2f MB for %s',
            retry+1, retries, expected_filesize_mb, local_filesize_mb, url)

    log.error('Filesizes still mismatch after %d tries for %s', retries, url)
    raise DownloadFileSizeMisMatch(url)

In [None]:
# Serialises appends to DOWNLOADED_FILES across the worker threads.
downloaded_files_lock = threading.Lock()


def worker():
    """Download files from `filename_queue` until the queue is empty.

    For each queue entry, downloads the file ('.pdf' files go to the
    'shipping_notes' subdirectory; '.tar' files are size-checked), then
    appends the remote URL to DOWNLOADED_FILES.  A failure on one file is
    logged and the worker moves on to the next file; every entry taken
    from the queue is always marked done.
    """
    while True:
        log.info('Files left to download: %d.', filename_queue.qsize())
        try:
            filename_and_size = filename_queue.get(block=False)
        except queue.Empty:
            # Nothing left to do: let this thread finish.
            break

        remote_filename = filename_and_size['filename']
        remote_filesize_mb = filename_and_size['filesize_mb']

        try:
            log.info('downloading %s', remote_filename)
            if remote_filename.endswith('.pdf'):
                local_filename = download_file(remote_filename, os.path.join(DESTINATION_DIR, 'shipping_notes'))
            elif remote_filename.endswith('.tar'):
                log.debug('Expected filesize = %.2f MB', remote_filesize_mb)
                local_filename = download_and_check_size(remote_filename, DESTINATION_DIR, remote_filesize_mb)
            else:
                # Without this branch `local_filename` would be unbound below.
                log.warning('Skipping %s: unrecognised file type', remote_filename)
                continue

            log.debug('Finished downloading %.2f MB', get_filesize_megabytes(local_filename))
            if not downloaded_files_lock.acquire(timeout=60):
                log.error('Failed to get lock for DOWNLOAD_FILES table!')
                break
            try:
                with open(DOWNLOADED_FILES, 'a') as fh:
                    fh.write('{}\n'.format(remote_filename))
            finally:
                # Always release, even if the write fails, so the other
                # workers are not left waiting for the lock.
                downloaded_files_lock.release()
        except Exception:
            # One bad file must not kill the worker thread; log it (with
            # traceback) and carry on.  The file is not recorded as
            # downloaded, so a later run will retry it.
            log.exception('Failed to download %s', remote_filename)
        finally:
            # Runs on success, failure, `continue` and `break` alike, so
            # every get() is balanced by exactly one task_done().
            filename_queue.task_done()

In [None]:
# Create the pool of download workers, then set them all running.
threads = [threading.Thread(target=worker) for _ in range(NUM_WORKER_THREADS)]
for download_thread in threads:
    download_thread.start()

In [None]:
# Block until every worker thread has finished.
for running_thread in threads:
    running_thread.join()