# EXIF Date-based File Copier

This script recursively scans a directory of images, tries to get the best possible origin date for each image, and copies the images to a new directory under date-based file names.

I'm specifically **copying** instead of moving the files because the source of the images is a consolidated backup that should not be modified.

Prerequisites:
- `pip install piexif exifread pillow progressbar2`
- `brew install exiftool`

For some older Canon photos, the `ExifRead` library chokes with one of:

    Possibly corrupted field Tag 0x0001 in MakerNote IFD
    Possibly corrupted field InteroperabilityIndex in Interoperability IFD
    OSError: [Errno 22] Invalid argument
    
For some random other photos, the `piexif` library chokes:

    error: unpack requires a buffer of 4 bytes

In order, I try reading files with `ExifRead` then `piexif` then `PIL`, and if they all fail, I try shelling out to `exiftool` because it does a _great_ job at the cost of forking another process.

In [None]:
import glob
import logging
import multiprocessing
import os
import pprint
import re
import shutil
import subprocess
import time

import exifread
import piexif
import progressbar

from PIL import Image
from PIL.ExifTags import TAGS
from infinitewarp_utils import timing


# Wrap stderr through progressbar before logging is configured.
# NOTE(review): presumably so log output does not garble the progress bar --
# confirm against the progressbar2 documentation.
progressbar.streams.wrap_stderr()
logging.basicConfig()
logger = logging.getLogger(__name__)
# Only ERROR and above are emitted by this module's logger, so the
# logger.info() calls further down are silent unless this level is lowered.
logger.setLevel(logging.ERROR)

In [None]:
class Config(object):
    """Filesystem locations used by the copier.

    ``source_dir`` is the tree that gets scanned; everything else lives
    under ``archive_dir`` in the user's home directory.
    """

    # Tree that is scanned for images.
    source_dir = '/Volumes/Thorium/Backups/Dojo/Photos_Library'

    # Root of everything this script writes.
    archive_dir = os.path.expanduser('~/Pictures/full-archive')

    # Destination for successfully named copies.
    output_dir = os.path.join(archive_dir, 'corrected-names')

    # Destination for files whose name could not be determined.
    fail_exif_dir = os.path.join(archive_dir, 'failed-exif')

    # Destination for files whose computed name was already taken.
    fail_dupe_dir = os.path.join(archive_dir, 'failed-dupe')

    # Append-only text log of every copy performed.
    log_path = os.path.join(archive_dir, 'process-log.txt')

    @classmethod
    def makedirs(cls):
        """Create the three destination directories if they are missing."""
        destinations = (cls.output_dir, cls.fail_exif_dir, cls.fail_dupe_dir)
        for directory in destinations:
            os.makedirs(directory, exist_ok=True)

# Create the output and failure directories as soon as this cell runs.
Config.makedirs()

In [None]:
class Patterns(object):
    """Compiled regexes and replacement templates for building file names."""

    # Exactly "YYYY-MM-DD HH.MM.SS.ext" where ext is 3 or 4 ASCII letters.
    desired_name = re.compile(r'^\d{4}-\d{2}-\d{2} \d{2}\.\d{2}\.\d{2}\.[a-zA-Z]{3}[a-zA-Z]?$')
    # Same timestamp prefix and extension, with arbitrary extra text between
    # them; the replacement keeps group 1 (timestamp) and group 3 (extension)
    # and drops the extra text.
    almost_desired_name_sub = re.compile(r'^(\d{4}-\d{2}-\d{2} \d{2}\.\d{2}\.\d{2})(.*)\.([a-zA-Z]{3}[a-zA-Z]?)$')
    almost_desired_name_rep = r'\1.\3'

    # Matches "YYYY:MM:DD HH:MM:SS"; the replacement rewrites it as
    # "YYYY-MM-DD HH.MM.SS.jpg" (the ".jpg" suffix is fixed in the template).
    exif_date_sub = re.compile(r'(\d{4}):(\d{2}):(\d{2} \d{2})\:(\d{2})\:(\d{2})')
    exif_date_rep = r'\1-\2-\3.\4.\5.jpg'

