In [0]:
%sh
# Fetch the traffic block points sheet as CSV; wget's log messages go to logs.csv.
wget --output-file="logs.csv" "https://docs.google.com/spreadsheets/d/1S-zNqY2QSBUZ9aozOiBDZVmKP895gaGCNDWCCfwHWeg/gviz/tq?tqx=out:csv&sheet=block_points_list" -O "block_points_list.csv"

# Fetch the air quality sheet. The spreadsheet id below is a placeholder and must be
# replaced with the real id before this command can succeed.
wget --output-file="logs.csv" "https://docs.google.com/spreadsheets/d/<Ask yonam92@gmail.com for id>/gviz/tq?tqx=out:csv&sheet=dublin_aq" -O "dublin_aq.csv"

# Fetch the NTA bus stops sheet as CSV.
wget --output-file="logs.csv" "https://docs.google.com/spreadsheets/d/1lSmbOyQNW1grk0aYw36oZsfft_ehY-SGZSVmj-pIYjg/gviz/tq?tqx=out:csv&sheet=NTA_Public_Transport_bus_stops_dublin" -O "NTA_Public_Transport_bus_stops_dublin.csv"

# Packages used by the cells below (pyshp provides the `shapefile` module; gdown is used as a CLI).
# NOTE(review): versions are not pinned - consider pinning for reproducible runs.
pip install geopy
pip install folium
pip install gdown
pip install plotly
pip install pyshp

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, BooleanType, ArrayType, IntegerType, DateType
import pyspark.sql.functions as F
from pyspark.sql.functions import col ,expr, when, explode, to_timestamp,from_unixtime,  to_date, hour, minute, dayofweek, date_format,unix_timestamp, isnull, lag, col, last, window, last, row_number, second, year, month, date_format
from pyspark.sql.window import Window

import numpy as np
import pandas as pd
import geopy.distance
from datetime import datetime, timedelta 
import os
import re
import pickle 
import folium
import time
import collections
import plotly.express as px
import plotly.graph_objs as go
import plotly
import shapefile
import math

In [0]:
# Preview the traffic block points on a folium map centred on the first point in the file.
df_block_points = pd.read_csv('block_points_list.csv')
map_osm = folium.Map(location=[df_block_points.iloc[0]["LatitudeX"], df_block_points.iloc[0]["LongitudeY"]], zoom_start=12)
# Add one marker per row; each marker's popup shows the row's SiteDescription.
df_block_points.apply(lambda row:folium.Marker(location=[row["LatitudeX"], row["LongitudeY"]], radius=5).add_to(map_osm).add_child(folium.Popup(row["SiteDescription"])), axis=1)
# Last expression of the cell, so the notebook renders the map.
map_osm

In [0]:
def union_bus_lines(lines1, lines2):
  """Return the bus lines that appear in both inputs, as a list without duplicates.

  Despite the name, this is a set intersection (lines shared by both stops),
  not a union. The order of the returned list is unspecified.
  """
  shared_lines = set(lines1) & set(lines2)
  return list(shared_lines)

def create_bus_matrix():
  """Build the stop-to-stop matrix of shared bus lines and pickle it.

  For every ordered pair of distinct stop codes in ``df_bus_stops.index``, the
  lines serving both stops are computed with ``union_bus_lines``. Pairs that
  share no line are omitted. The nested dict
  ``{departure_stop: {arrival_stop: [lines]}}`` is written to ``bsr_matrix.pkl``
  in the current working directory.

  NOTE(review): the notebook later loads a downloaded ``bus_stop_matrix.pkl``,
  not the ``bsr_matrix.pkl`` written here - confirm which file is intended.

  Returns:
    The nested dict that was written to disk.
  """
  bus_stops_list = sorted(set(df_bus_stops.index))
  # Look each stop's line set up once instead of once per (departure, arrival) pair.
  lines_by_stop = {stop: df_bus_stops.loc[stop]['bus_lines'] for stop in bus_stops_list}

  bus_stops_dict = dict()
  for departure_stop in bus_stops_list:
    reachable = dict()
    for arrival_stop in bus_stops_list:
      if departure_stop == arrival_stop:
        continue
      lines = union_bus_lines(lines_by_stop[departure_stop], lines_by_stop[arrival_stop])
      if lines:
        reachable[arrival_stop] = lines
    bus_stops_dict[departure_stop] = reachable

  # Context manager closes the file even if pickling fails.
  with open("bsr_matrix.pkl", "wb") as matrix_file:
    pickle.dump(bus_stops_dict, matrix_file)
  return bus_stops_dict
  
  
# Load the NTA bus stops table that was downloaded as CSV.
df_bus_stops = pd.read_csv('NTA_Public_Transport_bus_stops_dublin.csv')

# Erase the wrong bus stops: stop codes [1756, 7596, 1365] each appear twice.
df_bus_stops = df_bus_stops.drop([4696,1109,833]) # the row index labels corresponding to the above bus stops
df_bus_stops = df_bus_stops.set_index('stop_code', drop=False)
# 'routes_served' is split on ', ' into a set of line names per stop.
df_bus_stops['bus_lines'] = df_bus_stops.apply(lambda x: set(x['routes_served'].split(', ')), axis=1)
df_bus_stops = df_bus_stops.rename(columns = {'Y':'Latitude', 'X':'Longitude'})

# DOWNLOAD PICKLE OF BUS_STOPS_MATRIX
os.system("gdown https://drive.google.com/uc?id=14Vi_yl03gMyPXx5KwVNvxRoQYjjcFhjW")
# NOTE(review): unpickling can execute arbitrary code; only load this file from a trusted source.
# The context manager closes the file handle, which the bare open() call leaked.
with open("bus_stop_matrix.pkl", "rb") as matrix_file:
  bus_stops_dict = pickle.load(matrix_file)

