# Using, Processing, and Visualizing Large Volumes of Data

## Marcela Ibarra Mora
## A01231973

This assignment uses a dataset obtained from Kaggle. It contains information on the fines issued in New York City during the fiscal period from August 2013 to June 2014.

First, we initialize the Spark environment:

In [1]:
# Set the environment variables
import os
# import pandas as pd
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-arm64"
os.environ["SPARK_HOME"] = "/home/mc/spark/spark-3.2.2-bin-hadoop3.2"

# Locate and initialize the Spark installation
import findspark
findspark.init()
findspark.find()

'/home/mc/spark/spark-3.2.2-bin-hadoop3.2'

A Spark session is initialized:

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession \
       .builder \
       .appName("Entrega 1. Modulo 1.") \
       .getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/10/31 01:43:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


The libraries to be used are imported:

In [3]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
from pyspark.sql.functions import desc
import pandas as pd

The data is loaded:

In [4]:
df = spark.read.option('header', 'true').csv('../Mar:)/Parking_Violations_Issued_-_Fiscal_Year_2014__August_2013___June_2014_.csv')

We select the columns we are going to use and rename them to avoid conflicts in later operations:

In [5]:
df = df.select(['Vehicle Make','Violation Code','Street Code1','Violation Description'])
df = df.withColumnRenamed("Vehicle Make","VehicleMake")
df = df.withColumnRenamed("Violation Code","ViolationCode")
df = df.withColumnRenamed("Street Code1","StreetCode1")
df = df.withColumnRenamed("Violation Description","ViolationDesc")

In [6]:
df.show()

+-----------+-------------+-----------+-------------+
|VehicleMake|ViolationCode|StreetCode1|ViolationDesc|
+-----------+-------------+-----------+-------------+
|       AUDI|           46|      37250|         null|
|       FORD|           46|      37290|         null|
|      CHEVR|           46|      37030|         null|
|       FORD|           46|      37270|         null|
|        GMC|           41|      37240|         null|
|      DODGE|           14|      37250|         null|
|       null|           14|      37250|         null|
|       FORD|           24|      63430|         null|
|      TOYOT|           24|      63430|         null|
|      SUBAR|           24|          0|         null|
|      HYUND|           24|          0|         null|
|      TOYOT|           24|          0|         null|
|       AUDI|           24|          0|         null|
|      NISSA|           24|      23230|         null|
|      CHEVR|           14|      34190|         null|
|      VOLKS|           20| 

We check the data type of each column and make the necessary changes:

In [7]:
df.printSchema()

root
 |-- VehicleMake: string (nullable = true)
 |-- ViolationCode: string (nullable = true)
 |-- StreetCode1: string (nullable = true)
 |-- ViolationDesc: string (nullable = true)



In [8]:
df = df.withColumn("ViolationCode",df.ViolationCode.cast('int'))
df = df.withColumn("StreetCode1",df.StreetCode1.cast('int'))

We check whether there are null values:

In [9]:
from pyspark.sql.functions import col,isnan, when, count
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]
   ).show()



+-----------+-------------+-----------+-------------+
|VehicleMake|ViolationCode|StreetCode1|ViolationDesc|
+-----------+-------------+-----------+-------------+
|      38298|            0|          1|       744783|
+-----------+-------------+-----------+-------------+
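
To make explicit what the counting expression above computes, here is a minimal plain-Python sketch (no Spark required) that tallies missing values per column on a small made-up sample. The helper name `count_missing` and the sample rows are illustrative assumptions, not part of the notebook.

```python
import math

def count_missing(rows, columns):
    """Count None or NaN entries per column in a list of dict rows."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            value = row.get(c)
            is_nan = isinstance(value, float) and math.isnan(value)
            if value is None or is_nan:
                counts[c] += 1
    return counts

# Hypothetical sample that mimics some of the notebook's columns.
sample = [
    {"VehicleMake": "AUDI", "ViolationCode": 46, "ViolationDesc": None},
    {"VehicleMake": None, "ViolationCode": 14, "ViolationDesc": None},
    {"VehicleMake": "FORD", "ViolationCode": 5, "ViolationDesc": "BUS LANE VIOLATION"},
]
missing = count_missing(sample, ["VehicleMake", "ViolationCode", "ViolationDesc"])
print(missing)
```

As in the Spark expression, a value counts as missing when it is either null (`None`) or NaN.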

The VehicleMake column has a large number of null values (38,298), so we replace them with the most frequent vehicle make:

In [10]:
# Get the most frequent make, which in this case is FORD
df.groupBy(['VehicleMake']).count().sort(desc('count')).show()



+-----------+------+
|VehicleMake| count|
+-----------+------+
|       FORD|732565|
|      TOYOT|512799|
|      HONDA|471738|
|      CHEVR|436035|
|      NISSA|379398|
|      DODGE|186381|
|        GMC|186318|
|      ME/BE|176257|
|        BMW|159276|
|      INTER|156987|
|      FRUEH|156452|
|       JEEP|123225|
|      HYUND|109351|
|      LEXUS|105950|
|      VOLKS| 95108|
|      ACURA| 94209|
|      LINCO| 93571|
|      CHRYS| 91700|
|      MITSU| 77777|
|      INFIN| 72781|
+-----------+------+
only showing top 20 rows

In [11]:
# Replace the nulls in the VehicleMake column with FORD
df = df.na.fill('FORD', 'VehicleMake')

# Count the missing values again
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]
   ).show()



+-----------+-------------+-----------+-------------+
|VehicleMake|ViolationCode|StreetCode1|ViolationDesc|
+-----------+-------------+-----------+-------------+
|          0|            0|          1|       744783|
+-----------+-------------+-----------+-------------+
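
The imputation step can be illustrated without Spark. The following is a minimal sketch of mode imputation on a plain Python list, assuming missing values are represented as `None`. The function name `fill_with_mode` and the sample list are hypothetical.

```python
from collections import Counter

def fill_with_mode(values):
    """Replace None entries with the most frequent non-missing value."""
    observed = [v for v in values if v is not None]
    if not observed:
        return list(values)  # nothing to impute from
    mode, _ = Counter(observed).most_common(1)[0]
    return [mode if v is None else v for v in values]

# Hypothetical sample of vehicle makes with two missing entries.
makes = ["FORD", "TOYOT", None, "FORD", None, "HONDA"]
filled = fill_with_mode(makes)
print(filled)
```

Filling with the mode inflates the share of the most frequent category. Here that has no effect on the model, because VehicleMake is dropped before training.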

There are still missing values in the ViolationDesc and StreetCode1 columns; the rows containing them are removed from the dataset:

In [12]:
# Drop the rows that still contain missing values
df = df.na.drop()

df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]
   ).show()



+-----------+-------------+-----------+-------------+
|VehicleMake|ViolationCode|StreetCode1|ViolationDesc|
+-----------+-------------+-----------+-------------+
|          0|            0|          0|            0|
+-----------+-------------+-----------+-------------+

In [13]:
df.show()

+-----------+-------------+-----------+------------------+
|VehicleMake|ViolationCode|StreetCode1|     ViolationDesc|
+-----------+-------------+-----------+------------------+
|      TOYOT|            5|          0|BUS LANE VIOLATION|
|       FORD|            5|          0|BUS LANE VIOLATION|
|      TOYOT|            5|          0|BUS LANE VIOLATION|
|      TOYOT|            5|          0|BUS LANE VIOLATION|
|       FORD|            5|          0|BUS LANE VIOLATION|
|      TOYOT|            5|          0|BUS LANE VIOLATION|
|       FORD|            5|       6110|BUS LANE VIOLATION|
|      PONTI|            5|          0|BUS LANE VIOLATION|
|      DODGE|            5|          0|BUS LANE VIOLATION|
|       FORD|            5|          0|BUS LANE VIOLATION|
|      DODGE|            5|          0|BUS LANE VIOLATION|
|      TOYOT|            5|          0|BUS LANE VIOLATION|
|      TOYOT|            5|          0|BUS LANE VIOLATION|
|      TOYOT|            5|       6110|BUS LANE VIOLATIO

The final dataset is saved to a CSV file:

In [14]:
df.repartition(1).write.option("header",True).csv("../Mar:)/df")

Next, a model is created and trained.

First, we drop the columns that will not help the model:

In [15]:
df = df.drop('VehicleMake','StreetCode1')

The data is split into a training subset with 80% of the rows and a test subset with the remaining 20%:

In [16]:
train, test = df.randomSplit([0.8, 0.2])
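
A random split of this kind assigns each row to a subset by random sampling, so the 80/20 proportions are approximate and, without a seed, the split changes from run to run (`DataFrame.randomSplit` accepts an optional `seed` argument). The following plain-Python sketch illustrates the idea only; it is not Spark's implementation, and the name `random_split` is an assumption.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row independently to a split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [0.8, 0.2].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point drift
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform draw in [0, 1)
        for i, upper in enumerate(bounds):
            if u < upper:
                splits[i].append(row)
                break
    return splits

rows = list(range(1000))
train_rows, test_rows = random_split(rows, [0.8, 0.2], seed=42)
print(len(train_rows), len(test_rows))
```

Fixing the seed makes the split reproducible, which helps when comparing results across runs.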

We create a pipeline to chain several steps. A Tokenizer and a HashingTF convert the ViolationDesc column (string) into a feature vector that the model can process, and then the logistic regression model is defined:

In [17]:
tokenizer = Tokenizer(inputCol="ViolationDesc", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01, labelCol='ViolationCode')
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
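
To show what the first two stages do conceptually, here is a minimal plain-Python sketch of tokenization followed by the hashing trick. It is an illustration under stated assumptions: `hashlib.md5` stands in for the MurmurHash3 function that Spark uses, so the indices will not match Spark's, and `num_features=16` is an arbitrary small value chosen for readability. The function names are hypothetical.

```python
import hashlib
from collections import Counter

def tokenize(text):
    """Lowercase the text and split it on whitespace."""
    return text.lower().split()

def hashing_tf(tokens, num_features=16):
    """Map each token to a bucket index with a hash and count term frequencies."""
    counts = Counter()
    for token in tokens:
        digest = hashlib.md5(token.encode("utf-8")).hexdigest()
        index = int(digest, 16) % num_features  # stand-in hash, not MurmurHash3
        counts[index] += 1
    return dict(counts)

tokens = tokenize("BUS LANE VIOLATION")
vector = hashing_tf(tokens)
print(tokens)
print(vector)
```

The result is a sparse mapping from bucket index to count. Different words can collide in the same bucket, which is the usual trade-off of the hashing trick.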

The pipeline is trained on the training data:

In [18]:
model = pipeline.fit(train)

22/10/31 01:44:46 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/10/31 01:44:46 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/10/31 01:46:14 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/10/31 01:46:14 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS

Once the model is trained, predictions on the test data are stored in a new DataFrame:

In [19]:
predictionsDf = model.transform(test)
# registerTempTable is deprecated; createOrReplaceTempView is its replacement
predictionsDf.createOrReplaceTempView('Predictions')

22/10/31 01:59:52 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


We confirm that the predictions were generated by inspecting the columns of the DataFrame:

In [20]:
predictionsDf.columns

['ViolationCode',
 'ViolationDesc',
 'words',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

We display the predictions:

In [21]:
predictionsDf.select('ViolationCode','prediction').show()

+-------------+----------+
|ViolationCode|prediction|
+-------------+----------+
|            4|       4.0|
|            4|       4.0|
|            4|       4.0|
|            4|       4.0|
|            5|       5.0|
|            5|       5.0|
|            5|       5.0|
|            5|       5.0|
|            5|       5.0|
|            5|       5.0|
|            5|       5.0|
|            5|       5.0|
|            5|       5.0|
|            5|       5.0|
|            5|       5.0|
|            5|       5.0|
|            5|       5.0|
|            5|       5.0|
|            5|       5.0|
|            5|       5.0|
+-------------+----------+
only showing top 20 rows

To assess the model's performance, we count the number of correct predictions and divide it by the total number of records:

In [22]:
correctPred = predictionsDf.where("""(prediction == ViolationCode)""").count()
numTickets = predictionsDf.count()

In [23]:
print ("There were", numTickets, "Tickets and there were", correctPred, "successful predictions")
print ("This is a", str((float(correctPred) / float(numTickets)) * 100) + "%", "success rate")

There were 912812 Tickets and there were 912812 successful predictions
This is a 100.0% success rate
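
A 100% success rate is unusual and worth a sanity check. One possible explanation, stated here as an assumption that the notebook does not verify, is that each ViolationDesc value corresponds to exactly one ViolationCode, so the text feature effectively reveals the label. The plain-Python sketch below shows how such a check could look. The function names and the sample pairs are hypothetical; only the "BUS LANE VIOLATION" description with code 5 appears in the output above.

```python
from collections import defaultdict

def codes_per_description(pairs):
    """Group the distinct codes seen for each description."""
    seen = defaultdict(set)
    for desc_text, code in pairs:
        seen[desc_text].add(code)
    return dict(seen)

def is_one_to_one_lookup(pairs):
    """Return True when every description maps to a single code."""
    return all(len(codes) == 1 for codes in codes_per_description(pairs).values())

# Hypothetical (description, code) pairs for illustration.
pairs = [
    ("BUS LANE VIOLATION", 5),
    ("BUS LANE VIOLATION", 5),
    ("EXAMPLE DESCRIPTION A", 4),
]
print(is_one_to_one_lookup(pairs))
```

In PySpark, a comparable check could group by `ViolationDesc` and aggregate with `countDistinct('ViolationCode')`. If every group has a single code, the accuracy reflects a lookup rather than a learned generalization.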
