## Overview

This notebook shows how to load CSV files that were uploaded to DBFS into Spark DataFrames and query them. It uses those DataFrames to assign each request to its nearest point of interest (POI) and to summarize the requests per POI. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is the Databricks File System, which lets you store data for querying within Databricks. This notebook assumes that the input files are already in DBFS.

This notebook is written in **Python**, so the default cell type is Python. You can switch an individual cell to another language with a magic command of the form `%<language>` (for example `%sql` or `%scala`). Python, Scala, SQL, and R are all supported.

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import udf, row_number, count, min, stddev, avg, max, col, round
from pyspark.sql.window import Window

from math import radians, cos, sin, asin, sqrt, pi

In [3]:
# Load the POI list from DBFS; toDF assigns explicit column names positionally.
file_location = "/FileStore/tables/POIList.csv"
file_type = "csv"

# Reader options (only meaningful for CSV sources; other formats ignore them)
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

poi_df = (
  spark.read.format(file_type)
    .options(header=first_row_is_header, inferSchema=infer_schema, sep=delimiter)
    .load(file_location)
    .toDF('POIID', 'Latitude', 'Longitude')
)

poi_df.printSchema()

In [4]:
# Render the POI table. Note in the output that POI1 and POI2 share identical coordinates.
display(poi_df)

POIID,Latitude,Longitude
POI1,53.546167,-113.485734
POI2,53.546167,-113.485734
POI3,45.521629,-73.566024
POI4,45.22483,-63.232729


In [5]:
# Load the request-log sample from DBFS; toDF assigns explicit column names positionally.
file_location = "/FileStore/tables/DataSample.csv"
file_type = "csv"

# Reader options (only meaningful for CSV sources; other formats ignore them)
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

data_df = (
  spark.read.format(file_type)
    .options(header=first_row_is_header, inferSchema=infer_schema, sep=delimiter)
    .load(file_location)
    .toDF('ID', 'TimeSt', 'Country', 'Province', 'City', 'Latitude', 'Longitude')
)

data_df.printSchema()

In [6]:
# Preview the first five raw request records.
display(data_df.limit(5))

ID,TimeSt,Country,Province,City,Latitude,Longitude
4516516,2017-06-21T00:00:00.143+0000,CA,ON,Waterloo,43.49347,-80.49123
4516547,2017-06-21T18:00:00.193+0000,CA,ON,London,42.9399,-81.2709
4516550,2017-06-21T15:00:00.287+0000,CA,ON,Guelph,43.5776,-80.2201
4516600,2017-06-21T15:00:00.307+0000,CA,ON,Stratford,43.3716,-80.9773
4516613,2017-06-21T15:00:00.497+0000,CA,ON,Stratford,43.3716,-80.9773


In [7]:
# Keep a single POI per (Latitude, Longitude) pair. Otherwise co-located POIs
# (e.g. POI1 and POI2 above) would tie on distance for every request.
# dropDuplicates() does not guarantee which duplicate row survives, so rank the
# rows explicitly and keep the lexicographically smallest POIID. This makes the
# retained POI reproducible across runs and cluster layouts.
dedup_window = Window.partitionBy('Latitude', 'Longitude').orderBy('POIID')

poi_df_filtered = poi_df \
  .withColumn('rn', row_number().over(dedup_window)) \
  .where(col('rn') == 1) \
  .drop('rn')

display(poi_df_filtered)

POIID,Latitude,Longitude
POI1,53.546167,-113.485734
POI4,45.22483,-63.232729
POI3,45.521629,-73.566024


In [8]:
# Drop every request whose (TimeSt, Latitude, Longitude) combination occurs more
# than once. All copies of such a record are removed, not just the extras.
# Only the columns needed downstream are kept.
window_spec = Window.partitionBy('TimeSt', 'Latitude', 'Longitude')

data_df_filtered = (
  data_df
    .select(
      'ID', 'TimeSt', 'Latitude', 'Longitude',
      count('*').over(window_spec).alias('count'),
    )
    .where(col('count') == 1)
    .drop('count')
)

In [9]:
# Preview five requests that survived the duplicate filter.
display(data_df_filtered.limit(5))

ID,TimeSt,Latitude,Longitude
4626672,2017-06-21T00:05:56.047+0000,43.5769,-79.6283
4637732,2017-06-21T00:06:43.607+0000,50.91431,-114.02187
4638304,2017-06-21T00:06:48.233+0000,48.4205,-89.2371
4698071,2017-06-21T00:11:29.367+0000,43.1508,-80.2094
5561338,2017-06-21T00:18:33.230+0000,51.0381,-114.118


In [10]:
# Number of raw request records before any filtering.
data_df.count()

In [11]:
# Number of requests left after removing records with duplicated (TimeSt, Latitude, Longitude).
data_df_filtered.count()

