<a href="https://colab.research.google.com/github/raf4el12/consultas/blob/main/BD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Map:

In [None]:
!pip install pyspark



In [None]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

empleados = [
    ("Ana", "Ventas", 48000),
    ("Carlos", "Marketing", 54000),
    ("Luis", "Finanzas", 60000),
    ("Marta", "Ventas", 52000),
    ("Sofía", "Finanzas", 58000)
]

rdd_empleados = sc.parallelize(empleados)

rdd_empleados_mensual = rdd_empleados.map(lambda x: (x[0], x[1], x[2], x[2] / 12))

resultado = rdd_empleados_mensual.collect()
print("Empleados con salario mensual:")
for empleado in resultado:
    print(empleado)


Empleados con salario mensual:
('Ana', 'Ventas', 48000, 4000.0)
('Carlos', 'Marketing', 54000, 4500.0)
('Luis', 'Finanzas', 60000, 5000.0)
('Marta', 'Ventas', 52000, 4333.333333333333)
('Sofía', 'Finanzas', 58000, 4833.333333333333)


# Filter:


In [None]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

empleados = [
    ("Ana", "Ventas", 48000),
    ("Carlos", "Marketing", 54000),
    ("Luis", "Finanzas", 60000),
    ("Marta", "Ventas", 52000),
    ("Sofía", "Finanzas", 58000),
    ("Juan", "IT", 45000)
]

rdd_empleados = sc.parallelize(empleados)

rdd_empleados_filtrados = rdd_empleados.filter(lambda x: x[2] > 50000)

resultado = rdd_empleados_filtrados.collect()
print("Empleados con salario anual superior a 50,000:")
for empleado in resultado:
    print(empleado)


Empleados con salario anual superior a 50,000:
('Carlos', 'Marketing', 54000)
('Luis', 'Finanzas', 60000)
('Marta', 'Ventas', 52000)
('Sofía', 'Finanzas', 58000)


# FlatMap

In [None]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

oraciones = [
    "Aprendiendo todos los dias",
    "Spark es una herramienta poderosa para big data",
    "Navidad esta cada vez mas cerca"
]

rdd_oraciones = sc.parallelize(oraciones)

rdd_palabras = rdd_oraciones.flatMap(lambda oracion: oracion.split(" "))

resultado = rdd_palabras.collect()
print("Lista de palabras en las oraciones:")
for palabra in resultado:
    print(palabra)


Lista de palabras en las oraciones:
Aprendiendo
todos
los
dias
Spark
es
una
herramienta
poderosa
para
big
data
Navidad
esta
cada
vez
ams
cerca


# Union:

In [None]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

curso_a = [("Ana", "Matemáticas"), ("Luis", "Historia"), ("Sofía", "Química")]
curso_b = [("Carlos", "Matemáticas"), ("Marta", "Física"), ("Ana", "Biología")]

rdd_curso_a = sc.parallelize(curso_a)
rdd_curso_b = sc.parallelize(curso_b)

rdd_union = rdd_curso_a.union(rdd_curso_b)

resultado = rdd_union.collect()
print("Lista combinada de estudiantes de ambos cursos:")
for estudiante in resultado:
    print(estudiante)


Lista combinada de estudiantes de ambos cursos:
('Ana', 'Matemáticas')
('Luis', 'Historia')
('Sofía', 'Química')
('Carlos', 'Matemáticas')
('Marta', 'Física')
('Ana', 'Biología')


# Intersection:

In [None]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

tienda_a = [("Laptop", 1200), ("Tablet", 300), ("Celular", 500)]
tienda_b = [("Celular", 500), ("Televisión", 800), ("Tablet", 300)]

rdd_tienda_a = sc.parallelize(tienda_a)
rdd_tienda_b = sc.parallelize(tienda_b)

rdd_interseccion = rdd_tienda_a.intersection(rdd_tienda_b)

resultado = rdd_interseccion.collect()
print("Productos comunes en ambas tiendas:")
for producto in resultado:
    print(producto)


Productos comunes en ambas tiendas:
('Tablet', 300)
('Celular', 500)


