# Part 1.1 - Spam Detection

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import re
from pyspark.sql.window import Window
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Session and context handles used by every cell below.
spark = (
    SparkSession.builder
    .appName("my project 1")
    .getOrCreate()
)
sc = spark.sparkContext

# Read a CSV into a dataframe
# There is a smarter version, that will first check if there is a Parquet file and use it
def load_PD_file(filename_or_dir, schema):
    """Read a pipe-delimited CSV file (or directory) located under "/data/".

    filename_or_dir: path appended verbatim to "/data/".
    schema: schema applied to the columns; the data is read with header="false".
    Returns the loaded DataFrame.
    """
    reader = spark.read.format("csv")
    reader = reader.options(header="false", delimiter="|")
    reader = reader.schema(schema)
    return reader.load("/data/" + filename_or_dir)

# Ad-hoc read of /test.csv using the reader's default CSV options.
# NOTE(review): df1 is not referenced by any other visible cell - confirm whether this is a leftover.
df1 = spark.read.csv("/test.csv")


# Reading the Reference Parquet files

# Load the raw reference data and rename its columns: the leading underscore
# is dropped and hyphens become underscores.
ref_data = (
    spark.read.parquet('/ref_data_raw')
    .withColumnRenamed("_device-id", "device_id")
    .withColumnRenamed("_dma", "dma")
    .withColumnRenamed("_dma-code", "dma_code")
    .withColumnRenamed("_household-id", "household_id")
    .withColumnRenamed("_household-type", "household_type")
    .withColumnRenamed("_system-type", "system_type")
    .withColumnRenamed("_zipcode", "zipcode")
)

# Row count of the reference data (runs a Spark job).
ref_data_count = ref_data.count()
print(ref_data_count)
#ref_data.limit(5).show()


# Reading the Daily Programs CSV file

# Column layout of the daily program files: everything is read as a string
# except 'Duration', which is read as a float.
daily_prog_schema = (
    StructType()
    .add('prog_code', StringType())
    .add('title', StringType())
    .add('genre', StringType())
    .add('air_date', StringType())
    .add('air_time', StringType())
    .add('Duration', FloatType())
)
daily_prog_data = load_PD_file("Daily program data/", daily_prog_schema)

#daily_prog_data.limit(3).show()

# Reading the 2.5% sample of the viewing data from a Parquet file

viewing_data = spark.read.parquet('/sample_viewing_2_5percent')

# Count once (runs a Spark job) and report it with thousands separators.
viewing_data_count = viewing_data.count()
print(f'There are {viewing_data_count:,} entries in viewing_data dataframe!')
#viewing_data.show(5)

# Reading the Demographic CSV file

# Column layout of the demographic files, listed in file order as
# (column name, Spark type) pairs and turned into StructFields below.
demographic_schema = StructType([
    StructField(field_name, field_type())
    for field_name, field_type in (
        ('household_id', StringType),
        ('household_size', IntegerType),
        ('num_adults', IntegerType),
        ('num_generations', IntegerType),
        ('adult_range', StringType),
        ('marital_status', StringType),
        ('race_code', StringType),
        ('presence_children', StringType),
        ('num_children', IntegerType),
        ('age_children', StringType),  # format like range - 'bitwise'
        ('age_range_children', StringType),
        ('dwelling_type', StringType),
        ('home_owner_status', StringType),
        ('length_residence', IntegerType),
        ('home_market_value', StringType),
        ('num_vehicles', IntegerType),
        ('vehicle_make', StringType),
        ('vehicle_model', StringType),
        ('vehicle_year', IntegerType),
        ('net_worth', IntegerType),
        ('income', StringType),
        ('gender_individual', StringType),
        ('age_individual', IntegerType),
        ('education_highest', StringType),
        ('occupation_highest', StringType),
        ('education_1', StringType),
        ('occupation_1', StringType),
        ('age_2', IntegerType),
        ('education_2', StringType),
        ('occupation_2', StringType),
        ('age_3', IntegerType),
        ('education_3', StringType),
        ('occupation_3', StringType),
        ('age_4', IntegerType),
        ('education_4', StringType),
        ('occupation_4', StringType),
        ('age_5', IntegerType),
        ('education_5', StringType),
        ('occupation_5', StringType),
        ('polit_party_regist', StringType),
        ('polit_party_input', StringType),
        ('household_clusters', StringType),
        ('insurance_groups', StringType),
        ('financial_groups', StringType),
        ('green_living', StringType),
    )
])

