In [1]:
#find dir where Spark is installed
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import sys
from pyspark.sql.window import Window
from pyspark.sql import SQLContext

In [4]:
#initiate a Spark session
spark = SparkSession.builder.appName("NHTS").getOrCreate()

In [6]:
#read data as Spark dataframe
path = '/Users/LiangHu/Downloads/NHTS/'
trip = spark.read.csv(path+'trippub.csv', inferSchema=True, header=True)

In [7]:
trip.printSchema()

root
 |-- HOUSEID: integer (nullable = true)
 |-- PERSONID: integer (nullable = true)
 |-- TDTRPNUM: integer (nullable = true)
 |-- STRTTIME: integer (nullable = true)
 |-- ENDTIME: integer (nullable = true)
 |-- TRVLCMIN: integer (nullable = true)
 |-- TRPMILES: double (nullable = true)
 |-- TRPTRANS: integer (nullable = true)
 |-- TRPACCMP: integer (nullable = true)
 |-- TRPHHACC: integer (nullable = true)
 |-- VEHID: integer (nullable = true)
 |-- TRWAITTM: integer (nullable = true)
 |-- NUMTRANS: integer (nullable = true)
 |-- TRACCTM: integer (nullable = true)
 |-- DROP_PRK: integer (nullable = true)
 |-- TREGRTM: integer (nullable = true)
 |-- WHODROVE: integer (nullable = true)
 |-- WHYFROM: integer (nullable = true)
 |-- LOOP_TRIP: integer (nullable = true)
 |-- TRPHHVEH: integer (nullable = true)
 |-- HHMEMDRV: integer (nullable = true)
 |-- HH_ONTD: integer (nullable = true)
 |-- NONHHCNT: integer (nullable = true)
 |-- NUMONTRP: integer (nullable = true)
 |-- PSGR_FLG: integ

In [9]:
#filter
pov = trip.filter((trip['DRVR_FLG']==1) & (trip['TRPTRANS'].isin({3,4,5,6})))
pov.head(1)

