In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, LongType, IntegerType, FloatType
from pyspark.sql.functions import *
import pyspark.pandas as pd
import json
# `os` is needed for os.getenv below; import it in this cell so the notebook
# runs top-to-bottom on a fresh kernel instead of relying on a later cell.
import os

# Create the Spark session used to build DataFrames from the parsed JSON.
# Default location of the PostgreSQL JDBC driver jar.
jar = '/workspace/nba_sas_assessment/config/jar/postgresql-42.5.1.jar'

# SPARK_CLASSPATH, when set in the environment, overrides the default jar path.
sparkClassPath = os.getenv('SPARK_CLASSPATH', jar)

# The jar is passed to Spark three ways: as a shipped jar and on both the
# executor and driver classpaths.
spark = (
    SparkSession.builder
    .config('spark.jars', f'file:{sparkClassPath}')
    .config('spark.executor.extraClassPath', sparkClassPath)
    .config('spark.driver.extraClassPath', sparkClassPath)
    .appName("PySpark processing NBA data")
    .getOrCreate()
)

In [24]:
from dotenv import load_dotenv

# Load the Postgres connection settings from the .env file.
# The path is absolute (like every other path in this notebook); the previous
# './workspace/...' form only resolved when the kernel's cwd happened to be '/'.
load_dotenv('/workspace/nba_sas_assessment/index(6).env')
URL = os.getenv('JDBC')         # JDBC connection URL
USER = os.getenv('PGUSER')      # database user
PASS = os.getenv('PGPASSWORD')  # database password



In [25]:
from script_etl import extract_files
## Replace the next variables with your directory of preference ##
# These must be defined BEFORE extract_files is called (they were previously
# assigned after the call, which raised NameError on a fresh kernel).
folder_raw = '/workspace/nba_sas_assessment/raw_data'  # directory to save and process raw data
folder_tmp_path = "/workspace/nba_sas_assessment/raw_data/tmp"  # directory to store json raw data

## Replace the next variables with the path to postgres jar file and .env with credentials to access neon.tech ##
jar_postgres_path = '/workspace/nba_sas_assessment/config/jar/postgresql-42.5.1.jar'
# Absolute path, consistent with the other paths above.
env_postgres_credentials = '/workspace/nba_sas_assessment/config/postgres_login.env'

urls_list = ['https://github.com/sealneaward/nba-movement-data/raw/master/data/01.01.2016.CHA.at.TOR.7z']

# Extract the game files from raw
game_files = extract_files(folder_raw=folder_raw, folder_tmp_path=folder_tmp_path, urls=urls_list)

# Parse the first extracted game file; the context manager closes the handle.
with open(f'{folder_tmp_path}/{game_files[0]}', 'r') as game_file:
    data = json.load(game_file)

# 'events' holds one entry per play.
events = data['events']

# One flat row per ball/player position, ready to load into a Spark DataFrame.
location_data = []
for play in events:
    event_id = play['eventId']
    # Each moment is a list; this code reads index 5 (the ball/player entries),
    # index 2 and 3, and index 0 from it.
    for location in play['moments']:
        for ball_or_player in location[5]:
            # Build a new row rather than extending the parsed JSON in place,
            # so `data` is left untouched.
            location_data.append(
                [*ball_or_player, location[2], location[3], location[0], data['gameid'], event_id]
            )


In [27]:
location_data[1]

[1610612761,
 2449,
 19.08811,
 13.91147,
 0.0,
 711.26,
 11.99,
 1,
 '0021500492',
 '1']

In [17]:
# Read the team dimension table from Postgres over JDBC.
# Connection details come from the environment (URL / USER / PASS loaded by the
# dotenv cell) instead of being hardcoded in the notebook.
# NOTE(review): the password that was previously committed here should be rotated.
neon_team_data = (
    spark.read.format("jdbc")
    .option("url", URL)
    .option("dbtable", "teams_dimensions.team_data")
    .option("user", USER)
    .option("password", PASS)
    .option("driver", "org.postgresql.Driver")
    .load()
)

In [None]:
# Append the rows of `team_data` to the Postgres table
# teams_dimensions.team_data through the JDBC data source.
(
    team_data.write
    .format("jdbc")
    .mode('append')
    .option("url", URL)
    .option("user", USER)
    .option("password", PASS)
    .option("dbtable", 'teams_dimensions.team_data')
    .option("driver", "org.postgresql.Driver")
    .save()
)

In [None]:
# get_teams_data is only brought into scope by a later cell's
# `from script_etl import *`; import it here so this cell runs on its own.
from script_etl import get_teams_data

