EQWorks Solution
 
Author: Tryambak Kaushik

Date: 07 February 2021

---

In [3]:
import findspark

In [4]:
findspark.init()

In [5]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import Row, Window

from datetime import date

In [6]:
import pandas as pd
import math
import numpy as np
import os
from pprint import pprint

In [7]:
#Create a SparkContext and SparkSession
sc = SparkContext("local","firstapp")
# sc.stop()

spark = SparkSession.builder.master("local").appName("EQ Soln")\
.config("spark.some.config.option", "some-value").getOrCreate()

In [8]:
# Load data/DataSample.csv to Spark DataFrame

df_dataSample = spark.read.option("header",True).csv("data\DataSample.csv")

print('Display Schema of DataSample.csv dataset table')
df_dataSample.printSchema()

root
 |-- _ID: string (nullable = true)
 |--  TimeSt: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [9]:
#Display the contents of DataSample data
print('Display contents of DataSample.csv dataset table')
df_dataSample.show()

+-------+--------------------+-------+--------+------------+--------+---------+
|    _ID|              TimeSt|Country|Province|        City|Latitude|Longitude|
+-------+--------------------+-------+--------+------------+--------+---------+
|4516516|2017-06-21 00:00:...|     CA|      ON|    Waterloo|43.49347|-80.49123|
|4516547|2017-06-21 18:00:...|     CA|      ON|      London|42.93990|-81.27090|
|4516550|2017-06-21 15:00:...|     CA|      ON|      Guelph|43.57760|-80.22010|
|4516600|2017-06-21 15:00:...|     CA|      ON|   Stratford|43.37160|-80.97730|
|4516613|2017-06-21 15:00:...|     CA|      ON|   Stratford|43.37160|-80.97730|
|4516693|2017-06-21 14:00:...|     CA|      ON|   Kitchener|43.43810|-80.50990|
|4516771|2017-06-21 10:00:...|     CA|      ON|      Sarnia|42.96100|-82.37300|
|4516831|2017-06-21 12:00:...|     CA|      ON|      London|43.00910|-81.17650|
|4516915|2017-06-21 15:00:...|     CA|      ON|      London|43.00910|-81.17650|
|4516953|2017-06-21 16:00:...|     CA|  

### 1. Cleanup

A sample dataset of request logs is given in data/DataSample.csv. We consider records that have identical geoinfo and timest as suspicious. Please clean up the sample dataset by filtering out those suspicious request records.

In [22]:
# Drop duplicate rows based on columns TimeSt, Latitude and Longitude

df_clean = df_dataSample.dropDuplicates(['Latitude', 'Longitude']).dropDuplicates([' TimeSt'])

print ('Display clean dataset after dropping suspicius requests (i.e., duplicate geoinfo and timest)')
df_clean.show()

+-------+--------------------+-------+--------+-------------+--------+----------+
|    _ID|              TimeSt|Country|Province|         City|Latitude| Longitude|
+-------+--------------------+-------+--------+-------------+--------+----------+
|4516516|2017-06-21 00:00:...|     CA|      ON|     Waterloo|43.49347| -80.49123|
|4519209|2017-06-21 00:00:...|     CA|      ON|      Hanover|44.15170| -81.02660|
|4518130|2017-06-21 00:00:...|     CA|      ON|       London|43.00040| -81.23430|
|4521574|2017-06-21 00:00:...|     CA|      ON|    Brantford|43.15080| -80.20940|
|4524947|2017-06-21 00:00:...|     CA|      ON|    Kitchener|43.43060| -80.48770|
|4530820|2017-06-21 00:00:...|     CA|      NB|      Moncton|46.11830| -64.73380|
|4534383|2017-06-21 00:00:...|     CA|      AB|Fort Mcmurray|56.74308|-111.47651|
|5380915|2017-06-21 00:01:...|     CA|      AB|      Calgary|51.15880|-113.96360|
|4536827|2017-06-21 00:01:...|     CA|      ON|       Oshawa|43.90635| -78.87251|
|5387037|2017-06

In [175]:
print ("------------------------END OF ANSWER #1------------------------")

------------------------END OF ANSWER #1------------------------


