<h2>Reworked Super stars to match format of Undervalued</h2>

In [0]:
# Column positions, in the raw data table, of the attributes used by this notebook.
ATTRIBUTES = dict([
  ('game_ID', 1),
  ('date', 2),
  ('player_ID', 3),
  ('player', 4),
  ('team', 5),
  ('position', 9),
  ('salary', 11),
  ('points', 13),
])

# Raw column positions to keep, in the same order as the ATTRIBUTES entries.
INDICES = list(ATTRIBUTES.values())

# Date the team is picked for; each row's 'date' value is compared against it.
CURRENT_DATE = '2019-10-05'

# Number of roster slots to fill for each position.
COMPOSITION = dict([('C', 2), ('G', 2), ('D', 2), ('W', 3)])

# Tuning knobs for the super-star selection (read by find_super_stars and the
# team-selection cells).
percent_super_stars = 10
number_of_super_stars = 4

# Total salary budget; the selection cells subtract from it as players are added.
BUDGET = 50000

# Players selected so far.
TEAM = []

# Pre-Process Data
#### 1. Read Raw Data

In [0]:
# Where the raw table lives and which reader format to use
file_location = "/FileStore/tables/DataTable.csv"
file_type = "csv"

# Reader options for the CSV source
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Load the raw table into a Spark DataFrame
df = (
  spark.read.format(file_type)
  .option("inferSchema", infer_schema)
  .option("header", first_row_is_header)
  .option("sep", delimiter)
  .load(file_location)
)

#display(df)

#### 2. Convert to RDD & Cache

In [0]:
# Turn every DataFrame row into a plain Python list so that later stages can
# address columns by position.
RDD = df.rdd.map(list)
# Ask Spark to keep this RDD cached; every later stage in this notebook derives from it.
RDD.cache()

#### 3. Extract the Required Columns from RDD

In [0]:
# keep only the columns listed in INDICES, preserving their order
RDD_filtered_by_column = RDD.map(lambda entry: [entry[column] for column in INDICES])

# the kept columns are now contiguous, so re-number the attributes from 0
ATTRIBUTES = {
  name: new_index
  for new_index, name in enumerate((
    'game_ID',
    'date',
    'player_ID',
    'player',
    'team',
    'position',
    'salary',
    'points',
  ))
}

#### 4. Remove Rows Containing Invalid Entries

In [0]:
# drop every row whose 'position', 'points' or 'salary' entry is the "#N/A" placeholder
RDD_cleaned = RDD_filtered_by_column.filter(
  lambda entry: all(entry[ATTRIBUTES[name]] != "#N/A"
                    for name in ('position', 'points', 'salary')))
#RDD_cleaned.collect()

#### 5. Group Rows by Player ID

In [0]:
# key every row by its 'player_ID' ...
RDD_paired_by_player = RDD_cleaned.map(lambda row: [row[ATTRIBUTES['player_ID']], row])
# ... then gather each player's rows into one [player_ID, rows] list
RDD_grouped_by_player = (
  RDD_paired_by_player
  .groupByKey()
  .mapValues(list)
  .map(list)
)
#RDD_grouped_by_player.collect()

#### 6. Sort Rows by Date

In [0]:
# order every player's rows by their 'date' value (ascending)
RDD_sorted_by_date = RDD_grouped_by_player.map(
  lambda player: [player[0],
                  sorted(player[1], key = lambda row: row[ATTRIBUTES['date']])])
#RDD_sorted_by_date.collect()

# Compute Running Averages of Fantasy Points and Salaries (Cost) for Each Player

In [0]:
def compute_running_average_points(entry):
  """
    Returns the following attributes for each player (for each game):
    
      - game_ID
      - game date
      - ID
      - name
      - team
      - position
      - salary in that game (converted to int)
      - points in that game
      - running average of points up to and including that game
      
    `entry` is a [player_ID, rows] pair; the result is a new
    [player_ID, new_rows] pair. The input rows are left untouched, so the
    same entry can safely be passed to other functions that also extend rows.
      
    ** Assumes that entries for each player are sorted by date.
    
  """
  ID, rows = tuple(entry)
  total_points = 0
  new_rows = []
  
  for count, row in enumerate(rows, 1):
    total_points += float(row[ATTRIBUTES['points']])
    # work on a copy: appending to the caller's row would shift the column
    # positions seen by any other consumer of the same rows
    new_row = list(row)
    new_row[ATTRIBUTES['salary']] = int(new_row[ATTRIBUTES['salary']])
    new_row.append(float(total_points) / count)
    new_rows.append(new_row)
    
  return [ID, new_rows]


