In [51]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, avg, max, expr


# Create a SparkSession
spark = SparkSession.builder.appName("MatchingCode 6").getOrCreate()

gcs_path = "gs://genai-security-dataproc-dataset/meta-kaggle/Competitions.csv"
spark.sparkContext.setJobDescription("Reading competitions df")

df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(gcs_path)

spark.sparkContext.setJobDescription("Reading episodes df")

episodes_df = spark.read.csv("gs://genai-security-dataproc-dataset/meta-kaggle/Episodes.csv", header=True, inferSchema=True)
spark.sparkContext.setJobDescription("Reading episode agents df")

episode_agents_df = spark.read.csv("gs://genai-security-dataproc-dataset/meta-kaggle/EpisodeAgents.csv", header=True, inferSchema=True)


24/11/03 17:17:21 WARN SparkConf: The configuration key 'spark.yarn.executor.failuresValidityInterval' has been deprecated as of Spark 3.5 and may be removed in the future. Please use the new key 'spark.executor.failuresValidityInterval' instead.
24/11/03 17:17:21 INFO SparkEnv: Registering MapOutputTracker
24/11/03 17:17:21 INFO SparkEnv: Registering BlockManagerMaster
24/11/03 17:17:21 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/11/03 17:17:21 INFO SparkEnv: Registering OutputCommitCoordinator
                                                                                

In [61]:

df.printSchema()
spark.sparkContext.setJobDescription("Counting total rows in df")

count = df.count()
print(f"Total count: {count}")

from pyspark.sql.functions import col, desc, when
spark.sparkContext.setJobDescription("Selecting high competitor competitions")

# Select specific columns, apply a condition, and order the results
(df.select("Title", "TotalCompetitors", "TotalTeams")
    .where(col("TotalCompetitors") > 100)
    .orderBy(desc("TotalCompetitors"))).show(10)


spark.sparkContext.setJobDescription("Categorizing MaxDailySubmissions")

# Apply similar delay logic to a field from your schema, for example 'MaxDailySubmissions'
(df.select(
        "Title", 
        "MaxDailySubmissions",
        when(col("MaxDailySubmissions") > 100, 'Very High Submissions')
            .when((col("MaxDailySubmissions") > 50) & (col("MaxDailySubmissions") <= 100), 'High Submissions')
            .when((col("MaxDailySubmissions") > 10) & (col("MaxDailySubmissions") <= 50), 'Moderate Submissions')
            .when((col("MaxDailySubmissions") > 0) & (col("MaxDailySubmissions") <= 10), 'Low Submissions')
            .when(col("MaxDailySubmissions") == 0, 'No Submissions')
            .otherwise('Unknown')
            .alias("Submission_Category")
    )
    .orderBy(col("HostName"), desc("MaxDailySubmissions"))).show(10)

df.explain()



spark.sparkContext.setJobDescription("Calculate Submission Interaction")
# Calculate total interactions
df = df.withColumn("TotalInteractions", col("TotalCompetitors") * col("TotalTeams"))
df.select("Title", "TotalInteractions").show(10)

# spark.sparkContext.setJobDescription("Show Query Plan")
# df.explain()

spark.sparkContext.setJobDescription("Filter Competitions by Host")
filtered_df = df.filter(df.HostName.contains("kaggle"))
filtered_df.show(10)

root
 |-- Id: string (nullable = true)
 |-- Slug: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- HostSegmentTitle: string (nullable = true)
 |-- ForumId: string (nullable = true)
 |-- OrganizationId: string (nullable = true)
 |-- EnabledDate: string (nullable = true)
 |-- DeadlineDate: string (nullable = true)
 |-- ProhibitNewEntrantsDeadlineDate: string (nullable = true)
 |-- TeamMergerDeadlineDate: string (nullable = true)
 |-- TeamModelDeadlineDate: string (nullable = true)
 |-- ModelSubmissionDeadlineDate: string (nullable = true)
 |-- FinalLeaderboardHasBeenVerified: string (nullable = true)
 |-- HasKernels: string (nullable = true)
 |-- OnlyAllowKernelSubmissions: string (nullable = true)
 |-- HasLeaderboard: string (nullable = true)
 |-- LeaderboardPercentage: string (nullable = true)
 |-- ScoreTruncationNumDecimals: string (nullable = true)
 |-- EvaluationAlgorithmAbbreviation: string (nullable = true)
 |-- EvaluationA

                                                                                

