In [None]:

import os
import glob
import json
import fastparquet
import pandas as pd
from elasticsearch import helpers
from elasticsearch import Elasticsearch

In [None]:

# --- Where the parquet files live on disk ---------------------------------
system_path_separator = os.path.sep
base_path = os.path.abspath('/opt/data/mates_pbd')
process_folder = "sahibinden_ads_details"

# --- Elasticsearch names --------------------------------------------------
# NOTE(review): template_name / template_pattern are not referenced by the
# cells visible here - confirm whether an index template is still planned.
index_name = 'ads-details'
template_name = 'ads-details-template'
template_pattern = "ads-details-*"

# --- Explicit field mapping -----------------------------------------------
# One (field name, Elasticsearch type) pair per document field, in the order
# the fields are declared in the mapping.
_field_types = (
    ("@timestamp", "date"),
    ("folder_index", "keyword"),
    ("external_id", "keyword"),
    ("post_date", "date"),
    ("make", "keyword"),
    ("model", "keyword"),
    ("trimlevel", "keyword"),
    ("car_year", "integer"),
    ("fueltype", "keyword"),
    ("transmission", "keyword"),
    ("km", "integer"),
    ("bodytype", "keyword"),
    ("horse_power", "keyword"),
    ("cyl_capacity", "keyword"),
    ("traction", "keyword"),
    ("price", "integer"),
    ("warranty", "keyword"),
    ("plate_type", "keyword"),
    ("ad_type", "keyword"),
    ("video_call_visible", "keyword"),
    ("status", "keyword"),
    ("seller", "keyword"),
    ("dealer_name", "keyword"),
    ("car_location", "keyword"),
    ("trade_in_flag", "boolean"),
    ("region", "keyword"),
    ("car_name", "keyword"),
    ("url", "keyword"),
    ("seller_gallery_url", "keyword"),
    ("seller_phone_number", "keyword"),
    ("damages", "byte"),
    ("color", "keyword"),
    ("misc", "byte"),
)
index_mapping = {field: {"type": es_type} for field, es_type in _field_types}

### Conexión con Elasticsearch

In [None]:
# Client for the Elasticsearch node at this URL; the index-setup and loading
# cells below send their requests through it.
es_url = "http://localhost:9200"
es = Elasticsearch(hosts=es_url)

### Configuración del índice

In [None]:
# Index definition: one shard, no replicas, refreshed every 5 seconds, with
# the explicit field mapping declared in the configuration cell.
index_settings = {
    "index": {"refresh_interval": "5s"},
    "number_of_shards": 1,
    "number_of_replicas": 0,
}
index_config = {
    "settings": index_settings,
    "mappings": {"properties": index_mapping},
}

# DANGER - un-comment to delete the existing index before re-creating it.
# if es.indices.exists(index=index_name):
#     es.indices.delete(index=index_name)

# Create the index only when it does not exist yet, so re-running this cell
# leaves an existing index untouched.
index_already_exists = es.indices.exists(index=index_name)
if not index_already_exists:
    es.indices.create(index=index_name, body=index_config)


### Proceso de carga de datos

In [None]:
# Paths of the parquet files whose load raised an error (filled by the loop below).
failed_files = list()

In [None]:
# Feed the rows to the bulk helper through a generator function so the bulk
# actions are built lazily, one row at a time.
def _is_missing(value):
    """Return True when ``value`` is a scalar missing marker (None, NaN, NaT)."""
    # is_scalar guards against list/array cells, for which pd.isna would
    # return an array instead of a single boolean.
    return pd.api.types.is_scalar(value) and pd.isna(value)


def generator(df, folder_name):
    """Yield one Elasticsearch bulk action per row of ``df``.

    Args:
        df: DataFrame whose columns become the fields of each document.
        folder_name: Currently unused; kept so existing callers keep working.

    Yields:
        dict: ``{"_index": index_name, "_source": {...}}`` where ``index_name``
        is the module-level target index and ``_source`` holds the row values.
        Scalar missing values (None, NaN, NaT) are emitted as ``None``.
    """
    for _, row in df.iterrows():
        # Python's json module writes float NaN as the non-standard token
        # ``NaN``; sending None (JSON null) instead keeps the payload valid
        # JSON so a single missing value cannot break the bulk request.
        document = {
            field: (None if _is_missing(value) else value)
            for field, value in row.to_dict().items()
        }
        yield {
            "_index": index_name,
            "_source": document,
        }


# Walk every parquet file below the process folder (recursively) and index the
# ones whose folder tag is not in Elasticsearch yet.
parquet_pattern = os.path.join(base_path, process_folder, '**/*.parquet')
unused_columns = ('crawling_job_id', 'crawling_job_datetime', 'crawling_job_date')

for filename in glob.iglob(parquet_pattern, recursive=True):
    try:
        # Expected layout: .../<parquet_date>/<parquet_folder>/<file>.parquet
        path_parts = os.path.dirname(filename).split(system_path_separator)
        parquet_folder = path_parts[-1]
        parquet_date = path_parts[-2]
        # e.g. 20220211_46c09720-bee9-4da1-a9d2-8b062b5d151c
        file_tag = parquet_date+"_"+parquet_folder

        # Optional filter to load only recent folders:
        # if int(parquet_date) < 20230300:
        #     continue

        # Skip the file when documents carrying its tag are already indexed.
        # NOTE(review): a bulk load that fails half-way may leave some
        # documents with this tag, so a re-run would report the file as
        # already loaded - confirm whether that matters for this dataset.
        match_spec = {
            "query": file_tag,
            "auto_generate_synonyms_phrase_query": False,
            "fuzziness": 0
        }
        query = {"query": {"match": {"folder_index": match_spec}}}
        res = es.search(index=index_name, body=query)
        if res['hits']['total']['value'] > 0 and any(
            hit['_source']['folder_index'] == file_tag
            for hit in res['hits']['hits']
        ):
            print(f'Already loaded {filename}')
            continue

        # Read the parquet file into a DataFrame.
        df = fastparquet.ParquetFile(filename).to_pandas()

        # scrapping_datetime looks like "2023-02-02 19:44:16"; parse it and
        # expose it under the name @timestamp.
        parsed_timestamps = pd.to_datetime(df['scrapping_datetime'], format='%Y-%m-%d %H:%M:%S')
        df['scrapping_datetime'] = parsed_timestamps
        df = df.rename(columns={'scrapping_datetime': '@timestamp'})

        # Drop the crawler bookkeeping columns when they are present.
        for column in unused_columns:
            if column in df.columns:
                df = df.drop(columns=[column])

        # Tag every row with the identifier of the file it came from.
        df["folder_index"] = file_tag

        # Index the whole file through the bulk helper.
        helpers.bulk(es, generator(df, parquet_folder))
        print(f'Loaded {filename} With {len(df)} rows')

    except Exception as error:
        # Best effort: report the problem, remember the file, keep going.
        print(error)
        print('Error')
        failed_files.append(filename)


In [None]:

# Sanity check: read documents back from the index.
from elasticsearch import Elasticsearch
import json

es = Elasticsearch(hosts="http://localhost:9200")

# match_all places no filter on the documents.
query = {"query": {"match_all": {}}}

# Run the search and show the raw response.
res = es.search(index=index_name, body=query)
print(res)