In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf

In [5]:
spark = SparkSession.builder.appName("ManipulaciónDatos").getOrCreate()

In [6]:
df_spark = spark.read.csv('sales.csv', header=True, sep=',',inferSchema=True)
df_spark.show(5)

+----------------+--------------------+----------+--------------+--------------------+--------------------+--------+---------+---------+
|SalesOrderNumber|SalesOrderLineNumber| OrderDate|  CustomerName|        EmailAddress|                Item|Quantity|UnitPrice|TaxAmount|
+----------------+--------------------+----------+--------------+--------------------+--------------------+--------+---------+---------+
|         SO43701|                   1|2019-07-01|   Christy Zhu|christy12@adventu...|Mountain-100 Silv...|       1|  3399.99| 271.9992|
|         SO43704|                   1|2019-07-01|    Julio Ruiz|julio1@adventure-...|Mountain-100 Blac...|       1|  3374.99| 269.9992|
|         SO43705|                   1|2019-07-01|     Curtis Lu|curtis9@adventure...|Mountain-100 Silv...|       1|  3399.99| 271.9992|
|         SO43700|                   1|2019-07-01|  Ruben Prasad|ruben10@adventure...|  Road-650 Black, 62|       1| 699.0982|  55.9279|
|         SO43703|                   1|20

In [10]:
df_filtrado = df_spark.filter(col('OrderDate') > '2019-07-01')
df_filtrado.show(5)

+----------------+--------------------+----------+----------------+--------------------+----------------+--------+---------+---------+
|SalesOrderNumber|SalesOrderLineNumber| OrderDate|    CustomerName|        EmailAddress|            Item|Quantity|UnitPrice|TaxAmount|
+----------------+--------------------+----------+----------------+--------------------+----------------+--------+---------+---------+
|         SO43707|                   1|2019-07-02|      Emma Brown|emma3@adventure-w...|Road-150 Red, 48|       1|  3578.27| 286.2616|
|         SO43711|                   1|2019-07-02|Courtney Edwards|courtney1@adventu...|Road-150 Red, 56|       1|  3578.27| 286.2616|
|         SO43706|                   1|2019-07-02|    Edward Brown|edward26@adventur...|Road-150 Red, 48|       1|  3578.27| 286.2616|
|         SO43708|                   1|2019-07-02|       Brad Deng|brad2@adventure-w...|Road-650 Red, 52|       1| 699.0982|  55.9279|
|         SO43709|                   1|2019-07-02|     

**Seleccionar columnas específicas**


In [12]:
# Seleccionar columnas específicas
df_selected = df_spark.select('SalesOrderNumber', 'CustomerName', 'Item', 'Quantity', 'UnitPrice')
df_selected.show(5)

+----------------+--------------+--------------------+--------+---------+
|SalesOrderNumber|  CustomerName|                Item|Quantity|UnitPrice|
+----------------+--------------+--------------------+--------+---------+
|         SO43701|   Christy Zhu|Mountain-100 Silv...|       1|  3399.99|
|         SO43704|    Julio Ruiz|Mountain-100 Blac...|       1|  3374.99|
|         SO43705|     Curtis Lu|Mountain-100 Silv...|       1|  3399.99|
|         SO43700|  Ruben Prasad|  Road-650 Black, 62|       1| 699.0982|
|         SO43703|Albert Alvarez|    Road-150 Red, 62|       1|  3578.27|
+----------------+--------------+--------------------+--------+---------+
only showing top 5 rows



**Crear columnas calculadas**

In [16]:
df_calculado = df_spark.withColumn('Total', col('Quantity') * col('UnitPrice')) \
    .withColumn('TotalConImpuesto', col('Total') + col('TaxAmount'))

df_calculado.select('SalesOrderNumber', 'Total', 'TotalConImpuesto').show(5)

+----------------+--------+----------------+
|SalesOrderNumber|   Total|TotalConImpuesto|
+----------------+--------+----------------+
|         SO43701| 3399.99|       3671.9892|
|         SO43704| 3374.99|       3644.9892|
|         SO43705| 3399.99|       3671.9892|
|         SO43700|699.0982|        755.0261|
|         SO43703| 3578.27|       3864.5316|
+----------------+--------+----------------+
only showing top 5 rows



**Renombrar columnas**


