In [1]:
import holidays
import statistics as st
from collections import Counter
from datetime import datetime
from datetime import date
from datetime import timedelta
from pyspark import SparkContext
import json
import zipfile
import csv
import json
from operator import add, itemgetter
import numpy as np

sc = SparkContext()

In [2]:
text = sc.textFile("/home/mat/Escritorio/datasets/202003_movements.json")

In [3]:
text

/home/mat/Escritorio/datasets/202003_movements.json MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

### OBTENCIÓN DE LOS FESTIVOS DE UN AÑO. 

Calculamos los festivos y fines de semana de un año, por ejemplo el año 2020.

Tenemos las siguientes funciones:

*   obtencion_fines_semana: Con la biblioteca datetime obtenemos una lista de los fines de semana de un año.
*   vacaciones_madrid: Añadimos los festivos de la Comunidad de Madrid (2 mayo, 15 mayo y 9 noviembre).
*   obtencion_vacaciones_madrid: Con la biblioteca holiday obtenemos los festivos de un año en España y añadimos                                    los festivos y fines de semana calculados anteriormente.
*   festivo: Dada una fecha, nos devuelve si es festivo o no.
*   fecha_no_string: Dada una fecha, elimina el string por medio de la biblioteca datetime.



In [4]:
def obtencion_fines_semana(year):
  saturday = date(year,1,1)
  saturday += timedelta(days=5-saturday.weekday())
  lista_fines_semana = []
  while saturday.year == year:
    lista_fines_semana.append(saturday)
    lista_fines_semana.append(saturday + timedelta(days=1))
    saturday += timedelta(days=7)
  return lista_fines_semana

def vacaciones_madrid(year): #Son 3 días: Almudena, San Isidro, CM.
  return [date(year,5,2), date(year,5,15), date(year,11,9)]

def obtencion_vacaciones_madrid(year):
  lista_vacaciones = list(holidays.CountryHoliday('ES', year, 'CM'))
  lista_vacaciones += vacaciones_madrid(year)
  lista_vacaciones += obtencion_fines_semana(year)
  return lista_vacaciones

def festivo(day): #Devuelve un booleano que indica si un día es festivo (True) o no (False).
    return day in lista_vacaciones 

def fecha_no_string(date): #Dado un string que representa una fecha, devuelve la misma como tipo datetime.Date.
  return datetime.strptime(date, '%Y-%m-%d').date()

In [5]:
lista_vacaciones = obtencion_vacaciones_madrid(2020)

In [6]:
lista_vacaciones

