In [1]:
from pyspark.sql import SparkSession

In [2]:
# Create (or reuse, via getOrCreate) the SparkSession used by every cell below.
# Only the application name is set; no custom Spark configuration is required here.
spark = (
    SparkSession.builder
    .appName("Python Spark OpenStreetMap project")
    .getOrCreate()
)

In [3]:
# Load the node, way and relation tables of the Hungary OSM extract from parquet.
hungaryNodes, hungaryWays, hungaryRelations = (
    spark.read.parquet("parquets/hungary-latest.osm.pbf.node.parquet"),
    spark.read.parquet("parquets/hungary-latest.osm.pbf.way.parquet"),
    spark.read.parquet("parquets/hungary-latest.osm.pbf.relation.parquet"),
)

In [4]:
# Register each table as a temporary SQL view named after its variable.
for viewFrame, viewName in ((hungaryNodes, "hungaryNodes"),
                            (hungaryWays, "hungaryWays"),
                            (hungaryRelations, "hungaryRelations")):
    viewFrame.createOrReplaceTempView(viewName)

In [5]:
from pyspark.sql.functions import array_contains, col, collect_list, explode, sort_array, struct

In [6]:
from pyspark.sql.types import ArrayType, StringType

In [7]:
# OSM tag key to select on: only ways whose tag keys contain this value are kept below.
search_key = 'highway'

In [8]:
# Ways carrying `search_key`, exploded to one row per (way, node reference).
# The tag key/value arrays are cast to array<string>; the raw parquet element type is
# evidently not string (presumably binary — confirm against the schema).
tagKeys = col('tags').getField('key').cast(ArrayType(StringType()))
tagValues = col('tags').getField('value').cast(ArrayType(StringType()))

wayNode_df = (
    hungaryWays
    .filter(array_contains(tagKeys, search_key))
    .select(
        col('id').alias('wayId'),
        col('user_sid'),
        tagKeys.alias('keys'),
        tagValues.alias('values'),
        explode("nodes").alias('indexedNode'),
    )
)

In [9]:
# Render the user column as a plain string.
wayNode_df = wayNode_df.withColumn('user_sid', col('user_sid').cast(StringType()))

In [10]:
wayNode_df.show(n=10)

