**Notebook para la conversión de db sqlite to pg**

Bloque 0: Inicializacion - Definición de variables

*Importacion de librerias*

In [1]:
import sqlite3
import pandas as pd
import psycopg2
import sqlalchemy as al
from configparser import ConfigParser

*Configuraciones*

In [2]:
# Funcion para la eExtraccion de parametros del archivo 'pg.ini' en secciones
def config(archivo='pg.ini', seccion='pg_server'):
    # Crear el parser y leer el archivo
    parser = ConfigParser()
    parser.read(archivo)
 
    # Obtener la sección de conexión a la base de datos
    result = {}
    if parser.has_section(seccion):
        params = parser.items(seccion)
        for param in params:
            result[param[0]] = param[1]
        return result
    else:
        raise Exception('Seccion {0} no encontrada en el archivo {1}'.format(seccion, archivo))

params_sqlite = config('pg.ini','sqlite') # Se extraen los parametros de la base SQlite de pg.ini
params_pg_server = config() # Se extraen los parametros del servidor postgres de pg.ini
params_newDB = config('pg.ini','pg_new_db') # Se extraen los parametros de la nueva base postgres de pg.ini
# No usar mayusculas para el nombre de la base de datos postgres

Bloque 1: Sqlite Dump Scheme and data to Pandas Dataframes

In [3]:
# Establece la conexion con una BDD SQlite
conn = sqlite3.connect(params_sqlite['sqlite_db'])

In [4]:
# Definicion de dataframe 'dbSchemaQueries' con sentencias SQL para la creacion del esquema de la BDD
dbSchemaQueries = pd.read_sql("select sql from sqlite_master", con=conn)
dbSchemaQueries = dbSchemaQueries.mask(dbSchemaQueries.eq(None)).dropna()

# Definicion de dataframe 'tableNames' con los nombres de las tablas
tableNames = pd.read_sql("SELECT name FROM sqlite_master WHERE (type='table')", con=conn)

# Definicion de 'dbData', lista que contendra multiples dataframes, cada uno con los datos de una tabla de la BDD
dbData = []
selectQuery = "SELECT * from "

# Iteracion en la que se carga 'dbData' con los datos de cada tabla de la BDD (un SELECT por tabla existente)
for i in range(len(tableNames)):
    currentTable = tableNames['name'][i]
    dbData.append(pd.read_sql_query(selectQuery + currentTable, con=conn))

In [5]:
# Cierra la conexion con una BDD SQlite
conn.close()

Bloque 2: Datatypes convertion

In [6]:
# Cambia los datos de las columnas con identificadores a su tipo respectivo (boolean, timestamp, money, etc)
for i in dbData:
    for columnName in i.columns: 
        print (columnName)
        if "(BL)" in columnName:
            i[columnName] = i[columnName].astype(bool)
            i.rename(columns = {columnName:columnName.replace('(BL)','')}, inplace = True)
print ('********************')
print ('********************')

for i in dbData:
    for columnName in i.columns: 
        print (columnName)
        
        #elif "date" in columnName:
        #    i[columnName] = pd.to_datetime(i[columnName])
            #print(i[col_name])


version_num
id
name
creation_date
final_date
notes
answer
username
id
name
id
name
id
name
id
username
password_hash
email
surnames
names
notes
is_admin(BL)
is_manager(BL)
last_access
soft_delete(BL)
id
name
location
notes
soft_delete(BL)
id
cod
name
description
amount_per_package
packaging
notes
soft_delete(BL)
category_id
id
org_name
cuit
contacts
notes
soft_delete(BL)
category_id
id
org_name
cuit
contacts
notes
soft_delete(BL)
category_id
id
cod
name
creation_date
final_date
status
delivery_status
payment_status
internal_status
notes
delivery_fees
is_archived(BL)
organization_id
id
item_id
warehouse_id
quantity
id
cod
name
lab_analysis_type
creation_date
final_date
status
delivery_status
internal_status
number_of_patients
price_per_patient
iva_per_patient
payment_method
payment_status
invoice_status
first_fee
payment_date
notes
other_fees
is_archived(BL)
organization_client_id
already_paid_amount
id
purchase_id
item_id
quantity
notes
is_in_stock(BL)
final_date
id
surnames
names
dni_

Bloque 3: Scheme Parser

In [7]:
# Definicion de funcion 'boolCHECKDelete' (limpia una Query SQL eliminando los CHECKS booleanos que pueda tener)
def boolCHECKDelete(query):

    # Define una variable auxiliar para procesar la query recibida
    parsedQuery = query

    # Define un substring con todos los CHECK de la query
    allChecks = parsedQuery[parsedQuery.index('\tCHECK'):-1]

    # Separa cada CHECK como un nuevo substring
    allChecks = allChecks.split('\n')

    # Recorre todos los CHECK que posee la query, busca si existe el tag '(BL)' y borra los que los posean (boolean CHECKS utilizados en SQlite) 
    for i in range(len(allChecks)):
        if '(BL)' in allChecks[i]:
            parsedQuery = parsedQuery.replace('\n'+allChecks[i], '')

    # Chequea si existe una ',' en el final de la query y la elimina para mantener correcta la sintaxis de la misma
    if ',' in parsedQuery[len(parsedQuery)-5:]:
        parsedQuery = parsedQuery[:len(parsedQuery)-5] + '' + parsedQuery[len(parsedQuery)-5] + '\n)'
    return parsedQuery

In [8]:
#Parseo de todas las queries de la BDD SQlite almacenadas en el dataframe 'dbSchemaQueries'

# Definicion de 'parsed_dbSchemaQueries', lista que contendra todas las queries parseadas
parsed_dbSchemaQueries = []

