In [1]:
pip install pyspark findspark pandas


Note: you may need to restart the kernel to use updated packages.


In [2]:
import findspark
findspark.init()


In [3]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Crear sesión de Spark
spark = SparkSession.builder.appName("BatchProcessing").getOrCreate()

# Ruta del archivo CSV
file_path = r"D:\Documentos Angelica\cata_bigData\dreaddit-test.csv"

# Cargar datos como DataFrame de Spark
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Mostrar las primeras filas
df.show(5)

# **Limpieza de Datos**
# - Eliminamos filas con valores nulos
df_cleaned = df.dropna()

# - Renombramos columnas para evitar espacios en los nombres
for col_name in df_cleaned.columns:
    df_cleaned = df_cleaned.withColumnRenamed(col_name, col_name.replace(" ", "_"))


+-----+-------------+-------+--------------+--------------------+--------------------+--------------------+--------------------+------------+-------------------+------------+-----------------+--------------+------------------+-------------+------------+---------------+------------+-----------------+----------------+--------------+----------+-----------+------------+--------------+-------------+--------------+----------------+-------------+----------------+---------------+-------------+---------------+-------------+------------+----------------+-----------------+---------------+--------------+---------------+---------------+---------------+------------+--------------+------------+---------------+---------------+---------------+---------------+-------------+----------------+----------------+--------------+----------------+---------------+----------------+---------------+----------------+------------+-------------+-------------+------------+-------------+---------------+---------------+--

In [4]:
# Mostrar estructura y resumen
df_cleaned.printSchema()
df_cleaned.describe().show()

root
 |-- id: integer (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- post_id: string (nullable = true)
 |-- sentence_range: string (nullable = true)
 |-- text: string (nullable = true)
 |-- label: string (nullable = true)
 |-- confidence: string (nullable = true)
 |-- social_timestamp: string (nullable = true)
 |-- social_karma: string (nullable = true)
 |-- syntax_ari: string (nullable = true)
 |-- lex_liwc_WC: string (nullable = true)
 |-- lex_liwc_Analytic: string (nullable = true)
 |-- lex_liwc_Clout: string (nullable = true)
 |-- lex_liwc_Authentic: string (nullable = true)
 |-- lex_liwc_Tone: double (nullable = true)
 |-- lex_liwc_WPS: double (nullable = true)
 |-- lex_liwc_Sixltr: double (nullable = true)
 |-- lex_liwc_Dic: double (nullable = true)
 |-- lex_liwc_function: double (nullable = true)
 |-- lex_liwc_pronoun: double (nullable = true)
 |-- lex_liwc_ppron: double (nullable = true)
 |-- lex_liwc_i: double (nullable = true)
 |-- lex_liwc_we: double (nullab

In [5]:
# **Análisis Exploratorio (EDA)**
# - Contar número de registros después de la limpieza
print(f"Total de registros después de la limpieza: {df_cleaned.count()}")

# - Mostrar estadísticas básicas de las columnas numéricas
df_cleaned.select([col(c).cast("double") for c in df_cleaned.columns if df_cleaned.schema[c].dataType.simpleString() == "int"]).summary().show()


Total de registros después de la limpieza: 714
+-------+------------------+
|summary|                id|
+-------+------------------+
|  count|               714|
|   mean|13861.544817927172|
| stddev|  17778.2089145411|
|    min|               2.0|
|    25%|             846.0|
|    50%|            1771.0|
|    75%|           26210.0|
|    max|           55783.0|
+-------+------------------+



In [6]:
import os

# Configurar Hadoop manualmente en Jupyter
os.environ["HADOOP_HOME"] = r"C:\Hadoop"
os.environ["hadoop.home.dir"] = r"C:\Hadoop"
os.environ["PATH"] += os.pathsep + r"C:\Hadoop\bin"

# Asegurar que Spark reconozca Hadoop
os.environ["SPARK_HOME"] = r"C:\path\to\spark"  # Reemplaza con la ruta real de tu Spark
os.environ["PYSPARK_SUBMIT_ARGS"] = "--conf spark.hadoop.home.dir=C:/Hadoop pyspark-shell"

print("✅ HADOOP_HOME configurado en Jupyter")


✅ HADOOP_HOME configurado en Jupyter


In [7]:
from pyspark.sql import SparkSession

# Crear sesión de Spark
spark = SparkSession.builder.appName("TestHadoop")\
    .config("spark.hadoop.home.dir", "C:/Hadoop")\
    .getOrCreate()

# Mostrar configuración activa
print(spark.sparkContext.getConf().getAll())


[('spark.sql.warehouse.dir', 'file:/C:/Users/Angelica%20Martinez/spark-warehouse'), ('spark.driver.port', '50810'), ('spark.driver.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle

In [8]:
# **Almacenamiento de Resultados**
output_path = r"D:\Documentos Angelica\cata_bigData\processed_data.parquet"
df_cleaned.write.parquet(output_path, mode="overwrite")

print(f"Datos procesados guardados en {output_path}")

Datos procesados guardados en D:\Documentos Angelica\cata_bigData\processed_data.parquet
