# Combine and Clean Data Sources

This notbook is used to combine all data in to a single Sqlite database. The cleaning evolved heavily so at some point, all this cleaning should be refactored into a seperate module.

In [1]:
%load_ext lab_black

In [2]:
from pathlib import Path
import json
import gettext

from cleantext import clean
import dataset
import requests
import pycountry
from tqdm import tqdm
from datetime import date, datetime

In [3]:
german = gettext.translation("iso3166", pycountry.LOCALES_DIR, languages=["de"])
german.install()

## Manual Fixes

There are sometimes errors in the data. Since each incident has a uniquie id, fix it here.

In [5]:
manual_fixes = {
    "ca0dee4be1029c2ab24cdea755b88086": {"city": "Halle (Saale)"},
    "https://www.raa-sachsen.de/support/chronik/vorfaelle/bon-courage-fassungslos-ueber-naziuebergriffe-auf-dem-bornaer-stadtfest-3219": {
        "city": "Borna"
    },
    "a10d62daa23594df92c9704e1606dcfc": {
        "city": "Oranienbaum-Wörlitz",
        "county": "Wittenberg",
    },
    "000dad6aace24b3d8ebae940a8de8e72": {"city": "Dessau"},
    "https://www.raa-sachsen.de/support/chronik/vorfaelle/goerlitz-2779": {
        "date": date(2010, 11, 14)
    },
    "https://www.raa-sachsen.de/support/chronik/vorfaelle/leipzig-reudnitz-4976": {
        "date": date(2020, 11, 13)
    },
}

## Valid Regions

Taken regions.json from <https://github.com/datenguide/metadata>

These files contains a list of all valid regios. If a region is not in there, filter it out. This greatly the performance of the geocoding api. It's not optimal that we acutlaly throw this information await. This information should be kept actually. (TODO)

In [8]:
with open("regions.json") as json_file:
    regions = json.load(json_file)

regions = list(regions.values())
regions_counties = [x for x in regions if x["level"] == 3]

def is_valid_county(county):
    return (
        len(
            [
                x
                for x in regions_counties
                if x["name"].startswith(county)
                and x["duration"]["until"] == "2019-12-31T00:00:00.000Z"
            ]
        )
        != 0
    )

Read it some secrets later needed to comunicate with an internal API

In [9]:
auth = tuple(Path("secrets.txt").read_text().split()[1:])

In [10]:
all_incidents = []
all_src = []
all_chronicle = []

for p in Path("data").glob("*.db"):
    print(p)
    db = dataset.connect("sqlite:///" + str(p))
    all_incidents += db["incidents"].all()
    all_src += db["sources"].all()
    all_chronicle += db["chronicle"].all()

data/mobile-opferberatung-scraper.db
data/raa-sachsen-scraper.db
data/opferperspektive-scraper.db


In [11]:
for x in all_chronicle:
    if "region" in x and len(x["region"]) > 0:
        continue
    if "iso3166_2" in x and x["iso3166_2"] != None and len(x["iso3166_2"]) > 0:
        x["region"] = pycountry.subdivisions.get(code=x["iso3166_2"]).name
    elif "iso3166_1" in x and x["iso3166_1"] != None and len(x["iso3166_1"]) > 0:
        x["region"] = pycountry.countries.get(alpha_2=x["iso3166_1"]).name
    else:
        raise ValueError("Need to specify region somehow")

In [12]:
# only using date (and without time (hour/minute)) for now

def ensure_date(x):
    if x is None:
        return x
    if isinstance(x, datetime):
        return x.date()
    elif isinstance(x, date):
        return x
    ValueError("neither date or datetime")

for row in all_src:
    if "date" in row:
        row["date"] = ensure_date(row["date"])
    else:
        row["date"] = None

In [13]:
db = dataset.connect("sqlite:///rechtegewalt.db")

tab_incidents = db["incidents"]
tab_incidents.insert_many(all_incidents)

tab_src = db["sources"]
tab_src.insert_many(all_src)

tab_chro = db["chronicles"]
tab_chro.insert_many(all_chronicle)

tab_incidents.create_index(["rg_id"])
tab_src.create_index(["rg_id"])

tab_incidents.create_index(["id"])

In [14]:
def add_state_country(row):
    chro = tab_chro.find_one(chronicler_name=row["chronicler_name"])
    sub = pycountry.subdivisions.get(code=chro["iso3166_2"])
    row["state"] = sub.name
    row["country"] = _(sub.country.name)
    return row

In [14]:
county_words = ["Landkreis", "Landkeis", "Kreis", "LK"]

