# SC02_Gruppe_10 | Basic Cost of Living
## Problem Description
The cost of living is rising faster than wages, which increases the demand for ways to optimize living costs.
Two of the largest items in the cost of living are rent and health care.

### Thesis
Visualizing the basic cost of living per canton (as a per-canton heatmap) makes it easier for people and families who are willing to move to choose a canton of residence.

In [None]:
# init
%pip install beautifulsoup4 openpyxl numpy matplotlib seaborn pandas prettyprint requests urllib3
from bs4 import BeautifulSoup       # scrape a website
from datetime import datetime       # deal with dates and times
import json
# import matplotlib.pyplot as plt
import numpy as np                  # the vector victor
import os                           # operating system specific tasks incl. file handling
import pandas as pd                 # pandas - handle dataframes (2d)
from pprint import pprint as pp     # make output of lists, dicts and co more readable
import urllib.request               # pythons built in way to get stuff from the web
import re                           # regular expressions. almost inevidable for code of more than 100 lines
import requests                     # !! probably not needed. urllib.request might be good enough for our case
# import seaborn as sns
import zipfile                      # for looking into and extracting files from a zip-archive

In [2]:
# misc
# Current working directory at the time this cell runs.
home_base = os.getcwd()

In [None]:
# logging
import traceback
import logging

# custom log level TRACE with super-detailed information we would not usually need.
# 7 is below logging.DEBUG (10), so TRACE records are only emitted when a
# logger/handler level is explicitly lowered to TRACE.
TRACE_LEVEL = 7
logging.addLevelName(TRACE_LEVEL, 'TRACE')


# Define the TRACE function for handling the new log level
def trace(self, message, *args, **kwargs):
    """Log ``message % args`` with level TRACE on this logger.

    Mirrors the signature of ``Logger.debug`` and friends; ``kwargs`` are
    forwarded to ``Logger.log`` (``exc_info``, ``extra``, ``stacklevel`` ...).
    """
    if self.isEnabledFor(TRACE_LEVEL):
        # This wrapper lives outside the logging module, so without an extra
        # stack level the record's funcName/lineno would point at ``trace``
        # itself instead of the code that called ``logger.trace(...)``.
        kwargs["stacklevel"] = kwargs.get("stacklevel", 1) + 1
        self.log(TRACE_LEVEL, message, *args, **kwargs)


# Attach this method to the logger class
logging.Logger.trace = trace


# initialize a logger

logger = logging.getLogger(__name__)
# We might re-run this cell repeatedly within a notebook; without removing the
# handlers of the previous run every message would be emitted several times.
# Each old handler is also closed so the previously opened log file is released.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
    stale_handler.close()
logger.setLevel("DEBUG")
# change to this if you would like more details
# logger.setLevel("TRACE")

# log handlers (how/whereto output logs)
console_handler = logging.StreamHandler() # log to console
# FileHandler does not create missing directories, so make sure logs/ exists
os.makedirs("logs", exist_ok=True)
file_handler = logging.FileHandler("logs/app.log", mode="a", encoding="utf-8") # log more important info to a file

# format currently the same for console and file but might make sense to change in the future
formatter = logging.Formatter(
    "{asctime}:{levelname} - {name}:{funcName}: {message}",
    style="{",
    datefmt="%Y-%m-%d_%H%M%S",
)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)

# only log info and higher to the console (the file handler keeps the logger's level)
console_handler.setLevel("INFO")

logger.addHandler(console_handler)
logger.addHandler(file_handler)

logger.debug(logger.handlers)

logger.info(f"Program initialized at {home_base=}. Dependencies loaded.")

In [4]:
class File:
    """ common methods for handling files """

    def get_content_from_url(self, url: str) -> bytes:
        """ get the raw contents from some http url and return it without parsing a thing

        Returns the undecoded response body (bytes).
        """
        return requests.get(url).content

    def download(self, url: str, file_path=None) -> "str | None":
        """ download a file from the internet and store it locally

        Args:
            url: address of the file to fetch.
            file_path: where to store the file. When omitted, ``urlretrieve``
                chooses a temporary location itself.

        Returns:
            The local path of the downloaded file, or ``None`` when the
            download failed. Failures are logged, not raised.
        """
        try:
            path, headers = urllib.request.urlretrieve(url, file_path)
        except Exception:
            # deliberate best effort: record the full traceback and let the
            # caller decide what to do with the missing file
            logger.exception("FileDownloadError")
            return None

        logger.debug("File downloaded to path=%s", path)
        for name, value in headers.items():
            logger.trace("%s:\t%s", name, value)
        return path

    def extract(self, zip_file_path: str, file_in_archive: str, extract_to_filepath: str) -> bool:
        """ copy a single member of a zip archive to a local file

        The member is looked up under ``file_in_archive`` and, because the
        naming inside the archives is not consistent, under two variations:
        with every 'ä' replaced by 'ae', and additionally with an underscore
        inserted before the file extension (``name.ext`` -> ``name_.ext``).

        Args:
            zip_file_path: path of the zip archive to read.
            file_in_archive: preferred name of the member to extract.
            extract_to_filepath: path the member's content is written to.

        Returns:
            True if a matching member was found and written, False if none of
            the name variations exists in the archive.
        """
        # build the candidate names up front (in order of preference)
        ascii_name = file_in_archive.replace('ä', 'ae')
        candidates = [file_in_archive, ascii_name]
        stem, dot, suffix = ascii_name.rpartition('.')
        if dot:
            # only names that actually have an extension get the "_." variation
            candidates.append(f"{stem}_.{suffix}")

        # the context managers make sure archive, member and target file are
        # closed again, even when reading or writing fails
        with zipfile.ZipFile(zip_file_path) as archive:
            members = archive.namelist()
            logger.debug(members)
            logger.debug("File we are looking for: %s", file_in_archive)

            member = next((name for name in candidates if name in members), None)
            if member is None:
                return False
            logger.debug("Found %s in archive.", member)

            # extract the file from the zip archive and save it
            with archive.open(member) as source, open(extract_to_filepath, 'wb') as target:
                target.write(source.read())

        return True

