# Data Preprocessing

In [1]:
# Importing python modules

from pyspark.sql import SparkSession
from pyspark.sql import Row
from os.path import abspath 
#from pyspark.sql import HiveContext
#from pyspark.context import SparkContext

In [2]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType,FloatType,DoubleType

**Initializing Spark with Hive support** (the legacy SparkContext/HiveContext setup below is kept commented out; a SparkSession is used instead)

In [3]:
#sc = SparkContext("local", "Sensor")
#sqlcontext = HiveContext(sc)

In [5]:
# Hive warehouse directory, resolved to an absolute path against the kernel's current working directory.
warehouse_location = abspath("spark-warehouse")

In [13]:
# Display the resolved warehouse path (it depends on where the kernel was started).
warehouse_location

'/home/ebby/Desktop/predictive_maintenance/notebook/spark-warehouse'

In [7]:
# Build (or reuse) a Hive-enabled SparkSession whose warehouse lives at
# `warehouse_location`. Note: despite the name, `sqlcontext` is a SparkSession,
# not the legacy SQLContext/HiveContext.
sqlcontext = (
    SparkSession.builder
    .appName("Python Spark SQL Hive integration example")
    .config("spark.sql.warehouse.dir", warehouse_location)
    .enableHiveSupport()
    .getOrCreate()
)

###### Run the next two cells only if the tables have not been created yet

In [8]:
# Declare an external, comma-delimited Hive table `sensordata` over the files in
# '../data/train'. IF NOT EXISTS makes re-running this cell a no-op once the table exists.
# NOTE(review): LOCATION is a relative path (agg_data below uses a full hdfs:// URI);
# how it resolves depends on the default filesystem / working dir — confirm it is the intended location.
# NOTE(review): no header-skipping property is set, so this assumes the files have no header row — TODO confirm.
sqlcontext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS sensordata (machine_id string, sensor_id string, session_id int, sess_time timestamp, cum_dist double, temperature double, sound double) ROW FORMAT DELIMITED FIELDS TERMINATED BY',' LINES TERMINATED BY '\n' LOCATION '../data/train' ")

DataFrame[]

In [9]:
# Declare an external, comma-delimited Hive table `lookup` (per-machine session
# duration, speed and distance) over the files in '../data/lookup'.
# NOTE(review): relative LOCATION and no header skipping — same caveats as sensordata; confirm.
sqlcontext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS lookup (machine_id string, sess_duration int,speed double, distance double) ROW FORMAT DELIMITED FIELDS TERMINATED BY',' LINES TERMINATED BY '\n' LOCATION '../data/lookup' ")

DataFrame[]

In [11]:
# Confirm the tables are registered in the default database.
sqlcontext.sql("show tables").show()

+--------+----------+-----------+
|database| tableName|isTemporary|
+--------+----------+-----------+
| default|    lookup|      false|
| default|sensordata|      false|
| default| test_data|      false|
| default|  test_out|      false|
| default|train_data|      false|
+--------+----------+-----------+



In [15]:
# Preview the lookup table (machine_id, sess_duration, speed, distance); show() prints the first 20 rows.
sqlcontext.sql("SELECT * FROM lookup").show()

+----------+-------------+-----+------------+
|machine_id|sess_duration|speed|    distance|
+----------+-------------+-----+------------+
|      MID1|           53|  7.0|6.1833333333|
|     MID10|           52|  5.0|4.3333333333|
|    MID100|           33|  8.0|         4.4|
|     MID11|           57| 10.0|         9.5|
|     MID12|           40|  8.0|5.3333333333|
|     MID13|           46|  9.0|         6.9|
|     MID14|           40|  8.0|5.3333333333|
|     MID15|           42|  5.0|         3.5|
|     MID16|           37|  7.0|4.3166666667|
|     MID17|           43|  8.0|5.7333333333|
|     MID18|           40|  7.0|4.6666666667|
|     MID19|           48|  8.0|         6.4|
|      MID2|           42| 10.0|         7.0|
|     MID20|           55|  7.0|6.4166666667|
|     MID21|           43|  6.0|         4.3|
|     MID22|           58|  7.0|6.7666666667|
|     MID23|           35|  8.0|4.6666666667|
|     MID24|           58|  5.0|4.8333333333|
|     MID25|           41|  5.0|3.

