### Joe Leonard (ymd3tv) - DS 2002 (1:00PM MWF)
# Final Project - Team Performance Analysis

##### Importing Necessary Libraries

In [None]:
import os
import json
import pymongo
import pyspark.pandas as pd 
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

##### Instantiate Global Variables

In [None]:
# Azure MySQL Server Connection
# NOTE(review): credentials are hard-coded in plain text here (and again in the
# view_date SQL cell). Move them to a Databricks secret scope, read them with
# dbutils.secrets.get(...), and rotate the passwords that have been exposed.
jdbc_hostname = "ymd3tv-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "pitch_analysis_db"

connection_properties = {
  "user" : "ymd3tv",
  "password" : "000025Jl##",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection
atlas_cluster_name = "final-cluster"
atlas_database_name = "pitch_analysis_db"
atlas_user_name = "josephleonard725"
atlas_password = "G3hVpL9KsJqRtN5"

# Data Files (JSON)
dst_database = "pitch_analysis_db_dlh"

base_dir = "dbfs:/FileStore/ds_2002_final_project/data"
database_dir = f"{base_dir}/{dst_database}"

# NOTE(review): "retail" looks like a leftover from a course template; the batch
# listing later in this notebook shows the files under .../pitch_analysis_dlh/batch.
# Confirm which directory is correct.
data_dir = f"{base_dir}/retail"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

# One landing directory per streamed source, all under stream_dir.
atbats_stream_dir = f"{stream_dir}/pitch_analysis_atbats"
games_stream_dir = f"{stream_dir}/pitch_analysis_games"
pitches_stream_dir = f"{stream_dir}/pitch_analysis_pitches"
# This used to be built from database_dir, which is deleted recursively below,
# so any player files landed there would be wiped on every run.
players_stream_dir = f"{stream_dir}/pitch_analysis_players"

# Output locations for the bronze / silver / gold layers.
output_bronze = f"{database_dir}/pitch_data/bronze"
output_silver = f"{database_dir}/pitch_data/silver"
output_gold   = f"{database_dir}/pitch_data/gold"

# Delete the Streaming Files (bronze/silver/gold output and checkpoints).
# The next call removes all of database_dir anyway; this one is kept for clarity.
dbutils.fs.rm(f"{database_dir}/pitch_data", True)


# Delete the Database Files
dbutils.fs.rm(database_dir, True)

##### Defining Global Functions

In [None]:
##################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
##################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    """Query a MongoDB Atlas collection and return the matching documents as a DataFrame.

    Args:
        user_id: Atlas user name.
        pwd: Atlas password.
        cluster_name: Host prefix; the URI host is "<cluster_name>.mongodb.net".
        db_name: Database to query.
        collection: Collection to query.
        conditions: Filter document passed to ``find``; falsy means "match everything".
        projection: Projection document passed to ``find``; falsy means "all fields".
        sort: Sort specification passed to ``Cursor.sort``; falsy means "no sort".

    Returns:
        A DataFrame built with the module-level ``pd`` (one row per document).
    """
    from urllib.parse import quote_plus

    # Credentials must be percent-escaped inside a MongoDB URI; otherwise characters
    # such as '@', ':' or '/' in the password break URI parsing.
    mongo_uri = (
        f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
        f"@{cluster_name}.mongodb.net/{db_name}"
    )

    client = pymongo.MongoClient(mongo_uri)
    try:
        db = client[db_name]
        # Apply each option independently. The previous if/elif chain silently ignored
        # `conditions` whenever `projection` was falsy, and ignored `sort` unless both
        # of the other two were supplied.
        cursor = db[collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        dframe = pd.DataFrame(list(cursor))
    finally:
        # Release the connection pool even if the query fails.
        client.close()

    return dframe

##################################################################################################################
# Use this Function to Create New Collections by Uploading CSV file(s) to the MongoDB Atlas server.
##################################################################################################################
def set_mongo_collections(client, db_name, data_directory, csv_files):
    """(Re)create MongoDB collections from JSON or CSV files on the local filesystem.

    For each ``collection_name -> file_name`` pair, the file is read first. The
    existing collection is then dropped and refilled with the file's records.

    Args:
        client: An open ``pymongo.MongoClient``. It is closed before returning (also
            on error), so callers must not reuse it afterwards.
        db_name: Target database name.
        data_directory: Local directory containing the files.
        csv_files: Mapping of collection name to file name. Despite the parameter
            name, ``.json`` files are supported, and they are what this notebook loads.
    """
    db = client[db_name]
    try:
        for collection_name, file_name in csv_files.items():
            file_path = os.path.join(data_directory, file_name)
            # Read before dropping so an unreadable file doesn't leave the collection empty.
            records = _read_records(file_path)
            db[collection_name].drop()
            # insert_many() rejects an empty document list.
            if records:
                db[collection_name].insert_many(records)
    finally:
        client.close()


def _read_records(file_path):
    """Return the rows of a JSON (array, single object, or JSON Lines) or CSV file as dicts."""
    if file_path.lower().endswith(".json"):
        with open(file_path, "r", encoding="utf-8") as handle:
            text = handle.read()
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            # Fall back to JSON Lines: one document per non-blank line.
            return [json.loads(line) for line in text.splitlines() if line.strip()]
        # A file holding a single JSON object becomes one document.
        return [data] if isinstance(data, dict) else list(data)

    # NOTE(review): `pd` is pyspark.pandas in this notebook, and it resolves paths
    # through Spark rather than the local filesystem. Confirm CSV paths work here.
    try:
        df = pd.read_csv(file_path, encoding='utf-8')
    except UnicodeDecodeError:
        df = pd.read_csv(file_path, encoding='ISO-8859-1')
    return df.to_dict(orient='records')




### Populating the Dimensions by Ingesting Reference (Cold-path) Data

##### Fetching reference data from the Azure MySQL Database

##### Creating a New Databricks Database

In [None]:
%sql
-- Drop the lakehouse database (CASCADE also drops its tables) so the notebook starts clean.
-- The name matches the `pitch_analysis_dlh.` prefix used by every table in this notebook.
DROP DATABASE IF EXISTS pitch_analysis_dlh CASCADE;

In [None]:
%sql
-- Create the lakehouse database. Every later cell refers to it as pitch_analysis_dlh
-- (USE, CREATE TABLE, saveAsTable), so it must be created under that name.
CREATE DATABASE IF NOT EXISTS pitch_analysis_dlh
COMMENT "Joe Leonard - Pitch Analysis Database"
LOCATION "dbfs:/FileStore/ds_2002_final_project/pitch_analysis_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "Final Project");

##### Creating the Date Dimension from the DATE Table in the Azure MySQL Database (view_date)

In [None]:
%sql
-- Expose the dim_date table of the Azure MySQL source database as a temporary view read over JDBC.
-- NOTE(review): the password is hard-coded in plain text; move it to a secret scope.
-- NOTE(review): no `driver` option is given here, while the Python connection_properties use
-- org.mariadb.jdbc.Driver. Confirm the intended JDBC driver is picked up for this URL.
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ymd3tv-mysql.mysql.database.azure.com:3306/pitch_analysis_db", 
  dbtable "dim_date",
  user "ymd3tv",    
  password "000025Jl##"  
)

In [None]:
%sql
-- Materialise the MySQL date dimension (exposed through view_date) as a Delta table.
USE DATABASE pitch_analysis_dlh;

-- The table gets its own sub-directory. The previous LOCATION was the database root,
-- which also holds every managed table's directory. A VACUUM on dim_date would then
-- treat those other tables' files as unreferenced and delete them.
CREATE OR REPLACE TABLE pitch_analysis_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds_2002_final_project/pitch_analysis_dlh/dim_date"
AS SELECT * FROM view_date

<style scoped>
  .table-result-container {
    max-height: 300px;
    overflow: auto;
  }
  table, th, td {
    border: 1px solid black;
    border-collapse: collapse;
  }
  th, td {
    padding: 5px;
  }
  th {
    text-align: left;
  }
</style><div class='table-result-container'><table class='table-result'><thead style='background-color: white'><tr><th>num_affected_rows</th><th>num_inserted_rows</th></tr></thead><tbody></tbody></table></div>

##### Fetching Reference Data from MongoDB Atlas

In [None]:
# List the cold-path batch files so we can confirm they are in place before loading.
batch_file_listing = dbutils.fs.ls(batch_dir)
display(batch_file_listing)

<style scoped>
  .table-result-container {
    max-height: 300px;
    overflow: auto;
  }
  table, th, td {
    border: 1px solid black;
    border-collapse: collapse;
  }
  th, td {
    padding: 5px;
  }
  th {
    text-align: left;
  }
</style><div class='table-result-container'><table class='table-result'><thead style='background-color: white'><tr><th>path</th><th>name</th><th>size</th><th>modificationTime</th></tr></thead><tbody><tr><td>dbfs:/FileStore/ds_2002_final_project/pitch_analysis_dlh/batch/pitch_analysis_atbats.json</td><td>pitch_analysis_atbats.json</td><td>3930</td><td>1833195448000</td></tr><tr><td>dbfs:/FileStore/ds_2002_final_project/pitch_analysis_dlh/batch/pitch_analysis_games.json</td><td>pitch_analysis_games.json</td><td>4681</td><td>1833195448000</td></tr><tr><td>dbfs:/FileStore/ds_2002_final_project/pitch_analysis_dlh/batch/pitch_analysis_pitches.json</td><td>pitch_analysis_pitches.json</td><td>34935</td><td>1833195448000</td></tr><tr><td>dbfs:/FileStore/ds_2002_final_project/pitch_analysis_dlh/batch/pitch_analysis_players.json</td><td>pitch_analysis_players.json</td><td>175</td><td>1833195448000</td></tr></tbody></table></div>

##### Creating the New MongoDB Database and Loading the JSON Data

In [None]:
from urllib.parse import quote_plus

# Local (FUSE) path to the batch files; they are read with ordinary file I/O.
source_dir = '/dbfs//FileStore/ds_2002_final_project/pitch_analysis_dlh/batch'
# Collection name -> source file name.
csv_files = {"atbats" : 'pitch_analysis_atbats.json'
              , "games" : 'pitch_analysis_games.json'
              , "pitches" : 'pitch_analysis_pitches.json'
              , "players" : 'pitch_analysis_players.json'}

# set_mongo_collections() takes an open MongoClient (which it closes itself), not raw
# credentials. Passing six positional arguments to it raised a TypeError.
# Credentials are percent-escaped as MongoDB URIs require.
atlas_mongo_uri = (
    f"mongodb+srv://{quote_plus(atlas_user_name)}:{quote_plus(atlas_password)}"
    f"@{atlas_cluster_name}.mongodb.net/{atlas_database_name}"
)
set_mongo_collections(pymongo.MongoClient(atlas_mongo_uri), atlas_database_name, source_dir, csv_files)

##### DF for At-Bats

In [None]:
%scala

// Load the "atbats" collection from MongoDB Atlas with the MongoDB Spark connector,
// keeping only the at-bat columns listed below, then preview two rows.
// NOTE(review): `atlas_uri` is not defined in any Scala cell visible here. Python variables
// are not shared with Scala cells, so it must be defined in Scala scope. Confirm.
val df_atbats = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "pitch_analysis_db")
.option("collection", "atbats").load()
.select("ab_id",
        "batter_id",
        "event",
        "g_id",
        "inning",
        "o",
        "p_score",
        "p_throws",
        "pitcher_id",
        "stand",
        "top")

display(df_atbats.head(2))

<style scoped>
  .table-result-container {
    max-height: 300px;
    overflow: auto;
  }
  table, th, td {
    border: 1px solid black;
    border-collapse: collapse;
  }
  th, td {
    padding: 5px;
  }
  th {
    text-align: left;
  }
</style><div>
<style scoped>
    .dataframe tbody tr th:only-of-type {
        vertical-align: middle;
    }

    .dataframe tbody tr th {
        vertical-align: top;
    }

    .dataframe thead th {
        text-align: right;
    }
</style>
<table border="1" class="dataframe">
  <thead>
    <tr style="text-align: right;">
      <th></th>
      <th>ab_id</th>
      <th>batter_id</th>
      <th>event</th>
      <th>g_id</th>
      <th>inning</th>
      <th>o</th>
      <th>p_score</th>
      <th>p_throws</th>
      <th>pitcher_id</th>
      <th>stand</th>
      <th>top</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <th>0</th>
      <td>2015000001</td>
      <td>572761</td>
      <td>Groundout</td>
      <td>201500001</td>
      <td>1</td>
      <td>1</td>
      <td>0</td>
      <td>L</td>
      <td>452657</td>
      <td>L</td>
      <td>True</td>
    </tr>
    <tr>
      <th>1</th>
      <td>2015000002</td>
      <td>518792</td>
      <td>Double</td>
      <td>201500001</td>
      <td>1</td>
      <td>1</td>
      <td>0</td>
      <td>L</td>
      <td>452657</td>
      <td>L</td>
      <td>True</td>
    </tr>
  </tbody>
</table>
</div>

##### Using the Spark DataFrame to Create the At-Bats Delta Table

In [None]:
%scala
// Persist the at-bats DataFrame as a Delta table, replacing any previous version.
df_atbats.write
  .format("delta")
  .mode("overwrite")
  .saveAsTable("pitch_analysis_dlh.dim_atbats")

##### DF for Games

In [None]:
%scala

// Load the "games" collection from MongoDB Atlas with the MongoDB Spark connector,
// keeping only the game-level columns listed below, then preview two rows.
// NOTE(review): `atlas_uri` is not defined in any Scala cell visible here. Python variables
// are not shared with Scala cells, so it must be defined in Scala scope. Confirm.
val df_games = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "pitch_analysis_db")
.option("collection", "games").load()
.select("attendance",
        "away_final_score",
        "away_team",
        "date",
        "elapsed_time",
        "g_id",
        "home_final_score",
        "home_team",
        "start_time",
        "umpire_1B",
        "umpire_2B",
        "umpire_3B",
        "umpire_HP",
        "venue_name",
        "weather",
        "wind",
        "delay")

display(df_games.head(2))

<style scoped>
  .table-result-container {
    max-height: 300px;
    overflow: auto;
  }
  table, th, td {
    border: 1px solid black;
    border-collapse: collapse;
  }
  th, td {
    padding: 5px;
  }
  th {
    text-align: left;
  }
</style><div>
<style scoped>
    .dataframe tbody tr th:only-of-type {
        vertical-align: middle;
    }

    .dataframe tbody tr th {
        vertical-align: top;
    }

    .dataframe thead th {
        text-align: right;
    }
</style>
<table border="1" class="dataframe">
  <thead>
    <tr style="text-align: right;">
      <th></th>
      <th>attendance</th>
      <th>away_final_score</th>
      <th>away_team</th>
      <th>date</th>
      <th>elapsed_time</th>
      <th>g_id</th>
      <th>home_final_score</th>
      <th>home_team</th>
      <th>start_time</th>
      <th>umpire_1B</th>
      <th>umpire_2B</th>
      <th>umpire_3B</th>
      <th>umpire_HP</th>
      <th>venue_name</th>
      <th>weather</th>
      <th>wind</th>
      <th>delay</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <th>0</th>
      <td>35055</td>
      <td>3</td>
      <td>sln</td>
      <td>2015-04-05</td>
      <td>184</td>
      <td>201500001</td>
      <td>0</td>
      <td>chn</td>
      <td>7:17 PM</td>
      <td>Mark Wegner</td>
      <td>Marty Foster</td>
      <td>Mike Muchlinski</td>
      <td>Mike Winters</td>
      <td>Wrigley Field</td>
      <td>44 degrees, clear</td>
      <td>7 mph, In from CF</td>
      <td>0</td>
    </tr>
    <tr>
      <th>1</th>
      <td>45909</td>
      <td>1</td>
      <td>ana</td>
      <td>2015-04-06</td>
      <td>153</td>
      <td>201500002</td>
      <td>4</td>
      <td>sea</td>
      <td>1:12 PM</td>
      <td>Ron Kulpa</td>
      <td>Brian Knight</td>
      <td>Vic Carapazza</td>
      <td>Larry Vanover</td>
      <td>Safeco Field</td>
      <td>54 degrees, cloudy</td>
      <td>1 mph, Varies</td>
      <td>0</td>
    </tr>
  </tbody>
</table>
</div>

##### Using the Spark DataFrame to Create the Games Delta Table

In [None]:
%scala
// Persist the games DataFrame as a Delta table, replacing any previous version.
df_games.write
  .format("delta")
  .mode("overwrite")
  .saveAsTable("pitch_analysis_dlh.dim_games")

##### DF for Pitches

In [None]:
%scala

// Load the "pitches" collection from MongoDB Atlas and keep the pitch-level columns.
// NOTE(review): `atlas_uri` is not defined in any Scala cell visible here. Python variables
// are not shared with Scala cells, so it must be defined in Scala scope. Confirm.
// The select list used to contain a literal "..." column, copied from pandas' truncated
// display of this DataFrame. No such column exists, so the select failed. The columns
// pandas hid between "ax" and "event_num" cannot be recovered from this notebook; list
// them explicitly here if they are needed.
val df_pitches = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "pitch_analysis_db")
.option("collection", "pitches").load()
.select("px",
      "pz",
      "start_speed",
      "end_speed",
      "spin_rate",
      "spin_dir",
      "break_angle",
      "break_length",
      "break_y",
      "ax",
      "event_num",
      "b_score",
      "ab_id",
      "b_count",
      "s_count",
      "outs",
      "pitch_num",
      "on_1b",
      "on_2b",
      "on_3b")

display(df_pitches.head(2))

<style scoped>
  .table-result-container {
    max-height: 300px;
    overflow: auto;
  }
  table, th, td {
    border: 1px solid black;
    border-collapse: collapse;
  }
  th, td {
    padding: 5px;
  }
  th {
    text-align: left;
  }
</style><div>
<style scoped>
    .dataframe tbody tr th:only-of-type {
        vertical-align: middle;
    }

    .dataframe tbody tr th {
        vertical-align: top;
    }

    .dataframe thead th {
        text-align: right;
    }
</style>
<div>
<style scoped>
    .dataframe tbody tr th:only-of-type {
        vertical-align: middle;
    }

    .dataframe tbody tr th {
        vertical-align: top;
    }

    .dataframe thead th {
        text-align: right;
    }
</style>
<table border="1" class="dataframe">
  <thead>
    <tr style="text-align: right;">
      <th></th>
      <th>px</th>
      <th>pz</th>
      <th>start_speed</th>
      <th>end_speed</th>
      <th>spin_rate</th>
      <th>spin_dir</th>
      <th>break_angle</th>
      <th>break_length</th>
      <th>break_y</th>
      <th>ax</th>
      <th>...</th>
      <th>event_num</th>
      <th>b_score</th>
      <th>ab_id</th>
      <th>b_count</th>
      <th>s_count</th>
      <th>outs</th>
      <th>pitch_num</th>
      <th>on_1b</th>
      <th>on_2b</th>
      <th>on_3b</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <th>0</th>
      <td>0.416</td>
      <td>2.963</td>
      <td>92.9</td>
      <td>84.1</td>
      <td>2305.052</td>
      <td>159.235</td>
      <td>-25.0</td>
      <td>3.2</td>
      <td>23.7</td>
      <td>7.665</td>
      <td>...</td>
      <td>3</td>
      <td>0.0</td>
      <td>2015000001.0</td>
      <td>0.0</td>
      <td>0.0</td>
      <td>0.0</td>
      <td>1.0</td>
      <td>0.0</td>
      <td>0.0</td>
      <td>0.0</td>
    </tr>
    <tr>
      <th>1</th>
      <td>-0.191</td>
      <td>2.347</td>
      <td>92.8</td>
      <td>84.1</td>
      <td>2689.935</td>
      <td>151.402</td>
      <td>-40.7</td>
      <td>3.4</td>
      <td>23.7</td>
      <td>12.043</td>
      <td>...</td>
      <td>4</td>
      <td>0.0</td>
      <td>2015000001.0</td>
      <td>0.0</td>
      <td>1.0</td>
      <td>0.0</td>
      <td>2.0</td>
      <td>0.0</td>
      <td>0.0</td>
      <td>0.0</td>
    </tr>
  </tbody>
</table>
<p>2 rows × 40 columns</p>
</div>

In [None]:
%scala
// Persist the pitches DataFrame as a Delta table, replacing any previous version.
df_pitches.write
  .format("delta")
  .mode("overwrite")
  .saveAsTable("pitch_analysis_dlh.dim_pitches")

##### DF for Players

In [None]:
%scala

// Load the "players" collection from MongoDB Atlas (player id and name), then preview two rows.
// NOTE(review): `atlas_uri` is not defined in any Scala cell visible here. Python variables
// are not shared with Scala cells, so it must be defined in Scala scope. Confirm.
val df_players = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "pitch_analysis_db")
.option("collection", "players").load()
.select("id",
        "first_name",
        "last_name")

display(df_players.head(2))

<style scoped>
  .table-result-container {
    max-height: 300px;
    overflow: auto;
  }
  table, th, td {
    border: 1px solid black;
    border-collapse: collapse;
  }
  th, td {
    padding: 5px;
  }
  th {
    text-align: left;
  }
</style><div>
<style scoped>
    .dataframe tbody tr th:only-of-type {
        vertical-align: middle;
    }

    .dataframe tbody tr th {
        vertical-align: top;
    }

    .dataframe thead th {
        text-align: right;
    }
</style>
<div>
<style scoped>
    .dataframe tbody tr th:only-of-type {
        vertical-align: middle;
    }

    .dataframe tbody tr th {
        vertical-align: top;
    }

    .dataframe thead th {
        text-align: right;
    }
</style>
<div>
<style scoped>
    .dataframe tbody tr th:only-of-type {
        vertical-align: middle;
    }

    .dataframe tbody tr th {
        vertical-align: top;
    }

    .dataframe thead th {
        text-align: right;
    }
</style>
<table border="1" class="dataframe">
  <thead>
    <tr style="text-align: right;">
      <th></th>
      <th>id</th>
      <th>first_name</th>
      <th>last_name</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <th>0</th>
      <td>452657</td>
      <td>Jon</td>
      <td>Lester</td>
    </tr>
    <tr>
      <th>1</th>
      <td>425794</td>
      <td>Adam</td>
      <td>Wainwright</td>
    </tr>
  </tbody>
</table>
</div>

In [None]:
%scala
// Persist the players DataFrame as a Delta table, replacing any previous version.
df_players.write
  .format("delta")
  .mode("overwrite")
  .saveAsTable("pitch_analysis_dlh.dim_players")

##### Verifying Dimension Tables

In [None]:
%sql
-- Switch to the lakehouse database and list its tables (plus session temporary views)
-- to confirm the dimension tables were created.
USE pitch_analysis_dlh;
SHOW TABLES

<style scoped>
  .table-result-container {
    max-height: 300px;
    overflow: auto;
  }
  table, th, td {
    border: 1px solid black;
    border-collapse: collapse;
  }
  th, td {
    padding: 5px;
  }
  th {
    text-align: left;
  }
</style>
<div class='table-result-container'>
  <table class='table-result'>
    <thead style='background-color: white'>
      <tr>
        <th>database</th>
        <th>tableName</th>
        <th>isTemporary</th>
      </tr>
    </thead>
    <tbody>
      <tr>
        <td>pitch_analysis_dlh</td>
        <td>dim_atbats</td>
        <td>false</td>
      </tr>
      <tr>
        <td>pitch_analysis_dlh</td>
        <td>dim_games</td>
        <td>false</td>
      </tr>
      <tr>
        <td>pitch_analysis_dlh</td>
        <td>dim_pitches</td>
        <td>false</td>
      </tr>
      <tr>
        <td>pitch_analysis_dlh</td>
        <td>dim_players</td>
        <td>false</td>
      </tr>
      <tr>
        <td>pitch_analysis_dlh</td>
        <td>view_date</td>
        <td>true</td>
      </tr>
    </tbody>
  </table>
</div>


### Adding Real-Time (Hot-Path) Data to the Reference Data

In [None]:
# Incrementally ingest newly arriving pitch files with Auto Loader ("cloudFiles"), storing
# the inferred schema under output_bronze. Expose the stream as the temporary view
# pitch_raw_tempview for the SQL cells that follow.
# NOTE(review): the globals describe the data files as JSON, but the format here is "csv".
# Confirm which format actually lands in the pitch stream directory.
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "csv")
 .option("cloudFiles.schemaLocation", output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 # `pitch_analysis_stream_dir` was never defined (NameError). The pitch landing
 # directory defined with the other globals is `pitches_stream_dir`.
 .load(pitches_stream_dir)
 .createOrReplaceTempView("pitch_raw_tempview"))

In [None]:
%sql
/* Add Metadata for Traceability:
   receipt_time = timestamp at which the row was processed (current_timestamp()),
   source_file  = the input file the row was read from (input_file_name()). */
CREATE OR REPLACE TEMPORARY VIEW output_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM pitch_raw_tempview
)

In [None]:
%sql
-- Inspect the bronze view, including the traceability columns added above.
SELECT * FROM output_bronze_tempview

In [None]:
# Append the bronze stream (raw rows plus traceability metadata) to the
# pitch_analysis_bronze Delta table, checkpointing progress under output_bronze.
bronze_writer = (
    spark.table("output_bronze_tempview")
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{output_bronze}/_checkpoint")
)
bronze_writer.table("pitch_analysis_bronze")

In [None]:
# Read the bronze Delta table back as a stream and expose it as output_silver_tempview.
# NOTE(review): a later SQL cell runs CREATE OR REPLACE TEMPORARY VIEW output_silver_tempview
# over the batch dimension tables, which replaces this streaming view. Confirm which
# definition the silver layer is meant to use.
bronze_stream_df = spark.readStream.table("pitch_analysis_bronze")
bronze_stream_df.createOrReplaceTempView("output_silver_tempview")

In [None]:
%sql
-- Inspect the current definition of output_silver_tempview.
SELECT * FROM output_silver_tempview

In [None]:
%sql
-- Silver view: each pitch joined to its at-bat (outcome, pitcher) and to the pitcher's name.
CREATE OR REPLACE TEMPORARY VIEW output_silver_tempview AS (
    SELECT 
        ab.pitcher_id,
        ab.event,
        play.first_name,
        play.last_name,
        pit.end_speed,
        pit.break_angle,
        pit.spin_rate
    FROM pitch_analysis_dlh.dim_pitches AS pit
    INNER JOIN pitch_analysis_dlh.dim_atbats AS ab
        ON pit.ab_id = ab.ab_id
    INNER JOIN pitch_analysis_dlh.dim_players AS play
        -- dim_players' key column is `id`; the previous bare `play_id` does not resolve.
        ON ab.pitcher_id = play.id
);

In [None]:
# Persist the silver view as a Delta table.
# By the time this cell runs, output_silver_tempview has been redefined by SQL over the
# dimension tables, so it is a batch view. Calling .writeStream on a batch DataFrame
# raises an AnalysisException, so a batch overwrite is used instead. If silver is meant
# to be fed from the bronze stream, rebuild the view on a streaming source and switch
# back to writeStream with a checkpoint under output_silver.
(spark.table("output_silver_tempview")
      .write
      .format("delta")
      .mode("overwrite")
      .saveAsTable("pitch_analysis_silver"))

##### Aggregating Gold Table

In [None]:
%sql
-- Gold table: one row per pitcher with average pitch characteristics and that pitcher's
-- most frequent at-bat outcome. Built from the dimension tables (the same join as the
-- silver view), so it does not depend on the silver stream.
-- Replaces T-SQL-only syntax (TOP 1, '+' string concatenation), which Spark SQL rejects.
CREATE OR REPLACE TABLE pitch_analysis_dlh.fact_pitch_summary AS
WITH pitch_stats AS (
  SELECT
    ab.pitcher_id,
    concat_ws(' ', play.first_name, play.last_name) AS pitcher_name,
    AVG(pit.end_speed)   AS avg_pitch_speed,
    AVG(pit.break_angle) AS avg_break_angle,
    AVG(pit.spin_rate)   AS avg_spin_rate
  FROM pitch_analysis_dlh.dim_pitches AS pit
  INNER JOIN pitch_analysis_dlh.dim_atbats AS ab
    ON pit.ab_id = ab.ab_id
  INNER JOIN pitch_analysis_dlh.dim_players AS play
    ON ab.pitcher_id = play.id
  GROUP BY ab.pitcher_id, play.first_name, play.last_name
),
ranked_events AS (
  -- Rank each pitcher's at-bat outcomes by frequency; ties are broken alphabetically.
  SELECT
    pitcher_id,
    event,
    ROW_NUMBER() OVER (PARTITION BY pitcher_id ORDER BY COUNT(*) DESC, event) AS event_rank
  FROM pitch_analysis_dlh.dim_atbats
  GROUP BY pitcher_id, event
)
SELECT
  -- Numbered by pitcher name, matching the sample output.
  ROW_NUMBER() OVER (ORDER BY s.pitcher_name, s.pitcher_id) AS id,
  s.pitcher_id      AS Pitcher_ID,
  s.pitcher_name    AS Pitcher_Name,
  s.avg_pitch_speed AS Average_Pitch_Speed,
  s.avg_break_angle AS Average_Break_Angle,
  s.avg_spin_rate   AS Average_Spin_Rate,
  e.event           AS Most_Common_Event
FROM pitch_stats AS s
LEFT JOIN ranked_events AS e
  ON e.pitcher_id = s.pitcher_id AND e.event_rank = 1
ORDER BY id
LIMIT 5;

<table border="1">
  <tr>
    <th>ID</th>
    <th>Pitcher_ID</th>
    <th>Pitcher_Name</th>
    <th>Average_Pitch_Speed</th>
    <th>Average_Break_Angle</th>
    <th>Average_Spin_Rate</th>
    <th>Most_Common_Event</th>
  </tr>
<tr>
    <td>1</td>
    <td>592091
</td>
    <td>A.J. Achter
</td>
    <td>78.53815261
</td>
    <td>3.523293173
</td>
    <td>1690.481177
</td>
    <td>Flyout</td>
  </tr>
<tr>
    <td>2</td>
    <td>150359
</td>
    <td>A.J. Burnett
</td>
    <td>81.00200401
</td>
    <td>6.536472946
</td>
    <td>1826.564764
</td>
    <td>Groundout</td>
  </tr>
<tr>
    <td>3</td>
    <td>595918
</td>
    <td>A.J. Cole
</td>
    <td>80.93752495
</td>
    <td>2.726546906
</td>
    <td>1942.203176
</td>
    <td>Flyout</td>
  </tr>
<tr>
    <td>4</td>
    <td>454560
</td>
    <td>A.J. Ellis
</td>
    <td>81.2154
</td>
    <td>5.7368
</td>
    <td>1789.916636
</td>
    <td>Groundout</td>
  </tr>
<tr>
    <td>5</td>
    <td>456167
</td>
    <td>A.J. Griffin
</td>
    <td>80.94239239
</td>
    <td>-1.356006006
</td>
    <td>1781.821676
</td>
    <td>Walk</td>
  </tr>
</table>


In [None]:
%fs rm -r /FileStore/ds_2002_final_project/