In [0]:
def arrange_aq_data(df):
  """Normalise an air-quality frame in place; returns None.

  Steps, all applied directly to ``df``:
    * rows whose 'name' is a key of the module-level ``ppb_to_ugm3`` get their
      'value' multiplied by the matching factor and their 'units' set to 'ug/m3'
      (needed for the index);
    * 'latitude' / 'longitude' are rounded to 6 digits;
    * 'loc' is added as a (latitude, longitude) tuple;
    * 'datetime' is parsed from 'time', and 'date' holds its date part.

  NOTE(review): the conversion is applied whatever the row's current 'units'
  value is - confirm these pollutants always arrive in ppb.
  """
  for pollutant, factor in ppb_to_ugm3.items():
    is_pollutant = df['name'] == pollutant
    df.loc[is_pollutant, 'value'] = df.loc[is_pollutant, 'value'] * factor
    df.loc[is_pollutant, 'units'] = 'ug/m3'

  # coordinates rounded to 6 digits, then combined into one location tuple
  for coordinate in ('latitude', 'longitude'):
    df[coordinate] = df[coordinate].round(6)
  df['loc'] = list(zip(df['latitude'], df['longitude']))

  # full timestamp and date-only columns
  parsed_time = pd.to_datetime(df['time'], infer_datetime_format=True)
  df['datetime'] = parsed_time
  df['date'] = df['datetime'].dt.date

# AQI breakpoints, source: https://www.epa.ie/air/quality/index/
# Each row is [aq_index, o3, no2, so2, pm25, pm10]: the largest concentration that still
# maps to that index value (get_aqi picks the first row whose threshold is >= the value).
# The last row uses +inf so that every concentration maps to some index.
data = [[1, 33, 67, 29, 11, 16],
        [2, 65, 134, 59, 23, 33],
        [3, 100, 200, 89, 35, 50],
        [4, 120, 267, 119, 41, 58],
        [5, 140, 334, 149, 47, 66],
        [6, 160, 400, 179, 53, 75],
        [7, 187, 467, 236, 58, 83],
        [8, 213, 534, 295, 64, 91],
        [9, 240, 600, 354, 70, 100],
        [10, np.inf, np.inf, np.inf, np.inf, np.inf]]
aq_table = pd.DataFrame(columns=['aq_index', 'o3', 'no2', 'so2', 'pm25', 'pm10'], data=data) # the thresholds for each 1-10 index value, per pollutant

# Display colour (hex) for each AQI value 1-10, source: https://www.epa.ie/air/quality/index/
aqi_colors = {1:'#BFD730',
              2:'#65B345',
              3:'#328432',
              4:'#F2BE1A',
              5:'#F7931E',
              6:'#F26522',
              7:'#ED1C24',
              8:'#B11117',
              9:'#743618',
              10:'#B43F97'}
# Multiplicative factors used by arrange_aq_data to convert these pollutants to ug/m3.
ppb_to_ugm3 = {'no2': 1.880493136, 'o3': 1.962019118, 'so2': 2.618723267}

def get_aqi(aq_table: pd.DataFrame, aqi_colors: dict, concentrations: dict):
  """Infer the overall AQI from per-pollutant concentrations.

  Calculated by: https://www.epa.ie/air/quality/index/

  Args:
    aq_table: one column per pollutant; row i holds the upper threshold of index i + 1.
    aqi_colors: maps an AQI value to its display colour.
    concentrations: maps a pollutant name (a column of ``aq_table``) to its measured value.

  Returns:
    ``(aqi, colour)`` where ``aqi`` is the maximum index over all pollutants that could
    be ranked, or ``None`` when no pollutant matched any threshold (e.g. empty input).
  """
  ranked = []
  for pol, value in concentrations.items():
    # first band whose upper threshold is not exceeded; bands are numbered from 1
    band = next((rank for rank, upper in enumerate(aq_table[pol], start=1) if value <= upper), None)
    if band is not None:
      ranked.append(band)

  if not ranked:
    return None
  worst = max(ranked)  # the inferred aqi is the maximum over the measured pollutants
  return worst, aqi_colors[worst]
  
def get_aqi_pandas_table():
  """Download the air quality sheet and compute one AQI value per timestamp.

  Only rows measured at location (53.353889, -6.278056) for the pollutants
  o3, so2, no2, pm10 and pm25 are used. For each distinct 'datetime' the
  pollutant values are passed to ``get_aqi``. Timestamps for which ``get_aqi``
  returns ``None`` (no pollutant could be ranked) are skipped instead of
  raising.

  Returns:
    A DataFrame indexed by 'datetime' (sorted) with a single 'aqi' column.
  """
  # Download Air Quality given by BreezoMeter
  os.system('wget --output-file="logs.csv" "https://docs.google.com/spreadsheets/d/<Ask yonam92@gmail.com for id>/gviz/tq?tqx=out:csv&sheet=dublin_aq" -O "dublin_aq.csv"')
  aq = pd.read_csv('dublin_aq.csv')[['string_uid', 'name', 'value', 'units', 'latitude', 'longitude', 'time']]
  arrange_aq_data(aq)  # in-place; uses the module-level ppb_to_ugm3 factors
  center_station = aq[(aq['loc'] == (53.353889, -6.278056)) & (aq['name'].isin(['o3', 'so2', 'no2', 'pm10', 'pm25']))]
  center_station = center_station[['name', 'value', 'datetime']].sort_values(by='datetime')

  aqi_by_datetime = dict()
  for timestamp, readings in center_station.groupby('datetime'):
    # one entry per pollutant; a later duplicate reading overrides an earlier one
    pollutants_dict = dict(zip(readings['name'], readings['value']))
    result = get_aqi(aq_table, aqi_colors, pollutants_dict)
    if result is None:
      continue  # nothing rankable at this timestamp; leave it as a gap
    aqi_by_datetime[timestamp] = result[0]

  ordered = sorted(aqi_by_datetime.items())
  aqi_pandas_df = pd.DataFrame()
  aqi_pandas_df['datetime'] = [timestamp for timestamp, _ in ordered]
  aqi_pandas_df['aqi'] = [aqi for _, aqi in ordered]
  aqi_pandas_df = aqi_pandas_df.set_index('datetime')
  return aqi_pandas_df.sort_index()

