## Indicadores de tiempo en parada utilizando información de la API pública de Data BA

In [1]:
# Shell escape: install pyarrow from the conda-forge channel without prompting (-y).
# Presumably needed by the pandas UDFs defined later in this notebook (TODO confirm).
# NOTE(review): no version is pinned, so the installed version depends on when this runs.
!sudo conda install -c conda-forge -y pyarrow

Collecting package metadata: done
Solving environment: / 
The environment is inconsistent, please check the package plan carefully
The following packages are causing the inconsistency:

  - conda-forge/linux-64::matplotlib==3.0.3=py37_1
done


  current version: 4.6.14
  latest version: 4.7.12

Please update conda by running

    $ conda update -n base conda



# All requested packages already installed.



In [2]:
pip install pygeohash

Note: you may need to restart the kernel to use updated packages.


In [3]:
import os

# Put the PostgreSQL JDBC driver jar on the Spark driver class path.
# Set here, ahead of the cell that creates the SparkSession.
# NOTE(review): the jar path is absolute and specific to this container image.
os.environ.update({
    'PYSPARK_SUBMIT_ARGS':
        '--driver-class-path=/home/jovyan/work/lib/postgresql-42.2.6.jar pyspark-shell',
})

In [4]:
import pandas as pd 
import numpy
import matplotlib.pyplot as plt 
import pygeohash as pgh
from pyspark.sql import SparkSession, Row
from pyspark.sql.window import Window
import pyspark.sql.functions as F 
import pyspark
from pyspark.sql.functions import lit, udf, col, unix_timestamp, udf, pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType, IntegerType

In [5]:
# Obtain (or reuse) the Spark session for this notebook, registered under the app name "BaPoints".
spark = (
    SparkSession.builder
    .appName("BaPoints")
    .getOrCreate()
)

In [6]:
# Flatten the nested `_vehicle` struct of each position record into one flat column
# per attribute, keeping only records that carry a trip start time.
point_columns = [
    '_vehicle._trip._route_id as route_id',
    '_vehicle._trip._trip_id as trip_id',
    '_vehicle._stop_id as last_stop_id',
    '_vehicle._timestamp as timestamp',
    '_vehicle._vehicle._id as vehicle_id',
    '_vehicle._vehicle._label as vehicle_label',
    '_vehicle._trip._start_time as start_time',
    '_vehicle._trip._start_date as start_date',
    '_vehicle._position._latitude as latitude',
    '_vehicle._position._longitude as longitude',
    '_vehicle._position._speed as speed',
]
points = (
    spark.read.json('../Datasets/ba_points/points_50000_new.json')
    .drop('_id')
    .selectExpr(*point_columns)
    .where(~F.isnull(F.col('start_time')))
)
# Cache the flattened frame: it is reused by several later cells.
points.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
points

DataFrame[route_id: string, trip_id: string, last_stop_id: string, timestamp: bigint, vehicle_id: string, vehicle_label: string, start_time: string, start_date: string, latitude: double, longitude: double, speed: double]

In [32]:
# Number of position records reported by each vehicle (output column is named `count`).
point_count_by_vehicle = (
    points
    .groupBy('vehicle_id')
    .agg(F.count(F.lit(1)).alias('count'))
)
point_count_by_vehicle

DataFrame[vehicle_id: string, count: bigint]

In [8]:
# Route catalogue from routes.txt, with a header row and inferred column types;
# the URL and colour columns are discarded.
routes_unused_columns = ['route_url', 'route_color', 'route_text_color']
routes = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('../Datasets/ba_points/routes.txt')
    .drop(*routes_unused_columns)
)
routes

DataFrame[route_id: int, agency_id: int, route_short_name: string, route_long_name: string, route_desc: string, route_type: int]

In [33]:
# Stop sequence of every trip from stop_times.txt, with the schedule and
# boarding-detail columns discarded.
stop_trips_unused_columns = [
    'arrival_time', 'departure_time', 'stop_headsign',
    'pickup_type', 'drop_off_type', 'shape_dist_traveled',
]
stop_trips = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('../Datasets/ba_points/stop_times.txt')
    .drop(*stop_trips_unused_columns)
)
# Cache the trimmed frame for the join performed later in the notebook.
stop_trips.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
stop_trips

