In [1]:
import os

import findspark

# Prefer the SPARK_HOME environment variable so the notebook is not tied to
# one machine's install layout; when it is unset or empty, fall back to the
# location this notebook was originally written against.
findspark.init(
    os.environ.get("SPARK_HOME") or "/usr/local/spark/spark-2.2.1-bin-hadoop2.7"
)

In [2]:
from pyspark.sql import SparkSession

# Obtain the session used by every later cell, configured with the
# application name 'Dodgers'.
spark = (
    SparkSession.builder
    .appName('Dodgers')
    .getOrCreate()
)

In [3]:
# Load the traffic readings. The file is read without a header row, so the
# columns get positional names (_c0, _c1); their types are inferred.
traffic = spark.read.csv(
    'Dodgers.data',
    header=False,
    inferSchema=True,
)

In [4]:
# Peek at the first ten rows of the freshly loaded traffic data.
traffic.head(10)

[Row(_c0='4/10/2005 0:00', _c1=-1),
 Row(_c0='4/10/2005 0:05', _c1=-1),
 Row(_c0='4/10/2005 0:10', _c1=-1),
 Row(_c0='4/10/2005 0:15', _c1=-1),
 Row(_c0='4/10/2005 0:20', _c1=-1),
 Row(_c0='4/10/2005 0:25', _c1=-1),
 Row(_c0='4/10/2005 0:30', _c1=-1),
 Row(_c0='4/10/2005 0:35', _c1=-1),
 Row(_c0='4/10/2005 0:40', _c1=-1),
 Row(_c0='4/10/2005 0:45', _c1=-1)]

In [5]:
# Show the column types that schema inference assigned to the traffic data.
traffic.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: integer (nullable = true)



In [6]:
from datetime import datetime
import csv
from io import StringIO


def parsedates(row):
    """Parse a traffic-reading timestamp such as ``'4/10/2005 0:05'``.

    Parameters
    ----------
    row : str or None
        Timestamp text in ``%m/%d/%Y %H:%M`` form. If the text contains
        commas, only the part before the first comma is parsed.

    Returns
    -------
    datetime.datetime or None
        The parsed timestamp, or ``None`` when ``row`` is ``None``.

    Raises
    ------
    ValueError
        If the text does not match the expected format.
    """
    DATE_FMT = "%m/%d/%Y %H:%M"
    # The source column is nullable; without this guard a null cell would
    # fail with AttributeError on .split() instead of yielding a null date.
    if row is None:
        return None
    first_field = row.split(",")[0]
    return datetime.strptime(first_field, DATE_FMT)


In [7]:
# List the column names of the traffic DataFrame.
traffic.columns

['_c0', '_c1']

In [8]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

# Wrap the plain-Python timestamp parser so it can be applied to a DataFrame
# column and yield a date-typed result. DateType comes from the wildcard
# import above.
date_udf = udf(parsedates, returnType=DateType())

In [9]:
# Add a 'Date' column derived from the raw timestamp text held in _c0.
newTraffic = traffic.withColumn(
    'Date',
    date_udf(traffic['_c0']),
)

In [10]:
# Display the leading rows, now including the derived 'Date' column.
newTraffic.show()

+--------------+---+----------+
|           _c0|_c1|      Date|
+--------------+---+----------+
|4/10/2005 0:00| -1|2005-04-10|
|4/10/2005 0:05| -1|2005-04-10|
|4/10/2005 0:10| -1|2005-04-10|
|4/10/2005 0:15| -1|2005-04-10|
|4/10/2005 0:20| -1|2005-04-10|
|4/10/2005 0:25| -1|2005-04-10|
|4/10/2005 0:30| -1|2005-04-10|
|4/10/2005 0:35| -1|2005-04-10|
|4/10/2005 0:40| -1|2005-04-10|
|4/10/2005 0:45| -1|2005-04-10|
|4/10/2005 0:50| -1|2005-04-10|
|4/10/2005 0:55| -1|2005-04-10|
|4/10/2005 1:00| -1|2005-04-10|
|4/10/2005 1:05| -1|2005-04-10|
|4/10/2005 1:10| -1|2005-04-10|
|4/10/2005 1:15| -1|2005-04-10|
|4/10/2005 1:20| -1|2005-04-10|
|4/10/2005 1:25| -1|2005-04-10|
|4/10/2005 1:30| -1|2005-04-10|
|4/10/2005 1:35| -1|2005-04-10|
+--------------+---+----------+
only showing top 20 rows



