In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql import types as T

from os.path import isfile, join, splitext
from glob import glob
from datetime import datetime

import json
import numpy as np

# Functions

In [2]:
def read_files(path, sqlContext, sc, initial_date, final_date):
    extension = splitext(path)[1]

    if extension == "":
        path_pattern = path + "/*/part-*"
        if "hdfs" in path:
            URI = sc._gateway.jvm.java.net.URI
            Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
            FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
            Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration

            hdfs = "/".join(path_pattern.split("/")[:3])
            dir = "/" + "/".join(path_pattern.split("/")[3:])

            fs = FileSystem.get(URI(hdfs), Configuration())

            status = fs.globStatus(Path(dir))

            files = map(lambda file_status: str(file_status.getPath()), status)

        else:
            files = glob(path_pattern)

        print files

        files = filter(lambda f: initial_date <= datetime.strptime(f.split("/")[-2], '%Y_%m_%d_veiculos') <=
                                 final_date, files)

        print files

        return reduce(lambda df1, df2: df1.unionAll(df2),
                      map(lambda f: read_buste_data_v3(f, sqlContext), files))
    else:
        return read_file(path, sqlContext)

def get_files(path, sqlContext, sc, initial_date, final_date):

    path_pattern = path + "/*"
    if "hdfs" in path:
        URI = sc._gateway.jvm.java.net.URI
        Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
        FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
        Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration

        hdfs = "/".join(path_pattern.split("/")[:3])
        dir = "/" + "/".join(path_pattern.split("/")[3:])

        fs = FileSystem.get(URI(hdfs), Configuration())

        status = fs.globStatus(Path(dir))

        files = map(lambda file_status: str(file_status.getPath()), status)

    else:
        files = glob(path_pattern)

    files = filter(lambda f: initial_date <= datetime.strptime(f.split("/")[-1], '%Y_%m_%d_veiculos') <=
    final_date, files)

    return files

def rename_columns(df, list_of_tuples):
    for (old_col, new_col) in list_of_tuples:
        df = df.withColumnRenamed(old_col, new_col)
    return df

def read_hdfs_folder(sqlContext, folderpath):
	data_frame = sqlContext.read.csv(folderpath, header=True, inferSchema=True, nullValue="-")
	data_frame = rename_columns(data_frame, [("cardNum18", "cardNum"), ("cardNum19", "userGender"),])
	date = "-".join(folderpath.split("/")[-2].split("_")[:3])
	data_frame = data_frame.withColumn("date", F.lit(date))
	data_frame = data_frame.withColumn("date", F.date_sub(F.col("date"),1))
	return data_frame

def read_buste_data_v3( filepath, sqlContext):
    data_frame = sqlContext.read.csv(filepath, header=True, inferSchema=True,nullValue="-")
    date = "-".join(filepath.split("/")[-1].split("_")[:3])
    data_frame = data_frame.withColumn("date", F.lit(date))
    data_frame = data_frame.withColumn("date", F.unix_timestamp(F.date_sub(F.col("date"),1),'yyyy-MM-dd'))
    return data_frame

def dist(lat_x, long_x, lat_y, long_y):
    return F.acos(
        F.sin(F.toRadians(lat_x)) * F.sin(F.toRadians(lat_y)) +
        F.cos(F.toRadians(lat_x)) * F.cos(F.toRadians(lat_y)) *
            F.cos(F.toRadians(long_x) - F.toRadians(long_y))
    ) * F.lit(6371.0)


