In [None]:
!pip install plotly-express

In [30]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType,DoubleType,DateType
import pandas as pd
from pyspark.sql.functions import acos, cos, sin, lit, toRadians

from pyspark.sql.window import Window
from pyspark.sql.functions import lag
from datetime import datetime,timedelta
import plotly.express as px

In [43]:
spark = SparkSession.builder.master("local").appName("regr").getOrCreate()
sc = spark.sparkContext
sql = SQLContext(sc)

df = spark.read.parquet('./data/masterdf.parquet')
df = df.drop('Condition')
df = df.withColumn('Time',F.to_timestamp(F.col('Time'), 'yyyy-MM-dd HH:mm:ss').alias('Time'))
df = df.dropDuplicates(['Time','VehicleNumber'])
df = df.withColumn('last_updated_epoch',F.from_unixtime(F.col('last_updated_epoch').cast('string')))

time_limit = datetime.now()
time_limit = time_limit.replace(minute=0, second=0, microsecond=0)
df = df.filter(F.col('Time') < F.to_timestamp(F.lit(time_limit)))


def dist(long_x, lat_x, long_y, lat_y):
    return acos(
        sin(toRadians(lat_x)) * sin(toRadians(lat_y)) + 
        cos(toRadians(lat_x)) * cos(toRadians(lat_y)) * 
            cos(toRadians(long_x) - toRadians(long_y))
    ) * lit(6371000.0)



w = Window().partitionBy("VehicleNumber").orderBy("time")

df = df.withColumn("dist", dist(
    "Lon", "Lat",
    F.lag("Lon", 1).over(w), F.lag("Lat", 1).over(w)
).alias("dist"))
df = df.withColumn('timedelta',F.col('time').cast('long') - F.lag('time',1).over(w).cast('long'))

df = df.withColumn('speed',3.6*(F.col('dist')/ F.col('timedelta')))
current_time  = str(datetime.now() - timedelta(minutes = 15)).split('.')[0]

#dodatkowa ramka do analiz danych tylko z teraz
df_current = df.filter(F.col('Time') > F.to_timestamp(F.lit(current_time)))

In [34]:
print(df.count())
print(df_current.count())

120907
0


In [67]:
time_limit = datetime.now() + timedelta(hours=1)
time_limit

datetime.datetime(2022, 1, 16, 13, 19, 55, 726508)

## Ilość jeżdżących pojazdów oraz ilość linii

In [57]:
df.select('VehicleNumber').distinct().groupBy().count().show()

{'count': 860}

In [63]:
df.select('Lines').distinct().groupBy().count().show()

# Ilość rekordów per godzina

In [10]:
df.withColumn('hour',F.hour('Time')).groupBy('hour').count().orderBy(F.asc('hour')).show()

+----+-----+
|hour|count|
+----+-----+
|  11| 9165|
|  12|96678|
|  13|15064|
+----+-----+



# Ilość działających autobusów per godzina

In [11]:
df.withColumn('hour',F.hour('Time')).groupBy('hour').agg(F.countDistinct('VehicleNumber')).orderBy(F.asc('hour',)).show()

+----+--------------------+
|hour|count(VehicleNumber)|
+----+--------------------+
|  11|                 841|
|  12|                 856|
|  13|                 847|
+----+--------------------+



# Ilość wpisów autobusów per update pogody

In [12]:
df.groupBy('last_updated_epoch').count().orderBy('count').show()

+-------------------+-----+
| last_updated_epoch|count|
+-------------------+-----+
|2022-01-15 10:45:00| 8330|
|               null|15899|
|2022-01-15 11:00:00|96678|
+-------------------+-----+



# Ilość autobusów per linia (która linia jest najbardziej oblegana przez autobusy)

In [13]:
df.select(['Lines','VehicleNumber']).groupBy('Lines').agg(F.countDistinct('VehicleNumber')).orderBy(F.desc('count(VehicleNumber)')).show()


+-----+--------------------+
|Lines|count(VehicleNumber)|
+-----+--------------------+
|  190|                  16|
|  189|                  14|
|  523|                  14|
|  112|                  12|
|  116|                  11|
|  180|                  11|
|  136|                  11|
|  114|                  10|
|  175|                  10|
|  141|                   9|
|  Z10|                   9|
|  509|                   9|
|  142|                   9|
|  148|                   9|
|  186|                   9|
|  709|                   9|
|  128|                   9|
|  166|                   8|
|  185|                   8|
|  138|                   8|
+-----+--------------------+
only showing top 20 rows



# Zachowanie autobusu numer 7253

In [14]:
bus_7253 = df.filter(F.col('VehicleNumber') == 7253)
px.line(bus_7253.toPandas(),x='Time',y='speed').write_html('./data/7253.html')

