In [0]:
from kafka import KafkaProducer, KafkaConsumer
import time
import json
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
import requests
import geopandas as gpd
from shapely.geometry import Point
from shapely.geometry import Polygon
import pandas as pd




#### Kafka consumer do pobierania danych strumieniowych z API ZTM

In [0]:
d = spark\
  .readStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "ec2-35-158-128-12.eu-central-1.compute.amazonaws.com:9092")\
  .option("subscribe", "BUS__221")\
  .load()

##### Przygotowanie danych

In [0]:
d2 = d.select(d.value.cast(StringType()))

In [0]:
split_col = split(d2['value'],',')

In [0]:
d2 = d2.withColumn("Lines", translate(split_col.getItem(0),'[',''))
d2 = d2.withColumn("Line", translate("Lines",'"',''))
d2 = d2.withColumn("Lon", split_col.getItem(1))
d2 = d2.withColumn("VehicleNumber", translate(split_col.getItem(2),'"',''))
d2 = d2.withColumn("Time", translate(split_col.getItem(3),'"','').cast("timestamp"))
d2 = d2.withColumn("Lat", split_col.getItem(4))
d2 = d2.withColumn("Brigade", translate(split_col.getItem(5),'"',''))
d2 = d2.withColumn("BrigadeID", translate("Brigade",']',''))

In [0]:
d3 = d2.select(d2['Line'],d2['VehicleNumber'],d2['Time'],d2['BrigadeID'],d2['Lat'],d2['Lon'])
d3 = d3.withColumn('date', to_date(col('Time'),'yyyy-MM-dd'))
d3 = d3.withColumn('time', date_format(col('Time'),'HH:mm:ss'))


In [0]:
d4 = d3.select(d3['Line'],d3['VehicleNumber'],d3['BrigadeID'],d3['Lat'],d3['Lon'],d3['date'],d3['time'])

##### Wyświetlenie danych na mapie

In [0]:
display(d4)

Line,VehicleNumber,BrigadeID,Lat,Lon,date,time
221,9221,1,52.2593973,20.9802829,2023-02-05,12:05:19
221,9229,2,52.2594626,20.994034,2023-02-05,12:05:25
221,9221,1,52.2598356,20.98192,2023-02-05,12:05:37
221,9229,2,52.2594568,20.9940376,2023-02-05,12:05:36
221,9221,1,52.2599156,20.982681,2023-02-05,12:05:47
221,9229,2,52.2594426,20.9940438,2023-02-05,12:05:48
221,9221,1,52.2599993,20.9832218,2023-02-05,12:05:58
221,9229,2,52.2594376,20.9940355,2023-02-05,12:05:58
221,9221,1,52.2602107,20.9846273,2023-02-05,12:06:09
221,9229,2,52.2594401,20.9940411,2023-02-05,12:06:08


Output can only be rendered in Databricks

##### Zapisanie danych w celu dalszego przetworzenia

In [0]:
queryStream =(
    d4
    .writeStream
    .format("csv")
    .option("checkpointLocation", '/FileStore/tables/checkpoint_')    
    .option("path", '/FileStore/tables/221buus')
    .outputMode("append")
    .start())

In [0]:
data_221 = spark.read.format("csv").\
    option("delimiter", ",").\
    option("header", "false").\
    load("dbfs:/FileStore/tables/221buus")
display(data_221)

_c0,_c1,_c2,_c3,_c4,_c5,_c6
221,9218,1,52.259418,20.994095,2023-01-28,10:28:54
221,9225,2,52.2594421,20.9935845,2023-01-28,10:28:54
221,9218,1,52.259422,20.994091,2023-01-28,10:29:17
221,9225,2,52.2594291,20.9939736,2023-01-28,10:29:17
221,9218,1,52.259415,20.994095,2023-01-28,10:29:33
221,9225,2,52.2594275,20.9939864,2023-01-28,10:29:32
221,9218,1,52.259415,20.994095,2023-01-28,10:29:41
221,9225,2,52.2594266,20.9939866,2023-01-28,10:29:43
221,9218,1,52.259422,20.994095,2023-01-28,10:30:20
221,9225,2,52.2594331,20.9939828,2023-01-28,10:30:22


