<div style="width: 100%; clear: both;">
  <div style="float: left; width: 50%;">
    <img src="https://brandemia.org/sites/default/files/sites/default/files/uoc_nuevo_logo.jpg" align="left" style="width: 80%;">
  </div>
  <div style="float: right; width: 50%; text-align: right;">
    <h3 style="text-align: left; font-weight: bold;">Optimización del sistema de bicicletas compartidas en la ciudad de Valencia.</h3>
    <p style="text-align: left; font-weight: bold; font-size: 100%;">Análisis predictivo, rutas de reparto para el balanceo y gestión eficiente de las estaciones.</p>
    <p style="margin: 0; text-align: right;">Jose Luis Santos Durango</p>
    <hr style="border-top: 1px solid #ccc; margin: 10px 0;">
    <p style="margin: 0; padding-top: 22px; text-align:right;">ETL_Spark.ipynb ·M2.879 · Trabajo Final de Máster · Área 2</p>
    <p style="margin: 0; text-align:right;">2023-2 · Máster universitario en Ciencia de datos (Data science)</p>
    <p style="margin: 0; text-align:right;">Estudios de Informática, Multimedia y Telecomunicación</p>
  </div>
</div>
<div style="width:100%;">&nbsp;</div>


# Procesamiento de los datos históricos con Spark

![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png)

Lo primero que vamos a hacer en este notebook es configurar el entorno de trabajo de Spark. Para ello, se nos ha asignado un servidor de la UOC con recursos en un cluster de Cloudera, para poder ejecutar mayor cantidad de datos en paralelo, ya que el procesamiento en local se quedaba obsoleto debido a la gran cantidad de datos de los que se dispone.


