# Mobility analysis from mobile phone data

This notebook analyses a large volume of raw mobile phone data. The target outcome is a mobility matrix that counts the transitions between every pair of antennas in the dataset.

In [None]:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

Create a Spark session:

In [None]:
spark = (
    SparkSession.builder.master("local[*]")
    .config("spark.driver.memory", "32g")
    .appName("mobility_RJ")
    .getOrCreate()
)

Define the schema of the dataset. Each row has the form `timestamp, userid, zip1, zip2, lat, lon`:

In [None]:
from pyspark.sql.types import StructType, IntegerType, StringType, DoubleType, LongType

schema = (
    StructType()
    .add("time", LongType(), True)
    .add("user", StringType(), True)
    .add("zip1", IntegerType(), True)
    .add("zip2", IntegerType(), True)
    .add("lat", DoubleType(), True)
    .add("lon", DoubleType(), True)
)

Load the dataset from the pipe-separated CSV files located in the `data` directory:

In [None]:
data = spark.read.csv("../data", sep="|", schema=schema)

To get a sense of the dataset size, and of how long a full pass over it takes, we count the data points once:

In [None]:
data.count()

As a first preprocessing step, we clean up the data by introducing consecutive indices for the antennas. To do so, we create a mapping `hash((lat, lon)) -> idx` such that the index `idx` runs consecutively over the antennas that are present in the data. The mapping is stored as a Python `dict` on the driver and is also broadcast back to the cluster for use in further data transformations. The first step could be done once and the result loaded from disk, once you are sure that all antennas are included. The second step (the broadcast) needs to be performed even when the mapping is loaded from disk.

In [None]:
antennas_dict = dict(
    data.rdd.map(lambda row: hash((row["lat"], row["lon"])))
    .distinct()
    .zipWithIndex()
    .collect()
)

In [None]:
antennas = spark.sparkContext.broadcast(antennas_dict)
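To make the mapping concrete, here is a minimal pure-Python sketch of the kind of dictionary that `distinct().zipWithIndex()` yields. It runs without Spark and only illustrates the logic; the `build_index` helper and the sample coordinates are hypothetical and not part of the notebook. Note that in Spark the index assigned to each antenna depends on partition order, whereas this sketch numbers antennas in order of first appearance.

```python
def build_index(rows):
    """Map hash((lat, lon)) to a consecutive index, one per distinct antenna."""
    index = {}
    for lat, lon in rows:
        key = hash((lat, lon))
        # setdefault assigns the next free index only the first time a key is seen
        index.setdefault(key, len(index))
    return index


# Hypothetical sample: three events recorded on two distinct antennas
rows = [(-22.90, -43.20), (-22.95, -43.18), (-22.90, -43.20)]
antenna_index = build_index(rows)

# Replace each coordinate pair by its antenna index
lookup = [antenna_index[hash(r)] for r in rows]
```

Because the key is a hash, two different coordinate pairs could in principle collide and share an index. Keying the dictionary on the `(lat, lon)` tuple itself would avoid this.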

Next, we replace the `lat` and `lon` fields in the original RDD with the antenna index defined above and, at the same time, drop the fields we no longer need. Note that this RDD is never `collect`ed, so the evaluation stays lazy and will be executed in one sweep together with the follow-up data transformations. After this transformation, the rows have the form `userid, (timestamp, antennaid)`:

In [None]:
preprocessed = data.rdd.map(
    lambda row: (
        row["user"],
        (row["time"], antennas.value[hash((row["lat"], row["lon"]))]),
    )
)

The next transformation is the cornerstone of the analysis, as it tracks all users in a single sweep over the dataset. After the grouping operation, we drop the userid as it is no longer needed. The data then has the form `List[(timestamp, antenna_id)]` with one row per user.

In [None]:
grouped = preprocessed.groupByKey().mapValues(list).values()
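As a rough illustration of what the grouping step does, the following pure-Python sketch performs the same regrouping on a handful of made-up rows. The names `preprocessed_rows`, `by_user` and `grouped_rows` are hypothetical, and Spark distributes this work across partitions rather than building one dictionary.

```python
from collections import defaultdict

# Hypothetical preprocessed rows: (userid, (timestamp, antenna_id))
preprocessed_rows = [
    ("u1", (30, 2)),
    ("u2", (10, 0)),
    ("u1", (10, 0)),
    ("u1", (20, 1)),
]

# Collect all events that belong to the same user
by_user = defaultdict(list)
for user, event in preprocessed_rows:
    by_user[user].append(event)

# Drop the user ids, keeping one list of events per user
grouped_rows = list(by_user.values())
```

The events inside each list keep their arrival order, not their time order, which is why the events still need to be sorted by timestamp before transitions are extracted.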

Next, we identify the transitions between antennas. The following function extracts the transitions of a single user by sorting that user's events by timestamp and pairing each event with its successor. I am not entirely sure how the sorting should be implemented; it may be better to do it in Spark directly. The `zip` idiom looks a bit odd, but I think it is the shortest way to do the task.

In [None]:
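def extract_transitions(events):
    """Return the (from_antenna, to_antenna) pairs of one user's events."""
    # Is this the correct sorting criterion?
    sorted_events = sorted(events, key=lambda e: e[0])
    ret = []
    # Pair each event with its successor: (e0, e1), (e1, e2), ...
    for a, b in zip(sorted_events[:-1], sorted_events[1:]):
        ret.append((a[1], b[1]))
    return ret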
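A small worked example may help to check the pairing logic. The sketch below uses a hypothetical helper, `pairwise_transitions`, that follows the same sort-then-`zip` approach, applied to made-up events given out of time order.

```python
def pairwise_transitions(events):
    """Return (from_antenna, to_antenna) for consecutive events in time order."""
    ordered = sorted(events, key=lambda e: e[0])
    # zip pairs each event with its successor: (e0, e1), (e1, e2), ...
    return [(a[1], b[1]) for a, b in zip(ordered[:-1], ordered[1:])]


# Hypothetical events of one user as (timestamp, antenna_id), deliberately unsorted
events = [(30, 2), (10, 0), (20, 0), (40, 1)]
result = pairwise_transitions(events)

# A user with a single event produces no transitions
single = pairwise_transitions([(10, 0)])
```

Here `result` is `[(0, 0), (0, 2), (2, 1)]`. Two consecutive events on the same antenna produce a pair such as `(0, 0)`, so with this approach such events are counted on the diagonal of the final matrix.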

Next, we find all transitions in the dataset. The rows are now of the form `antenna1, antenna2`, with one row per registered transition. Note that we still have not `collect`ed the result!

In [None]:
transitions = grouped.flatMap(extract_transitions)

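Next, we count the transitions for all pairs of antennas. The `countByValue` operation is an action rather than a transformation: it triggers the computation and returns the counts to the driver as a Python dictionary, so no explicit `collect` is needed: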

In [None]:
counts = transitions.countByValue()
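For intuition, the flattening and counting steps behave much like a `collections.Counter` over all transition pairs. The sketch below uses made-up per-user transition lists (`per_user` and `toy_counts` are hypothetical names) and runs in plain Python rather than on the cluster.

```python
from collections import Counter

# Hypothetical per-user transition lists, one list per user
per_user = [[(0, 1), (1, 2)], [(0, 1)], []]

# Flatten all lists into one stream of pairs, then count each distinct pair
toy_counts = Counter(pair for user in per_user for pair in user)
```

The result maps each `(antenna1, antenna2)` pair to the number of times it occurred, which is the same shape as the dictionary consumed by the matrix construction that follows.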

These counts can be fed into a dense `numpy` data structure:

In [None]:
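n_antennas = len(antennas_dict)
# Dense matrix: antenna_map[i, j] counts the transitions from antenna i to antenna j
antenna_map = np.zeros((n_antennas, n_antennas), dtype=int)
# One (row, column, count) triple per observed antenna pair
entries = np.array([(i0, i1, v) for (i0, i1), v in counts.items()])
antenna_map[entries[:, 0], entries[:, 1]] = entries[:, 2]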

Finally, we export this map for further use in other scripts:

In [None]:
np.save("antenna_map.npy", antenna_map)