# Purpose

The purpose of this notebook is to analyze GPS data from the Event 131 table.

In [1]:
import datetime as dt
start = dt.datetime.now()
print("Notebook Last Run Initiated: "+str(start))
# display/HTML live in the public IPython.display module; importing them from
# IPython.core.display is deprecated (since IPython 7.14).
from IPython.display import display, HTML
# Stretch the notebook container to the full browser width.
display(HTML("<style>.container { width:100% !important; }</style>"))
# Let each output area grow up to 10000px tall and scroll beyond that.
display(HTML("""<style>
    div.output_area{
        max-height:10000px;
        overflow:scroll;
    }
</style>"""))

Notebook Last Run Initiated: 2018-10-19 20:49:35.454170


# Run Locally or on Cluster

In [2]:
# False = run against the Spark/Hive cluster; cells guarded by `if not isLocal` skip Spark work when True.
isLocal: bool = False

# Module Imports

In [3]:
%load_ext autoreload
%autoreload
from datetime import datetime
import numpy as np
import pandas as pd
pd.set_option("display.max_rows",5000)
pd.set_option("display.max_columns",200)
import warnings
warnings.filterwarnings('ignore')
import math, random

from pyspark.sql.window import Window
from pyspark.sql.functions import avg, sum, udf, countDistinct, col, datediff, max, min, stddev, count, skewness, asc
from pyspark.sql.functions import kurtosis, corr, hour, date_format, desc, sqrt, weekofyear, month, dayofyear, pow
from pyspark.sql.types import IntegerType, BooleanType, DateType, StringType, LongType, Row
import pyspark.sql.types as T

import matplotlib
import matplotlib.pyplot as plt
from os.path import exists,join
from haversine import haversine
import igraph as ig
import pygeohash as geohash
import sys
from functools import reduce

from networkTrips import loadDeviceTrips, saveDeviceTrips
from sparkUtils import getSparkDFDim, modFunc, castLong, fillNAWithZero, castDate, getPandasDataFrame, castString, commonSparkAddOns
from sparkUtils import setupSpark, setupHive, setupSQL, getHiveData, isSparkDataFrame
from modelio import saveData, loadData, saveSparkData, appendSparkData, saveJoblib, loadJoblib
from timeUtils import clock, elapsed, getTimeSuffix, getDateTime, addDays, printDateTime, getFirstLastDay
from pandasUtils import castDateTime, castInt64, cutDataFrameByDate, convertToDate, isSeries, isDataFrame, getColData
from network import getNetworkGraph, rows_to_pandas_df, pandas_df_to_rows, extract_features

clock()
print(sys.version)
print(ig.__version__)

Current Time is Fri Oct 19, 2018 20:49:38 for Begin
3.6.4 |Anaconda custom (64-bit)| (default, Jan 16 2018, 18:10:19) 
[GCC 7.2.0]
0.7.1


# PySpark & Hive

In [4]:
# Cluster mode only: start Spark, then derive the Hive and SQL contexts from it.
if isLocal is False:
    sc = setupSpark()
    if sc is not None:
        # Per its log output, this adds the project's common files to the SparkContext.
        commonSparkAddOns(sc, debug=True)
        hc = setupHive(sc)
        scsql = setupSQL(sc)

start, cmt = clock("Finished Spark Setup")

Current Time is Fri Oct 19, 2018 20:49:38 for Setting up Spark/PySpark
PySpark home is /nas/isg_prodops_work/astro_research/spark-2.2.1-bin-hadoop2.6
Current Time is Fri Oct 19, 2018 20:50:11 for Done with Setting up Spark/PySpark
Process [Done with Setting up Spark/PySpark] took 33 seconds.
  Adding common files to SparkContent
  All common files added to SparkContent
Current Time is Fri Oct 19, 2018 20:50:16 for Finished Spark Setup


# Augment GPS Data

In [5]:
if not isLocal:
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType, DoubleType, FloatType

    def _geohashOrNone(lat, long, precision):
        """Geohash (lat, long) to `precision` characters, or None if encoding fails.

        Encoding failures (presumably null or non-numeric coordinates) yield a null geohash
        rather than an exception that would fail the Spark task. Only ``Exception`` is
        caught, so KeyboardInterrupt/SystemExit still propagate (the old bare ``except:``
        swallowed them too).
        """
        try:
            return geohash.encode(lat, long, precision=precision)
        except Exception:
            return None

    # Fixed-precision wrappers, kept under their original names for existing callers.
    def getGeo3(lat, long):
        return _geohashOrNone(lat, long, 3)

    def getGeo4(lat, long):
        return _geohashOrNone(lat, long, 4)

    def getGeo5(lat, long):
        return _geohashOrNone(lat, long, 5)

    def getGeo6(lat, long):
        return _geohashOrNone(lat, long, 6)

    def getGeo7(lat, long):
        return _geohashOrNone(lat, long, 7)

    def getGeo8(lat, long):
        return _geohashOrNone(lat, long, 8)

    prec=""
    # Spark UDFs mapping (lat, long) columns to a geohash string of the given precision.
    get_geo3_udf = udf(getGeo3, StringType())
    get_geo4_udf = udf(getGeo4, StringType())
    get_geo5_udf = udf(getGeo5, StringType())
    get_geo6_udf = udf(getGeo6, StringType())
    get_geo7_udf = udf(getGeo7, StringType())
    get_geo8_udf = udf(getGeo8, StringType())

_, _ = clock("Last Run")

Current Time is Fri Oct 19, 2018 20:50:16 for Last Run


In [6]:
def loadGeoData(suffix, prec=7, geodir="/home/tgadf"):
    """Load the pickled geohash map ``{geodir}/geomap-{prec}-{suffix}.p``.

    Args:
        suffix: map name embedded in the file name (e.g. "POI", "CouSub").
        prec: geohash precision embedded in the file name.
        geodir: directory holding the pickles (defaults to the previous hard-coded location).

    Returns:
        The unpickled object. Downstream it is passed to ``pandas.DataFrame``.

    NOTE: ``pickle.load`` can execute arbitrary code; only load trusted files.
    """
    from pickle import load
    from os.path import join
    from timeUtils import clock, elapsed
    clock("Loading Geo Data")
    geomapname = join(geodir, "geomap-{0}-{1}.p".format(prec, suffix))
    start, cmt = clock("Loading {0}".format(geomapname))
    # Context manager guarantees the file handle is closed (it previously leaked).
    with open(geomapname, "rb") as geofile:
        geomap = load(geofile)
    elapsed(start, cmt)
    return geomap

def castInt(coldata):
    """Coerce a pandas Series to numbers.

    Values are parsed with ``pd.to_numeric`` (downcast to the smallest signed integer dtype
    when possible); anything unparseable becomes NaN and is then replaced with 0.
    """
    import pandas as pd
    parsed = pd.to_numeric(coldata, errors='coerce', downcast='signed')
    zeroFilled = parsed.fillna(0)
    return zeroFilled

def createGeoDataFrame(geomap, prec=7):
    """Flatten a geohash map into a pandas DataFrame with one row per outer key.

    The first column, ``Geo{prec}``, holds the outer keys (presumably geohashes) as strings.
    Every other column is coerced with ``castInt`` (unparseable values become 0).

    Args:
        geomap: mapping returned by ``loadGeoData`` (presumably {geohash: {feature: value}}).
        prec: geohash precision used to name the key column.

    Returns:
        pandas.DataFrame keyed by ``Geo{prec}``.
    """
    from timeUtils import clock, elapsed
    from pandas import DataFrame
    start, cmt = clock("Creating Pandas Data Frame")
    # Build rows straight from the outer keys. DataFrame(geomap).T gives the same cells but
    # first materialises one column per geohash, which is very slow for large maps.
    # Column order/dtypes may differ slightly; all values are re-cast below.
    geodf = DataFrame.from_dict(geomap, orient="index").reset_index()
    geodf.columns = ["Geo{0}".format(prec)] + list(geodf.columns[1:])

    for colname in geodf.columns:
        geodf[colname] = geodf[colname].astype(str)
        if not colname.startswith("Geo"):
            geodf[colname] = castInt(geodf[colname])

    elapsed(start, cmt)
    return geodf

def createGeohashPandasDataFrame(suffix, prec=7):
    """Load ``geomap-{prec}-{suffix}.p`` and flatten it with ``createGeoDataFrame``.

    ``prec`` is forwarded to both steps so the key column can be named for the precision
    actually loaded. Previously it was left at createGeoDataFrame's default of 7.
    """
    geomap = loadGeoData(suffix, prec)
    return createGeoDataFrame(geomap, prec)

def createGeohashSparkDataFrame(suffix, prec, scsql, inputDBName):
    """Build the geohash map ``Geo{suffix}Map`` from its local pickle and save it to Hive.

    Args:
        suffix: map name (selects the pickle file and the Hive table name).
        prec: geohash precision of the pickle to load.
        scsql: SQL context used to convert the pandas frame to Spark.
        inputDBName: Hive database the table is written to.

    Returns:
        The newly created Spark DataFrame, so callers need not re-read it from Hive.
    """
    from timeUtils import clock, elapsed
    startall, cmtall = clock("Creating Spark DataFrame from {0}".format(suffix))
    pddf = createGeohashPandasDataFrame(suffix, prec)

    start, cmt = clock("Creating Spark DataFrame")
    spdf = scsql.createDataFrame(pddf)
    elapsed(start, cmt)

    start, cmt = clock("Saving spark dataframe to HIVE")
    saveSparkData(spdf, inputDBName, "Geo{0}Map".format(suffix))
    elapsed(start, cmt)
    elapsed(startall, cmtall)
    return spdf
    
def getGeohashSparkDataFrame(suffix, prec, hc, scsql, inputDBName, force=False):
    """Return the geohash map ``Geo{suffix}Map`` as a Spark DataFrame.

    The Hive table is read and cached unless ``force`` is set or the read fails. In either
    of those cases it is rebuilt from its local pickle via ``createGeohashSparkDataFrame``.

    Args:
        suffix: map name, e.g. "POI" or "InterstateA".
        prec: geohash precision of the pickle used when rebuilding.
        hc: Hive context used to read the table.
        scsql: SQL context used when rebuilding.
        inputDBName: Hive database holding the table.
        force: rebuild even if the table can be read.

    Returns:
        The cached Hive DataFrame, or whatever ``createGeohashSparkDataFrame`` returns after
        a rebuild (None if that builder returns nothing).
    """
    tableName = "Geo{0}Map".format(suffix)
    if not force:
        try:
            spdf = getHiveData(hc, inputDBName, tableName)
            spdf.cache()
            print("Total Rows = {0}".format(spdf.count()))
            print("   Columns = {0}".format(spdf.columns))
            return spdf
        except Exception as err:
            # Missing or unreadable table: say why instead of swallowing it, then rebuild.
            print("Could not read {0}.{1} ({2}); rebuilding it".format(inputDBName, tableName, err))
    return createGeohashSparkDataFrame(suffix, prec, scsql, inputDBName)

