In [1]:
from datetime import datetime
from pathlib import Path

import pandas as pd

from common.connectors.elastic import ESClient
from common.handlers import csv_read

#### Functions

In [2]:
def get_price_index(es: "ESClient", municipality_id_filter: list) -> pd.DataFrame:
    """
    Query ES for all the historical price indexes of every municipality in the
    given municipality_id_filter list.

    The ``_source`` of every hit is reduced to the columns ``index`` (float, taken from
    ``avix_corop_index``), ``mun_code`` (int, taken from ``gemeentecode``), ``year`` and
    ``quarter`` (the latter two are passed through without a cast).

    :param es: AvixES elastic client; only ``findall(query=..., index=...)`` is called on it
    :param municipality_id_filter: List of municipality codes
    :return: pd.DataFrame with the columns described above. The frame is empty (but still
        has those columns) when the filter is empty or ES returns no hits.
    """
    columns = ["index", "mun_code", "year", "quarter"]
    dtypes = {"index": float, "mun_code": int}

    # Nothing to look up: skip the round trip to ES.
    if not municipality_id_filter:
        return pd.DataFrame(columns=columns).astype(dtypes)

    # NOTE(review): a `match` query does not parse boolean operators, so "OR" is presumably
    # analysed as just another term and the codes are OR-ed by the query's default operator.
    # A `terms` query may express the intent more directly - confirm against the index mapping.
    query_condition = " OR ".join(str(code) for code in municipality_id_filter)
    q = {"query": {"bool": {"must": [{"match": {"gemeentecode": query_condition}}]}}}
    hits = es.findall(query=q, index="dev_realestate.avix_nl_corop_index")

    # `hits` is a list and is empty when Elastic Search found nothing for these municipalities.
    # Return an empty frame rather than None so callers always receive a DataFrame.
    if not hits:
        return pd.DataFrame(columns=columns).astype(dtypes)

    # Cleaning up results: keep only the fields we need, under our own column names.
    return (pd.DataFrame([hit["_source"] for hit in hits])
            .rename(columns={"avix_corop_index": "index", "gemeentecode": "mun_code"})
            .filter(columns)
            .astype(dtypes))


def add_indexed_transactions(es: "ESClient", df: pd.DataFrame) -> pd.DataFrame:
    """
    Used for querying historical price indices per municipality, and using these to index the
    historical sale prices to the most recently available index (usually last yearly quarter).

    Every sale gets the year-quarter ("YQ", e.g. "2019-Q3") of its date, is joined to the price
    index of that quarter in its municipality and is rescaled as
    ``indexed_price = current_index * amount / index``, where ``current_index`` is the index of
    the latest quarter available for that municipality (reported as ``indexed_date``).

    Sales dated after the latest available quarter of their municipality are treated as if they
    were sold in that quarter. Sales for which no price index exists (unknown municipality or
    quarter) are dropped; the number of dropped rows and their quarters are printed.

    :param es: AvixES elastic client
    :param df: This needs to be the nearby_estates dataframe. The columns "mun_code", "date"
        (datetime) and "amount" are read. The frame itself is not modified.
    :return: A new dataframe with the original columns plus "indexed_price" and "indexed_date".
        "mun_code" is cast to int and a non-numeric "amount" is cast to float.
    :raises ValueError: if no price index is found for any of the municipalities in df
    """
    original_columns = df.columns.tolist()
    output_columns = original_columns + ["indexed_price", "indexed_date"]

    # Work on a copy so the helper columns and casts below never leak into the caller's frame.
    sales = df.copy()
    sales["mun_code"] = sales["mun_code"].astype(int)
    sales["YQ"] = sales["date"].dt.year.astype(str) + "-Q" + sales["date"].dt.quarter.astype(str)

    # Getting the price indices from ES
    nearby_mun_codes = sales["mun_code"].unique().tolist()
    price_indices = get_price_index(es, nearby_mun_codes)
    if price_indices is None or price_indices.empty:
        raise ValueError(f"No price indices found for municipalities {nearby_mun_codes}")

    # Preparing dataframes for joining. year/quarter are cast to str so the concatenation
    # also works when they are delivered as numbers.
    price_indices = price_indices.assign(
        YQ=price_indices["year"].astype(str) + "-Q" + price_indices["quarter"].astype(str)
    )
    # "YYYY-Qn" strings sort chronologically, so the string maximum is the latest quarter.
    is_latest = price_indices["YQ"] == price_indices.groupby("mun_code")["YQ"].transform("max")
    most_recent_index = (price_indices
                         .loc[is_latest, ["mun_code", "index", "YQ"]]
                         .rename(columns={"index": "current_index", "YQ": "indexed_date"}))

    # If we have sales more recent than the most current index of their municipality, we change
    # their YQ to that most recent quarter so that we index the prices to it. The cap is looked
    # up per municipality, so municipalities may have different most-recent quarters.
    latest_yq = sales["mun_code"].map(
        dict(zip(most_recent_index["mun_code"], most_recent_index["indexed_date"]))
    )
    # Municipalities without any index keep their own YQ; they drop out in the join below.
    latest_yq = latest_yq.fillna(sales["YQ"])
    sales["YQ"] = sales["YQ"].where(sales["YQ"] <= latest_yq, latest_yq)

    # Joining and doing the indexing calculations. Only the needed columns are merged in, so
    # the helper columns of the price index cannot clash with columns of the input.
    df_indexed = (sales
                  .merge(price_indices[["YQ", "mun_code", "index"]], on=["YQ", "mun_code"])
                  .merge(most_recent_index, on="mun_code"))
    for key in ("current_index", "amount", "index"):
        if not pd.api.types.is_numeric_dtype(df_indexed[key]):
            df_indexed[key] = df_indexed[key].astype(float)
    df_indexed["indexed_price"] = (
        df_indexed["current_index"] * df_indexed["amount"] / df_indexed["index"]
    )

    n_dropped = len(sales) - len(df_indexed)
    if n_dropped > 0:
        print(f"We dropped {n_dropped} when joining on price indices!")
        known_keys = price_indices[["YQ", "mun_code"]].drop_duplicates()
        unmatched = sales.merge(known_keys, on=["YQ", "mun_code"], how="left", indicator=True)
        dropped = unmatched.loc[unmatched["_merge"] == "left_only", "YQ"]
        print(f"{len(dropped)} of these were dropped due to missing YQ values: {dropped.unique().tolist()}")

    return df_indexed.filter(output_columns)