In [15]:
fig = px.scatter_mapbox(bus_7253.toPandas(), lat="Lat", lon="Lon",color='speed', hover_name="Lines",color_continuous_scale=px.colors.sequential.matter, size_max=20,
                        zoom=5, mapbox_style="open-street-map")
fig.update_layout(margin={"r":0,"t":0,"l":0,"b":0})
fig.write_html('./data/7253_speed.html')

#  Najszybszy autobus w mieście

In [16]:
df.dropna().groupBy('VehicleNumber').agg(F.avg('speed')).orderBy(F.desc('avg(speed)')).show()

+-------------+------------------+
|VehicleNumber|        avg(speed)|
+-------------+------------------+
|        15617| 51.86956723609473|
|         8579| 50.31605979301619|
|         4309|   48.650863470269|
|         4211|48.446310464583334|
|        15612|  48.2953005303818|
|        70592| 47.28042408753447|
|         2253| 45.00745488520253|
|         7739| 44.01722652103087|
|        80637| 43.71927448565875|
|        36444| 43.20944089590878|
|         8454| 43.07651311049613|
|        10606|42.600096419883144|
|         1013| 42.42508980624884|
|        80635|41.899731572224646|
|         8303| 41.84570231382412|
|         8307| 41.79034561998278|
|          779|41.613024943668876|
|         9429| 41.32905990932134|
|         9840| 41.25620760054722|
|        70575|41.234897692200214|
+-------------+------------------+
only showing top 20 rows



# Najszybsza linia w mieście

In [17]:
df.dropna().groupBy('Lines').agg(F.avg('speed')).orderBy(F.desc('avg(speed)')).show()

+-----+------------------+
|Lines|        avg(speed)|
+-----+------------------+
|  720| 50.31605979301619|
|  730|   48.650863470269|
|  733|48.446310464583334|
|  L-3| 45.95295750392964|
|  L32| 45.31395785070558|
|  703| 45.00745488520253|
|  L24| 43.96219311781683|
|  L35| 43.71927448565875|
|  L17| 43.20944089590878|
|  L-8|42.600096419883144|
|  L45|41.899731572224646|
|  743| 40.65928391689126|
|  L40| 40.23596005015094|
|  L11| 39.85383681633742|
|  707|39.580786228085906|
|  736| 39.30709021951786|
|  L15| 38.83315617771096|
|  722| 38.12256454021867|
|  L10|37.058266056779395|
|  132| 37.01687410306165|
+-----+------------------+
only showing top 20 rows



# Opady a srednia predkosc w godzinie

In [18]:
df.withColumn('hour',F.hour('Time')).dropna().groupBy('hour').agg(F.avg('speed'),F.avg('precip_mm')).orderBy(F.desc('hour')).show()

+----+------------------+--------------+
|hour|        avg(speed)|avg(precip_mm)|
+----+------------------+--------------+
|  12|25.806656873043682|           0.0|
|  11|25.458257580251736|           0.0|
+----+------------------+--------------+



# Temperatura a srednia predkosc w godzinie

In [19]:
df.withColumn('hour',F.hour('Time')).dropna().groupBy('hour').agg(F.avg('speed'),F.avg('temp_c')).orderBy(F.desc('hour')).show()

+----+------------------+-----------+
|hour|        avg(speed)|avg(temp_c)|
+----+------------------+-----------+
|  12|25.806656873043682|        2.0|
|  11|25.458257580251736|        1.0|
+----+------------------+-----------+



# Heatmapa autobusów overall

In [35]:
help(px.density_mapbox)

Help on function density_mapbox in module plotly.express._chart_types:

density_mapbox(data_frame=None, lat=None, lon=None, z=None, hover_name=None, hover_data=None, custom_data=None, animation_frame=None, animation_group=None, category_orders=None, labels=None, color_continuous_scale=None, range_color=None, color_continuous_midpoint=None, opacity=None, zoom=8, center=None, mapbox_style=None, radius=None, title=None, template=None, width=None, height=None)
        In a Mapbox density map, each row of `data_frame` contributes to the intensity of
        the color of the region around the corresponding point on the map
        
    Parameters
    ----------
    data_frame: DataFrame or array-like or dict
        This argument needs to be passed for column names (and not keyword
        names) to be used. Array-like and dict are tranformed internally to a
        pandas DataFrame. Optional: if missing, a DataFrame gets constructed
        under the hood using the other arguments.
    lat:

In [41]:
df = df.withColumn('agg_Lat',F.round(F.col('Lat'),4))
df = df.withColumn('agg_Lon',F.round(F.col('Lon'),4))
heat_map = df.groupBy(['agg_Lat','agg_Lon','key']).count().orderBy(F.col('key'))
fig = px.density_mapbox(heat_map.toPandas(), lat='agg_Lat', lon='agg_Lon', z='count',mapbox_style="open-street-map",
                       radius=10,animation_frame='key')
fig.write_html('./data/heatmap.html')