demographic_data = load_PD_file("demographic/", demographic_schema)

#display(demographic_data.limit(1))

203581233
There are 130,289,194 entries in viewing_data dataframe!


In [None]:
# Keep only the columns each condition needs from the four source tables.
# Column names are written exactly as declared so the selections do not
# depend on Spark's case-insensitive column resolution.

# Reference data: drop rows with a null in any kept column, then de-duplicate.
ref_data_C = (
    ref_data
    .select("device_id", "dma", "dma_code", "household_id", "household_type")
    .na.drop()
    .dropDuplicates()
)

# Daily program data: the schema declares the column as 'Duration'; select it
# by its exact name and alias it so the output column is still "duration".
daily_prog_data_C = daily_prog_data.select(
    "prog_code", "genre", "air_date", "air_time", col("Duration").alias("duration")
)

# Program viewing data
viewing_data_C = viewing_data.select("device_id", "prog_code", "event_date", "event_time")

# Demographic data
demographic_data_C = demographic_data.select("household_id", "household_size", "num_adults", "net_worth", "income")

In [None]:
# Null handling: Part 1 gives no instruction to convert null values, and records
# that contain them must be ignored when computing the required averages, so no
# when(col("income").isNull(), 0) branch is added here.
#
# Letter codes are replaced by the numbers that continue the scale (A=11 ... D=14);
# any other value is kept as-is, and both columns are then cast to double.
# NOTE(review): demographic_schema declares net_worth as IntegerType, so letter
# codes may never reach this mapping for that column - confirm against the data.
letter_codes = {'A': 11, 'B': 12, 'C': 13, 'D': 14}

for coded_column in ("income", "net_worth"):
    recoded = None
    for letter, numeric_code in letter_codes.items():
        is_letter = col(coded_column) == letter
        if recoded is None:
            recoded = when(is_letter, numeric_code)
        else:
            recoded = recoded.when(is_letter, numeric_code)
    demographic_data_C = demographic_data_C.withColumn(
        coded_column, recoded.otherwise(col(coded_column))
    )

for coded_column in ("income", "net_worth"):
    demographic_data_C = demographic_data_C.withColumn(
        coded_column, col(coded_column).cast("double")
    )

## CONDITIONS

In [None]:
# Mark the pruned viewing data for caching; it is reused by several conditions.
# NOTE(review): the name viewing_data_C is rebound to a repartitioned DataFrame
# two statements below, so the later viewing_data_C.unpersist() call acts on that
# new DataFrame rather than on the handle cached here - confirm this is intended.
viewing_data_C.cache()

# Condition 1: the prog code was viewed by a device with less than 5 average daily events.

# Step 1: number of events per (device, day).
viewing_data_C = viewing_data_C.repartition('device_id', 'event_date')
device_date_count = viewing_data_C.groupBy("device_id", "event_date").agg(count('event_time').alias("count_events"))

# Step 2: average of the daily counts per device.
device_date_count = device_date_count.repartition('device_id')
avg_events_per_device = device_date_count.groupBy("device_id").agg(avg('count_events').alias('avg_events'))

# Step 3: keep devices whose average is strictly below 5.
devices_less_than_5 = avg_events_per_device.filter(avg_events_per_device.avg_events < 5)
devices_programs = viewing_data_C.select("device_id","prog_code")

# Inner join keeps only the (device, prog_code) pairs of the low-activity devices.
joined_df = devices_programs.join(devices_less_than_5, "device_id")

# Distinct prog codes viewed by at least one such device.
programs_1 = joined_df.select("prog_code").distinct()


In [None]:
# Condition 2: the prog code was watched by a device from a DMA whose name
# contains the letter 'z' (case insensitive, hence the lower()).
contains_z = (
    ref_data_C
    .filter(lower(col("dma")).contains("z"))
    .select("dma", "device_id")
)