_, _ = clock("Last Run")

Current Time is Fri Oct 19, 2018 20:50:16 for Last Run


In [7]:
def castDouble(spdf, colname):
    """Return a copy of Spark DataFrame `spdf` with column `colname` cast to DoubleType."""
    from pyspark.sql.types import DoubleType
    asDouble = spdf[colname].cast(DoubleType())
    return spdf.withColumn(colname, asDouble)

def castTimestamp(spdf, colname):
    """Return a copy of Spark DataFrame `spdf` with column `colname` cast to TimestampType."""
    from pyspark.sql.types import TimestampType
    asTimestamp = spdf[colname].cast(TimestampType())
    return spdf.withColumn(colname, asTimestamp)

def DQ(spdf, spdfname = "DQ"):
    """Print quick data-quality stats for a trip DataFrame.

    Shows the row count, the number of distinct ``device`` values, the earliest ``start``,
    the latest ``end``, and how long the checks took.

    Args:
        spdf: Spark DataFrame with ``device``, ``start`` and ``end`` columns.
        spdfname: label printed in the header line.
    """
    from pyspark.sql.functions import countDistinct, col, max, min
    print("Showing Data Quality for {0}".format(spdfname))
    # clock() returns a (start, comment) pair. Unpack it, as everywhere else in this
    # notebook, so elapsed() receives the start time rather than the whole tuple.
    start, cmt = clock("Data Quality")
    print("Total Rows = {0}".format(spdf.count()))
    spdf.agg(countDistinct("device")).show()
    spdf.agg(min(col("start"))).show()
    spdf.agg(max(col("end"))).show()
    elapsed(start, cmt)

_, _ = clock("Last Run")

Current Time is Fri Oct 19, 2018 20:50:16 for Last Run


In [None]:
start, cmt = clock("Setting Params")
from timeUtils import getTimeSuffix
outdatadir     = '/home/tgadf/astro_research_data/futuremiles/sparkData'

# Hive database that the input tables are read from and the geohash maps are saved to.
inputDBName        = 'dra_cc_ce_res'
outputDBName       = inputDBName
# GPS source table. 'gps_trip_data' used to be assigned here too but was immediately
# overwritten; put that name here to analyse it instead.
gpsTableName       = 'r4henricheddata2018Not'
networkTableName   = 'device_network_data'
# Reset so the data-loading cell below re-reads the GPS table.
gpsData            = None

elapsed(start, cmt)

Current Time is Fri Oct 19, 2018 20:50:16 for Setting Params
Current Time is Fri Oct 19, 2018 20:50:16 for Done with Setting Params
Process [Done with Setting Params] took 0 seconds.


# Get The GPS Trip Endpoint Data

In [18]:
startdata, cmtdata = clock("Getting data")

if isLocal is False:
    if gpsData is None:
        start, cmt = clock(comment="Getting GPS Trip Endpoint Data")
        gpsData = getHiveData(hc, inputDBName, gpsTableName)
        # Cast trip start/end coordinates to double for the geohash UDFs
        # (presumably stored as strings in Hive -- confirm against the table schema).
        for coordCol in ('lat0', 'lat1', 'long0', 'long1'):
            gpsData = castDouble(gpsData, coordCol)
        elapsed(start, cmt)
        start, cmt = clock("Getting GPS Table Count")
        print("There are {0} entries in the GPS table".format(gpsData.count()))
        elapsed(start, cmt)

        DQ(gpsData)
        # Small sample used by the exploratory geohash-join cells below.
        tmpData = gpsData.limit(1000)

elapsed(startdata, cmtdata)

Current Time is Thu Oct 18, 2018 19:50:14 for Getting data
Current Time is Thu Oct 18, 2018 19:50:14 for Done with Getting data
Process [Done with Getting data] took 0 seconds.


## HERE Spark DataFrame

In [None]:
spGeoHEREMap = None
if not isLocal:
    suffix = "HEREPOI"
    spGeoHEREMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=False)
    if isSparkDataFrame(spGeoHEREMap):
        # Prefix every non-key column with the suffix, e.g. "name" -> "HEREPOIName".
        oldColumns = [name for name in spGeoHEREMap.columns if name != "Geo7"]
        newColumns = [suffix + name.title() for name in oldColumns]
        for oldName, newName in zip(oldColumns, newColumns):
            spGeoHEREMap = spGeoHEREMap.withColumnRenamed(oldName, newName)
        print("   Columns = {0}".format(spGeoHEREMap.columns))
_, _ = clock("Last Run")

# Illinois-Specific Spark DataFrames

In [None]:
spGeoRoadsMap = None
# Pre-initialise the Illinois maps so later cells see None (not a NameError) when isLocal
# is True, matching the `spX = None` pattern used by the other map cells.
spGeoInterstateIllinoisMap = spGeoUSRteIllinoisMap = spGeoStateRteIllinoisMap = None
spGeoHighwayIllinoisMap = spGeoMajorRdIllinoisMap = spGeoRoadIllinoisMap = None
if not isLocal:
    force=False
    suffix = "InterstateIllinois"
    spGeoInterstateIllinoisMap = getGeohashSparkDataFrame(suffix, 8, hc, scsql, inputDBName, force=force)

    suffix = "USRteIllinois"
    spGeoUSRteIllinoisMap = getGeohashSparkDataFrame(suffix, 8, hc, scsql, inputDBName, force=force)

    suffix = "StateRteIllinois"
    spGeoStateRteIllinoisMap = getGeohashSparkDataFrame(suffix, 8, hc, scsql, inputDBName, force=force)

    suffix = "HighwayIllinois"
    spGeoHighwayIllinoisMap = getGeohashSparkDataFrame(suffix, 8, hc, scsql, inputDBName, force=force)

    suffix = "MajorRdIllinois"
    spGeoMajorRdIllinoisMap = getGeohashSparkDataFrame(suffix, 8, hc, scsql, inputDBName, force=force)

    suffix = "RoadIllinois"
    spGeoRoadIllinoisMap = getGeohashSparkDataFrame(suffix, 8, hc, scsql, inputDBName, force=force)

## Interstate Spark DataFrame

In [None]:
spGeoRoadsMap = None
if not isLocal:
    force=False
    suffix = "InterstateA"
    spGeoRoadsAMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=force)
    suffix = "InterstateB"
    spGeoRoadsBMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=force)
    suffix = "InterstateC"
    spGeoRoadsCMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=force)

    from functools import reduce  # For Python 3.x
    from pyspark.sql import DataFrame

    def unionAll(*dfs):
        """Stack DataFrames row-wise, matching columns by name.

        DataFrame.union/unionAll pair columns by position, so each frame is re-selected in
        the first frame's column order first. Frames whose column sets differ raise a
        ValueError instead of silently mixing columns up.
        """
        columns = dfs[0].columns
        for df in dfs[1:]:
            if set(df.columns) != set(columns):
                raise ValueError("Cannot union DataFrames with columns {0} and {1}".format(columns, df.columns))
        return reduce(DataFrame.union, [df.select(columns) for df in dfs])

    if all(isSparkDataFrame(x) for x in [spGeoRoadsAMap, spGeoRoadsBMap, spGeoRoadsCMap]):
        suffix="ROADS"
        spGeoInterstateMap = unionAll(spGeoRoadsAMap, spGeoRoadsBMap, spGeoRoadsCMap)
        # Prefix every non-key column, e.g. "name" -> "ROADSName".
        oldColumns = [c for c in spGeoInterstateMap.columns if c != "Geo7"]
        newColumns = ["{0}{1}".format(suffix, c.title()) for c in oldColumns]
        for oldName, newName in zip(oldColumns, newColumns):
            spGeoInterstateMap = spGeoInterstateMap.withColumnRenamed(oldName, newName)
        print("Total Rows = {0}".format(spGeoInterstateMap.count()))
        print("   Columns = {0}".format(spGeoInterstateMap.columns))
_, _ = clock("Last Run")

## USRte Spark DataFrame

In [None]:
spGeoRoadsMap = None
if not isLocal:
    force=False
    suffix = "USRteA"
    spGeoRoadsAMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=force)
    suffix = "USRteB"
    spGeoRoadsBMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=force)
    suffix = "USRteC"
    spGeoRoadsCMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=force)

    from functools import reduce  # For Python 3.x
    from pyspark.sql import DataFrame

    def unionAll(*dfs):
        """Stack DataFrames row-wise, matching columns by name.

        DataFrame.union/unionAll pair columns by position, so each frame is re-selected in
        the first frame's column order first. Frames whose column sets differ raise a
        ValueError instead of silently mixing columns up.
        """
        columns = dfs[0].columns
        for df in dfs[1:]:
            if set(df.columns) != set(columns):
                raise ValueError("Cannot union DataFrames with columns {0} and {1}".format(columns, df.columns))
        return reduce(DataFrame.union, [df.select(columns) for df in dfs])

    if all(isSparkDataFrame(x) for x in [spGeoRoadsAMap, spGeoRoadsBMap, spGeoRoadsCMap]):
        spGeoUSRteMap = unionAll(spGeoRoadsAMap, spGeoRoadsBMap, spGeoRoadsCMap)
        suffix="ROADS"
        # Prefix every non-key column, e.g. "name" -> "ROADSName".
        oldColumns = [c for c in spGeoUSRteMap.columns if c != "Geo7"]
        newColumns = ["{0}{1}".format(suffix, c.title()) for c in oldColumns]
        for oldName, newName in zip(oldColumns, newColumns):
            spGeoUSRteMap = spGeoUSRteMap.withColumnRenamed(oldName, newName)
        print("Total Rows = {0}".format(spGeoUSRteMap.count()))
        print("   Columns = {0}".format(spGeoUSRteMap.columns))
_, _ = clock("Last Run")

## StateRte Spark DataFrame

