# Spark Set-up

In [None]:
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
# Build (or reuse) the SparkSession for this notebook, running on YARN.
# To read from S3, add e.g. .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:<version>")
# to the builder chain (hadoop-aws provides the S3A filesystem connector).
spark = (
    SparkSession.builder
    .master("yarn")
    .appName("reprojection")
    .getOrCreate()
)
# `sc` was previously bound to the SparkSession itself (not a SparkContext); keep it as an
# alias so cells written against that name keep working.
sc = spark
# SQLContext is deprecated since Spark 3.0 (prefer `spark.read`, `spark.sql`, ...), but the
# cells below use `sqlContext`. Its first argument must be a SparkContext, so pass
# `spark.sparkContext` and hand over the existing session explicitly.
sqlContext = SQLContext(spark.sparkContext, spark)

# Reprojection

In [None]:
from pyspark.sql.types import *
from pyproj import Proj, transform
import shapely, pyproj
import shapely.ops

In [None]:
def update_wkt(wkt, input_epsg, output_epsg):
    """
    Reproject a WKT geometry from one EPSG coordinate reference system to another.

    Input:
        - wkt as string of the geometry (e.g. a polygon) we want to reproject.
          None or a blank string (e.g. a null cell coming from Spark) yields None
          instead of raising.
        - input_epsg of the geometry (e.g. 4326), as an int or a numeric string
        - output_epsg to which we want to reproject (e.g. 32632 in Italy), as an
          int or a numeric string

    Output: string wkt reprojected, or None for null/blank input
    """
    # Spark passes SQL NULLs to a UDF as None; map them (and blank strings) to NULL
    # instead of failing the whole task inside the WKT parser.
    if wkt is None or not wkt.strip():
        return None

    # Import the submodules explicitly: `import shapely` alone does not bind
    # `shapely.wkt`. Importing inside the function also means each Spark executor
    # imports them itself when it runs this UDF.
    import pyproj
    import shapely.ops
    import shapely.wkt

    geometry = shapely.wkt.loads(wkt)

    # always_xy=True makes the transformer take and return coordinates in (x, y)
    # order (longitude/easting first), which is the order WKT writes them, instead
    # of the CRS authority axis order (EPSG:4326 is officially latitude first).
    # (https://pyproj4.github.io/pyproj/stable/api/transformer.html)
    transformer = pyproj.Transformer.from_crs(
        pyproj.CRS.from_epsg(int(input_epsg)),
        pyproj.CRS.from_epsg(int(output_epsg)),
        always_xy=True,
    )
    # NOTE(review): a Transformer is built for every row; if this becomes a
    # bottleneck, build it once per partition (e.g. mapPartitions or a pandas_udf).

    projected = shapely.ops.transform(transformer.transform, geometry)
    return projected.wkt

In [None]:
# Spark UDF wrapper around update_wkt; the result column is a string (WKT).
# Call it with three Columns: the WKT column, then the source and target EPSG codes
# (wrap literal codes with f.lit).
update_wkt_udf = f.udf(update_wkt, returnType=StringType())

### Example

In [None]:
# Example: reproject polygons from EPSG:4326 to EPSG:32632.
SOURCE_EPSG = 4326
TARGET_EPSG = 32632

# The CSV is read with a header row and must contain a `wkt` column of WKT strings.
# For a very big dataset, append e.g. `.repartition(500)` to spread the rows across the workers.
pols = sqlContext.read.csv('gs://gcs_path_to_file.csv.gz', header=True)

# The UDF takes three Column arguments, so the EPSG codes are wrapped in f.lit and passed
# to the UDF (not to withColumn). The reprojected geometry replaces the `wkt` column.
pols_projected = pols.withColumn(
    'wkt',
    update_wkt_udf(f.col('wkt'), f.lit(SOURCE_EPSG), f.lit(TARGET_EPSG)),
)