# Imports

In [1]:
%load_ext autoreload
%autoreload 2
import sys
from nlppen.extraccion.utils.Txt2Numbers import Txt2Numbers
from nlppen.analisis import Analisis
from nlppen.seleccion import Seleccion
from nlppen.spark_udfs import solo_portanto, solo_considerando, solo_resultando, solo_encabezado, spark_get_spacy
from nlppen.sentencias_estructurales import SentenciasEstructurales
from pyspark.sql import SparkSession
from nlppen.extraccion.utils.extraerFechaRecibido import ExtraerFecha
from pyspark.sql.functions import length
from nlppen.extraccion.utils.misc import splitResolucion

import pandas as pd


## Configuracion Spark

In [2]:
spark = (SparkSession
         .builder
         .appName("Transforming sentences")
         .config("spark.num.executors", "6")
         .config("spark.executor.memory", "6g")
         .config("spark.executor.cores", "12")
         .config("spark.driver.memory", "12g")
         .config("spark.memory.offHeap.enabled", True)
         .config("spark.memory.offHeap.size", "16g")
         .config("spark.sql.execution.arrow.pyspark.enabled", "true")
         .getOrCreate())

sc = spark.sparkContext
sc.uiWebUrl

21/11/30 14:58:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/11/30 14:58:54 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


'http://7416269fd645:4041'

# Buscar terminos en la sección de por lo tanto de la sentencia y aplicar filtro del dataset

In [3]:
ejecutar = False
if ejecutar:
    terminos = {
        'se ordena': [r'\bse ordena\b', r'\bse le ordena\b', r'\bse les ordena\b'],
        'se condena': [r'\bse condena\b', r'\bse le condena\b', r'\bse les condena\b'],
        'plan': [r'\bplan\b'],
        'plazo': [r'\bplazo\b', r'\bt[éermino (improrrogable)? (de)?\b']
    }
    seleccion = Seleccion(terminos, spark, parquet_path='./dataset/partition', datasets_path='./datasets/estructurales')
    print("Cantidad elementos originales : " + str(seleccion.sdf.count()))
    seleccion.filtrar_sentencias(preprocess=solo_portanto, keepRowEmpty=True)
    print("Cantidad elementos despues de filtrados : " + str(seleccion.sdf.count()))
    estructurales = SentenciasEstructurales(seleccion)

# Procesamiento de todas las variables de sentencias estructurales

## La siguiente celda ejecuta el cálculo de todas las variables definidas para las sentencias estructurales

Nota: Modificar la bandera "ejecutar" para aplicar todo el procesamiento.

In [4]:
from pyspark.sql.types import *

