In [1]:
import os
import findspark
findspark.init()
findspark.find()
from pyspark.sql.functions import lit
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext
from pyspark.sql.functions import monotonically_increasing_id

# Task I

In [2]:
# connect spark to PostgreSQL
appName = "Big Data Analytics"
master = "local"

# Create Configuration object for Spark.
conf = pyspark.SparkConf()\
    .set('spark.driver.host','127.0.0.1')\
    .setAppName(appName)\
    .setMaster(master)

# Create Spark Context with the new configurations rather than relying on the default one
sc = SparkContext.getOrCreate(conf=conf)

# You need to create SQL Context to conduct some database operations like what we will see later.
sqlContext = SQLContext(sc)

# If you have SQL context, you create the session from the Spark Context
spark = sqlContext.sparkSession.builder.getOrCreate()



In [3]:
#spark DataFrame Schema
#We need both spark schema and postgres schema, because the datatypes of spark and postgresql don't match with each other
fifa='''
sofifa_id int NOT NULL,
player_url string NOT NULL,
short_name string NOT NULL,
long_name string NOT NULL,
player_positions string NOT NULL,
overall int NOT NULL,
potential int NOT NULL,
value_eur float,
wage_eur float,
age int NOT NULL,
dob date NOT NULL,
height_cm int NOT NULL,
weight_kg int NOT NULL,
club_team_id int,
club_name string,
league_name string,
league_level int,
club_position string,
club_jersey_number int,
club_loaned_from string,
club_joined date,
club_contract_valid_until int,
nationality_id int NOT NULL,
nationality_name string NOT NULL,
nation_team_id int,
nation_position string,
nation_jersey_number int,
preferred_foot string NOT NULL,
weak_foot int NOT NULL,
skill_moves int NOT NULL,
international_reputation int NOT NULL,
work_rate string,body_type string NOT NULL,
real_face string NOT NULL,
release_clause_eur float,
player_tags string,
player_traits string,
pace int,shooting int,
passing int,dribbling int,
defending int,physic int,
attacking_crossing int NOT NULL,
attacking_finishing int NOT NULL,
attacking_heading_accuracy int NOT NULL,
attacking_short_passing int NOT NULL,
attacking_volleys int NOT NULL,
skill_dribbling int NOT NULL,
skill_curve int NOT NULL,
skill_fk_accuracy int NOT NULL,
skill_long_passing int NOT NULL,
skill_ball_control int NOT NULL,
movement_acceleration int NOT NULL,
movement_sprint_speed int NOT NULL,
movement_agility int NOT NULL,
movement_reactions int NOT NULL,
movement_balance int NOT NULL,
power_shot_power int NOT NULL,
power_jumping int NOT NULL,
power_stamina int NOT NULL,
power_strength int NOT NULL,
power_long_shots int NOT NULL,
mentality_aggression int NOT NULL,
mentality_interceptions int NOT NULL,
mentality_positioning int NOT NULL,
mentality_vision int NOT NULL,
mentality_penalties int NOT NULL,
mentality_composure int,
defending_marking_awareness int NOT NULL,
defending_standing_tackle int NOT NULL,
defending_sliding_tackle int NOT NULL,
goalkeeping_diving int NOT NULL,
goalkeeping_handling int NOT NULL,
goalkeeping_kicking int NOT NULL,
goalkeeping_positioning int NOT NULL,
goalkeeping_reflexes int NOT NULL,
goalkeeping_speed int,
ls string NOT NULL,
st string NOT NULL,
rs string NOT NULL,
lw string NOT NULL,
lf string NOT NULL,
cf string NOT NULL,
rf string NOT NULL,
rw string NOT NULL,
lam string NOT NULL,
cam string NOT NULL,
ram string NOT NULL,
lm string NOT NULL,
lcm string NOT NULL,
cm string NOT NULL,
rcm string NOT NULL,
rm string NOT NULL,
lwb string NOT NULL,
ldm string NOT NULL,
cdm string NOT NULL,
rdm string NOT NULL,
rwb string NOT NULL,
lb string NOT NULL,
lcb string NOT NULL,
cb string NOT NULL,
rcb string NOT NULL,
rb string NOT NULL,
gk string NOT NULL,
player_face_url string NOT NULL,
club_logo_url string,
club_flag_url string,
nation_logo_url string NOT NULL,
nation_flag_url string NOT NULL
'''

