# desafioSemantix

### Desafio Técnico - Engenheiro de Dados 

### Leonardo Damasio

## Parte Prática

### 0. Setup

#### Importando bibliotecas

In [1]:
from pyspark.sql import SparkSession

#### Configurando SparkSession

In [2]:
spark = SparkSession.builder \
   .appName("desafioSemantix") \
   .config("spark.executor.memory", "4gb") \
   .getOrCreate()

#### Configurando SparkContext

In [3]:
sc = spark.sparkContext

#### Importando RDDs

In [4]:
jul95 = sc.textFile('data/access_log_Jul95')
aug95 = sc.textFile('data/access_log_Aug95')

#### Unificando em uma única RDD

In [5]:
rdd = jul95 + aug95

#### Amostra

In [6]:
rdd.take(10)

['199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245',
 'unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985',
 '199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085',
 'burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0',
 '199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179',
 'burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0',
 'burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 0',
 '205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/countdown.html HTTP/1.0" 200 3985',
 'd104.aa.net - - [01/Jul/1995:00:00:13 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985',
 '129.94.144.152 - - [01/Jul/

### 1. Número de hosts únicos.

#### Realizando contagem

In [7]:
hosts = rdd.map(lambda line: line.split(" ")[0]) \
    .map(lambda value: (value, 1)) \
    .reduceByKey(lambda a,b: a+b) 

#### Amostra da contagem

In [8]:
hosts.take(5)

[('unicomp6.unicomp.net', 14),
 ('dial22.lloyd.com', 4),
 ('www-a1.proxy.aol.com', 6661),
 ('dave.dev1.ihub.com', 4),
 ('brandt.xensei.com', 80)]

#### Resultado

In [9]:
Q1 = hosts.count()
Q1

137979

### 2. O total de erros 404.

In [10]:
error404 = rdd.filter(lambda line: " 404 -"  in line or " 404 0" in line)

#### Resultado

In [11]:
Q2 = error404.count()
Q2

20901

### 3. Os 5 URLs que mais causaram erro 404.

In [12]:
top5urls = rdd.filter(lambda line: line.split(" ")[0] and " 404 -"  in line or " 404 0" in line) \
    .map(lambda line: line.split(" ")[0]) \
    .map(lambda value: (value, 1)) \
    .reduceByKey(lambda a,b: a+b) \
    .sortBy(lambda x: x[1], ascending=False)

#### Resultado

In [13]:
Q3 = top5urls.take(5)
Q3

[('hoohoo.ncsa.uiuc.edu', 251),
 ('piweba3y.prodigy.com', 157),
 ('jbiagioni.npt.nuwc.navy.mil', 132),
 ('piweba1y.prodigy.com', 114),
 ('www-d4.proxy.aol.com', 91)]

### 4. Quantidade de erros 404 por dia.

In [19]:
error404_days = aug95.filter(lambda line: line.split("[")[1].split(":")[0] and " 404 -"  in line or " 404 0" in line ) \
    .map(lambda line: line.split("[")[1].split(':')[0]) \
    .map(lambda value: (value, 1)) \
    .reduceByKey(lambda a,b: a+b) \
    .sortBy(lambda x: x[0])

In [21]:
Q4 = error404_days.collect()
Q4

[('01/Aug/1995', 243),
 ('03/Aug/1995', 304),
 ('04/Aug/1995', 346),
 ('05/Aug/1995', 236),
 ('06/Aug/1995', 373),
 ('07/Aug/1995', 537),
 ('08/Aug/1995', 391),
 ('09/Aug/1995', 279),
 ('10/Aug/1995', 315),
 ('11/Aug/1995', 263),
 ('12/Aug/1995', 196),
 ('13/Aug/1995', 216),
 ('14/Aug/1995', 287),
 ('15/Aug/1995', 327),
 ('16/Aug/1995', 259),
 ('17/Aug/1995', 271),
 ('18/Aug/1995', 256),
 ('19/Aug/1995', 209),
 ('20/Aug/1995', 312),
 ('21/Aug/1995', 305),
 ('22/Aug/1995', 288),
 ('23/Aug/1995', 345),
 ('24/Aug/1995', 420),
 ('25/Aug/1995', 415),
 ('26/Aug/1995', 366),
 ('27/Aug/1995', 370),
 ('28/Aug/1995', 410),
 ('29/Aug/1995', 420),
 ('30/Aug/1995', 571),
 ('31/Aug/1995', 526)]

### 5. O total de bytes retornados.

In [16]:
totalbytes = rdd.map(lambda line: line.split(" ")[-1]) \
    .filter(lambda x: x != None and x != "-")
    reduce(lambda a,b: a+b)

##### Obs.: Na etapa de redução deste código estou recebendo "Connection Timed Out", porém acredito que esteja correto.

#### Resultado

In [22]:
Q5 = totalbytes.collect()
Q5

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 31.0 failed 1 times, most recent failure: Lost task 2.0 in stage 31.0 (TID 145, localhost, executor driver): TaskResultLost (result lost from block manager)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)


### Encerrando a sessão

In [None]:
spark.stop()