def buildODMatrix(buste_data, datapath, filepath):

    clean_buste_data = buste_data.na.drop(subset=["date","route","busCode","tripNum","stopPointId","timestamp","shapeLon","shapeLat"])

    filtered_boardings = clean_buste_data.na.drop(subset=['cardNum','cardTimestamp']).dropDuplicates(['cardNum','date','cardTimestamp'])

    multiple_boardings = filtered_boardings.groupby('cardNum').count().filter(F.col('count') > 1) \
    .select(F.col("cardNum").alias("cardNum1"), F.col("count").alias("count1"))

    clean_boardings = filtered_boardings.join(multiple_boardings, filtered_boardings.cardNum == multiple_boardings.cardNum1, 'leftsemi')

    boarding_data = clean_boardings.withColumn('boarding_id',F.monotonically_increasing_id())

    user_boarding_w = Window.partitionBy(boarding_data.cardNum, boarding_data.date).orderBy(boarding_data.cardTimestamp)

    od_matrix_ids = boarding_data.select(F.col('cardNum'),
    F.col('boarding_id'),
    F.lead('boarding_id',default=-1).over(user_boarding_w).alias('next_boarding_id'),
    F.first('boarding_id',True).over(user_boarding_w).alias('first_boarding')).withColumn('next_boarding_id',
    F.when(F.col('next_boarding_id') == -1,F.col('first_boarding'))
    .otherwise(F.col('next_boarding_id'))).drop('first_boarding')


    origin_matrix = boarding_data.select(F.col("route").alias("o_route"),
    F.col("busCode").alias("o_bus_code"),
    F.col("date").alias("o_date"),
    F.col("tripNum").alias("o_tripNum"),
    F.col("cardTimestamp").alias("o_timestamp"),
    F.col("shapeId").alias("o_shape_id"),
    F.col("shapeSequence").alias("o_shape_seq"),
    F.col("shapeLat").alias("o_shape_lat"),
    F.col("shapeLon").alias("o_shape_lon"),
    F.col("stopPointId").alias("o_stop_id"),
    F.col("boarding_id").alias("o_boarding_id"))


    next_origin_matrix = boarding_data.select(F.col("route").alias("next_o_route"),
    F.col("busCode").alias("next_o_bus_code"),
    F.col("date").alias("next_o_date"),
    F.col("tripNum").alias("next_o_tripNum"),
    F.col("cardTimestamp").alias("next_o_timestamp"),
    F.col("shapeId").alias("next_o_shape_id"),
    F.col("shapeSequence").alias("next_o_shape_seq"),
    F.col("shapeLat").alias("next_o_shape_lat"),
    F.col("shapeLon").alias("next_o_shape_lon"),
    F.col("stopPointId").alias("next_o_stop_id"),
    F.col("boarding_id").alias("next_o_boarding_id"))



    user_trips_data = origin_matrix.join(od_matrix_ids, origin_matrix.o_boarding_id == od_matrix_ids.boarding_id, 'inner') \
    .join(next_origin_matrix, od_matrix_ids.next_boarding_id == next_origin_matrix.next_o_boarding_id, 'inner') \
    .drop('boarding_id').drop('next_boarding_id') \
    .withColumn('o_unixtimestamp',F.unix_timestamp(F.col('o_timestamp'), 'HH:mm:ss')) \
    .withColumn('next_o_unixtimestamp',F.unix_timestamp(F.col('next_o_timestamp'), 'HH:mm:ss')) \
    .withColumn('leg_duration',F.when(F.col('next_o_unixtimestamp') > F.col('o_unixtimestamp'), \
    ((F.col('next_o_unixtimestamp') - F.col('o_unixtimestamp'))/60.0)).otherwise(-1)) \
    .orderBy(['cardNum','o_date','o_timestamp'])
    # .withColumn('o_date',F.from_unixtime(F.unix_timestamp(F.col('o_date'),'yyyy-MM-dd'), 'yyyy-MM-dd'))\
    # .withColumn('next_o_date',F.from_unixtime(F.unix_timestamp(F.col('next_o_date'),'yyyy-MM-dd'), 'yyyy-MM-dd')) \

    bus_trip_data = clean_buste_data.orderBy(['route','busCode','tripNum','timestamp']) \
    .dropDuplicates(['route','busCode','tripNum','stopPointId']) \
    .drop('cardNum') \
    .withColumn('id',F.monotonically_increasing_id()) \
    .withColumn('route', F.col('route').cast(T.IntegerType())) \
    .withColumnRenamed('','cardNum')

    cond = [bus_trip_data.route == user_trips_data.o_route,
    bus_trip_data.busCode == user_trips_data.o_bus_code,
    bus_trip_data.date == user_trips_data.o_date,
    bus_trip_data.tripNum == user_trips_data.o_tripNum]

    w = Window().partitionBy(['cardNum','date','route','busCode','tripNum']).orderBy('dist')

    filtered_od_matrix = bus_trip_data.join(user_trips_data, cond, 'left_outer') \
    .withColumn('dist',dist(F.col('shapeLat'),F.col('shapeLon'),F.col('next_o_shape_lat'),F.col('next_o_shape_lon'))) \
    .filter('timestamp > o_timestamp') \
    .withColumn('rn', F.row_number().over(w)) \
    .where(F.col('rn') == 1) \
    .filter('dist <= 1.0') \
    .filter(user_trips_data.cardNum.isNotNull())

    trips_origins = filtered_od_matrix \
    .select(['o_date','o_route','o_bus_code','o_tripNum','o_stop_id','o_timestamp']) \
    .groupBy(['o_date','o_route','o_bus_code','o_tripNum','o_stop_id']) \
    .count() \
    .withColumnRenamed('count','boarding_cnt') \
    .withColumnRenamed('o_date','date') \
    .withColumnRenamed('o_route','route') \
    .withColumnRenamed('o_bus_code','busCode') \
    .withColumnRenamed('o_tripNum','tripNum') \
    .withColumnRenamed('o_stop_id','stopPointId')

    trips_destinations = filtered_od_matrix \
    .select(['date','route','busCode','tripNum','stopPointId','timestamp']) \
    .groupBy(['date','route','busCode','tripNum','stopPointId']) \
    .count() \
    .withColumnRenamed('count','alighting_cnt')

    trips_origins.write.csv(path=datapath+'od/trips_origins/' + filepath,header=True, mode='overwrite')
    trips_destinations.write.csv(path=datapath+'od/trips_destinations/' + filepath,header=True, mode='overwrite')

    trips_o = sqlContext.read.csv(datapath + 'od/trips_origins/' + filepath, header=True,inferSchema=True,nullValue="-")
    trips_d = sqlContext.read.csv(datapath + 'od/trips_destinations/' + filepath, header=True,inferSchema=True,nullValue="-")

    trips_passengers = trips_o.join(trips_d, on = ['date','route','busCode','tripNum','stopPointId'], how='outer')

    trips_window = Window.partitionBy(['date','route','busCode','tripNum']).orderBy('timestamp')

    od_matrix_route_boarding = filtered_od_matrix.groupby(['route']).count() \
    .withColumnRenamed('count','odmatrix_boarding')

    od_matrix_route_prop = bus_trip_data.groupby(['route']).count() \
    .withColumnRenamed('count','overall_boarding') \
    .join(od_matrix_route_boarding, 'route','left_outer') \
    .withColumn('extrap_factor',F.when(((F.col('odmatrix_boarding') == 0) | (F.col('odmatrix_boarding').isNull())), 0.0) \
    .otherwise(F.col('overall_boarding').cast('float')/F.col('odmatrix_boarding')))

    buste_crowdedness_extrapolated = bus_trip_data.join(trips_passengers, on=['date','route','busCode','tripNum','stopPointId'], how='left_outer') \
    .withColumn('crowd_bal', F.col('boarding_cnt') - F.col('alighting_cnt')) \
    .withColumn('num_pass',F.sum('crowd_bal').over(trips_window)) \
    .drop('numPassengers','gps_timestamp','gps_timestamp_in_secs') \
    .orderBy(['date','route','busCode','tripNum','timestamp']) \
    .join(od_matrix_route_prop, 'route', 'left') \
    .drop('overall_boarding','odmatrix_boarding') \
    .withColumn('ext_num_pass', F.col('num_pass')*F.col('extrap_factor'))

    return buste_crowdedness_extrapolated

