In [59]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    TimestampType,
    DecimalType,
    BooleanType,
    FloatType,
)

In [60]:
# Create (or reuse) the Spark session for this notebook.
session = (
    SparkSession.builder
    .appName("SocialEventsProject")
    .getOrCreate()
)
# NOTE: `spark` holds the low-level SparkContext, not the SparkSession;
# reading data and running SQL go through `session`.
spark = session.sparkContext

22/09/18 21:33:06 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/09/18 21:33:06 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [61]:
# ETL parameters
# - ETL_DATE: reference date of the run; the most recent event in the data is 2022-03-01.
# - WINDOW_SIZE: length, in days, of the aggregation window that ends on ETL_DATE.
ETL_DATE = F.to_date(F.lit("2022-03-01"))
WINDOW_SIZE = 90

# WHERE clause spliced into every report query. It relies on the `event_date` and
# `etl_date` columns added by add_date_columns(). The window is bounded on both
# sides so that rows dated after the ETL date never leak into a run (for example
# when the ETL is re-run for an earlier date).
FILTER_STMT = (
    f"where event_date > etl_date - INTERVAL {WINDOW_SIZE} DAYS"
    " and event_date <= etl_date"
)
# Backward-compatible alias: the original (misspelled) name is still referenced
# by the report queries.
FITLER_STMT = FILTER_STMT


def add_date_columns(df, event_timestamp):
    """Return `df` with `event_date` and `etl_date` columns appended.

    `event_date` is `event_timestamp` converted to a date and `etl_date` is the
    notebook-level `ETL_DATE` expression; the two columns are used to filter
    rows by date.
    """
    with_event_date = df.withColumn("event_date", F.to_date(event_timestamp))
    return with_event_date.withColumn("etl_date", ETL_DATE)

# Events Table
**CSV Columns:** event,timestamp,user_id,platform,hashtag_count
* Schema assumptions, based on each column's meaning and observed values, are described in the code comments
* The number of buckets is set to 45, assuming 3M users each creating an average of 300 events per year, a maximum row size of 500 characters, and a maximum bucket size of 10GB

In [62]:
BUCKET_SIZE = 45
sch_event = StructType(
    [
        StructField("event", StringType(), nullable=False),
        StructField("timestamp", TimestampType(), nullable=False),   # timestamps are required
        StructField("user_id", StringType(), nullable=False),        # user_id is the merge-key and I asssume it is not Nullable.
                                                                     # In cases (like Google Analytics events) that user_id is null,
                                                                     # a stitching strategy should be implemented prior to data ingestion
        StructField("platform", StringType(), nullable=True),
        StructField("hashtag_count", DecimalType(), nullable=True),  # hashtag_count is stored as float, better to be defined as Integer
    ]
)
df_events = session.read.options(header=True).schema(sch_event).csv("data/events.csv")
df_events = add_date_columns(df_events, df_events.timestamp)
df_events.write.sortBy("user_id").bucketBy(BUCKET_SIZE, "user_id").format("parquet").mode("overwrite").option("path", "/tmp/tables/events.parquet").saveAsTable(
    "events"
)
!ls /tmp/tables/events.parquet | wc -l

[Stage 0:>                                                          (0 + 1) / 1]

46


                                                                                

# Payments Table
**CSV Columns:** amount_due,attempt_count,attempted,charge_id,closed,currency,user_id,date,description,discount_id,ending_balance,forgiven,id,line_ids,paid,receipt_number,received_at,starting_balance,subscription_id,subtotal,tax,tax_percent,total,webhooks_delivered_at
* Schema assumptions, based on each column's meaning and observed values, are described in the code comments
* The number of buckets is 45 (the shared `BUCKET_SIZE` constant, matching the events table), sized for 3M users

