In [2]:
# code to convert big fat csv to reduced/required csv

import pandas as pd
import findspark

# findspark.init() has to run BEFORE pyspark is imported: it is what puts the
# Spark installation's python libraries on sys.path. Importing pyspark first
# either fails outright or silently binds a different pyspark build.
findspark.init("/usr/local/spark")

from pyspark.sql import SparkSession
from pyspark.sql.functions import split

# Build the SparkSession.
# NOTE(review): with master "local" the executor runs inside the driver JVM, so
# spark.driver.memory is presumably the setting that actually matters — confirm.
spark = (SparkSession.builder
         .master("local")
         .appName("Crime Score")
         .config("spark.executor.memory", "1gb")
         .getOrCreate())
sc = spark.sparkContext

# All columns are read as strings (header row, no schema inference).
df = spark.read.format("csv").options(header="true").load("rp_data.csv")

# 'Year-Month' is split on '-' and the second part is kept as the Month column
# (assumes values shaped like 'YYYY-MM' — TODO confirm against rp_data.csv).
split_col = split(df['Year-Month'], '-')
df = df.withColumn('Month', split_col.getItem(1))

# Row counts per (zipcode, year, month, crime type): the reduced dataset.
sdf = df.groupBy(['Zipcode', 'Year', 'Month', 'Primary Type']).count()
# TODO: sdf is computed but never written. The earlier commented-out export
# wrote `df` (the full data) rather than `sdf`; presumably the intent was:
# sdf.toPandas().to_csv('grouped.csv')

+---------+----------+--------------------+--------------------+--------------------+-------+------------+-------------------+-----------+
|  Case_ID|      Date|   Crime_Description|Location_Description|         Geolocation|Zipcode|Rental_Price|         Crime_Type|       City|
+---------+----------+--------------------+--------------------+--------------------+-------+------------+-------------------+-----------+
|161816333|08/18/2016|VANDALISM - FELON...|VEHICLE, PASSENGE...|(33.9328, -118.2621)|90061.0|      2090.0|    CRIMINAL DAMAGE|Los Angeles|
|161816334|08/18/2016|    VEHICLE - STOLEN|              STREET|(33.9581, -118.2651)|90003.0|      2047.0|MOTOR VEHICLE THEFT|Los Angeles|
|161816335|08/18/2016|BURGLARY FROM VEH...|PROJECT/TENEMENT/...| (33.9573, -118.263)|90003.0|      2047.0|           BURGLARY|Los Angeles|
|161816337|08/18/2016|VANDALISM - MISDE...|            DRIVEWAY|(33.9292, -118.2695)|90061.0|      2090.0|    CRIMINAL DAMAGE|Los Angeles|
|161816338|08/18/2016|BURGL

In [2]:
# Relative weight of each crime type, used to weight the per-type min/max-scaled
# counts when computing a zipcode's crime score.
# Every weight is a perfect square (level ** 2 for a level between 1 and 7), so
# the table is written as (crime type, level) pairs in the original key order.
# NOTE(review): a later cell does `from constants import crimeSampleScore`, which
# shadows this definition — keep the two tables in sync.
_crimeSeverityLevels = (
    ('NARCOTICS', 4),
    ('BATTERY', 2),
    ('THEFT', 3),
    ('CRIMINAL DAMAGE', 3),
    ('OTHER OFFENSE', 1),
    ('ASSAULT', 6),
    ('ROBBERY', 5),
    ('BURGLARY', 4),
    ('MOTOR VEHICLE THEFT', 3),
    ('DECEPTIVE PRACTICE', 2),
    ('PROSTITUTION', 4),
    ('CRIMINAL TRESPASS', 2),
    ('WEAPONS VIOLATION', 4),
    ('PUBLIC PEACE VIOLATION', 3),
    ('GAMBLING', 2),
    ('OFFENSE INVOLVING CHILDREN', 4),
    ('CRIM SEXUAL ASSAULT', 4),
    ('INTERFERENCE WITH PUBLIC OFFICER', 2),
    ('SEX OFFENSE', 3),
    ('HOMICIDE', 7),
    ('ARSON', 6),
    ('LIQUOR LAW VIOLATION', 3),
    ('KIDNAPPING', 6),
    ('INTIMIDATION', 2),
    ('STALKING', 3),
    ('OBSCENITY', 4),
    ('CONCEALED CARRY LICENSE VIOLATION', 2),
    ('PUBLIC INDECENCY', 3),
    ('HUMAN TRAFFICKING', 5),
    ('NON-CRIMINAL', 1),
    ('OTHER NARCOTIC VIOLATION', 2),
    # Spelling variant of 'NON-CRIMINAL' that appears as its own key.
    ('NON - CRIMINAL', 1),
    ('RITUALISM', 1),
    ('NON-CRIMINAL (SUBJECT SPECIFIED)', 1),
)
crimeSampleScore = {crimeType: level ** 2 for crimeType, level in _crimeSeverityLevels}
def _termsAgg(field, size, subAggs, sortByKey=False):
    """Build one Elasticsearch ``terms`` bucket aggregation.

    Args:
        field: document field to bucket on.
        size: maximum number of buckets returned for this level.
        subAggs: dict of nested aggregations evaluated inside every bucket.
        sortByKey: when true, buckets are ordered by ascending key.

    Returns:
        ``{"terms": {...}, "aggs": subAggs}`` with keys in that order.
    """
    terms = {"field": field, "size": size}
    if sortByKey:
        terms["order"] = {"_key": "asc"}
    return {"terms": terms, "aggs": subAggs}


