What this script does:
1. Load the full dataset of vacancies and merge with the SIC letters that have been estimated for each vacancy
2. Compute the un-weighted monthly stock of online job adverts (OJA) vacancies broken down by SIC
3. Compute the per-vacancy weights to align with the ONS vacancy dataset
4. Compute the adjusted per-vacancy weights after taking into account adverts with "uncertain" SIC letters
5. Save the results for future analysis

Note on the presence of adverts with "uncertain" SIC letters. 
- First of all, these adverts are assigned the average weight for that month. The same is true for job adverts with SIC codes that are not measured by the ONS vacancy survey. This obviously messes up the total stock count, which is now higher than the ONS one because the overall sum includes vacancies that were not used to compute the per-vacancy weights. However, if we adjust for this, then the absolute levels by SIC from online job adverts will be smaller than the absolute levels from the ONS because the weights have been decreased to take into account the uncertain vacancies. We can't have it both ways I think. However, I decided to prioritise the alignment by 'total stock level' because otherwise breakdowns by other characteristics might be artificially inflated.


Suggestions for future work:
- Implement the final per-vacancy weight as the weighted average of the monthly weights spanned by each job advert (weighted by how long a vacancy is open in that month).
- Instead of taking the median value across weights to use with "uncertain" job adverts, use the mean across all sectors, minus "Mining and Quarrying"


# Imports

In [None]:

# ------------------------ DEPENDENCIES AND FUNCTIONS ------------------------

# standard imports
from collections import Counter
from copy import deepcopy
from fuzzywuzzy import fuzz
from fuzzywuzzy import process
import dask.dataframe as dd
import datetime
import json
import matplotlib.pyplot as plt
#import nltk
import numpy as np
import os
import pandas as pd
from pandas.api.types import CategoricalDtype
from pathlib import Path
import pickle
from pprint import PrettyPrinter
#from pytorch_pretrained_bert import BertTokenizer, BertModel, BertForMaskedLM
import seaborn as sns
import scipy.stats as st
import sys
import statsmodels as sm
from time import time as tt
from tqdm import tqdm

# custom imports
sys.path.append('/Users/stefgarasto/Google Drive/Documents/scripts/utils_stef')
from flow_to_stock_funcs import get_stock_breakdown, load_ons_vacancies, \
                                set_month_at_beginning, set_month_at_end, scale_weights_by_total_levels
#from importlib import reload
#import textkernel_load_utils
#reload(textkernel_load_utils)
from textkernel_load_utils import tk_params, create_tk_import_dict, read_and_append_chunks, \
                                  load_full_column, data_path, data_folder
# note: data_path is the root to where the data is stored, data_folder = {data_path}/data/processed
from utils_general import nesta_colours, flatten_lol, sic_letter_to_text, print_elapsed, TaskTimer, printdf

# Pretty-printer for nested structures
pp = PrettyPrinter(indent=4)
# Add custom SIC groups
sic_letter_to_text['Z'] = 'others'
sic_letter_to_text['L_O_S'] = 'personal_and_public_services'
sic_letter_to_text['D_E'] = 'utilities'
sic_letter_to_text['M_P'] = 'educational_and_professional_activities'
sic_letter_to_text['uncertain'] = 'uncertain'

# NOTE: change to local results folder
res_folder = '/Users/stefgarasto/Local-Data/textkernel/results/flow_to_stock'

# Timer used below through start_task()/end_task() around long-running steps
timer = TaskTimer()
print('Done')


# Hardcoded parameters setup

In [None]:
# Duration threshold (in days) used in the standard configuration
FIXED_DURATION_TH = 55 #55
# Duration value (in days) used instead when CONSTANT_DUR is switched on
TEST_DURATION_TH = 1
# If True, every advert is later given the same duration (DURATION_TH)
CONSTANT_DUR = False
DURATION_TH = TEST_DURATION_TH if CONSTANT_DUR else FIXED_DURATION_TH
# Months (YYYY-MM) used for the plot limits and for the list of months of interest
START_MONTH = '2015-03'
END_MONTH= '2019-11'
# NOTE(review): not referenced in the visible part of the notebook - presumably used further down
FIRST_VALID_MONTH = '2015-04'
print(f'Duration threshold is {DURATION_TH}')


# Functions, parameters and helpers

In [None]:
#%%
def norm_series(df_series):
    """Standardise a series: subtract its mean, then divide by its standard deviation."""
    centred = df_series - df_series.mean()
    spread = df_series.std()
    return centred / spread

#%%
def cap_duration(data, duration_th = 55):
    """Cap the ``duration`` column at ``duration_th``.

    The dataframe is modified in place and also returned.
    """
    over_threshold = data['duration'] > duration_th
    data.loc[over_threshold, 'duration'] = duration_th
    return data

# Reverse lookup: from the sector description back to its SIC letter
sic_text_to_letter = dict(zip(sic_letter_to_text.values(), sic_letter_to_text.keys()))


In [None]:
# Build the two-way lookup between the TK industry values and their labels
def get_map_industry_label_values():
    """Return ``(label -> value, value -> label)`` maps for the industry field.

    Only the first data chunk (index 0) is read. For every industry value,
    the most frequent label found in that chunk is used as its label.
    """
    first_chunk = os.path.join(data_folder, tk_params.file_name_template.format(0))
    industry_cols = ['organization_industry_label','organization_industry_value']
    chunk_df = pd.read_csv(first_chunk, compression='gzip', encoding = 'utf-8',
                           usecols = industry_cols)
    map_value2label = {}
    map_label2value = {}
    for value, rows in chunk_df.groupby('organization_industry_value'):
        top_label = rows.organization_industry_label.value_counts().index.values[0]
        map_value2label[value] = top_label
        map_label2value[top_label] = value
    return map_label2value, map_value2label

# create the maps
# (executed when the cell runs: reads the first data chunk from disk)
map_label2value, map_value2label = get_map_industry_label_values()

# Display the label -> value map as the cell output
map_label2value


In [None]:
#%%
# [TEST] Create models for the duration field from data
def best_fit_distribution(data, bins=200, ax=None):
    """Model data by finding best fit distribution to data
    
    Each candidate distribution is fitted to ``data`` and scored with the sum
    of squared errors (SSE) between the normalised histogram of ``data`` and
    the fitted density evaluated at the bin centres.
    
    Keyword arguments:
    data -- duration data to model
    bins -- number of bins to use to discretise the data
    ax -- ax of figure where to plot (NOTE: not used anywhere in the body)
    
    Returns:
    dict mapping each distribution name to a dict with keys 'params'
    (fitted parameters) and 'sse' (sum of squared errors vs the histogram)
    """
    
    # Get histogram of original data
    y, x = np.histogram(data, density= True, bins = bins)
    # turn the bin edges into bin centres (one value per bin)
    x = (x + np.roll(x, -1))[:-1] / 2.0
    # closes the current matplotlib figure
    plt.close()
    # MLE fit
    all_params = {}
    
    # fit geometric distribution
    dist_name = 'geometric'
    all_params[dist_name] = {}
    # success probability estimated as the reciprocal of the sample mean
    all_params[dist_name]['params'] = 1/np.mean(data)
    # generate sample
    # NOTE(review): the pmf is evaluated at the bin centres, which are not necessarily
    # integers - confirm this is meaningful for a discrete distribution
    pdf = st.geom.pmf(x, all_params[dist_name]['params'])
    all_params[dist_name]['sse'] = np.sum(np.power(y - pdf, 2.0))
    
    # poisson
    dist_name = 'poisson'
    all_params[dist_name] = {}
    # rate estimated as the sample mean
    all_params[dist_name]['params'] = np.mean(data)
    # generate sample
    pdf = st.poisson.pmf(x, all_params[dist_name]['params'])
    all_params[dist_name]['sse'] = np.sum(np.power(y - pdf, 2.0))
    
    # Other distributions to fit
    DIST_DICT = {'levy': st.levy, 'gamma': st.gamma, 'invgamma': st.invgamma,
                 'chi_squared': st.chi2, 'beta1': st.beta, 'exponential': st.expon}
    
    # Fit other distributions via MLE 
    for dist_name in DIST_DICT.keys():
        print(dist_name)
        all_params[dist_name] = {}
        all_params[dist_name]['params'] = DIST_DICT[dist_name].fit(data)
        params= all_params[dist_name]['params']
        # generate sample
        # the fitted tuple is (shape parameters..., loc, scale): dispatch on its
        # length to pass zero, one or two shape parameters
        if len(params)==4:
            pdf = DIST_DICT[dist_name].pdf(x, params[0],
                                           params[1],
                                           loc=params[2], 
                                           scale=params[3])
        elif len(params)==3:
            pdf = DIST_DICT[dist_name].pdf(x, params[0],
                                           loc= params[1],
                                           scale=params[2])
        else:
            pdf = DIST_DICT[dist_name].pdf(x, loc= params[0],
                                           scale= params[1])
        all_params[dist_name]['sse'] = np.sum(np.power(y - pdf, 2.0))

    # beta - after normalisation
    # loc and scale are fixed so that the support just covers the range of the data
    dist_name = 'beta2'
    loc = min(data) - 1e-5
    scale = max(data) - loc + .1
    all_params[dist_name] = {}
    all_params[dist_name]['params'] = st.beta.fit(data, floc=loc, fscale= scale)
    # generate sample
    pdf = st.beta.pdf(x, a= all_params[dist_name]['params'][0],
                         b=all_params[dist_name]['params'][1],
                         loc=all_params[dist_name]['params'][2], 
                         scale=all_params[dist_name]['params'][3])
    all_params[dist_name]['sse'] = np.sum(np.power(y - pdf, 2.0))
    
    return all_params


