In [1]:
import time
from pyspark.sql.functions import window
from pyspark.sql import functions as f

In [2]:
import numpy as np
from IPython.display import clear_output

In [3]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import StructType
import pyspark.sql.types as tp

In [4]:
# Create a spark Session
spark = SparkSession.builder.appName("HARspark").getOrCreate()


In [5]:
user_schema = tp.StructType([
  tp.StructField(name= 'mobileno',dataType= tp.StringType(),  nullable= True),
  tp.StructField(name= 'value',dataType= tp.StringType(),   nullable= True)
])

In [13]:
data_path = "/home/jovyan/work//data"

In [6]:
csvDF = spark.readStream.format("csv").option("header", True).schema(user_schema).load(data_path)
csvDF.isStreaming

True

In [7]:
csvDF.createOrReplaceTempView("mobile_position");

sensor_write_stream = csvDF \
        .writeStream \
        .trigger(processingTime='5 seconds') \
        .outputMode("append") \
        .option("truncate", "false") \
        .format('memory') \
        .queryName("sensorTable") \
        .start()


In [8]:
def calculate_sqrt(mobile_values):
    x = list(map(float, mobile_values.split('_')))
    sqrt_acceler  = 0
    sqrt_gyro = 0
    i = 0
    for item in x:  
        i = i+1
        if item == 3.0:
            acceleros=[x[i],x[i+1],x[i+2]]
            sqrt_acceler = np.sqrt(sum(j*j for j in acceleros))
#             print(sqrt_acceler)

        if item == 4.0:
            gyros=[x[i],x[i+1],x[i+2]]
            sqrt_gyro = np.sqrt(sum(j*j for j in gyros))
#             print(sqrt_gyro)
    return float(sqrt_acceler), float(sqrt_gyro)

In [9]:
def get_data_mobile(data):
    mobile1_values = data[0][1]
    mobile2_values = data[1][1]
    
    acc1, gyro1 = calculate_sqrt(mobile1_values)
    acc2, gyro2 = calculate_sqrt(mobile2_values)
            
    return acc1, gyro1, acc2, gyro2

In [11]:
while True:
    df = spark.sql("SELECT * FROM sensorTable")
    df.show()
    count=spark.sql("SELECT count(*) FROM sensorTable" )
    count.show()
    acc1, gyro1, acc2, gyro2 = get_data_mobile(df.tail(2))
    print("Accelerometer1:",acc1,"\nGyroscope1:", gyro1,"\n\nAccelerometer2:", acc2,"\nGyroscope2:",  gyro2)
    time.sleep(5)
    clear_output(wait=True)

+--------+--------------------+
|mobileno|               value|
+--------+--------------------+
| mobile1|1202414.60968_3_7...|
| mobile2|808475.73243_3_-0...|
| mobile1|1202414.60968_3_7...|
| mobile2|808475.73243_3_-0...|
| mobile1|1202414.60968_3_7...|
| mobile2|808475.73243_3_-0...|
| mobile1|1202414.60968_3_7...|
| mobile2|808475.73243_3_-0...|
| mobile1|1202414.60968_3_7...|
| mobile2|808475.73243_3_-0...|
| mobile1|1202414.60968_3_7...|
| mobile2|808475.73243_3_-0...|
| mobile1|1202414.60968_3_7...|
| mobile2|808475.73243_3_-0...|
| mobile1|1202414.60968_3_7...|
| mobile2|808475.73243_3_-0...|
| mobile1|1202414.60968_3_7...|
| mobile2|808475.73243_3_-0...|
| mobile1|1202414.60968_3_7...|
| mobile2|808475.73243_3_-0...|
+--------+--------------------+
only showing top 20 rows

+--------+
|count(1)|
+--------+
|      68|
+--------+

Accelerometer1: 9.799142870680068 
Gyroscope1: 0.09710303805751909 
Accelerometer2: 9.392925955206929 
Gyroscope2: 0.009695359714832657


KeyboardInterrupt: 

In [12]:
sensor_write_stream.stop()

In [10]:
# # show whole table
# while True:
#     df = spark.sql("SELECT * FROM sensorTable" )
#     df.show()
#     time.sleep(5)
#     clear_output(wait=True)

+--------+--------------------+
|mobileno|               value|
+--------+--------------------+
| mobile1|1202414.60968_3_7...|
| mobile2|808475.73243_3_-0...|
| mobile1|1202414.60968_3_7...|
| mobile2|808475.73243_3_-0...|
| mobile1|1202414.60968_3_7...|
| mobile2|808475.73243_3_-0...|
| mobile1|1202414.60968_3_7...|
| mobile2|808475.73243_3_-0...|
| mobile1|1202414.60968_3_7...|
| mobile2|808475.73243_3_-0...|
| mobile1|1202414.60968_3_7...|
| mobile2|808475.73243_3_-0...|
| mobile1|1202414.60968_3_7...|
| mobile2|808475.73243_3_-0...|
| mobile1|1202414.60968_3_7...|
| mobile2|808475.73243_3_-0...|
| mobile1|1202414.60968_3_7...|
| mobile2|808475.73243_3_-0...|
| mobile1|1202414.60968_3_7...|
| mobile2|808475.73243_3_-0...|
+--------+--------------------+
only showing top 20 rows



KeyboardInterrupt: 

In [None]:
# # cluster by mobileno 
# while True:
#     df_cluster = spark.sql("SELECT * FROM sensorTable \
#                 CLUSTER BY mobileno" )
#     df_cluster.show()
#     time.sleep(2)
#     clear_output(wait=True)

In [None]:
# #total count
# while True:
#     count=spark.sql("SELECT count(*) FROM sensorTable" )
#     count.show()
#     time.sleep(5)
#     clear_output(wait=True)