In [0]:
sc = spark.sparkContext

#dbutils.widgets.text("file_location", "/uploads/data", "Upload Location")
#lines = sc.textfile("")
dataPath = "/FileStore/tables/Police_Department_Incidents.csv"

crimes_df = spark.read.format("com.databricks.spark.csv")\
  .option("header","true")\
  .option("inferSchema", "true")\
  .load(dataPath)


#To create a table with dataset
crimes_df.createOrReplaceTempView("crimes")

In [0]:
display(crimes_df.dtypes)

In [0]:
display(crimes_df)

In [0]:
# Crime event counts in dataset
crimes_df.count()

In [0]:
%sql

-- Q1 - crime counts by category, version 1

select category, count(*) from crimes group by 1 order by 2 desc

In [0]:
# Q1 - crime counts by category, version 2 
# A way to order by count in display function?

crime_by_category = crimes_df.groupBy("category").count()

display(crime_by_category)

In [0]:
%sql

-- Q2 - crime counts by district, version 1

select PdDistrict, count(*) from crimes group by 1 order by 2 desc

In [0]:
# Q2 - crime counts by district, version 2 
# A way to order by count in display function?

crime_by_category = crimes_df.groupBy("pddistrict").count()

display(crime_by_category)

In [0]:
%python

# Q3, version 1, use python to define a function to calculate distance(Km) between each location and SF downtown center location (latitude, longitude)
import math

def distance_calculator(lat, lon, target_lat, target_lon):
  Radius_earth = 6371.0
  Lon_distance = lon - target_lon
  Lat_distance = lat - target_lat
  Haversine_base = math.sin(Lat_distance / 2)**2 + math.cos(target_lat) * math.cos(lat) * math.sin(Lon_distance / 2)**2
  Haversine_secondary = 2 * math.atan2(sqrt(Haversine_Base), math.sqrt(1 - Haversine_Base))
  Distance = Radius_earth * Haversine_secondary
  return Distance
  

In [0]:
crimes_df_with_distance = crimes_df.select('*', distance_calculator(crimes_df.Y, crimes_df.X, 37.773972, -122.431297).alias('Distance'))
display(crimes_df_with_distance)

In [0]:
%scala


case class Location(lat: Double, lon: Double)
trait DistanceCalcular {
    def calculateDistanceInKilometer(userLocation: Location, warehouseLocation: Location): Int
}
class DistanceCalculatorImpl extends DistanceCalcular {
    private val AVERAGE_RADIUS_OF_EARTH_KM = 6371
    override def calculateDistanceInKilometer(userLocation: Location, warehouseLocation: Location): Int = {
        val latDistance = Math.toRadians(userLocation.lat - warehouseLocation.lat)
        val lngDistance = Math.toRadians(userLocation.lon - warehouseLocation.lon)
        val sinLat = Math.sin(latDistance / 2)
        val sinLng = Math.sin(lngDistance / 2)
        val a = sinLat * sinLat +
        (Math.cos(Math.toRadians(userLocation.lat)) *
            Math.cos(Math.toRadians(warehouseLocation.lat)) *
            sinLng * sinLng)
        val c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a))
        (AVERAGE_RADIUS_OF_EARTH_KM * c).toInt
    }
}


In [0]:
crimes_df_with_distance = crimes_df.select('*', DistanceCalculatorImpl().calculateDistanceInKilometer(crimes_df.Location, Location(37.773972, -122.431297)).alias("Distance"))

In [0]:
%sql

-- Q3 - crime counts each Sunday at SF downtown (defined as a circle area with radius = 10km)

drop view if exists Sunday_crimes;
create temp view Sunday_crimes as
select *, 2 * asin(
      sqrt(
        cos(radians(Y)) *
        cos(radians(37.773972)) *
        pow(sin(radians((X + 122.431297)/2)), 2)
            +
        pow(sin(radians((Y - 37.773972)/2)), 2)

      )
    ) * 6371 distance_km from crimes where DayOfWeek = "Sunday";


select date, count(*) from Sunday_crimes where distance_km < 10 group by 1 order by 2 desc
