In [54]:
import datetime as dt
import getpass
import os

from py4j.protocol import Py4JJavaError
from pyspark.sql import SQLContext, Row
import pyspark.sql.functions as F

In [55]:
# Load one day's CPU activity log through the same SparkSession (`spark`) the
# rest of the notebook uses; SQLContext is a deprecated entry point.
# inferSchema must stay on: later cells do datetime arithmetic on the
# DateTime-derived columns (end_time - start_time).
INPUT_PATH = "hdfs:///user/spark-file/input/CpuLogData2019-10-24.csv"
usr_log = spark.read.csv(INPUT_PATH, header=True, inferSchema=True)

In [56]:
# Keep only the columns the report needs, restricted to the working window.
# Column.between is inclusive on both ends, matching the original >= / <= pair.
sel_column = ['DateTime', 'user_name', 'keyboard', 'mouse']
WORKDAY_START = '2019-10-24 08:30:00'
WORKDAY_END = '2019-10-24 19:30:00'
usr_log = (
    usr_log
    .select(*sel_column)
    .filter(F.col('DateTime').between(WORKDAY_START, WORKDAY_END))
)

# De-duplicate user names inside Spark instead of pulling the whole log to
# the driver with toPandas() just to read one column.
unique_users = [r.user_name for r in usr_log.select('user_name').distinct().collect()]

In [64]:
# Per-user accumulator keyed by user name. Every field starts at midnight of
# the log day; idle_time is later advanced by timedeltas, so its time-of-day
# part ends up holding the accumulated idle duration.
users_new_data = {
    user: dict.fromkeys(
        ('start_time', 'end_time', 'idle_time', 'working_hour'),
        dt.datetime(2019, 10, 24, 0, 0, 0),
    )
    for user in unique_users
}

In [59]:
# Chronological order matters downstream: the idle-streak pass walks rows in
# the order they are collected from usr_log.
usr_log = usr_log.orderBy('DateTime')

# First log timestamp per user inside the filtered window.
startTime = (
    usr_log
    .groupBy("user_name")
    .agg(F.min("DateTime").alias('start_time'))
)
startTime.show(5)

+--------------------+-------------------+
|           user_name|         start_time|
+--------------------+-------------------+
|ashutoshrit64@gma...|2019-10-24 09:00:01|
|giridhardandikwar...|2019-10-24 10:40:02|
|       nikitapawar17|2019-10-24 11:15:02|
|salinabodale73@gm...|2019-10-24 10:15:01|
|mail2anik.officia...|2019-10-24 08:30:01|
+--------------------+-------------------+
only showing top 5 rows



In [60]:
# Last log timestamp per user inside the filtered window.
endTime = (
    usr_log
    .groupBy("user_name")
    .agg(F.max("DateTime").alias('end_time'))
)
endTime.show(5)

+--------------------+-------------------+
|           user_name|           end_time|
+--------------------+-------------------+
|ashutoshrit64@gma...|2019-10-24 19:25:02|
|giridhardandikwar...|2019-10-24 19:25:02|
|       nikitapawar17|2019-10-24 19:25:02|
|salinabodale73@gm...|2019-10-24 19:25:01|
|mail2anik.officia...|2019-10-24 10:00:01|
+--------------------+-------------------+
only showing top 5 rows



