# Pi Data Challenge

#### El problema
Un proceso ETL toma datos de un archivo y lo deposita en la tabla dbo.Unificado.
Por algún error en estos archivos, aparecieron registros duplicados en la tabla.
Consultando con el cliente, nos cuenta que es posible que estas cosas sucedan como consecuencia de errores en el sistema que genera estos archivos, pero que siempre tomemos el ultimo registro que fue copiado, considerando que un registro será duplicado si los campos [ID], [MUESTRA] y [RESULTADO] son iguales en dos filas distintas.

## Consignas

### Importo librerias

De ser necesario, habrá que instalar las mismas si el ambiente no está configurado.

In [40]:
import requests
import pandas as pd
from pandasql import sqldf
import pyodbc
from datetime import datetime

###  b. Descargo de manera automatica el archivo .csv

Aquí lo que hace es utilizar la libreria requests y datetime con el fin de realizar una bajada automatica del .csv, y colocarle la fecha al nombre, ya que el archivo final que vamos a guardar será de caracter incremental, no historico. Esto quiere decir, que sobre el directorio guardado habrá un archivo para 1 vez x semana, necesario para luego correr lo demás.

In [41]:
# import requests
# from datetime import datetime

# Enlace del archivo CSV
enlace_csv = "https://adlssynapsetestfrancis.blob.core.windows.net/challenge/nuevas_filas.csv?sp=r&st=2023-04-20T15:25:12Z&se=2023-12-31T23:25:12Z&spr=https&sv=2021-12-02&sr=b&sig=MZIobvBY6c7ht%2FdFLhtyJ3MZgqa%2B75%2BY3YWntqL%2FStI%3D"

try:
    # Realizar la solicitud HTTP para descargar el archivo
    respuesta = requests.get(enlace_csv)

    # Verificar si la solicitud fue exitosa (código de estado 200)
    if respuesta.status_code == 200:
        # Obtener el contenido del archivo CSV
        contenido_csv = respuesta.content

        # Obtener la fecha actual en el formato "yyyy-mm-dd"
        fecha_actual = datetime.now().strftime("%Y-%m-%d")

        # Agregar la fecha al nombre del archivo CSV
        nombre_archivo = f"nuevas_filas_{fecha_actual}.csv"

        # Guardar el contenido en un archivo local
        with open(nombre_archivo, 'wb') as archivo_local:
            archivo_local.write(contenido_csv)

        print(f"El archivo CSV se ha descargado y guardado correctamente como '{nombre_archivo}'.")
    else:
        print("No se pudo descargar el archivo CSV. Código de estado:", respuesta.status_code)

except requests.exceptions.RequestException as e:
    print("Error al descargar el archivo CSV:", e)


El archivo CSV se ha descargado y guardado correctamente como 'nuevas_filas_2023-07-28.csv'.


### c. Insercion de datos .csv hacia las tablas unificado.

* Como lo estoy haciendo de manera local y no montado al ecosistema de Azure, lo que hago es conectarme al servidor, que he configurado para montar el backup (hecho en SQL Server .bak).
* Luego lo que va a hacer, es via codigo python, insertar los valores del archivo .csv hacia la base de datos aloja en SQLServer.

In [53]:
# Establecer la cadena de conexión a la base de datos de SQL Server

# Es muy importante aquí tener en cuenta que la configuración depende del usuario en cuestión.

server = 'DESKTOP-U84P8HB'
database = 'Testing_ETL'
username = 'pidata'
password = 'admin'
conexion_str = f'DRIVER={{SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password}'

In [50]:


# Obtener la fecha actual en el formato "yyyy-mm-dd"
fecha_actual = datetime.now().strftime('%Y-%m-%d')

# Concatenar la fecha al nombre del archivo CSV
ruta_archivo_csv = f'nuevas_filas_{fecha_actual}.csv'


