# Transformaciones correspondientes a la capa silver de la tabla de contacts
Este notebook contiene el código para leer la tabla de contacts y realizar los respectivos cruces.

## Objetivos
Los objetivos de este notebook son:

* Lectura de tabla de la capa bronze azuresql
* Transformacion y limpieza del campo CIUDAD
* Escribir el resultado

## Creacion 
* Autor: Diego Torres
* Fecha creación: 27/11/2023

In [None]:
#%pip install pandas
#%pip install numpy
%pip install unidecode
#%pip install re

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting unidecode
  Using cached Unidecode-1.3.8-py3-none-any.whl (235 kB)
Installing collected packages: unidecode
Successfully installed unidecode-1.3.8
[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m


In [None]:
#dbutils.library.restartPython()

In [None]:
# Importar las bibliotecas necesarias
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, lower,col,when,to_timestamp,to_date,coalesce,lit,year,month,split
from pyspark.sql import functions as F
import pandas as pd
import numpy as np
from unidecode import unidecode
import re

# Crear una SparkSession
spark = SparkSession.builder.appName("AzureSQL").getOrCreate()

## Credenciales BD
inpath = "C:/Users/diego.torres/OneDrive/Datasets/Tuboleta/Credenciales.txt"
keys = pd.read_csv(inpath, sep = ',')
display(keys)

# Creo variables para cada fila del DataFrame que contiene las credenciales de la bd
for index, row in keys.iterrows():
    variable_name = row['key']
    variable_value = row['value']
    globals()[variable_name] = variable_value

# Configurar las propiedades de la conexión
jdbc_url = f"jdbc:sqlserver://{jdbc_hostname}:1433;database={jdbc_database_datamart}"
jdbc_properties = {
    "user": user,
    "password": password,
    "driver": driver
}

# Conexion al Blob
spark.conf.set(clave_blob, access_key_blob)

In [None]:
###  Importe de Contacts  ###
inpath = "abfss://storagebi@tbdwhstorage01.dfs.core.windows.net/bronze/contacts/Ds_B_Contacts_parquet/"
df_ini = spark.read.parquet(inpath)

#df_ini.count()
#display(df_ini.limit(10))

In [None]:
#temp = df_ini.filter(col('T_CONTACT_ID') == '10229527487512')#LAST_MODIFICATION
# temp = df_ini.select('LAST_MODIFICATION','LAST_UPDATE').distinct()
# display(temp.limit(20))

In [None]:
# #temp = df_ini.filter(col('T_CONTACT_ID') == '10229527487512')#LAST_MODIFICATION
# temp = df_ini.select(lower('MAIN_ADDR_LINE1'))
# print(temp.count())
# temp = df_ini.select(lower('MAIN_ADDR_LINE1')).distinct()
# print(temp.count())
# display(temp.limit(200))

In [None]:
# # Dividir cada línea en palabras y extraer la primera palabra
# df_with_first_word = df_ini.withColumn("primera_palabra", split(df_ini["MAIN_ADDR_LINE1"], " ")[0])

# # Comprobar si la primera palabra contiene "calle", "carrera" u otras variantes
# # df_with_extraction = df_with_first_word.withColumn("tipo_via",
# #                                         df_with_first_word["primera_palabra"].rlike("^(clle?|cra)$").cast("int"))
# temp = df_with_first_word.select(lower('primera_palabra')).distinct()
# print(temp.count())
# # Mostrar el DataFrame resultante
# display(temp)

In [None]:
###  Transformacion de Contacts  ###

columnas = ['T_CONTACT_ID','CONTACT_NUMBER',"FIRSTNAME", "LASTNAME","ID_NUMBER","BIRTHDATE","EMAIL","NAT_NUMBER_CELLPHONE","GENDER","ADDRESS_SALUTATION","MAIN_ADDR_LINE1","MAIN_ADDR_TOWN","MAIN_ADDR_GEO_ZONE","MAIN_ADDR_COUNTRY",'IS_GUEST',"CREATED_DATE",'LAST_MODIFICATION_FROM','CREATED_FROM',"LAST_UPDATE",'LAST_MODIFICATION','MAIN_ADDR_ZIPCODE']
 
### A) seleccion de columnas ###
df_contacts = df_ini.select(columnas).\
    withColumn("EMAIL", lower("EMAIL")).\
    withColumn("MAIN_ADDR_LINE1", lower("MAIN_ADDR_LINE1")).\
    withColumn("BIRTHDATE", to_timestamp(df_ini["BIRTHDATE"], "MM/dd/yyyy HH:mm:ss")).\
    withColumn("BIRTHDATE", to_date("BIRTHDATE")).\
    withColumn("CREATED_DATE", to_timestamp(df_ini["CREATED_DATE"], "MM/dd/yyyy HH:mm:ss")).\
    withColumn("LAST_UPDATE", to_timestamp(df_ini["LAST_UPDATE"], "MM/dd/yyyy HH:mm:ss")).\
    withColumn("LAST_MODIFICATION", to_timestamp(df_ini["LAST_MODIFICATION"], "MM/dd/yyyy HH:mm:ss")).\
    withColumn("MAIN_ADDR_TOWN", coalesce(df_ini["MAIN_ADDR_TOWN"], df_ini["MAIN_ADDR_GEO_ZONE"]))#.\
    #withColumn("MAIN_ADDR_LINE1_type", split(df_ini["MAIN_ADDR_LINE1"], " ")[0])

# ciudades con "-SELECCIONAR-", "COLOMBIA" pone el departamento
df_contacts = df_contacts.withColumn("MAIN_ADDR_TOWN_",
                   when(df_contacts["MAIN_ADDR_TOWN"].isin("-SELECCIONAR-", "COLOMBIA"),
                        df_contacts["MAIN_ADDR_GEO_ZONE"]).otherwise(df_contacts["MAIN_ADDR_TOWN"]))


#df_contacts.printSchema()
#display(df_contacts.limit(10))

df_contacts.createOrReplaceTempView("df_contacts_vw")


In [None]:
# tmp = df_contacts.filter(col('MAIN_ADDR_TOWN').isin('-SELECCIONAR-','COLOMBIA'))
# display(tmp.limit(100))

# tmp = df_contacts.filter(year('LAST_MODIFICATION') == 2024).\
#     groupBy("MAIN_ADDR_TOWN_").count().orderBy(col("count").desc())

# tmp = df_contacts.select('T_CONTACT_ID','ID_NUMBER','MAIN_ADDR_LINE1','MAIN_ADDR_TOWN','MAIN_ADDR_TOWN_','MAIN_ADDR_COUNTRY','CREATED_DATE','LAST_UPDATE','LAST_MODIFICATION').\
#     filter((year('CREATED_DATE') == 2024) & (month('CREATED_DATE') == 4))
# display(tmp)

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import re

###############################################
####  FUNCION QUE AÑADE ESPACIOS

# Define una función que aplica regexp_replace a una cadena
def agregar_espacios_direccion(direccion):
    if isinstance(direccion, str):  # Comprueba si la dirección es una cadena de texto
        direccion_con_espacios = re.sub(r'(\D)(?=\d)', r'\1 ', direccion)
        direccion_con_espacios = re.sub(r'(\d)(?=\D)', r'\1 ', direccion_con_espacios)
        return direccion_con_espacios
    else:
        return direccion

# Register User Defined Function (UDF) con Spark
# udf: Es una función de PySpark que se utiliza para registrar una función definida por el usuario o UDF. Esta función toma dos argumentos: la función definida por el usuario y el tipo de datos de salida del UDF.
# StringType() Especifica el tipo de datos de salida del UDF.
# Si no registras tu función como un UDF, no podrás aplicarla directamente a una columna en un DataFrame de PySpark
agregar_espacios_direccion = udf(agregar_espacios_direccion, StringType())


################################################################
####  FUNCION QUE MODIFICA VALORES DISTINTOS DE CALLE Y CARRERA

# Función para convertir abreviaturas de direcciones
def ajuste_calles_carreras(direccion):
    if isinstance(direccion, str):  # Comprueba si la dirección es una cadena de texto
        # Diccionario de mapeo de abreviaturas
        mapeo_abreviaturas = {
            "cl": "calle",
            "cle": "calle",
            "cll": "calle",
            "clle": "calle",
            "call": "calle",
            "calle": "calle",
            "cr": "carrera",
            "crr": "carrera",
            "cra": "carrera",
            "carrera": "carrera",
            "kr": "carrera",
            "tranv": "transversal",
            "tr": "transversal",
            "transversal": "transversal",
            "av": "avenida",
            "apto": "apartamento",
            "apt": "apartamento",
            "apartame": "apartamento",
            "ap": "apartamento",
            "int": "interior",
            "interior": "interior",
            "torre": "interior",
            "no": "",
            "n": "",
            "numero": "",
            "dg": "diagonal",
            "diag": "diagonal",

            # Puedes agregar más abreviaturas según tus necesidades
        }
        # Separar la dirección en palabras
        palabras = direccion.split()
        # Reemplazar abreviaturas por sus equivalentes completos
        palabras = [mapeo_abreviaturas.get(palabra.lower(), palabra) for palabra in palabras]
        # Unir las palabras de nuevo en una cadena
        return ' '.join(palabras)
    else:
        return direccion


# Registra la función definida por el usuario como un UDF
ajuste_calles_carreras = udf(ajuste_calles_carreras, StringType())


################################################################
####  FUNCION QUE ELIMINA CARACTERES ESPECIALES

# Define la función para limpiar direcciones utilizando expresiones regulares
def ajuste_caracteres_esp(direccion):
    if isinstance(direccion, str):  # Comprueba si la dirección es una cadena de texto
        # Elimina caracteres especiales
        patron = r'[^\w\s]'
        direccion = re.sub(patron, ' ', direccion)
        # Quita los acentos o tildes
        direccion_limpia = unidecode(direccion)
        return direccion_limpia
    else:
        return direccion

# Registra la función como un UDF
ajuste_caracteres_esp = udf(ajuste_caracteres_esp, StringType())


#################################################################################
####  FUNCION QUE ELIMINA TODO LO QUE SIGA DESPUES DE LAS PALABRAS EN EL PATRON

def limpiar_torre_apto(direccion):
    if isinstance(direccion, str):  # Comprueba si la dirección es una cadena de texto
        # Expresión regular para encontrar las palabras clave y todo lo que esté después de ellas
        patron = r'\b(barrio|apartamento|interior|anillo|piso)\b.*'
        # Elimina todo lo que está después de las palabras clave
        direccion_limpia = re.sub(patron, '', direccion)
        # Elimina los espacios adicionales al final de la cadena resultante
        direccion_limpia = direccion_limpia.strip()
        return direccion_limpia
    else:
        return direccion

# Registra la función como un UDF
limpiar_torre_apto = udf(limpiar_torre_apto, StringType())

#################################################################################
####  FUNCION QUE SEPARA NUMEROS DE 4 O MAS DIGITOS EN SUS VALORES MEDIOS
# Ej: 6292 quedaria 62 92
def separar_numeros(direccion):
    if isinstance(direccion, str):  # Comprueba si la dirección es una cadena de texto
        # Encuentra todos los números de 4 dígitos en la dirección
        numeros = re.findall(r'\b\d{4}\b', direccion)
        # Divide cada número por la mitad y lo reformatea como "XX YY"
        numeros_separados = [numero[:2] + ' ' + numero[2:] for numero in numeros]
        # Reemplaza los números originales en la dirección con los números separados
        for numero in numeros:
            direccion = direccion.replace(numero, numeros_separados[numeros.index(numero)], 1)
        return direccion
    else:
        return direccion

# Registra la función como un UDF
separar_numeros = udf(separar_numeros, StringType())



# Aplicamos las funciones creadas
df_contacts = df_contacts.withColumn("Direccion_new", ajuste_caracteres_esp("MAIN_ADDR_LINE1")).\
    withColumn("Direccion_new", agregar_espacios_direccion(col('Direccion_new'))).\
    withColumn("Direccion_new", ajuste_calles_carreras(col("Direccion_new"))).\
    withColumn("Direccion_new", limpiar_torre_apto(col("Direccion_new"))).\
    withColumn("Direccion_new", separar_numeros(col("Direccion_new")))
    

#display(df_contacts.limit(200))

In [None]:
# temp = df_contacts.select('MAIN_ADDR_LINE1','Direccion_new').distinct()
# #print(temp.count())
# display(temp.limit(200))

In [None]:
# tmp = df_contacts.filter(col('MAIN_ADDR_TOWN').isin('-SELECCIONAR-','COLOMBIA'))
# display(tmp.limit(100))

In [None]:
# tmp = df_contacts.filter(year('LAST_MODIFICATION') == 2024).\
#     groupBy("MAIN_ADDR_TOWN_").count().orderBy(col("count").desc())

# display(tmp)

In [None]:
#temp = df_ini.filter(col('T_CONTACT_ID') == '10229527487512')#LAST_MODIFICATION
# temp = df_ini.filter(col(year('LAST_UPDATE')) == 2024)
# display(temp.limit(20))

In [None]:
# Creamos la maestra de valores modificados
#df_contacts_dirty = df_contacts.filter(df_spark["MAIN_ADDR_COUNTRY"].isin("valor_deseado") )
dirty_cities = df_contacts.groupBy("MAIN_ADDR_TOWN_").count().orderBy(col("count").desc())#.\
    #filter(col('MAIN_ADDR_TOWN_') == lit('Tolima'))

inpath = "abfss://storagebi@tbdwhstorage01.dfs.core.windows.net/Modelos_analitica/Maestras_municipios/"

write_table = dirty_cities.coalesce(1) # coalesce indica que la tabla se va a entregar en una sola particion
write_table.write.csv(inpath + "dirty_cities", header=True, mode="overwrite", sep='|')

#cambio de nombre
files=dbutils.fs.ls(inpath + 'dirty_cities/')
output_file= [x for x in files if x.name.startswith("part-")]

# EN la misma carpeta de origen
dbutils.fs.mv(output_file[0].path, f"{inpath + 'dirty_cities/'}/dirty_cities.csv")

#dirty_cities.write.format("csv").option("header", "true").mode("overwrite").option("sep", "|").save(inpath + "dirty_cities.csv")
#dirty_cities.write.csv(inpath + "dirty_cities.csv", header=True, mode="overwrite", sep='|')
dirty_cities = dirty_cities.toPandas()

#display(dirty_cities.limit(10))
print(dirty_cities.head(5))

  MAIN_ADDR_TOWN_    count
0            None  1421197
1          BOGOTA   623072
2     BOGOTÁ D.C.   248772
3          BOGOTÁ   113745
4    BARRANQUILLA   112711


In [None]:
####  LECTURA DE CSV's REQUERIDOS  ###

## Codigos postales
codpost = spark.read.csv(inpath + 'codpost.csv', sep = '|', header=True, inferSchema=True)
print(codpost.count())
codpost = codpost.toPandas()
#print(codpost.head(5))


## maestra de municipios de Colombia
dim_towns = spark.read.csv(inpath + 'dim_municipios.csv', sep = '|', header=True, inferSchema=True)
print(dim_towns.count())
dim_towns = dim_towns.toPandas()
#print(dim_towns.head(5))


## localidades de Bogota
loc_bog = spark.read.csv(inpath + 'localidades_bogota.csv', sep = '|', header=True, inferSchema=True)
print(loc_bog.count())
loc_bog = loc_bog.toPandas()
#print(loc_bog.head(5))

3681
1044
20


In [None]:
####  FUNCIONES  ####

# Limpieza de texto
def limpiar_texto(texto):
    if texto is None:
        return 'Sin Registro'
    
    # Mayuscula
    texto_mayus = str.upper(texto)

    # Eliminar valores numéricos utilizando expresiones regulares
    texto_sin_numeros = re.sub(r'\d+', '', str(texto_mayus))

    # Remover tildes y caracteres especiales de entonacion
    texto_sin_tildes = unidecode(texto_sin_numeros)
    #texto_sin_tildes = unidecode(texto)

    # Quitar caracteres no alfabéticos y no espacios en blanco
    texto_limpio = re.sub(r'[^a-zA-Z\s]', '', texto_sin_tildes)

    # Aplicar trim para eliminar espacios en blanco al principio y al final
    texto_limpio = texto_limpio.strip()
    
    return texto_limpio


# Funcion like ampliada
def asignar_ciudad(row):
    # el diccionario se usa para buscar una coincidencia parcial entre las claves y los valores en la columna
    condiciones = {
        'BOGO': 'BOGOTA',
        'GOTA': 'BOGOTA',
        'LOCALID': 'BOGOTA',
        'MEDEL': 'MEDELLIN',
        'COMUNA': 'MEDELLIN',
        'POBLAD': 'MEDELLIN',
        'CALIFORNIA': 'SACRAMENTO',
        'MEXICALI': 'MEXICALI',
        'CALI': 'CALI',
        'DUPAR': 'VALLEDUPAR',
        'VALLE': 'CALI',
        'CARTAGE': 'CARTAGENA',
        'QUILLA': 'BARRANQUILLA',
        'BARRANQ': 'BARRANQUILLA',
        'VILLAVI': 'VILLAVICENCIO',
        'YOPAL': 'YOPAL',
        'YPAL': 'YOPAL',
        'TUNJA': 'TUNJA',
        'CUCUT': 'CUCUTA',
        'MANIZA': 'MANIZALES',
        'IBAG': 'IBAGUE',
        'NEIV': 'NEIVA', 
        'MARTA': 'SANTA MARTA',
        'BUCARA': 'BUCARAMANGA',
        'MONTERI': 'MONTERIA',
        'POPAYA': 'POPAYAN',
        'ARMENI': 'ARMENIA',
        'PEREIR': 'PEREIRA',
        'UBATE': 'UBATE',
        'CHIA': 'CHIA',
        'SOLEDA': 'SOLEDAD',
        'BELLO': 'BELLO',
        'SOACH': 'SOACHA',
        'BUGA': 'BUGA',
        'PAST': 'PASTO',
        'MANGA': 'BUCARAMANGA',
        'SOGAMO': 'SOGAMOSO',
        'DUITA': 'DUITAMA',
        # Puedes agregar más condiciones aquí según sea necesario
    }
    
    for condicion, ciudad in condiciones.items():
        if condicion in row:
            return ciudad
    
    return row


# Elimina la palabra que pertenezca a la lista
def eliminar_departamento(row):
    regiones = ['AMAZONAS','ANTIOQUIA', 'ARAUCA','ATLANTICO','BOLIVAR','BOYACA','CALDAS','CAQUETA',
                'CASANARE','CAUCA','CESAR','CHOCO','CUNDINAMARCA','CORDOBA','GUAINIA','GUAVIARE','HUILA',
                'LA GUAJIRA','MAGDALENA','META','NARINO','NORTE DE SANTANDER','PUTUMAYO','QUINDIO',
                'RISARALDA','ISLA','ISLAS','SANTANDER','SUCRE','TOLIMA','VALLE DEL CAUCA','VAUPES','VICHADA']

    # Crear un diccionario donde las claves sean los valores de la lista y los valores sean None
    dic= {region: '' for region in regiones}
    
    for condicion, ciudad in dic.items():
        if condicion in row:
            row = row.replace(condicion, ciudad).strip()
    
    return row


# Reemplaza algunas versiones adicionales de ciudad
def replace_cities_aliases(ciudad):
    # bogota_aliases = ['BOOGOTA','BTA','BOGTA','BOG','BOOGTA','BGTA','BGOTA','BGA','CUNDINAMARCA','BGOOTA','BOOTA',
    #                   'BIGOTA','BOGITA','CEDRITOS','BOBOTA','B OGOTA','BOGATA','BOHOTA','BOGPTA',]
    bogota_aliases = ['BOTOTA','BTA','BOGTA','BOG','BOOGTA','BGTA','KENEDY','BGA','CUNDINAMARCA','BGOOTA','BOOTA','BOFOTA','BOGITA','CEDRITOS','BOBOTA','NOGOTA','BOGATA','BOHOTA','BOGPTA','CIUDAD VERDE','CIUDADVERDE','TINTAL','HAYUELOS','SALITRE']
    barranquilla_aliases = ['ATLANTICO','BAQ','BQLLLA','BQ','BQA','BQLLA','BARRAQNUILLA','BARRANUILLA','BQUILLLA']
    cali_aliases = ['VALLE DEL CAUCA']
    cartagena_aliases = ['CTG']
    ibague_aliases = ['IABGUE']
    medellin_aliases = ['ANTIOQUIA','POBLADO','MED','MDELLIN','RIO NEGRO']
    santanderdequilichao = ['DE QUILICHAO']
    if ciudad in bogota_aliases:
        return 'BOGOTA'
    elif ciudad in barranquilla_aliases:
        return 'BARRANQUILLA'
    elif ciudad in cali_aliases:
        return 'CALI'
    elif ciudad in cartagena_aliases:
        return 'CARTAGENA'
    elif ciudad in medellin_aliases:
        return 'MEDELLIN'
    elif ciudad in ibague_aliases:
        return 'IBAGUE'
    elif ciudad in santanderdequilichao:
        return 'SANTANDER DE QUILICHAO'
    else:
        return ciudad

In [None]:
## Transformacion 1: Codigos postales

# left join de dirty_cities con codigo postal
cities_ = pd.merge(dirty_cities, codpost[['Codigo Postal', 'Ciudad']],\
                          left_on='MAIN_ADDR_TOWN_', right_on= codpost['Codigo Postal'].astype(str),\
                          how='left')

# Rellenar los valores nulos en 'Ciudad' con los valores de 'MAIN_ADDR_TOWN'
cities_['Ciudad'].fillna(cities_['MAIN_ADDR_TOWN_'], inplace=True)

print(cities_.count())
print(cities_['Ciudad'].unique())

MAIN_ADDR_TOWN_    36867
count              36868
Codigo Postal        531
Ciudad             36867
dtype: int64
[None 'BOGOTA' 'BOGOTÁ D.C.' ... 'EL RUBY' 'CUCUTA/LOS PATIOS '
 ' WASHINGTON D. C.,VAIL']


In [None]:
# tmp = cities_['Ciudad'].unique()
# tmp = spark.createDataFrame(tmp)

# display(tmp)


In [None]:
## Transformacion 2: limpieza de texto y join con localidades bogota

total = sum(cities_['count'])
#print('num registros: ', total)

# Limpieza del campo ciudad de acuerdo con la funcion limpiar_texto
dim_municipios_ = cities_.sort_values(by='count', ascending=False).\
       assign(Ciudad = lambda x: x['Ciudad'].apply(limpiar_texto),
              PARTICIPACION = lambda x: x['count']/total,
              TOTAL = lambda x: np.cumsum(x['PARTICIPACION'])).\
       rename(columns = {'count': 'COUNT'})
       #groupby('DEPARTAMENTO').agg({'COUNT': 'sum','PORCENTAJE': 'sum'}).reset_index().\


# left join con localidades
dim_municipios_1 = pd.merge(dim_municipios_, loc_bog,\
                          left_on='Ciudad', right_on= 'LOCALIDAD',\
                          how='left')

# Rellenar los valores nulos en 'CIUDAD' (de loc_bog)con los valores de 'Ciudad'
dim_municipios_1 = dim_municipios_1.assign(CIUDAD = lambda x: x['CIUDAD'].fillna(dim_municipios_1['Ciudad'])).\
                                    assign(CIUDAD = lambda x: x['CIUDAD'].apply(asignar_ciudad)).\
                                    assign(CIUDAD = lambda x: x['CIUDAD'].apply(replace_cities_aliases))

print(dim_municipios_1)

              MAIN_ADDR_TOWN_    COUNT  ...  LOCALIDAD              CIUDAD
0                        None  1421197  ...        NaN        Sin Registro
1                      BOGOTA   623072  ...        NaN              BOGOTA
2                 BOGOTÁ D.C.   248772  ...        NaN              BOGOTA
3                      BOGOTÁ   113745  ...        NaN              BOGOTA
4                BARRANQUILLA   112711  ...        NaN        BARRANQUILLA
...                       ...      ...  ...        ...                 ...
36863   URB NORMANDIA/SOLEDAD        1  ...        NaN             SOLEDAD
36864                   GOREY        1  ...        NaN               GOREY
36865              Ruefenacht        1  ...        NaN          RUEFENACHT
36866             SCARRINGTON        1  ...        NaN         SCARRINGTON
36867   WASHINGTON D. C.,VAIL        1  ...        NaN  WASHINGTON D CVAIL

[36868 rows x 8 columns]


In [None]:
## Transformacion 3:  llave departamento para traer capital a los que tienen departamento en el campo ciudad
dim_regiones = dim_towns[['DEPARTAMENTO','CAPITAL']].drop_duplicates()
#print(dim_regiones)

dim_municipios = pd.merge(dim_municipios_1, dim_regiones,\
                          left_on='CIUDAD', right_on= 'DEPARTAMENTO',\
                          how='left')

# Rellenar los valores nulos en 'Ciudad' con los valores de 'MAIN_ADDR_TOWN'
dim_municipios['CAPITAL'].fillna(dim_municipios['CIUDAD'], inplace=True)

# Finalmente seleccionamos los campos necesarios
dim_municipios = dim_municipios[['MAIN_ADDR_TOWN_','COUNT','PARTICIPACION','CAPITAL']]

dim_municipios.rename(columns={'CAPITAL': 'CIUDAD'}, inplace=True)

print(dim_municipios)


              MAIN_ADDR_TOWN_    COUNT  PARTICIPACION              CIUDAD
0                        None  1421197   3.719936e-01        Sin Registro
1                      BOGOTA   623072   1.630870e-01              BOGOTA
2                 BOGOTÁ D.C.   248772   6.511524e-02              BOGOTA
3                      BOGOTÁ   113745   2.977237e-02              BOGOTA
4                BARRANQUILLA   112711   2.950173e-02        BARRANQUILLA
...                       ...      ...            ...                 ...
36863   URB NORMANDIA/SOLEDAD        1   2.617467e-07             SOLEDAD
36864                   GOREY        1   2.617467e-07               GOREY
36865              Ruefenacht        1   2.617467e-07          RUEFENACHT
36866             SCARRINGTON        1   2.617467e-07         SCARRINGTON
36867   WASHINGTON D. C.,VAIL        1   2.617467e-07  WASHINGTON D CVAIL

[36868 rows x 4 columns]


In [None]:
## Transformacion 4:  left join para validar cuales valores quedaron limpios y traer la region y el departamento
dim_municipios = pd.merge(dim_municipios, dim_towns,\
                          left_on='CIUDAD', right_on= 'MUNICIPIO',\
                          how='left')

# Traer los valores que ya sabiamos eran nulos o ''
dim_municipios['MUNICIPIO'] = np.where(
    (dim_municipios['CIUDAD'] == '') |
    (dim_municipios['CIUDAD'] == 'nan') |
    (dim_municipios['CIUDAD'] == 'None') |
    (dim_municipios['CIUDAD'].isnull()),
    None,
    dim_municipios['MUNICIPIO']
)
#print(dim_municipios.head(5))

# Valores con municipio matched
# prueba = dim_municipios[dim_municipios['MUNICIPIO'].notnull()]


# print('Registros: ', prueba['COUNT'].sum())
# print('Percent: ', prueba['PARTICIPACION'].sum())

In [None]:

# Finalmente seleccionamos los campos necesarios
dim_municipios = dim_municipios[['MAIN_ADDR_TOWN_','COUNT','PARTICIPACION','CIUDAD','MUNICIPIO','REGION','DEPARTAMENTO','CAPITAL','PAIS']]

# Valores con MUNICIPIO LIMPIO
notnull = dim_municipios[dim_municipios['MUNICIPIO'].notnull()]

print('Registros: ', notnull['COUNT'].sum())
print('Percent: ', notnull['PARTICIPACION'].sum())

#################
# Descriptivos
# print(notnull.head(5))
# print(' ')
# print('Dimensiones:')
print(notnull.shape)


Registros:  2289952
Percent:  0.5993873034020785
(11161, 9)


In [None]:
# Valores con MUNICIPIO sin limpiar
null = dim_municipios[dim_municipios['MUNICIPIO'].isnull()]

print('Registros: ', null['COUNT'].sum())
print('Percent: ', null['PARTICIPACION'].sum())

#################
# Descriptivos
print('Dimensiones:')
print(null.shape)

Registros:  1530536
Percent:  0.40061269659792137
Dimensiones:
(25707, 9)


In [None]:
# # Convertir el DataFrame de Pandas a un DataFrame de PySpark
# dim_municipios_ = spark.createDataFrame(dim_municipios)

# tmp = dim_municipios_.groupBy("MUNICIPIO").count().orderBy(col("count").desc())

# #display(contacts_silver.limit(100))
# display(tmp)

In [None]:
# Aplicamos la funcion de eliminar departamento para limpiar valores estilo: 'ABEJORRAL ANTIOQUIA'
null = null[['MAIN_ADDR_TOWN_','COUNT','PARTICIPACION','CIUDAD']].\
    assign(CIUDAD = lambda x: x['CIUDAD'].apply(eliminar_departamento))



# left join para traer la region y el departamento NUEVAMENTE
null = pd.merge(null, dim_towns,\
                    left_on='CIUDAD', right_on= 'MUNICIPIO',\
                    how='left')

null2 = null[null['MUNICIPIO'].notnull()]

print('Registros: ', null2['COUNT'].sum())
print('Percent: ', null2['PARTICIPACION'].sum())

# #################
# # Descriptivos
# print('Dimensiones:')
# print(null2.shape)

Registros:  9772
Percent:  0.0025577884291221437


In [None]:
# UNIMOS AMBAS TABLAS PARA TENER LA LIMPIEZA COMPLETA
dim_municipios_ = pd.concat([notnull, null2])

# Rellenar los valores nulos en 'Ciudad' con los valores de 'MAIN_ADDR_TOWN'
#dim_municipios_['Ciudad'].fillna(cities_['MAIN_ADDR_TOWN'], inplace=True)

# Convertir el DataFrame de Pandas a un DataFrame de PySpark
dim_municipios_ = spark.createDataFrame(dim_municipios_)
#display(dim_municipios_)

In [None]:
#prueba = dim_municipios.groupBy("MAIN_ADDR_TOWN").count().orderBy(F.col("count").desc())
#display(prueba)

In [None]:
### TABLA SILVER CONTACTS

# Pasamos a vista la tabla Contacts y la maestra dim_municipios
df_contacts.createOrReplaceTempView("df_contacts_vw")
dim_municipios_.createOrReplaceTempView("dim_municipios_vw")

# se hacen las transformaciones finales
contacts_silver = spark.sql("""
    select 
	    c.T_CONTACT_ID,
      c.CONTACT_NUMBER,
      c.FIRSTNAME, 
      c.LASTNAME,
      c.ID_NUMBER,
      c.BIRTHDATE as FECHA_DE_NACIMIENTO,
      -- Obtiene la edad en años basada en la diferencia entre la fecha actual y la fecha de nacimiento
      CASE 
            WHEN c.BIRTHDATE IS NULL THEN NULL
            ELSE FLOOR(DATEDIFF(MONTH, c.BIRTHDATE, GETDATE()) / 12)
        END AS Edad,
        CASE 
            WHEN FLOOR(DATEDIFF(MONTH, c.BIRTHDATE, GETDATE()) / 12) IS NULL THEN 'SIN DATO'
            WHEN FLOOR(DATEDIFF(MONTH, c.BIRTHDATE, GETDATE()) / 12) < 18 THEN 'Menor de 18'
            WHEN FLOOR(DATEDIFF(MONTH, c.BIRTHDATE, GETDATE()) / 12) <= 24 THEN '18 a 24'
            WHEN FLOOR(DATEDIFF(MONTH, c.BIRTHDATE, GETDATE()) / 12) <= 34 THEN '25 a 34'
            WHEN FLOOR(DATEDIFF(MONTH, c.BIRTHDATE, GETDATE()) / 12) <= 44 THEN '35 a 44'
            WHEN FLOOR(DATEDIFF(MONTH, c.BIRTHDATE, GETDATE()) / 12) <= 54 THEN '45 a 54'
            WHEN FLOOR(DATEDIFF(MONTH, c.BIRTHDATE, GETDATE()) / 12) <= 64 THEN '55 a 64'
            ELSE 'Mayor a 64'
        END AS Rango_de_edad,
      c.EMAIL,
      c.NAT_NUMBER_CELLPHONE,
      CASE 
            WHEN c.GENDER = 'MALE' THEN 'HOMBRE'
            WHEN c.GENDER = 'FEMALE' THEN 'MUJER'
            WHEN c.GENDER = 'UNKNOWN' AND c.ADDRESS_SALUTATION IN ('Mr', 'Señor', 'Señor,', 'Don') THEN 'HOMBRE'
            WHEN c.GENDER = 'UNKNOWN' AND c.ADDRESS_SALUTATION IN ('Ms','Mrs', 'Miss', 'Señora', 'Señorita') THEN 'MUJER'
            WHEN c.GENDER = '' AND c.ADDRESS_SALUTATION IN ('Mr', 'Señor', 'Señor,', 'Don') THEN 'HOMBRE'
            WHEN c.GENDER = '' AND c.ADDRESS_SALUTATION IN ('Ms','Mrs', 'Miss', 'Señora', 'Señorita') THEN 'MUJER'
            WHEN COALESCE(c.GENDER, '') = '' AND c.ADDRESS_SALUTATION IN ('Mr', 'Señor', 'Señor,', 'Don') THEN 'HOMBRE'
            WHEN COALESCE(c.GENDER, '') = '' AND c.ADDRESS_SALUTATION IN ('Ms','Mrs', 'Miss', 'Señora', 'Señorita') THEN 'MUJER'
            WHEN c.GENDER IS NULL AND c.ADDRESS_SALUTATION IN ('Mr', 'Señor', 'Señor,', 'Don') THEN 'HOMBRE'
            WHEN c.GENDER IS NULL AND c.ADDRESS_SALUTATION IN ('Ms','Mrs', 'Miss', 'Señora', 'Señorita') THEN 'MUJER'
            ELSE 'SIN DATO'
        END AS Genero,
      c.ADDRESS_SALUTATION,
      c.MAIN_ADDR_LINE1,
      CONCAT(c.Direccion_new,', ',CASE 
            WHEN m.MUNICIPIO IS NULL THEN lower(c.MAIN_ADDR_TOWN_)
            ELSE lower(m.MUNICIPIO)
        END) AS Direccion_new,
      c.IS_GUEST,
      c.CREATED_DATE,
      c.LAST_UPDATE,
      c.LAST_MODIFICATION_FROM,
      c.CREATED_FROM,
      c.MAIN_ADDR_ZIPCODE,
      c.MAIN_ADDR_TOWN,
      c.MAIN_ADDR_GEO_ZONE,
      c.MAIN_ADDR_COUNTRY,
      c.MAIN_ADDR_TOWN_,
      CASE 
            WHEN m.MUNICIPIO IS NULL THEN c.MAIN_ADDR_TOWN_
            ELSE m.MUNICIPIO
        END AS CIUDAD,
      m.REGION,
      m.DEPARTAMENTO,
      m.CAPITAL,
      CASE 
            WHEN m.PAIS  IS NULL THEN c.MAIN_ADDR_COUNTRY
            ELSE m.PAIS
        END AS PAIS
    from df_contacts_vw c
    left join dim_municipios_vw m
      ON c.MAIN_ADDR_TOWN_ = m.MAIN_ADDR_TOWN_
    order by T_CONTACT_ID 
""") 

# Realizar el left join entre los DataFrames df1 y df2
#contacts_silver = df_contacts.join(dim_municipios, df_contacts['MAIN_ADDR_TOWN'] == dim_municipios['MAIN_ADDR_TOWN'], how='left')

print(contacts_silver.count())
#display(contacts_silver.limit(100))


3820488


In [None]:
# tmp = contacts_silver.filter(col('MAIN_ADDR_TOWN').isin('-SELECCIONAR-','COLOMBIA'))
#display(contacts_silver.limit(100))

In [None]:
#modificar colombia, seleccionar
#contacts_silver = contacts_silver.withColumn("CIUDAD", when(col("CIUDAD") == "-SELECCIONAR-", "Sin Registro").#otherwise(df["columna_a_reemplazar"]))

In [None]:
# Guarda el DataFrame en formato Parquet en Azure Blob Storage
inpath = "abfss://storagebi@tbdwhstorage01.dfs.core.windows.net/silver/Contacts/"
write_table = contacts_silver.coalesce(1) # coalesce indica que la tabla se va a entregar en una sola particion
write_table.write.mode("overwrite").parquet(inpath)

#cambio de nombre
files=dbutils.fs.ls(inpath)
output_file= [x for x in files if x.name.startswith("part-")]
 
# EN la misma carpeta de origen
dbutils.fs.mv(output_file[0].path, f"{inpath}/Ds_S_Contacts.parquet")

True

### Prueba para clientes nuevos ultimos 15 dias

In [None]:
# from pyspark.sql.functions import col, current_date, date_sub

# # Calcular la fecha hace 15 días
# fecha_limite = date_sub(current_date(), 15)

# # Filtrar los registros de los últimos 15 días
# contacts_ultimos_15_dias = contacts_silver.filter(col("CREATED_DATE") >= fecha_limite)
