In [None]:
# Instalación de Java con apt-get.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# Configuración del entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [None]:
# Instalamos los paquetes de PySpark.
!pip install -q pyspark
!pip install -q findspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m12.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
# Librerías
import pyspark
from pyspark.sql.window import Window
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_extract, regexp_replace, arrays_zip, explode, upper, split, asc, to_date, lit, trim, pandas_udf
from pyspark.sql.functions import format_number, row_number
from pyspark.sql.types import FloatType
import pandas as pd
from pathlib import Path
import psutil
import json
import pandas as pd
from datetime import datetime

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Inicio el contexto en Spark
spark = SparkSession.builder \
      .master("local[3]") \
      .appName("minTrabajoJson.com") \
      .config("spark.some.config.option", "config-value") \
      .getOrCreate()

In [None]:
#print(f"RAM: {psutil.virtual_memory()[2]}%")

directorioDocumentos = '/content/drive/MyDrive/Bextsa/MinTrabajo/archivos'
listaArchivos = pd.Series([archivo.as_posix() for archivo in Path(directorioDocumentos.replace(":","")).glob('*/*')])
listaArchivos = listaArchivos.apply(lambda x: x.replace(":",""))
#listaArchivos.values
#print(f"RAM: {psutil.virtual_memory()[2]}%")

In [None]:
def aplicar_tiempo(fecha):
  cadena_tiempo = fecha.split('/')
  dia, mes, anio, horas = cadena_tiempo[0], cadena_tiempo[1], cadena_tiempo[2].split(' ')[0], cadena_tiempo[2].split(' ')[1]
  tiempo = datetime.strptime(f"{anio}/{mes}/{dia} {horas}", "%Y/%m/%d %H:%M:%S").timestamp()
  return tiempo

def obtener_duracion(fechaLista):
  inicio = aplicar_tiempo(fechaLista[0].strip())
  final = aplicar_tiempo(fechaLista[len(fechaLista)-1].strip())
  return (final-inicio)/60

@pandas_udf(FloatType())
def duracion(s:pd.Series)->pd.Series:
  return s.apply(obtener_duracion)



In [None]:
dfs = spark.read.option('multiline','true').json('/content/drive/MyDrive/Bextsa/MinTrabajo/Prueba_30_6.json')\
            .select('ChatRecordList','id')
dfs.show(1)
dfsPrincipal = dfs.select(col('ChatRecordList.UserInput').alias('userInput'),
                          col('ChatRecordList.TimeLog').alias('timeLog'),
                          col('ChatRecordList.id').alias('id'))\
                  .withColumn('aux',arrays_zip('userInput','timeLog','id'))\
                  .withColumn('aux',explode('aux'))\
                  .select(upper(col("aux.userInput")).alias('conversaciones'),
                          trim(col("aux.timeLog")).alias('tiempo'),
                          split(trim(col("aux.timeLog")),' ').getItem(0).alias('fecha'),
                          regexp_replace(split(trim(col("aux.timeLog")),' ').getItem(0),"(\D)","").alias('fechaCargue'),
                          col('aux.id').alias('id'),
                          lit(None).alias('agilidad/claridad'))
dfsPrincipal.show(1)
dfsDuracion = dfs.select('id', col('ChatRecordList.TimeLog').alias('timeLog'))\
                  .select('id',format_number(duracion('timeLog'),3).alias('duracion'))
dfsDuracion.show(1)
#print(f"RAM: {psutil.virtual_memory()[2]}%")