DataFrame[trip_id: string, stop_id: bigint, stop_sequence: int, timepoint: int]

In [10]:
# Trip catalogue from trips.txt, with a header row and inferred column types;
# block, shape and accessibility columns are discarded.
trips_unused_columns = ['block_id', 'shape_id', 'wheelchair_accessible', 'bikes_allowed']
trips = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('../Datasets/ba_points/trips.txt')
    .drop(*trips_unused_columns)
)
trips

DataFrame[route_id: int, service_id: int, trip_id: string, trip_headsign: string, trip_short_name: string, direction_id: int, exceptional: int]

In [11]:
# Stop catalogue from stops.txt, with a header row and inferred column types.
stops = spark.read.csv('../Datasets/ba_points/stops.txt', inferSchema=True, header=True)
# DataFrame.drop returns a new frame and leaves the original untouched, so the
# result is bound back to `stops`; otherwise the trimmed frame is only displayed
# and later cells still see every column. Columns absent from the file are ignored.
stops = stops.drop('stop_timezone', 'wheelchair_boarding', 'stop_desc', 'zone_id', 'stop_url',
                   'location_type', 'parent_station')
stops

DataFrame[stop_id: bigint, stop_code: bigint, stop_name: string, stop_lat: double, stop_lon: double]

In [12]:
from math import sin, cos, atan, sqrt, pi

# Mean Earth radius in kilometres (6371 km). geodesicdistance multiplies its
# result by 1000, which converts the kilometre figure to metres.
earthRadius = 6.371e3

def toRadians(series):
    """Convert a pandas Series of angles from degrees to radians.

    Returns a new Series with the same index; each value is multiplied by pi
    and then divided by 180.
    """
    return series * pi / 180.0

def cosS(series):
    """Element-wise cosine of a pandas Series of angles in radians.

    The NumPy ufunc processes the whole column in one vectorized call, instead
    of invoking ``math.cos`` once per element through ``Series.apply``.

    Returns a Series with the same index as ``series``.
    """
    return numpy.cos(series)

def sinS(series):
    """Element-wise sine of a pandas Series of angles in radians.

    The NumPy ufunc processes the whole column in one vectorized call, instead
    of invoking ``math.sin`` once per element through ``Series.apply``.

    Returns a Series with the same index as ``series``.
    """
    return numpy.sin(series)

def absS(series):
    """Return the element-wise absolute value of a pandas Series."""
    return abs(series)

def sqrtS(series):
    """Element-wise square root of a pandas Series.

    The NumPy ufunc processes the whole column in one vectorized call, instead
    of invoking ``math.sqrt`` once per element through ``Series.apply``.
    Negative inputs yield NaN rather than raising ``ValueError``; the only
    caller in this notebook passes a sum of squares, which is never negative.

    Returns a Series with the same index as ``series``.
    """
    return numpy.sqrt(series)

def atan2S(series1, series2):
    """Element-wise two-argument arctangent ``atan2(series1, series2)``.

    ``series1`` holds the y (numerator) terms and ``series2`` the x
    (denominator) terms. Unlike ``atan(series1 / series2)``, the quadrant is
    taken from the signs of both arguments, so a non-negative ``series1``
    always yields an angle in [0, pi], including when ``series2`` is negative
    or zero.

    Returns a Series of angles in radians.
    """
    return numpy.arctan2(series1, series2)

def geodesicdistance(point1Lat, point1Lng, point2Lat, point2Lng):
    """Distance between two sets of points on a sphere of radius ``earthRadius``.

    All four arguments are pandas Series of coordinates in degrees, paired
    row by row. The central angle is computed with the spherical form of the
    Vincenty formula, ``atan2(sqrt(x^2 + y^2), z)``, then scaled by
    ``earthRadius`` and by 1000.

    Returns a Series of distances (metres, given ``earthRadius`` in kilometres).
    """
    phi1, phi2 = toRadians(point1Lat), toRadians(point2Lat)
    lam1, lam2 = toRadians(point1Lng), toRadians(point2Lng)

    # Absolute longitude difference, shared by every term below.
    deltaLam = absS(lam2 - lam1)
    cosDelta = cosS(deltaLam)
    sinPhi1, cosPhi1 = sinS(phi1), cosS(phi1)
    sinPhi2, cosPhi2 = sinS(phi2), cosS(phi2)

    # Numerator components and denominator of the arctangent form.
    x = cosPhi2 * sinS(deltaLam)
    y = cosPhi1 * sinPhi2 - sinPhi1 * cosPhi2 * cosDelta
    z = sinPhi1 * sinPhi2 + cosPhi1 * cosPhi2 * cosDelta

    centralAngle = atan2S(sqrtS(x * x + y * y), z)
    return earthRadius * centralAngle * 1000

