In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import * 

# Creamos una sesión usando SparkSession
spark = (SparkSession
.builder
.appName("Nasa")
.getOrCreate())

In [2]:
julio = "NASA_Access_Log_Jul95.gz"

In [3]:
nasaLogsJulio = spark.read.options(delimiter = " ").csv(julio)

In [4]:
nasaLogsJulio.show(1, False)

+------------+---+---+---------------------+------+-----------------------------+---+----+
|_c0         |_c1|_c2|_c3                  |_c4   |_c5                          |_c6|_c7 |
+------------+---+---+---------------------+------+-----------------------------+---+----+
|199.72.81.55|-  |-  |[01/Jul/1995:00:00:01|-0400]|GET /history/apollo/ HTTP/1.0|200|6245|
+------------+---+---+---------------------+------+-----------------------------+---+----+
only showing top 1 row



In [5]:
nasaLogsJulio = nasaLogsJulio.withColumn("_c3", F.regexp_replace(("_c3"), "\[", ""))
nasaLogsJulio = nasaLogsJulio.withColumn("_c4", F.regexp_replace(("_c4"), "\]", ""))
nasaLogsJulio = nasaLogsJulio.withColumn("RequestMethod", F.regexp_replace(("_c5"), "(\w+).*", "$1"))
nasaLogsJulio = nasaLogsJulio.withColumn("Resource", F.regexp_replace(("_c5"), "\S+\s(\S+).*", "$1"))
nasaLogsJulio = nasaLogsJulio.withColumn("Protocol", F.regexp_replace(("_c5"), "\S+\s+\S+\s(\S+).*", "$1"))
nasaLogsJulio = nasaLogsJulio.drop("_c5")

In [6]:
nasaLogsJulio.show(1, truncate = False)

+------------+---+---+--------------------+-----+---+----+-------------+----------------+--------+
|_c0         |_c1|_c2|_c3                 |_c4  |_c6|_c7 |RequestMethod|Resource        |Protocol|
+------------+---+---+--------------------+-----+---+----+-------------+----------------+--------+
|199.72.81.55|-  |-  |01/Jul/1995:00:00:01|-0400|200|6245|GET          |/history/apollo/|HTTP/1.0|
+------------+---+---+--------------------+-----+---+----+-------------+----------------+--------+
only showing top 1 row



In [7]:
nasaLogsJulio = nasaLogsJulio.withColumnRenamed("_c0", "Host")\
                             .withColumnRenamed("_c1", "UserIdentifier")\
                             .withColumnRenamed("_c2", "UserId")\
                             .withColumnRenamed("_c4", "TimeZone")\
                             .withColumnRenamed("_c6", "HttpStatus")\
                             .withColumnRenamed("_c7", "Size")

In [8]:
nasaLogsJulio.show(1, truncate = False)

+------------+--------------+------+--------------------+--------+----------+----+-------------+----------------+--------+
|Host        |UserIdentifier|UserId|_c3                 |TimeZone|HttpStatus|Size|RequestMethod|Resource        |Protocol|
+------------+--------------+------+--------------------+--------+----------+----+-------------+----------------+--------+
|199.72.81.55|-             |-     |01/Jul/1995:00:00:01|-0400   |200       |6245|GET          |/history/apollo/|HTTP/1.0|
+------------+--------------+------+--------------------+--------+----------+----+-------------+----------------+--------+
only showing top 1 row



In [9]:
from pyspark.sql.types import *
nasaLogs = nasaLogsJulio.withColumn("DateTime", F.to_timestamp(F.col("_c3"), "dd/MMM/yyyy:HH:mm:ss"))

In [10]:
nasaLogs = nasaLogs.drop("_c3")
nasaLogs.printSchema()
nasaLogs.show(1,False)

root
 |-- Host: string (nullable = true)
 |-- UserIdentifier: string (nullable = true)
 |-- UserId: string (nullable = true)
 |-- TimeZone: string (nullable = true)
 |-- HttpStatus: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- RequestMethod: string (nullable = true)
 |-- Resource: string (nullable = true)
 |-- Protocol: string (nullable = true)
 |-- DateTime: timestamp (nullable = true)

