In [2]:
## Importación al bucket iedatalakeLanding de entidades del CRM online
## --------------------------------------------------------------------
##
## Las entidades necesarias se extraerán de la base de datos réplica de CRM online en bruto 
## prefiltradas para que no se incluyan registros que han sido borrados en CRM, y limitando
## que los registros sean activos en CRM en aquellas entidades donde proceda. 
##
## Para cada entidad, se creará un CSV en una subcarpeta "data" en la misma carpeta donde 
## se ejecuta el programa, que posteriormente se subirá junto al resto a la carpeta "crm" del bucket en S3.
##
## Los nombres de los ficheros generados llevarán los nombres de las tablas tal como vienen en 
## la réplica de CRM, y las columnas llevarán los mismos nombres que los campos de estas tablas, 
## preservando así en Landing su denominación original. 

import sqlalchemy
import csv
import string
import pandas as pd
import os
import shutil
import datetime
import uuid
import sys

# Importación de opciones de configuración

if (os.name == 'nt'):
    sys.path.append('D:\python\config')
else: 
    sys.path.append('/home/ubuntu/iedatalake/python/config')
import databaseconfig as dbc


In [3]:
## Proceso, versíón y fecha de ingesta para identificar la ejecución

ingestionProcess = 'Ingesta de Entidades CRM desde analytics01 en python'
ingestionVersion = str(uuid.uuid1())
ingestionInitialDate = str(datetime.datetime.now())

## Motor SQLAlchemy para la base de datos SQL Server réplica del CRM online 

crmEngine = sqlalchemy.create_engine(dbc.crm['dialect'] + '+' + dbc.crm['driver'] + '://' \
      + dbc.crm['username'] + ':' + dbc.crm['password'] + '@' \
      + dbc.crm['host'] + '/' + dbc.crm['database'] + dbc.crm['parameters'])

## Carpeta auxiliar para depositar los CSVs generados antes de subir

if not os.path.exists("data"):
    os.makedirs("data")


In [4]:
## Procedimiento de importación de una entidad de CRM a un fichero CSV local

def ImportCRMEntity(entityName, **optionalParameters): 
    
    # Ruta del fichero CSV, asumida en una subcarpeta "data" en la carpeta donde se ejecuta
    # y extensión .csv.gz porque se generará comprimido en formato GZIP
    
    csvFile = optionalParameters.get('csvFile', 'data/' + entityName + '.csv.gz')
    
    # Parámetro opcional con filtro a aplicar para restringir a registros activos de la entidad.
    # Por defecto es 'statecode = 0' al ser la condición que aplica a más entidades
    
    activeFilter = optionalParameters.get('activeFilter', 'statecode = 0')    
       
    # Parámetro opcional con el índice a aplicar al fichero CSV
    
    csvIndex = optionalParameters.get('csvIndex', None)
    if (csvIndex == None):
        csvIndex = str.lower(entityName + 'id')        
    
    # Parámetro opcional con el número de registros del bloque a recuperar de forma iterativa
    
    chunkSize = optionalParameters.get('chunkSize', None)
    if (chunkSize == None):
        # Si no viene de entrada ningún número de registros, no se limita, poniendo como alcance
        # el número total de filas de la tabla de la entidad original
        crmConnection = crmEngine.connect()
        df = pd.io.sql.read_sql("SELECT COUNT(*) as recuento FROM " + entityName, crmConnection)
        chunkSize = int(df["recuento"]) + 1
        crmConnection.close()
    else: 
        chunkSize = int(chunkSize)
    
    # Recuperación por base de datos de los registros de la entidad filtrados conforme 
    # a la condición de activo (si aplica) y a un número máximo de filas (si aplica)
    # en un DataFrame que vuelca al fichero CSV destino para la entidad
    
    crmConnection = crmEngine.connect()
    rowsPending = True
    offset = 0
    while rowsPending:
        sqlSentence = "WITH Results_SQL AS (SELECT "        
        sqlSentence += "*, ROW_NUMBER() OVER (ORDER BY " + csvIndex + ") as RowNum FROM " + entityName
        if (activeFilter != None):
            sqlSentence += " WHERE " + activeFilter
        sqlSentence += ") SELECT * FROM Results_SQL WHERE RowNum > " + str(offset) + " AND RowNum <=  " \
            + str(offset + chunkSize)        
        chunk = pd.io.sql.read_sql(sqlSentence, crmConnection)
        # Borramos columna auxiliar RowNum usada para ordenar por el índice
        del chunk["RowNum"]
        chunk.set_index(csvIndex, inplace = True)        
        if (offset == 0):            
            chunk.to_csv(csvFile, sep = ';', compression = 'gzip', quotechar='|', quoting=csv.QUOTE_MINIMAL, 
                 encoding = 'utf8', line_terminator = '\n\r')
        else:            
            chunk.to_csv(csvFile, sep = ';', compression = 'gzip', quotechar='|', quoting=csv.QUOTE_MINIMAL, 
                 encoding = 'utf8', mode= 'a', header = False, line_terminator = '\n\r')               
        offset += chunkSize
        if (len(chunk) < chunkSize):
            rowsPending = False            
        del chunk
    crmConnection.close()
        

In [5]:
## Procesamiento de la entidad BusinessUnit (Unidad de Negocio)

ImportCRMEntity('BusinessUnit', activeFilter = 'isdisabled = 0')

In [5]:
## Procesamiento de la entidad Contact (Persona)

ImportCRMEntity('Contact') #, chunkSize = '5000' )

In [6]:
## Procesamiento de la entidad IE_Admission (Solicitud de Admisión)

