In [None]:
# Ref: https://elasticsearch-py.readthedocs.io/en/v8.15.1/api/elasticsearch.html#elasticsearch.client.Elasticsearch.bulk
import os
import sqlite3

from elasticsearch import Elasticsearch, helpers

# MOODLECOURSE selects both the SQLite file to read and the index to write;
# a missing variable raises KeyError right here.
_MOODLE_COURSE = os.environ['MOODLECOURSE']

# Source: SQLite database and the query that selects the rows to export.
SQLITE_DB_PATH = f"/home/user/nbgrader/{_MOODLE_COURSE}/exec_history.db"
SQLITE_QUERY = "SELECT * FROM log;"

# Destination: Elasticsearch URL and index name.
ES_HOST = "http://localhost:9200"
ES_INDEX = f"mcjcloudhub-{_MOODLE_COURSE}-logs"

# Load every row of the query result into memory, then release the SQLite
# connection straight away: nothing below needs the database again.
conn = sqlite3.connect(SQLITE_DB_PATH)
try:
    # sqlite3.Row gives name-based column access, so each row can later be
    # turned into a dict with dict(row).
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()
    cur.execute(SQLITE_QUERY)
    rows = cur.fetchall()
finally:
    # Close even when the query fails so the connection is never leaked.
    # Closing an already-closed sqlite3 connection again is harmless.
    conn.close()

print(f"{len(rows)} rows fetched from SQLite")

es = Elasticsearch(ES_HOST)

# Explicit mapping so the two timestamp fields are indexed as dates using the
# declared pattern (six fractional-second digits) instead of being inferred.
mapping = {
    "mappings": {
        "properties": {
            "log_start": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss.SSSSSS"},
            "log_end":   {"type": "date", "format": "yyyy-MM-dd HH:mm:ss.SSSSSS"},
        }
    }
}

# Drop any existing index and recreate it from scratch.
# ignore_unavailable=True makes the delete succeed when the index is missing,
# which avoids a separate exists() round trip and the check-then-delete race.
es.indices.delete(index=ES_INDEX, ignore_unavailable=True)

es.indices.create(index=ES_INDEX, body=mapping)

def generate_actions(rows, index=None):
    """Yield one bulk ``create`` action per row.

    Args:
        rows: Iterable of rows that ``dict()`` can convert to a mapping
            (for example ``sqlite3.Row`` objects or plain dicts).
        index: Name of the target index. When ``None`` (the default) the
            module-level ``ES_INDEX`` is used.

    Yields:
        dict: An action with ``_op_type`` set to ``"create"``, ``_index`` set
        to the target index, ``_id`` taken from the row's ``id`` column
        (``None`` when the row has no such column) and the whole row as
        ``_source``.
    """
    # Resolve the default lazily so callers that pass an explicit index do
    # not depend on the module-level constant.
    target_index = ES_INDEX if index is None else index
    for row in rows:
        doc = dict(row)
        yield {
            "_op_type": "create",
            "_index": target_index,
            "_id": doc.get("id"),
            "_source": doc,
        }

# Stream all rows to Elasticsearch in bulk. The success message is printed
# only when the bulk call returns without raising.
try:
    helpers.bulk(es, generate_actions(rows))
    print("Data successfully sent to Elasticsearch")
finally:
    # Release the SQLite connection even if the bulk request raises.
    conn.close()
