In [1]:
import pandas as pd
import numpy as np
import csv
import os
from pandas import ExcelWriter
from GLSettingsByState import GLTaxSettingsByState
from datetime import datetime

In [2]:
def excelSave(df_dict, output_folder=True):
    """
    Write every DataFrame in df_dict to its own .xlsx workbook.

    :param df_dict: dict mapping an output name (str) to a DataFrame. The name
        is used for the file name and, with a trailing "_", for the sheet name.
        Any value that is not a dict is silently ignored.
    :param output_folder: when True (default) the files are written into the
        "output" folder (created if it does not exist); when False they are
        written relative to the current working directory.
    :return: None
    """
    def write_df(dataframe, df_key):
        if dataframe is None:
            # Nothing to write: report it and skip, instead of failing later
            # on dataframe.to_excel().
            print("error w/ df")
            return
        output_path = df_key
        if output_folder:
            os.makedirs("output", exist_ok=True)
            output_path = "output/" + df_key
        # Excel rejects sheet names longer than 31 characters.
        sheet_name = (df_key + "_")[:31]
        # The context manager saves and closes the workbook, even on error
        # (ExcelWriter.save() no longer exists in pandas >= 2.0).
        with ExcelWriter(output_path + ".xlsx", engine="xlsxwriter") as writer:
            dataframe.to_excel(writer, sheet_name=sheet_name, index=False)

    if isinstance(df_dict, dict):
        for df_key, dataframe in df_dict.items():
            write_df(dataframe, df_key)
            
    
def currency_to_float(curr_str):
    """
    Convert a currency value into a float.

    Intended to be applied element-wise to DataFrame columns, so it accepts
    both the raw strings and values pandas has already parsed as numbers.

    :param curr_str: str such as "$52,555.87" or "($1)", or a number
    :return: float; parentheses mark a negative amount, NaN and empty
        strings become 0.0
    :raises ValueError: if the cleaned string is not a valid number

    $1 -> 1.0
    ($1) -> -1.0
    $0 -> 0.00
    $50.50 -> 50.50
    $52,555.87 -> 52555.87
    float('nan') -> 0.0
    525.00 -> 525.00
    """
    # Local imports: the notebook's import cell does not bring these in.
    import math
    import numbers

    # Already numeric (python or numpy scalar): NaN counts as 0.0,
    # anything else is returned as a float.
    if isinstance(curr_str, numbers.Real):
        if math.isnan(curr_str):
            return 0.0
        return curr_str if isinstance(curr_str, float) else float(curr_str)

    cleaned = curr_str.strip()
    # Empty / whitespace-only strings carry no amount.
    if len(cleaned) < 1:
        return 0.0

    # Accounting notation: an opening parenthesis marks a negative amount.
    neg_val = '(' in cleaned
    # Drop the currency symbol, thousands separators and the parentheses.
    for unwanted in ('(', ')', '$', ','):
        cleaned = cleaned.replace(unwanted, '')

    value = float(cleaned)
    return -value if neg_val else value

In [3]:
__author__ = 'nabeelh-dev'

