# Prática spark
## Objetivo
El objetivo de esta práctica es realizar el procesado de un fichero de logs para poder
## Práctica
### Conexión
El primer paso para realizar la práctica es establecer la conexión con el cluster de spark

In [1]:
from pyspark.sql import SparkSession
from  pyspark.sql.functions import col, concat_ws,split, explode ,desc, to_timestamp,from_unixtime,unix_timestamp, translate, create_map, lit,udf,StringType,map_keys
from datetime import datetime
from itertools import chain

spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

22/11/16 14:30:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Lectura
Una vez que se ha establecido la conexión con el cluster de spark, es necesario cargar el archivo de texto que se va a tratar.
Para esta práctica se va a leer como un dataframe

In [2]:
ficheroDataframe = spark.read.text("logs.txt")
ficheroDataFrameRenamed = ficheroDataframe.withColumnRenamed("value","datos")
# Se muestran los primeros 20 elementos
ficheroDataFrameRenamed.show()

[Stage 0:>                                                          (0 + 1) / 1]

+--------------------+
|               datos|
+--------------------+
|01-11-2022 13:24:...|
|01-11-2022 13:24:...|
|01-11-2022 13:24:...|
|01-11-2022 13:24:...|
|01-11-2022 13:24:...|
|01-11-2022 13:24:...|
|01-11-2022 13:24:...|
|01-11-2022 13:24:...|
|01-11-2022 13:24:...|
|01-11-2022 13:24:...|
|01-11-2022 13:24:...|
|01-11-2022 13:24:...|
|01-11-2022 13:24:...|
|01-11-2022 13:24:...|
|01-11-2022 13:24:...|
|01-11-2022 13:24:...|
|01-11-2022 13:24:...|
|01-11-2022 13:24:...|
|01-11-2022 13:24:...|
|01-11-2022 13:24:...|
+--------------------+
only showing top 20 rows



                                                                                

### Preprocesado
Una vez que se ha cargado el fichero, será necesario dividir la columna por el carácter "-"

In [3]:
ficheroDataFrameSplit = ficheroDataFrameRenamed.select(split(col("datos")," - "))
ficheroDataFrameSplit = ficheroDataFrameSplit.withColumnRenamed("split(datos,  - , -1)","datos")

ficheroDataFrameSplit.printSchema()


root
 |-- datos: array (nullable = true)
 |    |-- element: string (containsNull = true)



Se construye el dataframe 

In [4]:
dataFrameDatos= ficheroDataFrameSplit.select(col("datos").getItem(0),col("datos").getItem(1),col("datos").getItem(2),col("datos").getItem(3))
# Se renombran las columnas
dataFrameDatos = dataFrameDatos.withColumnRenamed("datos[0]","Fecha")
dataFrameDatos = dataFrameDatos.withColumnRenamed("datos[1]","Log")
dataFrameDatos = dataFrameDatos.withColumnRenamed("datos[2]","Module")
dataFrameDatos = dataFrameDatos.withColumnRenamed("datos[3]","Info")
dataFrameDatos.show()


[Stage 1:>                                                          (0 + 1) / 1]

+-------------------+--------+------+--------------------+
|              Fecha|     Log|Module|                Info|
+-------------------+--------+------+--------------------+
|01-11-2022 13:24:13|    INFO|pandas|Aliquam amet adip...|
|01-11-2022 13:24:22|   ERROR|pandas|Tempora modi quiq...|
|01-11-2022 13:24:25|    INFO| spark|Non est porro por...|
|01-11-2022 13:24:26|CRITICAL|python|Porro labore eius...|
|01-11-2022 13:24:28|CRITICAL| numpy|Est ut tempora se...|
|01-11-2022 13:24:29|    INFO| spark|Sit dolorem dolor...|
|01-11-2022 13:24:30|CRITICAL| spark|Etincidunt aliqua...|
|01-11-2022 13:24:31|   DEBUG|python|Aliquam sed porro...|
|01-11-2022 13:24:32|CRITICAL| numpy|Numquam dolor ips...|
|01-11-2022 13:24:33|CRITICAL|python|Amet neque est ip...|
|01-11-2022 13:24:34|   DEBUG|python|Eius ut tempora i...|
|01-11-2022 13:24:36|   ERROR| numpy|Quisquam adipisci...|
|01-11-2022 13:24:37|CRITICAL| spark|Voluptatem dolor ...|
|01-11-2022 13:24:38|CRITICAL| spark| Dolor neque ut non

                                                                                

Ahora se tiene el dataframe con los datos separados por columnas, se tiene que transformar la fecha a timestamp

In [5]:
dataFrameDatos = dataFrameDatos.withColumn("Fecha",to_timestamp(col("Fecha"),"dd-MM-yyyy HH:mm:ss"))
dataFrameDatos.show()
dataFrameDatos.printSchema()

