In [None]:
@task(name="Filter By Loan Status",description="We are interested in loans that were fully paid or charged off")
def filter_by_status(df):
    """Keep only loans that reached a final outcome.

    Args:
        df: Frame with a ``loan_status`` column.

    Returns:
        The rows of ``df`` whose ``loan_status`` is exactly ``'Fully Paid'``
        or ``'Charged Off'``; every other status is dropped.
    """
    status = df['loan_status']

    # Build one boolean mask per accepted status, then combine them.
    is_fully_paid = status == 'Fully Paid'
    is_charged_off = status == 'Charged Off'

    return df[is_fully_paid | is_charged_off]

@task(name="Drop Columns w/ Nulls",description="We want to keep only columns with at least 85% compeltion")
def drop_missing_value_columns(df):
    """Keep only the columns that are at least 85% populated.

    Args:
        df: Frame whose columns are screened for missing values.

    Returns:
        ``df`` restricted to the columns in which at most 15% of the rows are
        null. Rows are never removed. An empty frame keeps all its columns.
    """
    total_rows = df.shape[0]
    null_counts = df.isnull().sum()

    # A column is at least 85% complete exactly when its null count is at most
    # 15% of the row count. Comparing integer counts (instead of a computed
    # percentage) avoids a 0/0 division on an empty frame and any
    # floating-point rounding right at the 85% boundary.
    keep = df.columns[(null_counts * 100 <= 15 * total_rows).values]

    return df[keep]

@task(name="Feature Selection",description="Additional feature selection based on domain expertise")
def feature_selection(df):
    """Reduce the frame to the fixed, hand-picked set of feature columns.

    Args:
        df: Frame that contains every column named below.

    Returns:
        ``df`` restricted to those columns, in the order they are listed.
    """
    # Order matters: it becomes the column order of the returned frame.
    chosen_columns = (
        'addr_state',
        'annual_inc',
        'earliest_cr_line',
        'emp_length',
        'emp_title',
        'fico_range_high',
        'fico_range_low',
        'grade',
        'home_ownership',
        'application_type',
        'initial_list_status',
        'int_rate',
        'loan_amnt',
        'num_actv_bc_tl',
        'loan_status',
        'mort_acc',
        'tot_cur_bal',
        'open_acc',
        'pub_rec',
        'pub_rec_bankruptcies',
        'purpose',
        'revol_bal',
        'revol_util',
        'sub_grade',
        'term',
        'title',
        'total_acc',
        'verification_status',
    )

    return df[list(chosen_columns)]

@task(name="Outlier Removal",description="Remove top and bottom 5% outliers based on annual income")
def income_outlier_removal(df):
    """Drop rows whose ``annual_inc`` lies outside the 5th-95th percentile band.

    Both bounds are exclusive: a row is kept only when its income is strictly
    greater than the 5% quantile and strictly less than the 95% quantile.
    The number of removed rows is printed.

    Args:
        df: Frame with an ``annual_inc`` column.

    Returns:
        The rows of ``df`` that fall strictly inside the band.
    """
    income = df['annual_inc']
    low_cut = income.quantile(.05)
    high_cut = income.quantile(.95)

    inside_band = (income > low_cut) & (income < high_cut)
    trimmed = df[inside_band]

    removed_count = df.shape[0] - trimmed.shape[0]
    print("{} outliers removed".format(removed_count))

    return trimmed

@task(name="One Hot Encoding",description="One-hot encode the loan grade column")
def status_dummies(df):
    """One-hot encode the ``grade`` column.

    Args:
        df: Frame with a ``grade`` column.

    Returns:
        The result of ``pd.get_dummies`` on ``df`` with ``grade`` as the only
        encoded column.
    """
    # ``columns`` must be list-like; pandas rejects a bare string with a
    # TypeError, so the single column name is wrapped in a list.
    return pd.get_dummies(df, columns=["grade"])

@task(name="Persist DF as GBQ Table",description="Write the processed DataFrame to a table, replacing it if it already exists")
def persist_gbq_table(df,tablename,con):
    """Write ``df`` to the table ``tablename`` through ``con``.

    Args:
        df: Frame to persist.
        tablename: Name of the destination table.
        con: Connection object handed to ``DataFrame.to_sql``.

    Returns:
        None.
    """
    # index=False leaves the frame's index out of the written table;
    # if_exists='replace' overwrites a table that already has this name.
    df.to_sql(tablename,con,index=False,if_exists='replace')



@flow(name="Loan Processing Pipeline",description="Pre-Processing Lending Club Loan Data For Modeling")
def loan_processing_pipe():
    """Load the accepted-loans table, run the cleaning steps, persist the result.

    The frame read from ``LOANS.ACCEPTED`` is passed through
    ``filter_by_status``, ``feature_selection``, ``income_outlier_removal`` and
    ``status_dummies`` in that order, and the outcome is written to
    ``LOANS.GOLDENLOANS`` over the same connection.
    """
    # Connection setup; initialize_ponder is defined outside this view.
    connection = initialize_ponder("my_bigquery_key.json","LOANS")
    ponder.bigquery.init(connection)

    frame = data_source_select("LOANS.ACCEPTED",connection)

    # NOTE(review): drop_missing_value_columns is not one of the steps below —
    # confirm that leaving it out of the flow is intentional.
    processing_steps = (
        filter_by_status,
        feature_selection,
        income_outlier_removal,
        status_dummies,
    )
    for step in processing_steps:
        frame = step(frame)

    persist_gbq_table(frame,"LOANS.GOLDENLOANS",connection)

# Run the flow as soon as this code is executed.
loan_processing_pipe()