In [11]:
# Keep only the parsed date and the reading; the raw timestamp text is
# dropped. The reading is referenced as '_C1' and keeps that spelling in the
# result, which later cells rely on.
TrafficParsred = newTraffic.select(
    newTraffic['Date'],
    newTraffic['_C1'],
)
TrafficParsred.show()

+----------+---+
|      Date|_C1|
+----------+---+
|2005-04-10| -1|
|2005-04-10| -1|
|2005-04-10| -1|
|2005-04-10| -1|
|2005-04-10| -1|
|2005-04-10| -1|
|2005-04-10| -1|
|2005-04-10| -1|
|2005-04-10| -1|
|2005-04-10| -1|
|2005-04-10| -1|
|2005-04-10| -1|
|2005-04-10| -1|
|2005-04-10| -1|
|2005-04-10| -1|
|2005-04-10| -1|
|2005-04-10| -1|
|2005-04-10| -1|
|2005-04-10| -1|
|2005-04-10| -1|
+----------+---+
only showing top 20 rows



In [12]:
from pyspark.sql.functions import desc

# Total the readings for each calendar date.
# NOTE(review): the raw data contains readings of -1, which presumably mark
# missing measurements; they are included in these sums as-is. Confirm
# against the dataset description and filter them out if so.
trafficsum = TrafficParsred.groupBy('Date').sum()

# Show the dates with the largest totals first.
(
    trafficsum
    .sort(desc('sum(_C1)'))
    .show()
)

+----------+--------+
|      Date|sum(_C1)|
+----------+--------+
|2005-07-28|    7661|
|2005-07-29|    7499|
|2005-08-12|    7287|
|2005-07-27|    7238|
|2005-09-23|    7175|
|2005-07-26|    7163|
|2005-05-20|    7119|
|2005-08-11|    7110|
|2005-09-08|    7107|
|2005-09-07|    7082|
|2005-09-30|    7079|
|2005-08-10|    7060|
|2005-07-22|    7028|
|2005-08-05|    6924|
|2005-09-29|    6917|
|2005-07-25|    6898|
|2005-09-09|    6897|
|2005-09-16|    6885|
|2005-09-28|    6831|
|2005-04-12|    6822|
+----------+--------+
only showing top 20 rows



In [13]:
# Load the game schedule; the file is read without a header row and with
# inferred column types.
# NOTE(review): the _c5 values display with a replacement character, which
# suggests the file is not UTF-8. Consider passing an explicit encoding=.
game = spark.read.csv(
    'Dodgers.events',
    inferSchema=True,
    header=False,
)

In [14]:
# Display the leading rows of the games DataFrame.
game.show()

