# Pré processamento da base

In [1]:
# Carregando o spark
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext(appName="projeto")

from pyspark.conf import SparkConf
session = pyspark.sql.SparkSession.builder.config(conf=SparkConf())
spark = session.getOrCreate()

In [2]:
# Carregando dados de linhas de transmissão
lt=spark.read.load("linhas.csv",format="csv", sep=",", inferSchema="true", header="true", encoding="iso-8859-1").\
select('CODG_ATIVO','CODG_INSTALACAO','RAP_CICLO_FUNCAO','ESTADO_ATIVO','EQUIPE_RESPONSAVEL').\
toDF('CODG_LINHA', 'CODG_INSTALACAO_LINHA', 'RAP_CICLO','ESTADO', 'EQUIPE')
lt.printSchema()

root
 |-- CODG_LINHA: string (nullable = true)
 |-- CODG_INSTALACAO_LINHA: string (nullable = true)
 |-- RAP_CICLO: string (nullable = true)
 |-- ESTADO: string (nullable = true)
 |-- EQUIPE: string (nullable = true)



In [3]:
lt.show(3)

+------------+---------------------+---------+------+------+
|  CODG_LINHA|CODG_INSTALACAO_LINHA|RAP_CICLO|ESTADO|EQUIPE|
+------------+---------------------+---------+------+------+
|LTCHE.000001|             ABXMLUR1| 63930,25|     A|  SPML|
|LTCHE.000002|             ABXMXTR1| 52127,74|     A|  SPML|
|LTCHE.000003|             ABXZBUR1| 53111,28|     A|  SPML|
+------------+---------------------+---------+------+------+
only showing top 3 rows



In [4]:
# Carregando dados de estruturas
estr=spark.read.load("estruturas.csv",format="csv", sep=",", inferSchema="true", header="true", encoding="iso-8859-1").\
select('CODG_ATIVO','CODG_INSTALACAO','QTDCIRC','CRITICIDADE','VAO_DE_FRENTE', 'ALTURA_UTIL','TRAVESSIA_LT','TRAVESSIA_ESTRADA','POVOAMENTO', 'VANDALISMO','INVASAO')
estr.printSchema()

root
 |-- CODG_ATIVO: string (nullable = true)
 |-- CODG_INSTALACAO: string (nullable = true)
 |-- QTDCIRC: integer (nullable = true)
 |-- CRITICIDADE: integer (nullable = true)
 |-- VAO_DE_FRENTE: string (nullable = true)
 |-- ALTURA_UTIL: string (nullable = true)
 |-- TRAVESSIA_LT: integer (nullable = true)
 |-- TRAVESSIA_ESTRADA: integer (nullable = true)
 |-- POVOAMENTO: integer (nullable = true)
 |-- VANDALISMO: integer (nullable = true)
 |-- INVASAO: integer (nullable = true)



In [5]:
estr.select('CODG_ATIVO','CODG_INSTALACAO','QTDCIRC','CRITICIDADE','VAO_DE_FRENTE').show(3)

+-----------+---------------+-------+-----------+-------------+
| CODG_ATIVO|CODG_INSTALACAO|QTDCIRC|CRITICIDADE|VAO_DE_FRENTE|
+-----------+---------------+-------+-----------+-------------+
|ESTR.005494|       JALJALU1|   null|       null|          505|
|ESTR.005513|       JALJALU1|   null|       null|          152|
|ESTR.005794|       JALJALU1|   null|       null|          430|
+-----------+---------------+-------+-----------+-------------+
only showing top 3 rows



In [6]:
#fazendo join entre os dados
join = lt.join(estr, lt.CODG_INSTALACAO_LINHA == estr.CODG_INSTALACAO)

# limpando valores ausentes
menor_criticidade = join.agg({'CRITICIDADE':'min'}).collect()[0][0]
menor_criticidade

join = join.fillna({'RAP_CICLO': 0, 'QTDCIRC':0, 'VAO_DE_FRENTE': 0, 'ALTURA_UTIL': 0, 'CRITICIDADE':menor_criticidade})

In [7]:
#Normalizando a base 
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType, StringType, DecimalType

# Criando uma user defined function para fazer replace do ',' pelo '.'
udf = UserDefinedFunction(lambda x: x.replace(",","."), StringType())

join = join.withColumn('RAP_CICLO_NUMBER', udf(join.RAP_CICLO).cast(DecimalType(10,2)))

maximaRAP =  join.agg({'RAP_CICLO_NUMBER':'max'}).collect()[0][0]

