# DISTANCE ANALYSIS

This notebook analyzes geographic data in BFKOORD_GEO.

The following data structures are created and saved as pickle files:
- zurich_stations_set : set of the stop numbers of all stations around Zurich (within a circle of radius 10 km centered on Zurich HB)
- distance_dictionary : dictionary that gives the distance, in meters, between any two distinct stations around Zurich
- location_dictionary : dictionary that gives the (longitude, latitude) of any station around Zurich

### Dependencies

In [6]:
import pickle
import socket
import getpass
import os

import ast
import numpy as np
from scipy import stats
import pandas as pd

from datetime import datetime, timedelta

from math import sin, cos, sqrt, atan2, radians

import matplotlib.pyplot as plt
%matplotlib inline

Set the plotting defaults and widen the notebook layout so that Spark DataFrames can be displayed in full.

In [7]:
# Apply the style sheet first: `plt.style.use` updates rcParams, so any
# override set before it may be overwritten by the style sheet's own values
# (e.g. its font size).
plt.style.use('fivethirtyeight')
plt.rcParams['figure.figsize'] = (10,6)
plt.rcParams['font.size'] = 18

def fix_layout(width:int=95):
    """Widen the notebook container so that wide outputs fit on screen.

    Parameters
    ----------
    width : int
        Width of the notebook container, as a percentage of the browser window.
    """
    # `IPython.display` is the public import location; importing `display`
    # from `IPython.core.display` is deprecated.
    from IPython.display import display, HTML
    display(HTML('<style>.container { width:' + str(width) + '% !important; }</style>'))
    
fix_layout()

### Spark Setup

In [8]:
username = getpass.getuser()

# Assume a cluster run until the hostname tells otherwise.
SPARK_LOCAL = False

# on the laptop: the hostname does not carry the cluster marker
if 'iccluster' not in socket.gethostname():
    SPARK_LOCAL = True

    # base spark directory of each teammate's machine;
    # set this to the base spark directory on your system
    _local_spark_homes = {
        "fatine": '/home/fatine/spark-2.4.1-bin-hadoop2.7',
        "soner": '/home/soner/Desktop/DSLAB2019/spark-2.4.1-bin-hadoop2.7',
    }

    # users without an entry (e.g. "jelena") need no findspark initialisation
    if username in _local_spark_homes:
        spark_home = _local_spark_homes[username]

        try:
            import findspark
            findspark.init(spark_home)
        except ModuleNotFoundError as e:
            # findspark is optional: report that it is missing and carry on
            print('Info: {}'.format(e))

# project root and Python interpreter handed to PySpark
if username == "jbanjac":
    # cluster
    ROOT_PATH = "/home/jbanjac/robust-journey-planning"
    os.environ['PYSPARK_PYTHON'] = '/opt/anaconda3/bin/python'
elif username == "jelena":
    # local
    ROOT_PATH = os.getcwd()
    os.environ['PYSPARK_PYTHON'] = '/home/jelena/anaconda3/bin/python'

In [9]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import unix_timestamp, udf, desc
from pyspark.sql.types import *
from pyspark.sql import Window
from pyspark.sql.types import IntegerType

from pyspark.ml.feature import VectorAssembler,Normalizer,  PCA,VectorIndexer
from pyspark.ml.clustering import KMeans, BisectingKMeans, GaussianMixture

In [10]:
# Build (or reuse) the Spark session that matches the current environment.
if username == "jelena":
    # local multi-core session with explicit memory settings
    spark = (
        SparkSession.builder
        .appName('sbb-{0}'.format(getpass.getuser()))
        .master('local[4]')
        .config('spark.driver.memory', '10g')
        .config('spark.executor.memory', '4g')
        .config('spark.executor.instances', '5')
        .config('spark.port.maxRetries', '100')
        .getOrCreate()
    )
    CLUSTER_URL = "hdfs://iccluster042.iccluster.epfl.ch:8020"

elif SPARK_LOCAL:
    # plain local session for the other laptops
    spark = (
        SparkSession.builder
        .master("local")
        .appName("roboust-journey-planing")
        .config("spark.driver.host", "localhost")
        .getOrCreate()
    )
    CLUSTER_URL = ""
else:
    # cluster session scheduled through YARN
    spark = (
        SparkSession.builder
        .master("yarn")
        .appName('sbb-{0}'.format(getpass.getuser()))
        .config('spark.executor.memory', '4g')
        .config('spark.executor.instances', '5')
        .config('spark.port.maxRetries', '100')
        .getOrCreate()
    )
    CLUSTER_URL = ""

In [11]:
# Keep a handle on the SparkContext that backs the session.
sc = spark.sparkContext
# Bare last expression: the notebook renders the session as the cell output.
spark

### Data

In [12]:
def read_hrdf(file_name):
    """Parse a fixed-width geographic coordinates file into a list of records.

    Each non-blank line is cut at fixed character positions:
    columns 0-6 hold the stop number, 8-17 the longitude, 19-28 the latitude,
    30-35 the elevation, and everything from column 38 on the stop name.

    Parameters
    ----------
    file_name : str
        Path of the UTF-8 encoded file to read.

    Returns
    -------
    list of list
        One `[stop_number, longitude, latitude, elevation, stop_name]` entry
        per non-blank line, where the stop number and name are stripped
        strings and the three numeric fields are floats.

    Raises
    ------
    ValueError
        If a numeric field of a non-blank line cannot be parsed as a float.
    """
    data = []
    with open(file_name, encoding='utf-8') as f:
        # Stream the file instead of loading every line into memory first.
        for line in f:
            # Blank lines (e.g. a trailing empty line) carry no record and
            # would otherwise make float('') raise.
            if not line.strip():
                continue
            data.append([
                line[:7].strip(),
                float(line[8:18].strip()),
                float(line[19:29].strip()),
                float(line[30:36].strip()),
                line[38:].strip(),
            ])
    return data

# Column names, in the order in which read_hrdf emits the fields.
geo_columns = ["stop_number", "longitude", "latitude", "elevation", "stop_name"]
df = pd.DataFrame(read_hrdf('BFKOORD_GEO'), columns=geo_columns)

# Hand the pandas frame over to Spark and show the inferred schema.
df_geo = spark.createDataFrame(df)
df_geo.printSchema()

root
 |-- stop_number: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- stop_name: string (nullable = true)



### Stations Around Zurich

Select the stations within 10 km of Zürich HB.

In [13]:
# Filter once and reuse the result for both the display and the lookup,
# instead of re-running the same filter for every value that is needed.
zurich_hb = df_geo.where(df_geo['stop_name'].like('Zürich HB'))
zurich_hb.show()

# Fetch latitude and longitude together in a single Spark job.
zurich_hb_rows = zurich_hb.select('latitude', 'longitude').collect()
if not zurich_hb_rows:
    # Fail with an explicit message rather than an IndexError on an empty list.
    raise ValueError("Station 'Zürich HB' was not found in BFKOORD_GEO")

# Coordinates of the first matching row, in degrees as stored in the file.
Zurich_HB_lat, Zurich_HB_lon = zurich_hb_rows[0]

+-----------+---------+---------+---------+---------+
|stop_number|longitude| latitude|elevation|stop_name|
+-----------+---------+---------+---------+---------+
|    8503000| 8.540192|47.378177|    408.0|Zürich HB|
+-----------+---------+---------+---------+---------+



In [14]:
# Declare the return type explicitly: a bare `@F.udf` produces a string
# column, which forces an implicit cast in every numeric comparison.
@F.udf('double')
def calculateDistance(latitude, longitude):
    '''
    The distance between a coordinate and Zurich HB, in kilometers.

    The reference point is read from the notebook-level variables
    `Zurich_HB_lat` and `Zurich_HB_lon`, which must be defined (in degrees)
    before the UDF is evaluated.

    Parameters:
        latitude: latitude of the station, in degrees
        longitude: longitude of the station, in degrees

    Returns:
        The Haversine distance in kilometers, or None when either
        coordinate is missing.
    '''
    # a null coordinate has no distance; propagate null instead of failing the job
    if latitude is None or longitude is None:
        return None

    # approximate radius of earth in km
    R = 6373.0

    # latitude and longitude values of Zurich HB in terms of radians
    lat_Zurich_HB = radians(Zurich_HB_lat)
    lon_Zurich_HB = radians(Zurich_HB_lon)

    # latitude and longitude values of the given station in terms of radians
    lon = radians(longitude)
    lat = radians(latitude)

    # calculates the distance by using Haversine formula
    dlon = lon - lon_Zurich_HB
    dlat = lat - lat_Zurich_HB
    a = sin(dlat / 2)**2 + cos(lat_Zurich_HB) * cos(lat) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))

    distance = R * c # as km

    return distance

# Keep only the stations whose distance to Zürich HB is at most 10 km
distance_to_hb = calculateDistance(df_geo.latitude, df_geo.longitude)
df_zurich_geo = (
    df_geo
    .withColumn('distance_to_Zurich', distance_to_hb)
    .filter('distance_to_Zurich<=10.0')
)

Create `zurich_stations_set` as a set of stop-number strings.

In [15]:
# Bring the stop numbers of the selected stations to the driver and deduplicate them.
stop_number_rows = df_zurich_geo.select('stop_number').collect()
zurich_stations_set = {row[0] for row in stop_number_rows}
print('Number of stations around Zurich in BFKOORD_GEO is', len(zurich_stations_set))

Number of stations around Zurich in BFKOORD_GEO is 1040


### Distances Between Stations

In [16]:
@F.udf('float')
def calculateCrossDistance(lat1, lon1, lat2, lon2):
    '''
    Haversine distance between two stations, in meters.

    Parameters:
        lat1, lon1: latitude and longitude of the first station, in degrees
        lat2, lon2: latitude and longitude of the second station, in degrees

    Returns:
        The distance between the two coordinates, in meters.
    '''
    # approximate radius of earth in km
    earth_radius_km = 6373.0

    # both coordinates converted from degrees to radians
    lambda_1 = radians(lon1)
    phi_1 = radians(lat1)
    lambda_2 = radians(lon2)
    phi_2 = radians(lat2)

    # differences in longitude and latitude between the two stations
    delta_lambda = lambda_2 - lambda_1
    delta_phi = phi_2 - phi_1

    # Haversine formula: `haversine` is the squared half-chord term and
    # `central_angle` the angle between the two points, in radians
    haversine = sin(delta_phi / 2)**2 + cos(phi_1) * cos(phi_2) * sin(delta_lambda / 2)**2
    central_angle = 2 * atan2(sqrt(haversine), sqrt(1 - haversine))

    # km -> m
    return earth_radius_km * central_angle * 1000

Calculate the distance between every pair of distinct stations around Zurich.

In [17]:
def _rename_and_trim(stations, renames):
    """Rename the station columns as given and drop the columns not needed for the join."""
    for old_name, new_name in renames:
        stations = stations.withColumnRenamed(old_name, new_name)
    return stations.drop('elevation').drop('distance_to_Zurich')

# departure side of every station pair
walk_from_df = _rename_and_trim(
    df_zurich_geo.alias('walk_from_df'),
    [
        ('longitude', 'longitude_from'),
        ('latitude', 'latitude_from'),
        ('stop_number', 'stop_number_from'),
        ('stop_name', 'stop_name_from'),
    ],
)

# arrival side of every station pair
walk_to_df = _rename_and_trim(
    df_zurich_geo.alias("walk_to_df"),
    [
        ('longitude', 'longitude_to'),
        ('latitude', 'latitude_to'),
        ('stop_number', 'stop_number_to'),
        ('stop_name', 'stop_name_to'),
    ],
)

# every departure station paired with every arrival station
df_joined = walk_from_df.crossJoin(walk_to_df)

# drop the row if arrival and departure stations are the same
mask = df_joined.stop_number_from == df_joined.stop_number_to
df_joined = df_joined[~mask]

In [18]:
# Distance of each station pair, computed from the coordinates of both ends.
cross_distance = calculateCrossDistance(
    df_joined.latitude_from, df_joined.longitude_from,
    df_joined.latitude_to, df_joined.longitude_to,
)

# The coordinates are no longer needed once the distance is attached.
df_distances = (
    df_joined
    .withColumn('distance_as_meter', cross_distance)
    .drop('longitude_from', 'longitude_to', 'latitude_from', 'latitude_to')
)

df_distances.show(5)

+----------------+--------------------+--------------+--------------------+-----------------+
|stop_number_from|      stop_name_from|stop_number_to|        stop_name_to|distance_as_meter|
+----------------+--------------------+--------------+--------------------+-----------------+
|         0000176|Zimmerberg-Basist...|       8502220|              Urdorf|        7887.3613|
|         0000176|Zimmerberg-Basist...|       8502221|      Birmensdorf ZH|        6393.3145|
|         0000176|Zimmerberg-Basist...|       8502222| Bonstetten-Wettswil|        4965.9766|
|         0000176|Zimmerberg-Basist...|       8502229|   Urdorf Weihermatt|        7633.4536|
|         0000176|Zimmerberg-Basist...|       8502559|Waldegg, Birmensd...|        4779.2036|
+----------------+--------------------+--------------+--------------------+-----------------+
only showing top 5 rows



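Create `distance_dictionary`, which maps a pair of stop numbers `(stop_number_from, stop_number_to)` (strings) to the distance between the two stations in meters (float).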

In [19]:
# Collect the pairwise distances on the driver and index them by (from, to) stop numbers.
distance_rows = df_distances.select('stop_number_from', 'stop_number_to', 'distance_as_meter').collect()
distance_dictionary = {
    (origin, destination): meters
    for origin, destination, meters in distance_rows
}

### Location Information of Stations

Create `location_dictionary`, which maps a stop number (string) to the station's `(longitude, latitude)` pair (floats).

In [21]:
# Fetch the coordinates of all selected stations in a single Spark job,
# instead of running one filter + collect per station.
location_dictionary = {}
for row in df_zurich_geo.select('stop_number', 'longitude', 'latitude').collect():
    # setdefault keeps the first row seen for a stop number, matching the
    # previous per-station lookup, which used the first matching row.
    location_dictionary.setdefault(row[0], (row[1], row[2]))

Save the data structures as pickle files.

In [22]:
# Make sure the output directory exists; open() does not create it and
# would otherwise fail with FileNotFoundError on a fresh checkout.
os.makedirs('distance', exist_ok=True)

# Each data structure paired with the file it is saved to.
pickle_targets = [
    # save zurich stations set
    ('distance/zurich_stations_set.pickle', zurich_stations_set),
    # save distance dictionary
    ('distance/distance_dictionary.pickle', distance_dictionary),
    # save location dictionary
    ('distance/location_dictionary.pickle', location_dictionary),
]

for pickle_path, data_structure in pickle_targets:
    with open(pickle_path, 'wb') as handle:
        pickle.dump(data_structure, handle)