In [0]:
kafka_server = '10.0.0.30:9091'

# Subscribe to multiple topics
# kafka_raw_df = spark \
#   .readStream \
#   .format("kafka") \
#   .option("kafka.bootstrap.servers", kafka_server) \
#   .option("subscribe", "vehicleId_28051,vehicleId_28052") \
#   .option("startingOffsets", "earliest") \
#   .load()

# Subscribe to a pattern: every topic whose name matches vehicleId_.*
kafka_raw_df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafka_server) \
  .option("subscribePattern", "vehicleId_.*") \
  .option("startingOffsets", "earliest") \
  .option("maxOffsetsPerTrigger", 500000) \
  .load()

kafka_value_df = kafka_raw_df.selectExpr("CAST(value AS STRING)")
# NOTE(review): unpickling can execute arbitrary code; only load the schema from a trusted location.
# The context manager closes the file handle, which the bare open() call leaked.
with open("/dbfs/mnt/schema.pkl", "rb") as schema_file:
  schema = pickle.load(schema_file)
kafka_df = kafka_value_df.select(F.from_json(F.col("value"), schema=schema).alias('json')).select("json.*")

# TRANSFORMATION
bus_df = kafka_df.withColumn("_id", kafka_df["_id"]["$oid"].cast(StringType()))
bus_df = bus_df.withColumn("timestamp", bus_df["timestamp"]["$numberLong"].cast(LongType()) / 1000) # convert to unix seconds
bus_df = bus_df.withColumnRenamed("timestamp", "unixtimestamp")
bus_df = bus_df.withColumn('timestamp', F.from_unixtime('unixtimestamp'))
bus_df = bus_df.withColumn('date', F.from_unixtime('unixtimestamp').cast(DateType()))
bus_df = bus_df.withColumn('hour', hour(bus_df.timestamp))
bus_df = bus_df.withColumn("ellapsedtime", bus_df["ellapsedtime"] / 1000) # convert to seconds
# 'datetime' is the record's timestamp truncated to the hour ("<date> <hour>:00:00.000")
bus_df = bus_df.withColumn('datetime', F.concat(F.col("date"), F.lit(" "), F.col("hour"), F.lit(":00:00.000")))
bus_df = bus_df.withColumn('datetime', F.to_timestamp('datetime'))
bus_df = bus_df.withColumn('timestamp', F.to_timestamp('timestamp'))
bus_df = bus_df.withColumn("distancecovered", F.round(bus_df["distancecovered"], 3))
bus_df = bus_df.select('_id', 'actualDelay','angle','areaId','areaId1','areaId2','areaId3','atStop','busStop','congestion','currentHour','delay','distanceCovered','ellapsedTime','gridID','journeyPatternId',
'justLeftStop','justStopped','latitude','lineId','longitude','systemTimestamp','unixtimestamp','timestamp','vehicleId','vehicleSpeed','date','hour','datetime')

# rename columns: remove _ and (), and lower-case the names for the warehouse.
# The loop variable is deliberately not called `col`, which would overwrite the
# `col` function imported from pyspark.sql.functions for the rest of the notebook.
for column_name in bus_df.columns:
  bus_df = bus_df.withColumnRenamed(column_name, re.sub('[_()]', '', column_name).lower())

display(bus_df)

id,actualdelay,angle,areaid,areaid1,areaid2,areaid3,atstop,busstop,congestion,currenthour,delay,distancecovered,ellapsedtime,gridid,journeypatternid,justleftstop,juststopped,latitude,lineid,longitude,systemtimestamp,unixtimestamp,timestamp,vehicleid,vehiclespeed,date,hour,datetime
595a2300e45b4b2ea81ae7ad,0,0.0,2385,8,148,2385,True,1957,False,0,366,0.0,0.0,12792,00130001,False,False,53.325518,13,-6.347039,615949689.0,1499079400.0,2017-07-03T10:56:40.000+0000,43029,0,2017-07-03,10,2017-07-03T10:00:00.000+0000
595a459ee45b4b4bbcea72dc,20,-2.0,4543,17,283,4543,False,323,False,15,335,0.0,23.0,154138,00131001,False,False,53.408185,13,-6.266098,624812222.0,1499088267.0,2017-07-03T13:24:27.000+0000,43029,0,2017-07-03,13,2017-07-03T13:00:00.000+0000
595a2310e45b4b2ea81ae970,26,225.0,2385,8,148,2385,False,1958,False,7,392,0.03,24.0,12792,00130001,True,False,53.325379,13,-6.347426,615965852.0,1499079424.0,2017-07-03T10:57:04.000+0000,43029,0,2017-07-03,10,2017-07-03T10:00:00.000+0000
595a2324e45b4b2ea81aeac9,0,225.0,2385,8,148,2385,False,1959,False,11,392,0.243,16.0,12691,00130001,False,False,53.324248,13,-6.350562,615985701.0,1499079440.0,2017-07-03T10:57:20.000+0000,43029,0,2017-07-03,10,2017-07-03T10:00:00.000+0000
595a2338e45b4b2ea81aec5c,0,225.0,2385,8,148,2385,False,1959,False,17,392,0.042,20.0,12691,00130001,False,False,53.32407,13,-6.351114,616005690.0,1499079460.0,2017-07-03T10:57:40.000+0000,43029,0,2017-07-03,10,2017-07-03T10:00:00.000+0000
595a45b1e45b4b4bbcea74a6,0,225.0,4543,17,283,4543,False,323,False,21,335,0.041,23.0,154138,00131001,False,False,53.408488,13,-6.266454,624830683.0,1499088290.0,2017-07-03T13:24:50.000+0000,43029,0,2017-07-03,13,2017-07-03T13:00:00.000+0000
595a234ae45b4b2ea81aedfb,22,225.0,2385,8,148,2385,False,1959,False,22,414,0.016,20.0,12691,00130001,False,False,53.323999,13,-6.351315,616023738.0,1499079480.0,2017-07-03T10:58:00.000+0000,43029,0,2017-07-03,10,2017-07-03T10:00:00.000+0000
595a45c4e45b4b4bbcea75f8,0,0.0,4543,17,283,4543,False,323,False,2,335,0.0,0.0,154139,00131001,False,False,53.410411,13,-6.265784,624850201.0,1499088305.0,2017-07-03T13:25:05.000+0000,43029,0,2017-07-03,13,2017-07-03T13:00:00.000+0000
595a235ee45b4b2ea81aef78,0,0.0,594,8,148,594,False,1959,False,4,398,0.0,0.0,12590,00130001,False,False,53.323026,13,-6.353947,616043736.0,1499079501.0,2017-07-03T10:58:21.000+0000,43029,0,2017-07-03,10,2017-07-03T10:00:00.000+0000
595a45e1e45b4b4bbcea77a8,0,45.0,4543,17,283,4543,False,323,False,9,335,0.248,27.0,155140,00131001,False,False,53.41263,13,-6.265402,624879198.0,1499088332.0,2017-07-03T13:25:32.000+0000,43029,0,2017-07-03,13,2017-07-03T13:00:00.000+0000