In [13]:
@pandas_udf('float', PandasUDFType.SCALAR)
def distance(lat1,lon1,lat2,lon2):
    """Scalar pandas UDF returning the geodesicdistance between (lat1, lon1) and (lat2, lon2).

    Each argument arrives as a pandas Series; the result column is declared as 'float'.
    """
    separation = geodesicdistance(lat1, lon1, lat2, lon2)
    return separation

@pandas_udf('int', PandasUDFType.GROUPED_AGG)
def time_spread(timestamp):
    """Grouped-aggregate pandas UDF: difference between the largest and smallest timestamp of a group.

    The result is declared as 'int' and is in the same unit as the input column.
    """
    latest = timestamp.max()
    earliest = timestamp.min()
    return latest - earliest

@udf('string')
def geohash(lat, lon):
    """Row-wise UDF encoding a latitude/longitude pair as a 7-character geohash string.

    Returns null when either coordinate is null, so a record without a position
    gets no geohash instead of raising inside ``pgh.encode`` and failing the job.
    """
    if lat is None or lon is None:
        return None
    return pgh.encode(lat, lon, precision=7)

In [14]:
# Tag both stops and vehicle positions with their geohash cell, so that they can
# later be matched with an equality join on `geohash`.
stops = stops.withColumn('geohash', geohash(col('stop_lat'), col('stop_lon')))
points = points.withColumn('geohash', geohash(col('latitude'), col('longitude')))

In [15]:
# Attach stop coordinates (and geohash) to every stop of every trip.
stops_with_coors = stop_trips.join(stops, on='stop_id', how='inner')

# Pair each trip stop with the vehicle positions of the same trip that fall in the
# same geohash cell, then discard the columns the later cells do not use.
# NOTE(review): positions close to a stop but in a neighbouring geohash cell are
# not matched by this equality join.
stops_with_points_unused_columns = [
    'start_date', 'vehicle_label', 'route_id', 'timepoint',
    'stop_name', 'stop_code', 'last_stop_id',
]
stops_with_points = (
    stops_with_coors
    .join(points, on=['trip_id', 'geohash'], how='inner')
    .drop(*stops_with_points_unused_columns)
)
stops_with_points.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
stops_with_points

DataFrame[trip_id: string, geohash: string, stop_id: bigint, stop_sequence: int, stop_lat: double, stop_lon: double, timestamp: bigint, vehicle_id: string, start_time: string, latitude: double, longitude: double, speed: double]

### Tiempo en parada por parada por colectivo

In [34]:
# Thresholds deciding that a position record counts as "at the stop".
MAX_DISTANCE_TO_STOP = 50   # same unit as the `distance` UDF output (metres)
MAX_SPEED_AT_STOP = 5       # same unit as the feed's `speed` field (unit not stated in this notebook)
TIME_IN_STOP_PADDING = 30   # added to every spread; presumably the feed's sampling interval in seconds (TODO confirm)

# Time each vehicle spent at each stop: spread between the first and last
# qualifying position timestamp, plus the fixed padding.
# The former `.drop('date')` (no such column exists) and `.orderBy("timestamp")`
# were removed: max - min does not depend on row order, so the global sort only
# added cost.
# NOTE(review): grouping by (vehicle_id, stop_id) merges every visit of a vehicle
# to the same stop, so the spread also covers the time between visits if a stop
# is served more than once. Confirm this is intended.
time_in_stop = (
    stops_with_points
    .filter(col('speed') <= MAX_SPEED_AT_STOP)
    .withColumn('distance', distance('stop_lat', 'stop_lon', 'latitude', 'longitude'))
    .filter(col('distance') < MAX_DISTANCE_TO_STOP)
    .groupby('vehicle_id', 'stop_id')
    .agg(time_spread(col('timestamp')).alias('tiempo'))
    .withColumn('tiempo', col('tiempo') + lit(TIME_IN_STOP_PADDING))
)
time_in_stop.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
time_in_stop