def compute_running_average_of_points_and_cost(entry):
  
  """
    Returns the following attributes for each player (for each game):
    
      - game_ID
      - game date
      - ID
      - name
      - team
      - position
      - salary in that game (converted to int)
      - points in that game
      - running average of points up to and including that game
      - running average of salary up to and including that game
      
    `entry` is a [player_ID, rows] pair; the result is a new
    [player_ID, new_rows] pair. The input rows are left untouched, so the
    same entry can safely be passed to other functions that also extend rows.
      
    ** Assumes that entries for each player are sorted by date.
    
  """
  
  ID, rows = tuple(entry)
  total_points = 0
  total_cost = 0
  new_rows = []
  
  for count, row in enumerate(rows, 1):
    salary = int(row[ATTRIBUTES['salary']])
    total_points += float(row[ATTRIBUTES['points']])
    total_cost += salary
    # work on a copy: appending to the caller's row would shift the column
    # positions seen by any other consumer of the same rows
    new_row = list(row)
    new_row[ATTRIBUTES['salary']] = salary
    new_row.append(float(total_points) / count)
    new_row.append(float(total_cost) / count)
    new_rows.append(new_row)
    
  return [ID, new_rows]

# derive both flavours of running averages from the date-sorted rows
RDD_with_running_averages = RDD_sorted_by_date.map(
  compute_running_average_of_points_and_cost)
RDD_average_points = RDD_sorted_by_date.map(
  compute_running_average_points)

# register the positions of the columns appended by the functions above
ATTRIBUTES.update([('average_points', 8), ('average_salary', 9)])



# Extract Subset of RDD that Predates Current Date

In [0]:
def filter_by_current_date(entry):
  """
    Reduces a [player_ID, rows] pair to [player_ID, row], where `row` is the
    last of the rows whose 'date' is strictly before CURRENT_DATE.

    Returns [player_ID, []] when no row predates CURRENT_DATE.
  """
  ID, rows = tuple(entry)

  earlier_rows = [row for row in rows
                  if row[ATTRIBUTES['date']] < CURRENT_DATE]

  if not earlier_rows:
    return [ID, []]
  return [ID, earlier_rows[-1]]
  
# per player, keep the last row before CURRENT_DATE; drop players that have none
RDD_filtered_by_date_super_stars = (
  RDD_average_points
  .map(filter_by_current_date)
  .filter(lambda entry: entry[1] != [])
)
RDD_filtered_by_date_under_valued = (
  RDD_with_running_averages
  .map(filter_by_current_date)
  .filter(lambda entry: entry[1] != [])
)
#print(RDD_filtered_by_date_super_stars.collect())

# Order players by average points in descending order

In [0]:
# key every [player_ID, row] pair by the player's position
RDD_paired_by_position_super_stars = RDD_filtered_by_date_super_stars.map(
  lambda player: [player[1][ATTRIBUTES['position']], player])

# one [position, players] list per position
RDD_grouped_by_position_super_stars = (
  RDD_paired_by_position_super_stars
  .groupByKey()
  .mapValues(list)
  .map(list)
)

# within each position, highest running average of points first
RDD_sorted_by_average_points_scored = RDD_grouped_by_position_super_stars.map(
  lambda group: [group[0],
                 sorted(group[1],
                        key = lambda player: -player[1][ATTRIBUTES['average_points']])])

# Compute Degree of Undervaluedness for Each Player

In [0]:
def compute_undervaluedness(entry):
  """
    Extends a [player_ID, row] pair with three undervaluedness columns:

      - undervalued: True when the row's salary is below the player's
        running average salary
      - UV_index_absolute: salary - average salary (negative = undervalued)
      - UV_index_relative: UV_index_absolute / average salary, or 0.0 when
        the average salary is 0 (avoids a ZeroDivisionError)

    Returns a new [player_ID, new_row] pair; the input row is not modified.
  """
  
  ID, row = tuple(entry)

  curr_cost = int(row[ATTRIBUTES['salary']])
  average_cost = row[ATTRIBUTES['average_salary']]

  undervalued = curr_cost < average_cost
  UV_index_absolute = curr_cost - average_cost
  # an average salary of 0 would otherwise abort the whole Spark job
  if average_cost:
    UV_index_relative = UV_index_absolute / average_cost
  else:
    UV_index_relative = 0.0
    
  # build a new row instead of extending the caller's list in place
  return [ID, list(row) + [undervalued, UV_index_absolute, UV_index_relative]]