if ejecutar:
    columnas = {
        'se ordena PER' : ArrayType(StringType()),
        'se ordena LOC' : ArrayType(StringType()),
        'se ordena ORG' : ArrayType(StringType()),
        'se ordena MISC' : ArrayType(StringType()),
        'se ordena GPE' : ArrayType(StringType()),
        'se ordena Ent Pub' : ArrayType(StringType())
    }
    estructurales.separarSeOrdena(columnas, True, True)

    columnas = {
        'se condena PER' : ArrayType(StringType()),
        'se condena LOC' : ArrayType(StringType()),
        'se condena ORG' : ArrayType(StringType()),
        'se condena MISC' : ArrayType(StringType()),
        'se condena GPE' : ArrayType(StringType()),
        'se condena Ent Pub' : ArrayType(StringType())
    }
    estructurales.separarSeCondena(columnas, True, True)

    columnas = {
        'extension sentencia' : IntegerType(),
        'extension por lo tanto' : IntegerType()
    }
    estructurales.extraerExtension(columnas, True)

    columnas = {
        'plazosDefinidos' : ArrayType(TimestampType()) #Correción día 27-11-2021. Correr de nuevo
    }
    estructurales.plazosDefinidos(columnas, True)

    columnas = {
        'FechaSolicitud' : TimestampType(),
    }

    estructurales.extrarFechaRecibido(columnas, True)

    columnas = {
        'num resolucion' : StringType()
    }

    estructurales.extraerNumeroSentencia(columnas, True)

    columnas = {
        'inst internacionales' : ArrayType(StringType())
    }

    estructurales.extraerInstrumentosInternacionales(columnas, True)

    columnas = {
        'derechos Norm' : ArrayType(StringType()),
        'derechos GenXPat' : ArrayType(StringType())
    }
    estructurales.extraerDerechos(columnas, True)

    columnas = {
        'derechos Acotados' : ArrayType(StringType()),
        'derechos General' : ArrayType(StringType()),
        'derechos Fundamental' : ArrayType(StringType()),
        'derechos Humano' : ArrayType(StringType())
    }

    estructurales.extraerDerechosSinNormalizar(columnas, True)
    
    sentencias = pd.read_csv("listaSentencias.csv", sep=';', encoding='latin-1')
    columnas = {
        'id_sentencia' : StringType(),
        'num_doc_oficial' : StringType(),
        'expediente_oficial' : StringType()
    }
    estructurales.agregarIDSentencia(columnas, sentencias, True)
    
    columnas = {
        'citasVotDate' : MapType(StringType(), TimestampType()), #Dicc de Numero de voto y Fecha
        'citasIDVoto' : MapType(StringType(), StringType()) #Dicc de id_Sentencia y # de voto
    }
    estructurales.extrarCitaSentenciasFecha(columnas, sentencias, True)
    
    columnas = {
        'seguimientoExt' : StringType(), #Texto extraido de seguimiento
        'seguimientoValue' : IntegerType() #Cantidad de ocurrencias
    }
    estructurales.extraerSeguimiento(columnas, True)
    #estructurales.seleccion.sdf.toPandas().head(5)
    #Sobreescribir el dataset de filtro de sentencias con las nuevas columnas
    #estructurales.seleccion.guardarDatos()

In [5]:
#writer = estructurales.seleccion.sdf.write.partitionBy("anno").mode('Overwrite').parquet('./dataset/procesado')

In [6]:
#df = spark.read.parquet("./dataset/procesado")
#df.toPandas().head()

## Guardar los resultados del preprocesamiento por año

In [7]:
#if ejecutar:
#    df = spark.read.parquet(seleccion.datasets_path+"/terminos.parquet")
#    writer = df.write.partitionBy("anno").mode('Overwrite').parquet('./dataset/procesado')


## Cargar los resultados preprocesados y divididos por año

In [8]:
s = estructurales.seleccion.sdf
s.filter(s.anno == "1998").limit(5).toPandas()

In [9]:
estructurales.seleccion.sdf.select("id_sentencia", "expediente_oficial").toPandas().head(5)

                                                                                

Unnamed: 0,id_sentencia,expediente_oficial
0,sen-1-0007-327374,050034420007CO
1,sen-1-0007-577976,040009040007CO
2,sen-1-0007-305337,040129030007CO
3,sen-1-0007-302317,040123680007CO
4,sen-1-0007-327684,050133110007CO


In [10]:
s = estructurales.seleccion.sdf

In [12]:
s.filter(s.plazosDefinidos.isNotNull()).count()

631

## Ejecutar un criterio

In [48]:
from pyspark.sql.types import *
"""
if not ejecutar:
    columnas = {
        'seguimientoExt2' : StringType(), #Texto extraido de seguimiento
        'seguimientoValue2' : IntegerType() #Cantidad de ocurrencias
    }
    estructurales.extraerSeguimiento(columnas, True, False)
    s = estructurales.seleccion.sdf
    s.toPandas().head(5)
    """

"\nif not ejecutar:\n    columnas = {\n        'seguimientoExt2' : StringType(), #Texto extraido de seguimiento\n        'seguimientoValue2' : IntegerType() #Cantidad de ocurrencias\n    }\n    estructurales.extraerSeguimiento(columnas, True, False)\n    s = estructurales.seleccion.sdf\n    s.toPandas().head(5)\n    "

