## Тестовый пример с битовыми операциями Spark для расчета признаков активности


В блокноте сделана эмуляция расчета признаков на созданных (фэйковых) сырых данных.

Реализован алгоритм ежедневного обновления данными только за последний день и потом уже расчет признаков.

В отладочных целях используется pandas совместно со spark (в датабрикс он не нужен)
Данные из колонок:
- date, user_id, app_id, event_type - основные колонки активности пользователей
- content_category - “суб-категория”

#### Шаги:
- "Создать фэйковые исходные данные" - Когда флаг не установлен `MAKE_FAKE_DATA = False`, шаг не запускаетсяю
- "Первоначальное заполнение основной таблицы" - этот шаг запускается только один раз. Заполняет основную таблицу по сырым данным за указанные 90 дней. И формирует активность в виде битов в двух колонках `activity_65_128`, `activity_1_64`. При этом в activity_65_128 используются только младшие 26 бит, чтобы в сумме было 90 бит активностей. Эта таблица формируется во всех нужных дальше разрезах
- "Ежедневное обновление" - этот шаг планируется запускать ежедневно. Из сырых данных берется только один нужный день - формируется бит активности в тех же разрезах. Дальше присоединяется к основной таблице. При этом в основной таблице биты сдвигаются влево на 1. Старший из `activity_1_64` переходит в младший `activity_65_128`. А новый бит за последний день устанавливается в младший `activity_1_64`. Опять же все данные (биты) больше 90 дней обнуляются.
- "Retention features" - это расчет финальных признаков retention за 7, 30 и 90 дней.


In [1]:
import uuid
import random
from datetime import datetime, timedelta

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

from pyspark.sql.types import StringType, StructType, StructField
import pandas as pd


In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Lev_9_10") \
    .getOrCreate()    


java_address 127.0.0.1 59034
java_address 127.0.0.1 59034


In [3]:
MAKE_FAKE_DATA = True # True
CONTENT_CATEGORIES = ["News", "Sports", "Entertainment", "Technology", "Health"]
last_date = datetime(2024, 12, 2)
end_date = '2024-11-25'
next_date = '2024-11-26'

MASK_7_BITS = F.lit((1 << 7) - 1).cast("BIGINT") # Для младших 7 бит
MASK_30_BITS = F.lit((1 << 30) - 1).cast("BIGINT") # Для младших 30 бит
HIGHEST_BIT = F.lit(-9223372036854775808).cast("BIGINT") # для определения старшего бита 
MASK_26_BITS = F.lit((1 << 26) - 1).cast("BIGINT")  


In [4]:
def bin_10(num):
    binary_representation = bin(num & (2**64 - 1))[2:].zfill(64)
    print(binary_representation)    

### Создать фэйковые исходные данные