# score every player that has a row before CURRENT_DATE
RDD_with_undervaluedness = RDD_filtered_by_date_under_valued.map(
  compute_undervaluedness)

# register the positions of the three columns appended by compute_undervaluedness
ATTRIBUTES.update([
  ('undervalued', 10),
  ('UV_index_absolute', 11),
  ('UV_index_relative', 12),
])

# Group Players by Position & Sort by Undervaluedness

In [0]:
# key every [player_ID, row] pair by the player's position
RDD_paired_by_position_undervalue = RDD_with_undervaluedness.map(
  lambda player: [player[1][ATTRIBUTES['position']], player])

# one [position, players] list per position
RDD_grouped_by_position_undervalue = (
  RDD_paired_by_position_undervalue
  .groupByKey()
  .mapValues(list)
  .map(list)
)

# within each position, smallest (most negative) UV_index_absolute first
RDD_sorted_by_undervaluedness = RDD_grouped_by_position_undervalue.map(
  lambda group: [group[0],
                 sorted(group[1],
                        key = lambda player: player[1][ATTRIBUTES['UV_index_absolute']])])


# Filter Out Players Not Available on Current Date

#### 1. Extract Players Available on Current Date

In [0]:
def find_available_players(entry):
  """
    Predicate over a [player_ID, rows] pair: True when at least one of the
    player's rows is dated exactly CURRENT_DATE, False otherwise.
  """
  _, games = tuple(entry)
  return any(game[ATTRIBUTES['date']] == CURRENT_DATE for game in games)
  
# IDs of the players that have a row dated CURRENT_DATE
# (the same computation is exposed under two names)
RDD_available_superstars = (
  RDD_grouped_by_player
  .filter(find_available_players)
  .map(lambda player: player[0])
)
RDD_available_undervalued = (
  RDD_grouped_by_player
  .filter(find_available_players)
  .map(lambda player: player[0])
)

#### 2. Filter Players by Availability

In [0]:
def filter_by_availability(entry, available_ids=None):
  """
    Predicate over a [position, players] group, where every player is a
    [player_ID, row] pair: True when at least one player of the group is
    available, False otherwise.

    available_ids: collection of available player IDs; defaults to the
    module-level AVAILABLE_PLAYER_IDS set defined below.
  """
  
  if available_ids is None:
    available_ids = AVAILABLE_PLAYER_IDS
  
  position, players = tuple(entry)
  return any(player[0] in available_ids for player in players)


def _keep_available_players(entry, available_ids):
  """
    Returns [position, players] with the players restricted to those whose
    ID is in available_ids; the original order of the players is preserved.
  """
  position, players = tuple(entry)
  return [position, [player for player in players if player[0] in available_ids]]


# Bring the available IDs to the driver once: an RDD cannot be referenced from
# inside another RDD's transformation, so the lookup has to be a plain set.
AVAILABLE_PLAYER_IDS = frozenset(RDD_available_superstars.collect())

# Drop unavailable players from every position group. The (possibly empty)
# groups themselves are kept so that later per-position lookups still find
# an entry for every position.
RDD_availability_for_possible_super_stars = RDD_sorted_by_average_points_scored.map(
  lambda entry: _keep_available_players(entry, AVAILABLE_PLAYER_IDS))
RDD_undervalued_filtered_by_availability = RDD_sorted_by_undervaluedness.map(
  lambda entry: _keep_available_players(entry, AVAILABLE_PLAYER_IDS))

<h2>Find the available superstars </h2>

In [0]:
def find_super_stars(entry):
  """
    Keeps the leading `percent_super_stars` percent of a position's players.

    `entry` is a [position, players] pair whose players are expected to be
    ordered best-first; returns [position, top_players] with the player
    count rounded down.
  """
  ID, rows = tuple(entry)
  
  # percent of len(rows), in integer arithmetic. The previous
  # len(rows) / percent_super_stars only matched a percentage for the value 10.
  nStars = len(rows) * percent_super_stars // 100
  
  return [ID, rows[:nStars]]

# For every position group, keep only the leading players selected by find_super_stars
RDD_super_stars_filtered_by_availability = RDD_availability_for_possible_super_stars.map(find_super_stars)
#RDD_super_stars_filtered_by_availability.collect()

