In [12]:
from datetime import datetime
import itertools
import time
import json
import os

## Função de Busca e Manipulção dos Arquivos JSON

In [13]:
def get_all_json_paths(data_path):
    """
		Recebe o path da pasta Data que contem todas as pastas de Json
        Retorna uma lista com os paths de todos os arquivos .json
        que não tem teste no nome
    """
    json_files = []

    with os.scandir(data_path) as days_data:
        for day_data in days_data:
            json_files = json_files + [day_data.path + '/' + hour_data_json for hour_data_json in os.listdir(day_data) if hour_data_json.endswith('.json') and not hour_data_json.startswith('teste')]

    return json_files

In [14]:
def convert_files_to_jsons(all_json_data_paths):
    """
		Recebe os paths dos jsons, abre os arquivos e os converte em listas de objeto JSON
        Retorna uma lista com todas as listas de JSONs extraídas de cada arquivo
    """
    jsons_list = []
    for json_data_path in all_json_data_paths:
        with open(json_data_path) as json_file:
            jsons_list.append(json.load(json_file))
    return jsons_list

In [15]:
def join_jsons_lists(json_lists):
    """
		Recebe uma lista de lista de objetos Json,
        Efetua o Join para concatenar as listas em uma lista só
        Retorna uma lista com todos os JSON
    """
    return list(itertools.chain.from_iterable(json_lists))

## Funções de Formatação dos JSONs

In [16]:
def format_location(data):
    """ 
		Recebe o Json e converte os campos latitude e longitude em
        objeto geometrico POINT do Postgis
    """

    # Convertendo , para . pois o Postgres trabalha com .
    if 'latitude' in data:
        data['latitude'] = float(data['latitude'].replace(',', '.'))
    if 'longitude' in data:
        data['longitude'] = float(data['longitude'].replace(',', '.'))
    if 'longitude' in data and 'latitude' in data:
        lat = data['latitude']
        lon = data['longitude']
        data['localizacao_wkt'] = f'POINT({lat} {lon})'

def format_veloc(data):
	""" 
		Recebe o Json e converte o campo velocidade em float
	"""
	if 'velocidade' in data:
		data['velocidade'] = float(data['velocidade'])

def format_timestamp(data):
    """ 
		Recebe o Json e converte os campos de data em datetime
	"""
    if 'datahora' in data:
        data['datahora'] = datetime.fromtimestamp(int(data['datahora']) / 1000) 
    if 'datahoraenvio' in data:
        data['datahoraenvio'] = datetime.fromtimestamp(int(data['datahoraenvio']) / 1000)  
    if 'datahoraservidor' in data:
        data['datahoraservidor'] = datetime.fromtimestamp(int(data['datahoraservidor']) / 1000)  

def format_jsons(json_list):
	""" 
		Recebe lista de jsons e faz as devidas formatações
        Retorna a lista com os Jsons formatados
	"""
	for single_json in json_list:
		format_location(single_json)
		format_veloc(single_json)
		format_timestamp(single_json)
	return json_list

## Funções de Geração das Queries de Insert

In [None]:
from psycopg2 import connect
from psycopg2.extras import Json
from datetime import datetime
from shapely.geometry import Point
from threading import Thread

def build_insert_args(data):
	"""
      Monta argumentos que serão appendados na query 
	"""
	ordem = data['ordem']
	linha = data['linha']
	velocidade = data['velocidade']
	localizacao = data['localizacao_wkt']
	datahora = data['datahora']
	datahoraenvio = data['datahoraenvio']
	datahoraservidor = data['datahoraservidor']
	final_args=(ordem, linha, velocidade, localizacao, datahora, datahoraenvio, datahoraservidor)
	return final_args

def chunks(lst, n):
    """Divide os argumentos em batches"""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

def insert_in_database(list_args):
	""" 
		Recebe uma lista de argumentos de cada json e insere no banco em batches de 2000
	"""
	conn = connect(dbname='busInfo', user='bus_admin', password='bus_admin', host='localhost', port='5432')
	cursor = conn.cursor()

	for args in chunks(list_args, 2000):
		args_str = ','.join(cursor.mogrify("(%s, %s, %s, ST_GeomFromText(%s, 4326), %s, %s, %s)", x).decode("utf-8") for x in args)
		cursor.execute("INSERT INTO bus_location (ordem, linha, velocidade, localizacao, datahora, datahoraenvio, datahoraservidor) VALUES " + args_str)
		conn.commit()

	cursor.close()
	conn.close()