+--------------------+--------------------+
|      ChatRecordList|                  id|
+--------------------+--------------------+
|[{\n\n😃 Por favo...|457f7470-a0bc-11e...|
+--------------------+--------------------+
only showing top 1 row

+--------------------+--------------------+----------+-----------+--------------------+-----------------+
|      conversaciones|              tiempo|     fecha|fechaCargue|                  id|agilidad/claridad|
+--------------------+--------------------+----------+-----------+--------------------+-----------------+
|INICIAR CONVERSACIÓN|30/01/2023 11:36:...|30/01/2023|   30012023|457f7470-a0bc-11e...|             null|
+--------------------+--------------------+----------+-----------+--------------------+-----------------+
only showing top 1 row

+--------------------+--------+
|                  id|duracion|
+--------------------+--------+
|457f7470-a0bc-11e...|   1.283|
+--------------------+--------+
only showing top 1 row



In [None]:
ventanaParticion = Window.partitionBy('id').orderBy(col('fecha'))
listaTipo = ['1.CONSULTAS GENERALES.','2. RADICACIÓN DE  PQRSD.','3.TRÁMITES Y SERVICIOS.']
consultasGenerales = r'(\d)(\.)(\D)|(\d)(\d)(\.)(\D)'
#consultasGeneralesSiguientes = r'(\d)(\.)(\d)'
# 1. Extraer el tipo de consulta (general, pqrsd o trámites)
dfsTipo = dfsPrincipal.filter(col('conversaciones').isin(listaTipo))\
            .select('id', 'conversaciones', 'agilidad/claridad','fecha', 
                    'tiempo', 'fechaCargue', lit('TIPO').alias('tipo_msg'), )
dfsTipo.show(1)
# 2. Extraer el tipo de consulta general (1-10)
dfsConsultasGenerales = dfsPrincipal.filter(~col('conversaciones').isin(listaTipo))\
                          .filter(col('conversaciones').rlike(consultasGenerales))\
                          .select('id', 'conversaciones','agilidad/claridad', 'fecha', 
                                  'tiempo', 'fechaCargue', lit('CONG').alias('tipo_msg'))
dfsConsultasGenerales.show(1)
# 3. Extraer la agilidad
dfsAgilidad = dfsPrincipal.filter(col('conversaciones').contains('AGILIDAD'))\
                .select('id', 'conversaciones', 'agilidad/claridad',
                        'fecha', 'tiempo', 'fechaCargue', lit('AGIL').alias('tipo_msg'))\
                .withColumn('agilidad/claridad',regexp_replace(col('conversaciones'),"(\D)","").cast('float'))\
                .withColumn('conversaciones',regexp_replace(col('conversaciones'),"(\D)",""))
dfsAgilidad.show(1)
# 4. Extraer claridad
dfsClaridad = dfsPrincipal.filter(col('conversaciones').contains('CLARIDAD'))\
                .select('id', 'conversaciones', 'agilidad/claridad',
                        'fecha', 'tiempo', 'fechaCargue', lit('CLAR').alias('tipo_msg'))\
                .withColumn('agilidad/claridad',regexp_replace(col('conversaciones'),"(\D)","").cast('float'))\
                .withColumn('conversaciones',regexp_replace(col('conversaciones'),"(\D)",""))
dfsClaridad.show(1)
# 5. Extraer si precisa asesoramiento
dfsAsesor = dfsPrincipal.filter(col('conversaciones').contains('COMUNICARME CON UN ASESOR'))\
              .select('id', 'conversaciones', 'agilidad/claridad',
                      'fecha', 'tiempo', 'fechaCargue', lit('ASES').alias('tipo_msg'))
dfsAsesor.show(1)
# 6. Extraer si descarga la conversación
dfsDescargar = dfsPrincipal.filter(col('conversaciones').contains('DESCARGAR'))\
              .select('id', 'conversaciones', 'agilidad/claridad',
                      'fecha', 'tiempo', 'fechaCargue', lit('DESC').alias('tipo_msg'))
dfsDescargar.show(1)
# 7. Extraer final
dfsConversacion = dfsPrincipal.select('id', 'conversaciones', 'agilidad/claridad',
                       'fecha', 'tiempo', 'fechaCargue', lit(None).alias('tipo_msg'))\
              .filter(~col('conversaciones').isin(listaTipo))\
              .filter(~col('conversaciones').rlike(consultasGenerales))\
              .filter(~col('conversaciones').contains('AGILIDAD'))\
              .filter(~col('conversaciones').contains('CLARIDAD'))\
              .filter(~col('conversaciones').contains('COMUNICARME CON UN ASESOR'))\
              .filter(~col('conversaciones').contains('DESCARGAR'))\
              .union(dfsTipo)\
              .union(dfsConsultasGenerales)\
              .union(dfsAgilidad)\
              .union(dfsClaridad)\
              .union(dfsAsesor)\
              .union(dfsDescargar)\
              .withColumn('row',row_number().over(ventanaParticion))

dfsConversacion.show(1)

+--------------------+--------------------+-----------------+----------+--------------------+-----------+--------+
|                  id|      conversaciones|agilidad/claridad|     fecha|              tiempo|fechaCargue|tipo_msg|
+--------------------+--------------------+-----------------+----------+--------------------+-----------+--------+
|457f7470-a0bc-11e...|1.CONSULTAS GENER...|             null|30/01/2023|30/01/2023 11:36:...|   30012023|    TIPO|
+--------------------+--------------------+-----------------+----------+--------------------+-----------+--------+
only showing top 1 row

+--------------------+----------------+-----------------+----------+--------------------+-----------+--------+
|                  id|  conversaciones|agilidad/claridad|     fecha|              tiempo|fechaCargue|tipo_msg|
+--------------------+----------------+-----------------+----------+--------------------+-----------+--------+
|457f7470-a0bc-11e...|6. INCAPACIDADES|             null|30/01/2023|

In [None]:
joinConditions = [
    dfsConversacion.id==dfsDuracion.id,
    dfsConversacion.row==1
]
dfsFinal = dfsConversacion.join(dfsDuracion,joinConditions,'left')\
                  .select(dfsConversacion.id,
                          'conversaciones',
                          'agilidad/claridad',
                          dfsDuracion.duracion,
                          'tiempo',
                          'fecha',
                          'tipo_msg',
                          'fechaCargue'
                          )

In [None]:
dfsFinal.sort('duracion').toPandas()#.to_csv('/content/drive/MyDrive/Bextsa/MinTrabajo/csv/prueba.csv',index=False)

Unnamed: 0,id,conversaciones,agilidad/claridad,duracion,tiempo,fecha,tipo_msg,fechaCargue
0,075c9760-a3da-11ed-8afb-1f71a0866db9|livechat,1.1 ¿QUÉ ES UN CONTRATO DE TRABAJO?,,,03/02/2023 10:47:38 -05:00,03/02/2023,,03022023
1,075c9760-a3da-11ed-8afb-1f71a0866db9|livechat,1.1.4 ¿QUÉ DIFERENCIA HAY ENTRE UN CONTRATO DE...,,,03/02/2023 10:47:48 -05:00,03/02/2023,,03022023
2,075c9760-a3da-11ed-8afb-1f71a0866db9|livechat,1.CONSULTAS GENERALES.,,,03/02/2023 10:47:29 -05:00,03/02/2023,TIPO,03022023
3,075c9760-a3da-11ed-8afb-1f71a0866db9|livechat,1. CONTRATO DE TRABAJO,,,03/02/2023 10:47:33 -05:00,03/02/2023,CONG,03022023
4,11cc2000-a3e2-11ed-8afb-1f71a0866db9|livechat,1.1 ¿QUÉ ES UN CONTRATO DE TRABAJO?,,,03/02/2023 11:45:17 -05:00,03/02/2023,,03022023
...,...,...,...,...,...,...,...,...
540,44Xp0rAnyuCB5NGQWp3BBN-us,INICIAR CONVERSACIÓN,,5.833,30/01/2023 15:51:10 -05:00,30/01/2023,,30012023
541,9LA10zzS2ZoIFsB3114vP1-us,INICIAR CONVERSACIÓN,,6.067,02/02/2023 09:26:45 -05:00,02/02/2023,,02022023
542,ERSLdXuOvvH1RE0hi6quvt-us,INICIAR CONVERSACIÓN,,6.383,01/02/2023 09:57:56 -05:00,01/02/2023,,01022023
543,E2mRXR9lJRuFYwwybPlp8E-us,INICIAR CONVERSACIÓN,,8.767,01/02/2023 17:04:18 -05:00,01/02/2023,,01022023


In [None]:
!pip install azure-cosmos==4.3.0

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import azure.cosmos.cosmos_client as cosmos_client
from azure.cosmos import CosmosClient, PartitionKey, exceptions

In [None]:
cosmosEndpoint = 'ENDPOINT'
cosmosMasterKey = 'SECRET OR KEY'
cosmosDatabaseName = 'Database name'
cosmosContainerName = 'Container name' 
client = cosmos_client.CosmosClient(cosmosEndpoint, {'masterKey': cosmosMasterKey})
dbClient = client.get_database_client(cosmosDatabaseName)
containerClient = dbClient.get_container_client(cosmosContainerName)
for items in containerClient.query_items(
        query='SELECT * FROM c',
        enable_cross_partition_query = True):
    print(items)