In [0]:
import random
from itertools import combinations
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("EuroGroupStage").getOrCreate()

# Define the list of teams for each group
groups = {
    "A": ["Germany", "Scotland", "Hungary", "Switzerland"],
    "B": ["Spain", "Croatia", "Italy", "Albania"],
    "C": ["Slovenia", "Denmark", "Serbia", "England"],
    "D": ["France", "Netherlands", "Austria", "Finland"],
    "E": ["Belgium", "Slovakia", "Romania", "Iceland"],
    "F": ["Turkey", "Greece", "Portugal", "Czech Republic"] 
}

# Create an empty list to store match data
matches_data = []

# Iterate over each group
for group, teams in groups.items():
    # Generate all possible match combinations
    for team1, team2 in combinations(teams, 2):
        # Generate random scores for the match
        goals_team1 = random.randint(0, 5)
        goals_team2 = random.randint(0, 5)
        # Append match data to the list
        matches_data.append((group, team1, team2, goals_team1, goals_team2))

# Define the schema for the matches data
schema = ["group", "team1", "team2", "goals_team1", "goals_team2"]



# Create a DataFrame from the matches data
matches_df = spark.createDataFrame(matches_data, schema=schema)

display(matches_df)

# Sample data for demonstration
data = [
    ("A", "Germany", 0, 0, 0, 0, 0, 0, 0, 0),
    ("A", "Scotland", 0, 0, 0, 0, 0, 0, 0, 0),
    ("A", "Hungary", 0, 0, 0, 0, 0, 0, 0, 0),
    ("A", "Switzerland", 0, 0, 0, 0, 0, 0, 0, 0),
    ("B", "Spain", 0, 0, 0, 0, 0, 0, 0, 0),
    ("B", "Croatia", 0, 0, 0, 0, 0, 0, 0, 0),
    ("B", "Italy", 0, 0, 0, 0, 0, 0, 0, 0),
    ("B", "Albania", 0, 0, 0, 0, 0, 0, 0, 0),
    ("C", "Slovenia", 0, 0, 0, 0, 0, 0, 0, 0),
    ("C", "Denmark", 0, 0, 0, 0, 0, 0, 0, 0),
    ("C", "Serbia", 0, 0, 0, 0, 0, 0, 0, 0),
    ("C", "England", 0, 0, 0, 0, 0, 0, 0, 0),
    ("D", "France", 0, 0, 0, 0, 0, 0, 0, 0),
    ("D", "Netherlands", 0, 0, 0, 0, 0, 0, 0, 0),
    ("D", "Austria", 0, 0, 0, 0, 0, 0, 0, 0),
    ("D", "Finland", 0, 0, 0, 0, 0, 0, 0, 0),
    ("E", "Belgium", 0, 0, 0, 0, 0, 0, 0, 0),
    ("E", "Slovakia", 0, 0, 0, 0, 0, 0, 0, 0),
    ("E", "Romania", 0, 0, 0, 0, 0, 0, 0, 0),
    ("E", "Iceland", 0, 0, 0, 0, 0, 0, 0, 0),
    ("F", "Turkey", 0, 0, 0, 0, 0, 0, 0, 0),
    ("F", "Greece", 0, 0, 0, 0, 0, 0, 0, 0),
    ("F", "Portugal", 0, 0, 0, 0, 0, 0, 0, 0),
    ("F", "Czech Republic", 0, 0, 0, 0, 0, 0, 0, 0)
    # Add more data here
]

columns = ["group", "team_name", "matches_played", "wins", "draws", "losses", "goals_scored", "goals_conceded", "gd", "points"]

group_stage_results_df = spark.createDataFrame(data, columns)