In [62]:
spark.sparkContext.setJobDescription("Joining Episodes and EpisodeAgents datasets")

joined_df = episodes_df.join(episode_agents_df, episodes_df["Id"]==episode_agents_df["EpisodeId"], how="inner")

joined_df.show(10)

joined_df.explain()
from pyspark.sql.functions import broadcast
spark.sparkContext.setJobDescription("Joining Episodes and Competitions datasets without BHJ")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

joined_df_without_BHJ = episodes_df.join(df, df["Id"]==episodes_df["CompetitionId"], how="inner")
joined_df_without_BHJ.show(10)


spark.sparkContext.setJobDescription("Joining Episodes and Competitions datasets with BHJ")

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760)  # 10MB by default

joined_df_BHJ = episodes_df.join(broadcast(df), df["Id"]==episodes_df["CompetitionId"], how="inner")

joined_df_BHJ.show(10)
joined_df_BHJ.explain()







                                                                                

+---+----+-------------+-------------------+-------------------+---+---------+-----+------+-----+------------+-----------------+------------+-----------------+------------+
| Id|Type|CompetitionId|         CreateTime|            EndTime| Id|EpisodeId|Index|Reward|State|SubmissionId|InitialConfidence|InitialScore|UpdatedConfidence|UpdatedScore|
+---+----+-------------+-------------------+-------------------+---+---------+-----+------+-----+------------+-----------------+------------+-----------------+------------+
| 65|   1|        17203|11/20/2019 22:35:02|11/20/2019 22:35:15|125|       65|    1|   0.0|    2|    13387710|             NULL|        NULL|             NULL|        NULL|
| 65|   1|        17203|11/20/2019 22:35:02|11/20/2019 22:35:15|120|       65|    0|   1.0|    2|    13387725|             NULL|        NULL|             NULL|        NULL|
|133|   1|        17203|11/20/2019 22:37:06|11/20/2019 22:37:38|267|      133|    1|   0.0|    2|    13385792|             NULL|       

                                                                                

+--------+----+-------------+-------------------+-------------------+-----+------------+------------+--------------------+----------------+-------+--------------+-------------------+-------------------+-------------------------------+----------------------+---------------------+---------------------------+-------------------------------+----------+--------------------------+--------------+---------------------+--------------------------+-------------------------------+-----------------------+------------------------------+------------------------+-------------------+--------------------+-----------+--------------+----------------+----------+--------------+---------+------------------+---------------+----------+----------------+----------------+-----------------+------------------+---------------------------+--------------------------------+--------+-----------------+-----------------+
|      Id|Type|CompetitionId|         CreateTime|            EndTime|   Id|        Slug|       Title| 

24/11/03 17:42:11 WARN SparkConf: The configuration key 'spark.yarn.executor.failuresValidityInterval' has been deprecated as of Spark 3.5 and may be removed in the future. Please use the new key 'spark.executor.failuresValidityInterval' instead.
[Stage 74:>                                                         (0 + 1) / 1]

+-------+----+-------------+-------------------+-------------------+-----+------+-------------------+--------------------+----------------+-------+--------------+-------------------+-------------------+-------------------------------+----------------------+---------------------+---------------------------+-------------------------------+----------+--------------------------+--------------+---------------------+--------------------------+-------------------------------+-----------------------+------------------------------+------------------------+-------------------+--------------------+-----------+--------------+----------------+----------+--------------+---------+------------------+---------------+----------+----------------+----------------+-----------------+------------------+---------------------------+--------------------------------+--------+-----------------+-----------------+
|     Id|Type|CompetitionId|         CreateTime|            EndTime|   Id|  Slug|              Title| 

                                                                                

