In [None]:
import random
import statistics
from collections import defaultdict
from datetime import datetime, timedelta, date

import matplotlib.pyplot as plt
from oauthlib.oauth2 import BackendApplicationClient
from requests_oauthlib import OAuth2Session
import os

In [None]:
BASE_URL = "https://api.swisscom.com/layer/heatmaps/demo"
TOKEN_URL = "https://consent.swisscom.com/o/oauth2/token"
MAX_NB_TILES_REQUEST = 100
headers = {"scs-version": "2"}  # API version
client_id = os.environ.get("SWISSCOM_KEY","") # customer key in the Swisscom digital market place
client_secret = os.environ.get("SWISSCOM_SECRET","") # customer secret in the Swisscom digital market place

assert client_id, "client id not defined"
assert client_secret, "client_secret not defined"

In [None]:
def compute_density_baseline(tile_ids: list,
                             start_date=date(year=2020, month=1, day=27),
                             nb_days: int = 29) -> dict:
    # The baseline is the median value, for the corresponding day of the week,
    # during the nb_days following start_date"
    day2densities = defaultdict(list)
    for delta in range(nb_days):
        dt = start_date + timedelta(days=delta)
        day2densities[dt.weekday()].append(get_daily_density(dt, tile_ids))
    weekday2density = {wday: statistics.median(v)
                       for wday, v in day2densities.items()}
    return weekday2density


def get_daily_demographics(date: date, tile_ids: list) -> dict:
    api_request = oauth.get(BASE_URL + "/heatmaps/dwell-demographics/daily/{0}".format(date.isoformat()),
                            headers=headers, params={'tiles': tile_ids})
    f = open('daily_demographics.json', 'w+')
    f.write(api_request.text)
    f.close()
    return api_request.json()


def get_daily_density(from_date: date, tile_ids: list) -> float:
    hacker_date = date(year=2020, month=1, day=27)
    api_request = oauth.get(BASE_URL + "/heatmaps/dwell-density/daily/{0}".format(hacker_date.isoformat()),
                            headers=headers, params={'tiles': tile_ids})
    tiles_json = api_request.json()
    f = open('get_daily_density.json', 'w+')
    f.write(api_request.text)
    f.close()
    #print(from_date, tile_ids)
    #print(tiles_json)
    tiles_date = tiles_json['tiles']
    return sum([t['score'] for t in tiles_date])


def get_bezirk_tiles(bezirk_id: int):
    api_request = oauth.get(BASE_URL + "/grids/districts/%d" % bezirk_id,
                           headers=headers)
    f = open('bezirk_tiles.json', 'w+')
    f.write(api_request.text)
    f.close()
    print(api_request.text)


def get_density_variation(date: date, tile_ids: list, weekday2density: dict) -> float:
    daily_score_date = get_daily_density(date, tile_ids)
    density_baseline = weekday2density[date.weekday()]
    variation = (daily_score_date - density_baseline) / density_baseline
    return 100*variation


def get_tile_ids_postal_code(postal_code: int) -> list:
    # Randomly sample MAX_NB_TILES_REQUEST tile ids
    # associated with the postal code of interest
    url = BASE_URL + "/grids/postal-code-areas/{0}".format(postal_code)
    tiles_raw = oauth.get(url, headers=headers).json()
    if "message" in tiles_raw:
        print(tiles_raw["message"])
        return None
    tiles_json = tiles_raw #.json()
    tile_ids = random.sample([t["tileId"]
                              for t in tiles_json["tiles"]], MAX_NB_TILES_REQUEST)
    return tile_ids


def get_density_variation_time_period(tile_ids, start_date, nb_days) -> dict:
    weekday2density = compute_density_baseline(tile_ids)
    date2variation = dict()
    #if True:
    #    return get_density_variation(start_date, tile_ids, weekday2density)
    for delta in range(nb_days):
        dt = start_date + timedelta(days=delta)
        date2variation[dt] = get_density_variation(
            dt, tile_ids, weekday2density)
    return date2variation


def plot_density_variation_tile_ids(tile_ids, start_date, nb_days):
    date2variation = get_density_variation_time_period(
        tile_ids, start_date, nb_days)
    dates, scores = zip(*sorted(date2variation.items()))
    fig, ax = plt.subplots()
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)
    ax.yaxis.grid()
    plt.ylim(-100, 100)
    plt.fill_between(dates, scores, alpha=0.5,
                     joinstyle="round", color='tab:blue')
    fig.autofmt_xdate(rotation=45)
    plt.ylabel("Percentage variation")
    fig.tight_layout()
    plt.show()


In [None]:
get_bezirk_tiles(311)

In [None]:
# The following base url is associated with the standard plan
# For the demo plan, you need replace the word `standard`
# by `demo` in the URL. Note that the demo plan data is limited 
# to only 27/01/2020, which requires modifications to the code below.

# Fetch an access token
client = BackendApplicationClient(client_id=client_id)
oauth = OAuth2Session(client=client)
oauth.fetch_token(token_url=TOKEN_URL, client_id=client_id,
                  client_secret=client_secret)
assert oauth.authorized


In [None]:
postal_code = 6020
start_date = date(year=2020, month=1, day=27)

tile_ids = get_tile_ids_postal_code(postal_code)
assert tile_ids

get_daily_demographics(start_date, tile_ids)

In [None]:
plot_density_variation_tile_ids(tile_ids, start_date=start_date, nb_days=1)

In [None]:
print(start_date)