In [None]:

# Instalar PySpark
!pip -q install pyspark==3.5.1

# Montar o Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Caminho base do projeto (pasta Eixo_05)
base_path = "/content/drive/MyDrive/Eixo_05/dados/"

In [None]:
# Create the Spark session, or reuse one that is already running
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("imdb-processamento").getOrCreate()
spark  # last expression of the cell: displays the session as the cell output


In [None]:

# Imports necessários
from pyspark.sql.functions import col, lower, regexp_replace
from pyspark.ml.feature import (
    StringIndexer, RegexTokenizer, StopWordsRemover,
    HashingTF, IDF, Word2Vec, MinMaxScaler
)

# Helper: per-column null statistics
def calcula_valores_nulos(df):
    """Report which columns of ``df`` contain null values.

    Args:
        df: Spark DataFrame to inspect.

    Returns:
        A list of ``(column_name, null_count, null_percentage)`` tuples, one
        for each column that has at least one null, in ``df.columns`` order.
        The list is empty when no column contains nulls (which includes the
        case of a DataFrame with no rows).
    """
    # Local import so the helper does not rely on additional notebook-level imports.
    from pyspark.sql import functions as F

    column_names = df.columns

    # Compute the row count and every per-column null count in ONE aggregation
    # job, instead of one count() job per column (each of which would rescan
    # the source). Results are read back by position, so the aliases only need
    # to be unique among themselves.
    aggregates = [F.count(F.lit(1)).alias("total")]
    aggregates += [
        F.sum(F.col(name).isNull().cast("int")).alias(f"nulls_{i}")
        for i, name in enumerate(column_names)
    ]
    stats = df.agg(*aggregates).first()
    num_rows = stats[0]

    null_columns_counts = []
    for i, name in enumerate(column_names):
        # sum() returns NULL (None) when the DataFrame has no rows.
        null_rows = stats[i + 1] or 0
        if null_rows > 0:
            null_columns_counts.append((name, null_rows, (null_rows / num_rows) * 100))
    return null_columns_counts

In [None]:
# Main cleaning and feature-engineering routine
def limpar_transformar(spark, base_path):
    """Clean the reviews CSV and build three feature sets from it.

    Reads ``dataset.csv`` located at ``base_path``, drops rows that contain
    nulls, indexes the ``sentiment`` column into a numeric ``label``,
    normalises the ``review`` text (HTML tags removed, letters and spaces only,
    lower-cased), tokenises it and removes stop words. Three feature
    representations are then derived, written as Parquet next to the input
    (overwriting any previous output) and returned.

    Args:
        spark: Active SparkSession used to read the CSV.
        base_path: Path prefix (including the trailing separator) that is
            concatenated with ``dataset.csv`` for input and with
            ``HTFfeaturizedData`` / ``TFIDFfeaturizedData`` /
            ``W2VfeaturizedData`` for output.

    Returns:
        A tuple ``(htf, tfidf, w2v)`` of DataFrames, each with the columns
        ``sentiment``, ``review``, ``label`` and ``features``, where
        ``features`` holds respectively the hashed term frequencies, the
        TF-IDF weights, and the min-max scaled Word2Vec vectors.
    """
    output_columns = ["sentiment", "review", "label", "features"]

    # 1) Read the CSV stored on Drive.
    #    (For quick experiments, chain .limit(5000) onto the read.)
    reviews = spark.read.csv(base_path + "dataset.csv", header=True, escape="\"")

    # 2) Drop rows containing nulls. dropna() leaves the data untouched when
    #    there is nothing to drop, so no separate null-counting pass is needed.
    reviews = reviews.dropna()

    # 3) Index the labels (sentiment -> label)
    indexer = StringIndexer(inputCol="sentiment", outputCol="label")
    df = indexer.fit(reviews).transform(reviews)

    # 4) Text cleaning
    # Match one tag at a time. A greedy '<.*/>' would span from the first '<'
    # to the last '/>' and delete all the review text in between. Tags are
    # replaced by a space so the words on either side are not glued together;
    # the repeated spaces are collapsed two steps below.
    df = df.withColumn("review", regexp_replace(col("review"), '<[^>]+>', ' '))
    df = df.withColumn("review", regexp_replace(col("review"), '[^A-Za-z ]+', ''))
    df = df.withColumn("review", regexp_replace(col("review"), ' +', ' '))
    df = df.withColumn("review", lower(col("review")))

    # 5) Tokenisation and stop-word removal
    # Raw string: "\W" in a normal string literal is an invalid escape sequence.
    df = RegexTokenizer(inputCol="review", outputCol="words", pattern=r"\W").transform(df)
    feature_data = StopWordsRemover(inputCol="words", outputCol="filtered").transform(df)

    # 6) HashingTF
    htf = HashingTF(inputCol="filtered", outputCol="rawfeatures", numFeatures=250)
    HTFfeaturizedData = htf.transform(feature_data)

    # 7) TF-IDF
    idf = IDF(inputCol="rawfeatures", outputCol="features")
    idfModel = idf.fit(HTFfeaturizedData)
    TFIDFfeaturizedData = idfModel.transform(HTFfeaturizedData)

    # 8) Word2Vec + MinMaxScaler
    w2v = Word2Vec(vectorSize=250, minCount=5, inputCol="filtered", outputCol="features")
    W2VfeaturizedData = w2v.fit(feature_data).transform(feature_data)

    scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
    scaled = scaler.fit(W2VfeaturizedData).transform(W2VfeaturizedData)

    # 9) Build each output frame once, so that what is saved and what is
    #    returned cannot drift apart.
    htf_out = (
        HTFfeaturizedData
        .select("sentiment", "review", "label", "rawfeatures")
        .withColumnRenamed("rawfeatures", "features")
    )
    tfidf_out = TFIDFfeaturizedData.select(*output_columns)
    w2v_out = (
        scaled
        .select("sentiment", "review", "label", "scaledFeatures")
        .withColumnRenamed("scaledFeatures", "features")
    )

    # 10) Save the results to Drive
    htf_out.write.mode("overwrite").parquet(base_path + "HTFfeaturizedData")
    tfidf_out.write.mode("overwrite").parquet(base_path + "TFIDFfeaturizedData")
    w2v_out.write.mode("overwrite").parquet(base_path + "W2VfeaturizedData")

    return htf_out, tfidf_out, w2v_out


In [None]:
# Run the pipeline; the three feature sets are also written to Drive
HTF, TFIDF, W2V = limpar_transformar(spark, base_path=base_path)

print("Dados processados e salvos no Google Drive em:")
print(
    *(
        f"{base_path}{folder}"
        for folder in ("HTFfeaturizedData", "TFIDFfeaturizedData", "W2VfeaturizedData")
    ),
    sep="\n",
)

# Row counts of the three returned frames, evaluated in HTF, TFIDF, W2V order
print("Contagens:", *(frame.count() for frame in (HTF, TFIDF, W2V)))