[Row(HOUSEID=30000007, PERSONID=1, TDTRPNUM=1, STRTTIME=1000, ENDTIME=1015, TRVLCMIN=15, TRPMILES=5.244, TRPTRANS=3, TRPACCMP=0, TRPHHACC=0, VEHID=3, TRWAITTM=-1, NUMTRANS=-1, TRACCTM=-1, DROP_PRK=-1, TREGRTM=-1, WHODROVE=1, WHYFROM=1, LOOP_TRIP=2, TRPHHVEH=1, HHMEMDRV=1, HH_ONTD=1, NONHHCNT=0, NUMONTRP=1, PSGR_FLG=2, PUBTRANS=2, TRIPPURP=u'HBO', DWELTIME=295, TDWKND=2, VMT_MILE=5.244, DRVR_FLG=1, WHYTRP1S=20, WHYTRP90=5, ONTD_P1=1, ONTD_P2=2, ONTD_P3=2, ONTD_P4=-1, ONTD_P5=-1, ONTD_P6=-1, ONTD_P7=-1, ONTD_P8=-1, ONTD_P9=-1, ONTD_P10=-1, ONTD_P11=-1, ONTD_P12=-1, ONTD_P13=-1, TDCASEID=300000070101, TRACC_WLK=-1, TRACC_POV=-1, TRACC_BUS=-1, TRACC_CRL=-1, TRACC_SUB=-1, TRACC_OTH=-1, TREGR_WLK=-1, TREGR_POV=-1, TREGR_BUS=-1, TREGR_CRL=-1, TREGR_SUB=-1, TREGR_OTH=-1, WHYTO=19, TRAVDAY=2, HOMEOWN=1, HHSIZE=3, HHVEHCNT=5, HHFAMINC=7, DRVRCNT=3, HHSTATE=u'NC', HHSTFIPS=37, NUMADLT=3, WRKCOUNT=1, TDAYDATE=201608, HHRESP=1, LIF_CYC=10, MSACAT=3, MSASIZE=1, RAIL=2, URBAN=1, URBANSIZE=1, URBRUR=1

In [11]:
#create unique vehicle ID
pov = pov.withColumn('HOUSEID', pov['HOUSEID'].cast('long')) #convert integer to long
pov = pov.withColumn('HOUSE_VEH_ID', 100*pov['HOUSEID']+pov['VEHID'])
pov.select('HOUSE_VEH_ID').distinct().count()

155133

In [12]:
#summary trip distance
pov = pov.withColumn('VMT_MILE', pov['VMT_MILE'].cast('float')) 
VMT_ub = pov.approxQuantile('VMT_MILE', probabilities=[0.999], relativeError=0)[0]
pov.select('VMT_MILE').describe().show()

+-------+------------------+
|summary|          VMT_MILE|
+-------+------------------+
|  count|            607474|
|   mean| 9.470340577530733|
| stddev|29.261666871793306|
|    min|              -1.0|
|    max|          5441.489|
+-------+------------------+



In [13]:
#create ORIGIN based on WHYFROM
ORIGIN = (F.when(pov['WHYFROM'].isin({1,2}), 'home')\
          .when(pov['WHYFROM'].isin({3,4}), 'work')\
          .when(pov['WHYFROM']>=5, 'public'))
pov = pov.withColumn('ORIGIN', ORIGIN)

In [14]:
#create DESTINATION based on WHYTRP1S
DESTINATION = (F.when(pov['WHYTRP1S']==1, 'home')\
               .when(pov['WHYTRP1S']==10, 'work')\
               .when(pov['WHYTRP1S']>=20, 'public'))
pov = pov.withColumn('DESTINATION', DESTINATION)

In [15]:
#remove missing values and errors, delete all affected HOUSE_VEH_ID
VMT_MILE_flag = (F.when((pov['VMT_MILE']<0) | (pov['VMT_MILE']>VMT_ub), 0)\
                 .otherwise(1))
pov = pov.withColumn('VMT_MILE_flag', VMT_MILE_flag)
STRTTIME_flag = (F.when(pov['STRTTIME']<0, 0).otherwise(1))
pov = pov.withColumn('STRTTIME_flag', STRTTIME_flag)
ENDTIME_flag = (F.when(pov['ENDTIME']<0, 0).otherwise(1))
pov = pov.withColumn('ENDTIME_flag', ENDTIME_flag)
WHYTRP1S_flag = (F.when(pov['WHYTRP1S']<0, 0).otherwise(1))
pov = pov.withColumn('WHYTRP1S_flag', WHYTRP1S_flag)
WHYFROM_flag = (F.when(pov['WHYFROM']<0, 0).otherwise(1))
pov = pov.withColumn('WHYFROM_flag', WHYFROM_flag)
pov = pov.withColumn('flag', pov['VMT_MILE_flag']*pov['STRTTIME_flag']*pov['ENDTIME_flag']*pov['WHYTRP1S_flag']*pov['WHYFROM_flag'])
temp = pov.select(['HOUSE_VEH_ID', 'flag'])
temp = temp.groupBy('HOUSE_VEH_ID').mean('flag')
temp = temp.withColumn('avg(flag)', temp['avg(flag)'].cast('integer'))
temp = temp.filter(temp['avg(flag)']<1)
Remove_HOUSE_VEH_ID = temp.select('HOUSE_VEH_ID')
Remove_HOUSE_VEH_ID.show(3)
Remove_HOUSE_VEH_ID = Remove_HOUSE_VEH_ID.rdd.flatMap(lambda x: x).collect()
pov = pov.filter(~pov['HOUSE_VEH_ID'].isin(Remove_HOUSE_VEH_ID))
drop_list = ['VMT_MILE_flag', 'STRTTIME_flag', 'ENDTIME_flag', 'WHYTRP1S_flag', 'WHYFROM_flag', 'flag']
pov = pov.select([col for col in pov.columns if col not in drop_list])
pov.head(1)

+------------+
|HOUSE_VEH_ID|
+------------+
|  3026804701|
|  4022766001|
|  3020845502|
+------------+
only showing top 3 rows



[Row(HOUSEID=30000007, PERSONID=1, TDTRPNUM=1, STRTTIME=1000, ENDTIME=1015, TRVLCMIN=15, TRPMILES=5.244, TRPTRANS=3, TRPACCMP=0, TRPHHACC=0, VEHID=3, TRWAITTM=-1, NUMTRANS=-1, TRACCTM=-1, DROP_PRK=-1, TREGRTM=-1, WHODROVE=1, WHYFROM=1, LOOP_TRIP=2, TRPHHVEH=1, HHMEMDRV=1, HH_ONTD=1, NONHHCNT=0, NUMONTRP=1, PSGR_FLG=2, PUBTRANS=2, TRIPPURP=u'HBO', DWELTIME=295, TDWKND=2, VMT_MILE=5.24399995803833, DRVR_FLG=1, WHYTRP1S=20, WHYTRP90=5, ONTD_P1=1, ONTD_P2=2, ONTD_P3=2, ONTD_P4=-1, ONTD_P5=-1, ONTD_P6=-1, ONTD_P7=-1, ONTD_P8=-1, ONTD_P9=-1, ONTD_P10=-1, ONTD_P11=-1, ONTD_P12=-1, ONTD_P13=-1, TDCASEID=300000070101, TRACC_WLK=-1, TRACC_POV=-1, TRACC_BUS=-1, TRACC_CRL=-1, TRACC_SUB=-1, TRACC_OTH=-1, TREGR_WLK=-1, TREGR_POV=-1, TREGR_BUS=-1, TREGR_CRL=-1, TREGR_SUB=-1, TREGR_OTH=-1, WHYTO=19, TRAVDAY=2, HOMEOWN=1, HHSIZE=3, HHVEHCNT=5, HHFAMINC=7, DRVRCNT=3, HHSTATE=u'NC', HHSTFIPS=37, NUMADLT=3, WRKCOUNT=1, TDAYDATE=201608, HHRESP=1, LIF_CYC=10, MSACAT=3, MSASIZE=1, RAIL=2, URBAN=1, URBANSIZE=

In [16]:
#a day begins from 4am
STRTTIME_new = (F.when(pov['STRTTIME']>=400, pov['STRTTIME']-400)\
                .otherwise(pov['STRTTIME']+2000))
pov = pov.withColumn('STRTTIME_new', STRTTIME_new)
ENDTIME_new = (F.when(pov['ENDTIME']>=400, pov['ENDTIME']-400)\
                .otherwise(pov['ENDTIME']+2000))
pov = pov.withColumn('ENDTIME_new', ENDTIME_new)

In [17]:
#order data by vehicle, by trip time
pov = pov.orderBy(['HOUSE_VEH_ID', 'STRTTIME_new'])

In [None]:
#write
pov.write.csv(path+'POV_spark.csv')