In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark import SparkConf

In [2]:
class Assignment:
    """Runs the three assignment reports over the started_streams and whatson CSV files.

    Every report reads its inputs through the SparkSession handed to the
    constructor and prints its result with ``DataFrame.show()``.

    Note: ``max``, ``sum``, ``first``, ``last``, ``count`` etc. used below are the
    ``pyspark.sql.functions`` versions brought in by the notebook's star import,
    not the Python builtins.
    """

    # Default input locations; pass other paths to the constructor to run elsewhere.
    DEFAULT_STREAMS_PATH = "D:\\BigData\\Interview\\New folder\\started_streams.csv"
    DEFAULT_WHATSON_PATH = "D:\\BigData\\Interview\\New folder\\whatson.csv"

    def __init__(self, spark, streams_path=DEFAULT_STREAMS_PATH, whatson_path=DEFAULT_WHATSON_PATH):
        """Store the session and the input file locations.

        Args:
            spark: SparkSession used for every read.
            streams_path: location of the started_streams CSV.
            whatson_path: location of the whatson CSV.
        """
        self.spark = spark
        self.streams_path = streams_path
        self.whatson_path = whatson_path

    def _create_streams_df(self):
        """Read the started_streams CSV (';'-delimited, with header) using an explicit schema.

        Returns:
            DataFrame with the eleven columns declared in the schema below.
        """
        schema = StructType([
            StructField("dt", DateType(), True),
            StructField("time", TimestampType(), True),
            StructField("device_name", StringType(), True),
            StructField("house_number", IntegerType(), True),
            StructField("user_id", StringType(), True),
            StructField("country_code", StringType(), True),
            StructField("program_title", StringType(), True),
            StructField("season", IntegerType(), True),
            StructField("season_episode", StringType(), True),
            StructField("genre", StringType(), True),
            StructField("product_type", StringType(), True),
        ])

        streams_df = (
            self.spark.read
            .format("csv")
            .option("header", True)
            .option("delimiter", ";")
            .schema(schema)
            .option("path", self.streams_path)
            .load()
        )

        return streams_df

    def _create_whatson_df(self):
        """Read the whatson CSV (with header), letting Spark infer the column types.

        Returns:
            DataFrame loaded from ``self.whatson_path``.
        """
        # Use the injected session rather than a module-level `spark` global, so the
        # class works regardless of what the calling cell named its session.
        whatson_df = (
            self.spark.read
            .format("csv")
            .option("header", True)
            .option("inferSchema", True)
            .option("path", self.whatson_path)
            .load()
        )

        return whatson_df

    def _assignment_task1(self):
        """Show 'tvod'/'est' streams together with broadcast-right dates per house_number.

        Streams are kept even when no whatson row matches their house_number
        (right outer join with streams on the right-hand side).
        """
        whatson_df = self._create_whatson_df()
        streams_df = self._create_streams_df()

        # NOTE(review): first()/last() after groupBy depend on row order, which Spark
        # does not guarantee after a shuffle. If the dates belonging to the most
        # recent `dt` are wanted, this aggregation needs an explicit ordering -- confirm.
        # `recent_date` is computed here but is not part of the displayed columns.
        whatson_rights_df = whatson_df.groupBy("house_number").agg(
            max("dt").alias("recent_date"),
            first("broadcast_right_start_date").alias("broadcast_right_start_date"),
            last("broadcast_right_end_date").alias("broadcast_right_end_date"),
        )

        # Only transactional product types are reported.
        purchased_streams_df = streams_df.filter(streams_df.product_type.isin(['tvod', 'est']))

        # Every streams column in schema order, followed by the two rights dates.
        output_columns = [purchased_streams_df[name] for name in purchased_streams_df.columns]
        output_columns.append(whatson_rights_df.broadcast_right_start_date)
        output_columns.append(whatson_rights_df.broadcast_right_end_date)

        # NOTE(review): the broadcast hint sits on the preserved side of a right outer
        # join; check with explain() whether Spark actually honours it here.
        whatson_rights_df.join(
            broadcast(purchased_streams_df),
            whatson_rights_df.house_number == purchased_streams_df.house_number,
            "right",
        ).select(*output_columns).show()

    def _assignment_task2(self):
        """Show distinct-user and stream counts per (device_name, country_code, product_type)."""
        streams_df = self._create_streams_df()

        # NOTE(review): `dt` and `program_title` are taken with first(), i.e. one
        # arbitrary value per group -- confirm that is the intended report shape.
        streams_df.groupBy("device_name", "country_code", "product_type") \
            .agg(
                first("dt").alias("dt"),
                first("program_title").alias("program_title"),
                countDistinct("user_id").alias("unique_users"),
                count("product_type").alias("content_count"),
            ) \
            .select("dt", "program_title", "device_name", "country_code",
                    "product_type", "unique_users", "content_count") \
            .show()

    def _assignment_task3(self):
        """Show the summed `watched_hour` value per (genre, user_id).

        `watched_hour` is the `time` column expressed as fractional hours:
        hour + minute/60 + second/3600.
        """
        streams_df = self._create_streams_df()

        # All three components must come from the timestamp column `time`; `dt` is a
        # date-only column, so its minute/second parts carry no information.
        streams_task3_df = streams_df.withColumn(
            "watched_hour",
            hour(streams_df.time)
            + (minute(streams_df.time) / 60)
            + second(streams_df.time) / 3600,
        )

        streams_task3_df.groupBy("genre", "user_id") \
            .agg(sum("watched_hour").alias("watched_hours")).show()

In [3]:
if __name__ == "__main__":
    # Build the configuration step by step: local master, fixed application name.
    conf = SparkConf()
    conf.setMaster("local")
    conf.setAppName("data_engineer_assignment")

    # Reuse an already-running session if there is one, otherwise start a new one.
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    # Run the three reports in order; each one prints its own table.
    task_obj = Assignment(spark)
    for run_task in (
        task_obj._assignment_task1,
        task_obj._assignment_task2,
        task_obj._assignment_task3,
    ):
        run_task()

+----------+-------------------+--------------+------------+--------------------+------------+--------------------+------+--------------+------------+------------+--------------------------+------------------------+
|        dt|               time|   device_name|house_number|             user_id|country_code|       program_title|season|season_episode|       genre|product_type|broadcast_right_start_date|broadcast_right_end_date|
+----------+-------------------+--------------+------------+--------------------+------------+--------------------+------+--------------+------------+------------+--------------------------+------------------------+
|2018-10-01|2021-05-14 16:51:54|        ps4xdk|    20340232|Ua5PCtp2jyKzGNSXw...|          no|Captain America: ...|  null|          null|Movies Other|         est|                2016-08-29|              2017-08-12|
|2018-10-01|2021-05-14 09:23:47|   androiddash|    20340232|zd40xZSMJvB_SYu4v...|          se|Captain America: ...|  null|          null