In [None]:
spGeoRoadsMap = None
if not isLocal:
    force=False
    suffix = "StateRteA"
    spGeoRoadsAMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=force)
    suffix = "StateRteB"
    spGeoRoadsBMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=force)
    suffix = "StateRteC"
    spGeoRoadsCMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=force)

    from functools import reduce  # For Python 3.x
    from pyspark.sql import DataFrame

    def unionAll(*dfs):
        """Stack DataFrames row-wise, matching columns by name.

        DataFrame.union/unionAll pair columns by position, so each frame is re-selected in
        the first frame's column order first. Frames whose column sets differ raise a
        ValueError instead of silently mixing columns up.
        """
        columns = dfs[0].columns
        for df in dfs[1:]:
            if set(df.columns) != set(columns):
                raise ValueError("Cannot union DataFrames with columns {0} and {1}".format(columns, df.columns))
        return reduce(DataFrame.union, [df.select(columns) for df in dfs])

    if all(isSparkDataFrame(x) for x in [spGeoRoadsAMap, spGeoRoadsBMap, spGeoRoadsCMap]):
        spGeoStateRteMap = unionAll(spGeoRoadsAMap, spGeoRoadsBMap, spGeoRoadsCMap)
        suffix="ROADS"
        # Prefix every non-key column, e.g. "name" -> "ROADSName".
        oldColumns = [c for c in spGeoStateRteMap.columns if c != "Geo7"]
        newColumns = ["{0}{1}".format(suffix, c.title()) for c in oldColumns]
        for oldName, newName in zip(oldColumns, newColumns):
            spGeoStateRteMap = spGeoStateRteMap.withColumnRenamed(oldName, newName)
        print("Total Rows = {0}".format(spGeoStateRteMap.count()))
        print("   Columns = {0}".format(spGeoStateRteMap.columns))
_, _ = clock("Last Run")

## Highway Spark DataFrame

In [None]:
spGeoRoadsMap = None
if not isLocal:
    force=False
    suffix = "HighwayA"
    spGeoRoadsAMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=force)
    suffix = "HighwayB"
    spGeoRoadsBMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=force)
    suffix = "HighwayC"
    spGeoRoadsCMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=force)

    from functools import reduce  # For Python 3.x
    from pyspark.sql import DataFrame

    def unionAll(*dfs):
        """Stack DataFrames row-wise, matching columns by name.

        DataFrame.union/unionAll pair columns by position, so each frame is re-selected in
        the first frame's column order first. Frames whose column sets differ raise a
        ValueError instead of silently mixing columns up.
        """
        columns = dfs[0].columns
        for df in dfs[1:]:
            if set(df.columns) != set(columns):
                raise ValueError("Cannot union DataFrames with columns {0} and {1}".format(columns, df.columns))
        return reduce(DataFrame.union, [df.select(columns) for df in dfs])

    if all(isSparkDataFrame(x) for x in [spGeoRoadsAMap, spGeoRoadsBMap, spGeoRoadsCMap]):
        spGeoHighwayMap = unionAll(spGeoRoadsAMap, spGeoRoadsBMap, spGeoRoadsCMap)
        suffix="ROADS"
        # Prefix every non-key column, e.g. "name" -> "ROADSName".
        oldColumns = [c for c in spGeoHighwayMap.columns if c != "Geo7"]
        newColumns = ["{0}{1}".format(suffix, c.title()) for c in oldColumns]
        for oldName, newName in zip(oldColumns, newColumns):
            spGeoHighwayMap = spGeoHighwayMap.withColumnRenamed(oldName, newName)
        print("Total Rows = {0}".format(spGeoHighwayMap.count()))
        print("   Columns = {0}".format(spGeoHighwayMap.columns))
_, _ = clock("Last Run")

## MajorRd Spark DataFrame

In [None]:
spGeoRoadsMap = None
if not isLocal:
    force=False
    suffix = "MajorRdA"
    spGeoRoadsAMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=force)
    suffix = "MajorRdB"
    spGeoRoadsBMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=force)
    suffix = "MajorRdC"
    spGeoRoadsCMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=force)

    from functools import reduce  # For Python 3.x
    from pyspark.sql import DataFrame

    def unionAll(*dfs):
        """Stack DataFrames row-wise, matching columns by name.

        DataFrame.union/unionAll pair columns by position, so each frame is re-selected in
        the first frame's column order first. Frames whose column sets differ raise a
        ValueError instead of silently mixing columns up.
        """
        columns = dfs[0].columns
        for df in dfs[1:]:
            if set(df.columns) != set(columns):
                raise ValueError("Cannot union DataFrames with columns {0} and {1}".format(columns, df.columns))
        return reduce(DataFrame.union, [df.select(columns) for df in dfs])

    if all(isSparkDataFrame(x) for x in [spGeoRoadsAMap, spGeoRoadsBMap, spGeoRoadsCMap]):
        spGeoMajorRdMap = unionAll(spGeoRoadsAMap, spGeoRoadsBMap, spGeoRoadsCMap)
        suffix="ROADS"
        # Prefix every non-key column; str.title() lowercases "Rd", so restore it afterwards.
        oldColumns = [c for c in spGeoMajorRdMap.columns if c != "Geo7"]
        newColumns = ["{0}{1}".format(suffix, c.title()) for c in oldColumns]
        newColumns = [c.replace("rd", "Rd") for c in newColumns]
        for oldName, newName in zip(oldColumns, newColumns):
            spGeoMajorRdMap = spGeoMajorRdMap.withColumnRenamed(oldName, newName)
        print("Total Rows = {0}".format(spGeoMajorRdMap.count()))
        print("   Columns = {0}".format(spGeoMajorRdMap.columns))
_, _ = clock("Last Run")

# Generic Road Spark DataFrame

In [None]:
spGeoRoadsMap = None
if not isLocal:
    # NOTE(review): force=True rebuilds all three tables from their pickles on every run
    # (the sibling road cells use force=False) -- confirm this is intended.
    force=True
    suffix = "RoadA"
    spGeoRoadsAMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=force)
    suffix = "RoadB"
    spGeoRoadsBMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=force)
    suffix = "RoadC"
    spGeoRoadsCMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=force)

    from functools import reduce  # For Python 3.x
    from pyspark.sql import DataFrame

    def unionAll(*dfs):
        """Stack DataFrames row-wise, matching columns by name.

        DataFrame.union/unionAll pair columns by position, so each frame is re-selected in
        the first frame's column order first. Frames whose column sets differ raise a
        ValueError instead of silently mixing columns up.
        """
        columns = dfs[0].columns
        for df in dfs[1:]:
            if set(df.columns) != set(columns):
                raise ValueError("Cannot union DataFrames with columns {0} and {1}".format(columns, df.columns))
        return reduce(DataFrame.union, [df.select(columns) for df in dfs])

    if all(isSparkDataFrame(x) for x in [spGeoRoadsAMap, spGeoRoadsBMap, spGeoRoadsCMap]):
        spGeoGenRdMap = unionAll(spGeoRoadsAMap, spGeoRoadsBMap, spGeoRoadsCMap)
        suffix="ROADS"
        # Prefix every non-key column; str.title() lowercases "Rd", so restore it afterwards.
        oldColumns = [c for c in spGeoGenRdMap.columns if c != "Geo7"]
        newColumns = ["{0}{1}".format(suffix, c.title()) for c in oldColumns]
        newColumns = [c.replace("rd", "Rd") for c in newColumns]
        for oldName, newName in zip(oldColumns, newColumns):
            spGeoGenRdMap = spGeoGenRdMap.withColumnRenamed(oldName, newName)
        print("Total Rows = {0}".format(spGeoGenRdMap.count()))
        print("   Columns = {0}".format(spGeoGenRdMap.columns))
_, _ = clock("Last Run")

## Drivewise POI Spark DataFrame

In [None]:
spGeoPOIMap = None
if not isLocal:
    suffix = "POI"
    spGeoPOIMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=False)
    if isSparkDataFrame(spGeoPOIMap):
        # Normalise the key column name and give the visit counters POI-specific names.
        poiRenames = [('geo7', 'Geo7'), ('Count', 'POIVisits'), ('Distinct', 'POIUniqueVisits')]
        for oldName, newName in poiRenames:
            spGeoPOIMap = spGeoPOIMap.withColumnRenamed(oldName, newName)
        print("   Columns = {0}".format(spGeoPOIMap.columns))
_, _ = clock("Last Run")

## Traffic OSM Spark DataFrame

In [None]:
spOSMTrafficMap = None
if not isLocal:
    suffix = "trafficOSM"
    spOSMTrafficMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=False)
    if isSparkDataFrame(spOSMTrafficMap):
        # Prefix every non-key column with "OSM", e.g. "name" -> "OSMName".
        suffix = "OSM"
        oldColumns = [name for name in spOSMTrafficMap.columns if name != "Geo7"]
        newColumns = [suffix + name.title() for name in oldColumns]
        for oldName, newName in zip(oldColumns, newColumns):
            spOSMTrafficMap = spOSMTrafficMap.withColumnRenamed(oldName, newName)
        print("   Columns = {0}".format(spOSMTrafficMap.columns))
_, _ = clock("Last Run")

## Place of Worship (POFW) OSM Spark DataFrame

In [None]:
spOSMPofwMap = None
if not isLocal:
    suffix = "pofwOSM"
    spOSMPofwMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=False)
    if isSparkDataFrame(spOSMPofwMap):
        # Prefix every non-key column with "OSM", e.g. "name" -> "OSMName".
        suffix = "OSM"
        oldColumns = [name for name in spOSMPofwMap.columns if name != "Geo7"]
        newColumns = [suffix + name.title() for name in oldColumns]
        for oldName, newName in zip(oldColumns, newColumns):
            spOSMPofwMap = spOSMPofwMap.withColumnRenamed(oldName, newName)
        print("   Columns = {0}".format(spOSMPofwMap.columns))
_, _ = clock("Last Run")

## Transport OSM Spark DataFrame

In [None]:
spOSMTransportMap = None
if not isLocal:
    suffix = "transportOSM"
    spOSMTransportMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=False)
    if isSparkDataFrame(spOSMTransportMap):
        # Prefix every non-key column with "OSM", e.g. "name" -> "OSMName".
        suffix = "OSM"
        oldColumns = [name for name in spOSMTransportMap.columns if name != "Geo7"]
        newColumns = [suffix + name.title() for name in oldColumns]
        for oldName, newName in zip(oldColumns, newColumns):
            spOSMTransportMap = spOSMTransportMap.withColumnRenamed(oldName, newName)
        print("   Columns = {0}".format(spOSMTransportMap.columns))
_, _ = clock("Last Run")

## POI OSM Spark DataFrame

In [None]:
spOSMPOIMap = None
if not isLocal:
    suffix = "poisOSM"
    spOSMPOIMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=False)
    if isSparkDataFrame(spOSMPOIMap):
        # Prefix every non-key column with "OSM", e.g. "name" -> "OSMName".
        suffix = "OSM"
        oldColumns = [name for name in spOSMPOIMap.columns if name != "Geo7"]
        newColumns = [suffix + name.title() for name in oldColumns]
        for oldName, newName in zip(oldColumns, newColumns):
            spOSMPOIMap = spOSMPOIMap.withColumnRenamed(oldName, newName)
        print("   Columns = {0}".format(spOSMPOIMap.columns))
