### installs

In [None]:
# python3 -m pip install --upgrade pip
# python3 -m pip install numpy
# python3 -m pip install pandas
# python3 -m pip install shapely
# brew install gdal # fiona dependency
# python3 -m pip install fiona # geopandas dependency
# python3 -m pip install pyproj # geopandas dependency
# python3 -m pip install pygeos # geopandas dependency
# python3 -m pip install geopandas
# python3 -m pip install jupyter
# python3 -m pip install folium
# python3 -m pip install matplotlib
# python3 -m pip install seaborn
# python3 -m pip install findspark
# python3 -m pip install apache-sedona

### imports

In [None]:
import os
import datetime as dt
import pytz # python timezones

import matplotlib.pyplot as plt
import matplotlib
# plot defaults for the whole notebook: ggplot style sheet and 16x9-inch figures
plt.style.use('ggplot')
plt.rcParams['figure.figsize'] = (16, 9)

from multiprocessing import cpu_count

from pyspark.sql import SparkSession
import pyspark.sql.functions as func
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, DecimalType
from pyspark import SparkFiles

from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer

import pandas as pd
import geopandas as gpd
from geopandas import GeoDataFrame # loading boundaries data
from shapely.geometry import Point, Polygon, shape # creating geospatial data
from shapely import wkb, wkt # creating and parsing geospatial data

In [None]:
# upper bound on parallel Spark workers used below. multiprocessing.cpu_count()
# counts *logical* CPUs (hyper-threads included), not only physical cores.
num_processors = cpu_count()

### create a spark session

In [None]:
# create a local Spark session with Sedona's Kryo serializer/registrator and jars.
# NOTE: "spark.cores.max" is only honoured by standalone/Mesos cluster managers and
# is ignored by a local master, so the core count goes into the master URL instead.
# In local mode all tasks run inside the driver JVM, so "spark.driver.memory" is the
# heap every task shares; the 1g default is a likely cause of the
# "java.lang.OutOfMemoryError: Java heap space" recorded further down. It only takes
# effect when set before the JVM starts (i.e. on the first getOrCreate()).
driver_memory = "8g"  # NOTE(review): size this to the RAM of the machine running the notebook
spark = (SparkSession
         .builder
         .master(f"local[{num_processors}]")
         .appName("adsquare assignment")
         .config("spark.serializer", KryoSerializer.getName)
         .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
         .config('spark.jars.packages',
                 'org.apache.sedona:sedona-python-adapter-3.0_2.12:1.0.0-incubating,'
                 'org.datasyslab:geotools-wrapper:geotools-24.0')
         .config("spark.driver.memory", driver_memory)
         .getOrCreate()
        )

print("Spark {} session initialised".format(spark.version))

In [None]:
# Register Sedona's SQL functions and geometry types (e.g. ST_GeomFromText,
# ST_Intersects) on this session so the spark.sql spatial join below can use them.
SedonaRegistrator.registerAll(spark)

In [None]:
# Sedona spatial-join tuning for this session: build an R-tree spatial index and use a
# KDB-tree grid for join partitioning. Settings are applied in the order listed.
sedona_settings = {
    "sedona.global.index": "true",
    "sedona.global.indextype": "rtree",
    "sedona.join.gridtype": "kdbtree",
}
for setting_key, setting_value in sedona_settings.items():
    spark.conf.set(setting_key, setting_value)

### abbreviations
**df** = Pandas DataFrame \
**gdf** = Geopandas GeoDataFrame \
**sdf** = Spark DataFrame

### stores

**stores_df**

In [None]:
stores_df = pd.read_csv("../../assignment_data/stores.csv")

In [None]:
type(stores_df)

In [None]:
stores_df.shape

In [None]:
stores_df.columns

In [None]:
stores_df.dtypes

In [None]:
stores_df.head()

**stores_sdf**

In [None]:
stores_sdf = stores_df.rename(columns={"wkt": "polygon_store"})

In [None]:
stores_sdf = spark.createDataFrame(stores_sdf).repartition(5)

In [None]:
type(stores_sdf)

In [None]:
stores_sdf.printSchema()

In [None]:
stores_sdf.columns

In [None]:
stores_sdf.dtypes

In [None]:
# stores_sdf.show(5)

### GPS signals

In [None]:
# read all gps_signal csv batches
signals_sdf = (spark.read.format("csv")
               .option("header", "true")
               .load("../../assignment_data/full_data/*.csv")).repartition(57)

In [None]:
# length of signals
# signals_sdf.count() # 56 572 824

In [None]:
signals_sdf.columns

In [None]:
signals_sdf.dtypes

In [None]:
signals_sdf.printSchema()

In [None]:
# sort the dataframe by utc_timestamp in ascending order
signals_sdf = signals_sdf.orderBy("utc_timestamp", ascending=True)

In [None]:
# signals_sdf.show(5)

**convert utc timestamp to local time**

