In [None]:
import pyspark
import glob
import pandas as pd



# Create the Spark entry points only when no name `sc` is already bound in this
# namespace, so re-running the cell does not construct a second SparkContext.
if 'sc' not in locals():
    from pyspark.context import SparkContext
    from pyspark.sql.context import SQLContext
    from pyspark.sql.session import SparkSession
    
    # `sc` is the low-level context; `sqlContext` and `spark` are both built on it.
    # `sqlContext` is what pandas_to_spark() uses; `spark` is used by the other cells.
    sc = SparkContext()
    sqlContext = SQLContext(sc)
    spark = SparkSession(sc)

# Bare expression: display the session as the cell output.
spark

In [None]:
# Show which files match the "data/NBA*" pattern that the later cells load.
print(glob.glob("data/NBA*"))

In [None]:
mypath = './data'
files = glob.glob("data/NBA*")

# Read each matching CSV into pandas. Only the frame from the last file stays
# bound to `df` once the loop finishes.
for file in files:
    # errors="ignore" makes the drop a no-op for files that do not contain
    # the "Unnamed: 40" column.
    df = pd.read_csv(file).drop(columns=["Unnamed: 40"], errors="ignore")

In [None]:
from pyspark.sql.types import *

# Auxiliary functions
def equivalent_type(f):
    """Map a pandas/numpy dtype to the Spark SQL type used for that column.

    Args:
        f: The dtype of a pandas column (or its string name), compared
            against dtype names with ``==``.

    Returns:
        A new instance of ``TimestampType``, ``LongType``, ``IntegerType`` or
        ``DoubleType`` for the recognised dtypes, and ``StringType`` for
        anything else.
    """
    # Keep the ``==`` chain: a numpy dtype compares equal to its string name
    # but does not hash like it, so a dict lookup would not be equivalent.
    if f == 'datetime64[ns]': return TimestampType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    # float64 is a 64-bit double; Spark's FloatType is only 32-bit and would
    # silently lose precision, so DoubleType is the matching type.
    elif f == 'float64': return DoubleType()
    else: return StringType()

def define_structure(string, format_type):
    """Build the Spark ``StructField`` describing one column.

    Args:
        string: Column name.
        format_type: The column's pandas dtype, passed to ``equivalent_type``.

    Returns:
        A ``StructField`` named ``string`` whose type is the result of
        ``equivalent_type(format_type)``, or ``StringType`` if that lookup
        raises.
    """
    try:
        typo = equivalent_type(format_type)
    except Exception:
        # Fall back to a string column when the dtype cannot be compared.
        # ``Exception`` (not a bare ``except``) so that KeyboardInterrupt and
        # SystemExit still propagate.
        typo = StringType()
    return StructField(string, typo)

# Given a pandas DataFrame, return the equivalent Spark DataFrame.
def pandas_to_spark(pandas_df):
    """Convert ``pandas_df`` to a Spark DataFrame using an explicit schema.

    One ``StructField`` is built per column via ``define_structure`` from the
    column's name and dtype, and the frame is created through the module-level
    ``sqlContext``.
    """
    fields = [
        define_structure(name, dtype)
        for name, dtype in zip(pandas_df.columns, pandas_df.dtypes)
    ]
    return sqlContext.createDataFrame(pandas_df, StructType(fields))

In [None]:
mypath = './data'
files = glob.glob("data/NBA*")
spark_table_games = "games"

# Load each CSV with pandas, convert it to a Spark DataFrame and append it to
# the parquet-backed table named by `spark_table_games`.
for file in files:
    # errors="ignore" makes the drop a no-op for files that do not contain
    # the "Unnamed: 40" column.
    df = pd.read_csv(file).drop(columns=["Unnamed: 40"], errors="ignore")
    sparkDF = spark.createDataFrame(df)
    sparkDF.write.saveAsTable(spark_table_games, format="parquet", mode='append')

In [None]:
# Check the type of `df`, the pandas frame left over from the last file in the loop above.
type(df)

In [None]:
# Remove the "games" table so it can be rebuilt from the CSV files below.
# IF EXISTS keeps this cell re-runnable when the table is already gone.
spark.sql("DROP TABLE IF EXISTS games")