_, _ = clock("Last Run")

## Terminal Spark DataFrame

In [None]:
spGeoTerminalsMap = None
if not isLocal:
    suffix = "Terminals"
    spGeoTerminalsMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=False)
    if isSparkDataFrame(spGeoTerminalsMap):
        # Prefix every non-key column with "Terminals", e.g. "name" -> "TerminalsName".
        oldColumns = [name for name in spGeoTerminalsMap.columns if name != "Geo7"]
        newColumns = [suffix + name.title() for name in oldColumns]
        for oldName, newName in zip(oldColumns, newColumns):
            spGeoTerminalsMap = spGeoTerminalsMap.withColumnRenamed(oldName, newName)
        print("   Columns = {0}".format(spGeoTerminalsMap.columns))
_, _ = clock("Last Run")

## Census Spark DataFrame

In [None]:
spGeoCensusCSAMap = None
if not isLocal:
    # County-subdivision census map at geohash precision 6.
    # force=True regenerates the Hive table from the local pickle on every run (slow).
    suffix = "CouSub"
    spGeoCensusCSAMap = getGeohashSparkDataFrame(suffix, 6, hc, scsql, inputDBName, force=True)
    if isSparkDataFrame(spGeoCensusCSAMap):
        print("   Columns = {0}".format(spGeoCensusCSAMap.columns))

_, _ = clock("Last Run")

Current Time is Fri Oct 19, 2018 20:50:16 for Creating Spark DataFrame from CouSub
Current Time is Fri Oct 19, 2018 20:50:16 for Loading Geo Data
Current Time is Fri Oct 19, 2018 20:50:16 for Loading /home/tgadf/geomap-6-CouSub.p
Current Time is Fri Oct 19, 2018 20:50:39 for Done with Loading /home/tgadf/geomap-6-CouSub.p
Process [Done with Loading /home/tgadf/geomap-6-CouSub.p] took 22 seconds.
Current Time is Fri Oct 19, 2018 20:50:39 for Creating Pandas Data Frame


In [34]:
# Precision-6 census geohash map, used below as the join target `jData`.
# NOTE(review): suffix "" selects pickle geomap-6-.p and Hive table GeoMap -- confirm this is
# the intended map (e.g. rather than "CouSub" or "CSA_2010Census_DP1").
jData = spGeoCensusMap = getGeohashSparkDataFrame("", 6, hc, scsql, inputDBName, force=True)

Current Time is Thu Oct 18, 2018 20:35:05 for Creating Spark DataFrame from CouSub
Current Time is Thu Oct 18, 2018 20:35:05 for Loading Geo Data
Current Time is Thu Oct 18, 2018 20:35:05 for Loading /home/tgadf/geomap-6-CouSub.p
Current Time is Thu Oct 18, 2018 20:35:24 for Done with Loading /home/tgadf/geomap-6-CouSub.p
Process [Done with Loading /home/tgadf/geomap-6-CouSub.p] took 19 seconds.
Current Time is Thu Oct 18, 2018 20:35:24 for Creating Pandas Data Frame
Current Time is Thu Oct 18, 2018 20:58:06 for Done with Creating Pandas Data Frame
Process [Done with Creating Pandas Data Frame] took 22.7 minutes.
Current Time is Thu Oct 18, 2018 20:58:09 for Creating Spark DataFrame
Current Time is Thu Oct 18, 2018 21:07:37 for Done with Creating Spark DataFrame
Process [Done with Creating Spark DataFrame] took 9.4 minutes.
Current Time is Thu Oct 18, 2018 21:07:37 for Saving spark dataframe to HIVE
Current Time is Thu Oct 18, 2018 21:07:37 for Saving spark dataframe to dra_cc_ce_res.G

AttributeError: 'NoneType' object has no attribute 'write'

In [33]:
# Keep only device and endpoint coordinates from the sample, then geohash the trip start
# (lat0, long0) at precisions 3-6.
xData = tmpData.drop('GeoPoint', 'Accepted', 'summary', 'Policy_Identifier', 'ADW_Enroll_ID', 'ADW_Member_ID', 'ADW_Operator_ID', 'CCT_Enroll_ID', 'CCT_Member_ID', 'CCT_Operator_ID', 'tripId', 'tripStartLocation', 'tripEndLocation', 'tripTerminateReason', 'tripRejectReason', 'mobileAppDevice', 'mobileAppVersion', 'mobileOsVersion').drop('Start', 'End', 'Duration', 'total_miles', 'inR4HTimePeriod')
for precision, geoUdf in ((3, get_geo3_udf), (4, get_geo4_udf), (5, get_geo5_udf), (6, get_geo6_udf)):
    xData = xData.withColumn('geo{0}'.format(precision), geoUdf('lat0', 'long0'))

# Attach the census CSA for each precision as CSA3..CSA6 (null when the cell is not in jData).
for precision in (3, 4, 5, 6):
    geoCol = 'geo{0}'.format(precision)
    xData = xData.join(jData, on=[xData[geoCol] == jData.Geo], how="left").drop("Geo").withColumnRenamed("CSA", "CSA{0}".format(precision))
xData.show(1)

+---------------+----------------+-------------------+-----------------+-------------------+----+----+-----+------+----+----+----+----+
|         device|            lat0|              long0|             lat1|              long1|geo3|geo4| geo5|  geo6|CSA3|CSA4|CSA5|CSA6|
+---------------+----------------+-------------------+-----------------+-------------------+----+----+-----+------+----+----+----+----+
|000V8N8MBGFQQ20|33.4540082216686|-112.07394435906116|33.49648051732159|-112.06978008883728| 9tb|9tbq|9tbq3|9tbq3d|null|null|null|null|
+---------------+----------------+-------------------+-----------------+-------------------+----+----+-----+------+----+----+----+----+
only showing top 1 row



In [25]:
geos = [geoRow['Geo'] for geoRow in jData.select('Geo').collect()]  # every geohash key in jData


In [26]:
# NOTE(review): `x` is first assigned in a later cell (the In [59] cell below); this fails
# under Restart & Run All.
x.show(n=1)

+---------------+-----------------+-------------------+-----------------+-------------------+----+----+-----+------+----+----+
|         device|             lat0|              long0|             lat1|              long1|geo3|geo4| geo5|  geo6| Geo| CSA|
+---------------+-----------------+-------------------+-----------------+-------------------+----+----+-----+------+----+----+
|000V8N8MBGFQQ20|33.48029410807537|-112.05960016294972|33.48713300373057|-111.91922504383615| 9tb|9tbq|9tbq6|9tbq6n|null|null|
+---------------+-----------------+-------------------+-----------------+-------------------+----+----+-----+------+----+----+
only showing top 1 row



In [59]:
#jData.Geo
geos = [x[0] for x in jData.select('Geo').collect()]
from collections import Counter
tmp = Counter()
for geo in geos:
    tmp[len(geo)] += 1
print(tmp.most_common())
#[i for i in jData.Geo.collect()]

#tmpData

xData = tmpData.withColumn('geo{0}0'.format(5), get_geo5_udf('lat0', 'long0'))
x = dropNullGeoJoin(xData.join(jData, on=[xData.geo50 == jData.Geo], how="left"), "Geo").drop('geo{0}0'.format(5))
geo5 = [y[0] for y in x.select('Geo').collect()]
xData = tmpData.withColumn('geo{0}0'.format(6), get_geo6_udf('lat0', 'long0'))
x = dropNullGeoJoin(xData.join(jData, on=[xData.geo60 == jData.Geo], how="left"), "Geo").drop('geo{0}0'.format(6))
geo6 = [y[0] for y in x.select('Geo').collect()]
print(len(set(geo5).intersection(set(geos))))
print(len(set(geo6).intersection(set(geos))))

[(6, 351840), (5, 36629), (4, 1289), (3, 1), (2, 1)]
419
52


In [52]:
x.show(n=3)  # first rows of the most recent geohash join result

+-----------------+-------------+-------------+---------------+-------------+-------------+---------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+-----------+-------------------+----------------+---------------+----------------+---------------+---------------+-----------+------------+-----------+------------+---+
|Policy_Identifier|ADW_Enroll_ID|ADW_Member_ID|ADW_Operator_ID|CCT_Enroll_ID|CCT_Member_ID|CCT_Operator_ID|         device|              tripId|               Start|                 End|   tripStartLocation|     tripEndLocation|Duration|total_miles|tripTerminateReason|tripRejectReason|mobileAppDevice|mobileAppVersion|mobileOsVersion|inR4HTimePeriod|       lat0|       long0|       lat1|       long1|CSA|
+-----------------+-------------+-------------+---------------+-------------+-------------+---------------+---------------+--------------------+--------------------+--------------------+--

In [43]:
def dropNullGeoJoin(spdf, colname):
    """Drop rows of `spdf` whose `colname` is null (e.g. unmatched rows of a left join)."""
    nullHandler = spdf.na
    return nullHandler.drop(subset=[colname])

jData = spGeoCensusCSAMap
# For each geohash precision 3..8, keep the sampled trips whose start cell appears in jData.
# An inner join is equivalent to a left join followed by dropNullGeoJoin(..., "Geo").
geoUdfsByPrecision = [(3, get_geo3_udf), (4, get_geo4_udf), (5, get_geo5_udf),
                      (6, get_geo6_udf), (7, get_geo7_udf), (8, get_geo8_udf)]
geoData = []
for precision, geoUdf in geoUdfsByPrecision:
    geoCol = 'geo{0}0'.format(precision)
    xData = tmpData.withColumn(geoCol, geoUdf('lat0', 'long0'))
    matched = xData.join(jData, on=[xData[geoCol] == jData.Geo], how="inner")
    geoData.append(matched.drop("Geo").drop(geoCol))

print("============================================================")
# Number of matched sample trips per precision, in precision order 3..8.
for j in geoData:
    print(j.count())

1
6698
726
35
0
0


In [19]:
def loadGeoData(suffix, prec=7, geodir="/home/tgadf"):
    """Load the pickled geohash map ``{geodir}/geomap-{prec}-{suffix}.p``.

    Redefinition of the earlier helper with the same name; the later definition wins.

    Args:
        suffix: map name embedded in the file name (e.g. "CouSub").
        prec: geohash precision embedded in the file name.
        geodir: directory holding the pickles (defaults to the previous hard-coded location).

    Returns:
        The unpickled object. Downstream it is passed to ``pandas.DataFrame``.

    NOTE: ``pickle.load`` can execute arbitrary code; only load trusted files.
    """
    from pickle import load
    from os.path import join
    from timeUtils import clock, elapsed
    clock("Loading Geo Data")
    geomapname = join(geodir, "geomap-{0}-{1}.p".format(prec, suffix))
    start, cmt = clock("Loading {0}".format(geomapname))
    # Context manager guarantees the file handle is closed (it previously leaked).
    with open(geomapname, "rb") as geofile:
        geomap = load(geofile)
    elapsed(start, cmt)
    return geomap

