In [17]:
import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark= SparkSession \
       .builder \
       .appName("Our First Spark Example") \
       .getOrCreate()

spark

In [18]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, when
from pyspark.ml.feature import Bucketizer
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.ml.fpm import FPGrowth
from mlxtend.frequent_patterns import apriori, association_rules
from io import StringIO
import prettytable    

In [19]:
#read dataset from csv file
FIFA = spark.read.csv('FIFA22_preprocessed.csv', header=True, inferSchema=True).alias('items')
FIFA.show(5)

+---+------+---------------+---+-----------+------------------+-----+--------------+------------------------+---------+-----------+------------+----------------+------+------+--------+---------+---------------+------------+-------+---------+-----+----------+-----------+-----------+------------+-----------+-------+---------+-------+---------+-------+-------+--------+---------+----------+-------------+-----------+------+---------+---------+--------------+-------------+-------------+------------------+
|_c0|    ID|           Name|Age|Nationality|              Club|Value|Preferred Foot|International Reputation|Weak Foot|Skill Moves|   Work Rate|       Body Type|Height|Weight|Crossing|Finishing|HeadingAccuracy|ShortPassing|Volleys|Dribbling|Curve|FKAccuracy|LongPassing|BallControl|Acceleration|SprintSpeed|Agility|Reactions|Balance|ShotPower|Jumping|Stamina|Strength|LongShots|Aggression|Interceptions|Positioning|Vision|Penalties|Composure|StandingTackle|SlidingTackle|Best Position|DefensiveA

In [20]:

# Define binning function for numerical columns

def bin_column(data, labels, column_name):
    discretizer = QuantileDiscretizer(numBuckets=4, inputCol=column_name, outputCol=column_name+"_bucketed", relativeError=0)
    data = discretizer.fit(data).transform(data)
    data = data.withColumn(column_name, 
                           when(col(column_name+"_bucketed") == 0, labels[0])
                           .when(col(column_name+"_bucketed") == 1, labels[1])
                           .when(col(column_name+"_bucketed") == 2, labels[2])
                           .otherwise(labels[3])) \
               .drop(column_name+"_bucketed")

    return data

def bin_column2(data, labels, column_name):
    discretizer = QuantileDiscretizer(numBuckets=5, inputCol=column_name, outputCol=column_name+"_bucketed", relativeError=0)
    data = discretizer.fit(data).transform(data)
    data = data.withColumn(column_name, 
                           when(col(column_name+"_bucketed") == 0, labels[0])
                           .when(col(column_name+"_bucketed") == 1, labels[1])
                           .when(col(column_name+"_bucketed") == 2, labels[2])
                           .when(col(column_name+"_bucketed") == 3, labels[3])
                           .otherwise(labels[4])) \
               .drop(column_name+"_bucketed")

    return data


# bin the 'height' column
height_labels = ['161-170', '171-180', '181-190', '>190']
FIFA = bin_column(FIFA, height_labels, "Height")

# bin the 'weight' column
weight_labels = ['61-70', '71-80', '81-90', '>90' ]
FIFA = bin_column(FIFA,  weight_labels, "Weight")

#bin the 'age' column
age_labels = ['10-20', '21-30', '31-40', '>40']
FIFA = bin_column(FIFA, age_labels, "Age")

# bin the 'value' column
value_labels = ['0-1M', '1M-10M', '10M-100M', '>100M']
FIFA = bin_column(FIFA, value_labels, "Value")



# bin the 'international reputation' column
international_reputation_labels = ['1', '2', '3', '4', '5']
FIFA = bin_column2(FIFA,international_reputation_labels, "International Reputation")

# bin the 'weak foot' column
weak_foot_labels = ['1', '2', '3', '4', '5']
FIFA = bin_column2(FIFA, weak_foot_labels, "Weak Foot")

# bin the 'skill moves' column
skill_moves_labels = ['1', '2', '3', '4', '5']
FIFA = bin_column2(FIFA, skill_moves_labels, "Skill Moves")


numerical_columns = [ "Crossing", "Finishing", "HeadingAccuracy", "ShortPassing", "Volleys", "Dribbling", 
                     "Curve", "FKAccuracy", "LongPassing", "BallControl", "Acceleration", "SprintSpeed", "Agility", "Reactions", "Balance", "ShotPower", 
                     "Jumping", "Stamina", "Strength", "LongShots", "Aggression", "Interceptions", "Positioning", "Vision", "Penalties", "Composure", 
                     "StandingTackle", "SlidingTackle", "DefensiveAwareness"]
