In [4]:
from pyroutelib2.route import Router
from pyroutelib2.loadOsm import LoadOsm
import csv
import pandas as pd
import math
from multiprocessing import Pool, Pipe, Process, Lock
import numpy as np
from functools import partial

This function computes the distance between two geographical coordinates.

In [5]:
def distance_between_coordinates(point1, point2):
    """Return the approximate planar distance between two lat/lon points.

    The latitude difference is scaled by ``40000 / 360`` units per degree
    (presumably kilometres, 40000 being the Earth's circumference in km --
    TODO confirm) and the longitude difference by the same factor multiplied
    by the cosine of ``point2``'s latitude.

    Args:
        point1: Mapping with ``'lat'`` and ``'lon'`` entries convertible to
            ``float`` (degrees).
        point2: Mapping with the same entries; its latitude is the one used
            for the longitude scaling.

    Returns:
        The Euclidean distance ``sqrt(dx**2 + dy**2)`` as a ``float``.
        (An earlier version returned the *squared* value, which made
        thresholds other than 1 behave incorrectly.)

    Side effects:
        The ``'lat'``/``'lon'`` entries of both arguments are replaced by
        their ``float`` conversion in place; callers read those converted
        values back afterwards.
    """
    # Coerce in place, point2 first, exactly as before.
    for point in (point2, point1):
        point['lat'] = float(point['lat'])
        point['lon'] = float(point['lon'])

    ky = 40000 / 360  # units per degree of latitude
    kx = math.cos(math.pi * point2['lat'] / 180.0) * ky  # shrinks with latitude
    dx = math.fabs(point2['lon'] - point1['lon']) * kx
    dy = math.fabs(point2['lat'] - point1['lat']) * ky
    # hypot gives the actual distance rather than its square.
    return math.hypot(dx, dy)

Function to load data from two CSV files. The first should contain the list of objects that are the base for our research; in this case these are real estate transactions in Katowice. It is required to have lat and lon columns.

The second file should contain the list of Points of Interest (POIs) which we look for around each of the objects from the first file.

In [6]:
def load_data(objectsfile, poifile, sep1, sep2):
    """Read the objects table and the POI table from two CSV sources.

    Args:
        objectsfile: Path or buffer of the objects CSV.
        poifile: Path or buffer of the points-of-interest CSV.
        sep1: Field separator used in ``objectsfile``.
        sep2: Field separator used in ``poifile``.

    Returns:
        A tuple ``(objects, pois)`` of two ``pandas.DataFrame`` objects, in
        that order.
    """
    sources = ((objectsfile, sep1), (poifile, sep2))
    objects_frame, pois_frame = (
        pd.read_csv(filepath_or_buffer=source, sep=separator)
        for source, separator in sources
    )
    return objects_frame, pois_frame

For a given coordinate, this returns the list of POIs that lie within the given distance from the object.

There is definitely a better way to do it, though. One would be to sort the list by the POIs' coordinates and consider only those whose coordinates are close to the object's coordinates.

In [8]:
def find_closest_objects(centerPoint, pois, within_distance):
    """Collect the coordinates of every POI closer than ``within_distance``.

    Args:
        centerPoint: Mapping with ``'lat'`` and ``'lon'`` of the reference
            location.
        pois: ``pandas.DataFrame`` with ``'lat'`` and ``'lon'`` columns, one
            row per point of interest.
        within_distance: Threshold compared (strictly less-than) against the
            value returned by ``distance_between_coordinates``.

    Returns:
        A list of ``[lat, lon]`` pairs, in the row order of ``pois``.
    """
    # Build one small dict per row; the distance helper works on (and
    # float-coerces, in place) these dicts, not on the DataFrame rows.
    candidates = (
        {'lat': poi_row['lat'], 'lon': poi_row['lon']}
        for _, poi_row in pois.iterrows()
    )
    # The filter runs before the element expression, so the coordinates are
    # read only after the distance helper has processed the candidate.
    return [
        [candidate['lat'], candidate['lon']]
        for candidate in candidates
        if distance_between_coordinates(centerPoint, candidate) < within_distance
    ]

find_pois is a function that returns the number of POIs within a given distance from the object, plus the walking distance to the closest one.

