In [63]:
import time
from datetime import datetime

from dateutil import tz
from pyspark import SparkConf, SparkContext
from pyspark.sql import DataFrameWriter, HiveContext
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

In [64]:
# Configuration: input/output Hive tables and the query that reads the input.
iHiveTable = "vrawload"  # table the raw rows are read from
oHiveTable = "vprdload"  # table the aggregated rows are written to

iHiveQuery = "SELECT ts, mid, market, percentage, workdone from {}".format(iHiveTable)
iHiveQuery

'SELECT ts, mid, market, percentage, workdone from vrawload'

In [67]:
# A SparkContext must exist when this runs as a standalone .py file. Reuse the
# active one if there is one (a pyspark kernel, or this cell being re-run);
# constructing a second SparkContext directly raises "Cannot run multiple
# SparkContexts at once". Otherwise start a local context named 'pyspark'.
sc = SparkContext.getOrCreate(SparkConf().setMaster('local').setAppName('pyspark'))
hiveContext = HiveContext(sc)

In [68]:
# Load the raw rows (ts, mid, market, percentage, workdone) from the input Hive table.
tdf = hiveContext.sql(iHiveQuery)

In [69]:
# Preview the raw rows as a text table.
tdf.show()

+-------------------+---+-------+----------+--------+
|                 ts|mid| market|percentage|workdone|
+-------------------+---+-------+----------+--------+
|2016-06-01@12:00:00| M1|    NYC|      20.2|  2500.0|
|2016-06-01@12:15:00| M1|    NYC|      10.5|  2000.0|
|2016-06-01@12:30:00| M1|    NYC|       8.5|  1000.0|
|2016-06-01@12:45:00| M1|    NYC|      12.5|  1400.0|
|2016-06-01@13:00:00| M1|    NYC|      12.5|  1200.0|
|2016-06-01@13:15:00| M1|    NYC|      21.5|  4200.0|
|2016-06-01@13:30:00| M1|    NYC|      12.5|  1200.0|
|2016-06-01@13:45:00| M1|    NYC|      12.5|  1200.0|
|2016-06-01@12:00:00| M2|CHICAGO|      12.5|  1200.0|
+-------------------+---+-------+----------+--------+



In [52]:
def mid_market_ts(mid, market, ts):
    """Build the per-hour grouping key ``<mid>_<market>_<ts[:13]>``.

    For a timestamp shaped like ``'2016-06-01@12:00:00'`` the first 13
    characters are the date and hour (``'2016-06-01@12'``). Rows from the same
    machine, market and hour therefore share a key,
    e.g. ``'M1_NYC_2016-06-01@12'``.

    Args:
        mid: machine id string.
        market: market name string.
        ts: timestamp string in the ``YYYY-MM-DD@HH:MM:SS`` layout.

    Returns:
        The key string, or ``None`` if any input is ``None``. Spark hands SQL
        NULLs to Python UDFs as ``None``, and string concatenation would raise
        TypeError and fail the whole job.
    """
    if mid is None or market is None or ts is None:
        return None
    # ts[:13] keeps 'YYYY-MM-DD@HH' and drops minutes/seconds.
    return '_'.join((mid, market, ts[:13]))
# Wrap mid_market_ts as a Spark SQL UDF producing a string column.
udfMidTs = udf(mid_market_ts, StringType())

In [70]:
# Replace the 'ts' column with the '<mid>_<market>_<date@hour>' grouping key.
# The column keeps the name 'ts' because later cells group by and parse "ts".
tdf2 = tdf.withColumn('ts', udfMidTs("mid", "market", "ts"))

In [71]:
# Preview: 'ts' now holds the grouping key instead of the raw timestamp.
tdf2.show()

+--------------------+---+-------+----------+--------+
|                  ts|mid| market|percentage|workdone|
+--------------------+---+-------+----------+--------+
|M1_NYC_2016-06-01@12| M1|    NYC|      20.2|  2500.0|
|M1_NYC_2016-06-01@12| M1|    NYC|      10.5|  2000.0|
|M1_NYC_2016-06-01@12| M1|    NYC|       8.5|  1000.0|
|M1_NYC_2016-06-01@12| M1|    NYC|      12.5|  1400.0|
|M1_NYC_2016-06-01@13| M1|    NYC|      12.5|  1200.0|
|M1_NYC_2016-06-01@13| M1|    NYC|      21.5|  4200.0|
|M1_NYC_2016-06-01@13| M1|    NYC|      12.5|  1200.0|
|M1_NYC_2016-06-01@13| M1|    NYC|      12.5|  1200.0|
|M2_CHICAGO_2016-0...| M2|CHICAGO|      12.5|  1200.0|
+--------------------+---+-------+----------+--------+



In [72]:
# Average the metrics per (machine, market, hour) key.
# Name the metric columns explicitly. A bare mean() averages every numeric
# column and silently ignores non-numeric ones, so a type change in the source
# table would only surface later as a missing 'avg(percentage)' /
# 'avg(workdone)' column. Naming them makes that fail here instead.
# The output columns are unchanged: ts, avg(percentage), avg(workdone).
tdf3 = tdf2.groupBy("ts").mean("percentage", "workdone")
tdf3.show()

+--------------------+---------------+-------------+
|                  ts|avg(percentage)|avg(workdone)|
+--------------------+---------------+-------------+
|M2_CHICAGO_2016-0...|           12.5|       1200.0|
|M1_NYC_2016-06-01@12|         12.925|       1725.0|
|M1_NYC_2016-06-01@13|          14.75|       1950.0|
+--------------------+---------------+-------------+