+-------------------+--------+------+--------------------+
|              Fecha|     Log|Module|                Info|
+-------------------+--------+------+--------------------+
|2022-11-01 13:24:13|    INFO|pandas|Aliquam amet adip...|
|2022-11-01 13:24:22|   ERROR|pandas|Tempora modi quiq...|
|2022-11-01 13:24:25|    INFO| spark|Non est porro por...|
|2022-11-01 13:24:26|CRITICAL|python|Porro labore eius...|
|2022-11-01 13:24:28|CRITICAL| numpy|Est ut tempora se...|
|2022-11-01 13:24:29|    INFO| spark|Sit dolorem dolor...|
|2022-11-01 13:24:30|CRITICAL| spark|Etincidunt aliqua...|
|2022-11-01 13:24:31|   DEBUG|python|Aliquam sed porro...|
|2022-11-01 13:24:32|CRITICAL| numpy|Numquam dolor ips...|
|2022-11-01 13:24:33|CRITICAL|python|Amet neque est ip...|
|2022-11-01 13:24:34|   DEBUG|python|Eius ut tempora i...|
|2022-11-01 13:24:36|   ERROR| numpy|Quisquam adipisci...|
|2022-11-01 13:24:37|CRITICAL| spark|Voluptatem dolor ...|
|2022-11-01 13:24:38|CRITICAL| spark| Dolor neque ut non

Por último va a ser necesario realilzar un mapeo entre el nivel de incidencia y in valor numérico previamente establecido

In [6]:
log_mapping = {
    'CRITICAL': 50,
    'ERROR': 40,
    'WARNING': 30,
    'INFO': 20,
    'DEBUG': 10, 
    'NOTSET': 0}
log_mapping_spark = create_map([lit(x) for x in chain(*log_mapping.items())])

dataFrameDatos = dataFrameDatos.withColumn("Map",log_mapping_spark[col("Log")])
dataFrameDatos.show()


+-------------------+--------+------+--------------------+---+
|              Fecha|     Log|Module|                Info|Map|
+-------------------+--------+------+--------------------+---+
|2022-11-01 13:24:13|    INFO|pandas|Aliquam amet adip...| 20|
|2022-11-01 13:24:22|   ERROR|pandas|Tempora modi quiq...| 40|
|2022-11-01 13:24:25|    INFO| spark|Non est porro por...| 20|
|2022-11-01 13:24:26|CRITICAL|python|Porro labore eius...| 50|
|2022-11-01 13:24:28|CRITICAL| numpy|Est ut tempora se...| 50|
|2022-11-01 13:24:29|    INFO| spark|Sit dolorem dolor...| 20|
|2022-11-01 13:24:30|CRITICAL| spark|Etincidunt aliqua...| 50|
|2022-11-01 13:24:31|   DEBUG|python|Aliquam sed porro...| 10|
|2022-11-01 13:24:32|CRITICAL| numpy|Numquam dolor ips...| 50|
|2022-11-01 13:24:33|CRITICAL|python|Amet neque est ip...| 50|
|2022-11-01 13:24:34|   DEBUG|python|Eius ut tempora i...| 10|
|2022-11-01 13:24:36|   ERROR| numpy|Quisquam adipisci...| 40|
|2022-11-01 13:24:37|CRITICAL| spark|Voluptatem dolor .

### Filtrado por módulo
Con el dataframe preprocesado como se desea, se va a comenzar a extraer información.
En un inicio se van a extraer las filas que coincidan con un valor de módulo.

In [7]:
modulo = "spark"
resultado = dataFrameDatos.filter(modulo== col("Module"))
resultado.show()

+-------------------+--------+------+--------------------+---+
|              Fecha|     Log|Module|                Info|Map|
+-------------------+--------+------+--------------------+---+
|2022-11-01 13:24:25|    INFO| spark|Non est porro por...| 20|
|2022-11-01 13:24:29|    INFO| spark|Sit dolorem dolor...| 20|
|2022-11-01 13:24:30|CRITICAL| spark|Etincidunt aliqua...| 50|
|2022-11-01 13:24:37|CRITICAL| spark|Voluptatem dolor ...| 50|
|2022-11-01 13:24:38|CRITICAL| spark| Dolor neque ut non.| 50|
|2022-11-01 13:24:51|   DEBUG| spark|Modi est modi non...| 10|
|2022-11-01 13:24:51|   DEBUG| spark|Modi quiquia cons...| 10|
|2022-11-01 13:24:51|    INFO| spark|Neque adipisci ma...| 20|
|2022-11-01 13:24:51|   DEBUG| spark|Quaerat magnam se...| 10|
|2022-11-01 13:24:51|   DEBUG| spark|Sit ipsum quiquia...| 10|
|2022-11-01 13:24:51|   ERROR| spark|Porro dolorem sit...| 40|
|2022-11-01 13:24:51|   DEBUG| spark|Dolor sed etincid...| 10|
+-------------------+--------+------+------------------

### Filtrado por palabra en string
En este apartado se va filtrar si una palabra se encuentra en el campo de información

In [8]:
palabra = "Porro"
resultado = dataFrameDatos.filter(col("Info").contains(palabra))
resultado.show()

