In [0]:
%sh
# Install geopy, used by the attraction/bus-stop cell for geodesic distances (geopy.distance).
# NOTE(review): the install is unpinned, and `%sh pip` presumably installs into the driver's environment only.
# Consider `%pip install geopy==<version>` so re-runs are reproducible; confirm which version produced the results.
pip install geopy

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, BooleanType, ArrayType, IntegerType, DateType, time
import pyspark.sql.functions as func
import pandas as pd
import re
from datetime import datetime, date,timedelta
import numpy as np
from geopy import distance
import math
from pyspark.sql import Window
from pyspark.sql.functions import last
import sys


In [0]:
%sh
# Download the "bus_stop" sheet as CSV to bus_stop.csv (read with pd.read_csv further down).
# `--output-file` is wget's log file, not the data (the data goes to the -O target).
# The next download cell reuses the same log name and overwrites it.
wget --output-file="logs.csv" "https://docs.google.com/spreadsheets/d/1TzKC5vvs9AhppX8DDS25tH-MnCRtpkD3AAxJF8h4tMY/gviz/tq?tqx=out:csv&sheet=bus_stop" -O "bus_stop.csv"

In [0]:
%sh
# Download the "attraction" sheet as CSV to attraction.csv (read with pd.read_csv further down).
# `--output-file` is wget's log file; it overwrites the log written by the previous download cell.
wget --output-file="logs.csv" "https://docs.google.com/spreadsheets/d/1NTmOn-w_6NRm9ShY-sgo5VS7Xx9kEBvBlvzoNmg1aas/gviz/tq?tqx=out:csv&sheet=attraction" -O "attraction.csv"

In [0]:
# Load the two reference sheets downloaded above.
# The pandas frames are used directly by the attraction/bus-stop cells.
# The Spark copies (`*_s`) are not referenced again in the notebook as shown.
bus_stop, attraction = (pd.read_csv(csv_name) for csv_name in ('bus_stop.csv', 'attraction.csv'))
bus_stop_s, attraction_s = (spark.createDataFrame(frame) for frame in (bus_stop, attraction))

In [0]:
# Schema of the raw bus JSON records.
# `_id` and `timestamp` are nested objects holding a single `$oid` / `$numberLong` string field.
schema = StructType([
StructField('_id',StructType([StructField('$oid',StringType(),True)]),True),
StructField('busStop',LongType(),True),
StructField('journeyPatternId',StringType(),True),
StructField('latitude',DoubleType(),True),
StructField('longitude',DoubleType(),True),
StructField('timestamp',StructType([StructField('$numberLong',StringType(),True)]),True),
StructField('vehicleId',LongType(),True),
StructField('lineId',StringType(),True),
StructField('justStopped',BooleanType(),True),
StructField('delay',LongType(),True),
])

path1 = '/mnt/dacoursedatabricksstg/dacoursedatabricksdata/busFile'

bus_df = spark.read.json(path1, schema)

# Clean the data: flatten the nested id and rename it.
bus_df = bus_df.withColumn("_id", bus_df["_id"]["$oid"].cast(StringType()))
bus_df = bus_df.withColumnRenamed("_id", "id")

# Create derived columns.
# `journey` is the first 5 characters of journeyPatternId.
# The built-in substring is used instead of a Python UDF for three reasons:
#   - `udf` is never imported in this notebook;
#   - a UDF ships every value through a Python worker;
#   - `None[:5]` raises TypeError for rows with a null journeyPatternId (substring returns null).
bus_df = bus_df.withColumn('journey', func.substring('journeyPatternId', 1, 5))
# `$numberLong` is divided by 1000 to get the epoch seconds that from_unixtime expects.
bus_df = bus_df.withColumn("timestamp", bus_df["timestamp"]["$numberLong"].cast(LongType()) / 1000)
bus_df = bus_df.withColumnRenamed("timestamp", "real_timestamp")
bus_df = bus_df.withColumn("date_time", func.to_utc_timestamp(func.from_unixtime(bus_df.real_timestamp,'yyyy-MM-dd HH:mm:ss'),'GMT'))
bus_df = bus_df.withColumn('timestamp', func.from_unixtime('real_timestamp'))
bus_df = bus_df.withColumn('date',func.from_unixtime('real_timestamp').cast(DateType()))
bus_df = bus_df.withColumn('hour', func.hour(bus_df.timestamp))
bus_df = bus_df.withColumn('day_of_week', func.dayofweek(bus_df.timestamp))
bus_df = bus_df.withColumn('month', func.month(bus_df.date))
bus_df = bus_df.withColumn('year', func.year(bus_df.date))