# Search body: Year -> Month -> Zipcode -> Primary Type buckets, with the
# "count" field summed into "total" for each innermost bucket.
# "size": 0 asks Elasticsearch for aggregation results only, no hits.
# NOTE(review): the bucket sizes (20 / 20 / 100 / 50) cap how many
# years / months / zipcodes / crime types come back; anything beyond is dropped.
# A later cell imports a same-named body from constants.py, which shadows this one.
_primaryTypeAgg = _termsAgg("Primary Type", 50, {"total": {"sum": {"field": "count"}}})
_zipCodeAgg = _termsAgg("Zipcode", 100, {"primaryTypeWise": _primaryTypeAgg})
_monthAgg = _termsAgg("Month", 20, {"zipCodeWise": _zipCodeAgg}, sortByKey=True)
_yearAgg = _termsAgg("Year", 20, {"monthWise": _monthAgg}, sortByKey=True)

sampleCrimeByCommunityBody = {"size": 0, "aggs": {"yearWise": _yearAgg}}


In [4]:
# get crime data from elastic search
import json
import requests
import statistics
import math
from constants import crimeSampleScore, sampleCrimeByCommunityBody

def getCrimesByCommunity(bodyContent,
                         api_url='http://localhost:9200/temp_crime_data/_search?pretty',
                         timeout=60):
    """Send a search request to Elasticsearch and return the decoded response.

    Args:
        bodyContent: JSON-serialisable search body (e.g. an aggregation query).
        api_url: full ``_search`` endpoint URL. Defaults to the
            ``temp_crime_data`` index on localhost.
        timeout: seconds passed to ``requests`` as its timeout, so an
            unresponsive server raises a timeout error instead of hanging the
            notebook forever. ``None`` disables the timeout.

    Returns:
        The parsed JSON body when the response status is 200, otherwise ``None``.
    """
    headers = {'Content-Type': 'application/json'}
    response = requests.get(api_url, data=json.dumps(bodyContent),
                            headers=headers, timeout=timeout)
    if response.status_code != 200:
        return None
    return response.json()

crimeDataByZipcode = getCrimesByCommunity(sampleCrimeByCommunityBody)
if crimeDataByZipcode is None:
    # getCrimesByCommunity signals a non-200 response with None; stop here with a
    # clear message instead of a TypeError on the subscript further down.
    raise RuntimeError('Elasticsearch crime aggregation query did not return HTTP 200')

# Weight given to the previous (already blended) score when blending it with the
# current month's score: new = f * previous + (1 - f) * current.
deprecationFactor = 0.50
# setup weights for each crime type
# violent crimes : Homicide, Rape and Sexual Assault, Robbery, Assault, Stalking/intimidation
# property crimes : Burglary, Larceny/theft, Motor vehicle theft, Cybercrime - Electronic crime, Identity theft
# Drugs & Crime
# Gangs, Hate Crime, Human Trafficking/Trafficking in Persons, Cybercrime, Identity Theft, Weapon Use
# Rescale the per-crime weights so that together they sum to 1000.
sumOfSampleScores = sum(crimeSampleScore.values())
scalingFactor = 1000 / sumOfSampleScores
scorePerCrime = {crimeType: weight * scalingFactor
                 for crimeType, weight in crimeSampleScore.items()}


def _crimeTypeRange(monthBucket):
    """Return {crime type: (min total, max total)} over the month's zipcode buckets."""
    totalsByType = {}
    for zipBucket in monthBucket['zipCodeWise']['buckets']:
        for typeBucket in zipBucket['primaryTypeWise']['buckets']:
            totalsByType.setdefault(typeBucket['key'], []).append(typeBucket['total']['value'])
    return {crimeType: (min(totals), max(totals)) for crimeType, totals in totalsByType.items()}


def _zipcodeScore(zipBucket, crimeTypeRange, weights):
    """Sum of weighted, min/max-scaled crime totals for one zipcode bucket.

    Each crime type contributes ((total - min) / (max - min)) * weight. When
    max == min the scaled value is undefined and the type contributes a flat 1,
    whatever its weight.
    NOTE(review): presumably zipcodes without a given crime type have no bucket
    for it, so min/max only cover zipcodes that reported it. A lone reporting
    zipcode then scores 1, while the lowest of several scores 0 — confirm this
    is intended.
    Raises KeyError if a crime type with max > min is missing from ``weights``.
    """
    score = 0
    for typeBucket in zipBucket['primaryTypeWise']['buckets']:
        low, high = crimeTypeRange[typeBucket['key']]
        spread = high - low
        if spread > 0:
            score += (typeBucket['total']['value'] - low) / spread * weights[typeBucket['key']]
        else:
            score += 1
    return score