+--------+--------+--------+-----+-------------+-------+
|     _c0|     _c1|     _c2|  _c3|          _c4|    _c5|
+--------+--------+--------+-----+-------------+-------+
|04/12/05|13:10:00|16:23:00|55892|San Francisco| W 9-8�|
|04/13/05|19:10:00|21:48:00|46514|San Francisco| W 4-1�|
|04/15/05|19:40:00|21:48:00|51816|    San Diego| W 4-0�|
|04/16/05|19:10:00|21:52:00|54704|    San Diego| W 8-3�|
|04/17/05|13:10:00|15:31:00|53402|    San Diego| W 6-0�|
|04/25/05|19:10:00|21:33:00|36876|      Arizona| L 4-2�|
|04/26/05|19:10:00|22:00:00|44486|      Arizona| L 3-2�|
|04/27/05|19:10:00|22:17:00|54387|      Arizona| L 6-3�|
|04/29/05|19:40:00|22:01:00|40150|     Colorado| W 6-3�|
|04/30/05|19:10:00|21:45:00|54123|     Colorado| W 6-2�|
|05/01/05|13:10:00|15:53:00|46243|     Colorado| W 2-1�|
|05/02/05|19:10:00|21:53:00|34079|   Washington| L 6-2�|
|05/03/05|19:10:00|22:13:00|41190|   Washington| W 4-2�|
|05/04/05|19:10:00|22:08:00|33081|   Washington| L 5-2�|
|05/13/05|19:40:00|22:17:00|536

In [15]:
def parseGames(row):
    """Parse a game date such as ``'04/12/05'`` into a calendar date.

    Parameters
    ----------
    row : str or None
        Date text in ``%m/%d/%y`` form (two-digit year). If the text contains
        commas, only the part before the first comma is parsed.

    Returns
    -------
    datetime.date or None
        The parsed date, or ``None`` when ``row`` is ``None``.

    Raises
    ------
    ValueError
        If the text does not match the expected format.
    """
    DATE_FMT = "%m/%d/%y"
    # The source column is nullable; without this guard a null cell would
    # fail with AttributeError on .split() instead of yielding a null date.
    if row is None:
        return None
    first_field = row.split(",")[0]
    return datetime.strptime(first_field, DATE_FMT).date()


In [16]:
# Wrap the game-date parser so it can be applied to a DataFrame column and
# yield a date-typed result.
gamedate_udf = udf(parseGames, returnType=DateType())

In [17]:
# List the column names of the games DataFrame.
game.columns

['_c0', '_c1', '_c2', '_c3', '_c4', '_c5']

In [18]:
# Add a 'Date' column parsed from the date text held in _c0.
newgame = game.withColumn(
    'Date',
    gamedate_udf(game._c0),
)

In [19]:
# Inspect one row of `game` (the DataFrame without the derived 'Date' column).
game.head(1)

[Row(_c0='04/12/05', _c1='13:10:00', _c2='16:23:00', _c3=55892, _c4='San Francisco', _c5='W 9-8�')]

In [20]:
# Show the column types that schema inference assigned to the games data.
game.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: integer (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)



In [21]:
# Display the leading rows, now including the derived 'Date' column.
newgame.show()

+--------+--------+--------+-----+-------------+-------+----------+
|     _c0|     _c1|     _c2|  _c3|          _c4|    _c5|      Date|
+--------+--------+--------+-----+-------------+-------+----------+
|04/12/05|13:10:00|16:23:00|55892|San Francisco| W 9-8�|2005-04-12|
|04/13/05|19:10:00|21:48:00|46514|San Francisco| W 4-1�|2005-04-13|
|04/15/05|19:40:00|21:48:00|51816|    San Diego| W 4-0�|2005-04-15|
|04/16/05|19:10:00|21:52:00|54704|    San Diego| W 8-3�|2005-04-16|
|04/17/05|13:10:00|15:31:00|53402|    San Diego| W 6-0�|2005-04-17|
|04/25/05|19:10:00|21:33:00|36876|      Arizona| L 4-2�|2005-04-25|
|04/26/05|19:10:00|22:00:00|44486|      Arizona| L 3-2�|2005-04-26|
|04/27/05|19:10:00|22:17:00|54387|      Arizona| L 6-3�|2005-04-27|
|04/29/05|19:40:00|22:01:00|40150|     Colorado| W 6-3�|2005-04-29|
|04/30/05|19:10:00|21:45:00|54123|     Colorado| W 6-2�|2005-04-30|
|05/01/05|13:10:00|15:53:00|46243|     Colorado| W 2-1�|2005-05-01|
|05/02/05|19:10:00|21:53:00|34079|   Washington|

