<h1>E-rara data transformation with Spark and ingest to Elasticsearch<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Prerequisites" data-toc-modified-id="Prerequisites-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Prerequisites</a></span></li><li><span><a href="#Processing-e-rara-data" data-toc-modified-id="Processing-e-rara-data-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Processing e-rara data</a></span><ul class="toc-item"><li><span><a href="#Load-from-AWS-S3" data-toc-modified-id="Load-from-AWS-S3-2.1"><span class="toc-item-num">2.1&nbsp;&nbsp;</span>Load from AWS S3</a></span></li><li><span><a href="#Get-to-know-the-data" data-toc-modified-id="Get-to-know-the-data-2.2"><span class="toc-item-num">2.2&nbsp;&nbsp;</span>Get to know the data</a></span></li><li><span><a href="#Cleaning-and-formatting-the-data" data-toc-modified-id="Cleaning-and-formatting-the-data-2.3"><span class="toc-item-num">2.3&nbsp;&nbsp;</span>Cleaning and formatting the data</a></span></li><li><span><a href="#Text-processing:-Named-entity-recognition" data-toc-modified-id="Text-processing:-Named-entity-recognition-2.4"><span class="toc-item-num">2.4&nbsp;&nbsp;</span>Text processing: Named entity recognition</a></span></li></ul></li><li><span><a href="#Ingest-to-Elasticsearch" data-toc-modified-id="Ingest-to-Elasticsearch-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Ingest to Elasticsearch</a></span></li><li><span><a href="#Query-Elasticsearch" data-toc-modified-id="Query-Elasticsearch-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Query Elasticsearch</a></span></li></ul></div>

# Prerequisites

In [1]:
# Import basic libraries
import os, re, json
import pandas as pd
print('Successfully imported necessary libraries.')

Successfully imported necessary libraries.


In [3]:
# boto3: AWS SDK for Python, used for S3 I/O
!pip install boto3
import boto3



In [4]:
# Import the required Python dependencies

import findspark
findspark.init()

# modules for handling dataframes
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, split, lit, when, regexp_replace, concat
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType, IntegerType, StringType

# for user defined functions
from pyspark.sql.functions import udf

In [5]:
# Start Spark Session

import pyspark
conf = pyspark.SparkConf()

# Session configuration
#conf.setMaster("spark://spark-master:7077")
conf.setMaster("local[2]")

conf.set("spark.executor.memory", "8g")
conf.set("spark.executor.cores", "1")
conf.set("spark.driver.memory", "4g")
conf.set("spark.core.connection.ack.wait.timeout", "1200")

# Elasticsearch
conf.set("spark.executor.extraClassPath", "elasticsearch-hadoop-7.12.0/dist/elasticsearch-hadoop-20_2.11-7.12.0.jar, \
            elasticsearch-hadoop-7.12.0/dist/elasticsearch-spark-20_2.11-7.12.0.jar")
conf.set("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:7.12.0")
conf.set("es.index.auto.create", "true")

# Initialize a Spark session with configuration
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('elastic') \
        .config(conf=conf) \
        .getOrCreate()

# Instantiate a Spark Context
sc = spark.sparkContext


In [6]:
spark

In [34]:
#  stop Spark Session, if necessary
#spark.stop()

In [7]:
# check executable binaries for the Python interpreter
import sys
print("Python on driver: " + sys.executable)
distData = sc.parallelize(range(100))
python_distros = distData.map(
    lambda x: sys.executable).distinct().collect()
print("Python on nodes: ", python_distros)

Python on driver: /opt/conda/bin/python
Python on nodes:  ['/opt/conda/bin/python']


# Processing e-rara data

## Load from AWS S3

In [8]:
# List the objects in the AWS S3 bucket: count them and collect their keys and file paths

s3 = boto3.client('s3')
response = s3.list_objects(Bucket='bgd-content', MaxKeys=1000, \
                           Prefix='bernensia-json-fulltext/')
#response['ResponseMetadata']
keys = []
file_paths = []
for i in response['Contents'][1:]:
    keys.append(i['Key'])
    file_paths.append('s3a://bgd-content/' + i['Key'])
print(len(keys))
keys[0:5]

571


['bernensia-json-fulltext/10179500.json',
 'bernensia-json-fulltext/10347968.json',
 'bernensia-json-fulltext/10381638.json',
 'bernensia-json-fulltext/10722710.json',
 'bernensia-json-fulltext/10733431.json']
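
A single `list_objects` call returns at most 1000 keys, which is enough for the 571 objects here. For a larger prefix, a paginator could be used instead. The following is a minimal sketch, assuming a boto3 S3 client is passed in; `list_json_keys` is a hypothetical helper name that is not part of the original notebook. Filtering on the `.json` suffix takes the place of skipping the first entry with `[1:]`.

```python
def list_json_keys(s3_client, bucket, prefix):
    """Collect all '.json' object keys under a prefix, page by page."""
    paginator = s3_client.get_paginator('list_objects_v2')
    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # 'Contents' is missing when a page holds no objects
        for obj in page.get('Contents', []):
            if obj['Key'].endswith('.json'):
                keys.append(obj['Key'])
    return keys

# Possible use with the client from above (not executed here):
# keys = list_json_keys(s3, 'bgd-content', 'bernensia-json-fulltext/')
# file_paths = ['s3a://bgd-content/' + k for k in keys]
```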

In [9]:
def merge_json(path, bigfile):
    '''
    Merges the JSON files of a directory into one big JSON file.
    
    :param path: path to directory with the JSON files which shall be integrated
    :param bigfile: name of the resulting big JSON file
    '''
    result = list()
    files = [file for file in os.listdir(path) if file.endswith('.json')]
    for f in files:
        with open(path + f, 'r') as infile:
            result.append(json.load(infile))
    with open(bigfile, 'w') as output_file:
        json.dump(result, output_file)
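
`merge_json` collects every record in one Python list and writes a single JSON array, which is why the later `spark.read.json` call needs `multiLine=True`. An alternative is to write one record per line (JSON Lines), the layout Spark's JSON reader expects by default. The following is a sketch under that assumption; `merge_json_lines` is a hypothetical name, not part of the original notebook.

```python
import json
import os

def merge_json_lines(path, bigfile):
    """Write each JSON file in `path` as one line of `bigfile` (JSON Lines)."""
    count = 0
    with open(bigfile, 'w', encoding='utf-8') as out:
        for name in sorted(os.listdir(path)):
            if not name.endswith('.json'):
                continue
            with open(os.path.join(path, name), 'r', encoding='utf-8') as infile:
                record = json.load(infile)
            # json.dumps escapes newlines inside strings, so each record stays on one line
            out.write(json.dumps(record, ensure_ascii=False) + '\n')
            count += 1
    return count

# Possible use (not executed here):
# merge_json_lines('download/bernensia-json-fulltext/', 'erara_big.jsonl')
# df = spark.read.json('erara_big.jsonl')
```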

In [10]:
# AWS S3 example URI 's3a://bgd-content/bernensia-json-fulltext/15251979.json'
# AWS S3 example URI 's3a://bgd-content/eperiodica-json-fulltext/zgh-001:1941:3::313.json'

# make local directory for downloaded JSON files

os.makedirs('download/bernensia-json-fulltext/', exist_ok=True)   # do not fail when the cell is re-run

for key in keys:
    s3.download_file('bgd-content', key, 'download/{}'.format(key))

merge_json('download/bernensia-json-fulltext/', 'erara_big.json')

# load big JSON file into Spark DataFrame
df = spark.read.json('erara_big.json',  multiLine=True)
df.printSchema()

root
 |-- contributor: string (nullable = true)
 |-- coverage: string (nullable = true)
 |-- creator: string (nullable = true)
 |-- date: string (nullable = true)
 |-- description: string (nullable = true)
 |-- format: string (nullable = true)
 |-- fulltext: string (nullable = true)
 |-- id_intern: string (nullable = true)
 |-- identifier: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- language: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- relation: string (nullable = true)
 |-- rights: string (nullable = true)
 |-- source: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- title: string (nullable = true)
 |-- type: array (nullable = true)
 |    |-- element: string (containsNull = true)



## Get to know the data

In [11]:
df.show(1, vertical=True)  # show the first row

-RECORD 0---------------------------
 contributor | null                 
 coverage    | Bern                 
 creator     | [s.n.]               
 date        | ["1812","1824"]      
 description | ["Enthält Druckb...  
 format      | 1 Mappe (10 Druck... 
 fulltext    | null                 
 id_intern   | 20150342             
 identifier  | [doi:10.3931/e-ra... 
 language    | ger                  
 publisher   | [Haller]             
 relation    | vignette : https:... 
 rights      | pdm                  
 source      | null                 
 subject     | Kaffeehandel         
 title       | [Kaffee-Verpackun... 
 type        | [Text, Book]         
only showing top 1 row



In [12]:
df.groupBy("type").count().orderBy('count', ascending=False).show(truncate=False)

+-----------------------------------------------+-----+
|type                                           |count|
+-----------------------------------------------+-----+
|[Text, Book]                                   |472  |
|[Image, Illustrated Material]                  |80   |
|[Image, Map]                                   |9    |
|[Text, Zeitschrift]                            |4    |
|[Text, Periodical, Zeitschrift]                |3    |
|[Text, Periodical, Zeitschrift, [Périodiques]] |1    |
|[Other, Music Print]                           |1    |
|[Text, Monografische Reihe]                    |1    |
+-----------------------------------------------+-----+



In [13]:
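# Count the non-null values in fields that appear to be mostly empty
# (a != comparison evaluates to NULL for NULL values, so those rows are filtered out)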
possibly_empty = ['source', 'contributor']
for field in possibly_empty:
    print(df.where(df[field] != "null").count())

33
103


In [14]:
df.groupBy("rights").count().orderBy('count', ascending=False).show(truncate=False)

+------+-----+
|rights|count|
+------+-----+
|pdm   |571  |
+------+-----+



In [15]:
df.groupBy("subject").count().orderBy('count', ascending=False).show(truncate=False)

+-----------------------------------------+-----+
|subject                                  |count|
+-----------------------------------------+-----+
|null                                     |435  |
|Schausteller                             |19   |
|Wasserbau                                |10   |
|Menagerie                                |7    |
|Zirkus                                   |6    |
|["Artist","KnieFamilie"]                 |6    |
|Museumsgesellschaft                      |6    |
|["Schausteller","Wachsfigurenkabinett"]  |5    |
|Juragewässerkorrektion                   |3    |
|Artist                                   |3    |
|Villmergerkrieg (1712)                   |3    |
|["Elefanten","Menagerie"]                |3    |
|["Ringen","Schausteller"]                |2    |
|Volkswirtschaft                          |2    |
|Ballonfahrt                              |2    |
|["Mechanisches Kunstwerk","Schausteller"]|2    |
|Panorama                                 |2    |


In [16]:
df.groupBy("coverage").count().orderBy('count', ascending=False).show(truncate=False)

+------------------------------------+-----+
|coverage                            |count|
+------------------------------------+-----+
|Bern                                |312  |
|null                                |151  |
|949.4                               |21   |
|["Bern","949.4"]                    |8    |
|330                                 |7    |
|550                                 |6    |
|320                                 |5    |
|624                                 |3    |
|Basel                               |3    |
|["Bern (Kanton)","949.4"]           |3    |
|["090","020"]                       |3    |
|["Bern (Kanton)","Geschichte","340"]|2    |
|Geschichte                          |2    |
|340                                 |2    |
|["949.4","060"]                     |2    |
|Bern (Kanton)                       |2    |
|300                                 |1    |
|390                                 |1    |
|["Geschichte","949.4"]              |1    |
|["Geschic
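
The `subject` and `coverage` counts show that these string fields hold either a single value (`Bern`) or a JSON-encoded array (`["Bern","949.4"]`), so the same term is counted in several groups. One way to normalise such a field is sketched below; `to_value_list` is a hypothetical helper, and wrapping it in a UDF is an assumption rather than something the notebook does.

```python
import json

def to_value_list(value):
    """Return a list of values for a field that is either plain text or a JSON array string."""
    if value is None:
        return []
    text = value.strip()
    if text.startswith('[') and text.endswith(']'):
        try:
            parsed = json.loads(text)
        except ValueError:
            # bracketed but not valid JSON, e.g. '[s.n.]': keep it as a single value
            return [text]
        if isinstance(parsed, list):
            return [str(v) for v in parsed]
    return [text]

# Possible use in the notebook (assumption, not executed here):
# udf_to_list = udf(to_value_list, ArrayType(StringType()))
# df.withColumn('subject_list', udf_to_list(col('subject')))
```

With an array column, `explode` followed by `groupBy` would then count each term once per record.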

In [17]:
df.groupBy("language").count().orderBy('count', ascending=False).show(truncate=False)

+--------+-----+
|language|count|
+--------+-----+
|ger     |478  |
|fre     |90   |
|lat     |3    |
+--------+-----+



## Cleaning and formatting the data

In [13]:
# Clean 'type' field and rename 'type' to prevent confusion in Elasticsearch use
# Not yet in use
'''
df_type_1 = df.withColumn('type_new_1', df['type'][0].cast('String'))
df_type_2 = df_type_1.withColumn('type_new_2', df['type'][1].cast('String')).replace("Zeitschrift", "Periodical") \
                    .replace("Monografische Reihe", "Monograph Series").drop('type')
df_type_3 = df_type_2.withColumn('document_type', concat(lit('['), 'type_new_1', lit(', '), 'type_new_2', lit(']'))) \
                .drop(df_type_2['type_new_1']).drop(df_type_2['type_new_2'])
'''
# Add field for source collection
df_coll = df.withColumn("source_collection", lit('E-Rara'))

# Change language codes into readable ones (only in the 'language' column)
df_cleaned = df_coll.withColumn('language', df_coll['language'].cast('string')) \
                    .replace({"ger": "German", "fre": "French", "lat": "Latin"}, subset=['language'])

df_cleaned.persist().printSchema()

root
 |-- contributor: string (nullable = true)
 |-- coverage: string (nullable = true)
 |-- creator: string (nullable = true)
 |-- date: string (nullable = true)
 |-- description: string (nullable = true)
 |-- format: string (nullable = true)
 |-- fulltext: string (nullable = true)
 |-- id_intern: string (nullable = true)
 |-- identifier: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- language: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- relation: string (nullable = true)
 |-- rights: string (nullable = true)
 |-- source: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- title: string (nullable = true)
 |-- type: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- source_collection: string (nullable = false)



In [14]:
# Make German and 'other' language subsets
df_cleaned_german = df_cleaned.filter(df_cleaned.language == "German").persist()
#df_cleaned_other = df_cleaned.filter(df_cleaned.language != "German")

## Text processing: Named entity recognition

In [15]:
# Install spaCy
!pip install -U spacy==3.1       # pin spaCy to version 3.1, matching the de_core_news_lg 3.1.0 model
import spacy

In [16]:
# Download spaCy large German language model, 571.2 MB
!python -m spacy download de_core_news_lg  

In [17]:
# Load the language model
import de_core_news_lg
nlp = de_core_news_lg.load()              # alternative if problems occur: nlp = spacy.load('de_core_news_lg')
print("spaCy language model loaded successfully")

spaCy language model loaded successfully


In [18]:
spacy.info()

{'spacy_version': '3.1.0',
 'location': '/opt/conda/lib/python3.8/site-packages/spacy',
 'platform': 'Linux-5.4.0-1018-aws-x86_64-with-glibc2.10',
 'python_version': '3.8.6',
 'pipelines': {'de_core_news_lg': '3.1.0'}}

In [19]:
# Define UDF for cleaning the raw text with NLTK (remove special characters and words shorter than 3 characters)

# Install NLTK
!pip install nltk
import nltk
from nltk import word_tokenize

# Special characters to drop; sentence endings are not in this set
PUNCTUATION = {',', ';', ':', '(', ')', '[', ']', '{', '}', '"', "'", "''", '`', '``',
               '-', '«', '»', '£', '^', '~', '*', '®', '•', '■', '♦', '§'}
# Sentence endings are kept so that the later sentence segmentation still works
SENTENCE_ENDINGS = {'.', '!', '?'}

def preprocess_nltk(text):
    wordlist = word_tokenize(str(text), language='german')
    wordlist_stripped = [w for w in wordlist if w not in PUNCTUATION]
    # apply the length filter to the stripped list, not to the raw token list
    wordlist_stripped = [w for w in wordlist_stripped if len(w) > 2 or w in SENTENCE_ENDINGS]
    return ' '.join(wordlist_stripped)

udf_nltk_tokenize = udf(preprocess_nltk, StringType())



In [20]:
# Define UDF that applies the large spaCy German model to the full text to recognize named entities of type LOC (localities)

nltk.download('punkt')                # "punkt" = pre-trained Punkt models for sentence segmentation
from nltk import sent_tokenize

def spacy_ner_loc(text):
    sents = sent_tokenize(text, language='german')
    ner_loc = []
    for s in sents:
        doc = nlp(s)
        ner_loc.append([ent.lemma_ for ent in doc.ents if ent.label_ == 'LOC'])
    return ner_loc      # one list of LOC entities per sentence

# Note: the nested list is declared as StringType, so Spark stores its string representation
udf_spacy_ner_loc = udf(spacy_ner_loc, StringType())

[nltk_data] Downloading package punkt to /home/jovyan/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
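
`spacy_ner_loc` returns one list per sentence, and because the UDF is declared with `StringType()`, the `locality` column ends up as a plain string. If a flat array of distinct place names is preferred, the nested result could be flattened before it is returned and the UDF declared with `ArrayType(StringType())`. This is a sketch of that idea only; `flatten_unique` is a hypothetical helper that the notebook does not contain.

```python
def flatten_unique(nested):
    """Flatten a list of per-sentence lists, keeping the first occurrence of each entry."""
    seen = set()
    flat = []
    for sentence_entities in nested:
        for entity in sentence_entities:
            if entity not in seen:
                seen.add(entity)
                flat.append(entity)
    return flat

# Possible wiring in the notebook (assumption, not executed here):
# udf_spacy_ner_loc = udf(lambda text: flatten_unique(spacy_ner_loc(text)),
#                         ArrayType(StringType()))
```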


In [21]:
# Apply text cleaning and NER
# Use German subset
df_nltk = df_cleaned_german.withColumn("nltk", udf_nltk_tokenize(col("fulltext")))

# Extract LOC NER with SpaCy
df_final = df_nltk.withColumn("locality", udf_spacy_ner_loc(col("nltk"))).drop("nltk")

df_final.persist()

DataFrame[contributor: string, coverage: string, creator: string, date: string, description: string, format: string, fulltext: string, id_intern: string, identifier: array<string>, language: string, publisher: string, relation: string, rights: string, source: string, subject: string, title: string, type: array<string>, source_collection: string, locality: string]

In [22]:
# Not in use
'''
def clean_locality(text):
    #return re.sub(r'\[+|\]+', '', text)

udf_clean_locality = udf(clean_locality, StringType())

# Clean the 'locality' from duplicate '[' and ']'
df_loc_clean = df_spacy.withColumn("locality_clean", udf_clean_locality(col("locality"))) \
                                   .drop(df_spacy['locality'])
df_final = df_loc_clean.withColumn("locality", split(col("locality_clean"), ",").cast("array<int>")) \
                                    .drop(df_loc_clean['locality_clean'])
                                   
df_final.persist()
'''


In [24]:
df_final.show(1, vertical=True)

-RECORD 0---------------------------------
 contributor       | null                 
 coverage          | Bern                 
 creator           | [s.n.]               
 date              | ["1812","1824"]      
 description       | ["Enthält Druckb...  
 format            | 1 Mappe (10 Druck... 
 fulltext          | null                 
 id_intern         | 20150342             
 identifier        | [doi:10.3931/e-ra... 
 language          | German               
 publisher         | [Haller]             
 relation          | vignette : https:... 
 rights            | pdm                  
 source            | null                 
 subject           | Kaffeehandel         
 title             | [Kaffee-Verpackun... 
 type              | [Text, Book]         
 source_collection | E-Rara               
 locality          | [[]]                 
only showing top 1 row
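
Because `locality` is stored as a string, a value such as `[[Biel, Basel]]` has to be parsed before it can be used as a list; the unused `clean_locality` cell points in the same direction. The sketch below assumes that entity names contain neither commas nor square brackets, which is not guaranteed for OCR text; `parse_locality` is a hypothetical helper name.

```python
import re

def parse_locality(text):
    """Turn a stringified nested list such as '[[Biel, Basel], [Bern]]' into a flat list."""
    if text is None:
        return []
    # drop every '[' and ']' and split the remainder on commas
    stripped = re.sub(r'[\[\]]', '', text)
    return [part.strip() for part in stripped.split(',') if part.strip()]
```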



In [25]:
df_final.printSchema()

root
 |-- contributor: string (nullable = true)
 |-- coverage: string (nullable = true)
 |-- creator: string (nullable = true)
 |-- date: string (nullable = true)
 |-- description: string (nullable = true)
 |-- format: string (nullable = true)
 |-- fulltext: string (nullable = true)
 |-- id_intern: string (nullable = true)
 |-- identifier: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- language: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- relation: string (nullable = true)
 |-- rights: string (nullable = true)
 |-- source: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- title: string (nullable = true)
 |-- type: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- source_collection: string (nullable = false)
 |-- locality: string (nullable = true)



# Ingest to Elasticsearch

In [23]:
# Install the Elasticsearch APIs
!pip install elasticsearch
!pip install elasticsearch-dsl
import elasticsearch
import elasticsearch_dsl

Collecting elasticsearch
  Downloading elasticsearch-7.15.0-py2.py3-none-any.whl (378 kB)
Installing collected packages: elasticsearch
Successfully installed elasticsearch-7.15.0
Collecting elasticsearch-dsl
  Downloading elasticsearch_dsl-7.4.0-py2.py3-none-any.whl (63 kB)
Installing collected packages: elasticsearch-dsl
Successfully installed elasticsearch-dsl-7.4.0


In [24]:
# Configure the basic API with an Elasticsearch client
from elasticsearch import Elasticsearch
es = Elasticsearch("elasticsearch-1")
es.info()

{'name': 'elasticsearch-1',
 'cluster_name': 'docker-cluster',
 'cluster_uuid': 'dfUUhrhyTGqWE_tW3ciIMg',
 'version': {'number': '7.10.1',
  'build_flavor': 'default',
  'build_type': 'docker',
  'build_hash': '1c34507e66d7db1211f66f3513706fdf548736aa',
  'build_date': '2020-12-05T01:00:33.671820Z',
  'build_snapshot': False,
  'lucene_version': '8.7.0',
  'minimum_wire_compatibility_version': '6.8.0',
  'minimum_index_compatibility_version': '6.0.0-beta1'},
 'tagline': 'You Know, for Search'}

In [None]:
# Create explicit mapping for the index
# See https://www.elastic.co/guide/en/elasticsearch/reference/7.x/explicit-mapping.html
# and https://www.elastic.co/guide/en/elasticsearch/reference/7.x/mapping-params.html
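
The cell above only links to the mapping documentation. As an illustration, an explicit mapping for a few of the e-rara fields might look like the sketch below. The choice of fields, the use of the built-in `german` analyzer, and the index name `erara` are assumptions made for this example and are not taken from the notebook.

```python
def build_erara_mapping():
    """Return an index body with an explicit mapping for a few e-rara fields."""
    german_text = {"type": "text", "analyzer": "german"}
    return {
        "mappings": {
            "properties": {
                "id_intern": {"type": "keyword"},          # exact-match identifier
                "language": {"type": "keyword"},           # small set of fixed values
                "source_collection": {"type": "keyword"},
                "title": dict(german_text),                # analyzed full-text fields
                "fulltext": dict(german_text),
                "locality": {"type": "text"},
            }
        }
    }

mapping = build_erara_mapping()

# Creating the index with the client from above (not executed here):
# es.indices.create(index="erara", body=mapping)
```

An index created this way has to exist before the Spark ingest runs; otherwise `es.index.auto.create` lets Elasticsearch derive a dynamic mapping.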

In [26]:
%%bash
# Test standard analyzer
# Use private IP
curl -X POST "http://172.26.10.176:9200/_analyze?pretty" -H 'Content-Type: application/json' -d'
{
  "analyzer": "standard",
  "text": "D\u00fcrfte von Anfang. Ein anderer Hinweis als\nselbstverst\u00e4ndlich."
}
'

{
  "tokens" : [
    {
      "token" : "dürfte",
      "start_offset" : 0,
      "end_offset" : 6,
      "type" : "<ALPHANUM>",
      "position" : 0
    },
    {
      "token" : "von",
      "start_offset" : 7,
      "end_offset" : 10,
      "type" : "<ALPHANUM>",
      "position" : 1
    },
    {
      "token" : "anfang",
      "start_offset" : 11,
      "end_offset" : 17,
      "type" : "<ALPHANUM>",
      "position" : 2
    },
    {
      "token" : "ein",
      "start_offset" : 19,
      "end_offset" : 22,
      "type" : "<ALPHANUM>",
      "position" : 3
    },
    {
      "token" : "anderer",
      "start_offset" : 23,
      "end_offset" : 30,
      "type" : "<ALPHANUM>",
      "position" : 4
    },
    {
      "token" : "hinweis",
      "start_offset" : 31,
      "end_offset" : 38,
      "type" : "<ALPHANUM>",
      "position" : 5
    },
    {
      "token" : "als",
      "start_offset" : 39,
      "end_offset" : 42,
      "type" : "<ALPHANUM>",
      "position" : 6
    },
    {




In [27]:
# Ingest to Elasticsearch as JSON
# https://www.elastic.co/guide/en/elasticsearch/hadoop/7.x/spark.html
# See settings https://www.elastic.co/guide/en/elasticsearch/hadoop/7.x/configuration.html
from pyspark.sql import SQLContext

esconf={}
esconf["es.resource"] = "index/erara"         # format: index/type
esconf["es.mapping.id"] = "id_intern"         # use 'id_intern' as the document ID in ES (needed for update/upsert)
esconf["es.nodes"] = "elasticsearch-1"
# OR
#esconf["es.nodes"] = IP                        # default: 'localhost'
#esconf["es.port"] = "9200" 

#esconf["es.mapping.include"] = None             # fields to include with ingest
#esconf["es.mapping.exclude"] = None             # fields to exclude with ingest

#esconf["es.read.field.as.array.exclude"] (default empty)   # Fields/properties that should NOT be considered as arrays/lists
#esconf["es.read.field.as.array.include"] = nested.bar:3    # maps nested.bar as a 3-level/dimensional array

esconf["es.write.operation"] = "upsert"
#index (default): new data is added while existing data (based on its id) is replaced (reindexed). 
#create: adds new data - if the data already exists (based on its id), an exception is thrown. 
#update: updates existing data (based on its id). If no data is found, an exception is thrown. 
#upsert: known as merge or insert if the data does not exist, updates if the data exists (based on its id). 

df_final.write.format("org.elasticsearch.spark.sql") \
                    .options(**esconf).save()     #for adding fields: .mode('append')
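
The connector performs the upsert on the Spark side. For a small batch of records that already sits on the driver, the bulk helper of the Python client accepts equivalent actions. The sketch below only builds those actions; `to_upsert_actions` is a hypothetical helper, and the index name used in the commented call is an assumption.

```python
def to_upsert_actions(records, index, id_field="id_intern"):
    """Yield one bulk 'update' action with doc_as_upsert per record dictionary."""
    for record in records:
        yield {
            "_op_type": "update",
            "_index": index,
            "_id": record[id_field],
            "doc": record,
            "doc_as_upsert": True,   # insert the document if the ID does not exist yet
        }

# Possible use with the Python client (assumption, not executed here):
# from elasticsearch.helpers import bulk
# rows = [row.asDict(recursive=True) for row in df_final.collect()]
# bulk(es, to_upsert_actions(rows, "erara"))
```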

In [282]:
# Short form
from pyspark.sql import SQLContext

df_final.write.format("org.elasticsearch.spark.sql") \
    .option("es.resource", "index/eperiodica") \
    .option("es.mapping.id", "id_intern") \
    .option("es.write.operation", "index") \
    .option("es.nodes", "elasticsearch-1").save()

# Query Elasticsearch

In [6]:
es.cluster.health(wait_for_status='yellow', request_timeout=1)   # wait until the cluster is at least 'yellow'; on a single node, replica shards stay unassigned

{'cluster_name': 'docker-cluster',
 'status': 'yellow',
 'timed_out': False,
 'number_of_nodes': 1,
 'number_of_data_nodes': 1,
 'active_primary_shards': 7,
 'active_shards': 7,
 'relocating_shards': 0,
 'initializing_shards': 0,
 'unassigned_shards': 1,
 'delayed_unassigned_shards': 0,
 'number_of_pending_tasks': 0,
 'number_of_in_flight_fetch': 0,
 'task_max_waiting_in_queue_millis': 0,
 'active_shards_percent_as_number': 87.5}

In [11]:
# Whole mapping of index
! curl -X GET "http://172.26.6.171:9200/index_1/_mapping?pretty" > mapping_index.json



In [10]:
# Search on field of index + type via REST API
# Use private IP

#! curl -X GET "http://172.26.6.171:9200/index_1/eperiodica/_search?q=title:aare&pretty"

In [39]:
# Simple search via DSL API
from elasticsearch_dsl import Search

s = Search().using(es).query("match", publisher="verein")    # fulltext="bern"
response = s.execute()
print(response, "\n")
for hit in s:
    print(hit.title, '\n\n', hit.publisher)

<Response: [<Hit(index_1/pB-tT3wBsgv0SeAE1D_2): {'creator': 'Singeisen, Raphael', 'date': '2018', 'format': ...}>]> 

Aus Wald wird Ackerland : Kriegsrodungen im Kanton Bern 1941-1946 : ein umstrittenes Kapitel der Anbauschlacht 

 Historischer Verein des Kantons Bern


In [43]:
s = Search()\
        .using(es)\
        .query("match", fulltext="frau")\
        .extra(track_total_hits=True)

response = s.execute()
print('Total %d hits found.' % response.hits.total.value)

h = response.hits[0]
print('/%s/%s/%s returned with score %f' % (
    h.meta.index, h.meta.doc_type, h.meta.id, h.meta.score))

Total 213 hits found.
/index_1/eperiodica/hB-tT3wBsgv0SeAE1D-A returned with score 2.094948
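
The `Search` object is a builder for a query DSL body, which `s.to_dict()` makes visible. The same request can be written as a plain dictionary and sent with the low-level client. This is a minimal sketch; `match_body` is a hypothetical helper name.

```python
def match_body(field, text, track_total_hits=True):
    """Build a query DSL body for a single-field match query."""
    body = {"query": {"match": {field: text}}}
    if track_total_hits:
        # report the exact hit count instead of stopping at the default cap of 10,000
        body["track_total_hits"] = True
    return body

body = match_body("fulltext", "frau")

# Possible use with the low-level client (not executed here):
# result = es.search(index="index_1", body=body)
# print(result["hits"]["total"]["value"])
```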


In [46]:
print(response.success())
print(response.took)
print(response.hits.total.relation)
print(response.hits.total.value)

True
6
eq
213


In [45]:
for hit in s[0]:
    print(hit.meta)
    print(hit.meta.score)
    print(hit.title)
    print(hit.creator)
    print(hit.id_intern)
    print(hit.localities)

{'index': 'index_1', 'id': 'hB-tT3wBsgv0SeAE1D-A', 'score': ...}
2.0949483
Fremder Besuch im Oberland um 1810 : kleine Hörszene von Christian Lerch, gesendet von Radio Bern im Mai 1940
Lerch, Christian
zgh-001:1944:6::275
[[Pfarrhaus Lauterbrunnen, Ahlfeld, Bern, Kuli, Schweizerisch, Ahlf, you know, Europa, Schweiz, Schweiz, Ahlf, Seje cha nid, Lippe, Überhoupt, Staubbach, Staubbach, Berg, Ahlf, begei¬, Ahlf, Scotland, Ahlf, ge¬, Ahlf Huuh, Ahlf, Bergabgehen, Ahlf, Ahlf, Oberland, Hörszene]]


In [54]:
s = Search().using(es)\
        .query("multi_match", query='winterthur', fields=['title', 'localities'])
#s = s.extra(explain=True)

response = s.execute()
for hit in s:
    print(hit.title)
    print(hit.localities)
    print('---')

Die Zwangsanleihen Massenas : ein Bild aus dunkler Zeit
[[Frankreich, Frankreich, Schweiz, Frankreich, Glarus, Gotthard, Zürich, Österreicher, Stadt, Russe, Österreicher, Waldstätten, Linth, Paris, Stadt Zürich, Zürich, Frankreich, Massena, Stadt Zürich, St. Galle, Zürich, Bürgerschaft, Zürich, Stadt, Zürich, Zürich, Stadt Zürich, Basel, Stadt Basel, Stadt Basel, Stadt Winterthur, Winterthur, Basel, Frankreich, Zürich, Winterthur, Basel, Bern, Schweiz, Frankreich, Binningen, Basler, Stadt, Stadt Basel, Helvetiens, Basel, Stadt Basel, Basel]]
---
Der Hafner Heinrich Hess uns sein Hand- und Hausbuch : ein Beitrag zur Geschichte der stadtbernischen Hafnerei am Ende des 17. Jahrhunderts. Teil 2
[[Biel, Basel, Johannesen, Zürich, Zürich, Züricher, Zürich, Zürich, Zollikon Zürich, Winterthur, Schiben, Reutlinger, Jegenstorf, Winterthur, gsin, gsin, Brugg, Jegenstorf, Schneckenhäfeli, bruni, gsin, Weinachten, Burgdorf, Burdlofer Heiri, Lichtmeß, Lichtmeß, Giben, Burgdorf, Burgdorf, Badhaus, B

In [48]:
## Search Elasticsearch API via "scan"

In [34]:
# Simple search via the Elasticsearch API and its 'helpers' module
from elasticsearch.helpers import scan

search = scan(es,
    query={"query": {"match": {"title": "biel"}}},
    index="index_1"         
)
[[i['_source']['title'], i['_source']['localities']] for i in search]

[['Die Apotheker der Stadt Biel : Vortrag gehalten an der Jahresversammlung des Kant. Apothekervereins in St. Immer, 10. November 1957',
  "[[Schweiz, Basel, Apothekenwesen, Biel, Schweiz, Biel, Apothekerwesens, Stadtarchivs, Säßhaus, Graubünden, Hintersäßen, Biel, Bieler, Bern, Bieler, Biel, nisgeld, Wyttenbach, Bern, Stadt Bern, Corallen-, Nördlingen, Biel, Biel, Berner, Biel, Biel, Land, Bieler, Venneramt, Armengutes, Darmstadt, Ritter'sche Apotheke, Burgplatz, Stadt, Chorgerichtshandels, Provisorenstelle, Eckhaus Ring, Franzose, Biel, Haus der obern Apotheke, Biel, Bern, Windsheim, Biel, Bern, Bartolomäus Knecht, Haus der Bäcker Müller der Schmiedengasse, Bacharach, Nidau, Biel, Bern, Bern, Knecht'schen Apotheke Bern, Leipzig, Provisorenstellung, Bern, Haus der Schmiedengasse, Willstädt, Heidenheim, Biel, Basel, Brotschal, Lausanne, Württemberger, Glarus, Bern, Brennkirschen, England, Stadt, Frankreich, Biel, Biel, Basel, Neuenburg, Biel, Zürich, Jurahöhen, Orbe, Lausanne, Genf, Ju