**Loading the Hive tables as DataFrames**

In [16]:
# DataFrame backed by the Hive table default.sensordata (lazy; nothing is read yet).
table = sqlcontext.table("default.sensordata")

In [17]:
# DataFrame backed by the Hive table default.lookup.
table_lookup=sqlcontext.table("default.lookup")

**Registering the DataFrames as temporary views**

In [18]:
# Expose the sensordata DataFrame to Spark SQL as the session-scoped view `raw_table`.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable (same replace-if-exists semantics).
table.createOrReplaceTempView('raw_table')

In [19]:
# Expose the lookup DataFrame to Spark SQL as the session-scoped view `lkp_table`
# (createOrReplaceTempView replaces the deprecated registerTempTable).
table_lookup.createOrReplaceTempView('lkp_table')

**Running some Spark SQL queries on the views**

In [20]:
# Total number of raw sensor readings.
sqlcontext.sql('Select count(*) from raw_table').show()

+--------+
|count(1)|
+--------+
| 8949764|
+--------+



In [21]:
# Number of lookup rows (presumably one per machine — verify machine_id is unique).
sqlcontext.sql('Select count(*) from lkp_table').show()

+--------+
|count(1)|
+--------+
|     100|
+--------+



**Reading the views into DataFrames using Spark SQL**

In [22]:
# DataFrames over the two temp views (same data as `table` / `table_lookup` above).
sensor_data = sqlcontext.sql('Select * from raw_table')
lookup = sqlcontext.sql('Select * from lkp_table')

In [23]:
# Quick look at the schema of the sensor_data DataFrame (column names and types from the Hive DDL).
sensor_data.printSchema()

root
 |-- machine_id: string (nullable = true)
 |-- sensor_id: string (nullable = true)
 |-- session_id: integer (nullable = true)
 |-- sess_time: timestamp (nullable = true)
 |-- cum_dist: double (nullable = true)
 |-- temperature: double (nullable = true)
 |-- sound: double (nullable = true)



In [24]:
# Schema of the lookup DataFrame.
lookup.printSchema()

root
 |-- machine_id: string (nullable = true)
 |-- sess_duration: integer (nullable = true)
 |-- speed: double (nullable = true)
 |-- distance: double (nullable = true)



In [25]:
# Only the per-machine distance is needed for labelling; drop the other lookup columns.
lookup = lookup.select(['machine_id', 'distance'])

In [26]:
# Short alias for the raw readings used by the aggregation below (same DataFrame object, no copy).
df=sensor_data

In [27]:
# Peek at the first five raw readings.
df.show(n=5)

+----------+---------+----------+-------------------+--------+-----------+------------------+
|machine_id|sensor_id|session_id|          sess_time|cum_dist|temperature|             sound|
+----------+---------+----------+-------------------+--------+-----------+------------------+
|     MID19|    SID19|        28|2017-01-28 05:15:04|     0.0|       27.0|2.0079232216193077|
|     MID18|    SID18|        28|2017-01-28 05:15:04|     0.0|       31.0| 2.366025327379722|
|     MID17|    SID17|        28|2017-01-28 05:15:04|     0.0|       31.0|2.2203230407628203|
|     MID16|    SID16|        28|2017-01-28 05:15:04|     0.0|       33.0| 2.228764679049286|
|     MID15|    SID15|        28|2017-01-28 05:15:04|     0.0|       29.0|2.1329891764859923|
+----------+---------+----------+-------------------+--------+-----------+------------------+
only showing top 5 rows




##### Aggregating data by machine_id and session_id


In [28]:
# One row per (machine_id, session_id): the max cumulative distance and the
# mean temperature / sound of that session. Columns come out named
# 'max(cum_dist)', 'avg(temperature)', 'avg(sound)', and their order does not
# follow the dict's order (see the df3 schema output further down).
# The orderBy is not preserved by the join that follows (df3.show starts at MID7).
x = (
    df.groupBy('machine_id', 'session_id')
    .agg({'cum_dist': 'max', 'temperature': 'avg', 'sound': 'avg'})
    .orderBy('machine_id', 'session_id')
)