In [None]:
class BaseWrapper(object):
    """Shared plumbing for the EXIF reader wrappers.

    Subclasses provide ``get_tags(filepath)`` and ``pick_date_tag(tags)``;
    the methods here only chain those two together.
    """

    @classmethod
    def get_created_date(cls, filepath):
        """Return whatever ``pick_date_tag`` selects from the file's tags."""
        return cls.pick_date_tag(cls.get_tags(filepath))

    @classmethod
    def dump(cls, filepath):
        """Print the tags read from ``filepath``."""
        print(cls.get_tags(filepath))
    

class ExifreadWrapper(BaseWrapper):
    """ExifRead wrapper."""

    @staticmethod
    def get_tags(filepath):
        """Return the tag mapping ExifRead produces for ``filepath``."""
        with open(filepath, 'rb') as fp:
            return exifread.process_file(fp)

    @staticmethod
    def pick_date_tag(tags):
        """Return the value of the first available date tag, or None.

        ``Image DateTime`` is preferred, then ``EXIF DateTimeOriginal``.
        Returning None (rather than failing on ``None.values``) lets the
        caller fall through to the next reader without logging an error.
        """
        tag = tags.get('Image DateTime') or tags.get('EXIF DateTimeOriginal')
        if tag is None:
            return None
        return tag.values


class PiexifWrapper(BaseWrapper):
    """piexif wrapper."""

    @staticmethod
    def get_tags(filepath):
        """Return the dict that ``piexif.load`` produces for ``filepath``."""
        return piexif.load(filepath)

    @staticmethod
    def pick_date_tag(tags):
        """Return the decoded date string from the Exif IFD, or None.

        ``DateTimeDigitized`` is preferred, then ``DateTimeOriginal``.
        None is returned when the Exif IFD or both tags are absent, so the
        caller can fall through to the next reader.
        """
        exif = tags.get('Exif')
        if not exif:
            return None
        raw = (
            exif.get(piexif.ExifIFD.DateTimeDigitized) or exif.get(piexif.ExifIFD.DateTimeOriginal)
        )
        if not raw:
            return None
        return raw.decode('utf-8')

    @classmethod
    def dump(cls, filename):
        """Print every IFD's tags, truncating sliceable values to 20 items."""
        tags = cls.get_tags(filename)
        # The thumbnail is a binary blob, not an IFD; drop it before printing.
        tags.pop("thumbnail", None)
        for ifd_name in tags:
            print("\n{0} IFD:".format(ifd_name))
            ifd = tags[ifd_name] or {}
            for key in ifd:
                value = ifd[key]
                try:
                    print(key, value[:20])
                except TypeError:
                    # Non-sliceable values (e.g. ints) are printed whole.
                    print(key, value)


class PillowWrapper(BaseWrapper):
    """PIL wrapper.

    ``get_created_date`` is inherited from ``BaseWrapper``; it chains
    ``get_tags`` and ``pick_date_tag`` below.
    """

    @staticmethod
    def get_tags(filepath):
        """Return a dict of EXIF values keyed by tag name.

        Numeric tag ids without a known name in ``PIL.ExifTags.TAGS`` are
        kept as-is. An empty dict is returned when the image exposes no
        EXIF data.
        """
        # The context manager closes the file handle that Image.open holds.
        with Image.open(filepath) as image:
            # _getexif is only present on some image plugins, and returns
            # None when the file carries no EXIF block.
            getexif = getattr(image, '_getexif', None)
            info = getexif() if getexif is not None else None
        if not info:
            return {}
        return {TAGS.get(tag, tag): value for tag, value in info.items()}

    @staticmethod
    def pick_date_tag(tags):
        """Return the first available date string, or None.

        Order of preference: ``DateTime``, ``DateTimeDigitized``,
        ``DateTimeOriginal``.
        """
        return tags.get('DateTime') or tags.get('DateTimeDigitized') or tags.get('DateTimeOriginal')


