<a href="https://colab.research.google.com/github/twagoo/europeana_colab/blob/master/europeana_fulltext_processing_example1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [105]:
#@title Imports and utility functions
import json
import logging
import os
import requests
import tarfile
import requests
import shelve
from lxml import etree

# Identifier of the CMDI profile that marks a record as a text resource.
TEXT_RESOURCE_PROFILE_ID = 'clarin.eu:cr1:p_1633000337997'
# Prefix -> namespace URI map passed to every XPath query in this notebook.
# The profile namespace ('cmdp') is the profiles base URI plus the profile id.
CMD_NAMESPACES = dict(
    cmd='http://www.clarin.eu/cmd/1',
    cmdp=f'http://www.clarin.eu/cmd/1/profiles/{TEXT_RESOURCE_PROFILE_ID}',
)
# Only resource references starting with this URL are indexed.
FULLTEXT_RESOURCES_BASE_URL = 'https://www.europeana.eu/api/fulltext'

# Install the default root handler, then raise this module's logger to INFO.
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

def flatten_list(t):
  """Return one flat list holding the items of every sub-iterable in ``t``.

  Only one level of nesting is removed; the items themselves are not
  flattened further.
  """
  flat = []
  for sublist in t:
    flat.extend(sublist)
  return flat


def retrieve_cmdis_archive(url, filename='cmdis.tgz'):
  """Download the archive at ``url`` and store it as ``filename``.

  Args:
    url: Location of the archive; redirects are followed.
    filename: Local path the downloaded bytes are written to.

  Returns:
    ``filename``, so the call can be chained into the unpack step.

  Raises:
    requests.HTTPError: If the server answers with a 4xx/5xx status. Without
      this check an HTML error page would be saved as if it were the tarball.
  """
  # Stream the body so that a large archive is never held in memory at once.
  with requests.get(url, allow_redirects=True, stream=True) as response:
    response.raise_for_status()
    with open(filename, 'wb') as tarball:
      for chunk in response.iter_content(chunk_size=1024 * 1024):
        tarball.write(chunk)
  return filename


def unpack_collection(tarball_filename, target_dir, collection):
  """Extract the files below ``<collection>/`` from a gzipped tarball.

  Args:
    tarball_filename: Path of the ``.tar.gz`` archive to read.
    target_dir: Directory the members are extracted into.
    collection: Name of the top-level directory inside the archive.

  Returns:
    True if at least one member was extracted, False otherwise. Every
    failure path is logged; errors are reported through the return value
    rather than raised.
  """
  prefix = f'{collection}/'
  try:
    with tarfile.open(tarball_filename, 'r:gz') as tarball:
      # Select on the member paths themselves, so archives that carry no
      # explicit directory entry for the collection are handled as well.
      members = [tar_info for tar_info in tarball.getmembers()
                 if tar_info.name.startswith(prefix)]
      if not members:
        if collection in tarball.getnames():
          logger.warning(f'{collection} contains no files in tarball')
        else:
          logger.warning(f'{collection} not found in tarball')
        return False

      logger.info(f'Extracting {collection} from {tarball_filename} to {target_dir}/')
      # The archive is downloaded content: where the interpreter supports
      # extraction filters, use the 'data' filter so that members cannot
      # escape target_dir (absolute paths, '..', links pointing outside).
      extract_kwargs = {'filter': 'data'} if hasattr(tarfile, 'data_filter') else {}
      tarball.extractall(members=members, path=target_dir, **extract_kwargs)
      return True
  except Exception as ex:
    logger.error(f'Something went wrong while trying to extract CMDIs from tarball: {ex}')
  return False


def _plain_xpath_value(value):
  """Turn an lxml XPath string result into a plain ``str``; pass others through."""
  # lxml returns "smart" strings that reference their parent element and so
  # keep the whole parsed document alive for as long as the string is stored.
  return str(value) if isinstance(value, str) else value