# Viewing events of those devices, reduced to the distinct prog codes.
joined_df_contains_z = viewing_data_C.join(contains_z, on="device_id")
programs_2 = joined_df_contains_z.select("prog_code").distinct()

In [None]:
# Condition 3: the program was watched by a family with less than 3 adults and
# a net worth higher than 8 (both bounds exclusive).
relevant_families = (
    demographic_data_C
    .where(col("num_adults") < 3)
    .where(col("net_worth") > 8)
)

# Devices that belong to those households.
household_id_devices = (
    ref_data_C
    .join(relevant_families, on="household_id")
    .select("household_id", "device_id")
)

# Distinct prog codes watched on any of those devices.
programs_3 = (
    viewing_data_C
    .join(household_id_devices, on="device_id")
    .select("prog_code")
    .distinct()
)

# NOTE(review): the programs_N counts are only triggered later in the notebook and
# viewing_data_C is used again for condition 5 - confirm unpersisting here is intended.
viewing_data_C.unpersist()

Out[16]: DataFrame[device_id: string, prog_code: string, event_date: string, event_time: string]

In [None]:
daily_prog_data.cache()

# Parse the air date and derive the weekday. dayofweek numbers the days from
# 1 (Sunday) to 7 (Saturday), so 6 is Friday and 7 is Saturday.
daily_data = (
    daily_prog_data
    .select("prog_code", "air_date", "air_time")
    .withColumn('air_date', to_date(col('air_date'), 'yyyyMMdd'))
    .withColumn('day_of_week', dayofweek(col('air_date')))
)

# Programs aired on day 6 at or after 180000, or on day 7 at or before 190000.
aired_day_6_late = (col("day_of_week") == 6) & (col("air_time") >= 180000)
aired_day_7_early = (col("day_of_week") == 7) & (col("air_time") <= 190000)
daily_filtered = (
    daily_data
    .where(aired_day_6_late | aired_day_7_early)
    .select("prog_code")
    .distinct()
)

# Households of size 8 or more, and the devices that belong to them.
fams_data = (
    demographic_data_C
    .where(col("household_size") >= 8)
    .select("household_id")
    .distinct()
)
fams_device_Data = (
    ref_data_C.select("device_id", "household_id")
    .join(broadcast(fams_data), on="household_id", how="inner")
    .select("device_id")
    .distinct()
)

# Prog codes watched on those devices that also satisfy the air-time window.
programs_4 = (
    viewing_data.select("prog_code", "device_id")
    .join(broadcast(fams_device_Data), on="device_id", how="inner")
    .select("prog_code")
    .distinct()
    .intersect(daily_filtered)
    .distinct()
)

In [None]:
demographic_data_C.cache()

# Condition 5: the prog code was watched (at least once) by a device from a
# household with more than 3 devices (exclusive) whose income is lower than the
# average household income in the data.
more_than_3_devices = (
    ref_data_C
    .groupBy("household_id")
    .agg(count("device_id").alias("count_devices"))
    .where(col("count_devices") > 3)
)

# Average income over the households that have an income value.
average_income = (
    demographic_data_C
    .dropna(subset=['income'])
    .agg(avg(col("income")).alias("average_income"))
    .first()[0]
)
lower_than_average = (
    demographic_data_C
    .dropna(subset=['income'])
    .where(col("income") < average_income)
    .select("household_id", "income")
)

# Households satisfying both requirements, then their devices.
household_ids_df = lower_than_average.join(more_than_3_devices, on="household_id")
household_id_devices = (
    ref_data_C
    .join(household_ids_df, on="household_id")
    .select("household_id", "device_id")
)

# Distinct prog codes watched on those devices.
programs_5 = (
    viewing_data_C.select("device_id", "prog_code")
    .join(household_id_devices, on="device_id")
    .select("prog_code")
    .distinct()
)

demographic_data_C.unpersist()

Out[18]: DataFrame[household_id: string, household_size: int, num_adults: int, net_worth: double, income: double]

In [None]:
# Condition 6: the genre list contains one of the listed genres and the
# duration is above 35.

# Split the comma-separated genre column once and OR together one
# array_contains test per wanted genre.
seperator=','
genre_tokens = split(col('genre'), seperator)
condition_6 = array_contains(genre_tokens, 'News')
for wanted_genre in ('Politics', 'Talk', 'Crime', 'Community'):
    condition_6 = condition_6 | array_contains(genre_tokens, wanted_genre)

