## Greenburgh Tax Assessment Analysis

### Setup

In [None]:
import numpy as np
import pandas as pd
import matplotlib as plt
import sqlite3 as lite
import matplotlib
import matplotlib.pyplot as plt
plt.style.use('ggplot')

In [None]:
## Tax assessment database and Assessment table.
TAX_ASSESS_DB = "/home/rsm/proj/tax_ass/taxdb/taxrec.db"
ASSESS_TABLE='taxrec'


In [None]:
pd.set_option('display.max_columns', 15)

In [None]:
## Read in data from our database.
conn  = lite.connect(TAX_ASSESS_DB)
query = f"SELECT * from {ASSESS_TABLE};"
df    = pd.read_sql_query(query, conn)

In [None]:
## We need data sorted by year as we will group by YEAR to get a list of FULL_MKT_VALUE(s)
## which we will use to create a return series.
df = df.sort_values(by='YEAR')

### Exceptions
These exception classes will be used to throw errors in the functions below.

In [None]:
class WeightArrayMisMatch(Exception):
  '''
    Array and associated weight array do not have the same length.
  '''
  def __init__(self, message="Weight array and associated values array are not the same length."):
    self.message = message
    super().__init__(self.message) 
    
class NegativeWeightArrayValue(Exception):
  '''
    Weight array has at least one negative value..
  '''
  def __init__(self, message="Weight array has at least one negative value."):
    self.message = message
    super().__init__(self.message) 
    
class WeightSumNotPositive(Exception):
  '''
    Weight array sum is not positive.
  '''
  def __init__(self, message="Sum of weight array values is not positive."):
    self.message = message
    super().__init__(self.message) 
    
class NotNumpyArray(Exception):
  '''
    Array is not a numpy array.
  '''
  def __init__(self, message="Array is not a numpy array."):
    self.message = message
    super().__init__(self.message) 
    
class NotProperQuantile(Exception):
  '''
    Array is not a numpy array.
  '''
  def __init__(self, message="Array as at least one value not in [0, 1]."):
    self.message = message
    super().__init__(self.message) 

### Functions
Functions used in the analysis of the Tax Assessment data.

In [None]:
def wgt_quantiles(vss, wgts, qss):
  '''
    Get a numpy array (or scalar if qss is scalar) consisting of an array of quantile weighted <vss> values.
    
  :param vss  A numpy(np) array of values. 
  :param wgts A numpy(np) array of weights. (Standard usage: Cummulative weights (increasing values in the range [0,1]).
  :param qss  A numpy(np) array of values.  (Meant to be quantiles -- numbers in the range [0, 1])
              OR, a scalar value.
  :return An numpy array (or scalar) consisting of the quantile weighted values of <vss> using weights, <wgts>, for each quantile in <qss>.
    :type numpy array of numeric values (or scalar) with the same length as <qss>.
  
  :packages numpy(np)
  
  :arg-contract 
    1. vss, wgts are all numpy arrays.
    2. qss in [0.0, 1.0]
    3. |vss| == |wgts|
    4. all(wgts) >= 0
    5. sum(wgts) > 0

  '''
  
  ## Check the argument contract...
  scalar_quants = False
  if type(vss)  != np.ndarray:
    raise(NotNumpyArray('wgt_quantiles: <vss>: Not an numpy array.' ))
  if type(wgts) != np.ndarray:
    raise(NotNumpyArray('wgt_quantiles: <wgts>: Not an numpy array.'))
  if type(qss)  != np.ndarray:
    qss = np.array([qss])
    scalar_quants = True
  if any((qss < 0.0) | (qss > 1.0)):
    raise(NotProperQuantile('wgt_quantiles: <qss>: Not a proper quantiles array.'))
  if np.size(vss) != np.size(wgts):
    raise(WeightArrayMisMatch('wgt_quantiles: <vss> and <wgts> do not have the same length.'))
  if any(wgts < 0.0):
    raise(NegativeWeightArrayValue('wgt_quantiles: <wgts> has one or more negative elements.'))
  if sum(wgts) <= 0:
    raise(WeightSumNotPositive('wgt_quantiles: Sum of <wgts> is not positive.'))
    
  ## Need to reshape these arrays in order to do broadcasting, so first copy them.
  vs  = vss.copy()
  qs  = qss.copy()
  ws  = wgts.copy()
  
  ## Sort the vs array and the associated weights.
  ## Turn the weights into proper weights and create a cummulative weight array.
  idx = np.argsort(vs)
  vs  = vs[idx]
  ws  = ws[idx]
  ws  = ws / np.sum(ws)
  cws = np.cumsum(ws)
  
  N   = np.size(cws)
  M   = np.size(qs)
  
  ## Reshape to broadcast.
  cws.shape = (N, 1)
  qs.shape  = (1, M)
  
  ## Use broadcasting to get all comparisons of <cws> with each entry from <qs>.  
  ## Do a diff (be mindfull of beginning and end of cws array) to get where the max of vs <= (each element of <qs>).
  A   = np.concatenate([np.ones(M).reshape(1,M), (cws <= qs) * 1, np.zeros(M).reshape(1,M)], axis=0)
  A   = np.diff(A, axis=0).astype(int)
  idx = np.maximum(0, np.where(A == -1)[0] - 1)
  
  ## Return the weighted quantile value of <vs> against each <qs>.
  if scalar_quants:
    return(vs[idx][0])
  return(vs[idx])