In [29]:
# Repeats the projection from In [25]. Re-selecting the same two columns is
# harmless, and it keeps this section runnable on its own.
lookup = lookup.select(['machine_id', 'distance'])

##### Joining the lookup table with the aggregated data on machine_id to get each machine's average distance

In [30]:
# Attach each machine's lookup distance to its session aggregates.
# Inner join: sessions whose machine_id is missing from lookup are dropped.
cond = [x.machine_id == lookup.machine_id]
df2 = (
    x.join(lookup, on=cond, how='inner')
     .drop(lookup.machine_id)  # keep a single machine_id column (the one from x)
)

##### UDF for data labeling 

In [31]:
def labelling(cum_dist, avg_dist, temperature, sound,
              dist_ratio=.95, max_temperature=35, max_sound=2.5):
    """Label one aggregated session as anomalous (1) or normal (0).

    A session is flagged when any of these holds:
      * it covered less than ``dist_ratio`` of the machine's lookup distance
        (``cum_dist < dist_ratio * avg_dist``),
      * its average temperature is above ``max_temperature``,
      * its average sound level is above ``max_sound``.

    The arguments come from nullable Spark columns, so any of them may be None.
    Comparing None with a number raises TypeError, which would fail the Spark task.
    Instead, the three checks are combined with SQL-style three-valued OR:
      * a check whose inputs are missing is "unknown";
      * the result is 1 if any known check is true;
      * it is 0 if every check is known and false;
      * otherwise it is None (a null label).

    Args:
        cum_dist: max cumulative distance of the session.
        avg_dist: the machine's distance from the lookup table.
        temperature: average temperature of the session.
        sound: average sound level of the session.
        dist_ratio, max_temperature, max_sound: thresholds (defaults match the
            original hard-coded values 0.95, 35 and 2.5).

    Returns:
        1, 0, or None.
    """
    checks = (
        None if cum_dist is None or avg_dist is None else cum_dist < dist_ratio * avg_dist,
        None if temperature is None else temperature > max_temperature,
        None if sound is None else sound > max_sound,
    )
    if any(c for c in checks if c is not None):
        return 1
    if any(c is None for c in checks):
        return None  # not enough information to rule out an anomaly
    return 0
    
# Wrap the pure-Python labelling rule as a Spark UDF producing an integer column.
labelling_UDF = udf(labelling, returnType=IntegerType())

# Add the 'label' column; arguments follow labelling(cum_dist, avg_dist, temperature, sound).
df3 = df2.withColumn(
    'label',
    labelling_UDF(
        df2['max(cum_dist)'],
        df2['distance'],
        df2['avg(temperature)'],
        df2['avg(sound)'],
    ),
)

In [32]:
# Preview only. Printing thousands of rows bloats the notebook;
# use aggregates such as the label counts below for totals.
df3.show(20)

+----------+----------+------------------+------------------+-------------+------------+-----+
|machine_id|session_id|        avg(sound)|  avg(temperature)|max(cum_dist)|    distance|label|
+----------+----------+------------------+------------------+-------------+------------+-----+
|      MID7|         1|2.2437212989300783|29.574007220216608|        7.667|         7.5|    0|
|      MID7|         2| 2.253173230523249| 29.36462093862816|        7.667|         7.5|    0|
|      MID7|         3|   2.2502084065054| 29.67870036101083|        7.667|         7.5|    0|
|      MID7|         4|2.2516293220431245|29.314079422382672|        7.667|         7.5|    0|
|      MID7|         5|2.2666558868109385|29.732851985559567|        7.667|         7.5|    0|
|      MID7|         6|2.2517655101620044|29.509025270758123|        7.667|         7.5|    0|
|      MID7|         7|2.2565301282317702| 29.99638989169675|        7.667|         7.5|    0|
|      MID7|         8|2.2678744821056807| 29.6064

In [33]:
# Class balance of the labels, in a stable order. groupBy().count() counts rows,
# so a null-label group (if any) is reported with its true size, whereas
# count(label) would show 0 for it.
df3.groupBy('label').count().orderBy('label').show()

+-----+------------+
|label|count(label)|
+-----+------------+
|    1|         229|
|    0|       33106|
+-----+------------+



In [34]:
# Inspect the joined + labelled schema before renaming the aggregate columns.
df3.printSchema()