def clean_county(x):
    if x is None or x == "None":
        return None
    x = clean_string(x)

    for w in county_words:
        w += " "
        if x.startswith(w):
            x = x[len(w) :]

    if not is_valid_county(x):
        print("removing", x)
        return None
    return x

def clean_city(x):
    x = clean_string(x)
    if x is None:
        return None
    assert len(x) > 0
    return x

def clean_string(x):
    x = clean(x, lang="de", lower=False)
    if len(x) == 0:
        return None
    return x

In [17]:
def fill_missing_county():
    statement = "SELECT * FROM incidents GROUP BY city, state having count(*) > 1"
    for row in db.query(statement):
        dupli = list(tab_incidents.find(city=row["city"], state=row["state"]))
        county_can = []
        contains_none = False
        for d in dupli:
            if d["county"] is not None:
                county_can.append(d["county"])
            else:
                contains_none = True

        unique_can = list(set(county_can))
        if contains_none and len(unique_can) == 1:
            tab_incidents.update(
                {"city": row["city"], "county": unique_can[0]}, ["city"]
            )
            print(unique_can)

In [18]:
# clean location text because there were still some errors
for x in tqdm(tab_incidents.all()):
    if x["rg_id"] in manual_fixes:
        x = {**x, **manual_fixes[x["rg_id"]]}

    x["orig_county"] = clean_string(x["county"])
    x["orig_city"] = clean_string(x["city"])

    x["county"] = clean_county(x["county"])
    x["city"] = clean_string(x["city"])

    x = add_state_country(x)

    if x["date"] is None:
        print(x)
        raise ValueError

    # ignore older data
    if x["date"].year < 1990:
        tab_incidents.delete(id=x["id"])

    x["date"] = ensure_date(x["date"])

    #   manual fix
    if x["city"] == "Zerbst" and x["state"] == "Sachsen-Anhalt":
        x["city"] = "Zerbst/Anhalt"

    tab_incidents.update(x, ["id"])

93it [00:00, 407.45it/s]

removing Anhalt-Zerbst
removing Bördekreis
removing Ohrekreis
removing Anhalt-Zerbst
removing Anhalt-Zerbst
removing Anhalt-Zerbst
removing Regionalexpress nach Berlin über Bitterfeld
removing Ohrekreis
removing Merseburg-Querfurt
removing Bitterfeld
removing Ohrekreis
removing Halberstadt
removing Ohrekreis
removing Anhalt-Zerbst


208it [00:00, 477.69it/s]

removing Mansfelder Land
removing Mansfelder Land
removing Köthen
removing Wernigerode
removing Verwaltungsgemeinschaft
removing Schönebeck
removing Bördekreis


308it [00:00, 481.50it/s]

removing Merseburg-Querfurt
removing Ohrekreis
removing Wernigerode
removing Köthen
removing Schönebeck
removing Saalkreis
removing Köthen


527it [00:01, 528.07it/s]

removing Bördekreis
removing Querfurt


903it [00:01, 518.09it/s]

removing Mansfeld Südharz
removing Mansfeld Südharz


1122it [00:02, 535.89it/s]

removing Desssau-Roßlau


6107it [00:11, 527.78it/s]

removing Uckemark


8349it [00:15, 530.42it/s]


In [19]:
# fill_missing_county()

In [3]:
def geocode_all():
    statement = "SELECT DISTINCT city, county, state, country FROM incidents"
    subs = list(db.query(statement))
    
    # the geocoding api has problems with Leipzig as County. (There is a Landkreis Leipzig and a seperate City Leipzig)
    removed_counties_with_ids = []
    for i, x in enumerate(subs):
        print(i)
        if x['county'] == 'Leipzig':
            removed_county = x.pop('county')
            x['county'] = None
            removed_counties_with_ids.append([removed_county, i])
            
            
    r = requests.post(
        "https://geocode.app.vis.one/",
        auth=auth,
        json={"provider": "here", "locations": [{"query": dict(x)} for x in subs]},
    )
    r.raise_for_status()
    subs_location = r.json()["locations"]
    
    # add back county since it was correct
    for x, i in removed_counties_with_ids:
        print(x)
        subs_location[i]['county'] = x 

    return subs_location

In [21]:
subs_location = geocode_all()

In [22]:
subs_location[0]

{'city': 'Halle (Saale)',
 'country': 'Deutschland',
 'county': 'Halle (Saale)',
 'district': None,
 'house_number': None,
 'latitude': 51.4822,
 'longitude': 11.97494,
 'postal_code': '06108',
 'query': {'city': 'Halle (Saale)',
  'country': 'Deutschland',
  'county': None,
  'state': 'Sachsen-Anhalt'},
 'state': 'Sachsen-Anhalt',
 'street': None}

