<a href="https://colab.research.google.com/github/jucabnu/artigo_pos/blob/main/TCC_atual_231020_2149.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Preparando a estação de trabalho no Google Colab 

# instalando as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

# configurando  as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"
 
# tornando o pyspark inicializável
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

# criando a sessão Spark e configurando os nós de cluster
from pyspark.sql import SparkSession
spark = SparkSession.builder \
   .master("local") \
   .appName("Linear Regression Model") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
   
sc = spark.sparkContext

In [17]:
# https://raw.githubusercontent.com/jucabnu/artigo_pos/main/arquivo.csv
!rm *.csv
op = input('Como deseja enviar o arquivo? (1 - URI / 2 - Upload): ')
if op == '1':
  site = str(input('Informe a URI: '))
  !wget --quiet --show progress $site
else:
  from google.colab import files
  files.upload()
!ls

Como deseja enviar o arquivo? (1 - URI / 2 - Upload): 1
Informe a URI: https://raw.githubusercontent.com/jucabnu/artigo_pos/main/arquivo.csv
arquivo.csv  spark-2.4.4-bin-hadoop2.7
sample_data  spark-2.4.4-bin-hadoop2.7.tgz


In [19]:
# Montagem do RDD (Resilient Distributed Dataset)

# Atribuindo o arquivo CSV ao RDD
rdd = sc.textFile('arquivo.csv')

# Separação dos dados pelo separador do SCV
rdd = rdd.map(lambda line: line.split(","))

# Importação do pyspark.sql para a montagem do DF (Data Frame)
from pyspark.sql import Row

# Mapeando o RDD para o DF
df = rdd.map(lambda line: Row(Versao=line[0], Qualidade=line[1])).toDF()

# Apresentando os 20 primeiros registros e o schema para verificação
df.show()
df.printSchema()

+---------+------+
|Qualidade|Versao|
+---------+------+
|        2|     3|
|       51|     2|
|       53|    10|
|       71|     7|
|       69|     1|
|       58|     8|
|       73|     3|
|       63|     5|
|       51|     2|
|       58|     2|
|       25|     9|
|        1|     2|
|       80|     6|
|       56|     5|
|       78|     7|
|       64|     3|
|       45|     1|
|       75|     7|
|       12|     1|
|       59|     5|
+---------+------+
only showing top 20 rows

root
 |-- Qualidade: string (nullable = true)
 |-- Versao: string (nullable = true)



In [21]:
# Importando todos os métodos de pyspark.sql.types
from pyspark.sql.types import *

# Função para converter o tipo de dados das colunas do DF.
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 

columns = ['Qualidade', 'Versao']

# Chamando a função e passando o parâmetro para conversão em float
df = convertColumn(df, columns, FloatType())

# Alguns métodos úteis

# df.select('Qualidade').show(10)
# df.groupBy("Qualidade").count().sort("Qualidade",ascending=False).show()

# Metodo para verificar o resumo do data frame, com a contagem, média, desvio padrão, mínimo e máximo de cada elemento
df.describe().show()

+-------+------------------+-----------------+
|summary|         Qualidade|           Versao|
+-------+------------------+-----------------+
|  count|             11000|            11000|
|   mean| 50.59581818181818|5.459727272727273|
| stddev|29.018550624661852| 2.88120019967071|
|    min|               1.0|              1.0|
|    max|             100.0|             10.0|
+-------+------------------+-----------------+



In [23]:
# Preparando o DF para aplicação do Machine Learning.

# Importação da biblioteca 'DenseVector'
from pyspark.ml.linalg import DenseVector

# Definição do 'input_data' 
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Substituindo o DF pelo novo Data Frame preparado
df = spark.createDataFrame(input_data, ["label", "features"])

# Importando e inicializando o 'StandardScaler'
from pyspark.ml.feature import StandardScaler
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Ajustando o DataFrame para Scaler e transformando os dados.
scaler = standardScaler.fit(df)
scaled_df = scaler.transform(df)

# Inspecionar o resultado
scaled_df.take(2)

[Row(label=2.0, features=DenseVector([3.0]), features_scaled=DenseVector([1.0412])),
 Row(label=51.0, features=DenseVector([2.0]), features_scaled=DenseVector([0.6942]))]

In [26]:
# Aplicação prática de Machine Learning

# Dividindo os dados em conjuntos de 'treino' e 'testes'
train_data, test_data = scaled_df.randomSplit([.75,.25],seed=1234)

# Importando e inicializando o método para Regressão Linear
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(labelCol="label", maxIter=10)

# Ajustando os dados no modelo
linearModel = lr.fit(train_data)

# Gerando as predições
predicted = linearModel.transform(test_data)

# Extraind as previões e rotulando os valores corretos "conhecidos"
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

# Agrupando as predições e as etiquetas numa lista
predictionAndLabel = predictions.zip(labels).collect()

# Imprimindo as 5 primeiras instâncias das predições e etiquetas 
predictionAndLabel[:5]

# Mostrando o coeficiente do modelo
print('Coeficiente do modelo: ', linearModel.coefficients)

# Mostrando a intercessão do modelo
print('Intercessão do modelo: ', linearModel.intercept)

# Mostrando o RMSE. Erro de raiz quadrático médio (root-mean-square error)
print('RMSE: ', linearModel.summary.rootMeanSquaredError)

# Mostrando o R2 (coeficiente de determinação)
print('R2: ', linearModel.summary.r2)

Coeficiente do modelo:  [0.06328203733852185]
Intercessão do modelo:  49.975621959605604
RMSE:  29.131255605850733
R2:  3.904094749518361e-05


In [48]:
# Parando a instância do Spark e finalizando a aplicação
spark.stop()