# Distinc:

In [None]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

productos = [
    "Laptop", "Tablet", "Sofá", "Celular", "Tablet", "Celular", "Laptop", "Cámara"
]

rdd_productos = sc.parallelize(productos)

rdd_unicos = rdd_productos.distinct()

resultado = rdd_unicos.collect()
print("Lista de productos únicos:")
for producto in resultado:
    print(producto)


Lista de productos únicos:
Tablet
Sofá
Celular
Laptop
Cámara


# GroupByKey

In [None]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

ventas = [
    ("Electrónica", 300),
    ("Electrónica", 450),
    ("Ropa", 120),
    ("Ropa", 80),
    ("Hogar", 150),
    ("Electrónica", 250),
    ("Hogar", 200)
]

rdd_ventas = sc.parallelize(ventas)

rdd_ventas_agrupadas = rdd_ventas.groupByKey()

resultado = rdd_ventas_agrupadas.collect()
print("Montos de ventas agrupados por departamento:")
for departamento, montos in resultado:
    print(departamento, list(montos))


Montos de ventas agrupados por departamento:
Electrónica [300, 450, 250]
Hogar [150, 200]
Ropa [120, 80]


# reduceByKey

In [None]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

ventas = [
    ("Electrónica", 300),
    ("Electrónica", 450),
    ("Ropa", 120),
    ("Ropa", 80),
    ("Hogar", 150),
    ("Electrónica", 250),
    ("Hogar", 200)
]

rdd_ventas = sc.parallelize(ventas)

rdd_totales = rdd_ventas.reduceByKey(lambda x, y: x + y)

resultado = rdd_totales.collect()
print("Total de ventas por departamento:")
for departamento, total in resultado:
    print(departamento, total)


Total de ventas por departamento:
Electrónica 1000
Hogar 350
Ropa 200


# SortByKey

In [None]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

empleados = [
    (48000, "Ana"),
    (54000, "Carlos"),
    (60000, "Luis"),
    (52000, "Marta"),
    (58000, "Sofía"),
    (45000, "Juan")
]

rdd_empleados = sc.parallelize(empleados)

rdd_ordenados = rdd_empleados.sortByKey()

resultado = rdd_ordenados.collect()
print("Empleados ordenados por salario:")
for salario, empleado in resultado:
    print(f"{empleado}: {salario}")


Empleados ordenados por salario:
Juan: 45000
Ana: 48000
Marta: 52000
Carlos: 54000
Sofía: 58000
Luis: 60000


# Join:

In [None]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

empleados = [
    (1, "Ana"),
    (2, "Carlos"),
    (3, "Luis"),
    (4, "Marta"),
    (5, "Sofía")
]

departamentos = [
    (1, "Ventas", 48000),
    (2, "Marketing", 54000),
    (3, "Finanzas", 60000),
    (4, "Ventas", 52000),
    (5, "Finanzas", 58000)
]

rdd_empleados = sc.parallelize(empleados)
rdd_departamentos = sc.parallelize(departamentos)

rdd_empleados_departamentos = rdd_empleados.join(rdd_departamentos)

resultado = rdd_empleados_departamentos.collect()
print("Detalles completos de empleados y sus departamentos:")
for id_empleado, info in resultado:
    nombre = info[0]
    departamento = info[1][0]
    salario = info[1][1]
    print(f"ID: {id_empleado}, Nombre: {nombre}, Departamento: {departamento}, Salario Anual: ${salario}")



Detalles completos de empleados y sus departamentos:
ID: 4, Nombre: Marta, Departamento: V, Salario Anual: $e
ID: 1, Nombre: Ana, Departamento: V, Salario Anual: $e
ID: 5, Nombre: Sofía, Departamento: F, Salario Anual: $i
ID: 2, Nombre: Carlos, Departamento: M, Salario Anual: $a
ID: 3, Nombre: Luis, Departamento: F, Salario Anual: $i


# CoGroup:

In [None]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