base_dados = join.withColumn('RAP_CICLO_NORM', join.RAP_CICLO_NUMBER/maximaRAP).\
            withColumn('CRITICIDADE_NORM', join.CRITICIDADE/maximaRAP).\
            withColumn('QTDCIRC_NORM', join.QTDCIRC/maximaRAP).\
            withColumn('VAO_DE_FRENTE_NORM', join.VAO_DE_FRENTE.cast(DoubleType())/maximaRAP).\
            withColumn('ALTURA_UTIL_NORM', join.ALTURA_UTIL.cast(DoubleType())/maximaRAP).\
            withColumn('TRAVESSIA_LT_NORM', join.TRAVESSIA_LT/maximaRAP).\
            withColumn('TRAVESSIA_ESTRADA_NORM', join.TRAVESSIA_ESTRADA/maximaRAP).\
            withColumn('POVOAMENTO_NORM', join.POVOAMENTO/maximaRAP).\
            withColumn('VANDALISMO_NORM', join.VANDALISMO/maximaRAP).\
            withColumn('INVASAO_NORM', join.INVASAO/maximaRAP)

base_dados.select('RAP_CICLO', 'RAP_CICLO_NORM').show(3)

+---------+---------------+
|RAP_CICLO| RAP_CICLO_NORM|
+---------+---------------+
| 63930,25|0.0010992653310|
| 63930,25|0.0010992653310|
| 63930,25|0.0010992653310|
+---------+---------------+
only showing top 3 rows



In [8]:
# preparação da base para aplicar no modelo
from pyspark.ml.feature import VectorAssembler

base_dados = base_dados.na.fill(0)

vecAssembler = VectorAssembler(inputCols=['RAP_CICLO_NORM','QTDCIRC_NORM','CRITICIDADE_NORM','VAO_DE_FRENTE_NORM','ALTURA_UTIL_NORM','TRAVESSIA_LT_NORM','TRAVESSIA_ESTRADA_NORM','POVOAMENTO_NORM','VANDALISMO_NORM','INVASAO_NORM'], outputCol="features")
new_df = vecAssembler.transform(base_dados)
new_df.select('CODG_LINHA', 'CODG_ATIVO', 'features').show(3)

+------------+-----------+--------------------+
|  CODG_LINHA| CODG_ATIVO|            features|
+------------+-----------+--------------------+
|LTCHE.000001|ESTR.000039|(10,[0,1,3,4],[0....|
|LTCHE.000001|ESTR.000038|(10,[0,1,3,4],[0....|
|LTCHE.000001|ESTR.000037|(10,[0,1,3,4],[0....|
+------------+-----------+--------------------+
only showing top 3 rows



# Aplicando o Modelo

In [9]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=5, seed=1)  # 5 clusters here
model = kmeans.fit(new_df.select('features'))
transformed = model.transform(new_df)
transformed.select('CODG_LINHA', 'CODG_ATIVO', 'prediction').show(3)  

+------------+-----------+----------+
|  CODG_LINHA| CODG_ATIVO|prediction|
+------------+-----------+----------+
|LTCHE.000001|ESTR.000039|         3|
|LTCHE.000001|ESTR.000038|         3|
|LTCHE.000001|ESTR.000037|         3|
+------------+-----------+----------+
only showing top 3 rows



In [10]:
#salva resultado num arquivo csv
transformed.select('CODG_LINHA', 'CODG_INSTALACAO', 'CODG_ATIVO', 'RAP_CICLO','QTDCIRC','CRITICIDADE','VAO_DE_FRENTE','ALTURA_UTIL','TRAVESSIA_LT','TRAVESSIA_ESTRADA','POVOAMENTO','VANDALISMO','INVASAO', 'prediction').\
            withColumn('QTDCIRC', transformed.QTDCIRC.cast(StringType())).\
            withColumn('CRITICIDADE', transformed.CRITICIDADE.cast(StringType())).\
            withColumn('VAO_DE_FRENTE', transformed.VAO_DE_FRENTE.cast(StringType())).\
            withColumn('ALTURA_UTIL', transformed.ALTURA_UTIL.cast(StringType())).\
            withColumn('TRAVESSIA_LT', transformed.TRAVESSIA_LT.cast(StringType())).\
            withColumn('TRAVESSIA_ESTRADA', transformed.TRAVESSIA_ESTRADA.cast(StringType())).\
            withColumn('POVOAMENTO', transformed.POVOAMENTO.cast(StringType())).\
            withColumn('VANDALISMO', transformed.VANDALISMO.cast(StringType())).\
            withColumn('INVASAO', transformed.INVASAO.cast(StringType())).\
            withColumn('prediction', transformed.prediction.cast(StringType())).\
            write.csv('predictions.csv', header='true', sep=';')