def index_filenames(index_def, cmdi_files_dir):
  """Build an index of full text resource URLs from the CMDI records in a directory.

  Files not ending in ``.xml`` or ``.cmdi`` and records whose ``MdProfile`` is
  not ``TEXT_RESOURCE_PROFILE_ID`` are skipped (and logged).

  Args:
    index_def: Mapping of index name to an XPath expression (using the
      prefixes in ``CMD_NAMESPACES``) that selects the values to index on.
    cmdi_files_dir: Directory whose entries are scanned (not recursively).

  Returns:
    ``{index name: {value: [resource ref, ...]}}``. Each record adds its
    resource refs that start with ``FULLTEXT_RESOURCES_BASE_URL`` to every
    value it has for an index. An index name only appears once some record
    yielded a value for it.
  """
  index = {}
  for filename in os.listdir(cmdi_files_dir):
    if not filename.endswith(('.xml', '.cmdi')):
      logger.info(f"Skipping file {filename} (not an XML file)")
      continue

    file_path = f'{cmdi_files_dir}/{filename}'
    logger.debug(f'Processing file {file_path}')
    xmldoc = etree.parse(file_path)

    # only text resource records are indexed
    md_profile_values = xmldoc.xpath('/cmd:CMD/cmd:Header/cmd:MdProfile/text()', namespaces=CMD_NAMESPACES)
    if TEXT_RESOURCE_PROFILE_ID not in md_profile_values:
      logger.info(f'Skipping file {filename} (not a text resource record)')
      continue

    # full text resource refs of this record, detached from the parsed tree
    resource_refs = [_plain_xpath_value(ref) for ref
                     in xmldoc.xpath('/cmd:CMD/cmd:Resources/cmd:ResourceProxyList/cmd:ResourceProxy/cmd:ResourceRef/text()', namespaces=CMD_NAMESPACES)
                     if ref.startswith(FULLTEXT_RESOURCES_BASE_URL)]

    # put in index according to definition
    for index_key, xpath_expr in index_def.items():
      values = xmldoc.xpath(xpath_expr, namespaces=CMD_NAMESPACES)
      if not values:
        continue

      key_index = index.setdefault(index_key, {})
      for value in values:
        key_index.setdefault(_plain_xpath_value(value), []).extend(resource_refs)
  return index

def get_json_from_http(url, session=None):
    """GET ``url`` and return its body parsed as JSON.

    Args:
        url: Address to request.
        session: Optional object with a ``get(url)`` method (for example a
            ``requests.Session``); the ``requests`` module itself is used
            when omitted.

    Returns:
        The decoded JSON value, or ``None`` (after logging an error) when the
        body is not valid JSON. The HTTP status code is not inspected.
    """
    http = requests if session is None else session
    logger.debug(f"Making request: {url}")
    body = http.get(url).text
    logger.debug(f"API response: {url}")
    try:
        parsed = json.loads(body)
    except json.JSONDecodeError:
        logger.error(f"Error decoding response from {url}")
        return None
    return parsed


def get_fulltext(urls_set, cache, session=None):
  """Collect the full text behind each URL, using ``cache`` to avoid refetching.

  Args:
    urls_set: Iterable of full text resource URLs.
    cache: Mutable mapping of URL to text (a dict or an open shelf). Texts
      fetched during this call are stored in it.
    session: Optional HTTP session handed to ``get_json_from_http`` so that
      connections can be reused across the requests.

  Returns:
    List of the non-empty text values, in iteration order of ``urls_set``.
    URLs that yield no text are logged as warnings and skipped.
  """
  text = []
  for url in urls_set:
    # Start every iteration from the cache lookup. Carrying the variable over
    # from the previous URL would append that URL's text again whenever the
    # request for the current one fails.
    text_value = cache.get(url, None)
    if not text_value:
      logger.debug(f'Retrieving text from {url}')
      json_response = get_json_from_http(url, session)
      # The text is read from the 'value' key; a response that is not a JSON
      # object has no such key and is treated as a missing text.
      if isinstance(json_response, dict):
        text_value = json_response.get('value', None)
      else:
        text_value = None
      if text_value:
        cache[url] = text_value

    if text_value:
      text.append(text_value)
    else:
      logger.warning(f'No response and/or text value at {url}')
  return text