# Extract the game files from raw
game_files = extract_files(folder_raw=folder_raw, folder_tmp_path=folder_tmp_path, urls=urls_list)
# Build the team dimension for the first extracted file. The previous
# `game_files[i]` relied on a loop index `i` that is not defined in this cell.
get_teams_data_dim = get_teams_data(game_files=game_files[0], folder_tmp_path=folder_tmp_path)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, LongType, IntegerType, FloatType
from pyspark.sql.functions import *
import pyspark.pandas as pd
import os
from urllib import request
from py7zr import unpack_7zarchive
import shutil
import glob
import json
from script_etl import * 
from dotenv import load_dotenv

## WARNING: BEFORE RUNNING, PLEASE INSTALL THE FILE config/requirements.txt with command: pip install -r config/requirements.txt 

## Replace the next variables with your directory of preference ##
folder_raw = '/workspace/nba_sas_assessment/raw_data'  # directory to save and process raw data
folder_tmp_path = "/workspace/nba_sas_assessment/raw_data/tmp"  # directory to store json raw data

## Replace the next variables with the path to postgres jar file and .env with credentials to access neon.tech ##
jar_postgres_path = '/workspace/nba_sas_assessment/config/jar/postgresql-42.5.1.jar'
env_postgres_credentials = '/workspace/nba_sas_assessment/index(6).env'

# Spark session built by the project helper from the JDBC jar path.
spark = create_spark_session(jar_file_path=jar_postgres_path)

# Spark structure that receives the processed JSON data.
struct_spark = struct_field_create()

## Here you have two options:
##   1) If the .7z file is already inside 'folder_raw', the list below can be ignored.
##   2) To download one or more files into 'folder_raw', put the links in 'urls_list'.
urls_list = ['https://github.com/sealneaward/nba-movement-data/raw/master/data/01.01.2016.CHA.at.TOR.7z']

# Extract the game files from raw
game_files = extract_files(folder_raw=folder_raw, folder_tmp_path=folder_tmp_path, urls=urls_list)

# Process every extracted JSON file in turn.
for i, current_file in enumerate(game_files):
    # Keyword arguments shared by all four extraction helpers.
    file_args = dict(game_files=current_file, folder_tmp_path=folder_tmp_path)

    # --- extract: teams, visiting players, home players, ball movement ---
    get_teams_data_dim = get_teams_data(**file_args)
    get_visitant_players_info = get_players_dimension_visitant(**file_args)
    get_home_players_info = get_players_dimension_home(**file_args)
    get_location_data = get_location_of_ball(**file_args)

    # --- transform: turn each extract into a Spark DataFrame ---
    team_data = team_data_df(spark=spark, data_to_process=get_teams_data_dim, struct_spark=struct_spark)
    visitant_team = visitant_team_df(spark=spark, struct_spark=struct_spark, data_to_process=get_visitant_players_info)
    home_team = home_team_df(spark=spark, data_to_process=get_home_players_info, struct_spark=struct_spark)
    location_of_the_ball = location_of_the_ball_df(
        spark=spark, data_to_process=get_location_data, struct_spark=struct_spark
    )

    print(f'{env_postgres_credentials,jar_postgres_path}')

    # --- load: append all four DataFrames to the database ---
    append_to_postgres(
        location_data=location_of_the_ball,
        home_data=home_team,
        visitant_data=visitant_team,
        team_data=team_data,
        jar=jar_postgres_path,
        cred=env_postgres_credentials,
    )

    # Remove the processed JSON file to keep the tmp folder clean.
    remove_json_file(path=folder_tmp_path, game_file=current_file)



In [None]:
import os

# Directory where downloaded game archives are stored.
folder = r'/workspace/nba_ball_movement/game_data'
# exist_ok=True creates the directory tree only when it is missing, without the
# check-then-create race of a separate os.path.exists test.
os.makedirs(folder, exist_ok=True)

In [None]:
from urllib import request

# Archive to fetch from the git repository.
url_to_download = 'https://github.com/sealneaward/nba-movement-data/raw/master/data/01.02.2016.DET.at.IND.7z'

# Local destination inside the download folder.
response_download_name = f'{folder}/game_data.7z'

# Download the archive to the destination path.
response = request.urlretrieve(url=url_to_download, filename=response_download_name)

In [None]:
from py7zr import unpack_7zarchive
import shutil
import glob

# Register the 7z unpacker with shutil. Re-running the cell raises
# shutil.RegistryError because the format is already registered; only that
# case is ignored, so other failures are no longer silently swallowed.
try:
    shutil.register_unpack_format('7zip', ['.7z'], unpack_7zarchive)
except shutil.RegistryError:
    pass

# All .7z archives in the download folder.
file_name = glob.glob(f'{folder}/*.7z')

# Directory that receives the decompressed files.
output_path = f'{folder}/tmp/'

# Number of archives processed.
c = 0

