In [1]:
import os
# Select the Python interpreter via PYSPARK_PYTHON; this is set ahead of the
# SparkContext creation further down.
# NOTE(review): the absolute path is machine-specific - confirm it exists on every node.
os.environ["PYSPARK_PYTHON"] = "/usr/local/anaconda/bin/python"

In [2]:
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, TimestampType, FloatType, ArrayType, MapType
import pyspark.sql.functions as F
from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
import math
import time

In [3]:
# Spark configuration for the "recommend-ctr" application, submitted to YARN.
conf = SparkConf().setMaster("yarn").setAll([
    ("spark.app.name", "recommend-ctr"),
    ("spark.executor.memory", "5g"),
    ("spark.driver.memory", "8g"),
    ("spark.driver.maxResultSize", "3g"),
    ("spark.executor.instances", "100"),
    ("spark.default.parallelism", "200"),
])

In [4]:
# Start the SparkContext from the configuration built above and wrap it in a
# SparkSession, which is the entry point used for all DataFrame reads and SQL below.
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

In [5]:
# HDFS locations. The raw CSV files are read from DATA_BUCKET_FOLDER; the validation
# set parquet is read from OUTPUT_BUCKET_FOLDER when `evaluation` is True.
# NOTE(review): OUTPUT_BUCKET_FOLDER is rebound to a local path later in the notebook,
# and SPARK_TEMP_FOLDER is not referenced anywhere in the visible cells.
OUTPUT_BUCKET_FOLDER = "hdfs:/user/lzhao/data/outbrain/preprocessed/"
DATA_BUCKET_FOLDER = "hdfs:/user/lzhao/data/outbrain/"
SPARK_TEMP_FOLDER = "hdfs:/user/lzhao/data/outbrain/spark-temp/"

In [6]:
# Convert a timestamp into a whole-day index by dividing by 1000*60*60*24 and
# truncating (presumably the input is in milliseconds - confirm against the data).
# A null timestamp is returned as null instead of raising TypeError inside the
# Python worker, matching the None handling of extract_country_udf.
truncate_day_from_timestamp_udf = F.udf(
    lambda ts: int(ts / 1000 / 60 / 60 / 24) if ts is not None else None,
    IntegerType())

In [7]:
# Take the first two characters of the stripped geo string (e.g. "AU>02" -> "AU");
# a null geo value yields the empty string. Uses an identity check for None rather
# than `!=`, which is the idiomatic and safer null test.
extract_country_udf = F.udf(
    lambda geo: geo.strip()[:2] if geo is not None else '',
    StringType())

In [8]:
# Explicit schema for documents_meta.csv; every column is nullable.
documents_meta_schema = (
    StructType()
    .add("document_id_doc", IntegerType(), True)
    .add("source_id", IntegerType(), True)
    .add("publisher_id", IntegerType(), True)
    .add("publish_time", TimestampType(), True)
)

In [9]:
# Load documents_meta.csv with the explicit schema ('\N' is read as null) and add a
# constant marker column.
documents_meta_df = (
    spark.read
    .schema(documents_meta_schema)
    .options(header='true', inferschema='false', nullValue='\\N')
    .csv(DATA_BUCKET_FOLDER + "documents_meta.csv")
    .withColumn('dummyDocumentsMeta', F.lit(1))
    .alias('documents_meta')
)

In [10]:
# Row count of the documents metadata as loaded (before any cleaning).
documents_meta_df.count()

2999334

In [11]:
# Remove documents whose source_id is null and show the remaining row count.
# Note: documents_meta_df is rebound here, so later cells see the filtered frame.
print('Drop rows with empty "source_id"...')
documents_meta_df = documents_meta_df.dropna(subset="source_id")
documents_meta_df.count()

Drop rows with empty "source_id"...


2996816

In [12]:
# Distinct (source_id, publisher_id) pairs. Pairs with a null publisher_id are still
# present here; they are separated out in the next cells.
source_publishers_df = documents_meta_df.select(["source_id", "publisher_id"]).dropDuplicates()
source_publishers_df.count()

14394

In [13]:
# Collect to the driver every source_id whose publisher_id is null.
print('Get list of source_ids without publisher_id...')
rows_no_pub = source_publishers_df.where(F.col("publisher_id").isNull())
source_ids_without_publisher = [row.source_id for row in rows_no_pub.collect()]
len(source_ids_without_publisher)

Get list of source_ids without publisher_id...


5058