In [5]:
def generate_data(output_path, constant_date, num_days=120, num_records=100000):
    # Настройки
    USER_ID_LIST = [str(uuid.uuid4()) for _ in range(15)]
    APP_IDS = [f"app{str(i)}" for i in range(1, 11)]
    EVENT_TYPES = ['install', 'session_start']
    
    user_id_dates = {}

    # Создаем Spark сессию
    spark = SparkSession.builder \
        .master("local[1]") \
        .appName("DataGeneration") \
        .getOrCreate()

    # Базовые данные
    data = []
    for _ in range(num_records):
        # Генерация активности для user_id
        user_id = random.choices(USER_ID_LIST, weights=[1, 3, 2, 5, 10, 10, 1, 1, 3, 2, 4, 1, 2, 3, 2])[0]
        app_id = random.choice(APP_IDS)
        content_category = random.choice(CONTENT_CATEGORIES)
        event_type = random.choice(EVENT_TYPES)

        if user_id in user_id_dates:
            start_date = user_id_dates[user_id]['start_date']
            end_date = user_id_dates[user_id]['end_date']
        else:
            # Логика для определения дат активности
            if random.random() < 0.5:
                start_date = constant_date - timedelta(days=(3 * num_days) // 4)
                end_date = constant_date - timedelta(days=num_days // 4)
            else:
                start_date = constant_date - timedelta(days=num_days)
                end_date = constant_date
            user_id_dates[user_id] = {'start_date': start_date, 'end_date': end_date}
        
        event_date = start_date + timedelta(days=random.randint(0, (end_date - start_date).days))

        data.append((event_date.strftime("%Y-%m-%d"), user_id, app_id, 
                     content_category, event_type))

    # Схема данных
    schema = StructType([
        StructField("date", StringType(), True),
        StructField("user_id", StringType(), True),
        StructField("app_id", StringType(), True),
        StructField("content_category", StringType(), True),
        StructField("event_type", StringType(), True),
    ])

    # Создаем DataFrame и сохраняем в Parquet
    df = spark.createDataFrame(data, schema=schema)
    pandas_df = df.toPandas()
    pandas_df.to_parquet(output_path, index=False)



In [6]:
if MAKE_FAKE_DATA:
    generate_data("mock_table.parquet", last_date, num_days=120, num_records=10000) # 500000


In [7]:
df = pd.read_parquet('mock_table.parquet')
print(df.shape)
df.head()

(10000, 5)


Unnamed: 0,date,user_id,app_id,content_category,event_type
0,2024-09-12,dc3cabab-adfd-41f1-9c4c-026cf7fa72fe,app6,News,install
1,2024-08-12,44d476d0-3db4-4d81-a1a7-02f1c426548c,app4,Entertainment,install
2,2024-10-20,44d476d0-3db4-4d81-a1a7-02f1c426548c,app3,News,session_start
3,2024-08-23,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app7,Health,session_start
4,2024-09-17,f5e2355e-7d83-489e-bdd0-3db67fa4570e,app8,News,install


In [8]:
df['date'].min(), df['date'].max()

('2024-08-04', '2024-12-02')

In [9]:
df.groupby("user_id", as_index=False).aggregate({
        "date": ["min", "max"],
    })

Unnamed: 0_level_0,user_id,date,date
Unnamed: 0_level_1,Unnamed: 1_level_1,min,max
0,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,2024-08-05,2024-12-02
1,365266e0-f128-4e5f-ba95-f4bb24023e24,2024-08-04,2024-12-02
2,44d476d0-3db4-4d81-a1a7-02f1c426548c,2024-08-04,2024-12-02
3,4bb0baef-b855-449a-8b3f-45a9343913d8,2024-08-05,2024-12-02
4,4cbde1b3-e523-4d80-9f23-e1516b38addd,2024-08-04,2024-12-02
5,595f4c27-f875-41cb-92d5-535ae8b71a4f,2024-09-03,2024-11-02
6,67f248d4-1688-438f-9886-95ae56d535b4,2024-09-03,2024-11-02
7,69a79330-f160-4921-8b20-ba5a5e854ad4,2024-09-03,2024-11-02
8,6d9b4e2d-3f21-47be-80c1-0e563c1d01d3,2024-09-03,2024-11-02
9,975b093c-dc65-47be-bd4a-97083221a7ab,2024-08-04,2024-12-02


### Первоначальное заполнение основной таблицы

In [10]:
spark_df = spark.createDataFrame(df)

In [11]:
start_date = F.date_sub(F.lit(end_date), 89)  # 90 дней назад


In [12]:
filtered_df = spark_df.filter((F.col("date") >= start_date) & (F.col("date") <= end_date))
activity_df = filtered_df.withColumn("days_ago", F.datediff(F.lit(end_date), F.col("date")))

In [13]:
result_df = activity_df.groupBy("user_id", "app_id", "content_category") \
    .agg(
        # Для дней 33-96 с применением побитового сдвига
        F.expr("BIT_OR(CASE WHEN days_ago >= 64 AND days_ago < 91 THEN CAST(POWER(2, days_ago - 64) AS BIGINT) ELSE 0 END)").alias("activity_65_128"),
        # Для первых 32 дней
        F.expr("BIT_OR(CASE WHEN days_ago < 64 THEN CAST(POWER(2, days_ago) AS BIGINT) ELSE 0 END)").alias("activity_1_64"),
        F.max("days_ago").alias("first_activity_bit"),
    )

result_df = result_df.withColumn(
    "first_activity",
    F.when(F.col("first_activity_bit") >= 90, F.lit(91)).otherwise(F.col("first_activity_bit") + 1)
).drop("first_activity_bit")

In [14]:
result_df_pd = result_df.toPandas()
result_df_pd = result_df_pd.sort_values(by=['user_id','app_id','content_category'])
print(result_df_pd.shape)
result_df_pd.head()


(742, 6)


Unnamed: 0,user_id,app_id,content_category,activity_65_128,activity_1_64,first_activity
391,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app1,Entertainment,4096,2252907915247648,77
491,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app1,Health,35651844,0,90
188,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app1,News,33833024,0,90
403,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app1,Sports,16908288,72057662776279040,89
435,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app1,Technology,67584,5629499538407424,81


### Ежедневное обновление

#### Получаем данные за последний день

- до этого была заполнена до '2024-11-25' включительно
- получаем за '2024-11-26'

#### Объединяем с основной

- сдвигаем биты на 1

In [15]:
# Получение данных за новую дату '2024-11-26'
new_data_df = spark_df.filter(F.col("date") == F.lit(next_date))
# Создание DataFrame с активностями за новую дату
new_activity_df = new_data_df.withColumn("days_ago", F.datediff(F.lit(next_date), F.col("date")))

# Аггрегация новых данных
new_aggregated_df = new_activity_df.groupBy("user_id", "app_id", "content_category") \
    .agg(
        # Заполнение младшего бита для новых активностей
        F.expr("CASE WHEN COUNT(*) > 0 THEN 1 ELSE 0 END").alias("new_activity_bit")
    )

new_aggregated_df_pd = new_aggregated_df.toPandas()
print(new_aggregated_df_pd.shape)
new_aggregated_df_pd.head()



(47, 4)


Unnamed: 0,user_id,app_id,content_category,new_activity_bit
0,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app9,Technology,1
1,c8b09e9e-1062-43c2-8961-da65887f6c08,app5,Technology,1
2,4cbde1b3-e523-4d80-9f23-e1516b38addd,app3,Sports,1
3,44d476d0-3db4-4d81-a1a7-02f1c426548c,app6,Sports,1
4,c8b09e9e-1062-43c2-8961-da65887f6c08,app3,News,1


In [16]:
def update_activity_bits(result_df, new_aggregated_df):

    updated_df = result_df.alias("old") \
        .join(new_aggregated_df.alias("new"), 
              [
                  "user_id", 
                  "app_id", 
                  "content_category"
              ], 
            "full_outer") \
        .select(
            F.coalesce(F.col("old.user_id"), F.col("new.user_id")).alias("user_id"),
            F.coalesce(F.col("old.app_id"), F.col("new.app_id")).alias("app_id"),
            F.coalesce(F.col("old.content_category"), F.col("new.content_category")).alias("content_category"),

            # Обновляем activity_65_128: сдвиг влево и добавляем старший бит из предыдущего activity_1_64
            (F.shiftleft(F.coalesce(F.col("old.activity_65_128"), F.lit(0)), 1)
             .bitwiseOR(
                 F.when(
                     F.coalesce(F.col("old.activity_1_64"), F.lit(0)).bitwiseAND(HIGHEST_BIT) != 0, 
                     F.lit(1).cast("BIGINT")
                 )
                 .otherwise(F.lit(0).cast("BIGINT"))
             )
             # Обнуляем старшие 38 бит
             .bitwiseAND(MASK_26_BITS)
            ).alias("activity_65_128"),
            
            # Обновляем activity_1_64: сдвиг влево и добавление нового бита активности
            (F.shiftleft(F.coalesce(F.col("old.activity_1_64"), F.lit(0)), 1)
             .bitwiseOR(F.coalesce(F.col("new.new_activity_bit"), F.lit(0)))
            ).alias("activity_1_64"),

            # Обновление first_activity
            F.when(F.col("old.user_id").isNull(), F.lit(1))
            .otherwise(
                F.when(F.coalesce(F.col("old.first_activity"), F.lit(91)) < 91, 
                       F.coalesce(F.col("old.first_activity"), F.lit(91)) + 1)
                .otherwise(F.lit(91))
            ).alias("first_activity")
        )

    # Фильтрация записей с ненулевой активностью за 90 дней
    final_df = updated_df.filter(
        (F.col("activity_1_64") != 0) | (F.col("activity_65_128") != 0)
    )
    
    return final_df

In [17]:

result_df_next = update_activity_bits(result_df, new_aggregated_df)

result_df_next_pd = result_df_next.toPandas()
result_df_next_pd = result_df_next_pd.sort_values(by=['user_id','app_id','content_category'])
print(result_df_next_pd.shape)
result_df_next_pd.head()



(742, 6)


Unnamed: 0,user_id,app_id,content_category,activity_65_128,activity_1_64,first_activity
0,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app1,Entertainment,8192,4505815830495296,78
1,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app1,Health,4194824,0,91
2,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app1,News,557184,0,91
3,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app1,Sports,33816576,144115325552558080,90
4,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app1,Technology,135168,11258999076814848,82


ДО обновления

In [18]:
result_df_pd[result_df_pd['user_id'].isin(new_aggregated_df_pd['user_id'])].head()


Unnamed: 0,user_id,app_id,content_category,activity_65_128,activity_1_64,first_activity
391,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app1,Entertainment,4096,2252907915247648,77
491,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app1,Health,35651844,0,90
188,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app1,News,33833024,0,90
403,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app1,Sports,16908288,72057662776279040,89
435,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app1,Technology,67584,5629499538407424,81


ПОСЛЕ обновления

In [19]:
result_df_next_pd[result_df_next_pd['user_id'].isin(new_aggregated_df_pd['user_id'])].head()


Unnamed: 0,user_id,app_id,content_category,activity_65_128,activity_1_64,first_activity
0,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app1,Entertainment,8192,4505815830495296,78
1,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app1,Health,4194824,0,91
2,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app1,News,557184,0,91
3,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app1,Sports,33816576,144115325552558080,90
4,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,app1,Technology,135168,11258999076814848,82


In [20]:
# Примеры изменений числе после сдвига
# activity_65_128
bin_10(8388608)
bin_10(16777216)
# activity_1_64
bin_10(72057594037927936)
bin_10(144115188075855872)


0000000000000000000000000000000000000000100000000000000000000000
0000000000000000000000000000000000000001000000000000000000000000
0000000100000000000000000000000000000000000000000000000000000000
0000001000000000000000000000000000000000000000000000000000000000


### Retention features
Группируем по `content_category` применяя побитовый OR и рассчитываем признаки

In [21]:
ret_features = result_df_next.groupBy("user_id", "content_category").agg(
    F.expr("bit_or(activity_1_64)").alias("activity_1_64"),
    F.expr("bit_or(activity_65_128)").alias("activity_65_128"),
    F.max(F.col("first_activity")).alias("first_activity"),
)

ret_features = ret_features.withColumn(
    "ret_7d",
    F.bit_count(F.col("activity_1_64").bitwiseAND(MASK_7_BITS)) /
    F.when(F.col("first_activity") < 7, F.col("first_activity")).otherwise(7)
).withColumn(
    "ret_30d",
    F.bit_count(F.col("activity_1_64").bitwiseAND(MASK_30_BITS)) /
    F.when(F.col("first_activity") < 30, F.col("first_activity")).otherwise(30)
).withColumn(
    "ret_90d",
    (
        F.bit_count(F.col("activity_1_64")) + 
        F.bit_count(F.col("activity_65_128").bitwiseAND(MASK_26_BITS))
    ) / F.when(F.col("first_activity") < 90, F.col("first_activity")).otherwise(90)
).drop('activity_1_64', 'activity_65_128', 'first_activity')

# ret_features.show(5)

In [22]:
ret_features_pd = ret_features.toPandas()
ret_features_pd.head(5)

Unnamed: 0,user_id,content_category,ret_7d,ret_30d,ret_90d
0,4bb0baef-b855-449a-8b3f-45a9343913d8,Health,0.285714,0.3,0.314607
1,213a867e-4ccf-4257-b7b5-9d3ce6e4b50a,Sports,0.285714,0.5,0.555556
2,365266e0-f128-4e5f-ba95-f4bb24023e24,Sports,0.571429,0.666667,0.651685
3,975b093c-dc65-47be-bd4a-97083221a7ab,News,1.0,1.0,0.933333
4,69a79330-f160-4921-8b20-ba5a5e854ad4,News,0.857143,0.966667,0.964706


In [23]:
# Останавливаем Spark
spark.stop()