def printdf(df,l=10):
    return df.limit(l).toPandas()

# OD-Matrix Generation

## Parameters

In [3]:
input_folder = '/local/tarciso/masters/data/bus_trips/test/buste-sample-output-latest//'
output_folder = '/local/tarciso/masters/data/bus_trips/test/'
initial_date = datetime.strptime('2017-05-09', '%Y-%m-%d') 
final_date = datetime.strptime('2017-05-09', '%Y-%m-%d') 

## Execution

In [4]:
global sc, sqlContext

sc = SparkContext(appName="OD matrix Builder")
sqlContext = pyspark.SQLContext(sc)

In [5]:
files = get_files(input_folder, sqlContext, sc, initial_date, final_date)

In [6]:
files

['/local/tarciso/masters/data/bus_trips/test/buste-sample-output-latest/2017_05_09_veiculos']

In [7]:
sample_file = files[0]
sample_buste_data = read_buste_data_v3('/local/tarciso/masters/data/bus_trips/test/buste-sample-output-latest/2017_05_09_veiculos', sqlContext)
buste_data = read_buste_data_v3('/local/tarciso/masters/data/bus_trips/test/buste-output-latest/2017_05_09_veiculos', sqlContext)
dailyFile = sample_file.split("/")[-1]

In [8]:
buste_data.limit(10).toPandas()

Unnamed: 0,route,tripNum,shapeId,shapeSequence,shapeLat,shapeLon,distanceTraveledShape,busCode,gpsPointId,gpsLat,...,distanceToShapePoint,timestamp,stopPointId,problem,birthdate,cardTimestamp,lineName,cardNum,gender,date
0,372,4,1891,6136674,-25.418008,-49.201094,985.183,CC170,,,...,,12:00:03,30778,BETWEEN,,,,,,1494212400
1,372,4,1891,6136668,-25.416008,-49.202063,743.173,CC170,,,...,,12:00:52,30767,BETWEEN,,,,,,1494212400
2,372,4,1891,6136658,-25.413175,-49.20397,326.247,CC170,,,...,,12:02:16,30760,BETWEEN,,,,,,1494212400
3,372,4,1891,6136644,-25.412986,-49.205264,0.0,CC170,,-25.413055,...,11.568364,12:03:22,26210,NO_PROBLEM,,,,,,1494212400
4,372,5,1891,6136658,-25.413175,-49.20397,326.247,CC170,,-25.413151,...,24.480747,12:04:49,30760,NO_PROBLEM,,,,,,1494212400
5,372,5,1891,6136668,-25.416008,-49.202063,743.173,CC170,,,...,,12:05:40,30767,BETWEEN,,,,,,1494212400
6,372,5,1891,6136674,-25.418008,-49.201094,985.183,CC170,,-25.41793,...,13.330036,12:06:23,30778,NO_PROBLEM,,,,,,1494212400
7,372,5,1891,6136679,-25.419443,-49.200402,1158.771,CC170,,-25.419556,...,13.122053,12:06:46,30780,NO_PROBLEM,,,,,,1494212400
8,372,5,1891,6136689,-25.422071,-49.199187,1474.535,CC170,,-25.42218,...,12.203533,12:07:36,30783,NO_PROBLEM,14/01/81,12:08:28,OP. CONTIGENCIA,3328982.0,F,1494212400
9,372,5,1891,6136696,-25.423411,-49.201801,1806.999,CC170,,-25.423435,...,11.065555,12:09:23,30784,NO_PROBLEM,,,,,,1494212400


In [9]:
sample_buste_data.limit(10).toPandas()

Unnamed: 0,route,tripNum,shapeId,shapeSequence,shapeLat,shapeLon,distanceTraveledShape,busCode,gpsPointId,gpsLat,...,distanceToShapePoint,timestamp,stopPointId,problem,birthdate,cardTimestamp,lineName,cardNum,gender,date
0,812,1,2211,4166977,-25.41822,-49.34032,3468.058,BA295,,-25.418238,...,6.744825,06:59:27,30326,NO_PROBLEM,12/02/90,07:00:32,MONTANA,1068796,F,1494212400
1,801,14,2203,4060715,-25.442379,-49.287696,908.601,LC072,,-25.442398,...,5.391984,20:04:11,29936,NO_PROBLEM,12/02/90,20:04:20,INTERB II ANTI H,1068796,F,1494212400
2,380,4,4127,6448295,-25.430143,-49.212594,1050.583,LC302,,-25.43011,...,9.717526,12:33:42,30202,NO_PROBLEM,28/05/82,12:34:56,DETRAN/V.MACHADO,1292886,M,1494212400
3,380,4,4128,6448918,-25.439355,-49.292391,1439.96,CC302,,-25.439253,...,23.755987,19:36:47,30165,NO_PROBLEM,28/05/82,19:37:04,DETRAN/V.MACHADO,1292886,M,1494212400
4,901,5,2899,6191462,-25.42266,-49.28995,2325.793,MC305,,-25.422608,...,9.328225,09:29:06,32796,NO_PROBLEM,20/09/74,09:29:55,STA. FELICIDADE,1319913,M,1494212400
5,901,13,2257,6191197,-25.414588,-49.308205,3192.212,MC303,,-25.414611,...,9.356814,19:09:15,33670,NO_PROBLEM,20/09/74,19:10:07,STA. FELICIDADE,1319913,M,1494212400
6,701,2,2161,6354882,-25.474086,-49.318295,1181.927,JC604,,-25.474115,...,7.406663,07:20:15,31642,NO_PROBLEM,06/03/74,07:21:17,FAZENDINHA,1345704,M,1494212400
7,703,4,2165,5889366,-25.434736,-49.272225,11888.998,JC305,,-25.434765,...,7.631959,11:12:46,26140,NO_PROBLEM,06/03/74,11:14:36,CAIUÁ,1345704,M,1494212400
8,967,6,2287,6086042,-25.402186,-49.302502,936.84,MN604,,-25.40226,...,9.469753,11:03:36,33425,NO_PROBLEM,19/12/78,11:03:44,OP. CONTIGENCIA,1953016,F,1494212400
9,183,3,1753,6096567,-25.428575,-49.271158,10934.301,BC021,,-25.428528,...,47.73123,08:57:29,26351,TRIP_PROBLEM,19/12/78,11:29:52,JD. CHAPARRAL,1953016,F,1494212400


