# Managing the environment

Docker Containers: mssql, mongodb, jupyter, minio, gamestream

To check if all the containers are running (docker commands in command line): 

PS > docker-compose ps

To STOP all the containers are running (docker commands in command line): 

PS > docker-compose down OR PS > docker-compose stop

# Starting and stopping the environment

To start the environment, run the following commands:

* Start the databases: PS> docker-compose up -d mssql mongodb minio

* Make sure the databases are running: PS> docker-compose ps

* Start the tools: PS> docker-compose up -d drill jupyter

* Finally, start the gamestream: PS> docker-compose up -d gamestream

* Make sure the gamestream is running by looking at the logs: PS> docker-compose logs gamestream

Valid gamesteam output looks like this:
  
  | Added `s3` successfully.
  | Bucket created successfully `s3/gamestreams`.
  | Bucket created successfully `s3/boxscores`. 
  | Commands completed successfully.
  | Commands completed successfully.
  | INFO:root:Waiting for services...
  | INFO:root:Bucket exists...ok
  | INFO:root:Starting Game Data Stream. Delay: 1 second == 0.25 seconds.
  | INFO:root:Wrote gamestream.txt to bucket gamestreams at 59:51

* To stop the environment, run the following command: PS> docker-compose down

* To start over from the very beginning (erase the volumes) run the following command: PS> docker-compose down -v


# Managing the gamestream

The gamestream container simulates the live game. Each time the game stream is started:

* The players and teams database tables are reset back to their original state.
* The live game is replayed from the beginning, writing events to s3/gamestreams/gamestream.txt as they occur.
* The same game is played each time, with the same game events. The expected behavior will help you write the code.
* Restarting the game steam will NOT erase any other data in mongo or other tables in mssql. See "Start over from the very beginning" if you need to do that.
  
To Restart the game stream:, run the following command:

PS> docker-compose restart gamestream

To view the gamestream activity, run the following command:

PS> docker-compose logs gamestream

Adjusting the gamestream speed

By default the game stream "plays" at 4x speed. That 0.25 seconds of real time is 1 second of game time. You can adjust this by setting the DELAY environment variable in the .env file. DELAY=1 plays the game in real time, and DELAY=0.1 plays the game at 10x speed. If you adust the DELAY environment variable, you will need to rebuild the gamestream container. To do this, run the following commands:

PS> docker-compose stop gamestream

PS> docker-compose rm -force gamestream

PS> docker-compose up -d gamestream

NOTE: You can always docker-compose down everything and bring it back up with docker-compose up -d too.


Importing Libraries and Creating a SparkSession

In [2]:
import pyspark
from pyspark.sql import SparkSession

user = "mongo"
passwd = "SU2orange!"
s3_bucket = "gamestreams"
s3_server = "http://minio:9000"
s3_access_key = "minio"
s3_secret_key = "SU2orange!"
mongo_uri = f"mongodb://{user}:{passwd}@mongo:27017/admin?authSource=admin"
server_name = "jdbc:sqlserver://mssql"
database_name = "sidearmdb"
mssql_user = "sa"
mssql_pw = "SU2orange!"
mssql_url = server_name + ";" + "databaseName=" + database_name + ";encrypt=true;trustServerCertificate=true;"

jars = [
    "org.apache.hadoop:hadoop-aws:3.1.2",
    "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1",
    "com.microsoft.azure:spark-mssql-connector_2.12:1.2.0",
    "com.microsoft.sqlserver:mssql-jdbc:12.2.0.jre11"
]

spark = SparkSession.builder \
    .master("local") \
    .appName('jupyter-pyspark') \
        .config("spark.jars.packages",",".join(jars) )\
        .config("spark.hadoop.fs.s3a.endpoint", s3_server ) \
        .config("spark.hadoop.fs.s3a.access.key", s3_access_key) \
        .config("spark.hadoop.fs.s3a.secret.key", s3_secret_key) \
        .config("spark.hadoop.fs.s3a.fast.upload", True) \
        .config("spark.hadoop.fs.s3a.path.style.access", True) \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.mongodb.input.uri", mongo_uri) \
        .config("spark.mongodb.output.uri", mongo_uri) \
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR") # Keeps the noise down!!!