DataFrame[vehicle_id: string, stop_id: bigint, tiempo: int]

### Tiempo total en parada por colectivo

In [35]:
# Sum of the per-stop times (`tiempo`) for each vehicle.
total_time_in_stop = (
    time_in_stop
    .groupBy('vehicle_id')
    .agg(F.sum(col('tiempo')).alias('total_time_in_stop'))
)
total_time_in_stop

DataFrame[vehicle_id: string, total_time_in_stop: bigint]

### Tiempo total trabajado por colectivo

In [36]:
# Span between the first and last position timestamp reported by each vehicle.
work_time_spread = time_spread(col('timestamp')).alias('total_work_time')
total_working_time = points.groupBy('vehicle_id').agg(work_time_spread)
total_working_time

DataFrame[vehicle_id: string, total_work_time: int]

### Porcentaje de tiempo en parada por colectivo

In [37]:
# Share of each vehicle's working time that was spent at stops.
# Despite its name, `percentage_in_stop` is a plain ratio (it is not multiplied by 100).
in_stop_ratio = F.col('total_time_in_stop') / F.col('total_work_time')
time_distribution = (
    total_time_in_stop
    .join(total_working_time, on='vehicle_id')
    .withColumn('percentage_in_stop', in_stop_ratio)
    .orderBy('vehicle_id')
)
time_distribution

DataFrame[vehicle_id: string, total_time_in_stop: bigint, total_work_time: int, percentage_in_stop: double]

### Paradas en las que el colectivo efectivamente se detuvo

In [38]:
# Position records where the vehicle was effectively standing at a stop:
# closer than 50 (distance UDF units, metres) and with speed <= 1.
# The former `.drop('date')` (no such column exists) and the first
# `.orderBy("timestamp")` were removed: that ordering was discarded by the
# later dropDuplicates/orderBy, so it only added an extra global sort.
stopped_stops = (
    stops_with_points
    .filter(col('speed') <= 1)
    .withColumn('distance', distance('stop_lat', 'stop_lon', 'latitude', 'longitude'))
    .filter(col('distance') < 50)
    .dropDuplicates()
    .orderBy('stop_id')
)
stopped_stops

DataFrame[trip_id: string, geohash: string, stop_id: bigint, stop_sequence: int, stop_lat: double, stop_lon: double, timestamp: bigint, vehicle_id: string, start_time: string, latitude: double, longitude: double, speed: double, distance: float]

In [39]:
# Rows are windowed per vehicle *and* trip, matching the per-trip grouping below;
# a per-vehicle window interleaves the stops of different trips that share a
# stop_sequence. `timestamp` is the tie-breaker within a stop, which makes
# lead() deterministic: the last record of a stop is followed by the earliest
# record of the next stop of the same trip.
window = Window.partitionBy('vehicle_id', 'trip_id').orderBy('stop_sequence', 'timestamp')

# For each (vehicle, trip, stop): from_timestamp is the first record at the stop,
# to_timestamp is the largest "next record" timestamp seen from it. It is null
# when the stop is the last one recorded for the trip.
stop_time_interval = (
    stopped_stops
    .drop('stop_lat', 'stop_lon', 'start_time', 'latitude', 'longitude', 'speed', 'distance')
    .withColumn('to_timestamp', F.lead(col('timestamp')).over(window))
    .groupBy('vehicle_id', 'stop_sequence', 'stop_id', 'trip_id')
    .agg(F.min('timestamp').alias('from_timestamp'), F.max('to_timestamp').alias('to_timestamp'))
)
stop_time_interval

DataFrame[vehicle_id: string, stop_sequence: int, stop_id: bigint, trip_id: string, from_timestamp: bigint, to_timestamp: bigint]

In [40]:
# Fare transactions; the file is read in multi-line mode, i.e. JSON values may span several lines.
tickets = (
    spark.read
    .option('multiLine', True)
    .json('../Datasets/passengers_in_stop/sube_transactions.json')
)
tickets

DataFrame[amount: double, ticket_type: string, timestamp: bigint, vehicle_id: string]

