In [None]:
from dotenv import load_dotenv
import pandas as pd
import os
load_dotenv()

_Una empresa llamada DBU, que vende equipos deportivos al aire libre, \
tiene muchas ubicaciones diferentes y ha estado registrando, en cada una de ellas, las ventas de sus distintos productos.\
Quiere saber cuáles son sus mejores productos y vendedores para mejorar su rendimiento general._

In [None]:
# Raw-GitHub folder holding the course's CSV extracts.
url_base: str = (
    "https://raw.githubusercontent.com/CoderContenidos/"
    "Data.Engineering/main/Semana%207/Tablas/"
)

# CSV files to load, one table per file.
tables: list[str] = [
    f"{stem}.csv"
    for stem in (
        "countryregioncurrency",
        "currencyrate",
        "product",
        "productcategory",
        "productdescription",
        "productmodelproductdescriptionculture",
        "productreview",
        "productsubcategory",
        "salesorderdetail",
        "salesorderheader",
        "salesperson",
        "salesterritory",
    )
]


In [None]:
# Redshift schema passed to the view loader below.
schema: str = "andru_ocatorres_coderhouse"

# Dataframes con SQLAlchemy

* [SQLAlchemy Documentation](https://docs.sqlalchemy.org/en/20/intro.html#installation)

In [None]:
# Importando las librerias necesarias para hacer la conexion a la base
from sqlalchemy import create_engine 

In [None]:
# Build the SQLAlchemy engine for the Redshift warehouse from REDSHIFT_* environment variables.
from sqlalchemy.engine import URL

username = os.getenv('REDSHIFT_USERNAME')
password = os.getenv('REDSHIFT_PASSWORD')
host = os.getenv('REDSHIFT_HOST')
port = os.getenv('REDSHIFT_PORT', '5439')
dbname = os.getenv('REDSHIFT_DBNAME')

# URL.create escapes special characters in the credentials (e.g. '@', ':', '/'),
# which a hand-built f-string URL breaks on. It also omits unset parts instead of
# rendering them as the literal text "None". The port must be an int.
connection_url = URL.create(
    "postgresql+psycopg2",
    username=username,
    password=password,
    host=host,
    port=int(port) if port else None,
    database=dbname,
)
db_engine = create_engine(connection_url)

In [None]:
# Sanity check: open (and immediately release) one connection to confirm the engine works.
try:
    with db_engine.connect():
        print("Conexion creada")
except Exception as err:
    print(f"Conexion fallida: {err}")

In [None]:
from sqlalchemy import Table, Column, Integer, String, MetaData, exc , inspect, text


# Create the target table (if it does not exist yet) from a DataFrame's dtypes.
def create_table(table_name, df, db_engine, schema: str = "andru_ocatorres_coderhouse"):
    """Create ``schema.table_name`` with one column per column of ``df``.

    Only the table structure is created; no rows of ``df`` are inserted.

    Args:
        table_name: Name of the table to create.
        df: DataFrame whose column names and dtypes define the table layout.
        db_engine: SQLAlchemy engine used to issue the DDL.
        schema: Schema that owns the table (defaults to the project schema).

    Raises:
        TypeError: If a column has a dtype with no mapping below.
    """
    # Local import: these types are not part of the notebook's sqlalchemy import line.
    from sqlalchemy import BigInteger, Boolean, Float

    metadata = MetaData(schema=schema)

    columns = []
    for col_name, col_type in df.dtypes.items():
        if pd.api.types.is_bool_dtype(col_type):
            # Checked first: bool columns previously fell through to TypeError.
            sql_type = Boolean
        elif pd.api.types.is_integer_dtype(col_type):
            # BIGINT so 64-bit pandas integers cannot overflow a 32-bit INTEGER.
            sql_type = BigInteger
        elif pd.api.types.is_float_dtype(col_type):
            # Keep numbers numeric (FLOAT8 on Redshift): as VARCHAR, SUM() fails and
            # ORDER BY compares values as text.
            sql_type = Float
        elif pd.api.types.is_string_dtype(col_type):
            # An unsized VARCHAR is VARCHAR(256) on Redshift, which long text values
            # can exceed; 65535 is Redshift's maximum VARCHAR length (bytes).
            sql_type = String(65535)
        elif pd.api.types.is_datetime64_any_dtype(col_type):
            sql_type = String  # Storing datetime as string for simplicity
        else:
            raise TypeError(f"Unsupported dtype: {col_type}")
        columns.append(Column(col_name, sql_type))

    Table(table_name, metadata, *columns)
    # create_all skips tables that already exist (checkfirst defaults to True).
    metadata.create_all(db_engine)
    print(f"Tabla {table_name} creada con exito")

In [None]:
# Load a single CSV file into the warehouse, chunk by chunk.
def upload_csv(**kwargs):
    """Read ``base_path + csv`` in chunks and append every row to a table.

    The table is named after the part of the file name before the first ".".
    Its structure is built from the first chunk via ``create_table``. That chunk
    is then inserted like every following one.

    Keyword Args:
        base_path: Prefix (URL or directory) prepended to ``csv``.
        csv: CSV file name.
        engine: SQLAlchemy engine.
        chunksize: Rows per chunk passed to ``pd.read_csv``.

    Returns:
        1 if the whole file was loaded, 0 otherwise (the error is printed).
    """
    from itertools import chain

    base_path = kwargs["base_path"]
    csv_file = kwargs["csv"]
    engine = kwargs["engine"]
    chunksize = kwargs["chunksize"]
    # create_table builds the table in this schema. Appending to the same schema
    # stops to_sql from resolving the name through the search_path and possibly
    # writing to a different table.
    target_schema = "andru_ocatorres_coderhouse"

    table_name = csv_file.split(".")[0]
    print(f"{csv_file} to be read")

    try:
        # The context manager closes the underlying reader, even on failure.
        with pd.read_csv(base_path + csv_file, chunksize=chunksize) as chunk_iterator:
            # If the reader yields no chunk at all, StopIteration is reported below.
            first_chunk = next(chunk_iterator)
            create_table(table_name, first_chunk, engine)
            print(f"Table {table_name} structure created")

            # The first chunk must be inserted as well, not only used for the layout;
            # otherwise up to `chunksize` rows are silently dropped.
            for chunk in chain([first_chunk], chunk_iterator):
                chunk.to_sql(table_name, engine, schema=target_schema, if_exists='append', index=False)
                print(f"Uploaded chunk with {len(chunk)} records")

        print(f"Table {table_name} has been successfully populated")
        return 1

    except Exception as e:
        print(f"La carga de la tabla: {table_name} no se realizó con éxito")
        print(f"Error: {e!r}")
        return 0
    
# Create one view per functional-area query, through SQLAlchemy.
def upload_views(**kwargs):
    """Create or replace one view per entry of ``queries`` in a single transaction.

    Keyword Args:
        engine: SQLAlchemy engine.
        queries: Mapping of view name -> SELECT statement.
        schema: Schema in which the views are created. Falls back to "public"
            when missing or empty.

    Returns:
        1 if every view was created and the transaction committed, 0 otherwise.
    """
    engine = kwargs["engine"]
    queries = kwargs["queries"]
    schema = kwargs.get("schema") or "public"

    # Transformation
    queries_to_view = {
        name: f"CREATE OR REPLACE VIEW {schema}.{name} AS\n{query}"
        for name, query in queries.items()
    }

    try:
        # engine.begin() commits when the block exits normally and rolls the
        # transaction back automatically if any statement raises.
        with engine.begin() as connection:
            for name, view_query in queries_to_view.items():
                connection.execute(text(view_query))
                print(f"View '{name}' has been created successfully")

        print("Transaction committed successfully")
        return 1

    except Exception as e:
        # No manual rollback: Engine has no rollback() method (calling it raised
        # AttributeError here), and begin() has already rolled back.
        print(f"Failed to create view: {e}")
        return 0



In [9]:
# Load every CSV; one status flag per table, in `tables` order (1 = loaded, 0 = failed).
uploaded_status = [
    upload_csv(engine=db_engine, csv=csv_name, chunksize=100_000, base_path=url_base)
    for csv_name in tables
]
uploaded_status

## PREGUNTAS A RESPONDER
* _MEJORES PRODUCTOS -> TOP PRODUCTOS EN REVIEW | TOP PRODUCTOS MAS VENDIDOS_
* _VENDEDORES CON PEOR VENTA -> TOP N PEORES VENDEDORES_
* _Encontrar los cinco vendedores con mejor desempeño usando la columna salesytd (Sales, year-to-date). (Solo necesitamos conocer el businessentityid de cada vendedor, ya que esto identifica de forma única a cada uno)._
* _Usando salesorderheader, buscar los 5 vendedores con el mayor monto de ventas en el año más reciente (2014). (Hay una columna llamada subtotal; usarla). Las ventas que no tienen un vendedor asociado deben excluirse de los cálculos y del resultado final. Se deben incluir todos los pedidos que se realizaron dentro del año calendario 2014.
Pista: pueden usar la sintaxis '1970-01-01' para generar un punto de comparación en el tiempo._

In [None]:
# SQL behind each analysis view.
# - Table references are unqualified, so every query resolves tables through
#   the session search_path in the same way.
# - Money is summed as NUMERIC with an explicit precision and scale: on Redshift a
#   bare NUMERIC means NUMERIC(18, 0), which drops the cents.
# NOTE(review): CREATE OR REPLACE VIEW cannot change an existing view's column
# types, so views created from the previous definitions may need one DROP first.

# Bottom 10 salespeople by total billed (totaldue). Orders with no salesperson
# are excluded so they are not ranked as if they were a single seller.
sql_worst_sellers: str = """
SELECT
    salespersonid
,   CAST(SUM(CAST(totaldue AS NUMERIC(18, 4))) AS NUMERIC(18, 2)) AS TOTAL_AMOUNT_SELLED
,   COUNT(1) AS AMOUNT_OF_SALES
FROM salesorderheader
WHERE salespersonid IS NOT NULL
GROUP BY salespersonid
ORDER BY
    TOTAL_AMOUNT_SELLED
,   salespersonid
LIMIT 10;
"""
# Alias kept for the original (misspelled) variable name.
sql_worst_sellerstr = sql_worst_sellers

# Top 10 products by number of order lines, with their English description.
sql_top_10_best_sells: str = """
WITH TOP_BEST_SELLS AS
(   SELECT
        S.productid
    ,   COUNT(1) AS PRODUCT_TOTAL
    FROM salesorderdetail S
    GROUP BY
        S.productid
),
AGG_TABLE AS (
SELECT
    productid
,   P.productmodelid
,   P.name
,   PRODUCT_TOTAL
FROM TOP_BEST_SELLS
LEFT JOIN
    product AS P
    USING(productid)
ORDER BY PRODUCT_TOTAL DESC, productid
LIMIT 10
)
SELECT
    AG.name
,   PD.description
,   AG.PRODUCT_TOTAL
FROM productdescription PD
INNER JOIN productmodelproductdescriptionculture AS PC USING(productdescriptionid)
INNER JOIN AGG_TABLE AS AG USING(productmodelid)
WHERE PC.cultureid = 'en'
ORDER BY AG.PRODUCT_TOTAL DESC;
"""

# Products ranked by average review rating. The rating is averaged as a float
# because Redshift's AVG over an integer column returns a truncated integer.
# The ORDER BY sits on the outer query: inside the CTE it does not order the result.
sql_best_reviews: str = """
WITH producto_rating AS (
    SELECT pv.productid,
        p.productmodelid,
        p.name,
        AVG(CAST(pv.rating AS DOUBLE PRECISION)) AS avg_rating,
        COUNT(1) AS total_reviews
    FROM productreview pv
    LEFT JOIN product p USING (productid)
    GROUP BY pv.productid, p.name, p.productmodelid
)
SELECT pr.name,
    pd.description,
    pr.avg_rating,
    pr.total_reviews
FROM productdescription pd
JOIN productmodelproductdescriptionculture pc USING (productdescriptionid)
JOIN producto_rating pr USING (productmodelid)
WHERE pc.cultureid = 'en'
ORDER BY pr.avg_rating DESC, pr.total_reviews DESC;"""

# Top 5 salespeople by year-to-date sales. The sort key is cast so the ordering is
# numeric even if salesytd was loaded as text.
sql_best_sales_person: str = """
SELECT
    businessentityid
,   salesytd
FROM salesperson
ORDER BY CAST(salesytd AS NUMERIC(18, 4)) DESC
LIMIT 5
"""

# Top 5 salespeople by subtotal over calendar year 2014 only, excluding orders
# without a salesperson.
sql_best_sales_person_2014: str = """
SELECT
    salespersonid,
    CAST(SUM(CAST(subtotal AS NUMERIC(18, 4))) AS NUMERIC(18, 2)) AS total_sales
FROM
    salesorderheader
WHERE
    orderdate >= '2014-01-01'
    AND orderdate < '2015-01-01'
    AND salespersonid IS NOT NULL
GROUP BY
    salespersonid
ORDER BY
    total_sales DESC
LIMIT 5;"""


# View name -> defining query.
queries_to_view = {
    "best_sales_person_2014_view": sql_best_sales_person_2014,
    "best_sales_person_view": sql_best_sales_person,
    "best_reviews_view": sql_best_reviews,
    "best_sells_view": sql_top_10_best_sells,
    "worst_sellers_view": sql_worst_sellers,
}

### Generar carga de vistas

In [None]:
# Publish the analysis queries as views; the cell displays 1 on success, 0 on failure.
upload_views(
    schema=schema,
    queries=queries_to_view,
    engine=db_engine,
)


# Dataframes con psycopg2 - Linux (psycopg2-binary)

* [Psycopg2 Documentation](https://www.psycopg.org/docs/install.html)

In [None]:
import psycopg2 as pg

In [None]:
# Open a psycopg2 connection with the same REDSHIFT_* environment variables as the SQLAlchemy engine.
conn = pg.connect(
    **{
        "user": os.getenv("REDSHIFT_USERNAME"),
        "password": os.getenv("REDSHIFT_PASSWORD"),
        "host": os.getenv("REDSHIFT_HOST"),
        "port": os.getenv("REDSHIFT_PORT", "5439"),
        "database": os.getenv("REDSHIFT_DBNAME"),
    }
)


In [None]:
def view_creator(**kwargs):
    """Create or replace one view per entry of ``queries`` through a psycopg2 connection.

    All views are created in one transaction. It is committed at the end, or
    rolled back if any statement fails. The connection is always closed before
    returning, so a fresh connection is needed for each call.

    Keyword Args:
        connection: Open psycopg2 connection (closed by this function).
        queries: Mapping of view name -> SELECT statement.
        schema: Optional schema for the views; defaults to "public".

    Returns:
        1 if every view was created and committed, 0 otherwise.
    """
    connection = kwargs["connection"]
    queries = kwargs["queries"]
    schema = kwargs.get("schema") or "public"

    try:
        queries_to_view = {
            name: f"CREATE OR REPLACE VIEW {schema}.{name} AS\n{query}"
            for name, query in queries.items()
        }

        with connection.cursor() as cursor:
            for name, view_query in queries_to_view.items():
                cursor.execute(view_query)
                print(f"View '{name}' ha sido creada")

        # Commit the transaction outside of the cursor context manager
        connection.commit()
        print("Transaction committed successfully")
        return 1

    except Exception as e:
        print(f"Error '{e}' occurred")
        # A closed or broken connection (e.g. re-running this cell with the same
        # `conn`) cannot be rolled back; calling rollback() would raise here and
        # hide the original error.
        if not connection.closed:
            connection.rollback()
        return 0

    finally:
        connection.close()
        print("Connection closed")

In [None]:
# Create the queries_to_view views through the psycopg2 connection; view_creator closes it afterwards.
view_creator(queries=queries_to_view, connection=conn)

## psycopg2 vs SQLAlchemy

| Feature         | psycopg2                                    | SQLAlchemy                                          |
|-----------------|---------------------------------------------|-----------------------------------------------------|
| Type            | Database adapter                            | ORM library and SQL toolkit                        |
| Purpose         | Interact with PostgreSQL databases         | Interact with multiple database engines             |
| Functionality   | Low-level interface for executing SQL commands, managing connections, handling transactions | ORM for mapping Python objects to database tables, SQL toolkit for query building |
| Level of Abstraction | Low-level, requires writing SQL queries directly | High-level, provides abstraction over SQL queries, supports ORM |
| Performance     | Known for efficiency and performance      | Offers flexibility and abstraction, may have slightly more overhead |
| Suitability     | Developers comfortable with SQL, need direct control over PostgreSQL interactions | Developers preferring higher-level abstraction, multiple database support |
| Learning Curve  | Easier for SQL-savvy developers            | Steeper due to ORM and higher-level abstraction    |