**End of Answer #1**

---

### 2. Label
Assign each request (from data/DataSample.csv) to the closest (i.e. minimum distance) POI (from data/POIList.csv).

**Note:** A POI is a geographical Point of Interest.

In [173]:
#Load data from data/POIList.csv in Spark Dataframe

df_poil = spark.read.option("header",True).csv("data\POIList.csv")

print ('\nDisplay Schema and data of POIList dataset table\n')
df_poil.printSchema()
df_poil.show(5)


Display Schema and data of POIList

root
 |-- POIID: string (nullable = true)
 |--  Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

+-----+----------+------------+
|POIID|  Latitude|   Longitude|
+-----+----------+------------+
| POI1| 53.546167| -113.485734|
| POI2| 53.546167| -113.485734|
| POI3| 45.521629|  -73.566024|
| POI4| 45.224830|  -63.232729|
+-----+----------+------------+



In [24]:
#Convert pois Spark DataFrame to Pandas Dataframe
df_pd_pois = df_poil.toPandas()
df_pd_pois

Unnamed: 0,POIID,Latitude,Longitude
0,POI1,53.546167,-113.485734
1,POI2,53.546167,-113.485734
2,POI3,45.521629,-73.566024
3,POI4,45.22483,-63.232729


In [69]:
#Python-UDF to find POI with minimum distance to each entry of DataSample
def myfun(la2, lo2):
    
    min_dis = 1.0e10
    poi_id = df_pd_pois.loc[0,'POIID']
    
    for i, (la1,lo1) in enumerate( zip(df_pd_pois[' Latitude'], df_pd_pois['Longitude'])):
        la1, lo1 = float(la1), float(lo1)
        dis = math.sqrt((la1-la2)**2 + (lo1-lo2)**2)
        if min_dis > dis:
            min_dis = dis
            poi_id = df_pd_pois.loc[i,'POIID']
            
    return ([poi_id, min_dis])

#Register Python-UDF with Spark-UDF
myfun_spark = F.udf(myfun, ArrayType(StringType()))

df_poi = df_clean.withColumn('temp_col', myfun_spark(  F.col('Latitude').cast(FloatType()),
                                          F.col('Longitude').cast(FloatType())  )).cache()\
            .withColumn('POI', F.col('temp_col')[0])\
            .withColumn('POI_DIS', F.col('temp_col')[1].cast(DoubleType()))\
            .drop('temp_col')

print('Display the dataframe with new columns of nearest POI and POI_DIS(i.e, distance to POI from request)')
df_poi.show(5)

Display the dataframe with new columns of nearest POI and POI_DIS(i.e, distance to POI from request)
+-------+--------------------+-------+--------+---------+--------+---------+----+--------+
|    _ID|              TimeSt|Country|Province|     City|Latitude|Longitude| POI| POI_DIS|
+-------+--------------------+-------+--------+---------+--------+---------+----+--------+
|4516516|2017-06-21 00:00:...|     CA|      ON| Waterloo|43.49347|-80.49123|POI3|7.216083|
|4519209|2017-06-21 00:00:...|     CA|      ON|  Hanover|44.15170|-81.02660|POI3|7.585312|
|4518130|2017-06-21 00:00:...|     CA|      ON|   London|43.00040|-81.23430|POI3|8.072114|
|4521574|2017-06-21 00:00:...|     CA|      ON|Brantford|43.15080|-80.20940|POI3|7.053739|
|4524947|2017-06-21 00:00:...|     CA|      ON|Kitchener|43.43060|-80.48770|POI3|7.230631|
+-------+--------------------+-------+--------+---------+--------+---------+----+--------+
only showing top 5 rows



In [179]:
print ("------------------------END OF ANSWER #2------------------------")

------------------------END OF ANSWER #2------------------------


**End of Answer #2**

---

### 3. Analysis
For each POI, calculate the average and standard deviation of the distance between the POI to each of its assigned requests.

At each POI, draw a circle (with the center at the POI) that includes all of its assigned requests. Calculate the radius and density (requests/area) for each POI.