In [41]:
# Assign each ticket to the stop interval of the same vehicle that contains its timestamp.
# An interval with no end (null to_timestamp) is closed 50 time units after it starts.
# `between` is inclusive on both ends, so a ticket stamped exactly on a shared
# boundary is assigned to both neighbouring intervals.
interval_end = F.coalesce(col("to_timestamp"), col("from_timestamp") + lit(50))
passengers_per_stop = (
    stop_time_interval
    .join(tickets, on='vehicle_id')
    .withColumn("to_timestamp", interval_end)
    .filter(col("timestamp").between(col("from_timestamp"), col("to_timestamp")))
)
passengers_per_stop.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
passengers_per_stop

DataFrame[vehicle_id: string, stop_sequence: int, stop_id: bigint, trip_id: string, from_timestamp: bigint, to_timestamp: bigint, amount: double, ticket_type: string, timestamp: bigint]

### Cantidad total de revenue por parada por colectivo (interno)

In [42]:
# Sum of ticket amounts for every (vehicle, stop, interval start) combination.
revenue_keys = ['vehicle_id', 'stop_id', 'from_timestamp']
total_revenue_by_bus = (
    passengers_per_stop
    .groupBy(*revenue_keys)
    .agg(F.sum(col('amount')).alias('total_revenue'))
)
total_revenue_by_bus

DataFrame[vehicle_id: string, stop_id: bigint, from_timestamp: bigint, total_revenue: double]

### Tipo de ticket por parada por colectivo

In [43]:
# Number of tickets of each type for every (vehicle, stop, interval start) combination.
# Here `amount` is a row count, not the monetary amount of the tickets.
ticket_type_keys = ['vehicle_id', 'stop_id', 'ticket_type', 'from_timestamp']
ticket_count = F.count(F.lit(1)).alias("amount")
ticket_amount_by_stop_by_bus = passengers_per_stop.groupBy(*ticket_type_keys).agg(ticket_count)
ticket_amount_by_stop_by_bus

DataFrame[vehicle_id: string, stop_id: bigint, ticket_type: string, from_timestamp: bigint, amount: bigint]

### Cantidad de pasajeros nuevos por parada por colectivo

In [44]:
# Number of tickets (one per boarding passenger) for every
# (vehicle, stop, interval start) combination.
# Counting a non-null literal counts rows; the literal's value does not affect the result.
boarding_keys = ['vehicle_id', 'stop_id', 'from_timestamp']
new_passengers_per_stop = (
    passengers_per_stop
    .groupBy(*boarding_keys)
    .agg(F.count(F.lit(1)).alias("amount"))
)
new_passengers_per_stop

DataFrame[vehicle_id: string, stop_id: bigint, from_timestamp: bigint, amount: bigint]

### Escribir en PostgreSQL

In [31]:
# Connection settings for the target PostgreSQL database. Each can be overridden
# through an environment variable; the defaults are the values this notebook
# previously hardcoded in every call.
# NOTE(review): the default password is a development placeholder; set
# SPARKDB_PASSWORD rather than committing real credentials here.
jdbc_url = os.environ.get('SPARKDB_JDBC_URL', "jdbc:postgresql://postgres:5432/sparkdb")
jdbc_properties = {
    "user": os.environ.get('SPARKDB_USER', "sa"),
    "password": os.environ.get('SPARKDB_PASSWORD', "password"),
}

# (target table, DataFrame) pairs, written in this order.
tables_to_write = [
    ("public.time_in_stop_by_bus_by_stop", time_in_stop),
    ("public.time_in_stop_by_bus", total_time_in_stop),
    ("public.total_worked_time_by_bus", total_working_time),
    ("public.stop_time_efficiency", time_distribution),
    ("public.stopped_stops", stopped_stops),
    ("public.ticket_amount_by_stop_by_bus", ticket_amount_by_stop_by_bus),
    ("public.new_passengers_per_stop", new_passengers_per_stop),
    ("public.total_revenue_by_bus", total_revenue_by_bus),
]

# No save mode is passed, so the writer's default applies.
# NOTE(review): that default fails when the table already exists, so this cell
# cannot be re-run against a populated database. Confirm before choosing a mode.
for table_name, frame in tables_to_write:
    frame.write.jdbc(jdbc_url, table_name, properties=jdbc_properties)