In [137]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StringType

import numpy as np

sc =SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

In [138]:
def readCSV(path):
    return sqlContext.read.format("csv").options(header="true")\
    .load(path)



In [139]:
team_df = readCSV("/Users/peggy/Desktop/footballManager/team_feat.csv")
player_df = readCSV("/Users/peggy/Desktop/footballManager/data_clean.csv")

In [140]:
def playerSimilarity(p1, p2):
    cosine = np.dot(p1,p2)/(np.linalg.norm(p1)*(np.linalg.norm(p2)))
    r =  np.sqrt(sum([i ** 2 for i in p1]))
    return r * cosine

In [141]:
def findTopK(playerList, K, player, sort_type):
    playerList.append(player)
    playerList.sort(key=lambda p: sort_type * p[1])
    if(len(playerList) > K):
        return playerList[:K]
    return playerList


def mergeTopK(pList1, pList2, K, sort_type):
    result = pList1 + pList2
    result.sort(key=lambda p:sort_type*p[1])
    if(len(result) > K):
        return result[:K]
    return result

In [144]:
def findSimilarPlayer(df, name, topK):
    player_df = df.select(["ID"] + df.columns[44:73]).where(df.Name == name)
    if player_df == None:
        raise NameError("No Player Found!")
    playerInfo = player_df.rdd.map(list)\
        .map(lambda l:(l[0], [int(l[i]) for i in range(1, len(l))])).collect()[0]
    (playerId, playerList) = playerInfo[0], playerInfo[1]
    mat = df.select(["ID"] + df.columns[44:73]).rdd.map(list)\
        .map(lambda l:(l[0], [int(l[i]) for i in range(1, len(l))]))\
        .filter(lambda kv: kv[0] != playerId)\
        .mapValues(lambda l: playerSimilarity(l, playerList))

    res = mat.aggregate([], lambda inp1, inp2: findTopK(inp1, topK, inp2, -1), lambda inp1, inp2: mergeTopK(inp1, inp2, topK, -1))
    res = [id for id, score in res]
    id_df = sqlContext.createDataFrame(res, StringType()).toDF("ID")
    res = df.join(id_df, "ID", "inner").select("Name", "Age", "Nationality", "Club", "Height(cm)", "Weight(lbs)")
    res.show()
    return res
    

print (findSimilarPlayer(player_df, "L. Messi", 20))
    

+-----------------+---+-----------+-------------------+----------+-----------+
|             Name|Age|Nationality|               Club|Height(cm)|Weight(lbs)|
+-----------------+---+-----------+-------------------+----------+-----------+
|Cristiano Ronaldo| 33|   Portugal|           Juventus|    187.96|        183|
|          G. Bale| 28|      Wales|        Real Madrid|    185.42|        181|
|     K. De Bruyne| 27|    Belgium|    Manchester City|    180.34|        154|
|        L. Suárez| 31|    Uruguay|       FC Barcelona|    182.88|        190|
|     A. Griezmann| 27|     France|    Atlético Madrid|    175.26|        161|
| L. Modri<U+0107>| 32|    Croatia|        Real Madrid|    172.72|        146|
|        E. Hazard| 27|    Belgium|            Chelsea|    172.72|        163|
|        Neymar Jr| 26|     Brazil|Paris Saint-Germain|    175.26|        150|
|         P. Pogba| 25|     France|  Manchester United|    193.04|        185|
|        P. Dybala| 24|  Argentina|           Juvent

In [145]:
def findBestReplicate(teamName, playerId, df, topK, weightVector):
    '''
    return list of [(player_id, replace_id, improve score)]
    '''
    player_info = df.select(df.columns[44:73]).where(df.ID == playerId).rdd.map(list)\
            .map(lambda l: [float(i) for i in l]).collect()[0] # list
    candidatePlayers = df.select(["ID"] + df.columns[44:73]).where(df.Club != teamName).rdd.map(list)\
        .map(lambda l:(l[0], [float(l[i]) for i in range(1, len(l))]))\
        .mapValues(lambda vals: improve(vals, player_info, weightVector)) # rdd
    res = candidatePlayers.aggregate([], lambda inp1, inp2: findTopK(inp1, topK, inp2, -1), lambda inp1, inp2: mergeTopK(inp1, inp2, topK, -1))
    res = [(playerId, id, score) for id, score in res]
    return res

