In [0]:
import re
from typing import Dict, List, Tuple

from pyspark.sql import DataFrame

from pyspark.sql import functions as F

from pyspark.sql import Row

from pyspark.sql.window import Window

from pyspark.sql.types import StructType, StructField, StringType, DateType, DoubleType, ArrayType, IntegerType, BooleanType

from collections import Counter

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier,  LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.functions import vector_to_array


## Basic Task 1 - Video game sales data


Load the data from the CSV file into a data frame. The column headers and the first few data lines should give sufficient information about the source dataset. The numbers in the sales columns are given in millions.

Using the data, find answers to the following:

- Which publisher has the highest total sales in video games in North America considering games released in years 2006-2015?
- How many titles in total for this publisher do not have sales data available for North America considering games released in years 2006-2015?
- Separating games released in different years and considering only this publisher and only games released in years 2006-2015, what are the total sales, in North America and globally, for each year?
    - I.e., what are the total sales (in North America and globally) for games released by this publisher in year 2006? And the same for year 2007? ...


In [0]:
videogameDF: DataFrame = spark.read \
    .format("csv") \
    .option("delimiter", "|") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("abfss://shared@tunics320f2024gen2.dfs.core.windows.net/assignment/sales/video_game_sales.csv") \
    .select("Publisher", "release_date", "total_sales", "na_sales")


videogameDF = videogameDF.withColumn("Year", F.year(F.col("release_date"))) \
    .filter((F.col("Year") >= 2006) & (F.col("Year") <= 2015))

videogameDF = videogameDF.cache()


In [0]:
NAsaleswithpublisherDF = videogameDF.groupBy("Publisher") \
    .agg(F.sum("na_sales").alias("na_sales")) \
    .orderBy(F.col("na_sales").desc())

bestNAPublisher: str = NAsaleswithpublisherDF.first()["Publisher"]

bestNApublishergames = videogameDF.filter(F.col("Publisher") == bestNAPublisher)

titlesWithMissingSalesData: int = bestNApublishergames.filter(F.isnull(F.col("NA_Sales"))).count()

bestNAPublisherSales: DataFrame = bestNApublishergames.groupBy("Year") \
    .agg(
        F.round(F.sum("na_Sales"),2).alias("na_total"),
        F.round(F.sum("total_sales"),2).alias("global_total")
    ) \
    .orderBy("Year")

print(f"The publisher with the highest total video game sales in North America is: '{bestNAPublisher}'")
print(f"The number of titles with missing sales data for North America: {titlesWithMissingSalesData}")
print("Sales data for the publisher:")
bestNAPublisherSales.show()

The publisher with the highest total video game sales in North America is: 'Activision'
The number of titles with missing sales data for North America: 230
Sales data for the publisher:
+----+--------+------------+
|Year|na_total|global_total|
+----+--------+------------+
|2006|   14.55|       19.99|
|2007|    26.9|       42.11|
|2008|   39.21|       63.38|
|2009|   45.08|       74.95|
|2010|   37.92|       60.08|
|2011|   28.63|       51.29|
|2012|   23.08|        46.0|
|2013|   20.92|       39.64|
|2014|   21.51|       42.45|
|2015|   19.67|       38.98|
+----+--------+------------+



## Basic Task 2 - Event data from football matches

#### Background information

In the considered leagues, a season is played in a double round-robin format where each team plays against all other teams twice. Once as a home team in their own stadium and once as an away team in the other team's stadium. A season usually starts in August and ends in May.