root
 |-- machine_id: string (nullable = true)
 |-- session_id: integer (nullable = true)
 |-- avg(sound): double (nullable = true)
 |-- avg(temperature): double (nullable = true)
 |-- max(cum_dist): double (nullable = true)
 |-- distance: double (nullable = true)
 |-- label: integer (nullable = true)



In [35]:
# Replace Spark's generated aggregate names with plain identifiers that match
# the agg_data table columns. Re-running is harmless: withColumnRenamed is a
# no-op when the old name no longer exists.
df3 = (
    df3
    .withColumnRenamed('avg(sound)', 'avg_sound')
    .withColumnRenamed('avg(temperature)', 'avg_temperature')
    .withColumnRenamed('max(cum_dist)', 'max_cum_dist')
)

In [36]:
# Confirm the renamed columns (these names match the agg_data DDL in the next section).
df3.printSchema()

root
 |-- machine_id: string (nullable = true)
 |-- session_id: integer (nullable = true)
 |-- avg_sound: double (nullable = true)
 |-- avg_temperature: double (nullable = true)
 |-- max_cum_dist: double (nullable = true)
 |-- distance: double (nullable = true)
 |-- label: integer (nullable = true)



##### Save as a Hive table

In [37]:
# External, comma-delimited Hive table for the labelled per-session aggregates,
# stored on HDFS. Writes into it via insertInto are matched by column POSITION,
# so the DataFrame written must follow this column order.
sqlcontext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS agg_data (machine_id string,session_id int,avg_sound double,avg_temperature double,max_cum_dist double, distance double,label integer) ROW FORMAT DELIMITED FIELDS TERMINATED BY',' LINES TERMINATED BY '\n' LOCATION 'hdfs://localhost:9000/predictive_maintenance/data/train_labeled' ")

DataFrame[]

In [38]:
#df3.write.saveAsTable("agg_machine_data")

In [40]:
# insertInto matches columns by POSITION, not by name. Select them explicitly in
# agg_data's declared order instead of relying on the column order produced by
# the dict-based agg() and the join upstream, which is not guaranteed.
# NOTE(review): insertInto appends, so re-running this cell adds duplicate rows;
# pass overwrite=True if a full refresh is intended.
(
    df3.select('machine_id', 'session_id', 'avg_sound', 'avg_temperature',
               'max_cum_dist', 'distance', 'label')
    .repartition(1)  # one partition -> a single output file
    .write
    .insertInto("agg_data")
)

In [41]:
#df3.createOrReplaceTempView("tempTable") 

In [45]:
# Read back a few rows of agg_data to check that values landed in the right columns.
sqlcontext.sql("select * from agg_data limit 10").show()

+----------+----------+------------------+------------------+------------+--------+-----+
|machine_id|session_id|         avg_sound|   avg_temperature|max_cum_dist|distance|label|
+----------+----------+------------------+------------------+------------+--------+-----+
|      MID7|         1|2.2437212989300783|29.574007220216608|       7.667|     7.5|    0|
|      MID7|         2| 2.253173230523249| 29.36462093862816|       7.667|     7.5|    0|
|      MID7|         3|   2.2502084065054| 29.67870036101083|       7.667|     7.5|    0|
|      MID7|         4|2.2516293220431245|29.314079422382672|       7.667|     7.5|    0|
|      MID7|         5|2.2666558868109385|29.732851985559567|       7.667|     7.5|    0|
|      MID7|         6|2.2517655101620044|29.509025270758123|       7.667|     7.5|    0|
|      MID7|         7|2.2565301282317702| 29.99638989169675|       7.667|     7.5|    0|
|      MID7|         8|2.2678744821056807| 29.60649819494585|       7.667|     7.5|    0|
|      MID

In [44]:
# List tables again: agg_data is now present, and the temp views raw_table /
# lkp_table appear with isTemporary=true.
sqlcontext.sql("show tables").show()

+--------+----------+-----------+
|database| tableName|isTemporary|
+--------+----------+-----------+
| default|  agg_data|      false|
| default|    lookup|      false|
| default|sensordata|      false|
| default| test_data|      false|
| default|  test_out|      false|
| default|train_data|      false|
|        | lkp_table|       true|
|        | raw_table|       true|
+--------+----------+-----------+

