## Introduction to Big Data
### Segment 4 of 5

# Spatial Big Data (Velocity)

*Lesson Developer: Jayakrishnan Ajayakumar, jxa421@case.edu*


In [None]:
# This code cell starts the necessary setup for Hour of CI lesson notebooks.
# First, it enables users to hide and unhide code by producing a 'Toggle raw code' button below.
# Second, it imports the hourofci package, which is necessary for lessons and interactive Jupyter Widgets.
# Third, it helps hide/control other aspects of Jupyter Notebooks to improve the user experience
# This is an initialization cell
# It is not displayed because the Slide Type is 'Skip'

from IPython.display import HTML, IFrame, Javascript, display
from ipywidgets import interactive
import ipywidgets as widgets
from ipywidgets import Layout
import queue
import threading
import getpass # This library allows us to get the username (User agent string)

# import package for hourofci project
import sys
sys.path.append('../../supplementary') # relative path (may change depending on the location of the lesson notebook)
import hourofci

# load javascript to initialize/hide cells, get user agent string, and hide output indicator
# hide code by introducing a toggle button "Toggle raw code"
HTML(''' 
    <script type="text/javascript" src=\"../../supplementary/js/custom.js\"></script>
    
    <style>
        .output_prompt{opacity:0;}
    </style>
    
    <input id="toggle_code" type="button" value="Toggle raw code">
''')

## Reminder
<a href="#/slide-2-0" class="navigate-right" style="background-color:blue;color:white;padding:8px;margin:2px;font-weight:bold;">Continue with the lesson</a>

<br>
</br>
<font size="+1">

By continuing with this lesson you are granting your permission to take part in this research study for the Hour of Cyberinfrastructure: Developing Cyber Literacy for GIScience project. In this study, you will be learning about cyberinfrastructure and related concepts using a web-based platform that will take approximately one hour per lesson. Participation in this study is voluntary.

Participants in this research must be 18 years or older. If you are under the age of 18 then please exit this webpage or navigate to another website such as the Hour of Code at https://hourofcode.com, which is designed for K-12 students.

If you are not interested in participating please exit the browser or navigate to this website: http://www.umn.edu. Your participation is voluntary and you are free to stop the lesson at any time.

For the full description please navigate to this website: <a href="../../gateway-lesson/gateway/gateway-1.ipynb">Gateway Lesson Research Study Permission</a>.

</font>

### Experiment II (Congestion is a problem!!!)


<img src = "supplementary/images/taxi_congestion.jpg" width = 50%>

In this experiment, we will use the traffic signal location dataset along with the taxi trajectory data. This experiment will be interactive and exploratory. The goal is to find out, in <b>real-time</b>, whether there is traffic congestion near any of the traffic signals. There are two parameters:

1. Scan Radius: The radius around each traffic signal that we want to scan for congestion.

2. Warning Car Count: The threshold number of cars within the scan radius at which a congestion warning is generated.

An illustration is shown below.

<img src="supplementary/images/congestion_illustration.jpg" width=80%>
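
The two parameters combine into a simple rule: a signal raises a warning when the number of cars within the scan radius reaches the warning car count. Below is a minimal sketch of that rule on made-up planar coordinates (in metres). The function name `congestion_warning` and the sample points are illustrative assumptions, not part of the lesson code.

```python
import math

def congestion_warning(signal, cars, scan_radius, warning_car_count):
    """Return (count, warning) for one signal, given car positions in metres."""
    # count the cars whose straight-line distance to the signal is within the radius
    count = sum(1 for car in cars if math.dist(signal, car) <= scan_radius)
    # a warning is raised once the count reaches the threshold
    return count, count >= warning_car_count

# hypothetical positions: one signal at the origin and four cars around it
signal = (0.0, 0.0)
cars = [(10.0, 0.0), (0.0, 150.0), (300.0, 0.0), (120.0, 120.0)]

count, warning = congestion_warning(signal, cars, scan_radius=200, warning_car_count=3)
print(count, warning)
```

With a scan radius of 200 m, three of the four cars fall inside the circle, so a threshold of 3 triggers the warning. Shrinking the radius or raising the threshold makes warnings rarer.
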
Now let's explore the traffic signal dataset.

In [None]:
import pandas as pd
import queue
import threading
import time
import geopandas as gpd
import json
from scipy.spatial import cKDTree
import numpy as np

In [None]:
nyc_traffic_signals = gpd.read_file(r'supplementary/data/nyc__traffic_signals/nyc__traffic_signals.shp')
nyc_traffic_signals

There are 20,096 traffic signals. Now imagine that 2,000 taxis are running at a given moment, so there are 2,000 GPS locations (a very conservative estimate; the real number could be much higher). To check for traffic congestion, we need to compute the distance between every traffic signal and every GPS coordinate.

For 2,000 taxis:

2,000 x 20,096 = 40,192,000. That's about 40 million distance calculations for every second of data.
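
The arithmetic can be checked with a few lines of Python. The taxi count is the assumed figure from the text, and the hourly total is only an extrapolation that assumes the same rate holds for a full hour.

```python
n_signals = 20_096   # traffic signals in the dataset
n_taxis = 2_000      # assumed number of taxis reporting in one second

# every signal is compared against every taxi position
checks_per_second = n_signals * n_taxis
print(f"{checks_per_second:,} distance checks per second")   # 40,192,000

# extrapolation: the same workload repeated for each second of one hour
checks_per_hour = checks_per_second * 3600
print(f"{checks_per_hour:,} distance checks per hour")
```
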

Now let's try this out. We will explain the critical sections of the code as we move along.

In [None]:
class GPSThread(threading.Thread):
    # shares two objects with the main thread: a DataFrame that receives the GPS data and a status list used as a kill switch
    def __init__(self, dataFrame,status):
        threading.Thread.__init__(self)
        self.dataFrame = dataFrame
        self.status = status
    def run(self):
        #load the data file
        data = pd.read_parquet(r'supplementary/data/taxi1hr_gps.parquet')
        #create an index based on seconds 
        data.set_index('sec',inplace=True)
        #now we need to loop through the dataset
        for sec in range(data.index.min(),data.index.max()+1):
            #kill switch: status[0] is set to 0 when a stop has been requested
            if self.status[0]==0:
                break
            dat = data.loc[sec]
            gpsDat = dat[['id','lng','lat']].to_json(orient='records')
            self.dataFrame.loc[sec] = gpsDat
            #after one iteration sleep for a second......
            time.sleep(1)
            #remove data that is older than 10 seconds with respect to current time
            #self.dataFrame.drop(self.dataFrame.index[self.dataFrame.index<(sec-10)],inplace=True)
        #the simulation is over: set the status to 2 so the other thread and the front end can tell
        self.status[0] = 2
        
        
class TrafficThread(threading.Thread):
    # shares the GPS DataFrame, a results dict, the status list and the parameter dict with the main thread
    def __init__(self, gpsDataFrame,resultsDict,status,params):
        threading.Thread.__init__(self)
        self.gpsDataFrame = gpsDataFrame
        self.resultsDict = resultsDict
        self.status = status
        self.tree = None
        self.params = params
    #function to calculate nearest neighbour for a GeoDataframe with a given distance (a very poor implementation)
    def findNearestNeighbors(self,source,data,distance=100):
        # a list to store all the nearest neighbors
        out = [0]*len(source)
        for sidx,srow in source.iterrows():
            for didx,drow in data.iterrows():
                if srow.geometry.distance(drow.geometry)<=distance:
                    out[sidx]+=1
        return out
        
    
    def run(self):
        processed = []
        #load the traffic signals 
        data = gpd.read_file(r'supplementary/data/nyc__traffic_signals/nyc__traffic_signals.shp')
        outframe = pd.DataFrame({'id':data.id,'lat':data.geometry.y,'lng':data.geometry.x,'counts':[0]*len(data)})
        #we need to project the data for distance calculations
        data_projected = data.to_crs('EPSG:32618')
        #Now we will monitor the gpsDataFrame continuously for changes
        while True:
            if self.status[0]==0 or self.status[0]==2:
                break
            #avoid the dirty-read problem by making a copy
            currentFrame = self.gpsDataFrame.copy()
            #retrieve the earliest element not yet processed from gpsDataFrame
            toProcess = currentFrame.loc[~currentFrame.index.isin(processed)]
            if len(toProcess)>0:
                currentGPS = toProcess.iloc[0]
                currentGPSData = pd.read_json(currentGPS.data)
                #we need to convert to GeoDataFrame and project the data for distance calculation
                currentGPSProjected = gpd.GeoDataFrame(currentGPSData['id'],geometry=gpd.points_from_xy(currentGPSData.lng,currentGPSData.lat),crs='EPSG:4326').to_crs('EPSG:32618')
                #now perform the nearest neighbor calculation and add it to the result dict
                #TODO, the result will be signals with id and count
                nearestNeighbors = self.findNearestNeighbors(data_projected,currentGPSProjected,self.params['monitorDistance'])
                newData = outframe[['id','lat','lng']].assign(counts=nearestNeighbors)
                self.resultsDict[currentGPS.name] = newData.loc[newData.counts>=self.params['warningCount']]
                processed.append(currentGPS.name)
                

gpsData = None
trafficResults = None
status = [-1]
trafficParams = None
def start():
    global gpsData
    global trafficResults
    global status
    global trafficParams
    gpsData = pd.DataFrame(columns = ['sec','data'])
    gpsData.set_index('sec',inplace=True)
    trafficResults = {}
    status[0] = 1
    trafficParams = {'monitorDistance':200,'warningCount':20}
    #startup threads
    gpsThread = GPSThread(gpsData,status)
    gpsThread.start()
    trafficThread = TrafficThread(gpsData,trafficResults,status,trafficParams)
    trafficThread.start()
    #small delay for the thread to startup
    time.sleep(.5)
    #gpsThread.join()
    #trafficThread.join()
    return "started"

def modifyTrafficParams(param):
    global trafficParams
    trafficParams['monitorDistance'] = float(param['monitorDistance'])
    trafficParams['warningCount'] = int(param['warningCount'])
    return json.dumps(trafficParams)
    
def getGPSData(sec):
    if sec in gpsData.index:
        return gpsData.loc[sec].data
    return "No Data" 

def getTrafficData(sec):
    if sec in trafficResults:
        return trafficResults.pop(sec).to_json(orient="records")
    return "No Data" 

def getStatus():
    global status
    if status[0] == 2 or status[0] == 0:
        return "sim over"
    return "running"

def stop():
    global status
    status[0] = 0
    return "stopping"


In [None]:
%%html
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.8.0/dist/leaflet.css"/>
<div id="main">
    <div id="mapandcontrols">
        <div id="map"></div>
        <div id="controls">
            <button id="start_button" class ="controlbuttons" onclick = "run()">Start Simulation</button>
            <button id="stop_button" class ="controlbuttons" onclick = "stop()" disabled>Stop Simulation</button>
            <span id="active_cars" class ="controlbuttons">Active Cars:0</span>
            <span id="gpstime" class ="controlbuttons">GPSTime:0</span>
            <span id="traffictime" class ="controlbuttons">TrafficTime:0</span>
        </div>
    </div>
    <div id="params">
        <div class = "maxwidth" style="height:10%;">
            <span class="controlbuttons" style="width:50%">Scan Radius (m):</span>
            <input type="text" id="scanrad" class="controlbuttons " style="width:30%" value="200">
        </div>
        <div class = "maxwidth" style="height:10%;">
            <span class="controlbuttons" style="width:55%">Warning Car Count:</span>
            <select id="warncount" class="controlbuttons " style="width:25%"></select>
        </div>
        <div class = "maxwidth" style="height:10%;">
            <button style="width:25%; margin-left:15%;" onclick="updateParams();">Update</button>
        </div>
    </div>
</div>
<style>
#main { height: 500px;width:800px; }
#mapandcontrols { height: 100%;width:65%;float:left}
#params { height: 100%;width:30%;float:left;margin-left:2%;}
#map { height:75%;width:100%; }
#controls { height: 20%;margin-top:4%; }
.maxwidth{width:100%;}
.maxheight{height:100%;}
.halfwidth{width:50%;}
.halfheight{height:50%;}
.controlbuttons{float:left;margin:1%;}
</style>
<script>
    var map, datInterval, current, currentTraffic, trafficDatInterval, statusInterval;
    require.config({
        paths: {
            d3: 'https://d3js.org/d3.v7.min',
            L: 'https://unpkg.com/leaflet@1.8.0/dist/leaflet'
        }
    });
    
    function updateParams(){
        require(['d3'], function(d3) {
            var warningCount = d3.select('#warncount').property('value');
            var scanRadius = d3.select('#scanrad').property('value');
        
            var paramObj = JSON.stringify({'monitorDistance':scanRadius,'warningCount':warningCount});
            IPython.notebook.kernel.execute(
                "modifyTrafficParams("+paramObj+")", 
                {
                    iopub: {
                        output: function(response) {
                            var dataString = response.content.data['text/plain'];
                        }
                    }
                },
                {
                    silent: false, 
                    store_history: false, 
                    stop_on_error: true
                }
            );
        });
    }
    
    function getStatus(){
        IPython.notebook.kernel.execute(
            "getStatus()", 
            {
                iopub: {
                    output: function(response) {
                        var dataString = response.content.data['text/plain'];
                        if (dataString.includes("sim over")){
                            console.log('Time to clean up everything');
                            clearInterval(datInterval)
                            clearInterval(trafficDatInterval)
                            clearInterval(statusInterval)
                            currentTraffic = 0;
                            current = 0;
                            require(['d3'], function(d3) {
                                d3.select("#trafficFrame").selectAll("circle").remove();
                                d3.select("#gpsFrame").selectAll("circle").remove();
                                // reset the parameter controls to their defaults (d3 is only available inside this callback)
                                d3.select('#warncount').property('value',20);
                                d3.select('#scanrad').property('value',200);
                            });
                            document.getElementById("start_button").disabled = false;
                            document.getElementById("stop_button").disabled = true;
                            document.getElementById("active_cars").innerText = "Active Cars:0";
                            document.getElementById("traffictime").innerText = "TrafficTime:0";
                            document.getElementById("gpstime").innerText = "GPSTime:0";
                        }
                    }
                }
            },
            {
                silent: false, 
                store_history: false, 
                stop_on_error: true
            }
        );    
    }

    function fetchTrafficData() {
        //first check whether the points have been loaded. If points are not loaded we need to retry
        require(['d3'], function(d3) {
            //if points are already loaded then we just need to update the traffic signals based on counts
            IPython.notebook.kernel.execute(
                "getTrafficData(" + currentTraffic + ")", {
                    iopub: {
                        output: function(response) {
                            // Read the value returned by the Python call
                            var dataString = response.content.data['text/plain'];
                            if (!(dataString.includes("sim over") || dataString.includes("No Data"))) {
                                var data = JSON.parse(dataString.slice(1, dataString.length - 1));
                                d3.select("#trafficFrame")
                                    .selectAll("circle")
                                    .data(data, d => d.id)
                                    .join(
                                        enter => enter.append('circle')
                                        .attr("cx", d => map.latLngToLayerPoint([d.lat, d.lng]).x)
                                        .attr("cy", d => map.latLngToLayerPoint([d.lat, d.lng]).y)
                                        .attr("r", 1)
                                        .style("fill", "red")
                                        .attr("stroke", "red")
                                        .attr("stroke-width", 1)
                                        .attr("fill-opacity", 1)
                                        .attr("opacity", 1)
                                        .transition()
                                        .duration(500)
                                        .attr("r", 5)
                                        .selection(),
                                        
                                        update => update
                                        .attr("r", 1)
                                        .transition()
                                        .duration(500)
                                        .attr("r", 5)
                                        .selection(),
                                        
                                        exit => exit
                                        .remove()
                                    )
                                currentTraffic += 1;
                                document.getElementById("traffictime").innerText = "TrafficTime: " + currentTraffic;
                            }
                        }
                    }
                }, {
                    silent: false,
                    store_history: false,
                    stop_on_error: true
                }
            );
        });
    }

    function fetchGPSData() {
        IPython.notebook.kernel.execute(
            "getGPSData(" + current + ")", {
                iopub: {
                    output: function(response) {
                        // Read the value returned by the Python call
                        var dataString = response.content.data['text/plain'];
                        if (!(dataString.includes("sim over") || dataString.includes("No Data"))) {
                            require(['d3'], function(d3) {
                                var data = JSON.parse(dataString.slice(1, dataString.length - 1));
                                document.getElementById("active_cars").innerText = "Active Cars: " + data.length;
                                d3.select("#gpsFrame")
                                    .selectAll("circle")
                                    .data(data, d => d.id)
                                    .join(
                                        enter => enter.append('circle')
                                        .attr("cx", d => map.latLngToLayerPoint([d.lat, d.lng]).x)
                                        .attr("cy", d => map.latLngToLayerPoint([d.lat, d.lng]).y)
                                        .attr("r", 2)
                                        .attr("stroke", "green")
                                        .attr("stroke-width", 0.4)
                                        .attr("fill-opacity", 0.3)
                                        .style("fill", "green")
                                        .selection(),

                                        update => update
                                        .attr("cx", d => map.latLngToLayerPoint([d.lat, d.lng]).x)
                                        .attr("cy", d => map.latLngToLayerPoint([d.lat, d.lng]).y)
                                        .selection(),

                                        exit => exit
                                        .remove()
                                    )
                            });
                            current += 1;
                            document.getElementById("gpstime").innerText = "GPSTime: " + current;
                        }
                    }
                }
            }, {
                silent: false,
                store_history: false,
                stop_on_error: true
            }
        );
    }

    function update() {
        require(['d3'], function(d3) {
            d3.selectAll("circle")
                .attr("cx", d => map.latLngToLayerPoint([d.lat, d.lng]).x)
                .attr("cy", d => map.latLngToLayerPoint([d.lat, d.lng]).y)
        });
    }

    function stop() {
        IPython.notebook.kernel.execute(
            "stop()", {
                iopub: {
                    output: function(response) {}
                }
            }, {
                silent: false,
                store_history: false,
                stop_on_error: true
            }
        )
    }

    function run() {
        current = 0;
        currentTraffic = 0;
        IPython.notebook.kernel.execute(
            "start()", {
                iopub: {
                    output: function(response) {
                        statusInterval = setInterval(getStatus, 500);
                        datInterval = setInterval(fetchGPSData, 1000);
                        trafficDatInterval = setInterval(fetchTrafficData, 1000);
                        //disable the start button
                        document.getElementById("start_button").disabled = true;
                        document.getElementById("stop_button").disabled = false;
                    }
                }
            }, {
                silent: false,
                store_history: false,
                stop_on_error: true
            }
        )
    }

    require(['d3', 'L'], function(d3, L) {
        map = L
            .map('map')
            .setView([40.763231753511604, -73.98383956127027], 10); // center position + zoom

        // Add a tile to the map = a background. Comes from OpenStreetmap
        L.tileLayer('https://tile.openstreetmap.org/{z}/{x}/{y}.png', {
            maxZoom: 19,
            attribution: '© OpenStreetMap'
        }).addTo(map);
        // Add a svg layer to the map
        L.svg().addTo(map);
        map.on("moveend", update)
        d3.select("#map").select("svg").append("g").attr("id", "gpsFrame");
        d3.select("#map").select("svg").append("g").attr("id", "trafficFrame");
        var carcount = [];
        for(i=1;i<=1000;i++)
            carcount.push(i);
        d3.select('#warncount').selectAll('option').data(carcount).enter().append('option').property('value',function(d){return d;}).text(function(d){return d;});
        d3.select('#warncount').property('value',20);
    });
    
    
</script>

What do you notice right away? The traffic time is stuck at 0 seconds. How long is it before it moves to 1? Almost 160 seconds of GPS time pass before the traffic time changes to 1 second.
Imagine if this happened in the real world. We would not get any real-time insights because of the computational cost of the congestion calculation. So here, the Velocity of the data combined with a naive algorithm defeats the utility of real-time data.
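
To see how quickly such a lag makes the output stale, here is a rough back-of-the-envelope sketch. It assumes, based on the observation above, that analysing one second of GPS data takes about 160 seconds while a new batch keeps arriving every second. The function name `backlog_after` is hypothetical.

```python
def backlog_after(elapsed_seconds, seconds_per_batch=160):
    """Estimate processed and waiting one-second GPS batches after some elapsed time."""
    arrived = elapsed_seconds                          # one new batch arrives every second
    processed = elapsed_seconds // seconds_per_batch   # each batch takes ~160 s to analyse (assumed)
    return processed, arrived - processed

for minutes in (5, 30, 60):
    processed, waiting = backlog_after(minutes * 60)
    print(f"after {minutes:>2} min: {processed} batches processed, {waiting} still waiting")
```

Under these assumptions the queue of unprocessed data only grows, so every congestion warning refers to a moment further and further in the past.
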

Let us see the critical section of the code. 

```python
#function to calculate nearest neighbour for a GeoDataframe with a given distance (a very poor implementation)
def findNearestNeighbors(self,source,data,distance=100):
    # a list to store all the nearest neighbors
    out = [0]*len(source)
    for sidx,srow in source.iterrows():
        for didx,drow in data.iterrows():
            if srow.geometry.distance(drow.geometry)<=distance:
                out[sidx]+=1
    return out
```

Here, <code>source</code> is the traffic signal data (20,096 signals) and <code>data</code> is the taxi GPS data for one instant (one second). <code>distance</code> is the scan radius.

First, we create an output list of zeros whose length equals the length of the source data (20,096).

<code>out = [0]*len(source)</code>

Next, we iterate through the traffic signals, and for each traffic signal we find the GPS points that lie within the scan radius. For every GPS point within the scan radius, we increase the traffic signal's entry in the output list by one.

Iterating through each signal:
<code>for sidx,srow in source.iterrows():</code>

Iterating through each GPS location:
<code>for didx,drow in data.iterrows():</code>

Check whether the distance between the traffic signal and the GPS coordinate is within the scan radius:
<code>if srow.geometry.distance(drow.geometry)<=distance:</code>

If the GPS location is within the scan radius, update the output list value:
<code>out[sidx]+=1</code>
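
The same nested-loop pattern can be tried in isolation on synthetic data. The sketch below is not the lesson's implementation: it uses plain coordinate tuples in metres rather than a GeoDataFrame, random made-up points, and far fewer signals so that it finishes quickly. The names `count_within_radius`, `signals`, and `taxis` are illustrative.

```python
import math
import random
import time

def count_within_radius(signals, taxis, distance=100):
    """Naive count: for every signal, check every taxi (len(signals) * len(taxis) checks)."""
    out = [0] * len(signals)
    for sidx, (sx, sy) in enumerate(signals):
        for tx, ty in taxis:
            # straight-line distance between the signal and the taxi
            if math.hypot(sx - tx, sy - ty) <= distance:
                out[sidx] += 1
    return out

random.seed(0)
# made-up points scattered over a 10 km x 10 km square
signals = [(random.uniform(0, 10_000), random.uniform(0, 10_000)) for _ in range(500)]
taxis = [(random.uniform(0, 10_000), random.uniform(0, 10_000)) for _ in range(2_000)]

t0 = time.perf_counter()
counts = count_within_radius(signals, taxis, distance=200)
elapsed = time.perf_counter() - t0
print(f"{len(signals) * len(taxis):,} checks took {elapsed:.2f} s")
```

The work grows with the product of the two list lengths, so going from 500 signals to 20,096 multiplies the number of checks by roughly 40. The lesson code is slower still, because each iteration over a GeoDataFrame row and each geometry distance call carries extra overhead.
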


So we have seen a first case where Velocity becomes an issue. Can we solve it? We will find out in the exploration segment, where we walk through another experiment.


<font size="+1"><a style="background-color:blue;color:white;padding:12px;margin:10px;font-weight:bold;" 
href="bigdata-exploration.ipynb">Click here to go to the next notebook.</a></font>