# Descripción del sistema
El sistema consiste en el desarrollo de 5 gráficos diferentes, estos gráficos son realizados con dos conjuntos de datos de dos entidades costarricenses diferentes que son el INEC y el OIJ, ambos conjuntos de datos fueron descargados en formato csv y son almacenados como dataframes utilizando la función de spark en python, además de estos los datos son limpiados ya que tienen incongruencias en su formato, una vez con los datos limpios se envían los mismo a postgres y se grafican

In [1]:
#Bibliotecas necesarias para el uso de spark
from pyspark.sql import SparkSession
from pyspark.files import SparkFiles
from pyspark.sql import functions as F
from pyspark.sql.functions import col, unix_timestamp, to_date

In [2]:
#Se guardan ambos archivos como dataframes
oij_csv=spark.read.format("csv").option("header","true").option("inferSchema","true").load("Estadisticas.csv")
inec_csv=spark.read.format("csv").option("header","true").option("inferSchema","true").load("reempleocenso2011-22.csv")

In [3]:
#Se imprimen ambos esquemas para observar el formato en el que vienen los datos, esto servirá para ajustar
#el dataframe a lo que necesitemos
#este de abajo será el formato del OIJ
oij_csv.printSchema()

root
 |-- Delito: string (nullable = true)
 |-- SubDelito: string (nullable = true)
 |-- Fecha: string (nullable = true)
 |-- Hora: string (nullable = true)
 |-- Victima: string (nullable = true)
 |-- SubVictima: string (nullable = true)
 |-- Edad: string (nullable = true)
 |-- Genero: string (nullable = true)
 |-- Nacionalidad: string (nullable = true)
 |-- Provincia: string (nullable = true)
 |-- Canton: string (nullable = true)
 |-- Distrito: string (nullable = true)



In [4]:
#Este otro formato correspone al INEC
inec_csv.printSchema()

root
 |-- ProvinciaCantonDistrito: string (nullable = true)
 |-- PoblacionMayor15: integer (nullable = true)
 |-- TasaNetaParticipacion: double (nullable = true)
 |-- TasaOcupacion: double (nullable = true)
 |-- TasaDesempleoAbierto: string (nullable = true)
 |-- PorcentajeEconomicamenteInactivo: double (nullable = true)
 |-- RelacionDependenciaEconomica: double (nullable = true)



In [5]:
#Se registran ambos dataframe como tablas de SQL, esto nos permite realizar consultas sql por medio de sqlContext
sqlContext.registerDataFrameAsTable(oij_csv, "oij")
sqlContext.registerDataFrameAsTable(inec_csv, "inec")

In [6]:
'''
Entradas: Un entero
Salidas: Un dataframe
Descripción general: Se encarga de validar cual conjunto de datos se debe enviar 
y realiza la consulta SQL correspondiente la cual removerá espacios en blanco
'''
def quitaEspacios(ind):
    #Este if es de validación para enviar datos de INEC o del OIJ
    #por medio de sqlContext se utiliza la función TRIM que acorta todos los espacios en blanco
    #almacenados antes o después de nuestros datos
    if ind == 1:
        return sqlContext.sql("SELECT Delito, SubDelito,Hora, Fecha, Victima, SubVictima, Edad, Genero, Nacionalidad, Provincia,Canton, TRIM(Distrito) as Distrito from oij")
    else:
        return sqlContext.sql("SELECT TRIM(ProvinciaCantonDistrito) AS ProvinciaCantonDistrito, PoblacionMayor15, TasaNetaParticipacion, TasaOcupacion, TasaDesempleoAbierto, PorcentajeEconomicamenteInactivo, RelacionDependenciaEconomica from inec")

In [7]:
#Simplemente se llama a la función con el número entero correspondiente y se registra el nuevo dataframe
#como la tabla almacenada anteriormente en el sistema, esto se aplica para ambos conjuntos de datos
oij = quitaEspacios(1)
sqlContext.registerDataFrameAsTable(oij, "oij")
inec = quitaEspacios(2)
sqlContext.registerDataFrameAsTable(inec, "inec")

In [8]:
'''
Entradas: Un entero
Salidas: Un dataframe
Descripción general: Se encarga de validar cual conjunto de datos se debe enviar 
y realiza la consulta SQL correspondiente la cual pasará los datos a minúscula
'''
def minusculas(ind):
    #Este if es de validación para enviar datos de INEC o del OIJ
    #por medio de sqlContext se utiliza la función LOWER que se encarga de pasar los datos a minúscula
    if ind == 1:
        return sqlContext.sql("SELECT Delito, SubDelito,Hora, Fecha, Victima, SubVictima, Edad, Genero, Nacionalidad, Provincia,Canton, LOWER(Distrito) as Distrito from oij")
    else:
        return sqlContext.sql("SELECT LOWER(ProvinciaCantonDistrito) AS ProvinciaCantonDistrito, PoblacionMayor15, TasaNetaParticipacion, TasaOcupacion, TasaDesempleoAbierto, PorcentajeEconomicamenteInactivo, RelacionDependenciaEconomica from inec")

