In [1]:
import pandas as pd
import utils
import utils_bux
import featuretools as ft
from sklearn.externals import joblib



### DEFINE PIPELINE PARAMETERS

In [2]:
prediction_problem_type = "binary classification"
show_report = True
load_to_vertica = False

print("Pipeline parameters defined")

Pipeline parameters defined


### CONNECT TO THE DATABASE

In [3]:
# connect to the vertica database, create a cursor
cur = utils.connect_to_db()
print("Connected to the database")

Connected to the database


### EXTRACT THE ENTITY SET AND LABELS DATA

#### Cohorts entity

#### Users entity

In [4]:
users_timeframe = {
                    'users_from': '2018-01-01',
                    'users_till': '2018-04-30'
}

In [None]:
query_users = """ 
        
            CREATE LOCAL TEMPORARY TABLE temp_users ON COMMIT PRESERVE ROWS AS
            SELECT user_id, country_cd, gender, nationality, platform_type_name, trading_experience, title, network, bux_account_created_dts, ams_first_funded_dts
            FROM reporting.user_details
            WHERE bux_account_created_dts::date BETWEEN '{users_from}' AND '{users_till}'
            LIMIT 2000 OVER (PARTITION BY date_trunc('month', bux_account_created_dts) ORDER BY RANDOMINT(1000000000));
        """.format(users_from=users_from,users_till=users_till)
utils.sql_query(cur, query_users)

In [30]:
### Users basic features

query_users = """ 
        
            CREATE LOCAL TEMPORARY TABLE temp_users ON COMMIT PRESERVE ROWS AS
            SELECT user_id, country_cd, gender, nationality, platform_type_name, trading_experience, title, network, bux_account_created_dts, ams_first_funded_dts
            FROM reporting.user_details
            WHERE bux_account_created_dts::date BETWEEN '{users_from}' AND '{users_till}'
            LIMIT 5000 OVER (PARTITION BY date_trunc('month', bux_account_created_dts) ORDER BY RANDOMINT(1000000000));
        """

utils.sql_query(cur, query_users)



### Time to features

query_time_to_event = """

     WITH b AS (SELECT * from meta.bux_events WHERE event_type_id IN (184517115891963731,
                                                                 1900024676472176506,
                                                                 6343323308049849851,
                                                                 260687741711542992,
                                                                 428210925287919525,
                                                                 1106928361530839317,
                                                                 2193673344032322170,
                                                                 7378507635926096679,
                                                                 98569569354769745,
                                                                 229789708966444978,
                                                                 617792645229736175,
                                                                 1280889395537804445,
                                                                 1498292874234664683,
                                                                 1793263894043445928,
                                                                 2092633507578741654,
                                                                 2389031425774675520,
                                                                 3784994946525179991,
                                                                 4665563951835963528,
                                                                 5283878399315578611,
                                                                 5678827135657938137,
                                                                 6070753512604390982,
                                                                 8843571341358969389))
   SELECT
         a.user_id
       , b.event_name
       , case
              when c.event_type_id is not null
              then 1
              else 0
         end                                                       as did_event
       , datediff ('hour', a.bux_account_created_dts, c.event_dts) as hours_till_event
    FROM
         temp_users a
    CROSS JOIN
         b
    LEFT JOIN
         reporting.user_session_events_first_occurence c
      ON
         c.user_id = a.user_id
     AND c.event_type_id = b.event_type_id
     AND c.event_dts::date - a.bux_account_created_dts::date <= 7

"""

time_to_event_features = utils_bux.build_time_to_features(cur, query_time_to_event)



### Initial deposit features

query_get_initial_deposit = """
    SELECT a.user_id, (amount * c.exchange_rate)::numeric(20,2) as initial_deposit_amount_lim, DATEDIFF(DAY, bux_account_created_dts, created_dts) as days_to_initial_deposit
    FROM temp_users a
    JOIN reporting.transactions b 
    ON a.user_id = b.user_id 
    AND b.created_dts < a.bux_account_created_dts + interval '1 week'
    JOIN reporting.exchange_rates_eur c
    ON c.currency = b.currency
    AND c.report_date = b.created_dts::date    
    WHERE b.transaction_type = 'DEPOSIT' OR (b.transaction_type = 'CASH_TRANSFER' AND b.amount * c.exchange_rate >= 50)
    LIMIT 1 OVER (PARTITION BY a.user_id ORDER BY b.created_dts asc)

"""