In [39]:
sample_buste_data.count()

43

In [10]:
clean_sample_buste_data = sample_buste_data.na.drop(subset=["date","route","busCode","tripNum","stopPointId","timestamp","shapeLon","shapeLat"])
clean_buste_data = buste_data.na.drop(subset=["date","route","busCode","tripNum","stopPointId","timestamp","shapeLon","shapeLat"])
filtered_boardings = clean_sample_buste_data.na.drop(subset=['cardNum','cardTimestamp']).dropDuplicates(['cardNum','date','cardTimestamp'])
multiple_boardings = filtered_boardings.groupby('cardNum').count().filter(F.col('count') > 1) \
    .select(F.col("cardNum").alias("cardNum1"), F.col("count").alias("count1"))
clean_boardings = filtered_boardings.join(multiple_boardings, filtered_boardings.cardNum == multiple_boardings.cardNum1, 'leftsemi')

In [11]:
clean_boardings.count()

43

In [12]:
printdf(clean_boardings)

Unnamed: 0,route,tripNum,shapeId,shapeSequence,shapeLat,shapeLon,distanceTraveledShape,busCode,gpsPointId,gpsLat,...,distanceToShapePoint,timestamp,stopPointId,problem,birthdate,cardTimestamp,lineName,cardNum,gender,date
0,646,3,4030,5568627,-25.593204,-49.331786,831.868,HA285,,-25.593121,...,9.625549,07:59:30,40251,NO_PROBLEM,04/07/95,07:59:36,CASA DE CUSTÓDIA,3343535,M,1494212400
1,472,3,2889,6357271,-25.490594,-49.222306,2067.428,KC001,,-25.490553,...,4.865018,07:55:30,30078,NO_PROBLEM,06/05/68,07:55:40,UBERABA,2053439,M,1494212400
2,534,2,1977,6377663,-25.5289,-49.259803,1832.488,GA213,,-25.528913,...,1.92229,07:12:22,33751,NO_PROBLEM,18/12/84,07:13:15,OP. CONTIGENCIA,3665001,F,1494212400
3,761,5,2188,6059057,-25.472784,-49.310429,7812.424,JC859,,,...,,11:52:29,32870,BETWEEN,18/12/80,11:53:06,V. IZABEL,3577342,F,1494212400
4,342,12,1875,5847039,-25.395265,-49.239475,9032.192,BA002,,,...,,17:13:44,34897,BETWEEN,25/12/97,17:14:17,B.ALTO/BOA VISTA,3742706,M,1494212400
5,703,4,2165,5889366,-25.434736,-49.272225,11888.998,JC305,,-25.434765,...,7.631959,11:12:46,26140,NO_PROBLEM,06/03/74,11:14:36,CAIUÁ,1345704,M,1494212400
6,623,2,2048,6196218,-25.504403,-49.304601,3480.909,HA244,,-25.504405,...,4.050304,05:58:12,37830,NO_PROBLEM,17/01/54,05:59:05,OP. CONTIGENCIA,3647708,M,1494212400
7,619,10,2040,6140357,-25.54427,-49.315914,6884.402,HA254,,,...,,15:14:06,35164,BETWEEN,01/07/65,15:14:07,STA. RITA / CIC,2492016,F,1494212400
8,761,5,2188,6059057,-25.472784,-49.310429,7812.424,JC859,,,...,,11:52:29,32870,BETWEEN,18/12/80,11:53:11,V. IZABEL,3577342,F,1494212400
9,30,4,1716,6293690,-25.490586,-49.228439,19160.666,GR123,,-25.490833,...,47.090557,17:40:11,32833,NO_PROBLEM,18/12/84,17:40:41,INTERBAIRROS III,3665001,F,1494212400


In [13]:
boarding_data = clean_boardings.withColumn('boarding_id',F.monotonically_increasing_id())
user_boarding_w = Window.partitionBy(boarding_data.cardNum, boarding_data.date).orderBy(boarding_data.cardTimestamp)

od_matrix_ids = boarding_data.select(F.col('cardNum'),
    F.col('boarding_id'),
    F.lead('boarding_id',default=-1).over(user_boarding_w).alias('next_boarding_id'),
    F.first('boarding_id',True).over(user_boarding_w).alias('first_boarding')).withColumn('next_boarding_id',
    F.when(F.col('next_boarding_id') == -1,F.col('first_boarding'))
    .otherwise(F.col('next_boarding_id'))).drop('first_boarding')

In [14]:
printdf(od_matrix_ids)

Unnamed: 0,cardNum,boarding_id,next_boarding_id
0,2311743,884763262977,1417339207680
1,2311743,1417339207680,884763262977
2,3000936,1176821039104,1108101562368
3,3000936,1108101562368,1176821039104
4,3347673,1640677507072,996432412672
5,3347673,996432412672,1640677507072
6,3812035,781684047872,1065151889408
7,3812035,1065151889408,781684047872
8,3577342,231928233984,403726925824
9,3577342,403726925824,1297080123392