##### Dodanie nagłówków oraz unikalnego ID

In [0]:
headers = ['Line','VehicleNumber','BrigadeID','Lat','Lon','date','time']
bus_221 = data_221.toDF(*headers)
bus_221 = bus_221.withColumn('ID',monotonically_increasing_id())
display(bus_221)

Line,VehicleNumber,BrigadeID,Lat,Lon,date,time,ID
221,9218,1,52.259418,20.994095,2023-01-28,10:28:54,0
221,9225,2,52.2594421,20.9935845,2023-01-28,10:28:54,1
221,9218,1,52.259422,20.994091,2023-01-28,10:29:17,2
221,9225,2,52.2594291,20.9939736,2023-01-28,10:29:17,3
221,9218,1,52.259415,20.994095,2023-01-28,10:29:33,4
221,9225,2,52.2594275,20.9939864,2023-01-28,10:29:32,5
221,9218,1,52.259415,20.994095,2023-01-28,10:29:41,6
221,9225,2,52.2594266,20.9939866,2023-01-28,10:29:43,7
221,9218,1,52.259422,20.994095,2023-01-28,10:30:20,8
221,9225,2,52.2594331,20.9939828,2023-01-28,10:30:22,9


##### Wczytanie danych odnosnie przystanków

In [0]:
point_csv_df = spark.read.format("csv").\
    option("delimiter", ",").\
    option("header", "true").\
    load("dbfs:/FileStore/tables/point_221-3.csv")


In [0]:
display(point_csv_df)

_c0,SZER_GEO,DLUG_GEO,Zespol,Slupek,Nazwa_zespolu,KIERUNEK
5964,52.263658,20.978727,6039,1,pl.Grunwaldzki,"""rondo """"Radosława"""""""
5965,52.262923,20.979629,6039,2,pl.Grunwaldzki,Krasińskiego
5966,52.26285,20.979453,6039,3,pl.Grunwaldzki,"""rondo """"Radosława"""""""
5967,52.263328,20.979185,6039,4,pl.Grunwaldzki,Metro Marymont
5968,52.262406,20.978472,6039,7,pl.Grunwaldzki,"""rondo """"Radosława"""""""
5969,52.262629,20.97782,6039,8,pl.Grunwaldzki,Sady Żoliborskie
5970,52.26023,20.978699,6039,13,pl.Grunwaldzki,ks.Boguckiego
5971,52.26064,20.977151,6039,14,pl.Grunwaldzki,Anny German
6137,52.258483,20.971464,6089,3,Rydygiera,Anny German
6138,52.258131,20.970151,6089,4,Rydygiera,PKP Powązki


##### Transformacja do formatu pandas

In [0]:
stops_221 = point_csv_df.toPandas()

##### Utworzenie Geo Data Frame

In [0]:
gdf = gpd.GeoDataFrame(
    stops_221, geometry=gpd.points_from_xy(stops_221.DLUG_GEO, stops_221.SZER_GEO),crs="EPSG:4326")

##### Stworzenie bufora dookoła każdego z przystanków

In [0]:
gdf.to_crs("EPSG:3857")
gdf['geometry']=gdf.geometry.buffer(0.0002)
gdf.to_crs("EPSG:4326")


  gdf['geometry']=gdf.geometry.buffer(0.0002)