+------------+--------------+------+--------+----------+----+-------------+----------------+--------+-------------------+
|Host        |UserIdentifier|UserId|TimeZone|HttpStatus|Size|RequestMethod|Resource        |Protocol|DateTime           |
+------------+--------------+------+--------+----------+----+-------------+----------------+--------+-------------------+
|199.72.81.55|-             |-     |-0400   |200       |6245|GET          |/history/apollo/|HTTP/1.0|1995-07-01 00:00:01|
+------------+--------------+------+--------+----------+----+-------------+----------------+--------

In [11]:
nasaLogs.show(1,False)

+------------+--------------+------+--------+----------+----+-------------+----------------+--------+-------------------+
|Host        |UserIdentifier|UserId|TimeZone|HttpStatus|Size|RequestMethod|Resource        |Protocol|DateTime           |
+------------+--------------+------+--------+----------+----+-------------+----------------+--------+-------------------+
|199.72.81.55|-             |-     |-0400   |200       |6245|GET          |/history/apollo/|HTTP/1.0|1995-07-01 00:00:01|
+------------+--------------+------+--------+----------+----+-------------+----------------+--------+-------------------+
only showing top 1 row



In [12]:
nasaLogs = nasaLogs.withColumn("Date",F.to_date("DateTime"))
nasaLogs = nasaLogs.withColumn('Time', F.date_format('DateTime', 'HH:mm:ss'))
nasaLogs = nasaLogs.drop("DateTime")

In [13]:
nasaLogs = nasaLogs.select("Host","UserIdentifier","UserId","Date","Time","TimeZone","RequestMethod","Resource","Protocol",\
                                     "HttpStatus","Size")

In [14]:
nasaLogs.printSchema()
nasaLogs.show(1,False)

root
 |-- Host: string (nullable = true)
 |-- UserIdentifier: string (nullable = true)
 |-- UserId: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Time: string (nullable = true)
 |-- TimeZone: string (nullable = true)
 |-- RequestMethod: string (nullable = true)
 |-- Resource: string (nullable = true)
 |-- Protocol: string (nullable = true)
 |-- HttpStatus: string (nullable = true)
 |-- Size: string (nullable = true)

+------------+--------------+------+----------+--------+--------+-------------+----------------+--------+----------+----+
|Host        |UserIdentifier|UserId|Date      |Time    |TimeZone|RequestMethod|Resource        |Protocol|HttpStatus|Size|
+------------+--------------+------+----------+--------+--------+-------------+----------------+--------+----------+----+
|199.72.81.55|-             |-     |1995-07-01|00:00:01|-0400   |GET          |/history/apollo/|HTTP/1.0|200       |6245|
+------------+--------------+------+----------+--------+--------+--------

### Respuestas:  
1) - ¿Cuáles son los distintos protocolos web utilizados? Agrúpalos.

In [15]:
uno = nasaLogs.groupBy("Protocol").count().limit(1)
uno = uno.withColumn("Consulta", F.lit("Protocolo mas usado"))

2) - ¿Cuáles son los códigos de estado más comunes en la web? Agrúpalos y ordénalos para ver cuál es el más común.

In [16]:
dos = nasaLogs.groupBy("HttpStatus").count().sort(F.col("count").desc()).limit(1)
dos = dos.withColumn("Consulta", F.lit("HttpStatus mas comun"))

3) - ¿Y los métodos de petición (verbos) más utilizados?

In [17]:
tres = nasaLogs.groupBy("RequestMethod").count().sort(F.col("count").desc()).limit(1)
tres = tres.withColumn("Consulta", F.lit("RequestMethod mas utilizado"))

4) - ¿Qué recurso tuvo la mayor transferencia de bytes de la página web?

In [18]:
cuatro = nasaLogs.select(F.col("Resource"), F.col("Size")).orderBy(F.col("Size").desc()).limit(1)
cuatro = cuatro.withColumn("Consulta", F.lit("Resource con mas transferencia de bytes"))

