# Perform Analysis on All Features and All Cities

OK, we are now ready for the big moment: finding which city is the cultural capital of Europe based on our criteria.

In [None]:
# %load './code/helpers/imports.py'
import notebook
import os.path, json, io, pandas
import matplotlib.pyplot as plt
import matplotlib
matplotlib.rcParams['figure.figsize'] = (16, 20)  # default size for every figure drawn below

from retrying import retry # for exponential back-off when calling TurboOverdrive API

import pyspark.sql.functions as func # reuse as func.coalesce, for example
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType,DecimalType

import pandas as pandas
from geopandas import GeoDataFrame # Loading boundaries Data
from shapely.geometry import Point, Polygon, shape # creating geospatial data
from shapely import wkb, wkt # creating and parsing geospatial data
import overpy # OpenStreetMap API

# Check whether the code-folding nbextension is installed for the current user.
# NOTE(review): the result of the check is discarded here - confirm whether a
# missing extension should be reported.
notebook.nbextensions.check_nbextension('usability/codefolding', user=True)

# Reuse the SparkContext when `sc` already exists in the session; otherwise
# start a local one and build the SQLContext on top of it.
# NOTE(review): sqlContext is only created in the fallback branch - presumably
# an environment that predefines `sc` also predefines `sqlContext`; confirm.
try:
    sc
except NameError:
    import pyspark
    sc = pyspark.SparkContext('local[*]')
    sqlContext = pyspark.sql.SQLContext(sc)


In [None]:
# %load './code/helpers/load_boundaries_and_pois.py'
# Client object for the Overpass API, created through the overpy package.
OVERPASS_API         = overpy.Overpass()
# Input data lives in the 'work-flow' directory under the current working directory.
BASE_DIR             = os.path.join(os.path.abspath('.'), 'work-flow')
# GeoJSON file holding the city boundaries.
URBAN_BOUNDARIES_FILE = '06_Europe_Cities_boundaries_with_Labels_Population.geo.json'

# Paths to base datasets that we are using:
URBAN_BOUNDARIES_PATH = os.path.join(BASE_DIR,URBAN_BOUNDARIES_FILE)
POIS_PATH            = os.path.join(BASE_DIR, "pois.json")

# Load the city boundaries once per session; re-running the cell reuses geo_df.
try:
    geo_df
except NameError:
    # Build the frame under a temporary name and publish it as geo_df only
    # when it is complete: if a step below raises, geo_df stays undefined and
    # the next run of this cell starts over instead of reusing a frame that
    # lacks the 'wkt' column.
    _boundaries_gdf = GeoDataFrame.from_file(URBAN_BOUNDARIES_PATH)
    # Add a WKT column for use later
    _boundaries_gdf['wkt'] = pandas.Series(
        [str(geom.to_wkt()) for geom in _boundaries_gdf['geometry']],
        index=_boundaries_gdf.index, dtype='string')
    geo_df = _boundaries_gdf

# Mirror the boundaries into Spark and expose them to SQL as "boundaries".
try:
    boundaries_from_pd
except NameError:
    # Same idea: bind the name only after the temp table has been registered.
    _boundaries_sdf = sqlContext.createDataFrame(geo_df)
    _boundaries_sdf.registerTempTable("boundaries")
    boundaries_from_pd = _boundaries_sdf

# Load the points of interest once per session; re-running the cell reuses pois_df.
try:
    pois_df
except NameError:
    def toWktColumn(coords):
        """Return the WKT text of the point located at ``coords``."""
        return Point(coords).wkt

    # pois_df is bound only once the frame is complete, so a failure part-way
    # through leaves it undefined and the next run of this cell retries
    # rather than reusing a half-built object.
    # The JSON is read with Spark, moved to pandas to add the WKT column, and
    # then turned back into a Spark DataFrame.
    _pois_pandas = sqlContext.read.json(POIS_PATH).toPandas()
    _pois_pandas['wkt'] = pandas.Series(
        [toWktColumn(geom.coordinates) for geom in _pois_pandas['geometry']],
        index=_pois_pandas.index, dtype='string')

    pois_df = sqlContext.createDataFrame(_pois_pandas)