Unnamed: 0,_c0,SZER_GEO,DLUG_GEO,Zespol,Slupek,Nazwa_zespolu,KIERUNEK,geometry
0,5964,52.263658,20.978727,6039,1,pl.Grunwaldzki,"""rondo """"Radosława""""""","POLYGON ((20.97893 52.26366, 20.97893 52.26364..."
1,5965,52.262923,20.979629,6039,2,pl.Grunwaldzki,Krasińskiego,"POLYGON ((20.97983 52.26292, 20.97983 52.26290..."
2,5966,52.26285,20.979453,6039,3,pl.Grunwaldzki,"""rondo """"Radosława""""""","POLYGON ((20.97965 52.26285, 20.97965 52.26283..."
3,5967,52.263328,20.979185,6039,4,pl.Grunwaldzki,Metro Marymont,"POLYGON ((20.97939 52.26333, 20.97938 52.26331..."
4,5968,52.262406,20.978472,6039,7,pl.Grunwaldzki,"""rondo """"Radosława""""""","POLYGON ((20.97867 52.26241, 20.97867 52.26239..."
5,5969,52.262629,20.97782,6039,8,pl.Grunwaldzki,Sady Żoliborskie,"POLYGON ((20.97802 52.26263, 20.97802 52.26261..."
6,5970,52.26023,20.978699,6039,13,pl.Grunwaldzki,ks.Boguckiego,"POLYGON ((20.97890 52.26023, 20.97890 52.26021..."
7,5971,52.26064,20.977151,6039,14,pl.Grunwaldzki,Anny German,"POLYGON ((20.97735 52.26064, 20.97735 52.26062..."
8,6137,52.258483,20.971464,6089,3,Rydygiera,Anny German,"POLYGON ((20.97166 52.25848, 20.97166 52.25846..."
9,6138,52.258131,20.970151,6089,4,Rydygiera,PKP Powązki,"POLYGON ((20.97035 52.25813, 20.97035 52.25811..."


##### Wyświetlenie danych na mapie

In [0]:
from matplotlib import *
from mapclassify import *
from folium import *

In [0]:
gdf.explore()

##### Stworzenie GeoDataframe z loakliazacji autobusu

In [0]:
bus_221_p = bus_221.toPandas()
display(bus_221_p)

Line,VehicleNumber,BrigadeID,Lat,Lon,date,time,ID
221,9218,1,52.259418,20.994095,2023-01-28,10:28:54,0
221,9225,2,52.2594421,20.9935845,2023-01-28,10:28:54,1
221,9218,1,52.259422,20.994091,2023-01-28,10:29:17,2
221,9225,2,52.2594291,20.9939736,2023-01-28,10:29:17,3
221,9218,1,52.259415,20.994095,2023-01-28,10:29:33,4
221,9225,2,52.2594275,20.9939864,2023-01-28,10:29:32,5
221,9218,1,52.259415,20.994095,2023-01-28,10:29:41,6
221,9225,2,52.2594266,20.9939866,2023-01-28,10:29:43,7
221,9218,1,52.259422,20.994095,2023-01-28,10:30:20,8
221,9225,2,52.2594331,20.9939828,2023-01-28,10:30:22,9


In [0]:
gdf_bus = gpd.GeoDataFrame(
    bus_221_p, geometry=gpd.points_from_xy(bus_221_p.Lon, bus_221_p.Lat),crs="EPSG:4326")


##### Wyswietlenie danych punktowych i bufora na mapie

In [0]:

m = gdf.explore(height=250, width=500, name="Polygons")
m = gdf_bus.explore(m=m, color="red", name="Points")
LayerControl().add_to(m)
m

##### Sprawdzenie czy autobus znajduje sie w buforze sprzystanku

In [0]:
df = pd.DataFrame(columns=['ID_BUS','Zespol','Slupek','Wewn'])
for row in gdf.itertuples():
    for bus in gdf_bus.itertuples():
        list1 = [bus.ID, row.Zespol, row.Slupek, bus.geometry.within(row.geometry)]
        df.loc[len(df)] = list1
print(df)

             ID_BUS Zespol Slupek   Wewn
0                 0   6039     01  False
1                 1   6039     01  False
2                 2   6039     01  False
3                 3   6039     01  False
4                 4   6039     01  False
...             ...    ...    ...    ...
16737  111669149711   6221     02  False
16738  111669149712   6221     02  False
16739  111669149713   6221     02  False
16740  111669149714   6221     02  False
16741  111669149715   6221     02  False

[16742 rows x 4 columns]


##### Wybranie tylko rekordów, które znajdują się w buforze w celu analizy opóźnień

In [0]:
true_df = df[df['Wewn']==True]
print(true_df)

             ID_BUS Zespol Slupek  Wewn
