#### Author: Sailesh Chauhan
#### Date: 25-07-2021
#### Title: The CPU_logs dataset is analysed to solve the following problem statements.

1. Finding users with the lowest average number of hours using Databricks SQL
2. Finding users with the highest average number of hours using Databricks SQL
3. Finding users with the highest number of late arrivals using Databricks SQL
4. Finding users with the highest number of idle hours using Databricks SQL
5. Storing the cleaned data on Blob Storage
6. Storing the results on Blob Storage

In [0]:
%fs
ls /FileStore/tables

path,name,size
dbfs:/FileStore/tables/CpuLogData2019_09_16.csv,CpuLogData2019_09_16.csv,177697
dbfs:/FileStore/tables/CpuLogData2019_09_17.csv,CpuLogData2019_09_17.csv,299062
dbfs:/FileStore/tables/CpuLogData2019_09_18.csv,CpuLogData2019_09_18.csv,286264
dbfs:/FileStore/tables/CpuLogData2019_09_19.csv,CpuLogData2019_09_19.csv,346015
dbfs:/FileStore/tables/CpuLogData2019_09_20.csv,CpuLogData2019_09_20.csv,136532
dbfs:/FileStore/tables/CpuLogData2019_09_21.csv,CpuLogData2019_09_21.csv,291933
dbfs:/FileStore/tables/LMS_DB.zip,LMS_DB.zip,636912


In [0]:
%run ./custom_logging

##### Mounting the Azure Storage account container in the Databricks file system (DBFS)

In [0]:
# Mount the Azure Blob container into DBFS so later cells can address it via /mnt.
# The storage account key comes from the cluster Spark config, not from the notebook.
mount_point = "/mnt/CPU_Logs_New"
try:
  # dbutils.fs.mount raises when the mount point is already in use, so only
  # mount (and only require the key) when no existing mount is found.
  if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    logger.debug('Storage already mounted at '+mount_point)
  else:
    AZURE_KEY=spark.conf.get('spark.azure_key')
    dbutils.fs.mount(
      source = "wasbs://cpulogsdata@cpulogs.blob.core.windows.net",
      mount_point = mount_point,
      extra_configs ={"fs.azure.account.key.cpulogs.blob.core.windows.net":AZURE_KEY})
    logger.debug('Storage Mounting succesfull')
except Exception as ex:
  logger.error('Storage mounting failed'+str(ex))

In [0]:
%fs
ls /mnt

path,name,size
dbfs:/mnt/CPU_Logs/,CPU_Logs/,0
dbfs:/mnt/CPU_Logs_New/,CPU_Logs_New/,0


#### Loading the CSV files for each CPU log date from 2019-09-16 to 2019-09-21 into dataframe df, then selecting the required columns DateTime, user_name, keyboard, mouse.

In [0]:
# Read the six daily CPU-log CSV files into one dataframe and keep only the
# columns used by the analysis (DateTime, user_name, keyboard, mouse).
try:
  file_location = [
    "/FileStore/tables/CpuLogData2019_09_16.csv",
    "/FileStore/tables/CpuLogData2019_09_17.csv",
    "/FileStore/tables/CpuLogData2019_09_18.csv",
    "/FileStore/tables/CpuLogData2019_09_19.csv",
    "/FileStore/tables/CpuLogData2019_09_20.csv",
    "/FileStore/tables/CpuLogData2019_09_21.csv",
  ]

  file_type = "csv"

  # CSV reader options: infer column types, treat the first row as the header.
  infer_schema = "true"
  first_row_is_header = "true"
  delimiter = ","

  df = spark.read.format(file_type) \
    .option("inferSchema", infer_schema) \
    .option("header", first_row_is_header) \
    .option("sep", delimiter) \
    .load(file_location)

  df_CPU_logs=df.select("DateTime","user_name","keyboard","mouse")
  rows=df_CPU_logs.count()
  # A bare expression inside `try` is never rendered by the notebook, so the
  # summary statistics are displayed and the row count is logged explicitly.
  display(df_CPU_logs.describe())
  logger.debug('Rows loaded: '+str(rows))
  logger.debug('File loading successful')
except Exception as ex:
  logger.error('file loading failed '+str(ex))


##### Adding column `time` holding DateTime formatted as a `yyyy-MM-dd HH:mm` string (seconds dropped), then dropping the original DateTime column

In [0]:
from pyspark.sql.functions import date_format

try:
  # Render the source column at minute resolution into `time`, then discard
  # the source column itself.
  minute_pattern = 'yyyy-MM-dd HH:mm'
  df_CPU_logs_date_time = (
    df_CPU_logs
    .withColumn('time', date_format('datetime', minute_pattern))
    .drop("DateTime")
  )

  # Preview the first five rows of the result.
  preview_rows = df_CPU_logs_date_time.take(5)
  display(preview_rows)
  logger.debug('type casting success')