5) - Además, queremos saber que recurso de nuestra web es el que más tráfico recibe. Es decir, el recurso con más registros 
         en nuestro log.

In [19]:
cinco = nasaLogs.groupBy("Resource").count().sort(F.col("count").desc()).limit(1)
cinco = cinco.withColumn("Consulta", F.lit("Resource con mas registros"))

6) - ¿Qué días la web recibió más tráfico?

In [20]:
seis = nasaLogs.select(F.date_format("Date","d").alias("Dia"))\
                                .groupBy(F.col("Dia"))\
                                .agg(F.count("*").alias("Trafico"))\
                                .orderBy(F.desc(F.col("Trafico")))\
                                .limit(1)
seis = seis.withColumn("Consulta", F.lit("Dia con mas trafico"))

7) - ¿Cuáles son los hosts más frecuentes?    

In [21]:
siete = nasaLogs.groupBy("Host").count().sort(F.col("count").desc()).limit(1)
siete = siete.withColumn("Consulta", F.lit("Host mas frecuente"))

8) - ¿A qué horas se produce el mayor número de tráfico en la web?

In [22]:
ocho = nasaLogs.select(F.date_format("Time","H").alias("Hora"))\
                                .groupBy(F.col("Hora"))\
                                .agg(F.count("*").alias("Trafico"))\
                                .orderBy(F.desc(F.col("Trafico")))\
                                .limit(1)
ocho = ocho.withColumn("Consulta", F.lit("Hora con mayor trafico"))

9) - ¿Cuál es el número de errores 404 que ha habido cada día?

In [23]:
nueve = nasaLogs.select(F.date_format("Date","d").alias("Dia"),F.col("HttpStatus"))\
                                .where(F.col("HttpStatus") == 404)\
                                .groupBy(F.col("Dia"))\
                                .agg(F.count("*").alias("Errores"))\
                                .orderBy(F.desc(F.col("Errores")))\
                                .limit(1)
nueve = nueve.withColumn("Consulta", F.lit("Dia con mayor numero de error 404"))

Otra consulta de la pregunta 9, solo por usar un pattern distinto del date_format

In [24]:
nasaLogs.select(F.date_format('Date', 'E').alias('DiaPalabra'),F.col("HttpStatus"))\
                                .where(F.col("HttpStatus") == 404)\
                                .groupBy(F.col("DiaPalabra"))\
                                .agg(F.count("*").alias("Errores"))\
                                .orderBy(F.desc(F.col("Errores")))\
                                .show(3,False)

+----------+-------+
|DiaPalabra|Errores|
+----------+-------+
|Wed       |1941   |
|Thu       |1935   |
|Tue       |1756   |
+----------+-------+
only showing top 3 rows



Unimos las preguntas en un unico dataframe

In [25]:
diez = uno.union(dos)
once = diez.union(tres)
doce = once.union(cuatro)
trece = doce.union(cinco)
catorce = trece.union(seis)
quince = catorce.union(siete)
dieciseis = quince.union(ocho)
diecisiete = dieciseis.union(nueve)

In [26]:
dieciocho = diecisiete.withColumnRenamed("Protocol","Respuesta")
dieciocho = dieciocho.select("Consulta", "Respuesta", "count")

In [27]:
dieciocho.show(truncate=False)

+---------------------------------------+------------------------------+-------+
|Consulta                               |Respuesta                     |count  |
+---------------------------------------+------------------------------+-------+
|Protocolo mas usado                    |GET /ksc.html                 |159    |
|HttpStatus mas comun                   |200                           |1701534|
|RequestMethod mas utilizado            |GET                           |1887643|
|Resource con mas transferencia de bytes|/images/cdrom-1-95/img0007.jpg|99981  |
|Resource con mas registros             |/images/NASA-logosmall.gif    |111388 |
|Dia con mas trafico                    |13                            |134203 |
|Host mas frecuente                     |piweba3y.prodigy.com          |17572  |
|Hora con mayor trafico                 |14                            |122479 |
|Dia con mayor numero de error 404      |6                             |640    |
+---------------------------