In [1]:
import pyspark
# Reuse the running SparkContext when this cell is executed again: constructing a
# second SparkContext in the same kernel raises an error, getOrCreate() does not.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setMaster('local[*]'))

In [2]:
import sys
import pytz
from datetime import datetime
from collections import namedtuple
import random
#reload(sys)
#sys.setdefaultencoding("utf-8")


# Spark and Pandas
from pyspark.sql import SQLContext, Row
# SQL entry point built on the SparkContext `sc` created in the first cell;
# main() uses it to read the aggregator parquet output.
sqlContext = SQLContext(sc)
import pandas as pd

# Mongo connector
import pymongo

# Viz tool
import folium
from folium import plugins
print (folium.__version__)

# Jupyter magic
import ipywidgets
from ipywidgets import interact

0.5.0


In [3]:
# List of underground cell identifiers in Munich, each written as "<cell_ci>-<cell_lac>"

underground_cilacs = ['35215872-21009', '45912-17352', '22456-16960', '35215105-21009', '35212289-21009', '53189-16960', '56198-17352', '35211266-21009', '12141-17352', '28024-16960', '35214337-21009', '35213825-21009', '52767-16960', '56194-17352', '52685-16960', '27885-16960', '35214336-21009', '52684-16960', '35214849-21009', '28389-16960', '28497-16960', '35211264-21009', '57061-16960', '56196-17352', '35212802-21009', '27938-16960', '35215104-21009', '27362-17352', '52770-16960', '46294-17352', '56725-16960', '16573-17352', '35873-17352', '18798-16960', '35216128-21009', '26729-17352', '57064-16960', '35213314-21009', '52692-16960', '56199-17352', '52781-16960', '56202-17352', '35214080-21009', '35213826-21009', '46293-17352', '57065-16960', '27361-17352', '56732-16960', '35215106-21009', '56717-16960', '56186-17352', '46292-17352', '46583-16960', '56185-17352', '52720-16960', '35213312-21009', '35211265-21009', '35214081-21009', '26733-17352', '35212801-21009', '16427-17352', '16574-17352', '52737-16960', '27366-17352', '35216129-21009', '52719-16960', '56714-16960', '26731-17352', '26728-17352', '57062-16960', '52745-16960', '35941-17352', '35214850-21009', '35212288-21009', '35214082-21009', '56747-16960', '26732-17352', '35212290-21009', '57067-16960', '34757-17352', '26726-17352', '52782-16960', '35214338-21009', '56729-16960', '35212032-21009', '35213313-21009', '55715-17352', '56710-16960', '45914-17352', '55778-17352', '56754-16960', '34756-17352', '56188-17352', '52688-16960', '56187-17352', '26725-17352', '55697-17352', '56200-17352', '35212034-21009', '56724-16960', '52682-16960', '35216130-21009', '27358-17352', '55818-17352', '46295-17352', '56127-17352', '35212033-21009', '35214848-21009', '14728-17352', '56709-16960', '52687-16960', '26730-17352', '45915-17352', '55724-17352', '56713-16960', '52746-16960', '5461-17352', '55700-17352', '35213824-21009', '46291-17352', '34755-17352', '56716-16960', '16428-17352', '35215874-21009', '52693-16960', 
'28509-16960', '35212800-21009', '35215873-21009', '52744-16960', '27365-17352', '56183-17352', '26727-17352']

In [4]:
# Helper functions

# Colour names that add_bse_to_map picks from at random for each cell outline.
# 'red' and 'black' appear twice, so random.choice returns them twice as often.
colors = [
    'black',
    'green',
    'orange',
    'purple',
    'darkred',
    'darkgreen',
    'darkpurple',
    'darkblue',
    'cadetblue',
    'gray',
    'lightblue',
    'lightgreen',
    'red',
    'pink',
    'blue',
    'red',
    'black',
]

