In [1]:
# Import statements

import pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import col, count, when, year, month, explode, array_contains, lower, array_join, to_date, concat_ws, expr, coalesce, lit
import pandas as pd
import matplotlib.pyplot as plt
import re
import string
import numpy as np

In [2]:
# Create or get the spark session

spark_session = SparkSession \
    .builder \
    .appName("s4768708_project") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

spark_session

24/05/17 04:56:52 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# Load the dataset - All files

df_load = spark_session.read.json("/data/ProjectDatasetFacebookAU/*")

24/05/17 04:57:13 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
                                                                                

In [4]:
# Drop the duplicate records with respect to 'id' column

df_id_unique = df_load.dropDuplicates(subset=['id'])

In [5]:
# Search the specific columns for the election words and filter the dataset

columns_to_search = [
    'ad_creative_body',
    'ad_creative_bodies',
    'ad_creative_link_caption',
    'ad_creative_link_captions',
    'ad_creative_link_description',
    'ad_creative_link_descriptions',
    'ad_creative_link_title',
    'ad_creative_link_titles'
]

words_to_search = ['funds', 'people', 'party', 'state', 'government', 'adam', 'vote', 'leader', 'scott', 'australian', 'pm', 'campaign', 'election',
                   'anthony', 'policy', 'federal', 'morrison', 'parliament', 'greens', 'liberal', 'candidate', 'political', 'voting', 'albanese',
                   'labor', 'bandt']

# Create an empty DataFrame to collect the filtered results
filtered_df = spark_session.createDataFrame([], df_id_unique.schema)

# Loop through each word to search for
for word in words_to_search:
    # Initialize the filter condition
    word_filter = None
    # Loop through each column to search in
    for column in columns_to_search:
        # Apply lowercase transformation and concatenate array elements into a single string
        if '_bodies' in column or '_captions' in column or '_descriptions' in column or '_titles' in column:
            concatenated_string = lower(array_join(col(column), " "))
        else:
            concatenated_string = lower(col(column))
        # Check if the concatenated string contains the search word
        condition = concatenated_string.contains(word.lower())
        # Union the filtered results with the existing DataFrame
        if word_filter is None:
            word_filter = condition
        else:
            word_filter = word_filter | condition
    # Filter the DataFrame based on the current word and union the results
    filtered_df = filtered_df.union(df_id_unique.filter(word_filter))

print("\nTotal Count:")
print(filtered_df.count())


Total Count:


24/05/17 04:58:30 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 8:>                                                          (0 + 1) / 1]

2336237


