In [None]:
!/opt/conda/envs/iu_py27/bin/pip install cassandra-driver

In [1]:
# packages
import getpass
import os
from datetime import datetime, timedelta
from itertools import chain

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from pyspark.sql.functions import (
    col,
    count,
    countDistinct,
    create_map,
    dayofweek,
    from_unixtime,
    greatest,
    hour,
    lit,
    max,
    minute,
    posexplode,
    struct,
    when,
)

# Cassandra connection settings.
# `hostnames` maps a deployment label to its contact point; `hostname`
# selects which deployment this notebook talks to.
hostnames = {
    'single_node' : '10.241.0.110',
    'cluster' : '10.128.175.71'
}
hostname = 'cluster'

# Credentials must not live in the notebook source. They are read from
# the environment; when CASSANDRA_PASSWORD is unset the password is asked
# for interactively so it never ends up in the saved file.
username = os.environ.get('CASSANDRA_USERNAME', 'featurestore-superuser')
password = os.environ.get('CASSANDRA_PASSWORD') or getpass.getpass(
    'Cassandra password for {username}: '.format(username=username)
)
auth_provider = PlainTextAuthProvider(
    username=username, password=password
)

# Snapshot day of the features and the first day of the look-back window
# (27 days earlier, i.e. 28 days inclusive), both as YYYYMMDD strings.
_snapshot_day = datetime(2022, 7, 1)
target_date = _snapshot_day.strftime('%Y%m%d')
start_date = (_snapshot_day - timedelta(days=27)).strftime('%Y%m%d')

# Destination keyspace and table in Cassandra.
target_keyspace = "featurestore"
target_table = "coupon"

In [None]:
#====================================================================#
# create target table                                                #
#====================================================================#

# Build the driver handle for the selected deployment. Only the
# multi-node cluster is contacted with an explicit port and credentials.
# An unknown label fails here instead of surfacing later as a NameError.
if hostname == 'single_node':
    cluster = Cluster([hostnames[hostname]])
elif hostname == 'cluster':
    cluster = Cluster(
        [hostnames[hostname]],
        port=9042,
        auth_provider=auth_provider
    )
else:
    raise ValueError(
        "unknown hostname {hostname!r}; expected one of {known}".format(
            hostname=hostname, known=sorted(hostnames)
        )
    )

# The keyspace/table names come from the configuration cell so the DDL
# always matches the destination used by the Spark writer below.
# Literal CQL braces are doubled because the statement goes through
# str.format.
create_keyspace = """
CREATE KEYSPACE IF NOT EXISTS {target_keyspace}
WITH REPLICATION = {{ 'class' : 'SimpleStrategy', 'replication_factor' : '1' }}
AND durable_writes = false
"""

create_table = """
CREATE TABLE IF NOT EXISTS {target_keyspace}.{target_table}
( mid text
, module_name text
, region text
, recent_date text
, frequency int
, user_segment text
, view_count int
, click_count int
, ctr float
, prf_visit_time text
, prf_visit_dow text
, log_date text
, PRIMARY KEY ((log_date, mid))
)
"""

# The driver connection is only needed for the DDL; release its sockets
# and threads even when a statement fails.
try:
    session = cluster.connect()
    session.execute(
        create_keyspace.format(target_keyspace=target_keyspace)
    )
    session.execute(
        create_table.format(
            target_keyspace=target_keyspace, target_table=target_table
        )
    )
finally:
    cluster.shutdown()

In [3]:
#====================================================================#
# source tables from IU                                              #
#====================================================================#

# Both IU queries are limited to the same inclusive log_date window.
date_window = dict(start_date=start_date, target_date=target_date)

# User base rows (mid, user_segment, log_date) inside the window.
query1 = """
select
    mid,
    user_segment,
    log_date
from
    linewallet_business_pro.w_user_base
where
    log_date between '{start_date}' and '{target_date}'
"""
w_user_base = spark.sql(query1.format(**date_window))

# Timestamped log rows of the 'CouponBrand' module inside the window.
query2 = """
select
    mid,
    ts,
    module_name,
    action,
    region,
    log_date
from
    linewallet_business_pro.w_tslog_base
where
    module_name = 'CouponBrand'
    and log_date between '{start_date}' and '{target_date}'
"""
w_tslog_base = spark.sql(query2.format(**date_window))

#====================================================================#
# Generate the features to migrate                                   #
#====================================================================#