class TaxRates(object):
    """
    Reads ZIP5 tax rate tables (per Avalara.com) from a directory of CSV files.

    The parsed data is stored in self.tax_rates and can be queried by zipcode
    through query_by_zipcode().
    """
    def __init__(self, csv_path):
        # Directory holding the rate tables (e.g. TAXRATES_ZIP5_AK201901.csv);
        # parse_csv() resolves it against the current working directory.
        self.filepath = csv_path
        self.tax_rates = self.parse_csv()

    def parse_csv(self):
        """
        Processes every ZIP5 tax rate csv inside the directory stored in
        self.filepath and returns a dict with two keys:

        'state_tax_rates': dict of State -> Zipcode -> tax rate dict
        'zipcode_to_state': dict of Zipcode -> State

        'zipcode_to_state' is a reverse lookup: it tells query_by_zipcode()
        which state to look inside, instead of looping through every state in
        'state_tax_rates'.

        Files are read in sorted name order, so when two files describe the
        same state (or two states share a zipcode) the last name wins.

        :param: self.filepath: str
        :return: dict with 'state_tax_rates' and 'zipcode_to_state'
        """
        states_tax_rates = dict()
        zipcode_to_state = dict()

        # directory input, process all rate tables contained within directory
        dirpath = os.path.abspath('')
        csv_dp = os.path.join(dirpath, self.filepath)
        for filename in sorted(os.listdir(csv_dp)):
            # Skip anything that is not a rate table (README, .DS_Store, ...)
            # before trying to pull the state out of the file name.
            if not filename.endswith(".csv") or "ZIP5_" not in filename:
                continue
            # splits filename to identify state: TAXRATES_ZIP5_AK201901.csv
            csv_state_abbr = filename.split("ZIP5_")[1][:2]
            state_tax_csv = os.path.join(csv_dp, filename)
            state_tax_data = TaxRates._parse_single_csv(state_tax_csv)
            states_tax_rates[csv_state_abbr] = state_tax_data['zipcode_rates']

        # Build the zipcode -> state reverse lookup used by query_by_zipcode()
        for state, zipcode_tax_rates in states_tax_rates.items():
            for zipcode in zipcode_tax_rates:
                zipcode_to_state[zipcode] = state

        print("Zipcode Tax Rates loaded successfully.")
        return {
                "state_tax_rates": states_tax_rates,
                "zipcode_to_state": zipcode_to_state
                }

    @staticmethod
    def _parse_single_csv(csv_fp):
        """
        Parses the csv at csv_fp and collects all zipcode data as a dict.
        When being read by csv.reader, each row will be a list as follows:
        ['WY', '83118', 'LINCOLN COUNTY', '0.040000', '0.050000', '0.010000', '0', '0', '1']

        The first line is treated as a header and skipped; empty rows are
        ignored.

        :param csv_fp: path of the csv file
        :return: dict with a single key 'zipcode_rates' (zipcode str -> rates dict)
        :raises ValueError: if a rate or risk level column is not numeric
        """
        zipcode_rates = dict()
        # newline='' is what the csv module expects for files it reads.
        with open(csv_fp, newline='') as csv_file:
            csv_reader = csv.reader(csv_file)
            # Skip the header line
            next(csv_reader, None)

            for zipcode_tax in csv_reader:
                if not zipcode_tax:
                    # blank line (csv.reader yields an empty list)
                    continue
                zipcode_rates[zipcode_tax[1]] = {
                    "region_name": zipcode_tax[2],
                    "state_rate": float(zipcode_tax[3]),
                    "est_combined_rate": float(zipcode_tax[4]),
                    # NOTE(review): key says "country" but this is presumably
                    # the county rate; kept as-is because callers read this key.
                    "est_country_rate": float(zipcode_tax[5]),
                    "est_city_rate": float(zipcode_tax[6]),
                    "est_special_rate": float(zipcode_tax[7]),
                    "risk_level": int(zipcode_tax[8])
                }

        return {
                'zipcode_rates': zipcode_rates
                }

    def query_by_zipcode(self, zipcode_str):
        """
        Given a zipcode string, retrieve tax rates for that zipcode.
        First looks the zipcode up in the 'zipcode_to_state' dict contained in
        self.tax_rates to get the State the zipcode is in. If the zipcode is
        not known, prints an error and returns None.

        Otherwise uses the State key and Zipcode key to obtain the tax rates
        from the 'state_tax_rates' dict and returns them together with the
        'zipcode' and 'state' they belong to.

        :param zipcode_str: str
        :return: dict of tax rates plus 'zipcode' and 'state', or None
        """
        # dict get method returns None if key is not found
        query_state = self.tax_rates['zipcode_to_state'].get(zipcode_str)
        if query_state is None:
            print("{} - zipcode not found!".format(zipcode_str))
            return None
        # Work on a copy so the cached record is not modified by the query.
        query_results = dict(self.tax_rates['state_tax_rates'][query_state][zipcode_str])
        query_results['zipcode'] = zipcode_str
        query_results['state'] = str(query_state)
        return query_results