In [15]:
origin_matrix = boarding_data.select(F.col("route").alias("o_route"),
    F.col("busCode").alias("o_bus_code"),
    F.col("date").alias("o_date"),
    F.col("tripNum").alias("o_tripNum"),
    F.col("cardTimestamp").alias("o_timestamp"),
    F.col("shapeId").alias("o_shape_id"),
    F.col("shapeSequence").alias("o_shape_seq"),
    F.col("shapeLat").alias("o_shape_lat"),
    F.col("shapeLon").alias("o_shape_lon"),
    F.col("stopPointId").alias("o_stop_id"),
    F.col("boarding_id").alias("o_boarding_id"))


next_origin_matrix = boarding_data.select(F.col("route").alias("next_o_route"),
    F.col("busCode").alias("next_o_bus_code"),
    F.col("date").alias("next_o_date"),
    F.col("tripNum").alias("next_o_tripNum"),
    F.col("cardTimestamp").alias("next_o_timestamp"),
    F.col("shapeId").alias("next_o_shape_id"),
    F.col("shapeSequence").alias("next_o_shape_seq"),
    F.col("shapeLat").alias("next_o_shape_lat"),
    F.col("shapeLon").alias("next_o_shape_lon"),
    F.col("stopPointId").alias("next_o_stop_id"),
    F.col("boarding_id").alias("next_o_boarding_id"))

In [16]:
printdf(origin_matrix)

Unnamed: 0,o_route,o_bus_code,o_date,o_tripNum,o_timestamp,o_shape_id,o_shape_seq,o_shape_lat,o_shape_lon,o_stop_id,o_boarding_id
0,646,HA285,1494212400,3,07:59:36,4030,5568627,-25.593204,-49.331786,40251,25769803776
1,472,KC001,1494212400,3,07:55:40,2889,6357271,-25.490594,-49.222306,30078,34359738368
2,534,GA213,1494212400,2,07:13:15,1977,6377663,-25.5289,-49.259803,33751,51539607552
3,761,JC859,1494212400,5,11:53:06,2188,6059057,-25.472784,-49.310429,32870,231928233984
4,342,BA002,1494212400,12,17:14:17,1875,5847039,-25.395265,-49.239475,34897,326417514496
5,703,JC305,1494212400,4,11:14:36,2165,5889366,-25.434736,-49.272225,26140,335007449088
6,623,HA244,1494212400,2,05:59:05,2048,6196218,-25.504403,-49.304601,37830,369367187456
7,619,HA254,1494212400,10,15:14:07,2040,6140357,-25.54427,-49.315914,35164,386547056640
8,761,JC859,1494212400,5,11:53:11,2188,6059057,-25.472784,-49.310429,32870,403726925824
9,30,GR123,1494212400,4,17:40:41,1716,6293690,-25.490586,-49.228439,32833,532575944704


In [17]:
printdf(next_origin_matrix)

Unnamed: 0,next_o_route,next_o_bus_code,next_o_date,next_o_tripNum,next_o_timestamp,next_o_shape_id,next_o_shape_seq,next_o_shape_lat,next_o_shape_lon,next_o_stop_id,next_o_boarding_id
0,646,HA285,1494212400,3,07:59:36,4030,5568627,-25.593204,-49.331786,40251,25769803776
1,472,KC001,1494212400,3,07:55:40,2889,6357271,-25.490594,-49.222306,30078,34359738368
2,534,GA213,1494212400,2,07:13:15,1977,6377663,-25.5289,-49.259803,33751,51539607552
3,761,JC859,1494212400,5,11:53:06,2188,6059057,-25.472784,-49.310429,32870,231928233984
4,342,BA002,1494212400,12,17:14:17,1875,5847039,-25.395265,-49.239475,34897,326417514496
5,703,JC305,1494212400,4,11:14:36,2165,5889366,-25.434736,-49.272225,26140,335007449088
6,623,HA244,1494212400,2,05:59:05,2048,6196218,-25.504403,-49.304601,37830,369367187456
7,619,HA254,1494212400,10,15:14:07,2040,6140357,-25.54427,-49.315914,35164,386547056640
8,761,JC859,1494212400,5,11:53:11,2188,6059057,-25.472784,-49.310429,32870,403726925824
9,30,GR123,1494212400,4,17:40:41,1716,6293690,-25.490586,-49.228439,32833,532575944704


In [18]:
user_trips_data = origin_matrix.join(od_matrix_ids, origin_matrix.o_boarding_id == od_matrix_ids.boarding_id, 'inner') \
    .join(next_origin_matrix, od_matrix_ids.next_boarding_id == next_origin_matrix.next_o_boarding_id, 'inner') \
    .drop('boarding_id').drop('next_boarding_id') \
    .withColumn('o_unixtimestamp',F.unix_timestamp(F.col('o_timestamp'), 'HH:mm:ss')) \
    .withColumn('next_o_unixtimestamp',F.unix_timestamp(F.col('next_o_timestamp'), 'HH:mm:ss')) \
    .withColumn('leg_duration',F.when(F.col('next_o_unixtimestamp') > F.col('o_unixtimestamp'), \
    ((F.col('next_o_unixtimestamp') - F.col('o_unixtimestamp'))/60.0)).otherwise(-1)) \
    .orderBy(['cardNum','o_date','o_timestamp'])

In [19]:
printdf(user_trips_data.select(['o_boarding_id','next_o_boarding_id','o_route','next_o_route','o_bus_code','next_o_bus_code']))

