### Take-home assessment for Spotter's Sr. Data Engineer role

At Spotter we work with a lot of YouTube data, and we have prepared a small sample dataset for you to use during this assignment.
While this is a tiny sample dataset, your submission needs to keep scalability in mind. Our datasets have many millions, sometimes billions, of records, so your solution should perform well regardless of the size of the input.

The first cell includes the path to the Parquet dataset and a helper function that you can use to get video-level view counts for a batch of videos in a single call.
Note that we will not use the live YouTube APIs in this exercise; instead, we will assign a number per video_id to represent its views as of the time of the call (lifetime views).

There are three problems; please complete as many as you can.
* Calculate monthly views from daily data: using the daily-level data in the sample dataset, compute monthly views per video by aggregating the daily `viewCount`.
* Find date gaps: in some cases (for varying reasons) we have gaps in the data. Using the sample dataset, find and present the date-range gaps per video. The dataset has daily-level records for selected videos from the date of video upload (`created_date`) through 2023-04-18.
* Get lifetime views in batches: with some YouTube APIs we can send a batch of video_ids to get metrics such as lifetime views. For this problem, use the first two columns (`channel_id` and `video_id`) from the sample dataset and add two more columns: `date`, with the value of the run date, and `viewCount`, which you can get from the provided helper function.
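
Presenting gaps as date ranges is often easier to read than listing individual days. As a small Spark-free sketch, the hypothetical helper `to_gap_ranges` below collapses one video's missing dates (made-up values) into inclusive `(start, end)` ranges of consecutive days:

```python
from datetime import date, timedelta


def to_gap_ranges(missing_dates):
    """Collapse missing dates into inclusive (start, end) ranges of consecutive days."""
    ranges = []
    for d in sorted(set(missing_dates)):
        # Extend the current range when this date directly follows its end
        if ranges and d - ranges[-1][1] == timedelta(days=1):
            ranges[-1] = (ranges[-1][0], d)
        else:
            ranges.append((d, d))
    return ranges


# Hypothetical missing dates for a single video
missing = [date(2023, 4, 3), date(2023, 4, 1), date(2023, 4, 2), date(2023, 4, 10)]
gaps = to_gap_ranges(missing)
```

Here `gaps` holds two ranges: 2023-04-01 to 2023-04-03, and the single day 2023-04-10.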

Note: The default notebook language is set to Python, but you can change it and complete the assignment using Spark's Scala APIs if you are more comfortable with them. We are not looking for SQL-based solutions.

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType

path_video_samples = "s3://dataeng-interview-datasets/videos_sample_m" # parquet dataset


## Helper code for Problem 3 ##

# schema for the return type of process_batch_of_videos
views_schema = ArrayType(
    StructType([
        StructField("video_id", StringType(), False),
        StructField("views", IntegerType(), False)
    ])
)

# YouTube API response simulator on a single video
def calculate_views(video_id):
    # for this exercise we'll just generate a number instead of using the API
    return int(abs(hash(video_id) % 1000))

# UDF helper that takes a list of video_ids and returns a list of (video_id, views) structs
# :param video_ids: a list of up to 10 video_id values
# :return: a list of (video_id, views) structs, one per input video_id (same size as the input)
@udf(returnType=views_schema)
def process_batch_of_videos(video_ids):
    if len(video_ids) > 10:
        raise Exception(f"Too many ({len(video_ids)}) videos in the list. Expected up to 10")
    return [(vid, calculate_views(vid)) for vid in video_ids]


### Load the data
First, let's create a dataframe with the sample data and display it in the notebook

In [0]:
# initialize a Spark dataframe
df_samples = spark.read.parquet(path_video_samples)
display(df_samples) # display it


#### Let's set some configs to improve performance
##### DBR 15.4 - This is the Databricks Runtime version used with this notebook
**AQE** - Adaptive Query Execution re-optimizes query plans using runtime statistics, which can improve performance by switching join strategies, coalescing small shuffle partitions, and splitting skewed partitions.

**IO Caching** - Enable the Databricks I/O (disk) cache for faster data retrieval by caching copies of remote input data on the workers' local SSDs.

**maxPartitionBytes** - Set the maximum number of bytes packed into a single partition when reading files, which controls read parallelism and per-task memory usage.

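**CBO** - Enable Cost-Based Optimization (CBO) to improve query performance by using table and column statistics for query planning. The statistics must be collected first (for example with `ANALYZE TABLE ... COMPUTE STATISTICS`); without them CBO has little to work with.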

**autoBroadcastJoinThreshold** - Set the maximum size of a table that is automatically broadcast to all executors in a join, which avoids shuffling the larger side.

**shuffle.partitions** - Set the default number of partitions for shuffle operations, which determines the parallelism of shuffle stages. AQE can coalesce these partitions at runtime, but a good initial value is still recommended.

*These are only suggestions of things that can be used to improve performance. Note that some of them take control away from the developer. A good understanding of each config and its behavior on your job is recommended.*

In [0]:
# Enable Adaptive Query Execution (AQE) to dynamically optimize query plans at runtime
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Enable skew join optimization, which splits skewed partitions during joins for better load balancing
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Enable dynamic coalescing of shuffle partitions, which merges small adjacent partitions into larger ones to reduce task-scheduling overhead
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
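
Coalescing can be pictured with a simplified pure-Python model: adjacent shuffle partitions are merged greedily until adding the next one would exceed a target size. This is only an illustrative sketch, not Spark's implementation (which also weighs settings such as `spark.sql.adaptive.advisoryPartitionSizeInBytes` and a minimum partition size); the partition sizes are made up.

```python
def coalesce_adjacent(sizes_mb, target_mb):
    """Greedily merge adjacent partitions; return lists of original partition indices."""
    groups, current, current_size = [], [], 0
    for idx, size in enumerate(sizes_mb):
        # Close the current group if adding this partition would overshoot the target
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0
        current.append(idx)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Six hypothetical shuffle partitions (MB) and a 64 MB target
groups = coalesce_adjacent([10, 20, 30, 70, 5, 5], target_mb=64)
```

The six tasks become three: `[[0, 1, 2], [3], [4, 5]]`. The 70 MB partition stays on its own because coalescing only merges; splitting oversized partitions is the job of skew handling.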


In [0]:

# Enable the Databricks I/O (disk) cache to speed up repeated reads of Parquet files from S3 by caching copies of the data on the workers' local SSDs.
# This avoids re-reading the same data from S3, which improves performance for iterative queries and workloads.
spark.conf.set("spark.databricks.io.cache.enabled", "true")

In [0]:
# Set maxPartitionBytes to 128MB = 128 * 1024 * 1024
spark.conf.set('spark.sql.files.maxPartitionBytes', "134217728b")   # Note this is the default

# Controls the maximum number of bytes packed into a single partition when Spark reads from a file source (such as Parquet, ORC, JSON or CSV). It determines the size of the splits Spark uses when reading the input files, so the data is divided into manageable partitions for efficient parallel processing.
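
The value set above is an upper bound, not a fixed split size. Below is a simplified pure-Python model of the split-size rule used when Spark plans file scans (as we understand `FilePartition.maxSplitBytes` in Spark 3.x): it also accounts for `spark.sql.files.openCostInBytes` (4 MB by default) and a minimum partition count, which defaults to the cluster's default parallelism. The file sizes and core counts are hypothetical.

```python
MiB = 1024 * 1024


def max_split_bytes(file_sizes, min_partition_num,
                    max_partition_bytes=128 * MiB, open_cost_bytes=4 * MiB):
    """Simplified model of how Spark picks the split size for file scans."""
    # Every file is charged an extra "open cost" on top of its size
    total_bytes = sum(size + open_cost_bytes for size in file_sizes)
    # Spread the input across the available cores
    bytes_per_core = total_bytes // min_partition_num
    # Never exceed maxPartitionBytes and never go below the open cost
    return min(max_partition_bytes, max(open_cost_bytes, bytes_per_core))


# One 10 GiB file on a hypothetical 64-core cluster: capped at 128 MiB
large_input = max_split_bytes([10 * 1024 * MiB], min_partition_num=64)
# One 100 MiB file on 8 cores: 13 MiB splits so every core gets work
small_input = max_split_bytes([100 * MiB], min_partition_num=8)
```

For large inputs the 128 MB setting is what actually bounds each read task; for small inputs Spark picks smaller splits on its own.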


In [0]:
# Enable Cost-Based Optimization (CBO)
spark.conf.set("spark.sql.cbo.enabled", "true")

# Enable join reordering based on collected statistics
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")

# Enable creation of histograms to improve CBO decision-making for skewed data
spark.conf.set("spark.sql.statistics.histogram.enabled", "true")


In [0]:
# Set the broadcast join threshold based on the size of the "smaller" dataset. A broadcast table is collected on the driver and copied to every executor, so raising the threshold may require more driver and executor memory.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")  # Default is 10 MB


In [0]:
# Set an appropriate value for spark.sql.shuffle.partitions (default: 200).
# The formula below is a good place to start;
# 128 MB - 1 GB is the recommended target partition size range.

# optimal_partitions = largest_total_shuffle_size / target_partition_size

# optimal_partitions = largest_total_shuffle_size MB / 128 MB

#spark.conf.set("spark.sql.shuffle.partitions", 200)
spark.conf.get("spark.sql.shuffle.partitions")


'200'
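
A worked example of the formula above, assuming a hypothetical job whose largest shuffle stage writes 50 GiB (the stage page of the Spark UI reports shuffle write sizes):

```python
import math

MiB = 1024 * 1024
GiB = 1024 * MiB


def optimal_shuffle_partitions(largest_shuffle_bytes, target_partition_bytes=128 * MiB):
    """Rule of thumb: largest total shuffle size divided by the target partition size."""
    return max(1, math.ceil(largest_shuffle_bytes / target_partition_bytes))


at_128_mib = optimal_shuffle_partitions(50 * GiB)        # 400 partitions
at_1_gib = optimal_shuffle_partitions(50 * GiB, 1 * GiB)  # 50 partitions
# spark.conf.set("spark.sql.shuffle.partitions", at_128_mib)
```

Rounding up keeps each partition at or below the target, so for this hypothetical input any value between 50 and 400 fits the 128 MB - 1 GB guideline.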

#### Problem 1
Using the sample dataset, create a new dataframe with the following columns: `channel_id`, `video_id`, `month` and `aggViewCount` where:
* `channel_id` and `video_id` values are from the sample dataset
* `month` has a format of YYYY-MM to represent the month for which the aggregated view count was computed
* `aggViewCount` is the sum of views for a given `channel_id` and `video_id` pair over all days of the month
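
To check results by hand on a few rows, the expected semantics can be sketched in plain Python (no Spark): bucket each daily record by `'YYYY-MM'` and sum `viewCount` per `(channel_id, video_id, month)`. The rows and the `monthly_views` helper are hypothetical; `strftime('%Y-%m')` plays the role of Spark's `date_format(..., 'yyyy-MM')`.

```python
from collections import defaultdict
from datetime import date


def monthly_views(daily_rows):
    """Sum daily viewCount per (channel_id, video_id, 'YYYY-MM')."""
    totals = defaultdict(int)
    for channel_id, video_id, day, view_count in daily_rows:
        totals[(channel_id, video_id, day.strftime("%Y-%m"))] += view_count
    return dict(totals)


# Hypothetical daily records: (channel_id, video_id, date, viewCount)
rows = [
    ("ch1", "v1", date(2023, 3, 30), 5),
    ("ch1", "v1", date(2023, 3, 31), 7),
    ("ch1", "v1", date(2023, 4, 1), 3),
    ("ch2", "v2", date(2023, 4, 1), 10),
]
result = monthly_views(rows)
```

`result` maps `("ch1", "v1", "2023-03")` to 12, `("ch1", "v1", "2023-04")` to 3 and `("ch2", "v2", "2023-04")` to 10.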

In [0]:
# Problem 1:

from pyspark.sql import functions as F
from pyspark import StorageLevel

# Load only the necessary columns from the columnar source
df_samples = spark.read.parquet(path_video_samples).select("channel_id", "video_id", "date", "viewCount")

# Add a column 'month' in the format YYYY-MM based on the 'date' column
df_samples_with_month = df_samples.withColumn("month", F.date_format(F.col("date"), "yyyy-MM"))

# Aggregate the daily views per video and month.
# No persist is needed: the input is consumed by a single aggregation.
df_monthly_views = df_samples_with_month.groupBy("channel_id", "video_id", "month") \
    .agg(F.sum("viewCount").alias("aggViewCount"))

# Display the final aggregated result
display(df_monthly_views)




#### Problem 2
If all goes well with our data pulls, we expect to have full daily-level data for all videos starting from the upload date. However, sometimes we get no response from the YouTube APIs (due to backend issues on their side). We can deal with these gaps if we can identify the dates for which a given video has no data.

Let's write Spark code to find all missing dates. Remember that our sample data should have daily-level `viewCount` values up to and including 2023-04-18.

Create a new dataframe with the following columns: `channel_id`, `video_id`, `created_date` and `missing_dates` (or `missing_date`) where:
* `channel_id`, `video_id` and `created_date` values are from the sample dataset
* `missing_dates` is a list of dates for which a given video does not have data
* `missing_date` is a single missing date value (in this case we'll have one record per video per missing date)

Only keep the records that have missing dates.

**Bonus point** if you create both datasets: one with the `missing_dates` schema and the other with the `missing_date` schema.
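
At its core this is a set difference between the expected daily range and the dates actually present. A pure-Python sketch for a single video with made-up dates, showing both output shapes; the `missing_dates` helper and the `ch1`/`v1` identifiers are hypothetical:

```python
from datetime import date, timedelta

END_DATE = date(2023, 4, 18)  # last date the sample data should cover


def missing_dates(created_date, available_dates, end_date=END_DATE):
    """Return the sorted dates in [created_date, end_date] that have no record."""
    available = set(available_dates)
    span = (end_date - created_date).days + 1
    expected = (created_date + timedelta(days=i) for i in range(span))
    return [d for d in expected if d not in available]


# Hypothetical video uploaded 2023-04-14 with no data for 04-15 and 04-17
created = date(2023, 4, 14)
have = [date(2023, 4, 14), date(2023, 4, 16), date(2023, 4, 18)]
dates_list = missing_dates(created, have)  # `missing_dates` shape: one list per video
dates_rows = [("ch1", "v1", created, d) for d in dates_list]  # `missing_date` shape: one row per date
```

Videos with an empty list would be dropped, matching the requirement to keep only records with missing dates.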

In [0]:
# Problem 2:

from pyspark import StorageLevel
from pyspark.sql import functions as F

END_DATE = "2023-04-18"  # last date the sample data is expected to cover

# Load the sample data (selecting only the necessary columns)
df_samples = spark.read \
    .parquet(path_video_samples) \
    .select("channel_id", "video_id", "created_date", "date")

# Collapse the daily rows into one row per video holding the set of dates we have data for.
# A single groupBy needs one shuffle and never pairs every daily row with a full date array.
df_available = df_samples \
    .groupBy("channel_id", "video_id", "created_date") \
    .agg(F.collect_set(F.to_date("date")).alias("available_dates"))

# Expected dates run from created_date through END_DATE;
# missing dates are the expected dates that do not appear in available_dates
df_missing_dates = df_available \
    .withColumn(
        "expected_dates",
        F.expr(f"sequence(to_date(created_date), to_date('{END_DATE}'), interval 1 day)")
    ) \
    .withColumn("missing_dates", F.array_except("expected_dates", "available_dates")) \
    .filter(F.size("missing_dates") > 0) \
    .select("channel_id", "video_id", "created_date", "missing_dates")

# df_missing_dates feeds both outputs, so persist it to avoid computing it twice
df_missing_dates.persist(StorageLevel.MEMORY_AND_DISK)

# Dataset 1: one record per video with the list of its missing dates
display(df_missing_dates)

# Dataset 2: one record per video per missing date
df_missing_date = df_missing_dates \
    .withColumn("missing_date", F.explode("missing_dates")) \
    .select("channel_id", "video_id", "created_date", "missing_date")

display(df_missing_date)



#### Problem 3
In most cases we are able to batch multiple `video_id`s in a single YouTube API call and get a response for the full batch.

In simplified form, this means getting back an array of (`video_id`, `viewCount`) tuples for an input list of `video_id`s.

With the provided helper function `process_batch_of_videos` we can simulate this behavior and get the lifetime view count as of the execution time. The function accepts a list of up to 10 videos as input.
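
Because each call accepts at most 10 `video_id`s, the work boils down to deduplicating the ids (the daily data repeats each one) and cutting them into chunks of 10. A Spark-free sketch of that logic; `chunk` and `fake_batch_call` are hypothetical, and `fake_batch_call` only mimics the 10-id limit of `process_batch_of_videos` while returning a made-up number:

```python
def chunk(items, size=10):
    """Split items into consecutive lists of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def fake_batch_call(video_ids):
    """Hypothetical stand-in for the batch API contract (no Spark, fake view counts)."""
    if len(video_ids) > 10:
        raise ValueError(f"Too many ({len(video_ids)}) videos in the list. Expected up to 10")
    return [(vid, len(vid)) for vid in video_ids]


# Daily data repeats each video_id, so deduplicate (order-preserving) before batching
daily_video_ids = [f"v{i}" for i in range(23)] * 3
unique_ids = list(dict.fromkeys(daily_video_ids))
batches = chunk(unique_ids)
views = [pair for batch in batches for pair in fake_batch_call(batch)]
```

Deduplicating first means 3 calls instead of 7 for these 23 videos, and every video is requested exactly once.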

Let's write Spark code to get a `viewCount` value per unique video in the sample dataset and add these records to create a new dataset with lifetime views as of today.

There are two expected dataframes: the first contains only the views as of today, and the second combines the first with the historical data, giving us a view count snapshot for every day we have data for.

Note: Due to the way we generated the view counts for this exercise, the counts will not necessarily increase for newer dates, but we don't need to worry about that for the purposes of this assessment.

Create a new dataframe with the following columns: `channel_id`, `video_id`, `created_date`, `date` and `viewCount` where:
* `channel_id`, `video_id` and `created_date` values are from the sample dataset
* `date` is the date on which you ran the code and got the `viewCount` from the provided function
* `viewCount` is the value returned by the provided function for the `video_id`

The second dataset has the same schema as the first, but it combines the historical data from the sample dataset with the first dataset in this problem.

In [0]:
# Problem 3:
from datetime import date

from pyspark import StorageLevel
from pyspark.sql import functions as F

batch_size = 10  # process_batch_of_videos accepts at most 10 video_ids per call

# Historical daily records, used as-is in the combined dataset
df_history = spark.read.parquet(path_video_samples) \
    .select("channel_id", "video_id", "created_date", "date", "viewCount")

# The daily data has one row per video per day, so keep one row per video
# before batching; otherwise the same video_id would be requested many times
df_videos = df_history.select("channel_id", "video_id", "created_date").distinct()

# Group video_ids per channel_id and created_date so every batch shares those values
df_grouped_videos = df_videos.groupBy("channel_id", "created_date") \
    .agg(F.collect_list("video_id").alias("video_ids"))

# Explode one row per batch number, then slice out up to batch_size video_ids per row
df_batched_videos = df_grouped_videos \
    .withColumn("num_batches", F.expr(f"cast(ceil(size(video_ids) / {batch_size}) as int)")) \
    .withColumn("batch_number", F.explode(F.expr("sequence(1, num_batches)"))) \
    .withColumn(
        "video_batch",
        F.expr(f"slice(video_ids, (batch_number - 1) * {batch_size} + 1, {batch_size})")
    ) \
    .select("channel_id", "created_date", "video_batch")

# Apply the process_batch_of_videos UDF to get view counts for each batch
df_with_views = df_batched_videos.withColumn(
    "view_data", process_batch_of_videos(F.col("video_batch"))
)

# Explode the returned structs into one row per video and add the run date
df_lifetime_views = df_with_views \
    .withColumn("exploded_view", F.explode("view_data")) \
    .select(
        "channel_id",
        F.col("exploded_view.video_id").alias("video_id"),
        "created_date",
        F.lit(date.today()).alias("date"),
        F.col("exploded_view.views").alias("viewCount"),
    )

# Persist so the batch calls (real API requests in production) are not repeated
# when the result is reused for the combined dataset below
df_lifetime_views.persist(StorageLevel.MEMORY_AND_DISK)

# Dataset 1: lifetime views as of today
display(df_lifetime_views)

# Dataset 2: historical daily records plus today's snapshot (columns matched by name)
df_combined = df_history.unionByName(df_lifetime_views)

display(df_combined)
