In [0]:
from pyspark.sql.types import *
import pyspark.sql.functions as f

from pyspark import SparkContext
from pyspark.sql import SparkSession
 
spark = SparkSession.builder.appName("my_project_1").getOrCreate()
sc = spark.sparkContext

In [0]:
# Read a CSV into a dataframe
# There is a smarter version, that will first check if there is a Parquet file and use it
def load_csv_file(filename, schema):
  """Load one of the known course CSV datasets into a Spark DataFrame.

  Args:
    filename: Dataset key; must be 'Daily program data' or 'demographic'.
    schema: Schema handed to the CSV reader (the files are read without a header row).

  Returns:
    The loaded DataFrame, or None (after printing a message) when
    ``filename`` is not one of the known dataset keys.
  """
  # dataset key -> (path below the mount point, field delimiter)
  known_datasets = {'Daily program data': ('Daily program data', "|"),
                    'demographic': ('demographic', "|")}

  entry = known_datasets.get(filename)
  if entry is None:
    print(f'You were trying to access unknown file \"{filename}\". Only valid options are {known_datasets.keys()}')
    return None

  relative_path, separator = entry
  source_path = f"dbfs:/mnt/coursedata2024/fwm-stb-data/{relative_path}"

  reader = spark.read.format("csv")
  reader = reader.option("header", "false")
  reader = reader.option("delimiter", separator)
  return reader.schema(schema).load(source_path)

# Schemas used when loading the raw files, keyed by dataset name.
def _schema_of(*columns):
  """Build a StructType from (column_name, spark_type_class) pairs, in the given order."""
  return StructType([StructField(col_name, col_type()) for col_name, col_type in columns])

schemas_dict = {
  'Daily program data': _schema_of(
    ('prog_code', StringType),
    ('title', StringType),
    ('genre', StringType),
    ('air_date', StringType),
    ('air_time', StringType),
    ('Duration', FloatType),
  ),
  'viewing': _schema_of(
    ('device_id', StringType),
    ('event_date', StringType),
    ('event_time', IntegerType),
    ('mso_code', StringType),
    ('prog_code', StringType),
    ('station_num', StringType),
  ),
  'viewing_full': _schema_of(
    ('mso_code', StringType),
    ('device_id', StringType),
    ('event_date', IntegerType),
    ('event_time', IntegerType),
    ('station_num', StringType),
    ('prog_code', StringType),
  ),
  'demographic': _schema_of(
    ('household_id', StringType),
    ('household_size', IntegerType),
    ('num_adults', IntegerType),
    ('num_generations', IntegerType),
    ('adult_range', StringType),
    ('marital_status', StringType),
    ('race_code', StringType),
    ('presence_children', StringType),
    ('num_children', IntegerType),
    ('age_children', StringType),  # format like range - 'bitwise'
    ('age_range_children', StringType),
    ('dwelling_type', StringType),
    ('home_owner_status', StringType),
    ('length_residence', IntegerType),
    ('home_market_value', StringType),
    ('num_vehicles', IntegerType),
    ('vehicle_make', StringType),
    ('vehicle_model', StringType),
    ('vehicle_year', IntegerType),
    ('net_worth', IntegerType),
    ('income', StringType),
    ('gender_individual', StringType),
    ('age_individual', IntegerType),
    ('education_highest', StringType),
    ('occupation_highest', StringType),
    ('education_1', StringType),
    ('occupation_1', StringType),
    ('age_2', IntegerType),
    ('education_2', StringType),
    ('occupation_2', StringType),
    ('age_3', IntegerType),
    ('education_3', StringType),
    ('occupation_3', StringType),
    ('age_4', IntegerType),
    ('education_4', StringType),
    ('occupation_4', StringType),
    ('age_5', IntegerType),
    ('education_5', StringType),
    ('occupation_5', StringType),
    ('polit_party_regist', StringType),
    ('polit_party_input', StringType),
    ('household_clusters', StringType),
    ('insurance_groups', StringType),
    ('financial_groups', StringType),
    ('green_living', StringType),
  ),
}

# Read demographic data