empleados = [
    (1, "Ana"),
    (2, "Carlos"),
    (3, "Luis"),
    (4, "Marta"),
    (5, "Sofía")
]

proyectos = [
    (1, "Proyecto A"),
    (1, "Proyecto B"),
    (2, "Proyecto A"),
    (3, "Proyecto C"),
    (4, "Proyecto A"),
    (4, "Proyecto B"),
    (5, "Proyecto D")
]

horas_proyectos = [
    (1, 20),
    (1, 15),
    (2, 30),
    (3, 40),
    (4, 25),
    (4, 10),
    (5, 35)
]

rdd_empleados = sc.parallelize(empleados)
rdd_proyectos = sc.parallelize(proyectos)
rdd_horas = sc.parallelize(horas_proyectos)

rdd_proyectos_kv = rdd_proyectos.map(lambda x: (x[0], x[1]))
rdd_horas_kv = rdd_horas.map(lambda x: (x[0], x[1]))

rdd_empleados_proyectos_horas = rdd_proyectos_kv.cogroup(rdd_horas_kv)

rdd_resultado = rdd_empleados.join(rdd_empleados_proyectos_horas)

resultado = rdd_resultado.collect()
print("Detalles de proyectos y horas dedicadas por cada empleado:")
for id_empleado, (nombre, (proyectos, horas)) in resultado:
    proyectos = list(proyectos)
    horas = list(horas)
    print(f"ID: {id_empleado}, Nombre: {nombre}, Proyectos: {proyectos}, Horas dedicadas: {horas}")


Detalles de proyectos y horas dedicadas por cada empleado:
ID: 1, Nombre: Ana, Proyectos: ['Proyecto A', 'Proyecto B'], Horas dedicadas: [20, 15]
ID: 2, Nombre: Carlos, Proyectos: ['Proyecto A'], Horas dedicadas: [30]
ID: 3, Nombre: Luis, Proyectos: ['Proyecto C'], Horas dedicadas: [40]
ID: 4, Nombre: Marta, Proyectos: ['Proyecto A', 'Proyecto B'], Horas dedicadas: [25, 10]
ID: 5, Nombre: Sofía, Proyectos: ['Proyecto D'], Horas dedicadas: [35]


# Coalesce:

In [None]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

estudiantes = [
    ("Carlos", "Matemáticas", 85),
    ("Ana", "Historia", 78),
    ("Luis", "Matemáticas", 92),
    ("Marta", "Ciencia", 88),
    ("Sofía", "Historia", 91),
    ("Juan", "Ciencia", 95),
    ("Clara", "Matemáticas", 74),
    ("Diego", "Historia", 82),
    ("Laura", "Ciencia", 80)
]

rdd_estudiantes = sc.parallelize(estudiantes, 4)
print("Número de particiones originales:", rdd_estudiantes.getNumPartitions())

rdd_coalescido = rdd_estudiantes.coalesce(2)
print("Número de particiones después de coalesce:", rdd_coalescido.getNumPartitions())

resultado = rdd_coalescido.collect()
print("\nDatos de los estudiantes:")
for estudiante in resultado:
    print(estudiante)


Número de particiones originales: 4
Número de particiones después de coalesce: 2

Datos de los estudiantes:
('Carlos', 'Matemáticas', 85)
('Ana', 'Historia', 78)
('Luis', 'Matemáticas', 92)
('Marta', 'Ciencia', 88)
('Sofía', 'Historia', 91)
('Juan', 'Ciencia', 95)
('Clara', 'Matemáticas', 74)
('Diego', 'Historia', 82)
('Laura', 'Ciencia', 80)


In [13]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

estudiantes = [
    ("Carlos", "Matemáticas", 85),
    ("Ana", "Historia", 78),
    ("Luis", "Matemáticas", 92),
    ("Marta", "Ciencia", 88),
    ("Sofía", "Historia", 91),
    ("Juan", "Ciencia", 95),
    ("Clara", "Matemáticas", 74),
    ("Diego", "Historia", 82),
    ("Laura", "Ciencia", 80)
]

