## Environment and Spark Configuration

In [1]:
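import os

# Point PySpark at the local Java 8 and Spark 3.3.0 installations
os.environ["JAVA_HOME"] = "/usr/local/openjdk-8"
os.environ["SPARK_HOME"] = "/user_data/spark-3.3.0-bin-hadoop2"

import findspark

# Make Spark's Python libraries importable, reusing the SPARK_HOME set above
findspark.init(os.environ["SPARK_HOME"])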

## Imports

In [2]:
# Importing the required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

spark = (
    SparkSession.builder.appName("spark_flight")
    .config("spark.sql.warehouse.dir", "hdfs:///user/hive/warehouse")
    .config("spark.sql.catalogImplementation", "hive")
    .getOrCreate()
)


## Chosen Dataset

The chosen dataset is [Flight Status Prediction (Kaggle link)](https://www.kaggle.com/datasets/robikscube/flight-delay-dataset-20182022/data). It contains detailed information about flights, including cancellation and delay data.

Although data are available from 2018 onwards, we selected the CSV file for 2022, which contains 1.42 GB of data.

## Reading the Dataset

In [3]:
dataframe = spark.read.csv(
    "hdfs://spark-master:9000/datasets/flights/Combined_Flights_2022.csv",
    header=True,
    inferSchema=True,  # costs an extra pass over the file to infer column types
)
num_linhas = dataframe.count()
print(f"Number of rows in the DataFrame: {num_linhas}")

Number of rows in the DataFrame: 4078318
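Reading the CSV and counting its rows ran as 16 parallel tasks in this run. A rough sketch of where that number can come from, assuming Spark's default file-source settings (`spark.sql.files.maxPartitionBytes` = 128 MB, `spark.sql.files.openCostInBytes` = 4 MB) and a default parallelism of 16 (an assumption inferred from the 16 concurrent tasks): Spark 3.x sizes each split as min(maxPartitionBytes, max(openCostInBytes, totalBytes / parallelism)), where totalBytes adds the open cost to the file length. The plain-Python functions below reproduce only that arithmetic for a single splittable file; they are not Spark's code, and they read "1.42 GB" as 1.42e9 bytes.

```python
import math

MB = 1024 * 1024

def max_split_bytes(total_file_bytes: int, parallelism: int,
                    max_partition_bytes: int = 128 * MB,
                    open_cost_bytes: int = 4 * MB) -> int:
    """Approximate Spark 3.x split size for a single splittable file."""
    # The open cost is added to the file length when estimating total input
    total_bytes = total_file_bytes + open_cost_bytes
    bytes_per_core = total_bytes // parallelism
    return min(max_partition_bytes, max(open_cost_bytes, bytes_per_core))

def num_splits(total_file_bytes: int, parallelism: int) -> int:
    """Number of read tasks when every split becomes its own partition."""
    split = max_split_bytes(total_file_bytes, parallelism)
    return math.ceil(total_file_bytes / split)

csv_bytes = int(1.42e9)  # assumption: "1.42 GB" taken as 10**9-based bytes
print(num_splits(csv_bytes, parallelism=16))  # 16
print(num_splits(csv_bytes, parallelism=8))   # 11
```

Under the same assumptions, a parallelism of 8 would hit the 128 MB cap instead and yield 11 tasks.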

## Exploratory Data Analysis (EDA)

In [6]:
# Displaying the data schema
dataframe.printSchema()

# Displaying summary statistics (count, mean, stddev, min, percentiles, max) for every column
dataframe.summary().show()

root
 |-- FlightDate: timestamp (nullable = true)
 |-- Airline: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Marketing_Airline_Network: string (nullable = true)
 |-- Operated_or_Branded_Code_Share_Partners: string (nullable = tru

24/04/26 22:37:16 ERROR Executor: Exception in task 12.0 in stage 5.0 (TID 46)
java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.util.Arrays.copyOf(Arrays.java:3332)
	at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
	at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
	at java.lang.StringBuilder.append(StringBuilder.java:141)
	at org.apache.spark.SparkThrowableHelper$.getMessage(ErrorInfo.scala:70)
	at org.apache.spark.SparkNumberFormatException.<init>(SparkException.scala:189)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.invalidInputInCastToNumberError(QueryExecutionErrors.scala:150)
	at org.apache.spark.sql.catalyst.expressions.CastBase.$anonfun$castToDouble$2(Cast.scala:858)
	at org.apache.spark.sql.catalyst.expressions.CastBase$$Lambda$3613/531293312.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.CastBase.buildCast(Cast.scala:336)
	at org.apache.spark.sql.catalyst.expressions.

24/04/26 22:37:30 WARN TaskSetManager: Lost task 12.0 in stage 5.0 (TID 46) (spark-master executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded
24/04/26 22:37:36 WARN TaskSetManager: Lost task 11.0 in stage 5.0 (TID 45) (spark-master executor driver): TaskKilled (Stage cancelled)
24/04/26 22:37:36 ERROR Inbox: Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@5eb11c0f rejected from java.util.concurrent.ThreadPoolExecutor@754444c6[Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 49]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at org.apache.spark.executor.Executor.launchTask(Executor.scala:305)
	at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
	at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:9

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/IPython/core/interactiveshell.py", line 3550, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_3330/3422717083.py", line 5, in <module>
    dataframe.summary().show()
  File "spark-3.3.0-bin-hadoop2/python/pyspark/sql/dataframe.py", line 1885, in summary
    jdf = self._jdf.summary(self._jseq(statistics))
  File "spark-3.3.0-bin-hadoop2/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "spark-3.3.0-bin-hadoop2/python/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "spark-3.3.0-bin-hadoop2/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <exception str() failed>

During handling of the above exception, another exception occurred:

Traceback (mo

ConnectionRefusedError: [Errno 111] Connection refused
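The stack trace shows the executor running out of heap while `summary()` was casting values to double (`castToDouble`, `invalidInputInCastToNumberError`), so the string columns such as `Airline` or `Origin` are a likely contributor. One hedged workaround is to compute statistics on numeric columns only, for example `dataframe.select(*numeric_cols).summary("count", "mean", "stddev", "min", "max").show()`, which also skips the approximate percentiles. The helper below sketches how `numeric_cols` could be derived from `dataframe.dtypes` (a list of `(name, type)` string pairs); the helper name and the exact set of accepted type names are assumptions.

```python
# Spark SQL simple type names treated as numeric in this sketch (assumption)
NUMERIC_TYPES = {"tinyint", "smallint", "int", "bigint", "float", "double"}

def numeric_columns(dtypes):
    """Return column names whose Spark type string is an exact numeric match."""
    # Exact matching avoids substring surprises, e.g. "int" in "interval" is True
    return [name for name, dtype in dtypes if dtype in NUMERIC_TYPES]

# A few (name, type) pairs taken from the flight schema printed above
flight_dtypes = [
    ("FlightDate", "timestamp"),
    ("Airline", "string"),
    ("Cancelled", "boolean"),
    ("CRSDepTime", "int"),
    ("DepDelayMinutes", "double"),
    ("Distance", "double"),
]
print(numeric_columns(flight_dtypes))
# ['CRSDepTime', 'DepDelayMinutes', 'Distance']
```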

In [5]:
# Showing the first rows of the DataFrame
dataframe.show(5)

24/02/15 23:58:11 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+---+-------+--------+-------------------+-------------------+-----------------+------------------+-------+-------+------------+--------------------+--------------------+------------+----------+-----+----------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
| ID| Source|Severity|         Start_Time|           End_Time|        Start_Lat|         Start_Lng|End_Lat|End_Lng|Distance(mi)|         Description|              Street|        City|    County|State|   Zipcode|Country|  Tim

## Clean Dataset

In [6]:
# Handling null values: drop every row that has a null in any column
dataframe = dataframe.dropna()
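With no arguments, `DataFrame.dropna()` uses `how='any'`, so a row is discarded as soon as any column is null; `how='all'`, `thresh` (minimum number of non-null values, which overrides `how`) and `subset` relax this. This matters for flight data: if, as is common in on-time records, columns such as `DepTime` are null for cancelled flights, a blanket `dropna()` would also discard those flights. The plain-Python sketch below mirrors these rules on rows represented as dicts; it illustrates the semantics and is not Spark's implementation, and the two rows are hypothetical.

```python
def dropna_rows(rows, how="any", thresh=None, subset=None):
    """Mimic the filtering rules of pyspark's DataFrame.dropna on dict rows."""
    kept = []
    for row in rows:
        cols = subset if subset is not None else list(row)
        non_null = sum(row[c] is not None for c in cols)
        if thresh is not None:
            # thresh overrides how: keep rows with at least `thresh` non-nulls
            keep = non_null >= thresh
        elif how == "any":
            keep = non_null == len(cols)
        elif how == "all":
            keep = non_null > 0
        else:
            raise ValueError("how must be 'any' or 'all'")
        if keep:
            kept.append(row)
    return kept

# Hypothetical rows: the second one mimics a cancelled flight with no DepTime
rows = [
    {"Airline": "A", "Cancelled": False, "DepTime": 905.0},
    {"Airline": "B", "Cancelled": True, "DepTime": None},
]
print(len(dropna_rows(rows)))                      # 1
print(len(dropna_rows(rows, subset=["Airline"])))  # 2
print(len(dropna_rows(rows, thresh=2)))            # 2
```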

In [7]:
# Data types to keep (Spark dtype strings)
allowed_types = ['int', 'double']

# Keep columns whose dtype string contains an allowed type (substring match, so 'bigint' also matches)
selected_columns = [col_name for col_name, col_type in dataframe.dtypes if any(data_type in col_type for data_type in allowed_types)]

# Selecting only the allowed columns
df_filtered = dataframe.select(*selected_columns)

# Showing the schema of the resulting DataFrame
df_filtered.printSchema()

root
 |-- Severity: integer (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- End_Lat: double (nullable = true)
 |-- End_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- Temperature(F): double (nullable = true)
 |-- Wind_Chill(F): double (nullable = true)
 |-- Humidity(%): double (nullable = true)
 |-- Pressure(in): double (nullable = true)
 |-- Visibility(mi): double (nullable = true)
 |-- Wind_Speed(mph): double (nullable = true)
 |-- Precipitation(in): double (nullable = true)



## Data Preprocessing

In [10]:
# Creating the binary label Severity4: 1 if Severity == 4, otherwise 0
df_filtered = df_filtered.withColumn('Severity4', col('Severity').cast('int'))
df_filtered = df_filtered.withColumn('Severity4', (col('Severity4') == 4).cast('int'))
# The original Severity column would leak the label, so it is dropped
df_filtered = df_filtered.drop('Severity')
# Checking the class balance of the new label
df_filtered.groupBy('Severity4').count().show()



+---------+-------+
|Severity4|  count|
+---------+-------+
|        1|  94421|
|        0|3264408|
+---------+-------+
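The table shows a strongly imbalanced label: only about 2.8% of the rows have `Severity4 = 1`. Note that `Severity`, like the other columns kept in the cleaning step, is not part of the flight schema printed in the EDA section. A worked check of the majority-class baseline, the accuracy of a model that always predicts 0, needs only the two counts from the table:

```python
# Class counts copied from the groupBy('Severity4').count() output above
positives = 94_421
negatives = 3_264_408
total = positives + negatives

positive_rate = positives / total
# Accuracy of a trivial classifier that always predicts the majority class (0)
majority_baseline = negatives / total

print(total)                        # 3358829
print(round(positive_rate, 4))      # 0.0281
print(round(majority_baseline, 4))  # 0.9719
```

Because the test set is a random ~20% sample, its class mix should be close to this, so test accuracy is best compared against a baseline of roughly 0.972 rather than 0.5.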

In [11]:
# Assembling every column except the target into a single feature vector
target = 'Severity4'
feature_columns = df_filtered.columns
feature_columns.remove(target)

vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_vector = vector_assembler.transform(df_filtered)
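`VectorAssembler` concatenates the values of `inputCols`, in the listed order, into one vector column per row; with the default `handleInvalid='error'` it raises on null values, which the earlier `dropna()` avoids. The plain-Python sketch below shows the same row-wise concatenation on a hypothetical row (toy values, column names borrowed from the filtered schema), and why the target is removed from `feature_columns`: otherwise the label would leak into the features.

```python
def assemble(row, input_cols):
    """Concatenate numeric columns into one feature vector, like VectorAssembler."""
    vector = []
    for name in input_cols:
        value = row[name]
        if value is None:
            # Mirrors the default handleInvalid='error' behaviour for nulls
            raise ValueError(f"null value in column {name!r}")
        vector.append(float(value))
    return vector

# Hypothetical row with toy values
row = {"Start_Lat": 10.0, "Distance(mi)": 2.0, "Temperature(F)": 50.0, "Severity4": 0}
target = "Severity4"
feature_columns = [c for c in row if c != target]  # keep the label out of the features

print(feature_columns)                 # ['Start_Lat', 'Distance(mi)', 'Temperature(F)']
print(assemble(row, feature_columns))  # [10.0, 2.0, 50.0]
```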

In [12]:
# Splitting the data into training (80%) and test (20%) sets, with a fixed seed
train_data, test_data = df_vector.randomSplit([0.8, 0.2], seed=42)
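`randomSplit([0.8, 0.2], seed=42)` normalises the weights and assigns rows randomly, so the two parts are only approximately 80% and 20% of the data, and the split is reproducible only with the same seed and the same partitioning. The sketch below illustrates the idea of comparing a uniform draw against cumulative weight bounds using Python's `random.Random`; Spark itself samples each (sorted) partition with a Bernoulli cell sampler, so actual row membership will differ.

```python
import random
from itertools import accumulate

def random_split(rows, weights, seed):
    """Assign each row to one split by comparing a uniform draw to cumulative bounds."""
    total = sum(weights)
    bounds = list(accumulate(w / total for w in weights))  # e.g. [0.8, 1.0]
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        # The first bound greater than x decides the split; fall back to the last
        index = next((i for i, b in enumerate(bounds) if x < b), len(bounds) - 1)
        splits[index].append(row)
    return splits

train, test = random_split(range(10_000), [0.8, 0.2], seed=42)
print(len(train) + len(test))  # 10000
print(len(train) / 10_000)     # approximately 0.8
```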

## Model

### Random Forest

In [13]:
# Creating and training a classification model (Random Forest with 10 trees)
rf_classifier = RandomForestClassifier(labelCol=target, featuresCol="features", numTrees=10)
pipeline = Pipeline(stages=[rf_classifier])
model = pipeline.fit(train_data)



24/02/16 00:07:12 WARN MemoryStore: Not enough space to cache rdd_64_16 in memory! (computed 19.7 MiB so far)
24/02/16 00:07:12 WARN BlockManager: Persisting block rdd_64_16 to disk instead.
24/02/16 00:07:12 WARN MemoryStore: Not enough space to cache rdd_64_17 in memory! (computed 29.6 MiB so far)
24/02/16 00:07:12 WARN BlockManager: Persisting block rdd_64_17 to disk instead.

24/02/16 00:07:15 WARN MemoryStore: Not enough space to cache rdd_64_19 in memory! (computed 13.1 MiB so far)
24/02/16 00:07:15 WARN BlockManager: Persisting block rdd_64_19 to disk instead.

24/02/16 00:07:18 WARN MemoryStore: Not enough space to cache rdd_64_21 in memory! (computed 13.1 MiB so far)
24/02/16 00:07:18 WARN BlockManager: Persisting block rdd_64_21 to disk instead.

24/02/16 00:07:22 WARN MemoryStore: Not enough space to cache rdd_64_20 in memory! (computed 29.6 MiB so far)
24/02/16 00:07:22 WARN BlockManager: Persisting block rdd_64_20 to disk instead.

24/02/16 00:07:24 WARN MemoryStore: Not enough space to cache rdd_64_12 in memory! (computed 19.7 MiB so far)
24/02/16 00:07:24 WARN MemoryStore: Not enough space to cache rdd_64_11 in memory! (computed 19.7 MiB so far)
24/02/16 00:07:24 WARN MemoryStore: Not enough space to cache rdd_64_13 in memory! (computed 13.1 MiB so far)
24/02/16 00:07:24 WARN MemoryStore: Not enough space to cache rdd_64_10 in memory! (computed 29.6 MiB so far)

24/02/16 00:07:25 WARN MemoryStore: Not enough space to cache rdd_64_12 in memory! (computed 3.7 MiB so far)
24/02/16 00:07:25 WARN MemoryStore: Not enough space to cache rdd_64_10 in memory! (computed 8.3 MiB so far)
24/02/16 00:07:25 WARN MemoryStore: Not enough space to cache rdd_64_13 in memory! (computed 13.1 MiB so far)
24/02/16 00:07:25 WARN MemoryStore: Not enough space to cache rdd_64_11 in memory! (computed 19.7 MiB so far)

24/02/16 00:07:27 WARN MemoryStore: Not enough space to cache rdd_64_10 in memory! (computed 13.1 MiB so far)
24/02/16 00:07:27 WARN MemoryStore: Not enough space to cache rdd_64_13 in memory! (computed 13.1 MiB so far)
24/02/16 00:07:27 WARN MemoryStore: Not enough space to cache rdd_64_12 in memory! (computed 8.3 MiB so far)
24/02/16 00:07:27 WARN MemoryStore: Not enough space to cache rdd_64_11 in memory! (computed 13.1 MiB so far)

24/02/16 00:07:29 WARN MemoryStore: Not enough space to cache rdd_64_12 in memory! (computed 8.3 MiB so far)
24/02/16 00:07:29 WARN MemoryStore: Not enough space to cache rdd_64_13 in memory! (computed 3.7 MiB so far)
24/02/16 00:07:29 WARN MemoryStore: Not enough space to cache rdd_64_10 in memory! (computed 13.1 MiB so far)
24/02/16 00:07:29 WARN MemoryStore: Not enough space to cache rdd_64_11 in memory! (computed 19.7 MiB so far)
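The `MemoryStore` warnings above mean cached blocks did not fit in Spark's storage memory, and several were persisted to disk instead; elsewhere in the notebook the logs show the driver acting as the only executor (`spark-master executor driver`). As a rough sketch, Spark's unified memory manager reserves 300 MB of the heap and gives `spark.memory.fraction` (default 0.6) of the rest to execution plus storage, with `spark.memory.storageFraction` (default 0.5) of that region protected from eviction for cached blocks. The function below does only that arithmetic. The heap sizes are assumptions: the builder above does not set `spark.driver.memory`, so unless `spark-defaults.conf` does, the 1g default applies, and the JVM's usable heap is slightly smaller than the configured value.

```python
RESERVED_MB = 300  # fixed reservation in Spark's unified memory manager

def unified_memory_mb(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Return (unified region, protected storage region) sizes in MB for a heap."""
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction
    # Storage may grow past this when execution memory is idle; this part is
    # the share that execution cannot evict
    storage = unified * storage_fraction
    return unified, storage

for heap in (1024, 4096, 8192):  # 1g default vs. two hypothetical larger settings
    unified, storage = unified_memory_mb(heap)
    print(f"heap={heap} MB -> unified={unified:.1f} MB, storage={storage:.1f} MB")
```

With the 1g default, roughly 434 MB is shared by execution and storage and about 217 MB is protected for cached blocks, which several concurrent partitions of tens of MiB can exhaust quickly. Raising `spark.driver.memory` in the builder (for example `.config("spark.driver.memory", "8g")`) is a common remedy in local mode, but it generally only takes effect if set before the first `getOrCreate()` launches the JVM.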

In [14]:
# Making predictions on the test set
predictions = model.transform(test_data)
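`model.transform` appends `rawPrediction`, `probability` and `prediction` columns to `test_data`. In Spark ML's `RandomForestClassificationModel`, `rawPrediction` is, to the best of our reading of the implementation, the sum over trees of each tree's class-probability vector (soft voting rather than a strict count of votes); `probability` normalises that sum and `prediction` takes the largest entry when no custom `thresholds` are set. The toy sketch below reproduces only this combination step with made-up per-tree probabilities for three trees; it is not the library code, and tree training is out of scope.

```python
def combine_trees(tree_probabilities):
    """Soft vote: sum per-tree class probabilities, normalise, and take the argmax."""
    num_classes = len(tree_probabilities[0])
    raw = [sum(p[k] for p in tree_probabilities) for k in range(num_classes)]
    total = sum(raw)
    probability = [r / total for r in raw]
    prediction = max(range(num_classes), key=lambda k: probability[k])
    return raw, probability, prediction

# Made-up class-probability vectors [P(class 0), P(class 1)] from three trees
trees = [[0.9, 0.1], [0.6, 0.4], [0.2, 0.8]]
raw, probability, prediction = combine_trees(trees)
print(raw)         # [1.7, 1.3] (up to floating-point rounding)
print(prediction)  # 0
```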

In [15]:
# Evaluating model performance (accuracy on the test set)
evaluator = MulticlassClassificationEvaluator(labelCol="Severity4", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)



Accuracy: 0.972008995725541
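Accuracy alone is weak evidence here: with about 97.2% of the labelled rows in class 0, a model that never predicts class 1 already scores about the same. Precision and recall for the minority class are more informative; in Spark 3.x, `MulticlassClassificationEvaluator` can report them with `metricName="precisionByLabel"` or `"recallByLabel"` together with `metricLabel=1.0`, and `BinaryClassificationEvaluator` reports `areaUnderROC`. The plain-Python sketch below computes these per-class metrics from paired labels and predictions; the two lists are hypothetical, chosen so that accuracy looks high while recall for class 1 is poor.

```python
def class_metrics(labels, predictions, positive=1):
    """Accuracy plus precision, recall and F1 for one class, from paired lists."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == positive and p == positive)
    fp = sum(1 for y, p in pairs if y != positive and p == positive)
    fn = sum(1 for y, p in pairs if y == positive and p != positive)
    accuracy = sum(1 for y, p in pairs if y == p) / len(pairs)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return accuracy, precision, recall, f1

# Hypothetical labels: 97 negatives and 3 positives; the model finds only 1 positive
labels = [0] * 97 + [1] * 3
predictions = [0] * 97 + [1, 0, 0]
print(class_metrics(labels, predictions))
# (0.98, 1.0, 0.3333333333333333, 0.5)
```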

In [None]:
# Saving the filtered DataFrame in Parquet format
parquet_output_path = "hdfs://spark-master:9000/datasets/accidents_output"
df_filtered.write.parquet(parquet_output_path)