Unnamed: 0,o_boarding_id,next_o_boarding_id,o_route,next_o_route,o_bus_code,next_o_bus_code
0,618475290624,790273982464,812,801,BA295,LC072
1,790273982464,618475290624,801,812,LC072,BA295
2,1434519076864,661424963584,380,380,LC302,CC302
3,661424963584,1434519076864,380,380,CC302,LC302
4,1116691496960,1090921693184,901,901,MC305,MC303
5,1090921693184,1116691496960,901,901,MC303,MC305
6,987842478080,335007449088,701,703,JC604,JC305
7,335007449088,987842478080,703,701,JC305,JC604
8,944892805120,910533066752,967,183,MN604,BC021
9,910533066752,1700807049216,183,967,BC021,MN604


In [20]:
bus_trip_data = clean_buste_data.orderBy(['route','busCode','tripNum','timestamp']) \
    .dropDuplicates(['route','busCode','tripNum','stopPointId']) \
    .drop('cardNum') \
    .withColumn('id',F.monotonically_increasing_id()) \
    .withColumn('route', F.col('route').cast(T.IntegerType())) \
    .withColumnRenamed('','cardNum')

In [21]:
printdf(bus_trip_data)

Unnamed: 0,route,tripNum,shapeId,shapeSequence,shapeLat,shapeLon,distanceTraveledShape,busCode,gpsPointId,gpsLat,...,distanceToShapePoint,timestamp,stopPointId,problem,birthdate,cardTimestamp,lineName,gender,date,id
0,1,13,2938,6451403,-25.433455,-49.262219,972.378,BN997,,-25.433363,...,13.169062,11:49:19,31454,NO_PROBLEM,,,,,1494212400,0
1,1,15,2938,6451475,-25.433142,-49.276703,3090.819,BN997,,-25.43315,...,5.602817,12:39:54,35219,NO_PROBLEM,,,,,1494212400,1
2,1,24,2938,6451490,-25.430831,-49.276405,3473.319,BN997,,-25.43075,...,28.135609,16:10:05,29420,NO_PROBLEM,,,,,1494212400,2
3,1,27,2938,6451424,-25.436991,-49.269392,1859.32,BN997,,,...,,17:10:01,35216,BETWEEN,,,,,1494212400,3
4,1,29,2938,6451490,-25.430831,-49.276405,3473.319,BN997,,-25.43066,...,28.576962,18:10:01,29420,NO_PROBLEM,,,,,1494212400,4
5,1,2,2938,6451490,-25.430831,-49.276405,3473.319,BN998,,,...,,07:43:27,29420,BETWEEN,,,,,1494212400,5
6,2,3,3077,4843548,-25.437491,-49.266078,4576.746,DN027,,-25.437508,...,3.323942,08:19:13,10899,NO_PROBLEM,,,,,1494212400,6
7,2,14,3077,4843564,-25.43579,-49.257191,5508.818,DN027,,-25.435803,...,10.638918,15:41:08,30225,NO_PROBLEM,05/10/53,15:41:10,C. CENTRO (AH),F,1494212400,7
8,10,2,1708,5859568,-25.436504,-49.260928,6624.916,BB302,,-25.436498,...,1.888935,08:27:04,28647,NO_PROBLEM,26/05/70,08:27:57,INTERBAIRROS I H,M,1494212400,8
9,10,11,1708,5859490,-25.418827,-49.254201,4233.147,BB302,,,...,,19:52:31,29104,BETWEEN,,,,,1494212400,9


In [22]:
cond = [bus_trip_data.route == user_trips_data.o_route,
    bus_trip_data.busCode == user_trips_data.o_bus_code,
    bus_trip_data.date == user_trips_data.o_date,
    bus_trip_data.tripNum == user_trips_data.o_tripNum]

w = Window().partitionBy(['cardNum','date','route','busCode','tripNum']).orderBy('dist')

filtered_od_matrix = bus_trip_data.join(user_trips_data, cond, 'left') \
.withColumn('dist',dist(F.col('shapeLat'),F.col('shapeLon'),F.col('next_o_shape_lat'),F.col('next_o_shape_lon'))) \
.filter('timestamp > o_timestamp') \
.withColumn('rn', F.row_number().over(w)) \
.where(F.col('rn') == 1) \
.filter('dist <= 1.0') \
.filter(user_trips_data.cardNum.isNotNull())

In [23]:
printdf(filtered_od_matrix)

Unnamed: 0,route,tripNum,shapeId,shapeSequence,shapeLat,shapeLon,distanceTraveledShape,busCode,gpsPointId,gpsLat,...,next_o_shape_seq,next_o_shape_lat,next_o_shape_lon,next_o_stop_id,next_o_boarding_id,o_unixtimestamp,next_o_unixtimestamp,leg_duration,dist,rn
0,40,4,1718,3368343,-25.513206,-49.295155,0.0,HB606,,-25.513303,...,6595143,-25.520498,-49.290104,40008,712964571136,63145,32512,-1.0,0.956238,1
1,472,3,2889,6357700,-25.435742,-49.271474,11709.572,KC001,,-25.435688,...,6357700,-25.435742,-49.271474,26182,884763262976,39340,78627,654.783333,0.0,1
2,646,3,3100,5567847,-25.597799,-49.329917,12802.335,HA285,,,...,5568599,-25.597774,-49.329915,40255,1194000908288,39576,63389,396.883333,0.00276,1
3,967,6,2287,6086350,-25.429277,-49.271228,7272.513,MN604,,-25.429286,...,6096567,-25.428575,-49.271158,26351,910533066752,50624,52192,26.133333,0.078304,1
4,870,5,2926,5386555,-25.430368,-49.32204,6087.232,BC281,,-25.4304,...,4092083,-25.430861,-49.322787,30959,1649267441664,51492,61137,160.75,0.092814,1
5,471,5,1932,6356271,-25.435529,-49.271583,9884.224,EC294,,-25.435525,...,5386375,-25.429662,-49.272567,26314,953482739712,50606,51492,14.766667,0.659822,1
6,380,4,4128,6449182,-25.43018,-49.213737,10247.773,CC302,,-25.430228,...,6448295,-25.430143,-49.212594,30202,1434519076864,81424,56096,-1.0,0.114821,1
7,701,2,2161,6355133,-25.434807,-49.272409,9552.661,JC604,,-25.434823,...,5889366,-25.434736,-49.272225,26140,335007449088,37277,51276,233.316667,0.020084,1
8,967,9,2286,6088034,-25.402184,-49.302503,6364.085,MN604,,-25.402178,...,6086042,-25.402186,-49.302502,33425,944892805120,62925,50624,-1.0,0.000233,1
9,703,4,2166,5889574,-25.474356,-49.318282,8048.015,JC305,,-25.474305,...,6354882,-25.474086,-49.318295,31642,987842478080,51276,37277,-1.0,0.029989,1


