# Project Big Data Neferu

# Imports

In [1]:
! pip install pyspark



In [2]:
import numpy as np
from pyspark.sql import functions
from pyspark.sql.functions import col
import pyspark
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
import tensorflow as tf

In [3]:
from pyspark import SparkContext

try:
    sc = SparkContext('local', 'Pyspark demo')
except ValueError:
    print('SparkContext already exists!')

from pyspark.sql import SparkSession

try:
    spark = SparkSession.builder.appName('Recommendation_system').getOrCreate()
except ValueError:
    print('SparkSession already exists!')

# Data Preprocessing

In [4]:
df_ = spark.read.option('header', True).format('csv').load('/content/SolarPrediction.csv')

In [5]:
df_.show()

+----------+--------------------+--------+---------+-----------+--------+--------+----------------------+-----+-----------+----------+
|  UNIXTime|                Data|    Time|Radiation|Temperature|Pressure|Humidity|WindDirection(Degrees)|Speed|TimeSunRise|TimeSunSet|
+----------+--------------------+--------+---------+-----------+--------+--------+----------------------+-----+-----------+----------+
|1475229326|9/29/2016 12:00:0...|23:55:26|     1.21|         48|   30.46|      59|                177.39| 5.62|   06:13:00|  18:13:00|
|1475229023|9/29/2016 12:00:0...|23:50:23|     1.21|         48|   30.46|      58|                176.78| 3.37|   06:13:00|  18:13:00|
|1475228726|9/29/2016 12:00:0...|23:45:26|     1.23|         48|   30.46|      57|                158.75| 3.37|   06:13:00|  18:13:00|
|1475228421|9/29/2016 12:00:0...|23:40:21|     1.21|         48|   30.46|      60|                137.71| 3.37|   06:13:00|  18:13:00|
|1475228124|9/29/2016 12:00:0...|23:35:24|     1.17|   

In [6]:
columns = ['UNIXTime', 'Data', 'Time', 'Radiation', 'Temperature', 'Pressure', 'Humidity','WindDirection(Degrees)','Speed', 'TimeSunRise','TimeSunSet']
rows = df_.count()
cnt = 0

In [7]:
for index, column in enumerate(columns):  # for each column

    notNull = df_.filter(col(str(column)).isNotNull()).count()   # we count the null values
 
    if  notNull != rows:
        print('There are '+str(rows - notNull)+' Null values in the '+str(column)+' column')   
    else:
        cnt += 1

    if cnt != index+1:
        print('There are not NULL values in the '+str(column)+' column')

    elif cnt == len(columns):
        print('There are not NULL values in the data frame')

There are not NULL values in the data frame


In [8]:
# Data

split_col = pyspark.sql.functions.split(df_['Data'], '/')
df_ = df_.withColumn('Month', split_col.getItem(0))
df_ = df_.withColumn('Day', split_col.getItem(1))
df_ = df_.withColumn('YearAux', split_col.getItem(2)) # year + time

In [9]:
# Time

split_col = pyspark.sql.functions.split(df_['Time'], ':')
df_ = df_.withColumn('Hour', split_col.getItem(0))
df_ = df_.withColumn('Minute', split_col.getItem(1))
df_ = df_.withColumn('Second', split_col.getItem(2))

In [10]:
columns.append('Month')
columns.append('Day')
columns.append('Hour')
columns.append('Minute')
columns.append('Second')

# Data visualization

In [11]:
df = df_.select([column for column in columns if column not in ['Time', 'Data', 'YearAux', 'TimeSunRise', 'TimeSunSet', 'UNIXTime']])

In [12]:
import matplotlib.pyplot as plt
import seaborn as sns

mydata = df.toPandas()
fig = plt.figure(figsize=(20,10))
fig.suptitle('Feature Correlation', fontsize=18)
#sns.heatmap(mydata.corr(), annot=True, cmap='RdBu', center=0)

Text(0.5, 0.98, 'Feature Correlation')

<Figure size 1440x720 with 0 Axes>

In [13]:
fig2 = plt.figure(figsize=(15,10))
#sns.barplot(x=mydata['Temperature'],y=mydata['Radiation'])

