# 0. Init pySpark and Imports

In [168]:
import os
import chardet
from IPython.display import display, HTML
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
os.getcwd()

'/Users/juandavidescobarescobar/Documents/Unir/Materias/BD Big Data/Actividad 1'

In [169]:
#testing pyspark installation
import findspark
findspark.init('/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2')
findspark.find()
import pyspark
findspark.find()

'/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2'

In [173]:
#Initiate Spark Context

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = pyspark.SparkConf().setAppName('SparkApp').setMaster('local')
sc = pyspark.SparkContext(conf = conf)
spark = SparkSession(sc)

# 1. Funciones de validacion - CSV

In [None]:
'''
En esta parte del código se encarga de validar la lectura correcta del archivo en formato CSV, de acuerdo 
a sus propiedades (encabezados, encoding, separador de linea, separador de columna, filas, columnas) y el 
esquema o tipología de los datos.
''' 

In [174]:
"""
Descripción: Retorna boolean que determina si el archivo cuenta con el encoding UTF-8.
Responsables: Juan David Escobar E
Fecha: 30/11/2021
"""

def is_valid_encoding_csv(ar_file):
    this_encoding = 'UTF-8'
    result = chardet.detect(open(ar_file, 'rb').read())
    charenc = result['encoding']
    return  True if this_encoding in charenc.upper() else False

In [175]:
"""
Descripción: Retorna los registros duplicados a partir de un Dataframe, y los registros unicos
Parámetros:
    ar_file -- Archivo a validar
    gb_records -- String el cual contiene los nombres de la columna que son unicos del Dataframe.
Responsables: Juan David Escobar E
Fecha: 01/12/2021
"""

def get_duplicates(df_csv, df_pk):
    is_error = False
    msg_error = ''
    separator = ''
   
    try:      
        df_Campo = df_csv.groupby(df_pk).count()
        df_duplicados = df_Campo.select(col(df_pk), col("count")).filter(col("count") > 1).collect()
        duplicados = [str(df_pk + ": " + row[df_pk] + " - Cantidad: " \
                   + str(row['count'])) for row in df_duplicados]

        for i in range(len(duplicados)):
            lista_duplicados = duplicados[i].split(",")
            msg_error += separator + "["+(lista_duplicados[0].replace('"',''))+"]"
            separator = ', '
        if len(duplicados) == 0:
            is_error = True
            msg_error = ''
            result = (is_error, msg_error)
    except Exception as error:
        is_error = False
        msg_error = 'No se pudo validar duplicados. !ERROR¡: ' + str(error)
        result = {'is_error' : is_error, 'msg_error' : msg_error}
    return result

# 2. Lectura y limpieza 

In [176]:
'''
Descripción: Lectura desde una ruta local un archivo en formato CSV, el cual se  intenta interpreta 
             interpretar por primera vez, asumiendo que el archivo posee un encoding tipo UTF-8, no 
             se especifica esquema, delimitador el caracter ";", salto de linea el caracter CRLF y 
             la primera fila con encabezado.
Responsables: Juan David Escobar E
Fecha: 30/11/2021
'''

def read_csv():

    # File location (https://www.youtube.com/watch?v=-tZbkgTnGs4)
    file_location = '/Users/juandavidescobarescobar/Documents/Unir/Materias/BD Big Data/Actividad 1/data_act_03.csv'
    file_type = 'csv'

    # CSV options
    infer_schema = 'true'
    first_row_is_header = 'true'
    delimiter = ';'
    
    # Validate encoding UTF-8
    is_valid_encode = is_valid_encoding_csv(file_location)
    
    if is_valid_encode:
                
        try:        
            # The applied options are for CSV files. For other types, these will ignored.
            df = spark.read.format(file_type) \
                           .option('inferSchema', infer_schema) \
                           .option('header', first_row_is_header) \
                           .option('sep', delimiter) \
                           .load(file_location) 
        except Exception as error:
                        
            print('Error leyendo el archivo: ' + str(error))
                
    return df    

# 3. Limpieza General

In [225]:
# Init clean and file validations

# 1. Comparar el esquema inferido por pySpark Dataframe vs Los valores almacenados en el archivo

df = read_csv()
df.printSchema()