In [63]:
joined_df_BHJ.printSchema()
from pyspark.sql.functions import avg, max, col
spark.sparkContext.setJobDescription("Calculate average and max for TotalSubmissions")

# Calculate average and max for TotalSubmissions and TotalInteractions by CompetitionTypeId
aggregated_df = joined_df_BHJ.groupBy("CompetitionTypeId").agg(
    avg(col("TotalSubmissions").cast("int")).alias("AvgTotalSubmissions"),
    max(col("TotalSubmissions").cast("int")).alias("MaxTotalSubmissions"),
    avg("TotalInteractions").alias("AvgTotalInteractions"),
    max("TotalInteractions").alias("MaxTotalInteractions")
)

aggregated_df.show()


root
 |-- Id: integer (nullable = true)
 |-- Type: integer (nullable = true)
 |-- CompetitionId: integer (nullable = true)
 |-- CreateTime: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- Id: string (nullable = true)
 |-- Slug: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- HostSegmentTitle: string (nullable = true)
 |-- ForumId: string (nullable = true)
 |-- OrganizationId: string (nullable = true)
 |-- EnabledDate: string (nullable = true)
 |-- DeadlineDate: string (nullable = true)
 |-- ProhibitNewEntrantsDeadlineDate: string (nullable = true)
 |-- TeamMergerDeadlineDate: string (nullable = true)
 |-- TeamModelDeadlineDate: string (nullable = true)
 |-- ModelSubmissionDeadlineDate: string (nullable = true)
 |-- FinalLeaderboardHasBeenVerified: string (nullable = true)
 |-- HasKernels: string (nullable = true)
 |-- OnlyAllowKernelSubmissions: string (nullable = true)
 |-- HasLeaderboard: string (nullable

24/11/03 17:42:13 WARN SparkConf: The configuration key 'spark.yarn.executor.failuresValidityInterval' has been deprecated as of Spark 3.5 and may be removed in the future. Please use the new key 'spark.executor.failuresValidityInterval' instead.

+-----------------+-------------------+-------------------+--------------------+--------------------+
|CompetitionTypeId|AvgTotalSubmissions|MaxTotalSubmissions|AvgTotalInteractions|MaxTotalInteractions|
+-----------------+-------------------+-------------------+--------------------+--------------------+
|                1| 23577.825961191327|              48236|   1289479.634120139|           3014868.0|
+-----------------+-------------------+-------------------+--------------------+--------------------+



                                                                                

In [64]:
interaction_threshold = 50  # Define the threshold
spark.sparkContext.setJobDescription("Calculateinteractions per team ")

# Calculate interactions per team and filter based on threshold
high_interaction_df = joined_df_BHJ.withColumn(
    "InteractionsPerTeam",
    when(col("TotalTeams") > 0, col("TotalInteractions") / col("TotalTeams").cast("int")).otherwise(0)
).filter(col("InteractionsPerTeam") > interaction_threshold)

high_interaction_df.select("CompetitionId", "InteractionsPerTeam").show()

24/11/03 17:42:26 WARN SparkConf: The configuration key 'spark.yarn.executor.failuresValidityInterval' has been deprecated as of Spark 3.5 and may be removed in the future. Please use the new key 'spark.executor.failuresValidityInterval' instead.
[Stage 80:>                                                         (0 + 1) / 1]

+-------------+-------------------+
|CompetitionId|InteractionsPerTeam|
+-------------+-------------------+
|        18011|             1291.0|
|        18011|             1291.0|
|        18011|             1291.0|
|        18011|             1291.0|
|        18011|             1291.0|
|        18011|             1291.0|
|        18011|             1291.0|
|        18011|             1291.0|
|        18011|             1291.0|
|        18011|             1291.0|
|        18011|             1291.0|
|        18011|             1291.0|
|        18011|             1291.0|
|        18011|             1291.0|
|        18011|             1291.0|
|        18011|             1291.0|
|        18011|             1291.0|
|        18011|             1291.0|
|        18011|             1291.0|
|        18011|             1291.0|
+-------------+-------------------+
only showing top 20 rows



                                                                                

In [50]:
spark.stop()