In [1]:
from sedona.spark import *
import os
import time

In [2]:
# Base s3a:// prefix of the Overture Maps release used in this notebook;
# the per-dataset "theme=.../type=..." sub-paths are appended to it when loading.
DATA_LINK = "s3a://overturemaps-us-west-2/release/2023-07-26-alpha.0/"

In [3]:
# Build the Spark session with the anonymous S3A credentials provider, registered
# under both the bare and the `spark.hadoop.`-prefixed configuration key.
config = (
    SedonaContext.builder()
    .config("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
    .getOrCreate()
)

# Wrap the session in a Sedona context; every later read and SQL call goes through `sedona`.
sedona = SedonaContext.create(config)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/05 22:41:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Converts geometry column from binary to geometry type

In [5]:
def geomTypeConverter(df, df_name):
    """Decode the WKB ``geometry`` column and register the result as a temp view.

    Args:
        df: DataFrame containing a column named ``geometry`` that holds WKB bytes.
        df_name: Name of the temporary view under which the converted
            DataFrame is registered (an existing view of that name is replaced).

    Returns:
        A new DataFrame with the same columns in the same order, where
        ``geometry`` is replaced by ``ST_GeomFromWKB(geometry)``.

    Raises:
        ValueError: If ``df`` has no column named ``geometry``.
    """
    if "geometry" not in df.columns:
        raise ValueError("DataFrame has no 'geometry' column to convert")

    def quoted(name):
        # Back-quote identifiers so names containing dots or keywords still parse
        # as plain column references inside a SQL expression.
        return "`" + name.replace("`", "``") + "`"

    # Swap the geometry column in place instead of assuming it is the last column.
    select_exprs = [
        "ST_GeomFromWKB(`geometry`) AS `geometry`" if name == "geometry" else quoted(name)
        for name in df.columns
    ]
    converted = df.selectExpr(*select_exprs)
    converted.createOrReplaceTempView(df_name)
    return converted

# Building Dataset 

In [6]:
# Read the buildings parquet data, decode its geometry column and register the
# result as the "df_building" view, then show the resulting schema.
df_building = geomTypeConverter(
    sedona.read.format("parquet").load(f"{DATA_LINK}theme=buildings/type=building"),
    "df_building",
)
df_building.printSchema()

23/08/05 22:42:00 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
[Stage 3:>                                                          (0 + 1) / 1]

root
 |-- id: string (nullable = true)
 |-- updatetime: string (nullable = true)
 |-- version: integer (nullable = true)
 |-- names: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |-- level: integer (nullable = true)
 |-- height: double (nullable = true)
 |-- numfloors: integer (nullable = true)
 |-- class: string (nullable = true)
 |-- sources: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |-- bbox: struct (nullable = true)
 |    |-- minx: double (nullable = true)
 |    |-- maxx: double (nullable = true)
 |    |-- miny: double (nullable = true)
 |    |-- maxy: double (nullable = true)
 |-- geometry: geometry (nullable = true)




                                                                                

In [10]:
# Summary statistics of building height. The recorded output of this cell (and the
# 0-100 height filter applied later) are about `height`, so describe that column.
df_building.describe(["height"]).show()



+-------+-----------------+
|summary|           height|
+-------+-----------------+
|  count|         27871758|
|   mean|22.26968765265663|
| stddev|82080.48443441925|
|    min|              0.0|
|    max|     4.33333333E8|
+-------+-----------------+




                                                                                

# Place Dataset 

In [None]:
# Read the places parquet data, decode its geometry column and register the
# result as the "df_place" view.
df_place = geomTypeConverter(
    sedona.read.format("parquet").load(f"{DATA_LINK}theme=places/type=place"),
    "df_place",
)

In [None]:
# Print the column names and types of the converted place DataFrame.
df_place.printSchema()

In [None]:
# Count places whose `addresses` array does not contain exactly one entry.
# Filter on `addresses` directly: projecting only `phones` first made the filter
# reference a column that was no longer selected, and the count is the same.
df_place.filter(~(size(col("addresses")) == 1)).count()

In [None]:
# Count places that list exactly one phone number.
(
    df_place
    .selectExpr("phones")
    .filter(size(col("phones")) == 1)
    .count()
)

# Admins Datasets

## Administrative Boundary Dataset

In [None]:
# Read the administrative-boundary parquet data, decode its geometry column and
# register the result as the "df_admin" view.
df_admin = geomTypeConverter(
    sedona.read.format("parquet").load(f"{DATA_LINK}theme=admins/type=administrativeBoundary"),
    "df_admin",
)

In [None]:
# Show up to 200 non-null ISO alpha-2 country codes from the administrative boundaries.
# The DataFrame loaded above is named `df_admin`; `df_admin_boundary` is never defined.
df_admin.filter(col("isocountrycodealpha2").isNotNull()).selectExpr("isocountrycodealpha2").show(200, False)

## Locality Dataset

In [None]:
# Read the locality parquet data (capped at 10000 rows), decode its geometry
# column and register the result as the "df_locality" view.
df_locality = geomTypeConverter(
    sedona.read.format("parquet").load(f"{DATA_LINK}theme=admins/type=locality").limit(10000),
    "df_locality",
)

# Transportation Datasets

## Connector Dataset

In [None]:
# Read the transportation connector parquet data, decode its geometry column and
# register the result as the "df_connector" view.
df_connector = geomTypeConverter(
    sedona.read.format("parquet").load(f"{DATA_LINK}theme=transportation/type=connector"),
    "df_connector",
)

## Segment Dataset

In [None]:
# Read the transportation segment parquet data, decode its geometry column and
# register the result as the "df_segment" view.
df_segment = geomTypeConverter(
    sedona.read.format("parquet").load(f"{DATA_LINK}theme=transportation/type=segment"),
    "df_segment",
)

# San Francisco Bay Area boundary

In [None]:
# Load the Bay Area counties shapefile as a geometry RDD, convert it to a DataFrame
# and register it as the "san_francisco" view used by the spatial joins below.
# The Spark context is taken from the Sedona session; no `sc` variable is defined
# in this notebook.
bay_area_boundary_RDD = ShapefileReader.readToGeometryRDD(sedona.sparkContext, "data/bay-area-counties")
san_francisco = Adapter.toDf(bay_area_boundary_RDD, sedona)
san_francisco.createOrReplaceTempView("san_francisco")

# Filtering rows on non-geometry columns

## Building dataset

In [None]:
# Keep buildings that have at least one name and one source, a height between
# 0 and 100, and exactly one floor; expose the result as a SQL view.
df_filtered_building = (
    df_building
    .filter(~(size(col("names")) <= 0))
    .filter(col("height") >= 0)
    .filter(col("height") <= 100)
    .filter(~(size(col("sources")) <= 0))
    .filter(col("numfloors") == 1)
)
df_filtered_building.createOrReplaceTempView("df_filtered_building")

In [None]:
# Print the number of filtered buildings and the wall-clock time the count took.
start_time = time.time()
print(df_filtered_building.count())
elapsed_ms = (time.time() - start_time) * 1000
print("--- %s milliseconds ---" % elapsed_ms)

## Place DataSet

In [None]:
# Keep places whose `addresses` array does not have exactly one entry and whose
# confidence equals the given value; expose the result as a SQL view.
df_filtered_place = df_place.filter(
    (~(size(col("addresses")) == 1))
    & (col("confidence") == 0.7087309956550598)
)
df_filtered_place.createOrReplaceTempView("df_filtered_place")

In [None]:
# Print the number of filtered places and the wall-clock time the count took.
start_time = time.time()
place_count = df_filtered_place.count()
print(" count: " + str(place_count))
elapsed_ms = (time.time() - start_time) * 1000
print("--- %s milliseconds ---" % elapsed_ms)

In [None]:
# Display the confidence values of the filtered places.
df_filtered_place.select("confidence").show()

## Admins Datasets

### Administrative Dataset

It doesn't need filtering as it is a small dataset

### Locality Dataset 

It doesn't need filtering as it is a small dataset

## Transportation Dataset

### Connector 

Nothing to filter on, as most of the non-geometry columns are null.

### Segment

In [None]:
# Keep segments whose level is strictly between 2 and 6; expose the result as a SQL view.
df_filtered_segment = sedona.sql(
    "select * from df_segment "
    "where level < 6 and level > 2"
)
df_filtered_segment.createOrReplaceTempView("df_filtered_segment")

In [None]:
# Print the number of filtered segments and the wall-clock time the count took.
# This times the filtered frame created in the previous cell, matching the building
# and place timing cells; the unfiltered `df_segment` was being counted here.
start_time = time.time()
print(" count: " + str(df_filtered_segment.count()))
print("--- %s milliseconds ---" % ((time.time() - start_time)* 1000))

In [None]:
# Cross-check: count segments with level strictly between 2 and 6 via the DataFrame API.
(
    df_segment
    .filter((col("level") < 6) & (col("level") > 2))
    .selectExpr("level")
    .count()
)

# Intersecting data points to San Francisco Bay Area

## Building Dataset

In [None]:
# Spatially join the filtered buildings with the boundary and keep only the building geometry.
result_building = sedona.sql(
    "SELECT * FROM df_filtered_building b, san_francisco s "
    "WHERE ST_Intersects(b.geometry, s.geometry)"
)
building_geom = result_building.selectExpr("b.geometry")

## Place Dataset

In [None]:
# Spatially join the filtered places with the boundary and keep only the place geometry.
# Uses the `df_filtered_place` view registered in the filtering section, consistent with
# the building and segment joins; the unfiltered `df_place` view was being joined here.
result_place = sedona.sql("SELECT * FROM df_filtered_place p, san_francisco s WHERE ST_Intersects(p.geometry, s.geometry)")
place_geom = result_place.selectExpr("p.geometry")

## Admins Datasets

### Administrative Boundary Dataset

In [None]:
# Spatially join the administrative boundaries with the boundary view and keep
# only the administrative geometry.
result_admin = sedona.sql(
    "SELECT * FROM df_admin a, san_francisco s "
    "WHERE ST_Intersects(a.geometry, s.geometry)"
)
admin_geom = result_admin.selectExpr("a.geometry")

### Locality Dataset

In [None]:
# Spatially join the localities with the boundary and keep only the locality geometry.
result_locality = sedona.sql(
    "SELECT * FROM df_locality p, san_francisco s "
    "WHERE ST_Intersects(p.geometry, s.geometry)"
)
locality_geom = result_locality.selectExpr("p.geometry")

## Transportation Datasets 

### Connector Dataset

In [None]:
# Spatially join the connectors with the boundary and keep only the connector geometry.
result_connector = sedona.sql(
    "SELECT * FROM df_connector c, san_francisco s "
    "WHERE ST_Intersects(c.geometry, s.geometry)"
)
connector_geom = result_connector.selectExpr("c.geometry")

### Segment Dataset

In [None]:
# Spatially join the filtered segments with the boundary and keep only the segment geometry.
result_segment = sedona.sql(
    "SELECT * FROM df_filtered_segment f, san_francisco s "
    "WHERE ST_Intersects(f.geometry, s.geometry)"
)
segment_geom = result_segment.selectExpr("f.geometry")

# SedonaKepler to Visualize

## Building Dataset

In [None]:
# Build a Kepler map from the building geometries, report how long it took, and display it.
start_time = time.time()
map_building = SedonaKepler.create_map(building_geom, "Building")
elapsed_ms = (time.time() - start_time) * 1000
print("--- %s milliseconds ---" % elapsed_ms)
map_building

## Place Dataset

In [None]:
# Build a Kepler map from the place geometries, report how long it took, and display it.
start_time = time.time()
map_place = SedonaKepler.create_map(place_geom, "Place")
elapsed_ms = (time.time() - start_time) * 1000
print("--- %s milliseconds ---" % elapsed_ms)
map_place

## Admins Datasets 

### Administrative Dataset

In [None]:
# Build a Kepler map from the administrative geometries, report how long it took, and display it.
start_time = time.time()
map_admin = SedonaKepler.create_map(admin_geom, "Admin")
elapsed_ms = (time.time() - start_time) * 1000
print("--- %s milliseconds ---" % elapsed_ms)
map_admin

### Locality Dataset

In [None]:
# Build a Kepler map from the locality geometries, report how long it took, and display it.
start_time = time.time()
map_locality = SedonaKepler.create_map(locality_geom, "Locality")
elapsed_ms = (time.time() - start_time) * 1000
print("--- %s milliseconds ---" % elapsed_ms)
map_locality

## Transportation Datasets

### Connector

In [None]:
# Build a Kepler map from the connector geometries, report how long it took, and display it.
start_time = time.time()
map_connector = SedonaKepler.create_map(connector_geom, "Connector")
elapsed_ms = (time.time() - start_time) * 1000
print("--- %s milliseconds ---" % elapsed_ms)
map_connector

### Segment

In [None]:
# Build a Kepler map from the segment geometries, report how long it took, and display it.
start_time = time.time()
map_segment = SedonaKepler.create_map(segment_geom, "Segment")
elapsed_ms = (time.time() - start_time) * 1000
print("--- %s milliseconds ---" % elapsed_ms)
map_segment

In [None]:
# Map every administrative boundary (not just those intersecting the Bay Area) and
# report how long building the map took. The geometries come from `df_admin`, matching
# the variable names and the "Admin whole" layer label; the row-limited `df_locality`
# frame was being read here.
start_time = time.time()
admin_geom_whole = df_admin.selectExpr("geometry")
map_admin_whole = SedonaKepler.create_map(admin_geom_whole, "Admin whole")
print("--- %s milliseconds ---" % ((time.time() - start_time)* 1000))
map_admin_whole