In [9]:
#Se llama a la función con el número entero correspondiente y se registra el nuevo dataframe
#como la tabla almacenada anteriormente en el sistema, se aplica a ambos conjuntos de datos
oij = minusculas(1)
sqlContext.registerDataFrameAsTable(oij, "oij")
inec = minusculas(2)
sqlContext.registerDataFrameAsTable(inec, "inec")

In [10]:
'''
Entradas: No posee
Salidas: Un dataframe
Descripción general: Se encarga de hacer la consulta sql para mostrar los datos del OIJ
que no coinciden con ningún dato del INEC
'''
def sacaNoExistentes():
    #Esta consulta se encarga de sacar los distintos por medio del uso de DISTINCT que no se encuentren
    #en la consulta realizada a los datos del INEC
    return sqlContext.sql("SELECT DISTINCT(Distrito) FROM oij WHERE NOT EXISTS(SELECT 1 FROM inec WHERE inec.ProvinciaCantonDistrito = oij.Distrito)") 

In [11]:
#Llama a la función y lo almacena como un dataframe nuevo y una tabla nueva
noCoinciden = sacaNoExistentes()
sqlContext.registerDataFrameAsTable(noCoinciden, "noCoincidencias")

In [12]:
#Se muestran los datos que no coincidieron, principalmente fue porque el OIJ utiliza abreviaciones y no usa tildes,
#el INEC no
noCoinciden.show()

+-------------+
|     Distrito|
+-------------+
|       pococi|
|      guacimo|
|     la union|
|         null|
|  leon cortes|
|     san jose|
|    san ramon|
|        canas|
|        limon|
|perez zeledon|
|        tibas|
|    sarapiqui|
+-------------+



In [13]:
'''
Entradas: No posee
Salidas: Un dataframe
Descripción general: Se encarga de contar los elementos de la tabla de noCoincidencias
estos elementos corresponen a todos aquellos distritos del OIJ que no corresponden a distritos del INEC
'''
def cuentaNoExistentes():
    return sqlContext.sql("SELECT COUNT(*) as num FROM noCoincidencias") 

In [14]:
#Almacena el dataframe con el dato
numNoCoinciden = cuentaNoExistentes()

In [15]:
#Se despliega el número correspondiente
numNoCoinciden.show()
print("Cantidad de registros que no coincidieron: ", noCoinciden.count())

+---+
|num|
+---+
| 12|
+---+

Cantidad de registros que no coincidieron:  12


In [16]:
#Se modifican datos del dataframe del INEC para que correspondan a algunos datos del OIJ
#esto se encarga de buscar por columna los datos que tengan almaceanado los nombres de la manera indicada
#luego se modifican por sus correpondientes en el dataframe del OIJ
inec = inec.withColumn("ProvinciaCantonDistrito", F.when(F.col("ProvinciaCantonDistrito")=='pococí','pococi').otherwise(F.col("ProvinciaCantonDistrito")))
inec = inec.withColumn("ProvinciaCantonDistrito", F.when(F.col("ProvinciaCantonDistrito")=='la unión','la union').otherwise(F.col("ProvinciaCantonDistrito")))
inec = inec.withColumn("ProvinciaCantonDistrito", F.when(F.col("ProvinciaCantonDistrito")=='belén','belen').otherwise(F.col("ProvinciaCantonDistrito")))
inec = inec.withColumn("ProvinciaCantonDistrito", F.when(F.col("ProvinciaCantonDistrito")=='león cortés castro','leon cortes').otherwise(F.col("ProvinciaCantonDistrito")))
inec = inec.withColumn("ProvinciaCantonDistrito", F.when(F.col("ProvinciaCantonDistrito")=='san josé','san jose').otherwise(F.col("ProvinciaCantonDistrito")))
#se guarda el dataframe modificado como una tabla
sqlContext.registerDataFrameAsTable(inec, "inec")

In [17]:
#Se integran por medio de un inner join los conjuntos de datos,y se almacenan como un nuevo dataframe
#este dataframe será enviado a postgres
datosIntegrados = sqlContext.sql("SELECT a.Delito, a.SubDelito, a.Hora, a.Fecha, a.Victima, a.SubVictima, a.Edad, a.Genero, a.Nacionalidad, a.Provincia, a.Canton, a.Distrito, b.PoblacionMayor15, b.TasaNetaParticipacion, b.TasaOcupacion, b.TasaDesempleoAbierto, b.PorcentajeEconomicamenteInactivo, b.RelacionDependenciaEconomica FROM oij a INNER JOIN inec b ON a.Distrito = b.ProvinciaCantonDistrito")

In [18]:
datosIntegrados.printSchema()

