# 1. Create Index

Terdapat penyesuaian mapping dari sistem existing: field berbahasa Inggris tidak lagi disertakan, karena model multilingual sudah mampu menangani pencarian lintas bahasa.

In [None]:
from elasticsearch import Elasticsearch

# Client for the cluster on which the index below is (re)created.
# NOTE(review): ElasticsearchIndexer in the next cell defaults to
# http://127.0.0.1:9200 — confirm both cells point at the same cluster.
ES_HOST = "http://10.100.244.126:9200"
es = Elasticsearch(ES_HOST)

# Index definition: a lowercase + synonym analyzer for the title/description
# text fields, keyword fields for exact filters, and a 384-dim dense_vector
# holding the title embeddings used for semantic search.
# NOTE: synonyms_path is resolved on the Elasticsearch server, so the synonyms
# file has to exist there, not on the machine running this notebook.
mapping = {
  "settings": {
    "analysis": {
      "filter": {
        "synonym_filter": {
          "type": "synonym",
          "synonyms_path": "analysis/synonyms.txt"
        }
      },
      "analyzer": {
        "synonym_analyzer": {
          "tokenizer": "standard",
          "filter": [
            "lowercase",
            "synonym_filter"
          ]
        }
      }
    }
  },
  "mappings": {
    "properties": {
      # keyword (not text): ids are matched exactly and must agree with the
      # mapping that ElasticsearchIndexer.ensure_index() declares.
      "id": {"type": "keyword"},
      "konten": {"type": "keyword"},
      "jenis": {"type": "keyword"},
      "judul": {
        "type": "text",
        "analyzer": "synonym_analyzer"
      },
      "deskripsi": {
        "type": "text",
        "analyzer": "synonym_analyzer"
      },
      "title_embeddings_384": {
        "type": "dense_vector",
        "dims": 384,
        "index": True
      },
      "mfd": {"type": "keyword"},
      # ignore_malformed (as in the indexer's own mapping): an unparseable date
      # is skipped instead of rejecting the whole document.
      "tgl_rilis": {
        "type": "date",
        "format": "yyyy-MM-dd||yyyy/MM/dd||dd-MM-yyyy||dd/MM/yyyy||yyyy-MM-dd HH:mm:ss||epoch_millis",
        "ignore_malformed": True
      },
      "last_update": {
        "type": "date",
        "format": "yyyy-MM-dd||yyyy/MM/dd||dd-MM-yyyy||dd/MM/yyyy||yyyy-MM-dd HH:mm:ss||epoch_millis",
        "ignore_malformed": True
      },
      "source": {"type": "keyword"},
      "url": {"type": "keyword"}
    }
  }
}

# Name of the index to (re)create
index_name = "datacontent"

# Drop any existing index so the mapping above is applied from scratch.
# This deletes every document currently stored in that index.
if es.indices.exists(index=index_name):
    es.indices.delete(index=index_name)
    print(f"Indeks '{index_name}' lama dihapus.")

# Create the fresh index with the mapping defined above
try:
    es.indices.create(index=index_name, body=mapping)
except Exception as err:
    print("Gagal membuat indeks:", err)
else:
    print(f"Indeks '{index_name}' berhasil dibuat.")

# (Optional) read the mapping back to check what the cluster stored
try:
    current_mapping = es.indices.get_mapping(index=index_name)
except Exception as err:
    print("Gagal mendapatkan mapping:", err)
else:
    print("Mapping saat ini:", current_mapping)

# 2. Indexing

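Inti perubahan indexing dari sistem existing adalah penambahan proses encoding teks menjadi `dense_vector` serta penyimpanan hasil encoding tersebut. Teks yang di-encode adalah gabungan judul tabel dan nama wilayah (domain).
Pada contoh berikut, data bersumber dari WebAPI BPS: metadata tabel statis dibaca dari file CSV `statictable-all-cleaned.csv`, sedangkan nama dan URL wilayah (domain) diambil langsung dari WebAPI. Sumber data dapat disesuaikan dengan kondisi dan perubahan aktual di sistem produksi.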

In [None]:
import pandas as pd
import json
import requests
from elasticsearch import Elasticsearch
from sentence_transformers import SentenceTransformer
from datetime import datetime
import logging
import base64
import time
import re
import math
import numpy as np
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import os

# Root logging: INFO and above, each record prefixed with time and level.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module logger shared by everything below.
logger = logging.getLogger(__name__)

class ElasticsearchIndexer:
    """Index BPS static-table metadata and title embeddings into Elasticsearch.

    For every DataFrame row the indexer embeds "<title> <domain name>" with a
    SentenceTransformer model, builds a document (see ``format_data``) and
    upserts it under ``corpus_id`` through the bulk API (see ``process_batch``).

    Args:
        es_host: Elasticsearch URL.
        index_name: Target index name.
        api_key: BPS WebAPI key. Falls back to the ``BPS_API_KEY`` environment
            variable, then to the key that used to be hard-coded here.
        cache_path: JSON file persisting the domain_id -> domain_url cache.
    """

    # Previously hard-coded key, kept only as a last-resort fallback.
    # NOTE(review): a committed credential should be rotated and supplied
    # through BPS_API_KEY instead.
    _FALLBACK_API_KEY = '7a62d6af2de1e805bc5f44d8a0f6ad17'

    def __init__(self, es_host='http://127.0.0.1:9200', index_name='datacontent',
                 api_key=None, cache_path="/cache"):
        self.es = Elasticsearch([es_host])
        self.index_name = index_name
        self.batch_size = 1000  # rows per embedding + bulk request
        self.api_key = api_key or os.environ.get('BPS_API_KEY') or self._FALLBACK_API_KEY
        self.cache_path = cache_path
        # 4-digit domain_id -> domain website URL, persisted in cache_path.
        self.mfd_cache = self._load_mfd_cache()
        # Kept for backward compatibility: an independent copy of the cache as
        # loaded at startup (previously a second read of the same file).
        self.domain_cache = dict(self.mfd_cache)
        # The model's vectors must match the 384 dims of title_embeddings_384.
        self.embedding_model = SentenceTransformer('yahyaabd/allstats-search-mini-v1-1-mnrl-sts')
        self.domain_names = self.fetch_domain_names()  # 4-digit domain_id -> domain name
        self.ensure_index()

    def ensure_index(self):
        """Create the index with a compatible mapping if it does not exist yet.

        An existing index is left untouched (its mapping is not updated).
        Failures are logged, not raised.
        """
        mapping = {
            "mappings": {
                "properties": {
                    "id": {"type": "keyword"},
                    "judul": {"type": "text"},
                    "deskripsi": {"type": "text"},
                    "title_embeddings_384": {
                        "type": "dense_vector",
                        "dims": 384,
                        "index": True
                    },
                    "mfd": {"type": "keyword"},
                    "tgl_rilis": {
                        "type": "date",
                        "format": "yyyy-MM-dd||yyyy/MM/dd||dd-MM-yyyy||dd/MM/yyyy||yyyy-MM-dd HH:mm:ss||epoch_millis",
                        "ignore_malformed": True
                    },
                    "last_update": {
                        "type": "date",
                        "format": "yyyy-MM-dd||yyyy/MM/dd||dd-MM-yyyy||dd/MM/yyyy||yyyy-MM-dd HH:mm:ss||epoch_millis",
                        "ignore_malformed": True
                    },
                    "konten": {"type": "keyword"},
                    "jenis": {"type": "keyword"},
                    "source": {"type": "keyword"},
                    "url": {"type": "keyword"}
                }
            }
        }
        try:
            if not self.es.indices.exists(index=self.index_name):
                self.es.indices.create(index=self.index_name, body=mapping)
                logger.info(f"Created index {self.index_name}")
            else:
                logger.info(f"Index {self.index_name} already exists, skipping mapping update")
        except Exception as e:
            logger.error(f"Failed to create index {self.index_name}: {e}")

    def get_last_indexed_date(self):
        """Return the stored last indexed date string, or None if absent or on error."""
        try:
            response = self.es.options(ignore_status=[404]).get(index=self.index_name, id='_metadata')
            if response.get('found'):
                return response['_source'].get('last_indexed_date')
        except Exception as e:
            logger.error(f"Error retrieving last indexed date: {e}")
        return None

    def save_last_indexed_date(self, date):
        """Store ``date`` as the last indexed date in the ``_metadata`` document.

        NOTE(review): ``_metadata`` lives in the same index as the content
        documents; confirm that searches exclude it.
        """
        try:
            # `refresh` is a parameter of index(); Elasticsearch.options() does
            # not accept it, which previously made every save fail.
            self.es.index(
                index=self.index_name,
                id='_metadata',
                document={'last_indexed_date': date},
                refresh=True,
            )
            logger.info(f"Updated last indexed date: {date}")
        except Exception as e:
            logger.error(f"Failed to save last indexed date: {e}")

    def clean_title(self, title):
        """Turn a title into a URL slug: lowercase, punctuation removed, spaces -> '-', max 100 chars.

        Returns "" for None, NaN or blank titles.
        """
        if title is None or (isinstance(title, float) and math.isnan(title)):
            logger.warning("Title is None or NaN, returning empty string")
            return ""
        title = str(title).strip()
        if not title:
            logger.warning("Title is empty after conversion, returning empty string")
            return ""
        title = title.lower()
        title = re.sub(r'[^\w\s-]', '', title)
        return title.replace(' ', '-')[:100]  # Limit to 100 chars

    def encode_table_id(self, table_id, table_source):
        """Return base64("<table_id>#<table_source>") as used in static-table URLs."""
        return base64.b64encode(f"{table_id}#{table_source}".encode()).decode()

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type(requests.RequestException),
        before_sleep=lambda retry_state: logger.warning(
            f"Retrying API call (attempt {retry_state.attempt_number}/3) due to {retry_state.outcome.exception()}"
        )
    )
    def fetch_api_response(self, url):
        """GET ``url`` and return the decoded JSON; retried up to 3 times on request errors."""
        logger.debug(f"Calling API: {url}")
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        logger.debug(f"API response status: {response.status_code}")
        return response.json()

    def _load_mfd_cache(self):
        """Load the domain URL cache from ``self.cache_path``; return {} if missing or unreadable."""
        # isfile (not exists): a directory at cache_path must not crash start-up.
        if not os.path.isfile(self.cache_path):
            return {}
        try:
            with open(self.cache_path, "r", encoding="utf-8") as f:
                cache = json.load(f)
        except (OSError, ValueError) as e:
            logger.warning(f"Ignoring unreadable domain cache {self.cache_path}: {e}")
            return {}
        return cache if isinstance(cache, dict) else {}

    def _save_mfd_cache(self):
        """Persist ``self.mfd_cache`` to ``self.cache_path`` (best effort)."""
        try:
            with open(self.cache_path, "w", encoding="utf-8") as f:
                json.dump(self.mfd_cache, f, ensure_ascii=False, indent=2)
        except OSError as e:
            # A failed save must not throw away lookups already held in memory.
            logger.warning(f"Could not save domain cache to {self.cache_path}: {e}")

    def mfdName(self, mfd):
        """Return the BPS website base URL for a domain (MFD) code.

        ``mfd`` is normalised to a zero-padded 4-character string. "0000" maps
        to the national site. Other codes are served from ``self.mfd_cache``;
        on a miss the WebAPI is queried (province list for codes ending in
        "00", otherwise the regency/city list of the code's province), the
        cache is refreshed with every returned domain and persisted.

        Returns:
            str | None: the domain URL, or None if it cannot be determined.
        """
        try:
            mfd = str(mfd).zfill(4)
        except Exception:
            return None
        # Compare after padding so 0 / "0" also resolve to the national site.
        if mfd == "0000":
            return "https://www.bps.go.id"

        if mfd in self.mfd_cache:
            return self.mfd_cache[mfd]

        if mfd.endswith("00"):
            # Province domain
            url = f"https://webapi.bps.go.id/v1/api/domain/type/prov/key/{self.api_key}/"
        else:
            # Regency/city domain: ask for every domain of its province
            prov = mfd[:2] + "00"
            url = f"https://webapi.bps.go.id/v1/api/domain/type/kabbyprov/key/{self.api_key}/prov/{prov}"

        try:
            # timeout so one stalled request cannot hang the indexing run
            resp = requests.get(url, timeout=10)
            if resp.status_code != 200:
                return None
            data_list = resp.json().get("data", [])
            if not (isinstance(data_list, list) and len(data_list) > 1):
                return None
            # Domain records are read from data[1].
            for item in data_list[1]:
                domain_id = str(item.get("domain_id", "")).zfill(4)
                self.mfd_cache[domain_id] = item.get("domain_url")
        except Exception as e:
            logger.error(f"Error in mfdName({mfd}): {e}")
            return None

        self._save_mfd_cache()
        return self.mfd_cache.get(mfd)

    def fetch_domain_names(self):
        """Fetch province and regency/city names from the BPS WebAPI.

        Returns:
            dict[str, str]: 4-digit domain_id -> domain name. Provinces whose
            regency list cannot be fetched are skipped (and logged); a failure
            fetching the province list returns an empty dict.
        """
        domain_names = {}
        try:
            prov_url = f"https://webapi.bps.go.id/v1/api/domain/type/prov/key/{self.api_key}/"
            prov_resp = self.fetch_api_response(prov_url)
            prov_data = prov_resp.get("data", [])
            if isinstance(prov_data, list) and len(prov_data) > 1:
                for item in prov_data[1]:
                    domain_id = str(item.get("domain_id", "")).zfill(4)
                    domain_name = item.get("domain_name", "")
                    if domain_name:
                        domain_names[domain_id] = domain_name
        except Exception as e:
            logger.error(f"Failed to fetch domain names: {e}")
            return domain_names

        # Snapshot the province ids first: the loop below adds regency/city
        # keys to domain_names, and growing a dict while iterating over it
        # raises RuntimeError.
        province_ids = [domain_id for domain_id in domain_names if domain_id.endswith("00")]
        for prov_id in province_ids:
            kab_url = f"https://webapi.bps.go.id/v1/api/domain/type/kabbyprov/key/{self.api_key}/prov/{prov_id}"
            try:
                kab_resp = self.fetch_api_response(kab_url)
                kab_data = kab_resp.get("data", [])
                if isinstance(kab_data, list) and len(kab_data) > 1:
                    for item in kab_data[1]:
                        domain_id = str(item.get("domain_id", "")).zfill(4)
                        domain_name = item.get("domain_name", "")
                        if domain_name:
                            domain_names[domain_id] = domain_name
            except Exception as e:
                # One failing province must not discard all the others.
                logger.error(f"Failed to fetch domain names for province {prov_id}: {e}")

        logger.info(f"Fetched {len(domain_names)} domain names")
        return domain_names

    def _combined_text(self, title, domain_id):
        """Return (title_str, domain_name, combined_text) for one row.

        ``combined_text`` is "<title> <domain name>" stripped — the exact text
        that is embedded. Missing titles (None/NaN) become "".
        """
        title_str = str(title) if pd.notna(title) else ""
        domain_name = self.domain_names.get(domain_id, "")
        return title_str, domain_name, f"{title_str} {domain_name}".strip()

    def test_concatenation(self, test_cases):
        """Check the embedded-text concatenation against (title, domain_id, expected) cases.

        Returns:
            list[dict]: one result per case, including a ``passed`` flag.
        """
        results = []
        for title, domain_id, expected in test_cases:
            _, _, combined_text = self._combined_text(title, domain_id)
            passed = combined_text == expected
            results.append({
                "title": title,
                "domain_id": domain_id,
                "combined_text": combined_text,
                "expected": expected,
                "passed": passed
            })
            logger.info(
                f"Test case: title='{title}', domain_id='{domain_id}', "
                f"combined_text='{combined_text}', expected='{expected}', passed={passed}"
            )
        return results

    def _parse_update_date(self, updt_date, table_id=None, warn=True):
        """Parse an ``updt_date`` value into a pandas Timestamp.

        Returns None for empty values and for values pandas cannot parse
        (exception or NaT); the latter are logged when ``warn`` is true.
        """
        if not updt_date:
            return None
        try:
            parsed = pd.to_datetime(updt_date)
        except Exception as e:
            if warn:
                logger.warning(f"Failed to parse date for table_id {table_id}: {updt_date}, Error: {e}")
            return None
        if pd.isna(parsed):
            if warn:
                logger.warning(f"Invalid date format for table_id {table_id}: {updt_date}")
            return None
        return parsed

    def format_data(self, row, title_embedding, table_source="1"):
        """Build the Elasticsearch document for one DataFrame row.

        Args:
            row: pandas Series with ``corpus_id``, ``table_id`` and ``title``;
                ``domain_id`` (default "0000") and ``updt_date`` are optional.
            title_embedding: embedding of the row's combined text; must
                provide ``tolist()`` (e.g. a numpy array).
            table_source: static-table source code used in the ID and URL.

        Returns:
            dict: document body keyed by the index's field names plus ``corpus_id``.
        """
        domain_id = row.get('domain_id', '0000')
        domain_url = self.mfdName(domain_id)
        if domain_url is None:
            logger.warning(f"No domain URL found for domain_id {domain_id} (table_id {row['table_id']})")
        encoded_table_id = self.encode_table_id(row['table_id'], table_source)
        cleaned_title = self.clean_title(row['title'])

        # last_update is stored as yyyy-MM-dd, one of the mapping's accepted
        # formats. Parse warnings are not repeated here: index_dataframe
        # already reports them for the same row.
        parsed_date = self._parse_update_date(row.get('updt_date'), row.get('table_id'), warn=False)
        formatted_date = parsed_date.strftime('%Y-%m-%d') if parsed_date is not None else None

        formatted = {
            "corpus_id": str(row['corpus_id']),
            "id": str(row['table_id']),
            "judul": row['title'],
            "deskripsi": row['title'],
            "title_embeddings_384": title_embedding.tolist(),
            "url": f"{domain_url}/statistics-table/{table_source}/{encoded_table_id}/{cleaned_title}.html",
            "tgl_rilis": None,
            "last_update": formatted_date,
            "mfd": domain_id,
            "jenis": "statictable",
            "konten": "table",
            "source": table_source
        }
        logger.debug(f"Formatted document: {formatted}")
        return formatted

    def _flush_batch(self, rows, texts, table_source="1"):
        """Embed ``texts``, format ``rows`` and bulk-index them; return (success_count, errors)."""
        embeddings = self.embedding_model.encode(texts, show_progress_bar=False)
        logger.debug(f"Generated embeddings for batch: shape={embeddings.shape}")
        formatted_batch = [
            self.format_data(row, embedding, table_source=table_source)
            for row, embedding in zip(rows, embeddings)
        ]
        return self.process_batch(formatted_batch)

    def index_dataframe(self, df):
        """Embed and bulk-index every row of ``df``.

        Rows are sent in batches of ``self.batch_size``. After indexing, the
        newest ``updt_date`` seen is saved as the last indexed date when at
        least one document succeeded and that date is newer than the stored one.

        Args:
            df: DataFrame with ``corpus_id``, ``table_id``, ``title`` and
                optionally ``domain_id`` / ``updt_date`` columns.

        Returns:
            str: summary of successes, failures, elapsed time and errors.
        """
        start_time = time.time()
        success_count = 0
        error_messages = []
        batch_rows = []
        batch_texts = []
        latest_date = 0  # newest updt_date seen, as a POSIX timestamp
        sample_combined_texts = []  # first rows, logged for manual inspection

        last_indexed_date = self.get_last_indexed_date()
        last_indexed_timestamp = pd.to_datetime(last_indexed_date).timestamp() if last_indexed_date else 0
        logger.info(f"Last indexed timestamp: {last_indexed_timestamp}")

        # Documents are keyed by corpus_id, so duplicates overwrite each other.
        duplicates = df['corpus_id'].duplicated().sum()
        logger.info(f"Duplicate corpus_id count: {duplicates}")

        # Use positions, not index labels: labels need not be contiguous
        # (e.g. after filtering), so `label < 5` does not mean "first 5 rows".
        for position, (_, row) in enumerate(df.iterrows()):
            table_id = row.get('table_id')
            updt_date = row.get('updt_date')
            parsed_date = self._parse_update_date(updt_date, table_id)
            item_date = parsed_date.timestamp() if parsed_date is not None else 0

            # Date-based filtering against last_indexed_timestamp is currently
            # disabled: every row is (re)indexed.
            logger.debug(f"Processing table_id: {table_id}, updt_date: {updt_date}")
            domain_id = row.get('domain_id', '0000')
            title, domain_name, combined_text = self._combined_text(row['title'], domain_id)

            logger.debug(f"table_id: {table_id}, title: '{title}', domain_name: '{domain_name}', combined_text: '{combined_text}'")

            if not combined_text:
                logger.warning(f"Empty combined_text for table_id: {table_id}, title: '{title}', domain_id: '{domain_id}'")
            elif not title:
                logger.warning(f"Title is empty for table_id: {table_id}, domain_id: '{domain_id}', combined_text: '{combined_text}'")

            if position < 5:
                sample_combined_texts.append({
                    "table_id": table_id,
                    "title": title,
                    "domain_id": domain_id,
                    "domain_name": domain_name,
                    "combined_text": combined_text
                })

            batch_rows.append(row)
            batch_texts.append(combined_text)
            latest_date = max(latest_date, item_date)

            if len(batch_rows) >= self.batch_size:
                success, errors = self._flush_batch(batch_rows, batch_texts)
                success_count += success
                error_messages.extend(errors)
                batch_rows, batch_texts = [], []

        if batch_rows:
            success, errors = self._flush_batch(batch_rows, batch_texts)
            success_count += success
            error_messages.extend(errors)

        failure_count = len(error_messages)

        logger.info("Sample concatenated texts (first 5 rows):")
        for sample in sample_combined_texts:
            logger.info(
                f"table_id: {sample['table_id']}, title: '{sample['title']}', "
                f"domain_id: '{sample['domain_id']}', domain_name: '{sample['domain_name']}', "
                f"combined_text: '{sample['combined_text']}'"
            )

        # Re-encode one sample to sanity-check the embedding shape.
        if sample_combined_texts:
            sample_text = sample_combined_texts[0]['combined_text']
            sample_embedding = self.embedding_model.encode([sample_text], show_progress_bar=False)[0]
            logger.info(
                f"Sample embedding verification: text='{sample_text}', "
                f"embedding_shape={sample_embedding.shape}, embedding_sample={sample_embedding[:5]}"
            )

        if success_count > 0 and latest_date > last_indexed_timestamp:
            # pandas' .timestamp() treats naive values as UTC, so convert back
            # through pandas (UTC) rather than datetime.fromtimestamp (local
            # time); otherwise the stored date drifts by the local UTC offset.
            latest_date_str = pd.Timestamp(latest_date, unit='s').strftime('%Y-%m-%d %H:%M:%S')
            self.save_last_indexed_date(latest_date_str)

        execution_time = time.time() - start_time
        result = (
            f"Indexing completed. "
            f"Success: {success_count}, Failed: {failure_count}, Time: {round(execution_time, 2)} seconds. "
            f"Errors: {'; '.join(error_messages) if error_messages else 'None'}"
        )
        logger.info(result)
        return result

    def process_batch(self, batch_data):
        """Upsert formatted documents with a single bulk request.

        Each document becomes an ``update`` action with ``doc_as_upsert``
        under ``_id = corpus_id``.

        Returns:
            tuple[int, list[str]]: (successful item count, error messages). If
            the request itself fails, returns (0, [one message]).
        """
        if not batch_data:
            return 0, []

        actions = []
        for item in batch_data:
            actions.append({
                "update": {
                    "_index": self.index_name,
                    "_id": item["corpus_id"]
                }
            })
            actions.append({
                "doc": item,
                "doc_as_upsert": True
            })

        bulk_payload = '\n'.join(json.dumps(action) for action in actions) + '\n'
        logger.debug(f"Bulk payload (first 2000 chars):\n{bulk_payload[:2000]}")

        logger.info(f"Sending {len(actions)//2} documents to Elasticsearch")
        # 'noop' means the stored document already matched: also a success.
        success_results = ('created', 'updated', 'noop')
        success_count = 0
        error_messages = []
        try:
            response = self.es.options(request_timeout=30).bulk(body=bulk_payload)
            response_body = response.body
            logger.debug(f"Bulk response: {json.dumps(response_body, indent=2)}")

            if response_body.get('errors', False):
                for item in response_body.get('items', []):
                    if 'update' not in item:
                        continue
                    update = item['update']
                    if update.get('result') in success_results:
                        success_count += 1
                    elif 'error' in update:
                        error_msg = f"ID: {update['_id']}, Error: {json.dumps(update['error'])}"
                        logger.error(error_msg)
                        error_messages.append(error_msg)
                    else:
                        error_msg = f"ID: {update['_id']}, Unknown error: {json.dumps(update)}"
                        logger.error(error_msg)
                        error_messages.append(error_msg)
            else:
                success_count = len(response_body.get('items', []))

            logger.info(f"Bulk result: Success={success_count}, Errors={len(error_messages)}")
        except Exception as e:
            error_msg = f"Bulk indexing failed: {str(e)}"
            logger.error(error_msg)
            error_messages.append(error_msg)
            return 0, error_messages

        return success_count, error_messages

def main():
    """Load the static-table CSV, run the concatenation self-checks, then index it.

    The CSV must provide ``corpus_id``, ``table_id``, ``title``, ``updt_date``
    and ``domain_id`` columns. Any error while loading the CSV is logged
    (with traceback) and aborts the run.
    """
    csv_file = 'statictable-all-cleaned.csv'  # Replace with your CSV file path
    try:
        # Read identifiers and titles as strings. Otherwise a numeric column
        # with gaps is parsed as float, and domain "1100" becomes "1100.0",
        # which breaks the domain lookups and the encoded table URLs.
        df = pd.read_csv(csv_file, dtype={'table_id': str, 'domain_id': str, 'title': str})
        df = df.drop_duplicates(subset=['title'])
        df = df.dropna(subset=['title'])  # drop rows whose title is NaN
        # Drop rows whose title is blank after trimming; .copy() so the column
        # assignments below work on an independent frame, not a filtered view.
        df = df[df['title'].str.strip() != ""].copy()

        df['updt_date'] = pd.to_datetime(df['updt_date'], errors='coerce').astype(str).replace('NaT', '')
        df['table_id'] = df['table_id'].astype(str)
        # Missing domain -> "0000" (the indexer's own default); pad to 4 digits.
        df['domain_id'] = df['domain_id'].fillna('0000').astype(str).str.zfill(4)
        logger.info(f"Loaded CSV with {len(df)} rows")
        logger.info(f"Unique domain_id values: {df['domain_id'].unique().tolist()}")
        print(df[['corpus_id', 'title', 'updt_date', 'domain_id']].head())
    except Exception as e:
        logger.exception(f"Failed to load CSV: {e}")
        return

    indexer = ElasticsearchIndexer()

    # Sanity-check the "<title> <domain name>" text that gets embedded.
    test_cases = [
        ("Statistik Penduduk", "1100", "Statistik Penduduk Aceh"),  # Normal case
        ("Data Ekonomi", "1101", "Data Ekonomi Simeulue"),         # Regency case
        ("", "1100", "Aceh"),                                     # Empty title
        (None, "1100", "Aceh"),                                   # None title
        ("Nasional", "0000", "Nasional"),                         # No domain_name
    ]
    test_results = indexer.test_concatenation(test_cases)
    print("Concatenation Test Results:")
    for result in test_results:
        print(
            f"title='{result['title']}', domain_id='{result['domain_id']}', "
            f"combined_text='{result['combined_text']}', expected='{result['expected']}', "
            f"passed={result['passed']}"
        )

    result = indexer.index_dataframe(df)
    print(result)

if __name__ == "__main__":
    main()