
----

# Limitations and challenges in mobility data: skewness


In [None]:
from IPython.display import Image

# Set to True to render the kepler map or altair charts when running the notebook in local
# If False, screenshots will be shown instead
INTERACTIVE_OUTPUT = False

# If the platform allows to render gifs, set to True:
RENDERING_GIF = False

# Scaffolding to save the maps in html format:
SAVE_HTML_MAPS = False

## Skewness


- **We can extract the dataset with pyspark**


In [None]:
import datetime as dt

from pyspark.sql import functions as F
from pyspark.sql.functions import col, input_file_name


def extract_and_save_to(destination: str = "dois_in_sydney.parquet") -> None:  # noqa: F811 redefinition
    """A function for databricks."""

    # move the import statements out of the function when running on databricks

    file_location = "<s3 location here>.*.csv.gz"

    # Read the data
    df_data = (
        spark.read.format("csv")
        .option("inferSchema", "true")
        .option("header", "false")
        .option("sep", "|")
        .load(file_location)
        .withColumn("filename", input_file_name())
    )
    df_data = (
        df_data.withColumnRenamed("_c0", "VRID")
        .withColumnRenamed("_c1", "MAID")
        .withColumnRenamed("_c2", "device_type")
        .withColumnRenamed("_c3", "timestamp")
        .withColumnRenamed("_c4", "time_zone")
        .withColumnRenamed("_c5", "lat")
        .withColumnRenamed("_c6", "lon")
    )
    df_data = df_data.select(
        col("VRID"),
        col("MAID"),
        col("device_type"),
        col("timestamp"),
        col("time_zone"),
        col("lat"),
        col("lon"),
    )

    # take only the pings in the bbox:
    min_lon, min_lat, max_lon, max_lat = 151.073941, -33.966366, 151.298097, -33.804344
    df_bbox = df_data.filter(
        (F.col("lon") >= min_lon) & (F.col("lon") <= max_lon) & (F.col("lat") >= min_lat) & (F.col("lat") <= max_lat)
    )

    # Time filter - milliseconds
    str_min_date = "2022-06-01 00:00:00+00:00"
    str_max_date = "2022-06-30 23:59:59+00:00"

    timestamp_min_time_msec = int(dt.datetime.strptime(str_min_date, "%Y-%m-%d %H:%M:%S%z").timestamp()) * 1000
    timestamp_max_time_msec = int(dt.datetime.strptime(str_max_date, "%Y-%m-%d %H:%M:%S%z").timestamp()) * 1000

    df_bbox_tfilter = df_bbox.filter(
        (F.col("timestamp") >= timestamp_min_time_msec) & (F.col("timestamp") <= timestamp_max_time_msec)
    )

    # take all the pings from devices that have appeared at least once in the bbox (devices of interests):
    unique_devices = df_bbox_tfilter.select("MAID").distinct()
    df_data = df_data.alias("df_data")
    df_pings_of_interest = df_data.join(unique_devices, df_data.MAID == unique_devices.MAID).select("df_data.*")

    # Save to a parquet file for future use:
    df_pings_of_interest.write.parquet((destination), "overwrite")

- **Or we can extract the dataset with dfipy** (with way less number of rows)

In [None]:
from datetime import datetime

from dfi import Client


def extract_and_save_to(destination: str = "dois_in_sydney.parquet") -> None:  # noqa: F811 redefinition
    """Function for any notebook with dfi credentials."""

    dfi = Client(
        api_token="<api token>",
        namespace="<namespace>",
        instance_name="<instance name>",
        base_url="<base url>",
        progress_bar=True,
    )

    min_lon, min_lat, max_lon, max_lat = 151.073941, -33.966366, 151.298097, -33.804344
    start_time = datetime(2022, 6, 1, 24, 0, 0)
    end_time = datetime(2022, 6, 30, 23, 59, 59)

    unique_entities = dfi.get.entities(
        geometry=[min_lon, min_lat, max_lon, max_lat],
        time_interval=(start_time, end_time),
    )

    df_doi = dfi.get.records(entities=unique_entities)

    df_doi.to_parquet(destination)

In [None]:
if INTERACTIVE_OUTPUT:
    # Run time ~ 3 minutes
    from copy import deepcopy

    import geopandas as gpd
    import h3
    import pandas as pd
    from keplergl import KeplerGl
    from shapely.geometry import Polygon

    # after running extract_and_save_to on the selected dataset:
    df = pd.read_parquet("dois_in_sydney.parquet")

    df["hex_id"] = df.apply(lambda row: h3.geo_to_h3(row["lat"], row["lon"], 8), axis=1)
    df_hex_bin = pd.DataFrame({"num_pings": df.groupby("hex_id").size()}).reset_index(drop=False)

    geometries = [Polygon(h3.h3_to_geo_boundary(idx, geo_json=True)) for idx in df_hex_bin["hex_id"]]
    gdf_grouped = gpd.GeoDataFrame(df_hex_bin, geometry=geometries)[["num_pings", "geometry"]]

    min_lon, min_lat, max_lon, max_lat = 151.073941, -33.966366, 151.298097, -33.804344

    gdf_bbox = gpd.GeoDataFrame(
        geometry=[
            Polygon(
                (
                    (max_lon, max_lat),
                    (max_lon, min_lat),
                    (min_lon, min_lat),
                    (min_lon, max_lat),
                    (max_lon, max_lat),
                )
            )
        ]
    )

    from kepler_configs import config_heatmap

    kmap = KeplerGl(
        data={"hexes": deepcopy(gdf_grouped), "bbox": deepcopy(gdf_bbox)},
        height=1200,
        config=config_heatmap,
    )

    if SAVE_HTML_MAPS:
        kmap.save_to_html(
            config=kmap.config.copy(),
            file_name="maps/skewness_0_data_presentation.html",
        )

else:
    display(Image("images/skewness_0_data_presentation.png"))

In [None]:
if INTERACTIVE_OUTPUT:
    import altair as alt
    import numpy as np

    df_pings_per_device = pd.DataFrame(df.groupby("MAID").size()).reset_index().rename(columns={0: "num"})
    df_pings_per_device.head()

    bins = np.arange(2, df_pings_per_device.num.max(), df_pings_per_device.num.max() / 100)
    binned = (
        df_pings_per_device.groupby(pd.cut(df_pings_per_device.num, bins=bins)).count()[["MAID"]].reset_index(drop=True)
    )

    binned["bin_min"] = bins[:-1]
    binned["bin_max"] = bins[1:]

    alt.themes.enable("dark")

    chart = (
        alt.Chart(binned, title="Number of pings per device")
        .mark_bar()
        .encode(
            x=alt.X("bin_min", bin="binned", title="number of pings"),
            x2="bin_max",
            y=alt.Y(
                "MAID",
                scale=alt.Scale(type="symlog"),
                title="number of devices per pings",
            ),
        )
        .properties(width=700, height=800)
        .configure_axis(
            labelFontSize=20,
            titleFontSize=20,
        )
        .configure_title(
            fontSize=20,
            anchor="start",
        )
        .interactive(True)
    )

    display(chart)

else:
    display(Image("images/skewness_1_pings_per_device.png"))