# ENTREGABLE N1 - RODRIGO AGUIRRE

CONSIGA:
* Bajar datos de una API en formato JSON
* Cargar datos en la tabla de Redshift

![Imagen](./entregable_arquitectura.png)

In [90]:
#Para el ejercicio elegi una libreria que facilita el trabajo con la api oficial de la NBA. 
#Los datos que tomaremos son las estadisiticas de Stephen Curry en sus temporadas regulares en la NBA.
pip install nba_api

Note: you may need to restart the kernel to use updated packages.


In [2]:
from nba_api.stats.endpoints import playercareerstats
import json 

# Stephen Curry Seasons Stats - Obtenemos un json y lo visualizamos (cambiando el numero de id podriamos analizar otros jugadores).
career = playercareerstats.PlayerCareerStats(player_id='201939')
json=json.dumps(career.get_dict(), indent=4)
#print(json)

ModuleNotFoundError: No module named 'nba_api'

In [93]:
!pip install psycopg2-binary



In [94]:
# Crear sesion de Spark
import os
import psycopg2

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col

# Postgres and Redshift JDBCs
driver_path = "/home/coder/working_dir/driver_jdbc/postgresql-42.2.27.jre7.jar"

os.environ['PYSPARK_SUBMIT_ARGS'] = f'--driver-class-path {driver_path} --jars {driver_path} pyspark-shell'
os.environ['SPARK_CLASSPATH'] = driver_path

# Create SparkSession 
spark = SparkSession.builder \
        .master("local") \
        .appName("Conexion entre Pyspark y Redshift") \
        .config("spark.jars", driver_path) \
        .config("spark.executor.extraClassPath", driver_path) \
        .getOrCreate()

In [95]:
env = os.environ

In [96]:
# Conectamos a Redshift
conn = psycopg2.connect(
    host=env['AWS_REDSHIFT_HOST'],
    port=env['AWS_REDSHIFT_PORT'],
    dbname=env['AWS_REDSHIFT_DBNAME'],
    user=env['AWS_REDSHIFT_USER'],
    password=env['AWS_REDSHIFT_PASSWORD']
)

In [97]:
#Creamos la tabla
cursor = conn.cursor()
cursor.execute(f"""
create table if not exists {env['AWS_REDSHIFT_SCHEMA']}.Stephen_Curry_NBA_RegularSeason_Stats (
    PLAYER_ID int distkey,
    SEASON_ID int,
    TEAM_ID bigint,
    TEAM_ABBREVIATION varchar(3),
    PLAYER_AGE int,
    GP int,
    GS int,
    MIN decimal(10,2),
    FGM int,
    FGA int,
    FG_PCT decimal(5,4),
    FG3M int,
    FG3A int,
    FG3_PCT decimal(5,4),
    FTM int,
    FTA int,
    FT_PCT decimal(5,4),
    OREB int,
    DREB int,
    REB int,
    AST int,
    STL int,
    BLK int,
    TOV int,
    PF int,
    PTS int);""")

conn.commit()
cursor.close()
print("Table created!")

Table created!


In [98]:
# Creamos el dataframe
df = spark.createDataFrame(career.get_dict()["resultSets"][0]["rowSet"], ["PLAYER_ID","SEASON_ID","LEAGUE_ID","TEAM_ID","TEAM_ABBREVIATION","PLAYER_AGE","GP","GS",
"MIN","FGM","FGA","FG_PCT","FG3M","FG3A","FG3_PCT","FTM","FTA","FT_PCT","OREB","DREB","REB","AST","STL","BLK","TOV","PF","PTS"])
df=df.drop("LEAGUE_ID")

In [99]:
#Escribimos los datos en la DB
df.write \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}") \
    .option("dbtable", f"{env['AWS_REDSHIFT_SCHEMA']}.Stephen_Curry_NBA_RegularSeason_Stats") \
    .option("user", env['AWS_REDSHIFT_USER']) \
    .option("password", env['AWS_REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()

In [100]:
# Query Redshift using Spark SQL
query = f"select * from {env['AWS_REDSHIFT_SCHEMA']}.Stephen_Curry_NBA_RegularSeason_Stats"
data = spark.read \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}") \
    .option("dbtable", f"({query}) as tmp_table") \
    .option("user", env['AWS_REDSHIFT_USER']) \
    .option("password", env['AWS_REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .load()

In [101]:
data.printSchema()
data.show()

root
 |-- player_id: long (nullable = true)
 |-- season_id: string (nullable = true)
 |-- team_id: long (nullable = true)
 |-- team_abbreviation: string (nullable = true)
 |-- player_age: double (nullable = true)
 |-- gp: long (nullable = true)
 |-- gs: long (nullable = true)
 |-- min: double (nullable = true)
 |-- fgm: long (nullable = true)
 |-- fga: long (nullable = true)
 |-- fg_pct: double (nullable = true)
 |-- fg3m: long (nullable = true)
 |-- fg3a: long (nullable = true)
 |-- fg3_pct: double (nullable = true)
 |-- ftm: long (nullable = true)
 |-- fta: long (nullable = true)
 |-- ft_pct: double (nullable = true)
 |-- oreb: long (nullable = true)
 |-- dreb: long (nullable = true)
 |-- reb: long (nullable = true)
 |-- ast: long (nullable = true)
 |-- stl: long (nullable = true)
 |-- blk: long (nullable = true)
 |-- tov: long (nullable = true)
 |-- pf: long (nullable = true)
 |-- pts: long (nullable = true)

+---------+---------+----------+-----------------+----------+---+---+-----

In [102]:
conn.close()