In [49]:
s.toPandas().head(5)

  Unsupported type in conversion to Arrow: ArrayType(TimestampType,true)
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


Unnamed: 0,archivo,txt,tema,tema_prob,expediente,fechahora_ext,redactor_ext,tipoResolucion_ext,termino_ext,voto_salvado,...,derechos_Fundamental,derechos_Humano,id_sentencia,num_doc_oficial,expediente_oficial,citasVotDate,citasIDVoto,seguimientoExt,seguimientoValue,anno
0,6E317.html,*050034420007CO* Exp: 05-003442-0007-CO Res. N...,TRABAJO,0.457667,050034420007CO,NaT,Cruz Castro,Recurso de Amparo,Con lugar,False,...,,,sen-1-0007-327374,14244,050034420007CO,,,,0,2005
1,C27B5.html,Exp: 04-000904-0007-CO Res: 2005-07158 SALA CO...,PETICION,0.086251,040009040007CO,2005-06-08 08:42:00,Solano Carrera,Acción de Inconstitucionalidad,Con lugar,False,...,[derecho fundamental a un ambiente sano y],,sen-1-0007-577976,7158,040009040007CO,"{'1413—98': None, '440—00': None, '2345—96': N...","{'sen-1-0007-81976': '2345', 'sen-1-0007-23715...",,0,2005
2,65301.html,Exp: 04-012903-0007-CO Res: 2005-03407 SALA CO...,TRABAJO,0.465011,040129030007CO,2005-03-29 18:32:00,Jinesta Lobo,Recurso de Amparo,Con lugar,False,...,,,sen-1-0007-305337,3407,040129030007CO,,,,0,2005
3,63F90.html,Exp: 04-012368-0007-CO Res: 2005-00764 SALA CO...,TRABAJO,0.839489,040123680007CO,2005-01-28 10:06:00,Jinesta Lobo,Recurso de Amparo,Con lugar,False,...,,,sen-1-0007-302317,764,040123680007CO,"{'2001-7309': None, '2002-4842': 2002-05-21 16...","{'sen-1-0007-201367': '4842', 'sen-1-0007-1692...",,0,2005
4,6E46E.html,*050133110007CO* Exp: 05-013311-0007-CO Res. N...,TRABAJO,0.590432,050133110007CO,2005-11-09 12:05:00,Cruz Castro,Recurso de Amparo,Con lugar,False,...,,,sen-1-0007-327684,15447,050133110007CO,,,,0,2005


In [83]:
s = estructurales.seleccion.sdf
print(s.filter(s.plazo > 0).count())
print(s.filter(s.plazosDefinidos.isNotNull()).count())
s.filter(s.plazosDefinidos.isNotNull()).filter(s.plazo == 0).select( "id_sentencia", "plazosDefinidos" , "plazo").toPandas()

709
631


Unnamed: 0,id_sentencia,plazosDefinidos,plazo


# Cargar procesamiento de sentencias estructurales.

# Zona de trabajo y cosas utiles

### Sobreescribir el dataset

In [69]:
#writer = estructurales.seleccion.sdf.write.partitionBy("anno").mode('Overwrite').parquet('./dataset/procesado')

### Sobreescribir terminos

In [71]:
"""terminos = {
        'se ordena': [r'\bse ordena\b', r'\bse le ordena\b', r'\bse les ordena\b'],
        'se condena': [r'\bse condena\b', r'\bse le condena\b', r'\bse les condena\b'],
        'plan': [r'\bplan\b'],
        'plazo': [r'\bplazo\b', r'\bt[éermino (improrrogable)? (de)?\b']
    }
    seleccion = Seleccion(terminos, spark, parquet_path='./dataset/procesado', datasets_path='./datasets/estructuralesx')
    #seleccion = Seleccion(terminos, spark, parquet_path='./dataset/partition', datasets_path='./datasets/estructurales')
    print("Cantidad elementos originales : " + str(seleccion.sdf.count()))
    seleccion.filtrar_sentencias(preprocess=solo_portanto, precargar=False, keepRowEmpty=True, overwriteColumns=True)
    estructurales.seleccion = seleccion"""

