# Get all the necessary data

In [1]:
from json2xml.utils import readfromurl    # to convert the json from the API into a python list
import json    # to save the data as json in a file
import os.path    # to check if their's a local file with the API-data or if a new API pull is inevitable
import datetime    # to convert linux time to readable time

## Control

In [2]:
# Shape data of the counties:
# True  -> load the already polished file modified_data/german_counties_geography.txt
# False -> run get_shapes_of_german_counties.ipynb to build the shape data again
counties_geography_use_polished_data = True
# Covid-19 case data of the counties - the raw sources:
# covid19_use_api        -> pull new data from the API and polish it again
# covid19_use_api_backup -> read the local backup of an old API pull and polish it again
#                           (only looked at when covid19_use_api is False)
covid19_use_api = False
covid19_use_api_backup = False
# takes old, already polished data
covid19_use_polished_data = True

### Control the controls

In [3]:
# Reconcile the requested covid19 source with the files that actually exist,
# so that at least one usable source is switched on afterwards.
polished_covid19_exists = os.path.isfile("modified_data/german_covid19.txt")
api_backup_exists = os.path.isfile("unmodified_data/covid19/dates.txt")

if not polished_covid19_exists and not api_backup_exists:    # no files
    # nothing is stored locally - a new pull from the API is the only option
    covid19_use_polished_data = False
    covid19_use_api_backup = False
    covid19_use_api = True
elif not polished_covid19_exists:    # no polished
    covid19_use_polished_data = False
    # ensuring that one of the others is used
    covid19_use_api_backup = not(covid19_use_api)
elif not api_backup_exists:    # no backup
    covid19_use_api_backup = False
    # ensuring that one of the others is used
    covid19_use_polished_data = not(covid19_use_api)
elif not(covid19_use_api or covid19_use_api_backup or covid19_use_polished_data):
    # both files exist but every switch is off - without a source the later
    # cells would fail on undefined data, so use the polished file
    covid19_use_polished_data = True

## Collect all data

In [4]:
# Expected number of german counties.
# check_polished_data() compares the number of counties in covid19 with this value;
# the polished data is only saved when that check passes.
number_of_counties = 412

### Get county shapes of the german counties
Either use old polished version or make new pull from API and polish it.
Depending on the variable counties_geography_use_polished_data in the controls of this file.

In [5]:
# Without a stored polished shape file the switch is turned off,
# which makes the following cell build the shape data again.
if not os.path.isfile("modified_data/german_counties_geography.txt"):
    counties_geography_use_polished_data = False

In [6]:
# Provide the county shape data, either from the polished file or by running
# the notebook that builds it.
if counties_geography_use_polished_data:
    # the polished file holds the shape data as JSON
    with open("modified_data/german_counties_geography.txt", "r") as file:
        counties_geography = json.loads(file.read())
    print("Polished county data from file is ready to go!")
else:
    # NOTE(review): presumably read by get_shapes_of_german_counties.ipynb to
    # suppress its own output - confirm in that notebook
    no_outputs_from_file_get_shapes_of_german_counties = True
    # NOTE(review): the later cells rely on this run defining counties_geography - confirm
    %run get_shapes_of_german_counties.ipynb
    print("Data from (maybe old) API-pull is ready to go!")

Data directly from API is ready to go!
Data from (maybe old) API-pull is ready to go!


### Get covid19 data of the german counties
Either use old polished version or make new pull from API and polish it.
Depending on the variable covid19_use_polished_data in the controls of this file.
<br/>
<br/>
Get the correct URLs to the arcgis server:
We must pull every county separately because the API only allows 1000 data points per request.
<br/>
<br/>
This part is not outsourced into a separate notebook because it is shorter than the pulling and polishing of the shape data, and because keeping it here ensures that the shape data is available to compare length and content.


In [7]:
def url_county(AdmUnitID, True_for_dates_False_for_covid19_cases):
    """Build the query URL of the arcgis server for one county.

    AdmUnitID is inserted (via str()) into the where-clause of the query.
    If True_for_dates_False_for_covid19_cases is truthy the field "Datum" is
    requested, otherwise the field "KumFall". The result is always ordered
    by "Datum" and requested in the pjson format.
    """
    if True_for_dates_False_for_covid19_cases:
        requested_field = "Datum"
    else:
        requested_field = "KumFall"
    url_parts = [
        "https://services7.arcgis.com/mOBPykOjAyBO2ZKk/arcgis/rest/services/",
        "rki_history_hubv/FeatureServer/0/query?where=AdmUnitId%3D",
        str(AdmUnitID),
        "&outFields=",
        requested_field,
        "&orderByFields=Datum&f=pjson",
    ]
    return "".join(url_parts)

