Data Pipelines with Redis

Background Information
As a telecommunications data engineer, you have been tasked with building a pipeline that can
efficiently extract, transform, and load data from CSV files into a Postgres database. The data to
be extracted is related to customer call logs, which contain information about the duration, cost,
and destination of customer calls. The extracted data needs to be transformed to ensure it is in
the correct format and structure for storage in the database. The pipeline should also cache
data using Redis to speed up the data extraction and transformation.
Guidelines
You can follow the steps below:

● Start by creating a new Python file and importing the necessary libraries (pandas,
psycopg2, and redis).

● Create a Redis client object and connect it to the Redis Labs cloud instance.

● Create an extract function that reads the CSV files using pandas and caches the data in
Redis.

● Create a transform function that cleans, structures, and formats the extracted data.

● Create a load function that connects to the Postgres database using psycopg2 and loads
the transformed data into the database.

● Combine the extract, transform, and load functions into a single data pipeline that
extracts data from a CSV file, transforms it and loads it into a Postgres database.

● Test the pipeline with a sample dataset to ensure it works correctly.
Sample CSV Files
We’ve provided a sample CSV file (customer_call_logs.csv) that you can use for this data
pipeline. Files for this project can be downloaded from here (link).
Deliverables

You will be expected to deliver a GitHub repository with the following:
● A Python file for the data pipeline.

● Documentation of the pipeline.

○ Highlight at least 3 best practices used during the implementation.

○ Recommendations for deployment and running the pipeline with a cloud-based
provider.

In [None]:
# Install the Redis client library
!pip install redis





In [None]:
import pandas as pd
import psycopg2
import redis
from datetime import datetime

import logging
# Set up logger
logging.basicConfig(filename='redisdata.log', level=logging.DEBUG)


In [None]:
# Redis Cloud Instance Information
redis_host = 'redis-13274.c9.us-east-1-4.ec2.cloud.redislabs.com'
redis_port = 13274
redis_password = 'hvAHQBclIFwdNQnKCUodbnNL3nblebde' 
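
Hard-coding the Redis password in the notebook exposes it to anyone who can read the repository. A minimal sketch of one alternative, assuming the credentials are supplied through environment variables. The variable names REDIS_HOST, REDIS_PORT, and REDIS_PASSWORD and the helper load_redis_settings are hypothetical and not part of the original notebook:

```python
import os


def load_redis_settings(env=None):
    """Build Redis connection settings from environment variables.

    The variable names used here are assumptions made for this sketch.
    """
    env = os.environ if env is None else env
    return {
        "host": env.get("REDIS_HOST", "localhost"),
        "port": int(env.get("REDIS_PORT", "6379")),
        # Raises KeyError if the password has not been configured
        "password": env["REDIS_PASSWORD"],
    }


# Demonstration with a plain dict standing in for the real environment
example_settings = load_redis_settings(
    {"REDIS_HOST": "example.invalid", "REDIS_PORT": "13274", "REDIS_PASSWORD": "not-a-real-secret"}
)
```

The resulting dictionary could then be unpacked into the client constructor, for example redis.Redis(**load_redis_settings(), decode_responses=True). The same approach would apply to the Postgres credentials.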

In [None]:
# Redis Client Object info
r = redis.Redis(
  host=redis_host,
  port=redis_port,
  password=redis_password,
  encoding="utf-8",  # `charset` is deprecated in redis-py; `encoding` is the supported name
  decode_responses=True)

In [None]:
!pip install psycopg2



In [None]:

import sys
# import the connect library for psycopg2
import psycopg2
# import the error handling libraries for psycopg2
from psycopg2 import OperationalError, errorcodes, errors
import psycopg2.extras as extras
import pandas as pd


In [None]:
# Postgres Database Information [running on my PC]
pg_host = '127.0.0.1'
pg_database = 'postgres'
pg_user = 'postgres'
pg_password = 'pass12345'

In [None]:
# Extract the data
def extract_data():

    """
    Reads the call logs from a CSV file on local disk into a dataframe and caches
    the data in the Redis instance hosted at redislabs.com.
    Returns the dataframe.
    """

    try:
        # Read file from local drive
        data = pd.read_csv('/content/customer_call_logs.csv')

        # Cache data in Redis for faster retrieval
        r.set('call_logs1', data.to_json(orient='records'))

    except Exception:
        # logging.exception records the message and traceback at ERROR level;
        # re-raise so the pipeline does not continue without data
        logging.exception("extract_data() error")
        raise

    return data
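
The extract step above always re-reads the CSV file and overwrites the cache. A minimal sketch of a cache-aside variant, in which the source is read only when the key is missing. The helper get_records and the InMemoryCache class are hypothetical; InMemoryCache is a stand-in so the example runs without a Redis server, and it mimics only the get and set calls of the redis-py client:

```python
import json


def get_records(cache, key, load_records, ttl_seconds=3600):
    """Return cached records if present; otherwise load, cache, and return them.

    `cache` is any object exposing get(key) and set(key, value, ex=seconds),
    which is the shape of the redis-py client's get and set methods.
    """
    cached = cache.get(key)
    if cached is not None:
        return json.loads(cached)
    records = load_records()
    cache.set(key, json.dumps(records), ex=ttl_seconds)
    return records


class InMemoryCache:
    """Hypothetical stand-in for a Redis client, for demonstration only."""

    def __init__(self):
        self._store = {}

    def get(self, key):
        return self._store.get(key)

    def set(self, key, value, ex=None):
        # The expiry argument is accepted but ignored in this stand-in
        self._store[key] = value


load_calls = []


def read_source():
    # Stands in for reading the CSV file; records each invocation
    load_calls.append(1)
    return [{"customer_id": 1002, "call_destination": "Mombasa"}]


cache = InMemoryCache()
first = get_records(cache, "call_logs1", read_source)   # cache miss: source is read
second = get_records(cache, "call_logs1", read_source)  # cache hit: source is not read
```

With pandas, the loader could be something like lambda: pd.read_csv(path).to_dict(orient='records'), and the real client r would be passed in place of the stand-in. The one-hour expiry is an arbitrary choice for the sketch.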

In [None]:

#read file from local drive
data = pd.read_csv('/content/customer_call_logs.csv')
data.sample()

Unnamed: 0,customer_id,call_cost,call_destination,call_date,call_duration
1,1002,$2.50,Mombasa,2022-01-01 10:05:00,00:08:15


In [None]:
from io import StringIO

def transform_data():

    """
    Retrieves the data from the Redis cache to speed up performance.
    Preprocesses and cleans the data: converts the call_date column to datetime and
    removes the dollar sign from the call_cost column.
    Returns a dataframe containing the processed/cleaned data.
    """

    try:
        # Retrieve data from Redis cache; StringIO wraps the JSON string because
        # newer pandas versions deprecate passing literal JSON to read_json
        data = pd.read_json(StringIO(r.get('call_logs1')))

        # Cast call_date column from object to datetime
        data['call_date'] = pd.to_datetime(data['call_date'])

        # Remove the dollar sign from 'call_cost' (literal match, not a regex)
        # and cast the column to float
        data['call_cost'] = data['call_cost'].str.replace('$', '', regex=False).astype(float)

        transformed_data = data

    except Exception:
        logging.exception("transform_data() error")
        raise

    return transformed_data
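
The cleaning rules can also be written for a single record without pandas, which makes them easy to unit-test. The sketch below applies them to the sample row shown earlier. The function names are hypothetical, and converting call_duration from HH:MM:SS to minutes is an assumption about the intent behind the call_duration_min column name; the notebook itself stores the duration as text:

```python
from datetime import datetime


def duration_to_minutes(duration):
    """Convert an HH:MM:SS string to minutes as a float."""
    hours, minutes, seconds = (int(part) for part in duration.split(":"))
    return hours * 60 + minutes + seconds / 60


def clean_record(record):
    """Apply the notebook's cleaning rules to one raw CSV record (a dict of strings)."""
    return {
        "customer_id": int(record["customer_id"]),
        # Strip the currency symbol before converting to a number
        "call_cost_usd": float(record["call_cost"].replace("$", "")),
        "call_destination": record["call_destination"].strip(),
        "call_date": datetime.strptime(record["call_date"], "%Y-%m-%d %H:%M:%S"),
        # Assumption: the target column is meant to hold minutes
        "call_duration_min": duration_to_minutes(record["call_duration"]),
    }


sample = {
    "customer_id": "1002",
    "call_cost": "$2.50",
    "call_destination": "Mombasa",
    "call_date": "2022-01-01 10:05:00",
    "call_duration": "00:08:15",
}
cleaned = clean_record(sample)
```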

In [None]:
def load_data(transformed_data):

    """
    Connects to the Postgres instance running on the local PC.
    Loads the transformed_data dataframe into the Postgres database.
    Closes the connection to Postgres, even if an error occurs.
    """

    conn = None
    try:
        # Connect to Postgres database
        conn = psycopg2.connect(host=pg_host, database=pg_database, user=pg_user, password=pg_password)

        # Create a cursor object
        cur = conn.cursor()

        # Create a table to store the data
        cur.execute("""
            CREATE TABLE IF NOT EXISTS customer_call_logs (
                customer_id INT,
                call_cost_usd FLOAT,
                call_destination VARCHAR,
                call_date TIMESTAMP,
                call_duration_min VARCHAR
            )
        """)

        # Insert the transformed data using query parameters (%s) instead of
        # string formatting, so values are escaped by psycopg2
        insert_sql = """
            INSERT INTO customer_call_logs
                (customer_id, call_cost_usd, call_destination, call_date, call_duration_min)
            VALUES (%s, %s, %s, %s, %s)
        """
        for _, row in transformed_data.iterrows():
            cur.execute(insert_sql, (
                int(row['customer_id']),
                float(row['call_cost']),
                str(row['call_destination']),
                str(row['call_date']),
                str(row['call_duration']),
            ))

        # Commit the changes and close the cursor
        conn.commit()
        cur.close()

    except Exception:
        logging.exception("load_data() error")
        raise

    finally:
        # Close the connection whether or not the load succeeded
        if conn is not None:
            conn.close()
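
Passing values as query parameters, rather than formatting them into the SQL string, keeps a value such as a destination containing an apostrophe from breaking the statement and guards against SQL injection. The sketch below illustrates this with the standard-library sqlite3 module, used only as a stand-in so the example runs without a Postgres server. Note that sqlite3 uses ? placeholders where psycopg2 uses %s. The helper insert_rows, the reduced two-column table, and the second row are made up for illustration:

```python
import sqlite3


def insert_rows(conn, rows):
    """Insert (customer_id, call_destination) tuples using placeholders."""
    conn.executemany(
        "INSERT INTO customer_call_logs (customer_id, call_destination) VALUES (?, ?)",
        rows,
    )
    conn.commit()


# In-memory database standing in for Postgres
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute(
    "CREATE TABLE customer_call_logs (customer_id INTEGER, call_destination TEXT)"
)

# The apostrophe in the second destination would break an f-string-built statement
insert_rows(demo_conn, [(1002, "Mombasa"), (1003, "Murang'a")])

stored = demo_conn.execute(
    "SELECT call_destination FROM customer_call_logs ORDER BY customer_id"
).fetchall()
demo_conn.close()
```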
        

In [None]:
def data_pipeline():
    # Data pipeline function
    extract_data()
    transformed_data = transform_data()
    load_data(transformed_data)

In [None]:
if __name__ == '__main__':
    # Run the data pipeline function
    data_pipeline()