#Temperatura este direct proporțională cu radiația, deci este o caracteristică importantă

<Figure size 1080x720 with 0 Axes>

In [14]:
fig3 = plt.figure(figsize=(15,10))
#sns.barplot(x=mydata['Humidity'],y=mydata['Radiation'])

# Pe măsură ce nivelul de umiditate scade, radiațiile tind să crească, astfel aceste două atribute par a fi invers proporționale.

<Figure size 1080x720 with 0 Axes>

In [15]:
df.show()

+---------+-----------+--------+--------+----------------------+-----+-----+---+----+------+------+
|Radiation|Temperature|Pressure|Humidity|WindDirection(Degrees)|Speed|Month|Day|Hour|Minute|Second|
+---------+-----------+--------+--------+----------------------+-----+-----+---+----+------+------+
|     1.21|         48|   30.46|      59|                177.39| 5.62|    9| 29|  23|    55|    26|
|     1.21|         48|   30.46|      58|                176.78| 3.37|    9| 29|  23|    50|    23|
|     1.23|         48|   30.46|      57|                158.75| 3.37|    9| 29|  23|    45|    26|
|     1.21|         48|   30.46|      60|                137.71| 3.37|    9| 29|  23|    40|    21|
|     1.17|         48|   30.46|      62|                104.95| 5.62|    9| 29|  23|    35|    24|
|     1.21|         48|   30.46|      64|                 120.2| 5.62|    9| 29|  23|    30|    24|
|      1.2|         49|   30.46|      72|                112.45| 6.75|    9| 29|  23|    25|    19|


# Outliers removal
Applying Z score for every feature and keeping the one that its absolute value is smaller than our threshhold(3)

In [16]:
# Temperature	Pressure	Humidity	WindDirection(Degrees)	Speed	Month	Day	Hour	Minute	Second
threshold = 4

outliers = [] 

for i in mydata['Temperature']: 
    z = (i- np.mean(mydata['Temperature']))/np.std(mydata['Temperature'])
    if z > threshold or z < 0-threshold: 
        outliers.append(i) 
print('The outliers in Temperature are: ', outliers) 

UFuncTypeError: ignored

In [None]:
outliers = [] 

for i in mydata['Pressure']: 
    z = (i- np.mean(mydata['Pressure']))/np.std(mydata['Pressure'])
    if z > threshold or z < 0-threshold: 
        outliers.append(i) 
print('The outliers in Pressure are: ', outliers)

In [None]:
outliers = [] 

for i in mydata['Humidity']: 
    z = (i- np.mean(mydata['Humidity']))/np.std(mydata['Humidity'])
    if z > threshold or z < 0-threshold: 
        outliers.append(i) 
print('The outliers in Humidity are: ', outliers)

In [None]:
outliers = [] 

for i in mydata['WindDirection(Degrees)']: 
    z = (i- np.mean(mydata['WindDirection(Degrees)']))/np.std(mydata['WindDirection(Degrees)'])
    if z > threshold or z < 0-threshold: 
        outliers.append(i) 
print('The outliers in WindDirection(Degrees): ', outliers)

In [None]:
outliers = [] 

for i in mydata['Speed']: 
    z = (i- np.mean(mydata['Speed']))/np.std(mydata['Speed'])
    if z > threshold or z < 0-threshold: 
        outliers.append(i) 
print('The outliers in Speed: ', outliers)

In [None]:
outliers = [] 

for i in mydata['Month']: 
    z = (i- np.mean(mydata['Month']))/np.std(mydata['Month'])
    if z > threshold or z < 0-threshold: 
        outliers.append(i) 
print('The outliers in Month: ', outliers)

In [None]:
threshold = 4
outliers = [] 

for i in mydata['Day']: 
    z = (i- np.mean(mydata['Day']))/np.std(mydata['Day'])
    if z > threshold or z < 0-threshold: 
        outliers.append(i) 
print('The outliers in Day: ', outliers)

In [None]:
outliers = [] 

for i in mydata['Hour']: 
    z = (i- np.mean(mydata['Hour']))/np.std(mydata['Hour'])
    if z > threshold or z < 0-threshold: 
        outliers.append(i) 