In [14]:
# Largest publisher_id currently in use. The aggregate is computed by Spark (nulls are
# ignored by max) so only a single row is brought back to the driver, instead of
# collecting every row and comparing Row objects in Python.
print('Maximum value of publisher_id used so far...')
max_pub = source_publishers_df.agg(F.max("publisher_id")).first()[0]
max_pub

Maximum value of publisher_id used so far...


1263

In [15]:
# Give every source without a publisher a fresh publisher_id, numbering consecutively
# from max_pub + 1, and turn the (source_id, publisher_id) pairs into a DataFrame.
print('Rows filled with new publisher_ids')
new_publishers = [
    (source, new_pub_id)
    for new_pub_id, source in enumerate(source_ids_without_publisher, start=max_pub + 1)
]
new_publishers_df = spark.createDataFrame(new_publishers, schema=("source_id", "publisher_id"))
new_publishers_df.take(10)

Rows filled with new publisher_ids


[Row(source_id=5803, publisher_id=1264),
 Row(source_id=7754, publisher_id=1265),
 Row(source_id=7833, publisher_id=1266),
 Row(source_id=8086, publisher_id=1267),
 Row(source_id=3918, publisher_id=1268),
 Row(source_id=1088, publisher_id=1269),
 Row(source_id=13285, publisher_id=1270),
 Row(source_id=13289, publisher_id=1271),
 Row(source_id=13623, publisher_id=1272),
 Row(source_id=13832, publisher_id=1273)]

In [16]:
# Complete source -> publisher mapping: the pairs that already had a publisher plus the
# newly assigned ones. union() matches columns by position; both frames are ordered
# (source_id, publisher_id).
# NOTE(review): collect() pulls the whole mapping to the driver just to show 10 rows.
fixed_source_publishers_df = source_publishers_df.dropna().union(new_publishers_df)
fixed_source_publishers_df.collect()[-10:]

[Row(source_id=10867, publisher_id=6312),
 Row(source_id=11292, publisher_id=6313),
 Row(source_id=11817, publisher_id=6314),
 Row(source_id=12141, publisher_id=6315),
 Row(source_id=12160, publisher_id=6316),
 Row(source_id=12285, publisher_id=6317),
 Row(source_id=12444, publisher_id=6318),
 Row(source_id=12622, publisher_id=6319),
 Row(source_id=12916, publisher_id=6320),
 Row(source_id=12991, publisher_id=6321)]

In [17]:
# Swap the original publisher_id column for the completed mapping by joining on
# source_id, then re-count the rows to check the join did not change the row count.
print('Update documents_meta with bew publishers...')
documents_meta_df = documents_meta_df.drop('publisher_id').join(fixed_source_publishers_df, on='source_id')
documents_meta_df.count()

Update documents_meta with bew publishers...


2996816

In [18]:
# Peek at two rows of the updated documents metadata.
documents_meta_df.show(2)

+---------+---------------+-------------------+------------------+------------+
|source_id|document_id_doc|       publish_time|dummyDocumentsMeta|publisher_id|
+---------+---------------+-------------------+------------------+------------+
|       26|        2287601|2015-10-15 20:00:00|                 1|        1720|
|       29|        1756309|2016-06-13 00:00:00|                 1|        1161|
+---------+---------------+-------------------+------------------+------------+
only showing top 2 rows



In [19]:
# Explicit schema for documents_categories.csv; every column is nullable.
documents_categories_schema = (
    StructType()
    .add("document_id_cat", IntegerType(), True)
    .add("category_id", IntegerType(), True)
    .add("confidence_level_cat", FloatType(), True)
)

In [20]:
# Load documents_categories.csv with the explicit schema ('\N' is read as null).
documents_categories_df = (
    spark.read
    .schema(documents_categories_schema)
    .options(header='true', inferschema='false', nullValue='\\N')
    .csv(DATA_BUCKET_FOLDER + "documents_categories.csv")
    .alias('documents_categories')
)

In [21]:
# One row per document: the list of its category ids and the list of their confidence
# levels, plus a constant marker column used later to detect a successful left join.
# NOTE(review): the two lists come from separate collect_list calls - confirm they stay
# position-aligned (collect_list skips nulls).
documents_categories_grouped_df = (
    documents_categories_df
    .groupBy('document_id_cat')
    .agg(
        F.collect_list('category_id').alias('category_id_list'),
        F.collect_list('confidence_level_cat').alias('cat_confidence_level_list'),
    )
    .withColumn('dummyDocumentsCategory', F.lit(1))
    .alias('documents_categories_grouped')
)

