In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import split
from pyspark.sql.functions import avg ,count,col,lit, expr,dayofweek,to_date,desc
from pyspark.sql.functions import sum as sums


from pyspark import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("my project 1").getOrCreate()
sc = spark.sparkContext

# Read a CSV into a dataframe
# There is a smarter version, that will first check if there is a Parquet file and use it
def load_PD_file(filename_or_dir, schema) :
    dataPath = "/mnt/ddscoursedatastorage/fwm-stb-data/" + filename_or_dir
    df = spark.read.format("csv")\
      .option("header","false")\
      .option("delimiter", "|")\
      .schema(schema)\
      .load(dataPath)
    return df

In [0]:
df1 = spark.read.csv("/mnt/ddscoursedatastorage/dds-students/test.csv")

In [0]:

# Reading the Reference Parquet files

ref_data = spark.read.parquet('/ref_data_raw').withColumnRenamed("_device-id","device_id")\
                                                .withColumnRenamed("_dma","dma")\
                                                .withColumnRenamed("_dma-code","dma_code")\
                                                .withColumnRenamed("_household-id","household_id")\
                                                .withColumnRenamed("_household-type","household_type")\
                                                .withColumnRenamed("_system-type","system_type")\
                                                .withColumnRenamed("_zipcode","zipcode")
ref_data_count = ref_data.count()
print(ref_data_count)
ref_data.limit(5).show()


203581233
+------------+------+--------+------------+--------------+-----------+-------+
|   device_id|   dma|dma_code|household_id|household_type|system_type|zipcode|
+------------+------+--------+------------+--------------+-----------+-------+
|00000008354e|Toledo|     547|     1522829|        FWM-ID|          H|  43434|
|0000009ef08e|Toledo|     547|     1522829|        FWM-ID|          H|  43434|
|0000037bb803|Toledo|     547|     1522829|        FWM-ID|          H|  43434|
|0000006610c1|Toledo|     547|     1438798|        FWM-ID|          H|  43460|
|00000066807b|Toledo|     547|     1438798|        FWM-ID|          H|  43460|
+------------+------+--------+------------+--------------+-----------+-------+



In [0]:

# Reading the Daily Programs CSV file

daily_prog_schema =  StructType([StructField('prog_code',StringType()),
                     StructField('title',StringType()),
                     StructField('genre',StringType()),
                     StructField('air_date',StringType()),
                     StructField('air_time',StringType()),
                     StructField('Duration',FloatType())
                                       ])
daily_prog_data = load_PD_file("Daily program data/" , daily_prog_schema )

daily_prog_data.limit(3).show()

+--------------+--------------+-----------+--------+--------+--------+
|     prog_code|         title|      genre|air_date|air_time|Duration|
+--------------+--------------+-----------+--------+--------+--------+
|EP000000250035|21 Jump Street|Crime drama|20151219|  050000|    60.0|
|EP000000250035|21 Jump Street|Crime drama|20151219|  110000|    60.0|
|EP000000250063|21 Jump Street|Crime drama|20151219|  180000|    60.0|
+--------------+--------------+-----------+--------+--------+--------+



In [0]:
 #Reading the 2.5% sample of the viewing data from a Parquet file
 
viewing_data = spark.read.parquet('/sample_viewing_2_5percent')
 
print(f'There are {viewing_data.count():,} entries in viewing_data dataframe!')

# selected_viewing_data_df=viewing_data.select("device_id","event_date","event_time","prog_code").dropDuplicates()
#selected_viewing_data_df.show(5)

There are 130,289,194 entries in viewing_data dataframe!


In [0]:
# Reading the Demographic CSV file