In [0]:
# Inspection cell: collect() returns every element of the RDD to the driver,
# so this displays all undervalued candidates of all positions.
#RDD_super_stars_filtered_by_availability.collect()
RDD_undervalued_filtered_by_availability.collect()

# Pick players to meet team composition and budget requirements

#### 1. Find Super Stars and remaining budget

In [0]:
# NOTE: this cell subtracts from BUDGET and COMPOSITION and appends to TEAM.
# Re-run the configuration cell first if this cell has to be executed again.

# Fetch, with a single Spark job, the first `number_of_super_stars` candidates
# of every position.
super_stars_by_position = dict(
  RDD_super_stars_filtered_by_availability
  .map(lambda entry: (entry[0], entry[1][:number_of_super_stars]))
  .collect())

# A position without any entry yields an empty list instead of an IndexError.
# (These are plain lists; the *RDD names are kept for existing references.)
candidate_CsuperstarsRDD = super_stars_by_position.get('C', [])
candidate_WsuperstarsRDD = super_stars_by_position.get('W', [])
candidate_DsuperstarsRDD = super_stars_by_position.get('D', [])
candidate_GsuperstarsRDD = super_stars_by_position.get('G', [])

candidate_superstars = candidate_CsuperstarsRDD + candidate_WsuperstarsRDD + candidate_DsuperstarsRDD + candidate_GsuperstarsRDD

# At most 4 * number_of_super_stars entries: sort locally. sorted() is stable,
# so players with equal averages keep the C, W, D, G order built above.
sorted_super_star_candidates = sorted(candidate_superstars,
                                      key = lambda x: -x[1][ATTRIBUTES['average_points']])

players_added = 0

for player in sorted_super_star_candidates:
  if players_added == number_of_super_stars:
    break
  
  star = player[1]
  star_position = star[ATTRIBUTES['position']]
  
  # skip positions that are not part of the roster or are already full
  if COMPOSITION.get(star_position, 0) == 0:
    continue
  
  name = star[ATTRIBUTES['player']]
  salary = star[ATTRIBUTES['salary']]
  print("\t{}: {} {}" .format(name, salary, star_position))
  BUDGET -= salary
  TEAM.append(player)
  COMPOSITION[star_position] -= 1
  players_added += 1
    
print("\n >> REMAINING BUDGET = " + str(BUDGET) + "\n")

#### 2. Use remaining budget to find undervalued players to add to the team

In [0]:
# NOTE: this cell subtracts from BUDGET and COMPOSITION and appends to TEAM.
# Re-run the configuration and super-star cells first if it has to be executed again.

# On average a goalie's salary is about 1.9x that of a skater, so every open
# goalie slot is budgeted as two skater slots.
open_slots = sum(COMPOSITION.values())          # roster slots still to fill
budget_shares = open_slots + COMPOSITION['G']   # open goalie slots count twice
budget_per_player = BUDGET/budget_shares if budget_shares else 0
goalie_budget = budget_per_player*2
skater_budget = budget_per_player
print("Goalie Budget: {}".format(goalie_budget))
print("Skater Budget: {}".format(skater_budget))

# Players already on the team (the super stars) must not be picked a second time.
selected_IDs = set(player[0] for player in TEAM)

# Fetch the candidates of all positions with a single Spark job.
undervalued_by_position = dict(
  RDD_undervalued_filtered_by_availability
  .map(lambda entry: (entry[0], entry[1]))
  .collect())

for position in list(COMPOSITION):
  
  # a position without any candidate yields an empty list instead of an IndexError
  candidate_players = undervalued_by_position.get(position, [])
  salary_cap = goalie_budget if position == 'G' else skater_budget
  
  for player in candidate_players:
    if COMPOSITION[position] == 0:
      break
    
    ID, row = player[0], player[1]
    if ID in selected_IDs:
      continue
    
    salary = row[ATTRIBUTES['salary']]
    if salary <= salary_cap:
      name = row[ATTRIBUTES['player']]
      print("\t{}: {} {}".format(name, salary, position))
      BUDGET -= salary
      COMPOSITION[position] -= 1
      TEAM.append(player)
      selected_IDs.add(ID)
      

print("\n >> REMAINING BUDGET = " + str(BUDGET) + "\n")

In [0]:
# list the selected team: name, salary and position of every player
for player in TEAM:
  row = player[1]
  name, salary, position = (row[ATTRIBUTES[field]]
                            for field in ('player', 'salary', 'position'))
  print("\t{}: {} {}".format(name, salary, position))

# Compute Fantasy Points Earned on Current Date given Selected Team