# some testing of the class and its methods
# url = (
#     "https://api.worldbank.org/v2/en/indicator/"
#     "NY.GDP.MKTP.CD?downloadformat=csv"
# )
# filename = "gdp_by_country.zip"
# temp_file_path = File.download(url)


In [None]:
# original rent data
# as we do not need to pre-process this source (e.g. extract a zip) we do not need to store it locally
source_price_rent_url = "https://dam-api.bfs.admin.ch/hub/api/dam/assets/30885375/master"
xls = source_price_rent_data = pd.ExcelFile(source_price_rent_url)

# tab-name = year of statistic; a sheet whose name is not a number is skipped
# with a warning instead of aborting the whole cell with a ValueError
rent_years = []
for sheet_name in xls.sheet_names:
    try:
        rent_years.append(int(sheet_name))
    except ValueError:
        logger.warning("Ignoring sheet %r: its name is not a year", sheet_name)

# check which years we have available (tab-name = year of statistic)
logger.info('We have data on rent of the following years:')
logger.info(rent_years)

# TODO: Loop through the files and create a multi-indexed dataframe off of it
# to read just one sheet to dataframe:
#df = pd.read_excel(file_name, sheet_name="house")


In [None]:
# original health care data
source_price_health_care_url = "https://ckan.opendata.swiss/api/3/action/package_show?id=health-insurance-premiums"
price_health_care_map_name = 'Prämien_CH.csv'
health_care_years = set()
# NOTE: this rebinds the name the logging setup used for its logging.FileHandler
file_handler = File()
# all files of this cell are stored in data/original; neither urlretrieve nor
# open() create missing directories, so make sure it exists
os.makedirs('data/original', exist_ok=True)
with urllib.request.urlopen(source_price_health_care_url) as url:
    data = json.load(url)
    resources = data['result']['resources']
    file_count = 0
    zip_file_count = 0
    for resource in resources:
        file_count += 1
        # we are only interested in Prämien_CH.csv, which we can either find directly in the resources
        # and sometimes within a zip archive
        if 'ZIP' == resource['format']:
            zip_file_count += 1
            # Regular expression to extract the year
            # (assuming the filename pattern remains unchanged throughout the years)
            year_match = re.search(r'.*(\d{4})\.zip', resource['name']['en'])
            if year_match is None:
                logger.warning(f"Unable to derive a year from {resource['name']['en']}. This archive is skipped.")
                continue
            year = year_match.group(1)

            logger.debug(f"{resource['format']=} | {year} | {resource['name']['en']} | {resource['download_url']=} | extraction needed ... .. .")

            temp_file_path = file_handler.download(url=resource['download_url'])
            if temp_file_path is None:
                # download() logs the error and returns None instead of raising
                logger.warning(f"Unable to download {resource['name']['en']}. There will be no data for the year {year}")
                continue
            extract_success = file_handler.extract(temp_file_path, price_health_care_map_name, f'data/original/{year}_health-care_premium.csv')

            if not extract_success:
                logger.warning(f"Unable to find {price_health_care_map_name} or any variation of it in {resource['name']['en']}. There will be no data for the year {year}")
            else:
                health_care_years.add(int(year))

        elif price_health_care_map_name == resource['name']['en']:
            # NOTE(review): the stand-alone CSV is presumably the premium list
            # for the upcoming year - confirm against the data source
            year = datetime.now().year + 1
            logger.debug(f"{resource['format']=} | {resource['name']['en']=} | {resource['download_url']=}")

            downloaded_path = file_handler.download(url=resource['download_url'], file_path=f'data/original/{year}_health-care_premium.csv')
            # only advertise the year if the file really arrived
            if downloaded_path is not None:
                health_care_years.add(int(year))

# remove the temporary zip files urlretrieve left behind (files downloaded to
# an explicit path are not affected)
urllib.request.urlcleanup()

logger.info('We have data on health care of the following years:')
logger.info(health_care_years)

In [None]:
# years that are covered by both the rent and the health care dataset, ascending
years = sorted(x for x in rent_years if x in health_care_years)
logger.info('We have data on the following years in both datasets:')
logger.info(years)