4769    25769803787   6039     13  True
4867    34359738413   6039     13  True
5115    68719476773   6039     13  True
5213    85899345927   6039     13  True
5534    25769803791   6039     14  True
...             ...    ...    ...   ...
16448   60129542163   6221     02  True
16641   85899345940   6221     02  True
16666   94489280520   6221     02  True
16689   94489280543   6221     02  True
16718  103079215132   6221     02  True

[534 rows x 4 columns]


In [0]:
S_DF = spark.createDataFrame(true_df)

##### Wczytanie danych o rozkładzie jazdy

In [0]:
timetable_221_df = spark.read.format("csv").\
    option("delimiter", ",").\
    option("header", "true").\
    load("dbfs:/FileStore/tables/timetable_221.csv")


##### Połączenie tabel

In [0]:
join_1 = S_DF.join(bus_221, S_DF.ID_BUS == bus_221.ID,"left").drop(S_DF.ID_BUS)

##### Przygotowanie danych dotyczących rozkładu jazdu

In [0]:
timetable_221_df = timetable_221_df.withColumn('time_timetable', date_format(col('czas'),'HH:mm:ss'))

##### Dodanie kolumn +/-20 min od czasu odjazdu

In [0]:
timetable_221_df = timetable_221_df.withColumn('start_timetable',timetable_221_df.czas - expr('INTERVAL 20 MINUTES'))
timetable_221_df = timetable_221_df.withColumn('end_timetable',timetable_221_df.czas + expr('INTERVAL 20 MINUTES'))
timetable_221_df = timetable_221_df.withColumn('start_timetable', date_format(col('start_timetable'),'HH:mm:ss'))
timetable_221_df = timetable_221_df.withColumn('end_timetable', date_format(col('end_timetable'),'HH:mm:ss'))