# Update the group stage results DataFrame based on match results
# Update the group stage results DataFrame based on match results
updated_results_df = group_stage_results_df.alias("gsr").join(
    matches_df.alias("matches"),
    (F.col("gsr.group") == F.col("matches.group")) &
    ((F.col("gsr.team_name") == F.col("matches.team1")) | (F.col("gsr.team_name") == F.col("matches.team2"))),
    "left"
).withColumn(
    "matches_played",
    F.when((F.col("gsr.team_name") == F.col("matches.team1")) | (F.col("gsr.team_name") == F.col("matches.team2")),
           F.col("gsr.matches_played") + 1).otherwise(F.col("gsr.matches_played"))
).withColumn(
    "wins",
    F.when((F.col("gsr.team_name") == F.col("matches.team1")) & (F.col("matches.goals_team1") > F.col("matches.goals_team2")),
           F.col("gsr.wins") + 1)
    .when((F.col("gsr.team_name") == F.col("matches.team2")) & (F.col("matches.goals_team2") > F.col("matches.goals_team1")),
          F.col("gsr.wins") + 1)
    .otherwise(F.col("gsr.wins"))
).withColumn(
    "draws",
    F.when(((F.col("gsr.team_name") == F.col("matches.team1")) | (F.col("gsr.team_name") == F.col("matches.team2"))) &
           (F.col("matches.goals_team1") == F.col("matches.goals_team2")), F.col("gsr.draws") + 1)
    .otherwise(F.col("gsr.draws"))
).withColumn(
    "losses",
    F.when((F.col("gsr.team_name") == F.col("matches.team1")) & (F.col("matches.goals_team1") < F.col("matches.goals_team2")),
           F.col("gsr.losses") + 1)
    .when((F.col("gsr.team_name") == F.col("matches.team2")) & (F.col("matches.goals_team2") < F.col("matches.goals_team1")),
          F.col("gsr.losses") + 1)
    .otherwise(F.col("gsr.losses"))
).withColumn(
    "goals_scored",
    F.when(F.col("gsr.team_name") == F.col("matches.team1"), F.col("gsr.goals_scored") + F.col("matches.goals_team1")).otherwise(
        F.col("gsr.goals_scored"))
).withColumn(
    "goals_conceded",
    F.when(F.col("gsr.team_name") == F.col("matches.team1"), F.col("gsr.goals_conceded") + F.col("matches.goals_team2")).otherwise(
        F.col("gsr.goals_conceded"))
).withColumn(
    "gd",
    F.col("goals_scored") - F.col("goals_conceded")
).withColumn(
    "points",
    (F.col("wins") * 3) + F.col("draws")
).select("gsr.group", "gsr.team_name", "matches_played", "wins", "draws", "losses", "goals_scored", "goals_conceded", "gd", "points")

display(updated_results_df)


group,team1,team2,goals_team1,goals_team2
A,Germany,Scotland,3,2
A,Germany,Hungary,1,3
A,Germany,Switzerland,4,5
A,Scotland,Hungary,3,4
A,Scotland,Switzerland,3,4
A,Hungary,Switzerland,3,5
B,Spain,Croatia,5,4
B,Spain,Italy,5,2
B,Spain,Albania,5,3
B,Croatia,Italy,2,0


group,team_name,matches_played,wins,draws,losses,goals_scored,goals_conceded,gd,points
A,Germany,1,0,0,1,4,5,-1,0
A,Germany,1,0,0,1,1,3,-2,0
A,Germany,1,1,0,0,3,2,1,3
A,Scotland,1,0,0,1,3,4,-1,0
A,Scotland,1,0,0,1,3,4,-1,0
A,Scotland,1,0,0,1,0,0,0,0
A,Hungary,1,0,0,1,3,5,-2,0
A,Hungary,1,1,0,0,0,0,0,3
A,Hungary,1,1,0,0,0,0,0,3
A,Switzerland,1,1,0,0,0,0,0,3


In [0]:
from pyspark.sql.functions import col, sum

# Assuming updated_results_df is your DataFrame
filtered_df = updated_results_df.filter(col("matches_played").cast("int").isNotNull()) \
                                .filter(col("wins").cast("int").isNotNull()) \
                                .filter(col("draws").cast("int").isNotNull()) \
                                .filter(col("losses").cast("int").isNotNull()) \
                                .filter(col("goals_scored").cast("int").isNotNull()) \
                                .filter(col("goals_conceded").cast("int").isNotNull()) \
                                .filter(col("gd").cast("int").isNotNull()) \
                                .filter(col("points").cast("int").isNotNull())