except Exception as ex:
  logger.error('type casting failed '+str(ex))

user_name,keyboard,mouse,time
iamnzm@outlook.com,1.0,32.0,2019-09-19 08:40
iamnzm@outlook.com,0.0,0.0,2019-09-19 08:45
iamnzm@outlook.com,0.0,0.0,2019-09-19 08:50
iamnzm@outlook.com,11.0,900.0,2019-09-19 08:55
iamnzm@outlook.com,2.0,25.0,2019-09-19 09:00


##### Creating temporary view CpuLogData_All

In [0]:
# Register the dataframe with the formatted `time` column as a temp view so it
# can be queried with SQL.
try:
  temp_table_name = "CpuLogData_All"
  df_CPU_logs_date_time.createOrReplaceTempView(temp_table_name)
  logger.debug('view created success')
except Exception as ex:
  # Include the exception text so the cause of the failure reaches the log.
  logger.error('view creation failed '+str(ex))

# NOTE: do not call logging.shutdown() here. Later cells still log, and the
# logging module registers shutdown as an exit handler on its own.

##### Selecting required columns Datetime,user_name,keyboard,mouse in df from CpuLogData_All

In [0]:
# Query the view again, parsing the `Time` column into a timestamp named
# Datetime, and print the resulting schema.
try:
  timestamp_query = "select to_timestamp(Time) as Datetime,user_name,keyboard,mouse from CpuLogData_All"
  df = spark.sql(timestamp_query)
  df.printSchema()
  logger.debug('type casting successful')
except Exception as ex:
  logger.error('type casting failed'+str(ex))

##### Re-creating temporary view CpuLogData_All from the dataframe with the Datetime timestamp column

In [0]:
# Replace the CpuLogData_All temp view with the dataframe produced by the
# to_timestamp query, so SQL cells see a Datetime column.
try:
  temp_table_name = "CpuLogData_All"
  df.createOrReplaceTempView(temp_table_name)
  logger.debug('CpuLogData_All view succesful')
except Exception as ex:
  # Include the exception text so the cause of the failure reaches the log.
  logger.error('temparory view failed '+str(ex))

# SQL

##### Counting all records in CpuLogData_All

In [0]:
%sql
-- Total number of records in the CpuLogData_All temp view.
SELECT COUNT(*) FROM CpuLogData_All

count(1)
4122


In [0]:
%sql
-- Preview five records of the CpuLogData_All temp view.
SELECT * FROM CpuLogData_All limit 5

Datetime,user_name,keyboard,mouse
2019-09-19T08:40:00.000+0000,iamnzm@outlook.com,1.0,32.0
2019-09-19T08:45:00.000+0000,iamnzm@outlook.com,0.0,0.0
2019-09-19T08:50:00.000+0000,iamnzm@outlook.com,0.0,0.0
2019-09-19T08:55:00.000+0000,iamnzm@outlook.com,11.0,900.0
2019-09-19T09:00:00.000+0000,iamnzm@outlook.com,2.0,25.0


##### Writing data to mounted Azure Blob Storage

In [0]:
# Persist every record of the CpuLogData_All view as CSV under the mounted
# blob container.
# NOTE(review): this writes to /mnt/CPU_Logs, while the mount cell in this
# notebook mounts /mnt/CPU_Logs_New - confirm which container is intended.
df=spark.sql("""SELECT  * FROM CpuLogData_All""")
# The default save mode raises when the target path already exists; overwrite
# keeps the cell re-runnable.
df.write.mode("overwrite").csv('dbfs:/mnt/CPU_Logs/CpuLogData_All')

##### Marking each record as working or idle from the keyboard and mouse counts using a CASE expression

In [0]:
%sql
-- Label every log record 'working' when it has any keyboard or mouse activity,
-- otherwise 'idle'.
-- OR REPLACE keeps the cell re-runnable: a plain CREATE TEMP VIEW fails once
-- the view already exists in the session.
CREATE OR REPLACE TEMP VIEW cpu_idle_working AS (SELECT `user_name`,to_timestamp(`DateTime`) AS datetime,
case
when (`keyboard`>0 OR `mouse`>0) then 'working'
else 'idle'
end AS Work_Status
FROM `CpuLogData_All`)

In [0]:
%sql
-- Preview five records of the cpu_idle_working view.
SELECT * FROM cpu_idle_working limit 5;

