- create table first

```drop table yifanliu.temp_rider_onboarding_funnel_events  ;
create external table yifanliu.temp_rider_onboarding_funnel_events  
(
occurred_at timestamp,
user_lyft_id bigint,
ride_program_customer_id bigint,
coupon_code string,
ride_program_benefit_id bigint,
source string
)
partitioned by(ds string,event_name string)
STORED AS PARQUET
LOCATION
  's3://data-team/users/yifanliu/temp_rider_onboarding_funnel_events'
  TBLPROPERTIES ('PARQUET.COMPRESS'='SNAPPY');*/```

In [1]:
import datetime
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, HiveContext
from datetime import timedelta


from pyspark.sql.functions import (
    col,
    split
)

spark_context =SparkContext.getOrCreate()
from pyspark.sql import SQLContext
spark = SparkSession.builder \
    .master("yarn") \
    .appName("AspectDetector") \
    .enableHiveSupport() \
    .getOrCreate()

sqlContext = SQLContext(spark)

# Edit S3 Paths here:
s3_bucket = "data-team"
s3_subpath = "yifanliu"
s3_fullpath = "s3://" + s3_bucket + "/" + s3_subpath + "/"

print(s3_fullpath)

s3://data-team/yifanliu/


In [6]:
#change parameter here
today = datetime.date.today()#datetime.date(2020, 12, 31)#datetime.datetime.now()
schema='yifanliu'
temp_event_funnel='temp_rider_onboarding_funnel_events'
output_path=s3_fullpath+temp_event_funnel
#visit_temp='tablename=temp_rider_onboarding_funnel_events'
lookback=60 #read data lookback window

In [7]:
def write_s3_partitioned_parquet_overwrite_dss_eventType(spark, df, suffix, schema, path):
    ds_distincts = df.select('ds').distinct().collect()
    events = df.select('event_name').distinct().collect()
    print(ds_distincts)
    print(events)

    for row in ds_distincts:
        for r2 in events:
            ds = row['ds']
            event=r2['event_name']
            s3_path = '{}/ds={}/event_name={}'.format(path, ds,event)

            print('\nSaving ds: {} event: {}, to: {}'.format(ds, event, s3_path))

            temp_df = df.filter(df.ds == ds).filter(df.event_name==event).drop("ds").drop("event_name")

            hql = """
                ALTER TABLE {}.{} DROP IF EXISTS
                  PARTITION (ds='{}',event_name='{}')
            """.format(schema, suffix, ds, event)

            print(hql)

            spark.sql(hql)

            temp_df.write \
                   .mode("overwrite") \
                   .parquet(s3_path)

            hql = """
                ALTER TABLE {}.{} ADD IF NOT EXISTS
                  PARTITION (ds='{}',event_name='{}')
                   LOCATION '{}'
            """.format(schema, suffix, ds,event, s3_path)

            print('Running\n {}'.format(hql))
            spark.sql(hql)

    print('\nAll S3 writes complete!')
    return


In [None]:
read_benefit_redeemed="""select 
'benefit_redeemed' as event_name,
  occurred_at,
  user_lyft_id, 
  ride_program_customer_id, 
  coupon_code, 
  ride_program_benefit_id,  
  claim_source source,
  ds
from default.event_rideprograms_benefit_redeemed
where ds >= CAST(date_add('{date}',-{lookback}) as STRING) and ds <= cast('{date}' as string)
""".format(date=today, lookback=lookback)

#print(read_benefit_redeemed)

benefit_redeemed = spark.sql(read_benefit_redeemed)

#benefit_redeemed.show()

write_s3_partitioned_parquet_overwrite_dss_eventType(spark, benefit_redeemed, temp_event_funnel, schema, output_path)

#benefit_redeemed.createOrReplaceTempView('benefit_redeemed_t')

