# Scripts to prepare metadata from Wismut Archive for import into oh.d system
    Copyright (C) 2022 Olaf Berg (Leibniz-Zentrum für Zeithistorische Forschung Potsdam, ZZF)

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.


# Read the Wismut API
In the first step we read the API of the Wismut archive and extract the information we need/want to import into the oh.d system. The result is written into a simple JSON file that can be stored and further processed (e.g. into a CSV file for oh.d import, see the next step below).

In [3]:
def read_api(urlpath, timeout=30):
    """Fetch a JSON document from the given URL.

    Args:
        urlpath: URL to request. An ``Accept: application/json`` header is
            sent with the request.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON payload, or ``None`` when no URL was given, the
        request failed, the server answered with an error status, or the
        body could not be decoded as JSON. Every failure is reported on
        stdout.
    """
    if not urlpath:
        print('es muss eine gültige URL an die Funktion read_api übergeben werden!')
        return None
    # Imported after the guard so an invalid call fails fast and cheaply.
    import requests
    headers = {'Accept': 'application/json'}
    try:
        r = requests.get(url=urlpath, headers=headers, timeout=timeout)
    except requests.exceptions.RequestException as e:
        # Connection problems, timeouts and malformed URLs end up here
        # instead of aborting the whole harvesting run.
        print('Fehler bei Abfrage der URL:', urlpath, 'Fehler:', e)
        return None
    if not r.ok:
        print('Fehler bei Abfrage der URL:', urlpath, 'mit Fehlercode:', r.status_code)
        return None
    try:
        return r.json()
    except ValueError as e:
        # Every JSON decode error raised by requests derives from ValueError,
        # including releases that predate requests.exceptions.JSONDecodeError.
        print('JSON Daten konnten nicht verarbeitet werden:', e)
        return None

def search_jsondata(searchlist, data):
    """Evaluate a set of lookup expressions against one JSON record.

    Args:
        searchlist: dict mapping an output key to a Python expression given
            as a string. The expression is evaluated with the name ``data``
            in scope, e.g. ``"data['language']['token']"``.
        data: dict holding the decoded JSON record.

    Returns:
        A dict with one entry per key of ``searchlist``. Expressions that
        fail because a key is missing, a value is null or a list is too
        short yield an empty string. ``None`` if either argument is not a
        dict.
    """
    if not isinstance(data, dict):
        print('Keine Daten im passenden Format an die Funktion searach_jsondata übergeben')
        return None
    if not isinstance(searchlist, dict):
        print('Keine Suchliste im passenden Format an die Funktion searach_jsondata übergeben')
        return None
    selected = {}
    for key, val in searchlist.items():
        try:
            # NOTE(security): eval() runs arbitrary code. This is only
            # acceptable while the expressions are hard-coded by the caller;
            # never pass expressions that originate from external input.
            selected[key] = eval(val)
        except (TypeError, IndexError, KeyError) as e:
            # KeyError covers the common case of a field missing in the record.
            print('Fehlerhafte Angaben zur Suche oder keine Daten gefunden für Key:', key, 'Schlüssel:', val,'Fehler:', e)
            selected[key] = ''
    return selected
            
    

def find_interviews(data):
    """Return the ``@id`` value of every entry in ``data['items']``.

    The collected list is also echoed to stdout. When ``data`` is empty or
    ``None`` a notice is printed and ``None`` is returned.
    """
    if not data:
        print('Keine Daten zum Durchsuchen an die Funktion find_interviews übergeben.')
        return None
    found = [entry['@id'] for entry in data['items']]
    print(found)
    return found

def parse_interviews(urls):
    """Download every interview URL and extract its metadata.

    Each URL is fetched with ``read_api`` and the response is handed to
    ``find_metadata_in_interview``. The results are returned in input order.
    If ``urls`` is not a list, a notice is printed and ``None`` is returned.
    """
    if type(urls) is not list:
        print('Es wurde keine passende Liste mit URLs an die Funktion parse_interviews übergeben')
        return None
    return [find_metadata_in_interview(read_api(link)) for link in urls]