def castInt(coldata):
    """Parse a pandas Series as numbers, shrinking to the smallest signed integer dtype when
    possible, and replace unparseable entries (NaN after coercion) with 0."""
    import pandas as pd
    return pd.to_numeric(coldata, downcast='signed', errors='coerce').fillna(0)

def createGeoDataFrame(geomap, prec=7):
    """
    Convert a geohash map into a pandas DataFrame whose first column is 'Geo'.

    Inputs:
      geomap - anything pandas.DataFrame accepts; after transposing, each row is one
               key of geomap (presumably {geohash: {feature: value}} -- confirm)
      prec   - geohash precision; currently unused, kept for interface compatibility

    Returns a DataFrame where:
      * the former index (the geomap keys) becomes the first column, named 'Geo';
      * every column is first cast to str;
      * columns whose name does not start with "Geo" are then converted with castInt
        (unparseable values -> 0).
    """
    from timeUtils import clock, elapsed
    from pandas import DataFrame
    start, cmt = clock("Creating Pandas Data Frame")
    geodf = DataFrame(geomap).T.reset_index()
    # Rename by position so only the former index column becomes "Geo".
    cols = list(geodf.columns)
    cols[0] = "Geo"
    geodf.columns = cols

    for colname in geodf.columns:
        geodf[colname] = geodf[colname].astype(str)
        # str() guards against non-string labels (e.g. integer labels when geomap values
        # are lists), which have no .startswith and previously raised AttributeError.
        if not str(colname).startswith("Geo"):
            geodf[colname] = castInt(geodf[colname])

    elapsed(start, cmt)
    return geodf

def createGeohashPandasDataFrame(suffix, prec=7):
    """
    Load geomap-<prec>-<suffix>.p and return it as a pandas DataFrame keyed by 'Geo'.

    Thin wrapper: loadGeoData followed by createGeoDataFrame.
    """
    geomap = loadGeoData(suffix, prec)
    # Forward prec so both steps agree on the precision (it was previously dropped here).
    geodf  = createGeoDataFrame(geomap, prec=prec)
    return geodf

def createGeohashSparkDataFrame(suffix, prec, scsql, inputDBName):
    """
    Build the geohash map for `suffix`, convert it to Spark and save it to Hive.

    Loads geomap-<prec>-<suffix>.p through createGeohashPandasDataFrame, converts it
    with scsql.createDataFrame, and saves it as table Geo<suffix>Map in inputDBName
    via saveSparkData.

    Returns the saved Spark DataFrame. It previously returned None; callers that
    ignore the return value are unaffected.
    """
    from timeUtils import clock, elapsed
    startall, cmtall = clock("Creating Spark DataFrame from {0}".format(suffix))
    pddf = createGeohashPandasDataFrame(suffix, prec)

    start, cmt = clock("Creating Spark DataFrame")
    spdf = scsql.createDataFrame(pddf)
    elapsed(start, cmt)

    start, cmt = clock("Saving spark dataframe to HIVE")
    saveSparkData(spdf, inputDBName, "Geo{0}Map".format(suffix))
    elapsed(start, cmt)   # the save timer was previously started but never reported
    elapsed(startall, cmtall)
    return spdf
    
def getGeohashSparkDataFrame(suffix, prec, hc, scsql, inputDBName, force=False):
    """
    Return Hive table Geo<suffix>Map (in inputDBName) as a cached Spark DataFrame.

    If force is False, first try to read the table with getHiveData. On success,
    cache it, print its row count and columns, and return it.
    If force is True, or the read fails, (re)build the table with
    createGeohashSparkDataFrame and return whatever that returns. It may be None
    with older versions of the builder.
    """
    tableName = "Geo{0}Map".format(suffix)
    if not force:
        try:
            spdf = getHiveData(hc, inputDBName, tableName)
            spdf.cache()
            print("Total Rows = {0}".format(spdf.count()))
            print("   Columns = {0}".format(spdf.columns))
            return spdf
        except Exception as err:
            # e.g. the table has not been created yet. Report and build it instead of
            # silently swallowing every error (the old bare except also caught Ctrl-C).
            print("Could not load {0}.{1} ({2}); building it".format(inputDBName, tableName, err))
    # Return the freshly built frame rather than discarding it.
    spdf = createGeohashSparkDataFrame(suffix, prec, scsql, inputDBName)
    return spdf

# Geohash Functions

In [None]:
def joinGeo5Data(gpsData, geo5Data, name, fillNA=None, debug=False):
    """
    Attach precision-5 geohash attributes to both trip endpoints of gpsData.

    Computes the precision-5 geohash of (lat0, long0) and (lat1, long1) with
    get_geo5_udf, left-joins each against geo5Data on its 'Geo5' column, and renames
    every non-key column X of geo5Data to Geo0XID (start point) and Geo1XID (end point).

    Inputs:
      gpsData  - Spark DataFrame with lat0/long0/lat1/long1 columns
      geo5Data - Spark DataFrame whose first column is the 'Geo5' key
      name     - label used only in log messages
      fillNA   - if not None, value used to fill nulls in the columns added by the joins
      debug    - if True, show a few rows after each step

    Returns the augmented Spark DataFrame, or None if either input is not a Spark DataFrame.
    """
    if not all(isSparkDataFrame(x) for x in [gpsData, geo5Data]):
        print("Cannot join GPS and {0} data".format(name))
        return None

    print("\n================ Joining GPS and {0} Data ================\n".format(name))

    cenprec = 5
    gpsData = (gpsData
               .withColumn('geo{0}0'.format(cenprec), get_geo5_udf('lat0', 'long0'))
               .withColumn('geo{0}1'.format(cenprec), get_geo5_udf('lat1', 'long1')))
    if debug:
        gpsData.show(3)

    # Every column after the key column is an attribute to attach.
    cols = geo5Data.columns[1:]
    start, cmt = clock("Augmenting data with {0} information".format(name))

    # Endpoint '0' is the trip start (lat0/long0), '1' the trip end (lat1/long1).
    for endpoint in ('0', '1'):
        geoCol = 'geo{0}{1}'.format(cenprec, endpoint)
        gpsData = gpsData.join(geo5Data, on=[gpsData[geoCol] == geo5Data.Geo5], how="left")
        newCols = []
        for attr in cols:
            newName = 'Geo{0}{1}ID'.format(endpoint, attr)
            gpsData = gpsData.withColumnRenamed(attr, newName)
            newCols.append(newName)
        gpsData = gpsData.drop('Geo5').drop(geoCol)
        if fillNA is not None:
            # Only fill the columns this join added. Filling all of gpsData would
            # overwrite legitimate nulls in unrelated columns.
            gpsData = gpsData.na.fill(fillNA, subset=newCols)
        if debug:
            gpsData.show(3)

    elapsed(start, cmt)

    return gpsData

In [None]:
def joinGeo7Data(gpsData, geo7Data, name, fillNA=None, debug=False):
    """
    Attach precision-7 geohash attributes to both trip endpoints of gpsData.

    Computes the precision-7 geohash of (lat0, long0) and (lat1, long1) with
    get_geo7_udf, left-joins each against geo7Data on its 'Geo7' column, and renames
    every non-key column X of geo7Data to Geo0XID (start point) and Geo1XID (end point).

    Inputs:
      gpsData  - Spark DataFrame with lat0/long0/lat1/long1 columns
      geo7Data - Spark DataFrame whose first column is the 'Geo7' key
      name     - label used only in log messages
      fillNA   - if not None, value used to fill nulls in the columns added by the joins
      debug    - if True, show a few rows after each step

    Returns the augmented Spark DataFrame, or None if either input is not a Spark DataFrame.
    """
    if not all(isSparkDataFrame(x) for x in [gpsData, geo7Data]):
        print("Cannot join GPS and {0} data".format(name))
        return None

    print("\n================ Joining GPS and {0} Data ================\n".format(name))

    geoprec = 7
    gpsData = (gpsData
               .withColumn('geo{0}0'.format(geoprec), get_geo7_udf('lat0', 'long0'))
               .withColumn('geo{0}1'.format(geoprec), get_geo7_udf('lat1', 'long1')))
    if debug:
        gpsData.show(3)

    # Every column after the key column is an attribute to attach.
    cols = geo7Data.columns[1:]
    start, cmt = clock("Augmenting data with {0} information".format(name))

    # Endpoint '0' is the trip start (lat0/long0), '1' the trip end (lat1/long1).
    for endpoint in ('0', '1'):
        geoCol = 'geo{0}{1}'.format(geoprec, endpoint)
        gpsData = gpsData.join(geo7Data, on=[gpsData[geoCol] == geo7Data.Geo7], how="left")
        newCols = []
        for attr in cols:
            newName = 'Geo{0}{1}ID'.format(endpoint, attr)
            gpsData = gpsData.withColumnRenamed(attr, newName)
            newCols.append(newName)
        gpsData = gpsData.drop('Geo7').drop(geoCol)
        if fillNA is not None:
            # Only fill the columns this join added. Filling all of gpsData would
            # overwrite legitimate nulls in unrelated columns (e.g. fillNA=0 on
            # every numeric GPS column).
            gpsData = gpsData.na.fill(fillNA, subset=newCols)
        if debug:
            gpsData.show(3)

    elapsed(start, cmt)

    return gpsData

In [None]:
## Census: precision-5 geohash map; unmatched rows get "NA".
censusData = joinGeo5Data(gpsData, spGeoCensusMap, fillNA="NA", name="Census")
# The join helpers return None when an input is not a Spark DataFrame. Keep the
# current gpsData in that case instead of losing it for every later join.
if censusData is not None:
    gpsData = censusData

## Precision-7 geohash maps, joined in this order; unmatched rows get 0.
geo7Joins = [
    (spGeoHEREMap,       "HERE"),          # HERE
    (spOSMTrafficMap,    "TrafficOSM"),    # OSM Traffic
    (spOSMTransportMap,  "TransportOSM"),  # OSM Transit
    (spOSMPofwMap,       "PowfOSM"),       # OSM Pofw
    (spOSMPOIMap,        "PoisOSM"),       # OSM POI
    (spGeoInterstateMap, "Interstate"),
    (spGeoUSRteMap,      "USRte"),
    (spGeoStateRteMap,   "StateRte"),
    (spGeoHighwayMap,    "Highway"),
    (spGeoMajorRdMap,    "MajorRd"),
    (spGeoPOIMap,        "POI"),
    (spGeoTerminalsMap,  "Terminals"),
]
for geoMap, mapName in geo7Joins:
    joined = joinGeo7Data(gpsData, geoMap, fillNA=0, name=mapName)
    if joined is not None:
        gpsData = joined