'terminos = {\n        \'se ordena\': [r\'\x08se ordena\x08\', r\'\x08se le ordena\x08\', r\'\x08se les ordena\x08\'],\n        \'se condena\': [r\'\x08se condena\x08\', r\'\x08se le condena\x08\', r\'\x08se les condena\x08\'],\n        \'plan\': [r\'\x08plan\x08\'],\n        \'plazo\': [r\'\x08plazo\x08\', r\'\x08t[éermino (improrrogable)? (de)?\x08\']\n    }\n    seleccion = Seleccion(terminos, spark, parquet_path=\'./dataset/procesado\', datasets_path=\'./datasets/estructuralesx\')\n    #seleccion = Seleccion(terminos, spark, parquet_path=\'./dataset/partition\', datasets_path=\'./datasets/estructurales\')\n    print("Cantidad elementos originales : " + str(seleccion.sdf.count()))\n    seleccion.filtrar_sentencias(preprocess=solo_portanto, precargar=False, keepRowEmpty=True, overwriteColumns=True)\n    estructurales.seleccion = seleccion'

### Funciones pandas y visualizacion de datos

In [None]:
"""from pyspark.sql.functions import explode, desc
from pyspark.sql import functions as F
import pandas as pd
pd.set_option('display.max_rows', 50)
pd.set_option('display.max_columns', 30)
pd.set_option('display.width', 80)
pd.set_option('display.max_colwidth', 80)
pd.set_option('display.width', None)
"""


### Funcion de testing

In [115]:
from spacy.matcher import Matcher
from spacy.tokens import Span
from nlppen.extraccion.utils.Txt2Numbers import Txt2Numbers
from nlppen.extraccion.utils.Txt2Date import Txt2Date
import re
def testing(txt):
    #print(txt)
    regularExpresion = re.compile(r"[MDCLXVI]+[\.-]+.+?(?=\.)")
    extraccion = regularExpresion.findall(txt)
    for match in extraccion:
        print("-*-*-*-*-*")
        print(match)
    
    

### Recorrer dataset con una funcion de testing

In [117]:
citas = s.filter(s.anno == "2000").toPandas()
#citas = s.filter(s.CitaSentencias_ents.isNotNull()).toPandas()
#nlp = spark_get_spacy("es_core_news_lg")
#size = len(citas)
size = 1
#print(citas.columns)
for i in range(1, 2):
    sent = citas.iloc[i, 4]
    id_sentencia = citas.iloc[i, -8]
    txt = citas.iloc[i, 1]
    print("******************************")
    print(sent)
    print(id_sentencia)
    termino_ext = citas.iloc[i, 8]
    txt += " además en el término de DOS AÑOS reporte la vara" 
    testing(solo_considerando(txt))

******************************
000015690007CO
sen-1-0007-128196
-*-*-*-*-*
I.- Hechos probados
-*-*-*-*-*
II.- Sobre el fondo


21/12/03 04:08:31 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 18602409 ms exceeds timeout 120000 ms
21/12/03 04:08:31 WARN SparkContext: Killing executors is not supported by current scheduler.


### Cargar spacy

In [68]:
"""nlp = spark_get_spacy('es_core_news_lg')
doc = nlp("plan remedial")
for token in doc:
    print(token.text, token.lemma_, token.pos_, token.tag_, token.dep_,token.shape_, token.is_alpha, token.is_stop)
"""

'nlp = spark_get_spacy(\'es_core_news_lg\')\ndoc = nlp("plan remedial")\nfor token in doc:\n    print(token.text, token.lemma_, token.pos_, token.tag_, token.dep_,token.shape_, token.is_alpha, token.is_stop)\n'