:: loading settings :: url = jar:file:/usr/local/spark-3.1.2-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
com.microsoft.azure#spark-mssql-connector_2.12 added as a dependency
com.microsoft.sqlserver#mssql-jdbc added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4b734a02-5590-4c0c-8ed6-0d5096e9a5df;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.1.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.271 in central
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
	found com.microsoft.azure#spark-mssql-connector_2.12;1.2.0 in central
	found com.microsoft.sqlserver#mssql-jdbc;12.2.0.jre11 in central
:: resolution report :: resolve 588ms :: artif

# Players and Teams Reference Data

The player and team reference data is stored in a Microsoft SQL Server database. The database is called sidearmdb . The database has two tables, players and teams with the following schemas, respectively:

CREATE TABLE teams (

    id int primary key NOT NULL,

    name VARCHAR(50) NOT NULL, 
    
    conference VARCHAR(50) NOT NULL,
    
    wins INT NOT NULL,
    
    losses INT NOT NULL,
    
)

CREATE TABLE players (

    id int  primary key NOT NULL,

    name VARCHAR(50) NOT NULL,
    
    number varchar(3) NOT NULL,

    shots INT NOT NULL,
    
    goals INT NOT NULL,
    
    teamid INT foreign key references teams(id) NOT NULL,
    
)

* The teams table, only has two teams, 101 = syracuse and 205 = johns hopkins. Each team has a conference affiliation, and a current win / loss record.

* The players table has 10 players for each team. Each player has a name, jersey number, shots taken, goals scored, along with their team id.


Reading Players Data from MSSQL

In [3]:
# READ FROM MSSQL
players_df = spark.read.format("com.microsoft.sqlserver.jdbc.spark") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .option("url", mssql_url) \
    .option("dbtable", "players") \
    .option("user", mssql_user) \
    .option("password", mssql_pw) \
    .load()

players_df.show()

+---+------+------+-----+-----+------+
| id|  name|number|shots|goals|teamid|
+---+------+------+-----+-----+------+
|  1|   sam|     6|   56|   23|   101|
|  2| sarah|     1|   85|   34|   101|
|  3| steve|     2|   60|   20|   101|
|  4| stone|    13|   33|   10|   101|
|  5|  sean|    17|   26|    9|   101|
|  6|   sly|     8|   78|   15|   101|
|  7|   sol|     9|   52|   20|   101|
|  8| shree|     4|   20|    4|   101|
|  9|shelly|    15|   10|    2|   101|
| 10| swede|    10|   90|   50|   101|
| 11| jimmy|     1|  100|   50|   205|
| 12| julie|     9|   10|    0|   205|
| 13| james|     2|   45|   15|   205|
| 14|  jane|    15|   82|   46|   205|
| 15| jimmy|    16|   42|   30|   205|
| 16| julie|     8|   67|   32|   205|
| 17| james|    17|   40|   14|   205|
| 18|  jane|     3|   91|   40|   205|
| 19| jimmy|     5|   78|   22|   205|
| 20| julie|    22|   83|   19|   205|
+---+------+------+-----+-----+------+



                                                                                

Reading Teams Data from MSSQL

In [4]:
teams_df = spark.read.format("com.microsoft.sqlserver.jdbc.spark") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .option("url", mssql_url) \
    .option("dbtable", "teams") \
    .option("user", mssql_user) \
    .option("password", mssql_pw) \
    .load()

teams_df.show()

+---+-------------+----------+----+------+
| id|         name|conference|wins|losses|
+---+-------------+----------+----+------+
|101|     syracuse|       acc|  11|     2|
|205|johns hopkins|     big10|   9|     4|
+---+-------------+----------+----+------+



In [5]:
# Read the gamestream.txt from minio
df1 = spark.read.text("s3a://gamestreams/gamestream.txt")

df1.show()

+-----------------+
|            value|
+-----------------+
|  0 59:51 101 2 0|
|  1 57:06 101 6 0|
|  2 56:13 205 8 1|
|  3 55:25 101 4 0|
|  4 55:03 101 1 1|
| 5 54:50 101 17 0|
|  6 54:14 205 8 0|
|  7 53:59 101 9 0|
|  8 53:23 101 2 0|
| 9 51:21 101 13 0|
| 10 49:55 101 1 1|
| 11 49:28 101 2 1|
|12 48:52 101 10 1|
| 13 47:52 101 4 1|
| 14 47:44 101 9 0|
| 15 46:38 101 2 0|
| 16 45:49 101 1 1|
| 17 45:31 101 4 0|
| 18 43:29 205 1 1|
| 19 41:54 205 1 1|
+-----------------+
only showing top 20 rows