#### Evaluate Resulting OD Matrix

In [38]:
filtered_od_matrix.count()

20

In [40]:
filtered_od_matrix.printSchema()

root
 |-- route: integer (nullable = true)
 |-- tripNum: integer (nullable = true)
 |-- shapeId: integer (nullable = true)
 |-- shapeSequence: integer (nullable = true)
 |-- shapeLat: double (nullable = true)
 |-- shapeLon: double (nullable = true)
 |-- distanceTraveledShape: double (nullable = true)
 |-- busCode: string (nullable = true)
 |-- gpsPointId: string (nullable = true)
 |-- gpsLat: double (nullable = true)
 |-- gpsLon: double (nullable = true)
 |-- distanceToShapePoint: double (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- stopPointId: integer (nullable = true)
 |-- problem: string (nullable = true)
 |-- birthdate: string (nullable = true)
 |-- cardTimestamp: string (nullable = true)
 |-- lineName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- date: long (nullable = true)
 |-- id: long (nullable = false)
 |-- o_route: integer (nullable = true)
 |-- o_bus_code: string (nullable = true)
 |-- o_date: long (nullable = true)
 |-- o_tripNum: 

In [43]:
printdf(filtered_od_matrix.select(['cardNum','o_timestamp','route','busCode','o_stop_id','o_shape_lat','o_shape_lon','next_o_stop_id','next_o_shape_lat','next_o_shape_lon']) \
            .orderBy(['cardNum','o_timestamp']),l=20)

Unnamed: 0,cardNum,o_timestamp,route,busCode,o_stop_id,o_shape_lat,o_shape_lon,next_o_stop_id,next_o_shape_lat,next_o_shape_lon
0,1292886,12:34:56,380,LC302,30202,-25.430143,-49.212594,30165,-25.439355,-49.292391
1,1292886,19:37:04,380,CC302,30165,-25.439355,-49.292391,30202,-25.430143,-49.212594
2,1319913,09:29:55,901,MC305,32796,-25.42266,-49.28995,33670,-25.414588,-49.308205
3,1319913,19:10:07,901,MC303,33670,-25.414588,-49.308205,32796,-25.42266,-49.28995
4,1345704,07:21:17,701,JC604,31642,-25.474086,-49.318295,26140,-25.434736,-49.272225
5,1345704,11:14:36,703,JC305,26140,-25.434736,-49.272225,31642,-25.474086,-49.318295
6,1953016,11:03:44,967,MN604,33425,-25.402186,-49.302502,26351,-25.428575,-49.271158
7,1953016,11:29:52,183,BC021,26351,-25.428575,-49.271158,26345,-25.429277,-49.271228
8,1953016,14:28:45,967,MN604,26345,-25.429277,-49.271228,33425,-25.402186,-49.302502
9,2053439,07:55:40,472,KC001,30078,-25.490594,-49.222306,26182,-25.435742,-49.271474


#### Generate OD Matrix with boarding and alighting count per route-bus-stop

In [24]:
trips_origins = filtered_od_matrix \
    .select(['o_date','o_route','o_bus_code','o_tripNum','o_stop_id','o_timestamp']) \
    .groupBy(['o_date','o_route','o_bus_code','o_tripNum','o_stop_id']) \
    .count() \
    .withColumnRenamed('count','boarding_cnt') \
    .withColumnRenamed('o_date','date') \
    .withColumnRenamed('o_route','route') \
    .withColumnRenamed('o_bus_code','busCode') \
    .withColumnRenamed('o_tripNum','tripNum') \
    .withColumnRenamed('o_stop_id','stopPointId')

trips_destinations = filtered_od_matrix \
    .select(['date','route','busCode','tripNum','stopPointId','timestamp']) \
    .groupBy(['date','route','busCode','tripNum','stopPointId']) \
    .count() \
    .withColumnRenamed('count','alighting_cnt')

In [25]:
printdf(trips_origins)

Unnamed: 0,date,route,busCode,tripNum,stopPointId,boarding_cnt
0,1494212400,40,HB606,4,33144,1
1,1494212400,472,KC001,3,30078,1
2,1494212400,646,HA285,3,40251,1
3,1494212400,967,MN604,6,33425,1
4,1494212400,870,BC281,5,26314,1
5,1494212400,471,EC294,5,30749,1
6,1494212400,380,CC302,4,30165,1
7,1494212400,701,JC604,2,31642,1
8,1494212400,967,MN604,9,26345,1
9,1494212400,703,JC305,4,26140,1


In [26]:
printdf(trips_destinations)

Unnamed: 0,date,route,busCode,tripNum,stopPointId,alighting_cnt
0,1494212400,40,HB606,4,31029,1
1,1494212400,472,KC001,3,26182,1
2,1494212400,646,HA285,3,40256,1
3,1494212400,967,MN604,6,26345,1
4,1494212400,870,BC281,5,30260,1
5,1494212400,471,EC294,5,26178,1
6,1494212400,380,CC302,4,30201,1
7,1494212400,701,JC604,2,26141,1
8,1494212400,967,MN604,9,33429,1
9,1494212400,703,JC305,4,31643,1


In [27]:
trips_passengers = trips_origins.join(trips_destinations, on = ['date','route','busCode','tripNum','stopPointId'], how='outer')

In [28]:
printdf(trips_passengers)

Unnamed: 0,date,route,busCode,tripNum,stopPointId,boarding_cnt,alighting_cnt
0,1494212400,967,MN604,6,33425,1.0,
1,1494212400,901,MC303,13,32796,,1.0
2,1494212400,967,MN604,9,33429,,1.0
3,1494212400,380,CC302,4,30165,1.0,
4,1494212400,646,HA285,3,40256,,1.0
5,1494212400,870,BC281,5,26314,1.0,
6,1494212400,20,BR108,1,32729,1.0,
7,1494212400,761,JC859,5,32861,,1.0
8,1494212400,472,KC002,10,26182,1.0,
9,1494212400,40,BB615,1,34133,1.0,


In [29]:
trips_window = Window.partitionBy(['date','route','busCode','tripNum']).orderBy('timestamp')

od_matrix_route_boarding = filtered_od_matrix.groupby(['route']).count() \
.withColumnRenamed('count','odmatrix_boarding')

In [30]:
printdf(od_matrix_route_boarding)

Unnamed: 0,route,odmatrix_boarding
0,471,1
1,472,2
2,183,1
3,40,3
4,20,1
5,646,2
6,761,1
7,870,1
8,380,2
9,703,1


In [31]:
od_matrix_route_prop = bus_trip_data.groupby(['route']).count() \
    .withColumnRenamed('count','overall_boarding') \
    .join(od_matrix_route_boarding, 'route','left_outer') \
    .withColumn('extrap_factor',F.when(((F.col('odmatrix_boarding') == 0) | (F.col('odmatrix_boarding').isNull())), 0.0) \
    .otherwise(F.col('overall_boarding').cast('float')/F.col('odmatrix_boarding')))

In [32]:
printdf(od_matrix_route_prop)

Unnamed: 0,route,overall_boarding,odmatrix_boarding,extrap_factor
0,463,2384,,0.0
1,471,2617,1.0,2617.0
2,243,947,,0.0
3,540,121,,0.0
4,623,3170,,0.0
5,516,657,,0.0
6,472,2864,2.0,1432.0
7,322,1803,,0.0
8,513,2907,,0.0
9,918,983,,0.0


In [33]:
buste_crowdedness_extrapolated = bus_trip_data.join(trips_passengers, on=['date','route','busCode','tripNum','stopPointId'], how='left_outer') \
    .withColumn('crowd_bal', F.col('boarding_cnt') - F.col('alighting_cnt')) \
    .withColumn('num_pass',F.sum('crowd_bal').over(trips_window)) \
    .drop('numPassengers','gps_timestamp','gps_timestamp_in_secs') \
    .orderBy(['date','route','busCode','tripNum','timestamp']) \
    .join(od_matrix_route_prop, 'route', 'left') \
    .drop('overall_boarding','odmatrix_boarding') \
    .withColumn('ext_num_pass', F.col('num_pass')*F.col('extrap_factor'))

In [34]:
printdf(buste_crowdedness_extrapolated.filter('boarding_cnt > 0'))

Unnamed: 0,route,date,busCode,tripNum,stopPointId,shapeId,shapeSequence,shapeLat,shapeLon,distanceTraveledShape,...,cardTimestamp,lineName,gender,id,boarding_cnt,alighting_cnt,crowd_bal,num_pass,extrap_factor,ext_num_pass
0,471,1494212400,EC294,5,30749,1932,6356242,-25.43602,-49.26688,8993.16,...,11:03:26,V. SÃO PAULO,F,1503238554838,1,,,,2617.0,
1,472,1494212400,KC001,3,30078,2889,6357271,-25.490594,-49.222306,2067.428,...,07:55:40,UBERABA,M,377957123264,1,,,,1432.0,
2,472,1494212400,KC002,10,26182,2889,6357700,-25.435742,-49.271474,11709.572,...,18:51:48,UBERABA,F,979252544764,1,,,,1432.0,
3,183,1494212400,BC021,3,26351,1753,6096567,-25.428575,-49.271158,10934.301,...,11:29:04,JD. CHAPARRAL,M,738734375325,1,,,,2365.0,
4,20,1494212400,BR108,1,32729,3264,6008351,-25.469442,-49.300445,3671.688,...,07:30:13,INTERBAIRR II H,F,343597383727,1,,,,7188.0,
5,40,1494212400,BB615,1,34133,1718,3368687,-25.458186,-49.346834,12488.642,...,06:54:00,INTERBAIRROS IV,F,910533066908,1,,,,3634.333333,
6,40,1494212400,BB615,5,34134,1717,3366897,-25.457942,-49.34709,9918.823,...,15:49:11,INTERBAIRROS IV,M,1116691497131,1,,,,3634.333333,
7,40,1494212400,HB606,4,33144,1717,3366806,-25.444211,-49.34387,7806.04,...,14:32:35,INTERBAIRROS IV,F,798863917229,1,,,,3634.333333,
8,646,1494212400,HA023,9,40255,4030,5568599,-25.597774,-49.329915,238.264,...,14:36:29,POMPÉIA / JANAÍNA,M,300647712721,1,,,,1609.0,
9,646,1494212400,HA285,3,40251,4030,5568627,-25.593204,-49.331786,831.868,...,07:59:58,CASA DE CUSTÓDIA,F,223338301325,1,,,,1609.0,