def get_db_connection(host_name, port, database, collection):
    """Return the collection handle `database`.`collection` from the MongoDB server at host_name:port.

    A new pymongo.MongoClient is created on every call.
    """
    return pymongo.MongoClient(host_name, port)[database][collection]


# Add BSE to maps

def get_bse(infra_conn_dev, cilac):
    """Look up the geometry stored for one cell.

    Parameters:
        infra_conn_dev: collection handle queried with find().
        cilac: "<cell_ci>-<cell_lac>" string; both parts are converted to int.

    Returns:
        A dict holding only the "type" and "coordinates" entries of the "geom"
        field of the first matching document.

    Indexing the first result fails if no document matches.
    """
    parts = cilac.split("-")
    query = {"cell_ci": int(parts[0]), "cell_lac": int(parts[1])}
    geom = infra_conn_dev.find(query)[0]["geom"]
    return {key: geom[key] for key in ("type", "coordinates")}
        
def get_centroid(infra_conn_dev, cilac):
    """Return the stored centroid of a cell with its element order reversed.

    Parameters:
        infra_conn_dev: collection handle queried with find().
        cilac: "<cell_ci>-<cell_lac>" string; both parts are converted to int.

    Returns:
        The "centroid" value of the first matching document, reversed
        (presumably [lon, lat] -> [lat, lon]; verify against the collection),
        or [None, None] when the lookup fails for any ordinary reason
        (malformed cilac, no matching document, missing field, database error).
    """
    try:
        parts = cilac.split("-")
        response = infra_conn_dev.find({"cell_ci": int(parts[0]), "cell_lac": int(parts[1])})
        return response[0]["centroid"][::-1]
    except Exception:
        # Best-effort lookup. Exception (not a bare except) so that
        # KeyboardInterrupt / SystemExit still stop a long-running cell.
        return [None, None]


import random
def add_bse_to_map(infra_conn_dev, infra_conn_subway, cilac, map_object):
    """
    Draw the stored geometry of one cell on a folium map as a polyline.

    Example: after creating a map object m, call
    add_bse_to_map(infra_conn_dev, infra_conn_subway, "56127-17352", m)

    Parameters:
        infra_conn_dev: collection handle passed to get_bse().
        infra_conn_subway: collection handle passed to get_cilac_to_stations().
        cilac: "<cell_ci>-<cell_lac>" string, also used as the popup text.
        map_object: folium object the polyline is added to.

    The colour is chosen at random from the module-level `colors` list.
    """
    lista = get_bse(infra_conn_dev, cilac)
    coordinates = lista["coordinates"]
    coords_reversed = []
    # the coordinates field in mongo can differ: when the first group holds a
    # single element the points sit one nesting level deeper
    first_group_size = len(coordinates[0])
    if first_group_size == 1:
        for group_elements in coordinates:
            coords_reversed.append([element[::-1] for element in group_elements[0]])
    elif first_group_size > 1:
        for group_elements in coordinates:
            coords_reversed.append([element[::-1] for element in group_elements])
    lista["coordinates"] = coords_reversed
    try:
        # NOTE(review): get_cilac_to_stations is not defined in the visible cells;
        # when it is missing the NameError is absorbed below and nothing is printed.
        pop_info = get_cilac_to_stations(infra_conn_subway, cilac)
        print(pop_info)
    except Exception:
        # Best effort only. Exception (not a bare except) so that
        # KeyboardInterrupt / SystemExit are not swallowed.
        pop_info = "not in cilac_nodes"  # currently informational; the popup shows the cilac
    folium.PolyLine(lista["coordinates"], popup=str(cilac), color=random.choice(colors)).add_to(map_object)
    
def utc_to_local(timestamp):
    """Format a POSIX timestamp as 'YYYY-MM-DD HH:MM:SS' in the local time zone of this machine."""
    local_moment = datetime.fromtimestamp(timestamp)
    return local_moment.strftime('%Y-%m-%d %H:%M:%S')
    
    
