In [None]:
# This notebook covers the data cleansing stage and checks the feasibility of two use cases:
# - Predict the location of a certain car for given time stamps
# - Predict the number of cars on a certain location for given time stamps

# The first use case seems not feasible because the location data is too randomly distributed across the city
# The second use case shows a kind of pattern that can be suitable for ML and DL algorithms

# Therefore, only the second use case will be continued

In [None]:
#LIBRARIES

import pyspark
import pyspark.sql.functions as f

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType

import pandas as pd

import plotly.offline as py
import plotly.graph_objects as go
import plotly.express as px

import numpy as np
import matplotlib.pyplot as plt

import csv

from scipy.stats import kde

import seaborn as sns

import time
from datetime import datetime

import os 
# Absolute path of the current working directory; later cells prepend it to
# the "cars_expanded_files" paths they read.
dirpath = os.getcwd()

# Print the Spark version as a quick check that a Spark context is available.
# NOTE(review): `sc` is not created anywhere in this notebook — presumably the
# kernel (e.g. a PySpark kernel) injects it; confirm before running elsewhere.
print(sc.version)

In [None]:
# To work on the first use case, the car labels stored in the "carsList"
# column have to be split into separate columns. In the format they are saved
# it is not possible to filter on a single car.
#
# Therefore, the first stage writes the labels of every row into .csv files,
# one output line per input row and one label per field.

# Make sure the output directory exists before any file is opened
os.makedirs("cars_expanded_files", exist_ok=True)

df = spark.read.format("csv").option("header", "true").load("sample_table.csv")
cars_list  = df.select("carsList").rdd.flatMap(lambda x: x).collect()
small_list_size = 1000000
# Divide the cars list into smaller lists, each one written to its own file
cars_small_lists = [cars_list[x:x+small_list_size] for x in range(0, len(cars_list), small_list_size)]
max_size = 0      # largest number of labels found in a single row
number_file = 0   # 1-based index of the file being written

for small_list in cars_small_lists:
    number_file = number_file + 1

    # "w" (not "a"): re-running this cell must replace the files. Appending
    # would duplicate every row and break the positional join done later on.
    # The handle is not called `f`, which is the alias of pyspark.sql.functions.
    with open("cars_expanded_files/cars_expanded_"+str(number_file)+".csv", "w", newline="") as csv_file:
        wr = csv.writer(csv_file, dialect='excel')

        for row in small_list: # For each row of the spark dataframe
            # A missing carsList value is handled as a row without labels
            labels = tuple(filter(None, (row or "").split(',')))

            if len(labels) > max_size:
                max_size = len(labels)

            # Remove [] and spaces from every label
            row_cleaned = [label.replace('[', '').replace(']', '').replace(' ', '') for label in labels]

            # Always write at least one field: a completely blank line could be
            # skipped when the file is read back, which would shift all the
            # following rows with respect to the original dataframe.
            wr.writerow(row_cleaned or [''])

In [None]:
# Once we have the csv files, the labels of each timestamp are read back as
# separate columns (car_1 ... car_10), one dataframe per file.
# The next cell unions these dataframes into a new one called 'appended'.

# Number of csv files and of label columns expected by this notebook
NUMBER_OF_FILES = 7
NUMBER_OF_CAR_COLUMNS = 10

paths = [dirpath + "/cars_expanded_files/cars_expanded_" + str(n) + ".csv"
         for n in range(1, NUMBER_OF_FILES + 1)]

# All label columns are read as nullable strings
schema = StructType([StructField("car_" + str(n), StringType(), True)
                     for n in range(1, NUMBER_OF_CAR_COLUMNS + 1)])


def _read_cars_expanded(path):
    """Read one headerless, comma-separated labels file using `schema`.

    Quoting is disabled (empty quote option), as in the original reader.
    The schema is given explicitly, so no schema inference is requested.
    """
    return (spark.read
            .option("delimiter", ",")
            .option("header", "false")
            .option("quote", "")
            .csv(path, schema=schema))


