# Importação de bibliotecas

In [10]:
from ftplib import FTP
import os
import pandas as pd

from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, streaming_bulk
from elasticsearch.exceptions import TransportError
import json
import codecs
import pprint
import glob
import urllib3

In [11]:
# Parameters of the DATASUS extract handled by this notebook:
# database name plus the region / year / month selector.
database = 'SIHSUS'
region, year, month = 'rj', 2009, 10

# Elastic Search

## Variáveis e parâmetros do Elasticsearch

In [12]:
# Elasticsearch connection settings.
ES_INDEX = 'datasus'                     # index that receives the documents
ES_TYPE = 'SIHSUS'                       # document type used when indexing
ES_CLUSTER = ['http://localhost:32787']  # node URL(s) of the cluster

# Client shared by every cell below.
elastic_search_client = Elasticsearch(hosts=ES_CLUSTER)

## Verificar se o índice existe e criá-lo caso não exista

In [13]:
# Create the index used by the rest of the notebook (required on first run).
# TODO: to start from scratch, delete the index manually in the Kibana
# console (Management -> select the index -> manage index -> delete index)
# and run this cell again.
def create_index_or_recreate(elastic_search_client, index):
    """Create ``index`` on the cluster, tolerating an index that already exists.

    The index is created with a single shard and no replicas (testing setup).
    Despite the name, an existing index is NOT dropped and recreated: the
    "already exists" error is reported on stdout and otherwise ignored.

    Args:
        elastic_search_client: client exposing ``indices.create(index=, body=)``.
        index: name of the index to create.

    Raises:
        TransportError: for any error other than "index already exists".
    """
    create_index_body = {
        'settings': {
            # just one shard, no replicas for testing
            'number_of_shards': 1,
            'number_of_replicas': 0,
        }
    }

    # The server-side error name changed between major versions; accept both
    # so re-running the cell is harmless whichever cluster version is used.
    already_exists_errors = (
        'index_already_exists_exception',     # Elasticsearch <= 5.x
        'resource_already_exists_exception',  # Elasticsearch >= 6.0
    )

    try:
        elastic_search_client.indices.create(
            index=index,
            body=create_index_body,
        )
    except TransportError as e:
        if e.error not in already_exists_errors:
            raise
        # Existing index: nothing to do.
        print('index - index alread_exists')

## Ajuste de dados para indexação

In [14]:
# Make sure the target index exists before any document is sent to it.
create_index_or_recreate(
    elastic_search_client=elastic_search_client,
    index=ES_INDEX,
)

#

In [15]:
# Field names are lower-cased because Elasticsearch is case sensitive;
# this avoids spurious duplicate fields such as "Mes" vs "mes".
def normalize(data, ES_INDEX, ES_TYPE):
    """Lazily turn each row of ``data`` into a bulk-indexing action.

    Args:
        data: object exposing ``to_dict('records')`` (a pandas DataFrame here).
        ES_INDEX: value stored under ``_index`` of every action.
        ES_TYPE: value stored under ``_type`` of every action.

    Yields:
        One dict per row with keys ``_index``, ``_type`` and ``_source``;
        ``_source`` holds the row with every key converted to a lower-case
        string.
    """
    for record in data.to_dict(orient='records'):
        lowered = {}
        for key, value in record.items():
            lowered[str(key).lower()] = value
        yield {"_index": ES_INDEX, "_type": ES_TYPE, "_source": lowered}

In [16]:
def get_dataframe_from_datasus_by(region, year, month):
    """Return a fixed two-row placeholder DataFrame.

    The ``region``, ``year`` and ``month`` arguments are accepted but not
    used: the same hard-coded frame (columns ``col1`` and ``col2``) is
    returned on every call.
    """
    placeholder_rows = [
        {'col1': 1, 'col2': 2},
        {'col1': 3, 'col2': 4},
    ]
    return pd.DataFrame(data=placeholder_rows)

