In [1]:
import org.apache.spark.sql.SparkSession
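// Stop the session the kernel created at startup so In [2] can build one with its own app name and master
spark.stop()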


Intitializing Scala interpreter ...

Spark Web UI available at http://EM2021002716.bosonit.local:4040
SparkContext available as 'sc' (version = 3.1.1, master = local[*], app id = local-1619541490974)
SparkSession available as 'spark'


import org.apache.spark.sql.SparkSession


In [2]:
val spark = SparkSession
    .builder()
    .appName("NASA web logs")
    .master("local")
    .getOrCreate()

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@65dbb9db


**We start by reading the web log datasets, which we have as .txt files.**

In [3]:
val file = "/Users/mario.serrano/Desktop/NASA/datasets/*"
val logsDF = spark.read.text(file)

file: String = /Users/mario.serrano/Desktop/NASA/datasets/*
logsDF: org.apache.spark.sql.DataFrame = [value: string]


In [4]:
logsDF.show(5, false)
logsDF.cache()

+--------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                     |
+--------------------------------------------------------------------------------------------------------------------------+
|in24.inetnebr.com - - [01/Aug/1995:00:00:01 -0400] "GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0" 200 1839|
|uplherc.upl.com - - [01/Aug/1995:00:00:07 -0400] "GET / HTTP/1.0" 304 0                                                   |
|uplherc.upl.com - - [01/Aug/1995:00:00:08 -0400] "GET /images/ksclogo-medium.gif HTTP/1.0" 304 0                          |
|uplherc.upl.com - - [01/Aug/1995:00:00:08 -0400] "GET /images/MOSAIC-logosmall.gif HTTP/1.0" 304 0                        |
|uplherc.upl.com - - [01/Aug/1995:00:00:08 -0400] "GET /images/USA-logosmall.gif HTTP/1.0" 304 0                           |


res1: logsDF.type = [value: string]


**The DataFrame has a single column, so the next step is to extract the fields we want for our table. I will do this with regular expressions.**

In [5]:
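import org.apache.spark.sql.functions._  // regexp_extract, when, desc, regexp_replace, lit
import spark.implicits._                 // $"column" syntax for the session built in In [2]

// Each field is captured by group 1 of its regex. A line that does not match a pattern
// yields "" for that field, and casting a non-numeric value (such as "-") to int yields null.
val parsedDF = logsDF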
    .select(regexp_extract($"value","""^([^(\s)]+)""",1).as("host"),
            regexp_extract($"value","""^.*\s-\s-\s\[(\d{2}\/\w{3}\/\d{4}:\d{2}:\d{2}:\d{2})\s-\d{4}\]""",1).as("date"),
            regexp_extract($"value","""^.*\s\"(\w*)""",1).as("method"),
            regexp_extract($"value","""^.*\s(\/[^\"\s]*)""",1).as("resource"),
            regexp_extract($"value","""^.*\s(HTTP.+\"*)\"""",1).as("protocol"),
            regexp_extract($"value","""^.*\"\s([^\s]*)""",1).cast("int").as("status"),
            regexp_extract($"value","""^.*\s([^\s]*)$""",1).cast("int").as("bytes")
           )
parsedDF.show(10, false)
parsedDF.printSchema

+---------------------------+--------------------+------+-----------------------------------------------+--------+------+-----+
|host                       |date                |method|resource                                       |protocol|status|bytes|
+---------------------------+--------------------+------+-----------------------------------------------+--------+------+-----+
|in24.inetnebr.com          |01/Aug/1995:00:00:01|GET   |/shuttle/missions/sts-68/news/sts-68-mcc-05.txt|HTTP/1.0|200   |1839 |
|uplherc.upl.com            |01/Aug/1995:00:00:07|GET   |/                                              |HTTP/1.0|304   |0    |
|uplherc.upl.com            |01/Aug/1995:00:00:08|GET   |/images/ksclogo-medium.gif                     |HTTP/1.0|304   |0    |
|uplherc.upl.com            |01/Aug/1995:00:00:08|GET   |/images/MOSAIC-logosmall.gif                   |HTTP/1.0|304   |0    |
|uplherc.upl.com            |01/Aug/1995:00:00:08|GET   |/images/USA-logosmall.gif                      

parsedDF: org.apache.spark.sql.DataFrame = [host: string, date: string ... 5 more fields]
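For comparison, here is a minimal plain-Scala sketch (no Spark needed) that parses one log line with a single combined regex instead of seven separate ones. The `LogLine` case class, the `ClfSketch` object and the pattern itself are hypothetical, written for this illustration and only checked against the sample line shown above; malformed lines that do not fit the pattern are simply rejected with `None`.

```scala
import scala.util.Try

// Hypothetical record type for one parsed line.
case class LogLine(host: String, date: String, method: String, resource: String,
                   protocol: String, status: Int, bytes: Option[Int])

object ClfSketch {
  // host ident authuser [date zone] "method resource protocol" status bytes
  // The protocol part is optional, so requests like "GET /" still match.
  private val Clf =
    """^(\S+) \S+ \S+ \[([^\]]+) [-+]\d{4}\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)$""".r

  def parse(line: String): Option[LogLine] = line match {
    case Clf(host, date, method, resource, protocol, status, bytes) =>
      // A "-" in the bytes field is not a number, so it becomes None here
      Some(LogLine(host, date, method, resource, protocol, status.toInt, Try(bytes.toInt).toOption))
    case _ => None // the line does not have the expected shape
  }
}

val sample = "in24.inetnebr.com - - [01/Aug/1995:00:00:01 -0400] " +
  "\"GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0\" 200 1839"
println(ClfSketch.parse(sample))
```

Returning `Option[Int]` for bytes keeps the "no byte count" case explicit instead of turning it into null.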


**I replace the null values in the bytes field with 0: for requests with status 404 the log records `-` instead of a byte count, and casting that to int gives null.**

In [6]:
val cleanDF = parsedDF
    .withColumn("bytes", when($"bytes".isNull, 0).otherwise($"bytes"))

cleanDF: org.apache.spark.sql.DataFrame = [host: string, date: string ... 5 more fields]
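The `when(...).otherwise(...)` expression above can also be written with Spark's built-in `coalesce` or with `DataFrame.na.fill`. A minimal sketch on two made-up rows (not the NASA data), assuming the notebook's Spark 3.x setup; both variants leave non-null byte counts untouched and replace nulls with 0.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col, lit}

// Returns the notebook's running session if there is one
val spark = SparkSession.builder().appName("NASA web logs").master("local").getOrCreate()
import spark.implicits._

// Made-up rows standing in for parsedDF: the 404 row has a null bytes value,
// as the int cast of "-" would produce.
val sample = Seq[(String, Int, Option[Int])](
  ("/history/apollo/", 200, Some(6245)),
  ("/no-such-page.html", 404, None)
).toDF("resource", "status", "bytes")

// Option 1: coalesce returns its first non-null argument
val viaCoalesce = sample.withColumn("bytes", coalesce(col("bytes"), lit(0)))

// Option 2: na.fill replaces nulls only in the listed columns
val viaNaFill = sample.na.fill(0, Seq("bytes"))

viaCoalesce.show(false)
viaNaFill.show(false)
```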


A check that the data was split into fields correctly. Since some rows had no protocol,
I also check that the rest of their fields are correct.

In [7]:
// Count rows per extracted protocol value (show returns Unit, so there is nothing to bind to a val)
cleanDF
    .groupBy($"protocol")
    .count()
    .orderBy(desc("protocol"))
    .show(false)
// Inspect some of the rows where no protocol was extracted
cleanDF.where($"protocol" === "").show(10, false)

+-------------------------------------------------+-------+
|protocol                                         |count  |
+-------------------------------------------------+-------+
|HTTP/V1.0                                        |279    |
|HTTP/1.0From:  <berend@blazemonger.pc.cc.cmu.edu>|1235   |
|HTTP/1.0                                         |128    |
|HTTP/1.0                                         |3455074|
|HTTP/*                                           |13     |
|                                                 |4884   |
+-------------------------------------------------+-------+

+-----------------------+--------------------+------+---------------------------------------------------+--------+------+-----+
|host                   |date                |method|resource                                           |protocol|status|bytes|
+-----------------------+--------------------+------+---------------------------------------------------+--------+------+-----+
|pipe1.nyc.pipe
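The protocol check can be generalized to every extracted field. `regexp_extract` returns an empty string, not null, when its pattern does not match, so counting `""` per string column shows how many rows each regex missed. The helper name `emptyCounts` and the toy rows are hypothetical; on the notebook's data it would be called as `emptyCounts(cleanDF)`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, sum, when}
import org.apache.spark.sql.types.StringType

val spark = SparkSession.builder().appName("NASA web logs").master("local").getOrCreate()
import spark.implicits._

// Hypothetical helper: a single output row holding, for each string column,
// the number of rows where that column is the empty string.
def emptyCounts(df: DataFrame): DataFrame = {
  val stringCols = df.schema.fields.filter(_.dataType == StringType).map(_.name)
  df.select(stringCols.map(c => sum(when(col(c) === "", 1).otherwise(0)).as(c)): _*)
}

// Toy rows: the second one mimics a line whose protocol regex found nothing
val toy = Seq(("in24.inetnebr.com", "HTTP/1.0", 200),
              ("uplherc.upl.com", "", 304)).toDF("host", "protocol", "status")

emptyCounts(toy).show(false)
```

Non-string columns such as `status` are skipped, since their failed extractions show up as nulls after the cast rather than as empty strings.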


In [8]:
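/* Alternative approach, kept commented out for reference (based on
https://medium.com/analytics-vidhya/spark-web-server-logs-analysis-with-scala-74e0ece40a4e):
convert the CLF date string with a UDF and then cast it to a timestamp.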

val month_map = Map("Jan" -> 1,"Feb" -> 2,"Mar" -> 3,"Apr" -> 4,"May" -> 5,"Jun" -> 6,"Jul" -> 7,
                    "Aug" -> 8,"Sep" -> 9,"Oct" -> 10,"Nov" -> 11,"Dec" -> 12)

def parse_clf_time(s: String): String ={
    "%3$s-%2$s-%1$s %4$s:%5$s:%6$s".format(s.substring(0,2), month_map(s.substring(3,6)),s.substring(7,11),
                                          s.substring(12, 14), s.substring(15,17), s.substring(18))
}

val toTimestamp = udf[String, String](parse_clf_time(_))
val logsDF = cleanDF
    .withColumn("date",to_timestamp(toTimestamp($"date")))
*/
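As an alternative to the UDF above, the JVM's own `java.time` API can parse the `date` field directly, since the `MMM` pattern letter with an English locale reads month abbreviations such as `Aug`. The sketch below is plain Scala and only shows the pattern on the sample value from In [5]; the name `clfFormat` is ours. Spark 3's built-in `to_timestamp(column, format)` also accepts a datetime pattern, so something like `to_timestamp($"date", "dd/MMM/yyyy:HH:mm:ss")` on `cleanDF` may make the UDF unnecessary, but that has not been run in this notebook.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.Locale

// Matches values like "01/Aug/1995:00:00:01"; Locale.US makes "MMM" read English
// month abbreviations whatever the JVM's default locale is.
val clfFormat = DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss", Locale.US)

// The "-0400" offset is not part of the extracted string, so this is a local date-time.
val ts = LocalDateTime.parse("01/Aug/1995:00:00:01", clfFormat)
println(ts) // 1995-08-01T00:00:01
```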

**I convert the month names in the date to numbers, in case we later need to turn the field into a date or timestamp type.**

In [9]:
val logsDF = cleanDF
    .withColumn("date", regexp_replace($"date", lit("Jan"), lit("01")))
    .withColumn("date", regexp_replace($"date", lit("Feb"), lit("02")))
    .withColumn("date", regexp_replace($"date", lit("Mar"), lit("03")))
    .withColumn("date", regexp_replace($"date", lit("Apr"), lit("04")))
    .withColumn("date", regexp_replace($"date", lit("May"), lit("05")))
    .withColumn("date", regexp_replace($"date", lit("Jun"), lit("06")))
    .withColumn("date", regexp_replace($"date", lit("Jul"), lit("07")))
    .withColumn("date", regexp_replace($"date", lit("Aug"), lit("08")))
    .withColumn("date", regexp_replace($"date", lit("Sep"), lit("09")))
    .withColumn("date", regexp_replace($"date", lit("Oct"), lit("10")))
    .withColumn("date", regexp_replace($"date", lit("Nov"), lit("11")))
    .withColumn("date", regexp_replace($"date", lit("Dec"), lit("12")))

logsDF: org.apache.spark.sql.DataFrame = [host: string, date: string ... 5 more fields]
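The twelve `regexp_replace` calls follow one pattern, so they can also be generated from the list of month names with `foldLeft`. A minimal sketch on toy values, assuming the same Spark 3.x session; `monthsToNumbers` is a hypothetical name, and on the notebook's data it would be applied as `monthsToNumbers(cleanDF)`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, regexp_replace}

val spark = SparkSession.builder().appName("NASA web logs").master("local").getOrCreate()
import spark.implicits._

// Month abbreviation -> two-digit number, in calendar order ("Jan" -> "01", ...)
val months = Seq("Jan", "Feb", "Mar", "Apr", "May", "Jun",
                 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")
  .zipWithIndex
  .map { case (name, i) => name -> f"${i + 1}%02d" }

// Chains one regexp_replace per month onto the DataFrame, like In [9] does by hand
def monthsToNumbers(df: DataFrame): DataFrame =
  months.foldLeft(df) { case (acc, (name, number)) =>
    acc.withColumn("date", regexp_replace(col("date"), name, number))
  }

val toy = Seq("01/Aug/1995:00:00:01", "31/Jul/1995:23:59:59").toDF("date")
monthsToNumbers(toy).show(false)
```

The resulting plan is the same chain of twelve `withColumn` calls; the loop only removes the repetition.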


**Our dataset is now clean and ready to be queried. We save it as Parquet and run the queries against that copy.**

In [10]:
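// Caution: this output directory sits inside datasets/, so re-running In [3] (which reads datasets/*)
// would also read these Parquet part files as if they were text logs.
logsDF.write.format("parquet").mode("overwrite").save("/Users/mario.serrano/Desktop/NASA/datasets/parquet/")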

In [11]:
val fileParquet = "/Users/mario.serrano/Desktop/NASA/datasets/parquet/*"
val parquetDF = spark.read.format("parquet").load(fileParquet)

fileParquet: String = /Users/mario.serrano/Desktop/NASA/datasets/parquet/*
parquetDF: org.apache.spark.sql.DataFrame = [host: string, date: string ... 5 more fields]
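A minimal round-trip sketch of the Parquet step, assuming a local Spark 3.x session: it writes a single row (copied from the notebook's Parquet output) to a temporary directory and reads it back. Pointing `spark.read.parquet` at the directory itself is enough, so the `/*` glob used in In [11] is not required. The temporary path and the `toy` DataFrame are illustrative only.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("NASA web logs").master("local").getOrCreate()
import spark.implicits._

// One row copied from the Parquet output shown in this notebook
val toy = Seq(("199.72.81.55", "01/07/1995:00:00:01", 200, 6245))
  .toDF("host", "date", "status", "bytes")

// A fresh temporary location; Spark creates the "logs" directory and its part files
val outDir = Files.createTempDirectory("nasa-parquet-sketch").resolve("logs").toString
toy.write.mode("overwrite").parquet(outDir)

// Reading the directory itself picks up every part file inside it
val roundTrip = spark.read.parquet(outDir)
roundTrip.printSchema()
roundTrip.show(false)
```

Parquet stores the schema with the data, so `status` and `bytes` come back as integers without re-parsing.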


In [12]:
parquetDF.cache()
parquetDF.printSchema
parquetDF.show(10,false)

root
 |-- host: string (nullable = true)
 |-- date: string (nullable = true)
 |-- method: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- bytes: integer (nullable = true)

+--------------------+-------------------+------+-----------------------------------------------+--------+------+-----+
|host                |date               |method|resource                                       |protocol|status|bytes|
+--------------------+-------------------+------+-----------------------------------------------+--------+------+-----+
|199.72.81.55        |01/07/1995:00:00:01|GET   |/history/apollo/                               |HTTP/1.0|200   |6245 |
|unicomp6.unicomp.net|01/07/1995:00:00:06|GET   |/shuttle/countdown/                            |HTTP/1.0|200   |3985 |
|199.120.110.21      |01/07/1995:00:00:09|GET   |/shuttle/missions/sts-73/mission-sts-73.html   |HTTP/1.0|200   |4085 |
|burger