user_name,datetime,Work_Status
iamnzm@outlook.com,2019-09-19T08:40:00.000+0000,working
iamnzm@outlook.com,2019-09-19T08:45:00.000+0000,idle
iamnzm@outlook.com,2019-09-19T08:50:00.000+0000,idle
iamnzm@outlook.com,2019-09-19T08:55:00.000+0000,working
iamnzm@outlook.com,2019-09-19T09:00:00.000+0000,working


##### Finding user_name and average idle hours in descending order

In [0]:
%sql
-- Per-user idle time formatted as HH:mm, listed from highest to lowest.
-- (count-1)*5*60 converts the number of 'idle' records to seconds, presumably
-- on the basis of one record every 5 minutes (TODO confirm the sampling
-- interval); /6 divides by a fixed 6, presumably the six log days loaded
-- (2019-09-16 to 2019-09-21) - confirm every user has data on all six days.
-- NOTE(review): from_unixtime renders in the session time zone, so HH:mm is
-- only a plain duration when that zone is UTC - confirm.
-- ORDER BY sorts on the formatted HH:mm string.
SELECT from_unixtime(round(((count(user_name)-1)*5*60)/6,2),'HH:mm') AS 
user_Avg_idle_hours,user_name 
FROM cpu_idle_working 
where work_status='idle' 
group by user_name 
order by user_Avg_idle_hours desc

user_Avg_idle_hours,user_name
02:08,iamnzm@outlook.com
02:05,rahilstar11@gmail.com
01:46,salinabodale73@gmail.com
01:41,sharlawar77@gmail.com
01:40,bhagyashrichalke21@gmail.com
01:38,markfernandes66@gmail.com
01:14,deepshukla292@gmail.com
00:50,damodharn21@gmail.com


##### Writing average idle hours with user name as csv file to Azure Blob storage

In [0]:
# Save the per-user average idle time as a CSV with a header row under the
# mounted blob container.
try:
  df=spark.sql("""SELECT from_unixtime(round(((count(user_name)-1)*5*60)/6,2),'HH:mm') AS 
  user_Avg_idle_hours,user_name 
  FROM cpu_idle_working 
  where work_status='idle' 
  group by user_name 
  order by user_Avg_idle_hours desc""")
  # The default save mode raises when the target path already exists; overwrite
  # keeps the cell re-runnable after the result has been written once.
  df.write.mode("overwrite").option("header",'true').csv('dbfs:/mnt/CPU_Logs/user_Avg_idle_hours')
  logger.debug('File written succesfully')
except Exception as ex:
  logger.error('file writing failed'+str(ex))
## Due to cluster recreation storage connection string not available.So, result is already written to container

##### Finding all user name with average work hours in Descending order

In [0]:
%sql
-- Per-user working time formatted as HH:mm, listed from highest to lowest.
-- (count-1)*5*60 converts the number of 'working' records to seconds,
-- presumably on the basis of one record every 5 minutes (TODO confirm the
-- sampling interval); /6 divides by a fixed 6, presumably the six log days
-- loaded (2019-09-16 to 2019-09-21) - confirm every user has data on all six.
-- NOTE(review): from_unixtime renders in the session time zone, so HH:mm is
-- only a plain duration when that zone is UTC - confirm.
-- ORDER BY sorts on the formatted HH:mm string.
SELECT from_unixtime(round(((count(user_name)-1)*5*60)/6,2),'HH:mm') AS 
user_Avg_working_hours,user_name 
FROM cpu_idle_working 
where work_status='working' 
group by user_name 
order by user_Avg_working_hours desc

user_Avg_working_hours,user_name
06:35,deepshukla292@gmail.com
06:21,iamnzm@outlook.com
06:20,sharlawar77@gmail.com
06:05,salinabodale73@gmail.com
05:31,rahilstar11@gmail.com
05:23,markfernandes66@gmail.com
05:00,bhagyashrichalke21@gmail.com
02:38,damodharn21@gmail.com


##### Writing average work hours with user name as csv file to Azure Blob storage

In [0]:
# Save the per-user average working time as a CSV with a header row under the
# mounted blob container.
try:
  # The column is aliased user_Avg_working_hours so the CSV header matches the
  # 'working' filter and the SQL cell that displays the same result.
  df = spark.sql("""
              SELECT from_unixtime(round(((count(user_name)-1)*5*60)/6,2),'HH:mm') AS 
              user_Avg_working_hours,user_name 
              FROM cpu_idle_working 
              where work_status='working' 
              group by user_name 
              order by user_Avg_working_hours desc
              """)
  # The default save mode raises when the target path already exists; overwrite
  # keeps the cell re-runnable.
  df.write.mode("overwrite").option("header",'true').csv('dbfs:/mnt/CPU_Logs/user_Avg_working_hours')
  logger.debug('File written successful')
except Exception as ex:
  logger.error('writing file failed '+str(ex))