# Keep only the week 2017-11-13 .. 2017-11-19, and drop hours 0-2 of its first day.
STUDY_DATES = ['2017-11-13', '2017-11-14', '2017-11-15', '2017-11-16', '2017-11-17', '2017-11-18', '2017-11-19']
bus_df = bus_df[func.col('date').isin(*STUDY_DATES)]
bus_df = bus_df[(bus_df['hour'] > 2) | (bus_df['date'] > '2017-11-13')]


In [0]:


# Clean `delay` (seconds).
# Values outside the open interval (DELAY_MIN, DELAY_MAX) are treated as outliers and nulled.
# Each vehicle's last valid delay (ordered by real_timestamp) is then carried forward.
DELAY_MIN, DELAY_MAX = -650, 900
# Used for rows that have no earlier valid delay for their vehicle.
# Described as the mean delay of this data.
DELAY_FILL_VALUE = 140.027

bus_df_fill_last = bus_df.withColumn(
  "new_delay",
  func.when((bus_df['delay'] > DELAY_MIN) & (bus_df['delay'] < DELAY_MAX), bus_df['delay']).otherwise(None))

# All rows of the same vehicle up to and including the current one, in time order.
window = Window.partitionBy('vehicleId')\
               .orderBy('real_timestamp')\
               .rowsBetween(Window.unboundedPreceding, 0)

# Forward fill: the most recent non-null new_delay within the window.
filled_column = last(bus_df_fill_last['new_delay'], ignorenulls=True).over(window)

bus_df_filled = (bus_df_fill_last
                 .withColumn('delay_fill', filled_column)
                 .drop('delay', 'new_delay')
                 .withColumnRenamed('delay_fill', 'delay'))
# Fill only `delay`. A bare na.fill(value) would also overwrite nulls in every other numeric
# column (busStop, vehicleId, latitude, longitude, hour, ...) with this delay value.
bus_df_filled = bus_df_filled.na.fill(DELAY_FILL_VALUE, subset=['delay'])
# The next cell walks the rows in this order.
bus_df_filled = bus_df_filled.sort('date_time')

In [0]:


# Build {date: {vehicleId: {stop_key: (adjusted_time, journey_prefix)}}} from the time-sorted rows.
#   - adjusted_time  = date_time minus the row's delay in seconds.
#   - journey_prefix = first 5 characters of journeyPatternId.
#   - stop_key       = str(busStop) for the first observation of a stop. A later observation more
#                      than REVISIT_GAP_MINUTES away from that first one is stored again under
#                      "busStop, date_time".
# NOTE(review): collects the whole filtered dataset onto the driver.
REVISIT_GAP_MINUTES = 30

bus_stops_journey_dict = dict()
for row in bus_df_filled.collect():
  day = row['date']
  j = row['journeyPatternId']
  s = str(row['busStop'])
  v = row['vehicleId']
  adjusted_time = row['date_time'] - timedelta(seconds=round(row['delay']))
  visits = bus_stops_journey_dict.setdefault(day, dict()).setdefault(v, dict())
  if s not in visits:
    visits[s] = (adjusted_time, j[:5])
  else:
    # Use the absolute gap. `timedelta.seconds` is only the 0..86399 seconds component, so a
    # negative difference (stored visit earlier than this one, e.g. -5 min) read as ~1435 minutes.
    time_dif = visits[s][0] - adjusted_time
    if abs(time_dif.total_seconds()) / 60 > REVISIT_GAP_MINUTES:
      visits[s + ', ' + str(row['date_time'])] = (adjusted_time, j[:5])

In [0]:
# For each date and vehicle, walk the vehicle's recorded stops in adjusted-time order and split
# them into runs of consecutive stop ids. For each (date, journey prefix), keep the longest run
# seen: {date: {journey_prefix: [stop_id, ...]}}.
bus_stops_journey_dict_smaller_ = dict()