demographic_schema =  StructType([StructField('household_id',StringType()),
                      StructField('household_size',IntegerType()),
                      StructField('num_adults',IntegerType()),
                      StructField('num_generations',IntegerType()),
                      StructField('adult_range',StringType()),
                      StructField('marital_status',StringType()),
                      StructField('race_code',StringType()),
                      StructField('presence_children',StringType()),
                      StructField('num_children',IntegerType()),
                      StructField('age_children',StringType()), #format like range - 'bitwise'
                      StructField('age_range_children',StringType()),
                      StructField('dwelling_type',StringType()),
                      StructField('home_owner_status',StringType()),
                      StructField('length_residence',IntegerType()),
                      StructField('home_market_value',StringType()),
                      StructField('num_vehicles',IntegerType()),
                      StructField('vehicle_make',StringType()),
                      StructField('vehicle_model',StringType()),
                      StructField('vehicle_year',IntegerType()),
                      StructField('net_worth',IntegerType()),
                      StructField('income',StringType()),
                      StructField('gender_individual',StringType()),
                      StructField('age_individual',IntegerType()),
                      StructField('education_highest',StringType()),
                      StructField('occupation_highest',StringType()),
                      StructField('education_1',StringType()),
                      StructField('occupation_1',StringType()),
                      StructField('age_2',IntegerType()),
                      StructField('education_2',StringType()),
                      StructField('occupation_2',StringType()),
                      StructField('age_3',IntegerType()),
                      StructField('education_3',StringType()),
                      StructField('occupation_3',StringType()),
                      StructField('age_4',IntegerType()),
                      StructField('education_4',StringType()),
                      StructField('occupation_4',StringType()),
                      StructField('age_5',IntegerType()),
                      StructField('education_5',StringType()),
                      StructField('occupation_5',StringType()),
                      StructField('polit_party_regist',StringType()),
                      StructField('polit_party_input',StringType()),
                      StructField('household_clusters',StringType()),
                      StructField('insurance_groups',StringType()),
                      StructField('financial_groups',StringType()),
                      StructField('green_living',StringType())
                                       ])

demographic_data = load_PD_file("demographic/" , demographic_schema  )  



In [0]:

#select_only_relevant columns
selected_daily_prog_data_df=daily_prog_data.select("prog_code","title","genre","air_date","air_time","Duration").distinct()
selected_demographic_data_df=demographic_data.select("household_id","household_size","num_adults","net_worth","income").distinct()

selected_viewing_data_df=viewing_data.select("device_id","event_date","event_time","prog_code").dropDuplicates()

selected_ref_df=ref_data.select("device_id","DMA","dma_code","household_id").na.drop().dropDuplicates()

selected_demographic_data_df.select("income").distinct().collect()

Out[8]: [Row(income='7'),
 Row(income='3'),
 Row(income='8'),
 Row(income=None),
 Row(income='5'),
 Row(income='B'),
 Row(income='6'),
 Row(income='D'),
 Row(income='C'),
 Row(income='A'),
 Row(income='9'),
 Row(income='1'),
 Row(income='4'),
 Row(income='2')]

In [0]:
from pyspark.sql.functions import col, when
#Convert the values of the Letters A-D to 10-13 and convert income to integer and drop nulls
selected_demographic_data_df2=selected_demographic_data_df
selected_demographic_data_df2 = selected_demographic_data_df2.withColumn('income', when(selected_demographic_data_df.income == 'A', 10).otherwise(selected_demographic_data_df.income))
selected_demographic_data_df2 = selected_demographic_data_df2.withColumn('income', when(selected_demographic_data_df2.income == 'B', 11).otherwise(selected_demographic_data_df2.income))
selected_demographic_data_df2 = selected_demographic_data_df2.withColumn('income', when(selected_demographic_data_df2.income == 'C', 12).otherwise(selected_demographic_data_df2.income))
selected_demographic_data_df2 = selected_demographic_data_df2.withColumn('income', when(selected_demographic_data_df2.income == 'D', 13).otherwise(selected_demographic_data_df2.income))
#Drop nulls
selected_demographic_data_df2 = selected_demographic_data_df2.withColumn('income', col('income').cast('integer'))
selected_demographic_data_df2 = selected_demographic_data_df2.na.drop(subset=['income'])
selected_demographic_data_df.show()