In [9]:
def find_pois(pois, within_distance, data, router, fprow, pipe_):
    """Count reachable POIs near one object and find the closest by route.

    For every POI returned by ``find_closest_objects`` a route from the
    object's node to the POI's node is requested from ``router``; only
    routes reported as ``'success'`` are counted.

    Args:
        pois: DataFrame of points of interest (``'lat'``/``'lon'`` columns).
        within_distance: Threshold forwarded to ``find_closest_objects``.
        data: Object providing ``findNode(lat, lon)``.
        router: Object providing ``doRoute(node1, node2)`` returning
            ``(status, route, distance)``.
        fprow: Row of the objects table; must provide ``'lat'`` and ``'lon'``.
            On success it gains ``'NumberOfPOIs'`` and
            ``'DistanceToClosesPoi'`` entries.
        pipe_: Connection whose ``send`` receives the augmented row.

    Returns:
        ``"<count> <min distance>"`` when at least one POI was routed to,
        otherwise the placeholder string ``"0 666"``.
    """
    centerpoint = {
        'lat': float(fprow['lat']),
        'lon': float(fprow['lon']),
    }
    closest_objects = find_closest_objects(centerpoint, pois, within_distance)
    distances = []
    node1 = data.findNode(centerpoint['lat'], centerpoint['lon'])
    for poi_lat, poi_lon in closest_objects:
        if centerpoint['lat'] == poi_lat and centerpoint['lon'] == poi_lon:
            # POI sits exactly on the object: no routing needed.
            foundroute = 'success'
            routedistance = 0
        else:
            node2 = data.findNode(poi_lat, poi_lon)
            foundroute, route, routedistance = router.doRoute(node1, node2)
        if foundroute == 'success':
            print("Walking distance: %s" % routedistance)
            distances.append(routedistance)
        else:
            print("Failed (%s)" % foundroute)

    # Test for "any distances" only. The previous `and min(distances)` check
    # was falsy for a legitimate minimum of 0 (co-located POI), so such rows
    # were reported as having no POIs and never reached the pipe.
    if distances:
        closest = min(distances)
        fprow['NumberOfPOIs'] = len(distances)
        fprow['DistanceToClosesPoi'] = closest
        pipe_.send(fprow)
        return "%s %s" % (len(distances), closest)
    return "0 666"

add_distance is the function that is run by the multiprocessing pool. It modifies the DataFrame so that it includes data about the number of surrounding POIs and the distance to the closest one.

In [10]:
def add_distance(df_, pois_, within_distance_, pipe_):
    """Annotate every row of ``df_`` with its POI summary string.

    A ``LoadOsm("foot")`` data source and a ``Router`` built on it are
    created once per call and shared by all rows of ``df_``.

    Args:
        df_: DataFrame of objects; modified in place.
        pois_: DataFrame of points of interest, forwarded to ``find_pois``.
        within_distance_: Distance threshold, forwarded to ``find_pois``.
        pipe_: Connection forwarded to ``find_pois``.

    Returns:
        ``df_`` itself, with the added column
        ``'NumberOfPOIsAndDistanceToClosestPoi'`` holding the value
        ``find_pois`` returned for each row.
    """
    data_ = LoadOsm("foot")
    router_ = Router(data_)

    def _summarise(row_):
        # Bind the per-call context so DataFrame.apply only supplies the row.
        return find_pois(
            pois=pois_,
            within_distance=within_distance_,
            data=data_,
            router=router_,
            fprow=row_,
            pipe_=pipe_,
        )

    summaries = df_.apply(_summarise, axis=1)
    df_['NumberOfPOIsAndDistanceToClosestPoi'] = summaries
    return df_

Calculations can take quite some time, especially if the dataset is large. Multiprocessing is therefore used to utilize all available processors. Python's multiprocessing Pool is used, and its map requires a function that takes a single parameter. This is why partial from functools is used later: we need to pass multiple arguments somehow, and this is the way to do it.

In [12]:
def parallelize_dataframe(df, func):
    """Apply ``func`` to chunks of ``df`` in parallel and re-assemble them.

    Relies on two module-level names being defined before the call:
    ``num_partitions`` (how many chunks to split ``df`` into) and
    ``num_cores`` (how many worker processes to start).

    Args:
        df: DataFrame to process.
        func: Picklable callable taking one DataFrame chunk and returning a
            DataFrame.

    Returns:
        The concatenation of the per-chunk results, in chunk order.
    """
    df_split = np.array_split(df, num_partitions)
    pool = Pool(num_cores)
    try:
        processed_chunks = pool.map(func, df_split)
    finally:
        # Always release the workers, even when a chunk raised; otherwise
        # the worker processes would be left behind.
        pool.close()
        pool.join()
    return pd.concat(processed_chunks)

