# 0. Libs & Creds

In [2]:
import os
import json
import pickle
import glob
import gc

import calendar
from datetime import date, timedelta, datetime

from tqdm.auto import tqdm

import numpy as np
import pandas as pd
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
# from pyspark.sql.functions import *
# from pyspark.sql.functions import udf, struct, count_distinct, from_unixtime

In [3]:
SEED = 42
s3a = 's3a://pvc-75e6242d-1f96-4903-a9eb-22fa28f5b73e'

In [4]:
home_repo = '/home/jovyan'
project_repo = f'{home_repo}/__RAYDTT'
# svarygin_repo = f'{project_repo}/_SVARYGIN'
%cd {project_repo}
%pwd

/home/jovyan/__RAYDTT


'/home/jovyan/__RAYDTT'

In [5]:
def access_data(file_path):
    with open(file_path) as file:
        access_data = json.load(file)
    return access_data

access_s3_data = access_data('.access_s3.json')

print('user:', os.environ['JUPYTERHUB_SERVICE_PREFIX'])

def uiWebUrl(self):
    from urllib.parse import urlparse
    web_url = self._jsc.sc().uiWebUrl().get()
    port = urlparse(web_url).port
    return '{}proxy/{}/jobs/'.format(os.environ['JUPYTERHUB_SERVICE_PREFIX'], port)

SparkContext.uiWebUrl = property(uiWebUrl)

