In [None]:
# !pip install psycopg2-binary

In [31]:
import psycopg2
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
import os

In [2]:
DB_PARAMS = {
    "user": "postgres",
    "password": "123456",
    "host": "127.0.0.1",
    "port": "5432",
    "database": "postgres"  
}

DB_NAME = "lego_database"

In [9]:
def database_exists(conn, db_name):
    """
    Verifica si una base de datos ya existe.
    """
    with conn.cursor() as cur:
        cur.execute("SELECT 1 FROM pg_database WHERE datname=%s", (db_name,))
        return cur.fetchone() is not None

In [3]:
def execute_sql_from_file(conn, file_path, **kwargs):
    """
    Executes SQL commands from a given file.
    """
    with open(file_path, 'r') as file:
        sql = file.read().format(**kwargs)
    with conn.cursor() as cur:
        cur.execute(sql)

In [33]:
def create_database(connection_params, db_name):
    try:
        # Conectar a la base de datos predeterminada para realizar la verificación y creación
        conn = psycopg2.connect(**connection_params)
        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        with conn.cursor() as cur:
            # Verificar si la base de datos ya existe
            cur.execute("SELECT 1 FROM pg_database WHERE datname = %s", (db_name,))
            exists = cur.fetchone()
            if exists:
                print(f"Database '{db_name}' already exists.")
            else:
                # Ejecutar el comando CREATE DATABASE si no existe
                cur.execute(f"CREATE DATABASE {db_name}")
                DB_PARAMS["database"] = db_name
                print(f"Database '{db_name}' created successfully.")
        conn.close()

    except Exception as e:
        print(f"Error creating database: {e}")

In [42]:
def create_tables(db_params):
    """
    Creates tables in the database.
    """
    
    sql_files = [
    'create_colors_table.sql',
    'create_part_categories_table.sql',
    'create_parts_table.sql',
    'create_themes_table.sql',
    'create_sets_table.sql',
    'create_inventories_table.sql',
    'create_inventory_parts_table.sql',
    'create_inventory_sets_table.sql'
]

    try:
        with psycopg2.connect(**db_params) as conn:
            conn.set_session(autocommit=True)
            
            for sql_file in sql_files:
                execute_sql_from_file(conn, f'sql/{sql_file}')
                print(f"Table from '{sql_file}' created.")
    except Exception as e:
        print(f"Error creating tables: {e}")

In [43]:
create_database(DB_PARAMS, DB_NAME)

Database 'lego_database' already exists.


In [44]:
create_tables(DB_PARAMS)

Table from 'create_colors_table.sql' created.
Table from 'create_part_categories_table.sql' created.
Table from 'create_parts_table.sql' created.
Table from 'create_themes_table.sql' created.
Table from 'create_sets_table.sql' created.
Table from 'create_inventories_table.sql' created.
Table from 'create_inventory_parts_table.sql' created.
Table from 'create_inventory_sets_table.sql' created.


In [47]:
import csv
import sqlite3

# Conexión a la base de datos (ajustar según sea necesario)
conn = sqlite3.connect('lego_database.db')
cursor = conn.cursor()

# Función modificada para insertar datos desde un archivo CSV a una tabla
def insert_data_from_csv(file_path, table_name):
    with open(file_path, newline='', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile)
        columns = next(reader)  # Obtiene el encabezado para las columnas
        placeholders = ', '.join(['?'] * len(columns))  # Crea marcadores de posición
        columns_formatted = ', '.join(columns)  # Formatea las columnas para la consulta SQL
        sql = f'INSERT INTO {table_name} ({columns_formatted}) VALUES ({placeholders})'
        for row in reader:
            print(row)
            print(sql)
            cursor.execute(sql, row)
    conn.commit()

In [48]:
# Ejemplo de uso para la tabla 'colors'
insert_data_from_csv('./raw/colors.csv', 'colors')

['-1', 'Unknown', '0033B2', 'f']
INSERT INTO colors (id, name, rgb, is_trans) VALUES (?, ?, ?, ?)


OperationalError: no such table: colors

In [None]:

# No olvides cerrar la conexión al final
conn.close()

In [52]:
import csv
from psycopg2 import sql, connect, extensions

def insert_data_from_csv_to_db(db_params, file_path, table_name):
    with open(file_path, newline='', encoding='utf-8') as csvfile:
        reader = csv.reader(csvfile)
        columns = next(reader)  # Obtiene el encabezado para las columnas
        records = [row for row in reader]  # Lee todos los registros en una lista

        # Inicia una conexión a la base de datos
        conn = connect(**db_params)
        conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        cursor = conn.cursor()

        # Prepara la consulta SQL para insertar todos los registros
        placeholders = ', '.join(['%s'] * len(columns))  # Marcadores de posición para una fila
        all_placeholders = ', '.join([f"({placeholders})"] * len(records))  # Repite los marcadores para todas las filas
        query = sql.SQL("INSERT INTO {table} ({fields}) VALUES {values}").format(
            table=sql.Identifier(table_name),
            fields=sql.SQL(', ').join(map(sql.Identifier, columns)),
            values=sql.SQL(all_placeholders)
        )

        # Aplana la lista de registros para pasarla como argumentos a la consulta
        flat_records = [item for sublist in records for item in sublist]

        try:
            cursor.execute(query, flat_records)
            print("Todos los registros insertados exitosamente.")
        except (Exception, psycopg2.DatabaseError) as error:
            print(f"Error al ejecutar la consulta: {error}")
        finally:
            cursor.close()
            conn.close()
            print("Conexión cerrada.")

In [57]:
table = 'colors'
insert_data_from_csv_to_db(DB_PARAMS, f'./raw/{table}.csv', table)

Todos los registros insertados exitosamente.
Conexión cerrada.


# Consultas.

In [36]:
def execute_query(db_params, query):
    try:
        conn = psycopg2.connect(**db_params)
        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        cursor = conn.cursor()
        
        cursor.execute(sql.SQL(query))
        
        if query.strip().lower().startswith("select"):
            records = cursor.fetchall()
            return records
        else:
            print("Consulta ejecutada exitosamente.")
        
    except (Exception, psycopg2.DatabaseError) as error:
        print(f"Error al ejecutar la consulta: {error}")
    finally:
        if conn is not None:
            cursor.close()
            conn.close()
            print("Conexión cerrada.")

In [56]:
query = "select * from colors"
execute_query(DB_PARAMS, query)

Conexión cerrada.


[]

In [50]:
query = "INSERT INTO colors (id, name, rgb, is_trans) VALUES ('-1', 'Unknown', '0033B2', 'f')"

execute_query(DB_PARAMS, query)

Consulta ejecutada exitosamente.
Conexión cerrada.
