## Intermediate Parallel Computing

### Segment 5 of 6

### PySpark SQL III: Spatial is Special!

#### In this segment we will learn:
* Apache Sedona
* Querying spatial data with PySpark SQL
* The benefits of parallelization for spatial data and analysis


*Lesson Developer: Mohsen Ahmadkhani, ahmad178@umn.edu*

## Reminder
<a href="#/slide-2-0" class="navigate-right" style="background-color:blue;color:white;padding:8px;margin:2px;font-weight:bold;">Continue with the lesson</a>

<br>
</br>
<font size="+1">

By continuing with this lesson you are granting your permission to take part in this research study for the Hour of Cyberinfrastructure: Developing Cyber Literacy for GIScience project. In this study, you will be learning about cyberinfrastructure and related concepts using a web-based platform that will take approximately one hour per lesson. Participation in this study is voluntary.

Participants in this research must be 18 years or older. If you are under the age of 18 then please exit this webpage or navigate to another website such as the Hour of Code at https://hourofcode.com, which is designed for K-12 students.

If you are not interested in participating please exit the browser or navigate to this website: http://www.umn.edu. Your participation is voluntary and you are free to stop the lesson at any time.

For the full description please navigate to this website: <a href="../../gateway-lesson/gateway/gateway-1.ipynb">Gateway Lesson Research Study Permission</a>.

</font>

In [None]:
# This code cell starts the necessary setup for Hour of CI lesson notebooks.
# First, it enables users to hide and unhide code by producing a 'Toggle raw code' button below.
# Second, it imports the hourofci package, which is necessary for lessons and interactive Jupyter Widgets.
# Third, it helps hide/control other aspects of Jupyter Notebooks to improve the user experience
# This is an initialization cell
# It is not displayed because the Slide Type is 'Skip'

from IPython.display import HTML, IFrame, Javascript, display, Latex, Markdown
from ipywidgets import interactive
import ipywidgets as widgets
from ipywidgets import Layout, VBox, Button
import warnings
warnings.filterwarnings('ignore')
import getpass # This library allows us to get the username (User agent string)

# import package for hourofci project
import sys
sys.path.append('../../supplementary') # relative path (may change depending on the location of the lesson notebook)
import hourofci

class Output:
    def __init__(self, name='cmd_out'):
        self.h = display(display_id=name)
        self.content = ''
        self.mime_type = None
        self.dic_kind = {
            'text': 'text/plain',
            'markdown': 'text/markdown',
            'html': 'text/html',
        }
        
    def display(self):
        self.h.display({'text/plain': ''}, raw=True)
        
    def _build_obj(self, content, kind, append, new_line):
        self.mime_type = self.dic_kind.get(kind)
        if not self.mime_type:
            return content, False
        if append:
            sep = '\n' if new_line else ''
            self.content = self.content + sep + content
        else:
            self.content = content
        return {self.mime_type: self.content}, True
        
    def update(self, content, kind=None, append=False, new_line=True):
        obj, raw = self._build_obj(content, kind, append, new_line)
        self.h.update(obj, raw=raw)
        

def cont(gpd_time, sedona_time):
    if round(gpd_time/sedona_time, 2)>1:
        res = f'''*** \n <font style="background-color: #E3FFF5; font-size: large; color: black; font-weight: bold">
        Comparing the two execution times, for executing the intersection operation, GeoPandas took <font size=5 color='red'>{round(gpd_time, 3)}</font> seconds and 
        Apache Sedona did it for us in only <font size=5 color='red'>{round(sedona_time, 3)}</font> seconds. It means parallelizing this operation with Spark SQL made 
        the spatial operation <font size=5 color='red'>{round(gpd_time/sedona_time, 2)}</font> times <font size=5 color='green'>FASTER</font>.
        \n ***
        '''
    else:
        res = f'''**Comparing the two execution times for the intersection operation, GeoPandas took {round(gpd_time, 3)} seconds and 
        Apache Sedona took {round(sedona_time, 3)} seconds. Although we parallelized the operation using the Spark framework, 
        it ran slower than GeoPandas. This is because of the trade-off between the partitioning cost and the size of the input data.**
        '''
    return res

# load javascript to initialize/hide cells, get user agent string, and hide output indicator
# hide code by introducing a toggle button "Toggle raw code"
HTML(''' 
    <script type="text/javascript" src=\"../../supplementary/js/custom.js\"></script>
    
    <style>
        .output_prompt{opacity:0;}
    </style>
    
    <input id="toggle_code" type="button" value="Toggle raw code">
''')

# Preface
### Why Distributed Spatial Computing?

<center><img src=https://media.makeameme.org/created/why-even-bother-5c8eb2.jpg width=300></center>
In recent years, spatial technology has evolved a lot and we now have BIG spatial data. Some examples of spatial big data include location-based services (e.g., Uber, Lyft, scooter ride companies), remote sensing data, data from social networks like Twitter and Facebook, weather maps, transportation data, and countless others. Handling such BIG loads of spatial data needs <b>faster</b> data management technologies, and parallel computing can provide that speed!

By the way, if you are curious about spatial big data, we talked about it in the <a href="http://try.hourofci.org/hub/user-redirect/git-pull?repo=https%3A%2F%2Fgithub.com%2Fhourofci%2Flessons&urlpath=tree%2Flessons%2Fintermediate-lessons%2Fbig-data%2FWelcome.ipynb&branch=master">intermediate Big Data</a> lesson. So, take a look if you have not already.   


## Apache Sedona

So far, we briefly saw how Apache Spark works, but like most data management technologies, Apache Spark was first developed for non-spatial data. Why? Well, because <b>spatial is special!</b> Due to its nature, spatial data is much more complex than non-spatial data, and therefore it is harder to store and analyze. 
To support spatial data, an extension to Spark named <b><a href="http://sedona.apache.org">Apache Sedona</a></b> was developed. 

Apache Sedona (formerly GeoSpark) is a powerful tool that extends RDDs to geospatial RDDs (aka SpatialRDD). In simple words, Apache Sedona enables two primary things: 
<ol>
    <li>
Distributing geospatial data between multiple computational cores 
    </li>
    <li>
Spatial functions and queries in Spark SQL
    </li>
</ol>

In this segment we touch on Apache Sedona and see, through a demo spatial problem, how it can run faster than a non-parallel library like GeoPandas. 

Please note that in this lesson, we use "(spatial) Spark SQL" and "(Apache) Sedona" interchangeably. 
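
To give a feel for what "distributing geospatial data" means, here is a minimal pure-Python sketch that groups points into grid cells so that each cell could be handed to a different core. This is only an illustration: the function name `grid_partition`, the sample points, and the 10-degree cell size are made up for this example, and Sedona uses its own, more sophisticated spatial partitioners.

```python
from collections import defaultdict

def grid_partition(points, cell_size):
    """Group (lon, lat) points by the grid cell they fall in."""
    partitions = defaultdict(list)
    for lon, lat in points:
        # The integer cell indices act as the partition key
        key = (int(lon // cell_size), int(lat // cell_size))
        partitions[key].append((lon, lat))
    return dict(partitions)

# Hypothetical sample points (lon, lat), for illustration only
points = [(-93.3, 44.9), (-94.1, 46.4), (-122.3, 47.6), (-120.5, 47.0), (2.35, 48.85)]

partitions = grid_partition(points, cell_size=10)
for key, members in sorted(partitions.items()):
    print(key, len(members))
```

Points that are close together end up in the same partition, so a spatial operation on one region only needs to look at the partitions covering that region.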

## Exit the Presentation Mode Again!

It's CODE time again! <br>
We'll start by installing the Apache Sedona package.

Click on the **X** in the upper left hand corner to exit RISE Presentation mode and see the regular Jupyter Notebook.


In [None]:
%pip install apache-sedona --quiet

Now, click the "Restart Kernel" button to update the list of installed packages.

In [None]:
def restarter():
    display(HTML(
        '''
            <script>
                code_show = false;
                function restart_kernel(){
                    IPython.notebook.kernel.restart();
                }
            </script>
            <button onclick="restart_kernel()">Restart Kernel</button>
        '''
    ))
restarter()

Now we are going to import the packages we need including sedona, pyspark, geopandas (for data processing) and ipyleaflet (for mapping).

In [None]:
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
import geopandas as gpd
from ipyleaflet import Map, GeoData

Now, we create a spatially enabled Spark session using the imported Sedona sub-modules. Don't worry too much if it looks complicated! You can copy and paste this for your project :))

In [None]:
spark = SparkSession.\
    builder.\
    master("local[*]").\
    appName("Spatial Spark Demo").\
    config("spark.serializer", KryoSerializer.getName).\
    config("spark.kryo.registrator", SedonaKryoRegistrator.getName) .\
    config("spark.jars.packages", "org.apache.sedona:sedona-python-adapter-3.0_2.12:1.2.1-incubating,org.datasyslab:geotools-wrapper:1.1.0-25.2") .\
    getOrCreate()

SedonaRegistrator.registerAll(spark)
sc = spark.sparkContext


## Outline 
To illustrate how much faster Spark SQL can work through parallelization, we will compare its performance with the GeoPandas module. In this segment, we will execute a spatial operation between two sample spatial datasets to solve a demo spatial problem: 
>**Which rivers and lake centerlines intersect the states of Minnesota and Washington?** 
 

To find out the answer, we will take the following steps: 
><ol style='margin: 100px'>
    <li>
        Data Collection<br>
        <ul>
            <li>
        Download and read the <i>US states</i> and world's <i>rivers</i> datasets as <i>GeoPandas</i> dataframes. <br><br>
            </li>
        </ul>
    </li>
    <li>
        Spatial Operation With <b>GeoPandas</b> (Not Parallelized)
        <ul>
            <li>
        Perform the spatial intersection operation using GeoPandas and record the execution time. <br><br>
            </li>
        </ul> 
    </li>
    <li>
        Spatial Operation With <b> Apache Sedona</b>  (Parallelized)
        <ul>
            <li>
        Do the same operation with Apache Sedona and record the execution time. <br><br>
            </li>
        </ul> 
    </li>
    <li>
        Conclusion 
    </li>
</ol>
    
Let's do it! 


## 1. Data Collection

### World-Scale Rivers and Lake Centerlines Dataset
This shapefile includes the world's rivers and lake centerlines at a 1:10 million map scale (the `10m` in the file name refers to this scale, not to 10 meters). 
This is the same dataset we used <a href="http://try.hourofci.org/hub/user-redirect/git-pull?repo=https%3A%2F%2Fgithub.com%2Fhourofci%2Flessons&urlpath=tree%2Flessons%2Fbeginner-lessons%2Fgeospatial-data%2Fgd-example_1.ipynb&branch=master">here</a> in the beginner geospatial data lesson. You can learn more about this dataset there. 

Run the cell below to download the dataset as a zip file using the `wget` command and then extract the shapefile using `unzip`. 

In [None]:
!wget -O ne_10m_rivers_lake_centerlines.zip https://www.naturalearthdata.com/http//www.naturalearthdata.com/download/10m/physical/ne_10m_rivers_lake_centerlines.zip 
!unzip -n ne_10m_rivers_lake_centerlines.zip

Let's read in the shapefile using geopandas and take a look at the first few rows. 

In [None]:
# Use geopandas to read the file we just downloaded and unzipped through wget and unzip 
rivers = gpd.read_file('ne_10m_rivers_lake_centerlines.shp')
# Look at the first few rows
rivers.head()

### US States Dataset

Next, we will do the same for the `US states` shapefile, which contains the polygons of the US states. Run the following two cells to download it from the US Census Bureau's <a href="https://www.census.gov/geographies/mapping-files/time-series/geo/carto-boundary-file.html">data warehouse</a> and read it as a GeoPandas dataframe.


In [None]:
!wget -O us_states.zip https://www2.census.gov/geo/tiger/GENZ2018/shp/cb_2018_us_state_20m.zip 
!unzip -n us_states.zip

In [None]:
states = gpd.read_file('us_states.zip')
states.head()

### An Important Caveat

Before we start working with multiple spatial datasets we MUST make sure they are all in the same <a href='https://en.wikipedia.org/wiki/Spatial_reference_system'>coordinate reference system (CRS)</a>.

We can pull the coordinate system information of each dataframe using the `.crs` attribute. Run the following two cells to see the coordinate systems information for our two dataframes. 


In [None]:
rivers.crs

In [None]:
states.crs

As the output shows, they use different coordinate systems (WGS84 for rivers vs. NAD83 for states). We can use GeoPandas' `.to_crs("CRS ID")` method to change the CRS of a dataframe. 

In the next cell, we change the CRS of the `states` dataframe to WGS84 by passing its EPSG code (4326) as an argument so both dataframes are in the same coordinate system. 



In [None]:
states = states.to_crs('EPSG:4326')  # reproject the states to WGS84

### Mapping and Visualization

To visualize the loaded spatial datasets, we use the ipyleaflet package. In the next two cells, we create a data *layer* for each dataset with `GeoData`. Then we make a map using `Map`, setting a center point and an appropriate zoom level. These arguments vary according to the geographical extent of the datasets. 


In [None]:
# create an ipyleaflet layer for the rivers dataset using the GeoData class
rivers_layer = GeoData(geo_dataframe = rivers, style={'color':'blue'})
# create a map object and specify an appropriate center point and zoom level according to the extent of the data
mymap1 = Map(center=(40,10), zoom = 2)
# add the rivers' layer to the map to visualize
mymap1.add_layer(rivers_layer)
mymap1

In [None]:
states_layer = GeoData(geo_dataframe = states, style={'color':'red'})
mymap2 = Map(center=(40,-100), zoom = 4)
mymap2.add_layer(states_layer)
mymap2

## 2. Spatial Operation With *GeoPandas* (Not Parallelized)

Now that we have a better sense of our datasets by visualizing them, it's time to run the intersection operation using **GeoPandas**. To do this, we will first filter the two states of Minnesota and Washington using the following code.

Note that here the character `|` is the element-wise logical `OR` operator, which is why each condition is wrapped in parentheses.


In [None]:
states[(states['NAME']=='Minnesota')|(states['NAME']=='Washington')]

To perform the spatial `intersection` operation, we will use the GeoPandas `overlay` function, setting its `how=` argument to `"intersection"`. 

We record the execution time using the `time` module.

In [None]:
import time

start_time = time.time()

inter = gpd.overlay(rivers, states[(states['NAME']=='Minnesota')|(states['NAME']=='Washington')], how='intersection')

gpd_time = time.time() - start_time
print(f"Execution time for GeoPandas: {gpd_time} seconds")
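
If you time several operations, the start/stop pattern above can be wrapped in a small helper. The sketch below is one possible way to do it using only the Python standard library; the `timer` helper and the `timings` dictionary are hypothetical names introduced for this example, and `time.perf_counter` is used because it is designed for measuring short durations.

```python
import time
from contextlib import contextmanager

@contextmanager
def timer(label, results):
    """Time the enclosed block and store the elapsed seconds in results[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start

timings = {}
with timer("demo", timings):
    # Stand-in workload; in the lesson this would be the spatial operation
    total = sum(i * i for i in range(100_000))

print(f"demo took {timings['demo']:.4f} seconds")
```

Storing the results in one dictionary makes it easy to compare the GeoPandas and Sedona timings side by side later.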

## 3. Spatial Operation With *Apache Sedona* (Parallelized)


### Converting GeoPandas to Apache Sedona

To tackle this problem with Apache Sedona, we first need to convert our input data to Spark dataframes, something that Spark understands and can parallelize. We can perform this conversion for our GeoPandas dataframes using the `createDataFrame` method provided by Spark in the next cell. 

Here we also print the dataframe's schema to see what columns and data types we have. 

In [None]:
states_spdf = spark.createDataFrame(states)
states_spdf.printSchema()


Notice that at the very bottom of the schema, there is a `geometry` data type! That's what Sedona brings to Spark. 

Next, we use the `show` method to see the first few rows of the Spark dataframe. 

In [None]:
states_spdf.show(3)

And the same process for the rivers dataset. 

In [None]:
rivers_spdf = spark.createDataFrame(rivers)
rivers_spdf.printSchema()


### Creating SQL Views (Virtual Tables)

Similar to non-spatial data, we need to create a SQL View for each of our dataframes. This step is required to query the data with SQL: it registers each dataframe as a virtual relation (table) under a name. Below, we do this using the `createOrReplaceTempView` method and create two Views named `rivers_table` and `states_table`. We will query from these two tables. 

In [None]:
rivers_spdf.createOrReplaceTempView("rivers_table")
states_spdf.createOrReplaceTempView("states_table")
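
If the idea of a "virtual table" is new to you, the following sketch shows the same concept with Python's built-in `sqlite3` module instead of Spark. It is only an analogy: the table, the view name `states_view`, and the sample rows are invented for this example. The point it illustrates is that a view stores a query under a name rather than a copy of the rows.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE states (name TEXT, region TEXT)")
conn.executemany(
    "INSERT INTO states VALUES (?, ?)",
    [("Minnesota", "Midwest"), ("Washington", "West"), ("Texas", "South")],
)

# A view is a named query; no rows are copied when it is created
conn.execute(
    "CREATE TEMP VIEW states_view AS "
    "SELECT name FROM states WHERE name IN ('Minnesota', 'Washington')"
)

# The view can now be queried like a regular table
rows = conn.execute("SELECT name FROM states_view ORDER BY name").fetchall()
print(rows)
conn.close()
```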


### Intersection Spatial Query 

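Apache Sedona is queried with SQL, so we need to write a spatial query.<br>
In the following SQL query, we select state and river names along with the rivers' geometry. 

Under the `WHERE` clause, we set two conditions: *the state name is either Minnesota OR Washington* AND *the river intersects the polygon of that state*. Note that `ST_INTERSECTS` is a true/false test, so the query returns each matching river's whole geometry, whereas the GeoPandas `overlay` above returns the river segments clipped to the state polygons. 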
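
To make the logic of this `WHERE` clause concrete, here is a minimal pure-Python sketch of the same idea: pair every state with every river, then keep only the pairs that pass both conditions. Everything in it is an assumption for illustration: the rough bounding boxes, the river names, and the `bbox_intersects` helper are made up, and a bounding-box overlap is a much cruder test than the real geometry test Sedona performs.

```python
def bbox_intersects(a, b):
    """True if two (min_x, min_y, max_x, max_y) boxes overlap or touch."""
    return a[0] <= b[2] and b[0] <= a[2] and a[1] <= b[3] and b[1] <= a[3]

# Hypothetical, rough bounding boxes standing in for the real geometries
toy_states = [
    {"NAME": "Minnesota", "bbox": (-97.2, 43.5, -89.5, 49.4)},
    {"NAME": "Washington", "bbox": (-124.8, 45.5, -116.9, 49.0)},
    {"NAME": "Texas", "bbox": (-106.6, 25.8, -93.5, 36.5)},
]
toy_rivers = [
    {"name": "River A", "bbox": (-95.0, 44.0, -90.0, 47.0)},    # overlaps the Minnesota box
    {"name": "River B", "bbox": (-121.0, 46.0, -119.0, 48.0)},  # overlaps the Washington box
    {"name": "River C", "bbox": (-100.0, 28.0, -97.0, 31.0)},   # overlaps the Texas box only
]

wanted = {"Minnesota", "Washington"}
matches = [
    (s["NAME"], r["name"])
    for s in toy_states   # every state is paired with ...
    for r in toy_rivers   # ... every river, like FROM states_table, rivers_table
    if s["NAME"] in wanted and bbox_intersects(r["bbox"], s["bbox"])
]
print(matches)
```

The nested loop checks every state and river pair one after another. Spreading those checks over many cores is the work a parallel engine takes on for us.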

```sql
SELECT states_table.NAME, rivers_table.name AS river_name, rivers_table.geometry AS geom
FROM states_table, rivers_table
WHERE states_table.NAME IN ('Minnesota', 'Washington') AND ST_INTERSECTS(rivers_table.geometry, states_table.geometry)
```



Let's execute this spatial query together and record its execution time:

In [None]:
start_time = time.time()

mn_rivers = spark.sql("""
SELECT states_table.NAME, rivers_table.name as river_name, rivers_table.geometry as geom
FROM states_table, rivers_table
WHERE states_table.NAME IN ('Minnesota', 'Washington') and ST_INTERSECTS(rivers_table.geometry, states_table.geometry)
""")
# spark.sql() is lazy: it only builds a query plan.
# Calling an action such as count() makes Spark actually run the query, so the timing is meaningful.
n_matches = mn_rivers.count()

sedona_time = time.time() - start_time
print(f"Execution time for Sedona: {sedona_time} seconds. \nReminder: GeoPandas did it in {gpd_time} seconds.")
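
One thing to keep in mind when timing Spark code: `spark.sql(...)` only describes the query, and Spark runs it when an action such as `count()`, `show()`, or `toPandas()` is called. The sketch below is an analogy using a plain Python generator (it is not Spark code, and `slow_squares` is a made-up function). It shows why timing only the "definition" step measures almost nothing.

```python
import time

def slow_squares(n):
    """Lazily yield squares; nothing is computed until the generator is consumed."""
    for i in range(n):
        yield i * i

start = time.perf_counter()
plan = slow_squares(200_000)   # like spark.sql(...): only a description of the work
define_time = time.perf_counter() - start

start = time.perf_counter()
result = sum(plan)             # like an action: the work actually happens here
run_time = time.perf_counter() - start

print(f"defining: {define_time:.6f} s, running: {run_time:.6f} s")
```

A fair comparison with GeoPandas therefore needs an action inside the timed region.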


### What do you think?!
<img src="supplementary/Pondering.jpg" width=300>

In [None]:
display(Markdown('<b>Did you expect this result regarding the execution times?</b>'))
o1='Yes'
o2='No'
widget1 = widgets.RadioButtons(
    options = [o1, o2],
    description = ' ', style={'description_width': 'initial'},
    value = None
)
execute = Button(
    description='Submit',
    disabled=False,
    button_style='success',
)

children1 = [widget1, execute]
vbox = VBox(children=children1)
out = Output(name='cmd_out')
def cmd(b):
    print('Submitted!')
    return out.update(Markdown(cont(gpd_time, sedona_time)))

execute.on_click(cmd)
display(vbox)



## 4. Conclusion

In [None]:
out = Output(name='cmd_out')

out.display()



### Note
Spatial queries like this one can take much longer on large datasets if they are not partitioned and run in parallel. Please note that the benefit of parallelization grows with the size of the data, because "distributing" the data between multiple workers is itself time-consuming and only pays off when there is enough work to share. Hence, for small datasets we do not usually use parallel computing. In the exploration segment, we will explore this fact further. 
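
The trade-off in this note can be sketched with a very simple cost model: parallel time is a fixed distribution overhead plus the work divided evenly among the workers. This is an idealized assumption, and the numbers below (2 seconds of overhead, 4 workers) are hypothetical values chosen only to show the pattern.

```python
def parallel_time(work_seconds, n_workers, overhead_seconds):
    """Idealized model: fixed distribution overhead plus evenly divided work."""
    return overhead_seconds + work_seconds / n_workers

# Hypothetical numbers, for illustration only
overhead = 2.0   # seconds spent partitioning and distributing the data
workers = 4

for work in (1.0, 10.0, 100.0):
    t_par = parallel_time(work, workers, overhead)
    speedup = work / t_par
    print(f"serial {work:6.1f} s -> parallel {t_par:6.2f} s (speedup {speedup:.2f}x)")
```

With these made-up numbers, the smallest job gets slower when parallelized, while the largest one approaches the ideal 4x speedup.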

### [Optional] Converting Spark Dataframes Back to GeoPandas For Visualization

Now, to visualize our query result on a map, we need to convert the result back to a GeoPandas dataframe. To do this, we simply use the `toPandas` method and wrap the result in a `GeoDataFrame`. 

In [None]:
mn_rivers_df = mn_rivers.toPandas()
result = gpd.GeoDataFrame(mn_rivers_df, geometry="geom", crs="EPSG:4326")  # both inputs were in WGS84
result

And here is the result on a map! 

In [None]:
wa_mn = states[(states['NAME']=='Minnesota') | (states['NAME']=='Washington')]
wa_mn_layer = GeoData(geo_dataframe = wa_mn, style={'color':'red'})
gdf_layer = GeoData(geo_dataframe = result, style={'color':'blue'})
gdf_map = Map(center=(40,-100), zoom = 4)
gdf_map.add_layer(gdf_layer)
gdf_map.add_layer(wa_mn_layer)
gdf_map

Awesome! Now, click the link below to go to the exploration segment to dig in even more! 

<font size="+1"><a style="background-color:blue;color:white;padding:12px;margin:10px;font-weight:bold;" 
href="pc-exploration.ipynb">Click here to go to the next notebook.</a></font>
