In [1]:
# Import libraries
import pandas as pd
import psycopg2
import glob
from sqlalchemy import create_engine

# inspect data in the folder

In [2]:
# Read all the files in 'data' folder and save them as a list
files = sorted(glob.glob('../data/*')) # in ascending order
print(f'How many files in total: {len(files)}')

# Select only the monthly traffic state files
monthly_files = [file for file in files if file[-9:] == 'TRAMS.csv']
print(f'How many monthly traffic state files: {len(monthly_files)}')

monthly_files

How many files in total: 23
How many monthly traffic state files: 22


['../data/2019_01_Gener_TRAMS_TRAMS.csv',
 '../data/2019_02_Febrer_TRAMS_TRAMS.csv',
 '../data/2019_03_Marc_TRAMS_TRAMS.csv',
 '../data/2019_04_Abril_TRAMS_TRAMS.csv',
 '../data/2019_05_Maig_TRAMS_TRAMS.csv',
 '../data/2019_06_Juny_TRAMS_TRAMS.csv',
 '../data/2019_07_Juliol_TRAMS_TRAMS.csv',
 '../data/2019_08_Agost_TRAMS_TRAMS.csv',
 '../data/2019_09_Setembre_TRAMS_TRAMS.csv',
 '../data/2019_10_Octubre_TRAMS_TRAMS.csv',
 '../data/2019_11_Novembre_TRAMS_TRAMS.csv',
 '../data/2019_12_Desembre_TRAMS_TRAMS.csv',
 '../data/2020_01_Gener_TRAMS_TRAMS.csv',
 '../data/2020_02_Febrer_TRAMS_TRAMS.csv',
 '../data/2020_03_Marc_TRAMS_TRAMS.csv',
 '../data/2020_04_Abril_TRAMS_TRAMS.csv',
 '../data/2020_05_Maig_TRAMS_TRAMS.csv',
 '../data/2020_06_Juny_TRAMS_TRAMS.csv',
 '../data/2020_07_Juliol_TRAMS_TRAMS.csv',
 '../data/2020_08_Agost_TRAMS_TRAMS.csv',
 '../data/2020_09_Setembre_TRAMS_TRAMS.csv',
 '../data/2020_10_Octubre_TRAMS_TRAMS.csv']

In [3]:
# A data file containing geometry info for each traffic section of BCN
bcn_geom = [file for file in files if file not in monthly_files]
bcn_geom

['../data/transit_relacio_trams_format_long.csv']

In [4]:
# Sneak peek of each data I - monthly traffic data
traffic_example = pd.read_csv(monthly_files[0])

print(traffic_example.shape)
traffic_example.head()

(951405, 4)


Unnamed: 0,idTram,data,estatActual,estatPrevist
0,1,20190101000551,0,0
1,2,20190101000551,0,0
2,3,20190101000551,0,0
3,4,20190101000551,0,0
4,5,20190101000551,0,0


In [5]:
traffic_example.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 951405 entries, 0 to 951404
Data columns (total 4 columns):
 #   Column        Non-Null Count   Dtype
---  ------        --------------   -----
 0   idTram        951405 non-null  int64
 1   data          951405 non-null  int64
 2   estatActual   951405 non-null  int64
 3   estatPrevist  951405 non-null  int64
dtypes: int64(4)
memory usage: 29.0 MB


In [6]:
# Sneak peek of each data II - traffic section geometries
geom_example = pd.read_csv(bcn_geom[0])

print(geom_example.shape)
geom_example.head()

(2784, 5)


Unnamed: 0,Tram,Tram_Components,Descripció,Longitud,Latitud
0,1,1,Diagonal (Ronda de Dalt a Doctor Marañón),2.11203535639414,41.384191
1,1,2,Diagonal (Ronda de Dalt a Doctor Marañón),2.101502862881051,41.381631
2,2,1,Diagonal (Doctor Marañón a Ronda de Dalt),2.111944376806616,41.384467
3,2,2,Diagonal (Doctor Marañón a Ronda de Dalt),2.101594089443895,41.381868
4,3,1,Diagonal (Doctor Marañón a Pl. Pius XII),2.112093343037027,41.384229


In [7]:
# How many geoms per 'Tram (= street section)'
geom_example.groupby('Tram').Tram_Components.count().unique()

array([ 2,  3, 10, 15, 12,  6,  9,  4, 14, 22, 17,  8, 11,  7,  5, 13, 20,
       19, 16, 18, 24, 26, 23, 33, 37, 21, 38])

* All data in csv format
* Geometry composed of 2-38 points