In [8]:
def find_alternative_source_of_data_and_activate_it():
    """Discard the faulty covid19 data and switch on the local fallback sources.

    The global names covid19 and non_county_specific_data are removed to
    prevent accidental use of faulty data. Shallow copies of them are kept in
    copy_of_covid19_for_debugging_purposes and
    copy_of_non_county_specific_data_for_debugging_purposes.

    covid19_use_api_backup is set to True if the backup of an API pull exists,
    covid19_use_polished_data is set to True if the polished file exists.
    Neither flag is ever set to False here - the caller has to switch off the
    source that just failed.

    Calling the function again after the data was already removed is safe;
    the debugging copies of the first call are kept in that case.

    Raises:
        Exception: if neither covid19_use_api_backup nor
            covid19_use_polished_data is set after the checks.
    """
    global covid19_use_api_backup
    global covid19_use_polished_data
    global copy_of_covid19_for_debugging_purposes
    global copy_of_non_county_specific_data_for_debugging_purposes
    # pop() instead of del: a name that is already gone must not raise a NameError
    module_namespace = globals()
    faulty_non_county_specific_data = module_namespace.pop("non_county_specific_data", None)
    faulty_covid19 = module_namespace.pop("covid19", None)
    if faulty_non_county_specific_data is not None:
        copy_of_non_county_specific_data_for_debugging_purposes = (
            faulty_non_county_specific_data.copy())
    if faulty_covid19 is not None:
        copy_of_covid19_for_debugging_purposes = faulty_covid19.copy()
    # check if a local pull of the API exists otherwise use the polished data
    if os.path.isfile("unmodified_data/covid19/dates.txt"):
        covid19_use_api_backup = True
    if os.path.isfile("modified_data/german_covid19.txt"):
        covid19_use_polished_data = True
    # neither local backup nor polished data found
    if not(covid19_use_api_backup) and not(covid19_use_polished_data):
        raise Exception("No usable data found!")

In [9]:
# Pull the covid19 data from the API if this is requested.
# If the API does not deliver usable data, the local fallback sources are
# activated, covid19_use_api is switched off and the pull is aborted.
if covid19_use_api:
    print("Pulling from API...")
    covid19 = dict()
    non_county_specific_data = dict()
    # make sure the directory for the raw data exists
    os.makedirs("unmodified_data/covid19", exist_ok=True)
    number_of_timestamps = -1    # -1 means: the dates were not pulled yet

    # get data - every county must be called individually because of the Max Record Count of the API
    for AdmUnitID in list(counties_geography.keys()):
        # get dates of first county
        if number_of_timestamps == -1:
            raw_dates = readfromurl(url_county(AdmUnitID, True))
            # a reply without 'features' (e.g. an error reply of the server)
            # counts as "no timestamps" and triggers the fallback below
            date_features = raw_dates.get('features', [])
            if len(date_features) < 200:
                print("The dates of {} sends to little timestamps ({}) - check the url"
                      .format(AdmUnitID, len(date_features)))
                find_alternative_source_of_data_and_activate_it()
                covid19_use_api = False
                break
            number_of_timestamps = len(date_features)
            non_county_specific_data['unixtime'] = [e['attributes']['Datum'] for e in date_features]
            # save raw data
            with open("unmodified_data/covid19/dates.txt", "w") as file:
                file.write(json.dumps(raw_dates))

        # get countys covid19 data
        raw_covid19_data = readfromurl(url_county(AdmUnitID, False))
        # same guard as for the dates: a missing 'features' entry is a mismatch
        case_features = raw_covid19_data.get('features', [])
        if number_of_timestamps != len(case_features):
            print("The provided data from the API does not have the same number of timestamps of " +
                  "{}, it has {}.".format(number_of_timestamps, len(case_features)))
            find_alternative_source_of_data_and_activate_it()
            covid19_use_api = False
            break
        covid19[AdmUnitID] = dict()
        covid19[AdmUnitID]['cases'] = [e['attributes']['KumFall'] for e in case_features]
        # save raw data of this county
        with open("unmodified_data/covid19/" + AdmUnitID + ".txt", "w") as file:
            file.write(json.dumps(raw_covid19_data))

    if covid19_use_api:
        print("Covid19 Data directly from API is ready to go!")