In [70]:
#Group the dataframe df_poi on 'POI' column and calculate average and standard deviation on each group
df_avgSD = df_poi.groupby('POI').agg(F.avg('POI_DIS').alias('Average'), F.stddev('POI_DIS').alias('Std_Dev'))

#Left Join df_avgSD dataframe to df_poil dataframe for completeness
df_avgSD = df_poil.join(df_avgSD, df_poil.POIID == df_avgSD.POI, how = 'Left').drop(df_avgSD.POI)

print('Display distance Average and Std_Dev for each POI')
df_avgSD.show()

Display distance Average and Std_Dev for each POI
+-----+----------+------------+------------------+-----------------+
|POIID|  Latitude|   Longitude|           Average|          Std_Dev|
+-----+----------+------------+------------------+-----------------+
| POI1| 53.546167| -113.485734| 4.548856149843565|4.666481513614549|
| POI2| 53.546167| -113.485734|              null|             null|
| POI3| 45.521629|  -73.566024| 5.356265976821089| 3.04113827053302|
| POI4| 45.224830|  -63.232729|12.736570363332119|35.65542795879107|
+-----+----------+------------+------------------+-----------------+



In [181]:
print ("Note: Based on above output, it can be concluded that POI2 radius of influence is ZERO\n")

Note: Based on above output, it can be concluded that POI2 radius of influence is ZERO



**Note:** Based on above output, it can be concluded that POI2 radius of influence is ZERO

In [71]:
#The radius of Influence-Circle of POI will be the distance to farthest assigned request

w = Window.partitionBy('POI')

df_radius = df_poi.withColumn('max_r', F.max('POI_DIS').over(w))\
                  .where(F.col('POI_DIS') == F.col('max_r'))\
                  .drop('max_r')

#Left Join df_radius dataframe to df_poil dataframe for completeness
df_avgSD_r = df_avgSD.join(df_radius['POI', 'POI_DIS'], df_avgSD.POIID == df_radius.POI, how = 'Left')\
                   .drop(df_radius.POI)\
                   .withColumnRenamed('POI_DIS', 'POI_RADIUS')

print('Display the maximum POI_DIS (i.e, POI_RADIUS) values for each group\n')
df_avgSD_r.show()

Display the maximum POI_DIS (i.e, POI_RADIUS) values for each group

+-----+----------+------------+------------------+-----------------+----------+
|POIID|  Latitude|   Longitude|           Average|          Std_Dev|POI_RADIUS|
+-----+----------+------------+------------------+-----------------+----------+
| POI1| 53.546167| -113.485734| 4.548856149843565|4.666481513614549| 24.851934|
| POI2| 53.546167| -113.485734|              null|             null|      null|
| POI3| 45.521629|  -73.566024| 5.356265976821089| 3.04113827053302| 20.155376|
| POI4| 45.224830|  -63.232729|12.736570363332119|35.65542795879107| 192.70499|
+-----+----------+------------+------------------+-----------------+----------+



In [72]:
#Calculate number of requests for each POI
df_no_of_req = df_poi.groupby('POI').agg(F.count('POI').alias('Requests'))

#Append POI_No.
df_poi_req = df_avgSD_r.join(df_no_of_req, df_avgSD_r.POIID == df_no_of_req.POI, 'Left' )\
                         .drop(df_no_of_req['POI'])

#Calculate the density
df_poi_density = df_poi_req.withColumn('Density', F.col('Requests')/ (3.14*F.col('POI_RADIUS')**2 ))

print('Dislay No. of Requests and Density for each POI')
df_poi_density.show()

Dislay No. of Requests and Density for each POI
+-----+----------+------------+------------------+-----------------+----------+--------+--------------------+
|POIID|  Latitude|   Longitude|           Average|          Std_Dev|POI_RADIUS|Requests|             Density|
+-----+----------+------------+------------------+-----------------+----------+--------+--------------------+
| POI1| 53.546167| -113.485734| 4.548856149843565|4.666481513614549| 24.851934|    3027|  1.5608543339196206|
| POI2| 53.546167| -113.485734|              null|             null|      null|    null|                null|
| POI3| 45.521629|  -73.566024| 5.356265976821089| 3.04113827053302| 20.155376|    2297|  1.8007338135858355|
| POI4| 45.224830|  -63.232729|12.736570363332119|35.65542795879107| 192.70499|     224|0.001921022556875...|
+-----+----------+------------+------------------+-----------------+----------+--------+--------------------+