In [22]:
# Reduce the schedule to the two columns used later: the parsed date and _c4.
gameParsed = newgame.select(
    newgame['Date'],
    newgame['_c4'],
)
gameParsed.show()

+----------+-------------+
|      Date|          _c4|
+----------+-------------+
|2005-04-12|San Francisco|
|2005-04-13|San Francisco|
|2005-04-15|    San Diego|
|2005-04-16|    San Diego|
|2005-04-17|    San Diego|
|2005-04-25|      Arizona|
|2005-04-26|      Arizona|
|2005-04-27|      Arizona|
|2005-04-29|     Colorado|
|2005-04-30|     Colorado|
|2005-05-01|     Colorado|
|2005-05-02|   Washington|
|2005-05-03|   Washington|
|2005-05-04|   Washington|
|2005-05-13|      Atlanta|
|2005-05-14|      Atlanta|
|2005-05-15|      Atlanta|
|2005-05-16|      Florida|
|2005-05-17|      Florida|
|2005-05-18|      Florida|
+----------+-------------+
only showing top 20 rows



In [23]:
# Left join on 'Date': every daily traffic total is kept, and dates with no
# matching schedule row get null in _c4.
dailytrend = trafficsum.join(gameParsed, on='Date', how='left')

In [24]:
# Display the leading rows of the joined traffic/schedule data.
dailytrend.show()

+----------+--------+-------------+
|      Date|sum(_C1)|          _c4|
+----------+--------+-------------+
|2005-06-06|    6578|      Detroit|
|2005-08-03|    6673|         null|
|2005-08-14|    5579|      NY Mets|
|2005-08-27|    5663|      Houston|
|2005-08-08|    5979|         null|
|2005-09-20|    6150|         null|
|2005-06-09|    5867|         null|
|2005-09-01|    6444|         null|
|2005-09-28|    6831|      Arizona|
|2005-06-27|    2907|    San Diego|
|2005-09-19|    6184|         null|
|2005-04-12|    6822|San Francisco|
|2005-05-10|    6063|         null|
|2005-06-14|    5966|         null|
|2005-05-11|    6004|         null|
|2005-09-26|    6394|   Pittsburgh|
|2005-05-23|    2173|         null|
|2005-09-14|    5631|     Colorado|
|2005-09-25|    4991|   Pittsburgh|
|2005-04-20|    6166|         null|
+----------+--------+-------------+
only showing top 20 rows



In [25]:
def checkGameDay(row):
    """Label a value as a game day or a regular day.

    Parameters
    ----------
    row : object or None
        The value to classify. In this notebook it is the ``_c4`` cell of
        the joined data, which is null for dates without a schedule entry.

    Returns
    -------
    str
        ``"Regular Day"`` when ``row`` is ``None``, otherwise ``"Game Day"``.
    """
    # Identity test: `is None` cannot be fooled by a custom __eq__ and is the
    # idiomatic way to check for None.
    return "Regular Day" if row is None else "Game Day"
    
    
# Wrap the classifier so it can be applied to a DataFrame column and yield a
# string result.
checkgame_udf = udf(checkGameDay, returnType=StringType())

In [26]:
# Add a 'Status' label computed from _c4 (null there means no schedule match).
dailyTrendbyGames = dailytrend.withColumn(
    'Status',
    checkgame_udf(dailytrend._c4),
)

In [27]:
# Display the leading rows, now including the 'Status' label.
dailyTrendbyGames.show()