In [0]:
def _distance_km(point_a, point_b):
  """Distance in km between two (lat, lon) points, rounded to 3 decimals.

  geopy is imported here rather than relied on from module scope because this
  code also runs inside Spark workers, where the package may not be installed
  yet; in that case it is pip-installed once and the import is retried.
  """
  try:  # the install is needed at most once per worker
    import geopy.distance
  except ImportError:  # only a missing package triggers the install, not arbitrary errors
    os.system("pip install geopy")
    import geopy.distance
  return round(geopy.distance.distance(point_a, point_b).km, 3)

def geo_dist_to_source(source_location, destination_location):
  """Distance in km (3 decimals) between two (lat, lon) tuples."""
  return _distance_km(source_location, destination_location)

def distance_from_bus_to_source_spark(bus_location_latitude, bus_location_longitude, source_location_0, source_location_1):
  """Distance in km (3 decimals) from a bus position to the source (lat, lon); scalar arguments so it can be used as a Spark UDF."""
  bus_location = (bus_location_latitude, bus_location_longitude)
  source_location = (source_location_0, source_location_1)
  return _distance_km(bus_location, source_location)

def distance_from_bus_to_destination_spark(bus_location_latitude, bus_location_longitude, destination_location_0, destination_location_1):
  """Distance in km (3 decimals) from a bus position to the destination (lat, lon); scalar arguments so it can be used as a Spark UDF."""
  bus_location = (bus_location_latitude, bus_location_longitude)
  destination_location = (destination_location_0, destination_location_1)
  return _distance_km(bus_location, destination_location)

# convert to UDFs by passing in the function and its return type
distance_from_bus_to_source_spark_ = F.udf(distance_from_bus_to_source_spark, DoubleType())
distance_from_bus_to_destination_spark_ = F.udf(distance_from_bus_to_destination_spark, DoubleType())

In [0]:
# clone repos used to publish the generated index.html pages
# NOTE(review): `git remote add` runs in the current working directory before the clone
# exists - confirm it has any effect here.
# NOTE(review): the remote URL embeds a password placeholder; prefer a token read from the
# environment or a secret store over a credential written in the notebook.
# NOTE(review): the two `git config --global` calls do not depend on `repo`, so they are
# repeated on every iteration.
for repo in ['data', 'aq']:  # data == map in Dublin bus control room,  # aq == air quality data plot
  os.system(f'git remote add origin https://yotammarton:<pass>@github.com/yotammarton/{repo}.git')
  os.system(f'git clone https://github.com/yotammarton/{repo}.git')
  os.system('git config --global user.email "<gmail-username>@gmail.com"')
  os.system('git config --global user.name "<github-username>"')
  
def commit_push(repo):
  """Commit index.html in the local clone of `repo` and push it to GitHub.

  Side effect: the process working directory is changed to
  /databricks/driver/<repo> and is not restored by this function.
  Exit codes of the git commands are not checked.
  """
  os.chdir(f'/databricks/driver/{repo}')
  git_commands = (
    'git config --global push.default simple',
    'git add index.html',
    'git commit -m "update"',
    f'git push https://yotammarton:<pass>@github.com/yotammarton/{repo}.git --all',
  )
  for command in git_commands:
    os.system(command)

def plot_air_quality(aqi_pandas_df, datetime):
  """
    Plot the AQI at `datetime` and at the 7 preceding hours, write the plot to
    /databricks/driver/aq/index.html and publish it with commit_push('aq').

    call this function only if aqi_pandas_df.loc[datetime]['aqi'] == 10 (otherwise the colorscale is misleading)
    usage: plot_air_quality(aqi_pandas_df, aqi_pandas_df.index[100])

    aqi_pandas_df: frame indexed by hourly timestamps with an 'aqi' column; every one of
                   the 8 timestamps must be present, otherwise the .loc lookup fails
    datetime: timestamp of the most recent hour to plot

    The working directory is restored even if plotting or pushing raises.
  """
  df = pd.DataFrame(columns=['datetime', 'aqi'])
  df['datetime'] = list(reversed([datetime - pd.Timedelta(hours=i) for i in range(8)])) # requested hour plus 7 hours back, oldest first
  df['aqi'] = [aqi_pandas_df.loc[dt]['aqi'] for dt in df['datetime'].values] # aqi values for those timestamps

  fig = px.scatter(df, x='datetime', y='aqi', color='aqi', size='aqi',
                   color_continuous_scale=list(aqi_colors.values()), title='Pollution level')
  cwd = os.getcwd()
  try:
    os.chdir('/databricks/driver/aq')
    plotly.offline.plot(fig, filename='index.html')
    commit_push('aq')
  finally:
    # commit_push changes directory too; always return to where the caller was
    os.chdir(cwd)

