In [1]:
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
# Tell Spark where Java and the unpacked Spark distribution live.
# NOTE(review): both paths assume the install cell ran with /content as the
# working directory (the Colab default) — confirm when running elsewhere.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

import findspark
# Must run before `pyspark` is imported: it makes the pyspark package that
# ships under SPARK_HOME importable.
findspark.init()

from pyspark import SparkContext
# Single local SparkContext ("local[*]" = one worker thread per available core).
# Every RDD cell below uses this `sc`.
sc = SparkContext(appName="YourTest", master="local[*]")

In [None]:
import requests
import re
import pandas as pd
from lxml import html

In [None]:
def grab_player_key(player_url):
    """Return the player key embedded in a Basketball Reference player URL.

    The key is the path segment that immediately precedes ``.html``, e.g.
    ``".../players/a/antetgi01.html"`` -> ``"antetgi01"``.

    Args:
        player_url: URL (or path) of a player page.

    Returns:
        str: the file name of the page without its ``.html`` extension.

    Raises:
        ValueError: if ``player_url`` contains no ``/<name>.html`` segment.
    """
    # The dot is escaped so that only a literal ".html" terminates the key; an
    # unescaped "." also accepts e.g. "xhtml" inside an earlier path segment
    # and would return a truncated directory name instead of the key.
    match = re.search(r'(?<=/)[^/]+(?=\.html)', player_url)
    if match is None:
        raise ValueError("No player key found in URL: {!r}".format(player_url))
    return match.group(0)

In [None]:
import pandas as pd

def create_per_game_dataframe(player_url):
    """Create a dataframe with the Per Game stats of a player.

    Downloads ``player_url``, locates the element whose id is ``per_game`` and
    turns its rows into a pandas DataFrame: one column per header cell of the
    first table row, plus a trailing ``PlayerKey`` column.

    Rows are skipped when their cell count differs from the header's or when
    their Age cell is empty (aggregate rows such as the career totals). In the
    Season column the first four characters are parsed as an integer and
    incremented by one; every other cell is converted to ``float`` when
    possible and otherwise kept as text.

    Args:
        player_url: URL of the player's page.

    Returns:
        pandas.DataFrame: one row per retained table row.

    Raises:
        RuntimeError: if the per-game table is not present on the page, or if
            it has no "Age" or no "Season" header.
    """
    # Read html, find appropriate table
    page = requests.get(player_url)
    tree = html.fromstring(page.content)
    per_game_table_rows = tree.xpath('//*[@id="per_game"]/*/tr')
    if not per_game_table_rows:
        # Fail with a descriptive error instead of an IndexError on [0] below.
        raise RuntimeError("Per game table not found")

    # Each element of per_game_cols is (column name, list of values in that column).
    per_game_cols = []
    age_column_index = -1
    season_index = -1

    # Get column headers and remember where the Age / Season columns are.
    for column_index, header in enumerate(per_game_table_rows[0]):
        name = header.text_content()
        if name.upper() == "AGE":
            age_column_index = column_index
        elif name.upper() == "SEASON":
            season_index = column_index
        per_game_cols.append((name, []))
    num_cols = len(per_game_cols)

    # Add another column for the player key; it sits at index num_cols.
    per_game_cols.append(("PlayerKey", []))

    if age_column_index == -1:
        raise RuntimeError("Age column index not found")
    if season_index == -1:
        raise RuntimeError("Season column index not found")
    print("Age Index: ", age_column_index)
    print("Season Index: ", season_index)

    player_key = grab_player_key(player_url)
    for row_element in per_game_table_rows[1:]:
        # Rows with a different cell count do not line up with the header.
        if len(row_element) != num_cols:
            continue

        # An empty Age cell marks an aggregate (e.g. Career) row, which we skip.
        if row_element[age_column_index].text_content() == "":
            continue

        for column_index, element in enumerate(row_element):
            data = element.text_content()

            if column_index == season_index:
                # Format the season as its last year: leading 4-digit year + 1.
                data = int(data[0:4]) + 1
            else:
                # Convert numeric cells to float; leave everything else as text.
                try:
                    data = float(data)
                except ValueError:
                    pass

            per_game_cols[column_index][1].append(data)

        # Tag the row with the player key.
        per_game_cols[num_cols][1].append(player_key)

    return pd.DataFrame({title: column for (title, column) in per_game_cols})