def find_metadata_in_interview(data):
    """Extract the interview metadata from one decoded API record.

    Besides the fields read directly from ``data``, this function follows
    the URL of the first location (``find_metadata_of_places``) and of each
    interviewed person (``find_metadata_in_person``) through ``read_api``.

    Args:
        data: dict holding the decoded JSON record of a single interview.

    Returns:
        A dict with the collected metadata. Fields whose source value is
        missing, null or empty are filled with an empty (or blank) string
        and a notice is printed. ``None`` if ``data`` is not a dict.
    """
    import datetime
    if not isinstance(data, dict):
        print('Keine Daten im passenden Format an die Funktion find_metadata_in_interview übergeben')
        return None
    # A missing key, a null value and an empty list all mean "no data";
    # the handlers below treat the three cases alike.
    lookup_errors = (KeyError, TypeError, IndexError)
    searchlist = {
        'Orig_ID' : "data['UID']",
        'Medium' : '"Video"',
        'Language' : "data['language']['token']",
        'Accessibility' : "'Online Open Access'",
        'Indexing' : "data['subjects']",
        'link' : "data['@id']"
    }
    metadaten = search_jsondata(searchlist, data)
    try:
        metadaten['Date_of_Interview'] = data['edtf_date']
    except lookup_errors as e:
        metadaten['Date_of_Interview'] = ''
        print('Angabe zum Datum des Interviews ist fehlerhaft', e, 'Feld bleibt leer')
    try:
        # The year is taken from the first four characters of the date string.
        metadaten['Year_of_Interview'] = data['edtf_date'][:4]
    except lookup_errors as e:
        # A date that cannot be sliced is unusable, so both fields are cleared.
        metadaten['Date_of_Interview'] = ''
        metadaten['Year_of_Interview'] = ''
        print('Angabe zum Datum des Interviews ist fehlerhaft', e, 'Felder für Datum und Jahr bleiben leer')
    try:
        metadaten['Length'] = str(datetime.timedelta(seconds=data['duration']))
    except lookup_errors as e:
        metadaten['Length'] = ''
        print('Angabe zur Filmlänge fehlt', e, 'Feld bleibt leer')
    try:
        metadaten['Interviewer'] = data['interviewer'][0]['title']
    except lookup_errors as e:
        metadaten['Interviewer'] = ''
        print('Angabe für Interviewer fehlt', e, 'Feld bleibt leer')
    try:
        metadaten['Interviewtyp'] = data['taxonomy_interview_types']['title']
    except lookup_errors as e:
        metadaten["Interviewtyp"] = ''
        print('Angabe zum Interviewtyp fehlt', e, 'Feld bleibt leer')
    try:
        metadaten['Place_of_Interview'] = ""
        for location in data['location']:
            try:
                # Several locations are joined into one comma separated string.
                if metadaten['Place_of_Interview'] == '':
                    metadaten['Place_of_Interview'] = location['title']
                else:
                    metadaten['Place_of_Interview'] = metadaten['Place_of_Interview'] + ', ' + location['title']
            except lookup_errors as e:
                print('Angabe zum Interviewort fehlt', e, 'Feld bleibt leer')
            try:
                # Only the description of the last location is kept.
                metadaten['Place_of_Interview_description'] = location['description']
            except lookup_errors as e:
                print('Keine nähere Beschreibung des Ortes gefunden',e)
                metadaten['Place_of_Interview_description'] = ' '
    except lookup_errors as e:
        metadaten['Place_of_Interview'] = ' '
        metadaten['Place_of_Interview_description'] = ' '
        print('Keine Angaben zum Ort des Interviews gefunden', e, 'Feld bleibt leer')
    # An empty location list never enters the loop above; make sure the
    # description key exists in that case as well.
    metadaten.setdefault('Place_of_Interview_description', ' ')
    try:
        ortsdaten = read_api(data['location'][0]['@id'])
        metadaten['Geodata_Place_of_Interview'] = find_metadata_of_places(ortsdaten)
    except lookup_errors as e:
        print('Keine näheren Ortsangaben gefunden', e, 'Datenfeld bleibt leer')
        metadaten['Geodata_Place_of_Interview'] = ''

    personendaten = []
    for person in data.get('persons_interviewed') or []:
        try:
            person_url = person['@id']
        except lookup_errors as e:
            print('Keine URL zur Person gefunden', e, 'Person wird übersprungen')
            continue
        print('person URL:', person_url)
        metadaten_person = read_api(person_url)
        auswertung = find_metadata_in_person(metadaten_person)
        print('Auswertung der Metadaten zur Person:', type(auswertung), auswertung)
        personendaten.append(auswertung)
    metadaten['Interviewte'] = personendaten

    print(metadaten, '\n\n')
    return metadaten