In [None]:
#%%
def get_top_month(x):
    """Get the month in which a vacancy is most active.

    Parameters:
    x -- either the ``active_months`` string itself or an object (e.g. a
         dataframe row) exposing it as ``x.active_months``. The string lists
         the active months as ``;month YYYY-MM: <duration>`` entries.

    Returns:
    The ``YYYY-MM`` label of the entry with the largest duration, or 'oob'
    when the string lists no month at all.
    """
    active_months = x if isinstance(x, str) else x.active_months
    # the string starts with ';', so the first element of the split is empty
    entries = active_months.split(';')[1:]
    if not entries:
        return 'oob'
    months = [t.split(': ')[0].split(' ')[1] for t in entries]
    # compare the durations as numbers: comparing the raw strings would be
    # lexicographic (e.g. '9.0' > '10.0')
    durations = [float(t.split(': ')[1]) for t in entries]
    # if multiple maxes it'll return the first one, which seems reasonable
    return months[int(np.argmax(durations))]

def get_top_duration(x):
    """Get the amount of time a vacancy is active in its top month
    based on a string listing all the months in which the vacancy is active
    with respective durations

    Parameters:
    x -- either the ``active_months`` string itself or an object (e.g. a
         dataframe row) exposing it as ``x.active_months``.

    Returns:
    The duration of the top month exactly as written in the string (callers
    cast it to float), or 0 when the input is 'oob' or lists no month.
    """
    active_months = x if isinstance(x, str) else x.active_months
    if active_months == 'oob':
        return 0
    # the string starts with ';', so the first element of the split is empty
    entries = active_months.split(';')[1:]
    if not entries:
        # no active month recorded: np.argmax would raise on an empty sequence
        return 0
    durations = [t.split(': ')[1] for t in entries]
    # compare the durations as numbers: comparing the raw strings would be
    # lexicographic (e.g. '9.0' > '10.0')
    # if multiple maxes it'll return the first one, which seems reasonable
    best_idx = int(np.argmax([float(d) for d in durations]))
    return durations[best_idx]


In [None]:
#%%
def twin_plot(ons_data,ojv_data,xlims = [pd.to_datetime(START_MONTH + '-01'),
                                         pd.to_datetime(END_MONTH + '-01')]):
    """Draw two time series on one figure, each with its own y-axis.

    ons_data is drawn in red against the left axis (ONS vacancy stock) and
    ojv_data in blue against the right axis (OJA vacancy stock). The x-axis
    is restricted to xlims. Returns the figure and the left axis.
    """

    def _draw_series(axis, series, marker, colour, label):
        # label, plot and colour the ticks of one of the two y-axes
        axis.set_ylabel(label, color=colour)
        axis.plot(series, marker, color=colour)
        axis.tick_params(axis='y', labelcolor=colour)

    fig, ax1 = plt.subplots(figsize=(10,6))
    ax1.set_xlabel('date (year-month)')
    _draw_series(ax1, ons_data, 'x-', 'tab:red', 'ONS vacancy stock')

    # second axes sharing the same x-axis (the x-label is already set on ax1)
    ax2 = ax1.twinx()
    _draw_series(ax2, ojv_data, 'o-', 'tab:blue', 'OJA vacancy stock')

    lower, upper = xlims[0], xlims[1]
    plt.xlim(lower, upper)
    fig.tight_layout()
    return fig, ax1


# Load the data

## Online vacancy data

### Duration, start date, soc code and organisation name

In [None]:
# USUAL_LOADING: if True, rebuild the dataframe from the raw chunks; otherwise load the cached reduced file
# SAVE_DF: if True (and USUAL_LOADING is True), write the reduced dataframe to the cache file
USUAL_LOADING = False
SAVE_DF = False
#------------------- START OF MAIN SCRIPT --------------------------------------------
#%
print(tk_params)

# define names of files to load
# NOTE(review): dfilename, indices_to_load and dfilenames are not used again in this cell
N_to_load = tk_params.N_files
dfilename = os.path.join(data_folder,tk_params.file_name_full)
indices_to_load = np.random.permutation(tk_params.N_files)[:N_to_load]
dfilenames = [os.path.join(data_folder,tk_params.file_name_template.format(i)) for i in
              indices_to_load]
#              np.random.randint(low=0,high=40,size=2)]
import_dict, dates_to_parse = create_tk_import_dict()
#dimport_dict, dates_to_parse = create_tk_import_dict(DASK_DIRECT=True)


# Columns needed for the flow-to-stock analysis
cols_to_load = ['job_id',
'date','duration',
'expiration_date',
'posting_id',
'profession_soc_code_value',
'organization_name',
'organization_industry_value']

# Restrict the dtype map and the list of date columns to the columns being loaded;
# columns without a known dtype are read as 'object'
import_dict_new = {}
for col in cols_to_load: #import_dict.keys():
    if col in import_dict.keys():
        import_dict_new[col]= import_dict[col]
    else:
        import_dict_new[col]= 'object'
dates_to_parse_new = [t for t in dates_to_parse if t in cols_to_load]
    
if USUAL_LOADING:    
    # Load only the relevant columns from each data chunk and then join them together
    timer.start_task('loading relevant columns from full data')
    filename_chunks = os.path.join(data_folder,tk_params.file_name_template)
    data_df = read_and_append_chunks(filename_chunks, range(N_to_load), #indices_to_load,
                import_dict_new, dates_to_parse_new, col_to_load = cols_to_load)
        
    timer.end_task()
    
    # compute the length once and save it
    # NOTE: len_data is only defined in this branch
    len_data = len(data_df)
    # rename the index in case you need it
    #data_df = data_df.map_partitions(dask_rename_index,'index')
    #duration_backup = data_df.duration.copy()
    if SAVE_DF:
        # Save to disk for faster future loading
        data_df.to_csv(
            f'{data_path}/data/interim/interim_duration_df2.gz',
            encoding='utf-8',compression='gzip',index=False)
else:
    # Load from disk - this version of the dataset will have been saved before with only the relevant columns
    # NOTE(review): this branch passes the full import_dict / dates_to_parse rather than the
    # reduced import_dict_new / dates_to_parse_new built above - confirm that every column in
    # dates_to_parse exists in the cached file
    timer.start_task('Loading reduced dataframe from disk')
    data_df = pd.read_csv(
        f'{data_path}/data/interim/interim_duration_df2.gz',
        encoding='utf-8',compression='gzip', dtype = import_dict, 
        parse_dates = dates_to_parse, infer_datetime_format = True)
    timer.end_task()


#%% get beginning and ending of the collection period
first_date = data_df.date.min()#.compute()
last_date = data_df.date.max()#'2019-10-31' #data_df.date.max().compute()



In [None]:
# Report the size of the loaded dataframe and show its first rows
print(f"Number of rows: {len(data_df)}")
data_df.head(5)


In [None]:
#%%
# reset index because at the moment it starts again after loading each chunk
data_df = data_df.reset_index(drop=True)
# turn date into date time
data_df['date'] = pd.to_datetime(data_df.date)
# order the adverts chronologically (the index keeps its pre-sort labels)
data_df = data_df.sort_values(by = 'date')

# remember which columns were present before any derived column is added
base_columns = data_df.columns


In [None]:
# Show the first row after sorting by date
data_df.head(1)


### Per-vacancy SIC code