# Decompress each archive into the tmp folder, then delete the archive.
# The loop variable is used here; the previous code always used file_name[0],
# which failed on the second iteration because that file was already removed.
for archive_path in file_name:
    shutil.unpack_archive(archive_path, output_path)
    os.remove(archive_path)
    c += 1

In [None]:
import json

# Directory where the extracted JSON files are stored.
path = "/workspace/nba_sas_assessment/raw_data/tmp"

# Every entry in that directory.
folder = os.listdir(path)

# Keep the entries whose name contains '.json'.
# Iterating directly replaces the manual counter, whose `c =+ 1` assigned +1
# instead of incrementing, so entries after the second were never inspected.
game_files = [entry for entry in folder if '.json' in entry]

In [None]:
# Parse the first game file; the context manager closes the handle.
with open(f'{path}/{game_files[0]}', 'r') as game_file:
    data = json.load(game_file)

# Rosters are read from the first event only.
first_event = data['events'][0]

# HOME TEAM INFO #
# One row per player: the player's field values, then game id and team id.
home_info = first_event['home']
player_data_home = [
    [*player.values(), data['gameid'], home_info['teamid']]
    for player in home_info['players']
]

# VISITANT TEAM INFO #
visitor_info = first_event['visitor']
player_data_visitant = [
    [*player.values(), data['gameid'], visitor_info['teamid']]
    for player in visitor_info['players']
]


In [None]:
# Open the first listed game JSON in read mode. The handle is not closed in
# this cell; the following cell passes it to json.load.
game_file = open(f'{path}/{game_files[0]}', 'r')

In [None]:
# Parse the JSON document from the handle stored in `game_file`.
data = json.load(game_file)


In [None]:
# Load the first game file into a DataFrame using the
# 'org.apache.spark.sql.json' source.
df = spark.read.load(f"{path}/{game_files[0]}", format='org.apache.spark.sql.json')

In [None]:
data.keys()

# Three nullable string columns: game_id, game_date and events.
schema_teams_info = (
    StructType()
    .add("game_id", StringType(), True)
    .add("game_date", StringType(), True)
    .add("events", StringType(), True)
)

In [None]:
# Attempt to build a DataFrame from `data` with the three-column string schema.
# NOTE(review): in the other cells `data` is the dict returned by json.load, not
# a list of rows — confirm this call succeeds before relying on it.
spark.createDataFrame(data, schema=schema_teams_info)

In [None]:
# Parse the first game file; the context manager closes the handle once the
# JSON is loaded (the previous bare open() left it open).
with open(f'{path}/{game_files[0]}', 'r') as game_file:
    data = json.load(game_file)

In [None]:
# Flat team row taken from the first event: name, abbreviation and team id of
# the home side, the same three fields for the visitor, then the game id.
first_event = data['events'][0]
teams = [
    first_event[side][field]
    for side in ('home', 'visitor')
    for field in ('name', 'abbreviation', 'teamid')
]
teams.append(data['gameid'])

In [None]:
teams

In [None]:
# Schema for the one-row team dimension: home and visiting team name,
# abbreviation and id, followed by the game id.
schema_teams_info = StructType([
    StructField("home_team_name", StringType(), True),
    StructField("home_team_abbreviation", StringType(), True),
    StructField("home_team_id", LongType(), True),
    StructField("visitant_team_name", StringType(), True),
    StructField("visitant_team_abbreviation", StringType(), True),
    StructField("visitant_team_id", LongType(), True),
    StructField("game_id", StringType(), True),
])

# createDataFrame expects a list of rows, so wrap the flat row once.
# The guard keeps the cell idempotent: a plain `teams = [teams]` nested the
# list one level deeper every time the cell was re-run.
if not (teams and isinstance(teams[0], list)):
    teams = [teams]

In [None]:
# Build the team dimension DataFrame from `teams` using `schema_teams_info`.
team_info_data = spark.createDataFrame(data=teams,schema=schema_teams_info)

In [None]:
team_info_data.show()

In [None]:
data['gameid']

In [None]:
# Parse the first game file; the context manager closes the handle.
with open(f'{path}/{game_files[0]}', 'r') as game_file:
    data = json.load(game_file)

# Rosters are read from the first event only.
first_event = data['events'][0]

# HOME TEAM INFO #
# One row per player: the player's field values, then game id and team id.
home_info = first_event['home']
player_data_home = [
    [*player.values(), data['gameid'], home_info['teamid']]
    for player in home_info['players']
]

# VISITANT TEAM INFO #
visitor_info = first_event['visitor']
player_data_visitant = [
    [*player.values(), data['gameid'], visitor_info['teamid']]
    for player in visitor_info['players']
]


In [None]:
player_data_home