In [4]:
# Load every rate table found under TAXRATES_ZIP5/ and spot-check one zipcode lookup.
taxRates = TaxRates("TAXRATES_ZIP5/")
taxRates.query_by_zipcode("90247")

Zipcode Tax Rates loaded successfully.


{'region_name': 'GARDENA',
 'state_rate': 0.06,
 'est_combined_rate': 0.095,
 'est_country_rate': 0.0025,
 'est_city_rate': 0.0,
 'est_special_rate': 0.0325,
 'risk_level': 1,
 'zipcode': '90247',
 'state': 'CA'}

In [5]:
# Build the region lookup: one entry per REGION value, each holding that
# row's remaining columns as a dict.
with open("REGION_TO_ZIP.csv") as csv_file:
    df = pd.read_csv(csv_file, delimiter=",")
    df = df.set_index("REGION")
    regionToZip = df.to_dict('index')

In [6]:
#taxRates.query_by_zipcode("92040")
#regionToZip[234]

In [7]:
folder_dir = "test_NAPGLDATA"

df_list = list()
# os.listdir() returns entries in arbitrary order; sorting keeps the row order
# of the combined frame the same from run to run.
for file in sorted(os.listdir(folder_dir)):
    if file.endswith(".csv"):
        fp_ = os.path.join(folder_dir, file)
        with open(fp_) as csv_file:
            # Skip the first 5 lines of each export so pandas starts reading
            # at the 6th line (presumably the real column header row).
            for i in range(0, 5):
                next(csv_file, None)
            df_list.append(pd.read_csv(csv_file, delimiter=","))

# Stack all files into one frame with a fresh 0..n-1 index.
stax_df = pd.concat(df_list, axis=0, ignore_index=True)

In [8]:
# Maps the column names of the raw export to the short names used in this notebook.
nap_csv_colMap = {
    'Segment3': 'section',
    'Segment4': 'area',
    'Segment5': 'region',
    'Account Description': 'glAcctDesc',
    'Record Type::Number': 'Record Type::Number',
    'CM Trx Type': 'refNum',
    'TRX Timestamp Date': 'trxDate',
    'GL Posting Date': 'glDate',
    'Description': 'trxDesc',
    'Main Account Segment': 'glAcct',
    'PaidToRcvd From': 'paidToRcvd',
    'TRX Amount': 'trxAmount',
    'Originating Debit Amount': 'debitAmt',
    'Originating Credit Amount': 'creditAmt',
}
stax_df = stax_df.rename(columns=nap_csv_colMap)
# these columns are dropped because their information is not needed
stax_df = stax_df.drop(columns=["Segment6", "Segment2", "trxDate"])

In [9]:
# Preview the first three rows after the rename/drop above.
stax_df.head(3)

Unnamed: 0,Record Type::Number,refNum,glDate,Trx Number,paidToRcvd,trxDesc,trxAmount,debitAmt,creditAmt,glAcct,section,area,region,glAcctDesc
0,20::1,Supplier Invoice,2018-07-07,1,NOCRA,,$0.00,$0.00,$156.00,2010,0A11,Q,0,Accounts Payable/AP Trade
1,20::1,Supplier Invoice,2018-07-07,1,NOCRA,// May/June 2018 Assignor Fees,$0.00,$156.00,$0.00,5313,0A11,Q,0,Referee Fees
2,20::2,Supplier Invoice,2018-07-07,2,AYSO Area 11Q,Reimbursement for Adult League Expenses from 1...,$0.00,$0.00,$6858.66,2010,0A11,Q,0,Accounts Payable/AP Trade


In [10]:
# get rid of rows that don't have a region value or have a value of 0
stax_df = stax_df.loc[stax_df["region"] != 0]
# get rid of rows that have a 0 for Trx Amount, these are reversed or voided journal lines.
#stax_df = stax_df.loc[stax_df["trxAmount"] != 0.0] DOING THIS IN THE STATE FILTERING STEP WAY BELOW