In [6]:
#Write the gamestream to mongodb
df1.write.format("mongo") \
    .mode("overwrite") \
    .option("database","sidearmdb") \
    .option("collection","boxscores") \
    .save()

df1.show()

+-----------------+
|            value|
+-----------------+
|  0 59:51 101 2 0|
|  1 57:06 101 6 0|
|  2 56:13 205 8 1|
|  3 55:25 101 4 0|
|  4 55:03 101 1 1|
| 5 54:50 101 17 0|
|  6 54:14 205 8 0|
|  7 53:59 101 9 0|
|  8 53:23 101 2 0|
| 9 51:21 101 13 0|
| 10 49:55 101 1 1|
| 11 49:28 101 2 1|
|12 48:52 101 10 1|
| 13 47:52 101 4 1|
| 14 47:44 101 9 0|
| 15 46:38 101 2 0|
| 16 45:49 101 1 1|
| 17 45:31 101 4 0|
| 18 43:29 205 1 1|
| 19 41:54 205 1 1|
+-----------------+
only showing top 20 rows



# Game Stream

While the game is going on, there is a file called gamestream.txt located in the minio/gamestreams S3 bucket. Each time an in-game event happens, the event is appended to this file. To simplify things, the game stream only reports shots on goal. Here is the format of the file each line is an event and the fields are separated by a space:

0 59:51 101 2 0

1 57:06 101 6 0

2 56:13 205 8 1

3 55:25 101 4 0

Data Dictionary for gamestream.txt

* The first column is the event ID. These are sequential. An event ID of -1 means the game is over.
* The second column is the timestamp of the event in the format mm:ss. This counts down to 00:00. For example the first event occurred 9 seconds into the game.
* The third column is the team ID, indicating team took the shot on goal. In the simulation there are only two teams, 101 and 205.
* The fourth colum is the jersey number of the player who took the shot.
* The final column is a 1 if the shot was a goal, 0 if it was a miss.


Label each of the columns in the gamestream with their appropriate columns names from the data dictionary.

In [10]:
from pyspark.sql.functions import col

gamestream_df = spark.read.csv("s3a://gamestreams/gamestream.txt",
      sep =' ',
      inferSchema = True,
      header=False
)

headers = ["event_id","timestamp","team_id","jersey_number","shots"]

gamestream_df  = gamestream_df .toDF(*headers)

gamestream_df.createOrReplaceTempView("gamestream")

gamestream_df .show()

+--------+---------+-------+-------------+-----+
|event_id|timestamp|team_id|jersey_number|shots|
+--------+---------+-------+-------------+-----+
|       0|    59:51|    101|            2|    0|
|       1|    57:06|    101|            6|    0|
|       2|    56:13|    205|            8|    1|
|       3|    55:25|    101|            4|    0|
|       4|    55:03|    101|            1|    1|
|       5|    54:50|    101|           17|    0|
|       6|    54:14|    205|            8|    0|
|       7|    53:59|    101|            9|    0|
|       8|    53:23|    101|            2|    0|
|       9|    51:21|    101|           13|    0|
|      10|    49:55|    101|            1|    1|
|      11|    49:28|    101|            2|    1|
|      12|    48:52|    101|           10|    1|
|      13|    47:52|    101|            4|    1|
|      14|    47:44|    101|            9|    0|
|      15|    46:38|    101|            2|    0|
|      16|    45:49|    101|            1|    1|
|      17|    45:31|

Group the gamestream by team/player jersey number adding up the shots and goals. 

1. One row per team / jersey number in the gamestream.
2. Values dependent on team and player: total shots and goals for each player.
3. Value dependent on only team: total goals (this should repeat for every row with the same team id)

In [11]:

query = spark.sql("""
    SELECT 
        team_id,
        jersey_number,
        COUNT(*) AS shots,
        SUM(shots) AS goals,
        teamgoals
    FROM 
        (SELECT 
            team_id, 
            jersey_number, 
            shots, 
            SUM(shots) OVER (PARTITION BY team_id) as teamgoals
         FROM 
            gamestream
         WHERE team_id > 0
        ) 
    GROUP BY 
        team_id, 
        jersey_number, 
        teamgoals
    ORDER BY 
        team_id, 
        jersey_number
""")

query.createOrReplaceTempView("gamestream_result")

query.show()




+-------+-------------+-----+-----+---------+
|team_id|jersey_number|shots|goals|teamgoals|
+-------+-------------+-----+-----+---------+
|    101|            1|    8|    6|       14|
|    101|            2|    7|    2|       14|
|    101|            4|    5|    1|       14|
|    101|            6|    4|    2|       14|
|    101|            8|    4|    0|       14|
|    101|            9|    5|    0|       14|
|    101|           10|    3|    1|       14|
|    101|           13|    7|    1|       14|
|    101|           15|    3|    1|       14|
|    101|           17|    2|    0|       14|
|    205|            1|    3|    3|        9|
|    205|            2|    3|    1|        9|
|    205|            3|    1|    0|        9|
|    205|            5|    2|    1|        9|
|    205|            8|    2|    1|        9|
|    205|            9|    4|    0|        9|
|    205|           15|    2|    2|        9|
|    205|           16|    1|    0|        9|
|    205|           17|    3|    1

                                                                                

Using gamestream output to include the most most current event id and timestamp for that point in time in the game, include the latest event_id and timestamp.

In [12]:
#Creating Temp View for players and teams table
players_df.createOrReplaceTempView("players")
teams_df.createOrReplaceTempView("teams")

query1 = '''
with cte1 as(
 select team_id, 
        jersey_number, 
        count(shots) as shots, 
        sum(shots) as goals
 from 
        gamestream
 group by
        team_id, jersey_number),
cte2 as(
 select team_id, sum(shots) as team_goals
 from gamestream
 group by team_id),
latest_event_details as(
 select event_id, `timestamp`
 from gamestream
 order by event_id DESC Limit 1)
 
select e.event_id,e.`timestamp`, c1.team_id, c1.jersey_number, c1.shots, c1.goals,
c2.team_goals
 from latest_event_details e,
 cte1 c1
 join cte2 c2 on c1.team_id=c2.team_id
 order by c1.team_id, c1.jersey_number
'''

spark.sql(query1).createOrReplaceTempView("score_at_any_point_ingame")
spark.sql("select * from score_at_any_point_ingame").show()



+--------+---------+-------+-------------+-----+-----+----------+
|event_id|timestamp|team_id|jersey_number|shots|goals|team_goals|
+--------+---------+-------+-------------+-----+-----+----------+
|      70|    00:00|      0|            0|    1|    0|         0|
|      70|    00:00|    101|            1|    8|    6|        14|
|      70|    00:00|    101|            2|    7|    2|        14|
|      70|    00:00|    101|            4|    5|    1|        14|
|      70|    00:00|    101|            6|    4|    2|        14|
|      70|    00:00|    101|            8|    4|    0|        14|
|      70|    00:00|    101|            9|    5|    0|        14|
|      70|    00:00|    101|           10|    3|    1|        14|
|      70|    00:00|    101|           13|    7|    1|        14|
|      70|    00:00|    101|           15|    3|    1|        14|
|      70|    00:00|    101|           17|    2|    0|        14|
|      70|    00:00|    205|            1|    3|    3|         9|
|      70|

                                                                                

Join the Score_at_any_point_ingame output with the player and team reference data mssql so that you have the data necessary for the box score

1. players teams
2. score_at_any_point_ingame

In [13]:
final_result = spark.sql("""
SELECT s.*, p.name as player_name, t.name as team_name, t.conference, t.wins, t.losses
FROM  score_at_any_point_ingame s
LEFT JOIN players p ON s.team_id = p.teamid AND s.jersey_number = p.number
LEFT JOIN teams t ON s.team_id = t.id
""")

final_result.show()


                                                                                