# Aggregating the filtered DataFrame
aggregated_df = filtered_df.groupBy("team_name","group").agg(

    sum("matches_played").alias("matches_played"),
    sum("wins").alias("wins"),
    sum("draws").alias("draws"),
    sum("losses").alias("losses"),
    sum("goals_scored").alias("goals_scored"),
    sum("goals_conceded").alias("goals_conceded"),
    sum("gd").alias("gd"),
    sum("points").alias("points")
)

# Display the aggregated DataFrame
display(aggregated_df)


team_name,group,matches_played,wins,draws,losses,goals_scored,goals_conceded,gd,points
Germany,A,3,1,0,2,8,10,-2,3
Scotland,A,3,0,0,3,6,8,-2,0
Hungary,A,3,2,0,1,3,5,-2,6
Spain,B,3,3,0,0,15,9,6,9
Croatia,B,3,1,0,2,2,4,-2,3
Switzerland,A,3,3,0,0,0,0,0,9
Italy,B,3,0,0,3,3,5,-2,0
Albania,B,3,2,0,1,0,0,0,6
Slovenia,C,3,1,0,2,7,10,-3,3
Denmark,C,3,1,0,2,1,7,-6,3


#group_stage_Table

In [0]:
# Create a window specification to partition by group and order by points descending
window_spec = Window.partitionBy("group").orderBy(F.col("points").desc(), F.col("gd").desc())

# Add a row number column based on the window specification
ranked_df = aggregated_df.withColumn("rank", F.row_number().over(window_spec))
display(ranked_df)

team_name,group,matches_played,wins,draws,losses,goals_scored,goals_conceded,gd,points,rank
Switzerland,A,3,3,0,0,0,0,0,9,1
Hungary,A,3,2,0,1,3,5,-2,6,2
Germany,A,3,1,0,2,8,10,-2,3,3
Scotland,A,3,0,0,3,6,8,-2,0,4
Spain,B,3,3,0,0,15,9,6,9,1
Albania,B,3,2,0,1,0,0,0,6,2
Croatia,B,3,1,0,2,2,4,-2,3,3
Italy,B,3,0,0,3,3,5,-2,0,4
Serbia,C,3,2,1,0,5,5,0,7,1
England,C,3,1,1,1,0,0,0,4,2


In [0]:


# Filter the ranked DataFrame to select the top 2 teams from each group
top2_teams_df = ranked_df.filter(F.col("rank") <= 2)

# Select the third-placed team from each group
third_place_df = ranked_df.filter(F.col("rank") == 3)

# Select the top 4 teams overall based on points
top_4_teams_df = third_place_df.orderBy(F.col("points").desc(), F.col("gd").desc()).limit(4)

# Combine the third-placed teams and the top 4 teams overall
group_stage_selected_teams = top2_teams_df.union(top_4_teams_df)

display(group_stage_selected_teams)


team_name,group,matches_played,wins,draws,losses,goals_scored,goals_conceded,gd,points,rank
Switzerland,A,3,3,0,0,0,0,0,9,1
Hungary,A,3,2,0,1,3,5,-2,6,2
Spain,B,3,3,0,0,15,9,6,9,1
Albania,B,3,2,0,1,0,0,0,6,2
Serbia,C,3,2,1,0,5,5,0,7,1
England,C,3,1,1,1,0,0,0,4,2
Netherlands,D,3,3,0,0,10,4,6,9,1
France,D,3,2,0,1,3,6,-3,6,2
Romania,E,3,3,0,0,5,1,4,9,1
Slovakia,E,3,2,0,1,4,5,-1,6,2


In [0]:
group_stage_selected_teams = group_stage_selected_teams.withColumn("Group_no", F.concat(col("rank"),col("group")))
group_stage_selected_teams = group_stage_selected_teams.select("team_name","group","Group_no","matches_played","wins","draws","losses","goals_scored","goals_conceded","gd","points","rank")
display(group_stage_selected_teams)
group_stage_selected_teams.show()
group_stage_selected_teams.createOrReplaceTempView("group_stage_selected_teams")

