# Work Sample Problems - Data Engineering

In [32]:
from math import pi
from pathlib import Path

from pyspark.sql import SparkSession
from pyspark.sql.functions import count, min
# NOTE: min, max and round intentionally shadow the Python builtins in this notebook;
# every use below operates on Spark columns.
from pyspark.sql.functions import avg, col, max, round, stddev, udf
from pyspark.sql.window import Window

In [2]:
# Input CSV locations, resolved relative to the notebook's working directory.
data_path = Path('data')
sample_file_path = data_path.joinpath('DataSample.csv')
poi_file_path = data_path.joinpath('POIList.csv')

In [3]:
# Obtain the Spark session for this notebook under the application name 'EQ_Works'
# (getOrCreate hands back an already-running session when there is one).
spark = (
    SparkSession.builder
    .appName('EQ_Works')
    .getOrCreate()
)

## Create dataframes from CSV files

In [4]:
# Load the POI list with an inferred schema, then rename the three columns
# positionally so they carry a POI_ prefix.
poi_df = (
    spark.read
    .csv(str(poi_file_path), inferSchema=True, header=True)
    .toDF('POI_ID', 'POI_Latitude', 'POI_Longitude')
)
poi_df.show(5)

+------+------------+-------------+
|POI_ID|POI_Latitude|POI_Longitude|
+------+------------+-------------+
|  POI1|   53.546167|  -113.485734|
|  POI2|   53.546167|  -113.485734|
|  POI3|   45.521629|   -73.566024|
|  POI4|    45.22483|   -63.232729|
+------+------------+-------------+



In [5]:
# Load the request sample with an inferred schema and assign the seven column
# names positionally.
data_df = (
    spark.read
    .csv(str(sample_file_path), inferSchema=True, header=True)
    .toDF('ID', 'TimeSt', 'Country', 'Province', 'City', 'Latitude', 'Longitude')
)
data_df.show(5)

+-------+--------------------+-------+--------+---------+--------+---------+
|     ID|              TimeSt|Country|Province|     City|Latitude|Longitude|
+-------+--------------------+-------+--------+---------+--------+---------+
|4516516|2017-06-21 00:00:...|     CA|      ON| Waterloo|43.49347|-80.49123|
|4516547|2017-06-21 18:00:...|     CA|      ON|   London| 42.9399| -81.2709|
|4516550|2017-06-21 15:00:...|     CA|      ON|   Guelph| 43.5776| -80.2201|
|4516600|2017-06-21 15:00:...|     CA|      ON|Stratford| 43.3716| -80.9773|
|4516613|2017-06-21 15:00:...|     CA|      ON|Stratford| 43.3716| -80.9773|
+-------+--------------------+-------+--------+---------+--------+---------+
only showing top 5 rows



## 1. Cleanup

Remove duplicates and filter suspicious records

In [6]:
# Requests that share the same timestamp and coordinates are treated as suspicious.
# Every member of such a group is removed (not just the extra copies): only rows
# whose (TimeSt, Latitude, Longitude) combination occurs exactly once are kept.
data_df_clean = (
    data_df
    .withColumn(
        'count',
        count('*').over(Window.partitionBy('TimeSt', 'Latitude', 'Longitude')),
    )
    .where('count = 1')
    .drop('count')  # helper column is only needed for the filter
)

In [7]:
# Report how many rows the cleanup removed: t = total, c = kept, d = dropped.
t = data_df.count()
c = data_df_clean.count()
d = t - c
print(f'Filtered {d}/{t} rows -> {c} remaining.')

Filtered 4052/22025 rows -> 17973 remaining.


In [8]:
# POIs with identical coordinates are indistinguishable by distance, so keep a
# single row per (latitude, longitude) pair.
# NOTE(review): which POI_ID of a duplicate pair survives is decided by Spark, not by this code.
poi_df_dedup = poi_df.dropDuplicates(subset=['POI_Latitude', 'POI_Longitude'])
poi_df_dedup.show(5)

+------+------------+-------------+
|POI_ID|POI_Latitude|POI_Longitude|
+------+------------+-------------+
|  POI1|   53.546167|  -113.485734|
|  POI4|    45.22483|   -63.232729|
|  POI3|   45.521629|   -73.566024|
+------+------------+-------------+



## 2. Label
Assign each request to the closest POI


In [9]:
# Pair every cleaned request with every de-duplicated POI (Cartesian product) so
# that the distance from each request to each POI can be computed.
cross_df = data_df_clean.crossJoin(poi_df_dedup)
cross_df.show(5)