+-------------------+--------+------+--------------------+---+
|              Fecha|     Log|Module|                Info|Map|
+-------------------+--------+------+--------------------+---+
|2022-11-01 13:24:26|CRITICAL|python|Porro labore eius...| 50|
|2022-11-01 13:24:51|   ERROR| numpy|Porro est neque n...| 40|
|2022-11-01 13:24:51|   ERROR| spark|Porro dolorem sit...| 40|
|2022-11-01 13:24:51|   ERROR|pandas|Porro ipsum conse...| 40|
|2022-11-01 13:24:51|CRITICAL| spark|Porro eius consec...| 50|
|2022-11-01 13:24:51|CRITICAL|pandas|Porro amet sed mo...| 50|
|2022-11-01 13:24:51|CRITICAL|pandas|Porro modi non qu...| 50|
|2022-11-01 13:24:51|    INFO|python|Porro modi aliqua...| 20|
|2022-11-01 13:24:51|   DEBUG|pandas|Porro amet sit la...| 10|
|2022-11-01 13:24:51|   DEBUG|pandas|Porro quiquia dol...| 10|
|2022-11-01 13:24:51|    INFO|pandas|Porro tempora adi...| 20|
|2022-11-01 13:24:51|   DEBUG| spark|Porro sed dolore ...| 10|
|2022-11-01 13:24:51|   DEBUG| numpy|Porro quiquia ali.

                                                                                

### Filtrado por input de palabra
Se van a mostrar las filas que tengan un nivel de incidencia igual o superior que el nivel solicitado. De esta forma si se introduce como nivel mínimo "Warning", se deberían de mostrar las filas con niveles "Warning", "Error" y "Critical"

In [9]:
nivel = "WARNING"

# Primero se obtiene el valor numérico para el nivel
valor = log_mapping[nivel]

resultado = dataFrameDatos.filter(valor <= col("Map"))
resultado.show()

+-------------------+--------+------+--------------------+---+
|              Fecha|     Log|Module|                Info|Map|
+-------------------+--------+------+--------------------+---+
|2022-11-01 13:24:22|   ERROR|pandas|Tempora modi quiq...| 40|
|2022-11-01 13:24:26|CRITICAL|python|Porro labore eius...| 50|
|2022-11-01 13:24:28|CRITICAL| numpy|Est ut tempora se...| 50|
|2022-11-01 13:24:30|CRITICAL| spark|Etincidunt aliqua...| 50|
|2022-11-01 13:24:32|CRITICAL| numpy|Numquam dolor ips...| 50|
|2022-11-01 13:24:33|CRITICAL|python|Amet neque est ip...| 50|
|2022-11-01 13:24:36|   ERROR| numpy|Quisquam adipisci...| 40|
|2022-11-01 13:24:37|CRITICAL| spark|Voluptatem dolor ...| 50|
|2022-11-01 13:24:38|CRITICAL| spark| Dolor neque ut non.| 50|
|2022-11-01 13:24:40|   ERROR| numpy|Velit numquam dol...| 40|
|2022-11-01 13:24:41|CRITICAL|pandas|Voluptatem quaera...| 50|
|2022-11-01 13:24:42|CRITICAL|pandas|Velit quiquia ali...| 50|
|2022-11-01 13:24:42|CRITICAL|python|Sed adipisci cons.

### Filtrado por fecha
Se filtra para obtener todas las entradas del log posteriores al valor de fecha establecido

In [10]:
after = datetime.strptime('11/01/22 13:24:41', '%m/%d/%y %H:%M:%S')

resultado = dataFrameDatos.filter(col("Fecha") >(after)) 
resultado.show()

+-------------------+--------+------+--------------------+---+
|              Fecha|     Log|Module|                Info|Map|
+-------------------+--------+------+--------------------+---+
|2022-11-01 13:24:42|   DEBUG|pandas|Aliquam modi magn...| 10|
|2022-11-01 13:24:42|CRITICAL|pandas|Velit quiquia ali...| 50|
|2022-11-01 13:24:42|CRITICAL|python|Sed adipisci cons...| 50|
|2022-11-01 13:24:51|   DEBUG| spark|Modi est modi non...| 10|
|2022-11-01 13:24:51|   DEBUG|pandas|Ut consectetur ei...| 10|
|2022-11-01 13:24:51|CRITICAL|python|Quaerat labore al...| 50|
|2022-11-01 13:24:51|   ERROR|pandas|Quaerat modi non ...| 40|
|2022-11-01 13:24:51|   ERROR|pandas|Est ipsum tempora...| 40|
|2022-11-01 13:24:51|   DEBUG|python|Quaerat ipsum est...| 10|
|2022-11-01 13:24:51|    INFO|pandas|Modi quisquam num...| 20|
|2022-11-01 13:24:51|   DEBUG|python|Velit sed quiquia...| 10|
|2022-11-01 13:24:51|   ERROR|python|Amet quisquam ut ...| 40|
|2022-11-01 13:24:51|CRITICAL|pandas|Sit tempora quaer.