team_name,group,Group_no,matches_played,wins,draws,losses,goals_scored,goals_conceded,gd,points,rank
Switzerland,A,1A,3,3,0,0,0,0,0,9,1
Hungary,A,2A,3,2,0,1,3,5,-2,6,2
Spain,B,1B,3,3,0,0,15,9,6,9,1
Albania,B,2B,3,2,0,1,0,0,0,6,2
Serbia,C,1C,3,2,1,0,5,5,0,7,1
England,C,2C,3,1,1,1,0,0,0,4,2
Netherlands,D,1D,3,3,0,0,10,4,6,9,1
France,D,2D,3,2,0,1,3,6,-3,6,2
Romania,E,1E,3,3,0,0,5,1,4,9,1
Slovakia,E,2E,3,2,0,1,4,5,-1,6,2


+--------------+-----+--------+--------------+----+-----+------+------------+--------------+---+------+----+
|     team_name|group|Group_no|matches_played|wins|draws|losses|goals_scored|goals_conceded| gd|points|rank|
+--------------+-----+--------+--------------+----+-----+------+------------+--------------+---+------+----+
|   Switzerland|    A|      1A|             3|   3|    0|     0|           0|             0|  0|     9|   1|
|       Hungary|    A|      2A|             3|   2|    0|     1|           3|             5| -2|     6|   2|
|         Spain|    B|      1B|             3|   3|    0|     0|          15|             9|  6|     9|   1|
|       Albania|    B|      2B|             3|   2|    0|     1|           0|             0|  0|     6|   2|
|        Serbia|    C|      1C|             3|   2|    1|     0|           5|             5|  0|     7|   1|
|       England|    C|      2C|             3|   1|    1|     1|           0|             0|  0|     4|   2|
|   Netherlands|   

# group Stage Match logic

In [0]:
matches_list = [
    ["3A", "3D", "3B", "3C"],
    ["3A", "3E", "3B", "3C"],
    ["3A", "3F", "3B", "3C"],
    ["3D", "3E", "3A", "3B"],
    ["3D", "3F", "3A", "3B"],
    ["3E", "3F", "3B", "3A"],
    ["3E", "3D", "3C", "3A"],
    ["3F", "3D", "3C", "3A"],
    ["3E", "3F", "3C", "3A"],
    ["3E", "3F", "3D", "3A"],
    ["3E", "3D", "3B", "3C"],
    ["3F", "3D", "3C", "3B"],
    ["3F", "3E", "3C", "3B"],
    ["3F", "3E", "3D", "3B"],
    ["3F", "3E", "3D", "3C"]
]

no_3=group_stage_selected_teams.filter(F.col("rank") == 3)
display(no_3)
no_3_group = no_3.select(F.col("Group_no"))
no_3_group.show()
no_3_group_list=[row.Group_no for row in no_3_group.collect()]
print(no_3_group_list)

for i in range (0,len(matches_list)):
    if sorted(no_3_group_list) == sorted(matches_list[i]):
        team2_list=matches_list[i]
        break
print(team2_list)

team1_list = ['1B','1C','1E','1F']

# Combine the lists into a list of tuples
data = list(zip(team1_list, team2_list))

# Create a DataFrame from the list of tuples
df1 = spark.createDataFrame(data, ["Team1_id", "Team2_id"])
display(df1)
df1.show()

team3_list =['1A','2D','1D','2A']
team4_list =['2C','2E','2F','2B']
data2 = list(zip(team3_list, team4_list))

# Create a DataFrame from the list of tuples
df2 = spark.createDataFrame(data2, ["Team3_id", "Team4_id"])
display(df2)
df2.show()



team_name,group,Group_no,matches_played,wins,draws,losses,goals_scored,goals_conceded,gd,points,rank
Austria,D,3D,3,1,0,2,4,3,1,3,3
Iceland,E,3E,3,1,0,2,0,0,0,3,3
Germany,A,3A,3,1,0,2,8,10,-2,3,3
Croatia,B,3B,3,1,0,2,2,4,-2,3,3


+--------+
|Group_no|
+--------+
|      3D|
|      3E|
|      3A|
|      3B|
+--------+