# apply state, city and zipcode for each invoice row in dataset using loaded csv file (REGION_TO_ZIP.csv)
stax_df["state"] = pd.Series(stax_df["region"].apply(lambda x: regionToZip[x]['STATE']))
stax_df["city"] = pd.Series(stax_df["region"].apply(lambda x: regionToZip[x]['CITY']))
stax_df["zipcode"] = pd.Series(stax_df["region"].apply(lambda x: regionToZip[x]["ZIP"]))

In [11]:
# Turn the currency strings of every amount column into floats.
for amount_col in ("trxAmount", "debitAmt", "creditAmt"):
    stax_df[amount_col] = stax_df[amount_col].apply(currency_to_float)
stax_df["glAcct"] = stax_df["glAcct"].astype(np.int64)
# old dec test csv files have a date format of "%m/%d/%Y";
# values that do not match the format below are coerced to NaT
stax_df["glDate"] = pd.to_datetime(stax_df["glDate"], format="%Y-%m-%d", errors="coerce")

In [12]:
# (rows, columns) of the frame after cleaning and type conversion.
stax_df.shape

(90462, 17)

Anything with a GL code below 5000 is a Revenue account.
Anything with a GL code of 5000 or above is an Expense account.

In [13]:
"""conditions = [region_df["glAcct"] < 5000,
             region_df["glAcct"] > 5000]
outputs = ["Expense", "Revenue"]

res = np.select(conditions, outputs)
region_df["Exp/Rev"] = pd.Series(res)
"""
# Live classification: glAcct >= 5000 is tagged "Expense", everything below "Revenue".
stax_df["Exp/Rev"] = np.where(stax_df['glAcct']>=5000, "Expense", "Revenue")

In [14]:
# nets credits and debits values into one column (taxableAmt). Credits are negative and debits are positive values
# Net each line into one signed amount (taxableAmt): debits count as positive, credits as negative.
stax_df["taxableAmt"] = stax_df["debitAmt"] - stax_df["creditAmt"]

In [15]:
# Peek at one row to check the derived state/city/zipcode, Exp/Rev and taxableAmt columns.
stax_df.head(1)

Unnamed: 0,Record Type::Number,refNum,glDate,Trx Number,paidToRcvd,trxDesc,trxAmount,debitAmt,creditAmt,glAcct,section,area,region,glAcctDesc,state,city,zipcode,Exp/Rev,taxableAmt
55,2::11,Bank Deposit,2018-07-10,11,Blue Sombrero - Credit Card,,-86.94,0.0,86.94,4005,0A14,L,1408,Registration Fee,FL,Wildwood,34785.0,Revenue,-86.94


# SINGLE STATE TESTS

In [None]:
# Inputs for the single-state cells below: state abbreviation and the
# [start, end] dates (month-day-year strings) compared against glDate.
state_ = "PA"
period = ["12-01-2018", "12-31-2018"]

In [None]:
# filter by input State
state_df = stax_df.loc[stax_df["state"]==state_]

# filter by start and enddate using glDate col
state_df = state_df.loc[(state_df["glDate"] >= period[0]) & (state_df["glDate"] <= period[1])]

In [None]:
# GL tax settings for the selected state, from the imported GLTaxSettingsByState mapping.
stateGL_settings = GLTaxSettingsByState[state_]
stateGL_settings

In [None]:
# GL codes (as ints) whose setting for this state is truthy, i.e. the taxable ones.
taxableGL = [
    int(gl_code)
    for gl_code, is_taxable in stateGL_settings.items()
    if is_taxable
]
taxableGL

In [None]:
# Tag each row: "Taxable" when its glAcct is in the taxableGL list, otherwise "Non-Taxable".
state_df["Taxable?"] = np.where(state_df['glAcct'].isin(taxableGL), "Taxable", "Non-Taxable")