24/05/17 05:01:23 WARN BlockManagerMasterEndpoint: No more replicas available for broadcast_6_piece0 !
24/05/17 05:01:23 WARN BlockManagerMaster: Failed to remove broadcast 6 with removeFromMaster = true - org.apache.spark.SparkException: Could not find BlockManagerEndpoint1.
	at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:178)
	at org.apache.spark.rpc.netty.Dispatcher.postRemoteMessage(Dispatcher.scala:136)
	at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:683)
	at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163)
	at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(

In [6]:
# Drop the duplicate records with respect to 'id' column on the filtered_df

filtered_df_id_unique = filtered_df.dropDuplicates(subset=['id'])
print("\nTotal Count:")
print(filtered_df_id_unique.count())


Total Count:




1499362


                                                                                

In [7]:
# Filter the DataFrame based on the ad_creation_time column

election_df = filtered_df_id_unique.filter(to_date(col("ad_creation_time"), "yyyy-MM-dd") < "2022-07-01")
election_df = election_df.dropDuplicates(subset=['id'])
print("\nTotal Count:")
print(election_df.count())


Total Count:


[Stage 28:>                                                         (0 + 1) / 1]

162042


                                                                                

In [8]:
# Check the null values in each column

missing_counts_election = election_df.select([count(when(col(c).isNull(), c)).alias(c) for c in election_df.columns]).toPandas().T
print(missing_counts_election)

24/05/17 05:12:22 WARN DAGScheduler: Broadcasting large task binary with size 1805.5 KiB
24/05/17 05:14:33 WARN DAGScheduler: Broadcasting large task binary with size 1153.6 KiB
                                                                                

                                    0
ad_creation_time                    0
ad_creative_bodies             156726
ad_creative_body                 6134
ad_creative_link_caption        57088
ad_creative_link_captions      157700
ad_creative_link_description    77012
ad_creative_link_descriptions  158588
ad_creative_link_title          54675
ad_creative_link_titles        157715
ad_delivery_start_time              0
ad_delivery_stop_time          157257
ad_snapshot_url                     0
bylines                        156712
currency                            0
delivery_by_region             157734
demographic_distribution        13995
estimated_audience_size        156704
funding_entity                    247
id                                  0
impressions                         0
languages                      156709
page_id                             0
page_name                           0
publisher_platforms            156683
region_distribution             13995
spend       

In [13]:
# Check the top 50 pages with count

page_counts = election_df.groupBy("page_id", "page_name").agg(count("*").alias("count"))
page_counts = page_counts.orderBy("count", ascending=False)
page_counts.select("page_id", "page_name", "count").show(150,truncate=False)



+----------------+--------------------------------------------------------+-----+
|page_id         |page_name                                               |count|
+----------------+--------------------------------------------------------+-----+
|9480502970      |Greenpeace Australia Pacific                            |6027 |
|307341981788    |Australian Labor Party                                  |4855 |
|157481181070130 |Indigenous Employment Australia                         |3559 |
|13561467463     |Liberal Party of Australia                              |3409 |
|49453745631     |Amnesty International Australia                         |2919 |
|149648047339    |Save the Children Australia                             |1983 |
|316480331783930 |Australian Unions                                       |1797 |
|104389532222332 |Solutions                                               |1649 |
|397230473694314 |Queensland Labor                                        |1405 |
|341910905847062

                                                                                

In [14]:
# Filter the ads for Labor, Liberal, and Greens parties from the top 50 pages

labor_page_ids = ['307341981788', '397230473694314', '537762462924537' '464998863536680', '427125817373739', '113830088678537',
                  '132800873768451', '1501550403427903', '174666115898903', '136238709722696', '146809615402166', '186985648081724',
                  '324660077589275', '581391495214301', '101445619187940', '2063329587276680', '1609771445950238', '426913580659855',
                  '1767975046782781', '665017740294128', '185738798123705', '701178440090122', '1692243401039740', '144449965668596',
                  '545594388945678', '119246428679', '726565307466774', '123434864414732', '590302731315349', '160089444042253',
                  '134712696593275']
liberal_page_ids = ['13561467463', '352953834815624' '44701328033', '100797488277640', '137619162220', '292602604891', '214559598616472',
                    '185917314792939', '162107547181406', '275866669090583', '1510561608994505', '278403342281472', '116868855036962',
                    '424864151385105', '451516984861373', '158532174266173', '32334589553', '101927318283875', '302386636554', '62266509441',
                    '2232624883416629', '145262132264710', '164045887863946', '435267276610362']
greens_page_ids = ['108695110952294' '119931654696956', '21551986502', '22746335927', '196804953777923', '45587531822', '424862620880565']

# Filter the DataFrame based on the page IDs for each party
labor_df = election_df.filter(col("page_id").isin(labor_page_ids))
liberal_df = election_df.filter(col("page_id").isin(liberal_page_ids))
greens_df = election_df.filter(col("page_id").isin(greens_page_ids))

print("\nCounts for labor party:")
print(labor_df.count())
print("\nCounts for liberal party:")
print(liberal_df.count())
print("\nCounts for greens party:")
print(greens_df.count())


Counts for labor party:


                                                                                

14600

Counts for liberal party:


                                                                                

9603

Counts for greens party:




1160


                                                                                

In [15]:
from pyspark.sql.functions import col, lit, date_format
from pyspark.sql import Window
import pyspark.sql.functions as F

# Function to calculate YYYY-MM counts and add party name
def calculate_counts_with_party(df, party_name):
    counts = (
        df.withColumn("month-year", date_format(col("ad_creation_time"), "yyyy-MM"))
        .groupBy("month-year")
        .agg(F.count("*").alias("count"))
        .withColumn("party", lit(party_name))
        .select("month-year", "count", "party")
        .orderBy("month-year")
    )
    return counts

# Calculate counts for each DataFrame
labor_counts = calculate_counts_with_party(labor_df, "labor")
liberal_counts = calculate_counts_with_party(liberal_df, "liberal")
greens_counts = calculate_counts_with_party(greens_df, "greens")
election_counts = calculate_counts_with_party(election_df, "election")

# Union the DataFrames
combined_counts = labor_counts.union(liberal_counts).union(greens_counts).union(election_counts)

# Show the results
combined_counts.show(200)

24/05/17 06:38:11 WARN DAGScheduler: Broadcasting large task binary with size 2040.9 KiB
24/05/17 06:38:11 WARN DAGScheduler: Broadcasting large task binary with size 2040.9 KiB


+----------+-----+--------+
|month-year|count|   party|
+----------+-----+--------+
|   2020-03|   55|   labor|
|   2020-04|   13|   labor|
|   2020-05|   31|   labor|
|   2020-06|   41|   labor|
|   2020-07|   16|   labor|
|   2020-08|  169|   labor|
|   2020-09|  584|   labor|
|   2020-10|  846|   labor|
|   2020-11|  139|   labor|
|   2020-12|   63|   labor|
|   2021-01|   88|   labor|
|   2021-02|  501|   labor|
|   2021-03|  506|   labor|
|   2021-04|  265|   labor|
|   2021-05|  433|   labor|
|   2021-06| 1109|   labor|
|   2021-07|  774|   labor|
|   2021-08|  897|   labor|
|   2021-09|  358|   labor|
|   2021-10|  231|   labor|
|   2021-11|  193|   labor|
|   2021-12|  129|   labor|
|   2022-01|   99|   labor|
|   2022-02|  385|   labor|
|   2022-03|  909|   labor|
|   2022-04| 2130|   labor|
|   2022-05| 3570|   labor|
|   2022-06|   66|   labor|
|   2020-03|   16| liberal|
|   2020-04|    3| liberal|
|   2020-05|    8| liberal|
|   2020-06|    5| liberal|
|   2020-07|   11| l

In [16]:
# Check the currencies used by each party

distinct_currencies_labor = labor_df.select("currency").distinct()
print("\nCurrencies for Labor Party:")
distinct_currencies_labor.show()

distinct_currencies_liberal = liberal_df.select("currency").distinct()
print("\nCurrencies for Liberal Party:")
distinct_currencies_liberal.show()

distinct_currencies_greens = greens_df.select("currency").distinct()
print("\nCurrencies for Greens Party:")
distinct_currencies_greens.show()


Currencies for Labor Party:


                                                                                

+--------+
|currency|
+--------+
|     AUD|
+--------+


Currencies for Liberal Party:


                                                                                

+--------+
|currency|
+--------+
|     AUD|
+--------+


Currencies for Greens Party:


[Stage 223:>                                                        (0 + 1) / 1]

+--------+
|currency|
+--------+
|     AUD|
+--------+



                                                                                

In [17]:
#Calculate the spend for each party with lower_bound and upper_bound

# Replace None values with 0 before summing
labor_df = labor_df.withColumn("spend_lower_bound", when(col("spend.lower_bound").isNull(), 0).otherwise(col("spend.lower_bound")))
labor_df = labor_df.withColumn("spend_upper_bound", when(col("spend.upper_bound").isNull(), 0).otherwise(col("spend.upper_bound")))
liberal_df = liberal_df.withColumn("spend_lower_bound", when(col("spend.lower_bound").isNull(), 0).otherwise(col("spend.lower_bound")))
liberal_df = liberal_df.withColumn("spend_upper_bound", when(col("spend.upper_bound").isNull(), 0).otherwise(col("spend.upper_bound")))
greens_df = greens_df.withColumn("spend_lower_bound", when(col("spend.lower_bound").isNull(), 0).otherwise(col("spend.lower_bound")))
greens_df = greens_df.withColumn("spend_upper_bound", when(col("spend.upper_bound").isNull(), 0).otherwise(col("spend.upper_bound")))

# Calculate the sum of lower_bound and upper_bound values
labor_spend_lower = labor_df.selectExpr("sum(CAST(spend_lower_bound AS FLOAT)) AS spend_lower_bound").collect()[0]["spend_lower_bound"]
labor_spend_upper = labor_df.selectExpr("sum(CAST(spend_upper_bound AS FLOAT)) AS spend_upper_bound").collect()[0]["spend_upper_bound"]
liberal_spend_lower = liberal_df.selectExpr("sum(CAST(spend_lower_bound AS FLOAT)) AS spend_lower_bound").collect()[0]["spend_lower_bound"]
liberal_spend_upper = liberal_df.selectExpr("sum(CAST(spend_upper_bound AS FLOAT)) AS spend_upper_bound").collect()[0]["spend_upper_bound"]
greens_spend_lower = greens_df.selectExpr("sum(CAST(spend_lower_bound AS FLOAT)) AS spend_lower_bound").collect()[0]["spend_lower_bound"]
greens_spend_upper = greens_df.selectExpr("sum(CAST(spend_upper_bound AS FLOAT)) AS spend_upper_bound").collect()[0]["spend_upper_bound"]

print("Overall Spend Labor:")
print(f"Spend Lower Bound: {labor_spend_lower}")
print(f"Spend Upper Bound: {labor_spend_upper}")
print("\nOverall Spend Liberal:")
print(f"Spend Lower Bound: {liberal_spend_lower}")
print(f"Spend Upper Bound: {liberal_spend_upper}")
print("\nOverall Spend Greens:")
print(f"Spend Lower Bound: {greens_spend_lower}")
print(f"Spend Upper Bound: {greens_spend_upper}")

[Stage 283:>                                                        (0 + 1) / 1]

Overall Spend Labor:
Spend Lower Bound: 1841600.0
Spend Upper Bound: 3451700.0

Overall Spend Liberal:
Spend Lower Bound: 1023500.0
Spend Upper Bound: 2086697.0

Overall Spend Greens:
Spend Lower Bound: 112800.0
Spend Upper Bound: 230140.0


                                                                                

In [18]:
#Calculate the impressions for each party with lower_bound and upper_bound

# Replace None values with 0 before summing
labor_df = labor_df.withColumn("impressions_lower_bound", when(col("impressions.lower_bound").isNull(), 0).otherwise(col("impressions.lower_bound")))
labor_df = labor_df.withColumn("impressions_upper_bound", when(col("impressions.upper_bound").isNull(), 0).otherwise(col("impressions.upper_bound")))
liberal_df = liberal_df.withColumn("impressions_lower_bound", when(col("impressions.lower_bound").isNull(), 0).otherwise(col("impressions.lower_bound")))
liberal_df = liberal_df.withColumn("impressions_upper_bound", when(col("impressions.upper_bound").isNull(), 0).otherwise(col("impressions.upper_bound")))
greens_df = greens_df.withColumn("impressions_lower_bound", when(col("impressions.lower_bound").isNull(), 0).otherwise(col("impressions.lower_bound")))
greens_df = greens_df.withColumn("impressions_upper_bound", when(col("impressions.upper_bound").isNull(), 0).otherwise(col("impressions.upper_bound")))

# Calculate the sum of lower_bound and upper_bound values
labor_impressions_lower = labor_df.selectExpr("sum(CAST(impressions_lower_bound AS FLOAT)) AS impressions_lower_bound").collect()[0]["impressions_lower_bound"]
labor_impressions_upper = labor_df.selectExpr("sum(CAST(impressions_upper_bound AS FLOAT)) AS impressions_upper_bound").collect()[0]["impressions_upper_bound"]
liberal_impressions_lower = liberal_df.selectExpr("sum(CAST(impressions_lower_bound AS FLOAT)) AS impressions_lower_bound").collect()[0]["impressions_lower_bound"]
liberal_impressions_upper = liberal_df.selectExpr("sum(CAST(impressions_upper_bound AS FLOAT)) AS impressions_upper_bound").collect()[0]["impressions_upper_bound"]
greens_impressions_lower = greens_df.selectExpr("sum(CAST(impressions_lower_bound AS FLOAT)) AS impressions_lower_bound").collect()[0]["impressions_lower_bound"]
greens_impressions_upper = greens_df.selectExpr("sum(CAST(impressions_upper_bound AS FLOAT)) AS impressions_upper_bound").collect()[0]["impressions_upper_bound"]

print("Overall impressions Labor:")
print(f"impressions Lower Bound: {labor_impressions_lower}")
print(f"impressions Upper Bound: {labor_impressions_upper}")
print("\nOverall impressions Liberal:")
print(f"impressions Lower Bound: {liberal_impressions_lower}")
print(f"impressions Upper Bound: {liberal_impressions_upper}")
print("\nOverall impressions Greens:")
print(f"impressions Lower Bound: {greens_impressions_lower}")
print(f"impressions Upper Bound: {greens_impressions_upper}")

[Stage 343:>                                                        (0 + 1) / 1]

Overall impressions Labor:
impressions Lower Bound: 146548000.0
impressions Upper Bound: 185546400.0

Overall impressions Liberal:
impressions Lower Bound: 95521000.0
impressions Upper Bound: 119390398.0

Overall impressions Greens:
impressions Lower Bound: 9210000.0
impressions Upper Bound: 10638840.0


                                                                                

In [19]:
# Calculate the region distribution in Australia for each party

# List of desired regions
desired_regions = ['Northern Territory', 'South Australia', 'Victoria', 'Western Australia', 'New South Wales', 'Tasmania', 'Queensland',
                   'Australian Capital Territory']

# Explode the array column 'region_distribution' and select the 'percentage' and 'region'
labor_exploded_df = labor_df.select(explode('region_distribution').alias('region_info')).select(col('region_info.percentage').cast('float').alias('percentage'), col('region_info.region'))
liberal_exploded_df = liberal_df.select(explode('region_distribution').alias('region_info')).select(col('region_info.percentage').cast('float').alias('percentage'), col('region_info.region'))
greens_exploded_df = greens_df.select(explode('region_distribution').alias('region_info')).select(col('region_info.percentage').cast('float').alias('percentage'), col('region_info.region'))

# Group by 'region' and calculate the sum of 'percentage' for each region
labor_region_sum_df = labor_exploded_df.groupBy('region').sum('percentage').withColumnRenamed('sum(percentage)', 'total_percentage')
labor_region_sum_df = labor_region_sum_df.filter(col('region').isin(desired_regions))
liberal_region_sum_df = liberal_exploded_df.groupBy('region').sum('percentage').withColumnRenamed('sum(percentage)', 'total_percentage')
liberal_region_sum_df = liberal_region_sum_df.filter(col('region').isin(desired_regions))
greens_region_sum_df = greens_exploded_df.groupBy('region').sum('percentage').withColumnRenamed('sum(percentage)', 'total_percentage')
greens_region_sum_df = greens_region_sum_df.filter(col('region').isin(desired_regions))

# Calculate proportions
labor_total_records = labor_df.count()
labor_region_sum_df = labor_region_sum_df.withColumn('proportion', (col('total_percentage') / labor_total_records) * 100)
liberal_total_records = liberal_df.count()
liberal_region_sum_df = liberal_region_sum_df.withColumn('proportion', (col('total_percentage') / liberal_total_records) * 100)
greens_total_records = greens_df.count()
greens_region_sum_df = greens_region_sum_df.withColumn('proportion', (col('total_percentage') / greens_total_records) * 100)

# Show the result
print("Labor Party")
labor_region_sum_df.show()
print("\nLiberal Party")
liberal_region_sum_df.show()
print("\nGreens Party")
greens_region_sum_df.show()

                                                                                

Labor Party


                                                                                

+--------------------+------------------+------------------+
|              region|  total_percentage|        proportion|
+--------------------+------------------+------------------+
|  Northern Territory|113.11242304970074|0.7747426236280872|
|     South Australia|1344.4202780084306| 9.208358068550893|
|   Western Australia|1465.4811839285549|10.037542355675033|
|            Victoria| 2867.446322432081|19.640043304329325|
|     New South Wales|2657.3622772133076|18.201111487762383|
|Australian Capita...| 334.2918860398281|2.2896704523275897|
|            Tasmania| 587.7287128186676| 4.025539128894984|
|          Queensland| 4175.002119854453|28.595904930509953|
+--------------------+------------------+------------------+


Liberal Party


                                                                                

+--------------------+------------------+-------------------+
|              region|  total_percentage|         proportion|
+--------------------+------------------+-------------------+
|  Northern Territory| 44.06346596474759|0.45885104618085587|
|     South Australia| 759.1094859251316|  7.904920190827154|
|            Victoria|3133.5546965048316|   32.6309975685185|
|   Western Australia| 979.5935225816338| 10.200911408743453|
|     New South Wales|1346.9372750538137| 14.026213423449063|
|Australian Capita...|334.48254107577304| 3.4831046659978453|
|            Tasmania| 540.1817690333592|  5.625135572564399|
|          Queensland|1560.9603261929942|  16.25492373417676|
+--------------------+------------------+-------------------+


Greens Party


[Stage 403:>                                                        (0 + 1) / 1]

+--------------------+------------------+--------------------+
|              region|  total_percentage|          proportion|
+--------------------+------------------+--------------------+
|     South Australia| 175.6051660552621|  15.138376384074318|
|   Western Australia|326.58107901737094|   28.15354129460094|
|            Victoria| 401.8130439966917|   34.63905551695618|
|            Tasmania|189.62273600697517|  16.346787586808205|
|  Northern Territory|0.6769910037983209|0.058361293430889735|
|     New South Wales|16.067166937122238|   1.385100598027779|
|Australian Capita...|3.9232009779661894| 0.33820698085915424|
|          Queensland| 8.678931005299091|    0.74818370735337|
+--------------------+------------------+--------------------+



                                                                                

In [20]:
# Calculate demographic distribution percentages for each party:

# Explode the demographic_distribution array column
exploded_df_labor = labor_df.select(explode('demographic_distribution').alias('demographic_info')).select(col('demographic_info.percentage').cast('float').alias('percentage'), col('demographic_info.age'), col('demographic_info.gender'))
exploded_df_liberal = liberal_df.select(explode('demographic_distribution').alias('demographic_info')).select(col('demographic_info.percentage').cast('float').alias('percentage'), col('demographic_info.age'), col('demographic_info.gender'))
exploded_df_greens = greens_df.select(explode('demographic_distribution').alias('demographic_info')).select(col('demographic_info.percentage').cast('float').alias('percentage'), col('demographic_info.age'), col('demographic_info.gender'))

# Group by 'region' and calculate the sum of 'percentage' for each region
labor_age_sum_df = exploded_df_labor.groupBy('age').sum('percentage').withColumnRenamed('sum(percentage)', 'age_percentage')
labor_gender_sum_df = exploded_df_labor.groupBy('gender').sum('percentage').withColumnRenamed('sum(percentage)', 'gender_percentage')
liberal_age_sum_df = exploded_df_liberal.groupBy('age').sum('percentage').withColumnRenamed('sum(percentage)', 'age_percentage')
liberal_gender_sum_df = exploded_df_liberal.groupBy('gender').sum('percentage').withColumnRenamed('sum(percentage)', 'gender_percentage')
greens_age_sum_df = exploded_df_greens.groupBy('age').sum('percentage').withColumnRenamed('sum(percentage)', 'age_percentage')
greens_gender_sum_df = exploded_df_greens.groupBy('gender').sum('percentage').withColumnRenamed('sum(percentage)', 'gender_percentage')

# Calculate proportions
labor_total_records = labor_df.count()
labor_age_sum_df = labor_age_sum_df.withColumn('proportion', (col('age_percentage') / labor_total_records) * 100)
labor_gender_sum_df = labor_gender_sum_df.withColumn('proportion', (col('gender_percentage') / labor_total_records) * 100)
liberal_total_records = liberal_df.count()
liberal_age_sum_df = liberal_age_sum_df.withColumn('proportion', (col('age_percentage') / liberal_total_records) * 100)
liberal_gender_sum_df = liberal_gender_sum_df.withColumn('proportion', (col('gender_percentage') / liberal_total_records) * 100)
greens_total_records = greens_df.count()
greens_age_sum_df = greens_age_sum_df.withColumn('proportion', (col('age_percentage') / greens_total_records) * 100)
greens_gender_sum_df = greens_gender_sum_df.withColumn('proportion', (col('gender_percentage') / greens_total_records) * 100)

# Show the result
print("Labor Party")
labor_age_sum_df.show()
labor_gender_sum_df.show()
print("\nLiberal Party")
liberal_age_sum_df.show()
liberal_gender_sum_df.show()
print("\nGreens Party")
greens_age_sum_df.show()
greens_gender_sum_df.show()

                                                                                

Labor Party


                                                                                

+-------+-------------------+--------------------+
|    age|     age_percentage|          proportion|
+-------+-------------------+--------------------+
|  45-54| 1866.0515291968186|  12.781174857512456|
|  13-17|  30.83838291329039|  0.2112218007759616|
|  55-64| 1606.2959688738447|   11.00202718406743|
|  35-44|  2535.185001647318|   17.36428083320081|
|  25-34| 3313.2889317251393|   22.69375980633657|
|    65+| 1449.7863172444486|   9.930043268797593|
|  18-24| 2708.5463623627857|  18.551687413443737|
|Unknown|0.00802700001258927|5.497945214102239E-5|
+-------+-------------------+--------------------+



                                                                                

+-------+-----------------+------------------+
| gender|gender_percentage|        proportion|
+-------+-----------------+------------------+
|unknown|166.7748008788708|1.1422931567045944|
| female|6858.918190209606|46.978891713764426|
|   male|6548.307559650553|44.851421641442144|
+-------+-----------------+------------------+


Liberal Party


                                                                                

+-------+--------------------+--------------------+
|    age|      age_percentage|          proportion|
+-------+--------------------+--------------------+
|  45-54|  1311.6163566909236|  13.658402131531016|
|  13-17|  39.440139949254444|  0.4107064453738878|
|  55-64|  1154.6107796319975|  12.023438296698922|
|  35-44|  1571.2729201269976|  16.362313028501486|
|  25-34|  1874.8247922662304|  19.523323880727173|
|    65+|   1300.238342038674|    13.5399181718075|
|  18-24|   1408.975083311594|  14.672238709898927|
|Unknown|0.021722999939811416|2.262105585734813...|
+-------+--------------------+--------------------+



                                                                                

+-------+------------------+------------------+
| gender| gender_percentage|        proportion|
+-------+------------------+------------------+
|unknown| 95.66903797092482|0.9962411535033303|
| female|3911.1189245221135|40.728094600875906|
|   male| 4606.212220088084| 47.96638779639784|
+-------+------------------+------------------+


Greens Party


                                                                                

+-------+--------------------+--------------------+
|    age|      age_percentage|          proportion|
+-------+--------------------+--------------------+
|  45-54|  131.91149296266667|  11.371680427816093|
|  55-64|  113.35750110263325|    9.77219837091666|
|  35-44|   171.2055008074094|  14.759094897190467|
|  25-34|    329.717569674307|   28.42392842019888|
|    65+|  107.28985791302694|   9.249125682157494|
|  18-24|  269.38650195644004|   23.22297430658966|
|  13-17|   5.119106026932059| 0.44130224370103954|
|Unknown|0.012471999856643379|0.001075172401434774|
+-------+--------------------+--------------------+





+-------+-----------------+-----------------+
| gender|gender_percentage|       proportion|
+-------+-----------------+-----------------+
|unknown|16.34415897983854|1.408979222399874|
| female|591.9781848493585|51.03260214218608|
|   male|519.6776457217366|44.79979704497729|
+-------+-----------------+-----------------+



                                                                                

In [21]:
# Calculte the most focussed topics by each party in their ads

def preprocess_text(text):
    # Convert to lowercase
    text = text.lower()
    
    # Remove punctuations
    text = re.sub(r'[{}]'.format(string.punctuation), '', text)
    
    # Remove prepositions and common words
    common_words = ["i", "me", "my", "myself", "we", "our", "ours", "ourselves", "you", "you're", "you've", "you'll", "you'd", "your",
                    "yours", "yourself", "yourselves", "he", "him", "his", "himself", "she", "she's", "her", "hers", "herself", "it",
                    "it's", "its", "itself", "they", "them", "their", "theirs", "themselves", "what", "which", "who", "whom", "this",
                    "that", "that'll", "these", "those", "am", "is", "are", "was", "were", "be", "been", "being", "have", "has", "had",
                    "having", "do", "does", "did", "doing", "a", "an", "the", "and", "but", "if", "or", "because", "as", "until", "while",
                    "of", "at", "by", "for", "with", "about", "against", "between", "into", "through", "during", "before", "after", "above",
                    "below", "to", "from", "up", "down", "in", "out", "on", "off", "over", "under", "again", "further", "then", "once", "here",
                    "there", "when", "where", "why", "how", "all", "any", "both", "each", "few", "more", "most", "other", "some", "such", "no",
                    "nor", "not", "only", "own", "same", "so", "than", "too", "very", "s", "t", "can", "will", "just", "don", "don't", "should",
                    "should've", "now", "d", "ll", "m", "o", "re", "ve", "y", "ain", "aren", "aren't", "couldn", "couldn't", "didn", "didn't",
                    "doesn", "doesn't", "hadn", "hadn't", "hasn", "hasn't", "haven", "haven't", "isn", "isn't", "ma", "mightn", "mightn't",
                    "mustn", "mustn't", "needn", "needn't", "shan", "shan't", "shouldn", "shouldn't", "wasn", "wasn't", "weren", "weren't",
                    "won't", "wouldn", "wouldn't", "make", "need", "new", "1", "get", "better", "help", "us",
                    "time", "like", "care", "one", "–", "que", "want", "back", "today", "find", "year", "years",
                    "see", "action", "mp", "de", "sign", "keep", "know", "take", "would", "early", "ward", "strong", "first",
                    "also", "every", "many", "join", "✅", "day", "last", "could"]
    
    text = ' '.join(word for word in text.split() if word not in common_words)
    
    return text

def get_combined_text(row):
    text = ""
    for column in ['ad_creative_body', 'ad_creative_link_caption', 'ad_creative_link_description', 'ad_creative_link_title']:
        if row[column]:
            text += preprocess_text(row[column]) + " "
    
    for column in ['ad_creative_bodies', 'ad_creative_link_captions', 'ad_creative_link_descriptions', 'ad_creative_link_titles']:
        if row[column]:
            text += " ".join(preprocess_text(item) for item in row[column]) + " "
    
    return text.strip()

# Combine text from all relevant columns
combined_text_labor = labor_df.rdd.map(get_combined_text).filter(lambda x: x)
combined_text_liberal = liberal_df.rdd.map(get_combined_text).filter(lambda x: x)
combined_text_greens = greens_df.rdd.map(get_combined_text).filter(lambda x: x)

# Count word frequencies
word_counts_labor = combined_text_labor.flatMap(lambda text: text.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
word_counts_liberal = combined_text_liberal.flatMap(lambda text: text.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
word_counts_greens = combined_text_greens.flatMap(lambda text: text.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

# Get the top 50 words
top_words_labor = word_counts_labor.takeOrdered(50, key=lambda x: (-x[1], x[0]))
top_words_liberal = word_counts_liberal.takeOrdered(50, key=lambda x: (-x[1], x[0]))
top_words_greens = word_counts_greens.takeOrdered(50, key=lambda x: (-x[1], x[0]))

# Print the top 50 words
print("\nLabor Party:")
for word, count in top_words_labor:
    print(f"{word}: {count}")
print("\nLiberal Party:")
for word, count in top_words_liberal:
    print(f"{word}: {count}")
print("\nGreens Party:")
for word, count in top_words_greens:
    print(f"{word}: {count}")

24/05/17 09:49:07 WARN DAGScheduler: Broadcasting large task binary with size 1804.3 KiB
24/05/17 09:51:25 WARN DAGScheduler: Broadcasting large task binary with size 1804.3 KiB
24/05/17 09:53:27 WARN DAGScheduler: Broadcasting large task binary with size 1804.3 KiB
24/05/17 09:54:58 WARN DAGScheduler: Broadcasting large task binary with size 1144.6 KiB
24/05/17 09:55:04 WARN DAGScheduler: Broadcasting large task binary with size 1144.4 KiB
24/05/17 09:55:08 WARN DAGScheduler: Broadcasting large task binary with size 1145.3 KiB


Labor Party:
labor: 9056
morrison: 6280
plan: 5497
government: 4169
vote: 3862
scott: 3784
jobs: 3187
future: 3044
medicare: 2333
local: 2213
australians: 2100
australia: 2047
alporgau: 2022
community: 1953
election: 1946
labors: 1747
health: 1540
australian: 1536
support: 1478
federal: 1387
queensland: 1340
cheaper: 1283
pay: 1257
labor’s: 1218
state: 1141
bennelong: 1129
cut: 1064
child: 1059
build: 1054
workers: 1052
nomoremorrisoncom: 1044
covid19: 1041
good: 1019
people: 1014
deliver: 996
it’s: 996
families: 985
risk: 981
lnp: 936
secure: 898
economic: 867
work: 861
follow: 842
job: 825
facebookcom: 823
right: 823
liberals: 794
million: 788
create: 781
power: 776

Liberal Party:
liberal: 4327
labor: 4049
australia: 3444
plan: 2848
government: 2609
party: 2260
economy: 2091
stronger: 1735
local: 1699
vote: 1600
community: 1564
future: 1563
risk: 1500
australians: 1311
election: 1237
higher: 1235
can’t: 1177
democrats: 1138
jobs: 1108
times: 1096
read: 1088
support: 1075
page: 1010

                                                                                

In [23]:
# Stop the spark session

#spark_session.stop()