root
 |-- Delito: string (nullable = true)
 |-- SubDelito: string (nullable = true)
 |-- Hora: string (nullable = true)
 |-- Fecha: string (nullable = true)
 |-- Victima: string (nullable = true)
 |-- SubVictima: string (nullable = true)
 |-- Edad: string (nullable = true)
 |-- Genero: string (nullable = true)
 |-- Nacionalidad: string (nullable = true)
 |-- Provincia: string (nullable = true)
 |-- Canton: string (nullable = true)
 |-- Distrito: string (nullable = true)
 |-- PoblacionMayor15: integer (nullable = true)
 |-- TasaNetaParticipacion: double (nullable = true)
 |-- TasaOcupacion: double (nullable = true)
 |-- TasaDesempleoAbierto: string (nullable = true)
 |-- PorcentajeEconomicamenteInactivo: double (nullable = true)
 |-- RelacionDependenciaEconomica: double (nullable = true)



In [19]:
datosIntegrados=datosIntegrados.withColumn('TasaDesempleoAbierto',datosIntegrados['TasaDesempleoAbierto'].cast("float").alias('TasaDesempleoAbierto'))
datosIntegrados=datosIntegrados.withColumn('TasaNetaParticipacion',datosIntegrados['TasaNetaParticipacion'].cast("float").alias('TasaNetaParticipacion'))
datosIntegrados=datosIntegrados.withColumn('TasaOcupacion',datosIntegrados['TasaOcupacion'].cast("float").alias('TasaOcupacion'))
datosIntegrados=datosIntegrados.withColumn('PorcentajeEconomicamenteInactivo',datosIntegrados['PorcentajeEconomicamenteInactivo'].cast("float").alias('PorcentajeEconomicamenteInactivo'))
datosIntegrados=datosIntegrados.withColumn('RelacionDependenciaEconomica',datosIntegrados['RelacionDependenciaEconomica'].cast("float").alias('RelacionDependenciaEconomica'))
datosIntegrados=datosIntegrados.withColumn('Fecha',to_date(unix_timestamp(col('Fecha'), 'yyyy-MM-dd').cast("timestamp")))

In [20]:
datosIntegrados.printSchema()

root
 |-- Delito: string (nullable = true)
 |-- SubDelito: string (nullable = true)
 |-- Hora: string (nullable = true)
 |-- Fecha: date (nullable = true)
 |-- Victima: string (nullable = true)
 |-- SubVictima: string (nullable = true)
 |-- Edad: string (nullable = true)
 |-- Genero: string (nullable = true)
 |-- Nacionalidad: string (nullable = true)
 |-- Provincia: string (nullable = true)
 |-- Canton: string (nullable = true)
 |-- Distrito: string (nullable = true)
 |-- PoblacionMayor15: integer (nullable = true)
 |-- TasaNetaParticipacion: float (nullable = true)
 |-- TasaOcupacion: float (nullable = true)
 |-- TasaDesempleoAbierto: float (nullable = true)
 |-- PorcentajeEconomicamenteInactivo: float (nullable = true)
 |-- RelacionDependenciaEconomica: float (nullable = true)



In [21]:
datosIntegrados.show()

+------+-------------+-------------------+----------+-----------+----------+--------------------+-------------+------------+----------+----------+-------------+----------------+---------------------+-------------+--------------------+--------------------------------+----------------------------+
|Delito|    SubDelito|               Hora|     Fecha|    Victima|SubVictima|                Edad|       Genero|Nacionalidad| Provincia|    Canton|     Distrito|PoblacionMayor15|TasaNetaParticipacion|TasaOcupacion|TasaDesempleoAbierto|PorcentajeEconomicamenteInactivo|RelacionDependenciaEconomica|
+------+-------------+-------------------+----------+-----------+----------+--------------------+-------------+------------+----------+----------+-------------+----------------+---------------------+-------------+--------------------+--------------------------------+----------------------------+
|ASALTO|  ARMA BLANCA|18:00:00 - 20:59:59|2011-09-19|EDIFICACION|      CAFÉ| INTERNET [EDIFIC...|Mayor de eda

In [22]:
spark = SparkSession \
    .builder \
    .appName("Basic JDBC pipeline") \
    .config("spark.driver.extraClassPath", "postgresql-42.1.4.jar") \
    .config("spark.executor.extraClassPath", "postgresql-42.1.4.jar") \
    .getOrCreate()

In [23]:
def mandaPostgres(dataframe, nombre):
    dataframe \
        .write \
        .format("jdbc") \
        .mode('overwrite') \
        .option("url", "jdbc:postgresql://localhost/") \
        .option("user", "postgres") \
        .option("password", "postgres") \
        .option("dbtable", nombre) \
        .save()

In [24]:
mandaPostgres(datosIntegrados.select("Delito"), "prueba")

Py4JJavaError: An error occurred while calling o123.save.
: java.sql.SQLException: No suitable driver
	at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:105)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:105)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:194)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:198)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