+------------+--------------+----------+---------+------+
|household_id|household_size|num_adults|net_worth|income|
+------------+--------------+----------+---------+------+
|    00000122|             3|         2|        7|     5|
|    00000109|             1|         1|     null|  null|
|    00000026|          null|      null|     null|  null|
|    00000117|          null|      null|     null|  null|
|    00000099|          null|      null|     null|  null|
|    00000035|             1|         1|        4|  null|
|    00000126|             2|         2|        6|  null|
|    00000085|             2|         1|        5|  null|
|    00000048|          null|      null|     null|  null|
|    00000040|             2|         2|        4|     5|
|    00000056|             2|         2|        5|  null|
|    00000111|             2|         2|        8|     4|
|    00000024|             2|         2|        7|     7|
|    00000098|             3|         2|        7|     8|
|    00000042|

In [0]:
from pyspark.sql.functions import countDistinct, avg
# Calculate the average income
average_income = selected_demographic_data_df2.na.drop().select(avg(col("income"))).collect()[0][0]

#Question 1.1 applaying trasformations, joining the data and adding 3 new columns

In [0]:
#join the whole df's into 1 df and adding 3 columns that will help us to make the queries 

from pyspark.sql.functions import col, date_format, to_date
from pyspark.sql.functions import col, udf, desc,avg
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, udf, desc, avg
from pyspark.sql.types import IntegerType


#join the whole df's into one df

view_daily = selected_daily_prog_data_df.join(selected_viewing_data_df, selected_daily_prog_data_df.prog_code == selected_viewing_data_df.prog_code, "inner")
join_without_demographic = view_daily.join(selected_ref_df, view_daily.device_id == selected_ref_df.device_id, "inner")
full_join = join_without_demographic.join(selected_demographic_data_df2, join_without_demographic.household_id == selected_demographic_data_df2.household_id, "inner")

# Define a UDF to check if any bad genre is present and create a column that will count them
bad_genres = ["talk", "politics", "news", "community", "crime"]


contains_bad_genre = udf(lambda genre: int(any(g.lower() in genre.lower().split(",") for g in bad_genres)), IntegerType())

# Replace null values in the genre column with an empty string
full_join = full_join.withColumn("genre", when(col("genre").isNull(), "").otherwise(col("genre")))

# present how many "bad geners" each program has
full_join = full_join.withColumn("has_bad_genre", contains_bad_genre(col("genre"))).cache()


#add column to show in which day of the week the program had been brodcasted
full_join = full_join.withColumn("day_prog", to_date(col("air_date"), "yyyyMMdd"))
full_join=full_join.withColumn("day_prog", date_format(col("day_prog"), "EEEE"))


#add column that give us indication if the row ias applying condition 3
full_join = full_join.withColumn("less_adult_high_networth", when((col("num_adults") <3) &(col("net_worth")>8) , True).otherwise(False))

print(full_join.columns)
full_join.limit(5).show()





['prog_code', 'title', 'genre', 'air_date', 'air_time', 'Duration', 'device_id', 'event_date', 'event_time', 'prog_code', 'device_id', 'DMA', 'dma_code', 'household_id', 'household_id', 'household_size', 'num_adults', 'net_worth', 'income', 'has_bad_genre', 'day_prog', 'less_adult_high_networth']


#question 1.2_2 calculating the queries and printing the number of programs that having the condition of the query

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col

#query 1: The prog code was viewed by a device with less than 5 average daily events

# Register the DataFrame as a temporary view
selected_viewing_data_df.createOrReplaceTempView("selected_viewing_data_view")

# Execute the SQL query that counts the average daily events fot each unique device
avg_daily_events = spark.sql("""
    SELECT device_id, COUNT(*) / COUNT(DISTINCT event_date) AS avg_daily_events
    FROM selected_viewing_data_view
    GROUP BY device_id
""")