In [23]:
def geocode_second(subs_location):
    second_check = []
    second_check_ids = []

    for i, x in enumerate(subs_location):
        if len(x) == 1 and "county" in x["query"] and x["query"]["county"] is not None:
            x["query"]["city"] = x["query"]["city"] + ", " + x["query"]["county"]
            x["query"]["county"] = None
            second_check.append(x)
            second_check_ids.append(i)

    print(len(second_check))
    if len(second_check) == 0:
        return subs_location
    r = requests.post(
        "https://geocode.app.vis.one/",
        auth=auth,
        json={
            "provider": "here",
            "locations": [{"query": x["query"]} for x in second_check],
        },
    )
    r.raise_for_status()

    for i, x in enumerate(r.json()["locations"]):
        if len(x) != 1:
            subs_location[second_check_ids[i]] = x
            print("found!", x)
    return subs_location

In [24]:
# not used right now?
# subs_location = geocode_second(subs_location)

In [25]:
good_locations = []

for x in subs_location:
    if len(x) == 1:
        print("error here, deleting for now", x)
    #         tab_incidents.delete(**x['query'])
    else:
        #         rename
        query = x.pop("query")
        x["query_county"] = query["county"]
        x["query_city"] = query["city"]
        good_locations.append(x)

error here, deleting for now {'query': {'city': 'Dresden, Dresdner Heide', 'country': 'Deutschland', 'county': None, 'state': 'Sachsen'}}


In [26]:
tab_loc = db["locations"]
tab_loc.insert_many(good_locations)

In [27]:
tab_loc.create_index(["id"])

## Merging Locations

We are trying out several ways to merge the location with geolocation back to the old without. Since we maniputlated the county etc., we have to try varioous ways how to merge. This should get improved (TDOO)


Not really sure whether a seperate table for location is needed. It was introduced because in some cases, multiple locations are associated with incident.

In [28]:
for x in tqdm(tab_incidents.all()):
    x_query = {name: x[name] for name in ["state", "country"]}
    row_loc = tab_loc.find_one(
        query_county=x["orig_county"], query_city=x["orig_city"], **x_query
    )
    if row_loc is None:
        row_loc = tab_loc.find_one(
            query_county=x["county"], query_city=x["orig_city"], **x_query
        )
        if row_loc is None:
            row_loc = tab_loc.find_one(
                query_county=x["orig_county"], query_city=x["city"], **x_query
            )
            if row_loc is None:
                row_loc = tab_loc.find_one(
                    query_county=x["county"], query_city=x["city"], **x_query
                )
                if row_loc is None:
                    print(x)
                    print(x_query)
                    continue
    row_loc_geo = {
        name: row_loc[name]
        for name in [
            "latitude",
            "longitude",
            "postal_code",
            "street",
            "house_number",
            "district",
            "city",
            "county",
            "state",
            "country",
        ]
    }
    merged = {**x, **row_loc_geo}
    tab_incidents.update(merged, ["id"])

4324it [00:08, 542.74it/s]

OrderedDict([('id', 4240), ('description', 'Auf einem Feld in der Dresdner Heide, in der Nähe des Hammerweges, haben Unbekannte aus mehreren faustgroßen Steinen ein Hakenkreuz gelegt. Dieses hatte eine Größe von 6,50m x 6,80m. Die Kriminalpolizei hat die weiteren Ermittlungen übernommen.'), ('date', datetime.datetime(2013, 12, 23, 0, 0)), ('url', 'https://www.raa-sachsen.de/support/chronik/vorfaelle/dresdner-heide-3485'), ('rg_id', 'https://www.raa-sachsen.de/support/chronik/vorfaelle/dresdner-heide-3485'), ('city', 'Dresden, Dresdner Heide'), ('county', None), ('chronicler_name', 'RAA Sachsen'), ('title', 'Hakenkreuz aus Steinen gelegt'), ('orig_county', None), ('orig_city', 'Dresden, Dresdner Heide'), ('state', 'Sachsen'), ('country', 'Deutschland')])
{'state': 'Sachsen', 'country': 'Deutschland'}


8348it [00:15, 523.95it/s]


In [29]:
final_loc = list(
    tab_loc.distinct(
        *[
            "latitude",
            "longitude",
            "postal_code",
            "street",
            "house_number",
            "district",
            "city",
            "county",
            "state",
            "country",
        ]
    )
)

In [30]:
len(final_loc)

887

In [31]:
tab_loc.drop()

In [32]:
# tab_loc_final = db['locations_final']
tab_loc.insert_many(final_loc)

In [33]:
len(list(tab_loc.all()))

887