In [1]:
import pyspark
import os
import datetime
import uuid
import math
import operator
import pygeohash as pgh
import boto3
import re
import calendar

from datetime import (datetime, date, time, timedelta)
from pyspark.sql.window import Window
from pyspark.sql.functions import (when, udf, col, lit, lead, lag, row_number)
from pyspark.sql import (SparkSession, DataFrameReader)
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, FloatType, DateType
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
#from pyspark.sql.functions import col,array_contains
from pyspark.sql import functions as py
from sys import exit
from tabulate import tabulate



In [2]:
mpe = "2800M"
pyspark_submit_args = ' --executor-memory ' + mpe + ' pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

master_location = "spark://172.25.0.09:7077"
confg = pyspark.SparkConf()
confg.setMaster(master_location)
context = pyspark.SparkContext(conf=confg)        
sqlContext = SQLContext(context)
session = SparkSession.builder.appName('clean_and_cluster').getOrCreate()
dataFrameReader = session.read

In [3]:
def distance(lat1, lon1, lat2, lon2, unit='miles'):
    '''
    Measure simple haversine distance between two points. Default unit = miles.
    '''    
    units = {'miles':3963.19,'kilometers':6378.137,'meters':6378137,'feet':20902464}    

    phi_1 = py.radians(lat1)
    phi_2 = py.radians(lat2)
    delta_phi = py.radians(lat2 - lat1)
    delta_lambda = py.radians(lon2-lon1)

    area = py.sin(delta_phi/2.0) ** 2 \
    + py.cos(phi_1) * py.cos(phi_2) * \
    py.sin(delta_lambda / 2.0) ** 2

    central_angle = 2 * py.asin((area ** 0.5))
    radius = units[unit.lower()]

    return py.abs(py.round((central_angle * radius),4))

In [4]:
def distance_dr(lat1, lon1, lat2, lon2, unit='miles'):
    units = {'miles':3963.19,'kilometers':6378.137,'meters':6378137,'feet':20902464}


    phi_1 = math.radians(lat1)
    phi_2 = math.radians(lat2)
    delta_phi = math.radians(lat2 - lat1)
    delta_lambda = math.radians(lon2-lon1)

    area = math.sin(delta_phi/2.0) ** 2 \
    + math.cos(phi_1) * math.cos(phi_2) * \
    math.sin(delta_lambda / 2.0) ** 2

    central_angle = 2 * math.asin((area ** 0.5))
    radius = units[unit.lower()]

    return abs(round((central_angle * radius),4))  

In [5]:
def rate_of_speed(distance, seconds, time_unit='hour'):
    '''
    Returns the rate of speed based on input distance and seconds
    '''
    time = {'second':1, 'minute':60, 'hour':3600, 'day':86400, 'year':31536000}
    return py.abs(distance / (seconds / time[time_unit]))

In [6]:
def s3_dates(object='daily-feed'):
    '''
    Returns a list of GPS dates present in S3
    Options include: daily-feed, micro-clusters, home, work
    '''
    
    client = boto3.client('s3')
    paginator = client.get_paginator('list_objects')
    gps_bucket = 'b6-8f-fc-09-0f-db-50-3f-gpsdata'
    
    if object == 'daily-feed':
        gps_prefix = 'cuebiq/daily-feed/US/'  
        foo_bar = 123456789
    elif object == 'micro-clusters':
        gps_prefix = 'cuebiq/processed-data/US/micro-clusters/'
    elif object == 'home':
        gps_prefix = 'cuebiq/processed-data/US/home/20'
    elif object == 'work':
        gps_prefix = 'cuebiq/processed-data/US/work/20'
    else:
        return

    dates = set()
    for result in paginator.paginate(Bucket=gps_bucket, Prefix=gps_prefix, Delimiter='/'):
        for prefix in result.get('CommonPrefixes'):
            dates.add(datetime.strptime((re.search("(\d{6,})", prefix.get('Prefix')).group() + '01')[0:8], '%Y%m%d'))
    return sorted(list(dates))

In [7]:
df_days = s3_dates('daily-feed')
micro_days = s3_dates('micro-clusters')

study_dts = sorted(list(set(df_days) - set(micro_days) - set([max(df_days)])))
study_dts

[datetime.datetime(2020, 8, 9, 0, 0), datetime.datetime(2020, 8, 10, 0, 0)]

In [8]:
#Use fake date values for dev

#study_dts = study_dates()

study_dts = [datetime.strptime('2020-07-30', '%Y-%m-%d'),datetime.strptime('2020-07-29', '%Y-%m-%d'), datetime.strptime('2020-08-01', '%Y-%m-%d')]