In [176]:
print ("------------------------END OF ANSWER #3------------------------")

------------------------END OF ANSWER #3------------------------


**End of Answer #3**

---

### 4. Data Science/Engineering Tracks
Please complete either 4a or 4b. Extra points will be awarded for completing both tasks.

#### 4a. Model
To visualize the popularity of each POI, they need to be mapped to a scale that ranges from -10 to 10. Please provide a mathematical model to implement this, taking into consideration of extreme cases and outliers. Aim to be more sensitive around the average and provide as much visual differentiability as possible.
Bonus: Try to come up with some reasonable hypotheses regarding POIs, state all assumptions, testing steps and conclusions. Include this as a text file (with a name bonus) in your final submission.

In [61]:
#Import PySpark Libraries for Data Analytics
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

In [88]:
df_poi_density.show()
df_poi_density_temp = df_poi_density.filter(df_poi_density.Density.isNotNull())
df_poi_density_temp.show()

+-----+----------+------------+------------------+-----------------+----------+--------+--------------------+
|POIID|  Latitude|   Longitude|           Average|          Std_Dev|POI_RADIUS|Requests|             Density|
+-----+----------+------------+------------------+-----------------+----------+--------+--------------------+
| POI1| 53.546167| -113.485734| 4.548856149843565|4.666481513614549| 24.851934|    3027|  1.5608543339196206|
| POI2| 53.546167| -113.485734|              null|             null|      null|    null|                null|
| POI3| 45.521629|  -73.566024| 5.356265976821089| 3.04113827053302| 20.155376|    2297|  1.8007338135858355|
| POI4| 45.224830|  -63.232729|12.736570363332119|35.65542795879107| 192.70499|     224|0.001921022556875...|
+-----+----------+------------+------------------+-----------------+----------+--------+--------------------+

+-----+----------+------------+------------------+-----------------+----------+--------+--------------------+
|POIID|  

In [109]:
# Spark-udf for converting column from vector type to double type
myfun_vec2double = F.udf(lambda x: round(float(list(x)[0]),3), DoubleType())

# Use Spark VectorAssembler Transformation - Converting column to vector type
assembler = VectorAssembler(inputCols=['Density'],outputCol="Density_Vector")

# Use Spark MinMaxScaler Transformation to scale the column within (min,max) range
scaler = MinMaxScaler(min = -10, max = 10, inputCol="Density_Vector", outputCol="Density_Scaled")

# Create a Spark Pipeline of VectorAssembler and MinMaxScaler
pipeline = Pipeline(stages=[assembler, scaler])

#Drop POI2 as outlier 
df_poi_density_temp = df_poi_density.filter(df_poi_density.Density.isNotNull())

# Spark fitting pipeline on dataframe
df_norm = pipeline.fit(df_poi_density_temp).transform(df_poi_density_temp)\
                  .withColumn("Density_Scaled", myfun_vec2double("Density_Scaled"))\
                  .drop("Density_Vector")

print('Display scaled density for each POI')
df_norm.select(*['POIID'], *[F.round(c, 3).alias(c) for c in df_norm.columns[1:] ]).show()

Display scaled density for each POI
+-----+---------+---------+-------+-------+----------+--------+-------+--------------+
|POIID| Latitude|Longitude|Average|Std_Dev|POI_RADIUS|Requests|Density|Density_Scaled|
+-----+---------+---------+-------+-------+----------+--------+-------+--------------+
| POI1|   53.546| -113.486|  4.549|  4.666|    24.852|    3027|  1.561|         7.333|
| POI3|   45.522|  -73.566|  5.356|  3.041|    20.155|    2297|  1.801|          10.0|
| POI4|   45.225|  -63.233| 12.737| 35.655|   192.705|     224|  0.002|         -10.0|
+-----+---------+---------+-------+-------+----------+--------+-------+--------------+



In [112]:
df_lognorm = df_norm.withColumn('log_Density', F.log10(F.col('Density')) )

