In [2]:
#DOC

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import count
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import avg


In [3]:
def create_spark_session():
    '''Build (or reuse) the Spark session used by this notebook.

    Returns:
    The session obtained from SparkSession.builder.getOrCreate(),
    configured with master "local", application name "MODEC" and
    spark.executor.memory set to "2gb".
    '''
    session_builder = SparkSession.builder.master("local").appName("MODEC")
    session_builder = session_builder.config("spark.executor.memory", "2gb")
    return session_builder.getOrCreate()



def read_gold(spark, file_path):
    '''
    Read the parquet files stored
    on the gold path

    Parameters:
    spark : Spark Session
    file_path (str): Path to input data

    Returns:
    The DataFrame produced by spark.read.parquet(file_path), or None
    when the read raises IOError (after printing 'read error').
    '''
    try:
        return spark.read.parquet(file_path)
    except IOError:
        # NOTE(review): only IOError is handled here; Spark may report a
        # missing or unreadable path with a different exception type
        # (e.g. AnalysisException), which would propagate - confirm
        # whether that case should be handled as well.
        print('read error')
        # Explicit so readers can see that a failed read yields None.
        return None

In [4]:
# Obtain the Spark session that the read and SQL cells below rely on.
spark = create_spark_session()

In [5]:
# Load the fact table from its parquet location into a DataFrame.
fact = read_gold(spark, file_path='datalake/gold/fact')

In [135]:
# Preview the loaded fact table (prints at most 500 rows).
fact.show(n=500)

+---------+------------+--------+----------+-------------------+-----+-----------+---------+---+----+-----+----+-------+
|sensor_id|equipment_id|    code|group_name|               date|error|temperature|vibration|day|week|month|year|weekday|
+---------+------------+--------+----------+-------------------+-----+-----------+---------+---+----+-----+----+-------+
|       67|           1|5310B9D7|  FGHQWR2Q|2020-02-25 08:38:54|    1|     167.59|  9643.11| 25|   9|    2|2020|      3|
|       67|           1|5310B9D7|  FGHQWR2Q|2020-03-04 00:04:46|    1|     204.48|   2184.2|  4|  10|    3|2020|      4|
|       67|           1|5310B9D7|  FGHQWR2Q|2020-01-24 23:54:11|    1|     446.09| -2260.74| 24|   4|    1|2020|      6|
|       67|           1|5310B9D7|  FGHQWR2Q|2019-12-24 19:50:00|    1|      186.6|  5810.96| 24|  52|   12|2019|      3|
|       67|           1|5310B9D7|  FGHQWR2Q|2019-12-21 02:28:31|    1|     487.57| -1597.33| 21|  51|   12|2019|      7|
|       67|           1|5310B9D7

In [130]:
# Question 1 - Total equipment failures that happened in Jan/2020?
# Keep the rows with error == 1 that fall in January 2020, then count them.
jan_2020_failures = (col('year') == 2020) & (col('month') == 1) & (col('error') == 1)
q1 = fact.filter(jan_2020_failures)
print('There are a {0} failures in Jan/2020'.format(q1.count()))

There are a 11645 failures in Jan/2020


In [131]:
# Question 1 again, this time expressed in Spark SQL.
# Register the DataFrame as the temp view 'fact'; the SQL queries for
# questions 2 and 3 further down read from this same view.
fact.createOrReplaceTempView('fact')
q1_query = 'SELECT COUNT(*) as result FROM fact\
               WHERE year = 2020 AND month = 1 AND error = 1'
q1_sql = spark.sql(q1_query)
q1_sql.show()

+------+
|result|
+------+
| 11645|
+------+



In [132]:
# Question 2 - Which equipment code had most failures in Jan/2020?

# q1 is already restricted to the Jan/2020 failure rows, so count them per
# code and sort so that the code with the most failures comes first.
failures_per_code = q1.groupBy('code').agg(count('code').alias('count'))
q2 = failures_per_code.orderBy(col('count').desc())
q2.show(1)

# Same question in SQL: the 'fact' temp view holds every row, so the
# Jan/2020 failure filter has to be repeated in the WHERE clause.
q2_query = 'SELECT CODE,COUNT(CODE) as result FROM fact \
                    WHERE year = 2020 AND month = 1 AND error = 1 \
                    GROUP BY (CODE) ORDER BY (result) DESC LIMIT 1'
q2_sql = spark.sql(q2_query)
q2_sql.show()

+--------+-----+
|    code|count|
+--------+-----+
|E1AD07D4| 1377|
+--------+-----+
only showing top 1 row

+--------+------+
|    CODE|result|
+--------+------+
|E1AD07D4|  1377|
+--------+------+



In [133]:
# Question 3 -
# Average amount of failures across equipment group, ordering by the amount of failures in ascending order?

# q1 is already restricted to the Jan/2020 failure rows.
# NOTE(review): the column aliased 'avg' holds the number of failure rows
# per group (a count), not an arithmetic mean - confirm that this is the
# intended reading of the question.
failures_per_group = q1.groupBy('group_name').agg(count('group_name').alias('avg'))
q3 = failures_per_group.orderBy(col('avg').asc())
q3.show(100)

# Same question in SQL against the 'fact' temp view, with the Jan/2020
# failure filter repeated in the WHERE clause.
q3_query = 'SELECT group_name,COUNT(group_name) as avg FROM fact \
                    WHERE year = 2020 AND month = 1 AND error = 1 \
                    GROUP BY (group_name) ORDER BY (avg) ASC'
q3_sql = spark.sql(q3_query)

q3_sql.show()

+----------+----+
|group_name| avg|
+----------+----+
|  Z9K1SAP4|1161|
|  VAPQY59S|1666|
|  PA92NCXZ|1715|
|  9N127Z5P|1732|
|  NQWPA8D3|1747|
|  FGHQWR2Q|3624|
+----------+----+

+----------+----+
|group_name| avg|
+----------+----+
|  Z9K1SAP4|1161|
|  VAPQY59S|1666|
|  PA92NCXZ|1715|
|  9N127Z5P|1732|
|  NQWPA8D3|1747|
|  FGHQWR2Q|3624|
+----------+----+



In [None]:
fact.wri