# Keep only programs with duration > 35.
# NOTE(review): the schema declares this column as 'Duration'; the lookup below
# relies on case-insensitive column resolution.
duration_35 = (
    daily_prog_data
    .where("duration>35")
    .select("prog_code", "title", "genre")
    .distinct()
)

# Flag each remaining program with 1/0 for the genre test.
result_6_1 = (
    duration_35
    .withColumn('condition_6', when(condition_6, 1).otherwise(0))
    .select("prog_code", "title", "genre", "condition_6")
    .distinct()
)

# Programs that satisfy both parts of the condition.
programs_6 = (
    result_6_1
    .where(col("condition_6") == 1)
    .select("prog_code")
    .distinct()
)

daily_prog_data.unpersist()

Out[19]: DataFrame[prog_code: string, title: string, genre: string, air_date: string, air_time: string, Duration: float]

## RESULTS

In [None]:
# Report how many distinct prog codes satisfy each condition (one Spark job per count).
condition_results = [programs_1, programs_2, programs_3, programs_4, programs_5, programs_6]
for condition_number, matching_programs in enumerate(condition_results, start=1):
    print(f'Condition{condition_number} programs count: {matching_programs.count()}')

Condition1 programs count: 9138
Condition2 programs count: 235428
Condition3 programs count: 180061
Condition4 programs count: 67646
Condition5 programs count: 255489
Condition6 programs count: 33648


In [None]:
# Build one row per prog_code with a 0/1 flag for each of the six conditions
# and the number of conditions it satisfies.

# All the prog codes in the data.
daily_prog_data_C = daily_prog_data_C.select("prog_code").distinct()

# (flag column, programs satisfying that condition), in condition order.
condition_programs = [
    ("condition_1", programs_1),
    ("condition_2", programs_2),
    ("condition_3", programs_3),
    ("condition_4", programs_4),
    ("condition_5", programs_5),
    ("condition_6", programs_6),
]
condition_flags = [flag_name for flag_name, _ in condition_programs]

# Each condition's programs carry a literal 1 in their own flag column. A left
# join on prog_code then leaves that column null for programs that do not
# satisfy the condition, so the join never has to reference the right-hand
# side's prog_code key after the join.
for flag_name, flagged_programs in condition_programs:
    flagged = (
        flagged_programs
        .select("prog_code")
        .distinct()
        .withColumn(flag_name, lit(1))
    )
    daily_prog_data_C = daily_prog_data_C.join(broadcast(flagged), on="prog_code", how="left")

# Unmatched programs get 0 for the corresponding condition.
daily_prog_data_C = daily_prog_data_C.fillna(0, subset=condition_flags)

# Sum all the conditions per program. The builtin sum() is not used because
# it is shadowed by the pyspark.sql.functions star import.
num_of_conditions = col(condition_flags[0])
for flag_name in condition_flags[1:]:
    num_of_conditions = num_of_conditions + col(flag_name)
df = daily_prog_data_C.withColumn("num_of_conditions", num_of_conditions)


In [None]:
#show all the prog_codes and the condition for each of them
#df.show()

In [None]:
# Programs that satisfy at least 4 of the 6 conditions.
malicious_records = df.where(col("num_of_conditions") >= 4)
display(malicious_records.select("prog_code").distinct())

prog_code
EP003077660187
EP003915550576
EP003915550604
EP005927330204
EP009670040102
EP013216350305
EP016706760051
EP017164160049
EP019203190027
EP020968890002


## We add a CSV of the top 150 programs: "top_150_malicus_prog.csv"

In [None]:
# First 150 malicious prog codes in ascending prog_code order.
sorted_malicious_records = (
    malicious_records
    .select("prog_code")
    .orderBy(col("prog_code").asc())
)
top_150_malicious_records = sorted_malicious_records.limit(150)
display(top_150_malicious_records)

prog_code
EP000000211576
EP000000211614
EP000000211639
EP000000211645
EP000000211646
EP000000211647
EP000000211648
EP000000211650
EP000000211654
EP000000211659


In [None]:
#END PART 1