### Overview

While Siren has many unique ways of ingesting data, the [bulk ingest](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html) endpoints work just the same as vanilla ES.

The University of Prague maintains a fascinating set of [relational datasets](https://relational.fit.cvut.cz/) that should be perfect for demos of Siren's relational database capabilities.  For the most part, they are accessible by connecting to their public MariaDB `mysql+pymysql://guest:relational@relational.fit.cvut.cz:3306/<table>`.  In theory Siren should also be able to pull in data as a `Datasource` / `Virtual Index` via a `jdbc` connection (https://relational.fit.cvut.cz/about) but I wasn't able to get that to work myself in the Siren demo container.  It is easy enough to just write data to ES.

#### Notebook setup

In [None]:
%load_ext nb_black

In [None]:
import warnings

# Stop the flood of insecure request warnings
warnings.filterwarnings("ignore")

In [None]:
from tqdm.auto import tqdm

### Pull data

In [None]:
DATABASE = "ccs"

In [None]:
import sqlalchemy as sa
import sqlalchemy.orm

user = "guest"
password = "relational"

engine = sa.create_engine(
    f"mysql+pymysql://{user}:{password}@relational.fit.cvut.cz:3306/{DATABASE}"
)

SessionLocal = sa.orm.sessionmaker(bind=engine)
session = SessionLocal()
session

In [None]:
meta = sa.MetaData()
meta.reflect(bind=engine)
meta

In [None]:
tables = meta.sorted_tables
tables

In [None]:
[t.name for t in tables]

In [None]:
for table in tables:
    print(table.name, session.query(table).count())

In [None]:
table = tables[0]
item = session.query(table).first()
dict(item)

### Preview data

In [None]:
for table in tables:
    print(table.name)
    row = session.query(table).first()
    if row:
        print(dict(row))
    else:
        print("No data")
    print("")

### Write to Siren

In [None]:
from elasticsearch import Elasticsearch

es = Elasticsearch(
    "https://siren:9220", http_auth=("sirenadmin", "password"), verify_certs=False
)
es.info()

In [None]:
from elasticsearch.helpers import bulk
import datetime
import decimal


def clean_row(row):
    "Convert sqlalchemy Date, Datetime, and Decimal objects to json serializable things"
    d = dict(row)
    for k, v in d.items():
        if isinstance(v, (datetime.date, datetime.time)):
            d[k] = v.isoformat()
        elif isinstance(v, decimal.Decimal):
            d[k] = float(v)
    return d


for table in tqdm(tables):
    idx = f"{DATABASE}-{table}".lower()  # es indices must be lowercase
    # drop and rewrite any existing tables
    if es.indices.exists(idx):
        es.indices.delete(idx)
    data = [clean_row(row) for row in session.query(table).all()]
    output = bulk(index=idx, client=es, actions=data, stats_only=True)
    # output is (<n success written>, <n failed written>)
    print(idx, output)