if False:
    # Disabled joins. Their maps are created in the "Not Using These Right Now" cells
    # at the end of the notebook, so run those cells before enabling this.
    # buildingsOSM previously passed spGeoHEREMap (copy-paste error).
    for geoMap, mapName in [(spOSMBuildingsMap, "buildingsOSM"),
                            (spGeoVenueMap,     "Venue"),
                            (spGeoCollegeMap,   "College"),
                            (spGeoSchoolMap,    "School"),
                            (spGeoAutoMap,      "Auto"),
                            (spGeoRailMap,      "Rail")]:
        joined = joinGeo7Data(gpsData, geoMap, fillNA=0, name=mapName)
        if joined is not None:
            gpsData = joined

In [None]:
# Snapshot of gpsData, cached and counted. NOTE(review): this presumably holds 2017 data
# only if gpsData was loaded for 2017 before this cell. gpsData2018 below is assigned from
# the same name, so running both cells without reloading gpsData in between makes them the
# same DataFrame, and the later union would duplicate every row.
gpsData2017 = gpsData
gpsData2017.cache()
print("Total Rows = {0}".format(gpsData2017.count()))    
print("   Columns = {0}".format(gpsData2017.columns))

In [None]:
# Snapshot of gpsData, cached and counted. NOTE(review): this presumably holds 2018 data
# only if gpsData was reloaded for 2018 after the 2017 snapshot cell; otherwise
# gpsData2017 and gpsData2018 are the same DataFrame.
gpsData2018 = gpsData
gpsData2018.cache()
print("Total Rows = {0}".format(gpsData2018.count()))    
print("   Columns = {0}".format(gpsData2018.columns))

In [None]:
def unionAll(*dfs):
    """
    Row-wise union of one or more Spark DataFrames, combined left to right.

    Calls each frame's own unionAll method instead of going through the global name
    `DataFrame`. That name is not pyspark's DataFrame in this notebook: a later cell
    binds it to pandas.DataFrame, which has no unionAll.
    Raises TypeError (from functools.reduce) when called with no frames.
    """
    return reduce(lambda left, right: left.unionAll(right), dfs)

# Combine both yearly snapshots. NOTE(review): if gpsData2017 and gpsData2018 are the
# same object (both are assigned from gpsData), every row appears twice.
gpsData = unionAll(gpsData2017, gpsData2018)

In [None]:
# Persist the combined, geo-augmented GPS data as table r4happend in inputDBName.
saveSparkData(gpsData, inputDBName, "r4happend")

In [None]:
# Build per-device trip tables under the name 'r4hNot' via saveDeviceTrips (networkTrips);
# touse=None presumably means "all devices" and savePandas=True also yields a pandas copy -- confirm.
testSPdf,testPDdf = saveDeviceTrips(gpsData, None, 'r4hNot', savePandas=True)

In [None]:
if True and isLocal is False:
    # Export start points with no census tract to notract.csv for inspection.
    # NOTE(review): the Census join fills nulls with fillNA="NA"; if the tract columns are
    # strings they hold "NA" rather than NULL, and this filter may select nothing -- confirm.
    # Also confirm a 'geo0' column exists here (joinGeo5Data drops its geo50/geo51 columns).
    tmp = gpsData.filter("Geo0TractID is NULL")
    tmp = tmp.select('lat0', 'long0', 'geo0')
    print("Saving NoTractData!")
    tmp.toPandas().to_csv('notract.csv')

# Disabled: print the 200 devices with the most rows.
if False and isLocal is False:    
    counts = gpsData.groupBy('device').count().toPandas()
    counts.sort_values(by=['count'], ascending=False, inplace=True)
    touse=counts['device'].tolist()[:200]
    print(", ".join(touse))

# Subset the data if needed

In [None]:
if isLocal is False:
    def distHash(gcode1, gcode2):
        """
        distHash(gcode1, gcode2)

        inputs: gcode1 (geohash), gcode2 (geohash)
        outputs: haversine distance (km, haversine's default unit) between the decoded points
        """
        pnt1 = geohash.decode_exactly(gcode1)[:2]
        pnt2 = geohash.decode_exactly(gcode2)[:2]
        return haversine(pnt1, pnt2)

    def getCityData(cityname):
        """
        Cut the POI map and the GPS data down to a box around one named city.

        For city `cityname` (only 'r4h' is enabled; the others sit in an `if False:` block):
          * keeps, per POI category in poi-7.p, the geohashes closer than radius*111 km
            to the city's precision-7 geohash (radius is in degrees, ~111 km per degree),
            and saves them to poi-<city>.p;
          * filters gpsData to rows whose start AND end lie within +/- radius degrees of
            the city center, then prints the row count and the 200 devices with the most rows.
        Unknown city names do nothing (and no longer load poi-7.p needlessly).
        """
        cities={}
        if False:
            cities['naperville'] = {"lat": 41.750839, "long": -88.153535, "radius": 0.05}
            cities['knoxville']  = {'lat': 35.964668, 'long': -83.926453, 'radius': 0.15}
            cities['wrigley']    = {'lat': 41.948437, 'long': -87.655334, 'radius': 0.03} # Wrigley Field
            cities['alexandria'] = {'lat': 38.80472,  'long': -77.04722, 'radius': 0.075}  # Alexandria, VA
            cities['dallas']     = {'long': -97.04044, 'lat': 32.897480, 'radius': 0.15}
            cities['urbana']     = {'long': -88.24338, 'lat': 40.116421, 'radius': 0.15}
            cities['portjeff']   = {'long': -73.06927, 'lat': 40.946487, 'radius': 0.15}
        cities['r4h']        = {'lat': 38.88184,  'long': -77.10806, 'radius': 0.1} # r4h
        if cityname not in cities:
            return
        city = cityname
        citydata = cities[city]
        poi = loadJoblib('poi-7.p')

        print("--> {0}".format(city))
        geocity = geohash.encode(citydata['lat'], citydata['long'], precision=7)
        # Decode the city cell once, rather than once per POI geohash as distHash would.
        citypnt = geohash.decode_exactly(geocity)[:2]
        maxdist = citydata['radius']*111
        poicity = {}
        for category, geos in poi.items():
            poicity[category] = set()
            for geo in geos:
                try:
                    dist = haversine(citypnt, geohash.decode_exactly(geo)[:2])
                except Exception:
                    # Skip entries that cannot be decoded as geohashes.
                    continue
                if dist < maxdist:
                    poicity[category].add(geo)
            print("  Keeping {0} out of {1}".format(len(poicity[category]), len(geos)))
        print("--> {0}".format(city))
        saveJoblib(poicity, 'poi-{0}.p'.format(city))

        centerLat = citydata['lat']
        centerLong = citydata['long']
        dl = citydata['radius']
        test = gpsData
        # Both the start (lat0/long0) and the end (lat1/long1) point must fall inside the box.
        for latCol, longCol in (('lat0', 'long0'), ('lat1', 'long1')):
            test = test.filter('{0} > {1}'.format(latCol, centerLat-dl))
            test = test.filter('{0} < {1}'.format(latCol, centerLat+dl))
            test = test.filter('{0} < {1}'.format(longCol, centerLong+dl))
            test = test.filter('{0} > {1}'.format(longCol, centerLong-dl))
        print("There are {0} rows in this data".format(test.count()))

        counts = test.groupBy('device').count().toPandas().sort_values(by=['count'], ascending=False)
        touse=counts['device'].tolist()[:200]
        print(", ".join(touse))

    getCityData('r4h')

In [None]:
if isLocal is False:
    # Set to True to reuse an existing testNetworkTable from Hive instead of rebuilding it.
    # (Previously the rebuild was forced by a deliberate 1/0 inside a bare try/except.)
    reuseTestTable = False
    testSPdf = None
    if reuseTestTable:
        try:
            testSPdf = getHiveData(hc, inputDBName, "testNetworkTable")
            testSPdf.cache()
        except Exception as err:
            print("Could not load testNetworkTable ({0}); rebuilding it".format(err))
            testSPdf = None
    if testSPdf is None:
        savePandas=False
        saveSPdf=True
        # touse=None keeps every device. To restrict the test table, set touse to a list of
        # device ids (e.g. the top-200 list printed by getCityData). A hard-coded 200-device
        # list used to be assigned here, but it was immediately overwritten with None.
        touse=None
        testSPdf,testPDdf = saveDeviceTrips(gpsData, touse, 'testNetworkTable', savePandas=savePandas)

        if saveSPdf:
            start, cmt = clock("Saving spark dataframe to HIVE")
            saveSparkData(testSPdf, inputDBName, "testNetworkTable")
            elapsed(start, cmt)
else:
    print("Loading Device Trips in Local Mode")
    #highgeoppd = loadDeviceTrips("highgeo")
    testPDdf = loadDeviceTrips('r4h')
    touse = list(testPDdf['device'].unique())
    print("There are {0} devices in this table".format(len(touse)))
    # Uncomment to restrict to a single device; the second count then reflects the filter.
    #testPDdf = testPDdf[testPDdf['device'] == '353162070041042']
    print("There are {0} devices in this table".format(testPDdf['device'].nunique()))

In [None]:
# Row count of the test trip table built by the saveDeviceTrips calls above.
testSPdf.count()

# Local Test

In [None]:
if isLocal is True:
    start = clock("Running manually")
    %load_ext autoreload
    %autoreload
    from network import extract_features
    from geoClustering import geoClusters
    from networkCategories import categories
    from timeUtils import getDateTime, printDateTime, clock, elapsed, getDateTime
    pddata = {}
    i=0
    for device, geodata in testPDdf.groupby('device'):
        start = clock(device, showTime=False)
        i += 1
        print(device,i,testPDdf['device'].nunique(),geodata.shape[0])
        #if i < 25:
        #    continue
        retval = extract_features(geodata, debug=False, numFeats=1071)
        pddata[device] = retval
        print("  --> {0}".format(retval.shape))
        elapsed(start, device, showTime=False)
        break
    elapsed(start, "Done running manually")

In [None]:
if isLocal is True:
    # Jupyter only auto-displays a cell's final top-level expression, so a bare
    # `retval.T` inside this if-block showed nothing. Display it explicitly.
    display(retval.T)

# Spark Test