In [17]:
# Load the data into Elasticsearch in streaming mode.
def load_repo(elastic_search_client, data, ES_INDEX, ES_TYPE,
              stats_only=True, raise_on_error=True):
    """Send ``data`` to Elasticsearch with ``streaming_bulk`` and tally results.

    Args:
        elastic_search_client: client handed to ``streaming_bulk``.
        data: iterable of bulk actions; it is consumed lazily, so a generator
            is never fully loaded into memory.
        ES_INDEX: default index passed to the bulk request.
        ES_TYPE: default document type passed to the bulk request.
        stats_only: when True (default, the previous hard-coded behavior)
            failures are only printed and counted; when False the failing
            item responses are also accumulated in the returned list.
        raise_on_error: forwarded to ``streaming_bulk``. True (default) keeps
            the library's default, under which a failed document makes
            ``streaming_bulk`` raise instead of yielding ``ok=False``; pass
            False to have failures counted here instead.

    Returns:
        Tuple ``(success, failed, errors)``: number of documents indexed,
        number that failed, and the list of collected error responses
        (always empty when ``stats_only`` is True).
    """
    # number of items processed successfully / with failure
    success, failed = 0, 0

    # error responses, collected only when stats_only is False
    errors = []

    pprint.pprint(data)
    pprint.pprint('for a do streaming ' + ES_TYPE)

    for ok, item in streaming_bulk(
            elastic_search_client,
            data,
            index=ES_INDEX,
            doc_type=ES_TYPE,
            raise_on_error=raise_on_error,
        ):

        # Each item is a one-entry mapping: {action name: response details}.
        action, result = item.popitem()
        # Report the path with the type actually indexed; failure responses
        # may carry no '_id', so do not let the lookup raise.
        doc_id = '/%s/%s/%s' % (ES_INDEX, ES_TYPE, result.get('_id'))

        if ok:
            print(doc_id)
            success += 1
            continue

        if not stats_only:
            errors.append(result)
        print('Failed to %s document %s: %r' % (action, doc_id, result))
        failed += 1

    return success, failed, errors
    


In [18]:
def run(elastic_search_client, region, year, month):
    """Fetch the frame for one region/period, tag it and index it.

    The frame returned by ``get_dataframe_from_datasus_by`` receives the
    ``_region``, ``_year`` and ``_month`` columns, is converted to bulk
    actions by ``normalize`` and sent through ``load_repo``; the resulting
    counters are printed.
    """
    frame = get_dataframe_from_datasus_by(region, year, month)

    # Provenance columns, so indexed rows can be traced back to their extract.
    for column, value in (('_region', region), ('_year', year), ('_month', month)):
        frame[column] = value

    actions = normalize(frame, ES_INDEX, ES_TYPE)
    success, failed, errors = load_repo(
        elastic_search_client, actions, ES_INDEX, ES_TYPE
    )

    report = (
        ('Sucess quantity: ', success),
        ('Failed quantity : ', failed),
        ('Errors : ', errors),
    )
    for label, value in report:
        pprint.pprint(label + str(value))

In [19]:
    # Machine-specific absolute path of the CSV file that the cells below
    # parse and pass to run2.
    csv_file = '/Users/rafaelstomaz/.tmpEtlDATASUS//DNBA2015.csv'

In [20]:
# Take the last path component, then slice region (chars 2-3) and
# year (chars 4-7) out of it; the bare `year` displays the parsed value.
file_name = csv_file.rsplit('/', 1)[-1]
region, year = file_name[2:4], file_name[4:8]
year

'2015'

In [21]:
def run2(elastic_search_client, csv_file):
    """Index the rows of one CSV file into Elasticsearch.

    Region and year are sliced out of the file name (characters 2-3 and 4-7
    of the base name, e.g. ``DNBA2015.csv`` -> ``'BA'``, ``'2015'``) and
    added to every row as ``_region`` / ``_year`` before indexing.

    Args:
        elastic_search_client: client handed to ``load_repo``.
        csv_file: path of the CSV file to read.

    Raises:
        FileNotFoundError: propagated from ``pd.read_csv`` when ``csv_file``
            does not exist.
    """
    # basename copes with whatever separator the platform uses.
    file_name = os.path.basename(csv_file)
    region = file_name[2:4]
    # NOTE(review): year stays a string here while `run` receives an int;
    # confirm which type the '_year' field is meant to hold.
    year = file_name[4:8]

    data = pd.read_csv(csv_file)
    # Replace missing values with None so they serialize as JSON null.
    # Cast to object first: on recent pandas versions numeric columns
    # otherwise keep NaN, which is not valid JSON.
    data = data.astype(object).where(pd.notnull(data), None)

    # provenance columns, so indexed rows can be traced back to their file
    data['_region'] = region
    data['_year'] = year

    data = normalize(data, ES_INDEX, ES_TYPE)

    success, failed, errors = load_repo(elastic_search_client, data, ES_INDEX, ES_TYPE)
    pprint.pprint('Sucess quantity: ' + str(success))
    pprint.pprint('Failed quantity : ' + str(failed))
    pprint.pprint('Errors : ' + str(errors))

In [23]:
# Index the CSV file selected above.
run2(
    elastic_search_client=elastic_search_client,
    csv_file=csv_file,
)

FileNotFoundError: [Errno 2] File /Users/rafaelstomaz/.tmpEtlDATASUS//DNBA2015.csv does not exist: '/Users/rafaelstomaz/.tmpEtlDATASUS//DNBA2015.csv'