# San Diego Parking Meter Day Revenue Prediction & Visualization

Data Files: treas_parking_meters_loc_datasd (meter location), treas_parking_payments_2017_datasd (meter payment transactions)

We would like to see a csv or excel file created that reflects the following:
1) Generate a file that includes: poleid, sub-area, year, month, day, time (being the 5 minute interval, like 08:05:00, 08:10:00, 08:15:00), the % of time during that interval that was paid for, count of transactions that started in that interval, count of transactions that expired during that interval

2) create a column named "Paid Occupied Percentage" - this will be the % of the time interval that we know someone has paid for based on the transaction start time and expire time

5) Create a separate tab that shows your daily revenue predictions in comparison to actuals

6) compile your code and findings in such a way that they can be reviewed (i.e. a python notebook). You are also free to provide additional analytics or insights into the data that you discover and think are useful. This is a critical part of being a data scientist at Xaqt.
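
The "Paid Occupied Percentage" in items 1 and 2 can be read as the share of each 5 minute interval that lies between a transaction's start and expire times. The following is a minimal sketch of that reading for a single transaction, using only the standard library. The function names `floor_to_interval` and `paid_fraction_by_interval` and the sample timestamps are hypothetical, not taken from the data files.

```python
from datetime import datetime, timedelta

INTERVAL = timedelta(minutes=5)

def floor_to_interval(ts):
    """Round a timestamp down to the start of its 5 minute interval."""
    return ts - timedelta(minutes=ts.minute % 5,
                          seconds=ts.second,
                          microseconds=ts.microsecond)

def paid_fraction_by_interval(trans_start, meter_expire):
    """Map each interval start to the fraction of that interval paid for
    by one transaction running from trans_start to meter_expire."""
    fractions = {}
    bucket = floor_to_interval(trans_start)
    while bucket < meter_expire:
        bucket_end = bucket + INTERVAL
        # Overlap between the paid window and this interval.
        overlap = min(meter_expire, bucket_end) - max(trans_start, bucket)
        fractions[bucket] = overlap / INTERVAL
        bucket = bucket_end
    return fractions

# Hypothetical transaction: paid from 08:02:30 until 08:12:00.
start = datetime(2017, 7, 1, 8, 2, 30)
expire = datetime(2017, 7, 1, 8, 12, 0)
for bucket, frac in paid_fraction_by_interval(start, expire).items():
    print(bucket.time(), round(100 * frac, 1))
```

This handles one transaction at a time. If several payments on the same pole overlap, their paid windows would presumably need to be merged first so that an interval never exceeds 100%.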


In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
#enable IPython to display matplotlib graphs
%matplotlib inline
import csv

from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
#sklearn.cross_validation was removed in scikit-learn 0.20; model_selection provides the same helpers
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn import metrics

In [2]:
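from pyspark import SparkContext
from pyspark.sql import SQLContext

#reuse the running SparkContext, or create one if none exists
sc=SparkContext.getOrCreate()
#initiate SQLContext instance on Spark
sqlContext = SQLContext(sc)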

In [3]:
#read data as Spark dataframe
file = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true', sep=',').load('treas_parking_payments_2017_datasd_parsed.csv')
file.show(3)

+---+--------------------+----------+-------+---------+----------+--------------------+--------------------+----------------+-----------------+---------------+----------------+-----------------+------------------+----------------+-----------------+
|_c0|                uuid|meter_type|pole_id|trans_amt|pay_method|         trans_start|        meter_expire|trans_start_year|trans_start_month|trans_start_day|trans_start_time|meter_expire_year|meter_expire_month|meter_expire_day|meter_expire_time|
+---+--------------------+----------+-------+---------+----------+--------------------+--------------------+----------------+-----------------+---------------+----------------+-----------------+------------------+----------------+-----------------+
|  0|SSSL2161701010015...|        SS| SL-216|       55|      CASH|2017-01-01 00:15:...|2017-01-01 00:15:...|            2017|                1|              1|        00:15:18|             2017|                 1|               1|         00:15:18|
|  1

In [4]:
#subset records in month 07, 08 into file2
from pyspark.sql.functions import col 

file2=file.where((col('trans_start_month')==7) | (col('trans_start_month')==8))
file2.show(3)

+-------+--------------------+----------+-------+---------+----------+--------------------+--------------------+----------------+-----------------+---------------+----------------+-----------------+------------------+----------------+-----------------+
|    _c0|                uuid|meter_type|pole_id|trans_amt|pay_method|         trans_start|        meter_expire|trans_start_year|trans_start_month|trans_start_day|trans_start_time|meter_expire_year|meter_expire_month|meter_expire_day|meter_expire_time|
+-------+--------------------+----------+-------+---------+----------+--------------------+--------------------+----------------+-----------------+---------------+----------------+-----------------+------------------+----------------+-----------------+
|4854486|SSSL7081707010104...|        SS| SL-708|       10|      CASH|2017-07-01 01:04:...|2017-07-01 01:04:...|            2017|                7|              1|        01:04:40|             2017|                 7|               1|       

#Paid parking is only valid from 8am-6pm. Outside of this time range, if people accidentally pay the meter,
#`meter_expire` is recorded as equal to `trans_start`. We exclude these records from our analysis.
#Spark DataFrames have no .shape, so we count rows instead:
file2.filter(file2.trans_start==file2.meter_expire).count() #10481 rows omitted because of this
file2.filter(file2.trans_start!=file2.meter_expire).count() #1278130 left after removing transactions outside of meter operating hours

In [5]:
#Paid parking is only valid from 8am-6pm. Outside of this time range, if people accidentally pay the meter,
#`meter_expire` is recorded as equal to `trans_start`. We exclude these records from our analysis.
file3=file2.filter(file2.trans_start!=file2.meter_expire)
file3.show(3)

+-------+--------------------+----------+-------+---------+-----------+--------------------+--------------------+----------------+-----------------+---------------+----------------+-----------------+------------------+----------------+-----------------+
|    _c0|                uuid|meter_type|pole_id|trans_amt| pay_method|         trans_start|        meter_expire|trans_start_year|trans_start_month|trans_start_day|trans_start_time|meter_expire_year|meter_expire_month|meter_expire_day|meter_expire_time|
+-------+--------------------+----------+-------+---------+-----------+--------------------+--------------------+----------------+-----------------+---------------+----------------+-----------------+------------------+----------------+-----------------+
|4854496|SSWG800N170701060...|        SS|WG-800N|      375|       CASH|2017-07-01 06:01:...|2017-07-01 13:00:...|            2017|                7|              1|        06:01:31|             2017|                 7|               1|   

In [6]:
loc = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true', sep=',').load('treas_parking_meters_loc_datasd.csv')
loc.show(3)

+----+------------+--------------------+-------+---------+--------------------+-----------+---------+
|zone|        area|            sub_area|   pole|config_id|         config_name|  longitude| latitude|
+----+------------+--------------------+-------+---------+--------------------+-----------+---------+
|City|Barrio Logan|1000 CESAR CHAVEZ...|CC-1003|     9000|2 Hour Max $1.25 ...|-117.145178|32.700353|
|City|Barrio Logan|1000 CESAR CHAVEZ...|CC-1005|     9000|2 Hour Max $1.25 ...|-117.145178|32.700352|
|City|Barrio Logan|1000 CESAR CHAVEZ...|CC-1011|     9000|2 Hour Max $1.25 ...|-117.145349|32.700155|
+----+------------+--------------------+-------+---------+--------------------+-----------+---------+
only showing top 3 rows



In [7]:
#we join the filtered meter transaction table file3 with the meter location table loc on the meter pole id:
file4=file3.join(loc, file3.pole_id == loc.pole, "left_outer").drop(loc.pole)
file4.show(3)

+-------+--------------------+----------+-------+---------+-----------+--------------------+--------------------+----------------+-----------------+---------------+----------------+-----------------+------------------+----------------+-----------------+--------+------+-------------+---------+--------------------+-----------+---------+
|    _c0|                uuid|meter_type|pole_id|trans_amt| pay_method|         trans_start|        meter_expire|trans_start_year|trans_start_month|trans_start_day|trans_start_time|meter_expire_year|meter_expire_month|meter_expire_day|meter_expire_time|    zone|  area|     sub_area|config_id|         config_name|  longitude| latitude|
+-------+--------------------+----------+-------+---------+-----------+--------------------+--------------------+----------------+-----------------+---------------+----------------+-----------------+------------------+----------------+-----------------+--------+------+-------------+---------+--------------------+-----------+

In [8]:
#drop all the rows where records are not matched from meter location file:
file4=file4.dropna()
file4.count()

1268728

In [9]:
#create new column date that concatenates transaction year, month, day (no zero padding, so Aug 9 becomes 201789):
from pyspark.sql.functions import concat, col, lit
file4 = file4.withColumn('date', concat(col('trans_start_year'), 
                                        col('trans_start_month'),
                                       col('trans_start_day')))
file4.show(3)

+-------+--------------------+----------+-------+---------+-----------+--------------------+--------------------+----------------+-----------------+---------------+----------------+-----------------+------------------+----------------+-----------------+--------+------+-------------+---------+--------------------+-----------+---------+------+
|    _c0|                uuid|meter_type|pole_id|trans_amt| pay_method|         trans_start|        meter_expire|trans_start_year|trans_start_month|trans_start_day|trans_start_time|meter_expire_year|meter_expire_month|meter_expire_day|meter_expire_time|    zone|  area|     sub_area|config_id|         config_name|  longitude| latitude|  date|
+-------+--------------------+----------+-------+---------+-----------+--------------------+--------------------+----------------+-----------------+---------------+----------------+-----------------+------------------+----------------+-----------------+--------+------+-------------+---------+-------------------
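
Because the `date` key is built without zero padding, it sorts as a string rather than chronologically, and across a full year two different days could map to the same key. The sketch below illustrates both effects in plain Python and shows a zero-padded alternative. The helper names `naive_key` and `padded_key` are hypothetical; in PySpark a similar result could likely be obtained with `lpad` on the month and day columns, or by converting `trans_start` with `to_date`.

```python
from datetime import date

def naive_key(year, month, day):
    # Mirrors concat(year, month, day) with no padding.
    return f"{year}{month}{day}"

def padded_key(year, month, day):
    # ISO format (YYYY-MM-DD) is zero-padded, so string order is date order.
    return date(year, month, day).isoformat()

# Two different days collapse into the same unpadded key.
print(naive_key(2017, 1, 12), naive_key(2017, 11, 2))
print(padded_key(2017, 1, 12), padded_key(2017, 11, 2))

# Unpadded keys also sort out of chronological order.
days = [(2017, 8, 1), (2017, 8, 2), (2017, 8, 10)]
print(sorted(naive_key(*d) for d in days))
print(sorted(padded_key(*d) for d in days))
```

Within the July and August subset used here the unpadded keys stay unique, but the ordering issue still applies when sorting by `date`.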

In [40]:
#calculate daily parking meter revenue:
#spark 2.0 does not accept multiple aggregations of the same column in a dict-style agg; the functions module F is an alternative
#from pyspark.sql import functions as F
#daily_revenue = file4.groupby('date').agg(F.sum('trans_amt'), F.avg('trans_amt'))
from pyspark.sql.functions import desc
daily_revenue=file4.groupby('date').agg({'trans_amt':'sum'}).sort(desc("date"))
daily_revenue.show()
daily_revenue.count() #43 distinct dates on record

+-------+--------------+
|   date|sum(trans_amt)|
+-------+--------------+
| 201789|       3687416|
| 201788|       3721204|
| 201787|       3495326|
| 201785|       3496341|
| 201784|       3780824|
| 201783|       3751091|
| 201782|       3752886|
|2017814|       3403125|
|2017813|            25|
|2017812|       3423217|
|2017811|       3723263|
|2017810|       3695786|
| 201781|       3738601|
| 201778|       3529107|
| 201777|       3807751|
| 201776|       3853585|
| 201775|       3642764|
| 201774|          7500|
|2017731|       3523958|
|2017730|            75|
+-------+--------------+
only showing top 20 rows



43

In [41]:
#rename columns that have the function name embedded, to avoid confusion

#and sort by date in descending order
#from pyspark.sql.functions import *
#daily_revenue=daily_revenue.sort(desc("date")).collect()

daily_revenue = daily_revenue.select(col("date").alias("date"), col("sum(trans_amt)").alias("daily_rev_sum"))
daily_revenue.show()

+-------+-------------+
|   date|daily_rev_sum|
+-------+-------------+
| 201789|      3687416|
| 201788|      3721204|
| 201787|      3495326|
| 201785|      3496341|
| 201784|      3780824|
| 201783|      3751091|
| 201782|      3752886|
|2017814|      3403125|
|2017813|           25|
|2017812|      3423217|
|2017811|      3723263|
|2017810|      3695786|
| 201781|      3738601|
| 201778|      3529107|
| 201777|      3807751|
| 201776|      3853585|
| 201775|      3642764|
| 201774|         7500|
|2017731|      3523958|
|2017730|           75|
+-------+-------------+
only showing top 20 rows



In [39]:
#calculate daily total number of transactions, average transaction value
daily_transaction=file4.groupby('date').agg({'uuid':'count','trans_amt':'avg'}).sort(desc("date"))
daily_transaction.show()

+-------+------------------+-----------+
|   date|    avg(trans_amt)|count(uuid)|
+-------+------------------+-----------+
| 201789| 106.6898906313292|      34562|
| 201788|107.17753456221199|      34720|
| 201787|104.23851843015626|      33532|
| 201785|110.25293264379415|      31712|
| 201784|105.47408357975786|      35846|
| 201783|  106.586281362771|      35193|
| 201782|108.89609146040681|      34463|
|2017814| 104.5539033457249|      32549|
|2017813|              25.0|          1|
|2017812|111.03885951539135|      30829|
|2017811|105.69044510048825|      35228|
|2017810|106.54671779052671|      34687|
| 201781|106.79276165447898|      35008|
| 201778|111.54293751382787|      31639|
| 201777|105.69159241679851|      36027|
| 201776|105.84736451781251|      36407|
| 201775|104.27560542737734|      34934|
| 201774|144.23076923076923|         52|
|2017731|104.74876642292372|      33642|
|2017730|              75.0|          1|
+-------+------------------+-----------+
only showing top

In [42]:
#rename columns that have the function name embedded, to avoid confusion:
daily_transaction = daily_transaction.select(col("date").alias("date"), col("avg(trans_amt)").alias("daily_rev_avg"), 
                                     col("count(uuid)").alias("count_trans"))
daily_transaction.show()

+-------+------------------+-----------+
|   date|     daily_rev_avg|count_trans|
+-------+------------------+-----------+
| 201789| 106.6898906313292|      34562|
| 201788|107.17753456221199|      34720|
| 201787|104.23851843015626|      33532|
| 201785|110.25293264379415|      31712|
| 201784|105.47408357975786|      35846|
| 201783|  106.586281362771|      35193|
| 201782|108.89609146040681|      34463|
|2017814| 104.5539033457249|      32549|
|2017813|              25.0|          1|
|2017812|111.03885951539135|      30829|
|2017811|105.69044510048825|      35228|
|2017810|106.54671779052671|      34687|
| 201781|106.79276165447898|      35008|
| 201778|111.54293751382787|      31639|
| 201777|105.69159241679851|      36027|
| 201776|105.84736451781251|      36407|
| 201775|104.27560542737734|      34934|
| 201774|144.23076923076923|         52|
|2017731|104.74876642292372|      33642|
|2017730|              75.0|          1|
+-------+------------------+-----------+
only showing top

In [16]:
#calculate day of the week
#user defined function to parse the time string cell into datetime format and return the day of the week (Monday=0 ... Sunday=6):
from pyspark.sql import functions as F
from pyspark.sql import types as t
from datetime import datetime

def func(x):
    return datetime.strptime(str(x), '%Y-%m-%d %H:%M:%S').weekday()
UDF = F.udf(func, t.IntegerType())
file4=file4.withColumn('day_of_week', UDF(col('trans_start')))
file4.show(3)

+-------+--------------------+----------+-------+---------+-----------+--------------------+--------------------+----------------+-----------------+---------------+----------------+-----------------+------------------+----------------+-----------------+--------+------+-------------+---------+--------------------+-----------+---------+------+-----------+
|    _c0|                uuid|meter_type|pole_id|trans_amt| pay_method|         trans_start|        meter_expire|trans_start_year|trans_start_month|trans_start_day|trans_start_time|meter_expire_year|meter_expire_month|meter_expire_day|meter_expire_time|    zone|  area|     sub_area|config_id|         config_name|  longitude| latitude|  date|day_of_week|
+-------+--------------------+----------+-------+---------+-----------+--------------------+--------------------+----------------+-----------------+---------------+----------------+-----------------+------------------+----------------+-----------------+--------+------+-------------+-----
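
As a sanity check on the parsing logic inside the UDF, the same `strptime(...).weekday()` call can be run in plain Python. This is a small sketch, not part of the original pipeline. The `day_of_week` helper and the sample timestamps are hypothetical; the dates were chosen to match the three near-zero revenue keys in the daily tables above (201774, 2017730, 2017813).

```python
from datetime import datetime

DAY_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday",
             "Friday", "Saturday", "Sunday"]

def day_of_week(ts_string):
    """Parse 'YYYY-MM-DD HH:MM:SS' and return the weekday, Monday=0 ... Sunday=6."""
    return datetime.strptime(ts_string, "%Y-%m-%d %H:%M:%S").weekday()

# Hypothetical timestamps on the three low-revenue dates.
samples = ["2017-07-04 10:00:00", "2017-07-30 10:00:00", "2017-08-13 10:00:00"]
for s in samples:
    idx = day_of_week(s)
    print(s[:10], idx, DAY_NAMES[idx])
```

Two of these dates fall on a Sunday and the third is July 4, which may explain the very low transaction counts. Note that the format string assumes no fractional seconds; a timestamp rendered with microseconds would raise a `ValueError`. Spark's built-in `dayofweek` function, where available, uses a different numbering (1 for Sunday), so the two encodings should not be mixed.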

In [47]:
#convert the daily transaction summary (keyed by date) to a pandas DataFrame:
data=daily_transaction.toPandas()
data.head(10)

      date  daily_rev_avg  count_trans
0   201789     106.689891        34562
1   201788     107.177535        34720
2   201787     104.238518        33532
3   201785     110.252933        31712
4   201784     105.474084        35846
5   201783     106.586281        35193
6   201782     108.896091        34463
7  2017814     104.553903        32549
8  2017813      25.000000            1
9  2017812     111.038860        30829