In [None]:
def date_from_utc_ms_ts(utc_ms_ts, tz_name='Europe/Berlin') -> "str | None":
    """Return the local calendar date (yyyy-mm-dd) of a Unix UTC timestamp in milliseconds.

    :param utc_ms_ts: Unix UTC timestamp in milliseconds (int or numeric str), or None
    :param tz_name: timezone name (pytz) whose local date is returned; defaults to Europe/Berlin
    :return: date yyyy-mm-dd (str), or None when utc_ms_ts is None
    """
    if utc_ms_ts is None:
        # Spark hands SQL NULLs to Python UDFs as None; int(None) would raise a
        # TypeError and fail the whole Spark task, so propagate the null instead.
        return None
    # build a timezone-aware UTC datetime directly (datetime.utcfromtimestamp returns a
    # naive value and is deprecated since Python 3.12), then convert to the local zone
    utc_datetime = dt.datetime.fromtimestamp(int(utc_ms_ts) / 1000, tz=pytz.utc)
    return utc_datetime.astimezone(pytz.timezone(tz_name)).strftime('%Y-%m-%d')

In [None]:
utc_extractor = udf(date_from_utc_ms_ts)

In [None]:
signals_sdf = signals_sdf.withColumn("date", utc_extractor(signals_sdf["utc_timestamp"]))

In [None]:
# drop column "utc_timestamp"
signals_sdf = signals_sdf.drop("utc_timestamp")

In [None]:
# signals_sdf.show(5)

**convert lon, lat to a WKT `POINT (lon lat)` string**

In [None]:
def point_from_lon_lat(lon, lat) -> "str | None":
    """Return a WKT point string "POINT (lon lat)" built from longitude and latitude.

    Both coordinates are rounded to 7 decimal places before formatting.

    :param lon: longitude as a numeric string (e.g. " 13.405 ") or a number, or None
    :param lat: latitude as a numeric string (e.g. "52.52") or a number, or None
    :return: WKT text such as "POINT (13.405 52.52)", or None if either coordinate is None
    """
    if lon is None or lat is None:
        # Spark passes SQL NULLs to Python UDFs as None; None.strip() used to raise and
        # fail the whole task. Return None so the row simply gets a null point_signal.
        return None
    # float() already ignores surrounding whitespace, so no explicit strip() is needed;
    # this also lets numeric (non-string) inputs through.
    lon_rounded = round(float(lon), 7)
    lat_rounded = round(float(lat), 7)
    return f"POINT ({lon_rounded} {lat_rounded})"

In [None]:
lon_lat_extractor = udf(point_from_lon_lat)

In [None]:
signals_sdf = (signals_sdf.withColumn("point_signal", 
                                      lon_lat_extractor(signals_sdf["lon"], signals_sdf["lat"])))

In [None]:
# signals_sdf.show(5)

### signals_stores_sdf

In [None]:
# stores_sdf.show(5)

In [None]:
# signals_sdf.show(5)

In [None]:
# create temporary tables for SQL queries
stores_sdf.createOrReplaceTempView("stores")
signals_sdf.createOrReplaceTempView("signals")

In [None]:
# signals_stores_sdf = spark.sql(
#     """
#     SELECT *
#     FROM 
#         signals, 
#         stores
#     WHERE ST_Intersects(ST_GeomFromText(stores.polygon_store), 
#                         ST_POINT(CAST(signals.lon AS Decimal(24,20)), CAST(signals.lat AS Decimal(24,20))))
#     """)

In [None]:
# point-in-polygon join: every (signal, store) pair whose signal point intersects the store polygon
spatial_join_sql = """
    SELECT *
    FROM 
        signals, 
        stores
    WHERE ST_Intersects(ST_GeomFromText(stores.polygon_store), 
                        ST_GeomFromText(signals.point_signal))
    """
signals_stores_sdf = spark.sql(spatial_join_sql)

# mark the result for caching: explain/show/count/toPandas below all reuse it,
# so the join does not have to be recomputed for each of those actions
signals_stores_sdf = signals_stores_sdf.cache()

In [None]:
%time signals_stores_sdf.explain()

# NOTE: explain() only prints the query plan without running it; the job is first
# executed in the show(5) cell below, which is where execution gets stuck.

In [None]:
%time signals_stores_sdf.show(5) 
# ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker 
# for task 186,5,main] java.lang.OutOfMemoryError: Java heap space
# (show(5) is the first action on the cached join, so this is where the join actually runs)

In [None]:
%time signals_stores_sdf.count()

In [None]:
# toPandas() collects the entire join result into the driver's memory
%time signals_stores_df = signals_stores_sdf.toPandas()

In [None]:
signals_stores_df.shape

In [None]:
signals_stores_df.head()

In [None]:
signals_stores_df = signals_stores_df.rename(columns={"device_id": "store_id", 
                                                      "lat": "store_name", 
                                                      "lon": "polygon_store", 
                                                      "date": "device_id", 
                                                      "point_signal": "lat", 
                                                      "store_id": "lon", 
                                                      "store_name": "date", 
                                                      "polygon_store": "point_signal"})

In [None]:
signals_stores_df.head()

In [None]:
signals_stores_df.to_csv("../signals_stores.csv", index=False)

### close the Spark session

In [None]:
# shut down the Spark session; `spark` cannot run further queries after this cell
spark.stop()