# Feature Engineering - 1) User Profiles

## Part A: 분석을 위한 사전 환경 설정

### A-1) 모듈/패키지 로드

In [1]:
# Bucket 관련 사전 설정

OUTPUT_BUCKET_FOLDER = "gs://cap-18/output/"
DATA_BUCKET_FOLDER = "gs://cap-18/data/"

In [2]:
# 필요한 모듈 로드

from IPython.display import display
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import DataFrameWriter
import numpy as np
import math
import datetime
import time
import random
random.seed(42)
start_time = time.time()

In [3]:
evaluation = True
truncate_day_from_timestamp_udf = F.udf(lambda ts: int(ts / 1000 / 60 / 60 / 24), IntegerType())

### A-2) 데이터 로드

In [4]:
# country를 추출하는 함수 생성
extract_country_udf = F.udf(lambda geo: geo.strip()[:2] if geo != None else '', StringType())

In [5]:
# documents_meta 스키마(document_id_doc, source_id, publisher_id, publish_time) 생성
documents_meta_schema = StructType(
                    [StructField("document_id_doc", IntegerType(), True),
                    StructField("source_id", IntegerType(), True),                    
                    StructField("publisher_id", IntegerType(), True),
                    StructField("publish_time", TimestampType(), True)]
                    )

# documents_meta_df 테이블 생성
documents_meta_df = spark.read.schema(documents_meta_schema).options(header='true', inferschema='false', nullValue='\\N') \
                .csv(DATA_BUCKET_FOLDER+"documents_meta.csv") \
                .withColumn('dummyDocumentsMeta', F.lit(1)).alias('documents_meta')

In [35]:
documents_meta_df.show(10)

+---------------+---------+------------+-------------------+------------------+
|document_id_doc|source_id|publisher_id|       publish_time|dummyDocumentsMeta|
+---------------+---------+------------+-------------------+------------------+
|        1595802|        1|         603|2016-06-05 00:00:00|                 1|
|        1524246|        1|         603|2016-05-26 11:00:00|                 1|
|        1617787|        1|         603|2016-05-27 00:00:00|                 1|
|        1615583|        1|         603|2016-06-07 00:00:00|                 1|
|        1615460|        1|         603|2016-06-20 00:00:00|                 1|
|        1615354|        1|         603|2016-06-10 00:00:00|                 1|
|        1614611|        1|         603|2016-06-05 13:00:00|                 1|
|        1614235|        1|         603|2016-06-09 00:00:00|                 1|
|        1614225|        1|         603|2016-06-09 00:00:00|                 1|
|        1488264|        1|         603|

In [6]:
# documents_categories 스키마 생성
documents_categories_schema = StructType(
                    [StructField("document_id_cat", IntegerType(), True),
                    StructField("category_id", IntegerType(), True),                    
                    StructField("confidence_level_cat", FloatType(), True)]
                    )

documents_categories_df = spark.read.schema(documents_categories_schema).options(header='true', inferschema='false', nullValue='\\N') \
                .csv(DATA_BUCKET_FOLDER+"documents_categories.csv") \
                .alias('documents_categories')

# documents_categories_grouped_df 생성
documents_categories_grouped_df = documents_categories_df.groupBy('document_id_cat') \ # documment_id_cat별로 묶은 후,
                                            .agg(F.collect_list('category_id').alias('category_id_list'), # category_id_list 컬럼: category_id를 list로 묶어 표시
                                                 F.collect_list('confidence_level_cat').alias('cat_confidence_level_list')) \ # cat_confidence_level_list 컬럼: category의 conf. level을 list로 묶어 표시  
                                            .withColumn('dummyDocumentsCategory', F.lit(1)) \ # documents_category인지 여부를 dummy로 표시
                                            .alias('documents_categories_grouped')

In [33]:
# documents_categories_grouped_df 구조 확인
documents_categories_grouped_df.show(2)