users_initial_deposit_replace = utils.sql_query(cur, query_get_initial_deposit)



### Trading segment feature

query_get_users = """
        SELECT a.*, nvl(b.segment_value, 'No Trades') as trading_segment, date_trunc('week', bux_account_created_dts)::date as report_week
        FROM temp_users a
        LEFT JOIN reporting.user_segments b
        ON b.segment_type = 'Trading Segment'
        AND b.user_id = a.user_id
        AND a.bux_account_created_dts::date + interval '1 week' between b.valid_from_date and b.valid_to_date
        LIMIT 1 OVER (PARTITION BY a.user_id ORDER BY valid_to_date desc)
"""
user_details = utils.sql_query(cur, query_get_users)

### Merge all users features

user_details = utils_bux.merge_users_features(user_details, users_initial_deposit_replace, cohorts, time_to_event_features)
print("Users entity built")

KeyError: 'did_event'

#### Transactions entity

In [14]:
query_transactions= """

    SELECT a.user_id,
    a.date,
    trades_sb_invested_amount,
    financing_deposits_amount,
    trades_sb_short,
    trades_sb_long,
    view_position,
    trades_sb_open_positions,
    total_session_duration,
    education_topic_read,
    trades_sb_commission,
    trades_fb_forex_open,
    trades_sb_forex_open,
    conversion_to_sb,
    trades_sb_forex_average_leverage,
    trades_sb_forex_average_leverage,
    (trades_fb_forex_average_leverage + trades_sb_forex_average_leverage) as trades_fbsb_forex_average_leverage,
    (trades_fb_forex_average_leverage + trades_sb_forex_average_leverage*10) as trades_fbsb10_forex_average_leverage
    FROM temp_users b
    JOIN calendar c ON c.date BETWEEN b.bux_account_created_dts::date and b.bux_account_created_dts::date + interval '1 week'
    LEFT JOIN reporting.cube_daily_user a ON a.user_id = b.user_id AND a.date = c.date
    

"""

daily_transactions = utils_bux.mungle_transactions(cur, query_transactions)
print("Transactions entity built")



print("Entity set and labels data extracted")

Entity set and labels data extracted


### CREATE THE ENTITY SET & LABELS

In [None]:
entityset_name = "bux_clv"

entityset_quads = (
    # entity name, entity dataframe, entity index, time index
    ['cohorts', cohorts, 'cohort_id', None],
    ['users', user_details, 'user_id', 'bux_account_created_dts'],
    ['transactions', daily_transactions, 'transaction_id', 'date']
    )

entity_relationships = (
    # parent entity, child entity, key
    ['cohorts', 'users', 'cohort_id'],
    ['users', 'transactions', 'user_id']
)

es = utils.create_entity_set(entityset_name, entityset_quads, entity_relationships)

# labels = 

print("Entity set and labels built")

### BUILD FEATURES

In [None]:
fm = utils.calculate_feature_matrix_top_features(es, top_features)
X = fm_encoded.reset_index().merge(labels)
# X.to_csv("production_features.csv")
print("Features built")

### PREDICTION

In [16]:
model = joblib.load('models/model.pkl')
print("Model loaded")

Model loaded


In [None]:
X, y = utils.make_labels(X, prediction_problem_type)
X_train, X_test, y_train, y_test = utils.train_test_splitting(X, y)
model = utils.xgboost_train(X_train, y_train, prediction_problem_type)
y_pred = utils.xgboost_predict(model, X_test, prediction_problem_type)
print("Prediction done")

### REPORT

In [12]:
if show_report == True:
    # execute the report
    print("Report shown")

Report


### LOAD RESULTS INTO VERTICA

In [13]:
print("Scoring loaded to vertica")

Scoring loaded to vertica
