In [1]:
import psycopg2

In [2]:
class DataBaseManager:
    """Small psycopg2 helper for loading CSV files and running SQL on PostgreSQL.

    Each public method opens its own connection, commits on success and
    always closes the connection again, including when the operation fails.
    """

    def __init__(self, host, database_name, database_user, database_password):
        """Store the connection settings; no connection is opened here.

        Args:
            host: Host name passed to ``psycopg2.connect``.
            database_name: Name of the database to connect to.
            database_user: User name used for authentication.
            database_password: Password used for authentication.
        """
        self.host = host
        self.database_name = database_name
        self.database_user = database_user
        self.database_password = database_password

    def _state_connection(self):
        """Open and return a new database connection.

        Returns:
            The connection object returned by ``psycopg2.connect``.

        Raises:
            Exception: Whatever ``psycopg2.connect`` raised. The error is
                printed and then re-raised so callers fail with the real
                cause instead of an ``AttributeError`` on ``None``.
        """
        print('Connecting to the PostgreSQL database...')
        try:
            conn = psycopg2.connect(
                host=self.host,
                database=self.database_name,
                user=self.database_user,
                password=self.database_password)
        except Exception as error:
            print(error)
            raise

        # Printed before returning; it used to sit after the `return` and
        # could never run.
        print('Connection successfuly stablished')
        return conn

    def import_data(self, file_path, target_table):
        """Bulk-load a comma-separated file into ``target_table``.

        The first line of the file is treated as a header and skipped; the
        remaining lines are streamed to the table with ``cursor.copy_from``.

        Args:
            file_path: Path of the CSV file to read.
            target_table: Name of the table that receives the rows.

        Raises:
            Exception: Any error raised while opening the file or copying
                the data. Nothing is committed in that case and the
                connection is still closed.
        """
        conn = self._state_connection()
        try:
            with open(file_path, 'r') as f, conn.cursor() as cur:
                # Skip the header row; the default keeps an empty file from
                # raising StopIteration.
                next(f, None)
                cur.copy_from(f, target_table, sep=',')

            conn.commit()
            print("Data Successfuly loaded.")
        finally:
            # Closing without a commit discards any half-finished work.
            conn.close()
            print('Database connection closed.')

    def execute_query(self, query, expects_return=False, params=None):
        """Run a SQL statement, commit it, and return its rows if it has any.

        Args:
            query: SQL text to execute.
            expects_return: Accepted for backward compatibility; rows are
                returned whenever the statement produces a result set,
                regardless of this flag.
            params: Optional sequence or mapping of values bound to the
                placeholders in ``query`` by the driver. Prefer this over
                formatting values into the SQL string.

        Returns:
            The list of rows from ``cursor.fetchall()`` when the statement
            produced a result set, otherwise ``None``.

        Raises:
            Exception: Any error raised while executing or committing the
                statement. The connection is still closed.
        """
        conn = self._state_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(query, params)

                conn.commit()
                print("Query successfuly ran.")

                # `description` is None for statements that return no rows
                # (INSERT, REFRESH, ...), so fetchall() is only attempted
                # when there is something to fetch.
                if cur.description is None:
                    return None

                results = cur.fetchall()
                print('Results fetched')
                return results
        finally:
            conn.close()
            print('Database connection closed.')

In [3]:
# Load the raw CSV export into the `raw_trips` table.

db_manager = DataBaseManager(
    host='postgres',
    database_name='postgres',
    database_user='postgres',
    database_password='postgres',
)
db_manager.import_data(file_path='trips.csv', target_table='raw_trips')

Connecting to the PostgreSQL database...
Data Successfuly loaded.
Database connection closed.


In [4]:
# Transform the rows of `raw_trips` and insert them into `public.trips`:
# the 'POINT (x y)' text columns are rewritten to '(x, y)' and cast to
# `point`, and `datetime` is split into a date and an hour-truncated timestamp.
db_manager = DataBaseManager(
    host='postgres',
    database_name='postgres',
    database_user='postgres',
    database_password='postgres',
)

query = """
    INSERT INTO public.trips
    select 
        region,
        cast(replace(replace(raw_trips.origin_coord, 'POINT (', '('), ' ', ', ') as point) as origin_coord,
        cast(replace(replace(raw_trips.destination_coord, 'POINT (', '('), ' ', ', ') as point) as destination_coord,
        date(datetime) as trip_date,
        date_trunc('hour', datetime) as trip_datetime,
        datasource

    from raw_trips;
"""

db_manager.execute_query(query=query)

Connecting to the PostgreSQL database...
Query successfuly ran.
Database connection closed.


In [5]:
# Refresh the `dim_date` materialized view. CONCURRENTLY is used so that
# dashboards reading the view are not compromised while it refreshes.
db_manager = DataBaseManager(
    host='postgres',
    database_name='postgres',
    database_user='postgres',
    database_password='postgres',
)

query = """
    REFRESH MATERIALIZED VIEW CONCURRENTLY dim_date;
"""

db_manager.execute_query(query=query)

Connecting to the PostgreSQL database...
Query successfuly ran.
Database connection closed.


In [6]:
# Weekly average number of trips for a single region: total trips joined to
# the calendar, divided by the number of distinct weeks they fall in.
# NOTE(review): `region` is formatted straight into the SQL text, so it must
# stay a trusted constant; do not feed user input through this f-string.
# NOTE(review): both operands of the division are counts, so the average
# looks to be truncated to a whole number (recorded output: 5) - confirm
# that this is intended.
db_manager = DataBaseManager(
    host='postgres',
    database_name='postgres',
    database_user='postgres',
    database_password='postgres',
)

region = 'Hamburg'

weekly_avg_trips = f"""
    select
        max(trips.region) as region,
        count(calendar.week)/count(distinct calendar.week) as weekly_avg
    from trips as trips
    join dim_date as calendar
        on calendar.date = trips.trip_date
    group by region
    having region = '{region}'
"""

results = db_manager.execute_query(query=weekly_avg_trips)

print(results)

Connecting to the PostgreSQL database...
Query successfuly ran.
Results fetched
Database connection closed.
[('Hamburg                                           ', 5)]