Pulling from API...
Covid19 Data directly from API is ready to go!


In [10]:
# Use data from local backup originating from old API pull
# covid19_use_api could be modified in the if-statement - therefore no else-statement here
#
# The backup is rejected if it has fewer than 200 dates, if a county file has
# a different number of timestamps than the dates file, or if a county of
# counties_geography has no file. On rejection the fallback is activated
# exactly once and covid19_use_api_backup is switched off.
if not(covid19_use_api) and covid19_use_api_backup:
    print("Reading backup of old API pull...")
    covid19 = dict()
    non_county_specific_data = dict()
    list_of_countys = list(counties_geography.keys())    # counties still without data
    # get the dates
    with open("unmodified_data/covid19/dates.txt", "r") as file:
        raw_dates = json.loads(file.read())
    non_county_specific_data['unixtime'] = [e['attributes']['Datum'] for e in raw_dates['features']]
    number_of_timestamps = len(non_county_specific_data['unixtime'])

    backup_is_usable = True
    if number_of_timestamps < 200:
        # to little dates - something is wrong. Checking here to skip the county files
        print("There are only {} dates - check your backup or make a new pull from the api."
              .format(number_of_timestamps))
        backup_is_usable = False
    else:
        for root, dirs, files in os.walk('unmodified_data/covid19'):
            for filename in files:
                if not filename.endswith(".txt"):
                    continue
                AdmUnitID = filename[:-4]
                if AdmUnitID == 'dates':    # already done
                    continue
                # skip files that belong to no county (or were already read) -
                # they would only break the later calculations
                if AdmUnitID not in list_of_countys:
                    continue

                list_of_countys.remove(AdmUnitID)
                covid19[AdmUnitID] = dict()
                with open(os.path.join(root, filename), "r") as file:
                    covid19[AdmUnitID]['cases'] = [e['attributes']['KumFall']
                                                   for e in json.loads(file.read())['features']]

                if number_of_timestamps != len(covid19[AdmUnitID]['cases']):
                    print("The data from file {} does not have {} timestamps, it has {}."
                          .format(filename, number_of_timestamps, len(covid19[AdmUnitID]['cases'])))
                    backup_is_usable = False
                    break
            if not(backup_is_usable):    # leave the directory walk as well
                break

        if backup_is_usable and len(list_of_countys) > 0:
            print("No backup found for {}".format(list_of_countys))
            backup_is_usable = False

    if backup_is_usable:
        print("Covid19 Data from (maybe old) API-pull-backup is ready to go!")
    else:
        # the fallback removes the faulty data - it must run only once
        find_alternative_source_of_data_and_activate_it()
        covid19_use_api_backup = False
        # the backup just failed, so only the polished data is left
        if not(covid19_use_polished_data):
            raise Exception("No usable data found!")

### Get AdmUnitID of the german federal states
This data is hardcoded because it is unlikely to change. Even if the names of the federal states become outdated and no longer match the current official names, the functionality of this project will not be affected.
The names originate from arcgis.
https://services7.arcgis.com/mOBPykOjAyBO2ZKk/arcgis/rest/services/rki_admunit_hubv/FeatureServer/0/query?where=AdmUnitId%3C20&resultType=none&outFields=*&f=pjson

In [11]:
if not covid19_use_polished_data:
    # The key of a state is its 1-based position in this tuple, as a string.
    non_county_specific_data['states'] = {
        str(state_id): state_name
        for state_id, state_name in enumerate((
            "Schleswig-Holstein",
            "Hamburg",
            "Niedersachsen",
            "Bremen",
            "Nordrhein-Westfalen",
            "Hessen",
            "Rheinland-Pfalz",
            "Baden-Württemberg",
            "Bayern",
            "Saarland",
            "Berlin",
            "Brandenburg",
            "Mecklenburg-Vorpommern",
            "Sachsen",
            "Sachsen-Anhalt",
            "Thüringen",
        ), start=1)
    }

### Calculate seven-day incidence and population density and find the highest and lowest numbers

