In [1]:
# Install Additional Python Libraries
!pip install -r requirements.txt

In [2]:
# Need postgres
# https://mvnrepository.com/artifact/org.postgresql/postgresql
from spark_libs import spark_submit
packages = ["com.databricks:spark-csv_2.11:1.5.0", 
            "org.postgresql:postgresql:42.2.9"]
spark_submit(packages=packages)

Adding environment variable `PYSPARK_SUBMIT_ARGS`
--packages com.databricks:spark-csv_2.11:1.5.0,org.postgresql:postgresql:42.2.9 pyspark-shell


In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from pyspark.sql import DataFrame
import pyspark.sql.functions as F

In [4]:
# get or create Spark session

app_name = "spark-postgres"
spark = SparkSession.builder.appName(app_name).getOrCreate()

In [6]:
team_data_file ="../team/cleaned_team_stats.csv"

spark.sparkContext.addFile(team_data_file)

team_df = spark.read \
    .format("com.databricks.spark.csv") \
    .options(header='true', inferSchema="true") \
    .load(SparkFiles.get("cleaned_team_stats.csv"))
team_df.printSchema()

root
 |-- team_id: integer (nullable = true)
 |-- variable: string (nullable = true)
 |-- first_downs: integer (nullable = true)
 |-- first_downs_by_penalty: integer (nullable = true)
 |-- third_down_percentage: double (nullable = true)
 |-- fourth_down_percentage: double (nullable = true)
 |-- average_interception_yards: double (nullable = true)
 |-- average_kickoff_return_yards: double (nullable = true)
 |-- average_punt_return_yards: double (nullable = true)
 |-- interceptions: integer (nullable = true)
 |-- net_average_punt_yards: double (nullable = true)
 |-- net_passing_yards: integer (nullable = true)
 |-- net_passing_yards_per_game: double (nullable = true)
 |-- passing_first_downs: integer (nullable = true)
 |-- passing_touchdowns: integer (nullable = true)
 |-- rushing_first_downs: integer (nullable = true)
 |-- rushing_attempts: integer (nullable = true)
 |-- rushing_touchdowns: integer (nullable = true)
 |-- rushing_yards: integer (nullable = true)
 |-- rushing_yards_per_ga

In [13]:
# Postgres credentials
jdbcHostname = "host.docker.internal"
jdbcPort = "5432" # should be 5432 for you
jdbcDatabase = "team_stats"
dialect = "postgresql"
jdbcUsername = "postgres"
jdbcPassword = "changeme"

jdbcUrl = "jdbc:{3}://{0}:{1}/{2}".format(jdbcHostname, jdbcPort, jdbcDatabase, dialect)
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "org.postgresql.Driver" 
}
# for mysql driver = com.mysql.jdbc.Driver

In [14]:
jdbcUrl

'jdbc:postgresql://host.docker.internal:5432/team_stats'

In [15]:
connectionProperties

{'user': 'postgres', 'password': 'changeme', 'driver': 'org.postgresql.Driver'}

In [16]:
table = "team_stats"
mode = "overwrite" # options are: error, append, overwrite

team_df.write.jdbc(jdbcUrl, table, mode, connectionProperties)