def improve(l1, l2, weight):
    improve = 0
    for i in range(len(l1)):
        improve += (l1[i] - l2[i]) * weight[i]
    return improve


In [163]:
def featureThreshold(l):
    temp = sorted(l)
    return temp[int(len(l) / 4)]


def findWorstFeatures(teamName, team_df):
    '''
    take the team name and team dataframe and return list of index of weak features start from 0 = Crossing
    '''
    targ_df = team_df.select('*').where(team_df.Club == teamName).rdd.map(list)\
            .map(lambda l: (l[0], [float(l[i]) for i in range(1, len(l))]))\
            .mapValues(lambda l: (featureThreshold(l), l))\
            .mapValues(lambda tup: [index for index, val in enumerate(tup[1]) if val < tup[0]])
    feature_indexes = targ_df.collect()[0][1]
    return feature_indexes
    
    
def createWeightVector(feature_indexes):
    '''
    take list of weak features and return weight list of size 29
    '''
    norm = float(10 / (29 + len(feature_indexes)))
    weightVector = [2.0 * norm if index in feature_indexes else norm for index in range(29)]
    return weightVector
     
    
def findWorstPlayers(teamName, player_df, feature_indexes):
    '''
    take team name, player dataframe, weak features index list
    return list of worst players id
    '''
    worst_players = player_df.select(["ID"] + player_df.columns[44:73]).where(player_df.Club == teamName).rdd.map(list)\
            .map(lambda l: (l[0], [float(i) for i in l[1:]]))\
            .mapValues(lambda l: [l[i] for i in range(len(l)) if i in feature_indexes])\
            .mapValues(lambda l: sum(l)).collect()
    worst_players.sort(key = lambda t: t[1], reverse=True)
    return [id for id, index in worst_players][:10]


    
def replaceModeRecommendation(player_df, team_df, teamName, topK):
    feature_indexes = findWorstFeatures(teamName, team_df)
#     print([team_df.columns[i + 1] for i in feature_indexes])
    weight_vector = createWeightVector(feature_indexes)
#     print(weight_vector)
    worst_players = findWorstPlayers(teamName, player_df, feature_indexes)
    res = []
    for player_id in worst_players:
        res += findBestReplicate(teamName, player_id, player_df, topK, weight_vector)
    res.sort(key = lambda l: l[2], reverse=True)
    return res[:topK]
    


def printPlayerInfo(player_df, playerId):
    player_info = player_df.select("ID", 'Name', "Age", "Nationality", "Overall", "Club", "Position")\
            .where(player_df.ID == playerId).show()



# team_name = 'FC Barcelona'

# p1, p2 = replaceModeRecommendation(player_df, team_df, team_name)
# printPlayerInfo(player_df, p1)
# printPlayerInfo(player_df, p2)

team_name = 'LA Galaxy'
res = replaceModeRecommendation(player_df, team_df, team_name, 3)
for i in res:
    print("player:" + i[0] +" replacement:" + i[1] + " improvement:" + str(i[2]))


player:137114 replacement:178518 improvement:289.4444444444445
player:137114 replacement:181872 improvement:288.8888888888889
player:137114 replacement:197445 improvement:288.33333333333337


Row(Club='FC Barcelona', Crossing='80.62476177064626', Finishing='81.540688809306', HeadingAccuracy='76.74369928962068', ShortPassing='86.23370323678645', Volleys='77.52326333708207', Dribbling='87.37904545330319', Curve='83.40222425085713', FKAccuracy='78.63074125271213', LongPassing='82.10312165178314', BallControl='88.09562595091771', Acceleration='87.16262074103706', SprintSpeed='84.8290963007526', Agility='86.94290823807727', Reactions='86.33703058169628', Balance='86.42046501651164', ShotPower='82.47033883901801', Jumping='82.18584618622971', Stamina='84.9731157307648', Strength='78.10304695526871', LongShots='84.58731838575437', Aggression='80.66883086070462', Interceptions='82.01912593583735', Positioning='82.98261152893497', Vision='85.63862090045062', Penalties='75.68051410036797', Composure='85.84881264827561', Marking='82.18240960220339', StandingTackle='82.64364955148712', SlidingTackle='80.60619365785712')