In [86]:
#@title Constants and settings
COUNTRY = "Latvia" #@param {type:"string"}

# Index name -> XPath expression selecting the values a record is indexed on.
INDEX_DEF = {
    'language': '/cmd:CMD/cmd:Components/cmdp:TextResource/cmdp:Language/cmdp:code/text()',
    'years': '/cmd:CMD/cmd:Components/cmdp:TextResource/cmdp:TemporalCoverage/cmdp:Start/cmdp:year/text()'
}

# Country name -> collection identifier (the directory name looked up in the archive).
COLLECTIONS = {'Finland': '9200301', 'Latvia': '9200303', 'Luxembourg': '9200396'}
# Fail early, listing the valid choices, when the selected country is unknown.
if COUNTRY not in COLLECTIONS:
  raise RuntimeError(f"Country name '{COUNTRY}' not in collections dictionary keys: {list(COLLECTIONS)}")
COLLECTION = COLLECTIONS[COUNTRY]

CMDI_TARBALL_URL = 'https://alpha-vlo.clarin.eu/data/test/europeana-aggregations.tar.gz'

# Working directories, relative to the current directory.
CMDIS_DIR = './cmdis'
OUTPUT_DIR = './output'

cache = {}

In [24]:
# Download the CMDI archive; tarball_filename is the local path it was saved to.
logger.info('Retrieving CMDIs')
tarball_filename = retrieve_cmdis_archive(CMDI_TARBALL_URL)

INFO:__main__:Retrieving CMDIs


In [25]:
# Extract the selected collection from the downloaded archive into CMDIS_DIR.
logger.info(f'Reading tarball contents (looking for {COLLECTION})')

if not unpack_collection(tarball_filename, CMDIS_DIR, COLLECTION):
  # The message must use the COLLECTION constant: a lower-case `collection`
  # name does not exist at this level and would raise NameError instead.
  raise RuntimeError(f'Failed to extract member {COLLECTION} from tarball!')

# Directory that now holds the extracted CMDI records of the collection.
COLLECTION_FILES_DIR=f'{CMDIS_DIR}/{COLLECTION}'
logger.info(f'CMDI files available in {COLLECTION_FILES_DIR}/')

INFO:__main__:Reading tarball contents (looking for 9200303)
INFO:__main__:Extracting 9200303 from cmdis.tgz to ./cmdis/
INFO:__main__:CMDI files available in ./cmdis/9200303/


In [26]:
# Build the index (index name -> value -> resource refs) from the extracted records.
logger.info('Indexing resource links from records')
index = index_filenames(INDEX_DEF, COLLECTION_FILES_DIR)

INFO:__main__:Indexing resource links from records
INFO:__main__:Skipping file Wentspils_Apskats_collection.cmdi (not a text resource record)
INFO:__main__:Skipping file Jaun__Dz_ve_collection.cmdi (not a text resource record)
INFO:__main__:Skipping file R_gas_Pils_tas_Policijas_Av_ze_collection.cmdi (not a text resource record)
INFO:__main__:Skipping file Sarkanais_Sports_collection.cmdi (not a text resource record)
INFO:__main__:Skipping file Zemgale_collection.cmdi (not a text resource record)
INFO:__main__:Skipping file Bit_te_collection.cmdi (not a text resource record)
INFO:__main__:Skipping file _Drywas__Ziniskajs_pilykums_collection.cmdi (not a text resource record)
INFO:__main__:Skipping file Kurzeme_collection.cmdi (not a text resource record)
INFO:__main__:Skipping file Latvijas_PSR_Augst_k_s_Padomes_prezidija_zi_ot_js_collection.cmdi (not a text resource record)
INFO:__main__:Skipping file Jaun__Straume_collection.cmdi (not a text resource record)
INFO:__main__:Skipping fil