# Iteracion que recorre todo el dataframe, query a query.
for i in dbSchemaQueries.index:

    # Define una variable auxiliar para procesar la query actual
    currentQuery = dbSchemaQueries['sql'][i]
    
    # Reemplaza los campos DATETIME con TIMESTAMP para el pasaje a PosgreSQL
    currentQuery = currentQuery.replace('DATETIME', 'TIMESTAMP')

    # Reemplaza los campos id para ser autoincrementales
    currentQuery = currentQuery.replace('id INTEGER NOT NULL', 'id INTEGER GENERATED BY DEFAULT AS IDENTITY')

    # Conditions for table named "user".
    """currentQuery = currentQuery.replace('CREATE TABLE user', 'CREATE TABLE "user"')
    currentQuery = currentQuery.replace('CREATE TABLE "user"_', 'CREATE TABLE user_')
    currentQuery = currentQuery.replace('ON user', 'ON "user"')
    currentQuery = currentQuery.replace('ON "user"_', 'ON user_')
    currentQuery = currentQuery.replace('REFERENCES user', 'REFERENCES "user"')
    currentQuery = currentQuery.replace('REFERENCES "user"_', 'REFERENCES user_')"""

    # Sentencia ocasional debido al fallo en un campo de la tabla purchase, donde la longitud de los datos almacenados supera la asignada al campo
    currentQuery = currentQuery.replace('VARCHAR(16)', 'VARCHAR(32)')

    # Se verifica que la query posea CHECKS, en caso de tenerlos, ejecuta 'boolCHECKDelete' en la query actual
    if(currentQuery.find('CHECK') != -1):
        currentQuery = boolCHECKDelete(currentQuery)

    # Añade la query parseada a la lista
    currentQuery = currentQuery.replace('(BL)', '')
    parsed_dbSchemaQueries.append(currentQuery)

# Impresion por pantalla de las queries parseadas
for i in range(len(parsed_dbSchemaQueries)):
    print(parsed_dbSchemaQueries[i])

CREATE TABLE alembic_version (
	version_num VARCHAR(32) NOT NULL, 
	CONSTRAINT alembic_version_pkc PRIMARY KEY (version_num)
)
CREATE TABLE feedback (
	id INTEGER NOT NULL, 
	name VARCHAR(128), 
	creation_date TIMESTAMP, 
	final_date TIMESTAMP, 
	notes TEXT, 
	answer TEXT, 
	username VARCHAR(32), 
	PRIMARY KEY (id)
)
CREATE TABLE item_category (
	id INTEGER NOT NULL, 
	name VARCHAR(32), 
	PRIMARY KEY (id)
)
CREATE UNIQUE INDEX ix_item_category_name ON item_category (name)
CREATE TABLE organization_category (
	id INTEGER NOT NULL, 
	name VARCHAR(32), 
	PRIMARY KEY (id)
)
CREATE UNIQUE INDEX ix_organization_category_name ON organization_category (name)
CREATE TABLE organization_client_category (
	id INTEGER NOT NULL, 
	name VARCHAR(32), 
	PRIMARY KEY (id)
)
CREATE UNIQUE INDEX ix_organization_client_category_name ON organization_client_category (name)
CREATE TABLE "user_" (
	id INTEGER NOT NULL, 
	username VARCHAR(32), 
	password_hash VARCHAR(128), 
	email VARCHAR(64), 
	surnames VARCHAR

Bloque 4: Conexión al servidor de bases de datos PostgreSQL y Creación de nueva BDD

In [9]:
# Creamos el cursor con el objeto conexion
conexion = psycopg2.connect(**params_pg_server)
conexion.autocommit = True
cur = conexion.cursor()

# creamos la nueva BDD utilizando el cursor
createDBQuery = 'CREATE DATABASE ' + params_newDB['new_pg_db_name']
cur.execute(createDBQuery)

# crea nuevos parametros para conectarse a la BDD recien creada
#newDBparams['database'] = newPostgreSQLDbName
#newDBparams['user'] = 'gdeluca'
#newDBparams['password'] = 'gdeluca'

# Cerramos la conexión
conexion.commit()
conexion.close()

Bloque 5: Creacion de las tablas en la nueva BDD

In [10]:
# Creamos el cursor con el objeto conexion
aux_params_pg_server = params_pg_server.copy()
aux_params_pg_server['database'] = params_newDB['new_pg_db_name']
conexion = psycopg2.connect(**aux_params_pg_server)
conexion.autocommit = True
cur = conexion.cursor()

# iteracion en la cual el cursor ejecuta todas las queries previamente parseadas una a una. (Crea las tablas en la nueva BDD PostgreSQL)
for i in range(len(parsed_dbSchemaQueries)):
    cur.execute(parsed_dbSchemaQueries[i])

# Cerramos la conexión
conexion.commit()
conexion.close()

Bloque 6: Carga de los datos

In [11]:
# Creamos una conexion en 'engine' para utilizar SQLAlchemy
dbConexionString = 'postgresql://'+params_pg_server['user']+':'+params_pg_server['password']+'@'+params_pg_server['host']+':'+params_pg_server['port']+'/' + params_newDB['new_pg_db_name']
engine = al.create_engine(dbConexionString)

with engine.begin() as connection:
    # Iteracion donde recorre cada dataframe en 'dbData' y carga los datos formateados de cada df a la tabla correspondiente en la nueva BDD
    for i in range(len(dbData)):
        dbData[i].to_sql(tableNames['name'][i], con=connection, if_exists="append", index = False)

In [12]:
# Cerramos el engine de SQLAlchemy
connection.close()
engine.dispose()