def apply_timestamp(row):
    """Return a new Row in which "min_ts" and "max_ts" are replaced by utc_to_local() strings."""
    fields = row.asDict()
    for key in ("min_ts", "max_ts"):
        fields[key] = utc_to_local(fields[key])
    return Row(**fields)


def apply_array_mapping(row):
    """Return a new Row with an extra "mapped_event_type_arr" field.

    Each entry of row["event_type_arr"] is translated through the module-level
    `mapping` lookup (not defined in the visible cells).
    """
    fields = row.asDict()
    fields["mapped_event_type_arr"] = [mapping[code] for code in row["event_type_arr"]]
    return Row(**fields)

def extract_trips(df_imsi, infra_conn_dev):
    """Build the list of Section steps described by the first row of df_imsi.

    Parameters:
        df_imsi: Spark DataFrame; columns are read by position: 2 (cell ids),
            1 (area ids), 6 (event types) and 8 (timestamps), each expected to
            be an array. Cell and area ids must be strings, they are joined
            with "-" to form the cilac.
        infra_conn_dev: collection handle passed to get_centroid().

    Returns:
        A list of Section(x, y, cilac, timestamp, record_type), or None (after
        printing a message) when df_imsi is empty. Only the first row is used.

    NOTE(review): Section is not defined in the visible cells; it has to exist
    (e.g. as a namedtuple with these five fields) before this is called.
    """
    if df_imsi.rdd.isEmpty():
        print("No events with this IMSI")
        return
    # DataFrame.map no longer exists in Spark 2.x; mapping over .rdd works in
    # every version and matches the .rdd.isEmpty() call above.
    rows = df_imsi.rdd.map(lambda x: (x[2], x[1], x[6], x[8])).collect()
    cell_id_arr, area_id_arr, mapped_event_type_arr, timestamp_arr = rows[0]

    # Create ci_lac array
    ci_lac_arr = [ci + "-" + lac for ci, lac in zip(cell_id_arr, area_id_arr)]

    steps_list = []
    for cilac, timestamp, record_type in zip(ci_lac_arr, timestamp_arr, mapped_event_type_arr):
        # One database lookup per step (x and y come from the same centroid).
        centroid = get_centroid(infra_conn_dev, cilac)
        steps_list.append(Section(x=centroid[0], y=centroid[1], cilac=cilac,
                                  timestamp=timestamp, record_type=record_type))
    print (steps_list)
    return steps_list

In [5]:
#####################################
# Get the data from parsed trips in csv
######################################

def parse_sms(path_to_csv, IMSI):
    """Read the ';'-separated, latin1-encoded CSV and return the rows of one IMSI.

    Parameters:
        path_to_csv: path of the parsed-trips CSV; it must have an "IMSI" column.
        IMSI: value compared for equality against the "IMSI" column.

    Returns:
        A list with one plain list per matching row, values in CSV column order.
        format_trips() turns these raw rows into
        [[(lat, lon), address, date, time], [(lat, lon), address, date, time]].
    """
    all_rows = pd.read_csv(path_to_csv, sep=";", encoding='latin1')
    imsi_rows = all_rows.loc[all_rows["IMSI"] == IMSI]
    return [record.tolist() for _, record in imsi_rows.iterrows()]

def format_trips(trips_list):
    """Reshape raw trip rows into [start, end] pairs.

    Each input row is indexed by position: 1/7 hold the from/to address, 3/8 and
    4/9 the two coordinates as decimal-comma strings, 5/10 the date and 6/11 the
    time. Every endpoint becomes [(value_4_or_9, value_3_or_8), address, date, time]
    with the coordinates converted to float.
    """
    def _decimal_comma_to_float(text):
        return float(text.replace(",", "."))

    def _endpoint(trip, address_idx, lon_idx, lat_idx, date_idx, time_idx):
        address = trip[address_idx]
        lon = _decimal_comma_to_float(trip[lon_idx])
        lat = _decimal_comma_to_float(trip[lat_idx])
        return [(lat, lon), address, trip[date_idx], trip[time_idx]]

    return [
        [_endpoint(trip, 1, 3, 4, 5, 6), _endpoint(trip, 7, 8, 9, 10, 11)]
        for trip in trips_list
    ]