root
 |-- CrimeId: integer (nullable = true)
 |-- OriginalCrimeTypeName: string (nullable = true)
 |-- OffenseDate: timestamp (nullable = true)
 |-- CallTime: string (nullable = true)
 |-- CallDateTime: timestamp (nullable = true)
 |-- Disposition: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- AgencyId: string (nullable = true)
 |-- Range: string (nullable = true)
 |-- AddressType: string (nullable = true)



In [226]:
# 2. Al realizar la comparación del paso anterior se encuentran las siguientes anotaciones para cada valor:

'''
-- CrimeId: integer (nullable = true), El tipo de dato corresponde al que se infiere en el esquema, al ser un
            identificador de registro este campo no debería aceptar valores nulos, por lo cual se debe
            corregir en el esquema inferido de manera automatica por Spark Dataframe.
          
-- OriginalCrimeTypeName: string (nullable = true), Los valores concuerdan con el tipo de dato inferido "String", 
                          el cual acepta valores nulos sin ningun inconveniente, a simple vista en el archivo se
                          logra identificar algunos patrones de información numerica como por ejemplo "594" o 
                          caracteres sin sentido o en código "lp" que no concuerdan con la descripción o proposito del 
                          campo, se puede concluir que son datos errados que quiza debamos limpiar del dataset, teniendo
                          muy presente la previa autorización y validación del analista de negocio a cargo.

-- OffenseDate: timestamp (nullable = true),  Formato de la fecha:YYYY-MM-DD T HH:MM:SS, para que este formato
                identificado a ojo en el archivo CSV cumpla su estructura, se debe generar en una segunda 
                lectura del dataset leído o inferido de manera original, a un nuevo dataset donde se especifique 
                el formato que predomina para la fecha y tipo de dato inferido de manera automatica, por medio de
                un constrain en un nuevo schema definido por el usuario. Adicional a esto se puede intentar hacer un 
                casteo de este formato para todos los valores del dataset e identificar valores con error, aunque si
                el esquema que lo inferio de manera automatica tiene un tipo de dato timestamp, esto nos da la tranquilidad 
                de sasber que los valorers estan correctos para este formato de estampa de tiempo.

-- CallTime: string (nullable = true) Formato HH:MM, el tipo de dato se infirio de tipo texto, pero identificamos 
             un factor que se relaciona con la descripción o nombre del campo, para asegurarnos que todos los valores 
             cumplan con este formato, es importante validarlo por medio de una expresión regular, excluyendo 
             aquellos valores nulos. Otra manera es recrear el dataset con un esquema especificado en el cual
             asignemos este tipo de dato como timestamp con el formato HH:MM.

-- CallDateTime: timestamp (nullable = true), Aplica la misma descripción que se especifico para el campo 
                 "OffenseDate"
                 
-- Disposition: string (nullable = true), Los valores concuerdan con el tipo de dato inferido "String", tambien 
                se analiza la longitud de los caracteres de cada campo, la cual es de 3 y en mayuscula, ejemplo "REP".
                A simple vista se detectan algunos campos vacios, lo cual es normal, pero se detectan valores que no 
                cumplen el mismo patron 3 caracteres en letra mayuscula, se identifican valores como por ejemplo:
                "Not recorded", lo cual se puede asumir que es un Dummy que se almaceno ya que no se disponia el 
                valor, en este caso lo mejor es limpiar esta información para dejar la información mas consistente.

-- Address: string (nullable = true), Los valores concuerdan con el tipo de dato inferido "String", lo cual es un
            buen indicio ya que es el tipo de dato comunmente usado para las direcciones en la mayoría de sistemas
            de información, ya que almacenan valores alfanumericos. No se identifican valores atipicos. 

-- City: string (nullable = true), Los valores concuerdan con el tipo de dato inferido "String", lo cual es un
            buen indicio ya que es el tipo de dato comunmente usado para las ciudades, los valores tienen la descipcion
            completa de una sola ciudad la cual es San Francisco, algunos campos tienen valores nulos, los cuales
            se deben dejar tal cual ya que no se posee información de cual ciudad del estado CA: California pueden
            pertenecer Ej: (Los Angeles, San Francisco, San Jose, entre otros). El factor común es San Francisco, pero
            lo correcto sería reportar estos campos a los dueños de la informacón para corregirlos en una carga posterior
            y actualizarlos por el ID.

-- State: string (nullable = true) CA 2 CHARACTERS, Los valores concuerdan con el tipo de dato inferido "String", lo cual
          es un buen indicio ya que es el tipo de dato comunmente usado para las estados, 
          https://es.wikipedia.org/wiki/Anexo:Abreviaciones_de_los_estados_de_Estados_Unidos. Para validar la calidad
          de la información se puede hacer un distinc de la información para conocer las diferentes categorias
          registradas, y se debe validar que cumplen el formato de 2 caracteres en mayúsculas.

-- AgencyId: string (nullable = true) INT, Los valores no concuerdan con el tipo de dato inferido, ya que 
             debería ser un INT, esta columna no es relevante en esta sabana de datos ya que es solo el
             Id numerico de la agencia donde se reporto el crimen, pero que no brinda una descripción, quiza
             este dato se deba conservar para poder relacionar esta tabla de registro de crimenes con otro
             dataset de agencias.

-- Range: string (nullable = true) Los valores no concuerdan con la descriopcion del campo, ya que normalmente
          un rango es un tipo de dato entero, pero que tambien puede ser la descripcion de un limite inferior y
          superior, en este caso esta columna no tiene valores y no proporciona información asociada al crimen ya que
          todos los valores son null.
          
-- AddressType: string (nullable = true), Los valores concuerdan con el tipo de dato inferido "String", para validar
                la calidad de la información se puede hacer un distinct de las categorias o tipos de dirección
                y validar cual de ellas es un dato errado que no pertenece a un tipo de dirección.
'''