class ExiftoolWrapper(BaseWrapper):
    """exiftool CLI wrapper."""

    @staticmethod
    def get_tags(filepath):
        """Run ``exiftool`` on ``filepath`` and parse its stdout into a dict.

        Each output line is split at its first colon into a key and a
        value, both stripped of surrounding whitespace. Lines that are not
        valid UTF-8 or contain no colon are skipped.
        """
        completed = subprocess.run(['exiftool', filepath], stdout=subprocess.PIPE)
        exif_dict = {}
        for raw_line in completed.stdout.split(b'\n'):
            try:
                line = raw_line.decode('utf-8')
            except UnicodeDecodeError:
                continue
            # Split on the first colon only: date values contain colons too.
            key, separator, value = line.partition(':')
            if separator:
                exif_dict[key.strip()] = value.strip()
        return exif_dict

    @staticmethod
    def pick_date_tag(tags):
        """Return the first available date string, or None.

        Order of preference: ``Date/Time Original``, ``Create Date``,
        ``Modify Date``.
        """
        return tags.get('Date/Time Original') or tags.get('Create Date') or tags.get('Modify Date')


def get_exif_date(filepath):
    """Get the effective "created" date in the EXIF metadata.

    The readers are tried in order (ExifRead, piexif, Pillow, exiftool) and
    the first non-empty value wins. A reader that raises is logged and
    skipped.

    Returns:
        The date value reported by the first successful reader, or None
        when no reader yields one.
    """
    wrappers = (
        ExifreadWrapper,
        PiexifWrapper,
        PillowWrapper,
        ExiftoolWrapper,
    )

    for wrapper in wrappers:
        try:
            value = wrapper.get_created_date(filepath)
        except Exception as e:
            # Best-effort by design: any reader failure just moves on to the
            # next one. Name the reader and file so the log is actionable.
            logger.error('%s failed on "%s": %s', wrapper.__name__, filepath, e)
            continue
        # An empty value is as useless as a missing one; keep trying.
        if value:
            return value
    return None

In [None]:
# example files with problematic exif data for some of the libraries
# (every entry is currently commented out, so this cell prints an empty list;
# uncomment entries to see what get_exif_date returns for them)
files = [
#     '/Volumes/Thorium/Backups/Dojo/Photos_Library/Chronological/2003-04/IMG_0054.jpg',  # Possibly corrupted field Tag 0x0001 in MakerNote IFD
#     '/Volumes/Thorium/Backups/Dojo/Photos_Library/Chronological/2003-11/IMG_1761B.jpg',  # mangled bytes in 'Canon Image Type' and 'Canon Firmware Version'
#     '/Volumes/Thorium/Backups/Dojo/Photos_Library/Chronological/2003-11/IMG_1788B.jpg',  # Possibly corrupted field InteroperabilityIndex in Interoperability IFD
#     '/Volumes/Thorium/Backups/Dojo/Photos_Library/Chronological/2005-12/IMG_3796.jpg',  # ???
#     os.path.expanduser('~/Pictures/old photos/IMG_1667.JPG'),  # Possibly corrupted field Tag 0x0001 in MakerNote IFD
#     '/Volumes/Thorium/Backups/Dojo/Photos_Library/More Archives/iPhone Downloads - 2013-05-13 Prime/IMG_0080.JPG',  # empty
]
# Pair each path with the date the reader chain finds for it.
file_dates = [(f, get_exif_date(f)) for f in files]
pprint.pprint(file_dates)