In [12]:
# Cartesian product: every filtered request is paired with every POI.
# The POI coordinate columns are renamed first so they stay distinguishable
# from the request's own Latitude/Longitude after the join.
poi_data_join_df = data_df_filtered.join(
  poi_df_filtered
    .withColumnRenamed('Longitude', 'POI_Longitude')
    .withColumnRenamed('Latitude', 'POI_Latitude'),
  how='cross',
)

In [13]:
# Preview the cross join: each request appears once per distinct POI.
display(poi_data_join_df.limit(5))

ID,TimeSt,Latitude,Longitude,POIID,POI_Latitude,POI_Longitude
4626672,2017-06-21T00:05:56.047+0000,43.5769,-79.6283,POI1,53.546167,-113.485734
4626672,2017-06-21T00:05:56.047+0000,43.5769,-79.6283,POI4,45.22483,-63.232729
4626672,2017-06-21T00:05:56.047+0000,43.5769,-79.6283,POI3,45.521629,-73.566024
4637732,2017-06-21T00:06:43.607+0000,50.91431,-114.02187,POI1,53.546167,-113.485734
4637732,2017-06-21T00:06:43.607+0000,50.91431,-114.02187,POI4,45.22483,-63.232729


In [14]:
@udf("double")
def haversine_dist(lat1, lon1, lat2, lon2):
    """Great-circle (haversine) distance in kilometres between two points.

    Args:
        lat1, lon1: latitude/longitude of the first point, in decimal degrees.
        lat2, lon2: latitude/longitude of the second point, in decimal degrees.

    Returns:
        The distance on a sphere of radius 6371 km, or None when any
        coordinate is null. A malformed row therefore yields a null
        distance instead of failing the whole Spark job.
    """
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return None

    phi1, lam1, phi2, lam2 = map(radians, (lat1, lon1, lat2, lon2))
    dphi = phi2 - phi1
    dlam = lam2 - lam1

    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlam / 2) ** 2
    # Floating-point rounding can push `a` slightly outside [0, 1], e.g. for
    # near-antipodal points. sqrt/asin would then raise "math domain error",
    # so clamp it. A conditional is used because the builtin min/max are
    # shadowed by the pyspark.sql.functions imports in this notebook.
    if a > 1.0:
        a = 1.0
    elif a < 0.0:
        a = 0.0

    c = 2 * asin(sqrt(a))
    R = 6371  # Radius of earth in kilometers
    return c * R

In [15]:
# Assign each request to its single nearest POI.
# The haversine UDF is evaluated once per (request, POI) pair. Pairs without a
# computable distance (null coordinates) are discarded. A row_number() window
# ordered by distance, with ties broken by POIID, then keeps exactly one row per
# request ID, so a distance tie can never attribute the same request to two POIs.
window_spec_2 = Window.partitionBy('ID')

event_poiid_lookup_df = poi_data_join_df \
  .select(
    'ID',
    'POIID',
    haversine_dist('Latitude', 'Longitude', 'POI_Latitude', 'POI_Longitude').alias('distance')
  ) \
  .where(col('distance').isNotNull()) \
  .withColumn('rank', row_number().over(window_spec_2.orderBy('distance', 'POIID'))) \
  .where(col('rank') == 1) \
  .select('ID', 'POIID', col('distance').alias('min_distance'))

In [16]:
# Preview: each request ID with its assigned POI and the distance to it (min_distance, km).
display(event_poiid_lookup_df.limit(5))

ID,POIID,min_distance
4517905,POI3,832.955904477665
4526426,POI3,219.46152613300868
4535091,POI1,279.55449694020047
4545807,POI3,489.2499343213663
4559622,POI3,273.4697053151551


In [17]:
# Number of requests assigned to each POI, most requested POI first.
display(event_poiid_lookup_df.groupBy('POIID').count().orderBy('count', ascending=False))

POIID,count
POI3,8802
POI1,8749
POI4,422


In [18]:
# Per-POI statistics over the requests assigned to each POI:
#   req_count        number of assigned requests
#   mean_distance    average request-to-POI distance (km)
#   stddev_distance  sample standard deviation of that distance
#   radius           distance of the farthest assigned request
#   density          req_count / (pi * radius^2), rounded to 6 decimal places
poiid_analysis_df = (
  event_poiid_lookup_df
    .groupBy('POIID')
    .agg(
      count('*').alias('req_count'),
      avg('min_distance').alias('mean_distance'),
      stddev('min_distance').alias('stddev_distance'),
      max('min_distance').alias('radius'),
    )
    .withColumn('density', round(col('req_count') / (pi * col('radius') ** 2), 6))
)
  

In [19]:
# Render the per-POI statistics table.
display(poiid_analysis_df)

POIID,req_count,mean_distance,stddev_distance,radius,density
POI4,422,514.9971719812202,1506.8899707703229,9349.572770487366,2e-06
POI1,8749,300.7147475686837,388.27338526354254,11531.820831836454,2.1e-05
POI3,8802,451.65114920151353,223.63174183102868,1474.5809620285695,0.001289