def create_dashboard(datetime):
  """
    Build the control-room dashboard map for a high-pollution event, write it to
    /databricks/driver/data/index.html and publish it with commit_push('data').

    The map shows the block points from df_block_points, an info box with `datetime`
    and the closure recommendation, an embedded air quality page, and the air quality
    monitoring station.

    datetime: timestamp of the event; str(datetime) minus its last 3 characters is displayed

    The working directory is restored even if saving or pushing raises.
  """
  # (53.3470256,-6.2559473) is the center of the closure circle
  m = folium.Map(location=[53.3500256,-6.2429473], tiles='cartodbpositron', zoom_start=14)

  df_block_points.apply(lambda row: folium.Marker(location=[row["LatitudeX"], row["LongitudeY"]], icon=folium.Icon(icon='ban', prefix='fa')).add_to(m), axis=1)
  # info
  text = f"""<div style="width: 370px; font-size: 15pt; color: black; font-weight: bold; background: white; opacity: 0.75; border: 2px solid black;">
            {str(datetime)[:-3]}<br>
            High Air Pollution recorded<br>
            Recommendation: City center closure
            </div>"""
  folium.Marker(location=[53.3700256,-6.3075], icon=folium.features.DivIcon(text)).add_to(m)  # info

  # air quality map
  text = """
          <iframe id="id"
              title="id"
              width="700"
              height="350"
              src="https://vigilant-austin-c2debe.netlify.app">
          </iframe>
         """

  # air quality map
  folium.Marker(location=[53.3710256,-6.2349473], icon=folium.features.DivIcon(text)).add_to(m)
  # air quality monitoring station
  folium.Marker(location=[53.353889, -6.278056], icon=folium.Icon(icon='exclamation-triangle', prefix='fa', color='red'), tooltip='Air quality<br>monitoring station').add_to(m)

  cwd = os.getcwd()
  try:
    os.chdir('/databricks/driver/data')
    m.save('index.html')
    commit_push('data')
  finally:
    # commit_push changes directory too; always return to where the caller was
    os.chdir(cwd)

# Shapefile set with the bus line routes, fetched from Google Drive with gdown; source:
# https://hub.arcgis.com/datasets/f3cd2313a3e849a798da2dbc68835c77_7?geometry=-6.362%2C53.319%2C-6.145%2C53.355&selectedAttribute=Shape__Length
shapefile_drive_ids = (
  '1wYA3aeOobJlaMhn4lYkoxOB5onaXUhxO',
  '1eEJ7rk0Maq5W-F9lRnnKjC_kVN-_XqYr',
  '1WPxk6MO1v4wnSWePVumeIAizCUwX-MZI',
  '1VlB9aky1L0_p1cREEk4MR5KdraL6Bo5U',
  '1IzD8Z1WTZ7GQL6RK14e3z3Rwdm83JLyo',
)
for drive_id in shapefile_drive_ids:
  os.system(f'gdown https://drive.google.com/uc?id={drive_id}')

shape = shapefile.Reader("NTA_Public_Transport.shp")
# {lineid : Shape}, keyed by each record's route_name
shape_dict = {}
for shape_record in shape.shapeRecords():
  shape_dict[shape_record.record.route_name] = shape_record.shape

# Block points (traffic counting sites), fetched again so this cell does not depend on the %sh cell
# source: https://data.gov.ie/dataset/traffic-volumes
block_points_url = "https://docs.google.com/spreadsheets/d/1S-zNqY2QSBUZ9aozOiBDZVmKP895gaGCNDWCCfwHWeg/gviz/tq?tqx=out:csv&sheet=block_points_list"
os.system(f'wget --output-file="logs.csv" "{block_points_url}" -O "block_points_list.csv"')
df_block_points = pd.read_csv('block_points_list.csv')

def get_route_by_lineid(line: str):
  """
    Look up the geometry of the bus line named `line` in the module-level shape_dict.

    *shape_dict must have been built first, which requires the shapefile downloaded by:
      gdown https://drive.google.com/uc?id=1wYA3aeOobJlaMhn4lYkoxOB5onaXUhxO

    returns (parts, points) of the line's shape, with points = [(lon, lat)],
    or None when the line is not in shape_dict
  """
  if line not in shape_dict:
    return None
  line_shape = shape_dict[line]
  return line_shape.parts, line_shape.points