In [None]:
def get_ideal_name(filepath):
    """Determine the desired filename for the image at ``filepath``.

    Three strategies are tried in order:

    1. The existing name already matches ``Patterns.desired_name``: it is
       returned unchanged.
    2. The existing name starts with the desired timestamp but has extra
       text before the extension: the extra text is dropped and the result
       is lower-cased.
    3. The EXIF date is read and rewritten into the desired form, keeping
       the source file's (lower-cased) extension.

    Returns:
        The new file name, or None when no strategy yields a name matching
        ``Patterns.desired_name``.
    """
    filename = os.path.basename(filepath)
    if Patterns.desired_name.match(filename):
        return filename
    new_filename = Patterns.almost_desired_name_sub.sub(Patterns.almost_desired_name_rep, filename).lower()
    if Patterns.desired_name.match(new_filename):
        return new_filename
    created_date = get_exif_date(filepath)
    if not created_date:
        return None
    new_filename = Patterns.exif_date_sub.sub(Patterns.exif_date_rep, created_date).lower()
    if not Patterns.desired_name.match(new_filename):
        return None
    # The EXIF replacement template always ends in ".jpg". Keep the source
    # file's real extension instead, as the "almost desired" branch does, so
    # a PNG/BMP/TIFF is not copied under a misleading ".jpg" name.
    source_ext = os.path.splitext(filename)[1].lower()
    if source_ext and source_ext != '.jpg':
        candidate = os.path.splitext(new_filename)[0] + source_ext
        if Patterns.desired_name.match(candidate):
            return candidate
    return new_filename


def find_all_files(source_dir):
    """Recursively list every path under ``source_dir`` whose name contains a dot."""
    pattern = os.path.join(source_dir, '**', '*.*')
    matches = glob.glob(pattern, recursive=True)
    return matches


def get_processable_paths(file_paths):
    """Sort paths into three lists by their lower-cased extension.

    Returns a tuple ``(image_paths, noted_paths, trash_paths)``: image files
    this script can process, other media worth noting, and everything else.
    Input order is preserved within each list.
    """
    image_suffixes = ('bmp', 'jpg', 'png', 'tif', 'tiff')
    noted_suffixes = (
        'avi', 'psd', 'mov', 'mp4', 'mpg',
        'm4v', 'aif', 'aiff', 'wav', 'xcf',
    )

    images, noted, trash = [], [], []

    for path in file_paths:
        # Text after the last dot (the whole path when there is no dot).
        suffix = path.rsplit('.', 1)[-1].lower()
        if suffix in image_suffixes:
            bucket = images
        elif suffix in noted_suffixes:
            bucket = noted
        else:
            bucket = trash
        bucket.append(path)

    return images, noted, trash


def prepare_copy_renames(source_dir=None, use_multiprocessing=False):
    """Prepare list of file copy/rename operations.

    Args:
        source_dir: directory to scan; falls back to ``Config.source_dir``
            when empty or None.
        use_multiprocessing: when true, compute names in a pool of four
            worker processes instead of serially.

    Returns:
        A tuple ``(copies, exif_failures, dupe_failures, noted_paths,
        trash_paths)`` where ``copies`` and ``dupe_failures`` are lists of
        ``(source_path, new_filename)`` pairs and the rest are lists of
        source paths.
    """
    if not source_dir:
        source_dir = Config.source_dir

    # A set keeps the duplicate check O(1) per file; a list made the whole
    # pass quadratic in the number of images.
    seen_names = set()
    exif_failures = []
    dupe_failures = []
    copies = []

    image_paths, noted_paths, trash_paths = get_processable_paths(find_all_files(source_dir))

    bar = progressbar.ProgressBar()
    if use_multiprocessing:
        with multiprocessing.Pool(processes=4) as pool:
            multiple_results = [(pool.apply_async(get_ideal_name, (filepath,)), filepath) for filepath in image_paths]
            # Results are collected inside the `with` so the pool is still alive.
            new_filenames_for_paths = [(res.get(), filepath) for res, filepath in bar(multiple_results)]
    else:
        new_filenames_for_paths = [(get_ideal_name(filepath), filepath) for filepath in bar(image_paths)]

    for new_filename, filepath in new_filenames_for_paths:
        if new_filename is None:
            logger.info('cannot determine name for "%s" because EXIF processing failed', filepath)
            exif_failures.append(filepath)
            continue
        if new_filename in seen_names:
            # First file to claim a name wins; later ones are reported as dupes.
            logger.info('cannot copy "%s" to "%s" because it already exists', filepath, new_filename)
            dupe_failures.append((filepath, new_filename))
            continue
        seen_names.add(new_filename)
        copies.append((filepath, new_filename))

    return copies, exif_failures, dupe_failures, noted_paths, trash_paths