rdd_estudiantes = sc.parallelize(estudiantes)
max_calificacion = rdd_estudiantes.map(lambda x: x[2]).reduce(lambda a, b: a if a > b else b)
print("La calificación más alta es:", max_calificacion)
# REDUCE (FUNC)

La calificación más alta es: 95


In [14]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

estudiantes = [
    ("Carlos", "Matemáticas", 85),
    ("Ana", "Historia", 78),
    ("Luis", "Matemáticas", 92),
    ("Marta", "Ciencia", 88),
    ("Sofía", "Historia", 91),
    ("Juan", "Ciencia", 95),
    ("Clara", "Matemáticas", 74),
    ("Diego", "Historia", 82),
    ("Laura", "Ciencia", 80)
]

rdd_estudiantes = sc.parallelize(estudiantes)
# COLLECT (FUNC)
resultado = rdd_estudiantes.collect()
print("Datos de los estudiantes:")
for estudiante in resultado:
    print(estudiante)


Datos de los estudiantes:
('Carlos', 'Matemáticas', 85)
('Ana', 'Historia', 78)
('Luis', 'Matemáticas', 92)
('Marta', 'Ciencia', 88)
('Sofía', 'Historia', 91)
('Juan', 'Ciencia', 95)
('Clara', 'Matemáticas', 74)
('Diego', 'Historia', 82)
('Laura', 'Ciencia', 80)


In [15]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

estudiantes = [
    ("Carlos", "Matemáticas", 85),
    ("Ana", "Historia", 78),
    ("Luis", "Matemáticas", 92),
    ("Marta", "Ciencia", 88),
    ("Sofía", "Historia", 91),
    ("Juan", "Ciencia", 95),
    ("Clara", "Matemáticas", 74),
    ("Diego", "Historia", 82),
    ("Laura", "Ciencia", 80)
]

rdd_estudiantes = sc.parallelize(estudiantes)
# Cuenta el nUmero de elementos en el RDD count (func)
num_estudiantes = rdd_estudiantes.count()
print("Número de estudiantes:", num_estudiantes)


Número de estudiantes: 9


In [16]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

estudiantes = [
    ("Carlos", "Matemáticas", 85),
    ("Ana", "Historia", 78),
    ("Luis", "Matemáticas", 92),
    ("Marta", "Ciencia", 88),
    ("Sofía", "Historia", 91),
    ("Juan", "Ciencia", 95),
    ("Clara", "Matemáticas", 74),
    ("Diego", "Historia", 82),
    ("Laura", "Ciencia", 80)
]

rdd_estudiantes = sc.parallelize(estudiantes)
# Obtiene el primer elemento del RDD FIRST (FUNC)
primer_estudiante = rdd_estudiantes.first()
print("Primer estudiante:", primer_estudiante)


Primer estudiante: ('Carlos', 'Matemáticas', 85)


In [17]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

estudiantes = [
    ("Carlos", "Matemáticas", 85),
    ("Ana", "Historia", 78),
    ("Luis", "Matemáticas", 92),
    ("Marta", "Ciencia", 88),
    ("Sofía", "Historia", 91),
    ("Juan", "Ciencia", 95),
    ("Clara", "Matemáticas", 74),
    ("Diego", "Historia", 82),
    ("Laura", "Ciencia", 80)
]

rdd_estudiantes = sc.parallelize(estudiantes)
# Toma los primeros 3 elementos del RDD TAKE (FUNC)
primeros_tres = rdd_estudiantes.take(3)
print("Primeros tres estudiantes:")
for estudiante in primeros_tres:
    print(estudiante)


Primeros tres estudiantes:
('Carlos', 'Matemáticas', 85)
('Ana', 'Historia', 78)
('Luis', 'Matemáticas', 92)


In [18]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

estudiantes = [
    ("Carlos", "Matemáticas", 85),
    ("Ana", "Historia", 78),
    ("Luis", "Matemáticas", 92),
    ("Marta", "Ciencia", 88),
    ("Sofía", "Historia", 91),
    ("Juan", "Ciencia", 95),
    ("Clara", "Matemáticas", 74),
    ("Diego", "Historia", 82),
    ("Laura", "Ciencia", 80)
]