In [None]:
# Check the type of the Spark DataFrame built in the append loop. That loop
# binds it to `sparkDF`; the name `spark_df` is not defined in this notebook.
type(sparkDF)

In [None]:
csv_files = glob.glob("data/NBA*")

file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Read all matching files into one Spark DataFrame in a single load() call.
# The applied options are for CSV files. For other file types, these will be ignored.
df_games = (
    spark.read.format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(csv_files)
)


In [None]:
# Print the column names and types of the DataFrame loaded from the CSV files.
df_games.printSchema()

In [None]:
# Create the "games" table from df_games, stored as parquet.
# mode="overwrite" replaces an existing table instead of raising, so this cell
# can be re-run without dropping the table by hand first.
spark_table_games = "games"
df_games.write.format("parquet").saveAsTable(spark_table_games, mode="overwrite")

In [None]:
# Sanity check: count the rows stored in the "games" table.
spark.sql("SELECT count(*) from games").collect()

In [None]:
###################################
# The cells below work against the database (via databaseClass.DB).

In [1]:
from databaseClass import DB
import utils
import sql_files

In [2]:
# Read the database user name, password and database name from the local
# `utils` module.
userName = utils.userName
userPass = utils.userPass
dbName = utils.dbName


In [3]:
# Build the project's DB helper with the settings read above; `db` is used by
# build_table() and by the query cells below.
db = DB(userName = userName, userPass = userPass, dataBaseName = dbName)

In [4]:
def generate_sql(input_sql_file):
    """Return the full text of the SQL file at ``input_sql_file``."""
    with open(input_sql_file, 'r') as handle:
        contents = handle.read()
    return contents
    
def build_table(sql_file, table_name):
    '''Create ``table_name`` from the query in ``sql_file``, replacing it if needed.

    Postgres does not have a "create or replace table" option, so that is
    emulated here: try to build the table, and if that fails, drop the table
    and build it again.

    Args:
        sql_file: Path of the file containing the query.
        table_name: Name of the table to (re)build.

    Raises:
        OSError: If ``sql_file`` cannot be read. This is raised before the
            database is touched, so an existing table is left intact.
        Exception: Whatever ``db.dropTable`` or the second build attempt raises.
    '''
    # Read the query once, outside the try block. A missing or unreadable file
    # must not fall into the drop-and-retry path and destroy the existing table.
    sql = generate_sql(sql_file)
    try:
        db.BuildTableFromQuery(sql, table_name)
    except Exception:
        # Presumably the first attempt failed because the table already
        # exists - TODO confirm which exception DB raises and narrow this.
        db.dropTable(table_name)
        db.BuildTableFromQuery(sql, table_name)

In [7]:
# (Re)build the game_scores table from its query file.
build_table('sql_files/game_scores.sql', 'game_scores')

In [21]:
# (Re)build player_game_shot_performance; note the table name differs from the file name.
build_table('sql_files/shot_performance.sql', 'player_game_shot_performance')

In [22]:
# (Re)build the game_rosters_1 table from its query file.
build_table('sql_files/game_rosters_1.sql', 'game_rosters_1')

In [24]:
# (Re)build the game_rosters_2 table from its query file.
build_table('sql_files/game_rosters_2.sql', 'game_rosters_2')

In [20]:
# (Re)build the game_rebounds table from its query file.
build_table('sql_files/game_rebounds.sql', 'game_rebounds')

In [25]:
# (Re)build the output table from its query file.
# NOTE(review): presumably output.sql reads the tables built above - confirm the required cell order.
build_table('sql_files/output.sql', 'output')

In [None]:
# Run the shot_performance query directly and display what DBtoDF returns
# (presumably a pandas DataFrame - confirm in databaseClass).
db.DBtoDF(generate_sql('sql_files/shot_performance.sql'))

In [9]:
# Run the scratch query in test.sql and keep the result. This rebinds `df`,
# which earlier cells used for the pandas frame read from the CSV files.
df = db.DBtoDF(generate_sql('sql_files/test.sql'))

In [17]:
# Look at the first element of the regexp_match value in the second row (iloc[1]).
df['regexp_match'].iloc[1][0]

'D. Jordan'

In [26]:
import psycopg2

In [27]:
!which python

/home/tai/Desktop/Projects/springboard_final/app/env/bin/python
