# core

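> Helpers for parsing Archivematica METS files: per-file technical metadata, PREMIS events, SIP-level Dublin Core, and simple reports and visualizations.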

In [None]:
#| default_exp core

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
# import collections
import datetime
# import fnmatch
import math
import os
# import sys
from lxml import etree, objectify
import pandas as pd
import matplotlib.pyplot as plt
import re

In [None]:
#| export
def convert_size(size):
    """Convert a byte count to a short human-readable string.

    The value is scaled to a binary unit (powers of 1024) and rounded to
    the nearest whole number, e.g. ``convert_size(2048)`` -> ``'2 KB'``.

    Args:
        size: Number of bytes (int or float). Must not be negative.

    Returns:
        A string of the form ``'<number> <unit>'`` where the unit is one of
        bytes, KB, MB, GB, TB, PB, EB, ZB, YB.

    Raises:
        ValueError: If ``size`` is negative.
    """
    size_name = ("bytes", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    if size < 0:
        raise ValueError("size must be non-negative, got {!r}".format(size))
    if size < 1:
        # log() is undefined at 0 and negative below 1, which would pick a
        # unit from the wrong end of the tuple.
        return '{} {}'.format(int(round(size)), size_name[0])

    last = len(size_name) - 1
    # Clamp so values beyond the largest unit are still expressed in YB.
    i = min(int(math.floor(math.log(size, 1024))), last)
    s = int(round(size / 1024 ** i))
    # Rounding (or float error in log) can land exactly on the next unit's
    # threshold; report "1 MB" rather than "1024 KB".
    if s == 1024 and i < last:
        s = 1
        i += 1
    return '{} {}'.format(s, size_name[i])


class METSFile(object):
    """
    Class for METS file parsing methods.

    Typical use is ``m = METSFile(path); m.parse_mets()`` followed by the
    reporting helpers. ``parse_mets`` sets ``original_files``,
    ``dc_metadata``, ``original_file_count`` and ``mets_root``; the
    reporting helpers read those attributes and therefore need
    ``parse_mets`` to have run first.
    """

    # Namespace map handed to find()/findall(). parse_mets() strips the
    # namespace from every tag, so element lookups use bare names.
    ns = {
        'mets': 'http://www.loc.gov/METS/',
        'xlink': 'http://www.w3.org/1999/xlink'
    }

    # Field name -> element path (relative to an amdSec) for per-file data.
    _FILE_FIELD_PATHS = {
        'filepath': './techMD/mdWrap/xmlData/object/originalName',
        'uuid': './techMD/mdWrap/xmlData/object/objectIdentifier/objectIdentifierValue',
        'hashtype': './techMD/mdWrap/xmlData/object/objectCharacteristics/fixity/messageDigestAlgorithm',
        'hashvalue': './techMD/mdWrap/xmlData/object/objectCharacteristics/fixity/messageDigest',
        'bytes': './techMD/mdWrap/xmlData/object/objectCharacteristics/size',
        'format': './techMD/mdWrap/xmlData/object/objectCharacteristics/format/formatDesignation/formatName',
        'version': './techMD/mdWrap/xmlData/object/objectCharacteristics/format/formatDesignation/formatVersion',
        'puid': './techMD/mdWrap/xmlData/object/objectCharacteristics/format/formatRegistry/formatRegistryKey',
        'modified_date': './techMD/mdWrap/xmlData/object/objectCharacteristics/creatingApplication/dateCreatedByApplication',
        'fits_modified_unixtime': './techMD/mdWrap/xmlData/object/objectCharacteristics/objectCharacteristicsExtension/fits/fileinfo/fslastmodified[@toolname="OIS File Information"]',
    }

    # Field name -> element path (relative to a PREMIS:EVENT mdWrap).
    _PREMIS_EVENT_PATHS = {
        'event_uuid': './xmlData/event/eventIdentifier/eventIdentifierValue',
        'event_type': './xmlData/event/eventType',
        'event_datetime': './xmlData/event/eventDateTime',
        'event_detail': './xmlData/event/eventDetail',
        'event_outcome': './xmlData/event/eventOutcomeInformation/eventOutcome',
        'event_detail_note': './xmlData/event/eventOutcomeInformation/eventOutcomeDetail/eventOutcomeDetailNote',
    }

    # Column order of the DataFrame returned by parse_file_sec().
    _FILE_SEC_COLUMNS = [
        'USE', 'File ID', 'Group ID', 'ADMID',
        'File Location', 'LOCTYPE', 'OTHERLOCTYPE',
    ]

    def __init__(self, path):
        """Store the absolute form of ``path``; nothing is read yet."""
        self.path = os.path.abspath(path)

    def __str__(self):
        return self.path

    @staticmethod
    def _find_text(node, path):
        """Return the text of the first match of ``path``, or '' if none."""
        found = node.find(path)
        return '' if found is None else found.text

    @staticmethod
    def _strip_namespaces(root):
        """Rewrite every element tag under ``root`` without its namespace."""
        for elem in root.iter():
            tag = elem.tag
            # Comments and processing instructions have a non-string tag.
            if not isinstance(tag, str):
                continue
            brace = tag.find('}')
            if brace >= 0:
                elem.tag = tag[brace + 1:]
        objectify.deannotate(root, cleanup_namespaces=True)

    def parse_dc(self, root):
        """
        Parse SIP-level Dublin Core metadata into a list of dictionaries.
        Based on parse_dc function from Archivematica parse_mets_to_db.py script:

        https://github.com/artefactual/archivematica/blob/92d7abd238585e64e6064bc3f1ddfc663c4d3ace/
        src/MCPClient/lib/clientScripts/parse_mets_to_db.py

        Args:
            root: Namespace-stripped METS root element.

        Returns:
            A list of ``{'element': tag, 'value': text}`` dicts (elements
            without text are skipped), or ``None`` when no SIP-level
            Dublin Core section can be located.
        """
        # dmdSec elements that wrap Dublin Core metadata
        dmds = [
            dmd for dmd in root.findall('dmdSec')
            if dmd.find('mdWrap[@MDTYPE="DC"]') is not None
        ]
        if not dmds:
            return None

        # Only want SIP DC, not file DC: the "objects" directory div names
        # the dmdSec IDs that apply to the SIP as a whole.
        div = root.find('structMap/div/div[@TYPE="Directory"][@LABEL="objects"]')
        if div is None:
            return None
        dmdids = div.get('DMDID')
        if dmdids is None:
            # No SIP DC
            return None
        dmdids = dmdids.split()

        # Want the most recently created matching section.
        dmds = sorted(dmds, key=lambda e: e.get('CREATED', ""))
        dc_xml = None
        for dmd in reversed(dmds):
            if dmd.get('ID', "") in dmdids:
                dc_xml = dmd.find('mdWrap/xmlData/dublincore')
                break
        if dc_xml is None:
            return None

        return [
            {'element': elem.tag, 'value': elem.text}
            for elem in dc_xml
            if elem.text is not None
        ]

    def parse_mets(self):
        """
        Parse the METS file at ``self.path`` and store the results on self.

        Sets ``original_files`` (one dict per file in the "original"
        fileGrp, including its ``premis_events``), ``dc_metadata``,
        ``original_file_count`` and ``mets_root``.
        """
        # open xml file and strip namespaces
        tree = etree.parse(self.path)
        root = tree.getroot()
        self._strip_namespaces(root)

        # Index amdSec elements by ID once instead of searching the whole
        # tree again for every file.
        amdsecs_by_id = {}
        for amdsec in root.findall('.//amdSec'):
            amdsecs_by_id.setdefault(amdsec.get('ID'), []).append(amdsec)

        premis_event_xpath = ".//digiprovMD/mdWrap[@MDTYPE='PREMIS:EVENT']"
        original_files = []

        # gather info for each file in filegroup "original"
        for file_el in root.findall(".//fileGrp[@USE='original']/file"):
            amdsec_id = file_el.get('ADMID', '')
            file_data = {'premis_events': [], 'amdsec_id': amdsec_id}
            # Default every field to '' so a file without a matching amdSec
            # still yields a complete record.
            file_data.update(dict.fromkeys(self._FILE_FIELD_PATHS, ''))

            for amdsec in amdsecs_by_id.get(amdsec_id, []):
                for key, path in self._FILE_FIELD_PATHS.items():
                    file_data[key] = self._find_text(amdsec, path)

                # parse premis events related to file
                for event_wrap in amdsec.findall(premis_event_xpath):
                    file_data['premis_events'].append({
                        key: self._find_text(event_wrap, path)
                        for key, path in self._PREMIS_EVENT_PATHS.items()
                    })

            # format filepath
            filepath = file_data['filepath'] or ''
            for fragment in ('%transferDirectory%', 'data/objects/', 'objects/'):
                filepath = filepath.replace(fragment, '')
            file_data['filepath'] = filepath
            file_data['filename'] = os.path.basename(filepath)

            # format PUID: link to PRONOM unless the key is empty or is a
            # fido identifier
            puid = file_data['puid'] or ''
            if puid and 'fido' not in puid.lower():
                puid = "<a href=\"http://nationalarchives.gov.uk/PRONOM/%s\" target=\"_blank\">%s</a>" % (puid, puid)
            file_data['puid'] = puid

            # create human-readable size; a missing or unparsable size
            # counts as 0 bytes
            try:
                size_in_bytes = int(file_data['bytes'])
            except (TypeError, ValueError):
                size_in_bytes = 0
            file_data['bytes'] = size_in_bytes
            file_data['size'] = '0 bytes'
            if size_in_bytes > 0:
                file_data['size'] = convert_size(size_in_bytes)

            # create human-readable version of last modified Unix time stamp if file was characterized by FITS
            if file_data['fits_modified_unixtime']:
                unixtime = int(file_data['fits_modified_unixtime']) / 1000  # convert milliseconds to seconds
                file_data['modified_unix_timestamp'] = datetime.datetime.fromtimestamp(unixtime).isoformat()  # convert from unix to iso8601

            original_files.append(file_data)

        self.original_files = original_files
        # gather dublin core metadata from most recent dmdSec
        self.dc_metadata = self.parse_dc(root)
        self.original_file_count = len(original_files)
        self.mets_root = root

    def get_original_files(self):
        """Return the parsed files as a DataFrame without ``premis_events``."""
        data = self.original_files
        df = pd.DataFrame([{k: v for k, v in d.items() if k != 'premis_events'} for d in data])
        return df

    def get_file_format_counts(self):
        """Return a Series counting parsed files per ``format`` value."""
        df = pd.DataFrame(self.original_files)
        return df['format'].value_counts()

    def visualize_file_format_counts(self):
        """Show a pie chart of the file count per format."""
        file_format_counts = self.get_file_format_counts()
        # draw the pie chart
        plt.figure(figsize=(8, 8))
        file_format_counts.plot(kind='pie', autopct='%1.1f%%', startangle=90, counterclock=False)
        plt.title('File Count by Format')
        plt.ylabel('')  # hide the y-axis label
        plt.show()

    def visualize_file_events_count(self):
        """Show a stacked bar chart of PREMIS event type counts per file."""
        # Tally: number of events of each type, keyed by file name
        file_events_count = {}
        for entry in self.original_files:
            event_types = [event['event_type'] for event in entry['premis_events']]
            file_events_count[entry['filename']] = pd.Series(event_types).value_counts().to_dict()

        # Rows are event types, columns are file names
        df = pd.DataFrame(file_events_count).fillna(0)

        df.plot(kind='bar', stacked=True, figsize=(10, 6))
        plt.title('Event Type Count per File')
        plt.xlabel('Event Type')
        plt.ylabel('Count')
        plt.xticks(rotation=45)
        plt.tight_layout()

        plt.show()

    def parse_file_sec(self):
        """
        Flatten the fileSec into a DataFrame with one row per FLocat.

        Returns:
            A DataFrame with columns USE, File ID, Group ID, ADMID,
            File Location, LOCTYPE and OTHERLOCTYPE; empty (but with those
            columns) when the document has no fileSec or no FLocat.
        """
        ns = self.ns
        rows = []

        file_sec = self.mets_root.find('fileSec', ns)
        file_groups = [] if file_sec is None else file_sec.findall('fileGrp', ns)

        # Iterate over each fileGrp and extract relevant data
        for file_grp in file_groups:
            use = file_grp.attrib.get('USE', '')
            for file_el in file_grp.findall('file', ns):
                file_id = file_el.attrib.get('ID', '')
                group_id = file_el.attrib.get('GROUPID', '')
                admid = file_el.attrib.get('ADMID', '')
                for flocat in file_el.findall('FLocat', ns):
                    rows.append({
                        'USE': use,
                        'File ID': file_id,
                        'Group ID': group_id,
                        'ADMID': admid,
                        # attribute names keep their namespace
                        'File Location': flocat.attrib.get('{http://www.w3.org/1999/xlink}href', ''),
                        'LOCTYPE': flocat.attrib.get('LOCTYPE', ''),
                        'OTHERLOCTYPE': flocat.attrib.get('OTHERLOCTYPE', ''),
                    })

        return pd.DataFrame(rows, columns=self._FILE_SEC_COLUMNS)

    def print_structMap(self, div, level=0, is_last=False, prefix=""):
        """
        Recursively print a structMap ``div`` and its child divs as a tree.

        Args:
            div: The div element to print.
            level: Nesting depth; passed along on recursion but not used
                for formatting.
            is_last: Whether ``div`` is the last child of its parent, which
                selects the branch glyph.
            prefix: Text printed before the branch glyph, built up from
                the ancestors' continuation lines.
        """
        ns = self.ns

        # line patterns for the tree display
        branch = "└── " if is_last else "├── "
        space = "    " if is_last else "│   "

        # print the type and label of this div
        div_type = div.attrib.get('TYPE', '')
        div_label = div.attrib.get('LABEL', '')
        print(f"{prefix}{branch}{div_type}: {div_label}")

        # recurse into the child div elements
        children = div.findall('div', ns)
        last_index = len(children) - 1
        for i, child_div in enumerate(children):
            self.print_structMap(child_div, level + 1, i == last_index, prefix + space)

    def visualize_structMap(self):
        """Print every structMap of the parsed document as a text tree."""
        ns = self.ns

        for struct_map in self.mets_root.findall('structMap', ns):
            struct_map_type = struct_map.attrib.get('TYPE', '')
            struct_map_label = struct_map.attrib.get('LABEL', '')
            print(f"StructMap (TYPE: {struct_map_type}, LABEL: {struct_map_label})")

            # process the top-level div elements
            root_divs = struct_map.findall('div', ns)
            last_index = len(root_divs) - 1
            for i, div in enumerate(root_divs):
                self.print_structMap(div, is_last=(i == last_index))

    def show_file_changes(self, log_file_path=None, encoding=None):
        """
        Print the file renames recorded as "Changed name: old -> new" lines.

        Args:
            log_file_path: File to scan. Defaults to ``self.path``.
            encoding: Text encoding passed to ``open``; ``None`` uses the
                platform default.
        """
        if log_file_path is None:
            log_file_path = self.path

        # regular expression matching the lines that record a rename
        change_pattern = re.compile(r'Changed name:\s*(.*)\s*->\s*(.*)')

        # (old_name, new_name) pairs in file order
        file_changes = []
        with open(log_file_path, 'r', encoding=encoding) as log_file:
            for line in log_file:
                match = change_pattern.search(line)
                if match:
                    file_changes.append((match.group(1).strip(), match.group(2).strip()))

        # print the result in a readable form (base names only)
        if file_changes:
            print("ファイル名の変更一覧:")
            for old_name, new_name in file_changes:
                print(f"- 変更前: {old_name.split('/')[-1]}")
                print(f"  変更後: {new_name.split('/')[-1]}")
                print("-" * 40)
        else:
            print("ファイル名の変更は見つかりませんでした。")

In [None]:
#| hide
# Run nbdev's export step for this notebook.
import nbdev

nbdev.nbdev_export()