['3D', '3E', '3A', '3B']
['3D', '3E', '3A', '3B']


Team1_id,Team2_id
1B,3D
1C,3E
1E,3A
1F,3B


+--------+--------+
|Team1_id|Team2_id|
+--------+--------+
|      1B|      3D|
|      1C|      3E|
|      1E|      3A|
|      1F|      3B|
+--------+--------+



Team3_id,Team4_id
1A,2C
2D,2E
1D,2F
2A,2B


+--------+--------+
|Team3_id|Team4_id|
+--------+--------+
|      1A|      2C|
|      2D|      2E|
|      1D|      2F|
|      2A|      2B|
+--------+--------+



#GROUP STAGE MATCHES

In [0]:
from pyspark.sql.functions import col

# Join with group_stage_selected_teams to get team names for df1
df1_team_names = df1.join(
    group_stage_selected_teams.withColumnRenamed("Group_no", "Team1_id").withColumnRenamed("Team_name", "Team1_name"),
    "Team1_id", "left"
).join(
    group_stage_selected_teams.withColumnRenamed("Group_no", "Team2_id").withColumnRenamed("Team_name", "Team2_name"),
    "Team2_id", "left"
).select("Team1_name", "Team2_name")

# Join with group_stage_selected_teams to get team names for df2
df2_team_names = df2.join(
    group_stage_selected_teams.withColumnRenamed("Group_no", "Team3_id").withColumnRenamed("Team_name", "Team3_name"),
    "Team3_id", "left"
).join(
    group_stage_selected_teams.withColumnRenamed("Group_no", "Team4_id").withColumnRenamed("Team_name", "Team4_name"),
    "Team4_id", "left"
).select("Team3_name", "Team4_name")

df1_team_names.show()
df2_team_names.show()


+----------+----------+
|Team1_name|Team2_name|
+----------+----------+
|     Spain|   Austria|
|    Serbia|   Iceland|
|   Romania|   Germany|
|    Greece|   Croatia|
+----------+----------+

+-----------+--------------+
| Team3_name|    Team4_name|
+-----------+--------------+
|Switzerland|       England|
|     France|      Slovakia|
|Netherlands|Czech Republic|
|    Hungary|       Albania|
+-----------+--------------+



In [0]:
window_spec2=Window.orderBy(F.monotonically_increasing_id())


# GROUP SATGE WINNERS

In [0]:
# Simulate matches and decide winners
winners_knokout_df1 = df1_team_names.withColumn("winner1", F.when(F.rand() > 0.5, F.col("Team1_name")).otherwise(F.col("Team2_name")))

display(winners_knokout_df1)

# Select the winners of each match
winners_knokout_df1 = winners_knokout_df1.select("winner1")

winners_knokout_df1 = winners_knokout_df1.withColumn("id",F.row_number().over(window_spec2)-1)

# Create temporary view for further query
winners_knokout_df1.createOrReplaceTempView("winners_knokout_df1")

winners_knokout_df1.show()

# Simulate matches and decide winners
winners_knokout_df2 = df2_team_names.withColumn("winner2", F.when(F.rand() > 0.5, F.col("Team3_name")).otherwise(F.col("Team4_name")))

display(winners_knokout_df2)

# Select the winners of each match
winners_knokout_df2 = winners_knokout_df2.select("winner2")

winners_knokout_df2 = winners_knokout_df2.withColumn("id", F.row_number().over(window_spec2)-1)

# Create temporary view for further query
winners_knokout_df2.createOrReplaceTempView("winners_knokout_df2")

winners_knokout_df2.show()



Team1_name,Team2_name,winner1
Spain,Austria,Spain
Serbia,Iceland,Iceland
Romania,Germany,Romania
Greece,Croatia,Greece


+-------+---+
|winner1| id|
+-------+---+
|  Spain|  0|
|Iceland|  1|
|Romania|  2|
| Greece|  3|
+-------+---+



Team3_name,Team4_name,winner2
Switzerland,England,England
France,Slovakia,Slovakia
Netherlands,Czech Republic,Czech Republic
Hungary,Albania,Hungary