The last function is a reader that reads a pipe and saves its content to a file every time 100 rows have been added. Its only purpose is to ensure that if the program crashes, hangs, or anything else happens to it during computation, the results collected so far are not lost.

In [13]:
def reader(pipeAndLock):
    """Drain rows from a pipe and append them to ``output.csv`` in batches.

    Rows are buffered and written every 100 received rows; whatever is still
    buffered when the sending side is closed (``EOFError`` on ``recv``) is
    written as a final, possibly shorter, batch before returning.

    Args:
        pipeAndLock: Tuple ``(output_p, input_p, lock)`` where ``output_p``
            is the connection to receive from, ``input_p`` is the sending
            end (closed immediately in this process, as this function only
            receives), and ``lock`` guards writes to the output file.
    """
    output_p, input_p, lock = pipeAndLock
    input_p.close()

    def _flush(pending):
        # Append the buffered rows to the CSV under the lock, then empty the
        # buffer. `with` guarantees the lock and the file are released even
        # if writing fails.
        if not pending:
            return
        with lock:
            with open("output.csv", "a", newline="") as f:
                csv.writer(f).writerows(pending)
        del pending[:]

    rows = []
    while True:
        try:
            rows.append(output_p.recv())
        except EOFError:
            # Sender is gone: persist the last partial batch. The previous
            # code reused a file handle that was already closed (or never
            # opened), so these rows were lost.
            _flush(rows)
            break
        if len(rows) >= 100:
            _flush(rows)

The main section loads the data, creates the partial call and starts the computations.

In [None]:
if __name__ == "__main__":

    num_partitions = 2  # number of partitions to split dataframe
    num_cores = 2  # number of cores on your machine

    # NOTE(review): absolute, machine-specific path -- adjust before running.
    folder = "/home/mapastec/Documents/studia/KoloNaukowe/dane/"

    objects, pois = load_data(objectsfile="%sremaster_lokale.csv" %folder,
                              poifile="%sszkolykur.csv" %folder,
                              sep1=',', sep2=';')
    within_distance = 1

    # output_p is the receiving end used by the reader process, input_p the
    # sending end handed to the workers.
    output_p, input_p = Pipe()
    lock = Lock()

    reader_p = Process(target=reader, args=((output_p, input_p, lock),))
    reader_p.start()
    # The parent never receives, so drop its copy of the receiving end.
    output_p.close()

    partialAddDistance = partial(add_distance, pois_=pois, within_distance_=within_distance,
                                 pipe_=input_p)

    outputdf = parallelize_dataframe(objects, partialAddDistance)

    print(outputdf.head())
    outputdf.to_csv('outputdf.csv', sep=';')

    # Close the parent's sending end so the reader sees EOF, writes its last
    # batch and exits; without this the join below never returns.
    input_p.close()
    reader_p.join()

Walking distance: 0.7224500872018547
Walking distance: 0.2993469272507149
Walking distance: 0.786854302908355
Walking distance: 0.14770649375276232
Walking distance: 0.7224500872018547
Walking distance: 0.7224500872018547
Walking distance: 0.7224500872018547
Walking distance: 0.2993469272507149
Walking distance: 0.786854302908355
Walking distance: 0.14770649375276232
Walking distance: 0.7224500872018547
Walking distance: 0.7224500872018547
Walking distance: 0.7875279647373605
Walking distance: 0.3369849603412221
Walking distance: 0.8448942786212982
Walking distance: 0.18177061137384434
Walking distance: 0.7875279647373605
Walking distance: 0.7875279647373605
Walking distance: 0.7875279647373605
Walking distance: 0.3369849603412221
Walking distance: 0.8448942786212982
Walking distance: 0.18177061137384434
Walking distance: 0.7875279647373605
Walking distance: 0.7875279647373605
   lp.  Cena.jednostkowa.netto Z.powierzchnia.przynalezna  Data.transakcji  \
0   10                 1975.75  