+--------+---------+-------+-------------+-----+-----+----------+-----------+-------------+----------+----+------+
|event_id|timestamp|team_id|jersey_number|shots|goals|team_goals|player_name|    team_name|conference|wins|losses|
+--------+---------+-------+-------------+-----+-----+----------+-----------+-------------+----------+----+------+
|      70|    00:00|    101|            9|    5|    0|        14|        sol|     syracuse|       acc|  11|     2|
|      70|    00:00|    101|           10|    3|    1|        14|      swede|     syracuse|       acc|  11|     2|
|      70|    00:00|    101|            2|    7|    2|        14|      steve|     syracuse|       acc|  11|     2|
|      70|    00:00|    101|            8|    4|    0|        14|        sly|     syracuse|       acc|  11|     2|
|      70|    00:00|    101|           13|    7|    1|        14|      stone|     syracuse|       acc|  11|     2|
|      70|    00:00|    101|            6|    4|    2|        14|        sam|   

# The game stream's real-time box score

Each time you run your code while the game is ongoing, you should write a new boxscore document to the mongodb/sidearm/boxscores collection. That way web developers can read the latest document's contents to render a webpage for live box score stats while the game is going on.

For simplicity, assume team 101 is the home team and team 205 is the away team.

The document should have the following structure (consider this an example)

{
    "_id" : "UseTheEventIDFrom gamestream.txt",
    "timestamp" : "55:25",
    "home": {
        "teamid" : 105,
        "conference" : "ACC",
        "wins" : 5,
        "losses" : 2,
        "score" : 3,
        "status" : "winning",
        "players": [
            {"id": 1, "name" : "sam",  "shots" : 3, "goals" : 1, "pct" : 0.33 },
            {"id": 2, "name" : "sarah",  "shots" : 0, "goals" : 0, "pct" : 0.00 },
            {"id": 3, "name" : "steve",  "shots" : 1, "goals" : 1, "pct" : 1.00 },
            ...
        ]
    },
    "away": { ... }
}

NOTES:

* "status" should be "winning", "losing" or "tied" based on the current home.score and away.score
* the "_id" should be the latest event ID from the game stream, at the time the box score was written. 
* the "timestamp" should be the timestamp from the game stream.
* Every player on the roster (in the players table) should appear in the box score.
* The stats in the box score should be the current stats for the player in game only, and not include the stats in the players table and add up the shot and goal for every player at that point in the game stream.
* Calculate the pct field
* game is over when the clock hits 00:00.


Transform the data into the box score document structure 

In [14]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lit, when

team_stats = final_result.groupBy("team_id").agg(
    F.first("conference").alias("conference"),
    F.first("wins").alias("wins"),
    F.first("losses").alias("losses"),
    F.sum("goals").alias("score"),
    F.collect_list(
        F.struct(
            "jersey_number", 
            "player_name", 
            "shots", 
            "goals",
            F.when(
                F.col("shots") > 0, F.col("goals") / F.col("shots")
            ).otherwise(F.lit(0)).alias("pct")
        )
    ).alias("players")
)

In [15]:
team_stats = team_stats.withColumn(
    "status",
    F.when(
        F.col("score") == F.max("score").over(Window.partitionBy()), "tied"
    ).otherwise(
        F.when(
            F.col("score") < F.max("score").over(Window.partitionBy()), "losing"
        ).otherwise("winning")
    )
)

In [16]:
#Filtering home and away teams
home_team = team_stats.filter(team_stats.team_id == 101)
away_team = team_stats.filter(team_stats.team_id == 205)

In [17]:
home_team.show()