# Use Spark VectorAssembler Transformation - Converting column to vector type
assembler_log = VectorAssembler(inputCols=['log_Density'],outputCol="log_Density_Vector")

# Use Spark MinMaxScaler Transformation to scale the column within (min,max) range
scaler_log = MinMaxScaler(min = -1.0, max = 1.0, inputCol="log_Density_Vector", outputCol="log_Density_Scaled")

# Create a Spark Pipeline of VectorAssembler and MinMaxScaler
pipeline_log = Pipeline(stages=[assembler_log, scaler_log])


# Spark fitting pipeline on dataframe
df_lognorm = pipeline_log.fit(df_lognorm).transform(df_lognorm)\
                  .withColumn("log_Density_Scaled", myfun_vec2double("log_Density_Scaled"))\
                  .drop("log_Density_Vector")

print('Display scaled density for each POI')
df_lognorm.select(*['POIID'], *[F.round(c, 3).alias(c) for c in df_lognorm.columns[1:] ]).show()


Display scaled density for each POI
+-----+---------+---------+-------+-------+----------+--------+-------+--------------+-----------+------------------+
|POIID| Latitude|Longitude|Average|Std_Dev|POI_RADIUS|Requests|Density|Density_Scaled|log_Density|log_Density_Scaled|
+-----+---------+---------+-------+-------+----------+--------+-------+--------------+-----------+------------------+
| POI1|   53.546| -113.486|  4.549|  4.666|    24.852|    3027|  1.561|         7.333|      0.193|             0.958|
| POI3|   45.522|  -73.566|  5.356|  3.041|    20.155|    2297|  1.801|          10.0|      0.255|               1.0|
| POI4|   45.225|  -63.233| 12.737| 35.655|   192.705|     224|  0.002|         -10.0|     -2.716|              -1.0|
+-----+---------+---------+-------+-------+----------+--------+-------+--------------+-----------+------------------+



In [188]:
#Save the interpretation on results in 'bonus' file
bonus = """
Interpretation:
Density column is the ratio of Requests to POI_Area. log_Density was calculated by taking log10 of Density values. log_Density were scaled in range (-10,10) to calculate log_Density_Scaled.

It is difficult to come up with a statitics with only 3 good POIs.

Nonetheless, the density values of POI1 and POI3 are 3 orders higher than POI4. Hence, Density_Scaled, log_Density and log_Density_Scaled values are also skewed.
POI1 and POI3 attract more customers or requests per unit area of influence.

Assumptions: POI2 was dropped as outlier. POI2 data must be investigated to identify the cause of zero zone of influence. Bad data collection and formatting can be reasons for POI2 being outlier
"""

with open('bonus', 'w') as f:
    f.write(bonus)

f.close()

Bonus: Try to come up with some reasonable hypotheses regarding POIs, state all assumptions, testing steps and conclusions. Include this as a text file (with a name bonus) in your final submission.

**Interpretation:**
Density column is the ratio of Requests to POI_Area. log_Density was calculated by taking log10 of Density values. log_Density were scaled in range (-10,10) to calculate log_Density_Scaled.

It is difficult to come up with a statitics with only 3 good POIs.

Nonetheless, the density values of POI1 and POI3 are 3 orders higher than POI4. Hence, Density_Scaled, log_Density and log_Density_Scaled values are also skewed.
POI1 and POI3 attract more customers or requests per unit area of influence.

**Assumptions:** POI2 was dropped as outlier. POI2 data must be investigated to identify the cause of zero zone of influence. Bad data collection and formatting can be reasons for POI2 being outlier

In [177]:
print ("------------------------END OF ANSWER #4a------------------------")

------------------------END OF ANSWER #4a------------------------


**End of Answer #4a**

----

#### 4b. Pipeline Dependency
We use a modular design on all of our data analysis tasks. To get to a final product, we organize steps using a data pipeline. One task may require the output of one or multiple other tasks to run successfully. This creates dependencies between tasks.