In [0]:
timetable_221_df  = timetable_221_df.drop(timetable_221_df.Symbol_1)
timetable_221_df  = timetable_221_df.drop(timetable_221_df.Symbol_2
display(timetable_221_df)

_c0,Line,zespol,Slupek,Brygada,kierunek,trasa,time_timetable,start_timetable,end_timetable
0,221,6039,13,1,Dw.Gdański (Rydygiera),TP-DWR,05:05:00,04:45:00,05:25:00
1,221,6039,13,1,Dw.Gdański (Rydygiera),TP-DWR,05:35:00,05:15:00,05:55:00
2,221,6039,13,1,Dw.Gdański (Rydygiera),TP-DWR,06:05:00,05:45:00,06:25:00
3,221,6039,13,1,Dw.Gdański (Rydygiera),TP-DWR,06:25:00,06:05:00,06:45:00
4,221,6039,13,1,Dw.Gdański (Rydygiera),TP-DWR,06:45:00,06:25:00,07:05:00
5,221,6039,13,1,Dw.Gdański (Rydygiera),TP-DWR,07:05:00,06:45:00,07:25:00
6,221,6039,13,1,Dw.Gdański (Rydygiera),TP-DWR,07:25:00,07:05:00,07:45:00
7,221,6039,13,2,Dw.Gdański (Rydygiera),TP-DWR,07:45:00,07:25:00,08:05:00
8,221,6039,13,1,Dw.Gdański (Rydygiera),TP-DWR,08:05:00,07:45:00,08:25:00
9,221,6039,13,2,Dw.Gdański (Rydygiera),TP-DWR,08:25:00,08:05:00,08:45:00


##### Połączenie tabel

In [0]:
join_conditions = [timetable_221_df.zespol == join_1.Zespol,timetable_221_df.Slupek == join_1.Slupek,timetable_221_df.Brygada == join_1.BrigadeID,join_1.time > timetable_221_df.start_timetable, join_1.time < timetable_221_df.end_timetable]
join_2 = join_1.join(timetable_221_df, join_conditions, "left").drop(timetable_221_df.zespol).drop(timetable_221_df.Slupek).drop(timetable_221_df.Line).drop(timetable_221_df.Brygada).drop(timetable_221_df.czas)

In [0]:
join_condition_2 = [join_2.Zespol== point_csv_df.Zespol, join_2.Slupek == point_csv_df.Slupek]
join_3 = join_2.join(point_csv_df, join_condition_2, "left").drop(join_2.Symbol_2).drop(join_2.Symbol_1).drop(point_csv_df.Zespol).drop(point_csv_df.Slupek).drop(point_csv_df._c0).drop(point_csv_df.KIERUNEK)

##### Przygotowanie tabeli do podsumowania

In [0]:
df_summary = join_3.select(join_3['Zespol'],join_3['Nazwa_zespolu'],join_3['Slupek'],join_3['kierunek'],join_3['VehicleNumber'],join_3['BrigadeID'],join_3['date'],join_3['time'],join_3['time_timetable'])
display(df_summary)

Zespol,Nazwa_zespolu,Slupek,kierunek,VehicleNumber,BrigadeID,date,time,time_timetable
6039,pl.Grunwaldzki,13,Dw.Gdański (Rydygiera),9225,2,2023-01-28,11:36:14,11:45:00
6039,pl.Grunwaldzki,13,Dw.Gdański (Rydygiera),9225,2,2023-01-28,12:16:11,12:25:00
6039,pl.Grunwaldzki,13,Dw.Gdański (Rydygiera),9225,2,2023-01-28,12:26:33,12:25:00
6039,pl.Grunwaldzki,13,Dw.Gdański (Rydygiera),9225,2,2023-01-28,12:26:23,12:25:00
6039,pl.Grunwaldzki,14,pl.Niemena,9225,2,2023-01-28,11:36:49,11:37:00
6039,pl.Grunwaldzki,14,pl.Niemena,9218,1,2023-01-28,12:05:00,11:57:00
6039,pl.Grunwaldzki,14,pl.Niemena,9225,2,2023-01-28,11:36:38,11:37:00
6039,pl.Grunwaldzki,14,pl.Niemena,9225,2,2023-01-28,12:16:49,12:17:00
6039,pl.Grunwaldzki,14,pl.Niemena,9225,2,2023-01-28,12:17:06,12:17:00
6089,Rydygiera,3,Dw.Gdański (Rydygiera),9218,1,2023-01-28,10:43:21,10:43:00


##### Obliczenie opóźnienia

In [0]:
from pyspark.sql import functions as F
timeFmt = "HH:mm:ss"
timeDiff = round(((F.unix_timestamp('time', format=timeFmt)
            - F.unix_timestamp('time_timetable', format=timeFmt))/60),0)
df_summary = df_summary.withColumn("delay", timeDiff)

##### Wiświetlenie danych od najnowszych

In [0]:
df_summary = df_summary.distinct()
display(df_summary.sort(df_summary.time.desc()))

Zespol,Nazwa_zespolu,Slupek,kierunek,VehicleNumber,BrigadeID,date,time,time_timetable,delay
6212,Dw.Gdański (Rydygiera),71,pl.Niemena,9225,2,2023-01-28,12:37:42,12:52:00,-14.0
6215,Anny German,2,pl.Niemena,9218,1,2023-01-28,12:37:39,12:38:00,0.0
6212,Dw.Gdański (Rydygiera),71,pl.Niemena,9225,2,2023-01-28,12:37:25,12:52:00,-15.0
6212,Dw.Gdański (Rydygiera),71,pl.Niemena,9225,2,2023-01-28,12:32:14,12:52:00,-20.0
6212,Dw.Gdański (Rydygiera),71,pl.Niemena,9218,1,2023-01-28,12:32:11,12:32:00,0.0
6212,Dw.Gdański (Rydygiera),71,pl.Niemena,9225,2,2023-01-28,12:32:01,12:52:00,-20.0
6212,Dw.Gdański (Rydygiera),71,pl.Niemena,9225,2,2023-01-28,12:31:49,12:12:00,20.0
6212,Dw.Gdański (Rydygiera),71,pl.Niemena,9225,2,2023-01-28,12:31:38,12:12:00,20.0
6212,Dw.Gdański (Rydygiera),71,pl.Niemena,9218,1,2023-01-28,12:31:38,12:32:00,0.0
6212,Dw.Gdański (Rydygiera),71,pl.Niemena,9225,2,2023-01-28,12:31:32,12:12:00,20.0