In [None]:
# Schema for the ball/player location rows; every column is nullable.
schema_location_of_ball_and_teams = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("team_id", LongType()),
        ("player_id", LongType()),
        ("x_loc", FloatType()),
        ("y_loc", FloatType()),
        ("radius", FloatType()),
        ("game_clock", FloatType()),
        ("shot_clock", FloatType()),
        ("quarter", IntegerType()),
        ("game_id", StringType()),
        ("event_id", StringType()),
    )
])

# Schema for the player dimension rows; every column is a nullable string.
# NOTE(review): the player rows built elsewhere in this notebook append game id
# and then team id at the END of each row — confirm that this column order
# (team_id in 4th position, game_id last) matches those rows.
schema_team_info = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        "last_name",
        "first_name",
        "player_id",
        "team_id",
        "jersey_number",
        "position",
        "game_id",
    )
])


In [None]:
# Ball movement DataFrame (currently disabled):
#game_ball_movement = spark.createDataFrame(data=location_data, schema=schema_location_of_ball_and_teams)

# Home team players dimension, built from the home player rows.
home_team_data = spark.createDataFrame(player_data_home, schema_team_info)

# Visiting team players dimension (currently disabled):
#visitant_team_data = spark.createDataFrame(data=player_data_visitant, schema=schema_team_info)

In [None]:


# Read the distinct rows of teams_dimensions.players_data from Postgres,
# using the connection settings held in URL / USER / PASS.
df = (
    spark.read
    .format("jdbc")
    .options(
        url=URL,
        user=USER,
        password=PASS,
        query='select distinct * from teams_dimensions.players_data',
        driver="org.postgresql.Driver",
    )
    .load()
)




In [None]:
df.count()

In [None]:
# Register the players table read from Postgres as the temp view
# 'players_neon' so it can be queried with Spark SQL.
df.createOrReplaceTempView('players_neon')
# The stray `home_team_data('')` call was removed: home_team_data is a Spark
# DataFrame, which is not callable, so the line always raised TypeError.

In [None]:
df.union(home_team_data).count()

In [None]:
home_team_data.show(5)

In [None]:
# Write `home_team_data` to Postgres through JDBC using save mode 'ignore'.
# NOTE(review): the target is teams_dimensions.team_data although the rows are
# player records and the read cell queries teams_dimensions.players_data —
# confirm the intended table.
(
    home_team_data.write
    .format("jdbc")
    .mode('ignore')
    .options(
        url=URL,
        user=USER,
        password=PASS,
        dbtable='teams_dimensions.team_data',
        driver="org.postgresql.Driver",
    )
    .save()
)

In [None]:
from dotenv import load_dotenv

# Absolute path to the .env file holding the Postgres credentials
# (the './workspace/...' form only resolved when the cwd was '/').
env_postgres_credentials = '/workspace/nba_sas_assessment/config/postgres_login.env'

# Load the file directly. The previous `path = env_postgres_credentials`
# overwrote `path`, which other cells use as the JSON directory.
load_dotenv(env_postgres_credentials)

USER = os.getenv('PGUSER')
PASS = os.getenv('PGPASSWORD')

# Connection settings gathered in one dict.
credentials = {
    'url': os.getenv('JDBC'),
    'user': USER,
    'password': PASS
}

In [None]:
# Absolute path to the .env file holding the Postgres credentials
# (the './workspace/...' form only resolved when the cwd was '/').
env_postgres_credentials = '/workspace/nba_sas_assessment/config/postgres_login.env'
# `credentials` is set to the .env path here; the next cell passes it to load_dotenv.
credentials = env_postgres_credentials

In [None]:
# Load the .env file by its path variable rather than through `credentials`,
# which is a dict in one cell and a path string in another.
load_dotenv(env_postgres_credentials)
URL = os.getenv('JDBC')
USER = os.getenv('PGUSER')
PASS = os.getenv('PGPASSWORD')

# Rebuild the settings dict so the later `credentials.keys()` and
# `credentials['url']` cells receive a mapping, not a path string.
credentials = {
    'url': URL,
    'user': USER,
    'password': PASS
}

In [None]:
# Show the JDBC URL without its query string: a JDBC URL can carry
# `user=`/`password=` parameters, and cell output is saved with the notebook.
URL.split('?', 1)[0] if URL else URL

In [None]:


# Read the teams_dimensions.team_data table from Postgres, using the
# connection settings held in URL / USER / PASS.
df = (
    spark.read
    .format("jdbc")
    .options(
        url=URL,
        user=USER,
        password=PASS,
        dbtable='teams_dimensions.team_data',
        driver="org.postgresql.Driver",
    )
    .load()
)

In [None]:
credentials.keys()

In [None]:
credentials['url']