In [None]:
if isLocal is False:
    # Run extract_features per device on the test table, distributed with Spark.
    # NOTE: rows_to_pandas_df and pandas_df_to_rows are not defined in this part of the
    # notebook, and extract_features is only imported in the local-test cell; they must be
    # defined/imported before this cell runs.
    start, cmt = clock("Computing Graph Features")
    deviceNetworkData = (testSPdf
                          .repartition(200, 'device')           # 200 partitions, hash-partitioned by device, for parallelism
                          .rdd                                  # switch to the RDD API
                          .map(lambda row: (row.device, row))   # key each row by device (keying by vin as well may be better)
                          .groupByKey()                         # rdd of (device, [row1, row2...])
                          .mapValues(rows_to_pandas_df)         # rdd of (device, df) where df holds all rows of that device
                          .mapValues(extract_features)          # rdd of (device, df) where df holds that device's extracted features
                          .map(lambda tpl: tpl[1])              # drop the key, now just rdd of df's
                          .flatMap(pandas_df_to_rows)           # rdd of Row's (author's note: one Row per device day -- confirm)
                          .toDF()                               # convert back to a Spark DataFrame
                             )
    elapsed(start, cmt)
    start, cmt = clock("Counting rows")
    print("There are {0} rows in deviceNetworkData".format(deviceNetworkData.count()))
    elapsed(start, cmt)

In [None]:
# Collect the test-run network features to the driver and save them with joblib.
# This cell has its own timer; it previously reused `start` from the cell above,
# so the reported save time was wrong.
start, _ = clock("Saving Driving Network")
dnData = deviceNetworkData.toPandas()
#dnData

from modelio import saveJoblib
saveJoblib(dnData, filename="/home/tgadf/astro_research_data/futuremiles/gpsData/portjeffNetworkFeatures.p", compress=True)
elapsed(start, "Saved Driving Network")

# Compute Full Graph Features

In [None]:
if isLocal is False:
    # Same per-device feature pipeline as the Spark test cell, but over all of gpsData;
    # the result is cached, saved to Hive as networkTableName, and also saved via joblib.
    start, cmt = clock("Computing Graph Features")
    deviceNetworkData = (gpsData
                          .repartition(50000, 'device')         # 50000 partitions, hash-partitioned by device, for parallelism
                          .rdd                                  # switch to the RDD API
                          .map(lambda row: (row.device, row))   # key each row by device (keying by vin as well may be better)
                          .groupByKey()                         # rdd of (device, [row1, row2...])
                          .mapValues(rows_to_pandas_df)         # rdd of (device, df) where df holds all rows of that device
                          .mapValues(extract_features)          # rdd of (device, df) where df holds that device's extracted features
                          .map(lambda tpl: tpl[1])              # drop the key, now just rdd of df's
                          .flatMap(pandas_df_to_rows)           # rdd of Row's (author's note: one Row per device day -- confirm)
                          .toDF()                               # convert back to a Spark DataFrame
                             )
    elapsed(start, cmt)


    start, cmt = clock("Counting and caching results")
    deviceNetworkData.cache()
    print("There are {0} devices with network parameters.".format(deviceNetworkData.count()))
    elapsed(start, cmt)


    start, cmt = clock("Saving spark dataframe to HIVE")
    saveSparkData(deviceNetworkData, inputDBName, networkTableName)
    elapsed(start, cmt)


    # Collect everything to the driver; this must fit in driver memory.
    start, cmt = clock("Creating Pandas DF")
    deviceNetworkPandasData = deviceNetworkData.toPandas()
    elapsed(start, cmt)

    from modelio import saveJoblib
    start, cmt = clock("Saving Pandas DF")
    saveJoblib(deviceNetworkPandasData, filename="/home/tgadf/astro_research_data/futuremiles/gpsData/drivingNetworkFeatures.p", compress=True)
    elapsed(start, cmt)

In [None]:
# Reload the saved per-device network features from Hive and cache them.
start = clock(comment="Getting GPS Trip Endpoint Data")
deviceNetworkData = getHiveData(hc, inputDBName, networkTableName)
deviceNetworkData.cache()
elapsed(start, comment="Got GPS Trip Endpoint Data")
start = clock("Counting rows")
print("There are {0} devices with network parameters.".format(deviceNetworkData.count()))
elapsed(start, "Done counting and caching results")

In [None]:
# Collect the network features to the driver and save them with joblib
# (writes the same drivingNetworkFeatures.p file as the full-graph cell).
start=clock("Creating Pandas DF")
deviceNetworkPandasData = deviceNetworkData.toPandas()

from modelio import saveJoblib
saveJoblib(deviceNetworkPandasData, filename="/home/tgadf/astro_research_data/futuremiles/gpsData/drivingNetworkFeatures.p", compress=True)
elapsed(start, "Saved Driving Network")

In [None]:
# Debug the per-device pandas path: cluster one device's trip endpoints and build its trips.
# NOTE: highgeoppd is only assigned by a commented-out line in the local-mode loading
# cell (#highgeoppd = loadDeviceTrips("highgeo")); enable it before running this cell.
debug=True
import pandas as pd
import pygeohash
from numpy import amax
from geoClustering import geoClusters


## Make sure everything is sorted by time. sort_values without inplace returns a new frame,
## so highgeoppd itself is no longer reordered or given geo0/geo1 columns by this cell.
if debug is True:
    start = clock("Sorting data by time")
df = highgeoppd.sort_values(by="Start", ascending=True)
if debug is True:
    elapsed(start, "Done sorting data by time")

devices = list(df['device'].unique())
if len(devices) == 0:
    raise ValueError("There are no devices in the input data")
if len(devices) > 1:
    raise ValueError("There are [{0}] multiple devices".format(devices))
current_device = str(devices[0])

if debug:
    start = clock("Loading points of interest file")
from modelio import loadJoblib
poi = loadJoblib("poi-7.p")
if debug:
    elapsed(start, "Finished loading points of interest file")

# Stack trip start and end points into one (lat, long) frame. pd.concat replaces
# DataFrame.append, which was removed in pandas 2.0.
points = pd.concat([df[["lat0", "long0"]].rename(columns={"lat0": "lat", "long0": "long"}),
                    df[["lat1", "long1"]].rename(columns={"lat1": "lat", "long1": "long"})])

if debug:
    start = clock("Finding clusters")
gc = geoClusters(points=points)
gc.setPOI(poi)
gc.findClusters(seedMin=3, debug=False)
gc.findPOIClusters()
#gc.printClusters()
if debug:
    # The placeholder was written "(0)", so the cluster count was never printed.
    elapsed(start, "Finished finding clusters ({0})".format(gc.getNClusters()))


if debug:
    start = clock("Finding nearest cluster for start of trip")
geoResults = df[['lat0', 'long0']].apply(gc.getNearestClusters, axis=1).values
df["geo0"] = [x[0] for x in geoResults]
if debug:
    elapsed(start, "Done finding nearest cluster for start of trip")
    start = clock("Finding nearest cluster for end of trip")
geoResults = df[['lat1', 'long1']].apply(gc.getNearestClusters, axis=1).values
df["geo1"] = [x[0] for x in geoResults]
if debug:
    elapsed(start, "Done finding nearest cluster for end of trip")

from networkTrips import getTripsFromPandas
trips = getTripsFromPandas(df, gc)

# Analysis With Full Graph Features

In [None]:
# Reload the saved per-device network features from Hive, cache them and count the rows.
start = clock(comment="Getting GPS Trip Endpoint Data")
deviceNetworkData = getHiveData(hc, inputDBName, networkTableName)
elapsed(start, comment="Got GPS Trip Endpoint Data")
start = clock("Counting Rows")
deviceNetworkData.cache()
print("There are {0} rows in this data frame".format(deviceNetworkData.count()))
elapsed(start, "Done Counting Rows")

In [None]:
# Collect the reloaded network features and save them with joblib
# (overwrites drivingNetworkFeatures.p, the file written by earlier cells).
deviceNetworkPandasData = deviceNetworkData.toPandas()
from modelio import saveJoblib, loadJoblib
saveJoblib(deviceNetworkPandasData, filename="/home/tgadf/astro_research_data/futuremiles/gpsData/drivingNetworkFeatures.p", compress=True)

In [None]:
# NOTE(review): this cell was a corrupted paste. An unterminated string literal swallowed
# the line `highgeodata = gpsData.filter('device in '+use_devs)`, making the cell a
# SyntaxError, and the device id being inspected was lost.
# Set inspectDevice to a device id to view that device's network features.
inspectDevice = None
if inspectDevice is not None:
    display(deviceNetworkPandasData[deviceNetworkPandasData['device'] == inspectDevice])

In [None]:
#saveDeviceTrips(deviceNetworkData, touse=None, name="newNetworkFeatures")

In [None]:
# Load a previously saved feature table (presumably written by the commented-out
# saveDeviceTrips(..., name="newNetworkFeatures") call -- confirm).
from modelio import saveJoblib, loadJoblib
pdData = loadJoblib("/home/tgadf/astro_research_data/futuremiles/gpsData/newNetworkFeatures.p")

In [None]:
# Inspect the rows of one hard-coded device.
pdData[pdData['device'] == '352252060403962']