df_cars_expanded_1 = _read_cars_expanded(paths[0])
df_cars_expanded_2 = _read_cars_expanded(paths[1])
df_cars_expanded_3 = _read_cars_expanded(paths[2])
df_cars_expanded_4 = _read_cars_expanded(paths[3])
df_cars_expanded_5 = _read_cars_expanded(paths[4])
df_cars_expanded_6 = _read_cars_expanded(paths[5])
df_cars_expanded_7 = _read_cars_expanded(paths[6])



In [None]:
# This extracts the integers from every cell in order to get rid of weird characters

def _keep_digits(sdf):
    """Replace every column of `sdf` by the first run of digits it contains.

    The columns are taken from the dataframe itself, so the cell does not
    depend on a counter computed in another cell and never references a
    column that is missing from the schema.
    """
    for column_name in sdf.columns:
        sdf = sdf.withColumn(column_name,
                             pyspark.sql.functions.regexp_extract(column_name, r'(\d+)', 1))
    return sdf


df_cars_expanded_1 = _keep_digits(df_cars_expanded_1)
df_cars_expanded_2 = _keep_digits(df_cars_expanded_2)
df_cars_expanded_3 = _keep_digits(df_cars_expanded_3)
df_cars_expanded_4 = _keep_digits(df_cars_expanded_4)
df_cars_expanded_5 = _keep_digits(df_cars_expanded_5)
df_cars_expanded_6 = _keep_digits(df_cars_expanded_6)
df_cars_expanded_7 = _keep_digits(df_cars_expanded_7)

# Stack the files in file order, so the rows keep the order in which they were written
appended = df_cars_expanded_1
for next_part in (df_cars_expanded_2, df_cars_expanded_3, df_cars_expanded_4,
                  df_cars_expanded_5, df_cars_expanded_6, df_cars_expanded_7):
    appended = appended.union(next_part)



In [None]:
#This box joins the original dataframe and the recently created one 

def with_column_index(sdf):
    """Return `sdf` with an extra, non-nullable long column "ColumnIndex".

    The value of the new column is the position assigned to each row by
    `zipWithIndex()` on the underlying RDD.
    """
    index_field = StructField("ColumnIndex", LongType(), False)
    indexed_schema = StructType(sdf.schema.fields + [index_field,])
    # zipWithIndex yields (row, position) pairs; the position is appended to the row
    indexed_rows = sdf.rdd.zipWithIndex().map(lambda pair: pair[0] + (pair[1],))
    return indexed_rows.toDF(schema=indexed_schema)

# Number the rows of both dataframes, then pair the rows that share a position
df1 = with_column_index(df)
df2 = with_column_index(appended)
same_position = df1.ColumnIndex == df2.ColumnIndex
df_joined = df1.join(df2, on=same_position, how='inner')
# The helper index is only needed for the join
df_expanded = df_joined.drop("ColumnIndex")
df_expanded.show()

In [None]:
# Now, all the labels are written in different columns. It is possible to look for one car in particular.
# The chosen car is the one with label 182

# STUDY OF ONE CAR

# Label of the studied car and the columns in which it can appear
CAR_LABEL = '182'
car_columns = ["car_" + str(n) for n in range(1, 11)]

# This box takes all the timestamps in which the given car is registered,
# i.e. the rows where any of the car_N columns is equal to the label.
# Then, the timestamps are sorted by chronological order
car_filter = " OR ".join(column + " == '" + CAR_LABEL + "'" for column in car_columns)
df_182 = df_expanded.select("timestamp", "latitude", "longitude").filter(car_filter)

df_182 = df_182.sort(df_182.timestamp, ascending = True)
df_182.show()
print("The number of timestamps associated with this car is: "+ str(df_182.count()))

# The following plot is a density plot, which gives an idea of the distribution of the locations.
# The Spark dataframe is collected only once; both coordinates come from the same pandas copy.
locations_182 = df_182.toPandas()
x = pd.to_numeric(locations_182['latitude'], errors='coerce')
y = pd.to_numeric(locations_182['longitude'], errors='coerce')
sns.set(style="white", color_codes=True)
sns.jointplot(x=x, y=y, kind='kde', color="skyblue")

In [None]:
# However, it does not show any temporal distribution. The timestamps, in the way they are registered, are not
# suitable for plotting. Therefore, a new column has to be created that registers the time as a float number

TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S.%f %Z'
EPOCH_NAIVE_UTC = datetime(1970, 1, 1)


def _to_epoch_seconds(parsed, raw_text):
    """Convert a parsed timestamp into seconds since the Unix epoch.

    `strptime` with %Z returns a naive datetime, and `datetime.timestamp`
    interprets naive values as local time. When the text is labelled UTC or
    GMT the value is therefore converted as UTC; any other zone name keeps
    the local-time interpretation.
    """
    zone_name = raw_text.rsplit(' ', 1)[-1].upper()
    if zone_name in ('UTC', 'GMT'):
        return (parsed - EPOCH_NAIVE_UTC).total_seconds()
    return datetime.timestamp(parsed)


# This piece of code turns the Spark Dataframe into a Pandas Dataframe, which is used for plotting.
df_182_pandas = df_182.toPandas()

# This piece of code transforms the customized format of the timestamps to a datetime format
list_timestamps = [datetime.strptime(raw_text, TIMESTAMP_FORMAT)
                   for raw_text in df_182_pandas['timestamp']]

# This piece of code transforms the datetime format into float numbers
list_timestamps_float = [_to_epoch_seconds(parsed, raw_text)
                         for parsed, raw_text in zip(list_timestamps, df_182_pandas['timestamp'])]

# The column of float timestamps is also added
df_182_pandas["timestamp_float"] = list_timestamps_float

In [None]:
# This plot shows the variation of locations along the time

# The coordinates come from a CSV read without schema inference, so they are
# strings. They are converted to numbers on a copy, otherwise the axes would
# be categorical instead of numeric; df_182_pandas itself is left untouched.
data = df_182_pandas.assign(
    latitude=pd.to_numeric(df_182_pandas["latitude"], errors='coerce'),
    longitude=pd.to_numeric(df_182_pandas["longitude"], errors='coerce'),
)
fig = px.scatter(data, x= "latitude", y="longitude", color= "timestamp_float", color_continuous_scale='Magma')
fig.show()


In [None]:
# The second use case is to analyze one particular location and then try to predict the number of cars in it

#STUDY OF ONE LOCATION

# Registered latitudes, with the number of rows of each one
latitude_counts = df_expanded.groupby('latitude').count()
latitude_counts.distinct().show()

# Registered longitudes, with the number of rows of each one
longitude_counts = df_expanded.groupby('longitude').count()
longitude_counts.distinct().show()

# Keep only the rows of the chosen location, then the two columns of interest
rows_at_location = df_expanded.filter("latitude == '32.072323' AND longitude == '34.790555'")
df_location = rows_at_location.select("timestamp", "total_cars")

# Chronological order
df_location = df_location.orderBy(df_location.timestamp, ascending = True)
df_location.show()
location_row_count = df_location.count()
print("The number of timestamps associated with this location is: "+ str(location_row_count))

# Pandas copy used for plotting
df_location_pandas = df_location.toPandas()

In [None]:
# Line plot of the number of cars at the chosen location, in the (chronological)
# row order of df_location_pandas.

# total_cars comes from a CSV read without schema inference, so it is a string
# column; it is converted to numbers to get a numeric y axis.
total_cars = pd.to_numeric(df_location_pandas.total_cars, errors='coerce')

fig = go.Figure()

fig.add_trace(go.Scatter(y=total_cars,
                    mode='lines',
                    name='Test '))

fig.update_layout(title='Temporal evolution of the number of cars in (32.072323, 34.790555)',
                   xaxis_title='Timestamp',
                   yaxis_title='Number of cars')

fig.show()

In [None]:
#Save all dataframes for future usage

# mode("overwrite") replaces the output of a previous run. With the default
# mode the write fails as soon as the target path already exists, so the
# notebook could not be run a second time.
df_182.write.format("parquet").mode("overwrite").save("df_182.parquet")
df_182_pandas.to_pickle("df_182_pandas")
df_expanded.write.format("parquet").mode("overwrite").save("df_expanded.parquet")
df_location.write.format("parquet").mode("overwrite").save("df_location.parquet")