In [13]:
import pandas as pd
import numpy as np
#import pandasql as ps
from datetime import datetime, timedelta
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
from pyhive import presto
from pymongo import MongoClient
import warnings
warnings.filterwarnings("ignore")

# Presto connection handed to pd.read_sql by the query helpers in this notebook.
# NOTE(review): the username is hardcoded to a personal account; consider
# reading it from configuration or the environment instead.
# `requests_kwargs` is not passed (it was commented out in the original call).
presto_conn = presto.connect(**{
    'host': 'presto.processing.yoda.run',
    'port': 80,
    'protocol': 'http',
    'catalog': 'hive',
    'username': 'mayank.jha@rapido.bike',
})

In [None]:
# Function to find the number of rides done by captains between a time period

def get_weekly_rides(start_date: str, end_date: str, city: str) -> pd.DataFrame:
    """Fetch per-captain, per-week completed-ride counts for one city.

    Queries ``legacy.orders`` through ``presto_conn`` for orders with
    ``status = 'dropped'`` and ``spdfraud_flag != 1`` whose ``orderdate`` lies
    between ``start_date`` and ``end_date`` (both bounds inclusive), grouped
    by captain and week number.

    Parameters
    ----------
    start_date, end_date : str
        Bounds compared against ``orderdate``. The query parses ``orderdate``
        with the ``'%Y-%m-%d'`` format, so pass the bounds in that format.
    city : str
        City name matched against ``serviceobj_city``.

    Returns
    -------
    pandas.DataFrame
        Columns ``captain_id``, ``week``, ``active_days``, ``link_orders``,
        ``delivery_orders`` and ``other_orders``.

    Notes
    -----
    ``week`` is a week number without a year, so a date range that covers
    more than one year merges rows that share the same week number.
    The arguments are interpolated directly into the SQL text; pass trusted
    values only.
    """
    query = """SELECT rider as captain_id,
                            CAST(WEEK(DATE(date_parse(orderdate,'%Y-%m-%d'))) as int) as week,
                            count(distinct orderdate) as active_days,
                            sum(case when serviceobj_service = 'Link' then 1 else 0 end) as link_orders,
                            sum(case when serviceobj_service = 'Delivery' then 1 else 0 end) as delivery_orders,
                            sum(case when serviceobj_service not in ('Link','Delivery') then 1 else 0 end) as other_orders
                        from legacy.orders
                        where orderdate >= '{sd}'
                            and orderdate <= '{ed}'
                            and status = 'dropped'
                            and spdfraud_flag != 1
                            and serviceobj_city in ('{ct}')
                        group by 1, 2""".format(sd=start_date, ed=end_date, ct=city)

    print('Fetching Rides Data from ', start_date, ' to ', end_date, ' for city : ', city)
    df_data = pd.read_sql(query, presto_conn)
    print('Fetching Completed with ', len(df_data), ' rows')
    return df_data

In [None]:
# Function to find the order earnings of captains between a time period

def get_weekly_order_earnings(start_date: str, end_date: str, city: str) -> pd.DataFrame:
    """Fetch per-captain, per-week order earnings for one city.

    Reads ``raw.mongodb_rapidopayroll_riderspaymentnew_immutable`` through
    ``presto_conn``, keeps rows with ``transactionType = 'orders'`` and
    ``status = 'success'``, de-duplicates on (rider, order, week, amount) and
    sums ``totalearning`` (cast to double) per captain and week number.

    Parameters
    ----------
    start_date, end_date : str
        Inclusive date bounds. Dashes are stripped before the values are
        compared against the ``yyyymmdd`` column, so ``'YYYY-MM-DD'`` input
        becomes ``'YYYYMMDD'``.
    city : str
        City name matched against the ``city`` column.

    Returns
    -------
    pandas.DataFrame
        Columns ``captain_id``, ``week`` and ``orders_earnings``.

    Notes
    -----
    The arguments are interpolated directly into the SQL text; pass trusted
    values only.
    """
    # The payroll table is filtered on its compact yyyymmdd string column.
    first_day = start_date.replace('-', '')
    last_day = end_date.replace('-', '')

    query = """select riderid as captain_id, 
                        week,
                        sum(amount) as orders_earnings
                    from
                        (select distinct riderId as riderid,
                                orderid,
                                CAST(WEEK(DATE(date_parse(yyyymmdd,'%Y%m%d'))) as int) as week,
                                cast(totalearning as double) as amount 
                            from raw.mongodb_rapidopayroll_riderspaymentnew_immutable
                            where yyyymmdd between '{sd}' and '{ed}'
                                and city in ('{ct}')
                                and transactionType = 'orders' 
                                and status = 'success'
                        )
                    group by 1, 2""".format(sd=first_day, ed=last_day, ct=city)

    print('Fetching Order Earnings Data from ', start_date, ' to ', end_date, ' for city : ', city)
    df_data = pd.read_sql(query, presto_conn)
    print('Fetching Completed with ', len(df_data), ' rows')
    return df_data