def find_metadata_in_person(data):
    """Extract name, biography and attribute items of one person record.

    Args:
        data: dict holding the decoded JSON record of a person.

    Returns:
        A dict with the keys ``First_Name`` (first and middle name),
        ``Last_Name``, ``Additional_Name`` and ``Short_Biography``, plus
        ``Gender``, ``Place_of_Birth``, ``Date_of_Birth_Details`` and
        ``Job`` when a matching entry exists in ``data['items']``. Several
        jobs are joined with ``", "``. Missing values become empty strings
        and a notice is printed. ``None`` if ``data`` is not a dict.
    """
    if not isinstance(data, dict):
        print('Keine Daten im passenden Format an die Funktion find_metadata_in_person übergeben')
        return None
    # A missing key and a null value are both treated as "no data".
    lookup_errors = (KeyError, TypeError, IndexError)
    metadaten = {}
    try:
        metadaten['First_Name'] = data['first_name']
    except lookup_errors as e:
        metadaten['First_Name'] = ''
        print('Angabe zum Vornamen der Person ist fehlerhaft', e, 'Feld bleibt leer')
    middle_name = data.get('middle_name')
    if middle_name:
        try:
            metadaten['First_Name'] = metadaten['First_Name'] + ' ' + middle_name
        except TypeError as e:
            print('Keine Angabe zum Middlename. Feld bleibt leer', e)
    else:
        # An empty middle name must not leave a trailing blank behind.
        print('Keine Angabe zum Middlename. Feld bleibt leer')
    try:
        metadaten['Last_Name'] = data['last_name']
    except lookup_errors as e:
        metadaten['Last_Name'] = ''
        print('Keine Angabe zum Nachnamen gefunden', e, 'Feld bleibt leer')
    try:
        metadaten['Additional_Name'] = data['alternative_names']
    except lookup_errors as e:
        metadaten['Additional_Name'] = ''
        print('Keine Angabe zu alternativen Namen gefunden', e, 'Feld bleibt leer')
    try:
        metadaten['Short_Biography'] = data['biography_short']['data']
    except lookup_errors as e:
        metadaten['Short_Biography'] = ''
        print('Keine Angabe zur Kurzbiografie gefunden', e, 'Feld bleibt leer')

    try:
        items = list(data['items'])
    except lookup_errors as e:
        items = []
        print('Keine weiteren Angabe zur Person gefunden', e, 'Zusatzfelder bleiben leer')

    # Item types that map one-to-one onto an output field.
    simple_fields = {
        'person_gender': 'Gender',
        'person_birthplace': 'Place_of_Birth',
        'person_birthday': 'Date_of_Birth_Details',
    }
    for item in items:
        try:
            item_type = item['@type']
            title = item['title']
        except lookup_errors:
            print('item gefunden, aber nicht die gesuchten Metadaten:', item)
            continue
        if item_type in simple_fields:
            metadaten[simple_fields[item_type]] = title
        elif item_type == 'person_job':
            print('Job', title, type(title))
            if 'Job' in metadaten:
                metadaten['Job'] = metadaten['Job'] + ", " + title
            else:
                metadaten['Job'] = title
        else:
            print('item gefunden, aber nicht die gesuchten Metadaten:', item)

    return metadaten