In [61]:
def cal_idle_hour(detail, idle_threshold=5, sample_seconds=300):
    """Update the idle streak for one log row and credit idle time to its user.

    Reads ``detail.user_name``, ``detail.keyboard`` and ``detail.mouse``. A row
    where both keyboard and mouse equal 0 extends the idle streak held in the
    module-level ``count_idle``; any other row resets the streak to 0. The
    caller must reset ``count_idle`` to 0 before each user's rows and feed that
    user's rows in chronological order.

    Idle streaks shorter than ``idle_threshold`` rows add nothing. When a streak
    first reaches ``idle_threshold`` rows, the whole streak so far
    (``idle_threshold * sample_seconds`` seconds) is added to
    ``users_new_data[user]['idle_time']``. Every further idle row adds another
    ``sample_seconds``. The defaults (5 rows, 300 s) reproduce the original
    hard-coded 1500 s / 300 s. The value 300 s presumably equals the log's
    sampling interval; confirm this against the data.

    Args:
        detail: one log row (e.g. a pyspark ``Row``) with the fields above.
        idle_threshold: minimum streak length (>= 1) before idle time counts.
        sample_seconds: seconds credited per idle row.

    Returns:
        None; mutates ``count_idle`` and ``users_new_data`` in place.
    """
    global count_idle
    if detail.keyboard == 0 and detail.mouse == 0:
        count_idle += 1
    else:
        count_idle = 0

    if count_idle < idle_threshold:
        return

    # On first crossing the threshold, credit the whole streak retroactively;
    # afterwards credit one row at a time.
    rows_to_credit = idle_threshold if count_idle == idle_threshold else 1
    users_new_data[detail.user_name]['idle_time'] += dt.timedelta(
        seconds=rows_to_credit * sample_seconds
    )

In [62]:
# Replay each user's log rows, in chronological order, through cal_idle_hour.
# The log is collected to the driver ONCE and bucketed by user. Previously,
# usr_log.rdd.collect() sat inside the per-user loop, which re-ran the Spark
# job and re-shipped every row to the driver once per user.
# The explicit sort makes the streak detection independent of whether the
# earlier sort cell was run.
rows_by_user = {}
for row in usr_log.sort('DateTime').rdd.collect():
    rows_by_user.setdefault(row.user_name, []).append(row)

for user in unique_users:
    count_idle = 0  # cal_idle_hour keeps its idle-streak counter in this global
    for row in rows_by_user.get(user, []):
        cal_idle_hour(row)
print('Done!')

Done!


In [66]:
# Turn the per-user accumulator into one Row per user (user_name first, then
# the accumulator fields) and keep only the idle column for inspection.
data_as_rows = [Row(user_name=user, **logs) for user, logs in users_new_data.items()]
idleTime = (
    spark.createDataFrame(data_as_rows)
    .select('user_name', 'idle_time')
)
idleTime.show(3)

+--------------------+-------------------+
|           user_name|          idle_time|
+--------------------+-------------------+
|  sahil24c@gmail.com|2019-10-24 00:00:00|
|magadum.iranna@gm...|2019-10-24 00:00:00|
|  yathink3@gmail.com|2019-10-24 00:00:00|
+--------------------+-------------------+
only showing top 3 rows



In [10]:
# Sanity check: the accumulator stores datetime objects (not durations), so
# idle_time should come through as a timestamp column.
idleTime.printSchema()

root
 |-- user_name: string (nullable = true)
 |-- idle_time: timestamp (nullable = true)



In [11]:
# Attach each user's first and last log timestamp to their accumulated idle
# time (inner joins on user_name; inner is DataFrame.join's default).
data_df = (
    idleTime
    .join(startTime, 'user_name')
    .join(endTime, 'user_name')
)
data_df.show(5)

+--------------------+-------------------+-------------------+-------------------+
|           user_name|          idle_time|         start_time|           end_time|
+--------------------+-------------------+-------------------+-------------------+
|prathameshsalap@g...|2019-10-21 01:50:00|2019-10-21 08:35:01|2019-10-21 19:25:02|
|vaishusawant143@g...|2019-10-21 01:35:00|2019-10-21 08:35:01|2019-10-21 19:25:02|
|     you@example.com|2019-10-21 01:55:00|2019-10-21 08:35:01|2019-10-21 19:25:01|
|  shelkeva@gmail.com|2019-10-21 01:00:00|2019-10-21 08:50:02|2019-10-21 19:25:02|
|ashutoshrit64@gma...|2019-10-21 00:30:00|2019-10-21 08:50:01|2019-10-21 19:25:02|
+--------------------+-------------------+-------------------+-------------------+
only showing top 5 rows



