@author: Nahuel Herpo

@release: Sep 21, 2021


----


RESOLUTION OF QUERIES WITH HADOOP

Stage 1 of the final project of the Big Data course of the
National University of La Plata. Stage 1 consists of
resolving two queries against the star database using the
MapReduce paradigm.

Automatically generated by Colaboratory.

Original file is located at
    https://colab.research.google.com/drive/1TSGTRjetlhg9DHBIlS33GEYg1y3lX_RN

**IMPORTS**

In [226]:
import string
import math
import sys
import os
from Emulador_MapReduce import Job
sys.setrecursionlimit(10000)

**Attached functions**

In [227]:
def typeNumber(star):
  """
  This function returns the number of the star type. The bigger
  the star, the higher the number.

  Args:
    star (str): A star category such as 'Enanas' or 'Gigantes' (only the first word is checked).

  Returns:
    int: Star category (-1 if it is an unrecognized type).
  """

  kind = star.split(' ')[0].upper()

  if kind == 'ENANAS':
    return 0
  elif kind == 'GIGANTES':
    return 1
  elif kind == 'SUPERGIGANTES':
    return 2
  elif kind == 'HIPERGIGANTES':
    return 3
  else:
    return -1   #unrecognized type

def calcAbsMag(period_in_days):
  """
  Calculate the absolute magnitude of the star from its period in days. The
  period is the number of days it takes for the star to complete one cycle of
  brightness variation and return to its initial brightness. The absolute
  magnitude (the real brightness of the object) is given by the
  period-luminosity relation: M = -1.43 - 2.81 * log10(P).

  Args:
    period_in_days (float): The days period of the star.

  Returns:
    float: Absolute magnitude.
  """

  return -1.43 - 2.81 * math.log10(period_in_days)  # Base-10 logarithm, as in the formula


def calcDistInPC(min_mag, max_mag, abs_mag):
  """
  Calculates the distance in parsecs (a unit of astronomical distance). It
  receives the minimum visual magnitude, the maximum visual magnitude and
  the absolute magnitude (real brightness of the object). The calculation
  uses the distance modulus m - M = -5 + 5 log10(D), where m is the mean
  visual magnitude and D is the distance in parsecs.

  Args:
    min_mag (float): Minimum visual magnitude.
    max_mag (float): Maximum visual magnitude.
    abs_mag (float): Absolute magnitude.

  Returns:
    float: Distance in parsec (from Earth).
  """

  # Mean visual magnitude m
  magnitud_mean = (min_mag + max_mag) / 2

  # Distance modulus solved for D (base-10 logarithm)
  return (10 ** ((magnitud_mean - abs_mag + 5) / 5))
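As a worked check of the two formulas above, here is a minimal, self-contained sketch. It re-implements the formulas rather than importing the notebook's functions, and the magnitudes and period are hypothetical values chosen so the result is a round number. In Python, `math.log(x)` with a single argument is the natural logarithm, while `math.log10(x)` is base 10, which is the base both formulas assume.

```python
import math

def abs_mag(period_in_days):
  # Period-luminosity relation: M = -1.43 - 2.81 * log10(P)
  return -1.43 - 2.81 * math.log10(period_in_days)

def dist_in_pc(min_mag, max_mag, abs_magnitude):
  # Distance modulus m - M = -5 + 5 * log10(D), solved for D
  mean_mag = (min_mag + max_mag) / 2
  return 10 ** ((mean_mag - abs_magnitude + 5) / 5)

# Hypothetical observation: magnitudes 5.26 and 6.26, period of 10 days
M = abs_mag(10.0)                # -1.43 - 2.81 * 1 = -4.24
D = dist_in_pc(5.26, 6.26, M)    # m = 5.76, (5.76 + 4.24 + 5) / 5 = 3, D = 10**3
print(round(M, 2), round(D, 3))  # -4.24 1000.0

# Using the natural logarithm instead gives a very different magnitude
print(round(-1.43 - 2.81 * math.log(10.0), 2))  # -7.9
```

A 10-day period therefore places this hypothetical star at about 1000 pc; using the natural logarithm in the first formula would shift the absolute magnitude by about 3.66 magnitudes.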

**JOB 1**

Job 1 filters the stars in "Estrellas.txt", keeping only those whose type matches the type passed as a parameter.

In [229]:
#  JOB 1 -->> Filter by the type of star.

def map1Filter(k1, v1, context):    # k1 = star_id  v1 = age, type
  star_type = typeNumber(v1.split('\t')[1])
  if star_type == context['tipoDeEstrellas']: # Emit the tuple only if the star is of the requested type
    context.write(int(k1), star_type)

def reduce1(k2, v2, context):   # k2 = star_id  v2 = type
  context.write(k2, v2.next())

# Output:   star_id, type
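To see what Job 1 produces without the emulator, the sketch below runs the same map logic over a few hypothetical `Estrellas` records in plain Python. `LocalContext` is a hypothetical stand-in for the emulator's context (it only supports `context['...']` lookups and `write`), and the grouping loop plays the role of the shuffle stage; neither reflects the actual internals of `Emulador_MapReduce`.

```python
from collections import defaultdict

class LocalContext:
  """Hypothetical stand-in for the emulator's context: holds params and collects writes."""
  def __init__(self, params):
    self.params = params
    self.pairs = []

  def __getitem__(self, name):  # lets the map code read context['tipoDeEstrellas']
    return self.params[name]

  def write(self, key, value):
    self.pairs.append((key, value))

def type_number(star):
  ranks = {'ENANAS': 0, 'GIGANTES': 1, 'SUPERGIGANTES': 2, 'HIPERGIGANTES': 3}
  return ranks.get(star.split(' ')[0].upper(), -1)

def map_filter(k1, v1, context):  # k1 = star_id, v1 = "age\ttype"
  star_type = type_number(v1.split('\t')[1])
  if star_type == context['tipoDeEstrellas']:
    context.write(int(k1), star_type)

# Hypothetical records of the form (star_id, "age\ttype")
stars = [('1', '120\tEnanas rojas'), ('2', '40\tGigantes azules'), ('3', '75\tEnanas blancas')]

ctx = LocalContext({'tipoDeEstrellas': 0})  # keep only type 0 (Enanas)
for k, v in stars:
  map_filter(k, v, ctx)

# Group values by key (the shuffle stage), then apply an identity reducer
groups = defaultdict(list)
for key, value in ctx.pairs:
  groups[key].append(value)
result = {key: values[0] for key, values in sorted(groups.items())}
print(result)  # {1: 0, 3: 0}
```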

**JOB 2**

Job 2 computes the distance from Earth for each observation, joins the observations with the output of Job 1, and finally computes the average distance for each star.

In [230]:
#  JOB 2 -->> Performs a join and computes the average distance per star.

def map2_Obs(k1, v1, context):  # k1 = star_id  v1 = name, dni, min_mag, max_mag, period, obs_date.
  values = v1.split('\t')
  distance = calcDistInPC(float(values[2]), float(values[3]), calcAbsMag(float(values[4])))
  context.write((int(k1), 'OBS'), ('OBS', distance, 1))

def map2_Est(k1, v1, context):  # k1 = star_id  v1 = type (Job 1 output).
  context.write((int(k1), 'EST'), ('TIP', v1))

# Note: the Job 2 driver does not register this combiner.
def combiner2(k2, v2, context):  # k2 = (star_id, 'EST' | 'OBS')  v2 = ('TIP', type) | ('OBS', distance, count)
  count = 0
  distance_sum = 0.0
  for v in v2:
    if v[0] == 'OBS':
      count += v[2]
      distance_sum += v[1]
    elif v[0] == 'TIP':
      context.write(k2, v)  # Forward the type record unchanged
  if count != 0:  # If there is at least one observation
    context.write(k2, ('OBS', distance_sum, count))

def reduce2(k2, v2, context):  # k2 = (star_id, 'EST' | 'OBS')  v2 = ('TIP', type) followed by ('OBS', distance_sum, count)
  count = 0
  distance_sum = 0.0
  first = v2.next()
  if first[0] == 'TIP':  # Only stars that passed the Job 1 filter have a 'TIP' record
    for v in v2:
      count += v[2]
      distance_sum += v[1]
    if count != 0: # If there is at least one observation
      context.write(k2[0], distance_sum / count)

def shuffle2(key1, key2):
  if key1[0] == key2[0]:
    return 0
  elif key1[0] < key2[0]:
    return -1
  else:
    return 1

def sort2(key1, key2):
  if key1[1] == key2[1]:
    return 0
  elif key1[1] == 'EST':
    return -1
  else:
    return 1

# Output:   star_id, avg_distance
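Job 2 is a reduce-side join with a secondary sort: `shuffle2` groups intermediate keys by star id, and `sort2` places the `'EST'` record ahead of the `'OBS'` records, so `reduce2` only has to inspect the first value to know whether the star passed the Job 1 filter. The sketch below reproduces that ordering in plain Python with `functools.cmp_to_key` and `itertools.groupby`, assuming the emulator orders keys by the shuffle comparator first and the sort comparator second (an assumption about `Emulador_MapReduce`); the intermediate pairs are hypothetical.

```python
from functools import cmp_to_key
from itertools import groupby

def shuffle_cmp(key1, key2):  # group by star id only (same logic as shuffle2)
  return (key1[0] > key2[0]) - (key1[0] < key2[0])

def sort_cmp(key1, key2):     # within a group, 'EST' before 'OBS' (same logic as sort2)
  if key1[1] == key2[1]:
    return 0
  return -1 if key1[1] == 'EST' else 1

def full_cmp(a, b):           # order by group first, then by tag
  return shuffle_cmp(a[0], b[0]) or sort_cmp(a[0], b[0])

# Hypothetical intermediate pairs: star 1 passed the Job 1 filter, star 2 did not
pairs = [
  ((1, 'OBS'), ('OBS', 100.0, 1)),
  ((2, 'OBS'), ('OBS', 500.0, 1)),
  ((1, 'EST'), ('TIP', '0')),
  ((1, 'OBS'), ('OBS', 300.0, 1)),
]

pairs.sort(key=cmp_to_key(full_cmp))
averages = {}
for star_id, group in groupby(pairs, key=lambda kv: kv[0][0]):
  values = [v for _, v in group]
  if values[0][0] != 'TIP':   # no Job 1 record: the star is not of the requested type
    continue
  observations = values[1:]
  count = sum(v[2] for v in observations)
  if count:
    averages[star_id] = sum(v[1] for v in observations) / count
print(averages)  # {1: 200.0}
```

Star 2 is dropped because its group has no `'TIP'` record, which is the inner-join behavior of `reduce2`.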

**Query 1**

For each star of a given type (query parameter), what is its average distance from Earth, computed over all of its observations?

Job1 Driver

In [231]:
#JOB_1 - Filter
input_directory1 = 'data/Estrellas/'
output_directory1 = 'outputs/job1_filter/'

# Create the output directory (and its parents) if it does not exist
os.makedirs(output_directory1, exist_ok=True)

# Read the star type (0 = Enanas, 1 = Gigantes, 2 = Supergigantes, 3 = Hipergigantes)
star_type = int(input("Please enter the star type: "))

# Run Job 1
job1 = Job(input_directory1, output_directory1, map1Filter, reduce1)
dictionary = { 'tipoDeEstrellas': star_type } # Star type
job1.setParams(dictionary)
print(job1.waitForCompletion())

True


Job2 Driver

In [232]:
#JOB_2 - Join
input_directory2 = output_directory1
output_directory2 = 'outputs/job2_join/'

# Create the output directory (and its parents) if it does not exist
os.makedirs(output_directory2, exist_ok=True)

# Run Job 2
job2 = Job(input_directory2, output_directory2, map2_Est, reduce2)
job2.addInputPath('data/Observaciones/', map2_Obs)
job2.setShuffleCmp(shuffle2)
job2.setSortCmp(sort2)
print(job2.waitForCompletion())

True


**JOB A**

Job A joins each observation with the type of the observed star.

In [233]:
# JOB A - JOIN

def mapA_Obs(k1, v1, context):  # k1 = star_id  v1 = name, dni, min_mag, max_mag, period, obs_date.
  values = v1.split('\t')
  context.write((k1, 'OBS'), ('OBS', values[0], int(values[1])))

def mapA_Est(k1, v1, context):  # k1 = star_id  v1 = age, type.
  values = v1.split('\t')
  context.write((k1, 'EST'), ('TIP', values[1]))

def reduceA(k2, v2, context):  # k2 = (star_id, 'EST' | 'OBS')  v2 = ('TIP', type) followed by ('OBS', name, dni)
  first = True
  tipo = None
  for v in v2:
    if first:
      if v[0] == 'TIP':  # The sort stage places the star record first
        tipo = v[1]
      first = False
    else:
      if tipo is not None:  # Emit only observations of stars with a known type
        context.write(k2[0], (tipo, v[1], v[2]))

def shuffleA(key1, key2):
  if key1[0] == key2[0]:
    return 0
  elif key1[0] < key2[0]:
    return -1
  else:
    return 1

def sortA(key1, key2):
  if key1[1] == key2[1]:
    return 0
  elif key1[1] == 'EST':
    return -1
  else:
    return 1

# Output:   star_id, type, name, dni
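Unlike Job 2, Job A is a one-to-many join: every observation in a star's group is emitted together with that star's type. The sketch below imitates the `shuffleA`/`sortA` ordering with a plain tuple sort key (`False` sorts before `True`, so `'EST'` comes first) on hypothetical mapper output; it is an illustration, not the emulator's implementation.

```python
from itertools import groupby

# Hypothetical mapper output of Job A: ((star_id, tag), value)
pairs = [
  (('7', 'OBS'), ('OBS', 'Ana', 111)),
  (('7', 'EST'), ('TIP', 'Gigantes azules')),
  (('7', 'OBS'), ('OBS', 'Luis', 222)),
  (('9', 'OBS'), ('OBS', 'Eva', 333)),  # star 9 has no record in Estrellas
]

# Sort by star id, with 'EST' ahead of 'OBS' (the order shuffleA + sortA arrange)
pairs.sort(key=lambda kv: (kv[0][0], kv[0][1] != 'EST'))

joined = []
for star_id, group in groupby(pairs, key=lambda kv: kv[0][0]):
  values = [v for _, v in group]
  if values[0][0] != 'TIP':
    continue                    # no star record: nothing to join with
  star_type = values[0][1]
  for _, name, dni in values[1:]:
    joined.append((star_id, star_type, name, dni))
print(joined)
# [('7', 'Gigantes azules', 'Ana', 111), ('7', 'Gigantes azules', 'Luis', 222)]
```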

**JOB B**

Job B counts, for each observer, the number of distinct star types they observed.

In [240]:
# JOB B -->> Count the distinct star types observed by each observer.

def mapB(k1, v1, context):  # k1 = star_id   v1 = type, name, dni
  values = v1.split('\t')
  context.write((int(values[2]), values[1]), typeNumber(values[0]))

def reduceB(k2, v2, context):  # k2 = (dni, name)   v2 = type numbers
  observedTypes = [False, False, False, False]
  for v in v2:
    if v >= 0:  # typeNumber returns -1 for unrecognized types; observedTypes[-1] would mark the last slot
      observedTypes[v] = True
  context.write(k2, sum(observedTypes))  # Number of distinct types observed
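Counting distinct types per observer amounts to collecting a set of type numbers per `(dni, name)` key. Since `typeNumber` returns -1 for unrecognized types, and in Python a -1 list index silently refers to the last element, such values need to be filtered before counting. The sketch below shows the same map-then-count idea in plain Python on hypothetical Job A output rows.

```python
from collections import defaultdict

# Hypothetical Job A output rows: (star_id, type, name, dni)
rows = [
  ('1', 'Enanas rojas', 'Ana', 111),
  ('2', 'Gigantes azules', 'Ana', 111),
  ('3', 'Enanas blancas', 'Ana', 111),
  ('2', 'Gigantes azules', 'Luis', 222),
  ('4', 'Cometa', 'Luis', 222),  # unrecognized type
]

RANKS = {'ENANAS': 0, 'GIGANTES': 1, 'SUPERGIGANTES': 2, 'HIPERGIGANTES': 3}

# Map: key = (dni, name), value = type number; reduce: count distinct valid numbers
types_by_observer = defaultdict(set)
for _, star_type, name, dni in rows:
  rank = RANKS.get(star_type.split(' ')[0].upper(), -1)
  if rank >= 0:  # skip -1 so an unknown type is never counted
    types_by_observer[(dni, name)].add(rank)

counts = {key: len(ranks) for key, ranks in types_by_observer.items()}
print(counts)  # {(111, 'Ana'): 2, (222, 'Luis'): 1}
```

Ana observed two dwarfs and one giant, so she is credited with two distinct types, not three observations.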

**JOB C**

Job C selects the observers who observed the largest number of distinct star types.

In [241]:
# JOB C -->> Keep the observers who observed the most distinct star types.

def mapC(k1, v1, context):    # k1 = dni  v1 = name, num_types_observed
  values = v1.split('\t')
  context.write(int(values[1]), (values[0], int(k1), int(values[1])))

def reduceC(k2, v2, context): # k2 = num_types_observed  v2 = (name, dni, num_types_observed)
  first = True
  max_types = -1
  for v in v2:
    if first:
      context.write(v[0], v[1])
      max_types = v[2]  # The first value is the maximum because the sort stage orders keys in descending order
      first = False
    else:
      if v[2] == max_types:
        context.write(v[0], v[1])
      else:
        break  # All remaining observers saw fewer types

def shuffleC(key1, key2): # Always return 0 so that every key goes to the same reducer.
  return 0

def sortC(key1, key2):  # Sort from the observer with the most observed types to the one with the fewest.
  if key1 == key2:
    return 0
  elif key1 > key2: # descending order
    return -1
  else:
    return 1

# Output: name, dni
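Job C routes every record to a single reducer (`shuffleC` always returns 0) and sorts keys in descending order (`sortC`), so the reducer sees the maximum first and can stop at the first smaller count. The plain-Python sketch below mirrors that idea on hypothetical Job B output; like `reduceC`, it returns every observer tied for the maximum, not just one.

```python
# Hypothetical Job B output: dni -> (name, number of distinct types observed)
job_b_output = {111: ('Ana', 3), 222: ('Luis', 1), 333: ('Eva', 3)}

# Single "reducer": sort descending by count, then keep every observer tied at the maximum
ordered = sorted(job_b_output.items(), key=lambda item: item[1][1], reverse=True)
top = []
max_types = None
for dni, (name, n_types) in ordered:
  if max_types is None:
    max_types = n_types   # the first record holds the maximum after the sort
  if n_types != max_types:
    break                 # every remaining observer saw fewer types
  top.append((name, dni))
print(top)  # [('Ana', 111), ('Eva', 333)]
```

Sending everything to one reducer is simple but serializes the final step; it is acceptable here because Job B's output has one small record per observer.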

**Query 2**

Which observers have observed the largest number of distinct star types?

*JobA Driver*

In [239]:
# Input and output directories
input_directoryA = 'data/Observaciones/'
output_directoryA = 'outputs/jobA_join/'

# Create the output directory (and its parents) if it does not exist
os.makedirs(output_directoryA, exist_ok=True)

# Run Job A
jobA = Job(input_directoryA, output_directoryA, mapA_Obs, reduceA)
jobA.addInputPath('data/Estrellas/', mapA_Est)
jobA.setShuffleCmp(shuffleA)
jobA.setSortCmp(sortA)
print(jobA.waitForCompletion())

True


*JobB Driver*

In [242]:
# Input and output directories
input_directoryB = output_directoryA
output_directoryB = 'outputs/jobB_count/'

# Create the output directory (and its parents) if it does not exist
os.makedirs(output_directoryB, exist_ok=True)

# Run Job B
jobB = Job(input_directoryB, output_directoryB, mapB, reduceB)
print(jobB.waitForCompletion())

True


*JobC Driver*

In [243]:
# Input and output directories
input_directoryC = output_directoryB
output_directoryC = 'outputs/JobC_top/'

# Create the output directory (and its parents) if it does not exist
os.makedirs(output_directoryC, exist_ok=True)

# Run Job C
jobC = Job(input_directoryC, output_directoryC, mapC, reduceC)
jobC.setShuffleCmp(shuffleC)
jobC.setSortCmp(sortC)
print(jobC.waitForCompletion())

True