We also require the pipeline to be flexible. This means a new task may enter a running pipeline anytime that may not have the tasks' dependencies satisfied. In this event, we may have a set of tasks already running or completed in the pipeline, and we will need to map out which tasks are prerequisites for the newest task so the pipeline can execute them in the correct order. For optimal pipeline execution, when we map out the necessary tasks required to execute the new task, we want to avoid scheduling tasks that have already been executed.

If we treat each task as a node and the dependencies between a pair of tasks as directed edges, we can construct a DAG (Wiki: Directed Acyclic Graph).

Consider the following scenario. At a certain stage of our data processing, we have a set of tasks (starting tasks) that we know all its prerequisite task has been executed, and we wish to reach to a later goal task. We need to map out a path that indicates the order of executions on tasks that finally leads to the goal task. We are looking for a solution that satisfies both necessity and sufficiency -- if a task is not a prerequisite task of goal, or its task is a prerequisite task for starting tasks (already been executed), then it shouldn't be included in the path. The path needs to follow a correct topological ordering of the DAG, hence a task needs to be placed behind all its necessary prerequisite tasks in the path.

Note: A starting task should be included in the path, if and only if it's a prerequisite of the goal task

For example, we have 6 tasks [A, B, C, D, E, F], C depends on A (denoted as A->C), B->C, C->E, E->F. A new job has at least 2 tasks and at most 6 tasks, each task can only appear once.

Examples:

Inputs: starting task: A, goal task: F, output: A,B,C,E,F or B,A,C,E,F.
Input: starting task: A,C, goal task:'F', outputs: C,E,F.
You will find the starting task and the goal task in question.txt file, list of all tasks in task_ids.txt and dependencies in relations.txt.

Please submit your implementation and result.

In [113]:
#Assign questions data
questions = {'starting task': '73', 'goal task': '36'}
questions

{'starting task': '73', 'goal task': '36'}

In [114]:
#Assign relations data
relations = [(97,102),
             (75,31),
             (75,37),
             (100,20),
             (102,36),
             (102,37),
             (102,31),
             (16,37),
             (39,73),
             (39,100),
             (41,73),
             (41,112),
             (62,55),
             (112,97),
             (20,94),
             (20,97),
             (21,20),
             (73,20),
             (56,102),
             (56,75),
             (56,55),
             (55,31),
             (55,37),
             (94,56),
             (94,102)]

In [117]:
#Assign Task-IDs data
task_ids = [97,75,100,102,16,39,41,62,112,20,21,73,56,55,36,37,94,31]

In [118]:
#Create a pandas-dataframe of relations data
r_pd = pd.DataFrame(relations, columns = ['from', 'to'])
r_pd.head()

Unnamed: 0,from,to
0,97,102
1,75,31
2,75,37
3,100,20
4,102,36


In [192]:
#Get starting target (st) and goal target (gt)
st = int(questions['starting task']); print ('Starting Task: %2d'%(st))
gt = int(questions['goal task']); print ('Goal Task: %2d'%(gt))

Starting Task: 73
Goal Task: 36


In [171]:
#A python recursive function to find the path from source to target
def replicate_recur(st, gt, mylist=None):

    # If a list has not been passed as argument create an empty one
    if(mylist == None):
        mylist = [st]
        
    if st == gt:
        return mylist
    
    temp = r_pd[r_pd['from'] == st].values

    if not temp.any() :
        temp = 'Error'
        mylist.append(temp)
        return mylist
    
    mylist = [ [i for i in mylist] for _ in range(len(temp))]
    for idx,val in enumerate(temp[:,1]):
        mylist[idx].append(val)
        mylist[idx] = replicate_recur(val, gt, mylist[idx])

    return mylist

output = []
def removeNestings(l): 
    for i in l: 
        if (type(i) == list) & (type(i[0]) == list):
            removeNestings(i) 
        elif ('Error' not in i):
            output.append(i)

print ('The different paths from Starting Target to Goal Target\n')
removeNestings([replicate_recur(st, gt)])
pprint(output)

The different paths from Starting Target to Goal Target

[[73, 20, 94, 56, 102, 36], [73, 20, 94, 102, 36], [73, 20, 97, 102, 36]]


In [178]:
print ("------------------------END OF ANSWER #4b------------------------")

------------------------END OF ANSWER #4b------------------------
