**<h3 style="text-align: center;">INGEST DATA FROM ITS</h3>**

In [1]:
import pandas as pd
import requests
import os
from dotenv import load_dotenv
import json
import psutil
import csv
from datetime import datetime
import urllib3  
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # shut down warnings

In [2]:
os.environ["OMP_NUM_THREADS"] = "8"
os.environ["OPENBLAS_NUM_THREADS"] = "8"
os.environ["MKL_NUM_THREADS"] = "8"
os.environ["NUMEXPR_NUM_THREADS"] = "8"
os.environ["VECLIB_MAXIMUM_THREADS"] = "8"

try:
    p = psutil.Process()
    p.cpu_affinity([0, 1, 2, 3, 4, 5, 6, 7]) 
    print("Limit core and thread sucessful.")
except Exception as e:
    print(f"Error while litmiting resources: {e}")

load_dotenv()
API_KEY_ELASTIC=os.getenv("ELASTIC_KEY")

Limit core and thread sucessful.


In [None]:
import os
import requests
import pandas as pd
import csv

def fetch_and_save_elastic_data(
    index_name,
    start_query,
    end_query,
    event_action,
    event_module,
    output_file,
    api_key,
):
    url = f'https://103.9.206.216:9200/{index_name}/_search?scroll=2m'
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'{api_key}'
    }

    query = {
        "size": 10000,
        "sort": ["@timestamp"],
        "_source": True,
        "query": {
            "bool": {
                "must": [
                    {
                        "range": {
                            "@timestamp": {
                                "gte": start_query,
                                "lt": end_query,
                                "time_zone": "+07:00"
                            }
                        }
                    },
                    {
                        "match": {
                            "event.action": event_action
                        }
                    },
                    {
                        "match": {
                            "event.module": event_module
                        }
                    },
                    {
                        "exists": {
                            "field": "user.id"
                        }
                    }
                ]
            }
        }
    }

    # first request
    try:
        response = requests.post(url, headers=headers, json=query, verify=False)
        response.raise_for_status()
        data = response.json()
    except Exception as e:
        print("Initial fetch error:", e)
        return

    hits = data.get('hits', {}).get('hits', [])
    scroll_id = data.get('_scroll_id')

    if os.path.exists(output_file):
        os.remove(output_file)

    def write_batch_to_csv(hits, write_header=False):
        records = [hit['_source'] for hit in hits]
        if not records:
            return
        try:
            df = pd.json_normalize(records)

            def safe_format(x):
                if isinstance(x, list):
                    return ','.join(map(str, x))
                elif isinstance(x, dict):
                    return str(x)
                return x

            df = df.applymap(safe_format)

            # write file, prevent error format
            df.to_csv(output_file,mode='a',index=False,header=write_header,encoding='utf-8',quoting=csv.QUOTE_ALL,lineterminator='\n')
        except Exception as e:
            print("Write CSV error:", e)

    write_batch_to_csv(hits, write_header=True)

    scroll_url = 'https://103.9.206.216:9200/_search/scroll'
    while True:
        scroll_payload = {
            "scroll": "2m",
            "scroll_id": scroll_id
        }
        try:
            response = requests.post(scroll_url, headers=headers, json=scroll_payload, verify=False)
            response.raise_for_status()
            data = response.json()
        except Exception as e:
            print("Scroll fetch error:", e)
            break

        hits = data.get('hits', {}).get('hits', [])
        if not hits:
            break

        write_batch_to_csv(hits, write_header=False)
        scroll_id = data.get('_scroll_id')

    try:
        requests.delete(
            "https://103.9.206.216:9200/_search/scroll",
            headers=headers,
            json={"scroll_id": [scroll_id]},
            verify=False
        )
    except:
        pass

    print(f"✅ Done writing '{event_action}' to {output_file} !")


In [4]:
start_query = "2025-05-15T00:00:00" # from 17h 14/5/2025 to 17h 15/6/2025
end_query = "2025-06-15T23:59:59"

In [5]:
def sanitize_filename(ts: str) -> str:
    return ts.replace(":", "-").replace("T", "_")

In [None]:
fetch_and_save_elastic_data(
    index_name="event_gamo_m952",
    start_query=start_query,
    end_query=end_query,
    event_action="its_login",
    event_module="sources",
    output_file=f"data/m952_login_{sanitize_filename(start_query)}_to_{sanitize_filename(end_query)}.csv",
    api_key=API_KEY_ELASTIC,
)

  df = df.applymap(safe_format)


In [None]:
fetch_and_save_elastic_data(
    index_name="event_gamo_m952",
    start_query=start_query,
    end_query=end_query,
    event_action="its_purchase",
    event_module="sources",
    output_file=f"data/m952_purchase_{sanitize_filename(start_query)}_to_{sanitize_filename(end_query)}.csv",
    api_key=API_KEY_ELASTIC,
)

Done writing 'its_purchase' to data/m952_purchase_2025-05-15_00-00-00_to_2025-06-15_23-59-59.csv !


In [None]:
fetch_and_save_elastic_data(
    index_name="event_gamo_m952",
    start_query=start_query,
    end_query=end_query,
    event_action="its_equipenhance",
    event_module="sources",
    output_file=f"data/m952_equipenhance_{sanitize_filename(start_query)}_to_{sanitize_filename(end_query)}.csv",
    api_key=API_KEY_ELASTIC,
)

Done writing 'its_equipenhance' to data/m952_equipenhance_2025-05-15_00-00-00_to_2025-06-15_23-59-59.csv !