In [4]:
df = spark.read.csv("./data/players_15.csv",header=True,schema=fifa)

In [5]:
df.show(1,vertical=True)

-RECORD 0-------------------------------------------
 sofifa_id                   | 158023               
 player_url                  | https://sofifa.co... 
 short_name                  | L. Messi             
 long_name                   | Lionel Andrés Mes... 
 player_positions            | CF                   
 overall                     | 93                   
 potential                   | 95                   
 value_eur                   | 1.005E8              
 wage_eur                    | 550000.0             
 age                         | 27                   
 dob                         | 1987-06-24           
 height_cm                   | 169                  
 weight_kg                   | 67                   
 club_team_id                | null                 
 club_name                   | FC Barcelona         
 league_name                 | Spain Primera Div... 
 league_level                | 1                    
 club_position               | CF             

In [6]:
df.printSchema()

root
 |-- sofifa_id: integer (nullable = true)
 |-- player_url: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- long_name: string (nullable = true)
 |-- player_positions: string (nullable = true)
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- value_eur: float (nullable = true)
 |-- wage_eur: float (nullable = true)
 |-- age: integer (nullable = true)
 |-- dob: date (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- club_team_id: integer (nullable = true)
 |-- club_name: string (nullable = true)
 |-- league_name: string (nullable = true)
 |-- league_level: integer (nullable = true)
 |-- club_position: string (nullable = true)
 |-- club_jersey_number: integer (nullable = true)
 |-- club_loaned_from: string (nullable = true)
 |-- club_joined: date (nullable = true)
 |-- club_contract_valid_until: integer (nullable = true)
 |-- nationality_id: integer (nullable = true)
 |

In [7]:
# import datetime
# schema = "dob date"
# data = [ [datetime.datetime.strptime('2021-01-01', "%Y-%m-%d").date()],
#         [datetime.datetime.strptime('2000-01-01', "%Y-%m-%d").date()]]

# test_df = spark.createDataFrame(data, schema)

In [8]:
df=df.withColumn("year",lit(2015))

In [9]:
df.show(1,vertical=True)

-RECORD 0-------------------------------------------
 sofifa_id                   | 158023               
 player_url                  | https://sofifa.co... 
 short_name                  | L. Messi             
 long_name                   | Lionel Andrés Mes... 
 player_positions            | CF                   
 overall                     | 93                   
 potential                   | 95                   
 value_eur                   | 1.005E8              
 wage_eur                    | 550000.0             
 age                         | 27                   
 dob                         | 1987-06-24           
 height_cm                   | 169                  
 weight_kg                   | 67                   
 club_team_id                | null                 
 club_name                   | FC Barcelona         
 league_name                 | Spain Primera Div... 
 league_level                | 1                    
 club_position               | CF             

In [10]:
df.printSchema()

root
 |-- sofifa_id: integer (nullable = true)
 |-- player_url: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- long_name: string (nullable = true)
 |-- player_positions: string (nullable = true)
 |-- overall: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- value_eur: float (nullable = true)
 |-- wage_eur: float (nullable = true)
 |-- age: integer (nullable = true)
 |-- dob: date (nullable = true)
 |-- height_cm: integer (nullable = true)
 |-- weight_kg: integer (nullable = true)
 |-- club_team_id: integer (nullable = true)
 |-- club_name: string (nullable = true)
 |-- league_name: string (nullable = true)
 |-- league_level: integer (nullable = true)
 |-- club_position: string (nullable = true)
 |-- club_jersey_number: integer (nullable = true)
 |-- club_loaned_from: string (nullable = true)
 |-- club_joined: date (nullable = true)
 |-- club_contract_valid_until: integer (nullable = true)
 |-- nationality_id: integer (nullable = true)
 |

In [11]:
#don't consider players_15,because we will use union
players_list=os.listdir("./data")[10:]
players_list

['players_16.csv',
 'players_17.csv',
 'players_18.csv',
 'players_19.csv',
 'players_20.csv',
 'players_21.csv',
 'players_22.csv']

In [12]:
# print(os.path.join("./data/",players_list[0]).split('_')[1][:2])
# print(2000+int(os.path.join("./data/",players_list[0]).split('_')[1][:2]))

In [13]:
for i in players_list:
    path=os.path.join("./data/",i)
    temp_df=spark.read.csv(path,header=True,schema=fifa)
    year=2000+int(path.split('_')[1][:2])
    temp_df=temp_df.withColumn("year",lit(year))
    df = df.union(temp_df)

In [14]:
df.count()

142079

In [15]:
df.select("year").distinct().show()

+----+
|year|
+----+
|2015|
|2016|
|2017|
|2018|
|2019|
|2020|
|2021|
|2022|
+----+



In [16]:
#142079 !=45629,so sofifa_id can't be used as unique id
df.select("sofifa_id").distinct().count()

45629

In [17]:
# df.write.mode("overwrite").saveAsTable("fifa")
# spark.catalog.listTables()

In [18]:
# df=df.withColumn("id",monotonically_increasing_id())
# df.printSchema()

In [19]:
# #cast club_team_id,nation_team_id into "integer"
# df=df.withColumn("new_club_team_id",df["club_team_id"].cast("integer")).drop("club_team_id").withColumnRenamed("new_club_team_id","club_team_id")
# df=df.withColumn("new_nation_team_id",df["nation_team_id"].cast("integer")).drop("nation_team_id").withColumnRenamed("new_nation_team_id","nation_team_id")

In [20]:
#in order to check "NOT NULL"
from pyspark.sql.functions import *

null_counts_plays_df = df.select([count(when(col(c).isNull(),c)).alias(c) for c in df.columns])

null_counts_plays_df.show(truncate=False, vertical=True)

-RECORD 0-----------------------------
 sofifa_id                   | 0      
 player_url                  | 0      
 short_name                  | 0      
 long_name                   | 0      
 player_positions            | 0      
 overall                     | 0      
 potential                   | 0      
 value_eur                   | 1897   
 wage_eur                    | 1622   
 age                         | 0      
 dob                         | 0      
 height_cm                   | 0      
 weight_kg                   | 0      
 club_team_id                | 142079 
 club_name                   | 1630   
 league_name                 | 1630   
 league_level                | 2015   
 club_position               | 1630   
 club_jersey_number          | 1630   
 club_loaned_from            | 133774 
 club_joined                 | 9935   
 club_contract_valid_until   | 1630   
 nationality_id              | 0      
 nationality_name            | 0      
 nation_team_id          

In [21]:
#Before overwriting data, we had better create the table first
#so that we can define table datatypes instead of automatic inference.
#We have to know that there isn't a match between spark datatypes and postgresql datatypes

#!pip install psycopg2
import psycopg2
from psycopg2 import sql

conn = psycopg2.connect(
    dbname="postgres",
    user="postgres",
    password="200042",
    host="localhost",
    port="5432"
)

cur = conn.cursor()

#check if we have already created the table first
check_table_query = sql.SQL("""
    SELECT EXISTS (
        SELECT 1
        FROM information_schema.tables
        WHERE table_name = 'fifa'
    )
""")
cur.execute(check_table_query)
table_exists = cur.fetchone()[0]

try:
    cur.execute(check_table_query)
    table_exists = cur.fetchone()[0]
    print("Table fifa exist:{}".format(table_exists))
    if table_exists:
        drop_table_query="""
            drop table fifa;
        """
        cur.execute(drop_table_query)
        conn.commit()
        print("Successfully drop table fifa")
        #postgresql schema
        #use (year,sofifa_id) as our primary key
    create_table_query = """
    CREATE TABLE fifa (
        sofifa_id INT NOT NULL,
        player_url VARCHAR(255) UNIQUE NOT NULL,
        short_name VARCHAR(64) NOT NULL,
        long_name VARCHAR(255) NOT NULL,
        player_positions VARCHAR(64) NOT NULL,
        overall INT NOT NULL,
        potential INT NOT NULL,
        value_eur FLOAT,
        wage_eur FLOAT,
        age INT NOT NULL,
        dob DATE NOT NULL,
        height_cm INT NOT NULL,
        weight_kg INT NOT NULL,
        club_team_id INT, 
        club_name VARCHAR(255),
        league_name VARCHAR(255),
        league_level INT,
        club_position VARCHAR(255),
        club_jersey_number INT,
        club_loaned_from VARCHAR(255),
        club_joined DATE,
        club_contract_valid_until INT,
        nationality_id INT NOT NULL,
        nationality_name VARCHAR(255) NOT NULL,
        nation_team_id INT,
        nation_position VARCHAR(255),
        nation_jersey_number INT,
        preferred_foot VARCHAR(255) NOT NULL,
        weak_foot INT NOT NULL,
        skill_moves INT NOT NULL,
        international_reputation INT NOT NULL,
        work_rate VARCHAR(255) NOT NULL,
        body_type VARCHAR(255) NOT NULL,
        real_face VARCHAR(255) NOT NULL,
        release_clause_eur VARCHAR(255),
        player_tags VARCHAR(255),
        player_traits VARCHAR(255),
        pace INT,
        shooting INT,
        passing INT,
        dribbling INT,
        defending INT,
        physic INT,
        attacking_crossing INT NOT NULL,
        attacking_finishing INT NOT NULL,
        attacking_heading_accuracy INT NOT NULL,
        attacking_short_passing INT NOT NULL,
        attacking_volleys INT NOT NULL,
        skill_dribbling INT NOT NULL,
        skill_curve INT NOT NULL,
        skill_fk_accuracy INT NOT NULL,
        skill_long_passing INT NOT NULL,
        skill_ball_control INT NOT NULL,
        movement_acceleration INT NOT NULL,
        movement_sprint_speed INT NOT NULL,
        movement_agility INT NOT NULL,
        movement_reactions INT NOT NULL,
        movement_balance INT NOT NULL,
        power_shot_power INT NOT NULL,
        power_jumping INT NOT NULL,
        power_stamina INT NOT NULL,
        power_strength INT NOT NULL,
        power_long_shots INT NOT NULL,
        mentality_aggression INT NOT NULL,
        mentality_interceptions INT NOT NULL,
        mentality_positioning INT NOT NULL,
        mentality_vision INT NOT NULL,
        mentality_penalties INT NOT NULL,
        mentality_composure VARCHAR(255),
        defending_marking_awareness INT NOT NULL,
        defending_standing_tackle INT NOT NULL,
        defending_sliding_tackle INT NOT NULL,
        goalkeeping_diving INT NOT NULL,
        goalkeeping_handling INT NOT NULL,
        goalkeeping_kicking INT NOT NULL,
        goalkeeping_positioning INT NOT NULL,
        goalkeeping_reflexes INT NOT NULL,
        goalkeeping_speed INT,
        ls VARCHAR(32) NOT NULL,
        st VARCHAR(32) NOT NULL,
        rs VARCHAR(32) NOT NULL,
        lw VARCHAR(32) NOT NULL,
        lf VARCHAR(32) NOT NULL,
        cf VARCHAR(32) NOT NULL,
        rf VARCHAR(32) NOT NULL,
        rw VARCHAR(32) NOT NULL,
        lam VARCHAR(32) NOT NULL,
        cam VARCHAR(32) NOT NULL,
        ram VARCHAR(32) NOT NULL,
        lm VARCHAR(32) NOT NULL,
        lcm VARCHAR(32) NOT NULL,
        cm VARCHAR(32) NOT NULL,
        rcm VARCHAR(32) NOT NULL,
        rm VARCHAR(32) NOT NULL,
        lwb VARCHAR(32) NOT NULL,
        ldm VARCHAR(32) NOT NULL,
        cdm VARCHAR(32) NOT NULL,
        rdm VARCHAR(32) NOT NULL,
        rwb VARCHAR(32) NOT NULL,
        lb VARCHAR(32) NOT NULL,
        lcb VARCHAR(32) NOT NULL,
        cb VARCHAR(32) NOT NULL,
        rcb VARCHAR(32) NOT NULL,
        rb VARCHAR(32) NOT NULL,
        gk VARCHAR(32) NOT NULL,
        player_face_url VARCHAR(32) NOT NULL,
        club_logo_url VARCHAR(255),
        club_flag_url VARCHAR(255),
        nation_logo_url VARCHAR(255),
        nation_flag_url VARCHAR(32) NOT NULL,
        year INT NOT NULL,
        PRIMARY KEY (year,sofifa_id)
    );
    """
    # create table
    cur.execute(create_table_query)
    conn.commit()
    print(f"Table fifa created successfully.")
except psycopg2.Error as e:
    print(f"Error: {e}")
finally:
    cur.close()
    conn.close()

Table fifa exist:True
Successfully drop table fifa
Table fifa created successfully.


In [22]:
db_properties={}
db_properties['username']="postgres"
db_properties['password']="200042"
db_properties['url']= "jdbc:postgresql://localhost:5432/postgres"
db_properties['table']="fifa"
db_properties['driver']="org.postgresql.Driver"

df.write.format("jdbc")\
.mode("overwrite")\
.option("url", db_properties['url'])\
.option("dbtable", db_properties['table'])\
.option("user", db_properties['username'])\
.option("password", db_properties['password'])\
.option("Driver", db_properties['driver'])\
.save()

# Task II

## Q1

In [23]:
#when we read data from postgresql, it will automatically transform postgresql datatype into pyspark datatype
#besides, since before overwriting, we have already defined schema as fifa, we will get the same schema as fifa when we read 
#If we didn't define pyspark schema, when we read back, for example, we shall get double variable instead of float
df_read = sqlContext.read.format("jdbc")\
    .option("url", db_properties['url'])\
    .option("dbtable", db_properties['table'])\
    .option("user", db_properties['username'])\
    .option("password", db_properties['password'])\
    .option("Driver", db_properties['driver'])\
    .load()

df_read.show(1, vertical=True)
df_read.printSchema()

-RECORD 0-------------------------------------------
 sofifa_id                   | 158023               
 player_url                  | https://sofifa.co... 
 short_name                  | L. Messi             
 long_name                   | Lionel Andrés Mes... 
 player_positions            | CF                   
 overall                     | 93                   
 potential                   | 95                   
 value_eur                   | 1.005E8              
 wage_eur                    | 550000.0             
 age                         | 27                   
 dob                         | 1987-06-24           
 height_cm                   | 169                  
 weight_kg                   | 67                   
 club_team_id                | null                 
 club_name                   | FC Barcelona         
 league_name                 | Spain Primera Div... 
 league_level                | 1                    
 club_position               | CF             

In [24]:
df_read.createOrReplaceTempView("df_view")

In [25]:
def contracts_ending_2023(x):
    sqlWay = spark.sql("""
    select club_name,count(*) as player_with_contracts_ending_in_2023
    from df_view
    where year = 2022 and club_contract_valid_until=2023
    group by club_name
    having count(*) in
    (
    SELECT count(*) as cnt
    FROM df_view
    Where year = 2022 and club_contract_valid_until=2023
    group by club_name  
    order by cnt desc
    limit {}
    )
    order by player_with_contracts_ending_in_2023 desc
    """.format(x))
    sqlWay.show()

In [26]:
contracts_ending_2023(5)

+--------------------+------------------------------------+
|           club_name|player_with_contracts_ending_in_2023|
+--------------------+------------------------------------+
|En Avant de Guingamp|                                  19|
| Club Atlético Lanús|                                  17|
|       Lechia Gdańsk|                                  17|
|            Barnsley|                                  16|
|        Kasimpaşa SK|                                  16|
|        Bengaluru FC|                                  16|
+--------------------+------------------------------------+



## Q2

In [27]:
#method 1
def number_of_players_over_27(x):
    sqlWay = spark.sql("""
    select club_name,avg(number_players) as average_number_players_allyear from
    (
    SELECT club_name,count(*) as number_players
    FROM df_view
    Where age>27 and club_name is not null
    group by club_name,year
    ) as temp
    group by club_name
    having average_number_players_allyear in
    (
    select avg(number_players) as cnt from
    (
    SELECT club_name,count(*) as number_players
    FROM df_view
    Where age>27 and club_name is not null
    group by club_name,year
    ) as temp2
    group by club_name
    order by cnt desc
    limit {}
    )
    order by average_number_players_allyear desc
    """.format(x))
    sqlWay.show()

In [28]:
number_of_players_over_27(20)

+--------------------+------------------------------+
|           club_name|average_number_players_allyear|
+--------------------+------------------------------+
|  Dorados de Sinaloa|                          19.0|
| Matsumoto Yamaga FC|                          19.0|
| Shanghai Shenhua FC|                          18.5|
|          Qingdao FC|                          18.0|
|Club Deportivo Jo...|                          17.5|
|            Altay SK|                          17.0|
|         Guaireña FC|                          17.0|
|İstanbul Başakşeh...|                        16.625|
|      BB Erzurumspor|                          16.5|
|        Club Olimpia|                          16.5|
|      Sport Huancayo|                          16.5|
|    Beijing Renhe FC|                          16.0|
|              Paraná|                          16.0|
|  Sandecja Nowy Sącz|                          16.0|
|            CFR Cluj|            15.666666666666666|
|      Extremadura UD|      

In [29]:
#method 2
#Although this method seems a litte bit more complicated, it can apply to circumstances where x>20
#That's because pyspark show() has a limit of 20
def number_of_players_over_27(x):
    sqlWay = spark.sql("""
    select club_name,avg(number_players) as average_number_players_allyear from
    (
    SELECT club_name,count(*) as number_players
    FROM df_view
    Where age>27 and club_name is not null
    group by club_name,year
    ) as temp
    group by club_name
    order by average_number_players_allyear desc
    """)
    #find exactly how many rows(num) should be shown
    sqlWay=sqlWay.withColumn("id",monotonically_increasing_id()+1)
    least=sqlWay.select('average_number_players_allyear').take(x)[x-1]['average_number_players_allyear']
    rows_to_show=x
    temp=sqlWay.filter(sqlWay.id==rows_to_show+1).take(1)[0]['average_number_players_allyear']
    while temp==least:
        rows_to_show=rows_to_show+1
        temp=sqlWay.filter(sqlWay.id==rows_to_show+1).take(1)[0]['average_number_players_allyear']
    sqlWay.show(rows_to_show)

In [30]:
number_of_players_over_27(20)

+--------------------+------------------------------+---+
|           club_name|average_number_players_allyear| id|
+--------------------+------------------------------+---+
|  Dorados de Sinaloa|                          19.0|  1|
| Matsumoto Yamaga FC|                          19.0|  2|
| Shanghai Shenhua FC|                          18.5|  3|
|          Qingdao FC|                          18.0|  4|
|Club Deportivo Jo...|                          17.5|  5|
|            Altay SK|                          17.0|  6|
|         Guaireña FC|                          17.0|  7|
|İstanbul Başakşeh...|                        16.625|  8|
|      BB Erzurumspor|                          16.5|  9|
|        Club Olimpia|                          16.5| 10|
|      Sport Huancayo|                          16.5| 11|
|    Beijing Renhe FC|                          16.0| 12|
|              Paraná|                          16.0| 13|
|  Sandecja Nowy Sącz|                          16.0| 14|
|            C

## Q3

In [31]:
# sqlWay1=spark.sql('''
# select nation_position,year,count(*) as cnt 
# from df_view
# where nation_position is not null and nation_position<>'SUB'
# group by nation_position,year
# order by cnt desc
# ''')
# sqlWay1.show()

In [32]:
# sqlWay1.createOrReplaceTempView("sqlWay1")
# spark.catalog.listTables()

In [33]:
# sqlWay=spark.sql('''
# select temp.year,nation_position,sqlWay1.cnt from
# (
# select year,max(cnt) as maxcnt
# from sqlWay1
# group by year
# ) as temp
# left join sqlWay1
# on temp.maxcnt=sqlWay1.cnt and temp.year=sqlWay1.year
# order by temp.year
# ''')

In [34]:
# sqlWay.show(30)

In [35]:
def most_frequent_nation_position():
    sqlWay1=spark.sql('''
    select nation_position,year,count(*) as cnt 
    from df_view
    where nation_position is not null and nation_position<>'SUB'
    group by nation_position,year
    order by cnt desc
    ''')
    sqlWay1.createOrReplaceTempView("sqlWay1")
    sqlWay2=spark.sql('''
    select temp.year,nation_position,sqlWay1.cnt from
    (
    select year,max(cnt) as maxcnt
    from sqlWay1
    group by year
    ) as temp
    left join sqlWay1
    on temp.maxcnt=sqlWay1.cnt and temp.year=sqlWay1.year
    order by temp.year
    ''')
    sqlWay2.show(30)

In [36]:
most_frequent_nation_position()

+----+---------------+---+
|year|nation_position|cnt|
+----+---------------+---+
|2015|             GK| 47|
|2015|            RCB| 47|
|2015|            LCB| 47|
|2016|            RCB| 46|
|2016|             GK| 46|
|2017|             GK| 47|
|2017|            LCB| 47|
|2017|            RCB| 47|
|2018|             GK| 50|
|2018|            LCB| 50|
|2018|            RCB| 50|
|2019|            LCB| 48|
|2019|             GK| 48|
|2019|            RCB| 48|
|2020|            RCB| 49|
|2020|            LCB| 49|
|2020|             GK| 49|
|2021|            RCB| 49|
|2021|             GK| 49|
|2021|            LCB| 49|
|2022|            LCB| 33|
|2022|             GK| 33|
|2022|            RCB| 33|
+----+---------------+---+