# Group the log by user once; both per-user aggregates below reuse it.
tslog_by_mid = w_tslog_base.groupBy('mid')

# recent_date: the largest log_date value seen for the user.
# NOTE(review): `max` is pyspark's aggregate (imported above), and the
# comparison is on the raw log_date value -- presumably YYYYMMDD strings,
# for which string order equals date order; confirm.
recent_date = tslog_by_mid.agg(
    max(col('log_date')).alias('recent_date')
)

# frequency: number of distinct log_date values on which the user appears.
frequency = tslog_by_mid.agg(
    countDistinct(col('log_date')).alias('frequency')
)

# user_segment: the segment recorded on each user's most recent log_date
# in the window. Taking max() over a (log_date, user_segment) struct
# orders by log_date first, so the result is reproducible -- unlike
# dropDuplicates(['mid']), which keeps an arbitrary day's row per user.
# Output columns: mid, user_segment.
user_segment = (
    w_user_base
    .groupBy('mid')
    .agg(max(struct('log_date', 'user_segment')).alias('latest'))
    .select('mid', col('latest.user_segment').alias('user_segment'))
)

# view & click: per-user number of 'view' and 'click' log rows.
# when() without otherwise() yields null for non-matching rows, and
# count() skips nulls, so each expression counts only matching actions.
view_events = count(when(col('action') == 'view', True))
click_events = count(when(col('action') == 'click', True))

# ctr: clicks divided by views.
# NOTE(review): for users with zero views this presumably evaluates to
# null under Spark's default (non-ANSI) arithmetic -- confirm.
view_click = (
    w_tslog_base
    .groupBy('mid')
    .agg(
        view_events.alias('view_count'),
        click_events.alias('click_count'),
    )
    .withColumn('ctr', col('click_count') / col('view_count'))
)

def _preferred_bucket(bucket_counts, bucket_names, output_name):
    """Pick, for every user, the bucket with the highest event count.

    Args:
        bucket_counts: DataFrame with a ``mid`` column and one count
            column per entry of ``bucket_names`` (one row per ``mid``).
        bucket_names: Ordered list of count column names (at least two).
            When several buckets share the highest count, the one that
            comes first in this list is chosen, so the result is
            deterministic.
        output_name: Name of the string column holding the chosen bucket.

    Returns:
        DataFrame with the columns ``mid`` and ``output_name``.
    """
    peak = greatest(*bucket_names)
    # A when(...).when(...) chain returns the value of the first matching
    # condition, which gives the "first bucket reaching the peak" rule.
    preferred = when(col(bucket_names[0]) == peak, lit(bucket_names[0]))
    for bucket_name in bucket_names[1:]:
        preferred = preferred.when(
            col(bucket_name) == peak, lit(bucket_name)
        )
    return bucket_counts.select('mid', preferred.alias(output_name))


# Event time as a timestamp string. `ts` is divided by 1000 before
# from_unixtime, i.e. it is treated as epoch milliseconds.
# NOTE(review): from_unixtime renders in the Spark session time zone, so
# the hour/day buckets below depend on that setting -- confirm it matches
# the users' local time.
tslog_with_time = w_tslog_base.withColumn(
    'ts_convert', from_unixtime(col('ts') / 1000)
)

# prf_visit_time: the 4-hour slot of the day with the most events.
# hour_minute is the time of day as fractional hours (e.g. 9.5 = 09:30).
visit_hour = col('hour_minute')
visit_time_conditions = [
    ('6-10', (visit_hour >= 6) & (visit_hour < 10)),
    ('10-14', (visit_hour >= 10) & (visit_hour < 14)),
    ('14-18', (visit_hour >= 14) & (visit_hour < 18)),
    ('18-22', (visit_hour >= 18) & (visit_hour < 22)),
    # This slot wraps around midnight, hence "or" instead of "and".
    ('22-2', (visit_hour >= 22) | (visit_hour < 2)),
    ('2-6', (visit_hour >= 2) & (visit_hour < 6)),
]
visit_time_buckets = [name for name, _ in visit_time_conditions]
prf_visit_time = _preferred_bucket(
    tslog_with_time
    .withColumn(
        'hour_minute', hour('ts_convert') + minute('ts_convert') / 60
    )
    .groupBy('mid')
    .agg(*[
        count(when(condition, True)).alias(name)
        for name, condition in visit_time_conditions
    ]),
    visit_time_buckets,
    'prf_visit_time',
)