#home_months = [datetime.strptime('2020-07-01', '%Y-%m-%d')]
#work_months = [datetime.strptime('2020-07-01', '%Y-%m-%d')

study_dts

[datetime.datetime(2020, 7, 30, 0, 0),
 datetime.datetime(2020, 7, 29, 0, 0),
 datetime.datetime(2020, 8, 1, 0, 0)]

In [9]:
# Create universe of all dates to pull from S3 (rolling three days per study date)
dt_universe = set()
for dt in study_dts:
    dt_universe.add(dt + timedelta(days=-1))
    dt_universe.add(dt)
    dt_universe.add(dt + timedelta(days=1))
    
dt_universe
len(set(dt_universe) - set(df_days))

0

In [10]:
# Exit script if a date that needs to be pulled is unaccounted for in daily-feed
if len(set(dt_universe) - set(df_days)) > 0:
    exit()


In [11]:
# Apply daily time offset (in minutes)
offset = 240
for index,dt in enumerate(study_dts):
    study_dts[index] = dt + timedelta(minutes=offset)

unix_dts = []
for dt in study_dts:
    unix_dts.append(calendar.timegm(dt.timetuple()))
unix_dts


[1596081600, 1595995200, 1596254400]

In [12]:
# Converted universe dates into S3 object paths

#base_path = 's3://b6-8f-fc-09-0f-db-50-3f-gpsdata/cuebiq/daily-feed/US/'
base_path = "/opt/spark/sample_data/daily-feed/US/"
dt_paths = []
for dt in dt_universe:
    dt_paths.append(base_path + str(dt).replace('-','')[0:8] + '*/*.csv.gz')

dt_paths

['/opt/spark/sample_data/daily-feed/US/20200731*/*.csv.gz',
 '/opt/spark/sample_data/daily-feed/US/20200728*/*.csv.gz',
 '/opt/spark/sample_data/daily-feed/US/20200802*/*.csv.gz',
 '/opt/spark/sample_data/daily-feed/US/20200729*/*.csv.gz',
 '/opt/spark/sample_data/daily-feed/US/20200730*/*.csv.gz',
 '/opt/spark/sample_data/daily-feed/US/20200801*/*.csv.gz']

In [13]:
schema = StructType([StructField("utc_timestamp",IntegerType(),True),
                        StructField("device_id",StringType(),True),
                        StructField("os",IntegerType(),True),
                        StructField("latitude",FloatType(),True),
                        StructField("longitude",FloatType(),True),
                        StructField("accuracy",IntegerType(),True),
                        StructField("tz_offset",IntegerType(),True)])

def read_csv(schema, paths):   
    df = sqlContext.read \
        .option("sep", '\t') \
        .option("inferSchema", False) \
        .option("header", False) \
        .schema(schema) \
        .csv(paths.split(','))
    return df

In [14]:
print("Running the following dates:")
for dt in sorted(study_dts):
    print("   " + dt.strftime('%x'))


# Populate dataframe by sending in comma-separated file paths
df = read_csv(schema, ",".join(dt_paths))
init_cnt = df.cache().count()

# Drop records with null values in critical fields
df = df.na.drop(subset=['latitude','longitude','utc_timestamp','tz_offset','accuracy'])
null_cnt = df.cache().count()

# Drop duplicate records based on device_id and timestamp
df = df.dropDuplicates(['utc_timestamp','device_id'])
dupe_cnt = df.cache().count()
                   
# Remove records falling outside safe horizontal accuracy thresholds
df = df.filter((df['accuracy'] >= 5) & (df['accuracy'] <= 65))
acc_cnt = df.cache().count()

# Remove records falling outside of a bounding rectanlge of the US (not just contintental)
df = df.filter((df['latitude'] >= 18) & (df['latitude'] < 72) & ((df['longitude'] > 171) | (df['longitude'] < -63)))
coord_cnt = df.cache().count()

# Remove records falling outside of the study range scope
schema = StructType([StructField("utc_timestamp",IntegerType(),True),
                        StructField("device_id",StringType(),True),
                        StructField("os",IntegerType(),True),
                        StructField("latitude",FloatType(),True),
                        StructField("longitude",FloatType(),True),
                        StructField("accuracy",IntegerType(),True),
                        StructField("tz_offset",IntegerType(),True),
                        StructField("study_dt", StringType(),True)])   

fnl_df = sqlContext.createDataFrame(context.emptyRDD(),schema) 

for dt in unix_dts:
    fnl_df = fnl_df.union(df.filter((df['utc_timestamp'] + df['tz_offset']).between(dt, dt + 86399)) \
                          .withColumn("study_dt",py.to_date(py.from_unixtime(lit(dt)))))

tm_cnt = fnl_df.cache().count()
fnl_df = fnl_df.repartition(8, 'device_id')

