<a href="https://colab.research.google.com/github/leioscx/Ejemplos_Transformaciones_y_Acciones/blob/main/ejemplos_mejorados.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark import SparkContext
sc = SparkContext("local", "Ejemplo_Mejorado")

In [None]:
# TRANSFORMACIONES

In [None]:
# Crear el RDD de ejemplo
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# Aplicar filter y map en una sola función lambda
resultado = rdd.map(lambda x: x ** 2 if x % 2 == 0 else None).filter(lambda x: x is not None)

# Resultado: [4, 16, 36, 64, 100]
print(resultado.collect())


[4, 16, 36, 64, 100]


In [None]:
# Crear el RDD de frases
rdd = sc.parallelize(["hola mundo", "hola Spark", "mundo de Big Data"])

# Usar mapPartitions para dividir en palabras por partición y luego distinct para obtener únicas
resultado = rdd.mapPartitions(lambda partition: (word for line in partition for word in line.split(" "))).distinct()

# Resultado: ['hola', 'mundo', 'Spark', 'de', 'Big', 'Data']
print(resultado.collect())


['hola', 'mundo', 'Spark', 'de', 'Big', 'Data']


In [None]:
# Crear dos RDDs de nombres
rdd1 = sc.parallelize(["Juan", "Ana", "Carlos", "Miguel", "Carlos"])
rdd2 = sc.parallelize(["Carlos", "Ana", "Maria", "Sofia", "Ana"])

# Eliminar duplicados antes de realizar la union e intersección
rdd1_distinct = rdd1.distinct()
rdd2_distinct = rdd2.distinct()

# Unión de ambos RDDs sin duplicados internos
union_resultado = rdd1_distinct.union(rdd2_distinct)

# Intersección de ambos RDDs
interseccion_resultado = rdd1_distinct.intersection(rdd2_distinct)

# Resultado de la unión: ['Juan', 'Ana', 'Carlos', 'Miguel', 'Maria', 'Sofia']
# Resultado de la intersección: ['Carlos', 'Ana']
print("Unión:", union_resultado.collect())
print("Intersección:", interseccion_resultado.collect())

Unión: ['Juan', 'Ana', 'Carlos', 'Miguel', 'Carlos', 'Ana', 'Maria', 'Sofia']
Intersección: ['Carlos', 'Ana']


In [None]:
# ACCIONES

In [None]:
# Crear un RDD de números
numeros = sc.parallelize([1, 6, 3, 8, 5, 10, 2, 7])

# Filtrar solo los números mayores a 5
numeros_mayores_5 = numeros.filter(lambda x: x > 5)

# Sumar solo los elementos mayores a 5
suma_mayores_5 = numeros_mayores_5.reduce(lambda a, b: a + b)
print("Suma de números mayores a 5:", suma_mayores_5)

# Recolectar los elementos filtrados
numeros_filtrados = numeros_mayores_5.collect()
print("Números mayores a 5:", numeros_filtrados)


Suma de números mayores a 5: 31
Números mayores a 5: [6, 8, 10, 7]


In [None]:
# Crear un RDD de palabras
palabras = sc.parallelize(["Spark", "es", "una", "herramienta", "muy", "potente", "para", "Big", "Data"])

# Filtrar palabras con más de 4 letras
palabras_largas = palabras.filter(lambda palabra: len(palabra) > 4)

# Contar la cantidad de palabras largas
total_palabras_largas = palabras_largas.count()
print("Cantidad de palabras con más de 4 letras:", total_palabras_largas)

# Transformar a mayúsculas y obtener la primera palabra larga
primera_palabra_larga_mayus = palabras_largas.map(lambda palabra: palabra.upper()).first()
print("Primera palabra larga en mayúsculas:", primera_palabra_larga_mayus)


Cantidad de palabras con más de 4 letras: 3
Primera palabra larga en mayúsculas: SPARK


In [None]:
# Crear un RDD de números
numeros = sc.parallelize([10, 20, 5, 15, 30, 50, 25, 45, 35])

# Obtener el valor máximo y mínimo
valor_maximo = numeros.max()
valor_minimo = numeros.min()
print("Valor máximo:", valor_maximo)
print("Valor mínimo:", valor_minimo)

# Ordenar y tomar los primeros 4 elementos como muestra
muestra = numeros.sortBy(lambda x: x).take(4)
print("Muestra ordenada de los primeros 4 elementos:", muestra)


Valor máximo: 50
Valor mínimo: 5
Muestra ordenada de los primeros 4 elementos: [5, 10, 15, 20]


In [None]:
# Crear un RDD de pares clave-valor
ventas = sc.parallelize([("productoA", 100), ("productoB", 150), ("productoA", 200),
                         ("productoC", 300), ("productoB", 100), ("productoA", 150)])

# Contar la frecuencia de cada clave
frecuencia_por_producto = ventas.countByKey()
print("Frecuencia de ventas por producto:", dict(frecuencia_por_producto))

# Sumar ventas por producto y contar ocurrencias de cada clave
ventas_totales_y_frecuencia = ventas.groupByKey().mapValues(lambda x: (sum(x), len(x)))

# Calcular el promedio de ventas por producto
promedio_ventas_por_producto = ventas_totales_y_frecuencia.mapValues(lambda x: x[0] / x[1]).collect()
print("Promedio de ventas por producto:", dict(promedio_ventas_por_producto))


Frecuencia de ventas por producto: {'productoA': 3, 'productoB': 2, 'productoC': 1}
Promedio de ventas por producto: {'productoA': 150.0, 'productoB': 125.0, 'productoC': 300.0}