Note:
We made an algorithm that assigned a SIC code to each vacancy based on a combination of methods (see e.g. the script "consolidate_sic_matches"). The main part of the algorithm was run separately and produced a set of candidate SICs per vacancy using the job_id as the unique identifier. Since our dataset has been deduplicated, the job_id is supposed to be unique. However, after running the algorithm we realised that the job_id was not actually unique (the posting_id is). 

There was no time to re-run the algorithm and as long as the same SIC is assigned to vacancies that have the same set of characteristics used to identify that SIC code (e.g. SOC and TK's own organization industry), it shouldn't matter. The one thing to be careful about is that when joining to the main dataframe, the dataframe with the sic codes should have unique rows or the join creates "new" vacancies.

For the future, the recommendation would be to re-run the algorithm using posting_id as the unique identifier.

Also, it might seem weird that one dataset loads ORGANIZATION_INDUSTRY_VALUE and the other one ORGANIZATION_INDUSTRY_LABEL and then these are converted to match. This is because re-loading both datasets from chunks would take longer than loading the cached version with this discrepancy.


In [None]:
# Load SIC letters column
# Only the identifier, the SOC code, the cleaned organisation name and the final SIC letter are read
timer.start_task('Loading results from SIC matching')
data_sic = pd.read_csv(f"{data_path}/data/aux/job_id_and_final_sic_letter.gz",
                          encoding ='utf-8', compression = 'gzip', usecols = ['job_id', 'profession_soc_code_value',
                                                                                      'clean_organization_name',
                                                                                      'final_sic_letter'],
                          dtype= {'job_id': object, 'clean_organization_name': str,
                                 'final_sic_letter': 'category',# 'helper_sic_letter': 'category',
                                 'profession_soc_code_value': 'category'})
timer.end_task()
print(f"Number of rows: {len(data_sic)}")
data_sic.head()
# note, there are more rows because adverts not tagged with "en" are included



### Other information to better merge final SIC letter with original dataframe

In [None]:
# Read the chunked files holding the SIC assignment and stack them into a single dataframe
timer.start_task('Load full data with SIC codes')
DATA_PATH_SIC = f"{data_path}/data/aux/sic_code_assigned_compressed"
#DATA_PATH0 = ("/Users/stefgarasto/Local-Data/textkernel/data/"
#              "sic_code_assigned_compressed_missing_sic_letters")
FILE_NAME = "sic_code_assigned_full_jobs_200330"
data = []
# NOTE: the number of chunks (435) is hardcoded
for i in range(435):
    loaded_df = pd.read_csv(f"{DATA_PATH_SIC}/{FILE_NAME}_{i}.gz", compression = 'gzip',
                           usecols = ['job_id',#'profession_soc_code_value','clean_organization_name',
                                      'organization_industry_label','fuzzy_match_sic_letter'])
    # NOTE(review): usecols above does not include 'Unnamed: 0', so this branch looks unreachable
    if 'Unnamed: 0' in loaded_df.columns:
        loaded_df = loaded_df.drop('Unnamed: 0', axis = 1)
    data.append(loaded_df)

full_data_sic = pd.concat(data)
full_data_sic = full_data_sic.reset_index(drop= True)
# drop the reference to the list of chunks so the memory can be reclaimed
data = None
timer.end_task()

printdf(full_data_sic.head())


#### Combine dataframes with information on SIC  codes
I don't need to perform a join in this specific case - I know the two dataframes were loaded and saved with rows in the same order. However, if in the future this is done with a different dataset, one needs to be careful.



In [None]:
# add extra column about organization industry to improve the merging with the main dataframe
# NOTE(review): the check below only compares the first 10 job_ids, and the assignment aligns
# the two dataframes on their index - it does not verify that the full row order (or the
# number of rows) is the same in both
assert(data_sic.iloc[:10].job_id.tolist() == full_data_sic.iloc[:10].job_id.tolist())

timer.start_task('adding column to sic dataframe')
data_sic = data_sic.assign(organization_industry_label = 
                            full_data_sic.reset_index().organization_industry_label)
# If a second column is needed
#data_sic['fuzzy_match_sic_letter'] = full_data_sic.reset_index().fuzzy_match_sic_letter
timer.end_task()



In [None]:
# Change from organization_industry_label to organization_industry_value
timer.start_task('Going from label to value for the organization_type variable')
# crosswalk to organization_industry_value
# NOTE: the dictionary lookup raises KeyError for any label that is not a key of
# map_label2value (the map is built from the first data chunk only)
data_sic['organization_industry_value'] = data_sic.organization_industry_label.map(lambda x:
                                                                                   map_label2value[x])
timer.end_task()


In [None]:
# One dataframe is not needed anymore
# (drop the reference so the memory can be reclaimed)
full_data_sic = None


In [None]:
# get the duplicate rows: these are rows with the same job_id, same soc_code, same organization_type and, therefore,
# same SIC. These rows should be dropped so that the merge doesn't create "new" job adverts.
timer.start_task('Get duplicated rows from the SIC dataframe')
# keep=False flags every row that belongs to a group of duplicates (used for inspection)
duplicated_sic = data_sic.duplicated(subset = ['job_id','profession_soc_code_value','organization_industry_value'],
                                     keep = False)
# keep='first' flags every occurrence except the first one (used to drop the extra rows before merging)
duplicated_sic_first = data_sic.duplicated(
                                subset = ['job_id','profession_soc_code_value','organization_industry_value'],
                                     keep = 'first')

timer.end_task()


In [None]:
# Inspect the duplicated rows: how many are there, and do they disagree on the SIC letter?
print(f'Number of duplicated rows in the dataframe with SIC codes: {duplicated_sic.sum()}')
gs = data_sic[duplicated_sic].groupby(by = 
                        ['job_id','profession_soc_code_value','organization_industry_value'])
    
# check how many duplicates have mismatched sics:
# for each group of duplicates, count the distinct SIC letters it contains
A = [len(set(g.final_sic_letter.values)) for name, g in gs]

counter_A = Counter(A)
# a group is mismatched as soon as it holds more than one distinct SIC letter
# (not only when it holds exactly two)
n_mismatched_groups = sum(n_groups for n_sics, n_groups in counter_A.items() if n_sics > 1)
print(f'Number of groups of duplicated rows with mismatched SIC codes: {n_mismatched_groups}')


In [None]:
# Compare the columns available in the two dataframes before merging
data_sic.columns, data_df.columns



In [None]:
# convert the SOC code to float in preparation for the merging
# (the column was read with a 'category' dtype; each value is passed through float())
data_sic.profession_soc_code_value = data_sic.profession_soc_code_value.map(lambda x: float(x))
#data_sic.profession_soc_code_value.sample(10)


### Merge dataframes of online job adverts

In [None]:
# Attach the final SIC letter (and cleaned organisation name) to each vacancy
timer.start_task('merging data frames')
# keep the original length and a small sample of rows to check the merge afterwards
old_len_data = len(data_df)
keep_data_sample = deepcopy(data_df.iloc[100:105])
#merged_data 
# Left join, so every vacancy is kept; only the first row of each duplicated key is used on
# the SIC side so that the join cannot multiply vacancies. indicator=True adds a '_merge' column.
data_df = data_df.merge(data_sic[~duplicated_sic_first].reset_index()[
    ['job_id','profession_soc_code_value','organization_industry_value', 
     'clean_organization_name','final_sic_letter']], 
                            on = ['job_id','profession_soc_code_value','organization_industry_value'],
                           how= 'left', indicator = True)
timer.end_task()


In [None]:
# The two counts should be identical if the join did not create any extra row
print(f"Number of rows before joining: {old_len_data}")
print(f"Number of rows after joining: {len(data_df)}")
# Check that everything stayed the same
# (visual comparison of the same five row positions before and after the merge)
printdf(keep_data_sample)
printdf(data_df.iloc[100:105])


In [None]:
# Inspect the merge indicator: 'both' counts the vacancies that found a SIC row, 'left_only'
# those that did not. Because the merge is a left join, 'right_only' is always 0 - the check
# that no new row was created is the row-count comparison printed in the previous cell.
data_df['_merge'].value_counts()


In [None]:
# Composition of job adverts by SIC letter
# (vacancies without a matched SIC row have a missing value and are not counted here)
data_df.final_sic_letter.value_counts()


## ONS data

In [None]:
# Load ONS data on vacancies
raw_jvs_full, jvs_sic_letters = load_ons_vacancies(f"{data_path}/data")
# Change all the columns names
# (each original column name listed in the index of jvs_sic_letters is replaced by the
# corresponding value - presumably the SIC letter; verify against load_ons_vacancies)
raw_jvs_full = raw_jvs_full.rename(columns = {t: jvs_sic_letters.loc[t] for t in jvs_sic_letters.index})

printdf(raw_jvs_full.head())


In [None]:
# drop the columns that are not needed
# NOTE(review): presumably these are the single letters covered by the custom groups
# (D_E, L_O_S, M_P) plus the G sub-divisions - confirm against the ONS column list
raw_jvs = raw_jvs_full.drop(['D','E', 'G45', 'G46', 'G47', 'L', 'M', 'O', 'P', 'S', 'G46_47'] , axis = 1)

printdf(raw_jvs.head())


In [None]:
# Note: A, T and uncertain will all have to get the average weight across month


# Analysis and processing of the duration field

In [None]:
# If True, the figures produced below are also saved in res_folder
SAVEFIGS = False


In [None]:
# Quick analysis of duration field
full_median_duration = data_df.duration.median()
# number of adverts for each (non-null) duration value, ordered by duration
tmp = data_df.duration.dropna().value_counts().sort_index()
# cumulative percentage of adverts with duration up to each value
cumulative_pct = tmp.cumsum()/tmp.sum()*100
# percentage of adverts with duration within the threshold; the label slice on the sorted
# index also works when no advert has a duration exactly equal to the threshold
# (a direct lookup of that label would raise a KeyError in that case)
pct_within_th = tmp.loc[:DURATION_TH].sum()/tmp.sum()*100
plt.plot(cumulative_pct)
plt.xlim([0,100])
plt.plot(DURATION_TH,pct_within_th,'x')
plt.xlabel('Duration value')
plt.ylabel('Proportion of jobs')
print((f'Percentage of filtered job adverts with duration within limit ({DURATION_TH} days is the threshold),'
       ' among the ones with a not null duration field:'
       f' {pct_within_th:.2f}%'))
if SAVEFIGS:
    plt.savefig(f"{res_folder}/cumulative_sum_of_durations.jpg")


In [None]:
#%%
# If we want to try to model distributions outliers, we first need to fit the duration distribution 
# However, we didn't find a good theoretical distribution, so we're not doing this at the moment
# ATM, we just use a duration threshold that has been chosen manually
# NOTE(review): the first branch is disabled ('if False') and would not run as is:
#  - good_counts is not defined in the visible part of the notebook
#  - np.float has been removed from recent NumPy versions
#  - the final pdf/cdf calls unpack the parameters with '*' and call .pdf, which does not
#    suit the discrete distributions (scalar parameter, pmf instead of pdf)
if False:
    #%
    # durations to fit
    duration_to_fit = data_df[good_counts].duration.dropna() #[good_counts].duration.dropna()
    # remove the zeros
    duration_to_fit = duration_to_fit[duration_to_fit>0]
    # Plot for comparison
    plt.figure(figsize=(12,8))
    ax = sns.distplot(duration_to_fit)
    # Save plot limits
    dataYLim = ax.get_ylim()
    
    # Find best fit distribution
    all_fit_params = best_fit_distribution(duration_to_fit, 200, ax)

    #%
    DIST_DICT = {'levy': st.levy, 'gamma': st.gamma, 'invgamma': st.invgamma,
                     'chi_squared': st.chi2, 'beta1': st.beta, 'beta2': st.beta,
                     'exponential': st.expon, 'geometric': st.geom,
                     'poisson': st.poisson}

    #% plot all the distributions
    plt.figure()
    sns.distplot(duration_to_fit[duration_to_fit<200])
    x = np.arange(1,200)#1353)
    legend_list = []
    # one row per distribution, best (lowest SSE) first
    result_fit = pd.DataFrame.from_dict(all_fit_params, orient = 'index')
    result_fit= result_fit.sort_values('sse')
    print(result_fit)
    for k in result_fit.index:
        params = all_fit_params[k]['params']
        # a scalar parameter marks the discrete distributions (geometric, poisson)
        if isinstance(params,(np.float,np.float32)):
            pdf = DIST_DICT[k].pmf(x, params)
        elif len(params)==2:
            pdf = DIST_DICT[k].pdf(x, loc = params[-2], scale = params[-1])
        elif len(params)==3:
            pdf = DIST_DICT[k].pdf(x, params[0], loc = params[-2], scale = params[-1])
        elif len(params)==4:
            pdf = DIST_DICT[k].pdf(x, a= params[0], b= params[1], loc = params[-2], scale = params[-1])
        # plot pdf
        # (only the curves whose peak is below .06 are drawn)
        if pdf.max()<.06:
            plt.plot(pdf),plt.xlim([0,200])
            legend_list.append(k)
    plt.legend(legend_list + ['data'])
    
    #%
    # take the best fit distribution to determine after which value do advert life
    # spans become so low probability that they may be considered unreliable
    select_dist = result_fit.iloc[0]
    select_pdf= DIST_DICT[select_dist.name].pdf(x, *select_dist.params)
    select_cdf= DIST_DICT[select_dist.name].cdf(x, *select_dist.params)
    # find very unlikely advert life spans
    # if I were to take .95 I would get a threshold of 89 days - 
    # .84 is needed to get 55 days, which is consistent with what we know from the cumsum above
    # NOTE: argmax returns a position in x (which starts at 1), not the duration value itself
    duration_th = (select_cdf>.95).argmax()
    print(f'Data-driven cut off duration is {duration_th}')
    
    #%
else:
    # Just assign the duration threshold that has been manually chosen
    duration_th = DURATION_TH
    print(f'Cut off duration is {duration_th}')
    

In [None]:
#%%
# replace "bad" duration values (that is, those that are zeros or higher than the threshold)
good_durations = (data_df.duration>0) & (data_df.duration<=duration_th)
# take the median from those durations that will not be changed
median_duration = data_df[good_durations].duration.median()
print(f'Median duration to use is {median_duration}')

if CONSTANT_DUR:
    # every advert gets the same duration
    data_df['duration_to_use'] = DURATION_TH
    print(f'Using constant duration of {DURATION_TH}')
else:
    data_df['duration_to_use'] = data_df.duration.copy()
    # replace 0s with the median of the good durations (durations of 1 are left as they are)
    data_df.loc[data_df.duration_to_use==0,'duration_to_use'] = median_duration
    # cap the durations above the threshold at the threshold itself
    data_df.loc[data_df.duration_to_use>duration_th,'duration_to_use'] = duration_th #median_duration
    # missing durations also get the median
    # NOTE: negative durations, if any, are not modified by this cell
    data_df.duration_to_use = data_df.duration_to_use.fillna(median_duration)
    sns.distplot(data_df.duration_to_use)

# no missing value may be left in the duration that is going to be used
assert(data_df.duration_to_use.isna().sum()==0)
print(f'Max duration used in the dataset is {data_df.duration_to_use.max()}')



In [None]:
#(re-)compute end date 
'''
Note that I'm using the convention of removing 1 (even though it means I need to shift the removal day by 1 
when computing the stock) because a) it makes sense to have the expiration date rather than the removal date
and b) that is how the original expiration date is in the TK dataset
'''
# end_date = posting date + (duration - 1) days, so that an advert with a duration of one day
# ends on the same day it was posted
data_df['end_date'] = data_df.date + pd.to_timedelta(
        data_df.duration_to_use - 1, unit='D')

# initialise weight column with 1
data_df['vacancy_weight']= 1



In [None]:
# Show the first rows with the new duration_to_use, end_date and vacancy_weight columns
data_df.head(5)


# Flow to stock model

In [None]:
# get and plot un-weighted monthly stock of vacancies against ons vacancies
t0 = tt()
# new way
# The stock is obtained by counting vacancy_weight, broken down by final SIC letter; the
# first two outputs are used as monthly and daily stock, the last two are discarded.
# NOTE(review): the exact flow-to-stock convention lives in get_stock_breakdown, which is
# not visible here
stock_per_month, stock_per_day, _, _ = get_stock_breakdown(
    data_df, agg_func = 'count', agg_col = 'vacancy_weight', breakdown_col = 'final_sic_letter')

print_elapsed(t0,'computing daily and monthly stock')


In [None]:
# First rows of the un-weighted monthly stock
stock_per_month.head()


In [None]:
# Get which SIC codes are in both stocks (ONS and online job adverts)
# (sorted list of the SIC letters found among the vacancies that are also columns of raw_jvs)
sic_in_common = sorted(set(data_df.final_sic_letter.value_counts().index).intersection(raw_jvs.columns))
print(sic_in_common)
    

In [None]:
# Plot ONS vs OJA stock (on separate scales)
# The ONS total is multiplied by 1e3 (presumably it is published in thousands - confirm);
# the OJA total is the sum over the SIC letters shared with the ONS data
_ = twin_plot(1e3*raw_jvs.vacancies, stock_per_month[sic_in_common].sum(axis=1))
    
#plt.legend(['ONS VS', 'OJV'])
print('Starting date for the stock: ', stock_per_month.index.min())
if SAVEFIGS:
    plt.savefig(f"{res_folder}/raw_stock_vs_ons_double_axis.jpg")


In [None]:
# Plot ONS vs OJA stock (on same scale)
# Note: this is just indicative and the overall level depends on the exact flow to stock model used,
# including the details of the boundary conditions - if OJA levels are lower, it doesn't say anything about the
# coverage of Textkernel data, especially because the stock of OJA is divided by a factor of 2 to account for the 
# fact that the ONS vacancy survey is only open for two weeks per month.
# NOTE(review): the division by 2 mentioned above is not visible in this cell - presumably it
# happens inside get_stock_breakdown; confirm
# NOTE: unlike the twin-axis plot, the OJA total here sums all the columns of stock_per_month
plt.figure(figsize = (10,7))
plt.plot(stock_per_month.sum(axis=1))
plt.plot(raw_jvs.vacancies*1000)
plt.xlabel('Date', fontsize = 13)
plt.ylabel('Vacancy stock', fontsize = 13)
plt.legend(['OJA','ONS'], fontsize = 13) #OJA = online job adverts
plt.tight_layout()
if SAVEFIGS:
    plt.savefig(f"{res_folder}/raw_stock_vs_ons_single_axis.svg")


In [None]:
# Plot SIC letters separately: this can only be used to understand whether the trends are similar between OJA and ONS
for col in sic_in_common: #stock_per_month.columns:
    if col == 'V':
        continue
    # Resolve the sector name once, before it is used for both the printed message and the
    # title: letters without a description fall back to 'others'. Only the missing-key case
    # is handled, so that any other error is not silently swallowed.
    try:
        sector_name = sic_letter_to_text[col]
        plot_title = sector_name.capitalize()
    except KeyError:
        sector_name = plot_title = 'others'
    _ = twin_plot(1e3*raw_jvs[col], stock_per_month[col])
    # Pearson correlation between the ONS series and the OJA stock for this sector
    tmp = np.corrcoef(raw_jvs[col].astype('float'),stock_per_month[col])[0,1]
    print((f"Time series correlation for {sector_name} is "
           f"{tmp:.3f}"))
    plt.title(plot_title)
    if SAVEFIGS:
        plt.savefig(f"{res_folder}/raw_stock_vs_ons_sic_{col}_double_axis.jpg")


In [None]:
# plot the time series of the non-assigned stock
# (monthly stock of the adverts whose SIC letter is 'uncertain')
plt.plot(stock_per_month['uncertain'])


In [None]:
# Proportion of jobs without a SIC?
# (share of all rows whose final SIC letter is exactly 'uncertain')
(data_df.final_sic_letter == 'uncertain').mean()


In [None]:
# Get the full list of sectors and which ones are in common with the ONS data 
# all the sectors found in the online job adverts stock
oja_names = sorted(stock_per_month.columns)
# names of the ONS columns (with the '_ons' suffix) for the sectors present in both sources
ons_names = [col+'_ons' for col in sorted(raw_jvs.columns) if col in oja_names]
# sectors present in both sources / only in the online job adverts
shared_oja_names = [t for t in oja_names if t in raw_jvs.columns]
extra_oja_names = [t for t in oja_names if t not in shared_oja_names]
shared_oja_names, extra_oja_names


In [None]:
# Keep the uncertain stock separate
# (full copies, including the sectors that are not in the ONS data)
stock_per_day_full = stock_per_day.copy() #[['A','T','uncertain']]
stock_per_month_full = stock_per_month.copy() #[['A','T','uncertain']]

# drop the original column
# (after this, the stock dataframes only hold the sectors shared with the ONS data)
stock_per_day = stock_per_day.drop(axis = 1, labels = extra_oja_names) #['A','T','uncertain'])
stock_per_month = stock_per_month.drop(axis = 1, labels = extra_oja_names) #['A','T','uncertain'])


In [None]:
# Sanity check: number of days with a negative stock, per sector (all counts should be zero)
(stock_per_day<0).sum() #THE STOCK CAN NOT BE NEGATIVE: this should be empty


# Compute post-sampling weights to align the two data sources

## ------------------- MONTHLY WEIGHTS ESTIMATES -------------------

Compute the ratio between the two stocks per month, from the ONS and from OJA

Assign an average weight to the vacancies with uncertain SIC based on their assigned month

Assigning per-vacancy weight based on an assigned month and SIC letter

Rescale the per-vacancy weight by a monthly factor to increase alignment.


### Extract needed information for each vacancy

In [None]:
# all months of interest
# NOTE(review): with freq = 'M' the range is made of month ends, so the last month generated
# is the one before END_MONTH (END_MONTH is parsed as the first day of that month) - confirm
# that leaving END_MONTH out is intended
all_months = pd.date_range(start= START_MONTH, end = END_MONTH, freq = 'M').map(
    set_month_at_beginning)

# initialise new dataframe - for each vacancy I want to compute the best month
# and how long it stays open during that month
data_df = data_df.assign(active_months = '')

# For each vacancy, get all the months in which it is active and the relative duration
# The result is stored as a string of the form ';month YYYY-MM: d.ddd;month YYYY-MM: d.ddd'
for month in tqdm(all_months):
    tot_days = month.days_in_month
    month_begins = month # - pd.Timedelta(tot_days-1,unit='days')
    month_ends = set_month_at_end(month)
    # extract all jobs that are active during this month
    jobs_starting_now = data_df.date.between(month_begins, month_ends)
    # NOTE(review): the comparison on end_date is strict, so a vacancy posted before this month
    # whose end_date is exactly the first day of the month is not counted as active - confirm
    # this matches the convention used for end_date
    jobs_ending_later = ((data_df.date<month_begins) & (
                data_df.end_date>month_begins))
    valid_jobs = jobs_starting_now | jobs_ending_later

    # fraction of the month in which each vacancy is active: the active interval is clipped to
    # the month and its length in days (both ends included) is divided by the days in the month
    valid_durations = (data_df[valid_jobs].end_date.map(lambda x: 
        min([x,month_ends])) - data_df[valid_jobs].date.map(lambda x: 
        max([x,month_begins]))).map(lambda x: (x.days+1)/tot_days)
                                                          
    #vacancies_per_month.loc[valid_jobs,'active_durations'] = valid_durations
    # record the active month
    data_df.loc[valid_jobs,'active_months'] = data_df[
        valid_jobs].active_months.map(
        lambda x: x+ f';month {month.year}-{month.month:02}: ')
    # append the durations
    data_df.loc[valid_jobs,'active_months'] = data_df[
        valid_jobs].active_months + valid_durations.map(lambda x: f'{x:.3f}')
        #vacancies_per_month[valid_jobs].tmp_months.map(lambda x: f'{x:.3f}')

#assert(len(vacancies_per_month)== len(data_df))


In [None]:
#%% Add column with best month
t0 = tt()
data_df['best_month'] = data_df.active_months.map(
    get_top_month)

#turn all months to beginning of the month timestamp
# get_top_month returns the sentinel 'oob' for vacancies that are not active in any of the
# months of interest. That string cannot be parsed as a date, so it is turned into a missing
# value (which becomes NaT) before the conversion, and missing values are skipped when
# snapping the dates to the beginning of the month. Any other unparseable value still raises.
best_month_labels = data_df.best_month.where(data_df.best_month != 'oob')
data_df.best_month = pd.to_datetime(best_month_labels).map(
    set_month_at_beginning, na_action = 'ignore')


print_elapsed(t0,'getting the best month')
#assert(len(vacancies_per_month) == len(data_df))


In [None]:
#%% Add column with the duration of a vacancy in its best month
t0 = tt()
# get_top_duration returns the duration as written in the active_months string (or 0),
# hence the conversion to float right after
data_df['best_month_duration'] = data_df.active_months.map(
    get_top_duration)
data_df.best_month_duration = data_df.best_month_duration.astype('float')
print_elapsed(t0,'getting how long vacancies are open in their best months')
#assert(len(vacancies_per_month) == len(data_df))


In [None]:
data_df.head()

### Compute the ratio between the two stocks per month, from the ONS and from OJA

In [None]:
#%% # compute weights by months and SIC
# Join the ONS stock with the un-weighted OJA stock on their shared index.
# SIC columns that exist in both tables are told apart by suffix:
# '<SIC>_ons' for the survey figures and '<SIC>_counts' for the OJA counts.
joint_stock = raw_jvs[['vacancies'] + shared_oja_names].merge(
    stock_per_month, how='outer', suffixes=('_ons', '_counts'),
    left_index=True, right_index=True)

for col, ons_col in zip(shared_oja_names, ons_names):
    # sanity check: the two lists of names must be aligned
    assert col + '_ons' == ons_col
    # A month with zero adverts has no defined ratio. Mark the count as
    # missing with np.nan (pd.NA would leave an object column that cannot be
    # cast to float), so the weight becomes NaN and is set to the neutral
    # value further down.
    oja_counts = joint_stock[col + '_counts'].replace(0, np.nan)
    # per-vacancy weight = ONS stock / OJA stock; the ONS figures are
    # multiplied by 1000 here, as everywhere else in this script
    joint_stock[col + '_weight'] = (
        joint_stock[ons_col] * 1000 / oja_counts).astype('float')

joint_stock = joint_stock[sorted(joint_stock.columns)]

# zero out any infinite value left in the table
joint_stock = joint_stock.replace(np.inf, 0)

# rename the columns
joint_stock = joint_stock.rename(columns={'vacancies': 'vacancies_ons'})

# replace missing weights with the neutral weight (which is 1). Only the
# weight columns are filled, so that gaps in the ONS / OJA stock columns
# (e.g. months present in only one of the two sources) are not turned into a
# spurious stock of 1.
weight_cols = [col + '_weight' for col in shared_oja_names]
joint_stock[weight_cols] = joint_stock[weight_cols].fillna(1)

print('Done')


In [None]:
joint_stock.head(5)


#### Assign an average (median) weight to the vacancies with uncertain SIC based on their assigned month

In [None]:
# Vacancies with an 'uncertain' SIC letter receive, for each month, the median
# of the weights across the SIC letters shared with the ONS data
joint_stock = joint_stock.assign(
    uncertain_weight=joint_stock[
        [col+'_weight' for col in shared_oja_names]].median(axis=1))
'''
# In future, suggest replacing with this other line. Results are VERY similar but not identical
joint_stock = joint_stock.assign(uncertain_weight = joint_stock[
    [col+'_weight' for col in shared_oja_names if not 'B' in col]].mean(axis = 1))
'''

# The same per-month weight is replicated for the SIC letters that are not
# present in the ONS survey (A and T)
joint_stock = joint_stock.assign(
    A_weight=joint_stock.uncertain_weight.values,
    T_weight=joint_stock.uncertain_weight.values)

# for consistency, OJA-only columns get the same '_counts' suffix as the rest
joint_stock = joint_stock.rename(columns={
    col: col+'_counts' for col in oja_names if col not in shared_oja_names})

joint_stock = joint_stock[sorted(joint_stock.columns)]

In [None]:
joint_stock.head(5)


In [None]:
# example weights for agriculture
joint_stock.iloc[:5].A_weight


### Merge the monthly weights with the main dataframe 
That is, assign each vacancy a weight based on its assigned month and SIC letter.

In [None]:
# Reshape the monthly weights to long form (one row per month and SIC letter)
# so that they can be merged with the main dataframe
weights_cols = [col for col in joint_stock.columns if 'weight' in col]
joint_stock_weights = (
    joint_stock[weights_cols]
    .reset_index()
    .melt(id_vars='month',
          value_vars=weights_cols,
          var_name='sic_letter',
          value_name='vacancy_weight_adj')
    .rename(columns={'month': 'best_month'})
)
# drop the trailing '_weight' so that only the SIC label is left
joint_stock_weights['sic_letter'] = (
    joint_stock_weights['sic_letter'].str[:-len('_weight')])
joint_stock_weights


In [None]:
data_df.columns

In [None]:
#%%%
# [if needed] remove weights from previous iterations with vacancies per month
if 'vacancy_weight_adj' in data_df.columns:
    print('removing old iteration of monthly weights')
    data_df = data_df.drop(axis = 1, labels = 'vacancy_weight_adj')


#%%
# Merge vacancy weights based on sic classification and month
timer.start_task('joining new monthly weights')

# NOTE(review): presumably drops a reference to a leftover dataframe - confirm
small_df = None

# Record the row count right before merging, so that the check below always
# compares against the dataframe that is actually being merged.
old_len_data = len(data_df)

# MERGING WEIGHTS WITHIN THE MAIN DATAFRAME
# validate='many_to_one' makes pandas raise if a (SIC letter, month) pair
# appears more than once in the weights table, which would duplicate vacancies.
data_df = pd.merge(data_df, joint_stock_weights,
                   left_on=['final_sic_letter', 'best_month'],
                   right_on=['sic_letter', 'best_month'],
                   how='left', validate='many_to_one')
# a left join on unique keys must preserve the number of vacancies
assert(old_len_data == len(data_df))

# Keep the un-scaled monthly weight. Bracket assignment is used on purpose:
# attribute assignment would not create the column if it did not exist yet.
data_df['vacancy_weight'] = data_df['vacancy_weight_adj']
timer.end_task()

# multiply by the duration percentage
data_df['vacancy_weight_adj'] = (
    data_df['vacancy_weight'] * data_df['best_month_duration'])


data_df.head()


In [None]:
len(data_df)


In [None]:
t0 = tt()
print(data_df.vacancy_weight.max())
print_elapsed(t0,'')


### Rescale the per-vacancy weight by a monthly factor to increase alignment

The function 'scale_weights_by_total_levels' rescales the per-vacancy weights used to align the stock of online job vacancies with the stock of vacancies from the ONS survey. For more info see its docstring.

In [None]:

# compute the scaled weights
# (the long-form weights are handed over under the column names used in the
# main dataframe: 'final_sic_letter' and 'vacancy_weight')
new_weights_df = scale_weights_by_total_levels(
    joint_stock_weights.rename(columns=dict(
        sic_letter='final_sic_letter',
        vacancy_weight_adj='vacancy_weight')),
    raw_jvs,
    stock_per_month_full,
    sectors_in_common=shared_oja_names)
printdf(new_weights_df.head(5))

In [None]:
# attach the re-scaled weight to every vacancy, matching on month and SIC letter
timer.start_task('joining dataframe to add re-scale adjustment weights')
data_df = pd.merge(
    data_df,
    new_weights_df[['best_month', 'final_sic_letter', 'vacancy_weight_new']],
    how='left',
    on=['best_month', 'final_sic_letter'])
timer.end_task()


In [None]:
# Rename a column called 'vacancy_weights_new' (plural) to 'vacancy_weight_new'.
# DataFrame.rename ignores labels that are not present by default, so this is a
# no-op when the column already carries the singular name.
# NOTE(review): presumably a leftover from an earlier naming - confirm.
data_df = data_df.rename(columns = {'vacancy_weights_new': 'vacancy_weight_new'})


## Re-compute the stock of vacancies and show the results

TODO: clean up some of these cells

In [None]:
#%% get new daily stock of vacancies
# Re-aggregate the vacancies by SIC letter ('final_sic_letter'), summing the
# re-scaled per-vacancy weights ('vacancy_weight_new').
# Only the first of the four objects returned by get_stock_breakdown is kept.
# NOTE(review): the variable name suggests it is the monthly stock - confirm
# against the implementation of get_stock_breakdown.
timer.start_task('recomputing daily and monthly OJV stock')
new_stock_per_month, _, _ , _ = get_stock_breakdown(data_df, agg_func = 'sum', 
                               agg_col = 'vacancy_weight_new', breakdown_col = 'final_sic_letter')
timer.end_task()


In [None]:
def _plot_stock_comparison(weighted_stock, raw_stock, fig_name):
    """Plot the total weighted OJV stock, the ONS stock and the raw OJV stock.

    weighted_stock / raw_stock are tables with one column per SIC letter;
    their columns are summed to get the totals. The figure is saved under
    res_folder with the given file name when SAVEFIGS is set.
    """
    plt.figure(figsize=(8, 5))
    plt.plot(weighted_stock.sum(axis=1))
    plt.plot(1000*raw_jvs.vacancies, '--')
    plt.plot(raw_stock.sum(axis=1))
    plt.xlabel('Date', fontsize=13)
    plt.ylabel('Vacancy stock', fontsize=13)
    plt.legend(['OJV after', 'ONS', 'OJV before'], fontsize=13)
    if SAVEFIGS:
        plt.savefig(f"{res_folder}/{fig_name}")


#%% Plot ONS stock, un-weighted OJA stock, weighted  OJA stock, WITHOUT the uncertain weights - there should be
# a drop in level
_plot_stock_comparison(
    new_stock_per_month[sic_in_common],
    stock_per_month[sic_in_common],
    "adjusted_stock_vs_raw_vs_ons_common_sic_single_axis.svg")


#%% Plot ONS stock, un-weighted OJA stock, weighted  OJA stock, WITH the uncertain weights - now total level should
# be the same
_plot_stock_comparison(
    new_stock_per_month,
    stock_per_month,
    "adjusted_stock_vs_raw_vs_ons_all_sic_single_axis.svg")


In [None]:
# Plot ONS and weighted OJA stock for each SIC letter separately. Again, total levels are likely to be different
for col in shared_oja_names:
    if col in ('A', 'T', 'uncertain'):
        # only the weighted OJA stock is drawn for these letters; no new
        # figure is opened here, so the line lands on the current figure
        plt.plot(new_stock_per_month[col])
        plt.title("Stock of 'uncertain' vacancies")
        continue

    plt.figure(figsize=(8, 8))
    plt.plot(new_stock_per_month[col], label='OJV after')
    plt.plot(1000*raw_jvs[col], '--', label='ONS')
    plt.plot(stock_per_month[col], label='OJV before')
    plt.xlabel('Date', fontsize=13)
    plt.ylabel('Vacancy stock', fontsize=13)
    plt.legend(fontsize=13)

    # correlation between the ONS series and the weighted OJA series
    tmp = np.corrcoef(raw_jvs[col].astype('float'),
                      new_stock_per_month[col])[0, 1]
    print(f"Time series correlation for {sic_letter_to_text[col]} is {tmp:.3f}")
    plt.title(sic_letter_to_text[col])
    if SAVEFIGS:
        # NOTE(review): no file extension here, so matplotlib falls back to
        # its default output format, unlike the '.svg' figures saved by the
        # total-stock plots - confirm this is intended
        plt.savefig(f"{res_folder}/adjusted_stock_vs_raw_vs_ons_sic_{col}_single_axis")
    

In [None]:
# add the weighted stock per month next to the un-weighted one; the weighted
# columns are tagged with the '_sum' suffix
joint_stock = (
    joint_stock
    .merge(new_stock_per_month, left_index=True, right_index=True)
    .rename(columns={col: col+'_sum' for col in new_stock_per_month.columns})
)
joint_stock = joint_stock[sorted(joint_stock.columns)]
joint_stock.head()


In [None]:
# compute and show the correlation and the mean squared error (MSE) between
# ONS and OJA stock before (un-weighted) and after (weighted)
for col in shared_oja_names: #new_stock_per_month.columns:
    if col in ['A','T','uncertain']:
        continue
    ons_col, counts_col, sum_col = col+'_ons', col+'_counts', col+'_sum'
    # make sure the ONS column is numeric before computing statistics on it
    joint_stock[ons_col] = joint_stock[ons_col].astype('float')

    print(f"Correlation before and after for {col}")
    print(joint_stock[[ons_col, counts_col, sum_col]].corr()[ons_col])

    print(f"MSE before and after for {col}")
    # The OJA stocks are divided by 1000 to bring them onto the scale of the
    # ONS column. Both errors are averaged over the months, so that "before"
    # and "after" are directly comparable (the "before" figure used to be a
    # sum of squared errors rather than a mean).
    mse_before = ((joint_stock[ons_col] - joint_stock[counts_col]/1000)**2).mean()
    mse_after = ((joint_stock[ons_col] - joint_stock[sum_col]/1000)**2).mean()
    print(pd.Series([mse_before, mse_after], index=[counts_col, sum_col]))
    print()
    

In [None]:
# save results
# Might be useful to save results to test and compare different flow-to-stock and re-weighting models
SAVE_STOCK = False
if SAVE_STOCK:
    # first the joint table (ONS, un-weighted and weighted OJA stock), then
    # the old (un-weighted) stock on its own
    for frame, stem in (
            (joint_stock, "monthly_stock_by_sic_before_and_after"),
            (stock_per_month, "raw_monthly_stock_by_sic_from_jobs")):
        frame.to_csv(
            f"{res_folder}/{stem}_with_duration_th_at_{DURATION_TH}.csv")


In [None]:
data_df.head(1)


# Save monthly weights to disk 

Do this so that we can load and join them with the main dataframe for future analysis


In [None]:
# write the per-vacancy weights (plus the identifying columns needed to join
# them back) to a gzipped csv, so that they can be loaded for further analysis
data_df.to_csv(
    f"{data_path}/data/interim/interim_job_id_and_vacancy_weights.gz",
    columns=[
        'job_id', 'date', 'duration', 'posting_id',
        'profession_soc_code_value', 'organization_industry_value',
        'organization_name', 'clean_organization_name',
        'final_sic_letter', 'duration_to_use',
        'vacancy_weight', 'vacancy_weight_new',
        'active_months', 'best_month', 'best_month_duration',
    ],
    compression='gzip',
    encoding='utf-8',
    index=False)


# Legacy cells

These functions are either not needed or have been moved to the script "flow_to_stock_funcs.py"

In [None]:
'''
def load_ons_vacancies(start_date = '2015-03-01', end_date = '2019-10-31'):
    """ 
    Load and process ONS monthly, non-seasonally adjusted vacancy data [dataset x06].
    This function is tailored to the current grouping of some SIC codes.
    
    Note: it assumes the dataset has been downloaded and put in the data/aux folder path.
    
    Arguments:
    start_date = first day of month from when to start the time series
    end_date = last day of month from when to end the time series
    """
    ons_df = pd.read_excel(f"{data_path}/data/aux/x06apr20.xls", sheet_name='Vacancies by industry',
                          skiprows = 3)
    # keep columns with data and clean the column names
    ons_df = ons_df[[col for col in ons_df.columns if not 'Unnamed:' in col]]
    cleaned_col_names = {}
    for col in ons_df.columns[2:]:
        cleaned_col = col.replace('&', 'and').replace('-','').replace(',','').lower()
        cleaned_col = ''.join([t for t in cleaned_col if not t.isdigit()])
        cleaned_col_names[col] = '_'.join(cleaned_col.split())
    # manual adjustment for one column
    cleaned_col_names['Manu-    facturing'] = 'manufacturing'
    cleaned_col_names['SIC 2007 sections'] = 'month'
    cleaned_col_names['All vacancies1 '] = 'vacancies'
    ons_df = ons_df.rename(columns = cleaned_col_names)
    # extract the row with the letters
    sic_letters = ons_df.iloc[0]
    # remove empty rows
    ons_df = ons_df.loc[(ons_df.month.notna()) & (ons_df.vacancies.notna())]
    # join up some industries
    ons_df = ons_df.assign(wholesale_retail_motor_trade_and_repair = 
                           ons_df.motor_trades + ons_df.wholesale + ons_df.retail)
    ons_df = ons_df.assign(wholesale_and_retail = ons_df.wholesale + ons_df.retail)
    #ons_df = ons_df.assign(others = ons_df.vacancies - ons_df[partial_map_tk2sic.values()].sum(axis=1))
    
    ons_df = ons_df.assign(education_and_professional_activities = ons_df.education + 
                                                        ons_df.professional_scientific_and_technical_activities)
    ons_df = ons_df.assign(utilities = ons_df.electricity_gas_steam_and_air_conditioning_supply + 
                           ons_df.water_supply_sewerage_waste_and_remediation_activities)
    ons_df = ons_df.assign(personal_and_public_services = ons_df.real_estate_activities + 
                                                    ons_df['public_admin_and_defence;_compulsory_social_security'] +
                                                    ons_df.other_service_activities)
    sic_letters.loc['wholesale_retail_motor_trade_and_repair'] = 'G'
    sic_letters.loc['wholesale_and_retail'] = 'G46_47'
    sic_letters.loc['education_and_professional_activities'] = 'M_P'
    sic_letters.loc['utilities'] = 'D_E'
    sic_letters.loc['personal_and_public_services'] = 'L_O_S'
    sic_letters.loc['others'] = 'Z'
    sic_letters.loc['vacancies'] = 'vacancies'
    #
    ons_df.month = pd.to_datetime(ons_df.month)
    # only need vacancies within a certain period
    ons_df = ons_df[(ons_df.month>=pd.to_datetime(start_date)) & 
                    (ons_df.month<=pd.to_datetime(end_date))]
    ons_df = ons_df.set_index('month')
    return ons_df, sic_letters
'''
1


In [None]:
#%% -------------------------------------------------------------------------
#              Main functions to convert from flow to stock
#% --------------------------------------------------------------------------

'''
#%% Moved to separate file
def get_stock_v0(data, agg_func = 'count', agg_col = 'vacancy_weight', BOUNDARY = None):
                    #start_day = '2015-03-01', end_day = '2019-10-31',
#                    GET_END_DATE = False, BOUNDARY = None):
    """Compute the daily stock of vacancies via difference of cumulative sums.
    
    Keyword arguments:
    data -- dataframe with online job vacancies. Need to have "date", "end_date" and agg_col columns
    agg_func: whether to count the vacancies or to sum the weights
    agg_col -- reference column to aggregate (usually column with per-vacancy weights)
    BOUNDARY -- what to do wrt boundary conditions (start and end month)

    # obs: taking the difference of the cumulative sum is the same as doing this:
    df_stock2 = pd.DataFrame(columns = ['open_vacancies'], 
                             index= pd.date_range(start='2015-03-13',end='2019-10-31',freq='D'))
    df_stock2.open_vacancies = 0
    for reference_day in tqdm(df_stock.index):
        df_stock2.loc[reference_day] = ((data.date<=reference_day) &
                               (data.end_date>reference_day)).sum()
    TO CHECK: Why is it different from the function below?
    """
    
    start_day = data.date.min()
    end_day = data.date.max()
    
    if agg_func == 'count':
        df_start = data.groupby('date').count()[agg_col].to_frame()#.reset_index()
        df_end = data.groupby('end_date').count()[agg_col].to_frame()#.reset_index()
    elif agg_func == 'sum':
        df_start = data.groupby('date').sum()[agg_col].to_frame()#.reset_index()
        df_end = data.groupby('end_date').sum()[agg_col].to_frame()#.reset_index()
    else:
        print('Wrong aggregate function')
        assert(agg_func in ['count','sum'])
    
    #df_start = df_start.set_index('date').resample('1D').mean().fillna(0)
    #df_end = df_end.set_index('end_date').resample('1D').mean().fillna(0)
    # shift the end date by one, since vacancies disappear the day after their expiration date
    df_end = df_end.shift(1)

    df_start = df_start.reindex(pd.date_range(start=start_day,
                                    end=end_day,freq='D'), fill_value =0)
    df_end = df_end.reindex(pd.date_range(start=start_day,
                                    end=end_day,freq='D'), fill_value =0)
    
    # compute daily stock
    df_stock = df_start.cumsum() - df_end.cumsum()
    
    # add boundary conditions, if requested
    if BOUNDARY == 'valid':
        # TO BE REVIEWED
        # start from the month after the first date
        if not df_stock.index[0].is_month_start:
            valid_start = pd.offsets.MonthBegin(0).rollforward(df_stock.index[0])
        else:
            valid_start = pd.offsets.MonthBegin(0).rollforward(df_stock.index[1])
        print(f'Starting from {valid_start}')
        df_stock = df_stock[df_stock.index>=valid_start]

    # resample to monthly
    stock_month1 = df_stock.resample('M').mean()
    stock_month1.index = stock_month1.index.map(set_month_at_beginning)
    
    # Remove first month if we want to avoid the boundary
    if BOUNDARY=='valid':
        stock_month1 = stock_month1[stock_month1.index>=set_month_at_beginning(
            pd.to_datetime(FIRST_VALID_MONTH))]
    return stock_month1, df_stock, df_start, df_end


def get_stock(data, agg_func = 'sum', agg_col = 'vacancy_weight', BOUNDARY = None):
    """Compute the daily stock of vacancies via cumulative sum of net Flow.
    
    Keyword arguments:
    data -- dataframe with online job vacancies. Need to have "date", "end_date" and agg_col columns
    agg_func: whether to count the vacancies or to sum the weights
    agg_col -- reference column to aggregate (usually column with per-vacancy weights)
    BOUNDARY -- what to do wrt boundary conditions (start and end month)
    """
    
    start_day = data.date.min()
    end_day = data.date.max()
    
    if agg_func == 'sum':
        vacancy_flow_per_day = data.groupby('date')[agg_col].sum()
        vacancy_remove_per_day = data.groupby('end_date')[agg_col].sum()
    else:
        vacancy_flow_per_day = data.groupby('date')[agg_col].count()
        vacancy_remove_per_day = data.groupby('end_date')[agg_col].count()
    
    # shift vacancy_remove_per_day by one day since vacancies disappear the day after their expiration date
    vacancy_remove_per_day = vacancy_remove_per_day.shift(1)
    
    # adjust so that they start and end on the same dates
    vacancy_flow_per_day = vacancy_flow_per_day.reindex(pd.date_range(start=start_day,
                                        end=end_day,freq='D'), fill_value =0)
    vacancy_remove_per_day = vacancy_remove_per_day.reindex(pd.date_range(start=start_day,
                                        end=end_day,freq='D'), fill_value =0)
    
    # compute the net Flow
    net_flow = vacancy_flow_per_day.fillna(0) - vacancy_remove_per_day.fillna(0)
    
    # Get the daily stock
    daily_stock = net_flow.cumsum()
    
    # Resample to monthly stock
    monthly_stock = net_flow.resample('M').sum().cumsum()
    monthly_stock.index = monthly_stock.index.map(set_month_at_beginning)
    
    # enforce boundary conditions
    if BOUNDARY == 'valid':
        monthly_stock = monthly_stock[monthly_stock.index>=set_month_at_beginning(
            pd.to_datetime(FIRST_VALID_MONTH))]
    return monthly_stock, daily_stock, vacancy_flow_per_day, vacancy_remove_per_day
        
def get_stock_breakdown(data, agg_func = 'sum', agg_col = 'vacancy_weight', 
                              breakdown_col = 'organization_industry_value', BOUNDARY = None):
    """Compute the daily stock of vacancies via cumulative sum of net Flow.
    
    Keyword arguments:
    data -- dataframe with online job vacancies. Need to have "date", "end_date" and agg_col columns
    agg_func: whether to count the vacancies or to sum the weights
    agg_col -- reference column to aggregate (usually column with per-vacancy weights)
    BOUNDARY -- what to do wrt boundary conditions (start and end month)
    """
    
    start_day = data.date.min()
    end_day = data.date.max()
    
    if agg_func == 'sum':
        vacancy_flow_per_day = data.groupby(['date',breakdown_col])[agg_col].sum()
        vacancy_remove_per_day = data.groupby(['end_date',breakdown_col])[agg_col].sum()
    else:
        vacancy_flow_per_day = data.groupby(['date',breakdown_col])[agg_col].count()
        vacancy_remove_per_day = data.groupby(['end_date',breakdown_col])[agg_col].count()
    
    vacancy_flow_per_day = vacancy_flow_per_day.unstack()
    vacancy_remove_per_day = vacancy_remove_per_day.unstack()
    
    # shift vacancy_remove_per_day by one day since vacancies disappear the day after their expiration date
    vacancy_remove_per_day = vacancy_remove_per_day.shift(1)
    
    # adjust so that they start and end on the same dates
    vacancy_flow_per_day = vacancy_flow_per_day.reindex(pd.date_range(start=start_day,
                                        end=end_day,freq='D'), fill_value =0)
    vacancy_remove_per_day = vacancy_remove_per_day.reindex(pd.date_range(start=start_day,
                                        end=end_day,freq='D'), fill_value =0)
    
    # compute the net Flow
    net_flow = vacancy_flow_per_day.fillna(0) - vacancy_remove_per_day.fillna(0)
    
    # Get the daily stock
    daily_stock = net_flow.cumsum()
    
    # Resample to monthly stock
    monthly_stock = net_flow.resample('M').sum().cumsum()/2
    monthly_stock.index = monthly_stock.index.map(set_month_at_beginning)
    
    # enforce boundary conditions
    if BOUNDARY == 'valid':
        monthly_stock = monthly_stock[monthly_stock.index>=set_month_at_beginning(
            pd.to_datetime(FIRST_VALID_MONTH))]
    return monthly_stock, daily_stock, vacancy_flow_per_day, vacancy_remove_per_day
'''
1