In [12]:
def cal_working_hours(details):
    """Store start/end times and net working time for one user.

    ``details`` is one joined row exposing ``user_name``, ``start_time``,
    ``end_time`` and ``idle_time``; the last three are datetimes.
    ``idle_time`` encodes a duration as midnight of its day plus the
    accumulated idle time. ``working_hour`` is written back in the same
    encoding: midnight of the log day (taken from ``start_time``) plus
    ``(end_time - start_time) - idle``, clamped at zero.

    Both midnights are derived from the data. The previous code hard-coded
    2019-10-22 while ``idle_time`` was based on 2019-10-24, which shifted
    ``working_hour`` by whole days. Clamping covers users whose credited idle
    time exceeds their observed first-to-last span. Without it, the result
    would fall on the previous day.

    Returns:
        None; mutates the module-level ``users_new_data`` entry for the user.
    """
    day_start = dt.datetime.combine(details.start_time.date(), dt.time.min)
    idle_base = dt.datetime.combine(details.idle_time.date(), dt.time.min)
    idle = details.idle_time - idle_base

    worked = (details.end_time - details.start_time) - idle
    worked = max(worked, dt.timedelta(0))

    record = users_new_data[details.user_name]
    record['working_hour'] = day_start + worked
    record['start_time'] = details.start_time
    record['end_time'] = details.end_time

In [13]:
# Pull the (small, one-row-per-user) joined frame to the driver and fill in
# start/end/working_hour for each user in users_new_data.
for row in data_df.collect():
    cal_working_hours(row)
print('Done!')

Done!


In [14]:
# Rebuild one Row per user from the now fully populated accumulator and fix
# the report's column order.
data_as_rows = [Row(user_name=user, **logs) for user, logs in users_new_data.items()]
data_final = spark.createDataFrame(data_as_rows).select(
    'user_name',
    'start_time',
    'end_time',
    'idle_time',
    'working_hour',
)
data_final.show(5)

+--------------------+-------------------+-------------------+-------------------+-------------------+
|           user_name|         start_time|           end_time|          idle_time|       working_hour|
+--------------------+-------------------+-------------------+-------------------+-------------------+
|prathameshsalap@g...|2019-10-21 08:35:01|2019-10-21 19:25:02|2019-10-21 01:50:00|2019-10-21 09:00:01|
|vaishusawant143@g...|2019-10-21 08:35:01|2019-10-21 19:25:02|2019-10-21 01:35:00|2019-10-21 09:15:01|
|     you@example.com|2019-10-21 08:35:01|2019-10-21 19:25:01|2019-10-21 01:55:00|2019-10-21 08:55:00|
|  shelkeva@gmail.com|2019-10-21 08:50:02|2019-10-21 19:25:02|2019-10-21 01:00:00|2019-10-21 09:35:00|
|ashutoshrit64@gma...|2019-10-21 08:50:01|2019-10-21 19:25:02|2019-10-21 00:30:00|2019-10-21 10:05:01|
+--------------------+-------------------+-------------------+-------------------+-------------------+
only showing top 5 rows



In [15]:
# Write the day's report as a single CSV part file with a header row.
# The output path now carries the same date as the input log; it previously
# said 2019-10-22 while the notebook reads 2019-10-24 data.
# Overwrite mode lets the notebook be re-run without failing on an existing
# output directory; the default mode errors if the path exists.
data_final.repartition(1).write.csv(
    'hdfs:///user/spark-file/output/sparklog2019-10-24',
    header=True,
    mode='overwrite',
)

In [15]:
# Save the report to the MySQL table test.UserLog.
# Credentials come from the environment (falling back to an interactive
# prompt) instead of being hard-coded in the notebook. The previously
# committed plaintext password should be treated as leaked and rotated.
# NOTE(review): mode('append') inserts rows, so re-running presumably
# duplicates them unless the table enforces a key. Confirm the intended
# behaviour.
jdbc_user = os.environ.get('USERLOG_DB_USER', 'hadoopuser')
jdbc_password = os.environ.get('USERLOG_DB_PASSWORD') or getpass.getpass('MySQL password for UserLog: ')
data_final.write.format('jdbc').options(
    url='jdbc:mysql://localhost:3306/test',
    driver='com.mysql.jdbc.Driver',
    dbtable='UserLog',
    user=jdbc_user,
    password=jdbc_password,
).mode('append').save()