In [None]:
def assessment_agr_rets(df, ret_field, wgt_field, filt=True):
  '''
    Computes the weighted averge returns over the date range of the data frame, <df>.
    
  :param df: A Pandas DataFrame with required fields.
           :type pandas.DataFrame.core
  :param ret_field: A field representing an numpy array of returns.
           :type str
  :param wgt_field: A weight field, used to weight the returns in a given row.
           :type str
           
  :returns An numpy array of aggregated returns.
  :rtype numpy(np) array(float)
  
  :packages numpy(np), pandas
  '''
  ## If the filter value is a scalar -- replicate it to the length of the dataframe, <df>.
  fl = filt
  if np.shape(filt) == ():
    fl = np.repeat(filt, df.shape[0])

  return(np.average(np.stack(df.loc[fl, ret_field]), weights=df.loc[fl, wgt_field], axis=0)).astype(float)

In [None]:
## This function does not currently work. There is a mismatch witht he qgt_quantiles function it calls.
def assessment_wgt_quant_rets(df, ret_field, wgt_field, quants, filt=True):
  '''
    Computes weighted quantiles of returns over the date range of the data frame, <df>.
    
  :param df: A Pandas DataFrame with required fields.
            :type DataFrame
  :param ret_field: A field representing an numpy array of returns.
            :type str
  :param wgt_field: A weight field, used to weight the returns in a given row.
            :type str
  :param quants: A scalar or numpy(np) array of quantiles.
           
  :returns An numpy array of weighted quantile returns.
            :rtype numpy array(float)
  '''
  ## If the filter value is a scalar -- replicate it to the length of the dataframe, <df>.
  fl = filt
  if np.shape(filt) == ():
    fl = np.repeat(filt, df.shape[0])
  
  return(wgt_quantiles(np.stack(df.loc[fl, ret_field]), df.loc[fl, wgt_field].to_numpy(), quants))

In [None]:
def assessment_wgt_median_rets(df, ret_field, wgt_field, filt=True):
  '''
    Computes weighted quantiles of returns over the date range of the data frame, <df>.
    
  :param df: A Pandas DataFrame with required fields.
            :type DataFrame
  :param ret_field: A field representing an numpy array of returns.
            :type str
  :param wgt_field: A weight field, used to weight the returns in a given row.
            :type str
           
  :returns An numpy array of weighted quantile returns.
            :rtype numpy array(float)
  '''
  ## If the filter value is a scalar -- replicate it to the length of the dataframe, <df>.
  fl = filt
  if np.shape(filt) == ():
    fl = np.repeat(filt, df.shape[0])
  
  return(wgt_quantiles(np.stack(df.loc[fl, ret_field]), df.loc[fl, wgt_field].to_numpy(), np.array([0.5]))[0])