In [None]:
# Function to find the incentive of captains between a time period

def get_incentive(start_date: str, end_date: str, city: str) -> pd.DataFrame:
    """Fetch per-captain daily and weekly incentive payouts for one city.

    The query, run through ``presto_conn``, works in three steps:

    1. ``incentive``: ids from
       ``hive.raw.mongodb_rapidopayroll_incentives_immutable`` whose ``cities``
       JSON array contains ``city`` and whose ``startDate`` and ``endDate``
       both fall between ``start_date`` and ``end_date``.
    2. ``daily_incentive`` / ``weekly_incentive``: successful payouts of type
       ``'Daily'`` / ``'Weekly Fixed'`` from
       ``raw.mongodb_rapidopayroll_riderspaymentnew_immutable`` that join to
       one of those incentive ids, summed per rider.
    3. A full outer join of the two, so a rider with only one kind of payout
       still appears, with the other column missing.

    Parameters
    ----------
    start_date, end_date : str
        Date bounds as ``'YYYY-MM-DD'`` strings. They are used as-is for the
        incentive window and with dashes stripped for the payout ``yyyymmdd``
        filter.
    city : str
        City name.

    Returns
    -------
    pandas.DataFrame
        Columns ``riderid``, ``daily_incentive`` and ``weekly_incentive``.

    Notes
    -----
    The arguments are interpolated directly into the SQL text; pass trusted
    values only.
    """
    payout_start = start_date.replace('-', '')
    # The payout window ends one day after end_date.
    # NOTE(review): presumably because payouts are credited the day after
    # they are earned; confirm.
    payout_end = (pd.to_datetime(end_date) + timedelta(1)).strftime('%Y%m%d')

    # NOTE(review): the column name `tincentiveIdl` looks unusual; verify it
    # against the payroll table schema.
    query = """with incentive as 
            (select distinct _id as incentive_id
                from hive.raw.mongodb_rapidopayroll_incentives_immutable
                where json_array_contains(cities, '{ct}')
                    and startDate between '{sd}' and '{ed}'
                    and endDate between '{sd}' and '{ed}'
            ),
daily_incentive as
            (select riderid,
                    sum(daily_incentive_achieved) as daily_incentive
                from
                    (select distinct riderid,
                            yyyymmdd,
                            tincentiveIdl,
                            subIncentiveId,
                            incentivestage,
                            cast(amount as double) as daily_incentive_achieved
                        from raw.mongodb_rapidopayroll_riderspaymentnew_immutable
                        where incentivetype = 'Daily' 
                            and status = 'success' 
                            and yyyymmdd between '{sd1}' and '{ed1}'
                            and city in ('{ct}')
                    ) x
                    join incentive as i
                        on x.tincentiveIdl = i.incentive_id
                group by 1
            ),
weekly_incentive as
            (select riderid,
                    sum(weekly_incentive_achieved) as weekly_incentive
                from
                    (select distinct riderid,
                            yyyymmdd,
                            tincentiveIdl,
                            subIncentiveId,
                            incentivestage,
                            cast(amount as double) as weekly_incentive_achieved
                        from raw.mongodb_rapidopayroll_riderspaymentnew_immutable
                        where incentivetype = 'Weekly Fixed' 
                            and status = 'success' 
                            and yyyymmdd between '{sd1}' and '{ed1}'
                            and city in ('{ct}')
                    ) x
                    join incentive as i
                        on x.tincentiveIdl = i.incentive_id
                group by 1
            )
            select coalesce(d.riderid, w.riderid) as riderid,
                d.daily_incentive,
                w.weekly_incentive
            from daily_incentive as d
                full outer join 
                    weekly_incentive as w
                        on d.riderid = w.riderid """.format(
        sd=start_date,
        ed=end_date,
        sd1=payout_start,
        ed1=payout_end,
        ct=city,
    )

    print('Fetching Incentive Data from ', start_date, ' to ', end_date, ' for city : ', city)
    df_data = pd.read_sql(query, presto_conn)
    print('Fetching Completed with ', len(df_data), ' rows')
    return df_data