# Unique IDs
As we saw with SpatialSpark, each record in our dataset needs a unique ID in order to let us retrieve the tuples matching our spatial predicates.


In [None]:
from pyspark.sql.functions import monotonicallyIncreasingId

# Create a DataFrame with (id, geometry-as-WKT) for the POIs:
# 1. Add an ID column to the POIs
# 2. Keep only the id and wkt columns for the spatial join

pois_df           = pois_df.withColumn("id", monotonicallyIncreasingId())
pois_tuple_id_wkt = pois_df.select(pois_df['id'], pois_df['wkt'])

pois_tuple_id_wkt.show()
pois_tuple_id_wkt.printSchema()
# Parenthesised single-argument print behaves the same on Python 2 and 3.
print(pois_tuple_id_wkt.count())

In [None]:
# Create a DataFrame with (id, geometry-as-WKT) for the boundaries:
# 1. Add an ID column to the boundaries
# 2. Keep only the id and wkt columns for the spatial join

boundaries_from_pd     = boundaries_from_pd.withColumn("id", monotonicallyIncreasingId())
boundaries_tuple_id_wkt = boundaries_from_pd.select(boundaries_from_pd['id'], boundaries_from_pd['wkt'])

boundaries_tuple_id_wkt.printSchema()
# Parenthesised single-argument print behaves the same on Python 2 and 3.
print(boundaries_from_pd.count())
boundaries_tuple_id_wkt.show()
# Parse the seventh boundary's WKT back into a geometry as a visual sanity
# check (requires at least seven boundaries).
wkt.loads(boundaries_tuple_id_wkt.take(7)[6].wkt)

In [None]:
from ast import literal_eval as make_tuple # used to decode data from java

# Handles to the SpatialSpark classes living in the JVM behind the SparkContext.
spatialspark         = sc._jvm.spatialspark
SpatialOperator      = spatialspark.operator.SpatialOperator
BroadcastSpatialJoin = spatialspark.join.BroadcastSpatialJoin

# Join POIs to boundaries with the "within" predicate, passing the JVM-side
# objects of the context and of the two (id, wkt) DataFrames.
# NOTE(review): the trailing 0.0 is presumably the join radius - confirm
# against the SpatialSpark build in use.
joinPoiBdryRDD = BroadcastSpatialJoin.apply(
    sc._jsc,
    pois_tuple_id_wkt._jdf,
    boundaries_tuple_id_wkt._jdf,
    SpatialOperator.Within(),
    0.0)

In [None]:
# Parenthesised single-argument print behaves the same on Python 2 and 3.
print(joinPoiBdryRDD.count())

# Each JVM result is decoded from its string form into a Python tuple.
# A list is built explicitly because joinResults is indexed later on, which a
# lazy Python 3 map object would not support.
joinResults = [make_tuple(result.toString()) for result in joinPoiBdryRDD.collect()]


In [None]:
# Compare the input sizes with one sample of the join output.
# Parenthesised single-argument print behaves the same on Python 2 and 3.
print(pois_tuple_id_wkt.count())
print(boundaries_tuple_id_wkt.count())
print(joinResults[695])

In [None]:
# Turn the decoded join results into a Spark DataFrame whose two columns are
# named poi_id and boundry_id, then print its schema.
rddResult = sc.parallelize(joinResults)
df = sqlContext.createDataFrame(rddResult, ["poi_id", "boundry_id"])
df.printSchema()

In [None]:
# Attach each matched POI's properties and WKT to its (poi_id, boundry_id) pair.
df_with_pois = (
    df.join(pois_df, df['poi_id'] == pois_df['id'])
      .select(
          df['poi_id'],
          df['boundry_id'],
          pois_df['properties'].alias("poi_properties"),
          pois_df['wkt'].alias("poi_wkt"),
      )
)


In [None]:
# Display the column names of the POI join result.
df_with_pois.columns

In [None]:
# Add the boundary geometry, city name and population of the matching
# boundary to every POI row.
boundary_id_matches = df_with_pois['boundry_id'] == boundaries_from_pd['id']
df_with_pois_bdrys = (
    df_with_pois
    .join(boundaries_from_pd, boundary_id_matches)
    .select(
        df_with_pois['poi_id'],
        df_with_pois['boundry_id'],
        df_with_pois['poi_properties'],
        df_with_pois['poi_wkt'],
        boundaries_from_pd['wkt'].alias("boundry_wkt"),
        boundaries_from_pd['NAMEASCII'].alias("city_name"),
        boundaries_from_pd['POPEU2013'].alias("population"),
    )
)