#bin numerical columns
for column in numerical_columns:
    # bin with three quartile
    discretizer = QuantileDiscretizer(numBuckets=3, inputCol=column, outputCol=column+"_bucketed", relativeError=0)
    FIFA = discretizer.fit(FIFA).transform(FIFA)
    FIFA = FIFA.withColumn(column, 
                           when(col(column+"_bucketed") == 0, "below average")
                           .when(col(column+"_bucketed") == 1, "above average")
                           .otherwise("very high")) \
               .drop(column+"_bucketed")

# Show the resulting DataFrame
FIFA.show()



+---+------+---------------+-----+-----------+-------------------+-----+--------------+------------------------+---------+-----------+--------------+----------------+-------+------+---------+-------------+---------------+------------+---------+-------------+---------+----------+-----------+-----------+-------------+-------------+-------------+---------+-------------+---------+-------------+---------+-------------+---------+-------------+-------------+-----------+---------+-------------+---------+--------------+-------------+-------------+------------------+
|_c0|    ID|           Name|  Age|Nationality|               Club|Value|Preferred Foot|International Reputation|Weak Foot|Skill Moves|     Work Rate|       Body Type| Height|Weight| Crossing|    Finishing|HeadingAccuracy|ShortPassing|  Volleys|    Dribbling|    Curve|FKAccuracy|LongPassing|BallControl| Acceleration|  SprintSpeed|      Agility|Reactions|      Balance|ShotPower|      Jumping|  Stamina|     Strength|LongShots|   Aggres

In [21]:
categorical_columns  = ["Age", "Nationality", "Club", "Value", "Preferred Foot", "International Reputation", "Weak Foot", 
                "Skill Moves", "Work Rate", "Body Type", "Height", "Weight", "Crossing", "Finishing", "HeadingAccuracy", "ShortPassing", "Volleys", 
                "Dribbling", "Curve", "FKAccuracy", "LongPassing", "BallControl", "Acceleration", "SprintSpeed", "Agility", "Reactions", "Balance", 
                "ShotPower", "Jumping", "Stamina", "Strength", "LongShots", "Aggression", "Interceptions", "Positioning", "Vision", "Penalties", 
                "Composure", "StandingTackle", "SlidingTackle", "Best Position", "DefensiveAwareness"]

# Define stages for the pipeline
stages = []

# Apply StringIndexer to convert categorical columns to numerical indices
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid="keep") for column in categorical_columns]
stages += indexers

# Apply OneHotEncoder to the indexed columns
encoders = [OneHotEncoder(inputCol=column+"_index", outputCol=column+"_onehot") for column in categorical_columns]
stages += encoders


# Create a pipeline
pipeline = Pipeline(stages=stages)

# Fit and transform the pipeline
pipeline_model = pipeline.fit(FIFA)
FIFA_encoded = pipeline_model.transform(FIFA)

# Drop the original categorical columns and index columns
for column in categorical_columns:
    FIFA_encoded = FIFA_encoded.drop(column)
    FIFA_encoded = FIFA_encoded.drop(column+"_index")


# Show the resulting DataFrame

FIFA_encoded.show()


+---+------+---------------+-------------+------------------+-----------------+-------------+---------------------+-------------------------------+----------------+------------------+----------------+----------------+-------------+-------------+---------------+----------------+----------------------+-------------------+--------------+----------------+-------------+-----------------+------------------+------------------+-------------------+------------------+--------------+----------------+--------------+----------------+--------------+--------------+---------------+----------------+-----------------+--------------------+------------------+-------------+----------------+----------------+---------------------+--------------------+--------------------+-------------------------+
|_c0|    ID|           Name|   Age_onehot|Nationality_onehot|      Club_onehot| Value_onehot|Preferred Foot_onehot|International Reputation_onehot|Weak Foot_onehot|Skill Moves_onehot|Work Rate_onehot|Body Type_oneh

In [22]:
MIN_SUPPORT = 0.1
MIN_ITEMSET_LEN = 1
MIN_THRESHOLD = 0.5

In [23]:
from pyspark.sql.functions import array, col
from pyspark.sql.functions import flatten