conf = SparkConf()
conf.set('spark.master', 'local[*]')
conf.set('spark.driver.memory', '40G')
conf.set('spark.driver.maxResultSize', '32G')
############################################
conf.set('spark.driver.memoryOverhead', '2G')
conf.set('spark.executor.memory', '36G') #32G
conf.set('spark.executor.memoryOverhead', '2G')
conf.set('spark.executor.cores', '10') # 8
conf.set('spark.executor.instances', '2')
conf.set('spark.dynamicAllocation.enabled', 'true')
conf.set('spark.dynamicAllocation.minExecutors', '1')
conf.set('spark.dynamicAllocation.maxExecutors', '50')
############################################
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
spark._jsc.hadoopConfiguration().set('fs.s3a.access.key', access_s3_data['aws_access_key_id'])
spark._jsc.hadoopConfiguration().set('fs.s3a.secret.key', access_s3_data['aws_secret_access_key'])
spark._jsc.hadoopConfiguration().set('fs.s3a.impl','org.apache.hadoop.fs.s3a.S3AFileSystem')
spark._jsc.hadoopConfiguration().set('fs.s3a.multipart.size', '104857600')
spark._jsc.hadoopConfiguration().set('fs.s3a.block.size', '33554432')
spark._jsc.hadoopConfiguration().set('fs.s3a.threads.max', '256')
spark._jsc.hadoopConfiguration().set('fs.s3a.endpoint', 'http://storage.yandexcloud.net')
spark._jsc.hadoopConfiguration().set('fs.s3a.aws.credentials.provider', 
                                     'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
spark

# conf.set('spark.driver.memory', '32G')
# conf.set('spark.driver.maxResultSize', '4G')

user: /user/st054552/


In [4]:
spark.stop()

In [109]:
parquets = glob.glob(f'vgtest/*.parquet')
parquets.remove('vgtest/data.parquet')
parquets

['vgtest/apr.parquet',
 'vgtest/aug.parquet',
 'vgtest/dec.parquet',
 'vgtest/feb.parquet',
 'vgtest/jan.parquet',
 'vgtest/jul.parquet',
 'vgtest/jun.parquet',
 'vgtest/mar.parquet',
 'vgtest/may.parquet',
 'vgtest/nov.parquet',
 'vgtest/oct.parquet',
 'vgtest/sep.parquet']

# sdf_init & target

In [5]:
columns = ['amplitude_id', 'event_time', '[server]_purchase', '[server]_subscription_start_date', 'paid']

sdf = None

for parquet in parquets:
    
    sdf_temp = spark.read.parquet(parquet).select(columns)
    
    sdf_temp = (
                sdf_temp
                #
                .selectExpr('*', 'SUBSTRING(`[server]_purchase`[0], 16) AS purchase')
                .drop(F.col('`[server]_purchase`'))
                #
                .selectExpr('*', 'SUBSTRING(`[server]_subscription_start_date`[0], -10) AS subscription_start_date') \
                .withColumn('subscription_start_date', F.col('subscription_start_date').cast('date')) \
                .drop(F.col('`[server]_subscription_start_date`'))
                #
                .withColumn('event_time', F.col('event_time').cast('date')) \
                .withColumn('paid', F.col('paid').cast('string'))
               )
                        
    if sdf is None:
        sdf = sdf_temp
    else:
        sdf = sdf.unionByName(sdf_temp, allowMissingColumns=True)

In [6]:
sdf.printSchema()

root
 |-- amplitude_id: long (nullable = true)
 |-- event_time: date (nullable = true)
 |-- paid: string (nullable = true)
 |-- purchase: string (nullable = true)
 |-- subscription_start_date: date (nullable = true)



In [7]:
%%time
target = (
            sdf
            .filter(
                (F.col('subscription_start_date').isNotNull()) & 
                ((F.col('paid') == 'yes') | (F.col('paid') == 'true'))
                    )
            .groupBy('amplitude_id')
            .agg(
                F.min('subscription_start_date').alias('target_date'),
                F.trunc(F.min('subscription_start_date'), 'month').alias('target_month')
                )
        )

CPU times: user 4.18 ms, sys: 102 µs, total: 4.29 ms
Wall time: 129 ms


In [8]:
target.printSchema()

root
 |-- amplitude_id: long (nullable = true)
 |-- target_date: date (nullable = true)
 |-- target_month: date (nullable = true)



In [14]:
%%time
file_path = f'{s3a}/bs_segments/sdf_init.parquet'

sdf.write.parquet(path = file_path, mode = 'overwrite')

CPU times: user 89.1 ms, sys: 63.2 ms, total: 152 ms
Wall time: 15min 14s


In [15]:
%%time
file_path = f'{s3a}/bs_segments/target.parquet'

target.write.parquet(path = file_path, mode = 'overwrite')

CPU times: user 89.2 ms, sys: 56.7 ms, total: 146 ms
Wall time: 14min 33s


# bs_for_month

In [7]:
year = 2023

month_date_ranges = []

for month in range(1, 13): 
    last_day = calendar.monthrange(year, month)[1]
    first_date_str = f'{year}-{month:02d}-01'
    last_date_str = f'{year}-{month:02d}-{last_day}'
    month_date_ranges.append((first_date_str, last_date_str))

bs_names = [f'bs_for_{x}' for x in ['feb', 'mar', 'apr', 'may', 'jun', 'jul', 
                                    'aug', 'sep', 'oct', 'nov', 'dec', 'jan']]

month_date_ranges = [(s, a, b) for ((a, b), s) in zip(month_date_ranges, bs_names)]
month_date_ranges

[('bs_for_feb', '2023-01-01', '2023-01-31'),
 ('bs_for_mar', '2023-02-01', '2023-02-28'),
 ('bs_for_apr', '2023-03-01', '2023-03-31'),
 ('bs_for_may', '2023-04-01', '2023-04-30'),
 ('bs_for_jun', '2023-05-01', '2023-05-31'),
 ('bs_for_jul', '2023-06-01', '2023-06-30'),
 ('bs_for_aug', '2023-07-01', '2023-07-31'),
 ('bs_for_sep', '2023-08-01', '2023-08-31'),
 ('bs_for_oct', '2023-09-01', '2023-09-30'),
 ('bs_for_nov', '2023-10-01', '2023-10-31'),
 ('bs_for_dec', '2023-11-01', '2023-11-30'),
 ('bs_for_jan', '2023-12-01', '2023-12-31')]

In [6]:
sdf = spark.read.parquet('bs_segments/sdf_init.parquet')
target = spark.read.parquet('bs_segments/target.parquet')

sdf.rdd.getNumPartitions(), target.rdd.getNumPartitions()

(16, 8)

In [9]:
%%time
for bs_name, start_date, end_date in month_date_ranges:
    
    print(f'{bs_name} start time: {datetime.now().strftime("%H:%M:%S")}')

    ids_wo_subs = (
                    sdf
                    .filter(
                            (F.col('event_time') >= F.lit(start_date)) &
                            (F.col('event_time') <= F.lit(end_date)) &
                             F.col('subscription_start_date').isNull()
                            )
                    .withColumn('target_month', F.date_add(F.lit(end_date), 1))
                   )

    excluded_ids = target.filter(F.col('target_date') <= F.lit(end_date)).select('amplitude_id').distinct()

    bs = ids_wo_subs.join(
                excluded_ids,
                ids_wo_subs['amplitude_id'] == excluded_ids['amplitude_id'],
                'left_anti'
                         ).select('amplitude_id', 'target_month').distinct()
    
    file_path = f'{s3a}/bs_segments/{bs_name}.parquet'
    
    bs.write.parquet(path = file_path, mode = 'overwrite')
    
    del ids_wo_subs, excluded_ids, bs
    gc.collect()
    
    print(f'{bs_name} finish time: {datetime.now().strftime("%H:%M:%S")}')

bs_for_feb start time: 08:18:12
bs_for_feb finish time: 08:18:19
bs_for_mar start time: 08:18:19
bs_for_mar finish time: 08:18:21
bs_for_apr start time: 08:18:21
bs_for_apr finish time: 08:18:23
bs_for_may start time: 08:18:23
bs_for_may finish time: 08:18:25
bs_for_jun start time: 08:18:25
bs_for_jun finish time: 08:18:27
bs_for_jul start time: 08:18:27
bs_for_jul finish time: 08:18:30
bs_for_aug start time: 08:18:30
bs_for_aug finish time: 08:18:32
bs_for_sep start time: 08:18:32
bs_for_sep finish time: 08:18:35
bs_for_oct start time: 08:18:35
bs_for_oct finish time: 08:18:37
bs_for_nov start time: 08:18:37
bs_for_nov finish time: 08:18:39
bs_for_dec start time: 08:18:39
bs_for_dec finish time: 08:18:41
bs_for_jan start time: 08:18:41
bs_for_jan finish time: 08:18:43
CPU times: user 426 ms, sys: 38.7 ms, total: 465 ms
Wall time: 30.9 s


-------

In [None]:
folders_to_remove = [x[0]+'.parquet' for x in month_date_ranges]

for folder in folders_to_remove:
    !cd bs_segments; rm -rf {folder}

In [30]:
!cd bs_segments; rm -rf bs_for_aug_features_0_test.parquet
!cd bs_segments; ls -a

.		    bs_for_dec.parquet	bs_for_jun.parquet  bs_for_oct.parquet
..		    bs_for_feb.parquet	bs_for_mar.parquet  bs_for_sep.parquet
bs_for_apr.parquet  bs_for_jan.parquet	bs_for_may.parquet  sdf_init.parquet
bs_for_aug.parquet  bs_for_jul.parquet	bs_for_nov.parquet  target.parquet


# sdf_features_0

In [24]:
sdf_columns = spark.read.parquet('vgtest/aug.parquet')
sdf_columns.printSchema()

root
 |-- $insert_id: string (nullable = true)
 |-- $insert_key: string (nullable = true)
 |-- $price: double (nullable = true)
 |-- $productid: string (nullable = true)
 |-- $quantity: long (nullable = true)
 |-- $revenue: double (nullable = true)
 |-- $revenuetype: string (nullable = true)
 |-- $schema: string (nullable = true)
 |-- [server]_currency: string (nullable = true)
 |-- [server]_purchase: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- [server]_subscription_start_date: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- [server]_total_revenue: double (nullable = true)
 |-- [server]_trial: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ab-test: string (nullable = true)
 |-- action: string (nullable = true)
 |-- activity: long (nullable = true)
 |-- adid: string (nullable = true)
 |-- ads_watched: long (nullable = true)
 |-- amplitude_attribution_ids: array (nullable = true)
 |    |-- element

In [26]:
# sdf_columns.show(2, 100, True)

In [None]:
city, region, country, currency, device_family, os_name, push_permission

agg: event_time / event_type

location_lat
location_lng - OK
* country - OK
* region - OK
* interface_language - OK
* language - OK
* interface_language_changed - OK

* trial - OK
* push_permission - OK

* device_type - OK
* device_family - OK
* device_carrier - OK
* os_name - OK
* ip_address - OK
* AB-test - OK
---------
* promocode_sd_exam_passed - OK
* AB-test -- OK
* ads_watched -- OK
* show_hint -- OK
* os_name / platform -- OK

In [111]:
columns = ['amplitude_id', 'event_time', 'event_type', 'location_lat', 'location_lng']

columns = ['amplitude_id', 'event_time', 'location_lat', 'location_lng', 
           'city', 'region', 'country', 'currency', 'device_family', 'os_name', 'push_permission']

sdf = None

for parquet in parquets:
    
    sdf_temp = spark.read.parquet(parquet).select(columns)
    
    sdf_temp = (
                sdf_temp
                #.cast('long'))
                .withColumn('event_time', F.col('event_time').cast('timestamp'))
                # BETTER !
                .withColumn('location_lat', F.coalesce(F.col('location_lat'), F.lit(0)))
                .withColumn('location_lng', F.coalesce(F.col('location_lng'), F.lit(0)))
               )
                        
    if sdf is None:
        sdf = sdf_temp
    else:
        sdf = sdf.unionByName(sdf_temp, allowMissingColumns=True)

In [112]:
%%time
file_path = f'{s3a}/bs_segments/sdf_features_0.parquet'

sdf.write.parquet(path = file_path, mode = 'overwrite')

CPU times: user 63.7 ms, sys: 23.8 ms, total: 87.6 ms
Wall time: 9min 18s


In [114]:
!cd bs_segments; ls -a

.		    bs_for_jan.parquet	bs_for_oct.parquet
..		    bs_for_jul.parquet	bs_for_sep.parquet
bs_for_apr.parquet  bs_for_jun.parquet	sdf_features_0.parquet
bs_for_aug.parquet  bs_for_mar.parquet	sdf_init.parquet
bs_for_dec.parquet  bs_for_may.parquet	target.parquet
bs_for_feb.parquet  bs_for_nov.parquet


# bs_for_month_features_0

In [8]:
bs_for_months = [i[0] for i in month_date_ranges]
bs_for_months

['bs_for_feb',
 'bs_for_mar',
 'bs_for_apr',
 'bs_for_may',
 'bs_for_jun',
 'bs_for_jul',
 'bs_for_aug',
 'bs_for_sep',
 'bs_for_oct',
 'bs_for_nov',
 'bs_for_dec',
 'bs_for_jan']

In [53]:
spark.sql('DROP VIEW IF EXISTS bs_for_aug_features_0')
spark.sql('DROP VIEW IF EXISTS bs_for_aug')
spark.sql('DROP VIEW IF EXISTS sdf')
spark.sql('SHOW TABLES').show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



In [54]:
sdf = spark.read.parquet('bs_segments/sdf_features_0.parquet')
sdf.createOrReplaceTempView('sdf')
spark.sql('SHOW TABLES').show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|         |      sdf|       true|
+---------+---------+-----------+



**Comment:** for agg choose an appropriate window period.

In [55]:
query = '''
(
WITH 

bs_0 AS 
(
    SELECT 
        sdf.*
        ,bs.target_month
    FROM sdf
    INNER JOIN bs
        ON bs.amplitude_id = sdf.amplitude_id
    WHERE sdf.event_time < bs.target_month
),

max_event_time AS
(
    SELECT 
        amplitude_id
        ,MAX(event_time) AS max_event_time
    FROM bs_0
    GROUP BY amplitude_id
),

bs_max_event_time AS
(
    SELECT 
        bs_0.*
        ,ROW_NUMBER() OVER (PARTITION BY bs_0.amplitude_id, bs_0.event_time ORDER BY bs_0.location_lat DESC) AS row_n
    FROM bs_0
    INNER JOIN max_event_time met
        ON bs_0.amplitude_id = met.amplitude_id
        AND bs_0.event_time = met.max_event_time
)

SELECT 
    *
FROM bs_max_event_time
WHERE row_n = 1

)
'''

In [56]:
%%time
for bs_for_month in bs_for_months:
    
    print(f'{bs_for_month} start time: {datetime.now().strftime("%H:%M:%S")}')
    
    bs = spark.read.parquet(f'bs_segments/{bs_for_month}.parquet')
    bs.createOrReplaceTempView('bs')
    
    bs_for_month_features_0 = spark.sql(query)
    
    file_path = f'{s3a}/bs_segments/{bs_for_month}_features_0.parquet'

    bs_for_month_features_0.write.parquet(path = file_path, mode = 'overwrite')
    
    spark.sql('DROP VIEW IF EXISTS bs')
    del bs, bs_for_month_features_0
    gc.collect()
     
    print(f'{bs_for_month} finish time: {datetime.now().strftime("%H:%M:%S")}')

bs_for_feb start time: 22:05:18
bs_for_feb finish time: 22:05:29
bs_for_mar start time: 22:05:29
bs_for_mar finish time: 22:05:39
bs_for_apr start time: 22:05:39
bs_for_apr finish time: 22:05:50
bs_for_may start time: 22:05:50
bs_for_may finish time: 22:06:01
bs_for_jun start time: 22:06:01
bs_for_jun finish time: 22:06:14
bs_for_jul start time: 22:06:14
bs_for_jul finish time: 22:06:27
bs_for_aug start time: 22:06:27
bs_for_aug finish time: 22:06:41
bs_for_sep start time: 22:06:41
bs_for_sep finish time: 22:06:57
bs_for_oct start time: 22:06:57
bs_for_oct finish time: 22:08:09
bs_for_nov start time: 22:08:09
bs_for_nov finish time: 22:08:25
bs_for_dec start time: 22:08:25
bs_for_dec finish time: 22:08:41
bs_for_jan start time: 22:08:41
bs_for_jan finish time: 22:08:56
CPU times: user 434 ms, sys: 10.9 ms, total: 445 ms
Wall time: 3min 38s


In [73]:
%%time
for bs_for_month in bs_for_months:
    bs_for_month = spark.read.parquet(f'bs_segments/{bs_for_month}.parquet')
    print(bs_for_month.count(), bs_for_month.select('amplitude_id').distinct().count(), \
          bs_for_month.count() == bs_for_month.select('amplitude_id').distinct().count())

39289 39289 True
40573 40573 True
47453 47453 True
50795 50795 True
61435 61435 True
74258 74258 True
82073 82073 True
85075 85075 True
69362 69362 True
64782 64782 True
74944 74944 True
72779 72779 True
CPU times: user 18.6 ms, sys: 35 ms, total: 53.6 ms
Wall time: 4.21 s


In [74]:
%%time
for bs_for_month in bs_for_months:
    bs_for_month_features_0 = spark.read.parquet(f'bs_segments/{bs_for_month}_features_0.parquet')
    print(bs_for_month_features_0.count(), bs_for_month_features_0.select('amplitude_id').distinct().count(), \
          bs_for_month_features_0.count() == bs_for_month_features_0.select('amplitude_id').distinct().count())

39289 39289 True
40573 40573 True
47453 47453 True
50795 50795 True
61435 61435 True
74258 74258 True
82073 82073 True
85075 85075 True
69362 69362 True
64782 64782 True
74944 74944 True
72779 72779 True
CPU times: user 29.7 ms, sys: 18.1 ms, total: 47.8 ms
Wall time: 5.15 s


# Sampling

In [9]:
parquets = [f'bs_segments/{bs_for_month}_features_0.parquet' for bs_for_month in bs_for_months]
parquets

['bs_segments/bs_for_feb_features_0.parquet',
 'bs_segments/bs_for_mar_features_0.parquet',
 'bs_segments/bs_for_apr_features_0.parquet',
 'bs_segments/bs_for_may_features_0.parquet',
 'bs_segments/bs_for_jun_features_0.parquet',
 'bs_segments/bs_for_jul_features_0.parquet',
 'bs_segments/bs_for_aug_features_0.parquet',
 'bs_segments/bs_for_sep_features_0.parquet',
 'bs_segments/bs_for_oct_features_0.parquet',
 'bs_segments/bs_for_nov_features_0.parquet',
 'bs_segments/bs_for_dec_features_0.parquet',
 'bs_segments/bs_for_jan_features_0.parquet']

In [10]:
%%time
sdf = None

for parquet in parquets:
    
    sdf_temp = spark.read.parquet(parquet).drop('row_n')
                        
    if sdf is None:
        sdf = sdf_temp
    else:
        sdf = sdf.unionByName(sdf_temp, allowMissingColumns=True)

CPU times: user 11.2 ms, sys: 3.75 ms, total: 15 ms
Wall time: 3.66 s


In [85]:
sdf.show(10, 100, True)

-RECORD 0----------------------------------
 amplitude_id    | 301945958567            
 event_time      | 2023-01-09 00:11:54.198 
 location_lat    | 51.6539                 
 location_lng    | -0.0888                 
 city            | Enfield                 
 region          | Enfield                 
 country         | United Kingdom          
 currency        | null                    
 device_family   | Apple iPhone            
 os_name         | ios                     
 push_permission | not_granted             
 target_month    | 2023-02-01              
-RECORD 1----------------------------------
 amplitude_id    | 302850059008            
 event_time      | 2023-01-30 08:23:12.592 
 location_lat    | 0.0                     
 location_lng    | 0.0                     
 city            | Manchester              
 region          | Manchester              
 country         | United Kingdom          
 currency        | null                    
 device_family   | Apple iPhone 

In [11]:
sdf.count()

762818

In [84]:
sdf.printSchema()

root
 |-- amplitude_id: long (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- location_lat: double (nullable = true)
 |-- location_lng: double (nullable = true)
 |-- city: string (nullable = true)
 |-- region: string (nullable = true)
 |-- country: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- device_family: string (nullable = true)
 |-- os_name: string (nullable = true)
 |-- push_permission: string (nullable = true)
 |-- target_month: date (nullable = true)



In [12]:
target = spark.read.parquet('bs_segments/target.parquet')
target = target.withColumnRenamed('amplitude_id', 'amplitude_id_target')
target = target.withColumnRenamed('target_month', 'target_month_target')
target.printSchema()

root
 |-- amplitude_id_target: long (nullable = true)
 |-- target_date: date (nullable = true)
 |-- target_month_target: date (nullable = true)



In [13]:
sdf_w_target = sdf.join(
                    target,
                    (sdf.amplitude_id == target.amplitude_id_target) & (sdf.target_month == target.target_month_target),
                    'left').drop('amplitude_id_target', 'target_month_target')

sdf_w_target = sdf_w_target.withColumn('target', F.when(F.col('target_date').isNull(), 0).otherwise(1))  

In [93]:
sdf_w_target.printSchema()

root
 |-- amplitude_id: long (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- location_lat: double (nullable = true)
 |-- location_lng: double (nullable = true)
 |-- city: string (nullable = true)
 |-- region: string (nullable = true)
 |-- country: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- device_family: string (nullable = true)
 |-- os_name: string (nullable = true)
 |-- push_permission: string (nullable = true)
 |-- target_month: date (nullable = true)
 |-- target_date: date (nullable = true)



In [14]:
sdf_w_target.count()

762818

In [15]:
result_df = sdf_w_target.groupBy("target_month") \
    .agg(
        F.count("*").alias("count"),  # Counts all rows in each group
        F.sum("target").alias("sum_target"),  # Sums up the 'target' column values per group
        # Calculate the sum of target divided by the count of rows
        (F.sum("target") / F.count("*")).alias("average_target")
    )
result_df.show(100)

+------------+-----+----------+--------------------+
|target_month|count|sum_target|      average_target|
+------------+-----+----------+--------------------+
|  2023-02-01|39289|       148|0.003766957672631...|
|  2023-03-01|40573|       198|0.004880092672466911|
|  2023-04-01|47453|       220|0.004636166311929699|
|  2023-05-01|50795|       260|0.005118614036814647|
|  2023-06-01|61435|       337|  0.0054854724505575|
|  2023-07-01|74258|       557|0.007500875326564141|
|  2023-08-01|82073|       515|0.006274901611979579|
|  2023-09-01|85075|       495|0.005818395533352924|
|  2023-10-01|69362|       302|0.004353969032034...|
|  2023-11-01|64782|       322|0.004970516501497329|
|  2023-12-01|74944|       424|0.005657557643040...|
|  2024-01-01|72779|         0|                 0.0|
+------------+-----+----------+--------------------+



In [113]:
result_df = sdf_w_target \
    .agg(
        F.count("*").alias("count"),  # Counts all rows in each group
        F.sum("target").alias("sum_target"),  # Sums up the 'target' column values per group
        # Calculate the sum of target divided by the count of rows
        (F.sum("target") / F.count("*")).alias("average_target")
    )
result_df.show(100)

+------+----------+--------------------+
| count|sum_target|      average_target|
+------+----------+--------------------+
|762818|      3778|0.004952688583646...|
+------+----------+--------------------+



In [16]:
ratio_of_ones_in_train_sampled = 0.3
ratio_of_ones_in_test_sampled = 0.015

test_part = 0.2
train_part = 1 - test_part
test_to_train = test_part / train_part

train_part, test_part, test_to_train, ratio_of_ones_in_train_sampled, ratio_of_ones_in_test_sampled

(0.8, 0.2, 0.25, 0.3, 0.015)

In [17]:
train, test = sdf_w_target.randomSplit([train_part, test_part], seed = SEED)

## Train

In [18]:
train_sampled_count_1 = train.filter(F.col('target') == 1).count()
train_sampled_count = train_sampled_count_1 / ratio_of_ones_in_train_sampled
train_sampled_count_0 = train_sampled_count - train_sampled_count_1
train_fractions_0 = train_sampled_count_0 / train.filter(F.col('target') == 0).count()
train_fractions_0

0.011659943470267008

In [19]:
train_fractions = {0 : train_fractions_0, 1 : 1} # different fractions
train_sampled = train.sampleBy(col = 'target', fractions = train_fractions, seed = SEED)

In [20]:
train_sampled.count(),\
train_sampled.filter(F.col('target') == 0).count(),\
train_sampled.filter(F.col('target') == 1).count(),\
train_sampled_count_1

(10116, 7081, 3035, 3035)

In [64]:
result_df = train_sampled \
    .agg(
        F.count("*").alias("count"),  # Counts all rows in each group
        F.sum("target").alias("sum_target"),  # Sums up the 'target' column values per group
        # Calculate the sum of target divided by the count of rows
        (F.sum("target") / F.count("*")).alias("average_target")
    )
result_df.show(100)

+-----+----------+------------------+
|count|sum_target|    average_target|
+-----+----------+------------------+
|10116|      3035|0.3000197706603401|
+-----+----------+------------------+



In [65]:
file_path = f'bs_segments/train_sampled.csv'

train_sampled.toPandas().to_csv(file_path, index=False)

In [66]:
file_path = f'bs_segments/train_sampled.csv'
train_sampled = pd.read_csv(file_path)

(10116, 14)

# Test sampled too

In [21]:
test_sampled_count = train_sampled.count() * test_to_train
test_sampled_count_1 = test_sampled_count * ratio_of_ones_in_test_sampled
test_sampled_count_0 = test_sampled_count * (1 - ratio_of_ones_in_test_sampled)
test_fractions_0 = test_sampled_count_0 / test.filter(F.col('target') == 0).count()
test_fractions_1 = test_sampled_count_1 / test.filter(F.col('target') == 1).count()

In [22]:
test_fractions = {0 : test_fractions_0, 1 : test_fractions_1} # different fractions
test_sampled = test.sampleBy(col = 'target', fractions = test_fractions, seed = SEED)

In [23]:
test_sampled.count(),\
test_sampled_count,\
test_sampled.filter(F.col('target') == 0).count(),\
test_sampled.filter(F.col('target') == 1).count()

(2530, 2529.0, 2493, 37)

In [24]:
result_df = test_sampled \
    .agg(
        F.count("*").alias("count"),  # Counts all rows in each group
        F.sum("target").alias("sum_target"),  # Sums up the 'target' column values per group
        # Calculate the sum of target divided by the count of rows
        (F.sum("target") / F.count("*")).alias("average_target")
    )
result_df.show(100)

+-----+----------+--------------------+
|count|sum_target|      average_target|
+-----+----------+--------------------+
| 2530|        37|0.014624505928853756|
+-----+----------+--------------------+



In [63]:
# file_path = f'{s3a}/bs_segments/test_sampled.csv'
# test_sampled.write.csv(path = file_path, mode = 'overwrite')

file_path = f'bs_segments/test_sampled.csv'

test_sampled.toPandas().to_csv(file_path, index=False)

In [70]:
file_path = f'bs_segments/test_sampled.csv'
test_sampled = pd.read_csv(file_path)

## Test

In [185]:
test_sampled_count = train_sampled.count() * test_to_train
test_fraction = test_sampled_count / test.count()
test_fraction

0.016590895672196966

In [186]:
test_fractions = {0 : test_fraction, 1 : test_fraction} # same fractions
test_sampled = test.sampleBy(col = 'target', fractions = test_fractions, seed = SEED)

In [187]:
test_sampled.count(),\
test_sampled_count,\
test_sampled.filter(F.col('target') == 0).count(),\
test_sampled.filter(F.col('target') == 1).count()

(2527, 2529.0, 2517, 10)

In [190]:
result_df = test_sampled \
    .agg(
        F.count("*").alias("count"),  # Counts all rows in each group
        F.sum("target").alias("sum_target"),  # Sums up the 'target' column values per group
        # Calculate the sum of target divided by the count of rows
        (F.sum("target") / F.count("*")).alias("average_target")
    )
result_df.show(100)

+-----+----------+--------------------+
|count|sum_target|      average_target|
+-----+----------+--------------------+
| 2527|        10|0.003957261574990107|
+-----+----------+--------------------+



### Target:

**1. Target as `Weekly` / `Monthly` only with `subscription_start_date`**
* unified book with LEFT JOIN and NULL further

**2. Target as `Weekly` / `Monthly` / `Lifetime`**
* LAG() approach
* event_time - technically

### Questions

- только без подписки или как - условия базового сегмента - на сколько новых с какой глубиной
- paid - обязательно?

### Answers
- сравнить `MIN(event_time)` и первое поялвние `subscription_start_date` - OK
- `paid` c `Lifetime`
* **paid values:**
sdf.select('paid').distinct().show() = `null, false, true, no, yes`
* `sdf.filter((F.col('purchase') == 'Lifetime') & (F.col('subscription_start_date').isNotNull())).count()`
= **15737** / 74195551 = 0.2 %