'\n-- CrimeId: integer (nullable = true), El tipo de dato corresponde al que se infiere en el esquema, al ser un\n            identificador de registro este campo no debería aceptar valores nulos, por lo cual se debe\n            corregir en el esquema inferido de manera automatica por Spark Dataframe.\n          \n-- OriginalCrimeTypeName: string (nullable = true), Los valores concuerdan con el tipo de dato inferido "String", \n                          el cual acepta valores nulos sin ningun inconveniente, a simple vista en el archivo se\n                          logra identificar algunos patrones de información numerica como por ejemplo "594" o \n                          caracteres sin sentido o en código "lp" que no concuerdan con la descripción o proposito del \n                          campo, se puede concluir que son datos errados que quiza debamos limpiar del dataset, teniendo\n                          muy presente la previa autorización y validación del analista de negocio

In [227]:
# 3. Filas leidas

df.count()
# 10051

10051

In [228]:
# 4. Columnas leidas

len(df.columns)
# 12

12

In [229]:
# 5. Limpieza general - Datos perdidos N/A or None (Elimina las filas duplicadas - por todos los campos)

#df.dropna()
display(HTML("<style>pre { white-space: pre !important; }</style>"))
df.dropna(how='all').show()
df.count()

+---------+---------------------+-------------------+--------+-------------------+-----------+--------------------+-------------+-----+--------+-----+---------------+
|  CrimeId|OriginalCrimeTypeName|        OffenseDate|CallTime|       CallDateTime|Disposition|             Address|         City|State|AgencyId|Range|    AddressType|
+---------+---------------------+-------------------+--------+-------------------+-----------+--------------------+-------------+-----+--------+-----+---------------+
|160903280|    Assault / Battery|2016-03-30 00:00:00|   18:42|2016-03-30 18:42:00|        REP|100 Block Of Chil...|San Francisco|   CA|       1| null|Premise Address|
|160912272|   Homeless Complaint|2016-03-31 00:00:00|   15:31|2016-03-31 15:31:00|        GOA|2300 Block Of Mar...|San Francisco|   CA|       1| null|Premise Address|
|160912590|            Susp Info|2016-03-31 00:00:00|   16:49|2016-03-31 16:49:00|        GOA|2300 Block Of Mar...|San Francisco|   CA|       1| null|Premise Address

10051

In [230]:
# 6. Limpieza general - Borrar columnas con None / NAN 

df = df.toPandas().dropna(axis=1, how='all')
len(df.columns)

11

In [231]:
# 7. Limpieza general - Elimina las filas duplicadas - por todos los campos

df = df.drop_duplicates()
df.count()

