## Renew power case study

## Question 1:
Approach:
The turbine is considered not connected to the server if either of the following two conditions holds:

•	Case A: state = 4, or

•	Case B: state = 0 and wind speed <= 0


In [1]:
# Display the SparkContext bound to `sc`; evaluating the name does not create a context.
# NOTE(review): assumes the PySpark kernel pre-defines `sc` — it is not created anywhere in this notebook.
sc

In [10]:
#import relevant libraries
from pyspark.sql import functions as psf

In [3]:
# Load the turbine readings CSV from HDFS into a Spark DataFrame.
# The first line is treated as the header and column types are inferred from the data.
df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("hdfs:/user/kushalu/renew_power_data.csv")
)


In [4]:
# Preview the first 5 rows without truncating long cell values
df.show(n=5, truncate=False)

+-----------------------------------------+--------------------------+-----------------+
|TimeStamp (India Standard Time UTC+05:30)|LAHT461-CommunicationState|LAHT461-WindSpeed|
+-----------------------------------------+--------------------------+-----------------+
|01/09/2019 00:00:07                      |null                      |3.541333         |
|01/09/2019 00:00:17                      |null                      |3.381333         |
|01/09/2019 00:00:37                      |null                      |3.621333         |
|01/09/2019 00:00:47                      |null                      |3.498667         |
|01/09/2019 00:00:57                      |null                      |3.397333         |
+-----------------------------------------+--------------------------+-----------------+
only showing top 5 rows



In [40]:
# Rename the timestamp column to 'date_id' and derive 'state' / 'speed' columns in which
# missing (null) readings are replaced by the sentinel -1, so that later comparison
# filters on these columns never operate on nulls.
# NOTE(review): the sentinel -1 also satisfies the `speed <= 0` test used by the Question 1
# filters, so a row with state == 0 and a missing wind speed is counted as "not connected".
# Confirm this is intended.
df1 = (
    df.withColumnRenamed('TimeStamp (India Standard Time UTC+05:30)', 'date_id')
    # coalesce returns the first non-null argument: the reading if present, otherwise -1
    .withColumn('state', psf.coalesce(psf.col('LAHT461-CommunicationState'), psf.lit(-1)))
    .withColumn('speed', psf.coalesce(psf.col('LAHT461-WindSpeed'), psf.lit(-1)))
    # the raw columns are no longer needed once the cleaned copies exist
    .drop('LAHT461-CommunicationState')
    .drop('LAHT461-WindSpeed')
)

In [16]:
# Preview the first 5 cleaned rows without truncating long cell values
df1.show(n=5, truncate=False)

+-------------------+--------+-----+
|date_id            |speed   |state|
+-------------------+--------+-----+
|01/09/2019 00:00:07|3.541333|-1   |
|01/09/2019 00:00:17|3.381333|-1   |
|01/09/2019 00:00:37|3.621333|-1   |
|01/09/2019 00:00:47|3.498667|-1   |
|01/09/2019 00:00:57|3.397333|-1   |
+-------------------+--------+-----+
only showing top 5 rows



In [171]:
#method 1 : using Spark DataFrame
#
# Step 1: derive two columns -
#   * 'not_connected' : 1 if the turbine is not connected (state == 4, or state == 0 with speed <= 0), else 0
#   * 'row_dummy'     : constant 1, so that its sum is the total row count
#
# Step 2: sum both columns over the whole DataFrame
#
# Step 3: 'pct_not_connected' = sum(not_connected) * 100 / sum(row_dummy)

# Flag expression for a row on which the turbine is not connected to the server
not_connected_flag = psf.when(
    (psf.col('state') == 4)
    | ((psf.col('speed') <= 0) & (psf.col('state') == 0)),
    1,
).otherwise(0)

# Keep the aggregated DataFrame in df2; show() returns None, so it must not be part of the assignment
df2 = (
    df1
    .withColumn('not_connected', not_connected_flag)
    .withColumn('row_dummy', psf.lit(1))
    .agg({'not_connected': 'sum', 'row_dummy': 'sum'})
    .withColumn(
        'pct_not_connected',
        psf.col('sum(not_connected)') * 100 / psf.col('sum(row_dummy)'),
    )
)
df2.show()


+------------------+--------------+-------------------+
|sum(not_connected)|sum(row_dummy)|  pct_not_connected|
+------------------+--------------+-------------------+
|                25|         43139|0.05795220102459491|
+------------------+--------------+-------------------+



In [43]:
#method 2: Using Spark SQL
# Expose df1 to Spark SQL under the name 'renew_power'.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated since Spark 2.0.
df1.createOrReplaceTempView('renew_power')

In [172]:
# Percentage of rows on which the turbine is not connected, computed in one aggregate pass.
# The earlier version computed lag/lead/row_number/count over an unpartitioned, ordered window
# (none of which were needed for the result), which forces a global sort on a single partition,
# and it produced NULL instead of 0 when no row matched the filter.
# The conditional sum over count(*) mirrors method 1: matching rows * 100 / total rows.
spark.sql('''
select
    sum(case when state = 4 or (state = 0 and speed <= 0) then 1 else 0 end) * 100 / count(*)
        as pct_not_connected
from renew_power
''').show()