In [12]:
if not covid19_use_polished_data:
    # start values of the extremes over all counties and all timestamps
    non_county_specific_data.update({
        'highest_case_number': 0,
        'lowest_case_number': 100000000000000,
        'highest_incidence': 0,
        'lowest_incidence': 100000000000000,
    })
    for AdmUnitID in covid19.keys():
        covid19[AdmUnitID]['incidences'] = list()
        for timestamp, cases_on_day in enumerate(covid19[AdmUnitID]['cases']):
            # in the first seven entries there is no value seven entries earlier - 0 is used
            cases_7_days_prior = covid19[AdmUnitID]['cases'][timestamp - 7] if timestamp >= 7 else 0
            # new cases of the last seven entries per 100000 inhabitants
            incidence = (((cases_on_day - cases_7_days_prior) * 100000) /
                         counties_geography[AdmUnitID]['population'])
            covid19[AdmUnitID]['incidences'].append(incidence)
            # max()/min() keep the stored value on a tie, like a strict comparison
            non_county_specific_data['highest_case_number'] = max(
                non_county_specific_data['highest_case_number'], cases_on_day)
            non_county_specific_data['lowest_case_number'] = min(
                non_county_specific_data['lowest_case_number'], cases_on_day)
            non_county_specific_data['highest_incidence'] = max(
                non_county_specific_data['highest_incidence'], incidence)
            non_county_specific_data['lowest_incidence'] = min(
                non_county_specific_data['lowest_incidence'], incidence)

In [13]:
if not covid19_use_polished_data:
    # Done in this notebook and not in get_shapes_of_german_counties.ipynb so that
    # the extremes end up in the one dictionary non_county_specific_data.
    non_county_specific_data.update({
        'highest_population_density': 0,
        'lowest_population_density': 100000000000000,
    })
    for county in counties_geography.values():
        # the area is stored in m2; the factor 1000000 converts to inhabitants per km2
        county["population_density"] = (county['population'] * 1000000)/county['area_in_m2']
        # max()/min() keep the stored value on a tie, like a strict comparison
        non_county_specific_data['highest_population_density'] = max(
            non_county_specific_data['highest_population_density'], county["population_density"])
        non_county_specific_data['lowest_population_density'] = min(
            non_county_specific_data['lowest_population_density'], county["population_density"])

### get the polished data

In [14]:
if covid19_use_polished_data:
    # the polished file holds the pair (covid19, non_county_specific_data) as JSON
    with open("modified_data/german_covid19.txt", "r") as file:
        polished_content = file.read()
    covid19, non_county_specific_data = json.loads(polished_content)
    print("Polished covid19 data from file is ready to go!")

### save polished covid19 data

In [15]:
def check_polished_data():
    """Check the global covid19 data for completeness.

    Two conditions are checked and every violation is printed:
    - covid19 contains exactly number_of_counties counties
    - every county has as many case numbers as there are entries in
      non_county_specific_data['unixtime']

    Returns:
        True if no violation was found, otherwise False.
    """
    result = True    # Assume everything is correct
    if len(covid19) != number_of_counties:
        print("covid19 has not the right amount of counties: {} instead of {}."
              .format(len(covid19), number_of_counties))
        result = False
    for AdmUnitID in covid19.keys():
        if len(covid19[AdmUnitID]['cases']) != len(non_county_specific_data['unixtime']):
            # report the county that is checked right now (its AdmUnitID)
            print("The county {} has not the right amount of dates: {} instead of {}."
                  .format(AdmUnitID, len(covid19[AdmUnitID]['cases']),
                          len(non_county_specific_data['unixtime'])))
            result = False
    return result

In [16]:
if check_polished_data():
    # create the target directory if it does not exist yet
    os.makedirs("modified_data", exist_ok=True)
    # covid19 and non_county_specific_data are stored together as one JSON pair
    with open("modified_data/german_covid19.txt", "w") as polished_file:
        polished_file.write(json.dumps((covid19, non_county_specific_data)))
    print("Saved seemingly flawless covid19 data.")

Saved seemingly flawless covid19 data.


##  Add human-readable time
It is added here because it cannot be saved: the datetime objects cannot be written to the JSON file. Calculating it here keeps the plotting part very clean and excludes all data manipulation from it.

In [17]:
# The unixtime values are divided by 1000 before the conversion
# (NOTE(review): presumably they are milliseconds - confirm against the API).
# datetime.utcfromtimestamp() is deprecated since Python 3.12. The timezone-aware
# conversion gives the same UTC wall-clock time; tzinfo is removed again so
# that the values stay naive datetimes as before.
non_county_specific_data['UTC'] = [
    datetime.datetime.fromtimestamp(date/1000, tz=datetime.timezone.utc).replace(tzinfo=None)
    for date in non_county_specific_data['unixtime']]