In [0]:
%%time
# The demographic dataset is registered under the key 'demographic'.
demo_dataset = 'demographic'
demo_df = load_csv_file(demo_dataset, schemas_dict[demo_dataset])

# Show the schema, the total row count and a small preview.
demo_df.printSchema()
demo_row_count = demo_df.count()
print(f'demo_df contains {demo_row_count} records!')
display(demo_df.limit(6))

root
 |-- household_id: string (nullable = true)
 |-- household_size: integer (nullable = true)
 |-- num_adults: integer (nullable = true)
 |-- num_generations: integer (nullable = true)
 |-- adult_range: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- race_code: string (nullable = true)
 |-- presence_children: string (nullable = true)
 |-- num_children: integer (nullable = true)
 |-- age_children: string (nullable = true)
 |-- age_range_children: string (nullable = true)
 |-- dwelling_type: string (nullable = true)
 |-- home_owner_status: string (nullable = true)
 |-- length_residence: integer (nullable = true)
 |-- home_market_value: string (nullable = true)
 |-- num_vehicles: integer (nullable = true)
 |-- vehicle_make: string (nullable = true)
 |-- vehicle_model: string (nullable = true)
 |-- vehicle_year: integer (nullable = true)
 |-- net_worth: integer (nullable = true)
 |-- income: string (nullable = true)
 |-- gender_individual: string (nullable = t

household_id,household_size,num_adults,num_generations,adult_range,marital_status,race_code,presence_children,num_children,age_children,age_range_children,dwelling_type,home_owner_status,length_residence,home_market_value,num_vehicles,vehicle_make,vehicle_model,vehicle_year,net_worth,income,gender_individual,age_individual,education_highest,occupation_highest,education_1,occupation_1,age_2,education_2,occupation_2,age_3,education_3,occupation_3,age_4,education_4,occupation_4,age_5,education_5,occupation_5,polit_party_regist,polit_party_input,household_clusters,insurance_groups,financial_groups,green_living
15,2.0,2.0,1.0,100000000,S,B,,,0,0,S,O,5.0,E,,,,,6.0,4.0,M,60.0,4.0,,,,,,,,,,,,,,,,,D,443,02C3,08C3,
24,2.0,2.0,1.0,100000000000,,W,,,0,0,M,O,,F,,,,,7.0,7.0,F,46.0,3.0,Z,,,,,,,,,,,,,,,,R,223,09O3,03O3,
26,,,,0,,,,,0,0,S,,,F,,,,,,,,,,,,,,,,,,,,,,,,,,,46G,04CG,08CG,
28,3.0,2.0,2.0,110000000000000,S,W,Y,1.0,10000000000000,1000000000,S,O,3.0,H,,,,,5.0,7.0,M,38.0,2.0,4,,,34.0,1.0,7.0,,,,,,,,,,,V,473,11R3,09C3,1.0
35,1.0,1.0,1.0,100000000000,,W,,,0,0,,,,G,,,,,4.0,,M,50.0,2.0,1,,,,,,,,,,,,,,,,D,523,13C3,08C3,
36,,,,0,,,,,0,0,,,,G,,,,,,,,,,,,,,,,,,,,,,,,,,,51G,10RG,10RG,


CPU times: user 6.64 ms, sys: 3.35 ms, total: 9.99 ms
Wall time: 1.75 s


# Read Daily program data

In [0]:
%%time
# The daily program dataset is registered under the key 'Daily program data'.
daily_dataset = 'Daily program data'
daily_prog_df = load_csv_file(daily_dataset, schemas_dict[daily_dataset])

# Show the schema, the total row count and a small preview.
daily_prog_df.printSchema()
daily_row_count = daily_prog_df.count()
print(f'daily_prog_df contains {daily_row_count} records!')
display(daily_prog_df.limit(6))

root
 |-- prog_code: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- air_date: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- Duration: float (nullable = true)

daily_prog_df contains 13194849 records!


prog_code,title,genre,air_date,air_time,Duration
EP000000250035,21 Jump Street,Crime drama,20151219,50000,60.0
EP000000250035,21 Jump Street,Crime drama,20151219,110000,60.0
EP000000250063,21 Jump Street,Crime drama,20151219,180000,60.0
EP000000510007,A Different World,Sitcom,20151219,100000,30.0
EP000000510008,A Different World,Sitcom,20151219,103000,30.0
EP000000510159,A Different World,Sitcom,20151219,80300,29.0


CPU times: user 13.5 ms, sys: 4.15 ms, total: 17.6 ms
Wall time: 9.32 s


# Read viewing data

In [0]:
# Viewing events: a 10-million-row sample stored as comma-separated CSV with a header row.

dataPath = "dbfs:/viewing_10M"
viewing_reader = (
    spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", ",")
    .schema(schemas_dict['viewing_full'])
)
viewing10m_df = viewing_reader.load(dataPath)

# Preview first, then report the total number of rows.
display(viewing10m_df.limit(6))
print(f'viewing10m_df contains {viewing10m_df.count()} rows!')

mso_code,device_id,event_date,event_time,station_num,prog_code
1540,000000033afa,20151101,33000,67375,EP020820940009
1540,00000004e4b6,20151101,93000,42599,SP003189620000
1540,00000004eb8f,20151101,91856,42642,EP000176170270
1540,00000004f1d6,20151101,90206,68827,EP007961190099
1540,00000004f3c0,20151101,160658,10178,MV000259670000
1540,000000051ca0,20151101,174949,32645,EP001786120664


viewing10m_df contains 10042340 rows!


# Read reference data

In [0]:
%%time
# Reference data is stored in Parquet for your convenience, so no schema is supplied.

ref_path = 'dbfs:/refxml_new_parquet'
ref_df = spark.read.parquet(ref_path)

# Show the schema, the total row count and a small preview.
ref_df.printSchema()
ref_row_count = ref_df.count()
print(f'ref_df contains {ref_row_count} records!')
display(ref_df.limit(6))

root
 |-- device_id: string (nullable = true)
 |-- dma: string (nullable = true)
 |-- dma_code: long (nullable = true)
 |-- household_id: long (nullable = true)
 |-- household_type: string (nullable = true)
 |-- system_type: string (nullable = true)
 |-- zipcode: long (nullable = true)

ref_df contains 1268071 records!


device_id,dma,dma_code,household_id,household_type,system_type,zipcode
00000113498f,Toledo,547,1470605,FWM-ID,H,43460
12bf0065bad0,Toledo,547,1492575,FWM-ID,H,43460
000000797c1d,Toledo,547,1493317,FWM-ID,H,43460
000002de361c,Toledo,547,1446566,FWM-ID,H,43528
0000026360a2,Toledo,547,1467668,FWM-ID,H,43528
00000071622f,Toledo,547,1519598,FWM-ID,H,43528


CPU times: user 118 µs, sys: 8.09 ms, total: 8.21 ms
Wall time: 772 ms


# Part 1

In [0]:
import pyspark.sql.functions as f
from pyspark.sql.window import Window

In [0]:
# Trim each source table to the columns used below and remove duplicated rows.

# Demographics: one row per household, only the attributes the conditions need.
demografic_cols_to_keep = ['household_id', 'household_size', 'num_adults', 'net_worth', 'income']
demo_df = demo_df.select(demografic_cols_to_keep).dropDuplicates(subset=['household_id'])

# Daily programs: keep a single genre value per prog_code (dropDuplicates keeps
# one row per key), then keep only the airings that carry that same genre.
daily_cols_to_drop = ['title', 'Duration']
one_genre_per_prog = daily_prog_df.select("prog_code", "genre").dropDuplicates(subset=['prog_code'])
daily_airings = daily_prog_df.drop(*daily_cols_to_drop)
daily_df = one_genre_per_prog.join(daily_airings, on=['prog_code', 'genre'], how='inner').distinct()

# Viewing events: drop the columns that are not used and remove exact duplicates.
viewing_cols_to_drop = ['mso_code', 'station_num']
viewing_df = viewing10m_df.drop(*viewing_cols_to_drop).distinct()

# Reference data: one row per device with its dma and household.
ref_cols_to_keep = ['device_id', 'dma', 'household_id']
ref_df = ref_df.select(ref_cols_to_keep).dropDuplicates(subset=['device_id'])

In [0]:
# One row per prog_code; each condition cell below left-joins its 0/1 flag onto this table.
prog_and_conditions = daily_df.select("prog_code").dropDuplicates()

condition 1 - the prog_code was viewed by a device with a daily event avg > 5

In [0]:
# Per device: number of events divided by the number of distinct dates with events.
events_per_day = f.count("event_time") / f.countDistinct("event_date")
device_daily_avg_5 = viewing_df.groupBy("device_id").agg(events_per_day.alias("avg_daily_times"))

# Attach each device's average to every distinct program it viewed.
device_programs = viewing_df.select("device_id", "prog_code").distinct()
daily_avg_5 = device_daily_avg_5.join(device_programs, on=["device_id"], how='inner')

# A program gets condition1 = 1 when at least one viewing device averages more than 5.
is_heavy_device = f.when(f.col("avg_daily_times") > 5, 1).otherwise(0)
daily_avg_5 = daily_avg_5.groupBy("prog_code").agg(f.max(is_heavy_device).alias("condition1"))

# Programs without any viewing data get 0.
prog_and_conditions = (
    prog_and_conditions
    .join(daily_avg_5, on=["prog_code"], how="left")
    .fillna({"condition1": 0})
)


condition 2 - the prog_code was viewed by a device associated with a dma name that contains 'z' (case insensitive)

In [0]:
# Reference info (dma, household) for every distinct (device, program) pair that was viewed.
viewed_pairs = viewing_df.select("device_id", "prog_code").distinct()
ref_and_view = ref_df.join(viewed_pairs, on=["device_id"], how="inner")

# A program gets condition2 = 1 when some viewing device's dma name contains 'z' in any case.
dma_has_z = f.lower(f.col("dma")).contains("z")
dma_with_z = ref_and_view.groupBy("prog_code").agg(
    f.max(f.when(dma_has_z, 1).otherwise(0)).alias("condition2")
)

# Programs with no matching device get 0.
prog_and_conditions = (
    prog_and_conditions
    .join(dma_with_z, on=["prog_code"], how="left")
    .fillna({"condition2": 0})
)


condition 3 - the prog_code was watched by a device from a family with num_adults < 3 and net_worth > 8

In [0]:
# Add the household demographics to every viewed (device, program) pair.
demo_and_ref_and_view = ref_and_view.join(demo_df, on=['household_id'], how='inner').distinct()

# A program gets condition3 = 1 when some viewing household has num_adults < 3 and net_worth > 8.
small_and_wealthy = (f.col('num_adults') < 3) & (f.col('net_worth') > 8)
adults_3_net_8 = demo_and_ref_and_view.groupBy("prog_code").agg(
    f.max(f.when(small_and_wealthy, 1).otherwise(0)).alias("condition3")
)

# Programs with no matching household get 0.
prog_and_conditions = (
    prog_and_conditions
    .join(adults_3_net_8, on=["prog_code"], how="left")
    .fillna({"condition3": 0})
)


condition 4 - the prog_code was aired on a Friday with air_time >= 180000 or on a Saturday with air_time <= 190000

In [0]:
# Parse air_date (text in yyyyMMdd form) into a date so the weekday can be derived.
# The conversion is guarded: once the column is a date it no longer matches the
# 'yyyyMMdd' pattern, so re-running this cell must not parse it a second time.
if dict(daily_df.dtypes)['air_date'] == 'string':
    daily_df = daily_df.withColumn('air_date', f.to_date(f.col('air_date'), 'yyyyMMdd'))

# dayofweek() numbers the days 1 = Sunday ... 7 = Saturday.
friday, saturday = 6, 7
weekday = f.dayofweek(f.col("air_date"))

# air_time is loaded as a string; cast it explicitly instead of relying on
# implicit string-to-number coercion in the comparisons below.
air_time = f.col("air_time").cast("int")

# Aired on Friday at or after 180000, or on Saturday at or before 190000.
in_time_window = ((weekday == friday) & (air_time >= 180000)) | \
                 ((weekday == saturday) & (air_time <= 190000))

# A program gets condition4 = 1 when at least one of its airings falls in the window.
friday_saturday = daily_df.groupBy("prog_code").agg(
    f.max(f.when(in_time_window, 1).otherwise(0)).alias("condition4"))

prog_and_conditions = prog_and_conditions.join(friday_saturday, on=["prog_code"], how="left") \
    .fillna({"condition4": 0})

condition 5 - the prog_code was watched by a household with household_size >= 8

In [0]:
# A program gets condition5 = 1 when some viewing household has household_size >= 8.
is_large_household = f.when(f.col('household_size') >= 8, 1).otherwise(0)
household_size_8 = demo_and_ref_and_view.groupBy("prog_code").agg(
    f.max(is_large_household).alias("condition5")
)

# Programs with no matching household get 0.
prog_and_conditions = (
    prog_and_conditions
    .join(household_size_8, on=["prog_code"], how="left")
    .fillna({"condition5": 0})
)

condition 6 - the prog_code was watched by a device from a household with num_device > 3 and income < avg income

In [0]:
# NOTE(review): window_all is no longer used in this cell (a window with no
# partition keys moves all rows into a single partition); it is left defined
# only in case other cells reference it.
window_all = Window.partitionBy()
window_household = Window.partitionBy('household_id')

# Convert the income code to an integer: the letters A-D map to 10-13,
# every other value is cast to int as-is.
income_as_int = (
    f.when(f.col('income') == 'A', 10)
     .when(f.col('income') == 'B', 11)
     .when(f.col('income') == 'C', 12)
     .when(f.col('income') == 'D', 13)
     .otherwise(f.col('income').cast('int'))
)
demo_df = demo_df.withColumn('income', income_as_int)

# Average income over all households, computed once as a one-row frame and
# broadcast onto every household instead of using an unpartitioned window.
avg_income_df = demo_df.agg(f.avg('income').alias('avg_income'))
household_income = demo_df.select('household_id', 'income') \
    .crossJoin(f.broadcast(avg_income_df))

# Number of devices registered to each household.
household_devices = ref_df.select('household_id', 'device_id') \
    .withColumn('device_count', f.count("device_id").over(window_household)) \
    .select('household_id', 'device_count').dropDuplicates()

# income, avg_income and device_count side by side for every household.
household_profile = household_income.join(household_devices, on=['household_id'], how='inner')

# Attach the household profile to every program the household viewed.
device_3_low_income = demo_and_ref_and_view.select('prog_code', 'household_id') \
    .join(household_profile, on=['household_id'], how='inner').dropDuplicates()

# A program gets condition6 = 1 when some viewing household has more than
# 3 devices and an income below the overall average.
device_3_low_income = device_3_low_income.groupBy('prog_code').agg(
    f.max(f.when((f.col('avg_income') > f.col('income')) & (f.col('device_count') > 3), 1).otherwise(0)) \
    .alias("condition6"))

prog_and_conditions = prog_and_conditions.join(device_3_low_income, on=["prog_code"], how="left") \
    .fillna({"condition6": 0})


condition 7 - the prog_code has at least one genre from ['Hydroplane racing', 'Biathlon', 'Snowmobile', 'Community', 'Agriculture', 'Music']

In [0]:
genre_list = ['Hydroplane racing', 'Biathlon', 'Snowmobile', 'Community', 'Agriculture', 'Music']

# Turn the comma-separated genre string into an array of genres.
# Guarded so that re-running this cell does not call split() on a column
# that has already been converted to an array.
if dict(daily_df.dtypes)["genre"] == "string":
    daily_df = daily_df.withColumn("genre", f.split(f.col("genre"), ","))

# True when the genre array contains at least one entry of genre_list.
has_listed_genre = f.expr(" or ".join([f"array_contains(genre, '{genre}')" for genre in genre_list]))

# Aggregate to one row per prog_code, as the other conditions do, so the
# left join below cannot multiply rows of prog_and_conditions.
prog_genre = daily_df.select('prog_code', 'genre').dropDuplicates() \
    .groupBy('prog_code').agg(
        f.max(f.when(has_listed_genre, 1).otherwise(0)).alias('condition7'))

prog_and_conditions = prog_and_conditions.join(prog_genre.select('prog_code', 'condition7'), on=["prog_code"], how="left") \
    .fillna({"condition7": 0})

In [0]:
# Add up the seven 0/1 condition flags per prog_code; a prog_code that
# satisfies at least 4 of them is treated as malicious.
condition_cols = [f"condition{i}" for i in range(1, 8)]
prog_and_conditions = prog_and_conditions.withColumn(
    "condition_sum", f.expr(" + ".join(condition_cols)))

malicious_prog_codes = prog_and_conditions.where(f.col("condition_sum") >= 4)


In [0]:
# For each condition, count how many daily-program records belong to a prog_code that satisfies it.
condition_cols = [f"condition{i}" for i in range(1, 8)]

# Every daily-program record, paired with the flags of its prog_code.
condition_records = daily_prog_df.select('prog_code') \
    .join(prog_and_conditions, on=['prog_code'], how='inner')

condition_sums = condition_records.agg(
    *[f.sum(flag).alias(f"sum_{flag}") for flag in condition_cols]
)

condition_sums.show(truncate=False)

+--------------+--------------+--------------+--------------+--------------+--------------+--------------+
|sum_condition1|sum_condition2|sum_condition3|sum_condition4|sum_condition5|sum_condition6|sum_condition7|
+--------------+--------------+--------------+--------------+--------------+--------------+--------------+
|118411        |5670777       |5985000       |10221199      |4487557       |8442819       |1350909       |
+--------------+--------------+--------------+--------------+--------------+--------------+--------------+



In [0]:
# Write every daily-program record whose prog_code is malicious to Parquet.
col_to_drop = ["condition1", "condition2", "condition3", "condition4", "condition5", "condition6", "condition7", "condition_sum"]

# Keep only the prog_code key of the malicious programs, so the helper
# condition columns do not end up in the written records.
malicious_codes_only = malicious_prog_codes.drop(*col_to_drop)
malicious_records = daily_prog_df.join(malicious_codes_only, on=['prog_code'], how='inner')

# NOTE(review): the output location is a hard-coded, user-specific workspace path.
malicious_records.write.mode("overwrite").parquet("/Workspace/Users/antallykazky@campus.technion.ac.il/project1_part1_malicious_212895171_211569637.parquet")


In [0]:
# Show the first 50 malicious prog_codes in ascending order.
malicious_prog_asc = malicious_prog_codes.select('prog_code').sort(f.asc("prog_code"))
malicious_prog_asc.show(n=50, truncate=False)

+--------------+
|prog_code     |
+--------------+
|EP000000211576|
|EP000000211639|
|EP000000211645|
|EP000000211646|
|EP000000211647|
|EP000000211648|
|EP000000211649|
|EP000000211650|
|EP000000211654|
|EP000000211659|
|EP000000211661|
|EP000000211662|
|EP000000211665|
|EP000000211666|
|EP000000211667|
|EP000000211669|
|EP000000211670|
|EP000000211672|
|EP000000211676|
|EP000000211679|
|EP000000211680|
|EP000000211681|
|EP000000211682|
|EP000000211683|
|EP000000211684|
|EP000000211685|
|EP000000211686|
|EP000000211688|
|EP000000211689|
|EP000000211690|
|EP000000211692|
|EP000000211694|
|EP000000211696|
|EP000000211698|
|EP000000351218|
|EP000000351219|
|EP000000351223|
|EP000000351224|
|EP000000351225|
|EP000000351228|
|EP000000351230|
|EP000000351235|
|EP000000351240|
|EP000000351247|
|EP000000351250|
|EP000000351251|
|EP000000351254|
|EP000000351255|
|EP000000351257|
|EP000000351258|
+--------------+
only showing top 50 rows