+-------+--------+---------------+--------------------+---------------+
|  wayId|user_sid|           keys|              values|    indexedNode|
+-------+--------+---------------+--------------------+---------------+
|3175810| SzPaula|[highway, name]|[residential, Hon...|  [0, 15231786]|
|3175810| SzPaula|[highway, name]|[residential, Hon...|[1, 1310148452]|
|3175810| SzPaula|[highway, name]|[residential, Hon...|[2, 1310021025]|
|3175810| SzPaula|[highway, name]|[residential, Hon...|[3, 1310020985]|
|3175810| SzPaula|[highway, name]|[residential, Hon...|[4, 1310020988]|
|3175810| SzPaula|[highway, name]|[residential, Hon...|[5, 1310021030]|
|3175810| SzPaula|[highway, name]|[residential, Hon...|[6, 1310021038]|
|3175810| SzPaula|[highway, name]|[residential, Hon...|[7, 1310021044]|
|3175810| SzPaula|[highway, name]|[residential, Hon...|[8, 1309949162]|
|3175810| SzPaula|[highway, name]|[residential, Hon...|[9, 1309949099]|
+-------+--------+---------------+--------------------+---------------+

In [11]:
import spark_df_profiling

In [14]:
# String form of the tag-key array, for the profiling cells that follow.
wayNode_df = wayNode_df.withColumn('keys', col('keys').cast(StringType()))

In [15]:
# String form of the tag-value array, for the profiling cells that follow.
wayNode_df = wayNode_df.withColumn('values', col('values').cast(StringType()))

In [16]:
wayNode_df = wayNode_df.withColumn('indexedNode', wayNode_df['indexedNode'].cast(StringType()))

In [17]:
report = spark_df_profiling.ProfileReport(wayNode_df)

  plot.set_axis_bgcolor("w")


In [18]:
# Render the profile report inline (bare last expression of the cell).
report

0,1
Number of variables,5
Number of observations,4809092
Total Missing (%),0.0%
Total size in memory,0.0 B
Average record size in memory,0.0 B

0,1
Numeric,1
Categorical,4
Date,0
Text (Unique),0
Rejected,0

0,1
Distinct count,509675
Unique (%),10.6%
Missing (%),0.0%
Missing (n),0
Infinite (%),0.0%
Infinite (n),0

0,1
Mean,229090000
Minimum,3175800
Maximum,395330000
Zeros (%),0.0%

0,1
Minimum,3175800
5-th percentile,35829000
Q1,159160000
Median,236480000
Q3,311790000
95-th percentile,376130000
Maximum,395330000
Range,392160000
Interquartile range,152630000

0,1
Standard deviation,101510000
Coef of variation,0.44309
Kurtosis,-0.82729
Mean,229090000
MAD,85526000
Skewness,-0.36439
Sum,1101700000000000
Variance,1.0304e+16
Memory size,0.0 B

0,1
Distinct count,2584
Unique (%),0.1%
Missing (%),0.0%
Missing (n),0
Infinite (%),0.0%
Infinite (n),0

0,1
Domcsi,901090
mgpx,267148
szali,190094
Other values (2581),3450760

Value,Count,Frequency (%),Unnamed: 3
Domcsi,901090,18.7%,
mgpx,267148,5.6%,
szali,190094,4.0%,
hadidoki,164554,3.4%,
papesz,155802,3.2%,
BáthoryPéter,138984,2.9%,
Separis,129977,2.7%,
nagy_balint,123370,2.6%,
efemm,120861,2.5%,
kla25,105973,2.2%,

0,1
Distinct count,11430
Unique (%),0.2%
Missing (%),0.0%
Missing (n),0
Infinite (%),0.0%
Infinite (n),0

0,1
[highway],2124123
"[highway, name]",596312
"[highway, tracktype]",248277
Other values (11427),1840380

Value,Count,Frequency (%),Unnamed: 3
[highway],2124123,44.2%,
"[highway, name]",596312,12.4%,
"[highway, tracktype]",248277,5.2%,
"[highway, source]",194293,4.0%,
"[highway, ref]",156516,3.3%,
"[highway, source, traces]",119269,2.5%,
"[highway, name, ref]",85139,1.8%,
"[highway, surface]",81872,1.7%,
"[highway, surface, tracktype]",50009,1.0%,
"[highway, name, surface]",45385,0.9%,

0,1
Distinct count,90261
Unique (%),1.9%
Missing (%),0.0%
Missing (n),0
Infinite (%),0.0%
Infinite (n),0

0,1
[track],1383449
[residential],180227
[service],168356
Other values (90258),3077060

Value,Count,Frequency (%),Unnamed: 3
[track],1383449,28.8%,
[residential],180227,3.7%,
[service],168356,3.5%,
[path],136211,2.8%,
[unclassified],114813,2.4%,
[footway],97760,2.0%,
"[track, grade4]",74242,1.5%,
"[track, turistautak.hu, 1]",72131,1.5%,
"[track, grade2]",65222,1.4%,
"[track, grade3]",58004,1.2%,

0,1
Distinct count,4714854
Unique (%),98.0%
Missing (%),0.0%
Missing (n),0
Infinite (%),0.0%
Infinite (n),0

0,1
"[0, 659498198]",5
"[0, 1523554205]",5
"[0, 3772638180]",5
Other values (4714851),4809077

Value,Count,Frequency (%),Unnamed: 3
"[0, 659498198]",5,0.0%,
"[0, 1523554205]",5,0.0%,
"[0, 3772638180]",5,0.0%,
"[0, 251282084]",5,0.0%,
"[0, 1194200373]",5,0.0%,
"[0, 418393065]",5,0.0%,
"[0, 1497711817]",5,0.0%,
"[0, 2454709390]",5,0.0%,
"[1, 766849199]",5,0.0%,
"[0, 2454771916]",5,0.0%,

Unnamed: 0,wayId,user_sid,keys,values,indexedNode
0,3175810,SzPaula,"[highway, name]","[residential, Honvéd utca]","[0, 15231786]"
1,3175810,SzPaula,"[highway, name]","[residential, Honvéd utca]","[1, 1310148452]"
2,3175810,SzPaula,"[highway, name]","[residential, Honvéd utca]","[2, 1310021025]"
3,3175810,SzPaula,"[highway, name]","[residential, Honvéd utca]","[3, 1310020985]"
4,3175810,SzPaula,"[highway, name]","[residential, Honvéd utca]","[4, 1310020988]"


In [None]:
# Node coordinates keyed by nodeId, the join key against the ways' node references.
# The node id is selected once only; a second copy under 'id' was never used downstream.
nodCoor_df = hungaryNodes.select(
    col('id').alias('nodeId'),
    col('latitude'),
    col('longitude'),
)

In [None]:
nodCoor_df.show(n=10)

In [None]:
# Way-side join key: the referenced node id, cast to string.
# Requires indexedNode to still be a struct here (not its string-cast form).
a = wayNode_df['indexedNode'].getField('nodeId').cast(StringType())

In [None]:
# Node-side join key, cast to string like the way-side key.
b = nodCoor_df['nodeId'].cast(StringType())

In [None]:
# Inner join: attach latitude/longitude to every (way, node reference) row.
way_geometry_df = wayNode_df.join(nodCoor_df, on=a == b, how='inner')

In [None]:
way_geometry_df.show(n=10)

In [None]:
# Collapse the joined rows into one row per way. The way's nodes are gathered as
# (nodeIndex, latitude, longitude) structs.
# collect_list gives no ordering guarantee (it follows row order after the join's
# shuffle), so the list goes through sort_array. Structs compare field by field, so
# this orders the nodes by their position within the way.
# NOTE(review): this rebinds way_geometry_df to its own aggregate, so the cell cannot
# be re-run on its own output.
way_geometry_df = (
    way_geometry_df
    .groupBy('wayId', 'values')
    .agg(
        sort_array(
            collect_list(
                struct(
                    col('indexedNode').getField('index').alias('nodeIndex'),
                    col('latitude'),
                    col('longitude'),
                )
            )
        ).alias('geometry')
    )
)


In [None]:
way_geometry_df.show(n=10)

In [None]:
# Bring at most 1000 ways (an arbitrary, unordered subset) to the driver as pandas.
# NOTE(review): this rebinds the Spark DataFrame's name to a pandas DataFrame, so the
# cell fails if re-run on its own; a separate name would make it idempotent.
way_geometry_df = (
    way_geometry_df
    .limit(1000)
    .toPandas()
)

In [None]:
way_geometry_df.head(n=5)

In [None]:
# Peek at the geometry list of the first way.
way_geometry_df['geometry'].iloc[0]

In [None]:
import pandas as pd
import plotly
import plotly.graph_objs as go

# Put plotly into offline notebook mode so iplot renders figures inline.
plotly.offline.init_notebook_mode(connected=False)

In [None]:
# Single-country choropleth: one location (Hungary) with one constant z value,
# so the map simply shades the country.
data = {
    'type': 'choropleth',
    'locations': ['Hungary'],
    'locationmode': 'country names',
    'colorscale': 'Viridis',
    'z': [50],
}

In [None]:
# Sanity-check map: the choropleth above, framed on Europe.
lyt = {'geo': {'scope': 'europe'}}
fig = go.Figure(layout=lyt, data=[data])

plotly.offline.iplot(fig)

In [None]:
# Keep the first way's geometry list around for inspection.
temp = way_geometry_df['geometry'].iloc[0]

In [None]:
# Inspect the first record of that geometry list.
temp[0]

In [None]:
# Number of ways pulled into pandas.
way_geometry_df.shape[0]

In [None]:

def way_trace(geometry, width=10, color='red'):
    """Return one Scattergeo polyline tracing a single way.

    geometry -- one entry of the 'geometry' column. Each record is read by position
                (field 0 is the node's position within the way, as built in the
                groupBy cell) and by name ('latitude', 'longitude'); presumably these
                are pyspark Row objects, which support both — confirm.
    width, color -- line styling.

    Consecutive nodes are connected in position order. No segment is added from the
    last node back to the first, so open ways stay open.
    """
    # Sort defensively: collect_list does not guarantee node order after a shuffle.
    points = sorted(geometry, key=lambda p: p[0])
    return go.Scattergeo(
        lon=[p['longitude'] for p in points],
        lat=[p['latitude'] for p in points],
        mode='lines',
        line=go.scattergeo.Line(width=width, color=color),
        opacity=1,
    )


# One polyline trace per way (rather than one trace per segment) keeps the figure small.
highway_traces = [way_trace(geometry) for geometry in way_geometry_df['geometry']]

layout = go.Layout(
    title=go.layout.Title(
        text='Highways, Hungary'
    ),
    showlegend=False,
    geo=go.layout.Geo(
        scope='europe',
        projection=go.layout.geo.Projection(type='azimuthal equal area'),
        showland=True,
        landcolor='rgb(243, 243, 243)',
        countrycolor='rgb(204, 204, 204)',
    ),
)

fig = go.Figure(data=highway_traces, layout=layout)
plotly.offline.iplot(fig, filename='highways')