ImportCRMEntity('IE_Admission')

In [7]:
## Procesamiento de la entidad IE_Agrupation (Agrupaciones de Programas). 

## Aunque "Agrupation" no exista en inglés y debieran haber puesto otra traducción más oportuna como
## "Program Group", lo dejaramos así en Landing para preservar la nomenclatura original, pero en la 
## capa Process (o área de Staging) deberíamos normalizar esta denominación.

ImportCRMEntity('IE_Agrupation')

In [8]:
## Procesamiento de la entidad IE_ApplicationFAFellowship (Beca / Fellowship)

ImportCRMEntity('IE_ApplicationFAFellowship')

In [9]:
## Procesamiento de la entidad IE_Bonus (Bono)

ImportCRMEntity('IE_Bonus')

In [10]:
## Procesamiento de la entidad IE_Country (País)

ImportCRMEntity('IE_Country')

In [11]:
## Procesamiento de la entidad IE_DiscountsType (Tipo de Descuento)

## No se excluyen inactivos para darle carácter histórico y disponer de tipos de descuento que hayan
## sido aplicados en el pasado a becas o bonos, pero en la actualidad no se ofrezcan.

ImportCRMEntity('IE_DiscountsType', activeFilter = None)

In [12]:
## Procesamiento de la entidad IE_GeographicalArea (Área Geográfica)

ImportCRMEntity('IE_GeographicalArea')

In [13]:
## Procesamiento de la entidad IE_Grant (Solicitud de Ayuda Financiera / Scholarship)

ImportCRMEntity('IE_Grant')

In [14]:
## Procesamiento de la entidad IE_Loan (Préstamo)

ImportCRMEntity('IE_Loan')

In [15]:
## Procesamiento de la entidad relacional IE_IE_IESchool_IE_Program 
## (Asociación múltiple entre programas y escuelas)

## No se requiere filtro de activo pues no tiene campo de estado.

ImportCRMEntity('IE_IE_IESchool_IE_Program', activeFilter = None)


In [16]:
## Procesamiento de la entidad IE_IESchool (Escuela IE)

ImportCRMEntity('IE_IESchool')

In [17]:
## Procesamiento de la entidad IE_Nationality (Nacionalidad)

ImportCRMEntity('IE_Nationality')

In [18]:
## Procesamiento de la entidad relacional IE_NationalityPerson 
## (Asociación múltiple entre personas y nacionalidades, anexando cuando se puede un documento identificativo)

ImportCRMEntity('IE_NationalityPerson')

In [19]:
## Procesamiento de la entidad IE_Program (Programa)

## No se filtran programas activos pues en general necesitamos los programas con carácter histórico a fin
## de poder hacer análisis evolutivos o comparativos a lo largo del tiempo.
## Si en la capa de Process algún caso de uso requiriese limitar a activos, o comercializables, o atendiendo
## a otros criterios, se aplicarían en la transformación que los obtenga en esa otra capa para esa finalidad.

ImportCRMEntity('IE_Program', activeFilter = None)

In [20]:
## Procedamiento de la entidad IE_ProgramToProgramCore (equivalencia para el antiguo Sistema Plus)

ImportCRMEntity('IE_ProgramToProgramCore')

In [21]:
## Procesamiento de la entidad IE_ProgramType (Tipo de Programa)

## No se filtran tipos de programas activos para poder disponer de ellos con carácter histórico

ImportCRMEntity('IE_ProgramType', activeFilter = None)

In [22]:
## Procesamiento de la entidad Opportunity (Oportunidad / Interés en Programa)

## Limitado a 10000 registros hasta que se cambie el tipo de instancia y se pueda cargar todo en memoria.
## No se filtran registros activos pues el campo de estado no tiene un valor "Activo" para esta entidad
## sino que tiene tres posibles estados y ninguno de ellos lo podemos descartar en esta carga.

ImportCRMEntity('Opportunity', chunkSize = '5000') # activeFilter = None) #, chunkSize = '5000')

In [23]:
## Procesamiento de la entidad Product (Producto o Convocatoria de un Programa)

## No se filtran productos activos pues en general necesitamos las convocatorias con carácter histórico.

ImportCRMEntity('Product', activeFilter = None)

In [24]:
## Procesamiento de StringMap (Catálogo de listas de valores y sus textos asociados)

## No se requiere filtro de activo pues no tiene campo de estado. 

ImportCRMEntity('StringMap', activeFilter = None)

In [25]:
## Procesamiento de la entidad Team (Equipo de Ventas)

## No se requiere filtro de activo pues no tiene campo de estado.

ImportCRMEntity('Team', activeFilter = None)

In [26]:
## Celda auxiliar de comprobación para cargar CSV y visualizar como DataFrame de pandas

##pd.read_csv('data/Team.csv.gz', sep = ';', compression = 'gzip', quotechar='|', quoting=csv.QUOTE_MINIMAL)

In [27]:
## Sincronización de carpeta local "data" con carpeta "crm" del bucket S3 iedatalakelanding
## Hace uso de AWSCLI (previamente instalado mediante la instrucción: sudo apt install awscli)

ingestionFinalDate = str(datetime.datetime.now())
os.system('aws s3 sync data s3://iedatalakelanding/crm --metadata ingestionprocess="' + ingestionProcess  
          + '",ingestionversion="' + ingestionVersion + '",ingestioninitialdate="' + ingestionInitialDate 
          + '",ingestionfinaldate="' + ingestionFinalDate + '",code="Ingest-CRM-Entities"');


In [28]:
## Borrado de la carpeta auxiliar "data" y todos los ficheros generados en ella
# shutil.rmtree('data')