# Desarrollo flujo de datos para variables JIGSAW

En el siguiente notebook se encuentra las líneas de código que terminarán siendo el .py final que se incorporará al flujo de datos. Este último será a través de Airflow.

In [1]:
import pandas as pd
import numpy as np
import requests

from config import settings
from sqlalchemy import create_engine
from datetime import datetime, timedelta
from dateutil import tz
from azure.cosmos import CosmosClient
from copy import copy

In [2]:
# Parámetros de conexión a Azure

COSMOS_URL = settings['cosmos_url']
COSMOS_KEY = settings['cosmos_key']
COSMOS_DATABASE = settings['cosmos_database']
COSMOS_CONTAINER_JIGSAW = settings['cosmos_container_jigsaw']

# Parámetros de conexión a Jigsaw
JIGSAW_USER_NAM = settings['jigsaw_user_nam']
JIGSAW_USER_PWD = settings['jigsaw_user_pwd']
JIGSAW_SERVER_ADD = settings['jigsaw_server_add']

# Parámetros de conexión a BD módulos
MODULOS_USER_NAM = settings['modulos_user_nam']
MODULOS_USER_PWD = settings['modulos_user_pwd']
MODULOS_SERVER_ADD = settings['modulos_server_add']

In [3]:
cosmos_client = CosmosClient(url=COSMOS_URL, credential=COSMOS_KEY)
database_client = cosmos_client.get_database_client(database=COSMOS_DATABASE)
container_jigsaw_client = database_client.get_container_client(container=COSMOS_CONTAINER_JIGSAW)

In [4]:
cosmos_id_mod = container_jigsaw_client.query_items(
    query='SELECT c.id_mod FROM c', 
    enable_cross_partition_query=True
    )

list_cosmos_id_mod = [str(m['id_mod']) for m in copy(list(cosmos_id_mod))]

existing_mods = '(' + ','.join(list_cosmos_id_mod) + ')'

In [5]:
# Creación conexion a servidores
jigsaw_engine = create_engine('mssql+pymssql://' + JIGSAW_USER_NAM + ':' + JIGSAW_USER_PWD + '@' + JIGSAW_SERVER_ADD)
modulos_engine = create_engine('mssql+pymssql://' + MODULOS_USER_NAM + ':' + MODULOS_USER_PWD + '@' + MODULOS_SERVER_ADD)

# Zonas horarias para pulir datos de apilamiento
## Los datos de jigsaw están en UTC. Las fechas de los módulos están en el horario de America/Santiago

from_zone = tz.gettz('America/Santiago')
to_zone = tz.gettz('UTC')

# Tiempo hasta que el contenido de un bloque debiese llegar por completo 

tiempo_descarga_apilado_horas = 1.25

In [6]:
# Consulta a realizar por Módulos
query_modulos = f"SELECT id_mod, ini_apil, fin_apil, ini_benef FROM [pi_temp].[dbo].[bene_diario_modulos] WHERE id_mod NOT IN {existing_mods} AND ini_apil >= '2020-01-01' ORDER BY ini_apil DESC"

# Descarga datos de la consulta
modulos_table = pd.read_sql(query_modulos, modulos_engine)

In [7]:
if modulos_table.shape[0] > 0:

  camiones_detail_modulo = []
  grades_detail_modulo = []

  for M, row in modulos_table.iterrows():
    
    modulo_inicio_apilado_santiago = datetime.strptime(row['ini_apil'].replace('.000', ''), '%Y-%m-%d %H:%M:%S').replace(tzinfo=from_zone)
    modulo_fin_apilado_santiago = datetime.strptime(row['fin_apil'].replace('.000', ''), '%Y-%m-%d %H:%M:%S').replace(tzinfo=from_zone)

    modulo_inicio_beneficio = datetime.strptime(row['ini_benef'].replace('.000', ''), '%Y-%m-%d')
    
    modulo_inicio_apilado_utc_bloque = modulo_inicio_apilado_santiago.astimezone(to_zone) - timedelta(hours=tiempo_descarga_apilado_horas)
    modulo_fin_apilado_utc_bloque = modulo_fin_apilado_santiago.astimezone(to_zone) - timedelta(hours=tiempo_descarga_apilado_horas)

    modulo_inicio_utc_bloque_string = modulo_inicio_apilado_utc_bloque.strftime('%Y-%m-%d %H:%M:%S')
    modulo_fin_utc_bloque_string = modulo_fin_apilado_utc_bloque.strftime('%Y-%m-%d %H:%M:%S')
    
    jigsaw_query_custom_detail = f"SELECT * FROM [by_custom_cycle_time_detail] WHERE [time_empty] BETWEEN '{modulo_inicio_utc_bloque_string}' AND '{modulo_fin_utc_bloque_string}' ORDER BY id DESC"
    
    jigsaw_table_custom_detail = pd.read_sql(jigsaw_query_custom_detail, jigsaw_engine)
    jigsaw_table_custom_detail['id_mod'] = row['id_mod']
    jigsaw_table_custom_detail['Inicio_Apilamiento'] = row['ini_apil']
    jigsaw_table_custom_detail['Fin_Apilamiento'] = row['fin_apil']
    
    camiones_detail_modulo.append(jigsaw_table_custom_detail)
    
    jigsaw_block_ids_list = [str(i) for i in jigsaw_table_custom_detail['grade_id'].unique() if ~np.isnan(i)]
    jigsaw_blocks_ids_query = '(' + ','.join(jigsaw_block_ids_list) + ')'
    
    jigsaw_query_grades_detail = f"SELECT [grade_id], [start_date], [qualities] FROM [grade_qualities] WHERE [grade_id] IN {jigsaw_blocks_ids_query} ORDER BY id DESC"

    jigsaw_table_grades_detail = pd.read_sql(jigsaw_query_grades_detail, jigsaw_engine)
    jigsaw_table_grades_detail['id_mod'] = row['id_mod']
    
    grades_detail_modulo.append(jigsaw_table_grades_detail)
    
    
  camiones_detail_modulo_pd = pd.concat(camiones_detail_modulo)
  grades_detail_modulo_pd = pd.concat(grades_detail_modulo)

  # API de R que procesa las variables en el formato trabajado (esta la debemos trabajar con Docker y montar en Azure)

  mods_nuevos_variables_jigsaw = requests.post(
      url='http://127.0.0.1:8991/jigsaw_variables',
      json={
          'datos_camiones' : camiones_detail_modulo_pd.to_json(orient='records'),
          'datos_grades' : grades_detail_modulo_pd.to_json(orient='records')
          }
  )
  # Empuje a Cosmos

  id_mod_nuevos = mods_nuevos_variables_jigsaw.json()

  for record in id_mod_nuevos:
      try : 
          container_jigsaw_client.create_item(body=record)
          print('Insertado : ' + str(record['id_mod']))
      except:
          print('No insertado : ' + str(record['id_mod']))

else :
  print('No hay módulos nuevos')

No hay módulos nuevos