def find_metadata_of_places(data):
    """Extract name, description and identifiers of one place record.

    Args:
        data: dict holding the decoded JSON record of a place.

    Returns:
        A dict with the keys ``Place_Name``, ``Place_Description``,
        ``Geonames_ID``, ``GND_ID`` and ``Geolocation``. A field whose
        source key is missing is set to an empty string and a notice is
        printed. ``None`` if ``data`` is not a dict.
    """
    if not isinstance(data, dict):
        print('Keine Daten im passenden Format an die Funktion find_metadata_of_places übergeben')
        return None
    # (output key, source key, notice printed before the error, words after it)
    fields = (
        ('Place_Name', 'hierarchical_title', 'Keine Angabe zum Ortsnamen gefunden', ('Feld bleibt leer',)),
        ('Place_Description', 'description', 'Keine Beschreibung des Ortes gefunden', ()),
        ('Geonames_ID', 'geonames_id', 'Keine Geonames ID gefunden', ('Feld bleibt leer',)),
        ('GND_ID', 'gnd_id', 'Keine GND ID gefunden', ('Feld bleibt leer',)),
        ('Geolocation', 'geolocation', 'Keine Längen- und Breitengradangaben gefunden', ('Feld bleibt leer',)),
    )
    metadaten = {}
    for target, source, notice, suffix in fields:
        try:
            metadaten[target] = data[source]
        except (KeyError, TypeError) as e:
            # A missing key used to abort the run with an uncaught KeyError.
            metadaten[target] = ''
            print(notice, e, *suffix)
    return metadaten



def write_to_json(data, *filename):
    """Serialize ``data`` as JSON into a file.

    Args:
        data: list or dict to serialize; anything else is rejected.
        *filename: optional target path (only the first value is used).
            When omitted, the path is requested interactively.

    Returns:
        ``None``. A notice is printed both on success and when nothing was
        written (wrong data type or empty file name).
    """
    import json
    # Validate first so the user is not asked for a file name in vain.
    if not isinstance(data, (list, dict)):
        print('Kein zu JSON wandelbares Objekt übergeben!', type(data))
        return
    # *filename arrives as a tuple; open() needs the path itself.
    if filename:
        target = filename[0]
    else:
        target = input("Verarbeitete Daten als JSON speichern unter:")
    if not target:
        print('Kein Dateiname vergeben. Es wird nichts gespeichert')
        return
    with open(target, 'w') as f:
        json.dump(data, f)
    print('Verarbeitete Daten als JSON gespeichert unter:', target)
    return





def main(*starturl):
    """Harvest all interviews from the Wismut API and store them as JSON.

    Args:
        *starturl: optional URL of the interview listing (only the first
            value is used). Defaults to the listing of the Wismut archive.
    """
    # *starturl arrives as a tuple; read_api needs the URL string itself.
    if starturl:
        url = starturl[0]
    else:
        url = "https://wismut.saw-leipzig.de/api/de/bestaende/zeitzeugen-interviews/?b_size=100"
    rdata = read_api(url)
    print(type(rdata))
    interviewliste = find_interviews(rdata)
    metadaten = parse_interviews(interviewliste)
    write_to_json(metadaten)


if __name__ == '__main__':
    main()