### Data Examination

In [None]:
df.head()

In [None]:
df.PARCEL_TYPE.unique()

### Data Cleaning
We use regular expression matching to determine which data should be filtered out.
We create a dictionary below, badLinesDct, that contains the number of bad lines
for each field of interest: FULL_MKT_VALUE (market value of parcel), ACCT (parcel account id), LUC (Land Use Code), ACCR (parcel acreage size).

In [None]:
badLinesDct = {}
good_mkt_filter = df["FULL_MKT_VALUE"].astype(str).str.match("^\d+$")
badLinesDct['FULL_MKT_VALUE']  = df.loc[~ good_mkt_filter].shape[0]

good_acct_filter = df["ACCT"].astype(str).str.match("^\d+$")
badLinesDct['ACCT'] = df.loc[~ good_acct_filter].shape[0]

good_luc_filter = df['LUC'].astype(str).str.match("^\d+$")
badLinesDct['LUC'] = df.loc[~ good_luc_filter].shape[0]

good_accr_filter = df['ACCR'].astype(str).str.match("(^(\d*)\.\d+$)|(^\d+(\.\d*)?$)")
badLinesDct['ACCR'] = df.loc[~ good_accr_filter].shape[0]

In [None]:
badLinesDct

In [None]:
df[~(good_mkt_filter & good_luc_filter & good_acct_filter)].shape[0]

In [None]:
## Filter out the bad mkt value, luc, and acct data.
df_filt = df[(good_mkt_filter & good_luc_filter & good_acct_filter)]

In [None]:
dd = df_filt.groupby('ACCT')['FULL_MKT_VALUE'].agg(lambda x: x.size).reset_index(name='MKT_COUNT')

dd1 = df_filt.groupby('ACCT')['FULL_MKT_VALUE'].agg(lambda x: any(x == 0)).reset_index(name='MKT_ZERO')

ddd = dd.merge(dd1, on='ACCT', how='inner')

## Now get the accounts that extend over the 11 period that we have data mkt value data for AND which aren't zero.
accts = ddd.loc[(ddd.MKT_COUNT == 11) & (~ ddd.MKT_ZERO)].ACCT

## Only use these accounts from the filtered data. This is the data set we will use for analysis.
df_clean = df_filt.loc[df_filt['ACCT'].isin(accts), :]
df_clean.to_csv("clean_tax_ass.psv", sep='^', encoding='utf-8')

In [None]:
## Describe the reduction in data after cleaning.
print(f"Data cleaning reduced the overall data set by {100.0 * np.round( (df.shape[0] - df_clean.shape[0]) / df.shape[0], 2)}%.")
raw_residencial_count = df.loc[df.LUC == 210].shape[0]
filtered_residencial_count = df_clean.loc[df_clean.LUC == 210].shape[0]
print(f"Data cleaning reduced the residencial data set by {100.0 * np.round( (raw_residencial_count - filtered_residencial_count) / raw_residencial_count, 2)}%.")

### Compute Market Return Series

In [None]:
## Create a field, 'mkt_vals' that is an np.array of returns (ordered by YEAR).
dd = df_clean.groupby('ACCT').apply(lambda row: np.array(row['FULL_MKT_VALUE'])).reset_index(name='mkt_vals')

dd['mkt_rets'] = dd['mkt_vals'].apply(lambda x: np.diff(x)) / dd['mkt_vals'].apply(lambda x: x[:-1])