rdd_estudiantes = sc.parallelize(estudiantes)
# guardamos el contenido del RDD en un archivo de texto saveAsTextFile
rdd_estudiantes.saveAsTextFile("estudiantes_output")

print("Datos guardados en la carpeta 'estudiantes_output'.")


Datos guardados en la carpeta 'estudiantes_output'.


In [23]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

estudiantes = [
    ("Carlos", "Matemáticas", 85),
    ("Ana", "Historia", 78),
    ("Luis", "Matemáticas", 92),
    ("Marta", "Ciencia", 88),
    ("Sofía", "Historia", 91),
    ("Juan", "Ciencia", 95),
    ("Clara", "Matemáticas", 74),
    ("Diego", "Historia", 82),
    ("Laura", "Ciencia", 80)
]

rdd_estudiantes = sc.parallelize(estudiantes)
# Encuentra el estudiante con la calificación más baja MAX, MIN (FUNC)

estudiante_max = rdd_estudiantes.max(key=lambda x: x[2])


estudiante_min = rdd_estudiantes.min(key=lambda x: x[2])

print("Estudiante con la calificación más alta:", estudiante_max)
print("Estudiante con la calificación más baja:", estudiante_min)


Estudiante con la calificación más alta: ('Juan', 'Ciencia', 95)
Estudiante con la calificación más baja: ('Clara', 'Matemáticas', 74)


In [24]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

estudiantes = [
    ("Carlos", "Matemáticas", 85),
    ("Ana", "Historia", 78),
    ("Luis", "Matemáticas", 92),
    ("Marta", "Ciencia", 88),
    ("Sofía", "Historia", 91),
    ("Juan", "Ciencia", 95),
    ("Clara", "Matemáticas", 74),
    ("Diego", "Historia", 82),
    ("Laura", "Ciencia", 80)
]

rdd_estudiantes = sc.parallelize(estudiantes)
# se crea un RDD clave-valor donde la clave es la materia y el valor es 1
rdd_materias = rdd_estudiantes.map(lambda x: (x[1], 1))

# usamos countByKey para contar cuántos estudiantes hay en cada materia
conteo_por_materia = rdd_materias.countByKey()

print("Número de estudiantes por materia:")
for materia, conteo in conteo_por_materia.items():
    print(materia, ":", conteo)
# countByKey

Número de estudiantes por materia:
Matemáticas : 3
Historia : 3
Ciencia : 3


In [29]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

estudiantes = [
    ("Carlos", "Matemáticas", 85),
    ("Ana", "Historia", 78),
    ("Luis", "Matemáticas", 92),
    ("Marta", "Ciencia", 88),
    ("Sofía", "Historia", 91),
    ("Juan", "Ciencia", 95),
    ("Clara", "Matemáticas", 74),
    ("Diego", "Historia", 82),
    ("Laura", "Ciencia", 80)
]

rdd_estudiantes = sc.parallelize(estudiantes)
# USAMOS foreach para imprimir cada estudiante con un formato personalizado

rdd_estudiantes.foreach(lambda x: print(f"Estudiante: {x[0]}, Materia: {x[1]}, Calificación: {x[2]}"))

resultado = rdd_estudiantes.collect()
# FOREACH (FUNC)

In [30]:
!apt-get install git

!git config --global user.name "raf4el12"
!git config --global user.email "gomerorafael121@gmail.com"


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
git is already the newest version (1:2.34.1-1ubuntu1.11).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.


In [31]:
# Clona tu repositorio (reemplaza la URL por la de tu repositorio)
!git clone https://github.com/raf4el12/queries_rdd.git


Cloning into 'queries_rdd'...


In [32]:
!git add .

fatal: not a git repository (or any of the parent directories): .git


In [33]:
!git commit -m "consultas de PySpark desde Colab"


fatal: not a git repository (or any of the parent directories): .git


In [34]:
!git push origin main


fatal: not a git repository (or any of the parent directories): .git