<class 'dict'>
['https://wismut.saw-leipzig.de/api/de/bestaende/zeitzeugen-interviews/interview-mit-andrejew-georgij-georgiewitsch', 'https://wismut.saw-leipzig.de/api/de/bestaende/zeitzeugen-interviews/interview-mit-baumann-regina-und-baumann-rommy', 'https://wismut.saw-leipzig.de/api/de/bestaende/zeitzeugen-interviews/interview-mit-jutta-bergholz', 'https://wismut.saw-leipzig.de/api/de/bestaende/zeitzeugen-interviews/interview-mit-bohnwagner-elfriede', 'https://wismut.saw-leipzig.de/api/de/bestaende/zeitzeugen-interviews/interview-mit-brumme-reiner', 'https://wismut.saw-leipzig.de/api/de/bestaende/zeitzeugen-interviews/interview-mit-daenecke-rudolf', 'https://wismut.saw-leipzig.de/api/de/bestaende/zeitzeugen-interviews/interview-mit-nindel-kerstin', 'https://wismut.saw-leipzig.de/api/de/bestaende/zeitzeugen-interviews/interview-mit-ducke-dr-guenter', 'https://wismut.saw-leipzig.de/api/de/bestaende/zeitzeugen-interviews/interview-mit-fischer-hans-georg', 'https://wismut.saw-leipzig.de

# Write CSV for Oral History digital
This script loads the resulting data (json file) from the script above and writes it into a csv file for upload into the oh.d system.
Change the path variable in the main function at the end of this script to the current data file you want to write into csv.

In [5]:
def load_json(*path):
    """Read a JSON file and return the decoded content.

    The first positional argument is used as file path; without any
    argument the path is requested interactively.
    """
    import json
    source = path[0] if path else input('Welche JSON Datei soll eingelesen werden?')
    with open(source, 'r') as handle:
        return json.load(handle)

def _lookup(dataset, *path, default=" "):
    """Follow ``path`` (keys and indices) into ``dataset``; return ``default`` on any miss."""
    value = dataset
    try:
        for step in path:
            value = value[step]
    except (TypeError, KeyError, IndexError):
        return default
    return value


def _html_to_text(html):
    """Return the text content of an HTML fragment with tabs replaced by blanks."""
    # Imported lazily: bs4 is only needed when a biography is present.
    from bs4 import BeautifulSoup
    # Naming the parser avoids the "no parser was explicitly specified"
    # warning and keeps the result independent of optional parser packages.
    text = BeautifulSoup(html, 'html.parser').get_text()
    return text.replace('\t', ' ')


def prepare_data(wismut_data):
    """Convert the harvested interview records into rows for the oh.d import.

    Args:
        wismut_data: iterable of interview dicts as stored in the JSON file.

    Returns:
        A list with one dict per interview, keyed by the oh.d column names.
        The interview ID is ``wismut`` plus the four-digit running number
        (starting at 1). Only the first interviewed person is exported.
        Values that cannot be found are replaced by a single blank.
    """
    from datetime import datetime
    rows = []
    print('Typ wismut_data',type(wismut_data))
    for i, dataset in enumerate(wismut_data, start=1):
        print('Bearbeite Interview nr',i)
        row = {}
        row['Interview-ID'] = 'wismut' + str(i).zfill(4)
        row['Vorname'] = _lookup(dataset, 'Interviewte', 0, 'First_Name')
        row['Nachname'] = _lookup(dataset, 'Interviewte', 0, 'Last_Name')
        row['Weitere Namen'] = _lookup(dataset, 'Interviewte', 0, 'Additional_Name')
        row['Geburtsname'] = ' '
        row['Weitere Vornamen'] = ' '
        row['Geschlecht'] = _lookup(dataset, 'Interviewte', 0, 'Gender')
        row['Geburtsdatum'] = _lookup(dataset, 'Interviewte', 0, 'Date_of_Birth_Details')
        row['Geburtsort'] = _lookup(dataset, 'Interviewte', 0, 'Place_of_Birth')
        row['Geburtsort (Subkategorie)'] = ' '  # not provided
        try:
            html = dataset['Interviewte'][0]['Short_Biography']
        except (TypeError, KeyError, IndexError):
            row['Biographie'] = " "
        else:
            try:
                row['Biographie'] = _html_to_text(html)
            except (TypeError, KeyError, IndexError):
                row['Biographie'] = " "
        row['Teilsammlung'] = " "  # sub-collection is not used
        row['Originalsignatur'] = _lookup(dataset, 'Orig_ID')
        row['Medientyp'] = "Video"
        row['Sprache'] = _lookup(dataset, 'Language')
        try:
            interviewdatum = datetime.strptime(dataset['Date_of_Interview'], '%Y-%m-%d')
            row['Interview-Datum'] = interviewdatum.strftime("%d.%m.%Y")
        except ValueError:
            # Not a full ISO date (e.g. only a year): pass the value through.
            row['Interview-Datum'] = dataset['Date_of_Interview']
        except (TypeError, KeyError, IndexError):
            row['Interview-Datum'] = " "
        row['Ort des Interviews'] = _lookup(dataset, 'Place_of_Interview')
        row['Ort des Interviews (Subkategorie)'] = _lookup(dataset, 'Place_of_Interview_description')
        row['Dauer'] = _lookup(dataset, 'Length')
        row['Anzahl der Bänder'] = '1'
        # 'Accessibility' is deliberately not exported.
        row['Interviewführung'] = _lookup(dataset, 'Interviewer')
        row['Transkription'] = " "  # transcription is provided separately
        row['Übersetzung'] = " "  # translation not provided
        # The oh.d import script cannot handle multiple keywords at the
        # moment, so the 'Indexing' list is ignored and the keyword is
        # fixed to "Bergbau".
        row['Thema'] = 'Bergbau'
        row['Thema (Subkategorie)'] = ' '
        # The 'link' field is not exported: the interview is linked
        # internally in the oh.d system.
        rows.append(row)
    return rows

def write_csv_neu(path_to_csv, rows, *colnames):
    """Write ``rows`` as a semicolon separated CSV file.

    Args:
        path_to_csv: path of the file to create (an existing file is
            overwritten).
        rows: list of dicts, one per CSV line.
        *colnames: optional sequence of column names (only the first value
            is used). Without it the columns are the keys of all rows in
            order of first appearance.

    Returns:
        ``path_to_csv``. When ``rows`` is empty no file is written and a
        notice goes to stderr.
    """
    import csv, sys
    if colnames:
        fieldnames = list(colnames[0])
    else:
        # dict.fromkeys keeps first-seen order, so the column layout is
        # reproducible between runs (a set is not ordered).
        fieldnames = list(dict.fromkeys(name for row in rows for name in row))
    if rows:
        with open(path_to_csv, 'w', newline='') as f:
            dict_writer = csv.DictWriter(f, fieldnames=fieldnames, delimiter=';')
            dict_writer.writeheader()
            # The data lines were previously never written.
            dict_writer.writerows(rows)
    else:
        sys.stderr.write('No data to write to file "{}"\n'.format(path_to_csv))
    return path_to_csv


def write_csv(path_to_csv, rows, **kwargs):
    """Write ``rows`` as a tab separated CSV file.

    Args:
        path_to_csv: path of the file to create (an existing file is
            overwritten).
        rows: list of dicts, one per CSV line.
        **kwargs: ``colnames`` - optional list of column names. Without it
            the columns are the keys of all rows in order of first
            appearance. Columns missing in a row are written empty.

    Returns:
        ``path_to_csv``. When ``rows`` is empty no file is written and a
        notice goes to stderr.
    """
    import csv, sys
    colnames = kwargs.get('colnames', [])
    if not colnames:
        # dict.fromkeys keeps first-seen order, so the column layout is
        # reproducible between runs (a set is not ordered).
        colnames = list(dict.fromkeys(name for row in rows for name in row))
    if rows:
        with open(path_to_csv, 'w', newline='') as f:
            dict_writer = csv.DictWriter(f, colnames, delimiter='\t')
            dict_writer.writeheader()
            dict_writer.writerows(rows)
    else:
        sys.stderr.write('No data to write to file "{}"\n'.format(path_to_csv))
    return path_to_csv

def main():
    """Load the harvested JSON file and write it as CSV for the oh.d import.

    The input file name is fixed inside this function; the output file name
    is requested interactively.
    """
    # Column order of the resulting CSV file.
    columns = [
        'Interview-ID', 'Originalsignatur', 'Sprache', 'Teilsammlung',
        'Interview-Datum', 'Medientyp', 'Dauer', 'Protokoll', 'Beschreibung',
        'Anzahl der Bänder', 'Vorname', 'Nachname', 'Geburtsname',
        'Weitere Namen', 'Weitere Vornamen', 'Geschlecht', 'Geburtsdatum',
        'Biographie', 'Interviewführung', 'Transkription', 'Übersetzung',
        'Erschließung', 'Geburtsort', 'Geburtsort (Subkategorie)',
        'Ort des Interviews', 'Ort des Interviews (Subkategorie)',
        'Thema', 'Thema (Subkategorie)',
    ]
    source_file = 'wismut-to-ohd.json'

    harvested = load_json(source_file)
    print('wismut Daten aus Json:', type(harvested), harvested)
    print('--------------------------')

    table = prepare_data(harvested)
    print('aufbereitete Daten:', type(table), table)

    target_file = input('In welche Datei sollen die Daten gespeichert werden?')
    write_csv(target_file, table, colnames=columns)
    print('Transformation to csv Done. Result in:', target_file)


if __name__ == '__main__':
    main()

wismut Daten aus Json: <class 'list'> [{'Orig_ID': '735c5cf99ee848f4866a37ac743b5881', 'Medium': 'Video', 'Language': 'de', 'Accessibility': 'Online Open Access', 'Indexing': ['Lokomotivfabrik Luhansk', 'Zweiter Weltkrieg', 'Sibirien', 'Evakuation', 'Fliegerangriff', 'Omsk', 'Kindheitserinnerungen', 'Krieg', 'Überleben', 'Vater', 'Hauptingenieur', 'Schule', 'Charkiw', 'Hochschule für Bergbau der UdSSR Charkiw', 'WNIIOMSchS', 'Abteilung für Mechanisierung', 'wissenschaftliche Tätigkeit', 'Patente', 'Aspirantur', 'Doktor', 'Dissertation', 'Kandidat der Wissenschaften', 'SDAG Wismut', 'Kadermangel', 'Spezialisten', 'Leitender Ingenieur', 'Abteilungsleiter', 'Leiter der Abteilung für Mechanisierung', 'Ventilation', 'Belüftung', 'Errichtung von Fördertürmen', 'Förderturm', 'Planvorgaben', 'Drosen', 'Arbeitsaufgabe', 'Geschichte der Wismut', 'Kriegshandlungen', 'Uranerzvorkommen', 'Geschichte des Bergbaus', 'Uranfarben', 'Pechblende', 'Lungenkrebs', 'Schneeberger Krankheit', 'Uransuche', 'SB

# Find media files and change their name according to oh.d standard
This script is highly specific to the single case of video files delivered to ZZF on HDD. It creates alias names (symbolic links) for the existing files according to the oh.d import specifications, based on the metadata created by the scripts above. The files can then be uploaded to oh.d and automatically connected to the corresponding metadata set.
Change the base_path and csv_path variable in main function according to actual needs.


In [None]:
def load_csv(*path):
    """Read a tab separated CSV file into a list of dicts.

    Args:
        *path: optional file path (only the first value is used). Without
            it the path is requested interactively.

    Returns:
        A list with one dict per data line, keyed by the header line.
    """
    import csv
    if path:
        source = path[0]
    else:
        source = input('Welche CSV Datei soll eingelesen werden?')
    # newline='' lets the csv module handle line breaks itself, which keeps
    # line breaks inside quoted fields intact.
    with open(source, 'r', newline='') as f:
        result = list(csv.DictReader(f, delimiter='\t'))
    print(type(result))
    return result


def create_alias(interviewlist, basepath):
    """Create oh.d conforming symlinks for the video files of each interview.

    For every interview the directory
    ``<basepath>/<YYYY.MM.DD> - <Nachname>, <Vorname>*`` is searched for
    ``.mkv`` files whose name contains the interview date and the name of
    the interviewed person. Each hit gets a symlink in ``<basepath>/ohd``
    named ``<Interview-ID>_01_01_master.mkv``; if several files are found,
    a running number is appended (``..._master_<n>.mkv``).

    Args:
        interviewlist: iterable of dicts with the keys ``Interview-ID``,
            ``Interview-Datum`` (``DD.MM.YYYY``), ``Nachname`` and
            ``Vorname``.
        basepath: directory that contains the archive folders and the
            ``ohd`` target folder.

    Returns:
        ``None``. Progress and problems are reported on stdout.
    """
    from datetime import datetime
    import os, glob
    print(type(interviewlist))

    for interview in interviewlist:
        print('------')
        try:
            interview_date = datetime.strptime(interview['Interview-Datum'], '%d.%m.%Y')
        except (ValueError, TypeError):
            print('Für das interview mit der ID', interview['Interview-ID'],
                  'konnte die Datumsangabe:', interview['Interview-Datum'], 'nicht in ein Datumsobjekt gewandelt werden')
            # Without a date no search path can be built. Going on would
            # either fail or silently reuse the date of the previous interview.
            continue
        try:
            date_prefix = interview_date.strftime('%Y.%m.%d')
            find_archive = date_prefix + ' - ' \
                           + interview['Nachname'] + ', ' + interview['Vorname'] + '*'
            find_filename = date_prefix + ' - *' \
                            + interview['Nachname'] + ', ' + interview['Vorname'] + '*.mkv'
            searchpath = os.sep.join([basepath, find_archive, find_filename])
            print('searchpath:', searchpath)
            files_found = glob.glob(searchpath)
            if len(files_found) == 0:
                # Second attempt: file named "<Vorname> <Nachname>".
                find_filename = date_prefix + ' -*' \
                                + interview['Vorname'] + ' ' + interview['Nachname'] + '*.mkv'
                searchpath = os.sep.join([basepath, find_archive, find_filename])
                print('alternativer searchpath:', searchpath)
                files_found = glob.glob(searchpath)
            print('Gefunden:', len(files_found))
            if len(files_found) == 0:
                print('Kein Video gefunden für Interview ID:', interview['Interview-ID'])
                continue
            # glob returns the hits in arbitrary order; sorting keeps the
            # numbering of multiple files stable between runs.
            files_found = sorted(files_found)
            for n, file in enumerate(files_found, start=1):
                if len(files_found) == 1:
                    new_symlinkname = interview['Interview-ID'] + '_01_01_master.mkv'
                else:
                    print('Mehrere Videos gefunden für Inverwiew-ID', interview['Interview-ID'])
                    new_symlinkname = interview['Interview-ID'] + '_01_01_master_' + str(n) + '.mkv'
                new_symlink = os.sep.join([basepath, 'ohd', new_symlinkname])
                print('new_symlink:', new_symlink)
                try:
                    os.symlink(file, new_symlink)
                except FileExistsError:
                    print('Für Interview:', interview['Interview-ID'], 'existiert bereits ein Alias:', new_symlink)
        except ValueError:
            print('Für Interview ID:', interview['Interview-ID'], 'konnte kein Suchpfad erstellt werden')

def main():
    """Read the metadata CSV and create the symlinks for the video files.

    Both the base directory of the media files and the CSV file name are
    fixed inside this function.
    """
    media_root = '/media/viejo/Elements/Wismut_AV_Daten'
    metadata_file = 'metadata-import-Wismut_zu_ohd-2022-03-01_cg_ob.csv'

    print('============== Start creating Aliases ================')
    interviews = load_csv(metadata_file)
    print('CSV eingelesen')
    create_alias(interviews, media_root)
    print('================== Done =============================')


if __name__ == '__main__':
    main()


