#!/usr/bin/python3
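"""
Migrates blog images from Blogspot/Google to Cloudinary.

The script (its original name isn't preserved here; its work files all use a
'_reimage' prefix) runs in up to three stages, selected by command-line flags:

  -d  scan all Markdown files under <project root>/content for Blogspot /
      googleusercontent image URLs and download them into DOWNLOAD_DIR,
      writing a URL-to-filename map into IMAGE_MAP_FILE
  -u  upload the downloaded images to Cloudinary (requires the CLOUDINARY_URL
      environment variable to be set)
  -p  rewrite the image URLs in the Markdown files using the map file
  -v  verbose (debug) logging

A typical full run would combine the stages, e.g.: ./<script> -d -u -p
"""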
import os
import logging
import random
import re
import shutil
import string
import sys
import urllib.request
import urllib.error
import yaml
import cloudinary.uploader
import cloudinary.exceptions

# Regex matching Blogspot/googleusercontent image URLs (http or https),
# up to the next double quote or whitespace character
RE_SEARCH = r'http(?:s?)://\S*\b(?:blogspot|googleusercontent)\b[^"\s]+'
CONTENT_FILE_EXTENSION = '.md'
DOWNLOAD_DIR = '/home/dk/tmp/_reimage_images'
IMAGE_MAP_FILE = '/home/dk/tmp/_reimage_map.yml'
FAILED_IMAGES_FILE = '/home/dk/tmp/_reimage_failed_urls.txt'
FAILED_UPLOADS_FILE = '/home/dk/tmp/_reimage_failed_uploads.txt'
NEW_URL_BASE = 'https://res.cloudinary.com/yktoo/image/upload/blog/'
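# NOTE: the work-file paths above are hard-coded for the original author's
# machine; adjust them before running this script elsewhere.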

class Scanner:
    """Extracts image URLs from content files."""

    def __init__(self, path: str):
        self.path = path
        self.image_urls = set()

    def scan(self):
        """Recursively scan all directories starting at the scanner's path."""
        num_files = 0
        logging.info('Scanning %s...', self.path)
        for dir_path, dirs, files in os.walk(self.path):
            for file in files:
                if file.endswith(CONTENT_FILE_EXTENSION):
                    self._scan_file(os.path.join(dir_path, file))
                    num_files += 1
        # Done
        logging.info(
            'Scanning complete. %d files scanned, %d unique URLs have been discovered.',
            num_files,
            len(self.image_urls))

    def _scan_file(self, file_path):
        """Scan the provided file."""
        logging.debug('Processing %s', file_path)
        with open(file_path) as file:
            for line in file:
                for match in re.findall(RE_SEARCH, line):
                    logging.debug('>>>> Match: "%s"', match)
                    self.image_urls.add(match)

class Downloader:
    """Downloads the provided image URLs."""

    def __init__(self, urls: set):
        self.urls = urls
        self.url_map = {}
        self.failed_urls = {}

    def download(self):
        """Download all image URLs into the target directory, and write out a map YAML file."""
        logging.info('Downloading %d URLs...', len(self.urls))
        # Make sure the target directory exists and is empty
        shutil.rmtree(DOWNLOAD_DIR, ignore_errors=True)
        os.makedirs(DOWNLOAD_DIR)
        # Iterate all URLs
        count_ok = 0
        count_err = 0
        size = 0
        for url in self.urls:
            # Generate a new ID: 12 random chars plus a zero-padded counter to guarantee uniqueness
            new_name = '{}{:04}'.format(
                ''.join(random.choices(string.ascii_lowercase + string.digits, k=12)), count_ok)
            # Determine the target name by keeping the original file extension
            new_name += os.path.splitext(url)[1]
            # Download the file
            logging.debug('Downloading %s => %s', url, new_name)
            data = None
            reason = ''
            try:
                response = urllib.request.urlopen(url)
                data = response.read()
            except urllib.error.HTTPError as e:
                reason = 'HTTP error {}'.format(e.code)
            except urllib.error.URLError as e:
                reason = 'URL error: {}'.format(
                    e.reason if hasattr(e, 'reason') else e.code if hasattr(e, 'code') else '')
            # If failed
            if data is None:
                logging.warning('Failed to download %s: %s', url, reason)
                self.failed_urls[url] = reason
                count_err += 1
            # Save the data into a file otherwise
            else:
                file_path = os.path.join(DOWNLOAD_DIR, new_name)
                with open(file_path, 'wb') as file:
                    file.write(data)
                logging.debug('Saved %d bytes to %s', len(data), file_path)
                self.url_map[url] = new_name
                count_ok += 1
                size += len(data)
        # Export the map
        with open(IMAGE_MAP_FILE, 'w') as file:
            file.write('map:\n')
            file.writelines([' "{}": {}\n'.format(url, name) for url, name in self.url_map.items()])
        # Export failed URLs
        with open(FAILED_IMAGES_FILE, 'w') as file:
            file.writelines(['{}\t{}\n'.format(url, reason) for url, reason in self.failed_urls.items()])
        # Done
        logging.info('Done downloading. %d images downloaded, %d URLs failed, %d bytes', count_ok, count_err, size)

class Uploader:
    """Uploads the downloaded images to Cloudinary."""

    def __init__(self):
        # Validate the environment
        if 'CLOUDINARY_URL' not in os.environ:
            raise EnvironmentError('CLOUDINARY_URL is not set')
        self.failed_uploads = []
        # Read in the URL map
        with open(IMAGE_MAP_FILE) as file:
            self.url_map = yaml.safe_load(file)['map']
        logging.info('Loaded %d URL map entries from file', len(self.url_map))

    def upload(self):
        """Upload all images loaded from the map file."""
        count_ok = 0
        count_err = 0
        for url, name in self.url_map.items():
            file_path = os.path.join(DOWNLOAD_DIR, name)
            file_id = os.path.splitext(name)[0]
            logging.debug('Uploading %s => %s', url, file_id)
            try:
                cloudinary.uploader.upload(
                    file_path,
                    public_id=file_id,
                    folder='blog',
                    resource_type='image',
                    context={'origFileName': os.path.basename(url), 'origUrl': url})
                count_ok += 1
            except cloudinary.exceptions.Error as e:
                count_err += 1
                self.failed_uploads.append(url)
                logging.warning('Failed to upload %s: %s', url, str(e))
        # Export failed uploads
        with open(FAILED_UPLOADS_FILE, 'w') as file:
            file.writelines([line + '\n' for line in self.failed_uploads])
        # Done
        logging.info('Done uploading. %d images uploaded, %d failed', count_ok, count_err)

class Processor:
    """Processes all pages and updates image URLs."""

    def __init__(self, path: str):
        self.path = path
        self.count_replaced = 0
        self.count_skipped = 0
        # Read in the URL map
        with open(IMAGE_MAP_FILE) as file:
            self.url_map = yaml.safe_load(file)['map']
        logging.info('Loaded %d URL map entries from file', len(self.url_map))

    def process(self):
        """Recursively process all directories starting at the processor's path."""
        num_files = 0
        logging.info('Processing %s...', self.path)
        for dir_path, dirs, files in os.walk(self.path):
            for file in files:
                if file.endswith(CONTENT_FILE_EXTENSION):
                    self._process_file(os.path.join(dir_path, file))
                    num_files += 1
        # Done
        logging.info(
            'Processing complete. %d files processed, %d replacements made, %d occurrences skipped.',
            num_files,
            self.count_replaced,
            self.count_skipped)

    def _process_file(self, file_path):
        """Process the provided file."""
        logging.debug('Processing %s', file_path)
        out_lines = []
        # Process the input file
        with open(file_path) as in_file:
            # Iterate the file's lines
            for line in in_file:
                while True:
                    # Try to find a match
                    match = re.search(RE_SEARCH, line)
                    if match is None:
                        break
                    # Found - replace it if there's a replacement
                    str_found = match.group()
                    if str_found in self.url_map:
                        str_repl = NEW_URL_BASE + self.url_map[str_found]
                        line = line[:match.start()] + str_repl + line[match.end():]
                        self.count_replaced += 1
                    else:
                        self.count_skipped += 1
                        # If we failed to make a replacement, move on to the next line
                        # (otherwise it would be an endless loop)
                        break
                out_lines.append(line)
        # Write the output back
        with open(file_path, 'w') as out_file:
            out_file.writelines(out_lines)
        logging.debug('Saved %s', file_path)

# Main routine
if __name__ == '__main__':
    # Set up logging
    logging.basicConfig(
        level=logging.DEBUG if '-v' in sys.argv else logging.INFO,
        format='%(levelname)-8s %(message)s')
    # Root of the project
    root_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
    logging.info('Project root: %s', root_dir)
    # Content path
    content_path = os.path.join(root_dir, 'content')
    # If we need to download
    if '-d' in sys.argv:
        # Run the scanner
        scanner = Scanner(content_path)
        scanner.scan()
        # Run the downloader
        downloader = Downloader(scanner.image_urls)
        downloader.download()
    # If we need to upload
    if '-u' in sys.argv:
        uploader = Uploader()
        uploader.upload()
    # If we need to process pages
    if '-p' in sys.argv:
        processor = Processor(content_path)
        processor.process()