In [8]:
# Check summary info
geom_example.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2784 entries, 0 to 2783
Data columns (total 5 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           --------------  -----  
 0   Tram             2784 non-null   int64  
 1   Tram_Components  2784 non-null   int64  
 2   Descripció       2784 non-null   object 
 3   Longitud         2784 non-null   object 
 4   Latitud          2779 non-null   float64
dtypes: float64(1), int64(2), object(2)
memory usage: 108.9+ KB


* 'Longitud' column is in object type -> convert to float
* This gives an error message showing that some rows have unseparted Longitud and Latitud.
* This error also gives an hint on why data type of 'Longitud' column is wrongly assigned. Some rows have both longitude & latitude information in one column.

In [24]:
# Show all the rows have longitude & latitude not separated
filter_not_separated = geom_example.Longitud.apply(lambda x: len(x) > 20)
not_separated = geom_example['Longitud'][filter_not_separated]
not_separated

TypeError: object of type 'float' has no len()

In [None]:
# Get index of not_separated rows
not_separated_ind = not_separated.index

# Confirm if these rows lack 'Latitud' values
no_latitudes = geom_example.iloc[not_separated_ind]
no_latitudes

* This result explains why there are null values found only in 'Latitud' column.

In [None]:
# Split the longitude & latitude by using split method
longitud_split = no_latitudes['Longitud'].apply(lambda x: x.split()[0])
latitud_split = no_latitudes['Longitud'].apply(lambda x: x.split()[1])

print(longitud_split)
print(latitud_split)

In [None]:
# Assign the split values accordingly
geom_example.iloc[not_separated_ind, 3] = longitud_split
geom_example.iloc[not_separated_ind, 4] = latitud_split

# Result
geom_example.iloc[not_separated_ind]

In [None]:
# Check 1. datatype and 2. not-null counts after conversion 
geom_example.info()

* No more null values in Latitud
* both Longitud & Latitud needs to be in float type

In [None]:
# Change datatype of Longitud & Latitud
geom_example['Longitud'] = pd.to_numeric(geom_example['Longitud'])
geom_example['Latitud'] = pd.to_numeric(geom_example['Latitud'])
geom_example.info()

# Connect to the database

In [None]:
# Define a function connect to the database 'bcn_traffic'
# In case the connecting attempt fails, print the error message
def connect_to_db():
    try:
        #conn for connection
        conn =  psycopg2.connect(dbname='bcn_traffic',
                                 user='bcn',
                                 password='bcn',
                                 host='postgis',
                                 port='5432')

    except psycopg2.DatabaseError:
        print ("I am unable to connect the database")
    return conn

# Create tables parsed by month (1 table for each month)

In [None]:
# Create names for each table using monthly_files list

# Select year_month part only
table_names = [name[8:][:-16] for name in monthly_files]

# Modify table names
table_names = [name[8:]+name[:4] for name in table_names]

table_names[:3] #year_month

In [25]:
# Function to create a table inside the PostGIS database
def create_table(query):
    # connect to the db
    conn = connect_to_db()

    try:
        cur = conn.cursor()  # initiate cursor (communication with db)
        #print('Connected')
        cur.execute(query)   # execute the query
        #print('Query executed')
        conn.commit()

    except psycopg2.DatabaseError: # print error if fails
        print ("Failed to create the table")

    # Close the communication & connection with the postgis
    finally:
        cur.close()
        conn.close ()

In [26]:
# Create tables for monthly traffic observation data
for name in table_names:
    
    # Prepare a query to create table
    q_create_tables = f"""
                      drop table if exists {name};
                      create table {name}
                      (
                          idTram int,
                          data bigint,
                          estatActual int,
                          estatPrevist int
                      )
                      """
    
    create_table(q_create_tables)
    print(f'{name} created')

Gener2019 created
Febrer2019 created
Marc2019 created
Abril2019 created
Maig2019 created
Juny2019 created
Juliol2019 created
Agost2019 created
Setembre2019 created
Octubre2019 created
Novembre2019 created
Desembre2019 created
Gener2020 created
Febrer2020 created
Marc2020 created
Abril2020 created
Maig2020 created
Juny2020 created
Juliol2020 created
Agost2020 created
Setembre2020 created
Octubre2020 created


In [27]:
# Prepare a query to create geom table
q_create_geom_table = """
                      create table Traffic_geom
                      (
                          Tram int,
                          Tram_Components int,
                          Descripció varchar,
                          Longitud float,
                          Latitud float
                      )
                      """

create_table(q_create_geom_table)
print('Table Traffic_geom created')

Table Traffic_geom created


# Read the files, transform the data and load it to the db

In [28]:
# Function to fill the previously created tables
def fill_table_with_data(query, filepath):
    # connect to the db
    conn = connect_to_db()

    try:
        cur = conn.cursor()
        
        with open(filepath, 'r') as file:
           
            next(file) # Skip header

            cur.copy_expert(query, file)
            
            conn.commit() # commit the changes to the db

    # Print error message if query fails
    except psycopg2.DatabaseError:
        print ("Failed to copy data to the table")

    finally:
        cur.close()
        conn.close ()
        #print('Connection closed')

In [29]:
# Insert data to each month's table
for i in range(len(table_names)):
        
    copy_sql = f"""
               COPY {table_names[i]}
               FROM STDIN
               WITH CSV
               DELIMITER ',';
               """

    fill_table_with_data(copy_sql, monthly_files[i])
    print(f'Data inserted to {table_names[i]}')

Data inserted to Gener2019
Data inserted to Febrer2019
Data inserted to Marc2019
Data inserted to Abril2019
Data inserted to Maig2019
Data inserted to Juny2019
Data inserted to Juliol2019
Data inserted to Agost2019
Data inserted to Setembre2019
Data inserted to Octubre2019
Data inserted to Novembre2019
Data inserted to Desembre2019
Data inserted to Gener2020
Data inserted to Febrer2020
Data inserted to Marc2020
Data inserted to Abril2020
Data inserted to Maig2020
Data inserted to Juny2020
Data inserted to Juliol2020
Data inserted to Agost2020
Data inserted to Setembre2020
Data inserted to Octubre2020


In [30]:
# Insert data from the dataframe geom_example to the Traffic_geom table

# Create SQL connection engine
engine = create_engine('postgresql://bcn:bcn@postgis:5432/bcn_traffic')
print('Engine created')

# Dataframe to the database
geom_example.to_sql('Traffic_geom', engine)
print('Data inserted to Taffic_geom')

Engine created
Data inserted to Taffic_geom


# ETL summary

* Bulk insertion of CSV files to PostGIS databse (faster than line-by-line iteration)
* Data inserted without transformation (e.g. data type for datetime and geometry not assigned yet)

-----------------------------------------

# End of ETL

### To check the database using terminal:
1. docker exec -it bcn-trafficforecast_postgis_1 bash
2. psql postgresql://bcn:bcn@postgis:5432/bcn_traffic