Each league match consists of two halves of 45 minutes each. Each half runs continuously, meaning that the clock is not stopped when the ball is out of play. The referee of the match may add some additional time to each half based on game stoppages. \[[https://en.wikipedia.org/wiki/Association_football#90-minute_ordinary_time](https://en.wikipedia.org/wiki/Association_football#90-minute_ordinary_time)\]

The team that scores more goals than their opponent wins the match.

**Columns in the data**

Each row in the given data represents an event in a specific match. An event can be, for example, a pass, a foul, a shot, or a save attempt.

Simple explanations for the available columns. Not all of these will be needed in this assignment.

| column name | column type | description |
| ----------- | ----------- | ----------- |
| competition | string | The name of the competition |
| season | string | The season the match was played |
| matchId | integer | A unique id for the match |
| eventId | integer | A unique id for the event |
| homeTeam | string | The name of the home team |
| awayTeam | string | The name of the away team |
| event | string | The main category for the event |
| subEvent | string | The subcategory for the event |
| eventTeam | string | The name of the team that initiated the event |
| eventPlayerId | integer | The id for the player who initiated the event |
| eventPeriod | string | `1H` for events in the first half, `2H` for events in the second half |
| eventTime | double | The event time in seconds counted from the start of the half |
| tags | array of strings | The descriptions of the tags associated with the event |
| startPosition | struct | The event start position given in `x` and `y` coordinates in range \[0,100\] |
| enPosition | struct | The event end position given in `x` and `y` coordinates in range \[0,100\] |

The used event categories can be seen from `assignment/football/metadata/eventid2name.csv`.<br>
And all available tag descriptions from `assignment/football/metadata/tags2name.csv`.<br>
You don't need to access these files in the assignment, but they can provide context for the following basic tasks that will use the event data.

#### The task

In this task you should load the data with all the rows into a data frame. This data frame object will then be used in the following basic tasks 3-8.

In [0]:
eventDF: DataFrame = spark \
    .read \
    .parquet("abfss://shared@tunics320f2024gen2.dfs.core.windows.net/assignment/football/events.parquet") \
    .select("competition", "season", "matchId",  "homeTeam", "awayTeam","event", "eventTeam", "tags").cache() #since this is used again 

## Basic Task 3 - Calculate match results

Create a match data frame for all the matches included in the event data frame created in basic task 2.

The resulting data frame should contain one row for each match and include the following columns:

| column name   | column type | description |
| ------------- | ----------- | ----------- |
| matchId       | integer     | A unique id for the match |
| competition   | string      | The name of the competition |
| season        | string      | The season the match was played |
| homeTeam      | string      | The name of the home team |
| awayTeam      | string      | The name of the away team |
| homeTeamGoals | integer     | The number of goals scored by the home team |
| awayTeamGoals | integer     | The number of goals scored by the away team |

The number of goals scored for each team should be determined by the available event data.<br>
There are two events related to each goal:

- One event for the player that scored the goal. This includes possible own goals.
- One event for the goalkeeper that tried to stop the goal.

You need to choose which types of events you are counting.<br>
If you count both of the event types mentioned above, you will get double the amount of actual goals.

In [0]:
goalEventsDF = eventDF.filter(
    F.array_contains(F.col("tags"), "Goal") & (F.col("event") == "Save attempt") 
)

goalsPerMatchDF = goalEventsDF.groupBy("matchId", "eventTeam", "homeTeam", "awayTeam") \
    .agg(F.count("*").alias("totalGoals"))

In [0]:

matchGoalsAggregatedDF = goalsPerMatchDF.groupBy("matchId") \
    .agg(
        F.sum(F.when(F.col("eventTeam") == F.col("awayTeam"), F.col("totalGoals")).otherwise(0)).cast("int").alias("homeTeamGoals"),
        F.sum(F.when(F.col("eventTeam") == F.col("homeTeam"), F.col("totalGoals")).otherwise(0)).cast("int").alias("awayTeamGoals")
    )


matchDF = eventDF.select("matchId", "competition", "season", "homeTeam", "awayTeam").distinct().join(matchGoalsAggregatedDF, "matchId", "left")

matchDF = matchDF.fillna({"homeTeamGoals": 0, "awayTeamGoals": 0}).cache() #Since this is reused



## Basic Task 4 - Calculate team points in a season

Create a season data frame that uses the match data frame from the basic task 3 and contains aggregated seasonal results and statistics for all the teams in all leagues. While the used dataset only includes data from a single season for each league, the code should be written such that it would work even if the data would include matches from multiple seasons for each league.

###### Game result determination

- Team wins the match if they score more goals than their opponent.
- The match is considered a draw if both teams score equal amount of goals.
- Team loses the match if they score fewer goals than their opponent.

###### Match point determination

- The winning team gains 3 points from the match.
- Both teams gain 1 point from a drawn match.
- The losing team does not gain any points from the match.

The resulting data frame should contain one row for each team per league and season. It should include the following columns:

| column name    | column type | description |
| -------------- | ----------- | ----------- |
| competition    | string      | The name of the competition |
| season         | string      | The season |
| team           | string      | The name of the team |
| games          | integer     | The number of games the team played in the given season |
| wins           | integer     | The number of wins the team had in the given season |
| draws          | integer     | The number of draws the team had in the given season |
| losses         | integer     | The number of losses the team had in the given season |
| goalsScored    | integer     | The total number of goals the team scored in the given season |
| goalsConceded  | integer     | The total number of goals scored against the team in the given season |
| points         | integer     | The total number of points gained by the team in the given season |

In [0]:
teamResultsDF = matchDF.select(
    F.col("competition"),
    F.col("season"),
    F.col("homeTeam").alias("team"),
    F.col("awayTeam").alias("opponent"),
    F.col("homeTeamGoals").alias("goalsScored"),
    F.col("awayTeamGoals").alias("goalsConceded"),
    F.when(F.col("homeTeamGoals") > F.col("awayTeamGoals"), 3)  # Win
     .when(F.col("homeTeamGoals") == F.col("awayTeamGoals"), 1)  # Draw
     .otherwise(0).alias("points"),
    F.when(F.col("homeTeamGoals") > F.col("awayTeamGoals"), 1)  # Win
     .otherwise(0).alias("wins"),
    F.when(F.col("homeTeamGoals") == F.col("awayTeamGoals"), 1)  # Draw
     .otherwise(0).alias("draws"),
    F.when(F.col("homeTeamGoals") < F.col("awayTeamGoals"), 1)  # Loss
     .otherwise(0).alias("losses")
).union(
    matchDF.select(
        F.col("competition"),
        F.col("season"),
        F.col("awayTeam").alias("team"),
        F.col("homeTeam").alias("opponent"),
        F.col("awayTeamGoals").alias("goalsScored"),
        F.col("homeTeamGoals").alias("goalsConceded"),
        F.when(F.col("awayTeamGoals") > F.col("homeTeamGoals"), 3)  # Win
         .when(F.col("awayTeamGoals") == F.col("homeTeamGoals"), 1)  # Draw
         .otherwise(0).alias("points"),
        F.when(F.col("awayTeamGoals") > F.col("homeTeamGoals"), 1)  # Win
         .otherwise(0).alias("wins"),
        F.when(F.col("awayTeamGoals") == F.col("homeTeamGoals"), 1)  # Draw
         .otherwise(0).alias("draws"),
        F.when(F.col("awayTeamGoals") < F.col("homeTeamGoals"), 1)  # Loss
         .otherwise(0).alias("losses")
    )
)

seasonDF = teamResultsDF.groupBy("competition", "season", "team").agg(
    F.count("*").cast("int").alias("games"),
    F.sum("wins").cast("int").alias("wins"),
    F.sum("draws").cast("int").alias("draws"),
    F.sum("losses").cast("int").alias("losses"),
    F.sum("goalsScored").cast("int").alias("goalsScored"),
    F.sum("goalsConceded").cast("int").alias("goalsConceded"),
    F.sum("points").cast("int").alias("points")
).cache() #Since reuse this 

## Basic Task 5 - English Premier League table

Using the season data frame from basic task 4 calculate the final league table for `English Premier League` in season `2017-2018`.

The result should be given as data frame which is ordered by the team's classification for the season.

A team is classified higher than the other team if one of the following is true:

- The team has a higher number of total points than the other team
- The team has an equal number of points, but have a better goal difference than the other team
- The team has an equal number of points and goal difference, but have more goals scored in total than the other team

Goal difference is the difference between the number of goals scored for and against the team.

The resulting data frame should contain one row for each team.<br>
It should include the following columns (several columns renamed trying to match the [league table in Wikipedia](https://en.wikipedia.org/wiki/2017%E2%80%9318_Premier_League#League_table)):

| column name | column type | description |
| ----------- | ----------- | ----------- |
| Pos         | integer     | The classification of the team |
| Team        | string      | The name of the team |
| Pld         | integer     | The number of games played |
| W           | integer     | The number of wins |
| D           | integer     | The number of draws |
| L           | integer     | The number of losses |
| GF          | integer     | The total number of goals scored by the team |
| GA          | integer     | The total number of goals scored against the team |
| GD          | string      | The goal difference |
| Pts         | integer     | The total number of points gained by the team |

The goal difference should be given as a string with an added `+` at the beginning if the difference is positive, similarly to the table in the linked Wikipedia article.

In [0]:
englandDF: DataFrame = seasonDF.filter(
    (F.col("competition") == "English Premier League") & (F.col("season") == "2017-2018")
).withColumn(
    "GD",
    F.when((F.col("goalsScored") - F.col("goalsConceded")) > 0,
           F.concat(F.lit("+"), (F.col("goalsScored") - F.col("goalsConceded")).cast("string"))
    ).otherwise((F.col("goalsScored") - F.col("goalsConceded")).cast("string"))
)

englandDF = englandDF.select(
    F.col("team").alias("Team"),
    F.col("games").alias("Pld"),
    F.col("wins").alias("W"),
    F.col("draws").alias("D"),
    F.col("losses").alias("L"),
    F.col("goalsScored").alias("GF"),
    F.col("goalsConceded").alias("GA"),
    F.col("GD"),
    F.col("points").alias("Pts")
)

windowSpec = Window.orderBy(
    F.col("Pts").desc(),
    (F.col("GF") - F.col("GA")).desc(),
    F.col("GF").desc()
)

englandDF = englandDF.withColumn("Pos", F.row_number().over(windowSpec))

# Reorder columns
englandDF = englandDF.select(
    "Pos", "Team", "Pld", "W", "D", "L", "GF", "GA", "GD", "Pts"
)


print("English Premier League table for season 2017-2018")
englandDF.show(20, False)

English Premier League table for season 2017-2018
+---+----------------------+---+---+---+---+---+---+---+---+
|Pos|Team                  |Pld|W  |D  |L  |GF |GA |GD |Pts|
+---+----------------------+---+---+---+---+---+---+---+---+
|1  |Manchester City       |38 |32 |4  |2  |106|27 |+79|100|
|2  |Manchester United     |38 |25 |6  |7  |68 |28 |+40|81 |
|3  |Tottenham Hotspur     |38 |23 |8  |7  |74 |36 |+38|77 |
|4  |Liverpool             |38 |21 |12 |5  |84 |38 |+46|75 |
|5  |Chelsea               |38 |21 |7  |10 |62 |38 |+24|70 |
|6  |Arsenal               |38 |19 |6  |13 |74 |51 |+23|63 |
|7  |Burnley               |38 |14 |12 |12 |36 |39 |-3 |54 |
|8  |Everton               |38 |13 |10 |15 |44 |58 |-14|49 |
|9  |Leicester City        |38 |12 |11 |15 |56 |60 |-4 |47 |
|10 |Newcastle United      |38 |12 |8  |18 |39 |47 |-8 |44 |
|11 |Crystal Palace        |38 |11 |11 |16 |45 |55 |-10|44 |
|12 |AFC Bournemouth       |38 |11 |11 |16 |45 |61 |-16|44 |
|13 |West Ham United       |38 |10 

## Basic task 6: Calculate the number of passes

This task involves going back to the event data frame and counting the number of passes each team made in each match. A pass is considered successful if it is marked as `Accurate`.

Using the event data frame from basic task 2, calculate the total number of passes as well as the total number of successful passes for each team in each match.<br>
The resulting data frame should contain one row for each team in each match, i.e., two rows for each match. It should include the following columns:

| column name | column type | description |
| ----------- | ----------- | ----------- |
| matchId     | integer     | A unique id for the match |
| competition | string      | The name of the competition |
| season      | string      | The season |
| team        | string      | The name of the team |
| totalPasses | integer     | The total number of passes the team attempted in the match |
| successfulPasses | integer | The total number of successful passes made by the team in the match |

You can assume that each team had at least one pass attempt in each match they played.

In [0]:
matchPassDF: DataFrame = eventDF.filter(F.col("event") == "Pass").select(
    "matchId", "eventTeam", "competition", "season", 
    F.when(F.array_contains(F.col("tags"), "Accurate"), 1).otherwise(0).alias("isAccurate")
)

matchPassDF = matchPassDF.groupBy(
    "matchId", "eventTeam","competition", "season"
).agg(                  
    F.sum("isAccurate").cast("int").alias("successfulPasses") ,
    F.count("*").cast("int").alias("totalPasses")        
)

matchPassDF = matchPassDF.withColumnRenamed("eventTeam", "team")

## Basic Task 7: Teams with the worst passes

Using the match pass data frame from basic task 6 find the teams with the lowest average ratio for successful passes over the season `2017-2018` for each league.

The ratio for successful passes over a single match is the number of successful passes divided by the number of total passes.<br>
The average ratio over the season is the average of the single match ratios.

Give the result as a data frame that has one row for each league-team pair with the following columns:

| column name | column type | description |
| ----------- | ----------- | ----------- |
| competition | string      | The name of the competition |
| team        | string      | The name of the team |
| passSuccessRatio | double | The average ratio for successful passes over the season given as percentages rounded to two decimals |

Order the data frame so that the team with the lowest ratio for passes is given first.

In [0]:
matchPassDF = matchPassDF.filter(F.col("season") == "2017-2018").withColumn(
    "passSuccessRatio", (F.col("successfulPasses") / F.col("totalPasses") * 100).cast("double")
)


windowSpec = Window.partitionBy("competition").orderBy("passSuccessRatio")
lowestPassSuccessRatioDF = (
    matchPassDF.groupBy("competition", "team")
    .agg(F.round(F.avg("passSuccessRatio"), 2).alias("passSuccessRatio"))
    .withColumn("rank", F.row_number().over(windowSpec))
    .orderBy("rank", "passSuccessRatio")
    .drop("rank")
).cache() #Since this is used in task 8


print("The teams with the lowest ratios for successful passes for each league in season 2017-2018:")
lowestPassSuccessRatioDF.show(5, False)

The teams with the lowest ratios for successful passes for each league in season 2017-2018:
+----------------------+----------+----------------+
|competition           |team      |passSuccessRatio|
+----------------------+----------+----------------+
|Spanish La Liga       |Getafe    |72.37           |
|Italian Serie A       |Crotone   |74.74           |
|English Premier League|Stoke City|76.28           |
|German Bundesliga     |Augsburg  |76.44           |
|French Ligue 1        |Toulouse  |77.51           |
+----------------------+----------+----------------+
only showing top 5 rows



## Basic task 8: The best teams

For this task the best teams are determined by having the highest point average per match.

Using the data frames created in the previous tasks find the two best teams from each league in season `2017-2018` with their full statistics.

Give the result as a data frame with the following columns:

| column name | column type | description |
| ----------- | ----------- | ----------- |
| Team        | string      | The name of the team |
| League      | string      | The name of the league |
| Pos         | integer     | The classification of the team within their league |
| Pld         | integer     | The number of games played |
| W           | integer     | The number of wins |
| D           | integer     | The number of draws |
| L           | integer     | The number of losses |
| GF          | integer     | The total number of goals scored by the team |
| GA          | integer     | The total number of goals scored against the team |
| GD          | string      | The goal difference |
| Pts         | integer     | The total number of points gained by the team |
| Avg         | double      | The average points per match gained by the team |
| PassRatio   | double      | The average ratio for successful passes over the season given as percentages rounded to two decimals |

Order the data frame so that the team with the highest point average per match is given first.

In [0]:
seasonDF = seasonDF.withColumn(
    "Avg", F.col("points") / F.col("games")
)

rankedTeamDF = seasonDF.join(lowestPassSuccessRatioDF, ["competition", "team"], "inner")

windowSpec = Window.partitionBy("competition").orderBy(F.col("Avg").desc())
rankedTeamDF = rankedTeamDF.withColumn("rank", F.rank().over(windowSpec))

topTeamsDF = rankedTeamDF.filter(F.col("rank") <= 2)

bestDF: DataFrame = topTeamsDF.select(
    F.col("team").alias("Team"),
    F.col("competition").alias("League"),
    F.col("rank").alias("Pos"),
    F.col("games").alias("Pld"),
    F.col("wins").alias("W"),
    F.col("draws").alias("D"),
    F.col("losses").alias("L"),
    F.col("goalsScored").alias("GF"),
    F.col("goalsConceded").alias("GA"),
    F.when(
        (F.col("goalsScored") - F.col("goalsConceded")) > 0,
        F.concat(F.lit("+"), (F.col("goalsScored") - F.col("goalsConceded")).cast("string"))
    ).otherwise((F.col("goalsScored") - F.col("goalsConceded")).cast("string")).alias("GD"),
    F.col("points").alias("Pts"),
    F.round(F.col("Avg"), 2).alias("Avg"),
    F.round(F.col("passSuccessRatio"), 2).alias("PassRatio")
).orderBy(F.col("Avg").desc())


print("The top 2 teams for each league in season 2017-2018")
bestDF.show(10, False)

The top 2 teams for each league in season 2017-2018
+-----------------+----------------------+---+---+---+---+---+---+---+---+---+----+---------+
|Team             |League                |Pos|Pld|W  |D  |L  |GF |GA |GD |Pts|Avg |PassRatio|
+-----------------+----------------------+---+---+---+---+---+---+---+---+---+----+---------+
|Manchester City  |English Premier League|1  |38 |32 |4  |2  |106|27 |+79|100|2.63|89.62    |
|Juventus         |Italian Serie A       |1  |38 |30 |5  |3  |86 |24 |+62|95 |2.5 |87.96    |
|Bayern München   |German Bundesliga     |1  |34 |27 |3  |4  |92 |28 |+64|84 |2.47|87.83    |
|PSG              |French Ligue 1        |1  |38 |29 |6  |3  |108|29 |+79|93 |2.45|89.16    |
|Barcelona        |Spanish La Liga       |1  |38 |28 |9  |1  |99 |29 |+70|93 |2.45|88.35    |
|Napoli           |Italian Serie A       |2  |38 |28 |7  |3  |77 |29 |+48|91 |2.39|87.87    |
|Manchester United|English Premier League|2  |38 |25 |6  |7  |68 |28 |+40|81 |2.13|84.79    |
|Monaco 

## Advanced Task 3 - Image data and pixel colors (2 points)

This advanced task involves loading in PNG image data and complementing JSON metadata into Spark data structure. And then determining the colors of the pixels in the images, and finding the answers to several color related questions.

### Tasks

The target of the task is to combine the image data with the JSON data, determine the image pixel colors, and the find the answers to the following questions:

- Which four images have the most colored non-transparent pixels?
- Which five images have the lowest ratio of colored vs. transparent pixels?
- What are the three most common colors in the Finnish flag image (annotation: `flag: Finland`)?
    - And how many percentages of the colored pixels does each color have?
- How many images have their most common three colors as, `Blue`-`Yellow`-`Black`, in that order?
- Which five images have the most red pixels among the image group `activities`?
    - And how many red pixels do each of these images have?

It might be advisable to test your work-in-progress code with a limited number of images before using the full image set.<br>
You are free to choose your own approach to the task: user defined functions with data frames, RDDs/Datasets, or combination of both.

Note that the currently the Python helper functions do not exactly match the Scala versions, and thus the answers to the questions might not quite match the given example results in the example output notebook.

In [0]:
imageDF = spark.read.format("image").load("abfss://shared@tunics320f2024gen2.dfs.core.windows.net/assignment/openmoji/color")
jsonDF = spark.read.json("abfss://shared@tunics320f2024gen2.dfs.core.windows.net/assignment/openmoji/metadata/openmoji.jsonl")


In [0]:
# separates binary image data to an array of hex strings that represent the pixels
# assumes 8-bit representation for each pixel (0x00 - 0xff)
# with `channels` attribute representing how many bytes is used for each pixel
def toPixels(data: bytes, channels: int) -> List[str]:
    return [
        "".join([
            f"{data[index+byte]:02X}"
            for byte in range(0, channels)
        ])
        for index in range(0, len(data), channels)
    ]

In [0]:
   
toPixels_udf = F.udf(toPixels, ArrayType(StringType()))

imageDF = imageDF.withColumn("hex_pixels", toPixels_udf(F.col("image.data"), F.col("image.nChannels")))


In [0]:
# naive implementation of picking the name of the pixel color based on the input hex representation of the pixel
# only works for OpenCV type CV_8U (mode=24) compatible input
def toColorName(hexString: str) -> str:
    # mapping of RGB values to basic color names
    colors: Dict[Tuple[int, int, int], str] = {
        (0, 0, 0):     "Black",  (0, 0, 128):     "Blue",   (0, 0, 255):     "Blue",
        (0, 128, 0):   "Green",  (0, 128, 128):   "Green",  (0, 128, 255):   "Blue",
        (0, 255, 0):   "Green",  (0, 255, 128):   "Green",  (0, 255, 255):   "Blue",
        (128, 0, 0):   "Red",    (128, 0, 128):   "Purple", (128, 0, 255):   "Purple",
        (128, 128, 0): "Green",  (128, 128, 128): "Gray",   (128, 128, 255): "Purple",
        (128, 255, 0): "Green",  (128, 255, 128): "Green",  (128, 255, 255): "Blue",
        (255, 0, 0):   "Red",    (255, 0, 128):   "Pink",   (255, 0, 255):   "Purple",
        (255, 128, 0): "Orange", (255, 128, 128): "Orange", (255, 128, 255): "Pink",
        (255, 255, 0): "Yellow", (255, 255, 128): "Yellow", (255, 255, 255): "White"
    }

    # helper function to round values of 0-255 to the nearest of 0, 128, or 255
    def roundColorValue(value: int) -> int:
        if value < 85:
            return 0
        if value < 170:
            return 128
        return 255

    validString: bool = re.match(r"[0-9a-fA-F]{8}", hexString) is not None
    if validString:
        # for OpenCV type CV_8U (mode=24) the expected order of bytes is BGRA
        blue: int = roundColorValue(int(hexString[0:2], 16))
        green: int = roundColorValue(int(hexString[2:4], 16))
        red: int = roundColorValue(int(hexString[4:6], 16))
        alpha: int = int(hexString[6:8], 16)

        if alpha < 128:
            return "None"  # any pixel with less than 50% opacity is considered as color "None"
        return colors[(red, green, blue)]

    return "None"  # any input that is not in valid format is considered as color "None"

In [0]:
@udf(ArrayType(StringType()))
def toColorNamesudf(hex_pixels):
    return [toColorName(pixel) for pixel in hex_pixels]

# Add a new column with the color names for each pixel
imageDF = imageDF.withColumn("color_names", toColorNamesudf(F.col("hex_pixels")))


In [0]:
# Extract file name from the origin column
imageDF = imageDF.withColumn("file_name", F.regexp_extract("image.origin", r"([^/]+)\.png$", 1))

combinedDF = imageDF.join(jsonDF, imageDF.file_name == jsonDF.hexcode, "inner")

In [0]:
combinedDF = combinedDF.withColumn(
    "colored_pixel_count", 
    F.size(F.filter(F.col('color_names'), lambda color_name: color_name != 'None'))
)

combinedDF = combinedDF.withColumn("total_pixel_count", F.size(F.col("hex_pixels")))

combinedDF = combinedDF.withColumn(
    "colored_to_transparent_ratio", 
    F.col("colored_pixel_count") / F.col("total_pixel_count")
)

combinedDF.cache() #since this is reused in the next cells

DataFrame[image: struct<origin:string,height:int,width:int,nChannels:int,mode:int,data:binary>, hex_pixels: array<string>, color_names: array<string>, file_name: string, annotation: string, emoji: string, group: string, hexcode: string, openmoji_author: string, openmoji_date: string, openmoji_tags: string, order: bigint, skintone: string, skintone_base_emoji: string, skintone_base_hexcode: string, skintone_combination: string, subgroups: string, tags: string, unicode: string, colored_pixel_count: int, total_pixel_count: int, colored_to_transparent_ratio: double]

In [0]:
# The annotations for the four images with the most colored non-transparent pixels
mostColoredPixelsDF = combinedDF.orderBy(F.col("colored_pixel_count").desc()).limit(4)
mostColoredPixels: List[str] =  mostColoredPixelsDF.select("annotation").rdd.flatMap(lambda x: x).collect()

print("The annotations for the four images with the most colored non-transparent pixels:")
for image in mostColoredPixels:
    print(f"- {image}")
print("============================================================")

# The annotations for the five images having the lowest ratio of colored vs. transparent pixels
leastColoredPixels_df = combinedDF.orderBy(F.col("colored_to_transparent_ratio").asc()).limit(5)
leastColoredPixels: List[str] = leastColoredPixels_df.select("annotation").rdd.flatMap(lambda x: x).collect()

print("The annotations for the five images having the lowest ratio of colored vs. transparent pixels:")
for image in leastColoredPixels:
    print(f"- {image}")

The annotations for the four images with the most colored non-transparent pixels:
- flag: Finland
- bowling
- bullseye
- volleyball
The annotations for the five images having the lowest ratio of colored vs. transparent pixels:
- seedling
- magic wand
- herb
- lizard
- fireworks


In [0]:
finnishFlagDF = combinedDF.filter(combinedDF.annotation == "flag: Finland")  
finnishFlagColorsDF = finnishFlagDF.select("color_names").collect()

# The three most common colors in the Finnish flag image:
finnishFlagColors: List[str] = [color for row in finnishFlagColorsDF for color in row["color_names"]]
finnishFlagColors = [color for color in finnishFlagColors if color != "None"]

colorCounts = Counter(finnishFlagColors)

mostcommonColors = colorCounts.most_common(3)
finnishFColors = [color for color, _ in mostcommonColors]

totalPixels = len(finnishFlagColors)

# The percentages of the colored pixels for each common color in the Finnish flag image:
finnishColorShares: List[float] =  [round(count / totalPixels * 100, 2) for _, count in mostcommonColors]


print("The colors and their percentage shares in the image for the Finnish flag:")
for color, share in zip(finnishFColors, finnishColorShares):
    print(f"- color: {color}, share: {share}")
print("============================================================")


def getmostCommonColors(color_names: List[str]) -> List[str]:
    filteredColors = [color for color in color_names if color != "None"]
    color_counts = Counter(filteredColors)
    mostcommonColors = [color for color, _ in color_counts.most_common(3)]
    return mostcommonColors

get_most_common_colors_udf = F.udf(getmostCommonColors, ArrayType(StringType()))

combinedDF = combinedDF.withColumn("most_common_colors", get_most_common_colors_udf(F.col("color_names")))

# The number of images that have their most common three colors as, Blue-Yellow-Black, in that exact order:
blueYellowBlackCount: int = combinedDF.filter(
    (F.col("most_common_colors")[0] == "Blue") & 
    (F.col("most_common_colors")[1] == "Yellow") & 
    (F.col("most_common_colors")[2] == "Black")
).count()

print(f"The number of images that have, Blue-Yellow-Black, as the most common colors: {blueYellowBlackCount}")

The colors and their percentage shares in the image for the Finnish flag:
- color: White, share: 56.47
- color: Blue, share: 27.15
- color: Black, share: 15.53
The number of images that have, Blue-Yellow-Black, as the most common colors: 6


In [0]:
activitiesDF = combinedDF.filter(combinedDF.group == "activities")


def countredPixels(color_names: List[str]) -> int:
    return color_names.count("Red")

countredpixelsUdf = F.udf(countredPixels, IntegerType())

activitiesDF = activitiesDF.withColumn("red_pixel_count", countredpixelsUdf(F.col("color_names")))

topredimagesDF = activitiesDF.orderBy(F.col("red_pixel_count").desc()).limit(5)


In [0]:
# The annotations for the five images with the most red pixels among the image group activities:
redImageNames: List[str] = topredimagesDF.select("annotation").rdd.flatMap(lambda x: x).collect()

# The number of red pixels in the five images with the most red pixels among the image group activities:
redPixelAmounts: List[int] = topredimagesDF.select("red_pixel_count").rdd.flatMap(lambda x: x).collect()

print("The annotations and red pixel counts for the five images with the most red pixels among the image group 'activities':")
for color, pixel_count in zip(redImageNames, redPixelAmounts):
    print(f"- {color} (red pixels: {pixel_count})")

The annotations and red pixel counts for the five images with the most red pixels among the image group 'activities':
- red envelope (red pixels: 1765)
- admission tickets (red pixels: 1191)
- flower playing cards (red pixels: 892)
- reminder ribbon (red pixels: 764)
- balloon (red pixels: 537)


## Advanced Task 4 - Machine learning tasks (2 points)

This advanced task involves experimenting with the classifiers provided by the Spark machine learning library. 

#### Data description

The dataset contains time series data from a period of 13 months (from the beginning of May 2023 to the end of May 2024). Each row contains the average of the measured values for a single minute. The following columns are included in the data:

| column name        | column type   | description |
| ------------------ | ------------- | ----------- |
| time               | long          | The UNIX timestamp in second precision |
| temperature        | double        | The temperature measured by the weather station on top of Sähkötalo (`°C`) |
| humidity           | double        | The humidity measured by the weather station on top of Sähkötalo (`%`) |
| wind_speed         | double        | The wind speed measured by the weather station on top of Sähkötalo (`m/s`) |
| power_tenants      | double        | The total combined electricity power used by the tenants on Kampusareena (`W`) |
| power_maintenance  | double        | The total combined electricity power used by the building maintenance systems on Kampusareena (`W`) |
| power_solar_panels | double        | The total electricity power produced by the solar panels on Kampusareena (`W`) |
| electricity_price  | double        | The market price for electricity in Finland (`€/MWh`) |

There are some missing values that need to be removed before using the data for training or testing. However, only the minimal amount of rows should be removed for each test case.

### Tasks

- The main task is to train and test a machine learning model with [Random forest classifier](https://spark.apache.org/docs/3.5.0/ml-classification-regression.html#random-forests) in six different cases:
    - Predict the month (1-12) using the three weather measurements (temperature, humidity, and wind speed) as input
    - Predict the month (1-12) using the three power measurements (tenants, maintenance, and solar panels) as input
    - Predict the month (1-12) using all seven measurements (weather values, power values, and price) as input
    - Predict the hour of the day (0-23) using the three weather measurements (temperature, humidity, and wind speed) as input
    - Predict the hour of the day (0-23) using the three power measurements (tenants, maintenance, and solar panels) as input
    - Predict the hour of the day (0-23) using all seven measurements (weather values, power values, and price) as input
- For each of the six case you are asked to:
    1. Clean the source dataset from rows with missing values.
    2. Split the dataset into training and test parts.
    3. Train the ML model using a Random forest classifier with case-specific input and prediction.
    4. Evaluate the accuracy of the model with Spark built-in multiclass classification evaluator.
    5. Further evaluate the accuracy of the model with a custom build evaluator which should do the following:
        - calculate the percentage of correct predictions
            - this should correspond to the accuracy value from the built-in accuracy evaluator
        - calculate the percentage of predictions that were at most one away from the correct predictions taking into account the cyclic nature of the month and hour values:
            - if the correct month value was `5`, then acceptable predictions would be `4`, `5`, or `6`
            - if the correct month value was `1`, then acceptable predictions would be `12`, `1`, or `2`
            - if the correct month value was `12`, then acceptable predictions would be `11`, `12`, or `1`
        - calculate the percentage of predictions that were at most two away from the correct predictions taking into account the cyclic nature of the month and hour values:
            - if the correct month value was `5`, then acceptable predictions would be from `3` to `7`
            - if the correct month value was `1`, then acceptable predictions would be from `11` to `12` and from `1` to `3`
            - if the correct month value was `12`, then acceptable predictions would be from `10` to `12` and from `1` to `2`
        - calculate the average probability the model predicts for the correct value
            - the probabilities for a single prediction can be found from the `probability` column after the predictions have been made with the model
- As the final part of this advanced task, you are asked to do the same experiments (training+evaluation) with two further cases of your own choosing:
    - you can decide on the input columns yourself
    - you can decide the predicted attribute yourself
    - you can try some other classifier other than the random forest one if you want

In all cases you are free to choose the training parameters as you wish.<br>
Note that it is advisable that while you are building your task code to only use a portion of the full 13-month dataset in the initial experiments.

In [0]:
# Load data
procemDF = spark.read.parquet("abfss://shared@tunics320f2024gen2.dfs.core.windows.net/assignment/energy/procem_13m.parquet")
procemDF = procemDF.withColumn("month", F.from_unixtime(F.col("time"), "M").cast("int")) \
                           .withColumn("hour", F.from_unixtime(F.col("time"), "H").cast("int"))

In [0]:
def trainandTest(data, input_features, label_column, classifier_type="RandomForest"):

    # removing missing values
    cleanedData = data.dropna(subset=input_features + [label_column])

    assembler = VectorAssembler(inputCols=input_features, outputCol="features")
    cleanedData = assembler.transform(cleanedData)
    
    # Split the dataset
    trainDF, testDF = cleanedData.randomSplit([0.8, 0.2], seed=1)
        
    classifiers = {
        "RandomForest": RandomForestClassifier(labelCol=label_column, featuresCol="features", probabilityCol="probabilities"),
        "LogisticRegression": LogisticRegression(labelCol=label_column, featuresCol="features", probabilityCol="probabilities")
    }
    
    classifier = classifiers.get(classifier_type)
    if not classifier:
        raise ValueError("Classifier not supported")
    
    # Train the model
    model = classifier.fit(trainDF)
    
    # Testing
    predictions = model.transform(testDF)
    
    # accuracy
    evaluator = MulticlassClassificationEvaluator(labelCol=label_column, metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    
    totalPredictions = predictions.count()

    # correct predictions
    correct_predictions = predictions.filter(F.col(label_column) == F.col("prediction")).count()
    correct_percentage = round((correct_predictions / totalPredictions) * 100, 2)
    
    # within one step
    withinonePredictions = predictions.filter(
        (F.abs(F.col(label_column) - F.col("prediction")) <= 1) |
        (F.abs(F.col(label_column) - F.col("prediction")) == 11)  # for(1-12)
    ).count()
    withinonePredictions = round((withinonePredictions / totalPredictions) * 100, 2) 
    
    #within two steps
    withintwoPredictions = predictions.filter(
        (F.abs(F.col(label_column) - F.col("prediction")) <= 2) |
        (F.abs(F.col(label_column) - F.col("prediction")) >= 10)  # for(1-12)
    ).count()
    withintwoPredictions = round((withintwoPredictions / totalPredictions) * 100, 2)
    

    #average correct probability
    predictions = predictions.withColumn("prob_array", vector_to_array(F.col("probabilities")))
    predictions = predictions.withColumn(
        "correct_prob",
        F.element_at(F.col("prob_array"), (F.col("prediction") + 1).cast("int"))
    )
    
    correctPredictions_df = predictions.filter(F.col(label_column) == F.col("prediction"))
    
    avgProb = round(correctPredictions_df.agg(F.avg("correct_prob").alias("avg_prob")).collect()[0][0], 3)

    
    return {
        "accuracy": accuracy, 
        "correct_percentage": correct_percentage,
        "within_one_percentage": withinonePredictions,
        "within_two_percentage": withintwoPredictions,
        "avg_prob": avgProb
    }



In [0]:
cases = [
    (["temperature", "humidity", "wind_speed"], "month", "RandomForest"),
    (["temperature", "humidity", "wind_speed"], "hour", "RandomForest"),
    (["power_tenants", "power_maintenance", "power_solar_panels"], "month", "RandomForest"),
    (["power_tenants", "power_maintenance", "power_solar_panels"], "hour", "RandomForest"),
    (["temperature", "humidity", "wind_speed", "power_tenants", "power_maintenance", "power_solar_panels", "electricity_price"], "month", "RandomForest"),
    (["temperature", "humidity", "wind_speed", "power_tenants", "power_maintenance", "power_solar_panels", "electricity_price"], "hour", "RandomForest")
]

# Process cases and store results in a DataFrame
schema = StructType([
    StructField("classifier", StringType(), True),
    StructField("input", StringType(), True),
    StructField("label", StringType(), True),
    StructField("correct", DoubleType(), True),
    StructField("within_one", DoubleType(), True),
    StructField("within_two", DoubleType(), True),
     StructField("avg_prob", DoubleType(), True)
])

resultsData = []
for input_features, label_column, classifier_type in cases:
    # Train and evaluate the model
    result = trainandTest(procemDF, input_features, label_column)
    
    input_features_str = ", ".join(input_features)
    model_name = classifier_type
    accuracy = result["accuracy"]

    print(f"Training a '{model_name}' model to predict '{label_column}' based on input '{input_features_str}'.")
    print(f"The accuracy of the model is {accuracy:.5f}")

    # Results
    resultsData.append((
        model_name,
        input_features_str,
        label_column,
        result["correct_percentage"],
        result["within_one_percentage"],
        result["within_two_percentage"],
        result["avg_prob"] 
        
    ))

resultsDF = spark.createDataFrame(resultsData, schema=schema)
resultsDF = resultsDF.orderBy(F.col("correct"), ascending=False)
resultsDF.show(truncate=False)

Training a 'RandomForest' model to predict 'month' based on input 'temperature, humidity, wind_speed'.
The accuracy of the model is 0.45197
Training a 'RandomForest' model to predict 'hour' based on input 'temperature, humidity, wind_speed'.
The accuracy of the model is 0.07572
Training a 'RandomForest' model to predict 'month' based on input 'power_tenants, power_maintenance, power_solar_panels'.
The accuracy of the model is 0.28023
Training a 'RandomForest' model to predict 'hour' based on input 'power_tenants, power_maintenance, power_solar_panels'.
The accuracy of the model is 0.22178
Training a 'RandomForest' model to predict 'month' based on input 'temperature, humidity, wind_speed, power_tenants, power_maintenance, power_solar_panels, electricity_price'.
The accuracy of the model is 0.51026
Training a 'RandomForest' model to predict 'hour' based on input 'temperature, humidity, wind_speed, power_tenants, power_maintenance, power_solar_panels, electricity_price'.
The accuracy of 

In [0]:
# two further cases first with "power_tenants", "power_maintenance", "electricity_price" variables with Randomforest
# second one with "temperature", "humidity", "wind_speed", "power_tenants", "power_maintenance", "power_solar_panels", "electricity_price" variables with Logistic regression
cases = [
    (["power_tenants", "power_maintenance", "electricity_price"], "month", "RandomForest"),  # Random Forest Classifier
    (["temperature", "humidity", "wind_speed", "power_tenants", "power_maintenance", "power_solar_panels", "electricity_price"], "month", "LogisticRegression")  # Logistic Regression Classifier
]


resultsData = []
for input_features, label_column, classifier_type in cases:

    result = trainandTest(procemDF, input_features, label_column, classifier_type)
    
    input_features_str = ", ".join(input_features)
    model_name = classifier_type
    accuracy = result["accuracy"]
    
    print(f"Training a '{model_name}' model to predict '{label_column}' based on input '{input_features_str}'.")
    print(f"The accuracy of the model is {accuracy:.5f}")

    # Results
    resultsData.append((
        model_name,
        input_features_str,
        label_column,
        result["correct_percentage"],
        result["within_one_percentage"],
        result["within_two_percentage"],
        result["avg_prob"]
    ))

resultsDF = spark.createDataFrame(resultsData, schema=schema)
resultsDF = resultsDF.orderBy(F.col("correct"), ascending=False)
resultsDF.show(truncate=False)

Training a 'RandomForest' model to predict 'month' based on input 'power_tenants, power_maintenance, electricity_price'.
The accuracy of the model is 0.26905
Training a 'LogisticRegression' model to predict 'month' based on input 'temperature, humidity, wind_speed, power_tenants, power_maintenance, power_solar_panels, electricity_price'.
The accuracy of the model is 0.45045
+------------------+----------------------------------------------------------------------------------------------------------+-----+-------+----------+----------+--------+
|classifier        |input                                                                                                     |label|correct|within_one|within_two|avg_prob|
+------------------+----------------------------------------------------------------------------------------------------------+-----+-------+----------+----------+--------+
|LogisticRegression|temperature, humidity, wind_speed, power_tenants, power_maintenance, power_solar_pan