# prf_visit_dow: the day of the week with the most events.
# Spark's dayofweek() numbers days from 1 = Sunday to 7 = Saturday.
visit_dow_numbers = [
    ('mon', 2),
    ('tues', 3),
    ('weds', 4),
    ('thur', 5),
    ('fri', 6),
    ('sat', 7),
    ('sun', 1),
]
visit_dow_buckets = [name for name, _ in visit_dow_numbers]
prf_visit_dow = _preferred_bucket(
    tslog_with_time
    .withColumn('dayofweek', dayofweek('ts_convert'))
    .groupBy('mid')
    .agg(*[
        count(when(col('dayofweek') == number, True)).alias(name)
        for name, number in visit_dow_numbers
    ]),
    visit_dow_buckets,
    'prf_visit_dow',
)

#====================================================================#
# Join the features to the IU source table                           #
#====================================================================#

# One row per user with the remaining log columns (module_name, region).
# NOTE(review): dropDuplicates(['mid']) keeps an arbitrary row per user,
# so `region` is arbitrary for a user whose rows differ in region.
user_keys = (
    w_tslog_base
    .drop('log_date', 'action', 'ts')
    .dropDuplicates(['mid'])
)

# Feature frames, each keyed by `mid`, attached in this order.
feature_frames = [
    recent_date,
    frequency,
    user_segment,
    view_click,
    prf_visit_time,
    prf_visit_dow,
]

# Left outer joins keep every user even when a feature frame has no row
# for that user.
joined = user_keys
for feature_frame in feature_frames:
    joined = joined.join(feature_frame, on=['mid'], how='leftouter')

# Stamp every row with the snapshot date.
result = joined.withColumn('log_date', lit(target_date))

In [4]:
#====================================================================#
# Insert the features to cassandra                                   #
#====================================================================#

# Report the size of the full feature set computed from IU.
print(
    "size of the {target_table} in IU : {size}"
    .format(
        target_table = target_table,
        size = format(result.count(), ',')
    )
)
print(
    "[{start_time}] Insert START - table : {target_table}"
    .format(
        start_time = datetime.now(),
        target_table = target_table
    )
)

# extract 10 samples
sample = result.head(10)
# Rebuild the DataFrame with the schema of `result` rather than letting
# Spark infer one from the sampled rows: inference raises when a column
# is None in every sampled row (columns coming from the left outer joins
# can be null), and an inferred schema may differ from the real one.
df_sample = spark.createDataFrame(sample, schema=result.schema)

# write to cassandra
# NOTE(review): mode("overwrite") together with confirm.truncate replaces
# the existing contents of the target table, including rows of other
# log_date values -- confirm this is intended.
(
    df_sample
    .write
    .format("org.apache.spark.sql.cassandra")
    .mode("overwrite")
    .option("confirm.truncate", "true")
    .options(table=target_table, keyspace=target_keyspace)
    .save()
)
print(
    "[{end_time}] Insert DONE - table : {target_table}"
    .format(
        end_time = datetime.now(),
        target_table = target_table
    )
)

#====================================================================#
# Verify the migrated features in cassandra                          #
#====================================================================#

# Read the target table back through the Spark Cassandra data source and
# print its first rows for a visual check of the migrated features.
cassandra_reader = spark.read.format("org.apache.spark.sql.cassandra")
df_cassandra = (
    cassandra_reader
    .option("table", target_table)
    .option("keyspace", target_keyspace)
    .load()
)
df_cassandra.show()

Insert DONE - table : coupon
size of the coupon in IU : 955,546
size of the coupon in Cassandra : 10
+--------+--------------------+-----------+---+---------+-----------+-------------+--------------+-----------+------+------------+----------+
|log_date|                 mid|click_count|ctr|frequency|module_name|prf_visit_dow|prf_visit_time|recent_date|region|user_segment|view_count|
+--------+--------------------+-----------+---+---------+-----------+-------------+--------------+-----------+------+------------+----------+
|20220701|u005173aecb9d4dc5...|          0|0.0|        1|CouponBrand|          sat|         10-14|   20220618|    JP|New/Comeback|         6|
|20220701|u0001549713da2d09...|          0|0.0|        1|CouponBrand|          fri|         14-18|   20220610|    JP|New/Comeback|         1|
|20220701|u0005f904c63a8df3...|          0|0.0|        1|CouponBrand|         weds|         10-14|   20220608|    JP| Go-straight|         1|
|20220701|u00284965bfba3db4...|          0|0.0|