In [None]:
def create_per_game_list(player_url):
    """Scrape a player's Per Game table into a list of row lists.

    Downloads ``player_url``, locates the element whose id is ``per_game`` and
    converts each retained table row into a list of cell values followed by
    the player key.

    Rows are skipped when their cell count differs from the header's or when
    their Age cell is empty (aggregate rows such as the career totals). In the
    Season column the first four characters are parsed as an integer and
    incremented by one; every other cell is converted to ``float`` when
    possible and otherwise kept as text.

    Args:
        player_url: URL of the player's page.

    Returns:
        tuple: ``(rows, width)`` where ``rows`` is the list of row lists and
        ``width`` is the number of header cells plus one (for the player key).

    Raises:
        RuntimeError: if the per-game table is not present on the page, or if
            it has no "Age" or no "Season" header.
    """
    # Read html, find appropriate table
    page = requests.get(player_url)
    tree = html.fromstring(page.content)
    per_game_table_rows = tree.xpath('//*[@id="per_game"]/*/tr')
    if not per_game_table_rows:
        # Fail with a descriptive error instead of an IndexError on [0] below.
        raise RuntimeError("Per game table not found")

    age_column_index = -1
    season_index = -1

    # Scan the header row: count columns and locate Age / Season.
    num_cols = 0
    for column_index, header in enumerate(per_game_table_rows[0]):
        name = header.text_content()
        if name.upper() == "AGE":
            age_column_index = column_index
        elif name.upper() == "SEASON":
            season_index = column_index
        num_cols += 1

    if age_column_index == -1:
        raise RuntimeError("Age column index not found")
    if season_index == -1:
        raise RuntimeError("Season column index not found")
    print("Age Index: ", age_column_index)
    print("Season Index: ", season_index)

    player_key = grab_player_key(player_url)
    per_game_rows = []
    for row_element in per_game_table_rows[1:]:
        # Rows with a different cell count do not line up with the header.
        if len(row_element) != num_cols:
            continue

        # An empty Age cell marks an aggregate (e.g. Career) row, which we skip.
        if row_element[age_column_index].text_content() == "":
            continue

        row = []
        for column_index, element in enumerate(row_element):
            data = element.text_content()

            if column_index == season_index:
                # Format the season as its last year: leading 4-digit year + 1.
                data = int(data[0:4]) + 1
            else:
                # Convert numeric cells to float; leave everything else as text.
                try:
                    data = float(data)
                except ValueError:
                    pass

            row.append(data)

        # Tag the row with the player key. The row is never empty here: both
        # Age and Season were found, so the header has at least two cells.
        row.append(player_key)
        per_game_rows.append(row)

    return per_game_rows, num_cols + 1


In [None]:
# Smoke-test the scraper on a single player page (performs a live HTTP request).
# The cell output is the (rows, width) tuple returned by create_per_game_list.
url = "https://www.basketball-reference.com/players/a/antetgi01.html"
create_per_game_list(url)

Age Index:  1
Season Index:  0