tbl_data = [['Starting count', init_cnt, 0, 0, 'Count of pings before filtering'], \
           ['Null values', null_cnt, init_cnt - null_cnt, ((init_cnt - null_cnt) / float(init_cnt)) * 100, 'Empty values among latitude, longitude, accuracy, timestamp, tz offset'], \
           ['Duplicates', dupe_cnt, null_cnt - dupe_cnt, ((null_cnt - dupe_cnt) / float(init_cnt)) * 100, 'Records with duplicate device_id and utc_timestamps'], \
           ['Accuracy', acc_cnt, dupe_cnt - acc_cnt, ((dupe_cnt - acc_cnt) / float(init_cnt)) * 100, 'Horizontal accuracy values exceeding safe thresholds (outside of 5 - 65)'], \
           ['Coordinates', coord_cnt, acc_cnt - coord_cnt, ((acc_cnt - coord_cnt) / float(init_cnt)) * 100, 'Pings occurring outside a general rectangle covering the US (including AK, HI)'], \
           ['Study date(s)', tm_cnt, coord_cnt - tm_cnt, ((coord_cnt - tm_cnt) / float(init_cnt)) * 100, 'Pings occuring outside the study date range when tz_offset is applied. Process brings in preceding and following dates from GPS provider to capture straggling pings'], \
           ['Final count', tm_cnt, init_cnt - tm_cnt, ((init_cnt - tm_cnt) / float(init_cnt)) * 100, 'Final pings after junk filtering process']]

print(tabulate(tbl_data, floatfmt=".2f", headers=['Phase', 'Remaining Pings', 'Removed Pings', 'Percent Reduction', 'Description']))


Running the following dates:
   07/29/20
   07/30/20
   08/01/20
Phase             Remaining Pings    Removed Pings    Percent Reduction  Description
--------------  -----------------  ---------------  -------------------  --------------------------------------------------------------------------------------------------------------------------------------------------------------------
Starting count            4072669                0                 0.00  Count of pings before filtering
Null values               4072669                0                 0.00  Empty values among latitude, longitude, accuracy, timestamp, tz offset
Duplicates                3819596           253073                 6.21  Records with duplicate device_id and utc_timestamps
Accuracy                  2562796          1256800                30.86  Horizontal accuracy values exceeding safe thresholds (outside of 5 - 65)
Coordinates               2562647              149                 0.00  Pings occurring out

In [15]:
df = fnl_df

In [16]:
init_cnt = df.cache().count()
print("Starting ping count = {:,}".format(init_cnt))
w = Window().partitionBy('device_id', 'study_dt').orderBy('utc_timestamp')

df = df.withColumn('dist_to',distance(df['latitude'], df['longitude'], lead(df['latitude'],1).over(w), \
                    lead(df['longitude'],1).over(w))) \
    .withColumn('sec_to', (lead(df['utc_timestamp'], 1).over(w) - df['utc_timestamp'])) \
    .withColumn('speed_to', rate_of_speed(col('dist_to'), col('sec_to'),'hour')) \
    .withColumn('dist_from', lag(col('dist_to'), 1).over(w)) \
    .withColumn('sec_from', lag(col('sec_to'), 1).over(w)) \
    .withColumn('speed_from', lag(col('speed_to'), 1).over(w)) \
    .withColumn('speed_flag', when(((col('dist_to').isNull()) | (col('dist_from').isNull())) \
                 | ((((col('speed_from') + col('speed_to')) / 2) <= 90) | ((col('dist_to') >= 150) | (col('dist_from') >= 150))) \
                 & ((col('speed_from') < 600) & (col('speed_to') < 600)) \
                 & ((col('speed_from') < 30) | (col('speed_to') < 30)), 1).otherwise(0))

speed_df = df.filter(df['speed_flag'] == 0) \
                .withColumn('avg_speed', (df['speed_from'] + df['speed_to']) / 2)

df = df.filter(df['speed_flag'] == 1) \
        .drop('dist_to', 'dist_from', 'sec_to', 'speed_to', 'sec_from', 'speed_from', 'speed_flag')

min_speed = speed_df.agg(py.min(col('avg_speed'))).collect()[0][0]
max_speed = speed_df.agg(py.max(col('avg_speed'))).collect()[0][0]

sum_df = speed_df.withColumn('speed_bucket', when(col('avg_speed') < 50, str(min_speed) + " - 49.9") \
                            .when((col('avg_speed') >= 50) & (col('avg_speed') < 60), "50 - 59.9") \
                            .when((col('avg_speed') >= 60) & (col('avg_speed') < 70), "60 - 69.9") \
                             .when((col('avg_speed') >= 70) & (col('avg_speed') < 80), "70 - 79.9") \
                             .when((col('avg_speed') >= 80) & (col('avg_speed') < 90), "80 - 89.9") \
                             .when((col('avg_speed') >= 90) & (col('avg_speed') < 100), "90 - 99.9") \
                             .when(col('avg_speed') >= 100, "100 - " + str(max_speed)) \
                             .otherwise(lit("other")))