In [None]:
# Display the column names of the POI/boundary join result.
df_with_pois_bdrys.columns

In [None]:
# Mark the joined DataFrame for caching, then count its rows; count() is an
# action, so it forces the join to be computed.
df_with_pois_bdrys.cache()
df_with_pois_bdrys.count()

In [None]:
df_with_pois_bdrys.show(3)
# We now have one row per POI together with the boundary it was matched to.

In [None]:
# ## what have we done so far

# We have now created a join table with each POI and its corresponding city, along with the
# population and city name. POIs have a nested property (thanks to GeoJSON) with keys for
# the type of location. We will still need a way to decipher these.

# +------+----------+--------------------+--------------------+--------------------+---------+----------+
# |poi_id|boundry_id|      poi_properties|             poi_wkt|         boundry_wkt|city_name|population|
# +------+----------+--------------------+--------------------+--------------------+---------+----------+
# |    31|         0|[null,null,null,n...|POINT (8.50017549...|MULTIPOLYGON (((8...|   Zurich|    380777|
# |    32|         0|[null,null,null,n...|POINT (8.53091749...|MULTIPOLYGON (((8...|   Zurich|    380777|
# |    33|         0|[null,null,null,n...|POINT (8.52973519...|MULTIPOLYGON (((8...|   Zurich|    380777|
# +------+----------+--------------------+--------------------+--------------------+---------+----------+

# The other issue is that the tag for location type is split across amenity and tourism.
# To simplify our calculation we will create a column for location type and put the label there.

In [None]:
# Grab one joined row to inspect its nested poi_properties.
rec = df_with_pois_bdrys.take(1)[0]

In [None]:
# Show the sample row's amenity tag, falling back to its tourism tag.
# Parenthesised single-argument print behaves the same on Python 2 and 3.
print(rec.poi_properties.amenity or rec.poi_properties.tourism)

In [None]:
# First step: lift each tag out of the nested poi_properties struct into its
# own top-level column. In this case the tags are tourism and amenity.

poi_tags = df_with_pois_bdrys['poi_properties']
df_with_pois_bdrys = (
    df_with_pois_bdrys
    .withColumn('tourism', poi_tags['tourism'])
    .withColumn('amenity', poi_tags['amenity'])
)

df_with_pois_bdrys.show(5)

In [None]:
# Next we coalesce the two tags into a single column called location_type
# (the tourism tag is used when present, otherwise the amenity tag), group the
# data by all the fields we need - boundary, city_name, location_type and
# population - and then perform a count.

poi_properties_col = df_with_pois_bdrys['poi_properties']
location_type_col = func.coalesce(
    poi_properties_col['tourism'],
    poi_properties_col['amenity'],
).alias("location_type")

df = (
    df_with_pois_bdrys
    .select('*', location_type_col)
    .groupby('boundry_id', 'city_name', 'location_type', 'population')
    .count()
)

In [None]:
# Preview the first five per-city, per-location-type counts.
df.show(5)

In [None]:
# Create an UDF for a column that calculates the score per record

def get_cultural_score(location_type, count, population):
    """Return the weighted number of venues of one type per 100,000 residents.

    Args:
        location_type: tag value such as u'museum'; types missing from the
            weight table (including None) carry a weight of 0.0.
        count: number of venues of ``location_type`` in the city.
        population: number of residents of the city.

    Returns:
        ``weight * count * 100000 / population`` as a float, or 0.0 when the
        score cannot be computed because ``count`` or ``population`` is
        missing or ``population`` is not positive.
    """
    cultural_weight_lookup = {
        u'museum':      1.0,
        u'arts_centre': 2.0,
        u'theatre':     3.0,
        u'gallery':     4.0,
        u'artwork':     5.0  # try modifying the weights as an exercise
    }

    wgt = cultural_weight_lookup.get(location_type, 0.0)

    # A missing value would make float() raise; treat the row as "no score"
    # instead of failing the whole query.
    if count is None or population is None:
        return 0.0

    residents = float(population)
    # Zero would divide by zero, and negative placeholders for "no data"
    # would produce a meaningless negative score.
    if residents <= 0.0:
        return 0.0

    return wgt * float(count) * 100000 / residents