([[2014,
   19.0,
   'MIL',
   'NBA',
   'SF',
   77.0,
   23.0,
   24.6,
   2.2,
   5.4,
   0.414,
   0.5,
   1.5,
   0.347,
   1.7,
   3.9,
   0.44,
   0.463,
   1.8,
   2.6,
   0.683,
   1.0,
   3.4,
   4.4,
   1.9,
   0.8,
   0.8,
   1.6,
   2.2,
   6.8,
   'antetgi01'],
  [2015,
   20.0,
   'MIL',
   'NBA',
   'SG',
   81.0,
   71.0,
   31.4,
   4.7,
   9.6,
   0.491,
   0.1,
   0.5,
   0.159,
   4.6,
   9.1,
   0.511,
   0.496,
   3.2,
   4.3,
   0.741,
   1.2,
   5.5,
   6.7,
   2.6,
   0.9,
   1.0,
   2.1,
   3.1,
   12.7,
   'antetgi01'],
  [2016,
   21.0,
   'MIL',
   'NBA',
   'PG',
   80.0,
   79.0,
   35.3,
   6.4,
   12.7,
   0.506,
   0.4,
   1.4,
   0.257,
   6.1,
   11.3,
   0.537,
   0.52,
   3.7,
   5.1,
   0.724,
   1.4,
   6.2,
   7.7,
   4.3,
   1.2,
   1.4,
   2.6,
   3.2,
   16.9,
   'antetgi01'],
  [2017,
   22.0,
   'MIL',
   'NBA',
   'SF',
   80.0,
   80.0,
   35.6,
   8.2,
   15.7,
   0.521,
   0.6,
   2.3,
   0.272,
   7.6,
   13.5,
   0.563,
   0.541,
   

In [None]:
# Load the player page URLs as an RDD with one element per line of the file.
player_urls = sc.textFile("player_urls.txt")
# Strip surrounding whitespace from every line.
player_urls = player_urls.map(lambda x: x.strip())
# Peek at the first URL to confirm the file was read.
player_urls.take(1)

['https://www.basketball-reference.com/players/a/abdelal01.html']

In [None]:
def save_per_game_table(player_url):
  """Scrape a player's Per Game table and save it as a CSV file.

  The file is written to ``per_game_data/<player key>_per_game.csv`` without
  the DataFrame index.

  Args:
    player_url: URL of the player's page.

  Returns:
    The pandas DataFrame produced by ``create_per_game_dataframe``.
  """
  df = create_per_game_dataframe(player_url=player_url)
  player_key = grab_player_key(player_url=player_url)
  # to_csv does not create missing directories, so make sure the target exists.
  os.makedirs("per_game_data", exist_ok=True)
  df.to_csv(os.path.join("per_game_data", player_key + "_per_game.csv"), index=False)

  return df


In [None]:
import time
# Scrape every player page through Spark and write the rows as comma-joined
# text lines. The elapsed wall-clock time in seconds is printed at the end.
tic = time.time()
# Each element is (url, (rows, width)). Cache the scraped results: without
# this, every later action on `b` would re-download every player page.
b = player_urls.map(lambda x: (x, create_per_game_list(x))).cache()
# Keep only players whose table is 31 columns wide (header cells + PlayerKey)
# so every saved line has the same layout.
b = b.filter(lambda x: x[1][1] == 31)
# One element per season row.
lists_rdd = b.flatMap(lambda x: x[1][0])
unformatted_lists = lists_rdd.map(lambda x: ','.join([str(itm) for itm in x]))
unformatted_lists.saveAsTextFile("/content/drive/MyDrive/CS631-Project/per_game_stats")
toc = time.time()
print(toc-tic)

0.026630878448486328


In [None]:
# Sanity check: count the lines in one of the part files written by the scrape cell.
per_game_file = sc.textFile("/content/drive/MyDrive/CS631-Project/per_game_stats/part-00001")
per_game_file.count()

11419

In [None]:
# Count players per table width: x[1][1] is the width returned by create_per_game_list.
# Depends on `b` from the scrape cell, so that cell must have been run first.
counts = b.map(lambda x: (x[1][1], 1)).reduceByKey(lambda x, y: x+y)

NameError: ignored

In [None]:
# Bring the (table width, number of players) pairs back to the driver for display.
counts.collect()

[(24, 84), (30, 46), (31, 3407), (19, 890), (23, 153), (29, 223)]

In [None]:
from pyspark.sql import SparkSession
import random
# Create (or fetch) the SparkSession used by the DataFrame cells below; the UI
# port is picked at random from 4000-4999.
# NOTE(review): a SparkContext was already created earlier with master
# "local[*]"; getOrCreate presumably reuses it, in which case "local[2]" has
# no effect — confirm.
spark = SparkSession.builder.appName("YourTest").master("local[2]").config('spark.ui.port', random.randrange(4000,5000)).getOrCreate()

In [None]:
# Read one saved per-player CSV as a Spark DataFrame (header row used for the
# column names, column types inferred) and display its first rows.
player_raw = spark.read.csv("per_game_data/antetgi01_per_game.csv",sep=',',inferSchema=True, header=True)
player_raw.show()

+------+----+---+---+---+----+----+----+----+----+-----+---+---+-----+---+----+-----+-----+---+----+-----+---+----+----+---+---+---+---+---+----+---------+
|Season| Age| Tm| Lg|Pos|   G|  GS|  MP|  FG| FGA|  FG%| 3P|3PA|  3P%| 2P| 2PA|  2P%| eFG%| FT| FTA|  FT%|ORB| DRB| TRB|AST|STL|BLK|TOV| PF| PTS|PlayerKey|
+------+----+---+---+---+----+----+----+----+----+-----+---+---+-----+---+----+-----+-----+---+----+-----+---+----+----+---+---+---+---+---+----+---------+
|  2014|19.0|MIL|NBA| SF|77.0|23.0|24.6| 2.2| 5.4|0.414|0.5|1.5|0.347|1.7| 3.9| 0.44|0.463|1.8| 2.6|0.683|1.0| 3.4| 4.4|1.9|0.8|0.8|1.6|2.2| 6.8|antetgi01|
|  2015|20.0|MIL|NBA| SG|81.0|71.0|31.4| 4.7| 9.6|0.491|0.1|0.5|0.159|4.6| 9.1|0.511|0.496|3.2| 4.3|0.741|1.2| 5.5| 6.7|2.6|0.9|1.0|2.1|3.1|12.7|antetgi01|
|  2016|21.0|MIL|NBA| PG|80.0|79.0|35.3| 6.4|12.7|0.506|0.4|1.4|0.257|6.1|11.3|0.537| 0.52|3.7| 5.1|0.724|1.4| 6.2| 7.7|4.3|1.2|1.4|2.6|3.2|16.9|antetgi01|
|  2017|22.0|MIL|NBA| SF|80.0|80.0|35.6| 8.2|15.7|0.521|0.6|2.3|

In [None]:
# Re-read the same CSV and look at the first five rows as Row objects.
player_raw = spark.read.csv("per_game_data/antetgi01_per_game.csv",sep=',',inferSchema=True, header=True)
# RDD view of the DataFrame; note that take(5) below is called on the
# DataFrame, so this variable is not used in this cell.
player_raw_rdd = player_raw.rdd
player_raw.take(5)

[Row(Season=2014, Age=19.0, Tm='MIL', Lg='NBA', Pos='SF', G=77.0, GS=23.0, MP=24.6, FG=2.2, FGA=5.4, FG%=0.414, 3P=0.5, 3PA=1.5, 3P%=0.347, 2P=1.7, 2PA=3.9, 2P%=0.44, eFG%=0.463, FT=1.8, FTA=2.6, FT%=0.683, ORB=1.0, DRB=3.4, TRB=4.4, AST=1.9, STL=0.8, BLK=0.8, TOV=1.6, PF=2.2, PTS=6.8, PlayerKey='antetgi01'),
 Row(Season=2015, Age=20.0, Tm='MIL', Lg='NBA', Pos='SG', G=81.0, GS=71.0, MP=31.4, FG=4.7, FGA=9.6, FG%=0.491, 3P=0.1, 3PA=0.5, 3P%=0.159, 2P=4.6, 2PA=9.1, 2P%=0.511, eFG%=0.496, FT=3.2, FTA=4.3, FT%=0.741, ORB=1.2, DRB=5.5, TRB=6.7, AST=2.6, STL=0.9, BLK=1.0, TOV=2.1, PF=3.1, PTS=12.7, PlayerKey='antetgi01'),
 Row(Season=2016, Age=21.0, Tm='MIL', Lg='NBA', Pos='PG', G=80.0, GS=79.0, MP=35.3, FG=6.4, FGA=12.7, FG%=0.506, 3P=0.4, 3PA=1.4, 3P%=0.257, 2P=6.1, 2PA=11.3, 2P%=0.537, eFG%=0.52, FT=3.7, FTA=5.1, FT%=0.724, ORB=1.4, DRB=6.2, TRB=7.7, AST=4.3, STL=1.2, BLK=1.4, TOV=2.6, PF=3.2, PTS=16.9, PlayerKey='antetgi01'),
 Row(Season=2017, Age=22.0, Tm='MIL', Lg='NBA', Pos='SF', G=80

In [None]:
# Read the same CSV as raw text lines; the first line is the header row.
player_file = sc.textFile("per_game_data/antetgi01_per_game.csv")
player_file.take(1)

['Season,Age,Tm,Lg,Pos,G,GS,MP,FG,FGA,FG%,3P,3PA,3P%,2P,2PA,2P%,eFG%,FT,FTA,FT%,ORB,DRB,TRB,AST,STL,BLK,TOV,PF,PTS,PlayerKey']

In [None]:
# RDD of lines split into fields on commas.
player_file_len = player_file.map(lambda x: (x.split(',')))
# Count the fields of the header line (taken from the unsplit RDD).
a = player_file.take(1)
len(a[0].split(','))

31

In [None]:
# Fetch the first split line and show its first field.
# NOTE: this rebinds player_file_len from an RDD to a plain list, so the cell
# cannot be re-run without first re-running the cell that builds the RDD.
player_file_len = player_file_len.take(1)
player_file_len[0][0]

'Season'

In [None]:
import os
# Combine every per-player CSV whose header has 31 fields into one RDD of
# split rows (header lines removed) and save it.
per_game_data_files = os.listdir("per_game_data")

# Start from a file that is read unconditionally; its header line is dropped.
big_rdd = sc.textFile("per_game_data/antetgi01_per_game.csv")
big_rdd = big_rdd.map(lambda x: x.split(','))
big_rdd = big_rdd.filter(lambda x: x[0] != 'Season')

# Collect the per-file RDDs and union them once at the end. Chaining one
# pairwise union per file nests thousands of RDDs inside each other, whereas
# sc.union builds a single flat union over all of them.
rdds_to_union = [big_rdd]
counter_31 = 0
for file in per_game_data_files:
  if file == "antetgi01_per_game.csv":
    continue
  per_game_rdd = sc.textFile("per_game_data/" + file)
  per_game_rdd = per_game_rdd.map(lambda x: x.split(','))

  # take(1) returns an empty list for an empty file; skip those as well as
  # files whose header does not have the expected 31 fields.
  headers = per_game_rdd.take(1)
  if not headers or len(headers[0]) != 31:
    continue

  counter_31 += 1
  per_game_rdd = per_game_rdd.filter(lambda x: x[0] != 'Season')
  rdds_to_union.append(per_game_rdd)

big_rdd = sc.union(rdds_to_union)

# NOTE(review): each element is a Python list, so the saved lines are list
# reprs (e.g. "['2014', '19.0', ...]") rather than comma-separated values —
# confirm this is the intended output format.
big_rdd.saveAsTextFile("/content/drive/MyDrive/CS631-Project/per_game_stats1")



Py4JJavaError: ignored

In [None]:
# Retry of the save at the end of the previous cell.
# NOTE(review): saveAsTextFile refuses to write into an existing output
# directory — remove any partial output from a failed attempt first.
big_rdd.saveAsTextFile("/content/drive/MyDrive/CS631-Project/per_game_stats1")

In [None]:
from pyspark.sql.functions import lit

def player_data_join(df1, df2):
  """Union two Spark DataFrames by column name, padding missing columns with nulls.

  Any column present in only one of the inputs is added to the other as an
  all-null column, after which the two frames are combined with
  ``unionByName``.

  Args:
    df1: first DataFrame; its columns come first in the result.
    df2: second DataFrame.

  Returns:
    DataFrame containing the rows of ``df1`` followed by the rows of ``df2``.
  """
  df1_columns = list(df1.columns)
  df2_columns = list(df2.columns)
  df1_names = set(df1_columns)
  df2_names = set(df2_columns)

  # Walk the column *lists* rather than a set difference: set iteration order
  # for strings can change between interpreter runs, which would make the
  # order of the appended columns (and of any header-less CSV written from the
  # result) nondeterministic.
  for column in df2_columns:
    if column not in df1_names:
      df1 = df1.withColumn(column, lit(None))
  for column in df1_columns:
    if column not in df2_names:
      df2 = df2.withColumn(column, lit(None))

  return df1.unionByName(df2)

In [None]:
import os
# Merge the per-player CSVs in batches of 250 files and write each batch as a
# single CSV part. For every batch two timings are printed: seconds spent
# building the unioned DataFrame, then seconds spent writing it.
# NOTE(review): os.listdir returns entries in arbitrary order, so batch
# contents are only stable while the directory listing order is — confirm
# before resuming a partially completed run.
BATCH_SIZE = 250
per_game_data_files = os.listdir("per_game_data")
tic = time.time()
for i in range(3,21):
  batch_start = BATCH_SIZE*i
  # Stop once the batches run past the end of the file list; indexing the
  # first file of such a batch would raise IndexError.
  if batch_start >= len(per_game_data_files):
    break
  per_game_df = spark.read.csv("per_game_data/" + per_game_data_files[batch_start],sep=',',inferSchema=True, header=True)
  for file in per_game_data_files[batch_start + 1:batch_start + BATCH_SIZE]:
    new_df = spark.read.csv("per_game_data/" + file,sep=',',inferSchema=True, header=True)
    per_game_df = player_data_join(per_game_df, new_df)
  toc = time.time()
  print(toc-tic)
  # Collapse to one partition so the batch is written as a single part file.
  per_game_df = per_game_df.coalesce(1)
  # NOTE(review): the output suffix is offset by 3 from the batch index —
  # presumably to continue numbering from an earlier run; confirm.
  per_game_df.write.csv("per_game_full_" + str(i + 3))
  tic = time.time()
  print(tic-toc)


150.2653558254242
57.57891368865967
151.71946215629578
56.92203998565674
147.97836756706238
70.390695810318
147.4594898223877
53.2462363243103
146.76342940330505
53.391186475753784
146.1782467365265
53.39643573760986
149.54935765266418
56.35137319564819
150.17580699920654
55.3159294128418
147.70355820655823
59.22399973869324
148.7922592163086
93.61418437957764
392.4908084869385
184.80229926109314
545.8273661136627
204.41166639328003
544.6308124065399
243.97536182403564
668.1665012836456


Py4JJavaError: ignored

In [None]:
# Number of entries in the per_game_data directory.
per_game_data_files = os.listdir("per_game_data")
len(per_game_data_files)

4803

In [None]:
# Write whatever per_game_df currently holds (it is left over from the batch
# loop cell) to its own CSV output directory.
per_game_df.write.csv("per_game_full_file")

In [13]:
import ast
# Count saved rows per value of the first field, across all part files that
# match the glob. In the lines written by the scrape cell the first field is
# presumably the season year — the collected output shows year strings.
per_game_rdd = sc.textFile("/content/drive/MyDrive/CS631-Project/per_game_stats/part-00*")
per_game_rdd = per_game_rdd.map(lambda x: x.split(','))
# Key every row by its first field.
per_game_rdd = per_game_rdd.map(lambda x: (x[0], 1))
per_game_rdd = per_game_rdd.reduceByKey(lambda x, y: x+y)

In [14]:
# Bring the (first field, row count) pairs back to the driver; order is not sorted.
per_game_rdd.collect()

[('1991', 441),
 ('1993', 449),
 ('1994', 481),
 ('1995', 452),
 ('1970', 111),
 ('1977', 233),
 ('1978', 275),
 ('1979', 292),
 ('1980', 357),
 ('1981', 363),
 ('1982', 373),
 ('1983', 391),
 ('2002', 500),
 ('2003', 483),
 ('2004', 585),
 ('2007', 516),
 ('2017', 595),
 ('2019', 708),
 ('1990', 459),
 ('2013', 573),
 ('2014', 611),
 ('2015', 651),
 ('2016', 578),
 ('2020', 651),
 ('2011', 625),
 ('2012', 551),
 ('2010', 578),
 ('1969', 83),
 ('1964', 2),
 ('1947', 1),
 ('1992', 458),
 ('1971', 146),
 ('1972', 209),
 ('1973', 250),
 ('1974', 265),
 ('1975', 243),
 ('1976', 286),
 ('1984', 343),
 ('1985', 362),
 ('1986', 379),
 ('1987', 378),
 ('1988', 426),
 ('1989', 438),
 ('1996', 545),
 ('1997', 574),
 ('1998', 547),
 ('2001', 537),
 ('1999', 507),
 ('2000', 496),
 ('2005', 585),
 ('2006', 563),
 ('2008', 595),
 ('2018', 664),
 ('2009', 582),
 ('1966', 9),
 ('1967', 13),
 ('1968', 55),
 ('1963', 1),
 ('1965', 7)]

In [None]:
# Scratch cell: quick check of the modulo operator; not used by the analysis.
10 % 5

0