+-------+----------+----+------+-----+--------------------+------+
|team_id|conference|wins|losses|score|             players|status|
+-------+----------+----+------+-----+--------------------+------+
|    101|       acc|  11|     2|   14|[{9, sol, 5, 0, 0...|  tied|
+-------+----------+----+------+-----+--------------------+------+



                                                                                

In [18]:
away_team.show()



+-------+----------+----+------+-----+--------------------+------+
|team_id|conference|wins|losses|score|             players|status|
+-------+----------+----+------+-----+--------------------+------+
|    205|     big10|   9|     4|    9|[{9, julie, 4, 0,...|losing|
+-------+----------+----+------+-----+--------------------+------+



                                                                                

In [19]:
latest_eventid = spark.sql("""
    SELECT 
        MAX(event_id) AS latest_eventid
    FROM gamestream
    WHERE team_id != 0
""").collect()[0]['latest_eventid']

latest_eventid

69

In [20]:
latest_timestamp = spark.sql("""
    SELECT 
        MAX(timestamp) AS latest_timestamp
    FROM gamestream
    WHERE team_id != 0
""").collect()[0]['latest_timestamp']

latest_timestamp

'59:51'

In [21]:
latest_df = final_result.withColumn("latest_eventid", lit(latest_eventid))\
                    .withColumn("latest_timestamp", lit(latest_timestamp))

latest_df.show()

                                                                                

+--------+---------+-------+-------------+-----+-----+----------+-----------+-------------+----------+----+------+--------------+----------------+
|event_id|timestamp|team_id|jersey_number|shots|goals|team_goals|player_name|    team_name|conference|wins|losses|latest_eventid|latest_timestamp|
+--------+---------+-------+-------------+-----+-----+----------+-----------+-------------+----------+----+------+--------------+----------------+
|      70|    00:00|    101|            9|    5|    0|        14|        sol|     syracuse|       acc|  11|     2|            69|           59:51|
|      70|    00:00|    101|           10|    3|    1|        14|      swede|     syracuse|       acc|  11|     2|            69|           59:51|
|      70|    00:00|    101|            2|    7|    2|        14|      steve|     syracuse|       acc|  11|     2|            69|           59:51|
|      70|    00:00|    101|            8|    4|    0|        14|        sly|     syracuse|       acc|  11|     2|    

In [22]:
def dataframe_to_json(df):
    return df.rdd.map(lambda row: row.asDict(recursive=True)).collect()[0]

# Convert home and away team stats DataFrames to their corresponding JSON structure
home_json = dataframe_to_json(home_team)
away_json = dataframe_to_json(away_team)

# Construct the box score document
box_score_document = {
    "_id": 'event_id',
    "timestamp": 'timestamp',
    "home": home_json,
    "away": away_json
}

box_score_document

                                                                                

{'_id': 'event_id',
 'timestamp': 'timestamp',
 'home': {'team_id': 101,
  'conference': 'acc',
  'wins': 11,
  'losses': 2,
  'score': 14,
  'players': [{'jersey_number': 9,
    'player_name': 'sol',
    'shots': 5,
    'goals': 0,
    'pct': 0.0},
   {'jersey_number': 10,
    'player_name': 'swede',
    'shots': 3,
    'goals': 1,
    'pct': 0.3333333333333333},
   {'jersey_number': 2,
    'player_name': 'steve',
    'shots': 7,
    'goals': 2,
    'pct': 0.2857142857142857},
   {'jersey_number': 8,
    'player_name': 'sly',
    'shots': 4,
    'goals': 0,
    'pct': 0.0},
   {'jersey_number': 13,
    'player_name': 'stone',
    'shots': 7,
    'goals': 1,
    'pct': 0.14285714285714285},
   {'jersey_number': 6,
    'player_name': 'sam',
    'shots': 4,
    'goals': 2,
    'pct': 0.5},
   {'jersey_number': 15,
    'player_name': 'shelly',
    'shots': 3,
    'goals': 1,
    'pct': 0.3333333333333333},
   {'jersey_number': 1,
    'player_name': 'sarah',
    'shots': 8,
    'goals': 6,

Write the box score completed to the mongo.sidearm.boxscores collection. 
The document is keyed by event_id

In [23]:
import json

box_score_json = json.dumps(box_score_document)

box_score_df = spark.read.json(spark.sparkContext.parallelize([box_score_json]))

In [24]:
box_score_df.write.format("mongo") \
    .option("database", "sidearm") \
    .option("collection", "boxscores") \
    .mode("append") \
    .save()

PySpark script that will run the entire process of creating the box score document. 

In [25]:

query = spark.sql("""
    SELECT 
        team_id,
        jersey_number,
        COUNT(*) AS shots,
        SUM(shots) AS goals,
        teamgoals
    FROM 
        (SELECT 
            team_id, 
            jersey_number, 
            shots, 
            SUM(shots) OVER (PARTITION BY team_id) as teamgoals
         FROM 
            gamestream
         WHERE team_id > 0
        ) 
    GROUP BY 
        team_id, 
        jersey_number, 
        teamgoals
    ORDER BY 
        team_id, 
        jersey_number
""")

# result.show()

latest_eventid = spark.sql("""
    SELECT 
        MAX(event_id) AS latest_eventid
    FROM gamestream
    WHERE team_id != 0
""").collect()[0]['latest_eventid']

latest_timestamp = spark.sql("""
    SELECT 
        MAX(timestamp) AS latest_timestamp
    FROM gamestream
    WHERE team_id != 0
""").collect()[0]['latest_timestamp']

latest_df = final_result.withColumn("latest_eventid", lit(latest_eventid))\
                    .withColumn("latest_timestamp", lit(latest_timestamp))

players_df = spark.read.format("com.microsoft.sqlserver.jdbc.spark") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .option("url", mssql_url) \
    .option("dbtable", "players") \
    .option("user", mssql_user) \
    .option("password", mssql_pw) \
    .load()

teams_df = spark.read.format("com.microsoft.sqlserver.jdbc.spark") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .option("url", mssql_url) \
    .option("dbtable", "teams") \
    .option("user", mssql_user) \
    .option("password", mssql_pw) \
    .load()

latest_df.createOrReplaceTempView("latest_result")

players_df.createOrReplaceTempView("players")

teams_df.createOrReplaceTempView("teams")

final_result = spark.sql("""
SELECT s.*, p.name as player_name, t.name as team_name, t.conference, t.wins, t.losses
FROM  score_at_any_point_ingame s
LEFT JOIN players p ON s.team_id = p.teamid AND s.jersey_number = p.number
LEFT JOIN teams t ON s.team_id = t.id
""")

team_stats = final_result.groupBy("team_id").agg(
    F.first("conference").alias("conference"),
    F.first("wins").alias("wins"),
    F.first("losses").alias("losses"),
    F.sum("goals").alias("score"),
    F.collect_list(
        F.struct(
            "jersey_number", 
            "player_name", 
            "shots", 
            "goals",
            F.when(
                F.col("shots") > 0, F.col("goals") / F.col("shots")
            ).otherwise(F.lit(0)).alias("pct")
        )
    ).alias("players")
)

team_stats = team_stats.withColumn(
    "status",
    F.when(
        F.col("score") == F.max("score").over(Window.partitionBy()), "tied"
    ).otherwise(
        F.when(
            F.col("score") < F.max("score").over(Window.partitionBy()), "losing"
        ).otherwise("winning")
    )
)

home_team = team_stats.filter(team_stats.team_id == 101)

away_team = team_stats.filter(team_stats.team_id == 205)

def dataframe_to_json(df):
    return df.rdd.map(lambda row: row.asDict(recursive=True)).collect()[0]

home_json = dataframe_to_json(home_team)

away_json = dataframe_to_json(away_team)

# Construct the box score document
box_score_document = {
    "_id": latest_eventid,
    "timestamp": latest_timestamp,
    "home": home_json,
    "away": away_json
}

box_score_document

                                                                                

{'_id': 69,
 'timestamp': '59:51',
 'home': {'team_id': 101,
  'conference': 'acc',
  'wins': 11,
  'losses': 2,
  'score': 14,
  'players': [{'jersey_number': 9,
    'player_name': 'sol',
    'shots': 5,
    'goals': 0,
    'pct': 0.0},
   {'jersey_number': 10,
    'player_name': 'swede',
    'shots': 3,
    'goals': 1,
    'pct': 0.3333333333333333},
   {'jersey_number': 2,
    'player_name': 'steve',
    'shots': 7,
    'goals': 2,
    'pct': 0.2857142857142857},
   {'jersey_number': 8,
    'player_name': 'sly',
    'shots': 4,
    'goals': 0,
    'pct': 0.0},
   {'jersey_number': 13,
    'player_name': 'stone',
    'shots': 7,
    'goals': 1,
    'pct': 0.14285714285714285},
   {'jersey_number': 6,
    'player_name': 'sam',
    'shots': 4,
    'goals': 2,
    'pct': 0.5},
   {'jersey_number': 15,
    'player_name': 'shelly',
    'shots': 3,
    'goals': 1,
    'pct': 0.3333333333333333},
   {'jersey_number': 1,
    'player_name': 'sarah',
    'shots': 8,
    'goals': 6,
    'pct': 

  
# Updating stats in the database when the game is over

After the game is complete, the tables in the mssql sidearmdb database should be updated, based on the final box score. Specifically:

* update the win/loss record for each team in the teams table
* update the shots and goals for each player in the players table

NOTES:

We will not update the actual tables. Instead we will create new tables called teams2 and players2 with the updated data. It's anti-big data to perform row-level updates. The proper way to move the updates into the original tables would be to write an MSSQL script to update the tables.


When the game is complete, write pyspark code to update the wins and losses for the teams in the teams table. 
Specifically, load the teams table and update it, then display the updated data frame.

In [27]:
home_team_status = box_score_document["home"]["status"]
home_team_status

'tied'

In [28]:
away_team_status = box_score_document["away"]["status"]
away_team_status

'losing'

In [29]:
# Function to update the team stats
def update_team_stats(teams_df, team_id, status):
    return teams_df.withColumn(
        'wins', 
        when((col('id') == team_id) & (lit(status) == 'winning'), col('wins') + 1)
        .otherwise(col('wins'))
    ).withColumn(
        'losses', 
        when((col('id') == team_id) & (lit(status) == 'losing'), col('losses') + 1)
        .otherwise(col('losses'))
    )

In [30]:
df_teams = update_team_stats(teams_df, 101, home_team_status)
df_teams = update_team_stats(teams_df, 205, away_team_status)

df_teams.show()

+---+-------------+----------+----+------+
| id|         name|conference|wins|losses|
+---+-------------+----------+----+------+
|101|     syracuse|       acc|  11|     2|
|205|johns hopkins|     big10|   9|     5|
+---+-------------+----------+----+------+



Updating teams final data to a new mssql.sidearmdb.teams2 table.

In [31]:

df_teams.write.format("com.microsoft.sqlserver.jdbc.spark") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .option("url", mssql_url) \
    .option("dbtable", "teams2") \
    .option("user", mssql_user) \
    .option("password", mssql_pw) \
    .mode("overwrite") \
    .save()

Update the shots and goals for the players in the players table. Specifically, load the players table and update it, then display the updated data frame.

In [32]:

df_players_with_new_stats = players_df.alias('players').join(
    latest_df.alias('new_stats'),
    (players_df['name'] == latest_df['player_name']) & 
    (players_df['number'] == latest_df['jersey_number']),
    'left'
)

df_players_with_new_stats.show()

                                                                                

+---+------+------+-----+-----+------+--------+---------+-------+-------------+-----+-----+----------+-----------+-------------+----------+----+------+--------------+----------------+
| id|  name|number|shots|goals|teamid|event_id|timestamp|team_id|jersey_number|shots|goals|team_goals|player_name|    team_name|conference|wins|losses|latest_eventid|latest_timestamp|
+---+------+------+-----+-----+------+--------+---------+-------+-------------+-----+-----+----------+-----------+-------------+----------+----+------+--------------+----------------+
| 10| swede|    10|   90|   50|   101|      70|    00:00|    101|           10|    3|    1|        14|      swede|     syracuse|       acc|  11|     2|            69|           59:51|
|  7|   sol|     9|   52|   20|   101|      70|    00:00|    101|            9|    5|    0|        14|        sol|     syracuse|       acc|  11|     2|            69|           59:51|
| 20| julie|    22|   83|   19|   205|      70|    00:00|    205|           22| 

                                                                                

In [None]:
# Update the 'shots' and 'goals' columns by adding new stats to existing stats
# If there are no new stats, keep the existing stats
df_players_with_updated_stats = df_players_with_new_stats.select(
    'players.id',
    'players.name',
    'players.number',
    (F.col('players.shots') + F.coalesce(F.col('new_stats.shots'), F.lit(0))).alias('shots'),
    (F.col('players.goals') + F.coalesce(F.col('new_stats.goals'), F.lit(0))).alias('goals'),
    'players.teamid'
)

# Show the updated DataFrame
df_players_with_updated_stats.show()

Update the Players data to a new mssql.sidearmdb.players2 table

In [None]:

df_players_with_updated_stats.write.format("com.microsoft.sqlserver.jdbc.spark") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .option("url", mssql_url) \
    .option("dbtable", "players2") \
    .option("user", mssql_user) \
    .option("password", mssql_pw) \
    .mode("overwrite") \
    .save()