In [None]:
import os

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext

import sagemaker
from sagemaker import get_execution_role
import sagemaker_pyspark

import pandas as pd



from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.types import *
from pyspark import SQLContext
from pyspark.sql.functions import asc, desc, col, when, from_unixtime, lit ,year, month, dayofmonth, hour

In [None]:
# SageMaker execution role for this notebook session.
role = get_execution_role()

# Put the SageMaker Spark dependency jars on the driver classpath.
# The jar list is computed once and reused to build the classpath string.
jars = sagemaker_pyspark.classpath_jars()
classpath = ":".join(jars)

spark = (
    SparkSession.builder
    .config("spark.driver.extraClassPath", classpath)
    .master("local[*]")
    .getOrCreate()
)

# SQLContext's first argument is a SparkContext, not a SparkSession. Passing
# the session as well makes the context wrap this exact session instead of
# looking one up again via getOrCreate().
sqlContext = SQLContext(spark.sparkContext, spark)

In [None]:
# Device to extract and the source partition (year=2018, month=11) of the
# raw parameter readings; edit these to pull a different device/period.
clipin = '::21e:5e09:23c:235e'
PARAMS_PATH = "s3a://cb-prod-analytics/data-parquet/params-cl03/year=2018/month=11"
PARAM_NAMES = ['Appliance_Flow_Temperature']

# Flatten the nested `parameter` struct into top-level columns
# (clipinId, hubTimestamp, name, value), then keep only this device's
# readings for the requested parameter names.
params = (
    sqlContext.read.parquet(PARAMS_PATH)
    .select(
        col('clipinId'),
        col('parameter.ts').alias('hubTimestamp'),
        col('parameter.name').alias('name'),
        col('parameter.value').alias('value'),
    )
    .filter((col('clipinId') == clipin) & col('name').isin(PARAM_NAMES))
)

In [None]:
def prep_data(df, sparkdf=True, param_names=None):
    '''
    @Description: prepare parameter readings for plotting/analysis.
        Keeps only the requested parameter names, sorts by clipinId and
        hubTimestamp, adds a datetime column `ts`, turns on/off, yes/no and
        true/false flags in `value` into 1/0, converts `value` to numbers,
        drops readings that are neither flags nor numeric, and drops duplicate
        (clipinId, ts, name) rows keeping the first occurrence.
    @Params param1: df, Spark or pandas DataFrame with columns clipinId,
        hubTimestamp (epoch milliseconds), name and value (text)
    @Params param2: sparkdf, bool; True if df is a Spark DataFrame (collected
        with toPandas()), False if it is already a pandas DataFrame
    @Params param3: param_names, optional list-like of parameter names to keep;
        defaults to the built-in appliance + Bosch parameter lists below
    @Return: new pandas dataframe with the additional `ts` column and a numeric
        `value` column; the caller's dataframe is not modified
    @Raises: ValueError (from pd.to_numeric) if a value starts with a digit but
        is not a valid number
    @Dependencies: pandas
    '''
    ret_param_list = ['Appliance_Flow_Temperature',
                      'Appliance_Return_Temperature',
                      'Appliance_Domestic_Hot_Water_Temperature',
                      'Appliance_Current',
                      'Appliance_Phase_Power_Factor',
                      'Domestic_Hot_Water_Demand',
                      'Central_Heating_Demand']

    bosch_param_lst = ['ChPump',
                       'GasValMain',
                       'Fan',
                       'RthSwitch',
                       'PrimT',
                       'RetT',
                       'ActPow',
                       'HwTOutlet',
                       'HwFlow']

    if param_names is None:
        param_names = ret_param_list + bosch_param_lst

    if sparkdf:
        df = df.toPandas()

    # .copy() gives an independent frame, so the column assignments below do
    # not write into a slice of the caller's data (SettingWithCopyWarning).
    df = df[df['name'].isin(list(param_names))].copy()

    df['hubTimestamp'] = pd.to_numeric(df['hubTimestamp'])
    df = df.sort_values(by=['clipinId', 'hubTimestamp'], ascending=[True, True])
    df['ts'] = pd.to_datetime(df['hubTimestamp'], unit='ms')

    # regex=True must be explicit: since pandas 2.0 Series.str.replace defaults
    # to literal matching, which would leave every value unchanged.
    value = df['value'].str.lower()
    # Mark anything that is neither a boolean keyword nor starts with an
    # (optionally negative) digit with the '-999' sentinel. The optional '-'
    # keeps negative numeric readings instead of discarding them.
    value = value.str.replace(r'^(?!off$|on$|-?\d|true$|false$|yes$|no$).*',
                              '-999', regex=True)
    # Anchor on the whole value so e.g. '1no' is not rewritten to '10'.
    value = value.str.replace(r'^(off|no|false)$', '0', regex=True)
    value = value.str.replace(r'^(on|yes|true)$', '1', regex=True)
    df['value'] = value.replace({'0': False,
                                 '1': True,
                                 })

    # Boolean mask instead of dropping by index label: still correct when a
    # pandas frame with duplicate index labels is passed in.
    df = df[df['value'] != '-999'].reset_index(drop=True)

    df['value'] = pd.to_numeric(df['value'])
    dup_cols = ['clipinId', 'ts', 'name']
    df = df.drop_duplicates(subset=dup_cols, keep='first')

    return df

In [None]:
# Collect the filtered Spark rows into pandas (sparkdf defaults to True) and
# clean them with prep_data.
df = prep_data(params)

In [None]:
# Preview the first rows of the cleaned readings.
df.head()

In [None]:
# Number of readings left after cleaning.
len(df)

In [None]:
# Inspect column dtypes of the cleaned frame (e.g. that `value` ended up numeric).
df.dtypes

In [None]:
from io import StringIO
import boto3

# Serialise the cleaned readings to CSV in memory; index=False keeps the
# pandas row index out of the file.
csv_buffer = StringIO()
df.to_csv(csv_buffer, index=False)
s3_resource = boto3.resource('s3')

# Object key: clipin without its first two characters (its leading '::'),
# suffixed with the parameter name.
# ContentType marks the object as CSV; S3 otherwise stores it as
# binary/octet-stream. The put() response is assigned so the cell does not
# echo the raw response dict.
upload_response = s3_resource.Object(
    'hackathon-nerual-network-datasets',
    clipin[2:] + '_Appliance_Flow_Temperature.csv',
).put(Body=csv_buffer.getvalue().encode('utf-8'), ContentType='text/csv')