In [63]:
sch_payment = StructType(
    [
        StructField("amount_due", DecimalType(38, 18), nullable=False),  # no amount_due, no payment! non-Nullable
        StructField("attempt_count", IntegerType(), nullable=False),     # attemped is positive integer or 0, non-Nullable
        StructField("attempted", BooleanType(), nullable=False),         # boolean field, non-Nullable
        StructField("charge_id", StringType(), nullable=True),
        StructField("closed", BooleanType(), nullable=False),            # boolean field, non-Nullable
        StructField("currency", StringType(), nullable=False),           # currency cannot be null for a payment
        StructField("user_id", StringType(), nullable=False),            # user_id is the merge-key and I asssume it is not Nullable.
        StructField("date", TimestampType(), nullable=False),            # payment date is required
        StructField("description", StringType(), nullable=True),
        StructField("discount_id", StringType(), nullable=True),
        StructField("ending_balance", DecimalType(38, 18), nullable=True),
        StructField("forgiven", BooleanType(), nullable=False),          # boolean field, non-Nullable
        StructField("id", StringType(), nullable=False),                 # payment id is required
        StructField("line_ids", StringType(), nullable=True),
        StructField("paid", BooleanType(), nullable=False),              # payment is boolean, non-Nullable
        StructField("receipt_number", StringType(), nullable=True),
        StructField("received_at", TimestampType(), nullable=True),
        StructField("starting_balance", DecimalType(38, 18), nullable=True),
        StructField("subscription_id", StringType(), nullable=False),    # subscription_id is required for processing transactions
        StructField("subtotal", DecimalType(38, 18), nullable=True),
        StructField("tax", DecimalType(38, 18), nullable=True),
        StructField("tax_percent", FloatType(), nullable=True),
        StructField("total", DecimalType(38, 18), nullable=True),
        StructField("webhooks_delivered_at", StringType(), nullable=True),
    ]
)
df_payment = session.read.options(header=True).schema(sch_payment).csv("data/payments_data.csv")
df_payment = add_date_columns(df_payment, df_payment.date)
df_payment.write.sortBy("user_id").bucketBy(BUCKET_SIZE, "user_id").format("parquet").mode("overwrite").option(
    "path", "/tmp/tables/payments.parquet"
).saveAsTable("payments")
!ls /tmp/tables/payments.parquet | wc -l

[Stage 1:>                                                          (0 + 1) / 1]

46


                                                                                

# Posts Table
**CSV Columns:** id,active,delivered,media_id,media_item_id,posted_at,scheduled_at,social_profile_id,type,user_id
* Schema assumptions, based on each column's meaning and observed values, are described in the code comments

In [64]:
sch_post = StructType(
    [
        StructField("id", StringType(), nullable=False),                    # Post.id is non-Nullable
        StructField("active", BooleanType(), nullable=False),               # status field is non-Nullable, active/inactive
        StructField("delivered", BooleanType(), nullable=False),            # delivery status is non-Nullable, delivered/not
        StructField("media_id", StringType(), nullable=True),
        StructField("media_item_id", StringType(), nullable=True),
        StructField("posted_at", TimestampType(), nullable=False),          # posted_at is non-Nullable
        StructField("scheduled_at", TimestampType(), nullable=True),        # the post might be non-scheduled, so Nullable
        StructField("social_profile_id", StringType(), nullable=False),     # profile_id is non-Nullable
        StructField("type", StringType(), nullable=False),                  # post.type is Enum, non-Nullable
        StructField("user_id", StringType(), nullable=False),               # user_id is the merge-key and I asssume it is not Nullable.
    ]
)
df_post = session.read.options(header=True).schema(sch_post).csv("data/posts.csv")

# Here I am considering `min(scheduled_at, posted_at)` as the event_date for the `user activities` in the timeframe.
# IF only posted_at timestamp is sufficient, the `posted_at` should be used.
df_post = add_date_columns(df_post, F.least(df_post.posted_at, df_post.scheduled_at))
df_post.write.sortBy("user_id").bucketBy(BUCKET_SIZE, "user_id").partitionBy("type").format("parquet").mode("overwrite")\
             .option("path", "/tmp/tables/posts.parquet").saveAsTable("posts")
!ls /tmp/tables/posts.parquet

[Stage 2:>                                                          (0 + 1) / 1]

 _SUCCESS	      'type=InstagramStory'  'type=TikTokPost'
'type=FacebookPost'   'type=LinkedinPost'    'type=TwitterPost'
'type=InstagramPost'  'type=PinterestPost'


                                                                                

In [65]:
# Revenue report: one row per user with the summed `total` of their payments,
# cast to decimal(8, 2).
# Assumption: each user performs all transactions in a single currency, so one
# currency value per user (first_value) is reported.
# FITLER_STMT supplies the WHERE clause that restricts rows by event_date.
payment_query = f"""
select user_id,
       cast(sum(total) as decimal(8, 2)) as total_revenue,
       first_value(currency) as currency
from payments
{FITLER_STMT}
group by user_id
;
"""
payment_report = session.sql(payment_query)
# Preview the first 10 rows.
payment_report.show(10)

+--------------------+-------------+--------+
|             user_id|total_revenue|currency|
+--------------------+-------------+--------+
|-1061970574945339405|        38.00|     usd|
| -107917820828177159|        19.00|     usd|
|-1320943626619822068|        30.00|     usd|
|-1664641004571275844|         9.00|     usd|
|-1675915774625901833|        19.00|     usd|
|-2011115130227688920|        38.00|     usd|
|-2310678391727383421|        90.00|     usd|
|-2342875281951203170|        25.36|     usd|
|-2377089662735596849|        30.00|     usd|
|-2651059597082438967|        90.00|     usd|
+--------------------+-------------+--------+
only showing top 10 rows



