# Imports do spark sql

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Imports do spark ML

In [2]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Imports do Plotly para visualização em gráficos

In [3]:
import plotly.plotly as py
import plotly.graph_objs as go
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
init_notebook_mode(connected=True)

# Configuração básica para utilização do spark

In [4]:
conf = SparkConf().setMaster("local[10]").setAppName("regressao_linear_simples").set("spark.executor.memory", "14g")
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

# Criação do schema do arquivo que será carregado para  o dataframe spark

In [5]:
file_schema = StructType([ StructField("semanas", IntegerType(), True),
                           StructField("clientes", IntegerType(), True),
                           StructField("vendas", DoubleType(), True)])

# Leitura do de arquivo csv para dataframe spark

In [6]:
dataframe = spark.read.format("csv") \
               .option("header", "true") \
               .schema(file_schema) \
               .load('data.csv')

# Visualização de 10 registros do dataframe

In [8]:
dataframe.limit(10).toPandas()

Unnamed: 0,semanas,clientes,vendas
0,1,907,11.2
1,2,926,11.05
2,3,506,6.84
3,4,741,9.21
4,5,789,9.42
5,6,889,10.08
6,7,874,9.45
7,8,510,6.73
8,9,529,7.24
9,10,420,6.12


# Vetorização das variáveis que serão utilizadas na regressão

In [9]:
feature_assembler = VectorAssembler(inputCols=['clientes'],outputCol='independent_features')

# Transformação do dataframe para criação da coluna com variáveis vetorizadas

In [10]:
dataframe_vector = feature_assembler.transform(dataframe)
dataframe_vector.limit(10).toPandas()

Unnamed: 0,semanas,clientes,vendas,independent_features
0,1,907,11.2,[907.0]
1,2,926,11.05,[926.0]
2,3,506,6.84,[506.0]
3,4,741,9.21,[741.0]
4,5,789,9.42,[789.0]
5,6,889,10.08,[889.0]
6,7,874,9.45,[874.0]
7,8,510,6.73,[510.0]
8,9,529,7.24,[529.0]
9,10,420,6.12,[420.0]


# Convertendo o dado do spark para pandas Series

In [14]:
clientes = dataframe_vector.select(col('clientes').cast('double')).toPandas().clientes
vendas = dataframe_vector.select(col('vendas').cast('double')).toPandas().vendas

# Plotagem para análise de clientes e vendas

In [15]:
p1 = go.Scatter(x=clientes, 
                y=vendas, 
                mode='markers',
                marker=dict(color='black')
               )

iplot([p1], filename='clientes')

# Separação do dataframe em 75% treinamento e 25% teste

In [16]:
train_data,test_data = dataframe_vector.randomSplit([0.75,0.25])

# Criação do modelo de regressão linear a partir dos dados de trainamento

In [17]:
regressor = LinearRegression(featuresCol='independent_features',labelCol='vendas')
model = regressor.fit(train_data)

# Visualização dos coeficientes e intercept

In [18]:
print("Coefficients: %s" % str(model.coefficients))
print("Intercept: %s" % str(model.intercept))

Coefficients: [0.009495397410272971]
Intercept: 1.9099898486


# Visualização do número de iterações

In [19]:
training_summary = model.summary
print("numIterations: %d" % training_summary.totalIterations)
print("objectiveHistory: %s" % str(training_summary.objectiveHistory))

numIterations: 1
objectiveHistory: [0.0]


# Visualização de resíduo

In [20]:
training_summary.residuals.limit(10).toPandas()

Unnamed: 0,residuals
0,0.677685
1,0.125339
2,0.263921
3,0.018142
4,-0.271398
5,-0.758967
6,0.306945
7,0.221943
8,-0.727365
9,-0.033696


# Visualização do RMSE e do r2 

In [21]:
print("RMSE: %f" % training_summary.rootMeanSquaredError)
print("r2: %f" % training_summary.r2)

RMSE: 0.384711
r2: 0.941001


# Utilização dos dados de teste para verificar resultado de previsão

In [22]:
pred_result = model.evaluate(test_data)
pred_result.predictions.limit(10).toPandas()

Unnamed: 0,semanas,clientes,vendas,independent_features,prediction
0,2,926,11.05,[926.0],10.702728
1,8,510,6.73,[510.0],6.752643
2,12,872,9.43,[872.0],10.189976
3,13,924,9.46,[924.0],10.683737
4,15,425,6.92,[425.0],5.945534


# Plotagem final

In [28]:
pred_result = model.evaluate(dataframe_vector)
p1 = go.Scatter(x=clientes, 
                y=vendas, 
                mode='markers',
                marker=dict(color='black')
               )

p2 = go.Scatter(x=clientes, 
                y=pred_result.predictions.toPandas().prediction,
                mode='lines',
                line=dict(color='blue', width=3)
                )

iplot([p1,p2], filename='Previsão')