[datetime.date(2020, 1, 1),
 datetime.date(2020, 1, 6),
 datetime.date(2020, 3, 19),
 datetime.date(2020, 4, 9),
 datetime.date(2020, 4, 10),
 datetime.date(2020, 4, 13),
 datetime.date(2020, 5, 1),
 datetime.date(2020, 8, 15),
 datetime.date(2020, 10, 12),
 datetime.date(2020, 11, 2),
 datetime.date(2020, 12, 7),
 datetime.date(2020, 12, 8),
 datetime.date(2020, 12, 25),
 datetime.date(2020, 6, 1),
 datetime.date(2020, 5, 2),
 datetime.date(2020, 5, 15),
 datetime.date(2020, 11, 9),
 datetime.date(2020, 1, 4),
 datetime.date(2020, 1, 5),
 datetime.date(2020, 1, 11),
 datetime.date(2020, 1, 12),
 datetime.date(2020, 1, 18),
 datetime.date(2020, 1, 19),
 datetime.date(2020, 1, 25),
 datetime.date(2020, 1, 26),
 datetime.date(2020, 2, 1),
 datetime.date(2020, 2, 2),
 datetime.date(2020, 2, 8),
 datetime.date(2020, 2, 9),
 datetime.date(2020, 2, 15),
 datetime.date(2020, 2, 16),
 datetime.date(2020, 2, 22),
 datetime.date(2020, 2, 23),
 datetime.date(2020, 2, 29),
 datetime.date(2020, 3, 

### PREPARACIÓN DE LA CONSULTA DE DATOS:

Útiles para cuando calculemos la media de los viajes, tipos de usuario etc, de nuestros archivos. 

In [12]:
directorio = "/home/mat/Escritorio/datasets"

In [13]:
fichero = '202003_movements.json'

In [14]:
fichero

'202003_movements.json'

In [108]:
line = '{"_id": {"$oid": "5eeca06d618046678f83b07a"}, "user_day_code": "a880483c5b4c4a773bd66e0a353ccb13bcf98c7a6605373bb7ee7f54ebe6b07c", "idplug_base": 8, "user_type": 3, "idunplug_base": 8, "travel_time": 7612873, "idunplug_station": 136, "ageRange": 5, "idplug_station": 108, "unplug_hourTime": "2020-03-17T18:00:00Z", "zip_code": "28022"}\n'

In [109]:
line #una linea del json (se define más abajo)

'{"_id": {"$oid": "5eeca06d618046678f83b07a"}, "user_day_code": "a880483c5b4c4a773bd66e0a353ccb13bcf98c7a6605373bb7ee7f54ebe6b07c", "idplug_base": 8, "user_type": 3, "idunplug_base": 8, "travel_time": 7612873, "idunplug_station": 136, "ageRange": 5, "idplug_station": 108, "unplug_hourTime": "2020-03-17T18:00:00Z", "zip_code": "28022"}\n'

### MEDIA DE USUARIOS POR FECHAS. 
Vamos a calcular la media de uso del bicimad comparando entre días laborales y festivos.
Entonces, hallamos los siguientes datos:
- Obtenemos el par (día, usuario) y convertimos el día de str a datetime.
- Agrupamos por día.
- Calculamos el número de usuarios por día.
- Comparamos entre laboral y festivo y los agrupamos.
- Calculamos la media de usuarios por tipo de día.
- Devolvemos un diccionario con estrucutra:  clave: si es festivo, valor: media de usuarios por día.
- Finalmente se calcula el porcentaje de variación entre festivos y laborables.




In [23]:
#Primero obtenemos el par (día, usuario), convertimos el día para que sea de tipo datetime y no string:
from datetime import datetime

def tupla_dia_usuario(line):
 with open(fichero, 'r', errors='replace') as f:
  for data in f:
    line = json.loads(line)
    user = line['user_day_code']
    day = line['unplug_hourTime'][0:10]
    return fecha_no_string(day), user

In [24]:
tupla_dia_usuario(line)

(datetime.date(2020, 3, 17),
 'a880483c5b4c4a773bd66e0a353ccb13bcf98c7a6605373bb7ee7f54ebe6b07c')

In [25]:
# Calculamos bicimaniacos por día, decidimos si ese día es festivo, agrupamos por claves (tipo de día), calculamos la
#media y devolvemos la media en días laborables y festivos:

uso_medio = text.map(tupla_dia_usuario)\
  .groupByKey()\
  .mapValues(set)\
  .mapValues(len)\
  .map(lambda x : (x[0] in lista_vacaciones, x[1]))\
  .groupByKey()\
  .mapValues(st.mean)\
  .collectAsMap()

print(f"De media en laborables {round(uso_medio[False], 2)} bicimaniacos.")
print(f"En días festivos hay de media {round(uso_medio[True] ,2)} bicimaniacos.")
print(f"Así, se tiene que de media los bicimaniacos utilizan más la biciMAD en días \
laborables un {round(uso_medio[False]/uso_medio[True]*100 -100 ,2)}% más que \
en días festivos.")

De media en laborables 5544.17 bicimaniacos.
En días festivos hay de media 3383.8 bicimaniacos.
Así, se tiene que de media los bicimaniacos utilizan más la biciMAD en días laborables un 63.84% más que en días festivos.


### COMPARACIÓN DE USOS EN DÍAS POR FRECUENCIAS. 
Vamos a comparar los días de mayor y menos uso del bicimad distinguendo si los días son laborales o festivos.
Entonces hallamos los siguientes datos:
- Obtenemos la fecha y la pasamos a formato no str.
- Agrupamos por fecha.
- Ponemos los datos de la forma: (Es festivo, día, nº uso).
- Separamos entre días festivos y laborales.
- Ordenamos los resultados de la forma:(día, nº uso).


In [26]:
def tupla_dia(line):
 with open(fichero, 'r', errors='replace') as f:
  for data in f:
      data = json.loads(line)
      day = data['unplug_hourTime'][0:10]
      return fecha_no_string(day), 1


In [27]:
result = text.map(tupla_dia)\
  .groupByKey()\
  .mapValues(sum)\
  .map(lambda x : (x[0] in lista_vacaciones,(x[0], x[1])))

laboral = sorted(result.filter(lambda x: x[0] == False)\
  .map(lambda x : (x[1][0], x[1][1]))\
  .collectAsMap()\
  .items() , key=lambda x:x[1], reverse=True)

fiesta = sorted(result.filter(lambda x: x[0] == True)\
  .map(lambda x : (x[1][0], x[1][1]))\
  .collectAsMap()\
  .items(), key=lambda x:x[1], reverse=True)

In [28]:
print("------------------------------------ \t ------------------------------------")
print("Top 5 días más frecuentes en laboral \t Top 5 días más frecuentes en festivo")
print("------------------------------------ \t ------------------------------------")
for i in range(5):
  print(f"\t{str(laboral[i][0])}: {laboral[i][1]}   \t\t\t {str(fiesta[i][0])}: {fiesta[i][1]}")
print("-------------------------------------- \t ---------------------------------------")
print("Top 5 días menos frecuentes en laboral \t Top 5 días menos frecuentes en festivo")
print("-------------------------------------- \t ---------------------------------------")
for i in range(5):
  print(f"\t{str(laboral[-(i+1)][0])}: {laboral[-(i+1)][1]}   \t\t\t {str(fiesta[-(i+1)][0])}: {fiesta[-(i+1)][1]}")
print("-------------------------------------- \t ---------------------------------------")


------------------------------------ 	 ------------------------------------
Top 5 días más frecuentes en laboral 	 Top 5 días más frecuentes en festivo
------------------------------------ 	 ------------------------------------
	2020-03-04: 14230   			 2020-03-07: 9059
	2020-03-10: 13705   			 2020-03-08: 8292
	2020-03-05: 13364   			 2020-03-01: 6622
	2020-03-06: 13129   			 2020-03-14: 2601
	2020-03-09: 12949   			 2020-03-15: 1096
-------------------------------------- 	 ---------------------------------------
Top 5 días menos frecuentes en laboral 	 Top 5 días menos frecuentes en festivo
-------------------------------------- 	 ---------------------------------------
	2020-03-17: 163   			 2020-03-15: 1096
	2020-03-16: 301   			 2020-03-14: 2601
	2020-03-13: 7773   			 2020-03-01: 6622
	2020-03-02: 9955   			 2020-03-08: 8292
	2020-03-12: 9971   			 2020-03-07: 9059
-------------------------------------- 	 ---------------------------------------


### COMPARACIÓN ESTACIONES FRECUENTES.
Vamos a comparar las estaciones más y menos utilizadas, para esto vamos a diferenciar entre desenganchar y enganchar una bicicleta en días laborales y festivos.



## Caso desenganche:
- Obtenemos el par (estación desenganche, día).
- Comprobamos si el día es festivo.
- Contamos cuantas veces se ha utilizado la estación cada día y clasificamos entre las 5 que más y las 5 que menos se han utlizado.

In [29]:
def tupla_dia_desengancha(line):
 with open(fichero, 'r', errors='replace') as f:
  for data in f:
      data = json.loads(line)
      day = data['unplug_hourTime'][0:10]
      unplugStation = data['idunplug_station']
      return unplugStation, fecha_no_string(day)

In [30]:
dias_desengancha = text.map(tupla_dia_desengancha)\
  .map(lambda x: (x[0], festivo(x[1])))\
  .groupByKey()\
  .mapValues(Counter)

In [31]:
laboral_desengancha = dias_desengancha.map(lambda x: (x[0],x[1][False]))
laboral_desengancha_mayor_top5 = laboral_desengancha.takeOrdered(5, lambda x: -x[1])
laboral_desengancha_menor_top5 = laboral_desengancha.takeOrdered(5,lambda x: x[1])


In [32]:
fiesta_desengancha = dias_desengancha.map(lambda x: (x[0], x[1][True]))
fiesta_desengancha_mayor_top5 = fiesta_desengancha.takeOrdered(5, lambda x: -x[1])
fiesta_desengancha_menor_top5 = fiesta_desengancha.takeOrdered(5, lambda x: x[1])

In [33]:
print("-------------------------------------------------\t --------------------------------------------------")
print("Top 5 estaciones desengancho más frecuentes en laboral \t Top 5 estaciones desengancho más frecuentes en festivo")
print("------------------------------------------------- \t -------------------------------------------------")
for i in range(len(laboral_desengancha_mayor_top5)):
  print(f"\t{laboral_desengancha_mayor_top5[i][0]}:\
   {laboral_desengancha_mayor_top5[i][1]}  \t\t\t\t\t\t\
   {fiesta_desengancha_mayor_top5[i][0]}:\
   {fiesta_desengancha_mayor_top5[i][1]}")
print("--------------------------------------------------- \t ---------------------------------------------------")
print("Top 5 estaciones desengancho menos frecuentes en laboral \t Top 5 estaciones desengancho menos frecuentes en festivo")
print("--------------------------------------------------- \t ---------------------------------------------------")
for i in range(len(laboral_desengancha_menor_top5 )):
  print(f"\t{laboral_desengancha_menor_top5 [i][0]}:\
   {laboral_desengancha_menor_top5 [i][1]}  \t\t\t\t\t\t\
   {fiesta_desengancha_menor_top5[i][0]}:{fiesta_desengancha_menor_top5[i][1]}")
print("--------------------------------------------------- \t ---------------------------------------------------")

-------------------------------------------------	 --------------------------------------------------
Top 5 estaciones desengancho más frecuentes en laboral 	 Top 5 estaciones desengancho más frecuentes en festivo
------------------------------------------------- 	 -------------------------------------------------
	175:   1603  						   57:   481
	43:   1382  						   43:   460
	57:   1359  						   175:   441
	163:   1219  						   132:   383
	149:   1142  						   90:   303
--------------------------------------------------- 	 ---------------------------------------------------
Top 5 estaciones desengancho menos frecuentes en laboral 	 Top 5 estaciones desengancho menos frecuentes en festivo
--------------------------------------------------- 	 ---------------------------------------------------
	29:   117  						   28:23
	28:   133  						   144:33
	173:   176  						   173:34
	144:   201  						   93:36
	119:   221  						   191:36
--------------------------------------------------

## Caso enganche:
- Obtenemos el par (estación enganche, día).
- Comprobamos si el día es festivo.
- Contamos cuantas veces se ha utilizado la estación cada día y clasificamos entre las 5 que más y las 5 que menos se han utlizado.

In [34]:
def tupla_dia_engancha(line):
 with open(fichero, 'r', errors='replace') as f:
  for data in f:
      data = json.loads(line)
      day = data['unplug_hourTime'][0:10]
      plugStation = data['idplug_station']
      return plugStation, fecha_no_string(day)

In [35]:
dias_engancha = text.map(tupla_dia_engancha)\
  .map(lambda x: (x[0], festivo(x[1])))\
  .groupByKey()\
  .mapValues(Counter)

In [36]:
dia_laboral_engancha = dias_engancha.map(lambda x: (x[0],x[1][False]))
dia_laboral_engancha_mayortop5 = dia_laboral_engancha.takeOrdered(5, lambda x: -x[1])
dia_laboral_engancha_menortop5 = dia_laboral_engancha.takeOrdered(5,lambda x: x[1])


dias_fiesta_engancha = dias_engancha.map(lambda x: (x[0], x[1][True]))
dias_fiesta_engancha_mayortop5 = dias_fiesta_engancha.takeOrdered(5, lambda x: -x[1])
dias_fiesta_engancha_menortop5 = dias_fiesta_engancha.takeOrdered(5, lambda x: x[1])

In [37]:
print("-------------------------------------------------\t --------------------------------------------------")
print("Top 5 estaciones plug más frecuentes en laboral \t Top 5 estaciones plug más frecuentes en festivo")
print("------------------------------------------------- \t -------------------------------------------------")
for i in range(len(dia_laboral_engancha_mayortop5)):
  print(f"\t{dia_laboral_engancha_mayortop5[i][0]}:\
   {dia_laboral_engancha_mayortop5[i][1]}  \t\t\t\t\t\t\
   {dias_fiesta_engancha_mayortop5[i][0]}:\
   {dias_fiesta_engancha_mayortop5[i][1]}")
print("--------------------------------------------------- \t ---------------------------------------------------")
print("Top 5 estaciones plug menos frecuentes en laboral \t Top 5 estaciones plug menos frecuentes en festivo")
print("--------------------------------------------------- \t ---------------------------------------------------")
for i in range(len(dia_laboral_engancha_menortop5)):
  print(f"\t{dia_laboral_engancha_menortop5[i][0]}:\
   {dia_laboral_engancha_menortop5[i][1]}  \t\t\t\t\t\t\
   {dias_fiesta_engancha_menortop5[i][0]}: {dias_fiesta_engancha_menortop5[i][1]}")
print("--------------------------------------------------- \t ---------------------------------------------------")

-------------------------------------------------	 --------------------------------------------------
Top 5 estaciones plug más frecuentes en laboral 	 Top 5 estaciones plug más frecuentes en festivo
------------------------------------------------- 	 -------------------------------------------------
	175:   1616  						   43:   458
	43:   1427  						   57:   456
	57:   1381  						   175:   448
	163:   1244  						   132:   398
	149:   1137  						   90:   312
--------------------------------------------------- 	 ---------------------------------------------------
Top 5 estaciones plug menos frecuentes en laboral 	 Top 5 estaciones plug menos frecuentes en festivo
--------------------------------------------------- 	 ---------------------------------------------------
	29:   119  						   28: 17
	28:   123  						   144: 30
	173:   183  						   29: 33
	144:   200  						   147: 34
	88:   207  						   173: 35
--------------------------------------------------- 	 --------------------

## Top 5 trayectos más repetidos
Queremos dar los cinco trayectos que más se repiten en nuestros datos de bicimad.
Entonces hallamos lo siguiente:
- Obtenemos la tupla (día, (estación desenganche,estación enganche, duración del trayecto)).
- Descartamos los trayectos demasiado cortos (duración menos de un minuto).
- Calculamos cuantas veces se ha hecho el trayecto en día laboral y en día festivo.

Para hallar los 5 más frecuentes:
- Hacemos un map (uno para día laboral y otro para día festivo) que tome el par de estaciones que define el trayecto y se toman los 5 con valores más altos.


In [38]:
def tupla_timepoEnDia(line):
 with open(fichero, 'r', errors='replace') as f:
  for data in f:
    line = json.loads(line)
    day = line['unplug_hourTime'][0:10]
    unplug_st = line['idunplug_station']
    plug_st = line['idplug_station']
    travel_time = line['travel_time']
    return fecha_no_string(day), (unplug_st, plug_st, travel_time)

In [39]:
viajes = text.map(tupla_timepoEnDia)\
  .filter(lambda x: (x[1][0]!=x[1][1]) | (x[1][2]>60))\
  .map(lambda x: ((x[1][0],x[1][1]), festivo(x[0])))\
  .groupByKey()\
  .mapValues(Counter)

viajes_laboral_top5 = viajes.map(lambda x: (x[0], x[1][False]))\
  .takeOrdered(5, lambda x: -x[1])

viajes_fiesta_top5 = viajes.map(lambda x: (x[0], x[1][True]))\
  .takeOrdered(5, lambda x: -x[1])

In [40]:
print("----------------------------------------- \t -----------------------------------------")
print("Top 5 trayectos más frecuentes en laboral \t Top 5 trayectos más frecuentes en festivo")
print("----------------------------------------- \t -----------------------------------------")
for i in range(5):
  print(f"\t{viajes_laboral_top5[i][0]}:\
   {viajes_laboral_top5[i][1]} \t\t\t\t\
   {viajes_fiesta_top5[i][0]}:\
   {viajes_fiesta_top5[i][1]}")
print("----------------------------------------- \t -----------------------------------------")

----------------------------------------- 	 -----------------------------------------
Top 5 trayectos más frecuentes en laboral 	 Top 5 trayectos más frecuentes en festivo
----------------------------------------- 	 -----------------------------------------
	(9, 149):   88 				   (132, 132):   57
	(130, 149):   69 				   (64, 64):   36
	(64, 75):   66 				   (175, 175):   31
	(64, 78):   63 				   (43, 43):   28
	(175, 27):   62 				   (6, 6):   26
----------------------------------------- 	 -----------------------------------------


## Número de viajes por usuario


Queremos saber cuantos trayectos se realizan de meda por día.
Analizamos los casos.

### Sin tener en cuenta el tipo de usuario

- Obtenemos el par (día, usuario) y calculamos el número de trayectos realizados por día y usuario.
- Tomamos el día y el número de trayectos hechos y hacemos la media.
- Separamos entre días festivos y laborales y hacemos la media por tipo de día.

In [41]:
def tupla_dia_usuario(line):
 with open(fichero, 'r', errors='replace') as f:
  for data in f:
    line = json.loads(line)
    user = line['user_day_code']
    day = line['unplug_hourTime'][0:10]
    return fecha_no_string(day), user

In [42]:
media_viajes = text.map(tupla_dia_usuario)\
  .map(lambda x : ((x[0], x[1]), 1))\
  .groupByKey()\
  .mapValues(sum)\
  .map(lambda x: (x[0][0], x[1]))\
  .groupByKey()\
  .mapValues(st.mean)\
  .map(lambda x: (festivo (x[0]), x[1]))\
  .groupByKey()\
  .mapValues(st.mean)\
  .collectAsMap()

In [43]:
print(f"En laboral, cada usuario, de media realiza {round(media_viajes[False] ,2)} viajes")
print(f"En festivo, cada usuario, de media realiza {round(media_viajes[True] ,2)} viajes")

En laboral, cada usuario, de media realiza 1.9 viajes
En festivo, cada usuario, de media realiza 1.64 viajes


### Teniendo en cuenta el tipo de usuario

- Obtenemos la tupla (día (en formato datetime), usuario, tipo usuario).
- Calculamos el número de trayectos por día y por tipo de usuario.
- Separamos entre día laboral y día festivo y realizamos la media por cada tipo de día.

In [44]:
def tupla_dia_tipoUsuario(line):
 with open(fichero, 'r', errors='replace') as f:
  for data in f:
      data = json.loads(line)
      day = data['unplug_hourTime'][0:10]
      user_type = data['user_type']
      user_id = data['user_day_code']
      return (fecha_no_string(day), user_id, user_type) , 1

In [45]:
media_tipoUsuario =text.map(tupla_dia_tipoUsuario)\
  .reduceByKey(lambda x,y: x+y)\
  .map(lambda x: ((x[0][0], x[0][2]), x[1]))\
  .groupByKey()\
  .mapValues(st.mean)\
  .map(lambda x: ((festivo(x[0][0]), x[0][1]), x[1]))\
  .groupByKey()\
  .mapValues(st.mean)\
  .collectAsMap()

In [46]:
media_tipoUsuario

{(False, 2): 1.7574731575275053,
 (True, 1): 1.5174254255556958,
 (True, 3): 16.252832991569832,
 (True, 2): 1.6728087579870476,
 (False, 1): 1.6050298922819155,
 (False, 3): 21.699455112585596}

In [47]:
print("Las medias de usos por día del servicio por cada tipo de usuario son\
 las siguientes:")
print("     Laborable         Festivo")
print("  ---------------    ----------------")
#print(f"0:\t{round(mean_type[(False, 0)],1)} \t\t\
#   {round(mean_type[(True, 0)],1)} ")
print(f"1:\t{round(media_tipoUsuario[(False, 1)],1)} \t\t\
   {round(media_tipoUsuario[(True, 1)],1)} ")
print(f"2:\t{round(media_tipoUsuario[(False, 2)],1)} \t\t\
   {round(media_tipoUsuario[(True, 2)],1)} ")
print(f"3:\t{round(media_tipoUsuario[(False, 3)],1)} \t\t\
   {round(media_tipoUsuario[(True, 3)],1)} ")


Las medias de usos por día del servicio por cada tipo de usuario son las siguientes:
     Laborable         Festivo
  ---------------    ----------------
1:	1.6 		   1.5 
2:	1.8 		   1.7 
3:	21.7 		   16.3 


# Usuario más frecuente

Queremos ver el tipo de usuario que más utiliza el servicio bicimad comparando entre días laborales y festivos.
Para esto hacemos lo siguiente:
- Obtenemos el par (día (en formato datetime),(tipo de usuariom, ID del usuario)).
- Vamos a considerar tan soo el día y el tipo de usuario (1, 2 o 3).
- Separamos entre día laboral y día festivo y con el tipo de usuario calculamos el número de veces que se usa el servicio.

Lo mostramos en formato diccionario con clave booleana para indicar si laboral o festivo y diccionario con valor asociado con claves al tipo de usuario y al número de veces utilizado pot cada tipo de usuario en día laboral o festivo.


In [48]:
def tupla_dia_tipoUsuario_codigoUsuario(line):
 with open(fichero, 'r', errors='replace') as f:
  for data in f:
    line = json.loads(line)
    user_type = line['user_type']
    user_code = line['user_day_code']
    day = line['unplug_hourTime'][0:10]
    return fecha_no_string(day), (user_type,user_code)

In [49]:
tipos_usuario = text.map(tupla_dia_tipoUsuario_codigoUsuario)\
                  .map(lambda x: ((x[0],x[1][0]), x[1][1]))\
                  .groupByKey()\
                  .mapValues(lambda x: len(set(x)))\
                  .map(lambda x: ((festivo(x[0][0]), x[0][1]), x[1]))\
                  .reduceByKey(lambda x,y: x+y)\
                  .map(lambda x: (x[0][0],(x[0][1],x[1])))\
                  .groupByKey()\
                  .mapValues(dict)\
                  .collectAsMap()

In [50]:
print(f"En laborales, obtenemos el siguiente orden de tipos de usuario (de \
más a menos usos):")
tipos_ordenados = "  "
for pair in sorted(tipos_usuario[False].items(), key = lambda x: x[1], reverse = True):
  tipos_ordenados += str(pair[0]) + ", "
print(tipos_ordenados[:-2])

print(f"En festivos, obtenemos el siguiente orden de tipos de usuario (de \
más a menos usos):")
tipos_ordenados = "  "
for pair in sorted(tipos_usuario[True].items(), key = lambda x: x[1], reverse = True):
  tipos_ordenados += str(pair[0]) + ", "
print(tipos_ordenados[:-2])

En laborales, obtenemos el siguiente orden de tipos de usuario (de más a menos usos):
  1, 3, 2
En festivos, obtenemos el siguiente orden de tipos de usuario (de más a menos usos):
  1, 2, 3


# Número total de bucles

Entendemos po ciclo una secuencia de trayectos tal que el final de uno y el inicio del siguiente coinciden. De manera que se repite una estación en la concatenación de viajes.

Mostramos el siguiente ejemplo:

[0,1], [1,8], [4,7], [2,8], [8,2], [2,3], [6,7]

sería:

[0,1,8], [4,7], [2,8,2,3], [6,7]

Y solo hay un ciclo, dado en:

[2,8,2,3]


Definiremos funciones:


*   pares_concatenados: Si es posible concatena pares de trayectos. Si no es posble devuelve la lista vacía.
*   trayectos_concatenados: Dada una lista (secuencia) de trayectos, concatena si es posible los trayectos.
*   numero_ciclos: Cuenta los ciclos que hay en una secuencia de trayectos ya concatenados.

In [51]:
#CONCATENA PARES DE VIAJES SI ES POSIBLE; SI NO DEVUELVE BOOL FALSE
def pares_concatenados(list1, list2):
  if list1[-1] == list2[0]:
    return list1 + list2[1:]
  else:
     return []

#DADA UNA LISTA CON PARES (salida, llegada) CONCATENA LOS VIAJES SI ES POSIBLES
def trayectos_concatenados(lista):
  concat = []
  i = 0
  while i < len(lista):

    if i < len(lista)-1:
      union = pares_concatenados(lista[i], lista[i+1])
      if  union == []:
        if len(lista[i]) > 2:          
          concat.append(lista[i])
        else:
          if lista[i][0] == lista[i][1]:
            concat.append(lista[i])
      else:
        lista[i+1] = union
        if i + 1 == len(lista) - 1:
          concat.append(lista[i+1])
    else:
      if lista[i][0] == lista[i][1]:
        concat.append(lista[i])
    i = i + 1    
  return concat

#CUENTA LOS CICLOS 
def numero_ciclos(lista):
  contador = 0
  lista = trayectos_concatenados(lista)
  for elemento in lista:
    num_ciclos = 0
    vistos = []
    for i in elemento:
      if i in vistos:
        num_ciclos = num_ciclos + 1
      else:
        vistos.append(i)
    contador = contador + num_ciclos
  return contador

In [52]:
#Ejemplo
numero_ciclos([[1,3],[3,1],[1,4],[2,2],[1,5],[5,9]])

2

In [53]:
#Ejemplo
trayectos_concatenados([[1,3],[3,1],[1,4],[2,2],[1,5],[5,9]])

[[1, 3, 1, 4], [2, 2], [1, 5, 9]]

Ahora, queremos hallar los ciclos que hay en nuestros datos.
- Calculamos el par (día, ID del usuario, [salida, llegada]).


In [54]:
def tupla_dias_idUsuarioViaje(line):
    data = json.loads(line)
    uid = data['user_day_code']     
    day = data['unplug_hourTime'][0:10]
    start = data['idunplug_station']
    end = data['idplug_station'] 
    return  fecha_no_string(day), uid, [start, end]

In [55]:
data = json.loads(line)
data['unplug_hourTime'][0:10]

'2020-03-17'

Definimos los siguientes funciones:
* crea: toma como input una lista de listas del tipo [usuario,[salida, llegada]] y junta los trayectos hechos por el mismo usuario, devolviendo la información en un diccionario.
* suma_ciclos: toma un diccionario tipo clave: usuario y valor asociado a una lista de trayectos y obtiene el número total de ciclos.


In [56]:
def crea(lista):
  dic = dict()
  i = 0
  while i < len(lista):
    if lista[i][0] in dic:
      dic[lista[i][0]].append(lista[i][1])
    else:
      dic[lista[i][0]] = [lista[i][1]]
    i = i+1
  return dic

In [57]:
def suma_ciclos(lista_dic):
  suma = 0
  for i in lista_dic:
    suma = suma + numero_ciclos(lista_dic[i])
  return suma

- Obtenemos la tupla (día, ID del usuario, [salida,llegada]) que hemos calculado más arriba.
- Clasificamos entre días laborales y días festivos, y agrupamos.
- Dentro de cada sección (laboral, festivo) agrupamos los trayectos hechos por el mismo usuario con la función anteriormente definida, crea.
- Contamos los ciclos realizados por cada usuario y los sumamos.


In [58]:
ciclos = text.map(tupla_dias_idUsuarioViaje)\
.map(lambda x: ((festivo(x[0]), (x[1], x[2]))))\
.groupByKey()\
.map(lambda x: (x[0], crea(list(x[1]))))\
.map(lambda x: (x[0], suma_ciclos(x[1])))

In [59]:
print('Ciclos realizados en días festivos: ' + str(ciclos.collect()[1][1]))

print('Ciclos realizados en días laborables: ' + str(ciclos.collect()[0][1]))

Ciclos realizados en días festivos: 4107
Ciclos realizados en días laborables: 19780


ÁNALISIS DE UNA SEMANA 

In [60]:
ruta="/home/mat/Escritorio/datasets"
Umbral_estaciones = 73 
Umbral_distritos = 738 

In [61]:
import calendar
año= input("Escribe 2018,2019 o 2020 para seleccionar el año sobre el que quieres realizar el estudio:")
mes = int(input("Escribe el número del mes sobre el que quieres realizar el estudio: "))
dia_inicial = int(input("Introduzca el primer día de la semana a analizar,entre 1 y 22:"))
dia_final = dia_inicial+6
nombre_mes = calendar.month_name[mes]
print(f"Analizaremos la semana del {dia_inicial} al {dia_final} del mes {nombre_mes} y año {año}")

Escribe 2018,2019 o 2020 para seleccionar el año sobre el que quieres realizar el estudio:2020
Escribe el número del mes sobre el que quieres realizar el estudio: 3
Introduzca el primer día de la semana a analizar,entre 1 y 22:13
Analizaremos la semana del 13 al 19 del mes March y año 2020


Creamos un json de la semana elegida

In [62]:
import os

os.chdir(ruta)
#seleccionamos el fichero del mes en el que se encuentra la semana
if mes<10:
    fichero = ('202003_movements.json')
else:
    fichero = ('202003_movements.json')

In [63]:
#lista con las líneas del json de la semana
lista_json_semana=[]
with open(fichero, 'r', errors='replace') as f:
  for line in f:
    linea = json.loads(line)
    mes1=int(linea['unplug_hourTime'][5:7]) #sacamos el mes del json
    dia1=int(linea['unplug_hourTime'][8:10]) #sacamos el día del json  
    if mes1 == mes:
      if dia_inicial<= dia1 and dia1 <= dia_final: #vemos que se encuentre dentro de los parámentros
        lista_json_semana.append(line)

In [64]:
lista_json_semana

['{"_id": {"$oid": "5eec831756bdeb043ff16499"}, "user_day_code": "80b68d43f8fff7de897a80f92b3364b57b89aa0ea40e2db68d7d4a43bb48490f", "idplug_base": 2, "user_type": 1, "idunplug_base": 27, "travel_time": 282, "idunplug_station": 35, "ageRange": 5, "idplug_station": 16, "unplug_hourTime": "2020-03-13T00:00:00Z", "zip_code": "28012"}\n',
 '{"_id": {"$oid": "5eec831756bdeb043ff1649a"}, "user_day_code": "70837e26eca3efe50a5f8f37136e2099b6629c360d0e7b391aa4a8483ba63310", "idplug_base": 19, "user_type": 1, "idunplug_base": 14, "travel_time": 131, "idunplug_station": 143, "ageRange": 0, "idplug_station": 142, "unplug_hourTime": "2020-03-13T00:00:00Z", "zip_code": ""}\n',
 '{"_id": {"$oid": "5eec831756bdeb043ff1649b"}, "user_day_code": "bb48942d6a1f87097aac8011b416a9ece668d536f7a416946a342217885c21c6", "idplug_base": 25, "user_type": 1, "idunplug_base": 9, "travel_time": 450, "idunplug_station": 13, "ageRange": 0, "idplug_station": 218, "unplug_hourTime": "2020-03-13T00:00:00Z", "zip_code": ""}

In [65]:
rdd_semana = sc.parallelize(lista_json_semana)

Usos estaciones semana elegida

In [76]:
from datetime import datetime

def salidas(line):
  """
  Sacamos solo la fecha y las estaciones de salida y llegada.
  Y la clave será el par (fecha, estación de salida)
  """
  linea = json.loads(line)
  start = linea['idunplug_station']
  end = linea['idplug_station']
  date_ = linea['unplug_hourTime'][0:10]
  date = datetime.strptime(date_, '%Y-%m-%d').weekday()
  return (date, start), end

def llegadas(line):
  """
  Sacamos solo la fecha y las estaciones de salida y llegada.
  Y la clave será el par (fecha, estación de llegada)
  """
  linea = json.loads(line)
  start = linea['idunplug_station']
  end = linea['idplug_station']
  date_ = linea['unplug_hourTime'][0:10]
  date = datetime.strptime(date_, '%Y-%m-%d').weekday()
  return (date, end), start

In [74]:
line

'{"_id": {"$oid": "5eeca06d618046678f83b07a"}, "user_day_code": "a880483c5b4c4a773bd66e0a353ccb13bcf98c7a6605373bb7ee7f54ebe6b07c", "idplug_base": 8, "user_type": 3, "idunplug_base": 8, "travel_time": 7612873, "idunplug_station": 136, "ageRange": 5, "idplug_station": 108, "unplug_hourTime": "2020-03-17T18:00:00Z", "zip_code": "28022"}\n'

In [77]:
salidas(line)

((1, 136), 108)

In [78]:
llegadas(line)

((1, 108), 136)

In [79]:
viajes1 = rdd_semana.map(salidas).map(lambda x: (x[0], 1)).reduceByKey(add) 
lista1 = viajes1.collect()
lista_salidas = list(map(lambda x:[x[0][1],x[0][0],x[1]], lista1))  #lista con: estaciones salidas,día(del 0 al 6), estaciones llegadas


viajes2 = rdd_semana.map(llegadas).map(lambda x: (x[0], 1)).reduceByKey(add) 
lista2 = viajes2.collect()
lista_llegadas = list(map(lambda x:[x[0][1],x[0][0],x[1]], lista2)) #lista con: estaciones llegadas,día(del 0 al 6), estaciones salidas

salidas = rdd_semana.map(salidas)
viajes_totales = rdd_semana.map(llegadas).cogroup(salidas).map(lambda x: (x[0], len(list(x[1][0])) + len(list(x[1][1])) ) ).map(lambda x:((x[0][1],x[0][0]),x[1])) 
lista_totales = viajes_totales.collect()
viajes_totales_semana = sc.parallelize(lista_totales) #rdd de viajes totales,con salidas y llegadas

In [80]:
lista_salidas

[[35, 4, 19],
 [143, 4, 20],
 [13, 4, 48],
 [60, 4, 21],
 [107, 4, 16],
 [114, 4, 36],
 [59, 4, 63],
 [30, 4, 48],
 [163, 4, 84],
 [150, 4, 17],
 [9, 4, 52],
 [39, 4, 14],
 [27, 4, 30],
 [109, 4, 17],
 [162, 4, 45],
 [95, 4, 52],
 [10, 4, 25],
 [195, 4, 21],
 [134, 4, 39],
 [124, 4, 21],
 [21, 4, 35],
 [87, 4, 23],
 [89, 4, 37],
 [62, 4, 57],
 [122, 4, 27],
 [19, 4, 56],
 [96, 4, 36],
 [57, 4, 112],
 [189, 4, 53],
 [48, 4, 58],
 [115, 4, 34],
 [50, 4, 47],
 [81, 4, 49],
 [202, 4, 33],
 [90, 4, 72],
 [161, 4, 72],
 [63, 4, 26],
 [142, 4, 38],
 [157, 4, 58],
 [16, 4, 32],
 [77, 4, 49],
 [206, 4, 21],
 [207, 4, 38],
 [192, 4, 32],
 [91, 4, 47],
 [113, 4, 36],
 [45, 4, 69],
 [179, 4, 36],
 [128, 4, 69],
 [133, 4, 38],
 [82, 4, 43],
 [187, 4, 72],
 [185, 4, 66],
 [197, 4, 33],
 [196, 4, 28],
 [93, 4, 24],
 [58, 4, 54],
 [182, 4, 52],
 [7, 4, 23],
 [12, 4, 24],
 [36, 4, 18],
 [53, 4, 72],
 [215, 4, 18],
 [25, 4, 22],
 [131, 4, 52],
 [37, 4, 27],
 [56, 4, 34],
 [101, 4, 32],
 [85, 4, 22],
 [1

In [81]:
lista_llegadas

[[16, 4, 26],
 [142, 4, 44],
 [218, 4, 21],
 [132, 4, 67],
 [19, 4, 60],
 [206, 4, 25],
 [203, 4, 27],
 [168, 4, 69],
 [213, 4, 43],
 [143, 4, 22],
 [106, 4, 38],
 [215, 4, 16],
 [47, 4, 23],
 [122, 4, 26],
 [21, 4, 36],
 [56, 4, 32],
 [197, 4, 36],
 [13, 4, 48],
 [187, 4, 69],
 [175, 4, 101],
 [115, 4, 42],
 [130, 4, 30],
 [15, 4, 18],
 [183, 4, 64],
 [139, 4, 58],
 [82, 4, 39],
 [196, 4, 24],
 [89, 4, 26],
 [88, 4, 8],
 [35, 4, 19],
 [7, 4, 20],
 [199, 4, 33],
 [83, 4, 61],
 [91, 4, 52],
 [57, 4, 107],
 [128, 4, 63],
 [174, 4, 13],
 [81, 4, 47],
 [51, 4, 32],
 [177, 4, 56],
 [43, 4, 77],
 [149, 4, 85],
 [216, 4, 26],
 [181, 4, 54],
 [207, 4, 45],
 [135, 4, 90],
 [49, 4, 60],
 [125, 4, 35],
 [200, 4, 32],
 [102, 4, 31],
 [117, 4, 22],
 [189, 4, 53],
 [80, 4, 33],
 [170, 4, 57],
 [185, 4, 60],
 [76, 4, 32],
 [95, 4, 57],
 [164, 4, 53],
 [217, 4, 16],
 [211, 4, 27],
 [147, 4, 15],
 [93, 4, 21],
 [25, 4, 25],
 [194, 4, 19],
 [46, 4, 50],
 [75, 4, 34],
 [184, 4, 48],
 [208, 4, 93],
 [116,

In [82]:
lista_totales

[((16, 4), 58),
 ((142, 4), 82),
 ((218, 4), 40),
 ((132, 4), 135),
 ((206, 4), 46),
 ((168, 4), 142),
 ((106, 4), 72),
 ((122, 4), 53),
 ((56, 4), 66),
 ((130, 4), 60),
 ((82, 4), 82),
 ((196, 4), 52),
 ((88, 4), 26),
 ((128, 4), 132),
 ((174, 4), 34),
 ((216, 4), 56),
 ((200, 4), 64),
 ((102, 4), 55),
 ((80, 4), 63),
 ((170, 4), 108),
 ((76, 4), 65),
 ((164, 4), 102),
 ((194, 4), 33),
 ((46, 4), 104),
 ((184, 4), 94),
 ((208, 4), 183),
 ((116, 4), 34),
 ((94, 4), 73),
 ((210, 4), 52),
 ((124, 4), 43),
 ((136, 4), 77),
 ((114, 4), 75),
 ((38, 4), 109),
 ((214, 4), 56),
 ((26, 4), 104),
 ((204, 4), 103),
 ((28, 4), 36),
 ((62, 4), 118),
 ((172, 4), 84),
 ((18, 4), 84),
 ((166, 4), 61),
 ((20, 4), 68),
 ((48, 4), 104),
 ((58, 4), 104),
 ((52, 4), 91),
 ((120, 4), 23),
 ((8, 4), 57),
 ((176, 4), 55),
 ((50, 4), 87),
 ((190, 4), 47),
 ((84, 4), 84),
 ((64, 4), 109),
 ((30, 4), 94),
 ((86, 4), 61),
 ((14, 4), 80),
 ((212, 4), 74),
 ((154, 4), 54),
 ((44, 4), 69),
 ((118, 4), 69),
 ((162, 4

In [83]:
viajes_totales_semana.take(1)

[((16, 4), 58)]

Dado umbral,estaciones distritos mas repreentativos

In [84]:
def create_list_media(rdd_lista):
  """"
  Devuelve una lista de listas
  valor1: 0 si es lunes, 1 si es martes, etc.
  valor2: lista con nº estacion/nombre del distrito y nº de viajes que ha hecho maas que la media.
  """
  i = 0
  lista=[]
  lunes=[]
  martes=[]
  miercoles=[]
  jueves=[]
  viernes=[]
  sabado=[]
  domingo=[]
  while i < len(rdd_lista):
    dia = rdd_lista[i][0][1]
    estacion = rdd_lista[i][0][0]
    diferencia_viajes = round(rdd_lista[i][1][0][0] - rdd_lista[i][1][1][0])
    if dia == 0:
      lunes.append([estacion,diferencia_viajes])
    elif dia == 1:
      martes.append([estacion,diferencia_viajes])
    elif dia == 2:
      miercoles.append([estacion,diferencia_viajes])
    elif dia == 3:
      jueves.append([estacion,diferencia_viajes])
    elif dia == 4:
      viernes.append([estacion,diferencia_viajes])
    elif dia == 5:
      sabado.append([estacion,diferencia_viajes])
    elif dia == 6:
      domingo.append([estacion,diferencia_viajes])
    i += 1
  i=0
  while i < 7:
    if i == 0:
      lista.append([i]+lunes)
    elif i == 1:
      lista.append([i]+martes)
    elif i == 2:
      lista.append([i]+miercoles)
    elif i == 3:
      lista.append([i]+jueves)
    elif i == 4:
      lista.append([i]+viernes)
    elif i == 5:
      lista.append([i]+sabado)
    elif i == 6:
      lista.append([i]+domingo)
    i += 1
  return lista

In [85]:
#json de medias de llegadas 
with open('/home/mat/Escritorio/uso_medio_llegadas.json','r') as fichero:
  lista_llegadas = json.load(fichero)   #id,weekday,media
#json de medias de salidas
with open('/home/mat/Escritorio/uso_medio_salidas.json','r') as fichero:
  lista_salidas = json.load(fichero)   #id,weekday,media
#diccionario {estación:distrito} . Lo usaremos para pasar de estación a distrito
with open('/home/mat/Escritorio/dictBarrio.json','r') as fichero:
   dic_distritos = json.load(fichero)  
    
rdd_media_llegadas = sc.parallelize(lista_llegadas)
rdd_media_salidas = sc.parallelize(lista_salidas)

In [86]:
rdd_media_llegadas.take(1)

[[189, 0, 46.666666666666664]]

In [87]:
rdd_media_salidas.take(1)

[[189, 0, 39.666666666666664]]

In [88]:
def aux1(line):
  """
  Parámetro: linea de la rdd
  Conserva la clave y cambia el valor a una lista de ese mismo valor
  Se utilizará en el estudio de estaciones
  """
  dia_estacion = line[0]
  cant = len(line[1])
  return dia_estacion,[cant]

In [89]:
def estacion_llegadas_dia(line):
  """
  Parámetro: linea de la rdd 
  Devuelve: estación de llegada y número que corresponde al día(lunes 0, martes1), 1 de viaje realizado
  """
  linea = json.loads(line)
  dia = int(linea['unplug_hourTime'][8:10])
  estacion = linea['idplug_station']
  if dia == dia_inicial:
    dia = 0
  elif dia == dia_inicial + 1:
    dia = 1
  elif dia == dia_inicial + 2:
    dia = 2
  elif dia == dia_inicial + 3:
    dia = 3
  elif dia == dia_inicial + 4:
    dia = 4
  elif dia == dia_inicial + 5:
    dia = 5
  elif dia == dia_final:
    dia = 6
  return (estacion,dia),1

In [90]:
estacion_llegadas_dia(line)

((108, 4), 1)

In [91]:
def estacion_salidas_dia(line):
  """
  Parámetro: linea de la rdd 
  Devuelve: estación de salida y número que corresponde al día(lunes 0, martes1), 1 de viaje realizado
  """
  linea = json.loads(line)
  dia = int(linea['unplug_hourTime'][8:10])
  estacion = linea['idunplug_station']
  if dia == dia_inicial:
    dia = 0
  elif dia == dia_inicial+1:
    dia = 1
  elif dia == dia_inicial+2:
    dia = 2
  elif dia == dia_inicial+3:
    dia = 3
  elif dia == dia_inicial + 4:
    dia = 4
  elif dia == dia_inicial +5:
    dia = 5
  elif dia == dia_final:
    dia = 6
  return (estacion,dia),1

In [92]:
estacion_salidas_dia(line)

((136, 4), 1)

In [93]:
def estacion_por_distrito(line):
  """
  Parámetro: linea de la rdd con key: (estación,dia) y value: media(rdd media)/1 (rdd semana), 
  Se cambia estación por distrito usando el diccionario
  """
  estacion = str(line[0][0])
  dia = line[0][1]
  media = line[1]
  if estacion == '80' or estacion == '1' or estacion == '111' or estacion == '116' or estacion == '25' or estacion == '128' or estacion == '90' or estacion == '161' or estacion=='20':
    if len(estacion) == 1:
      estacion = '00'+estacion
    elif len(estacion) == 2:
      estacion = '0'+estacion
    if estacion == '128' or estacion == '090' or estacion == '161' or estacion=='020':
      estacion = estacion + ' ampliacion'
    else:
      estacion = estacion + ' a'
  distrito = dic_distritos[estacion]
  return (distrito,dia),media

In [94]:
def media_medias_distritos(line):
  """
  Parámetro: linea de la rdd (rdd medias)
  hacemos media de las medias para ver la media de los distritos
  """
  estacion_dia = line[0]
  lista_de_medias = line[1]
  new_media=(sum(int(i) for i in lista_de_medias))/len(lista_de_medias)
  return estacion_dia,[new_media]

In [97]:
#llegadas por estacion
media_llegadas_estacion = rdd_media_llegadas.map(lambda x: ((x[0],x[1]),x[2])).groupByKey().mapValues(list)
estacion_dia_llegadas = rdd_semana.map(estacion_llegadas_dia).groupByKey().map(aux1)
llegada_estacion = estacion_dia_llegadas.join(media_llegadas_estacion).filter(lambda x: x[1][0][0] > x[1][1][0]+ Umbral_estaciones).collect()
lista_llegadas_estacion = create_list_media(llegada_estacion)
    
  #salidas por estacion
media_salidas_estacion = rdd_media_salidas.map(lambda x: ((x[0],x[1]),x[2])).groupByKey().mapValues(list)
estacion_dia_salidas = rdd_semana.map(estacion_salidas_dia).groupByKey().map(aux1)
salida_estacion = estacion_dia_salidas.join(media_salidas_estacion).filter(lambda x: x[1][0][0] > x[1][1][0]+Umbral_estaciones).collect()
lista_salidas_estacion = create_list_media(salida_estacion)

In [98]:
media_llegadas_estacion.take(1)

[((189, 0), [46.666666666666664])]

In [99]:
media_salidas_estacion.take(1)

[((189, 0), [39.666666666666664])]

In [100]:
#llegadas por distritos
distrito_llegadas = rdd_semana.map(estacion_llegadas_dia).filter(lambda x: x[0][0]!= 66 and x[0][0]!= 64 and x[0][0]!=21 and x[0][0]!=28 and x[0][0]< 212).map(estacion_por_distrito).groupByKey().map(aux1)
distrito_medias_llegadas=rdd_media_llegadas.map(lambda x: ((x[0],x[1]),x[2])).filter(lambda x: x[0][0]!= 66 and x[0][0]!= 64 and x[0][0]!=21 and x[0][0]!=28 and x[0][0]<212).map(estacion_por_distrito).groupByKey().mapValues(list).map(media_medias_distritos)
llegadas_distritos = distrito_llegadas.join(distrito_medias_llegadas).filter(lambda x: x[1][0][0] > x[1][1][0]+Umbral_distritos).collect()
lista_llegadas_distritos = create_list_media(llegadas_distritos)
    
  #salidas por distritos
distrito_salidas = rdd_semana.map(estacion_salidas_dia).filter(lambda x: x[0][0]!= 66 and x[0][0]!= 64 and x[0][0]!=21 and x[0][0]!=28 and x[0][0]<212).map(estacion_por_distrito).groupByKey().map(aux1)
distrito_medias_salidas=rdd_media_salidas.map(lambda x: ((x[0],x[1]),x[2])).filter(lambda x: x[0][0]!= 66 and x[0][0]!= 64 and x[0][0]!=21 and x[0][0]!=28 and x[0][0]<212).map(estacion_por_distrito).groupByKey().mapValues(list).map(media_medias_distritos)
salidas_distritos = distrito_salidas.join(distrito_medias_salidas).filter(lambda x: x[1][0][0] > x[1][1][0]+Umbral_distritos).collect()
lista_salidas_distritos=create_list_media(salidas_distritos)

In [101]:
distrito_medias_llegadas.take(1)

[(('CONCEPCIÓN', 0), [46.0])]

In [102]:
distrito_medias_salidas.take(1)

[(('CONCEPCIÓN', 0), [39.0])]

5 estaciones más importantes en la semana

In [103]:
def salidas_mapper(line):
  """
  sacamos de la rdd :estacion_inicio, estacion_final
  """
  try:
    linea = json.loads(line)
    inicio = linea['idunplug_station']
    final = linea['idplug_station']
  except:
    inicio = line['idunplug_station']
    final = line['idplug_station']    
  return inicio,final


def llegadas_mapper(line):
  """
  sacamos de la rdd :estacion_final,estacion_inicio
  """
  try: 
    linea = json.loads(line)
    inicio = linea['idunplug_station']
    final = linea['idplug_station']
  except: 
    inicio = line['idunplug_station']
    final = line['idunplug_station']
  return final,inicio



def lista_viajes_totales(rddinicial):
    """
    Usamos rdd semana
    devuelve lista decreciente por numero de viajes totales en formato:
    (estacion, nº salidas, nº llegadas, nº viajes totales)
   
    """
    lista_llegadas_numero = rddinicial.map(llegadas_mapper).groupByKey().mapValues(list)
    viajes_totales = rddinicial.map(salidas_mapper).groupByKey().mapValues(list).cogroup(lista_llegadas_numero).map(lambda x: (x[0],len(list(x[1][0])[0]), len(list(x[1][1])[0]),len(list(x[1][0])[0]) + len(list(x[1][1])[0]) ))
    lista_viajes_totales = viajes_totales.collect()
    return sorted(lista_viajes_totales, key=itemgetter(3), reverse = True)


def lista_viajes_totales_top(rddinicial):
  """
  usamos rdd semana
  devuelve lista con top 5 estaciones más importantes según nuúmero de viajes en formato:
  (estacion, nº viajes totales )
  """
  viajes = lista_viajes_totales(rddinicial)
  i = 0
  listaaux = []
  while i < 5:
    tupla_estacion_viajes = (viajes[i][0],viajes[i][3])
    listaaux.append(tupla_estacion_viajes)
    i = i+1
  return listaaux

In [105]:
lista_top_5 = lista_viajes_totales_top(rdd_semana)
print("------------------------------------------------")
print("Top 5 estaciones más usadas en la semana elegida")
for i in range(5):
    print(f"{lista_top_5[i][0]}:{lista_top_5[i][1]}")
print("------------------------------------------------")

------------------------------------------------
Top 5 estaciones más usadas en la semana elegida
175:352
57:340
208:284
149:272
135:259
------------------------------------------------