In [22]:
# Peek at two grouped category rows.
documents_categories_grouped_df.take(2)

[Row(document_id_cat=148, category_id_list=[1403, 1702], cat_confidence_level_list=[0.9200000166893005, 0.07000000029802322], dummyDocumentsCategory=1),
 Row(document_id_cat=463, category_id_list=[1513, 1808], cat_confidence_level_list=[0.8932095170021057, 0.06796159595251083], dummyDocumentsCategory=1)]

In [23]:
# Explicit schema for documents_topics.csv; every column is nullable.
documents_topics_schema = (
    StructType()
    .add("document_id_top", IntegerType(), True)
    .add("topic_id", IntegerType(), True)
    .add("confidence_level_top", FloatType(), True)
)

In [24]:
# Load documents_topics.csv with the explicit schema ('\N' is read as null).
documents_topics_df = (
    spark.read
    .schema(documents_topics_schema)
    .options(header='true', inferschema='false', nullValue='\\N')
    .csv(DATA_BUCKET_FOLDER + "documents_topics.csv")
    .alias('documents_topics')
)

In [25]:
# One row per document: the list of its topic ids and the list of their confidence
# levels, plus a constant marker column used later to detect a successful left join.
documents_topics_grouped_df = (
    documents_topics_df
    .groupBy('document_id_top')
    .agg(
        F.collect_list('topic_id').alias('topic_id_list'),
        F.collect_list('confidence_level_top').alias('top_confidence_level_list'),
    )
    .withColumn('dummyDocumentsTopics', F.lit(1))
    .alias('documents_topics_grouped')
)

In [26]:
# Peek at two grouped topic rows.
documents_topics_grouped_df.take(2)

[Row(document_id_top=148, topic_id_list=[153, 140, 8, 172, 244, 179, 36, 2, 64, 10, 216], top_confidence_level_list=[0.0752369686961174, 0.0719832107424736, 0.06108427047729492, 0.042060330510139465, 0.03971264883875847, 0.03684856742620468, 0.030745454132556915, 0.026049140840768814, 0.01605464331805706, 0.010918059386312962, 0.008370612747967243], dummyDocumentsTopics=1),
 Row(document_id_top=463, topic_id_list=[181, 292, 24, 254, 167], top_confidence_level_list=[0.1187012791633606, 0.05149438977241516, 0.04749272018671036, 0.021316789090633392, 0.008210956119000912], dummyDocumentsTopics=1)]

In [27]:
# Explicit schema for documents_entities.csv; entity ids are strings, every column is nullable.
documents_entities_schema = (
    StructType()
    .add("document_id_ent", IntegerType(), True)
    .add("entity_id", StringType(), True)
    .add("confidence_level_ent", FloatType(), True)
)

In [28]:
# Load documents_entities.csv with the explicit schema ('\N' is read as null).
documents_entities_df = (
    spark.read
    .schema(documents_entities_schema)
    .options(header='true', inferschema='false', nullValue='\\N')
    .csv(DATA_BUCKET_FOLDER + "documents_entities.csv")
    .alias('documents_entities')
)

In [29]:
# One row per document: the list of its entity ids and the list of their confidence
# levels, plus a constant marker column used later to detect a successful left join.
documents_entities_grouped_df = (
    documents_entities_df
    .groupBy('document_id_ent')
    .agg(
        F.collect_list('entity_id').alias('entity_id_list'),
        F.collect_list('confidence_level_ent').alias('ent_confidence_level_list'),
    )
    .withColumn('dummyDocumentsEntities', F.lit(1))
    .alias('documents_entities_grouped')
)

In [30]:
# Enrich each document's metadata with its grouped categories, topics and entities.
# All three are left joins, so a document missing one kind of aspect keeps its row and
# gets nulls (including a null marker column) for that aspect. The result is cached.
documents_df = (
    documents_meta_df
    .join(
        documents_categories_grouped_df,
        on=F.col("document_id_doc") == F.col("documents_categories_grouped.document_id_cat"),
        how='left',
    )
    .join(
        documents_topics_grouped_df,
        on=F.col("document_id_doc") == F.col("documents_topics_grouped.document_id_top"),
        how='left',
    )
    .join(
        documents_entities_grouped_df,
        on=F.col("document_id_doc") == F.col("documents_entities_grouped.document_id_ent"),
        how='left',
    )
    .cache()
)