In [None]:
# If Non-Taxable then update taxableAmt to zero (Taxable rows keep their amount).
# UPDATE: tried doing so, but it was too annoying to separate out.
#state_df["taxableAmt"] = state_df["taxableAmt"] * np.where(state_df["Taxable?"]=="Taxable", 1, 0)

In [None]:
# Preview the state rows with the new "Taxable?" column.
state_df.head(3)

In [None]:
# Sort ascending by taxable status ("Non-Taxable" sorts before "Taxable"), then by record number.
parsed_state_df = state_df.sort_values(by=["Taxable?", "Record Type::Number"], ascending=True)

In [None]:
# Preview the sorted frame.
parsed_state_df.head(3)

In [None]:
#parsed_state_df.set_index(["section", "area", "region", "Taxable?"])

In [None]:
# Taxable rows only: total taxableAmt per section / area / city / region / taxable flag.
(
    parsed_state_df
    .loc[parsed_state_df["Taxable?"]=="Taxable"]
    .groupby(by=["section", "area", "city", "region", "Taxable?"])["taxableAmt"]
    .sum()
)

In [None]:
#state_df[state_df["taxableAmt"]==1402.79]

In [None]:
# Taxable rows only: total taxableAmt per record number, smallest total first.
trx_sums = (
    parsed_state_df
    .loc[parsed_state_df["Taxable?"]=="Taxable"]
    .groupby(by="Record Type::Number")["taxableAmt"]
    .sum()
    .sort_values()
)
trx_sums

In [None]:
# Re-index the sorted frame by record number (a record number can appear on
# several rows, so this index is not necessarily unique).
parsed_state_df_byRecs = parsed_state_df.set_index("Record Type::Number")
parsed_state_df_byRecs.head(5)

In [None]:
# Total taxableAmt per record number over ALL rows of the state frame
# (Taxable and Non-Taxable), unlike trx_sums which only covers Taxable rows.
recordTaxableTotals = parsed_state_df_byRecs.groupby("Record Type::Number")["taxableAmt"].sum()
recordTaxableTotals[0:10]

In [None]:
#recordTaxableTotals.align(trx_sums, join="outer")[0]

In [None]:
#recordTaxableTotals.loc['1::702']

In [None]:
# Walk the record-number index once and flag the first row of every record
# (assumes the frame is indexed by record number).
r_dict = dict()        # record numbers already seen

r_values = list()      # True on the first row of a record, False on repeats
r_taxableSum = list()  # the record's total on its first row, 0 on repeats
r_index = list()       # record number of each row, in frame order
for item in parsed_state_df_byRecs.index:
    r_index.append(item)
    if r_dict.get(str(item)):
        # record number was seen before: not its first row
        r_values.append(False)
        r_taxableSum.append(0)
    else:
        # first row of this record: carry the record's total
        r_values.append(True)
        r_taxableSum.append(recordTaxableTotals.loc[item])
        r_dict[str(item)] = True

firstUniqueRecordPosition = pd.Series(r_values, index=r_index)
firstUniqueRecordPosition[0:10]

In [None]:
# Same row order as the frame: the record total sits on the first row of each
# record and 0 on its remaining rows.
recSumsByRecordPosition = pd.Series(r_taxableSum, index= r_index)
recSumsByRecordPosition[0:10]

In [None]:
#pd.Series(r_values, index= r_index) & parsed_state_df.set_index("Record Type::Number")["taxableAmt"]
# Attach the per-record totals as the netTax column.
# NOTE(review): recSumsByRecordPosition was built from this frame's index, so
# labels and order match; presumably this breaks if the frame is re-sorted
# between the two cells — confirm.
parsed_state_df_byRecs["netTax"] = recSumsByRecordPosition

In [None]:
# Show the first 10 rows ordered by record number.
parsed_state_df_byRecs.sort_index().head(10)

