In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession \
    .builder \
    .appName("Historian Data Processing") \
    .getOrCreate()

In [36]:
from pyspark.sql.types import IntegerType, TimestampType, DoubleType, StructField, StructType
fields = [StructField('created_at', TimestampType(), True),
          StructField('tag_id', IntegerType(), True),
          StructField('value', DoubleType(), True),
          StructField('confidence', DoubleType(), True)]

df = spark.read.csv("/Users/samir/Box Sync/Cloudera/Demos/cdh_historian/sample_data", 
                    schema=StructType(fields), 
                    timestampFormat='yyyy-MM-dd HH:mm:ss')
df.show(truncate=False)

+---------------------+--------+-----+----------+
|created_at           |tag_id  |value|confidence|
+---------------------+--------+-----+----------+
|2016-12-05 04:24:52.0|32000000|70.0 |0.0       |
|2016-12-05 04:24:52.0|33000000|80.0 |0.0       |
|2016-12-05 04:24:52.0|34000000|68.0 |0.0       |
|2016-12-05 04:24:52.0|41000000|57.0 |0.0       |
|2016-12-05 04:24:52.0|42000000|90.0 |0.0       |
|2016-12-05 04:24:52.0|43000000|69.0 |0.0       |
|2016-12-05 04:24:52.0|44000000|83.0 |0.0       |
|2016-12-05 04:24:52.0|51000000|76.0 |0.0       |
|2016-12-05 04:24:52.0|52000000|78.0 |0.0       |
|2016-12-05 04:24:52.0|53000000|66.0 |0.0       |
|2016-12-05 04:24:52.0|54000000|65.0 |0.0       |
|2016-12-05 04:24:52.0|61000000|80.0 |0.0       |
|2016-12-05 04:24:52.0|62000000|62.0 |0.0       |
|2016-12-05 04:24:52.0|63000000|70.0 |0.0       |
|2016-12-05 04:24:52.0|64000000|55.0 |0.0       |
|2016-12-05 04:24:52.0|71000000|70.0 |0.0       |
|2016-12-05 04:24:52.0|72000000|85.0 |0.0       |


In [43]:
pivot_df = df.filter('tag_id<30000000').groupby('created_at').pivot('tag_id').agg(sum('value'))
pivot_df.show()

+--------------------+--------+--------+--------+--------+--------+--------+--------+--------+
|          created_at|11000000|12000000|13000000|14000000|21000000|22000000|23000000|24000000|
+--------------------+--------+--------+--------+--------+--------+--------+--------+--------+
|2016-12-05 14:23:...|    97.0|    81.0|    76.0|    65.0|    94.0|    74.0|    59.0|    65.0|
|2016-12-05 07:43:...|    85.0|    93.0|    98.0|    84.0|    88.0|    98.0|    58.0|    80.0|
|2016-12-05 05:06:...|    92.0|    80.0|    98.0|    87.0|    85.0|    77.0|    95.0|    67.0|
|2016-12-05 07:05:...|    65.0|    55.0|    80.0|    57.0|    58.0|    62.0|    69.0|    88.0|
|2016-12-05 10:24:...|    61.0|    71.0|    74.0|    51.0|    56.0|    67.0|    71.0|    71.0|
|2016-12-05 13:31:...|    96.0|    68.0|    68.0|    77.0|    85.0|    81.0|    57.0|    96.0|
|2016-12-05 14:55:...|    77.0|    92.0|    97.0|    92.0|    53.0|    80.0|    88.0|    80.0|
|2016-12-05 11:27:...|    82.0|    94.0|    93.0| 

In [44]:
pivot_df.printSchema()

root
 |-- created_at: timestamp (nullable = true)
 |-- 11000000: double (nullable = true)
 |-- 12000000: double (nullable = true)
 |-- 13000000: double (nullable = true)
 |-- 14000000: double (nullable = true)
 |-- 21000000: double (nullable = true)
 |-- 22000000: double (nullable = true)
 |-- 23000000: double (nullable = true)
 |-- 24000000: double (nullable = true)

