# PySpark integration

## 1. Import libraries

In [1]:
# Load the autoreload extension and switch it to mode 2 so that edits to
# imported modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2
# Render matplotlib figures inline in the notebook output.
%matplotlib inline

In [2]:
import sys
import os

# PySpark picks the Python interpreter for its workers from PYSPARK_PYTHON.
# Indexing os.environ directly raises KeyError when the variable is unset
# (e.g. a fresh shell or kernel), which breaks "Restart & Run All". Fall back
# to the interpreter running this kernel, and display whichever value ends up
# in effect (setdefault returns the existing value when one is already set).
os.environ.setdefault('PYSPARK_PYTHON', sys.executable)

'/home/lukasz/miniconda3/envs/geonurse/bin/python'

In [3]:
from pyspark.sql import SparkSession

In [4]:
# Build (or reuse, via getOrCreate) a Spark session named 'pyspark-geonurse'
# with the 'local[*]' master, then keep a handle on its SparkContext.
spark = (
    SparkSession.builder
    .appName('pyspark-geonurse')
    .master('local[*]')
    .getOrCreate()
)
sc = spark.sparkContext

In [5]:
spark

In [6]:
import geopandas as gpd
import geonurse
import geonurse.tools.conversion

In [7]:
# Show the current working directory; the relative data path used in the
# next cell is resolved against it.
os.getcwd()

'/home/lukasz/Projects/geonurse/notebooks'

In [8]:
# Relative path to the sample shapefile (naturalearth_admin_boundary_lines);
# it only resolves when the working directory is the notebooks folder.
data = '../tests/data/conversion/shp/naturalearth_admin_boundary_lines.shp'

# Load the file with GeoPandas first, as a reference view of the same data,
# and preview the first rows.
gdf = gpd.read_file(data)
gdf.head()