+-------+--------------------+-------+--------+---------+--------+---------+------+------------+-------------+
|     ID|              TimeSt|Country|Province|     City|Latitude|Longitude|POI_ID|POI_Latitude|POI_Longitude|
+-------+--------------------+-------+--------+---------+--------+---------+------+------------+-------------+
|5613403|2017-06-21 00:22:...|     CA|      ON|Etobicoke| 43.6381| -79.5787|  POI1|   53.546167|  -113.485734|
|5613403|2017-06-21 00:22:...|     CA|      ON|Etobicoke| 43.6381| -79.5787|  POI4|    45.22483|   -63.232729|
|5613403|2017-06-21 00:22:...|     CA|      ON|Etobicoke| 43.6381| -79.5787|  POI3|   45.521629|   -73.566024|
|5013924|2017-06-21 00:34:...|     CA|      ON|  Toronto| 43.6606| -79.4635|  POI1|   53.546167|  -113.485734|
|5013924|2017-06-21 00:34:...|     CA|      ON|  Toronto| 43.6606| -79.4635|  POI4|    45.22483|   -63.232729|
+-------+--------------------+-------+--------+---------+--------+---------+------+------------+-------------+
only showing top 5 rows

In [10]:
# Use haversine formula to calcuate distance between coordinates
!pip3 install haversine

from haversine import haversine as hd

@udf(returnType='double')
def haversine(lat1, lng1, lat2, lng2):
    start = (float(lat1), float(lng1))
    end = (float(lat2), float(lng2))

    return hd(start, end, unit='km')



In [11]:
# Label each request with its nearest POI.
#
# The distance is computed once per (request, POI) pair and materialised as
# 'distance_km'; the per-request minimum is then taken over that column instead
# of spelling out the haversine UDF call a second time inside the window.
# Only the pair(s) whose distance equals the per-request minimum are kept, so a
# request that is exactly equidistant from two POIs would yield two rows.
# NOTE(review): partitioning by 'ID' assumes request IDs are unique — confirm.
min_dist_df = (
    cross_df
    .select(
        'ID',
        'Latitude',
        'Longitude',
        'POI_ID',
        'POI_Latitude',
        'POI_Longitude',
        haversine('Latitude', 'Longitude', 'POI_Latitude', 'POI_Longitude')
            .alias('distance_km'),
    )
    .withColumn(
        'min_distance_km',
        min('distance_km').over(Window.partitionBy('ID')),
    )
    .where('distance_km = min_distance_km')
    .drop('distance_km')  # identical to min_distance_km on the surviving rows
)
min_dist_df.show(5)

+-------+--------+---------+------+------------+-------------+------------------+
|     ID|Latitude|Longitude|POI_ID|POI_Latitude|POI_Longitude|   min_distance_km|
+-------+--------+---------+------+------------+-------------+------------------+
|4517905| 42.2957| -82.9599|  POI3|   45.521629|   -73.566024| 832.9570550053622|
|4526426|46.72072|-71.30409|  POI3|   45.521629|   -73.566024|219.46182926617928|
|4535091|  51.049|-113.9622|  POI1|   53.546167|  -113.485734| 279.5548830773176|
|4545807|43.85883|-79.29809|  POI3|   45.521629|   -73.566024| 489.2506101021576|
|4559622| 44.2647| -76.5504|  POI3|   45.521629|   -73.566024|273.47008304760004|
+-------+--------+---------+------+------------+-------------+------------------+
only showing top 5 rows



## 3. Analysis
#### 3.1. Calculate the average and standard deviation of the distance between each _POI_ and its assigned _requests_
#### 3.2. Calculate the density of requests (around the radius) of each _POI_

In [34]:
# Per-POI summary of the assigned requests:
#   num_requests - number of requests labelled with the POI
#   avg_distance / std_distance - mean and standard deviation of the distance (km)
#   radius - distance (km) to the farthest assigned request
#   density - requests per km^2 inside the circle of that radius, rounded to 8 decimals
poi_metrics_df = (
    min_dist_df
    .groupBy('POI_ID')
    .agg(
        count('*').alias('num_requests'),
        avg('min_distance_km').alias('avg_distance'),
        stddev('min_distance_km').alias('std_distance'),
        max('min_distance_km').alias('radius'),
    )
    .withColumn(
        'density',
        round(col('num_requests') / (pi * col('radius') ** 2), 8),
    )
)

In [35]:
# Display the per-POI metrics table (at most 5 rows).
poi_metrics_df.show(5)

+------+------------+-----------------+------------------+------------------+----------+
|POI_ID|num_requests|     avg_distance|      std_distance|            radius|   density|
+------+------------+-----------------+------------------+------------------+----------+
|  POI4|         422|514.9978833256106|1506.8920521753998| 9349.585684667303|   1.54E-6|
|  POI1|        8749| 300.715162933584| 388.2739215695841|11531.836760265795|  2.094E-5|
|  POI3|        8802|451.6517730486509| 223.6320507243465|1474.5829988065425|0.00128853|
+------+------------+-----------------+------------------+------------------+----------+



## 4. 