# Second layer
# Extracted sms/calls in csv format
def plot_sms_trips(path_to_csv, IMSI, f):
    """Draw the parsed SMS/call trip endpoints of one IMSI as orange circles on map f.

    The CSV rows are loaded with parse_sms() (IMSI converted to int) and reshaped
    with format_trips(). Circles start at radius 100 and grow by 30 with every
    endpoint drawn; each endpoint's coordinates are also printed.
    """
    trips_formatted = format_trips(parse_sms(path_to_csv, int(IMSI)))
    endpoints = [endpoint for trip in trips_formatted for endpoint in trip]

    for position, (location, address, date, time) in enumerate(endpoints):
        print (location)
        popup_string = "address: {address}, date: {date}, time: {time}".format(
            address=address, date=date, time=time)
        circle = folium.Circle(location, color="orange", radius=100 + 30 * position, popup=popup_string)
        circle.add_to(f)



In [6]:
# First layer
# Choloropeth geojson for tiles

def add_tiles(f, geo_data="/home/sabeiro/lav/motion/gis/mvg/mvg_tcs_part.geojson"):
    """Overlay the tile GeoJSON on map f as a layer named 'tiles'.

    Parameters:
        f: map object exposing choropleth().
        geo_data: GeoJSON source handed to choropleth(); defaults to the path
            this notebook was written against, so add_tiles(f) behaves as before.

    Tiles are drawn with a faint blue fill (opacity 0.1) and solid red borders.
    """
    f.choropleth(
        geo_data=geo_data,
        name='tiles',
        fill_color='blue',
        line_color="red",
        fill_opacity=0.1,
        line_opacity=1.
    )
    
def add_centroids(f, is_centroids):
    """When is_centroids is truthy, add one small marker per tile centroid to map f.

    Centroids are loaded with add_centroids_to_map() from a fixed CSV path; every
    marker is placed at [Y, X] and its popup shows the tile number as an integer.
    """
    if not is_centroids:
        return
    path_to_centroids = "/media/sf_D_DRIVE/Data/infrastructure/MVG-shapes/mvg_centroids_part.csv"
    for x, y, nr in add_centroids_to_map(path_to_centroids):
        marker = folium.RegularPolygonMarker([y, x], popup=str(int(nr)), radius=2)
        marker.add_to(f)
    
def add_centroids_to_map(path_to_centroids):
    """Load the centroid CSV and return its "X", "Y", "NR" columns as a list of [X, Y, NR] lists.

    Despite the name, nothing is drawn here; add_centroids() places the markers.
    """
    centroid_columns = pd.read_csv(path_to_centroids)[["X", "Y", "NR"]]
    return [record.tolist() for _, record in centroid_columns.iterrows()]

In [10]:
# Third layer
# Events from the aggregator
def plot_all_coords(steps_imsi):
    """Flatten the collected "steps" rows into plain coordinate records.

    Parameters:
        steps_imsi: three-level nesting (rows -> step arrays -> steps); every
            step exposes x, y, cilac and timestamp, and optionally record_type.

    Returns:
        (cilacs, coords_all) where
        - cilacs maps each cilac to {"x": ..., "y": ...} (last occurrence wins),
        - coords_all lists [cilac, timestamp, y, x, record_type] per step, or
          [cilac, timestamp, y, x] for steps without a record_type attribute.

    Steps whose x is falsy (e.g. None when no centroid was found) are skipped.
    """
    print ("length steps imsi",len(steps_imsi))
    coords_all = []
    cilacs = dict()
    for section in steps_imsi:
        for steps in section:
            for step in steps:
                if not step.x:
                    continue
                record = [step.cilac, step.timestamp, step.y, step.x]
                try:
                    # in case record_type is in step
                    record.append(step.record_type)
                except AttributeError:
                    # in case no record_type is in step: keep the 4-field record.
                    # Only AttributeError is expected here; anything else
                    # (including KeyboardInterrupt) is allowed to surface.
                    pass
                coords_all.append(record)
                cilacs[step.cilac] = {"x": step.x, "y": step.y}
    return cilacs, coords_all