#### Data

In [3]:
# Folder with the hackathon CSV files, located under the current user's home directory.
path = Path.home().joinpath("Google Drive/DDMA Hackathon")
# Show the CSV files that sit directly in that folder.
[*path.glob("*.csv")]

[WindowsPath('C:/Users/PSaalbrink/Google Drive/DDMA Hackathon/buurt_data (1).csv'),
 WindowsPath('C:/Users/PSaalbrink/Google Drive/DDMA Hackathon/GROENE_DAKEN.csv'),
 WindowsPath('C:/Users/PSaalbrink/Google Drive/DDMA Hackathon/hackathon.csv')]

In [15]:
# Sales records: keep the rows whose date text contains "2019" and add three derived fields
# to every row - "mun_code" (copy of Gemeente2019), "buurt_code" (Buurt2019 left-padded with
# zeros to 8 characters) and "date" parsed from day/month/year.
red_data = pd.DataFrame(
    [
        {
            **row,
            "mun_code": row["Gemeente2019"],
            "buurt_code": row["Buurt2019"].rjust(8, "0"),
            "date": datetime.strptime(row["date"], "%d/%m/%Y"),
        }
        for row in csv_read(path / "hackathon.csv")
        if "2019" in row["date"]
    ],
    dtype=str,
)
# dtype=str is requested for the whole frame, so "date" is converted to datetime again here.
red_data["date"] = pd.to_datetime(red_data["date"])

# Neighbourhood data: everything as text, minus the column whose header is an empty string.
# NOTE(review): the folder listing shown earlier contains 'buurt_data (1).csv', not
# 'buurt_data.csv' - confirm this file exists.
cbs_data = pd.DataFrame(list(csv_read(path / "buurt_data.csv")), dtype=str).drop(columns="")

# Right join: every sale is kept, enriched with neighbourhood data where the codes match.
# NOTE(review): the join key is the unpadded Buurt2019, not the padded buurt_code - confirm.
data = cbs_data.merge(red_data, left_on="gwb_code_10", right_on="Buurt2019", how="right")
del red_data, cbs_data

In [5]:
# data.columns.tolist()
# data.head()

#### Elastic

In [6]:
# Elasticsearch client that is handed to add_indexed_transactions below. The argument is the
# same index name that get_price_index queries; presumably it selects the client's default
# index - confirm against ESClient.
avix = ESClient("dev_realestate.avix_nl_corop_index")

In [16]:
# Add "indexed_price" and "indexed_date" to the sales; the result replaces `data`, so this
# cell should only be run once per freshly loaded `data`.
data = add_indexed_transactions(es=avix, df=data)

In [17]:
# Write the indexed dataset into the same folder as the input files, without the row index.
data.to_csv(path.joinpath("complete_hackathon_dataset.csv"), index=False)