+--------------+---+
|       winner2| id|
+--------------+---+
|       England|  0|
|      Slovakia|  1|
|Czech Republic|  2|
|       Hungary|  3|
+--------------+---+



#QUARTER FINAL MATCHES

In [0]:
quarter_final_df=winners_knokout_df1.join(winners_knokout_df2 ,winners_knokout_df1.id == winners_knokout_df2.id)
quarter_final_df.show()

+-------+---+--------------+---+
|winner1| id|       winner2| id|
+-------+---+--------------+---+
|  Spain|  0|       England|  0|
|Iceland|  1|      Slovakia|  1|
|Romania|  2|Czech Republic|  2|
| Greece|  3|       Hungary|  3|
+-------+---+--------------+---+



#QUARTER FINAL WINNERS

In [0]:
winners_quarter_final__df = quarter_final_df.withColumn("winner", F.when(F.rand() > 0.5, F.col("winner1")).otherwise(F.col("winner2")))

display(winners_quarter_final__df)

# Select the winners of each match
winners_quarter_final__df = winners_quarter_final__df.select("winner")

# Create temporary view for further query
winners_quarter_final__df.createOrReplaceTempView("winners_quarter_final__df")

winners_quarter_final__df.show()

winner1,id,winner2,id.1,winner
Spain,0,England,0,England
Iceland,1,Slovakia,1,Slovakia
Romania,2,Czech Republic,2,Czech Republic
Greece,3,Hungary,3,Greece


+--------------+
|        winner|
+--------------+
|       England|
|      Slovakia|
|Czech Republic|
|        Greece|
+--------------+



#SEMI FINAL MATCHES

In [0]:
# Assign a unique identifier to each row
shuffled_quarter_winners = winners_quarter_final__df.withColumn("id", F.row_number().over(window_spec2)-1)
shuffled_quarter_winners.show()

# Create a temporary view for further query
shuffled_quarter_winners.createOrReplaceTempView("shuffled_quarter_winners")

semi_final_matches_df = spark.sql(
    """
    SELECT
        t3.winner AS team1,
        t4.winner AS team2
    FROM
        shuffled_quarter_winners t3
    JOIN
        shuffled_quarter_winners t4
    ON
        t3.id = t4.id - 1
    WHERE
        t3.id % 2 = 0
    """
)

# Show the knockout matches DataFrame
semi_final_matches_df.show()

+--------------+---+
|        winner| id|
+--------------+---+
|       England|  0|
|      Slovakia|  1|
|Czech Republic|  2|
|        Greece|  3|
+--------------+---+

+--------------+--------+
|         team1|   team2|
+--------------+--------+
|       England|Slovakia|
|Czech Republic|  Greece|
+--------------+--------+



#FINALIST

In [0]:
Finalists_df = semi_final_matches_df.withColumn("winner", F.when(F.rand() > 0.5, F.col("team1")).otherwise(F.col("team2")))

display(Finalists_df)

# Select the winners of each match
Finalists_df = Finalists_df.select("winner")

# Create temporary view for further query


Finalists_df.show()

team1,team2,winner
England,Slovakia,England
Czech Republic,Greece,Czech Republic


+--------------+
|        winner|
+--------------+
|       England|
|Czech Republic|
+--------------+



#FINAL WINNER

In [0]:
Finalists_df = Finalists_df.withColumn("id", F.monotonically_increasing_id())

# Create a temporary view for further query
Finalists_df.createOrReplaceTempView("Finalists_df")

final_matches_df = spark.sql(
    """
    SELECT
        t3.winner AS team1,
        t4.winner AS team2
    FROM
        Finalists_df t3
    JOIN
        Finalists_df t4
    ON
        t3.id = t4.id - 1
    WHERE
        t3.id % 2 = 0
    """
)

final_matches_df = final_matches_df.withColumn("winner", F.when(F.rand() > 0.5, F.col("team1")).otherwise(F.col("team2")))
display(final_matches_df)

# Select the winners of each match
Winner = final_matches_df.select("winner")

Winner.show()

team1,team2,winner
England,Czech Republic,England


+-------+
| winner|
+-------+
|England|
+-------+