try:
    # Leer el archivo CSV en un DataFrame
    df = pd.read_csv(ruta_archivo_csv)

    # Agregar la fecha actual en la columna "FECHA_COPIA"
    fecha_copia_actual = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    df['FECHA_COPIA'] = fecha_copia_actual

    # Conectarse a la base de datos
    conexion = pyodbc.connect(conexion_str)

    # Crear un cursor para ejecutar las inserciones
    cursor = conexion.cursor()

    # Ejecutar la instrucción DELETE para eliminar filas con fecha_copia mayor o igual a la fecha actual
    delete_query = f'''
        DELETE FROM Unificado
        WHERE FECHA_COPIA >= '{fecha_actual}'
    '''
    cursor.execute(delete_query)

    # Iterar sobre las filas del DataFrame e insertarlas en la tabla "Unificado"
    for index, fila in df.iterrows():
        query = f'''
            INSERT INTO Unificado (CHROM, POS, ID, REF, ALT, QUAL, FILTER, INFO, FORMAT, MUESTRA, VALOR, ORIGEN, FECHA_COPIA, RESULTADO)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        '''
        parametros = tuple(fila.tolist())
        cursor.execute(query, parametros)

    # Confirmar los cambios y cerrar el cursor y la conexión
    conexion.commit()
    cursor.close()
    conexion.close()

    print("Inserción de datos completada correctamente.")

except Exception as e:
    print("Error en la inserción de datos:", e) 

Inserción de datos completada correctamente.


### d. Elimino los duplicados.

Esto es fundamental, ya que es el problema a abordar por parte del enunciado, debido a que hay que eliminar los duplicados. Vía codigo python, lo hacemos, utilizando también codigo sql para la ejecucion de la misma, creando un CTE, utilizando la función row number... Se podría haber hecho todo el problema, en Azure...

In [56]:


try:
    # Conectarse a la base de datos
    conexion = pyodbc.connect(conexion_str)

    # Crear un cursor para ejecutar la consulta
    cursor = conexion.cursor()

    # Consulta para seleccionar los registros no duplicados
    consulta = '''
    WITH t AS (
        SELECT *,
            RANK() OVER (PARTITION BY ID, MUESTRA, RESULTADO ORDER BY FECHA_COPIA DESC) AS Ranking
        FROM [Testing_ETL].[dbo].[Unificado]
    )
        SELECT CHROM, POS, ID, REF, ALT, QUAL, FILTER, INFO, FORMAT, MUESTRA, VALOR, ORIGEN, FECHA_COPIA, RESULTADO
        FROM t
        WHERE Ranking = 1
    '''

    # Ejecutar la consulta para obtener los registros no duplicados
    cursor.execute(consulta)

    # Obtener los resultados de la consulta
    registros_no_duplicados = cursor.fetchall()

    # Construir la consulta de eliminación
    consulta_truncate = '''
    TRUNCATE TABLE [Testing_ETL].[dbo].[Unificado]
    '''

    # Ejecutamos el truncate
    cursor.execute(consulta_truncate)
    
    # Sentencia SQL para insertar los registros no duplicados en la tabla dbo.unificado
    insert_query = '''
    INSERT INTO [Testing_ETL].[dbo].[unificado] (CHROM, POS, ID, REF, ALT, QUAL, FILTER, INFO, FORMAT, MUESTRA, VALOR, ORIGEN, FECHA_COPIA, RESULTADO)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    '''

    # Insertar los registros no duplicados en la tabla dbo.unificado
    cursor.executemany(insert_query, registros_no_duplicados)


    # Confirmar los cambios y cerrar el cursor y la conexión
    conexion.commit()
    cursor.close()
    conexion.close()

    print("Duplicados eliminados correctamente.")

except pyodbc.Error as e:
    print("Error en la conexión o ejecución de la consulta:", e)


Duplicados eliminados correctamente.


### e. Dejar de algún modo programado ese proceso para que se ejecute los lunes de cada semana, a las 5:00 AM. (Nivel Teorico)

Para crear un flujo de trabajo (pipeline) en Azure Data Factory, sigo los siguientes pasos:

1. Ingreso al portal de Azure (https://portal.azure.com) y accedo a mi Azure Data Factory.

2. Creo un nuevo flujo de trabajo (pipeline) en ADF.

3. Dentro del pipeline, agrego la actividad que deseo ejecutar una vez por semana. Por ejemplo, si tengo una actividad que copia datos desde una fuente a un destino, la agrego al pipeline.

4. Luego, creo un disparador (trigger) semanal para programar la ejecución del flujo de trabajo:

a. En la parte superior del pipeline, voy a la sección "Triggers".

b. Creo un nuevo disparador y selecciono "Schedule" como tipo de disparador.

c. Configuro la programación del disparador para que se ejecute una vez por semana en el día y hora específicos que deseo. Por ejemplo, selecciono "Weekly" y elijo el día de la semana y la hora en que quiero que se ejecute el pipeline.

5. Asocio el disparador que acabo de crear con el pipeline que contiene la actividad que deseo ejecutar semanalmente.

6. Guardo y despliego el flujo de trabajo en Azure Data Factory.