1. [Definición del entorno Spark](#1)
        1.1 Descompresión de las carpetas de datos
2. [Extracción de los datos históricos de cada estación y almacenamiento en un fichero txt](#2)
3. [Conclusiones](#3)

In [1]:
# import ceil
import os
import tarfile
import time
import findspark
import pandas as pd
findspark.init()
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, date_format, avg, max, lag, sum as spark_sum, expr
from datetime import datetime
import tarfile
from multiprocessing.pool import ThreadPool

<a id='1'></a>
# 1. Definición del entorno de Spark

In [2]:
def start_spark(app_name,master):
    """
    Function to initialize a SparkContext.

    Parameters:
        app_name (str): Name of the Spark application.
        master_mode (str): Spark master URL (e.g., "local", "yarn", "spark://host:port").

    Returns:
        SparkContext: A SparkContext object.
    """
    try:
        conf = SparkConf().setAppName(app_name).setMaster(master)
        sc = SparkContext(conf=conf)
        print("Spark initialized successfully.")
        return sc
    except Exception as e:
        print("Error initializing Spark:", str(e))
        return None

# Set master mode to local
sc = start_spark(app_name="SparkStart",master="local[1]")

Spark initialized successfully.


## 1.1 Descompresión las carpetas de datos

In [3]:
# Directory where the compressed files are located
directory = "data"

# Get the list of compressed files
compressed_files = [os.path.join(directory, file) for file in os.listdir(directory) if file.endswith(".tar.gz")]

# Function to decompress a file
def extract_file(file):
    """
    Function to extract a tar.gz file.

    Parameters:
        file (str): Path to the tar.gz file.

    Returns:
        None
    """
    output_folder = "data_descomp"  # Output folder for extracted files
    os.makedirs(output_folder, exist_ok=True)  # Create the output folder if it doesn't exist
    with tarfile.open(file, 'r:gz') as tar_ref:
        tar_ref.extractall(output_folder)

In [4]:
# Decompress files in parallel
#pool = ThreadPool(len(compressed_files))
#pool.map(extract_file, compressed_files)
#pool.close()
#pool.join()

Una vez que tenemos los datos descomprimidos en la carpeta data_descomp de nuestro directorio, lo que vamos a hacer es procesar los datos con Spark. En el notebook ETL_Python para procesar los datos y dejarlos en granularidad horaria, que será la granularidad con la que vamos a trabajar, lo que hicimos fue recorrer cada 60 líneas dentro de los archivos de datos de las estaciones, tomando como referencia un único registro por hora. Debido a problemas de calidad de los datos históricos, no todas las horas coincidían en el minuto, por lo que imputamos el minuto 00 para todos los registros independientemente de que no correspondiera al minuto 00. En este notebook, vamos a procesar los datos tomando como referencia el valor medio de cada hora, obteniendo así un valor más representativo para cada hora. 

<a id='2'></a>
# 2. Extracción de los datos históricos de cada estación y almacenamiento en un fichero txt

In [5]:
def get_directories(data):
    """
    Get directories containing data for specified years.

    Parameters:
        data_years (list): A list of integers representing the years.

    Returns:
        list: A list of directory paths containing data for the specified years.
    """
    
    start_directories = []
    data_dir_test = os.path.join("data_descomp", data)
    for month in os.listdir(data_dir_test):
        if month != ".DS_Store":
            data_dir_month = os.path.join(data_dir_test, month)
            for day in os.listdir(data_dir_month):
                if day != ".DS_Store":
                    start_directories.append(os.path.join(data_dir_month, day))
    return start_directories

def process_data(data_years):
    """
    Process data from directories using Spark.

    Parameters:
        data_years (list): A list of integers representing the years.
    """
    spark = SparkSession.builder \
        .appName("DataProcessing") \
        .getOrCreate()
    
    start_directories = get_directories(data_years)
    output = 'data_descomp/data_trips.txt'
    

    def process_line(line, station_id):
        data = line.strip().split(',')
        timestamp_str = data[0]
        bikes = float(data[1])
        parking = float(data[2])
        # Convertir la cadena de timestamp a un objeto datetime
        timestamp = datetime.strptime(timestamp_str, '%Y/%m/%d %H:%M:%S')
        return (station_id, timestamp, bikes, parking)

    # Read data from directories into an RDD and process each line
    rdd = spark.sparkContext.parallelize(start_directories) \
            .flatMap(lambda directory: [(os.path.basename(directory), os.path.join(directory, file)) for file in os.listdir(directory)]) \
            .filter(lambda station: 'checkpoints' not in station[1]) \
            .flatMap(lambda station: [(os.path.basename(station[1]), line) for line in open(station[1], 'r', encoding='latin-1').readlines()])

    # Process each line and calculate total_trips
    rdd = rdd.map(lambda x: process_line(x[1], x[0])) \
         .toDF(['station_id', 'timestamp', 'bikes', 'parking']) \
         .withColumn('date', date_format('timestamp', 'yyyy-MM-dd')) \
         .withColumn('hour', date_format('timestamp', 'HH')) \
         .withColumn("bikes_diff", expr("abs(bikes - lag(bikes, 1) over (partition by station_id, date, hour order by timestamp))")) \
         .fillna({"bikes_diff": 0}) \
         .withColumn("total_trips", spark_sum("bikes_diff").over(Window.partitionBy("station_id", "date", "hour").orderBy("timestamp")))

    # Calculate average values per station, day, and hour
    avg_df = rdd.groupBy("station_id", "date", "hour").agg(
            avg("bikes").alias("avg_bikes"),
            avg("parking").alias("avg_parking"),
            max("total_trips").alias("max_total_trips") # adding max value for the hour as it is a counter
        )
        
    # Write results to text file without header
    avg_df.write.mode("overwrite").csv(output, header=False)


    spark.stop()


In [6]:
# Let's execute data_processor for 2021 data
#process_data('2021')

<a id='3'></a>
# 3. Conclusiones

Una vez hemos procesado los datos con Spark, se debería haber creado un fichero data_trips.txt en el sistema de ficheros distribuidos de Hadoop, de forma que con el comando -getmerge podamos unir en un único fichero local que será el que usaremos para el tratamiento de nuestros datos.

Veamos que efectivamente existe el fichero en el sistema distribuido.

In [7]:
!hdfs dfs -ls data_descomp
!hdfs dfs -getmerge data_descomp/data_trips.txt data_trips.txt

Found 5 items
drwxr-xr-x   - josant05 josant05          0 2024-04-17 00:56 data_descomp/2021
drwxr-xr-x   - josant05 josant05          0 2024-04-17 09:09 data_descomp/2022
drwxr-xr-x   - josant05 josant05          0 2024-04-17 00:51 data_descomp/2023
drwxr-xr-x   - josant05 josant05          0 2024-04-22 23:16 data_descomp/data_avg.txt
drwxr-xr-x   - josant05 josant05          0 2024-04-26 02:08 data_descomp/data_trips.txt


Veamos el dataset una vez hemos cargado los datos en un dataframe.

In [8]:
# Load the dataset into a dataframe with a header
df = pd.read_csv('data_trips.txt', header=None, names=['station_id', 'date', 'hour', 'bikes_avg', 'parkings_avg', 'total_trips'])

# Convert column 'hour' to the format 'hh:mm'
df['hour'] = pd.to_datetime(df['hour'], format='%H').dt.strftime('%H:%M')

# Let's get some ordered data and check manually if total_trips is correct
df_sorted = df.sort_values(by=['date', 'station_id', 'hour'], ascending=True)

df.info()
df_sorted.head(24)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2416382 entries, 0 to 2416381
Data columns (total 6 columns):
station_id      int64
date            object
hour            object
bikes_avg       float64
parkings_avg    float64
total_trips     float64
dtypes: float64(3), int64(1), object(2)
memory usage: 110.6+ MB


Unnamed: 0,station_id,date,hour,bikes_avg,parkings_avg,total_trips
918104,1,2021-01-01,00:00,14.333333,10.666667,1.0
687860,1,2021-01-01,01:00,14.0,11.0,0.0
2235365,1,2021-01-01,02:00,14.0,11.0,0.0
1136048,1,2021-01-01,03:00,14.0,11.0,0.0
1473982,1,2021-01-01,04:00,14.0,11.0,0.0
1389512,1,2021-01-01,05:00,14.0,11.0,0.0
1244836,1,2021-01-01,06:00,14.0,11.0,0.0
410144,1,2021-01-01,07:00,14.0,11.0,0.0
1546429,1,2021-01-01,08:00,14.0,11.0,0.0
494601,1,2021-01-01,09:00,14.0,11.0,0.0


Podemos ver que efectivamente se han calculado los valores de los viajes por hora de forma correcta. Hemos comprobado el primer día para la primera estación de forma manual y los valores coinciden. Respecto al dataset, podemos ver que el total de registros es 2.416.382 registros, lo cual nos dice que hay algunos registros olvidados por algún problema de calidad de los datos. Se comprobará en la parte del análisis.