+-------------------+
|  pct_not_connected|
+-------------------+
|0.05795220102459491|
+-------------------+



## Question 2: reading JSON

In [1]:
import json

In [2]:
# Path of the JSON input file, relative to the notebook's working directory
file_name='renew_sample_input_test.json'

In [3]:
# Parse the whole input file into Python objects.
# JSON text is UTF-8, so the encoding is set explicitly rather than relying on the platform default.
with open(file_name, encoding='utf-8') as json_file:
    json_data = json.load(json_file)

In [4]:
# Inspect the top-level keys of the parsed JSON document
json_data.keys()

dict_keys(['objects', 'indicators', 'data', 'self'])

In [5]:
# Keep only the 'data' section of the document; the row-building cell further down iterates over it.
# NOTE(review): despite its name, this variable holds the data records, not a date.
base_date=json_data['data']

In [6]:
from pyspark.sql.types import StructField,StructType
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.types import DoubleType

In [7]:
# Explicit schema for the solar DataFrame: a string timestamp followed by three
# double-valued measurement columns; every field is nullable.
schema_ = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("timestamp", StringType()),
        ("power.ac", DoubleType()),
        ("voltage.ac.ab", DoubleType()),
        ("current.ac", DoubleType()),
    )
])


In [8]:
# Accumulator for the row tuples that will be turned into a DataFrame
datarows = []

In [9]:
# Flatten each record into a (timestamp, value, value, value) tuple for the DataFrame.
# A record is indexed as record[0] -> timestamp and record[1][0] -> the list of measurements.
# The list is rebuilt from scratch here so that re-running this cell does not append duplicate rows.
datarows = []
for entry in base_date:
    timestamp, readings = entry[0], entry[1][0]
    tuple_input = (timestamp, readings[0], readings[1], readings[2])
    print(*tuple_input)
    datarows.append(tuple_input)

2020-01-20T09:00:00 890083.7333333333 604.7386666666666 857.0843777777778
2020-01-20T09:15:00 985881.5333333333 604.9938000000001 949.5994222222222
2020-01-20T09:30:00 1079118.9333333333 605.0138000000001 1039.5389333333333
2020-01-20T09:45:00 1173847.3333333333 604.0386 1130.607177777778
2020-01-20T10:00:00 1260267.6666666667 606.0834 1211.7521111111112
2020-01-20T10:15:00 1335955.6666666667 604.6751999999999 1286.6725777777776
2020-01-20T10:30:00 1393353.2 604.5897333333335 1342.1388
2020-01-20T10:45:00 1412581.5333333334 606.2603333333333 1354.6253777777777
2020-01-20T11:00:00 1483242.3333333333 606.7202 1422.1712888888887
2020-01-20T11:15:00 1407966.8 608.2414666666666 1348.0626222222222
2020-01-20T11:30:00 1446273.5333333334 612.4508000000001 1372.674088888889
2020-01-20T11:45:00 1103368.9333333333 615.1485333333333 1041.5581777777777


In [10]:
# Build the solar DataFrame from the collected row tuples using the explicit schema
solar_df = sqlContext.createDataFrame(data=datarows, schema=schema_)

In [102]:
# Preview the solar DataFrame (up to 20 rows, long values truncated — the show() defaults)
solar_df.show(n=20, truncate=True)

+-------------------+------------------+-----------------+------------------+
|          timestamp|          power.ac|    voltage.ac.ab|        current.ac|
+-------------------+------------------+-----------------+------------------+
|2020-01-20T09:00:00| 890083.7333333333|604.7386666666666| 857.0843777777778|
|2020-01-20T09:15:00| 985881.5333333333|604.9938000000001| 949.5994222222222|
|2020-01-20T09:30:00|1079118.9333333333|605.0138000000001|1039.5389333333333|
|2020-01-20T09:45:00|1173847.3333333333|         604.0386| 1130.607177777778|
|2020-01-20T10:00:00|1260267.6666666667|         606.0834|1211.7521111111112|
|2020-01-20T10:15:00|1335955.6666666667|604.6751999999999|1286.6725777777776|
|2020-01-20T10:30:00|         1393353.2|604.5897333333335|         1342.1388|
|2020-01-20T10:45:00|1412581.5333333334|606.2603333333333|1354.6253777777777|
|2020-01-20T11:00:00|1483242.3333333333|         606.7202|1422.1712888888887|
|2020-01-20T11:15:00|         1407966.8|608.2414666666666|1348.0

In [None]:
#Save results to a csv file
# Uses the built-in "csv" data source, the same one used for reading earlier in this notebook;
# "com.databricks.spark.csv" is the legacy name of the external Spark 1.x package.
# coalesce(1) reduces the DataFrame to one partition before writing; an existing
# 'solar_df_output' target is overwritten.
(
    solar_df.coalesce(1).write
    .format("csv")
    .option("header", "True")
    .mode("overwrite")
    .save('solar_df_output')
)
    
    