CrimeId                  10051
OriginalCrimeTypeName    10051
OffenseDate              10051
CallTime                 10051
CallDateTime             10051
Disposition              10051
Address                  10051
City                      9730
State                    10048
AgencyId                 10051
AddressType              10051
dtype: int64

In [232]:
# 8. Limpieza general - Elimina las filas duplicadas - por "CrimeId"

df_pk = 'CrimeId'
result_duplicates = get_duplicates(df, df_pk)

if result_duplicates['is_error']:
    df.drop_duplicates(subset = [df_pk])
    print('Identificadores duplicados: {0}'.format(result_duplicates['is_error']))

# 4. Limpieza Específica

In [233]:
# 1. Ajustar esquema de acuerdo a la naturaleza de los datos analizados en el Datset
#StructField("Range", StringType(), True),  \ # col eliminada


schema = StructType([ \
    StructField("CrimeId",IntegerType(),False), \
    StructField("OriginalCrimeTypeName",StringType(),True), \
    StructField("OffenseDate",TimestampType(),True), \
    StructField("CallTime", StringType(), True), \
    StructField("CallDateTime", TimestampType(), True), \
    StructField("Disposition", StringType(), True), \
    StructField("Address", StringType(), True), \
    StructField("City", StringType(), True), \
    StructField("State", StringType(), True), \
    StructField("AgencyId", IntegerType(), True), \
    StructField("AddressType", StringType(), True)  \
  ])

data = df.values.tolist()
df = spark.createDataFrame(data = data)


df_new = spark.createDataFrame(data = df.rdd, schema = schema)
df_new.printSchema()
df_new.show()

root
 |-- CrimeId: integer (nullable = false)
 |-- OriginalCrimeTypeName: string (nullable = true)
 |-- OffenseDate: timestamp (nullable = true)
 |-- CallTime: string (nullable = true)
 |-- CallDateTime: timestamp (nullable = true)
 |-- Disposition: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- AgencyId: integer (nullable = true)
 |-- AddressType: string (nullable = true)



21/12/01 21:28:44 WARN TaskSetManager: Stage 48 contains a task of very large size (1011 KiB). The maximum recommended task size is 1000 KiB.
21/12/01 21:28:44 ERROR Executor: Exception in task 0.0 in stage 48.0 (TID 39)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/util.py",

Py4JJavaError: An error occurred while calling o592.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 48.0 failed 1 times, most recent failure: Lost task 0.0 in stage 48.0 (TID 39) (192.168.20.49 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/pyspark/sql/session.py", line 682, in prepare
    verify_func(obj)
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/pyspark/sql/types.py", line 1411, in verify
    verify_value(obj)
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/pyspark/sql/types.py", line 1392, in verify_struct
    verifier(v)
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/pyspark/sql/types.py", line 1411, in verify
    verify_value(obj)
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/pyspark/sql/types.py", line 1405, in verify_default
    verify_acceptable_types(obj)
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/pyspark/sql/types.py", line 1293, in verify_acceptable_types
    raise TypeError(new_msg("%s can not accept object %r in type %s"
TypeError: field OffenseDate: TimestampType can not accept object Row() in type <class 'pyspark.sql.types.Row'>

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:545)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:703)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:685)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:476)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:429)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2728)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:326)
	at sun.reflect.GeneratedMethodAccessor100.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/pyspark/sql/session.py", line 682, in prepare
    verify_func(obj)
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/pyspark/sql/types.py", line 1411, in verify
    verify_value(obj)
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/pyspark/sql/types.py", line 1392, in verify_struct
    verifier(v)
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/pyspark/sql/types.py", line 1411, in verify
    verify_value(obj)
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/pyspark/sql/types.py", line 1405, in verify_default
    verify_acceptable_types(obj)
  File "/Users/juandavidescobarescobar/Documents/Apache Spark/spark-3.2.0-bin-hadoop3.2/python/pyspark/sql/types.py", line 1293, in verify_acceptable_types
    raise TypeError(new_msg("%s can not accept object %r in type %s"
TypeError: field OffenseDate: TimestampType can not accept object Row() in type <class 'pyspark.sql.types.Row'>

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:545)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:703)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:685)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [234]:
display(df_new)

DataFrame[CrimeId: int, OriginalCrimeTypeName: string, OffenseDate: timestamp, CallTime: string, CallDateTime: timestamp, Disposition: string, Address: string, City: string, State: string, AgencyId: int, AddressType: string]