def create_lines_map_for_routes(list_of_routes: list, source_location: tuple, dest_location: tuple):
  """
    Create a folium map with the lines in list_of_routes and return it.
    list_of_routes example ['1', '15A', '220']

    Routes without a shape (get_route_by_lineid returns None) are skipped.
    Each drawn route gets a colour that is remembered in the module-level
    possible_colors; new colours are taken from the module-level all_colors list,
    and 'gray' is used once that list is exhausted instead of raising IndexError.

    source_location / dest_location: (lat, lon) markers for the user and the destination.
  """
  # (53.3470256,-6.2559473) is the center of the closure circle
  m = folium.Map(location=[53.338256, -6.3085], tiles='cartodbpositron', zoom_start=14)

  for route in list_of_routes:
    result = get_route_by_lineid(route)
    if result is None:
      continue  # no geometry known for this route
    parts, points = result

    if route not in possible_colors:
      # take the next unused colour; fall back when the palette has run out
      possible_colors[route] = all_colors.pop(0) if all_colors else 'gray'

    # arrange polyline points
    # https://stackoverflow.com/a/56288894/13727260
    # `parts` holds the start index of each segment inside `points`; the last
    # segment runs to the end. Points are (lon, lat) and folium expects [lat, lon].
    bounds = list(parts) + [len(points)]
    geom = [[[y, x] for x, y in points[start:end]] for start, end in zip(bounds[:-1], bounds[1:])]

    folium.vector_layers.PolyLine(geom, popup=f'<b>Line {route}</b>', color=possible_colors[route], tooltip=f'{route}', weight=5).add_to(m)

  folium.Marker(location=source_location, icon=folium.Icon(icon='street-view', prefix='fa'), tooltip='Your location').add_to(m)
  folium.Marker(location=dest_location, icon=folium.Icon(icon='map-pin', prefix='fa'), tooltip='Destination').add_to(m)

  df_block_points.apply(lambda row:folium.CircleMarker(location=[row["LatitudeX"], row["LongitudeY"]], radius=5, color='black').add_to(m)\
                        .add_child(folium.Popup(row["SiteDescription"])), axis=1)

  text = f"""<div style="width: 700px; font-size: 15pt; color: black; font-weight: bold; background: white; opacity: 0.75; border: 2px solid black;">
            The city center is closed to cars entrance due to high air pollution<br>
            Please use the following bus line(s) to your destination<br>
            Showing live bus locations
            </div>"""
  folium.Marker(location=[53.344023,-6.310905], icon=folium.Icon(icon='info', prefix='fa', color='red')).add_to(m).add_child(folium.Popup(text))  # info
  return m

def create_maps_for_user_request(source_location: tuple, dest_location: tuple, request_time):
  """
    Wait for processed bus data relevant to a user request and save one map per time step.

    source_location: (lat, lon) for the user current location - start bus stop
    dest_location: (lat, lon) for the user desired destination - end bus stop
    request_time: int - unix timestamp in seconds for when the user request was received

    Blocks (polling datetimes_sensors every 10 seconds) until a batch for request_time
    yields at least one suitable vehicle, then writes map_<i>_<time>.html files to
    /databricks/driver and returns None.
  """
  current_relevant_dfs = 0
  while True:
    # update moovit-app like maps for user request to travel from A to B
    if request_time not in datetimes_sensors or current_relevant_dfs == len(datetimes_sensors[request_time]):
      time.sleep(10)  # no new dfs are relevant for the user request for a bus ride
      continue

    # there is a new df processed with bus data that is relevant for the user request
    current_relevant_dfs = len(datetimes_sensors[request_time])
    current_df = datetimes_sensors[request_time][-1]
    current_df = current_df.sort_values(by='unixtimestamp')
    current_df['unixtimestamp'] = current_df['unixtimestamp'].astype(int)

    vehicles_to_remove = set()
    for vehicle in current_df['vehicleid'].unique():
      vehicle_df = current_df[current_df['vehicleid'] == vehicle].reset_index(drop=True)
      if len(vehicle_df) < 2:
        vehicles_to_remove.add(vehicle)  # direction cannot be judged from a single record
        continue
      earliest, following = vehicle_df.loc[0], vehicle_df.loc[1]
      # we want (bus getting closer to source & bus getting closer to destination)
      approaching_source = earliest['distance_to_source'] > following['distance_to_source']
      approaching_destination = earliest['distance_to_destination'] > following['distance_to_destination']
      # we want only sensors that are closer to source than to destination
      closer_to_destination = earliest['distance_to_source'] > earliest['distance_to_destination']
      if not approaching_source or not approaching_destination or closer_to_destination:
        vehicles_to_remove.add(vehicle)
    current_df = current_df[~current_df['vehicleid'].isin(vehicles_to_remove)]
    wanted_vehicles = list(current_df['vehicleid'].value_counts().index[:3])  # TOP 3 vehicles with maximum amount of records
    current_df = current_df[current_df['vehicleid'].isin(wanted_vehicles)]
    wanted_lines = list(current_df['lineid'].unique())

    wanted_vehicles_dfs = [current_df[current_df['vehicleid'] == vehicle].reset_index(drop=True) for vehicle in wanted_vehicles]
    lens = [len(vehicle_df) for vehicle_df in wanted_vehicles_dfs]
    if not lens:
      continue  # nothing usable in this batch; the check above then waits for the next one
    min_rows = min(lens)
    for i in range(min_rows):
      m = create_lines_map_for_routes(list_of_routes=wanted_lines, source_location=source_location, dest_location=dest_location)
      for vehicle_df in wanted_vehicles_dfs:
        relevant_row = vehicle_df.iloc[i]
        lat, lon = relevant_row['latitude'], relevant_row['longitude']
        # A line without a known shape never gets an entry in possible_colors,
        # so fall back to a neutral colour instead of raising KeyError.
        bus_color = possible_colors.get(relevant_row['lineid'], 'gray')
        folium.Marker(location=(lat, lon), icon=folium.Icon(icon='bus', prefix='fa', color=bus_color), tooltip=f'{relevant_row["vehicleid"]}')\
        .add_to(m).add_child(folium.Popup(f'{relevant_row["vehicleid"]}'))
      cur_time = time.time()
      filename = f"map_{i}_{str(cur_time).replace('.', '_')}.html"
      print(f'saving map for user request {os.path.join("/databricks/driver", filename)}')
      m.save(os.path.join('/databricks/driver', filename))
    break

In [0]:
# Peek at the raw air quality readings: one row per (pollutant, location, time) measurement.
aq = pd.read_csv('dublin_aq.csv')[['string_uid', 'name', 'value', 'units', 'latitude', 'longitude', 'time']]
aq.head(10)