In [74]:
# Log, per index, the sorted list of values that were found.
logger.info('Index summary:')
for index_key, key_index in index.items():
  logger.info(f'{index_key}: {sorted(key_index)}')

INFO:__main__:Index summary:
INFO:__main__:language: ['deu', 'est', 'lav', 'pol', 'rus']
INFO:__main__:years: ['1868', '1869', '1870', '1871', '1872', '1873', '1874', '1875', '1876', '1877', '1878', '1879', '1880', '1881', '1882', '1883', '1884', '1885', '1886', '1887', '1888', '1889', '1890', '1891', '1892', '1893', '1894', '1895', '1896', '1897', '1898', '1899', '1900', '1901', '1902', '1903', '1904', '1905', '1906', '1907', '1908', '1909', '1910', '1911', '1912', '1913', '1914', '1915', '1916', '1917', '1918', '1919', '1920', '1921', '1922', '1923', '1924', '1925', '1926', '1927', '1928', '1929', '1930', '1931', '1932', '1933', '1934', '1935', '1936', '1937', '1938', '1939', '1940', '1941', '1942', '1943', '1944', '1945', '1946']


In [101]:
# Create segments, then retrieve text

# URL sets per language
urls_lav = set(index['language']['lav'])
urls_deu = set(index['language']['deu'])
# URL sets per decade: union of the refs of every indexed year in the range
urls_20s = {url for year, refs in index['years'].items() if 1920 <= int(year) < 1930 for url in refs}
urls_30s = {url for year, refs in index['years'].items() if 1930 <= int(year) < 1940 for url in refs}

# make intersections
segments_urls = {
  # 'urls_lav_20s': urls_lav & urls_20s,
  # 'urls_lav_30s': urls_lav & urls_30s,
  'urls_deu_20s': urls_deu & urls_20s,
  'urls_deu_30s': urls_deu & urls_30s,
}


In [106]:
# retrieve text for segments
# The shelf stored as 'fulltext_cache' serves as the URL -> text cache.
segments_text = {}
with shelve.open('fulltext_cache') as cache:
  for seg_name, urls in segments_urls.items():
    logger.info(f"Retrieving text content for segment '{seg_name}' ({len(urls)} urls)")
    segments_text[seg_name] = get_fulltext(urls, cache)

INFO:__main__:Retrieving text content for segment 'urls_deu_20s' (3079 urls)
INFO:__main__:Retrieving text content for segment 'urls_deu_30s' (283 urls)


In [110]:
# save retrieved text to file (one '<segment name>.txt' per segment in OUTPUT_DIR)
os.makedirs(OUTPUT_DIR, exist_ok=True)
for seg_name in segments_text:
  filename=f'{OUTPUT_DIR}/{seg_name}.txt'
  # Write UTF-8 explicitly: the texts contain non-ASCII characters and the
  # platform default encoding is not guaranteed to be able to represent them.
  with open(filename, 'w', encoding='utf-8') as file:
    logger.info(f"Writing all text for segment '{seg_name}' to '{filename}'")
    # The texts are written back to back; write() returns the number of
    # characters written, so the sum is the size of the segment in characters.
    written = sum(file.write(text) for text in segments_text[seg_name])
    logger.info(f'{written} characters written')

INFO:__main__:Writing all text for segment 'urls_deu_20s' to './output/urls_deu_20s.txt'
INFO:__main__:41932571 characters written
INFO:__main__:Writing all text for segment 'urls_deu_30s' to './output/urls_deu_30s.txt'
INFO:__main__:1648545 characters written


In [None]:
# send to voyant