In [20]:
%load_ext nb_black
import numpy as np
import pandas as pd
import sys
import matplotlib.pyplot as plt

from pyspark.sql import SparkSession
from pyspark import SparkConf
import pyspark.sql.functions as F
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql import types as T

plt.style.use(style="seaborn")
%matplotlib inline

spark = SparkSession.builder.appName("Indice de Crimenes en Chicago").getOrCreate()
spark

The nb_black extension is already loaded. To reload it, use:
  %reload_ext nb_black


<IPython.core.display.Javascript object>

### UDFs

In [21]:
unpack_list_udf = F.udf(
    lambda x: list(set([item for sublist in x for item in sublist])),
    T.ArrayType(T.StringType()),
)

mean_udf = F.udf(lambda x: float(np.mean(x)), T.FloatType())

median_udf = F.udf(lambda x: float(np.median(x)), T.FloatType())

# schema = StructType([ \
#     StructField("firstname",StringType(),True), \
#     StructField("middlename",StringType(),True), \
#     StructField("lastname",StringType(),True), \
#     StructField("id", StringType(), True), \
#     StructField("gender", StringType(), True), \
#     StructField("salary", IntegerType(), True) \
#   ])
 
# df = spark.createDataFrame(data=data,schema=schema)
# df.printSchema()

<IPython.core.display.Javascript object>

### Leyendo el CSV

In [22]:
df = spark.read.csv("./Chicago_Crimes_2012_to_2017.csv", inferSchema=True, header=True)

<IPython.core.display.Javascript object>

In [23]:
df = df.toDF(
    "_c0",
    "ID",
    "Case Number",
    "Date",
    "Block",
    "IUCR",
    "Primary Type",
    "Description",
    "Location Description",
    "Arrest","Domestic",
    "Beat","District",
    "Ward",
    "Community Area",
    "FBI Code",
    "X Coordinate",
    "Y Coordinate",
    "Year",
    "Updated On",
    "Latitude",
    "Longitude",
    "Location"
)

<IPython.core.display.Javascript object>

In [24]:
def procesando_datos(data, loc_descripcion, tipo_crimenes):
    dato_locacion = {}

    assert data.groupBy("ID").count().count() == data.count()

    for tipo_crimen in tipo_crimenes:

        filtered = data.filter(data["Primary Type"] == tipo_crimen)

        filtered = filtered.withColumn(
            "Date", F.to_timestamp(filtered.Date, "yyyy-MM-dd 00:00")
        )

        dato_locacion[tipo_crimen] = {}
        for locacion in loc_descripcion:
            sub = (
                filtered.filter(filtered["Location Description"] == locacion)
                .groupBy("Date", F.window("Date", "30 days"))
                .agg(
                    F.expr("collect_list(Domestic)").alias("Domesticos"),
                    F.expr("count('ID')").alias("Count"),
                )
            )

            sub = sub.select("Date", "window.*", "Domesticos", "Count").sort(F.asc("end"))

            sub = sub.groupBy("end").agg(
                F.expr("collect_list(Domesticos)").alias("Domesticos"),
                F.expr("sum(Count)").alias("Count"),
            )

            sub = sub.withColumn("Domesticos", unpack_list_udf(F.col("Domesticos")))

            sub = (
                sub.withColumn("Mean", mean_udf("Domesticos"))
                .withColumn("Median", median_udf("Domesticos"))
                .drop("Domesticos")
            )

            dato_locacion[tipo_crimen][locacion] = sub

    return dato_locacion

<IPython.core.display.Javascript object>

In [25]:
%%time
tipo_crimenes = ["THEFT", "ASSAULT", "ROBBERY", "STALKING", "BATTERY", "SEX OFFENSE", "KIDNAPPING", "NARCOTICS"]
loc_descripcion = df.groupBy("Location Description").count().sort(F.desc("count")).collect()
loc_descripcion = [x["Location Description"] for x in loc_descripcion]

CPU times: total: 31.2 ms
Wall time: 1.61 s


<IPython.core.display.Javascript object>

In [26]:
loc_descripcion[:10]

['STREET',
 'RESIDENCE',
 'APARTMENT',
 'SIDEWALK',
 'OTHER',
 'PARKING LOT/GARAGE(NON.RESID.)',
 'ALLEY',
 'RESIDENTIAL YARD (FRONT/BACK)',
 'SMALL RETAIL STORE',
 'SCHOOL, PUBLIC, BUILDING']

<IPython.core.display.Javascript object>

In [29]:
len(loc_descripcion)

143

<IPython.core.display.Javascript object>

In [33]:
%%time
dato_procesado = procesando_datos(df, loc_descripcion[:20], tipo_crimenes)

CPU times: total: 1.98 s
Wall time: 8.82 s


<IPython.core.display.Javascript object>

In [36]:
dato_procesado["SEX OFFENSE"]["RESIDENCE"].show(5)

+---+-----+----+------+
|end|Count|Mean|Median|
+---+-----+----+------+
+---+-----+----+------+



<IPython.core.display.Javascript object>