def _blendWithPrevious(current, previous, factor):
    """Blend this month's scores with the running scores.

    Zipcodes seen before are blended; new zipcodes keep their current score.
    Zipcodes absent this month carry their previous score forward unchanged.
    """
    blended = {}
    for zipcode, score in current.items():
        if zipcode in previous:
            blended[zipcode] = factor * previous[zipcode] + (1.0 - factor) * score
        else:
            blended[zipcode] = score
    for zipcode, score in previous.items():
        blended.setdefault(zipcode, score)
    return blended


def _monthKey(key):
    """Zero-pad a month bucket key to the 'MM' string form (1 -> '01').

    int() normalises numeric or string keys (1, 1.0, '01') to the same 'MM'
    form used for the Month column split out of 'Year-Month'.
    """
    return '{:02d}'.format(int(key))


# yearMap: year string -> 'MM' month string -> zipcode string -> blended score.
# The running scores (previousCalculation) are carried across year boundaries too.
previousCalculation = {}
yearMap = {}
for yearBucket in crimeDataByZipcode['aggregations']['yearWise']['buckets']:
    monthMap = {}
    for monthBucket in yearBucket['monthWise']['buckets']:
        crimeTypeRange = _crimeTypeRange(monthBucket)
        scorePerZipcode = {
            str(zipBucket['key']): _zipcodeScore(zipBucket, crimeTypeRange, scorePerCrime)
            for zipBucket in monthBucket['zipCodeWise']['buckets']
        }
        previousCalculation = _blendWithPrevious(scorePerZipcode, previousCalculation,
                                                 deprecationFactor)
        monthMap[_monthKey(monthBucket['key'])] = previousCalculation
    yearMap[str(yearBucket['key'])] = monthMap

# TODO: optionally standardise the scores ((x - mean) / stdev) and index them
# into a new Elasticsearch index keyed by zipcode.
print(yearMap)

{'2001': {'01': {'60619': 599.4087000459776, '60617': 243.22062865600574, '60620': 435.86966310048257, '60621': 512.5645832958992, '60639': 268.31395456694446, '60651': 391.7128013766577, '60623': 274.7447916906432, '60625': 175.87542550956707, '60637': 316.9794142047704, '60641': 185.68598007072094, '60644': 392.90194810141253, '60649': 375.4231572934729, '60618': 193.84158745863752, '60628': 385.53891075630634, '60629': 282.9262705643683, '60636': 302.40061560008127, '60640': 112.49302597616266, '60607': 121.88095554615884, '60612': 246.64770149471346, '60615': 175.0041313402986, '60624': 416.5512532009182, '60626': 169.99915138708835, '60632': 161.21406267547948, '60638': 98.904625001788, '60647': 415.23490936598654, '60660': 76.61518893471431, '60608': 138.81568678700194, '60609': 194.3802149159871, '60643': 98.92438196886098, '60653': 184.8224388594502, '60657': 103.78201813018335, '60610': 121.60865236935197, '60613': 47.152759681228666, '60616': 214.36004404285342, '60622': 196.

In [5]:
from pyspark.sql.functions import udf,col
from pyspark.sql.types import FloatType

def _lookupCrimeScore(year, month, zipcode):
    """Return the blended crime score for (year, month, zipcode), or None if unknown.

    yearMap is keyed by strings: year -> 'MM' month -> zipcode.
    - A combination missing from yearMap (for example a zipcode cut off by the
      aggregation size limits) yields None, i.e. a null crime_score. Previously
      it raised KeyError inside the Spark task and failed the whole job.
    - The score is coerced to float because a FloatType UDF that returns a
      Python int produces null, and a score can be an int when every term fell
      into the flat "+1" branch.
    """
    score = yearMap.get(year, {}).get(month, {}).get(zipcode)
    return None if score is None else float(score)


function = udf(_lookupCrimeScore, FloatType())
df = df.withColumn('crime_score', function(col('Year'), col('Month'), col('Zipcode')))

In [6]:
from pyspark.sql import DataFrameWriter
# Write the scored rows as CSV. coalesce(1) produces a single part file inside
# the ./updated_rental_crime directory.
# mode="overwrite" keeps the cell idempotent: re-running the notebook replaces
# the previous output instead of appending another copy of every row, which
# mode="append" did.
df.coalesce(1).write.csv(path="./updated_rental_crime", header=True, mode="overwrite", sep=',')

In [7]:
# Shut down the SparkSession built at the start of the notebook; this also stops
# its underlying SparkContext.
spark.stop()