Unnamed: 0,string_uid,name,value,units,latitude,longitude,time
0,c6h65335388-6278052017-07-15T00:00:00+00:00,c6h6,0.1944,ug/m3,53.353889,-6.278056,2017-07-15 00:00:00 UTC
1,c6h65335388-6278052017-07-23T02:00:00+00:00,c6h6,0.1944,ug/m3,53.353889,-6.278056,2017-07-23 02:00:00 UTC
2,c6h65335388-6278052017-09-04T06:00:00+00:00,c6h6,3.3372,ug/m3,53.353889,-6.278056,2017-09-04 06:00:00 UTC
3,c6h65335388-6278052018-07-15T20:00:00+00:00,c6h6,0.0,ug/m3,53.353889,-6.278056,2018-07-15 20:00:00 UTC
4,c6h65335388-6278052018-08-08T17:00:00+00:00,c6h6,0.0,ug/m3,53.353889,-6.278056,2018-08-08 17:00:00 UTC
5,c6h65335388-6278052018-08-13T02:00:00+00:00,c6h6,0.0,ug/m3,53.353889,-6.278056,2018-08-13 02:00:00 UTC
6,c6h65335388-6278052018-08-18T02:00:00+00:00,c6h6,0.0,ug/m3,53.353889,-6.278056,2018-08-18 02:00:00 UTC
7,c6h65335388-6278052018-08-29T15:00:00+00:00,c6h6,1.50984,ug/m3,53.353889,-6.278056,2018-08-29 15:00:00 UTC
8,c6h65335388-6278052018-09-05T22:00:00+00:00,c6h6,0.0,ug/m3,53.353889,-6.278056,2018-09-05 22:00:00 UTC
9,co5334166-6288882017-11-02T12:00:00+00:00,co,0.274912,ppb,53.341667,-6.288889,2017-11-02 12:00:00 UTC


In [0]:
def add_missing_values_null(dataframe, min_ts, max_ts):
  """
  Insert an all-missing row, in place, for every hourly timestamp between min_ts
  and max_ts (inclusive) that is absent from the index, then sort by index.

  dataframe should have timestamps as index
  min_ts, max_ts: first and last timestamp of the hourly grid; hours are counted
                  from min_ts, and an empty range (max_ts < min_ts) adds nothing

  dataframe e.g.
                        aqi
  datetime
  2017-07-01 00:00:00   1
  2017-07-01 01:00:00   1
  2017-07-01 03:00:00   1
  2017-07-01 04:00:00   1

  >>
                        aqi
  datetime
  2017-07-01 00:00:00   1
  2017-07-01 01:00:00   1
  2017-07-01 02:00:00   NaN
  2017-07-01 03:00:00   1
  2017-07-01 04:00:00   1

  Returns None; the frame is modified in place.
  """
  # date_range always terminates, unlike stepping hour by hour until a value
  # equals max_ts exactly (which never happens when max_ts is off the grid).
  expected_timestamps = pd.date_range(start=min_ts, end=max_ts, freq=pd.Timedelta(hours=1))
  missing_timestamps = sorted(set(expected_timestamps) - set(dataframe.index))
  for timestamp in missing_timestamps:
    dataframe.loc[timestamp] = None
  dataframe.sort_index(inplace=True)

# First and last hour covered by the analysis.
# pd.Timestamp is built directly: the `pd.datetime` alias is deprecated since
# pandas 1.0 and no longer exists in pandas 2.
min_ts = pd.Timestamp('2017-07-01 00:00:00')
max_ts = pd.Timestamp('2018-08-13 15:00:00')
aqi_pandas_df = get_aqi_pandas_table()

# add nan where there's missing data
add_missing_values_null(aqi_pandas_df, min_ts, max_ts)
aqi_pandas_df.head(5)

Unnamed: 0_level_0,aqi
datetime,Unnamed: 1_level_1
2017-07-01 00:00:00,1.0
2017-07-01 01:00:00,1.0
2017-07-01 02:00:00,1.0
2017-07-01 03:00:00,1.0
2017-07-01 04:00:00,1.0


In [0]:
# We impute missing data in the aqi value by:
# 1. the mean aqi of the last 6 hours (over whichever of those hours have data); if there is no data for the last 6 hours:
# 2. the mean over all earlier samples
# In both cases the imputed values are rounded to integers because aqi takes integer values.

aqi_spark_df = spark.createDataFrame(aqi_pandas_df.reset_index())
aqi_spark_df.createOrReplaceTempView("df")

# 6-hour trailing mean for every row; used below only where 'aqi' is missing
filled = spark.sql(
    """SELECT *, mean(aqi) OVER (
        ORDER BY CAST(datetime AS timestamp) 
        RANGE BETWEEN INTERVAL 6 HOURS PRECEDING AND CURRENT ROW
     ) AS filled_aqi FROM df""")

filled = filled.toPandas()
filled.loc[filled['aqi'].isnull(),'aqi'] = round(filled['filled_aqi'])
filled = filled.drop('filled_aqi', axis=1)

# at this stage there can still be missing 'aqi' values - fill them with the mean over all the past samples with ts < current_ts

# convert from pandas to spark
filled = spark.createDataFrame(filled)

# Unbounded-past window. It must be ordered by time: without orderBy the set of
# "preceding" rows depends on the physical row order, which Spark does not guarantee.
# It is not named `window`, which would overwrite the `window` function imported
# from pyspark.sql.functions.
past_window = Window.orderBy('datetime').rowsBetween(Window.unboundedPreceding, Window.currentRow)

# running mean of everything up to and including the current row
filled_column = F.mean(filled['aqi']).over(past_window)

filled = filled.withColumn('aqi_fill', filled_column)
filled = filled.toPandas()
filled.loc[filled['aqi'].isnull(),'aqi'] = round(filled['aqi_fill'])
filled = filled.drop('aqi_fill', axis=1)

# sort_index keeps the table chronological whatever order the rows were collected in
aqi_pandas_df = filled.set_index('datetime').sort_index()
aqi_pandas_df.head()