In [31]:
# Peek at one joined document row.
documents_df.take(1)

[Row(source_id=1787, document_id_doc=148, publish_time=datetime.datetime(2008, 6, 30, 16, 0), dummyDocumentsMeta=1, publisher_id=118, document_id_cat=148, category_id_list=[1403, 1702], cat_confidence_level_list=[0.9200000166893005, 0.07000000029802322], dummyDocumentsCategory=1, document_id_top=148, topic_id_list=[153, 140, 8, 172, 244, 179, 36, 2, 64, 10, 216], top_confidence_level_list=[0.0752369686961174, 0.0719832107424736, 0.06108427047729492, 0.042060330510139465, 0.03971264883875847, 0.03684856742620468, 0.030745454132556915, 0.026049140840768814, 0.01605464331805706, 0.010918059386312962, 0.008370612747967243], dummyDocumentsTopics=1, document_id_ent=148, entity_id_list=['e1c74838563ef5d205063b0d95afa414', '6fd68f102042c6554cb2592fae942264', 'ae3de5466bfa10459eebcbe02ac9b3ee', '9da9595caa381755c9353ae7179f2117', 'b973c2e55831fb4025003e0259aa820f', '6eb92e281e46d463ce80317efd785d68', 'c323569535ca4c3d2ce474f4d825cc80', 'daf2f4c9cd8dbf10482f06200e613939'], ent_confidence_level_l

In [32]:
# Mode switch: True -> profile the users of the saved validation set;
# False -> profile the users of the test set built from clicks_test.csv.
evaluation = False

In [33]:
# Build the two temp views consumed by the page-view SQL further down:
#   * users_to_profile: the distinct users that need a profile;
#   * a "docs to ignore" view: (user, promoted document) pairs that must be excluded
#     from the user's page-view history.
# NOTE(review): validation_set_df exists only when evaluation is True, and
# events_df / promoted_content_df / clicks_test_df / test_set_df only when it is False.
if evaluation:
    # Evaluation mode: read the previously saved validation set.
    validation_set_df = spark.read.parquet(OUTPUT_BUCKET_FOLDER+"validation_set.parquet") \
    .alias('validation_set')        
  
    validation_set_df.select('uuid_event').distinct().createOrReplaceTempView('users_to_profile') 
    validation_set_df.select('uuid_event','document_id_promo').distinct() \
    .createOrReplaceTempView('validation_users_docs_to_ignore')
else:
    # Test mode: assemble the test set from events, promoted content and clicks_test.
    events_schema = StructType(
                  [StructField("display_id", IntegerType(), True),
                  StructField("uuid_event", StringType(), True),                    
                  StructField("document_id_event", IntegerType(), True),
                  StructField("timestamp_event", IntegerType(), True),
                  StructField("platform_event", IntegerType(), True),
                  StructField("geo_location_event", StringType(), True)]
                  )

    # Events, with a derived day index and a two-letter country code.
    events_df = spark.read.schema(events_schema) \
    .options(header='true', inferschema='false', nullValue='\\N') \
    .csv(DATA_BUCKET_FOLDER+"events.csv") \
    .withColumn('dummyEvents', F.lit(1)) \
    .withColumn('day_event', truncate_day_from_timestamp_udf('timestamp_event')) \
    .withColumn('event_country', extract_country_udf('geo_location_event')) \
    .alias('events')

    # Drop rows with empty "geo_location"
    events_df = events_df.dropna(subset="geo_location_event")
    # Drop rows with empty "platform"
    events_df = events_df.dropna(subset="platform_event")

    events_df.createOrReplaceTempView('events')


    promoted_content_schema = StructType(
                      [StructField("ad_id", IntegerType(), True),
                      StructField("document_id_promo", IntegerType(), True),
                      StructField("campaign_id", IntegerType(), True),
                      StructField("advertiser_id", IntegerType(), True)]
                      )

    promoted_content_df = spark.read.schema(promoted_content_schema) \
    .options(header='true', inferschema='false', nullValue='\\N') \
    .csv(DATA_BUCKET_FOLDER+"promoted_content.csv") \
    .withColumn('dummyPromotedContent', F.lit(1)).alias('promoted_content')

    clicks_test_schema = StructType(
                      [StructField("display_id", IntegerType(), True),
                      StructField("ad_id", IntegerType(), True)]
                      )

    clicks_test_df = spark.read.schema(clicks_test_schema) \
    .options(header='true', inferschema='false', nullValue='\\N') \
    .csv(DATA_BUCKET_FOLDER+"clicks_test.csv") \
    .withColumn('dummyClicksTest', F.lit(1)).alias('clicks_test')
  
    # Test set: every (display, ad) pair joined to its ad details and its event details.
    # Both joins are left joins, so pairs without a match keep their row with nulls.
    test_set_df = clicks_test_df.join(promoted_content_df, on='ad_id', how='left') \
    .join(events_df, on='display_id', how='left')
    test_set_df.cache()
      
    test_set_df.select('uuid_event').distinct().createOrReplaceTempView('users_to_profile')
    # In test mode the ignore view also carries the event timestamp, which the SQL
    # filter compares against the page-view timestamp.
    test_set_df.select('uuid_event','document_id_promo', 'timestamp_event').distinct() \
    .createOrReplaceTempView('test_users_docs_timestamp_to_ignore')

In [34]:
# Peek at two rows of the test set.
# NOTE(review): test_set_df is only assigned in the `else` branch above, so on a fresh
# kernel this cell raises NameError when evaluation is True.
test_set_df.show(2)

+----------+------+---------------+-----------------+-----------+-------------+--------------------+--------------+-----------------+---------------+--------------+------------------+-----------+---------+-------------+
|display_id| ad_id|dummyClicksTest|document_id_promo|campaign_id|advertiser_id|dummyPromotedContent|    uuid_event|document_id_event|timestamp_event|platform_event|geo_location_event|dummyEvents|day_event|event_country|
+----------+------+---------------+-----------------+-----------+-------------+--------------------+--------------+-----------------+---------------+--------------+------------------+-----------+---------+-------------+
|  16874807|192759|              1|          1469601|      22742|         1975|                   1|a296494aa7a041|           399863|          87414|             2|             AU>02|          1|        0|           AU|
|  16874807|137006|              1|           916403|      17587|          859|                   1|a296494aa7a041|     

In [35]:
# Explicit schema for page_views.csv; every column is nullable.
page_views_schema = (
    StructType()
    .add("uuid_pv", StringType(), True)
    .add("document_id_pv", IntegerType(), True)
    .add("timestamp_pv", IntegerType(), True)
    .add("platform_pv", IntegerType(), True)
    .add("geo_location_pv", StringType(), True)
    .add("traffic_source_pv", IntegerType(), True)
)

In [36]:
# Load page_views.csv with the explicit schema ('\N' is read as null).
page_views_df = (
    spark.read
    .schema(page_views_schema)
    .options(header='true', inferschema='false', nullValue='\\N')
    .csv(DATA_BUCKET_FOLDER + "page_views.csv")
    .alias('page_views')
)

In [37]:
# Expose the page views to Spark SQL under the name 'page_views'.
page_views_df.createOrReplaceTempView('page_views')      

In [38]:
# Extra SQL predicate appended to the page-view query below. It removes page views of
# a document that is also a promoted document for the same user:
#   * evaluation mode: any such (user, document) pair is removed;
#   * test mode: the pair is removed only when the page view happened at or after the
#     event timestamp (p.timestamp_pv >= timestamp_event).
additional_filter = ''

if evaluation:
    additional_filter = '''
    AND NOT EXISTS (SELECT uuid_event FROM validation_users_docs_to_ignore 
      WHERE uuid_event = p.uuid_pv
      AND document_id_promo = p.document_id_pv)
    '''
else:
    additional_filter = '''
    AND NOT EXISTS (SELECT uuid_event FROM test_users_docs_timestamp_to_ignore 
      WHERE uuid_event = p.uuid_pv
      AND document_id_promo = p.document_id_pv
      AND p.timestamp_pv >= timestamp_event)
    '''

In [39]:
# Page views used to build the profiles:
#   1. keep only page views of users listed in users_to_profile, minus the pairs
#      excluded by additional_filter;
#   2. left-join the document aspects onto each page view;
#   3. keep only page views whose document has at least one of entities / topics /
#      categories (detected through the non-null marker columns).
page_views_train_df = spark.sql('''
  SELECT * FROM page_views p 
    WHERE EXISTS (SELECT uuid_event FROM users_to_profile
    WHERE uuid_event = p.uuid_pv)                                     
  ''' + additional_filter).alias('views') \
  .join(documents_df, on=F.col("document_id_pv") == F.col("document_id_doc"), how='left') \
  .filter('dummyDocumentsEntities is not null OR dummyDocumentsTopics is not null OR dummyDocumentsCategory is not null')

In [40]:
import pickle

In [41]:
# Total number of documents in the cleaned metadata; get_user_aspects reads this
# notebook-level value as the corpus size in its IDF term.
documents_total = documents_meta_df.count()
documents_total

2996816

In [42]:
# Row count per category_id in documents_categories, collected to the driver as a
# {category_id: count} dict.
categories_docs_counts = documents_categories_df.groupBy('category_id').count().rdd.collectAsMap()
len(categories_docs_counts)

97

In [43]:
# File-name suffix that keeps evaluation-mode outputs separate from test-mode outputs.
df_filenames_suffix = '_eval' if evaluation else ''

In [44]:
# Local folder for the pickled count dictionaries written in the following cells.
# The trailing slash matters: those cells build their paths by plain string
# concatenation, so without it the files would be named
# "preprocessedcategories_docs_counts.pickle" etc. and land beside the folder
# instead of inside it.
# NOTE(review): this rebinds the HDFS OUTPUT_BUCKET_FOLDER defined near the top of the
# notebook; re-running earlier cells afterwards will see this local path.
OUTPUT_BUCKET_FOLDER = '../data/outbrain/preprocessed/'

In [45]:
# Save the per-category document counts to a local pickle file. The path is built with
# os.path.join so it is correct whether or not OUTPUT_BUCKET_FOLDER ends with a slash.
with open(os.path.join(OUTPUT_BUCKET_FOLDER,
                       'categories_docs_counts'+df_filenames_suffix+'.pickle'), 'wb') as output:
    pickle.dump(categories_docs_counts, output)

In [46]:
# Row count per topic_id in documents_topics, collected to the driver as a
# {topic_id: count} dict.
topics_docs_counts = documents_topics_df.groupBy('topic_id').count().rdd.collectAsMap()
len(topics_docs_counts)

300

In [47]:
# Save the per-topic document counts to a local pickle file. The path is built with
# os.path.join so it is correct whether or not OUTPUT_BUCKET_FOLDER ends with a slash.
with open(os.path.join(OUTPUT_BUCKET_FOLDER,
                       'topics_docs_counts'+df_filenames_suffix+'.pickle'), 'wb') as output:
    pickle.dump(topics_docs_counts, output)

In [48]:
# Row count per entity_id in documents_entities, collected to the driver as a
# {entity_id: count} dict.
entities_docs_counts = documents_entities_df.groupBy('entity_id').count().rdd.collectAsMap()
len(entities_docs_counts)

1326009

In [49]:
# Save the per-entity document counts to a local pickle file. The path is built with
# os.path.join so it is correct whether or not OUTPUT_BUCKET_FOLDER ends with a slash.
with open(os.path.join(OUTPUT_BUCKET_FOLDER,
                       'entities_docs_counts'+df_filenames_suffix+'.pickle'), 'wb') as output:
    pickle.dump(entities_docs_counts, output)

In [50]:
# Null-replacement UDFs applied before collect_list in the per-user aggregation:
# a null integer becomes -1 and a null list becomes an empty list of the matching
# element type. None is tested by identity (`is not None`) rather than with `!=`.
int_null_to_minus_one_udf = F.udf(lambda x: x if x is not None else -1, IntegerType())
int_list_null_to_empty_list_udf = F.udf(lambda x: x if x is not None else [], ArrayType(IntegerType()))
float_list_null_to_empty_list_udf = F.udf(lambda x: x if x is not None else [], ArrayType(FloatType()))
str_list_null_to_empty_list_udf = F.udf(lambda x: x if x is not None else [], ArrayType(StringType()))

In [51]:
# One row per user. First replace nulls (timestamp -> -1, aspect lists -> []), then
# gather, for every page view of the user, the document id, the timestamp and the
# per-document aspect id / confidence lists into lists (lists of lists for aspects).
page_views_by_user_df = (
    page_views_train_df
    .select(
        'uuid_pv',
        'document_id_pv',
        int_null_to_minus_one_udf('timestamp_pv').alias('timestamp_pv'),
        int_list_null_to_empty_list_udf('category_id_list').alias('category_id_list'),
        float_list_null_to_empty_list_udf('cat_confidence_level_list').alias('cat_confidence_level_list'),
        int_list_null_to_empty_list_udf('topic_id_list').alias('topic_id_list'),
        float_list_null_to_empty_list_udf('top_confidence_level_list').alias('top_confidence_level_list'),
        str_list_null_to_empty_list_udf('entity_id_list').alias('entity_id_list'),
        float_list_null_to_empty_list_udf('ent_confidence_level_list').alias('ent_confidence_level_list'),
    )
    .groupBy('uuid_pv')
    .agg(
        F.collect_list('document_id_pv').alias('document_id_pv_list'),
        F.collect_list('timestamp_pv').alias('timestamp_pv_list'),
        F.collect_list('category_id_list').alias('category_id_lists'),
        F.collect_list('cat_confidence_level_list').alias('cat_confidence_level_lists'),
        F.collect_list('topic_id_list').alias('topic_id_lists'),
        F.collect_list('top_confidence_level_list').alias('top_confidence_level_lists'),
        F.collect_list('entity_id_list').alias('entity_id_lists'),
        F.collect_list('ent_confidence_level_list').alias('ent_confidence_level_lists'),
    )
)

In [52]:
# Mark the per-user aggregation for caching.
page_views_by_user_df.cache()

DataFrame[uuid_pv: string, document_id_pv_list: array<int>, timestamp_pv_list: array<int>, category_id_lists: array<array<int>>, cat_confidence_level_lists: array<array<float>>, topic_id_lists: array<array<int>>, top_confidence_level_lists: array<array<float>>, entity_id_lists: array<array<string>>, ent_confidence_level_lists: array<array<float>>]

In [55]:
# Release the cache requested in the previous cell.
# NOTE(review): execution counts jump from 52 to 55, so whatever ran between cache()
# and unpersist() is no longer in the notebook; on a top-to-bottom run this cell
# directly undoes the cache() above.
page_views_by_user_df.unpersist()

DataFrame[uuid_pv: string, document_id_pv_list: array<int>, timestamp_pv_list: array<int>, category_id_lists: array<array<int>>, cat_confidence_level_lists: array<array<float>>, topic_id_lists: array<array<int>>, top_confidence_level_lists: array<array<float>>, entity_id_lists: array<array<string>>, ent_confidence_level_lists: array<array<float>>]

In [56]:
from collections import defaultdict

In [57]:
def get_user_aspects(docs_aspects, aspect_docs_counts, total_docs=None):
    """Aggregate per-document aspect confidences into per-user aspect statistics.

    Args:
        docs_aspects: iterable with one dict per viewed document, mapping an aspect
            key (category / topic / entity id) to its confidence in that document.
        aspect_docs_counts: dict mapping each aspect key to its count over the whole
            corpus; every key seen in ``docs_aspects`` must be present.
        total_docs: corpus size used in the IDF term. Defaults to the notebook-level
            ``documents_total`` so existing two-argument callers are unaffected.

    Returns:
        dict mapping each aspect key to ``[tf * idf, mean_confidence]`` where ``tf``
        is the number of viewed documents carrying the aspect and
        ``idf = log(total_docs / aspect_docs_counts[key])``.

    Raises:
        KeyError: if an aspect key is missing from ``aspect_docs_counts``.
    """
    if total_docs is None:
        # Fall back to the global computed earlier in the notebook.
        total_docs = documents_total

    docs_aspects_merged_lists = defaultdict(list)

    for doc_aspects in docs_aspects:  # loop over every viewed document
        for key, confidence in doc_aspects.items():  # each topic/category/entity of the document
            docs_aspects_merged_lists[key].append(confidence)  # group confidences by aspect key

    docs_aspects_stats = {}
    for key, aspect_list in docs_aspects_merged_lists.items():
        tf = len(aspect_list)
        idf = math.log(total_docs / float(aspect_docs_counts[key]))

        confid_mean = sum(aspect_list) / float(tf)
        docs_aspects_stats[key] = [tf*idf, confid_mean]

    return docs_aspects_stats

In [58]:
def generate_user_profile(docs_aspects_list, docs_aspects_confidence_list, aspect_docs_counts):
    """Build a user's aspect profile from parallel per-document lists.

    ``docs_aspects_list`` holds, per viewed document, the list of aspect keys and
    ``docs_aspects_confidence_list`` the matching list of confidences. Each pair of
    lists is zipped into a ``{aspect_key: confidence}`` dict and the resulting dicts
    are handed to ``get_user_aspects`` together with ``aspect_docs_counts``.
    """
    per_doc_aspects = [
        dict(zip(aspect_keys, confidences))
        for aspect_keys, confidences in zip(docs_aspects_list, docs_aspects_confidence_list)
    ]
    return get_user_aspects(per_doc_aspects, aspect_docs_counts)

In [59]:
# UDF returning the length of a list column; used below to count a user's page views.
get_list_len_udf = F.udf(lambda docs_list: len(docs_list), IntegerType())

In [60]:
# UDF building the category profile: map of category id -> array of floats as returned
# by generate_user_profile, using the per-category document counts.
generate_categories_user_profile_map_udf = F.udf(
    lambda aspect_id_lists, confidence_lists: generate_user_profile(
        aspect_id_lists, confidence_lists, categories_docs_counts),
    MapType(IntegerType(), ArrayType(FloatType()), False))

In [61]:
# UDF building the topic profile: map of topic id -> array of floats as returned by
# generate_user_profile, using the per-topic document counts.
generate_topics_user_profile_map_udf = F.udf(
    lambda aspect_id_lists, confidence_lists: generate_user_profile(
        aspect_id_lists, confidence_lists, topics_docs_counts),
    MapType(IntegerType(), ArrayType(FloatType()), False))

In [62]:
# UDF building the entity profile: map of entity id (string) -> array of floats as
# returned by generate_user_profile, using the per-entity document counts.
generate_entities_user_profile_map_udf = F.udf(
    lambda aspect_id_lists, confidence_lists: generate_user_profile(
        aspect_id_lists, confidence_lists, entities_docs_counts),
    MapType(StringType(), ArrayType(FloatType()), False))

In [63]:
# Final per-user profile: number of page views plus the category, topic and entity
# profile maps, with the user id and viewed-document list renamed for output.
users_profile_df = (
    page_views_by_user_df
    .withColumn('views', get_list_len_udf('document_id_pv_list'))
    .withColumn(
        'categories',
        generate_categories_user_profile_map_udf('category_id_lists', 'cat_confidence_level_lists'))
    .withColumn(
        'topics',
        generate_topics_user_profile_map_udf('topic_id_lists', 'top_confidence_level_lists'))
    .withColumn(
        'entities',
        generate_entities_user_profile_map_udf('entity_id_lists', 'ent_confidence_level_lists'))
    .select(
        F.col('uuid_pv').alias('uuid'),
        F.col('document_id_pv_list').alias('doc_ids'),
        'views', 'categories', 'topics', 'entities')
)

In [85]:
# Mark the user profiles for caching before they are displayed and written out.
# NOTE(review): this cell's execution count (85) is out of sequence with its
# neighbours, i.e. it was run after the cells that follow it.
users_profile_df.cache()

DataFrame[uuid: string, doc_ids: array<int>, views: int, categories: map<int,array<float>>, topics: map<int,array<float>>, entities: map<string,array<float>>]

In [64]:
# Peek at five user profiles.
users_profile_df.show(5)

+--------------+--------------------+-----+--------------------+--------------------+--------------------+
|          uuid|             doc_ids|views|          categories|              topics|            entities|
+--------------+--------------------+-----+--------------------+--------------------+--------------------+
|1000615e760786|[2959725, 2730005...|    3|[1914 -> [4.85030...|[77 -> [6.0778503...|[753fa42329661c4e...|
|10042103b7ff2b|[1415882, 1415309...|    5|[1808 -> [2.51650...|[97 -> [3.4147518...|[023f51d65c5bdb42...|
|1007b9cd87526d|[1493650, 1493650...|    3|[1408 -> [8.8686,...|[160 -> [9.604641...|                  []|
|100bc3d05f3126|[2668170, 2729482...|    5|[1408 -> [2.9562,...|[65 -> [4.2764845...|[b165150dc5cfdf67...|
|10135333f64db3|           [2893318]|    1|[1708 -> [3.04310...|[14 -> [7.055967,...|[97d3c39f93fd28b3...|
+--------------+--------------------+-----+--------------------+--------------------+--------------------+
only showing top 5 rows



In [65]:
# Output table name; evaluation-mode profiles are written under a separate name.
table_name = 'user_profiles_eval' if evaluation else 'user_profiles'

In [66]:
# Show the chosen output table name.
table_name

'user_profiles'

In [67]:
# Write the user profiles as parquet to HDFS, replacing any existing output.
# The HDFS folder is spelled out here because OUTPUT_BUCKET_FOLDER was rebound to a
# local path earlier in the notebook.
users_profile_df.write.parquet('hdfs:/user/lzhao/data/outbrain/preprocessed/'+table_name, mode='overwrite')

In [68]:
# Shut down the Spark session now that the output has been written.
spark.stop()