# List of original column names
original_columns = ["Age", "Nationality", "Club", "Value", "Preferred Foot", "International Reputation", "Weak Foot", 
                    "Skill Moves", "Work Rate", "Body Type", "Height", "Weight", "Crossing", "Finishing", 
                    "HeadingAccuracy", "ShortPassing", "Volleys", "Dribbling", "Curve", "FKAccuracy", 
                    "LongPassing", "BallControl", "Acceleration", "SprintSpeed", "Agility", "Reactions", 
                    "Balance", "ShotPower", "Jumping", "Stamina", "Strength", "LongShots", "Aggression", 
                    "Interceptions", "Positioning", "Vision", "Penalties", "Composure", "StandingTackle", 
                    "SlidingTackle", "Best Position", "DefensiveAwareness"]

# Add "_onehot" suffix to each column name
new_columns = [col + "_onehot" for col in original_columns]

# Rename columns
for old_col, new_col in zip(original_columns, new_columns):
    FIFA_encoded = FIFA_encoded.withColumnRenamed(old_col, new_col)

# Now create the array column with the renamed columns
FIFA_encoded = FIFA_encoded.withColumn(
    "items",
    array([col(new_col) for new_col in new_columns])
)




In [24]:
# Remove duplicate items in each transaction
FIFA_encoded = FIFA_encoded.withColumn("items", F.array_distinct("items"))

fpgrowth = FPGrowth(itemsCol="items", minSupport=MIN_SUPPORT, minConfidence=0.01)

# Fit the FPGrowth model
fpgrowth = fpgrowth.fit(FIFA_encoded)

# Display frequent itemsets
frequent_itemsets = fpgrowth.freqItemsets
frequent_itemsets.show()

+--------------------+----+
|               items|freq|
+--------------------+----+
|     [(2,[1],[1.0])]|9095|
|[(2,[1],[1.0]), (...|9091|
|[(2,[1],[1.0]), (...|9091|
|[(2,[1],[1.0]), (...|9091|
|[(2,[1],[1.0]), (...|9091|
|[(2,[1],[1.0]), (...|6691|
|[(2,[1],[1.0]), (...|6690|
|[(2,[1],[1.0]), (...|6690|
|[(2,[1],[1.0]), (...|6690|
|[(2,[1],[1.0]), (...|6690|
|[(2,[1],[1.0]), (...|5434|
|[(2,[1],[1.0]), (...|5433|
|[(2,[1],[1.0]), (...|5433|
|[(2,[1],[1.0]), (...|5433|
|[(2,[1],[1.0]), (...|5433|
|[(2,[1],[1.0]), (...|5434|
|[(2,[1],[1.0]), (...|5434|
|[(2,[1],[1.0]), (...|5418|
|[(2,[1],[1.0]), (...|5417|
|[(2,[1],[1.0]), (...|5417|
+--------------------+----+
only showing top 20 rows



In [27]:

# Extract association rules
association_rules = fpgrowth.associationRules
association_rules.show()

# Convert antecedents and consequents to tuples (if needed, for consistency with your Pandas code)
association_rules = association_rules.withColumn("antecedent", F.expr("CAST(antecedent AS STRING)"))
association_rules = association_rules.withColumn("consequent", F.expr("CAST(consequent AS STRING)"))

# Sort by lift and confidence
sorted_rules = association_rules.orderBy(F.desc("lift"), F.desc("confidence"))
sorted_rules.show()

# Save to CSV
sorted_rules.coalesce(1).write.csv("Rules.csv", header=True)

# Show final sorted rules
sorted_rules.show()

+--------------------+----------------+------------------+------------------+-------------------+
|          antecedent|      consequent|        confidence|              lift|            support|
+--------------------+----------------+------------------+------------------+-------------------+
|[(4,[3],[1.0]), (...|[(10,[0],[1.0])]|0.3671124505556602|1.0048925553345613|0.13117512451204738|
|[(4,[3],[1.0]), (...| [(9,[0],[1.0])]|0.5850442644565832|1.1014429398499637|0.20904563198277024|
|[(4,[3],[1.0]), (...| [(4,[0],[1.0])]|0.6426822377095498|0.8795222149662422|0.22964059765782743|
|[(4,[3],[1.0]), (...| [(2,[0],[1.0])]|0.8082501412695423|0.9137863794690959| 0.2888006461165702|
|[(4,[3],[1.0]), (...| [(4,[2],[1.0])]|0.6174420794876625|0.9755374752262537|0.22062188719881545|
|[(4,[3],[1.0]), (...| [(4,[1],[1.0])]|0.5825955923902807|0.8286621971792831|0.20817068246062728|
|[(4,[3],[1.0]), (...| [(9,[1],[1.0])]|0.3074025240158222|0.8668412795268715|0.10983981693363844|
|[(2,[1],[1.0]), (..