class ContinueLoop(Exception):
    """Signal raised from inside a nested loop to skip the rest of the enclosing iteration."""

def add_aggregator_to_map(coords_all, infra_conn_dev, infra_conn_subway, intervals, f, m, other):
    """Draw the aggregator events produced by plot_all_coords() on the map.

    Parameters:
        coords_all: list of [cilac, ts, y, x] or [cilac, ts, y, x, record]
            records; may be empty and may mix both shapes.
        infra_conn_dev, infra_conn_subway: collection handles forwarded to
            add_bse_to_map().
        intervals: iterable of (start_ts, end_ts) pairs; 5-field events whose
            timestamp lies inside any pair (bounds included) are skipped.
            An empty/None value disables the filter.
            NOTE(review): the original condition used `or`, which matched every
            timestamp; exclusion windows are the reading closest to that code.
        f: base map; m, other: marker layers (main() passes two MarkerClusters).

    Behaviour per record:
        - 4 fields: a radius-30 marker on m, no filtering.
        - 5 fields: a radius-10 marker on f and on other, green when the cilac
          is in underground_cilacs and red otherwise. For "Online"/"SMS"
          records the cell outline is added to m, a pin to f and m, and an
          orange circle to f whose radius starts at 100 and grows by 30 each time.
        - any other length is ignored.
    """
    underground = set(underground_cilacs)  # constant-time membership inside the loop
    radius = 100
    for fields in coords_all:
        if len(fields) == 4:
            cilac, ts, y, x = fields
            folium.RegularPolygonMarker([y, x], popup=cilac + " " + utc_to_local(ts), radius=30).add_to(m)
            continue
        if len(fields) != 5:
            continue

        cilac, ts, y, x, record = fields
        if intervals and any(interval[0] <= ts <= interval[1] for interval in intervals):
            continue

        color = "green" if cilac in underground else "red"
        label = cilac + " " + utc_to_local(ts)
        for target in (f, other):
            folium.RegularPolygonMarker([y, x], color=color, popup=label, radius=10).add_to(target)

        if record == "Online" or record == "SMS":
            add_bse_to_map(infra_conn_dev, infra_conn_subway, cilac, m)
            record_label = record + " " + cilac + " " + utc_to_local(ts)
            folium.Marker([y, x], popup=record_label).add_to(f)
            folium.Marker([y, x], popup=record_label).add_to(m)
            folium.Circle([y, x], color="orange", radius=radius, popup=record_label).add_to(f)
            radius += 30
                

In [11]:
# Connect to the database to get BSEs.
# infra_conn_dev is the collection queried by get_bse()/get_centroid();
# infra_conn_subway is forwarded to add_bse_to_map().

infra_conn_dev = get_db_connection("172.25.100.21", 31024, "tdg_17d08", "infrastructure")
infra_conn_subway = get_db_connection("172.25.100.21", 31024, "subway_graph", "munich_cilac_nodes")

# Specify where the data are, which IMSIs to offer and which dates have data.
# NOTE(review): imsis_at_agency and parsed_csvs_paths are not referenced in the visible cells.