# Join the "device_id" and "prog_code" columns from "selected_viewing_data_df" with "avg_daily_events"
device_prog = selected_viewing_data_df.select("device_id", "prog_code")
condition_1 = device_prog.join(avg_daily_events, device_prog["device_id"] == avg_daily_events["device_id"], "inner").distinct()
condition_1.cache()
# Filter rows where "avg_daily_events" is less than 5 and count the number of rows
condition_1 = condition_1.select("prog_code").filter(col("avg_daily_events") < 5).distinct()
q1_num = condition_1.count()
print(f'There are {q1_num:,} entries in condition_1 dataframe!')


There are 9,138 entries in condition_1 dataframe!


In [0]:
#query 2: The prog code was watched by a device from a DMA name that contains the letter [‘z’] 
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

selected_ref_df=selected_ref_df.withColumnRenamed("device_id","device_id1")
                                    
condition_2=selected_ref_df.join(selected_viewing_data_df, selected_ref_df["device_id1"] == selected_viewing_data_df["device_id"], "inner").distinct()
#take dma with Z in their DMA name
condition_2 = condition_2.filter(col("DMA").like("%z%") | col("DMA").like("%Z%")).select("prog_code").distinct()
q2_num=condition_2.count()
print(f'There are {q2_num:,} entries in condition_2 dataframe!')

There are 235,428 entries in condition_2 dataframe!


In [0]:
# query 3 : The program was watched by a family with less than 3 adults and their net worth is higher than 8
#Take only houses with less than 3 adults and net-worth larger than 8
relevant_houses=selected_demographic_data_df.filter((selected_demographic_data_df.num_adults.cast("int") < 3 )& (selected_demographic_data_df.net_worth.cast("int") > 8))

relevant_houses=relevant_houses.withColumnRenamed("household_id","household_id1")
#joining with reference data to get the devices in these households
relevant_devices = relevant_houses.join(selected_ref_df, relevant_houses["household_id1"] == selected_ref_df["household_id"], "inner").distinct()

relevant_devices=relevant_devices.select("device_id1").distinct().withColumnRenamed("device_id","device_id1")
condition_3=relevant_devices.join(selected_viewing_data_df, relevant_devices["device_id1"] == selected_viewing_data_df["device_id"], "inner").distinct()
                                      
condition_3 = condition_3.select("prog_code").distinct()
q3_num = condition_3.count()
print(f'There are {q3_num:,} entries in condition_3 dataframe!')

There are 180,061 entries in condition_3 dataframe!


In [0]:
from pyspark.sql.functions import col, date_format, to_date

# #query 4: The program code was aired (at least once) between Friday at 6PM and Saturday at 7PM (both
# inclusive) and there was at-least one household who watched the program with size higher than or
# equal to 8 (inclusive).



#Add column of day_prog its the day in the week as a string 
selected_viewing_data_df_edited = selected_daily_prog_data_df.withColumn("day_prog", to_date(col("air_date"), "yyyyMMdd"))
selected_viewing_data_df_edited=selected_viewing_data_df_edited.withColumn("day_prog", date_format(col("day_prog"), "EEEE"))
selected_viewing_data_df_edited = selected_viewing_data_df_edited.withColumn("air_time", col("air_time").cast("integer"))


#Filter based on air time between Friday 6PM and Saturday 7PM
relevant_programs_days = selected_viewing_data_df_edited.filter( ((col("day_prog") == "Friday") & (col("air_time") >= 180000)) |
    ((col("day_prog") == "Saturday") & (col("air_time") <= 190000))
).select("prog_code", "air_date", "air_time").withColumnRenamed("prog_code", "prog_code1").distinct()



relevant_programs_days = selected_viewing_data_df.join(
    relevant_programs_days,
    (selected_viewing_data_df["prog_code"] == relevant_programs_days["prog_code1"]) ,
   
    "inner"
).select("device_id", "prog_code").distinct()


# Join the selected_ref_df DataFrame with relevant_programs_days DataFrame based on the "device_id1" column
# Using an inner join and selecting distinct rows
join2 = selected_ref_df.join(relevant_programs_days, selected_ref_df["device_id1"] == relevant_programs_days["device_id"], "inner").distinct()

