In [270]:
##script by @rikardoroa
## Transformaciones con Spark

from pyspark.sql.functions import*
from pyspark.sql import SparkSession as Sesion
import pyspark.sql as py
from graphframes import *


# Helper class: creates the Spark session, loads the CSV and runs the report queries
class Spark_data():
    """Small helper around a Spark session that loads a CSV of incident reports
    and prints grouped-count summaries of it.

    Typical use: instantiate, call ``leer_visualizar_csv()`` once to load the
    file, then call any of the ``filtro_*`` methods. Every report method prints
    its table with ``DataFrame.show`` and returns whatever ``show`` returns.
    """

    def __init__(self, session=None, df=None,
                 master="spark://192.168.56.1:7077", app_name="Datahack_Proyecto"):
        """Create (or reuse) the Spark session.

        Args:
            session: object exposing a ``builder`` attribute; when omitted the
                ``SparkSession`` class imported at the top of the file
                (``Sesion``) is used.
            df: optional already-loaded DataFrame. A class object passed here
                (the previous default was the DataFrame class itself) is
                treated as "nothing loaded yet".
            master: Spark master URL handed to ``builder.master``.
            app_name: application name handed to ``builder.appName``.
        """
        if session is None:
            # Resolved at call time so the class can be defined before the
            # session class is needed.
            session = Sesion
        self.session = session.builder.master(master).appName(app_name).getOrCreate()
        # A bare type is only a placeholder, never real data.
        self.df = None if isinstance(df, type) else df

    def _datos_cargados(self):
        """Return the loaded DataFrame or fail with an explicit message."""
        df = self.df
        if df is None or isinstance(df, type):
            raise RuntimeError(
                "No data loaded: call leer_visualizar_csv() before running a report."
            )
        return df

    def crear_sesion(self):
        """Return the Spark session built in the constructor."""
        return self.session

    # load the csv file and preview it
    def leer_visualizar_csv(self, ruta="sf.csv", filas=2):
        """Read a CSV with a header row, keep it on ``self.df`` and print a preview.

        Args:
            ruta: path of the CSV file to load.
            filas: number of rows to print.

        Returns:
            The result of ``DataFrame.show(filas)``.
        """
        df = (
            self.session.read.format("csv")
            .option("header", "true")
            .load(ruta)
        )
        self.df = df
        return df.show(filas)

    # neighborhoods with the largest number of reports
    def filtro_data(self):
        """Print the number of reports per neighborhood, highest first.

        Side effect: the aggregated result is registered as the temp view
        ``data``.

        Raises:
            RuntimeError: if no data has been loaded yet.
        """
        df = self._datos_cargados()
        data = (
            df.select("neighborhood")
            .groupBy("neighborhood")
            .agg(count("neighborhood").alias("MaxCount"))
            .orderBy("MaxCount", ascending=False)
        )
        data.createOrReplaceTempView("data")
        return data.show()

    # incidents of a single neighborhood ("Forest Knolls" by default), by source
    def filtro_incidentes(self, barrio="Forest Knolls"):
        """Print the report count per source for one neighborhood.

        Args:
            barrio: exact value of the ``neighborhood`` column to keep.

        Raises:
            RuntimeError: if no data has been loaded yet.
        """
        df = self._datos_cargados()
        data_n1 = (
            df.select("neighborhood", "source")
            .where(col("neighborhood") == barrio)
            .groupBy("source", "neighborhood")
            .agg(count("source").alias("MaxCount"))
            .orderBy("MaxCount", ascending=False)
        )
        return data_n1.show()

    # incidents of a single category ("Graffiti" by default), by neighborhood
    def filtro_incidentes_categoria(self, categoria="Graffiti", filas=3):
        """Print the neighborhoods with the most reports of one category.

        Args:
            categoria: exact value of the ``category`` column to keep.
            filas: number of rows to print.

        Raises:
            RuntimeError: if no data has been loaded yet.
        """
        df = self._datos_cargados()
        data_n2 = (
            df.select("neighborhood", "category")
            .where(col("category") == categoria)
            .groupBy("neighborhood", "category")
            .agg(count("category").alias("MaxCount"))
            .orderBy("MaxCount", ascending=False)
        )
        return data_n2.show(filas)

    # police districts with the most resolved cases
    def filtro_incidentes_tipo_estado(self):
        """Print the number of resolved cases per police district, highest first.

        A row is kept when ``status_notes`` matches "Case Resolved" and
        ``police_district`` matches neither "null" nor "0|-" (regex search, so
        any value containing "null", "0" or "-" is excluded).

        Raises:
            RuntimeError: if no data has been loaded yet.
        """
        df = self._datos_cargados()
        distrito = col("police_district")
        # NOTE(review): presumably these patterns strip placeholder district
        # values; rows whose police_district is a real NULL appear to be dropped
        # as well, since the negated match is then NULL - confirm.
        condicion = (
            col("status_notes").rlike("Case Resolved")
            & ~distrito.rlike("null")
            & ~distrito.rlike("0|-")
        )
        data_n3 = (
            df.select("police_district", "status_notes")
            .filter(condicion)
            .groupBy("police_district")
            .agg(count("status_notes").alias("MaxCount"))
            .orderBy("MaxCount", ascending=False)
        )
        return data_n3.show()

