# PySpark Analytics for Crowd Dynamics
# Harvard University
# CS 205, Spring 2019
# Group 1

In [2]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/ef/88/8e5f4cfb99a4186b4b7f06aa1294353e0be6b05b25802a82f3d16cb30b79/pyspark-2.4.2.tar.gz (193.9MB)
[K     |████████████████████████████████| 193.9MB 126kB/s 
[?25hCollecting py4j==0.10.7 (from pyspark)
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 44.5MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Stored in directory: /root/.cache/pip/wheels/dc/0e/02/e9fdf0bf3ad20284175307d4ab31afcf967604f25f3b4f1d96
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.2


In [4]:
from google.colab import drive
import pandas as pd
import numpy as np
from functools import reduce

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.mllib.linalg import Vectors
from pyspark.sql.functions import udf, array, avg, col, size, struct, lag, window, countDistinct, monotonically_increasing_id, collect_list
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType, IntegerType, StringType, ArrayType, LongType, FloatType
from pyspark.sql.window import Window
import re

drive.mount('/content/gdrive', force_remount=True)

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=email%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdocs.test%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive.photos.readonly%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/gdrive


In [0]:
spark = SparkSession.builder.getOrCreate()
directory = '/content/gdrive/My Drive/CS 205 Project/records/'

# Load data

In [6]:
df = spark.read.json(directory + '1-min')
df.show()

+--------------------+--------------------+
|              bboxes|              scores|
+--------------------+--------------------+
|[0.59336024522781...|[0.93980032205581...|
|[0.62502855062484...|[0.89926099777221...|
|[0.72911220788955...|[0.98777532577514...|
|[0.78701549768447...|[0.97126829624176...|
|[0.48519986867904...|[0.93738293647766...|
|[0.67814970016479...|[0.97286134958267...|
|[0.57940828800201...|[0.98793494701385...|
|[0.71229845285415...|[0.98705601692199...|
|[0.34067243337631...|[0.77425789833068...|
|[0.78984218835830...|[0.97743964195251...|
|[0.39440840482711...|[0.92999798059463...|
|[0.60313117504119...|[0.93863993883132...|
|[0.82778960466384...|[0.91649687290191...|
|[0.51006656885147...|[0.97323024272918...|
|[0.56186217069625...|[0.89469927549362...|
|[0.67285186052322...|[0.97309362888336...|
|[0.35979881882667...|[0.84619909524917...|
|[0.39222764968872...|[0.71783727407455...|
|[0.47849225997924...|[0.92085105180740...|
|[0.51487535238265...|[0.8125121

# Add Frame Index (Timestep) and Display Schema

In [7]:
df = df.withColumn("frame", monotonically_increasing_id().cast("timestamp")).select("frame", "bboxes", "scores")
df.show()

+-------------------+--------------------+--------------------+
|              frame|              bboxes|              scores|
+-------------------+--------------------+--------------------+
|1970-01-01 00:00:00|[0.59336024522781...|[0.93980032205581...|
|1970-01-01 00:00:01|[0.62502855062484...|[0.89926099777221...|
|1970-01-01 00:00:02|[0.72911220788955...|[0.98777532577514...|
|1970-01-01 00:00:03|[0.78701549768447...|[0.97126829624176...|
|1970-01-01 00:00:04|[0.48519986867904...|[0.93738293647766...|
|1970-01-01 00:00:05|[0.67814970016479...|[0.97286134958267...|
|1970-01-01 00:00:06|[0.57940828800201...|[0.98793494701385...|
|1970-01-01 00:00:07|[0.71229845285415...|[0.98705601692199...|
|1970-01-01 00:00:08|[0.34067243337631...|[0.77425789833068...|
|1970-01-01 00:00:09|[0.78984218835830...|[0.97743964195251...|
|1970-01-01 00:00:10|[0.39440840482711...|[0.92999798059463...|
|1970-01-01 00:00:11|[0.60313117504119...|[0.93863993883132...|
|1970-01-01 00:00:12|[0.82778960466384..

In [8]:
df.printSchema()

root
 |-- frame: timestamp (nullable = false)
 |-- bboxes: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- scores: array (nullable = true)
 |    |-- element: double (containsNull = true)



# Functions used for UDFs (including velocity and group size)

In [0]:
def count(column):
    return len(column)
  
def sum_vals(column):
    return float(sum(column))
  
def avg_vals(columns):
    return float(columns[0] / columns[1])
  
def get_center(values):
    res = []
    for i in range(int(len(values)/4)):
        y1, x1, y2, x2 = values[4*i:4*i+4]
        x_mean, y_mean = round(float(x1+x2)/2, 3), round(float(y1+y2)/2, 3)
        res.extend([x_mean, y_mean])
    return res

def get_x(values):
    return [values[i] for i in range(0, len(values), 2)]

def get_y(values):
    return [values[i] for i in range(1, len(values), 2)]
  
def fudf(val):
    return reduce(lambda x, y:x+y, val)

In [0]:
def compute_velocities(cols, fps=0.7, threshold=0.3):
    # Assume frame_1 = [x_i, y_i, x_{i+1}, y_{i+1}, ...]
    f_1 = iter(cols[0])
    f_2 = iter(cols[1])
    frame_1 = list(zip(f_1, f_1))
    frame_2 = list(zip(f_2, f_2))
    
    val_1 = {k: v for k, v in enumerate(frame_1)}
    val_2 = {k: v for k, v in enumerate(frame_2)}

    # Compute pairwise distances
    distances = {}
    for i in range(len(frame_1)):
        for j in range(len(frame_2)):
            # Euclidean distance between two people
            distances[i, j] = np.sqrt(
                (val_1[i][0] - val_2[j][0]) ** 2 + (val_1[i][1] - val_2[j][1]) ** 2)

    # Assigned ids from frame 1 (reference frame), {id_i: vel_i}
    velocities = dict()

    # Assigned ids from frame 2 (with values as match in frame 1)
    targets = dict()
    num_assigned = 0
    num_ids = min(len(frame_1), len(frame_2))

    # Sort distances by key: (id in frame 1, id in frame 2)
    pairs = sorted(distances.items(), key=lambda v:v[1])
    for p, dist in pairs:
        # Stop assigning ids when the distance exceeds a user-defined threshold
        # i.e. this covers the case when a person leaves one end of the image
        # and another person enters at the opposite side. We should not match
        # these ids to each other.
        if dist > threshold:
            break

        # Found closest ids between frames
        if p[0] not in velocities and p[1] not in targets and num_assigned < num_ids:
            num_assigned += 1
            # Velocity (distance units per second)
            velocities[p[0]] = dist * fps
            targets[p[1]] = p[0]

            
    return [float(v) for v in velocities.values()]

  
def dfs_all(graph):
    def dfs(node, graph):
        stack = [node]
        cc = [node]
        while stack:
            u = stack.pop()
            for v in graph[u]:
                if not visited[v]:
                    visited[v] = True
                    cc.append(v)
                    stack.append(v)
        return cc

    ccs = []
    visited = [False for _ in range(len(graph))]
    for i in range(len(graph)):
        if not visited[i]:
            visited[i] = True
            cc = dfs(i, graph)
            ccs.append(cc)
    return list(map(len, ccs))

  
def compute_groups(positions, threshold=0.1):
    p_1 = iter(positions)
    positions = list(zip(p_1, p_1))
                   
    # Compute pairwise distances
    graph = {i: set() for i in range(len(positions))}
    for i in range(len(positions)):
        for j in range(i, len(positions)):
            # Euclidean distance between two people
            dist = np.sqrt(
                (positions[i][0] - positions[j][0]) ** 2 + (positions[i][1] - positions[j][1]) ** 2)
            # Add edge to graph 
            if dist < threshold:
                graph[i].add(j)
                graph[j].add(i)
    lengths = dfs_all(graph)           
    return lengths

# UDFs

In [0]:
count_udf = udf(count, IntegerType())
sum_udf = udf(sum_vals, DoubleType())
avg_udf = udf(lambda arr: avg_vals(arr), DoubleType())
center_udf = udf(get_center, ArrayType(FloatType()))
velocity_udf = udf(lambda arr: compute_velocities(arr), ArrayType(DoubleType()))
group_udf = udf(compute_groups, ArrayType(IntegerType()))
x_udf = udf(get_x, ArrayType(DoubleType()))
y_udf = udf(get_y, ArrayType(DoubleType()))
flattenUdf = udf(fudf, ArrayType(DoubleType()))

# Window size for pairwise shifting

In [0]:
w_pair = Window().partitionBy().orderBy(col("frame"))

# Create and Modify Columns

In [68]:
df = (df.withColumn('num_people', count_udf('scores'))
        .withColumn('centers', center_udf('bboxes'))
        .withColumn('x_centers', x_udf('centers'))
        .withColumn('y_centers', y_udf('centers'))
        .withColumn('group_sizes', group_udf('centers'))
        .withColumn('num_groups', count_udf('group_sizes'))
        .withColumn('next_frame_centers', lag("centers", -1).over(w_pair)).na.drop()
        .withColumn('velocities', velocity_udf(struct('centers', 'next_frame_centers')))
        .withColumn('num_velocities', count_udf('velocities'))
        .withColumn('sum_velocities', sum_udf('velocities')))
        
df.show()

+-------------------+--------------------+--------------------+----------+--------------------+------------------+----------+--------------------+--------------------+--------------+-------------------+--------------------+--------------------+
|              frame|              bboxes|              scores|num_people|             centers|       group_sizes|num_groups|  next_frame_centers|          velocities|num_velocities|     sum_velocities|           x_centers|           y_centers|
+-------------------+--------------------+--------------------+----------+--------------------+------------------+----------+--------------------+--------------------+--------------+-------------------+--------------------+--------------------+
|1970-01-01 00:00:00|[0.59336024522781...|[0.93980032205581...|         9|[0.543, 0.636, 0....|         [3, 3, 3]|         3|[0.512, 0.674, 0....|[0.00773177782449...|             8|0.24668608255553603|[0.543, 0.226, 0....|[0.636, 0.479, 0....|
|1970-01-01 00:00:01

# Aggregate each 5 minute window to compute:
- average number of people detected
- average group size
- average velocity

In [0]:
# 0.7 frames/second => 210 frames for 300 seconds (5 minutes)
# 2.4 frames/second => 720 frames for 300 seconds (5 minutes)
agg_df = (df.groupBy(window("frame", windowDuration="3 seconds", slideDuration="3 seconds"))
           .agg(F.sum('num_people'),
                F.sum('num_groups'),
                F.sum('sum_velocities'),
                F.sum('num_velocities'),
                avg('num_people'),
                collect_list('x_centers'),
                collect_list('y_centers'))
           .withColumn('avg_group_size', avg_udf(struct('sum(num_people)', 'sum(num_groups)')))
           .withColumn('avg_velocity', avg_udf(struct('sum(sum_velocities)', 'sum(num_velocities)')))
           .withColumnRenamed('avg(num_people)', 'avg_num_people')
           .withColumn('x_centers', flattenUdf('collect_list(x_centers)'))
           .withColumn('y_centers', flattenUdf('collect_list(y_centers)'))
           .drop('collect_list(x_centers)')
           .drop('collect_list(y_centers)')
           .drop('sum(num_people)')
           .drop('sum(num_groups)')
           .drop('sum(sum_velocities)')
           .drop('sum(num_velocities)')
           .orderBy('window'))

In [88]:
agg_df.show()

+--------------------+-----------------+------------------+--------------------+--------------------+--------------------+
|              window|   avg_num_people|    avg_group_size|        avg_velocity|           x_centers|           y_centers|
+--------------------+-----------------+------------------+--------------------+--------------------+--------------------+
|[1970-01-01 00:00...|9.666666666666666| 2.230769230769231| 0.05146267625497401|[0.543, 0.226, 0....|[0.636, 0.479, 0....|
|[1970-01-01 00:00...|8.666666666666666|             1.625| 0.04999083798698387|[0.349, 0.638, 0....|[0.846, 0.546, 0....|
|[1970-01-01 00:00...|              7.0|1.6153846153846154| 0.07121197529844026|[0.483, 0.377, 0....|[0.627, 0.832, 0....|
|[1970-01-01 00:00...|6.333333333333333|1.4615384615384615| 0.09851995530730731|[0.339, 0.439, 0....|[0.858, 0.636, 0....|
|[1970-01-01 00:00...|8.666666666666666|               2.0| 0.05999229139523885|[0.597, 0.734, 0....|[0.895, 0.448, 0....|
|[1970-01-01 00:

In [0]:
pdf = agg_df.toPandas()

In [90]:
pdf

Unnamed: 0,window,avg_num_people,avg_group_size,avg_velocity,x_centers,y_centers
0,"(1970-01-01 00:00:00, 1970-01-01 00:00:03)",9.666667,2.230769,0.051463,"[0.543, 0.226, 0.568, 0.591, 0.737, 0.698, 0.7...","[0.636, 0.479, 0.63, 0.642, 0.52, 0.405, 0.428..."
1,"(1970-01-01 00:00:03, 1970-01-01 00:00:06)",8.666667,1.625,0.049991,"[0.349, 0.638, 0.462, 0.46, 0.734, 0.554, 0.55...","[0.846, 0.546, 0.364, 0.365, 0.495, 0.535, 0.3..."
2,"(1970-01-01 00:00:06, 1970-01-01 00:00:09)",7.0,1.615385,0.071212,"[0.483, 0.377, 0.411, 0.406, 0.603, 0.403, 0.7...","[0.627, 0.832, 0.859, 0.391, 0.599, 0.388, 0.4..."
3,"(1970-01-01 00:00:09, 1970-01-01 00:00:12)",6.333333,1.461538,0.09852,"[0.339, 0.439, 0.369, 0.59, 0.378, 0.765, 0.27...","[0.858, 0.636, 0.883, 0.613, 0.379, 0.443, 0.4..."
4,"(1970-01-01 00:00:12, 1970-01-01 00:00:15)",8.666667,2.0,0.059992,"[0.597, 0.734, 0.249, 0.769, 0.226, 0.73, 0.26...","[0.895, 0.448, 0.395, 0.529, 0.522, 0.448, 0.5..."
5,"(1970-01-01 00:00:15, 1970-01-01 00:00:18)",7.333333,2.444444,0.062164,"[0.472, 0.195, 0.502, 0.612, 0.657, 0.621, 0.4...","[0.737, 0.438, 0.75, 0.588, 0.54, 0.591, 0.377..."
6,"(1970-01-01 00:00:18, 1970-01-01 00:00:21)",6.666667,1.818182,0.087983,"[0.61, 0.774, 0.606, 0.77, 0.506, 0.378, 0.622...","[0.515, 0.488, 0.516, 0.49, 0.294, 0.341, 0.55..."
7,"(1970-01-01 00:00:21, 1970-01-01 00:00:24)",9.0,2.25,0.086272,"[0.545, 0.263, 0.566, 0.565, 0.199, 0.274, 0.5...","[0.658, 0.48, 0.646, 0.385, 0.471, 0.406, 0.38..."
8,"(1970-01-01 00:00:24, 1970-01-01 00:00:27)",8.0,2.666667,0.087715,"[0.612, 0.748, 0.608, 0.706, 0.527, 0.524, 0.7...","[0.49, 0.468, 0.483, 0.421, 0.306, 0.304, 0.42..."
9,"(2242-03-16 12:56:30, 2242-03-16 12:56:33)",7.0,2.333333,0.099536,"[0.437, 0.553, 0.457, 0.397, 0.477, 0.452, 0.622]","[0.749, 0.373, 0.389, 0.545, 0.553, 0.391, 0.375]"


# Visualization

In [81]:
from pathlib import Path

from bokeh.plotting import figure, show, output_notebook
from bokeh.models import ColumnDataSource, HoverTool
from bokeh.models.widgets import Tabs, Panel

output_notebook()
pdf.index.name = 'index'

colors = ['red', 'blue', 'green']
colnames = ['avg_velocity', 'avg_group_size', 'avg_num_people']
plot_titles = ['Average Velocities During the Day',
               'Average Group Size During the Day',
               'Average Number of People During the Day']
tab_titles = ['Velocity', 'Group Size', 'Number of People']
ylabels = ['Average Velocity (m/s)',
           'Average Group size (number of people)',
           'Average Number of People']

# Create panels for each tab of the visualization
panels = []
for i in range(len(colors)):
    hover = HoverTool()
    hover.tooltips = [('Timestamp', '@{}'.format(colnames[i]))]
    p = figure(title=plot_titles[i],
               plot_height=500, 
               plot_width=500,
               tools=[hover, "pan,reset,wheel_zoom"])
    p.vbar(x='index', 
           top=colnames[i],
           width=0.9,
           color=colors[i],
           source=pdf)
    p.xaxis.axis_label = "Time of Day (5 minute windows)"
    p.yaxis.axis_label = ylabels[i]
    panels.append(Panel(child=p, title=tab_titles[i]))

tabs = Tabs(tabs=panels)
show(tabs)