# Drop rows with null values in the "household_size" column of selected_demographic_data_df DataFrame
selected_demographic_data_df = selected_demographic_data_df.na.drop(subset=["household_size"])

# Filter selected_demographic_data_df to get relevant houses with household_size greater than or equal to 8
relevant_houses = selected_demographic_data_df.filter(col("household_size") >= 8).select("household_id").distinct().withColumnRenamed("household_id", "household_id1")

# Join join2 DataFrame with relevant_houses DataFrame based on the "household_id" column
# Using an inner join and selecting the "prog_code" column
condition_4 = join2.join(relevant_houses, join2["household_id"] == relevant_houses["household_id1"], "inner").select("prog_code").distinct()

# Cache the condition_4 DataFrame for better performance
condition_4.cache()

# Count the distinct values in the "prog_code" column of the condition_4 DataFrame
condition_4 = condition_4.select("prog_code").distinct()
q4_num = condition_4.count()
# Print the value of q4_num
print(f'There are {q4_num:,} entries in condition_4 dataframe!')



There are 67,646 entries in condition_4 dataframe!


In [0]:
selected_demographic_data_without_null = selected_demographic_data_df2.na.drop(subset=["income"])

# Calculate the average income for query 5
average_income = selected_demographic_data_without_null.select(avg("income")).first()[0]
average_income

Out[18]: 6.715162771873656

In [0]:
#query 5 :The prog code was watched (at least once) by a device from an household with more than 3 devices
# (exclusive) and the income of that household is lower than the average household income in the
# data.



from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, countDistinct

#Drop the nulls from income column
selected_demographic_data_without_null = selected_demographic_data_df2.na.drop(subset=["income"])

# Calculate the average income
average_income = selected_demographic_data_without_null.select(avg("income")).first()[0]
HighIncome=selected_demographic_data_without_null.filter((col("income")<average_income)).select("household_id").withColumnRenamed("household_id","household_id1")

# Rename the "household_id" column in selected_ref_df DataFrame to "household_id2"
ref_df=selected_ref_df.withColumnRenamed("household_id","household_id2")
# Join ref_df DataFrame with HighIncome DataFrame based on the "household_id2" column using an inner join
DeviceHousehold=ref_df.join(HighIncome, ref_df["household_id2"]==HighIncome["household_id1"], "inner")


# Select the "household_id1" and "device_id1" columns from DeviceHousehold DataFrame and drop the "household_id2" column
DeviceHousehold = DeviceHousehold.select("household_id1", "device_id1").drop("household_id2")

# Group the DeviceHousehold DataFrame by "household_id1" and compute the count of distinct values in the "device_id1" column # Rename the result column as "sum_value" which counts the devices of the household  and filter
#the result where it has more than 3 devices 
DeviceHousehold=DeviceHousehold.groupBy('household_id1').agg(countDistinct('device_id1').alias('sum_value')).filter(col("sum_value")>3).withColumnRenamed("device_id", "device_id1")


# Join the DeviceHousehold DataFrame with selected_ref_df DataFrame based on the "household_id" column using an inner join
DeviceHousehold = DeviceHousehold.join(selected_ref_df, selected_ref_df["household_id"] == DeviceHousehold["household_id1"], "inner")

# Select the "household_id1" and "device_id1" columns from the DeviceHousehold DataFrame, drop duplicates, and rename "device_id1" as "device_id"
DeviceHousehold = DeviceHousehold.select("household_id1", "device_id1").dropDuplicates().withColumnRenamed("device_id1", "device_id")

# Join the selected_viewing_data_df DataFrame with the DeviceHousehold DataFrame based on the "device_id" column using an inner join
condition_5 = selected_viewing_data_df.join(DeviceHousehold, selected_viewing_data_df["device_id"] == DeviceHousehold["device_id"], "inner").select("prog_code").distinct()

# Count the distinct values in the "prog_code" column of the condition_5 DataFrame
q5_num = condition_5.count()