In [None]:
if not isLocal:
    from modelio import loadJoblib
    from pickle import load
    from timeUtils import clock, elapsed
    from pandas import DataFrame

    def _loadPickle(path):
        """Unpickle one file and close it. NOTE: pickle.load can run arbitrary code; trusted files only."""
        with open(path, "rb") as pickleFile:
            return load(pickleFile)

    def _buildCensusGeoFrame(geoCol, stateData, countyData, tractData):
        """
        Merge geohash->State/County/Tract maps into one pandas frame keyed by geoCol.

        StateID and CountyID are the map values. TractID is the first 11 characters of
        each tract-map value, and BlkGrpID is the full tract-map value. All merges are
        left merges onto the state map's keys.
        """
        dfState  = DataFrame(list(stateData.items()), columns=[geoCol, 'StateID'])
        dfCounty = DataFrame(list(countyData.items()), columns=[geoCol, 'CountyID'])
        dfTract  = DataFrame([(k, v[:11]) for k, v in tractData.items()], columns=[geoCol, 'TractID'])
        dfBlkGrp = DataFrame(list(tractData.items()), columns=[geoCol, 'BlkGrpID'])
        return (dfState.merge(dfCounty, on=geoCol, how='left')
                       .merge(dfTract, on=geoCol, how='left')
                       .merge(dfBlkGrp, on=geoCol, how='left'))

    # with-based loading closes every file (the previous load(open(...)) calls leaked handles).
    geoStateData5  = _loadPickle("/home/tgadf/geoState-5.p")
    geoCountyData5 = _loadPickle("/home/tgadf/geoCounty-5.p")
    geoTractData5  = _loadPickle("/home/tgadf/geoTract-5.p")

    geoStateData4  = _loadPickle("/home/tgadf/geoState-4.p")
    geoCountyData4 = _loadPickle("/home/tgadf/geoCounty-4.p")
    geoTractData4  = _loadPickle("/home/tgadf/geoTract-4.p")

    start = clock("Creating Geo4,5 Pandas DataFrames")
    dfGeo4 = _buildCensusGeoFrame('Geo4', geoStateData4, geoCountyData4, geoTractData4)
    dfGeo5 = _buildCensusGeoFrame('Geo5', geoStateData5, geoCountyData5, geoTractData5)
    elapsed(start, "Done with Geo Pandas DataFrame")

    start = clock("Creating Spark DataFrames")
    spdfGeo4   = scsql.createDataFrame(dfGeo4)
    spdfGeo5   = scsql.createDataFrame(dfGeo5)
    elapsed(start, "Done with Spark DataFrames")

    if False:
        start = clock("Loading all data")
        blockpopdata    = loadJoblib("/home/tgadf/astro_research_data/futuremiles/gpsData/census/results/GeoID_BlockGroup_Pop.p")
        tractpopdata    = loadJoblib("/home/tgadf/astro_research_data/futuremiles/gpsData/census/results/GeoID_Tract_Pop.p")
        tracthousedata  = loadJoblib("/home/tgadf/astro_research_data/futuremiles/gpsData/census/results/GeoID_Tract_House.p")
        tractareadata   = loadJoblib("/home/tgadf/astro_research_data/futuremiles/gpsData/census/results/GeoID_Tract_Area.p")
        urbanstatedata  = loadJoblib("/home/tgadf/astro_research_data/futuremiles/gpsData/census/results/GeoID_State.p")
        urbancountydata = loadJoblib("/home/tgadf/astro_research_data/futuremiles/gpsData/census/results/GeoID_County.p")
        urbanuapopdata  = loadJoblib("/home/tgadf/astro_research_data/futuremiles/gpsData/census/results/GeoID_County_Urban_PopDensity.p")
        urbanrapopdata  = loadJoblib("/home/tgadf/astro_research_data/futuremiles/gpsData/census/results/GeoID_County_Rural_PopDensity.p")
        urbanucpopdata  = loadJoblib("/home/tgadf/astro_research_data/futuremiles/gpsData/census/results/GeoID_County_Cluster_PopDensity.p")
        elapsed(start, "Done loading data")

        start = clock("Creating Pandas DataFrame")
        df1 = DataFrame(list(urbanstatedata.items()), columns=['CountyID', 'State'])
        df2 = DataFrame(list(urbancountydata.items()), columns=['CountyID', 'County'])
        df3 = DataFrame(list(urbanuapopdata.items()), columns=['CountyID', 'CountyUrbanAreaPopPct'])
        df4 = DataFrame(list(urbanrapopdata.items()), columns=['CountyID', 'CountyRuralAreaPopPct'])
        df5 = DataFrame(list(urbanucpopdata.items()), columns=['CountyID', 'CountyUrbanClusterPopPct'])
        dfCounty = df1.merge(df2, on='CountyID', how='left').merge(df3, on="CountyID", how='left').merge(df4, on="CountyID", how='left').merge(df5, on="CountyID", how='left')

        df1 = DataFrame(list(tractpopdata.items()), columns=['TractID', 'TractPop'])
        df2 = DataFrame(list(tracthousedata.items()), columns=['TractID', 'TractHouses'])
        df3 = DataFrame(list(tractareadata.items()), columns=['TractID', 'TractArea'])
        dfTract = df1.merge(df2, on='TractID', how='left').merge(df3, on="TractID", how='left')

        dfBlkGrp = DataFrame(list(blockpopdata.items()), columns=['BlkGrpID', 'BlkGrpPop'])
        elapsed(start, "Done with Pandas DataFrames")

        start = clock("Creating Spark DataFrames")
        spdfCounty = scsql.createDataFrame(dfCounty)
        spdfTract  = scsql.createDataFrame(dfTract)
        spdfBlkGrp = scsql.createDataFrame(dfBlkGrp)
        elapsed(start, "Done with Spark DataFrames")

In [None]:
# Scratch check of string slicing: x[:6] keeps the first 6 characters.
x = '1234567'
x[:6]

In [None]:
%load_ext autoreload
%autoreload
if False:
    from driverNetwork import driverNetwork
    dn = driverNetwork(trips)
    dn.createNetwork(debug=True)
    dn.setVertexOrder(debug=True)
    dn.setEdgeOrder(debug=True)
    dn.setNetworkAttributes(debug=True)

    from networkFeatures import networkFeatures
    nf = networkFeatures(dn)
    nf.setGlobalNetworkFeatures(debug=True)
    nf.setArticulationStructure(debug=True)
    #nf.setCliqueStructure(debug=True)
    nf.setCommunityStructure(debug=True)
    nf.setDyadCensus(debug=True)
    nf.setEdgeCorrelations(debug=True)
    nf.setVertexCorrelations(debug=True)
    nf.setPointsOfInterest(debug=True)
    nf.setDayTimes(debug=True)
    nf.setDwellTimes(debug=True)
    nf.setDurations(debug=True)
    nf.setITAs(debug=True)
    nf.setEdgeFeatures(debug=True)
    nf.setVertexFeatures(debug=True)
    nf.setHomeVertexFeatures(debug=True)
    nf.setEdgeFractions(debug=True)
    nf.setVertexFractions(debug=True)
    nf.setUniqueEdgeProperties(debug=True)
    nf.setUniqueVertexProperties(debug=True)
    df = nf.getFeatureDataFrame(debug=True)

    nf = networkFeatures(dn)
    nf.setGlobalNetworkFeatures(debug=True)
    nf.setArticulationStructure(debug=True)
    #nf.setCliqueStructure(debug=True)
    nf.setCommunityStructure(debug=True)
    nf.setDyadCensus(debug=True)
    nf.setEdgeCorrelations(debug=True)
    nf.setVertexCorrelations(debug=True)
    nf.setPointsOfInterest(debug=True)
    nf.setDayTimes(debug=True)
    nf.setDwellTimes(debug=True)
    nf.setDurations(debug=True)
    nf.setITAs(debug=True)
    nf.setEdgeFeatures(debug=True)
    nf.setVertexFeatures(debug=True)
    nf.setHomeVertexFeatures(debug=True)
    nf.setEdgeFractions(debug=True)
    nf.setVertexFractions(debug=True)
    nf.setUniqueEdgeProperties(debug=True)
    nf.setUniqueVertexProperties(debug=True)
    df = nf.getFeatureDataFrame(debug=True)
    #dn.getVertexAttributes(0, debug=True)
    #dn.printEdges()

# Not Using These Right Now

## Buildings OSM Spark DataFrame

In [None]:
# Load (building it if the Hive table is missing) the OSM buildings geohash map.
# Not part of the active joins; stays None in local mode.
spOSMBuildingsMap = None
if not isLocal:
    suffix = "buildingsOSM"
    spOSMBuildingsMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName, force=False)
    if isSparkDataFrame(spOSMBuildingsMap):
        print("   Columns = {0}".format(spOSMBuildingsMap.columns))
_, _ = clock("Last Run")

## College Spark DataFrame

In [None]:
# Load (building it if the Hive table is missing) the College geohash map; None in local mode.
spGeoCollegeMap = None
if not isLocal:
    suffix = "College"
    spGeoCollegeMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName)
_, _ = clock("Last Run")

## Auto Spark DataFrame

In [None]:
# Load (building it if the Hive table is missing) the Auto geohash map, shortening
# two column names; None in local mode.
spGeoAutoMap = None
if not isLocal:
    suffix = "Auto"
    spGeoAutoMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName)
    # Check the Auto map itself. This previously tested spGeoRailMap, which is the wrong
    # frame and is only defined by the Rail cell below (NameError on a fresh kernel).
    if isSparkDataFrame(spGeoAutoMap):
        spGeoAutoMap = spGeoAutoMap.withColumnRenamed('Automatic_Traffic_Counters', 'TrafficCounters')
        spGeoAutoMap = spGeoAutoMap.withColumnRenamed('Weigh_in_Motion_Stations', 'WeighStations')
        print("   Columns = {0}".format(spGeoAutoMap.columns))
_, _ = clock("Last Run")

## Rail Spark DataFrame

In [None]:
# Rail geohash map from Hive (built if missing), with its two feature columns given
# shorter names. Left as None when running locally.
spGeoRailMap = None
if not isLocal:
    suffix = "Rail"
    spGeoRailMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName)
    if isSparkDataFrame(spGeoRailMap):
        for oldName, newName in [('RRBRIDGES', 'RailBridge'), ('Rail_Points', 'RailPoint')]:
            spGeoRailMap = spGeoRailMap.withColumnRenamed(oldName, newName)
        print("   Columns = {0}".format(spGeoRailMap.columns))
_, _ = clock("Last Run")

## School Spark DataFrame

In [None]:
# Load (building it if the Hive table is missing) the School geohash map; None in local mode.
spGeoSchoolMap = None
if not isLocal:
    suffix = "School"
    spGeoSchoolMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName)
_, _ = clock("Last Run")

## Venue Spark DataFrame

In [None]:
# Load (building it if the Hive table is missing) the Venue geohash map and shorten its
# column names; None in local mode.
spGeoVenueMap = None
if not isLocal:
    suffix = "Venue"
    spGeoVenueMap = getGeohashSparkDataFrame(suffix, 7, hc, scsql, inputDBName)
    if isSparkDataFrame(spGeoVenueMap):
        # (source column, new name). withColumnRenamed is a no-op when the source column
        # is absent, so an unexpected name never raises.
        venueRenames = [
            ('Arenas_NBA', 'ArenasNBA'),
            ('Arenas_NCAA_Div_1_Basketball', 'ArenasNCAA'),
            ('Arenas_NHL', 'ArenasNHL'),
            ('Arenas_WNBA', 'ArenasWNBA'),
            ('Raceways_NASCAR', 'RacewaysNASCAR'),
            ('Stadiums_MLB', 'StadiumsMLB'),
            ('Stadiums_NCAA_Div_1_Football', 'StadiumsNCAA'),
            ('Stadiums_NFL', 'StadiumsNFL'),
            ('State_Fairgrounds', 'StateFairgrounds'),
            ('Tracks_Horses', 'TracksHorses'),
            # Was ('TracksIRL', 'TracksIRL'), a no-op. Following the pattern above, the
            # source column is presumably 'Tracks_IRL' -- confirm against the Venue geomap.
            ('Tracks_IRL', 'TracksIRL'),
        ]
        for oldName, newName in venueRenames:
            spGeoVenueMap = spGeoVenueMap.withColumnRenamed(oldName, newName)
        print("   Columns = {0}".format(spGeoVenueMap.columns))
_, _ = clock("Last Run")