print('The outliers in Hour: ', outliers)

In [None]:
outliers = [] 

for i in mydata['Minute']: 
    z = (i- np.mean(mydata['Minute']))/np.std(mydata['Minute'])
    if z > threshold or z < 0-threshold: 
        outliers.append(i) 
print('The outliers in Minute: ', outliers)

In [None]:
outliers = [] 

for i in mydata['Second']: 
    z = (i- np.mean(mydata['Second']))/np.std(mydata['Second'])
    if z > threshold or z < 0-threshold: 
        outliers.append(i) 
print('The outliers in Second: ', outliers)

In [None]:
plt.figure(figsize=(20,10))

distr = mydata[["Temperature","Pressure","Humidity","WindDirection(Degrees)","Speed"]]

for i, column in enumerate(distr):
    plt.subplot(2,3,i+1)
    sns.histplot(distr[column],kde=True)

# Column type conversion

In [17]:
cols = ['Radiation', 'Temperature', 'Pressure', 'Humidity', 'WindDirection(Degrees)', 'Speed', 'Month', 'Day', 'Hour','Minute','Second']

In [18]:
def isfloat(x):
    try:
        float(x)
    except :
        return False
    else:
        return True
line1 = df.head(1)[0]

df = df.select([c for c in df.columns if not isfloat(line1[c])] + [df[c].cast("float").alias(c) for c in df.columns ])

# Train Test Splitting

In [19]:
train, test = df.randomSplit([0.7, 0.3], seed = 41)

# Creating freatures assembler

In [20]:
assembler = VectorAssembler(
    inputCols=['Temperature', 'Pressure', 'Humidity', 'WindDirection(Degrees)', 'Speed', 'Month', 'Day', 'Hour', 'Minute', 'Second'],
    outputCol='Features')


train_data = assembler.transform(train)
test_data = assembler.transform(test)

In [21]:
train_data =  train_data.select([column for column in ['Features', 'Radiation']])
test_data = test_data.select([column for column in ['Features', 'Radiation']])

# ML Models

## Linear Regression

In [22]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(labelCol = 'Radiation', featuresCol = 'Features')
lrModel = lr.fit(train_data)

prediction = lrModel.evaluate(test_data)

print("MAE: {}".format(prediction.meanAbsoluteError))

MAE: 146.64277488396166


## Random Forest

In [23]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

rf = RandomForestRegressor(labelCol = 'Radiation',
                            featuresCol = 'Features')

In [24]:
featureIndexer =\
    VectorIndexer(inputCol="Features", outputCol="indexedFeatures", maxCategories=4).fit(train_data)

pipeline = Pipeline(stages=[featureIndexer, rf])

In [25]:
model = pipeline.fit(train_data)

In [26]:
predictions = model.transform(test_data)

evaluator = RegressionEvaluator(
    labelCol="Radiation", predictionCol="prediction", metricName="mae")
rmse = evaluator.evaluate(predictions)
print("MAE on test data = %g" % rmse)

rfModel = model.stages[1]
print(rfModel)

Root Mean Squared Error (RMSE) on test data = 77.5756
RandomForestRegressionModel: uid=RandomForestRegressor_52defa01a024, numTrees=20, numFeatures=10


## Gradient-boosted tree regression

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

gbt = GBTRegressor(labelCol = 'Radiation', featuresCol="indexedFeatures", maxIter=10)

pipeline = Pipeline(stages=[featureIndexer, gbt])

model = pipeline.fit(train_data)

predictions = model.transform(test_data)

evaluator = RegressionEvaluator(
    labelCol="Radiation", predictionCol="prediction", metricName="mae")
rmse = evaluator.evaluate(predictions)
print("MAE on test data = %g" % rmse)

gbtModel = model.stages[1]
print(gbtModel)

# Deep learning model

In [28]:
cols = ['Radiation', 'Temperature', 'Pressure', 'Humidity', 'WindDirection(Degrees)', 'Speed', 'Month', 'Day', 'Hour','Minute','Second']

training_labels = train.select([column for column in cols if column in ['Radiation']])
testing_labels = test.select([column for column in cols if column in ['Radiation']])