# Print the value of query_5_num
print(f'There are {q5_num:,} entries in condition_5 dataframe!')



There are 255,489 entries in condition_5 dataframe!


In [0]:
# query 6 :• The program contains at least one of the genres [‘Talk’, ‘Politics’, ‘News’, ‘Community’, ‘Crime’]
# and has a duration of more than 35 minutes (exclusive).
from pyspark.sql.functions import col, udf, desc, avg
from pyspark.sql.types import IntegerType

bad_genres = ["talk", "politics", "news", "community", "crime"]

# Define a UDF to check if any bad genre is present
contains_bad_genre = udf(lambda genre: int(any(g.lower() in genre.lower().split(",") for g in bad_genres)), IntegerType())

# Replace null values in the genre column with an empty string
query_6 = selected_daily_prog_data_df.withColumn("genre", when(col("genre").isNull(), "").otherwise(col("genre")))

#add a column has_bad_genre and the method contains_bad_genre will split the genres and add 1 if it contains this genre else 0
query_6 = query_6.withColumn("has_bad_genre", contains_bad_genre(col("genre")))

condition_6 = query_6.filter((col("has_bad_genre")>0)&(col("Duration")>35)).select("prog_code").distinct()
q6_num=condition_6.count()
print(f'There are {q6_num:,} entries in condition_6 dataframe!')


There are 33,648 entries in condition_6 dataframe!


In [0]:
from pyspark.sql.functions import col, lit, sum
#we are assuming that there are might be prog_codes in viewing data that does not appear in daily_prog data so there might be diffrent ressult if we take diffrent data frames in our
#conditions, further more we understad that the assigment asks us to print only the prog_codes
# Create a new column to count the number of conditions that apply to each record
condition_1=condition_1.select("prog_code").withColumn("condition",lit(1)).distinct()
condition_2=condition_2.select("prog_code").withColumn("condition",lit(1)).distinct()                                  
condition_3=condition_3.select("prog_code").withColumn("condition",lit(1)).distinct()                                     
condition_4=condition_4.select("prog_code").withColumn("condition",lit(1)).distinct()                                     
condition_5=condition_5.select("prog_code").withColumn("condition",lit(1)).distinct() 
condition_6=condition_6.select("prog_code").withColumn("condition",lit(1)).distinct()                                 
                                  
condition_1.cache()
condition_2.cache()
condition_3.cache()
condition_4.cache()
condition_5.cache()
condition_6.cache()


# Combine all conditions using union
combined_conditions = condition_1.union(condition_2).union(condition_3).union(condition_4).union(condition_5).union(condition_6)


# Count occurrences of each record and filter records where count >= 4
malicious_records = combined_conditions.groupBy("prog_code").agg(sum("condition").alias("condition_count")).filter(col("condition_count") >= 4)

# Show the top 150 malicious program entries, ordered by ascending lexicographic order of the prog code
malicious_records=malicious_records.orderBy("prog_code")


+--------------+---------------+
|     prog_code|condition_count|
+--------------+---------------+
|     DVRPGMKEY|              4|
|EP000000211576|              4|
|EP000000211614|              4|
|EP000000211639|              4|
|EP000000211645|              4|
|EP000000211646|              5|
|EP000000211647|              5|
|EP000000211648|              5|
|EP000000211650|              4|
|EP000000211654|              4|
|EP000000211659|              4|
|EP000000211661|              4|
|EP000000211665|              4|
|EP000000211666|              4|
|EP000000211667|              4|
|EP000000211669|              5|
|EP000000211670|              4|
|EP000000211672|              5|
|EP000000211675|              4|
|EP000000211676|              4|
+--------------+---------------+
only showing top 20 rows



In [0]:
display(malicious_records.limit(150))

prog_code,condition_count
DVRPGMKEY,4
EP000000211576,4
EP000000211614,4
EP000000211639,4
EP000000211645,4
EP000000211646,5
EP000000211647,5
EP000000211648,5
EP000000211650,4
EP000000211654,4