speed_tbl = sum_df.groupBy(sum_df['speed_bucket']).count().collect()


tab_row = []
for row in speed_tbl:
    tab_row.append([float(re.search("(\S+)", row[0]).group()), \
                    float(re.search("[-]\s(\S+)", row[0]).group(1)), \
                   row[1]])
    
print(tabulate(sorted(tab_row), headers=['Minimum Average (mph)', 'Maximum Average (mph)', 'Ping Count']))

#test_df.show(15)
#spd_cnt = df.cache().count()
#print("{:,} pings removed {:.1%}. Ping universe now {:,}".format(init_cnt - spd_cnt, (init_cnt - spd_cnt) / float(init_cnt), spd_cnt))

Starting ping count = 1,263,325


  Minimum Average (mph)    Maximum Average (mph)    Ping Count
-----------------------  -----------------------  ------------
                  30.03                     49.9        141533
                  50                        59.9        114037
                  60                        69.9        193964
                  70                        79.9        166916
                  80                        89.9         32517
                  90                        99.9          6917
                 100                     24283.8          8092


In [19]:
init_cnt = df.cache().count()
print("Starting ping count = {:,}".format(init_cnt))

# Linear travel filter
w = Window().partitionBy('device_id', 'study_dt').orderBy('utc_timestamp')
l = Window().partitionBy('device_id', 'study_dt', 'lin_grp').orderBy('utc_timestamp')
lgrp = 4

df = df.withColumn('RecordNum',row_number().over(w)) \
    .withColumn('lin_grp', py.ceil(row_number().over(w) / lgrp)) \
    .withColumn('dist_to', distance(df['latitude'], df['longitude'], \
          lead(df['latitude'],1).over(l), lead(df['longitude'],1).over(l),'meters')) \
    .withColumn('sequence', row_number().over(l))


# Create aggregated table for linear groupings
expr = [py.min(col('utc_timestamp')).alias('min_utc_timestamp'),py.max(col('utc_timestamp')).alias('max_utc_timestamp'), \
    py.count(col('utc_timestamp')).alias('cnt'),py.sum(col('dist_to')).alias('sum_dist'),py.min(col('dist_to')).alias('min_dist')]

df_grp = df.groupBy('device_id','lin_grp').agg(*expr) 

df_l = df.filter(df['sequence'].isin([1,lgrp])).join(df_grp, ['device_id','lin_grp'])


#Measure the distance between first and last in each linear grouping and compare to sum distance of all points
# Only keep groups that meet criteria for being straight-line
df_j = df_l.withColumn('strt_dist', distance(df_l['latitude'],df_l['longitude'], \
            lead(df_l['latitude'],1).over(l), \
            lead(df_l['longitude'],1).over(l), 'meters')) \
        .withColumn('lin', col('strt_dist') / df_l['sum_dist']) \
        .na.drop(subset=['strt_dist']) \
        .filter((df_l['min_dist'] > 0)  \
            & (col('strt_dist').between(150, 2000)) \
            & (df_l['cnt'] == 4) \
            & (col('lin') >= .99825)) \
        .select('device_id','lin_grp', 'lin')

# Outer join main dataframe to linears groups to filter non-linear pings 
df = df.join(df_j, ['device_id','lin_grp'], how='left_outer') \
    .filter(col('lin').isNull()) \
    .drop('lin_grp', 'RecordNum', 'dist_to', 'sequence', 'lin')

lin_cnt = df.cache().count()
print("{:,} pings removed ({:.1%} of universe). Final ping count = {:,}".format(init_cnt - lin_cnt, (init_cnt - lin_cnt) / float(init_cnt), lin_cnt ))

Starting ping count = 599,349
6,144 pings removed (1.0% of universe). Final ping count = 593,205


In [20]:
#df_j.orderBy(col('device_id'), col('utc_timestamp')).show(10)

In [21]:
out_path = "/opt/spark/sample_data/daily-feed-reduced/"

for dt in study_dts:
    df \
    .filter(df['study_dt'] == str(dt)[0:10]) \
    .repartition(5, 'device_id').sortWithinPartitions('device_id','utc_timestamp') \
    .write \
    .csv(out_path + str(dt)[0:10].replace("-","") + "00", sep = ",", compression = "gzip")

AnalysisException: u'path file:/opt/spark/sample_data/daily-feed-reduced/2020073000 already exists.;'