In [5]:
import findspark
findspark.init()

from pyspark import SparkContext, SparkConf
import random

# Ejercicio 1: Preliminares y datos perdidos

Creamos los objetos pedidos

In [8]:
spark_conf = SparkConf().setMaster("local[4]").setAppName('spark-i-dont-love-you')
sc = SparkContext(conf=spark_conf)

Leemos el archivo

In [14]:
data = sc.textFile('household_power_consumption.txt')

Removemos la primera fila

In [24]:
def remove_header(itr_index, itr):
    return iter(list(itr)[1:]) if itr_index == 0 else itr

rdd = data.mapPartitionsWithIndex(remove_header)

¿Funciona? Obviamente 🤗

In [26]:
rdd.take(1)

['16/12/2006;17:24:00;4.216;0.418;234.840;18.400;0.000;1.000;17.000']

Transformamos los datos a tupla:

In [50]:
def to_float(x):
    try:
        return float(x)
    except:
        return None

def to_tuple(values):
    strings = list(map(str, values[:2]))
    numbers = list(map(to_float, values[2:]))
    return tuple(strings + numbers)

rdd2 = rdd.map(lambda x: x.split(';')).map(to_tuple)

¿Funciona? Obviamente 🤗

In [51]:
rdd2.take(1)

[('16/12/2006', '17:24:00', 4.216, 0.418, 234.84, 18.4, 0.0, 1.0, 17.0)]

In [55]:
print(f"La cantidad de datos TOTALES es {rdd2.count()}")

La cantidad de datos TOTALES es 2075259


In [52]:
def has_missing_value(data):
    for x in data:
        if x is None:
            return True
    return False

has_missing = rdd2.map(has_missing_value).filter(lambda x: x)

In [54]:
print(f"La cantidad de datos faltantes es {has_missing.count()}")

La cantidad de datos faltantes es 25979


In [72]:
def get_year(x):
    date = x.split('/')
    return data[2]

missing_by_year = rdd2.filter(has_missing_value).map(lambda x: (x[0].split('/')[-1], 1)).groupByKey().map(lambda x: (x[0], len(x[1])))

In [86]:
print("La cantidad de registros nulos por año es:")

for data_year in missing_by_year.collect():
    print(data_year[0], data_year[1])

La cantidad de registros nulos por año es:
2008 135
2007 3931
2010 17629
2006 4
2009 4280


El mayor es el 2010 😀

In [92]:
def max_year(row):
    day, month, year = row[0].split('/')
    return year == '2010'

def row_to_month(row):
    day, month, year = row[0].split('/')
    return (month, 1)

missing_by_month = rdd2 \
    .filter(max_year) \
    .filter(has_missing_value) \
    .map(row_to_month) \
    .groupByKey() \
    .map(lambda x: (x[0], len(x[1]))) \
    .sortBy(lambda x: int(x[0]))

In [93]:
print("La cantidad de registros nulos por mes para el año 2010 es:")

for data_month in missing_by_month.collect():
    print(data_month[0], data_month[1])

La cantidad de registros nulos por mes para el año 2010 es:
1 3131
2 2
3 2027
4 1
5 1
6 2
7 1
8 7226
9 5237
10 1


Como podemos ver, el mes con mayor cantidad de registros nulos es el "8", es decir, Agosto

# Ejercicio 2: Preparación de los datos

Genere un objeto donde se encuentren sólo aquellos datos sin registros perdidos.

In [107]:
without_missing = rdd2.filter(lambda x: not has_missing_value(x))

La transformación al formato solicitado:

In [99]:
def transform(x):
    numbers = x[2:]
    day, month, year = list(map(int, x[0].split('/')))
    return tuple([year, month, day] + list(numbers))

Ejemplo:

In [108]:
without_missing.first()

('16/12/2006', '17:24:00', 4.216, 0.418, 234.84, 18.4, 0.0, 1.0, 17.0)

In [109]:
transform(without_missing.first())

(2006, 12, 16, 4.216, 0.418, 234.84, 18.4, 0.0, 1.0, 17.0)

In [110]:
final_rdd = without_missing.map(transform)

In [114]:
for row in final_rdd.zipWithIndex().filter(lambda x: x[1] < 5).collect():
    print(row)

((2006, 12, 16, 4.216, 0.418, 234.84, 18.4, 0.0, 1.0, 17.0), 0)
((2006, 12, 16, 5.36, 0.436, 233.63, 23.0, 0.0, 1.0, 16.0), 1)
((2006, 12, 16, 5.374, 0.498, 233.29, 23.0, 0.0, 2.0, 17.0), 2)
((2006, 12, 16, 5.388, 0.502, 233.74, 23.0, 0.0, 1.0, 17.0), 3)
((2006, 12, 16, 3.666, 0.528, 235.68, 15.8, 0.0, 1.0, 17.0), 4)


# Ejercicio 3: Patrones globales

In [132]:
columns = 'year,month,day,global_activity_power,global_reactive_power,voltage,global_intensity,submetering_1,submetering_2,submetering_3'.split(',')
columns

['year',
 'month',
 'day',
 'global_activity_power',
 'global_reactive_power',
 'voltage',
 'global_intensity',
 'submetering_1',
 'submetering_2',
 'submetering_3']

In [134]:
def report(col_index):
    column_rdd = final_rdd.map(lambda x: x[col_index])
    
    n = column_rdd.count()
    average = column_rdd.reduce(lambda x, y: (x + y)) / n
    
    std = (column_rdd.map(lambda x: (x - average) ** 2).reduce(lambda x, y: x + y) / 2) ** 0.5
    
    se = std / n
    conf_1 = average + 1.96 * se
    conf_2 = average - 1.96 * se
    
    print('Promedio:', average)
    print('DE:', std)
    print(f'Intervalo de confianza: ({conf_1}, {conf_2})')

Promedio: 0.12371447630388221
DE: 114.10223569001965
Intervalo de confianza: (0.12382360750213352, 0.12360534510563091)


In [133]:
columns_to_report = [
    'global_activity_power',
    'global_reactive_power',
    'voltage',
    'global_intensity',
]

In [None]:
for col in columns_to_report:
    report(columns.index(col)