dd['avg_mkt_val'] = dd.apply(lambda row: np.mean(row['mkt_vals']), axis=1)

df_rets = df_clean.merge(dd, on='ACCT', how='inner')

df_rets

In [None]:
dff = pd.value_counts(df_rets.LUC).to_frame(name='LUC_cnt').reset_index()
dff['log_LUC'] = np.log10(dff.LUC_cnt)
ax = dff.plot.scatter(x = 'LUC', y='log_LUC', xlabel='LUC\n(Residencial LUC=210)', ylabel='Log10 of LUC Count', title="Log10 of LUC counts")
ax.axvline(210, linestyle='--');

### Aggregated Market Returns

In [None]:
## Empty data frame to store returns.
df_results = pd.DataFrame({})

In [None]:
## Overall Aggregated Market Returns
df_results['overall'] = df_rets['mkt_rets'].agg(np.mean)

In [None]:
## Aggregated Market Returns for LUC 210 -- Single family residence.
df_results['residence'] = df_rets.loc[df_rets['LUC'] == 210, 'mkt_rets'].agg(np.mean)

In [None]:
df_rets.columns

In [None]:
## Compute overall weighted returns using the average market value as the weight 
## -- Also recompute but restrict analysis to single family residences -- LUC = 210.
df_results['overall_mkt_wgt'] = assessment_agr_rets(df_rets, 'mkt_rets', 'avg_mkt_val')
df_results['residence_mkt_wgt'] = assessment_agr_rets(df_rets, 'mkt_rets', 'avg_mkt_val', filt = df_rets['LUC'] == 210)

In [None]:
df_results.columns

In [None]:
ax = df_results[['residence', 'overall_mkt_wgt', 'residence_mkt_wgt']].plot( 
                     xlabel="Year"                     , 
                     ylabel="Assessment Change from Previous Year",
                     title="Tax Assessment Comparison (Greenburgh)" ,
                     xticks=df_results.index, rot=90    )
ax.set_xticklabels(['2013', '2014', '2015', '2016', '2017', '2018', '2019', '2020', '2021', '2022']);
ax.legend(['Residential Avg', 'Overall Mkt Wgt Avg', 'Residential Mkt Wgt Avg']);

In [None]:
## Overall Aggregated Market Returns
df_results['overall_cs'] = np.cumprod(1.0 + df_rets['mkt_rets'].agg(np.mean)) - 1.0

In [None]:
## Aggregated Market Returns for LUC 210 -- Single family residence.
df_results['residence_cs'] = np.cumprod(1.0 + df_rets.loc[df_rets['LUC'] == 210, 'mkt_rets'].agg(np.mean)) - 1.0

In [None]:
df_rets.columns

In [None]:
## Compute overall weighted returns using the average market value as the weight.
df_results['overall_mkt_wgt_cs'] = np.cumprod(1.0 + assessment_agr_rets(df_rets, 'mkt_rets', 'avg_mkt_val')) - 1.0

In [None]:
## Compute overall weighted returns using the average market value as the weight 
## -- but restrict analysis to single family residences -- LUC = 210.
df_results['residence_mkt_wgt_cs'] = np.cumprod(1.0 + assessment_agr_rets(df_rets, 'mkt_rets', 'avg_mkt_val', filt = df_rets['LUC'] == 210)) - 1.0

In [None]:
ax = df_results[['residence_cs', 'overall_mkt_wgt_cs', 'residence_mkt_wgt_cs']].plot( 
                     xlabel="Year"                     , 
                     ylabel="Assessment Change from 2012",
                     title="Tax Assessment Comparison (Greenburgh)\n(Cumulative Change)" ,
                     xticks=df_results.index, rot=90    )
ax.set_xticklabels(['2013', '2014', '2015', '2016', '2017', '2018', '2019', '2020', '2021', '2022']);
ax.legend(['Residential Avg', 'Overall Mkt Wgt Avg', 'Residential Mkt Wgt Avg']);