In [None]:
def datestamped_filename(filepath, filename=None):
    """Return a file name with the current ``time.time()`` inserted before the extension.

    Args:
        filepath: source path; its basename is used when ``filename`` is
            empty or None.
        filename: optional name to stamp instead of the basename.

    Returns:
        ``"<base>-<timestamp><.ext>"``. A name without an extension becomes
        ``"<name>-<timestamp>"`` instead of losing its base.
    """
    if not filename:
        filename = os.path.basename(filepath)
    # splitext keeps the leading dot on the extension and returns an empty
    # extension for dot-less names, which naive splitting on '.' mishandles.
    base, extension = os.path.splitext(filename)
    return '{}-{}{}'.format(base, time.time(), extension)
    

def _log_and_copy_section(log_fp, title, pairs):
    """Write one titled log section and copy each (source, destination) pair."""
    rule = '-' * 70
    print(rule, file=log_fp)
    print(title, file=log_fp)
    print(rule, file=log_fp)
    for filepath, new_filepath in pairs:
        # Log first, so the log names the copy that was in flight if it fails.
        print(f'{filepath} → {new_filepath}', file=log_fp)
        shutil.copy(filepath, new_filepath)


def do_copies(copies, exif_failures, dupe_failures, logpath):
    """Copy every file to its destination and append a record to the log.

    Args:
        copies: ``(source_path, new_filename)`` pairs copied into
            ``Config.output_dir``.
        exif_failures: source paths copied into ``Config.fail_exif_dir``
            under a time-stamped version of their own name.
        dupe_failures: ``(source_path, new_filename)`` pairs copied into
            ``Config.fail_dupe_dir`` under a time-stamped ``new_filename``.
        logpath: text file the log sections are appended to.
    """
    # The log lines contain a non-ASCII arrow, so pin the encoding rather
    # than relying on the locale default.
    with open(logpath, 'a', encoding='utf-8') as log_fp:
        # Generators keep destination names (and their timestamps) computed
        # one at a time, immediately before each copy.
        _log_and_copy_section(log_fp, 'exif failures', (
            (filepath, os.path.join(Config.fail_exif_dir, datestamped_filename(filepath)))
            for filepath in exif_failures
        ))
        _log_and_copy_section(log_fp, 'dupe failures', (
            (filepath, os.path.join(Config.fail_dupe_dir, datestamped_filename(filepath, new_filename)))
            for filepath, new_filename in dupe_failures
        ))
        _log_and_copy_section(log_fp, 'copies', (
            (filepath, os.path.join(Config.output_dir, new_filename))
            for filepath, new_filename in copies
        ))
        print('-' * 70, file=log_fp)

In [None]:
# Here is where the magic happens, folks!

# Phase 1: scan the source tree and work out a target name for every image.
with timing.Timer(action=f'execute prepare_copy_renames({Config.source_dir})', verbose=True):
    copies, exif_failures, dupe_failures, noted_paths, trash_paths = prepare_copy_renames(Config.source_dir)

# Summary counts for each result list.
# NOTE(review): as a bare expression that is not the last statement of the
# cell, this value is not displayed -- wrap it in print() if it is wanted.
len(copies), len(exif_failures), len(dupe_failures), len(noted_paths), len(trash_paths)

# Phase 2: perform the copies and append the results to the process log.
with timing.Timer(action='do_copies', verbose=True):
    do_copies(copies, exif_failures, dupe_failures, Config.log_path)