In [None]:
# Save the sorted single-state frame as output/<state>_<start>-<end>_test.xlsx.
# NOTE(review): this writes parsed_state_df, which does not carry the netTax
# column added to parsed_state_df_byRecs — confirm that is intended.
excelSave({state_+"_"+period[0]+"-"+period[1]+"_test": parsed_state_df})

In [None]:
"""
ASK about: 1::702 taxable and non taxable

LOOK AT : 2::271

"""

# PROCESS MULTIPLE STATES AND DATES

In [16]:
def process_SalesTax(state_, period):
    """
    Filter the combined GL data down to one state and date range, tag every
    row as Taxable / Non-Taxable and save the result as an Excel workbook.

    Reads the module-level stax_df and GLTaxSettingsByState and writes the
    workbook through excelSave() (default "output" folder), named
    "<state>_<start>-<end>".

    :param state_: str, state abbreviation matched against stax_df["state"]
    :param period: [start, end] date strings; compared (inclusive) against
        the glDate column and also used in the output file name
    :return: None. On bad dates or an unknown state a message is printed and
        nothing is written.
    """
    # filter by input State
    state_df = stax_df.loc[stax_df["state"]==state_]

    # get rid of rows that have a 0 for Trx Amount, these are reversed or voided journal lines.
    # NOTE(review): this filter is disabled, so zero-amount rows are kept.
    #state_df = state_df.loc[state_df["trxAmount"] != 0.0]

    # filter by start and end date using glDate col
    try:
        in_period = (state_df["glDate"] >= period[0]) & (state_df["glDate"] <= period[1])
    except TypeError:
        print("ERROR W/ DATES INPUT- {} for State- {}".format(period, state_))
        return None
    # .copy() makes the filtered frame independent of stax_df, so adding the
    # "Taxable?" column below does not trigger SettingWithCopyWarning.
    state_df = state_df.loc[in_period].copy()

    # Report a state without GL settings the same way as bad dates, instead
    # of aborting the whole batch with a KeyError.
    try:
        stateGL_settings = GLTaxSettingsByState[state_]
    except KeyError:
        print("ERROR NO GL TAX SETTINGS for State- {}".format(state_))
        return None
    # GL codes (as ints) whose setting is truthy are the taxable ones
    taxableGL = [int(gl_code) for gl_code, is_taxable in stateGL_settings.items() if is_taxable]

    # if glAcct is in the taxableGL list then mark as Taxable and if not then mark Non-Taxable
    state_df["Taxable?"] = np.where(state_df['glAcct'].isin(taxableGL), "Taxable", "Non-Taxable")

    # Descending sort: "Taxable" rows first, then record numbers in reverse order
    parsed_state_df = state_df.sort_values(by=["Taxable?", "Record Type::Number"], ascending=False)

    # Set output_folder=False if you don't want files to be saved into output folder
    excelSave({state_+"_"+period[0]+"-"+period[1]: parsed_state_df})
    print("SUCCESS {} for State- {}".format(period, state_))

In [25]:
"""
For each item in list:
Process State for specified date range.

process_list = [
                ["PA", ["12-01-2018", "12-31-2018"]],
                ["CA", ["11-01-2018", "11-30-2018"]],
                ["LA", ["12-01-2018", "12-31-2018"]],
                ["AZ", ["12-01-2018", "12-31-2018"]],
                ["NE", ["12-01-2018", "12-31-2018"]]
                ]
"""
# Each entry is [state abbreviation, [start date, end date]]; the dates are
# month-day-year strings.
process_list = [
                ["FL", ["11-01-2018", "01-31-2019"]],
                ]

In [26]:
# Run the export for every entry: state[0] is the state abbreviation,
# state[1] the [start, end] period.
for state in process_list:
    process_SalesTax(state[0], state[1])

SUCCESS ['11-01-2018', '01-31-2019'] for State- FL


In [None]:
# Ad-hoc lookup: all rows whose paidToRcvd is "Score Sports" in state AZ.
stax_df.loc[(stax_df["paidToRcvd"] == "Score Sports") & (stax_df["state"] == "AZ")]