# Urban Forest Challenge Eliiza

### This challenge is solved using the steps below

#### Step 1 :
           Load the Melbourne suburb JSON data into a data frame.
#### Step 2 :
           Extract the SA2 suburb name, suburb code and geometry coordinates columns.
#### Step 3 :
           Calculate the suburb area.
#### Step 4 :
           Load the forest data into a data frame.
#### Step 5 :
           Convert (reshape) the forest geometry into a valid polygon format.
#### Step 6 :
           Perform the intersection operation on the suburb and forest data.
#### Step 7 :
           Calculate the area of the intersection.
#### Step 8 :
           Compute the percentage as intersection area / suburb area.

#### Import the required packages

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf,isnan, when, count, col
from polygon_utils import *
from shapely.wkt import loads
import pyspark.sql.functions as F

#### Create Spark Context

In [2]:
def create_sc():
    """
    Create (or reuse) a Spark context based on a custom configuration and return it.
    """
    sc_conf = SparkConf() # Initialise the Spark config
    sc_conf.setAppName('Eliiza Urban Forest Challenge') # set the Spark application name
    print(sc_conf.getAll()) # display all explicitly set configuration values

    # Reuse the running Spark context if there is one, otherwise create a new one
    sc = SparkContext.getOrCreate(conf=sc_conf)

    return sc

scontxt = create_sc() # Call the function that creates the Spark context
sqlcntxt = SQLContext(scontxt) # Create SQL context

dict_items([('spark.app.name', 'Eliiza Urban Forest Challenge')])


#### Read the Suburb Raw Json Data

In [3]:
melbInnerRaw = sqlcntxt.read.json('melb_inner_2016.json') # read the JSON data into a data frame
melbInnerRaw.createOrReplaceTempView("suburbraw") # create a temporary view
allraw_data = sqlcntxt.sql("SELECT * FROM suburbraw") # retrieve all the raw suburb data
df_pd = allraw_data.toPandas() # convert to a pandas data frame for viewing only
df_pd # display data

Unnamed: 0,areasqkm16,gcc_code16,gcc_name16,geometry,sa1_7dig16,sa1_main16,sa2_5dig16,sa2_main16,sa2_name16,sa3_code16,sa3_name16,sa4_code16,sa4_name16,ste_code16,ste_name16,type
0,0.0410,2GMEL,Greater Melbourne,"([[[[144.974079984, -37.75927600899996], [144....",2110501,20601110501,21105,206011105,Brunswick,20601,Brunswick - Coburg,206,Melbourne - Inner,2,Victoria,Feature
1,0.1237,2GMEL,Greater Melbourne,"([[[[144.968065915, -37.76141466399997], [144....",2110502,20601110502,21105,206011105,Brunswick,20601,Brunswick - Coburg,206,Melbourne - Inner,2,Victoria,Feature
2,0.0622,2GMEL,Greater Melbourne,"([[[[144.9677592060001, -37.763158584999985], ...",2110503,20601110503,21105,206011105,Brunswick,20601,Brunswick - Coburg,206,Melbourne - Inner,2,Victoria,Feature
3,0.0597,2GMEL,Greater Melbourne,"([[[[144.97196615900012, -37.76444694399992], ...",2110504,20601110504,21105,206011105,Brunswick,20601,Brunswick - Coburg,206,Melbourne - Inner,2,Victoria,Feature
4,0.0685,2GMEL,Greater Melbourne,"([[[[144.9664913050001, -37.76468318899998], [...",2110505,20601110505,21105,206011105,Brunswick,20601,Brunswick - Coburg,206,Melbourne - Inner,2,Victoria,Feature
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1229,0.1650,2GMEL,Greater Melbourne,"([[[[144.9952361500001, -37.79466029299994], [...",2114517,20607114517,21145,206071145,Yarra - North,20607,Yarra,206,Melbourne - Inner,2,Victoria,Feature
1230,0.1135,2GMEL,Greater Melbourne,"([[[[144.9966683010001, -37.79045864699998], [...",2114518,20607114518,21145,206071145,Yarra - North,20607,Yarra,206,Melbourne - Inner,2,Victoria,Feature
1231,0.1169,2GMEL,Greater Melbourne,"([[[[145.00227707300007, -37.78604730399997], ...",2114519,20607114519,21145,206071145,Yarra - North,20607,Yarra,206,Melbourne - Inner,2,Victoria,Feature
1232,0.7632,2GMEL,Greater Melbourne,"([[[[145.00271102200008, -37.79376200699994], ...",2114520,20607114520,21145,206071145,Yarra - North,20607,Yarra,206,Melbourne - Inner,2,Victoria,Feature


As described in the Australian Statistical Geography Standard (ASGS) [http://www.abs.gov.au/websitedbs/D3310114.nsf/home/Australian+Statistical+Geography+Standard+(ASGS)], Statistical Areas Level 2 (SA2s) are designed to reflect functional areas that represent a community that interacts together socially and economically. They take suburb and locality boundaries into account to improve the geographic coding of data to these areas, and in major urban areas SA2s often reflect one or more related suburbs. For this reason, the SA2 columns are extracted and used as the suburb data.

In [4]:
# Extract the suburb name, suburb code, regional name, regional code
# and geometry coordinates from the raw data
feature_data = melbInnerRaw.select(F.col("sa2_name16").alias("suburb_name"),
                                   F.col("sa2_main16").alias("suburb_code"),
                                   F.col("sa3_name16").alias("regional_level_name"),
                                   F.col("sa3_code16").alias("regional_level_code"),
                                   F.col("geometry.coordinates").alias("map_coordinates"))
feature_data.show(10)

+-----------+-----------+-------------------+-------------------+--------------------+
|suburb_name|suburb_code|regional_level_name|regional_level_code|     map_coordinates|
+-----------+-----------+-------------------+-------------------+--------------------+
|  Brunswick|  206011105| Brunswick - Coburg|              20601|[[[[144.974079984...|
|  Brunswick|  206011105| Brunswick - Coburg|              20601|[[[[144.968065915...|
|  Brunswick|  206011105| Brunswick - Coburg|              20601|[[[[144.967759206...|
|  Brunswick|  206011105| Brunswick - Coburg|              20601|[[[[144.971966159...|
|  Brunswick|  206011105| Brunswick - Coburg|              20601|[[[[144.966491305...|
|  Brunswick|  206011105| Brunswick - Coburg|              20601|[[[[144.966457433...|
|  Brunswick|  206011105| Brunswick - Coburg|              20601|[[[[144.962853934...|
|  Brunswick|  206011105| Brunswick - Coburg|              20601|[[[[144.972592207...|
|  Brunswick|  206011105| Brunswick - Cobur

In [5]:
# Check whether the extracted data contains any null values
feature_data.select([count(when(col(c).isNull(), c)).alias(c) for c in 
           feature_data.columns]).toPandas().T  

Unnamed: 0,0
suburb_name,0
suburb_code,0
regional_level_name,0
regional_level_code,0
map_coordinates,0


There are no null or undefined values, so the data does not need cleaning.

#### Suburb-wise Data

First, group the data by suburb name (or suburb code), then calculate the area of each suburb.

In [6]:
# Aggregate the data by suburb name and collect the multipolygon geometries into a list
subhurb_agg_data = feature_data.groupby('suburb_name').agg(F.collect_list(feature_data["map_coordinates"]).alias('suburb_multipolygon'),\
                                       F.first('suburb_code').alias('suburb_code')).select('suburb_name','suburb_code','suburb_multipolygon')
subhurb_agg_data.show(10) # display data

+--------------------+-----------+--------------------+
|         suburb_name|suburb_code| suburb_multipolygon|
+--------------------+-----------+--------------------+
|      Brunswick West|  206011107|[[[[[144.94775505...|
|     South Melbourne|  206051132|[[[[[144.96560629...|
|           Brunswick|  206011105|[[[[[144.97407998...|
|          Ascot Vale|  206031113|[[[[[144.89810023...|
|       St Kilda East|  206051134|[[[[[144.99882107...|
|Port Melbourne In...|  206051131|[[[[[144.94568999...|
|     Richmond (Vic.)|  206071144|[[[[[144.99160670...|
|           Parkville|  206041124|[[[[[144.94886173...|
|           Thornbury|  206021112|[[[[[145.02981194...|
|Flemington Raceco...|  206041120|[[[[[144.90494891...|
+--------------------+-----------+--------------------+
only showing top 10 rows



#### Calculate Suburb Area

In [7]:
# Calculate the suburb area: first merge the list of multipolygons, then compute the area of the result
# Wrap the calculation in a lambda and register it as a UDF
suburb_area_udf = udf(lambda suburb_multipolygons: multi_polygon_area(merge_multi_polygons(*suburb_multipolygons)), FloatType())
subhurb_agg_data = subhurb_agg_data.withColumn('suburb_area', suburb_area_udf('suburb_multipolygon')) # Create New column of suburb area
subhurb_agg_data.show(5) # display data

+---------------+-----------+--------------------+------------+
|    suburb_name|suburb_code| suburb_multipolygon| suburb_area|
+---------------+-----------+--------------------+------------+
| Brunswick West|  206011107|[[[[[144.94775505...| 3.250158E-4|
|South Melbourne|  206051132|[[[[[144.96560629...| 2.553239E-4|
|      Brunswick|  206011105|[[[[[144.97407998...|5.2584003E-4|
|     Ascot Vale|  206031113|[[[[[144.89810023...|3.9230532E-4|
|  St Kilda East|  206051134|[[[[[144.99882107...|2.4709452E-4|
+---------------+-----------+--------------------+------------+
only showing top 5 rows
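
The areas above are on the order of 1e-4, which suggests that the area is computed directly on the longitude/latitude coordinates and is therefore in squared degrees rather than square kilometres. This is an assumption about how `multi_polygon_area` in polygon_utils.py behaves, not something confirmed here. A minimal pure-Python sketch of a planar (shoelace) area calculation, using a hypothetical square ring, shows the same order of magnitude:

```python
def ring_area(ring):
    """Planar area of a closed ring of (x, y) pairs via the shoelace formula."""
    total = 0.0
    for (x1, y1), (x2, y2) in zip(ring, ring[1:]):
        total += x1 * y2 - x2 * y1
    return abs(total) / 2.0

# Hypothetical 0.01 x 0.01 degree square (closed ring: the first point is repeated last)
square = [
    (144.96, -37.80),
    (144.97, -37.80),
    (144.97, -37.79),
    (144.96, -37.79),
    (144.96, -37.80),
]

area_sq_deg = ring_area(square)
print(area_sq_deg)  # approximately 1e-4 squared degrees
```

Because the final result is a ratio of two areas measured in the same units, the unit itself should matter little for ranking suburbs, although a projected coordinate system would be needed for areas in real-world units.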



### Load the Forest Data

In [8]:
# define the forest schema of Spark DataFrames
forest_schema = StructType([
    StructField('forest_area', FloatType(), False),
    StructField('forest_polygon', StringType(), False),
    ])

# Load Data from text file
df_with_schema = sqlcntxt.read.format('csv') \
      .option('header', False) \
      .option('delimiter', ' ') \
      .schema(forest_schema) \
      .load('melb_urban_forest_2016.txt/part-*')
#      .load('melb_urban_forest_2016.txt/part-00000')
# The single file melb_urban_forest_2016.txt/part-00000 was used during development because my machine
# (at most 4 GB of RAM, running Docker) could not process the full data set in a reasonable time.
# I also tried with all the files, but the transformations and actions on the data frame took too long.
# I tried several times with performance tuning as well, but the machine crashed every time.


df_with_schema.first() # Display First element

Row(forest_area=6.573422431945801, forest_polygon='POLYGON ((144.96233513562794 -37.82850434925475 144.96233130242123 -37.82850454785653 144.96232748492085 -37.82850421683983 144.9623237976764 -37.828503363594265 144.96232160123958 -37.82850250460986 144.9623222133609 -37.82848025440896 144.96235226880833 -37.828480774593196 144.96235264766736 -37.82848145793179 144.96235356040816 -37.82848441696255 144.96235381193424 -37.828487457370365 144.96235339346634 -37.82849048528118 144.9623523188634 -37.8284934099163 144.96235062012963 -37.82849614171223 144.96234834951218 -37.82849859866522 144.9623455749836 -37.82850070535189 144.96234238125348 -37.828502397452624 144.9623388662872 -37.82850362439485 144.96233513562794 -37.82850434925475))')

As this result shows, the forest geometry is not in a valid polygon format: the coordinate pairs are not separated by commas, so it cannot be parsed as a polygon object. We first need to reshape the polygon string.

#### Shape the Geometry Data into Polygon Format

In [9]:
df_with_schema.createOrReplaceTempView('forest_raw_data') # Create a view of forest raw data

# Extract the forest area and the coordinate part of the polygon string
# substring(forest_polygon,9,length(forest_polygon)-1) removes the leading 'POLYGON ' and returns only the coordinates
forest_new_data = sqlcntxt.sql(
    "SELECT forest_area, substring(forest_polygon,9,length(forest_polygon)-1) as cordination from forest_raw_data")


def to_shape_polygon(polygon):
    """
    Convert the space-separated coordinate sequence into a valid polygon string
    by placing a comma after each coordinate pair.
    Every second space ' ' is replaced with ', ' and the reshaped polygon string is returned.
    """
    convert_polygon = ', '.join(' '.join(s) for s in zip(*[iter(polygon.split())]*2)) # replace every second space with ', '
    return 'POLYGON '+ convert_polygon

# Wrap the function in a UDF to reshape the polygon string
forest_data_df = forest_new_data.withColumn('forest_polygon_convert', udf(to_shape_polygon, StringType())('cordination')) # create new column of reformatted geometry

forest_data_df.first() # display converted polygon record

Row(forest_area=6.573422431945801, cordination='((144.96233513562794 -37.82850434925475 144.96233130242123 -37.82850454785653 144.96232748492085 -37.82850421683983 144.9623237976764 -37.828503363594265 144.96232160123958 -37.82850250460986 144.9623222133609 -37.82848025440896 144.96235226880833 -37.828480774593196 144.96235264766736 -37.82848145793179 144.96235356040816 -37.82848441696255 144.96235381193424 -37.828487457370365 144.96235339346634 -37.82849048528118 144.9623523188634 -37.8284934099163 144.96235062012963 -37.82849614171223 144.96234834951218 -37.82849859866522 144.9623455749836 -37.82850070535189 144.96234238125348 -37.828502397452624 144.9623388662872 -37.82850362439485 144.96233513562794 -37.82850434925475))', forest_polygon_convert='POLYGON ((144.96233513562794 -37.82850434925475, 144.96233130242123 -37.82850454785653, 144.96232748492085 -37.82850421683983, 144.9623237976764 -37.828503363594265, 144.96232160123958 -37.82850250460986, 144.9623222133609 -37.8284802544089

The conversion function above relies on the following idiom:

iter(s) returns an iterator over s.

[iter(s)]*n makes a list that contains the same iterator n times.

So zip(*[iter(s)]*n) takes one item from each of the n list entries in turn. Because all the entries are the same iterator object, this groups the sequence into chunks of n.
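
As a small illustration of this idiom, the sketch below groups a short, made-up coordinate string into pairs and joins them the same way `to_shape_polygon` does. The helper name `chunk` and the sample values are hypothetical:

```python
def chunk(items, n):
    """Group items into consecutive tuples of length n (leftover items are dropped)."""
    shared = iter(items)             # a single iterator...
    return list(zip(*[shared] * n))  # ...referenced n times, so zip pulls n items per tuple

# Made-up sample: three x y pairs separated only by spaces
tokens = "144.1 -37.1 144.2 -37.2 144.3 -37.3".split()
pairs = chunk(tokens, 2)
print(pairs)
# [('144.1', '-37.1'), ('144.2', '-37.2'), ('144.3', '-37.3')]

# Join each pair with a space and the pairs with ', ' to get comma-separated coordinates
wkt_body = ", ".join(" ".join(p) for p in pairs)
print(wkt_body)
# 144.1 -37.1, 144.2 -37.2, 144.3 -37.3
```

Note that `zip` stops at the shortest input, so a trailing item that does not fill a complete chunk is silently dropped.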

#### Combine Suburb and Forest Data

To combine the suburb and forest data, we first calculate the bounds of each geometry and then check whether the two sets of bounds intersect. If they intersect, we keep the record; otherwise we discard it.

To achieve this, we need a bound intersection function based on 'may_intersect' from polygon_utils.py, with some modifications: the 'may_intersect' function works on multipolygon objects only, whereas here we have both polygon and multipolygon objects.

First we calculate the bounds of each suburb multipolygon and each forest polygon, and then use the 'suburb_forest_intersect' function to check whether they intersect.

If a suburb and a forest polygon intersect, the pair is added to a new data frame called 'suburb_forest_join_data'.
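
The bounding-box check described above can be sketched in plain Python. This is a simplified illustration with hypothetical helper names (`bounds`, `boxes_overlap`) and made-up coordinates, not the notebook's Spark code:

```python
def bounds(points):
    """Return (min_x, min_y, max_x, max_y) for a sequence of (x, y) points."""
    xs = [x for x, _ in points]
    ys = [y for _, y in points]
    return (min(xs), min(ys), max(xs), max(ys))

def boxes_overlap(a, b):
    """True if two (min_x, min_y, max_x, max_y) boxes share at least one point."""
    a_min_x, a_min_y, a_max_x, a_max_y = a
    b_min_x, b_min_y, b_max_x, b_max_y = b
    return (a_min_x <= b_max_x and a_max_x >= b_min_x and
            a_min_y <= b_max_y and a_max_y >= b_min_y)

# Made-up geometries: one "suburb" and two "forest" patches
suburb = [(0, 0), (4, 0), (4, 3), (0, 3)]
forest_near = [(1, 1), (2, 1), (2, 2)]
forest_far = [(10, 10), (11, 10), (11, 11)]

print(boxes_overlap(bounds(suburb), bounds(forest_near)))  # True
print(boxes_overlap(bounds(suburb), bounds(forest_far)))   # False
```

Overlapping bounding boxes are a necessary but not sufficient condition for the geometries themselves to intersect, so this test only serves as a cheap pre-filter before the exact intersection area is computed.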

In [10]:
def suburb_forest_intersect(polygon_a, polygon_b):
    """
    Copied from the given polygon utils: check whether two bounds (min_x, min_y, max_x, max_y) intersect
    """
    a_min_x, a_min_y, a_max_x, a_max_y = polygon_a
    b_min_x, b_min_y, b_max_x, b_max_y = polygon_b

    return a_min_y <= b_max_y and \
           a_max_x >= b_min_x and \
           a_max_y >= b_min_y and \
           a_min_x <= b_max_x

def suburb_forest_combine(bound_a, bound_b):
    """
    Return True if any of the suburb bounds intersects the forest bound
    """
    for a in bound_a:
        if suburb_forest_intersect(a, bound_b):
            return True

    return False

def suburb_multipolygon_boundries(suburb_multipolygons):
    """
    Calculate the bounds of each multipolygon in the suburb list and return them as a list
    """
    boundries = []
    for multipolygones in suburb_multipolygons: # Iterate over the multipolygon list
        shape_multipolygone = to_shape(multipolygones) # Shape the multipolygon
        shape_bound = shape_multipolygone.bounds # calculate bounds value
        boundries.append(shape_bound) # append into the list
    return boundries

# Suburb bound calculation

# Wrap the function in a UDF to calculate the bounds of each suburb multipolygon
boundries_udf = udf(suburb_multipolygon_boundries, ArrayType(ArrayType(FloatType()))) # define UDF for bound calculation
subhurb_agg_data = subhurb_agg_data.withColumn('suburb_boundries', boundries_udf('suburb_multipolygon')) # create a new column

# Forest bound calculation

# Define a lambda UDF to calculate the bounds of each forest polygon
# The loads method returns a geometric object from a WKT representation
forest_bound_calculation = udf(lambda forest_polygon: loads(forest_polygon).bounds, ArrayType(FloatType())) # define lambda function to calculate bounds
forest_data_df = forest_data_df.withColumn('forest_boundries', forest_bound_calculation('forest_polygon_convert')) # create a new column

# Repartition both data frames
subhurb_area_df_p = subhurb_agg_data.repartition(1)
forest_data_df_p = forest_data_df.repartition(15)

# Cross join the repartitioned data frames and keep only the pairs whose bounds intersect
suburb_forest_join_data = subhurb_area_df_p.crossJoin(forest_data_df_p).where(udf(suburb_forest_combine, BooleanType())\
                        (subhurb_area_df_p.suburb_boundries, forest_data_df_p.forest_boundries))

#### Calculate the Intersect Area

In [11]:
# Group the joined data by suburb code
suburb_forest_data = suburb_forest_join_data.groupby('suburb_code').agg(F.collect_list(suburb_forest_join_data["forest_polygon_convert"]).alias('forest_multipolygon'),\
                                           F.first('suburb_name').alias('suburb_name'),\
                                           F.first('suburb_area').alias('suburb_area'),\
                                           F.first('suburb_multipolygon').alias('suburb_multipolygon'),\
                                           ).select('suburb_name','suburb_area','suburb_code','suburb_multipolygon','forest_multipolygon')

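suburb_forest_data = suburb_forest_data.repartition(10) # repartition returns a new data frame, so reassign the result
suburb_forest_data # display the schema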

DataFrame[suburb_name: string, suburb_area: float, suburb_code: string, suburb_multipolygon: array<array<array<array<array<double>>>>>, forest_multipolygon: array<string>]

In [12]:
# Merge the suburb multipolygons into a single multipolygon.

# Define a lambda UDF to merge the multipolygons of a suburb
merge_suburb_polygon = udf(lambda multipolygons: merge_multi_polygons(*multipolygons),ArrayType(ArrayType(ArrayType(FloatType()))))

# Create a new column with the merged suburb multipolygon
suburb_forest_data = suburb_forest_data.withColumn('subhurb_multipolygon_merge', merge_suburb_polygon('suburb_multipolygon'))

In [13]:
# The forest polygons are not in multipolygon format, so first build a list of polygon objects and then call
# merge_multi_polygons, which reshapes the polygons into a multipolygon.

def generate_list(forest_in):
    """
    Return a list of polygon objects parsed from the given polygon strings
    """
    forest_geo=[]
    for i in forest_in:
        forest_geo.append(loads(i)) # append each polygon object to the list
    return forest_geo

# Merge the forest polygons and create a multipolygon from the polygon list
# Define a lambda UDF to merge the forest polygons
merge_forest_polygon = udf(lambda forest_multipolygon: merge_multi_polygons(*generate_list(forest_multipolygon)),ArrayType(ArrayType(ArrayType(FloatType()))))

# create new column forest multipolygon
suburb_forest_data = suburb_forest_data.withColumn('forest_multipolygon_merge', merge_forest_polygon('forest_multipolygon'))

In [14]:
# Calculate the intersection area between the merged suburb multipolygon and the merged forest multipolygon

# Once both merged multipolygons are ready, compute the area of their intersection
intersect_area_udf = udf(lambda merged_suburb,merge_forest: intersection_area(merged_suburb,merge_forest), FloatType())

# Create a new column for the intersection area
suburb_forest_data = suburb_forest_data.withColumn('intersect_area', intersect_area_udf('subhurb_multipolygon_merge','forest_multipolygon_merge'))

In [15]:
# To find the greenest suburb, calculate the percentage of each suburb covered by forest

# Define a lambda UDF to calculate the percentage
percentage = udf(lambda a,b: (a/b)*100, FloatType())

# Create a new column called Percentage
suburb_forest_data = suburb_forest_data.withColumn('Percentage', percentage('intersect_area','suburb_area'))

# Drop the unnecessary columns from the data frame
suburb_forest_data = suburb_forest_data.drop('suburb_multipolygon').drop('suburb_area').\
                     drop('forest_multipolygon').drop('subhurb_multipolygon_merge').\
                     drop('forest_multipolygon_merge').drop('intersect_area')
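
The percentage lambda above raises an error if either input is null or if the suburb area is zero. A hedged sketch of a more defensive plain-Python helper is shown below. The name `coverage_percentage` is hypothetical, and returning `None` for invalid input is an assumption about the desired behaviour (a Python UDF that returns `None` yields a null in the resulting column):

```python
def coverage_percentage(intersect_area, suburb_area):
    """Return intersect_area / suburb_area as a percentage, or None if it cannot be computed."""
    if intersect_area is None or not suburb_area:
        # Covers a missing intersection area and a missing or zero suburb area
        return None
    return (intersect_area / suburb_area) * 100.0

print(coverage_percentage(0.5, 2.0))   # 25.0
print(coverage_percentage(0.5, 0.0))   # None
print(coverage_percentage(None, 2.0))  # None
```

Such a function could be wrapped with `udf(coverage_percentage, FloatType())` in place of the lambda, if null results are acceptable for suburbs without a valid area.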


In [16]:
# Display the greenest suburbs
suburb_forest_data.createOrReplaceTempView("green_forest_data") # create a temporary view of the suburb forest data

green_suburb =  sqlcntxt.sql(
 "SELECT  * FROM green_forest_data order by Percentage desc"
)

#green_suburb.show(10) # Display top 10 greenest suburb by Desc order

Because my machine crashes on large computations, I was unable to execute the final step. For this reason I used only melb_urban_forest_2016.txt/part-00000 to solve this challenge: the machine has at most 4 GB of RAM and cannot compute fast enough in Docker, so a single file was used for development. I also tried with all the files, but the transformations and actions on the data frame took too long. I tried several times with various performance tuning options, but the machine crashed every time.

The functions listed below are functionally almost the same, but there are minor differences among them.

createOrReplaceTempView -- creates the view, or replaces it if the view name already exists.

createTempView -- throws TempTableAlreadyExistsException if the view name already exists in the catalog.

registerTempTable

registerTempTable was added in Spark 1.3, whereas the other two were added in Spark 2.0, and registerTempTable has been deprecated from Spark 2.0 onwards.

The lifetime of a view created by registerTempTable is limited to the SQLContext that was used to create the dataframe, whereas the lifetime of a view created by createOrReplaceTempView is tied to the SparkSession that was used to create the dataframe.