+---------------+----------------+-------------------------+----------------------+
|document_id_cat|category_id_list|cat_confidence_level_list|dummyDocumentsCategory|
+---------------+----------------+-------------------------+----------------------+
|            148|    [1403, 1702]|             [0.92, 0.07]|                     1|
|            463|    [1513, 1808]|     [0.8932095, 0.067...|                     1|
+---------------+----------------+-------------------------+----------------------+
only showing top 2 rows



In [7]:
# documents_topics 스키마 생성
documents_topics_schema = StructType(
                    [StructField("document_id_top", IntegerType(), True),
                    StructField("topic_id", IntegerType(), True),                    
                    StructField("confidence_level_top", FloatType(), True)]
                    )

documents_topics_df = spark.read.schema(documents_topics_schema).options(header='true', inferschema='false', nullValue='\\N') \
                .csv(DATA_BUCKET_FOLDER+"documents_topics.csv")  \
                .alias('documents_topics')

# documents_topics_grouped_ef 생성
documents_topics_grouped_df = documents_topics_df.groupBy('document_id_top') \ # document_id_top별로 묶은 후,
                                            .agg(F.collect_list('topic_id').alias('topic_id_list'), # topic_id_list 컬럼: topic_id를 list로 묶어 표시
                                                 F.collect_list('confidence_level_top').alias('top_confidence_level_list')) \ # top_confidence_level_list 컬럼: topic의 conf. level을 list로 묶어 표시
                                            .withColumn('dummyDocumentsTopics', F.lit(1)) \ # documents_topics인지 여부를 dummy로 표시
                                            .alias('documents_topics_grouped')

In [9]:
# documents_df 생성: documents_meta-documents_categories_grouped_df-documents_topics_grouped_df join
documents_df = documents_meta_df.join(documents_categories_grouped_df, on=F.col("document_id_doc") == F.col("documents_categories_grouped.document_id_cat"), how='left') \
                         .join(documents_topics_grouped_df, on=F.col("document_id_doc") == F.col("documents_topics_grouped.document_id_top"), how='left') \
                         .cache()

In [49]:
documents_df.count()

2999334

In [37]:
documents_df.show(2)

+---------------+---------+------------+-------------------+------------------+---------------+----------------+-------------------------+----------------------+---------------+--------------------+-------------------------+--------------------+---------------+--------------------+-------------------------+----------------------+
|document_id_doc|source_id|publisher_id|       publish_time|dummyDocumentsMeta|document_id_cat|category_id_list|cat_confidence_level_list|dummyDocumentsCategory|document_id_top|       topic_id_list|top_confidence_level_list|dummyDocumentsTopics|document_id_ent|      entity_id_list|ent_confidence_level_list|dummyDocumentsEntities|
+---------------+---------+------------+-------------------+------------------+---------------+----------------+-------------------------+----------------------+---------------+--------------------+-------------------------+--------------------+---------------+--------------------+-------------------------+----------------------+
|   

In [38]:
documents_df.printSchema() # documents_df 구조 확인

root
 |-- document_id_doc: integer (nullable = true)
 |-- source_id: integer (nullable = true)
 |-- publisher_id: integer (nullable = true)
 |-- publish_time: timestamp (nullable = true)
 |-- dummyDocumentsMeta: integer (nullable = false)
 |-- document_id_cat: integer (nullable = true)
 |-- category_id_list: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- cat_confidence_level_list: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- dummyDocumentsCategory: integer (nullable = true)
 |-- document_id_top: integer (nullable = true)
 |-- topic_id_list: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- top_confidence_level_list: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- dummyDocumentsTopics: integer (nullable = true)
 |-- document_id_ent: integer (nullable = true)
 |-- entity_id_list: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ent_confidence_l

In [10]:
if evaluation:
    validation_set_df = spark.read.parquet(OUTPUT_BUCKET_FOLDER+"validation_set.parquet") \
                    .alias('validation_set')
    
    validation_set_df.select('uuid_event').distinct().createOrReplaceTempView('users_to_profile')    
    validation_set_df.select('uuid_event','document_id_promo').distinct().createOrReplaceTempView('validation_users_docs_to_ignore')
    
else:
    events_schema = StructType(
                    [StructField("display_id", IntegerType(), True),
                    StructField("uuid_event", StringType(), True),                    
                    StructField("document_id_event", IntegerType(), True),
                    StructField("timestamp_event", IntegerType(), True),
                    StructField("platform_event", IntegerType(), True),
                    StructField("geo_location_event", StringType(), True)]
                    )

    events_df = spark.read.schema(events_schema).options(header='true', inferschema='false', nullValue='\\N') \
                    .csv(DATA_BUCKET_FOLDER+"events.csv") \
                    .withColumn('dummyEvents', F.lit(1)) \
                    .withColumn('day_event', truncate_day_from_timestamp_udf('timestamp_event')) \
                    .withColumn('event_country', extract_country_udf('geo_location_event')) \
                    .alias('events')

    events_df.createOrReplaceTempView('events')


    promoted_content_schema = StructType(
                        [StructField("ad_id", IntegerType(), True),
                        StructField("document_id_promo", IntegerType(), True),                    
                        StructField("campaign_id", IntegerType(), True),
                        StructField("advertiser_id", IntegerType(), True)]
                        )

    promoted_content_df = spark.read.schema(promoted_content_schema).options(header='true', inferschema='false', nullValue='\\N') \
                    .csv(DATA_BUCKET_FOLDER+"promoted_content.csv") \
                    .withColumn('dummyPromotedContent', F.lit(1)).alias('promoted_content')

    clicks_test_schema = StructType(
                        [StructField("display_id", IntegerType(), True),
                        StructField("ad_id", IntegerType(), True)]
                        )

    clicks_test_df = spark.read.schema(clicks_test_schema).options(header='true', inferschema='false', nullValue='\\N') \
                    .csv(DATA_BUCKET_FOLDER+"clicks_test.csv") \
                    .withColumn('dummyClicksTest', F.lit(1)).alias('clicks_test')
    
    test_set_df = clicks_test_df.join(promoted_content_df, on='ad_id', how='left') \
                                .join(events_df, on='display_id', how='left')
        
    test_set_df.select('uuid_event').distinct().createOrReplaceTempView('users_to_profile')
    test_set_df.select('uuid_event','document_id_promo', 'timestamp_event').distinct().createOrReplaceTempView('test_users_docs_timestamp_to_ignore')

In [11]:
# page_views 구조 생성
page_views_schema = StructType(
                    [StructField("uuid_pv", StringType(), True),
                    StructField("document_id_pv", IntegerType(), True),
                    StructField("timestamp_pv", IntegerType(), True),
                    StructField("platform_pv", IntegerType(), True),
                    StructField("geo_location_pv", StringType(), True),
                    StructField("traffic_source_pv", IntegerType(), True)]
                    )

page_views_df = spark.read.schema(page_views_schema).options(header='true', inferschema='false', nullValue='\\N') \
                .csv(DATA_BUCKET_FOLDER+"page_views.csv") \
                .alias('page_views')        
            
page_views_df.createOrReplaceTempView('page_views')

In [12]:
additional_filter = ''
if evaluation:
    additional_filter = '''
                             AND NOT EXISTS (SELECT uuid_event FROM validation_users_docs_to_ignore 
                                                      WHERE uuid_event = p.uuid_pv
                                                     AND document_id_promo = p.document_id_pv)
                        '''
else:
    additional_filter = '''
                             AND NOT EXISTS (SELECT uuid_event FROM test_users_docs_timestamp_to_ignore 
                                                      WHERE uuid_event = p.uuid_pv
                                                     AND document_id_promo = p.document_id_pv
                                                     AND p.timestamp_pv >= timestamp_event)
                        '''

page_views_train_df = spark.sql('''SELECT * FROM page_views p 
                                    WHERE EXISTS (SELECT uuid_event FROM users_to_profile
                                                 WHERE uuid_event = p.uuid_pv)                                     
                                '''+ additional_filter
                               ).alias('views') \
                         .join(documents_df, on=F.col("document_id_pv") == F.col("document_id_doc"), how='left') \
                         .filter('dummyDocumentsEntities is not null OR dummyDocumentsTopics is not null OR dummyDocumentsCategory is not null')

### Processing document frequencies

In [13]:
import pickle

In [17]:
documents_total = documents_meta_df.count()
documents_total

2999334

In [16]:
categories_docs_counts = documents_categories_df.groupBy('category_id').count().rdd.collectAsMap()
len(categories_docs_counts)

97

In [14]:
df_filenames_suffix = ''
if evaluation:
    df_filenames_suffix = '_eval'

In [18]:
with open('categories_docs_counts'+df_filenames_suffix+'.pickle', 'wb') as output:
    pickle.dump(categories_docs_counts, output)

In [19]:
topics_docs_counts = documents_topics_df.groupBy('topic_id').count().rdd.collectAsMap()
len(topics_docs_counts)

300

In [20]:
with open('topics_docs_counts'+df_filenames_suffix+'.pickle', 'wb') as output:
    pickle.dump(topics_docs_counts, output)

In [21]:
entities_docs_counts = documents_entities_df.groupBy('entity_id').count().rdd.collectAsMap()
len(entities_docs_counts)

1326009

### Processing User Profiles

In [23]:
int_null_to_minus_one_udf = F.udf(lambda x: x if x != None else -1, IntegerType())
int_list_null_to_empty_list_udf = F.udf(lambda x: x if x != None else [], ArrayType(IntegerType()))
float_list_null_to_empty_list_udf = F.udf(lambda x: x if x != None else [], ArrayType(FloatType()))
str_list_null_to_empty_list_udf = F.udf(lambda x: x if x != None else [], ArrayType(StringType()))

In [24]:
# 형 변환

page_views_by_user_df = page_views_train_df.select(
                           'uuid_pv', 
                           'document_id_pv', 
                           int_null_to_minus_one_udf('timestamp_pv').alias('timestamp_pv'), 
                           int_list_null_to_empty_list_udf('category_id_list').alias('category_id_list'), 
                           float_list_null_to_empty_list_udf('cat_confidence_level_list').alias('cat_confidence_level_list'), 
                           int_list_null_to_empty_list_udf('topic_id_list').alias('topic_id_list'), 
                           float_list_null_to_empty_list_udf('top_confidence_level_list').alias('top_confidence_level_list')) \
                    .groupBy('uuid_pv') \
                    .agg(F.collect_list('document_id_pv').alias('document_id_pv_list'),
                         F.collect_list('timestamp_pv').alias('timestamp_pv_list'),
                         F.collect_list('category_id_list').alias('category_id_lists'),
                         F.collect_list('cat_confidence_level_list').alias('cat_confidence_level_lists'),
                         F.collect_list('topic_id_list').alias('topic_id_lists'),
                         F.collect_list('top_confidence_level_list').alias('top_confidence_level_lists')
                        )

In [25]:
from collections import defaultdict

# get_user_aspects 함수 생성:
def get_user_aspects(docs_aspects, aspect_docs_counts):
    docs_aspects_merged_lists = defaultdict(list)
    
    for doc_aspects in docs_aspects: 
        for key in doc_aspects.keys(): 
            docs_aspects_merged_lists[key].append(doc_aspects[key])
        
    docs_aspects_stats = {}
    for key in docs_aspects_merged_lists.keys():
        aspect_list = docs_aspects_merged_lists[key]
        tf = len(aspect_list)
        idf = math.log(documents_total / float(aspect_docs_counts[key]))
        
        confid_mean = sum(aspect_list) / float(len(aspect_list))
        docs_aspects_stats[key] = [tf*idf, confid_mean]

        
    return docs_aspects_stats

# generate_user_profile 함수 생성:
def generate_user_profile(docs_aspects_list, docs_aspects_confidence_list, aspect_docs_counts):    
    docs_aspects = []
    for doc_aspects_list, doc_aspects_confidence_list in zip(docs_aspects_list, docs_aspects_confidence_list):
        doc_aspects = dict(zip(doc_aspects_list, doc_aspects_confidence_list))
        docs_aspects.append(doc_aspects)
        
    user_aspects = get_user_aspects(docs_aspects, aspect_docs_counts)
    return user_aspects

In [26]:
get_list_len_udf = F.udf(lambda docs_list: len(docs_list), IntegerType())

In [27]:
generate_categories_user_profile_map_udf = F.udf(lambda docs_aspects_list, 
                                                 docs_aspects_confidence_list: \
                                                      generate_user_profile(docs_aspects_list, 
                                                                            docs_aspects_confidence_list, 
                                                                            categories_docs_counts), 
                                          MapType(IntegerType(), 
                                                  ArrayType(FloatType()),
                                                  False))


generate_topics_user_profile_map_udf = F.udf(lambda docs_aspects_list, 
                                                 docs_aspects_confidence_list: \
                                                      generate_user_profile(docs_aspects_list, 
                                                                            docs_aspects_confidence_list, 
                                                                            topics_docs_counts), 
                                          MapType(IntegerType(), 
                                                  ArrayType(FloatType()),
                                                  False))

In [28]:
users_profile_df = page_views_by_user_df \
                                 .withColumn('views', get_list_len_udf('document_id_pv_list')) \
                                 .withColumn('categories', 
                                             generate_categories_user_profile_map_udf('category_id_lists', 
                                                                   'cat_confidence_level_lists')) \
                                 .withColumn('topics', 
                                             generate_topics_user_profile_map_udf('topic_id_lists', 
                                                                               'top_confidence_level_lists')) \
                                 .select(F.col('uuid_pv').alias('uuid'),
                                         F.col('document_id_pv_list').alias('doc_ids'),
                                         'views',
                                         'categories', 'topics')

In [29]:
if evaluation:
    table_name = 'user_profiles_eval'
else:
    table_name = 'user_profiles'

users_profile_df.write.parquet(OUTPUT_BUCKET_FOLDER+table_name, mode='overwrite')

In [30]:
finish_time = time.time()
print("Elapsed min: ", (finish_time-start_time)/60/60)

('Elapsed min: ', 0.3151605778270297)


---

In [32]:
# users_profile_df의 구조 확인
users_profile_df.show(10)

+--------------+--------------------+-----+--------------------+--------------------+--------------------+
|          uuid|             doc_ids|views|          categories|              topics|            entities|
+--------------+--------------------+-----+--------------------+--------------------+--------------------+
|100013af048bbf|[2447063, 2444477...|   46|Map(1205 -> Wrapp...|Map(5 -> WrappedA...|Map(14a7d4c4ebcc6...|
|100163b35102c4|[2516821, 2356657...|   12|Map(1907 -> Wrapp...|Map(174 -> Wrappe...|Map(6904a5638b5cf...|
|1003370a1c2d0f|[1521640, 2053639...|    4|Map(1808 -> Wrapp...|Map(69 -> Wrapped...|Map(531cadf46e145...|
|100659017f177b|            [429642]|    1|Map(1510 -> Wrapp...|Map(296 -> Wrappe...|               Map()|
|100aa12f880396|[1792136, 2504276...|    3|Map(1408 -> Wrapp...|Map(265 -> Wrappe...|Map(b366917165b76...|
|101324634e39b0|  [2672785, 2690250]|    2|Map(1210 -> Wrapp...|Map(20 -> Wrapped...|Map(14a7d4c4ebcc6...|
|101487b48a7780|[2713662, 428673,...|

In [41]:
users_profile_df.count()

4961756

In [None]:
users_profile_df.select('uuid').distinct().show()

# 유저별로 누적적(?)으로 page_view 정보가 저장되는가?