Unnamed: 0_level_0,aqi
datetime,Unnamed: 1_level_1
2017-07-01 00:00:00,1.0
2017-07-01 01:00:00,1.0
2017-07-01 02:00:00,1.0
2017-07-01 03:00:00,1.0
2017-07-01 04:00:00,1.0


In [0]:
# Website: https://galgol93.wixsite.com/goldstein-and-martin
# Dashboard web link: https://inspiring-roentgen-bf583a.netlify.app/
# User app for transportation recommendations (demo): https://happy-villani-78bc31.netlify.app/

# Shared state, written by foreach_batch_function and read by create_maps_for_user_request.
datetimes_aqis = dict() # {datetime : aqi}, hours already reported as extreme (AQI 10)
datetimes_sensors = dict() # {request_time: list of pandas frames with bus records near the source bus stop}

### specific example for the presentation: ###
source_station = 3952  # stop_code of the departure stop
source_location = (float(df_bus_stops.loc[source_station,'Latitude']),float(df_bus_stops.loc[source_station,'Longitude']))  # (lat,lon)
destination_station = 2383  # stop_code of the arrival stop
destination_location = (float(df_bus_stops.loc[destination_station,'Latitude']), float(df_bus_stops.loc[destination_station,'Longitude']))  # (lat,lon)
relevant_lines = bus_stops_dict[source_station][destination_station]  # get possible lines to transport from source to dest (pre-defined)
request_time = 1509008400 # in unixtimestamp seconds

# colors for the function 'create_maps_for_user_request'
# all_colors is consumed (entries are removed) as routes get a colour assigned in possible_colors
all_colors = ['green', 'blue', 'black', 'red', 'yellow', 'pink']
possible_colors = dict()  # {route : color}


###################### PROCESS STREAMING DATA ######################

def foreach_batch_function(df, epochId):
  """Process one micro-batch of bus records.

  1. Air quality: for every distinct 'datetime' in the batch whose AQI in
     aqi_pandas_df is 10 and that was not reported before, rebuild and publish
     the control-room dashboard. This part is best-effort: failures are printed
     with their traceback and the batch continues.
  2. User request: keep records of relevant_lines within 2000 seconds of
     request_time, add their distance to the source and destination stops, and
     append the 500 records closest to the source (as a pandas frame) to
     datetimes_sensors[request_time].

  Args:
    df: the micro-batch DataFrame passed by foreachBatch.
    epochId: the batch id passed by foreachBatch.
  """
  import traceback  # local import: only needed for the error report below

  os.system(f'echo "{epochId},   {time.time()}" >> epochs.out')  # log epochs processing

  ############## PROCESS AIR QUALITY DATA ##############
  df_datetimes = [x.datetime for x in df.select('datetime').distinct().collect()]  # distinct datetimes of the current df
  for batch_datetime in df_datetimes:
    try:
      try:
        aqi_value = aqi_pandas_df.loc[batch_datetime]['aqi']
      except KeyError:
        continue  # no AQI known for this hour
      if aqi_value == 10 and batch_datetime not in datetimes_aqis:
        datetimes_aqis[batch_datetime] = 10

        # build dashboard for the city of Dublin transportation control room
        plot_air_quality(aqi_pandas_df, batch_datetime)
        create_dashboard(batch_datetime)
        os.system(f'echo "{batch_datetime}, reported air quality of AQI 10" >> AQ.out') # also log extreme air quality events
    except Exception:
      # keep the stream alive, but make the failure visible instead of discarding it
      traceback.print_exc()

  ############## PROCESS FOR MOOVIT APP USER REQUEST ##############
  df_sensors = df.where(F.col('lineid').isin(relevant_lines))
  df_sensors = df_sensors.filter((F.col('unixtimestamp') >= request_time - 2000) & (F.col('unixtimestamp') <= request_time + 2000))
  df_sensors = df_sensors.withColumn('distance_to_source', \
                                     distance_from_bus_to_source_spark_(df_sensors['latitude'], df_sensors['longitude'], F.lit(source_location[0]),  F.lit(source_location[1])))
  df_sensors = df_sensors.withColumn('distance_to_destination', distance_from_bus_to_destination_spark_(df_sensors['latitude'], df_sensors['longitude'], \
                                                                                                        F.lit(destination_location[0]), F.lit(destination_location[1])))
  df_sensors = df_sensors.withColumn("distance_to_source", df_sensors["distance_to_source"].cast(DoubleType())) # distances as doubles
  df_sensors = df_sensors.withColumn("distance_to_destination", df_sensors["distance_to_destination"].cast(DoubleType())) # distances as doubles
  df_sensors = df_sensors.sort("distance_to_source")
  df_sensors_chosen = df_sensors.select('id', 'latitude','longitude', 'timestamp','unixtimestamp','lineid','vehicleid','distance_to_source', 'distance_to_destination').limit(500)
  df_sensors_chosen = df_sensors_chosen.toPandas()
  if len(df_sensors_chosen):
    if request_time not in datetimes_sensors:  # for the first df containing the requested request_time
      datetimes_sensors[request_time] = [df_sensors_chosen]
    else:
      datetimes_sensors[request_time].append(df_sensors_chosen)
  os.system(f'echo "{epochId},   {time.time()} - END" >> epochs.out')  # log epochs processing


  #   START STREAM
# Start the streaming query; every micro-batch is handed to foreach_batch_function.
# NOTE(review): format("console") is followed by foreachBatch, which looks like it replaces
# the console sink - confirm whether the format call is still needed.
bus_df.writeStream.format("console").foreachBatch(lambda df, epochId: foreach_batch_function(df, epochId)).start()

In [0]:
# Blocks this cell, polling every 10 seconds, until the stream has produced bus data for the
# demo request; then saves the map files for it.
create_maps_for_user_request(source_location=source_location, dest_location=destination_location, request_time=request_time)