In [50]:
#pip install pyspark

In [51]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg
from pyspark.ml.feature import VectorAssembler, StandardScaler

In [52]:
df = pd.read_csv('/content/dados.csv')
df.head()



Unnamed: 0,product_id,product_name,category,price,quantity_sold,sales_date
0,1,Widget A,Widgets,19.99,100,2023-01-01
1,2,Widget B,Widgets,29.99,150,2023-01-02
2,3,Gadget A,Gadgets,99.99,200,2023-01-03
3,4,Gadget B,Gadgets,199.99,50,2023-01-04
4,5,Tool A,Tools,9.99,300,2023-01-05


In [53]:
df.drop(columns=["sales_date"])

Unnamed: 0,product_id,product_name,category,price,quantity_sold
0,1,Widget A,Widgets,19.99,100
1,2,Widget B,Widgets,29.99,150
2,3,Gadget A,Gadgets,99.99,200
3,4,Gadget B,Gadgets,199.99,50
4,5,Tool A,Tools,9.99,300
5,6,Tool B,Tools,14.99,400
6,7,Widget A,Widgets,19.99,110
7,8,Widget B,Widgets,29.99,160
8,9,Gadget A,Gadgets,99.99,210
9,10,Gadget B,Gadgets,199.99,60


In [54]:
from pyspark.sql import SparkSession

# Inicialização da sessão Spark
spark = SparkSession.builder \
    .appName("Pipeline de Limpeza e Transformação para Aplicações de IA") \
    .getOrCreate()


In [55]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DateType
from pyspark.sql import Row

# Schema dos dados
schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("price", FloatType(), True),
    StructField("quantity_sold", IntegerType(), True)
])

# Criando DataFrame
df = spark.createDataFrame(df, schema=schema)

# Exibindo DataFrame
df.show()


+----------+------------+--------+------+-------------+
|product_id|product_name|category| price|quantity_sold|
+----------+------------+--------+------+-------------+
|         1|    Widget A| Widgets| 19.99|          100|
|         2|    Widget B| Widgets| 29.99|          150|
|         3|    Gadget A| Gadgets| 99.99|          200|
|         4|    Gadget B| Gadgets|199.99|           50|
|         5|      Tool A|   Tools|  9.99|          300|
|         6|      Tool B|   Tools| 14.99|          400|
|         7|    Widget A| Widgets| 19.99|          110|
|         8|    Widget B| Widgets| 29.99|          160|
|         9|    Gadget A| Gadgets| 99.99|          210|
|        10|    Gadget B| Gadgets|199.99|           60|
|        11|      Tool A|   Tools|  9.99|          310|
|        12|      Tool B|   Tools| 14.99|          420|
|        13|    Widget C| Widgets| 24.99|          130|
|        14|    Widget D| Widgets| 34.99|          170|
|        15|    Gadget C| Gadgets|149.99|       

In [56]:
from pyspark.sql.functions import col

# Filtrando produtos com preço maior que zero
cleaned_data = df.filter(col("price") > 0)

# Exibindo dados limpos
cleaned_data.show()


+----------+------------+--------+------+-------------+
|product_id|product_name|category| price|quantity_sold|
+----------+------------+--------+------+-------------+
|         1|    Widget A| Widgets| 19.99|          100|
|         2|    Widget B| Widgets| 29.99|          150|
|         3|    Gadget A| Gadgets| 99.99|          200|
|         4|    Gadget B| Gadgets|199.99|           50|
|         5|      Tool A|   Tools|  9.99|          300|
|         6|      Tool B|   Tools| 14.99|          400|
|         7|    Widget A| Widgets| 19.99|          110|
|         8|    Widget B| Widgets| 29.99|          160|
|         9|    Gadget A| Gadgets| 99.99|          210|
|        10|    Gadget B| Gadgets|199.99|           60|
|        11|      Tool A|   Tools|  9.99|          310|
|        12|      Tool B|   Tools| 14.99|          420|
|        13|    Widget C| Widgets| 24.99|          130|
|        14|    Widget D| Widgets| 34.99|          170|
|        15|    Gadget C| Gadgets|149.99|       

In [57]:
from pyspark.ml.feature import VectorAssembler

# Selecionando colunas relevantes
feature_columns = ["price", "quantity_sold"]

# Criando o vetor de features
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
final_data = assembler.transform(cleaned_data).select("features")

# Exibindo dados finais preparados
final_data.show(truncate=False)


+--------------------------+
|features                  |
+--------------------------+
|[19.989999771118164,100.0]|
|[29.989999771118164,150.0]|
|[99.98999786376953,200.0] |
|[199.99000549316406,50.0] |
|[9.989999771118164,300.0] |
|[14.989999771118164,400.0]|
|[19.989999771118164,110.0]|
|[29.989999771118164,160.0]|
|[99.98999786376953,210.0] |
|[199.99000549316406,60.0] |
|[9.989999771118164,310.0] |
|[14.989999771118164,420.0]|
|[24.989999771118164,130.0]|
|[34.9900016784668,170.0]  |
|[149.99000549316406,180.0]|
|[249.99000549316406,70.0] |
|[12.989999771118164,320.0]|
|[19.989999771118164,430.0]|
|[21.989999771118164,120.0]|
|[31.989999771118164,140.0]|
+--------------------------+
only showing top 20 rows



In [58]:
# Encerrando a sessão Spark
spark.stop()