In [None]:
# Function to find the Login Hours of captains between a time period

def get_login_hours(start_date: str, end_date: str, city: str) -> pd.DataFrame:
    """Fetch per-captain, per-week login hours for one city.

    Sums ``duration`` from ``hive.datasets.captain_login_hours`` through
    ``presto_conn`` for rows whose ``status`` is one of
    ``'2','3','6','7','8','10'`` and whose ``userid`` belongs to a captain in
    ``datasets.captain_single_view`` with a non-null ``activationdate`` and
    either ``registeredcity`` or ``lastridecity`` equal to ``city``.

    Parameters
    ----------
    start_date, end_date : str
        Inclusive date bounds. Dashes are stripped before the values are
        compared against the ``yyyymmdd`` column.
    city : str
        City name.

    Returns
    -------
    pandas.DataFrame
        Columns ``rider``, ``week`` and ``login_hours``.

    Notes
    -----
    ``login_hours`` is ``sum(duration) / (60*60*1000)``, which presumably
    means ``duration`` is stored in milliseconds (confirm against the table).
    The arguments are interpolated directly into the SQL text; pass trusted
    values only.
    """
    first_day = start_date.replace('-', '')
    last_day = end_date.replace('-', '')

    query = """SELECT 
                        userid as rider, 
                        CAST(WEEK(DATE(date_parse(yyyymmdd,'%Y%m%d'))) as int) as week,
                        cast(sum(duration) as double)/cast((60*60*1000) as double) as login_hours
                    FROM hive.datasets.captain_login_hours
                    WHERE yyyymmdd >= '{sd}' AND yyyymmdd <= '{ed}'
                        AND status in ('2','3','6','7','8','10')
                        AND userid in (SELECT captainId from datasets.captain_single_view WHERE (registeredcity in ('{ct}') or lastridecity in ('{ct}')) AND activationdate is not null)
                    GROUP BY 1,2  """.format(sd=first_day, ed=last_day, ct=city)

    print('Fetching Login Hours Data from ', start_date, ' to ', end_date, ' for city : ', city)
    df_data = pd.read_sql(query, presto_conn)
    print('Fetching Completed with ', len(df_data), ' rows')
    return df_data

In [None]:
# Function to find the CU RF Segment of captains on a given day

def get_cu_rf_segment(dt: str, city: str) -> pd.DataFrame:
    """Fetch the CU / recency / frequency segment of captains for one day.

    Reads ``datasets.captain_cu_immutable`` through ``presto_conn`` for rows
    whose ``day`` equals ``date(dt)`` and whose ``city`` equals ``city``.

    Parameters
    ----------
    dt : str
        The day to fetch, in a form accepted by the SQL ``date('...')`` call
        (presumably ``'YYYY-MM-DD'``; confirm).
    city : str
        City name matched against the ``city`` column.

    Returns
    -------
    pandas.DataFrame
        Columns ``rider``, ``day``, ``frequency_segment``, ``recency_segment``
        and ``cu_segment``.

    Notes
    -----
    The arguments are interpolated directly into the SQL text; pass trusted
    values only.
    """
    query = """SELECT captainid as rider, 
                        day, 
                        frequency_segment, 
                        recency_segment, 
                        segment as cu_segment
                      from datasets.captain_cu_immutable
                      where day = date('{sd}')
                          and city in ('{ct}')   """.format(sd=dt, ct=city)

    # Report the function's own arguments. The earlier message referenced
    # start_date / end_date, which are not parameters here.
    print('Fetching CU RF Segment Data for ', dt, ' for city : ', city)
    df_data = pd.read_sql(query, presto_conn)
    print('Fetching Completed with ', len(df_data), ' rows')
    return df_data