+----------+--------+-------------+-----------+
|      Date|sum(_C1)|          _c4|     Status|
+----------+--------+-------------+-----------+
|2005-06-06|    6578|      Detroit|   Game Day|
|2005-08-03|    6673|         null|Regular Day|
|2005-08-14|    5579|      NY Mets|   Game Day|
|2005-08-27|    5663|      Houston|   Game Day|
|2005-08-08|    5979|         null|Regular Day|
|2005-09-20|    6150|         null|Regular Day|
|2005-06-09|    5867|         null|Regular Day|
|2005-09-01|    6444|         null|Regular Day|
|2005-09-28|    6831|      Arizona|   Game Day|
|2005-06-27|    2907|    San Diego|   Game Day|
|2005-09-19|    6184|         null|Regular Day|
|2005-04-12|    6822|San Francisco|   Game Day|
|2005-05-10|    6063|         null|Regular Day|
|2005-06-14|    5966|         null|Regular Day|
|2005-05-11|    6004|         null|Regular Day|
|2005-09-26|    6394|   Pittsburgh|   Game Day|
|2005-05-23|    2173|         null|Regular Day|
|2005-09-14|    5631|     Colorado|   Ga

In [28]:
# Show the labelled days with the largest summed readings first.
dailyTrendbyGames.orderBy(desc('sum(_C1)')).show()

+----------+--------+-------------+-----------+
|      Date|sum(_C1)|          _c4|     Status|
+----------+--------+-------------+-----------+
|2005-07-28|    7661|   Cincinnati|   Game Day|
|2005-07-29|    7499|    St. Louis|   Game Day|
|2005-08-12|    7287|      NY Mets|   Game Day|
|2005-07-27|    7238|   Cincinnati|   Game Day|
|2005-09-23|    7175|   Pittsburgh|   Game Day|
|2005-07-26|    7163|   Cincinnati|   Game Day|
|2005-05-20|    7119|    LA Angels|   Game Day|
|2005-08-11|    7110| Philadelphia|   Game Day|
|2005-09-08|    7107|         null|Regular Day|
|2005-09-07|    7082|San Francisco|   Game Day|
|2005-09-30|    7079|         null|Regular Day|
|2005-08-10|    7060| Philadelphia|   Game Day|
|2005-07-22|    7028|         null|Regular Day|
|2005-08-05|    6924|         null|Regular Day|
|2005-09-29|    6917|      Arizona|   Game Day|
|2005-07-25|    6898|   Cincinnati|   Game Day|
|2005-09-09|    6897|    San Diego|   Game Day|
|2005-09-16|    6885|         null|Regul

In [29]:
# Per-Status totals of the daily sums. Despite the variable's name this holds
# sums, not averages; the division by the row count happens in a later cell.
avgTrends = dailyTrendbyGames.groupBy('Status').sum()

In [30]:
# Number of rows of dailyTrendbyGames in each Status group.
countTrends = dailyTrendbyGames.groupBy('Status').count()

In [31]:
# Show the per-Status totals and the per-Status row counts separately.
avgTrends.show()
countTrends.show()

# Put both figures side by side, matched on 'Status'.
new = avgTrends.join(countTrends, on='Status', how='left')
new.show()

+-----------+-------------+
|     Status|sum(sum(_C1))|
+-----------+-------------+
|Regular Day|       508665|
|   Game Day|       481837|
+-----------+-------------+

+-----------+-----+
|     Status|count|
+-----------+-----+
|Regular Day|   94|
|   Game Day|   81|
+-----------+-----+

+-----------+-------------+-----+
|     Status|sum(sum(_C1))|count|
+-----------+-------------+-----+
|Regular Day|       508665|   94|
|   Game Day|       481837|   81|
+-----------+-------------+-----+



In [32]:
# Average daily total per Status: the summed totals divided by the row count.
final = new.select(
    new['Status'],
    new['sum(sum(_C1))'] / new['count'],
)
final.show()

+-----------+-----------------------+
|     Status|(sum(sum(_C1)) / count)|
+-----------+-----------------------+
|Regular Day|      5411.329787234043|
|   Game Day|      5948.604938271605|
+-----------+-----------------------+