def threaded_insert(list_args):
    middle_index = len(list_args) // 2
    part1 = list_args[:middle_index]
    part2 = list_args[middle_index:]

    t1 = Thread(target=insert_in_database, args=(part1,))
    t2 = Thread(target=insert_in_database, args=(part2,))

    t1.start()
    t2.start()

    t1.join()
    t2.join() 

In [None]:
def build_insert_query(data):
	"""
		Recebe Json e cria a query de insert com os valores dos campos
	"""
	ordem = data['ordem']
	linha = data['linha']
	velocidade = data['velocidade']
	localizacao = data['localizacao_wkt']
	datahora = data['datahora']
	datahoraenvio = data['datahoraenvio']
	datahoraservidor = data['datahoraservidor']
	insert_query = f'INSERT INTO bus_location (ordem, linha, velocidade, localizacao, datahora, datahoraenvio, datahoraservidor) VALUES (\'{ordem}\', \'{linha}\', \'{velocidade}\', ST_GeomFromText(\'{localizacao}\', 4326), \'{datahora}\', \'{datahoraenvio}\', \'{datahoraservidor}\')'
	return insert_query

def generate_queries_list(json_formatted_list):
	""" 
		Recebe lista de Jsons já formatados
		Retorna a lista de argumentos do insert de cada json
	"""
	insert_queries = []
	for single_json in json_formatted_list:
		#insert_queries.append(build_insert_query(single_json))
		insert_queries.append(build_insert_args(single_json))
	return insert_queries

## Funções de Criação e Exclusão do Arquivo Final de Queries

In [19]:
import os
def clean_insert_file(file_path):
	""" 
		Recebe o caminho do arquivo .sql de inserts dos dados
		Apaga o arquivo caso exista
	"""
	if os.path.exists(file_path):
		os.remove(file_path)
		print(f"File '{file_path}' deleted successfully.")
	else:
		print(f"File '{file_path}' does not exist.")

def generate_insert_file(insert_queries, insert_file_path):
	""" 
		Recebe a lista de queries de insert e o caminho com o nome desejado do arquivo
		Insere as queries criadas no arquivo .sql
	"""
	with open(insert_file_path, "a") as f:
		for insert_query in insert_queries:
  			f.write(insert_query + ';\n')

## Main

In [20]:
def show_elapsed_time(start_time, method_name):
    """ 
		Recebe o tempo de inicio de execucao e o nome do método
        Imprime o nome do método e seu tempo total de execução em segundos
    """
    end_time = time.time()
    elapsed_time = end_time - start_time 
    print(method_name,"time:", str(elapsed_time), "secs")

In [None]:
def main():
	""" 
		Executa todo o passo de busca, formatação e criação dos arquivos de insert.
		Mostra o tempo de execução de cada etapa.
		Os arquivos gerado são encontrados na pasta de scripts.
	"""
	
	data_path = '../data/'
	insert_file_path = './scripts/04-insert_bus_data.sql'
	clean_insert_file(insert_file_path)
	
	start_time = time.time() 
	all_json_data_paths = get_all_json_paths(data_path) 
	show_elapsed_time(start_time, "get_all_json_paths")
	

	start_time = time.time()
	all_json_data = convert_files_to_jsons(all_json_data_paths)
	show_elapsed_time(start_time, "convert_files_to_jsons")

	start_time = time.time()
	json_data = join_jsons_lists(all_json_data)
	show_elapsed_time(start_time, "join_jsons_lists")

	start_time = time.time()
	json_data_formatted = format_jsons(json_data)
	show_elapsed_time(start_time, "format_jsons")

	start_time = time.time()
	insert_queries = generate_queries_list(json_data_formatted)
	show_elapsed_time(start_time, "generate_queries_list")
      
	start_time = time.time()
	#generate_insert_file(insert_queries, insert_file_path)
	#show_elapsed_time(start_time, "generate_insert_file")
	print("total args", len(insert_queries))
	threaded_insert(insert_queries)
	show_elapsed_time(start_time, "insert_in_database")

In [22]:
main()

File './scripts/04-insert_bus_data.sql' does not exist.
get_all_json_paths time: 0.0043065547943115234 secs
convert_files_to_jsons time: 84.0915629863739 secs
join_jsons_lists time: 9.808472156524658 secs
format_jsons time: 130.22779512405396 secs
generate_queries_list time: 122.01142859458923 secs
total args 16552806
insert_in_database time: 409.359011888504 secs