for date in bus_stops_journey_dict.keys():
  bus_stops_journey_dict_smaller_[date] = dict()
  for vehicle in bus_stops_journey_dict[date].keys():
    sorted_journey = dict(sorted(bus_stops_journey_dict[date][vehicle].items(), key=lambda item: item[1][0])) # sort this vehicle's stops by adjusted time
    prev_t, prev_j = None,None
    cur_seq = list() 
    for s,value in sorted_journey.items(): # going through the stops in time order
      # NOTE: `time` shadows the `time` name imported at the top of the notebook.
      time = value[0]
      journey = value[1]
      # Register every journey prefix seen with an empty run.
      if journey not in bus_stops_journey_dict_smaller_[date]:
          bus_stops_journey_dict_smaller_[date][journey] = []
      cur_t = time
      if not prev_t:
        prev_t = time
      # NOTE(review): prev_j is only set here, on the vehicle's first entry, and never updated.
      # So the "changed journey" test below compares against the vehicle's first journey of the
      # day, not the previous entry's journey. Confirm this is intended.
      if not prev_j:
        prev_j = journey
      # Entries are iterated in time order, so delta >= 0 and `.seconds` is the gap (under a day).
      delta = cur_t - prev_t
      if delta.seconds/60 < 30 or not prev_j == journey: # less than 30 minutes since last stop, or journey differs from prev_j
        # Strip the ", <date_time>" suffix used for revisit keys and keep the integer stop id.
        s = str(s)
        s = s.split(', ')[0]
        s = int(s)
        if s not in cur_seq:
          cur_seq.append(s)
        prev_t = time
      else:
        # Gap of 30+ minutes: close the current run.
        # The first branch never fires, because the key was already created at the top of this loop.
        # NOTE(review): the run is stored under the *current* entry's journey, which may not be the
        # journey of the stops in the run. The current stop is also not added to the new run.
        if journey not in bus_stops_journey_dict_smaller_[date]:
          bus_stops_journey_dict_smaller_[date][journey] = cur_seq
        elif len(bus_stops_journey_dict_smaller_[date][journey]) < len(cur_seq):
          bus_stops_journey_dict_smaller_[date][journey] = cur_seq
        cur_seq = list()
        prev_t = None
    # Flush the final run of this vehicle under the last entry's journey.
    if len(bus_stops_journey_dict_smaller_[date][journey]) < len(cur_seq):
       bus_stops_journey_dict_smaller_[date][journey] = cur_seq

      
# Merge the per-date results: for every journey prefix, keep the longest stop sequence found on
# any date. Only a strictly longer list replaces the current one, so ties keep the first date seen.
bus_stops_journey_dict_smaller = {}
for per_date_runs in bus_stops_journey_dict_smaller_.values():
  for journy, stops in per_date_runs.items():
    best_so_far = bus_stops_journey_dict_smaller.get(journy)
    if best_so_far is None or len(stops) > len(best_so_far):
      bus_stops_journey_dict_smaller[journy] = stops


In [0]:
# Export two tables:
#   - each journey prefix with its stop sequence;
#   - for every consecutive pair of stops in those sequences, the journeys that travel directly
#     between them.
# Rows are collected in lists and turned into DataFrames once. Growing a DataFrame row by row
# (`.at[-1] = ...` plus an index shift) is quadratic and depends on pandas' enlargement rules.
# Column names (including "busId" and "origen_stop") are kept as-is for the CSV consumers.
journey_rows = []
stop_to_stop = dict()

for key, value in bus_stops_journey_dict_smaller.items():
  journey_rows.append([key, value])
  # Consecutive (origin, destination) pairs along this journey's stop sequence.
  for o_stop, d_stop in zip(value, value[1:]):
    stop_to_stop.setdefault((o_stop, d_stop), []).append(key)

bus_stops_journey_dict_smaller_df = pd.DataFrame(journey_rows, columns=['busId','stops'])
stop_to_stop_df = pd.DataFrame(
  [[o_stop, d_stop, lines] for (o_stop, d_stop), lines in stop_to_stop.items()],
  columns=['origen_stop','dest_stop','lines'])

bus_stops_journey_dict_smaller_df.to_csv('bus_stops_in_each_journey.csv',index = False)
stop_to_stop_df.to_csv('stop_to_stop.csv',index = False)

In [0]:
%sh
# Upload stop_to_stop.csv (written two cells up) to bashupload.com; the response contains the download link.
# NOTE(review): this sends the data to a third-party file host; confirm it may be shared.
curl https://bashupload.com/stop_to_stop.csv --data-binary @stop_to_stop.csv