imsis_at_agency = ["262011009513469", "262011009513476", "262011009511316", "262011009490916"]
# NOTE: the next assignment is immediately overwritten by the longer list below,
# which is the one main() offers in its IMSI dropdown.
imsis_mvg = ["262011208893099","262011208893096","262011208893092","262011208893105","262011208893102"]
imsis_mvg = ['262011208893092', '262011208893102', '262011208893100', '262011208893096', '262011208893091', '262011208893095', '262011208893093', '262011009513476', '262011208893105', '262011009511314', '262011009511316', '262011208893103', '262011208893099', '262011009490916', '262011009513469']
dates = ['2017-08-09', '2017-08-14', '2017-08-17', '2017-08-18', '2017-08-22', '2017-08-23', '2017-08-24', '2017-08-25', '2017-08-28', '2017-08-29', '2017-08-30', '2017-09-01', '2017-09-04', '2017-09-05', '2017-09-06', '2017-09-07', '2017-09-11', '2017-09-12', '2017-09-15', '2017-09-18', '2017-09-26', '2017-10-04', '2017-10-06', '2017-10-07', '2017-10-08', '2017-10-11', '2017-10-12', '2017-10-13', '2017-10-15', '2017-10-16', '2017-10-17', '2017-10-18', '2017-10-20', '2017-10-23', '2017-10-24', '2017-10-25', '2017-10-26', '2017-10-27', '2017-10-30', '2017-11-03', '2017-11-06', '2017-11-07']

parsed_csvs_paths = ["/motionlogic/outdoor/data/whitelisted_parsed_sms/NonSubTrips_20170926.csv"]

# This is the output of the record-based trip extractor; {date} is filled in by main().
agg_path_template="/home/sabeiro/lav/motion/log/trips/{date}/whitelisted_event_trips"

In [12]:
# Main function that is interact 
# Enable javascript plugin using "jupyter nbextension enable --py --sys-prefix widgetsnbextension" and restart jupyter 

@interact(IMSI=imsis_mvg, date=dates, agg_path_template=agg_path_template, is_centroids=False)
def main(IMSI, date, agg_path_template, is_centroids=False):
    """Build the folium map for one IMSI on one date and return it for display.

    The map is also stored in the module-level variable `f` so that a later cell
    can save it. Prints "empty" and returns None when the parquet data holds no
    row for the IMSI. `is_centroids` is only consumed by the disabled centroid layer.
    """
    global f
    intervals = []  # example value: [(1502261640, 1502282612)]

    # Map centred on Munich with two marker clusters attached to it
    f = folium.Map(location=(48.137154,11.576124), zoom_start=11)
    marker_cluster = folium.plugins.MarkerCluster().add_to(f)
    marker_cluster_other = folium.plugins.MarkerCluster().add_to(f)

    # Layer 1: tiles
    add_tiles(f)

    # Layer 1.5 (disabled): tile centroids with the tile id as popup.
    # This is the heaviest part, and popups cannot be attached to the tiles themselves.
    #add_centroids(f, is_centroids)

    # Layer 2 (disabled): parsed SMS
    #plot_sms_trips(path_to_csv, IMSI, f)

    # Layer 3: trips extracted from the aggregator output
    df_agg = sqlContext.read.parquet(agg_path_template.format(date=date.replace("/","-")))
    df_imsi = df_agg.where(df_agg.other_fields.clean_IMSI == IMSI)
    if df_imsi.rdd.isEmpty():
        print ("empty")
        return
    steps_imsi = df_imsi.select("steps").collect()
    _cilacs, coords_all = plot_all_coords(steps_imsi)
    add_aggregator_to_map(
        coords_all,
        infra_conn_dev,
        infra_conn_subway,
        intervals,
        f,
        marker_cluster,
        marker_cluster_other,
    )

    # Layer control (disabled)
    #folium.LayerControl().add_to(f)
    return f


    #f.save("/media/sf_D_DRIVE/exchange/test_plot_parsed.html")
    #webbrowser.open_new_tab("/media/sf_D_DRIVE/exchange/test_plot_parsed.html")

In [60]:
# Writes the most recent map to disk. Depends on the global `f` that main()
# assigns, so the interactive cell must have produced a map first.
f.save("/tmp/test_plot.html")

In [38]:
# IPython source introspection of folium.Map; a development aid, not part of the analysis.
??folium.Map