In [None]:
# 1. Validar el porcentaje de correlación de los datos

# df2.stat.corr('CrimeId', 'CallTime')

In [None]:
# 2. Validar la variable media y desviacion estandar para los tipos de datos númericos

In [112]:

#display(HTML("<style>pre { white-space: pre !important; }</style>"))
    #df.show()
    df.limit(20).toPandas().head()
    df.printSchema()

    #len(df.columns)


df.limit(20).toPandas().head()

IndentationError: unexpected indent (2395780659.py, line 3)

In [None]:
df.count()

In [None]:
'''
|-- CrimeId: integer (nullable = true)
|-- OriginalCrimeTypeName: string (nullable = true)
|-- OffenseDate: timestamp (nullable = true)  YYYY-MM-DD T HH:MM:SS
|-- CallTime: string (nullable = true) HH:MM
|-- CallDateTime: timestamp (nullable = true) YYYY-MM-DD T HH:MM:SS
|-- Disposition: string (nullable = true) 3 charascter MAX EX: REP
|-- Address: string (nullable = true) 
|-- City: string (nullable = true)
|-- State: string (nullable = true) CA 2 CHARACTERS
|-- AgencyId: string (nullable = true) INT (OJO ERROR)
|-- Range: string (nullable = true) INT?? OJO SIN DATOS
|-- AddressType: string (nullable = true) String
'''

In [None]:
df2.describe('CrimeId', 'OriginalCrimeTypeName').show()
df2.describe('OffenseDate', 'CallTime').show()

In [None]:
# Almacenar la informacion del DF en una tabla temporal para poder manipularlo mediante consultas

temp_table_csv_name = 'crimes'

df.createOrReplaceTempView(temp_table_csv_name)
df2 = spark.sql('SELECT * FROM ' + temp_table_csv_name)
df2.limit(20).toPandas().head()
df2.cache()



In [None]:
'''
Describe de los valores enteros, para validar los numeros:
MAX, MIN, COUNT, MEAN (PROMEDIO) Y LA 
DESVIACION ESTANDAR

X   |X - (~X)         | (X - (~X))^2

5   |5 - 15,6 = -10,6 | (-10,6)^2 = 112,36
15  |-0,6             | 0,36
12  |-3,6             | 12,96
18  |2,4              | 5,76
28  |12,4             | 153,76
                        (285,2)        

~X = 5 +15 + 12 +18 + 28 / 5 = 15,6

s = raiz(sum( (X - (~X))^2 ) / N-1)
s = raiz(285,1 / (5-1)) = 8,44


La desviacion estandar me indica la variacion que existe en los datos de la muestra,
es decir que tan diferentes o parecidos son.

'''

import matplotlib.pyplot as plt
import numpy as np

x = np.array([5, 15, 12, 18, 28])
y = np.power(x, 2) # Effectively y = x**2

plt.errorbar(x, y, linestyle='None', marker='x')

plt.show()

In [None]:
'''
--------------------------------------------------------------------------------------------
0. Contar filas
0. Contar columnas
--------------------------------------------------------------------------------------------
Limpieza de datos:

1. Datos perdidos N/A or None       

Encontrar Nulos
df.isnull()

Filtrar datos perdidos
from numpy import nan as NA
df.dropna()

Borrar filas que todos los registros sean None / NA
df.dropna(how='all')

Borrar columnas con None / NA
df.dropna(axis=1, how='all')

Borrar ciertos NA, es decir solo lo que le indiquemos
df.dropna(thresh=2)

Rellenar datos con un valor predeterminado
df.fillna(0)

Rellenar datos con un valor de un key de un dic
para la col 1 y 2
df.fillna({1:0.5, 2:5})

df.fillna(method='ffill') #fordward fill, rellena con el ultimo valor que no era NA
df.fillna(method='ffill', limit=1) #lo mismo pero solo para un NAN

df.fillna(data.mean()) #rellena con un promedio de los valores de la fila y no con 0

--------------------------------------------------------------------------------------------
2. Datos duplicados

Nos dice las filas que estan duplicadas

df.duplicated()

Elimina las filas duplicadas

df.drop_duplicates()


3. Manipulación de strings
4. Transformación de datos

'''

In [172]:
sc.stop()