In [73]:
# Struct type of the value returned by split_ts: (machine, market, hr, weekday),
# every field nullable.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("machine", StringType()),
        ("market", StringType()),
        ("hr", IntegerType()),
        ("weekday", IntegerType()),
    )
])
def split_ts(s):
    """Parse a ``<mid>_<market>_<YYYY-MM-DD>@<HH>`` key back into its parts.

    Args:
        s: key string such as ``'M1_NYC_2016-06-01@12'``.

    Returns:
        Tuple ``(mid, market, hr, weekday)``. ``hr`` is the hour as an int and
        ``weekday`` is ``time.struct_time.tm_wday`` (Monday == 0). Returns
        ``None`` when ``s`` is ``None``, because Spark passes SQL NULLs to
        Python UDFs as ``None``.
    """
    if s is None:
        return None
    # Split the machine id off at the FIRST '_' and the date at the LAST '_',
    # so a market name containing '_' (e.g. 'NEW_YORK') still parses. A plain
    # split('_') into three names would raise on such keys.
    # Assumes the machine id itself contains no '_' -- TODO confirm.
    mid, rest = s.split('_', 1)
    market, data_dt = rest.rsplit('_', 1)
    day, hr = data_dt.split('@', 1)
    weekday = time.strptime(day, "%Y-%m-%d").tm_wday
    return mid, market, int(hr), weekday

# UDF turning the grouping key into a (machine, market, hr, weekday) struct.
splitTs = udf(split_ts, schema)

# Expand the key into typed columns and keep them alongside the two averages.
df4 = (
    tdf3
    .withColumn("ts", splitTs(col("ts")))
    .select(
        "ts.machine", "ts.market", "ts.hr", "ts.weekday",
        "avg(percentage)", "avg(workdone)",
    )
)

In [74]:
# Inspect the final per-hour rows before writing them to Hive.
df4.show()

+-------+-------+---+-------+---------------+-------------+
|machine| market| hr|weekday|avg(percentage)|avg(workdone)|
+-------+-------+---+-------+---------------+-------------+
|     M2|CHICAGO| 12|      2|           12.5|       1200.0|
|     M1|    NYC| 12|      2|         12.925|       1725.0|
|     M1|    NYC| 13|      2|          14.75|       1950.0|
+-------+-------+---+-------+---------------+-------------+



In [75]:
# Check column order and types; presumably these must line up with the output
# table's columns for the insert below -- verify against the table DDL.
df4.printSchema()

root
 |-- machine: string (nullable = true)
 |-- market: string (nullable = true)
 |-- hr: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- avg(percentage): double (nullable = true)
 |-- avg(workdone): double (nullable = true)



In [76]:
# Overwrite the output Hive table with the per-hour aggregates.
# DataFrame.write is the public accessor for a DataFrameWriter; constructing
# DataFrameWriter(df) directly relies on the class's internal constructor.
# insertInto matches columns by position, not by name, so df4's column order
# must follow the target table's layout.
df_writer = df4.write
df_writer.insertInto(oHiveTable, overwrite=True)

In [66]:
# Shut down the SparkContext.
# NOTE(review): the "Alternate Way" cell placed after this one still uses tdf3
# and udf(). Under Restart & Run All it would execute after this stop and
# presumably fail. Keep this cell last, or run that cell before stopping.
sc.stop()

In [46]:
#Alternate Way: Probably less efficient
# (one UDF per output column, so the key is parsed once per column)
def _split_mid_market_ts(mid_market_ts):
    """Split '<mid>_<market>_<YYYY-MM-DD>@<HH>' into (mid, market, day, hr_str).

    The machine id is taken up to the FIRST '_' and the date from after the
    LAST '_', so market names containing '_' (e.g. 'NEW_YORK') still parse.
    Assumes the machine id itself contains no '_' -- TODO confirm.
    """
    mid, rest = mid_market_ts.split('_', 1)
    market, data_dt = rest.rsplit('_', 1)
    day, hr = data_dt.split('@', 1)
    return mid, market, day, hr

def to_mid(mid_market_ts):
    """Return the machine id part of the key, or None for a NULL key."""
    if mid_market_ts is None:
        return None
    return _split_mid_market_ts(mid_market_ts)[0]

def to_market(mid_market_ts):
    """Return the market part of the key, or None for a NULL key."""
    if mid_market_ts is None:
        return None
    return _split_mid_market_ts(mid_market_ts)[1]

def to_hr(mid_market_ts):
    """Return the hour of the key as an int, or None for a NULL key."""
    if mid_market_ts is None:
        return None
    return int(_split_mid_market_ts(mid_market_ts)[3])

def to_weekday(mid_market_ts):
    """Return tm_wday (Monday == 0) of the key's date, or None for a NULL key."""
    if mid_market_ts is None:
        return None
    day = _split_mid_market_ts(mid_market_ts)[2]
    return time.strptime(day, "%Y-%m-%d").tm_wday

# One Spark UDF per derived column, with its declared return type.
toMid = udf(to_mid, StringType())
toMarket = udf(to_market, StringType())
toHr = udf(to_hr, IntegerType())
toWeekday = udf(to_weekday, IntegerType())

# Add the derived columns to tdf3 in order: mid, market, hr, weekday.
# NOTE(review): in the notebook this cell sits after sc.stop(), so under
# Restart & Run All it presumably fails; run it before the context is stopped.
tdf4 = tdf3
for new_col_name, key_udf in (('mid', toMid),
                              ('market', toMarket),
                              ('hr', toHr),
                              ('weekday', toWeekday)):
    tdf4 = tdf4.withColumn(new_col_name, key_udf("ts"))