In [8]:
# Renombrar columnas
df_renamed = df_spark.withColumnRenamed('SalesOrderNumber', 'NumeroOrden') \
                     .withColumnRenamed('OrderDate', 'FechaOrden') \
                     .withColumnRenamed('CustomerName', 'Cliente') \
                     .withColumnRenamed('Item', 'Producto') \
                     .withColumnRenamed('Quantity', 'Cantidad') \
                     .withColumnRenamed('UnitPrice', 'PrecioUnitario')

df_renamed.show(5)

+-----------+--------------------+----------+--------------+--------------------+--------------------+--------+--------------+---------+
|NumeroOrden|SalesOrderLineNumber|FechaOrden|       Cliente|        EmailAddress|            Producto|Cantidad|PrecioUnitario|TaxAmount|
+-----------+--------------------+----------+--------------+--------------------+--------------------+--------+--------------+---------+
|    SO43701|                   1|2019-07-01|   Christy Zhu|christy12@adventu...|Mountain-100 Silv...|       1|       3399.99| 271.9992|
|    SO43704|                   1|2019-07-01|    Julio Ruiz|julio1@adventure-...|Mountain-100 Blac...|       1|       3374.99| 269.9992|
|    SO43705|                   1|2019-07-01|     Curtis Lu|curtis9@adventure...|Mountain-100 Silv...|       1|       3399.99| 271.9992|
|    SO43700|                   1|2019-07-01|  Ruben Prasad|ruben10@adventure...|  Road-650 Black, 62|       1|      699.0982|  55.9279|
|    SO43703|                   1|2019-07

**Limpieza básica (duplicados y valores faltantes)**

In [10]:
# Identificar duplicados
from pyspark.sql import functions as F

df_duplicates = df_spark.groupBy('SalesOrderNumber') \
    .agg(F.count('*').alias('Cantidad')) \
    .filter(F.col('Cantidad') > 1)

df_duplicates.show()

+----------------+--------+
|SalesOrderNumber|Cantidad|
+----------------+--------+
|         SO51539|       3|
|         SO51654|       4|
|         SO52871|       2|
|         SO53123|       2|
|         SO53163|       2|
|         SO53404|       3|
|         SO53414|       2|
|         SO53883|       3|
|         SO53987|       2|
|         SO54100|       2|
|         SO54042|       7|
|         SO54611|       2|
|         SO54999|       2|
|         SO55549|       2|
|         SO56348|       3|
|         SO56522|       3|
|         SO56596|       3|
|         SO56614|       3|
|         SO56935|       2|
|         SO57196|       3|
+----------------+--------+
only showing top 20 rows



In [11]:
# Eliminar duplicados (manteniendo el menor SalesOrderLineNumber)
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

windowSpec = Window.partitionBy('SalesOrderNumber').orderBy('SalesOrderLineNumber')

df_no_duplicates = df_spark.withColumn('row_num', row_number().over(windowSpec)) \
    .filter(col('row_num') == 1) \
    .drop('row_num')
# Manejar valores faltantes (si existen)
df_no_duplicates = df_no_duplicates.na.drop()
df_no_duplicates.show(5)

+----------------+--------------------+----------+----------------+--------------------+--------------------+--------+---------+---------+
|SalesOrderNumber|SalesOrderLineNumber| OrderDate|    CustomerName|        EmailAddress|                Item|Quantity|UnitPrice|TaxAmount|
+----------------+--------------------+----------+----------------+--------------------+--------------------+--------+---------+---------+
|         SO43697|                   1|2019-07-01|     Cole Watson|cole1@adventure-w...|    Road-150 Red, 62|       1|  3578.27| 286.2616|
|         SO43698|                   1|2019-07-01|Rachael Martinez|rachael16@adventu...|Mountain-100 Silv...|       1|  3399.99| 271.9992|
|         SO43699|                   1|2019-07-01|   Sydney Wright|sydney61@adventur...|Mountain-100 Silv...|       1|  3399.99| 271.9992|
|         SO43700|                   1|2019-07-01|    Ruben Prasad|ruben10@adventure...|  Road-650 Black, 62|       1| 699.0982|  55.9279|
|         SO43701|         

In [13]:
df_duplicates = df_no_duplicates.groupBy('SalesOrderNumber') \
    .agg(F.count('*').alias('Cantidad')) \
    .filter(F.col('Cantidad') > 1)

df_duplicates.show()

+----------------+--------+
|SalesOrderNumber|Cantidad|
+----------------+--------+
+----------------+--------+