training_data = train.select([column for column in cols if column not in ['Radiation']])
testing_data = test.select([column for column in cols if column not in ['Radiation']])

pd_train_data = training_data.toPandas()
pd_test_data  = testing_data.toPandas()

pd_train_labels = training_labels.toPandas()
pd_test_labels  = testing_labels.toPandas()


In [29]:
from keras.layers import Dense, Dropout
from keras.optimizers import SGD, Adam
from keras.models import Sequential


model = Sequential()
    
model.add(Dense(128, activation='relu', input_dim=10))
model.add(Dropout(0.33))
    
model.add(Dense(64, activation='relu'))
model.add(Dropout(0.33))

model.add(Dense(32, activation='relu'))
model.add(Dropout(0.33))

model.add(Dense(1, activation='linear'))
    
model.compile(metrics='mae', loss='mae', optimizer=Adam(learning_rate=0.0001))
history = model.fit(pd_train_data, pd_train_labels, validation_data=(pd_test_data, pd_test_labels), epochs=50, batch_size=32)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [30]:
model.evaluate(pd_test_data, pd_test_labels)



[73.53791046142578, 73.53791046142578]

## Another arhitecture

In [31]:
from keras.layers import Dense, Dropout
from keras import regularizers

model = Sequential()

model.add(Dense(64, activation='relu',  kernel_initializer='normal', input_dim=10, kernel_regularizer=regularizers.l2(l=0.01)))
model.add(Dropout(0.1))

model.add(Dense(128, activation='relu', kernel_regularizer=regularizers.l2(l=0.01)))
model.add(Dropout(0.1))

model.add(Dense(256, activation='relu', kernel_regularizer=regularizers.l2(l=0.01)))
model.add(Dropout(0.1))

model.add(Dense(128, activation='relu', kernel_regularizer=regularizers.l2(l=0.01)))
model.add(Dropout(0.1))

model.add(Dense(64, activation='relu', kernel_regularizer=regularizers.l2(l=0.01)))
model.add(Dropout(0.1))

model.add(Dense(1, activation='linear'))

In [None]:
model.compile(metrics='mae', loss='mae', optimizer=Adam(learning_rate=0.0001))
history = model.fit(pd_train_data, pd_train_labels, validation_data=(pd_test_data, pd_test_labels), epochs=300, batch_size=32)

Epoch 1/300
Epoch 2/300
Epoch 3/300
Epoch 4/300
Epoch 5/300
Epoch 6/300
Epoch 7/300
Epoch 8/300
Epoch 9/300
Epoch 10/300
Epoch 11/300
Epoch 12/300
Epoch 13/300
Epoch 14/300
Epoch 15/300
Epoch 16/300
Epoch 17/300
Epoch 18/300
Epoch 19/300
Epoch 20/300
Epoch 21/300
Epoch 22/300
Epoch 23/300
Epoch 24/300
Epoch 25/300
Epoch 26/300
Epoch 27/300
Epoch 28/300
Epoch 29/300
Epoch 30/300
Epoch 31/300
Epoch 32/300
Epoch 33/300
Epoch 34/300
Epoch 35/300
Epoch 36/300
Epoch 37/300
Epoch 38/300
Epoch 39/300
Epoch 40/300
Epoch 41/300
Epoch 42/300
Epoch 43/300
Epoch 44/300
Epoch 45/300
Epoch 46/300
Epoch 47/300
Epoch 48/300
Epoch 49/300
Epoch 50/300
Epoch 51/300
Epoch 52/300
Epoch 53/300
Epoch 54/300
Epoch 55/300
Epoch 56/300
Epoch 57/300
Epoch 58/300
Epoch 59/300
Epoch 60/300
Epoch 61/300
Epoch 62/300
Epoch 63/300
Epoch 64/300
Epoch 65/300
Epoch 66/300
Epoch 67/300
Epoch 68/300
Epoch 69/300
Epoch 70/300
Epoch 71/300
Epoch 72/300
Epoch 73/300
Epoch 74/300
Epoch 75/300
Epoch 76/300
Epoch 77/300
Epoch 78

In [None]:
model.evaluate(pd_test_data, pd_test_labels)