In [1]:
import json

from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, regexp_replace, expr

### Initiate spark session

In [2]:
spark = SparkSession.builder \
    .appName('Cognitivo AI') \
    .master('local[*]') \
    .getOrCreate()

spark.sparkContext

### Parameters

In [3]:
equipment_sensors_file = 'data/equipment_sensors.csv'
equipment_file = 'data/equipment.json'
equipment_failure_file = 'data/equipment_failure_sensors.log'

In [4]:
with open(equipment_file, 'r') as file_object:
    n_json = json.loads(file_object.read())
    
equipment = spark.sparkContext.parallelize(n_json).map(lambda x: json.dumps(x))
equipment = spark.read.json(equipment)

In [5]:
equipment_sensors = spark.read.format('csv') \
        .option('header',True) \
        .option('delimiter', ';') \
        .load(equipment_sensors_file)

In [6]:
equipment_failure = spark.read.format('csv') \
        .option('delimiter', '\t') \
        .load(equipment_failure_file)

## Answers

1 – Total equipment failures that happened;

2 – Which equipment code had most failures;

3 – Average amount of failures across equipment group, ordering by the amount of failures in ascending order.

In [7]:
equipment.createOrReplaceTempView('equipment')
equipment_sensors.createOrReplaceTempView('equipment_sensors')
equipment_failure.createOrReplaceTempView('equipment_failure')

In [8]:
equipment_failure = spark.sql(sqlQuery = '''
    SELECT 
        replace(replace(_c0, '[', ''), ']', '') as date, 
        _c1 as status, 
        replace(replace(_c2, 'sensor[', ''), ']:', '') as sensor_id, 
        replace(_c4, ', vibration', '') as temperature,
        replace(_c5, ')', '') as vibration
    FROM equipment_failure
''');

equipment_failure.createOrReplaceTempView('equipment_failure')

In [9]:
total_equipment_failures = spark.sql(sqlQuery = '''
    SELECT count(*)
    FROM equipment_failure
    where status = 'ERROR'
''');

In [10]:
total_equipment_failures.show()

+--------+
|count(1)|
+--------+
|   36979|
+--------+



In [11]:
equipment_sensors_join = equipment_failure.join(
    equipment_sensors, 
    equipment_failure.sensor_id == equipment_sensors.sensor_id)

equipment_sensors_join.createOrReplaceTempView('equipment_sensors_join')

In [12]:
most_failure_equipment = spark.sql(sqlQuery = '''
    SELECT count(*), equipment_id
    FROM equipment_sensors_join
    where status = 'ERROR'
    group by 2
    order by 1 desc
    limit 1
''');

In [13]:
most_failure_equipment.show()

+--------+------------+
|count(1)|equipment_id|
+--------+------------+
|    3753|           3|
+--------+------------+



In [14]:
e = equipment.alias('e')
es = equipment_sensors_join.alias('es')

e = e.selectExpr('equipment_id as id', 'code as code', 'group_name as group_name')


equipment_full_join = e.join(
    es, 
    e.id == es.equipment_id)

equipment_full_join.createOrReplaceTempView('equipment_full_join')

In [15]:
avg_amount_failures = spark.sql(sqlQuery = '''
    SELECT avg(total_failures), sum(total_failures), group_name
    FROM (
        select count(*) as total_failures, equipment_id, group_name 
            from equipment_full_join
        where status = 'ERROR'
        group by 2, 3
        )
        group by 3
    order by 2 desc
''');

In [16]:
avg_amount_failures.show()

+-------------------+-------------------+----------+
|avg(total_failures)|sum(total_failures)|group_name|
+-------------------+-------------------+----------+
|             2703.5|              10814|  FGHQWR2Q|
|             3033.5|               6067|  9N127Z5P|
|             2881.0|               5762|  NQWPA8D3|
|             2726.5|               5453|  PA92NCXZ|
|             2592.0|               5184|  VAPQY59S|
|             3699.0|               3699|  Z9K1SAP4|
+-------------------+-------------------+----------+