Unnamed: 0,featurecla,name,comment,adm0_usa,adm0_left,adm0_right,adm0_a3_l,adm0_a3_r,sov_a3_l,sov_a3_r,type,labelrank,scalerank,min_zoom,min_label,note,adm0_abr_l,adm0_abr_r,geometry
0,Indefinite (please verify),,,1,Canada,United States of America,CAN,USA,Wat,US1,Water Indicator,2,1,2.0,7.6,,Can.,U.S.A.,LINESTRING (-124.7588659269999 48.494017843000...
1,International boundary (verify),,,1,Sweden,Norway,SWE,NOR,SWE,NOR,Water Indicator,2,1,0.0,7.6,,Swe.,Nor.,LINESTRING (11.4375106135067 58.99172086270566...
2,International boundary (verify),,,1,Denmark,Germany,DNK,DEU,DN1,DEU,Water Indicator,5,1,0.0,8.0,,Den.,Ger.,(LINESTRING (8.394091838000094 55.096328024000...
3,International boundary (verify),,,1,Singapore,Malaysia,SGP,MYS,SGP,MYS,Water Indicator,3,1,0.0,9.0,,Sing.,Malay.,"LINESTRING (104.067683554 1.277390849000057, 1..."
4,International boundary (verify),,,1,Uruguay,Argentina,URY,ARG,URY,ARG,Water Indicator,2,1,0.0,8.0,,Ury.,Arg.,LINESTRING (-58.20011185221784 -32.44712991235...


## 2. Read a file and create a GeoRDD using geonurse (fiona under the hood)

In [9]:
# Read the same shapefile through geonurse, passing the active Spark session;
# the result is a geonurse GeoRDD (its type is checked a few cells below).
geoRdd = geonurse.read_file(spark, data)

In [10]:
geoRdd

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

In [11]:
# Check the Python type of the object returned by geonurse.read_file.
type(geoRdd)

geonurse.base.GeoRDD

In [12]:
# Inspect the first record: a GeoJSON-style Feature dict with 'type', 'id',
# 'properties' and 'geometry' keys.
geoRdd.take(1)

[{'type': 'Feature',
  'id': '0',
  'properties': OrderedDict([('featurecla', 'Indefinite (please verify)'),
               ('name', None),
               ('comment', None),
               ('adm0_usa', 1),
               ('adm0_left', 'Canada'),
               ('adm0_right', 'United States of America'),
               ('adm0_a3_l', 'CAN'),
               ('adm0_a3_r', 'USA'),
               ('sov_a3_l', 'Wat'),
               ('sov_a3_r', 'US1'),
               ('type', 'Water Indicator'),
               ('labelrank', 2),
               ('scalerank', 1),
               ('min_zoom', 2.0),
               ('min_label', 7.6),
               ('note', None),
               ('adm0_abr_l', 'Can.'),
               ('adm0_abr_r', 'U.S.A.')]),
  'geometry': {'type': 'LineString',
   'coordinates': [(-124.75886592699995, 48.49401784300004),
    (-124.58285599799993, 48.44391754200002),
    (-124.31713578399993, 48.36828908300008),
    (-124.12464107299994, 48.313537903000096),
    (-124.0051134849

### 2.1. Get all geometries as GeoJSON

In [13]:
# Extract only the geometry of each feature, kept as GeoJSON-style dicts
# ('type' plus 'coordinates').
geojson_geometries = geoRdd.geometries('geoJson')

In [14]:
# Show the first geometry dict.
geojson_geometries.take(1)

[{'type': 'LineString',
  'coordinates': [(-124.75886592699995, 48.49401784300004),
   (-124.58285599799993, 48.44391754200002),
   (-124.31713578399993, 48.36828908300008),
   (-124.12464107299994, 48.313537903000096),
   (-124.00511348499995, 48.27958648700006),
   (-123.85395992099988, 48.255582784000055),
   (-123.70668208899987, 48.23212168400006),
   (-123.58508744399984, 48.212717184),
   (-123.52764908899992, 48.216231181000055),
   (-123.41685481799992, 48.238193665000026),
   (-123.27296179299992, 48.26671905600007),
   (-123.23180151399994, 48.28917246500008),
   (-123.17322627799993, 48.35482737200003),
   (-123.11932775899989, 48.415366313000064),
   (-123.17782548099989, 48.46652598100006),
   (-123.21257788099992, 48.53347279900005),
   (-123.26012019999993, 48.68261098200007),
   (-123.1204646409999, 48.72312530500007),
   (-123.00520035899987, 48.75671498700011),
   (-123.0031333009999, 48.78544708300002),
   (-123.00520035899987, 48.81500600200006),
   (-123.100284994

### 2.2. Get all geometries as Shapely objects

In [15]:
# Same geometries, this time returned as Shapely geometry objects.
shapely_geometries = geoRdd.geometries('shapely')

In [16]:
# First ten geometries: a mix of LineString and MultiLineString objects.
shapely_geometries.take(10)

[<shapely.geometry.linestring.LineString at 0x7f1a24cb6438>,
 <shapely.geometry.linestring.LineString at 0x7f1a24cb6588>,
 <shapely.geometry.multilinestring.MultiLineString at 0x7f1a24cb6f60>,
 <shapely.geometry.linestring.LineString at 0x7f1a24cb6940>,
 <shapely.geometry.linestring.LineString at 0x7f1a24cb6c18>,
 <shapely.geometry.multilinestring.MultiLineString at 0x7f1a24cb66a0>,
 <shapely.geometry.linestring.LineString at 0x7f1a24cb6b00>,
 <shapely.geometry.linestring.LineString at 0x7f1a24cb6550>,
 <shapely.geometry.linestring.LineString at 0x7f1a24cb6898>,
 <shapely.geometry.multilinestring.MultiLineString at 0x7f1a24cb6518>]

### 2.3. Get all geometries as WKT strings

In [17]:
# Same geometries, serialised as WKT strings.
wkt_geometries = geoRdd.geometries('wkt')

In [18]:
# Show the first two WKT strings.
wkt_geometries.take(2)

['LINESTRING (-124.7588659269999 48.49401784300004, -124.5828559979999 48.44391754200002, -124.3171357839999 48.36828908300008, -124.1246410729999 48.3135379030001, -124.005113485 48.27958648700006, -123.8539599209999 48.25558278400005, -123.7066820889999 48.23212168400006, -123.5850874439998 48.212717184, -123.5276490889999 48.21623118100005, -123.4168548179999 48.23819366500003, -123.2729617929999 48.26671905600007, -123.2318015139999 48.28917246500008, -123.1732262779999 48.35482737200003, -123.1193277589999 48.41536631300006, -123.1778254809999 48.46652598100006, -123.2125778809999 48.53347279900005, -123.2601201999999 48.68261098200007, -123.1204646409999 48.72312530500007, -123.0052003589999 48.75671498700011, -123.0031333009999 48.78544708300002, -123.0052003589999 48.81500600200006, -123.1002849949999 48.86988637300006, -123.2136630869999 48.93536041300005, -123.3126234539999 48.99251454700008)',
 'LINESTRING (11.4375106135067 58.99172086270566, 11.40093672600008 59.02590728800

### 2.4. Get all properties

In [19]:
# Per-feature attribute dicts; missing values still appear as None here.
# NOTE(review): `_properties` is underscore-prefixed (conventionally private);
# confirm whether a public accessor should be used instead.
raw_properties = geoRdd._properties

In [20]:
# Show the first raw attribute dict.
raw_properties.take(1)

[{'featurecla': 'Indefinite (please verify)',
  'name': None,
  'comment': None,
  'adm0_usa': 1,
  'adm0_left': 'Canada',
  'adm0_right': 'United States of America',
  'adm0_a3_l': 'CAN',
  'adm0_a3_r': 'USA',
  'sov_a3_l': 'Wat',
  'sov_a3_r': 'US1',
  'type': 'Water Indicator',
  'labelrank': 2,
  'scalerank': 1,
  'min_zoom': 2.0,
  'min_label': 7.6,
  'note': None,
  'adm0_abr_l': 'Can.',
  'adm0_abr_r': 'U.S.A.'}]

In [21]:
# properties() yields an RDD of attribute dicts in which the None values seen
# in `_properties` show up as the string 'No Data' (compare the two outputs).
properties_rdd = geoRdd.properties()
# Display the RDD's repr.
properties_rdd

PythonRDD[6] at RDD at PythonRDD.scala:53

In [22]:
# Show the first attribute dict returned by properties().
properties_rdd.take(1)

[{'featurecla': 'Indefinite (please verify)',
  'name': 'No Data',
  'comment': 'No Data',
  'adm0_usa': 1,
  'adm0_left': 'Canada',
  'adm0_right': 'United States of America',
  'adm0_a3_l': 'CAN',
  'adm0_a3_r': 'USA',
  'sov_a3_l': 'Wat',
  'sov_a3_r': 'US1',
  'type': 'Water Indicator',
  'labelrank': 2,
  'scalerank': 1,
  'min_zoom': 2.0,
  'min_label': 7.6,
  'note': 'No Data',
  'adm0_abr_l': 'Can.',
  'adm0_abr_r': 'U.S.A.'}]

## 3. Convert GeoRDD to DF

### 3.1. Properties DF

In [23]:
# DataFrame view of the feature attributes (schema printed in the next cell).
# NOTE(review): `_property_df` is underscore-prefixed (conventionally private);
# confirm that relying on it here is intended.
properties_df = geoRdd._property_df



In [24]:
# Print the column names and types of the attributes DataFrame.
properties_df.printSchema()

root
 |-- adm0_a3_l: string (nullable = true)
 |-- adm0_a3_r: string (nullable = true)
 |-- adm0_abr_l: string (nullable = true)
 |-- adm0_abr_r: string (nullable = true)
 |-- adm0_left: string (nullable = true)
 |-- adm0_right: string (nullable = true)
 |-- adm0_usa: long (nullable = true)
 |-- comment: string (nullable = true)
 |-- featurecla: string (nullable = true)
 |-- labelrank: long (nullable = true)
 |-- min_label: double (nullable = true)
 |-- min_zoom: double (nullable = true)
 |-- name: string (nullable = true)
 |-- note: string (nullable = true)
 |-- scalerank: long (nullable = true)
 |-- sov_a3_l: string (nullable = true)
 |-- sov_a3_r: string (nullable = true)
 |-- type: string (nullable = true)



In [25]:
# Preview the first two attribute rows.
properties_df.show(2)

+---------+---------+----------+----------+---------+--------------------+--------+-------+--------------------+---------+---------+--------+-------+-------+---------+--------+--------+---------------+
|adm0_a3_l|adm0_a3_r|adm0_abr_l|adm0_abr_r|adm0_left|          adm0_right|adm0_usa|comment|          featurecla|labelrank|min_label|min_zoom|   name|   note|scalerank|sov_a3_l|sov_a3_r|           type|
+---------+---------+----------+----------+---------+--------------------+--------+-------+--------------------+---------+---------+--------+-------+-------+---------+--------+--------+---------------+
|      CAN|      USA|      Can.|    U.S.A.|   Canada|United States of ...|       1|No Data|Indefinite (pleas...|        2|      7.6|     2.0|No Data|No Data|        1|     Wat|     US1|Water Indicator|
|      SWE|      NOR|      Swe.|      Nor.|   Sweden|              Norway|       1|No Data|International bou...|        2|      7.6|     0.0|No Data|No Data|        1|     SWE|     NOR|Water I

### 3.2. Geometries DF

In [26]:
# Single-column DataFrame holding each geometry as text (WKT, judging by the
# output of the next cell).
# NOTE(review): `_geometry_df` is underscore-prefixed (conventionally private);
# confirm that relying on it here is intended.
geometries_df = geoRdd._geometry_df

In [27]:
# Preview the first four geometries.
geometries_df.show(4)

+--------------------+
|            geometry|
+--------------------+
|LINESTRING (-124....|
|LINESTRING (11.43...|
|MULTILINESTRING (...|
|LINESTRING (104.0...|
+--------------------+
only showing top 4 rows



### 3.3. Join geometries and properties DataFrames

In [28]:
# Combine attributes and geometry into one DataFrame; the schema printed in
# the next cell shows the attribute columns plus `id` and `geometry`.
geoDf = geoRdd.toGeoDF()

In [29]:
# Print the schema of the joined DataFrame.
geoDf.printSchema()

root
 |-- adm0_a3_l: string (nullable = true)
 |-- adm0_a3_r: string (nullable = true)
 |-- adm0_abr_l: string (nullable = true)
 |-- adm0_abr_r: string (nullable = true)
 |-- adm0_left: string (nullable = true)
 |-- adm0_right: string (nullable = true)
 |-- adm0_usa: long (nullable = true)
 |-- comment: string (nullable = true)
 |-- featurecla: string (nullable = true)
 |-- labelrank: long (nullable = true)
 |-- min_label: double (nullable = true)
 |-- min_zoom: double (nullable = true)
 |-- name: string (nullable = true)
 |-- note: string (nullable = true)
 |-- scalerank: long (nullable = true)
 |-- sov_a3_l: string (nullable = true)
 |-- sov_a3_r: string (nullable = true)
 |-- type: string (nullable = true)
 |-- id: long (nullable = false)
 |-- geometry: string (nullable = true)



In [30]:
# Preview the first four rows of the joined DataFrame.
geoDf.show(4)

+---------+---------+----------+----------+---------+--------------------+--------+-------+--------------------+---------+---------+--------+-------+-------+---------+--------+--------+---------------+---+--------------------+
|adm0_a3_l|adm0_a3_r|adm0_abr_l|adm0_abr_r|adm0_left|          adm0_right|adm0_usa|comment|          featurecla|labelrank|min_label|min_zoom|   name|   note|scalerank|sov_a3_l|sov_a3_r|           type| id|            geometry|
+---------+---------+----------+----------+---------+--------------------+--------+-------+--------------------+---------+---------+--------+-------+-------+---------+--------+--------+---------------+---+--------------------+
|      CAN|      USA|      Can.|    U.S.A.|   Canada|United States of ...|       1|No Data|Indefinite (pleas...|        2|      7.6|     2.0|No Data|No Data|        1|     Wat|     US1|Water Indicator|  0|LINESTRING (-124....|
|      SWE|      NOR|      Swe.|      Nor.|   Sweden|              Norway|       1|No Data|I

In [31]:
# Narrow the preview to the id, name and geometry columns for readability.
geoDf.select('id', 'name', 'geometry').show(5)

+---+-------+--------------------+
| id|   name|            geometry|
+---+-------+--------------------+
|  0|No Data|LINESTRING (-124....|
|  1|No Data|LINESTRING (11.43...|
|  2|No Data|MULTILINESTRING (...|
|  3|No Data|LINESTRING (104.0...|
|  4|No Data|LINESTRING (-58.2...|
+---+-------+--------------------+
only showing top 5 rows