In [None]:
read_invites_sent = """
SELECT 
'invites_sent' as event_name,
occurred_at,
       user_id as user_lyft_id,
       ride_program_customer_id,
       'N/A' as coupon_code,
       -1 as ride_program_benefit_id,
       'N/A' as source,
       rbcs.ds
  FROM default.event_rideprograms_benefit_communication_sent rbcs
  join enterprise.dim_ride_program_customers rpc
    on rbcs.ride_program_customer_id = rpc.rideprogram_customer_id
 WHERE rbcs.ds >= CAST(date_add('{date}',-{lookback}) as STRING) and rbcs.ds <= cast('{date}' as string)
   and date(rpc.ds) = current_date - interval '2' day
   and (is_sms_sent = True or is_push_notification_sent = True)
   and action_type = 'invite'
""".format(date=today, lookback=lookback)

#print(read_invites_sent)

invites_sent = spark.sql(read_invites_sent)

#invites_sent.show()

write_s3_partitioned_parquet_overwrite_dss_eventType(spark, invites_sent, temp_event_funnel, schema, output_path)



#invites_sent.createOrReplaceTempView('invites_sent_t')

In [None]:
#today = datetime.date(2020,1,26)
#lookback=0
d=today#datetime.date(2020,10,31)

while d >= (today - timedelta(days=lookback)):
    read_invite_clicked = """
    select 
    'invite_clicked' as event_name,
    occurred_at,
    user_id user_lyft_id,
    -1 as ride_program_customer_id,
    'N/A' as coupon_code,
    -1 as ride_program_benefit_id,
    split(parameter,'lp/')[1] as source,
    ds
    from default.event_client_action
    where ds >= CAST(date_add('{date}',-{lookback}) as STRING) and ds <= cast('{date}' as string)
       AND action = 'handle_deep_link'
       AND type = 'result'
       AND parameter like 'https://lyft.com/lp/%'
    """.format(date=d, lookback=1)

    print(read_invite_clicked)

    invite_clicked = spark.sql(read_invite_clicked)
    invite_clicked.show()
    
    if invite_clicked.head() is not None:
        write_s3_partitioned_parquet_overwrite_dss_eventType(spark, invite_clicked, temp_event_funnel, schema, output_path)
    d = d - timedelta(days=1)
    print(d)


In [8]:

d=today#datetime.date(2021,2,20)

while d >= (today - timedelta(days=lookback)):
    print(d)
    read_ux_events="""select 
        CONCAT('ux_',type) as event_name,
        occurred_at,
       user_id user_lyft_id,
       -1 as ride_program_customer_id,
        'N/A' as coupon_code,
        -1 as ride_program_benefit_id,
               element as source,
               ds
          from default.event_client_ux ux
          WHERE ds >= CAST(date_add('{date}',-{lookback}) as STRING) and ds <= cast('{date}' as string)
            AND (type = 'displayed' or type = 'tapped')
            AND (element = 'welcome_modal' or element = 'welcome_v2_modal' or element = 'program_details_prompt' or element = 'welcome_v2_modal_continue_button'
                  or element = 'welcome_modal_done_button' or element = 'program_details_v2_done_button')
        """.format(date=d, lookback=1)

    ux_events = spark.sql(read_ux_events)

    ux_events.show()
    
    if ux_events.head() is not None:

        write_s3_partitioned_parquet_overwrite_dss_eventType(spark, ux_events, temp_event_funnel, schema, output_path)
    
    d = d - timedelta(days=1)
    print(d)


2021-04-08
+------------+--------------------+-------------------+------------------------+-----------+-----------------------+--------------------+----------+
|  event_name|         occurred_at|       user_lyft_id|ride_program_customer_id|coupon_code|ride_program_benefit_id|              source|        ds|
+------------+--------------------+-------------------+------------------------+-----------+-----------------------+--------------------+----------+
|ux_displayed|2021-04-08 18:47:...|1213445896234781166|                      -1|        N/A|                     -1|       welcome_modal|2021-04-08|
|   ux_tapped|2021-04-08 18:47:...|1213445896234781166|                      -1|        N/A|                     -1|welcome_modal_don...|2021-04-08|
|   ux_tapped|2021-04-01 04:33:...|1499748533210134572|                      -1|        N/A|                     -1|welcome_modal_don...|2021-04-08|
|   ux_tapped|2021-04-08 21:02:...|1243785399177575368|                      -1|        N/A|   