In [271]:
# Build the helper; its constructor creates (or reuses) the Spark session.
data = Spark_data()


In [272]:
# Fetch the Spark session stored on the helper.
data.crear_sesion()

In [273]:
# Load sf.csv (first row used as header) into the helper and print its first 2 rows.
data.leer_visualizar_csv()

+----------+--------------------+--------------------+------------------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+------+--------------------+------------------+--------------------+---------------+----------------------+
|unique_key|        created_date|         closed_date|resolution_action_updated_date|status|        status_notes|         agency_name|            category|      complaint_type|          descriptor|    incident_address|supervisor_district|        neighborhood|            location|source|           media_url|          latitude|           longitude|police_district|neighborhood_center_ds|
+----------+--------------------+--------------------+------------------------------+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------

In [274]:
# Report count per neighborhood, highest first; also registers the result as temp view "data".
data.filtro_data()

+--------------------+--------+
|        neighborhood|MaxCount|
+--------------------+--------+
|           Sunnyside|     445|
|          Holly Park|     445|
|      Merced Heights|     403|
|  Central Waterfront|     394|
|       Hunters Point|     393|
|         Rincon Hill|     382|
|           Sunnydale|     359|
|         Cole Valley|     357|
|         Mission Bay|     355|
|      Produce Market|     353|
|       Outer Mission|     348|
|         South Beach|     345|
|           Mint Hill|     321|
|        Alamo Square|     320|
|Laurel Heights / ...|     315|
|           Japantown|     302|
|           Glen Park|     281|
|     Peralta Heights|     258|
|            Dogpatch|     255|
|        Upper Market|     251|
+--------------------+--------+
only showing top 20 rows



In [275]:
# Report count per source for the "Forest Knolls" neighborhood, highest first.
data.filtro_incidentes()

+--------------+-------------+--------+
|        source| neighborhood|MaxCount|
+--------------+-------------+--------+
|         Phone|Forest Knolls|      48|
|           Web|Forest Knolls|      14|
|Mobile/Open311|Forest Knolls|      14|
+--------------+-------------+--------+



In [276]:
# First 3 neighborhoods by number of "Graffiti" reports.
data.filtro_incidentes_categoria()

+--------------------+--------+--------+
|        neighborhood|category|MaxCount|
+--------------------+--------+--------+
|        Alamo Square|Graffiti|      89|
|         Cole Valley|Graffiti|      85|
|Laurel Heights / ...|Graffiti|      65|
+--------------------+--------+--------+
only showing top 3 rows



In [277]:
# Count of "Case Resolved" rows per police district, highest first.
data.filtro_incidentes_tipo_estado()

+---------------+--------+
|police_district|MaxCount|
+---------------+--------+
|      INGLESIDE|    1469|
|        BAYVIEW|     964|
|           PARK|     801|
|        TARAVAL|     756|
|       SOUTHERN|     624|
|       RICHMOND|     549|
|       NORTHERN|     548|
|        CENTRAL|     433|
|        MISSION|     208|
+---------------+--------+