In [0]:
%sh
# Upload bus_stops_in_each_journey.csv (written three cells up) to bashupload.com.
# NOTE(review): this sends the data to a third-party file host; confirm it may be shared.
curl https://bashupload.com/bus_stops_in_each_journey.csv --data-binary @bus_stops_in_each_journey.csv

In [0]:

# Map each attraction name to the stop_codes of all bus stops within ATTRACTION_RADIUS_M metres.
# Distances are geodesic, via geopy. Stops are listed in bus_stop row order.
# bus_stop stores latitude in Y and longitude in X.
ATTRACTION_RADIUS_M = 300

# Stop coordinates do not depend on the attraction, so build them once instead of re-running
# bus_stop.iterrows() for every attraction. iterrows is kept so that stop_code values have
# exactly the same type as before.
bus_stop_locations = [((row_b['Y'], row_b['X']), row_b['stop_code']) for _, row_b in bus_stop.iterrows()]

attraction_busStop = dict()
for _, row_a in attraction.iterrows():
  atr = row_a['Name']
  attraction_busStop[atr] = list()
  loc_a = (row_a['Latitude'], row_a['Longitude'])
  for loc_b, stop_code in bus_stop_locations:
    if distance.distance(loc_a, loc_b).m <= ATTRACTION_RADIUS_M:
      attraction_busStop[atr].append(stop_code)


In [0]:
# Tabulate attraction -> nearby stop_codes and write it for the upload cell that follows.
attraction_stops = pd.DataFrame(list(attraction_busStop.items()), columns=['attraction','bus_stations'])
# Number of attractions with no bus stop in range.
cnt = sum(1 for stops in attraction_busStop.values() if len(stops) == 0)
print(f'attractions with at least one nearby stop: {len(attraction_busStop) - cnt} of {len(attraction_busStop)}')
# The next cell uploads attraction_stops.csv, so the file must be written here.
attraction_stops.to_csv('attraction_stops.csv',index = False)

In [0]:
%sh
# Upload attraction_stops.csv to bashupload.com.
# Requires the preceding cell's `to_csv` call to have written the file.
# NOTE(review): this sends the data to a third-party file host; confirm it may be shared.
curl https://bashupload.com/attraction_stops.csv --data-binary @attraction_stops.csv

In [0]:
# For every bus stop (first_station) and every attraction that has nearby stops, list the
# journeys that visit first_station and, later in their stop sequence, one of the attraction's
# stops.
# throw_station maps each such journey to the first qualifying attraction stop, taken in
# attraction_busStop order (presumably the stop to get off at).
station_to_attrction_dict = dict()
station_to_attrction_rows = list()
for i, first_station in enumerate(bus_stop['stop_code'].unique()):
  # Progress report every 100 stations.
  if i % 100 == 0 and i != 0:
    print(f'processed {i} stations')
  # Journeys passing through first_station, with its position in each stop sequence.
  # This does not depend on the attraction, so it is computed once per station.
  passing_journeys = [(bus, bus_stations, bus_stations.index(first_station))
                      for bus, bus_stations in bus_stops_journey_dict_smaller.items()
                      if first_station in bus_stations]
  for atr in attraction['Name'].unique():
    optional_stations = attraction_busStop[atr]
    if len(optional_stations) == 0:
      continue
    optinal_lines = list()
    throw_stations = dict()
    for bus, bus_stations, first_idx in passing_journeys:
      for optional_s in optional_stations:
        # The attraction stop must come after first_station on this journey.
        if optional_s in bus_stations and first_idx < bus_stations.index(optional_s):
          optinal_lines.append(bus)
          throw_stations[bus] = optional_s
          break
    if len(optinal_lines) > 0:
      station_to_attrction_dict[(first_station,atr)] = optinal_lines
      station_to_attrction_rows.append([first_station, atr, optinal_lines, throw_stations])

# Build the DataFrame once. DataFrame.append was removed in pandas 2.0 and is quadratic in a loop.
station_to_attrction = pd.DataFrame(station_to_attrction_rows, columns=['station','attraction','lines','throw_station'])
station_to_attrction.to_csv('station_to_attrction.csv',index = False)

In [0]:
%sh
# Upload station_to_attrction.csv (written by the previous cell) to bashupload.com.
# The remote name now matches the local file; it was previously "station_to_attrction.csv.csv".
# NOTE(review): this sends the data to a third-party file host; confirm it may be shared.
curl https://bashupload.com/station_to_attrction.csv --data-binary @station_to_attrction.csv