In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window


In [None]:
# Build (or reuse) the Spark session for this notebook: local master with
# 8 threads, 30 shuffle partitions, and 20g configured for both the driver
# and executor memory settings.
spark = (
    SparkSession.builder
    .master('local[8]')
    .appName('RadiusOfGyration')
    .config('spark.sql.shuffle.partitions', 30)
    .config('spark.driver.memory', '20g')
    .config('spark.executor.memory', '20g')
    .getOrCreate()
)

In [None]:
# Location of the parquet dataset with the GPS records. Later cells read the
# columns userId, locationId, locationLongitude and locationLatitude from it.
path_data = '/path/to/gps_data'
gps_data = spark.read.format('parquet').load(path_data)

In [None]:
def distance(lat_p1, lon_p1, lat_p2, lon_p2):
    '''
    Calculates the great-circle distance (in km) between two
    GPS points p1 and p2, using the spherical law of cosines
    https://en.wikipedia.org/wiki/Great-circle_distance#Formulae
    -------------------------------------
    :param lat_p1: latitude of origin point (degrees)
    :param lon_p1: longitude of origin point (degrees)
    :param lat_p2: latitude of destination point (degrees)
    :param lon_p2: longitude of destination point (degrees)
    :returns: Column expression with the distance in km
    '''
    # F.radians replaces the deprecated F.toRadians.
    lat1 = F.radians(lat_p1)
    lat2 = F.radians(lat_p2)
    delta_lon = F.radians(lon_p1) - F.radians(lon_p2)

    # Cosine of the central angle between the two points.
    cos_angle = (
        F.sin(lat1) * F.sin(lat2) +
        F.cos(lat1) * F.cos(lat2) * F.cos(delta_lon)
    )

    # Floating-point rounding can push cos_angle marginally outside [-1, 1]
    # for identical or near-identical points, which makes acos return NaN.
    # Clamp it back into the valid domain. Null inputs fall through to
    # `otherwise` and stay null; NaN is passed through explicitly because
    # Spark orders NaN above every other number and would otherwise be
    # clamped to 1 (i.e. silently reported as distance 0).
    clamped = (F.when(F.isnan(cos_angle), cos_angle)
                .when(cos_angle > 1.0, F.lit(1.0))
                .when(cos_angle < -1.0, F.lit(-1.0))
                .otherwise(cos_angle))

    # Central angle (radians) times the sphere radius 6371.0 (km).
    return F.acos(clamped) * F.lit(6371.0)

In [None]:
# Window over all (userId, locationId) rows that belong to the same user.
w = Window.partitionBy('userId')


def _visit_weighted_mean(col_name):
    '''
    Per-user mean of `col_name` weighted by the visit count `n_i`.
    Rows where `col_name` is null are skipped in both numerator and
    denominator, mirroring how F.avg ignores nulls.
    '''
    value = F.col(col_name)
    weight = F.when(value.isNotNull(), F.col('n_i'))
    return F.sum(weight * value).over(w) / F.sum(weight).over(w)


# Radius of gyration per user:
#   r_g = sqrt( (1/N) * sum_i n_i * |r_i - r_cm|^2 ),  r_cm = (1/N) * sum_i n_i * r_i
# where n_i is the number of visits to location i and N the user's total visits.
radius_df = (gps_data
             # number of visits per stop, plus the stop's coordinates
             .groupby('userId', 'locationId').agg(F.count(F.lit(1)).alias('n_i'),
                                                  F.first('locationLongitude').alias('locationLongitude'),
                                                  F.first('locationLatitude').alias('locationLatitude'))
             # compute total visits per user
             .withColumn('N', F.sum(F.col('n_i')).over(w))
             # compute the visit-weighted center of mass (lat/lon) per user.
             # The center must use the same weights n_i as the squared
             # distances below; an unweighted mean over distinct locations
             # is not the center of mass of the visits.
             # NOTE(review): this is an arithmetic mean of degrees, so it
             # presumably misbehaves for users whose locations straddle the
             # +/-180 degree meridian -- confirm whether that matters here.
             .withColumn('center_lon', _visit_weighted_mean('locationLongitude'))
             .withColumn('center_lat', _visit_weighted_mean('locationLatitude'))
             # compute |r_i - r_cm| (km)
             .withColumn('distance', distance(F.col('locationLatitude'), F.col('locationLongitude'),
                                              F.col('center_lat'), F.col('center_lon')))
             # compute n_i * (r_i - r_cm)^2 / N
             .withColumn('distance2', F.col('n_i') * (F.col('distance') * F.col('distance')) / F.col('N'))
             # compute sum_i(n_i * (r_i - r_cm)^2 / N) per user
             .groupBy('userId').agg(F.sum(F.col('distance2')).alias('sum_dist2'))
             # square root
             .withColumn('radius_gyr', F.sqrt(F.col('sum_dist2')))
             .select('userId', 'radius_gyr')
            )
radius_df.show()