In [66]:
# Posts report: one row per user with the total number of posts, the average
# schedule time, and a post count per post type.
# average_schedule_seconds averages (posted_at - scheduled_at) in seconds and is
# calculated for scheduled posts only.
# The LinkedIn total was not requested in the problem statement; it is included anyway.
# FITLER_STMT supplies the WHERE clause that restricts rows by event_date.
post_query = f"""
select user_id,
       count(id) as total_posts,
       avg(cast(posted_at as long) - cast(scheduled_at as long)) as average_schedule_seconds,
       sum(if(type='TwitterPost', 1, 0)) as total_twitter_post,
       sum(if(type='FacebookPost', 1, 0)) as total_facebook_post,
       sum(if(type='PinterestPost', 1, 0)) as total_pinterest_post,
       sum(if(type='InstagramPost', 1, 0)) as total_instagram_post,
       sum(if(type='InstagramStory', 1, 0)) as total_instagram_story,
       sum(if(type='LinkedinPost', 1, 0)) as total_linkedin_post,
       sum(if(type='TikTokPost', 1, 0)) as total_tiktok_post
from posts
{FITLER_STMT}
group by user_id
order by user_id
;
"""
post_report = session.sql(post_query)
# Preview the first 10 rows.
post_report.show(10)



+--------------------+-----------+------------------------+------------------+-------------------+--------------------+--------------------+---------------------+-------------------+-----------------+
|             user_id|total_posts|average_schedule_seconds|total_twitter_post|total_facebook_post|total_pinterest_post|total_instagram_post|total_instagram_story|total_linkedin_post|total_tiktok_post|
+--------------------+-----------+------------------------+------------------+-------------------+--------------------+--------------------+---------------------+-------------------+-----------------+
|-1005761344070730147|          3|       393633.6666666667|                 0|                  1|                   0|                   2|                    0|                  0|                0|
|-1011253794481586943|          3|                622847.0|                 0|                  0|                   0|                   3|                    0|                  0|              

                                                                                

In [67]:
# Events report: one row per user.
# - suggested_hashtags_count sums hashtag_count over 'suggested_hashtags_inserted' events.
# - suggested_time_count counts 'scheduled_at_best_time' events.
# FITLER_STMT supplies the WHERE clause that restricts rows by event_date.
event_query = f"""
select user_id,
       sum(if(event='suggested_hashtags_inserted', hashtag_count, 0)) as suggested_hashtags_count,
       sum(if(event='scheduled_at_best_time', 1, 0)) as suggested_time_count
from events
{FITLER_STMT}
group by user_id
;
"""
event_report = session.sql(event_query)
# Preview the first 10 rows.
event_report.show(10)

+--------------------+------------------------+--------------------+
|             user_id|suggested_hashtags_count|suggested_time_count|
+--------------------+------------------------+--------------------+
|-1061970574945339405|                       5|                   3|
| -107917820828177159|                       0|                   2|
|-1116514337704231938|                       7|                   3|
|-1205038767369361839|                       0|                   2|
|-1232408021217212784|                      13|                   1|
|-1320943626619822068|                       3|                   1|
|-1660451121168657102|                       0|                   1|
|-1664641004571275844|                       0|                   1|
|-1675915774625901833|                       6|                   1|
|-2011115130227688920|                       0|                   1|
+--------------------+------------------------+--------------------+
only showing top 10 rows



# Export report

In [68]:
# Combine the three per-user reports on user_id; outer joins keep users that
# appear in only some of the reports.
user_report = (
    payment_report
    .join(post_report, on=["user_id"], how="outer")
    .join(event_report, on=["user_id"], how="outer")
)

# Export the combined report as CSV with a header row, overwriting any previous
# export; coalesce(1) reduces the data to a single partition before writing.
(
    user_report.coalesce(1)
    .write
    .option("header", True)
    .mode("overwrite")
    .csv("data/user_report.csv")
)

                                                                                

In [69]:
# Preview the first 10 rows of the combined report without truncating column values.
user_report.show(10, truncate=False)



+--------------------+-------------+--------+-----------+------------------------+------------------+-------------------+--------------------+--------------------+---------------------+-------------------+-----------------+------------------------+--------------------+
|user_id             |total_revenue|currency|total_posts|average_schedule_seconds|total_twitter_post|total_facebook_post|total_pinterest_post|total_instagram_post|total_instagram_story|total_linkedin_post|total_tiktok_post|suggested_hashtags_count|suggested_time_count|
+--------------------+-------------+--------+-----------+------------------------+------------------+-------------------+--------------------+--------------------+---------------------+-------------------+-----------------+------------------------+--------------------+
|-1061970574945339405|38.00        |usd     |3          |275412.5                |0                 |1                  |0                   |2                   |0                    |0    

                                                                                

In [70]:
# Shut down the SparkContext (`spark` was assigned from session.sparkContext).
spark.stop()