# Make the scorer callable from Spark SQL as get_cultural_score, with
# FloatType declared as its result type.
sqlContext.registerFunction("get_cultural_score", get_cultural_score, FloatType())

# score_udf = func.udf(get_cultural_score, FloatType())

In [None]:
# score_df = df.select(df.boundry_id, 
#           df.city_name, 
#           df.population,
#           df.location_type,
#           df.count,
#           score_udf(df.location_type, df.count, df.population).alias('cultural_score')
#          )

In [None]:
# Register the per-type counts as the SQL table "cultural_score", then compute
# a score for every (city, location type) row with the registered
# get_cultural_score function.
df.registerTempTable("cultural_score")

score_df = sqlContext.sql(
    "SELECT boundry_id, \
        city_name, \
        location_type, \
        population, \
        count, get_cultural_score(location_type, count, population) as score \
    FROM cultural_score")

In [None]:
# Show the per-type scores ordered by city name.
score_df.sort(score_df.city_name.asc()).show()
# score_df.show()

In [None]:
# Add up the per-type scores of every city and order the result by city name.
scores_by_city = score_df.groupBy("boundry_id", "city_name", "population")
df = scores_by_city.agg(func.sum(score_df.score)).sort("city_name")

In [None]:
# Display the summed score of each city.
df.show()

In [None]:
# Rename the aggregate column "sum(score)" to "final_score".
df= df.withColumnRenamed("sum(score)","final_score")

In [None]:
# Pull the per-city scores into pandas and display them, highest score first.
# NOTE(review): `pd` holds the scores frame here, not the usual pandas alias,
# and the sorted frame is only displayed - `pd` itself is not re-ordered.
pd = df.toPandas()
pd.sort_values("final_score", ascending=False)

# Visualizing the data

Let's map the data using the Folium package, which allows embedding Leaflet maps inside IPython notebooks.


In [None]:

# Bring the joined POI/boundary rows into pandas and peek at the first two.
pd_df = df_with_pois_bdrys.toPandas()
pd_df.head(2)

In [None]:
# Keep only the boundary-level columns and collapse the per-POI rows so that
# identical boundary rows appear once.
boundary_columns = ['boundry_id', 'boundry_wkt', 'city_name', 'population']
pd_df = pd_df[boundary_columns].drop_duplicates()

In [None]:
# Parse every boundary's WKT text back into a geometry object.
geometry = list(map(wkt.loads, pd_df.boundry_wkt))

In [None]:
# Build a GeoDataFrame from the boundary rows, using the parsed geometries.
geodf = GeoDataFrame(pd_df, geometry=geometry)

In [None]:
# Peek at the first two boundary rows.
geodf.head(2)

In [None]:
# `pd` is the pandas frame of per-city scores built earlier; give it a clearer name.
scores = pd

In [None]:
scores.head()
# Join the boundaries with their scores on boundry_id. Both frames carry
# city_name and population, so the merged frame gets _x/_y suffixed copies
# (the map below uses city_name_x).
scores_merged_df=pandas.DataFrame.merge(geodf, scores, on='boundry_id')


In [None]:
scores_merged_df.head(1)
# Wrap the merged frame in a GeoDataFrame so that to_json() is available for the map.
geo_scores_merged = GeoDataFrame(scores_merged_df)

In [None]:
# Display the merged boundaries and scores.
geo_scores_merged

In [None]:
import folium

# Base map with a fixed starting centre and zoom level.
map_center = [47.19094, 11.98566]
map_osm = folium.Map(location=map_center, tiles='Mapbox Bright', zoom_start=6)

# Colour each city boundary by its final_score; the GeoJSON features are tied
# to the data rows through the city_name_x property.
boundaries_geojson = geo_scores_merged.to_json()
map_osm.choropleth(
    geo_str=boundaries_geojson,
    data=geo_scores_merged,
    columns=['city_name_x', 'final_score'],
    fill_color='RdBu',
    key_on='properties.city_name_x',
)
map_osm