In [None]:
!pip install redis

In [None]:
import getpass
import io
import os

import pandas as pd
import psycopg2
import redis

In [None]:
# Redis Cloud instance information.
# Settings come from environment variables so secrets are never committed with
# the notebook; the password falls back to an interactive prompt.
# NOTE(review): the original host string carried the endpoint port ":14735"
# (matching the "redis-14735" subdomain) while redis_port was set to 15487;
# 14735 is used as the default here — confirm against the Redis Cloud console.
redis_host = os.environ.get("REDIS_HOST", "redis-14735.c84.us-east-1-2.ec2.cloud.redislabs.com")
redis_port = int(os.environ.get("REDIS_PORT", "14735"))
redis_password = os.environ.get("REDIS_PASSWORD") or getpass.getpass("Redis password: ")

In [None]:
# Postgres database information.
# Read from environment variables (defaults are the non-secret values used
# previously); the password is never hardcoded and falls back to a prompt.
pg_host = os.environ.get("PG_HOST", "34.31.101.211")
pg_database = os.environ.get("PG_DATABASE", "nancy_redisproj")
pg_user = os.environ.get("PG_USER", "nancyredis")
pg_password = os.environ.get("PG_PASSWORD") or getpass.getpass("Postgres password: ")

In [None]:
# Print this machine's public IP address (ipecho.net/plain returns it as bare text).
# NOTE(review): presumably used to allow-list this host on the Postgres/Redis firewall — confirm.
!curl ipecho.net/plain

34.85.180.177

In [None]:
# Redis client object, built from the configuration cell instead of re-typing
# the endpoint and password here.
# redis-py expects a bare hostname in `host`, so a pasted "host:port" endpoint
# is split; a port embedded in the endpoint wins over redis_port because both
# halves came from the same copied endpoint string.
_redis_hostname, _, _embedded_port = redis_host.partition(":")
r = redis.Redis(
    host=_redis_hostname,
    port=int(_embedded_port or redis_port),
    password=redis_password,
    ssl=True,
)
# extract_data / transform_data refer to the client as `redis_client`.
redis_client = r

In [None]:
def extract_data(path="customer_call_logs.csv", redis_client=None):
    """Read the customer call-log CSV and cache it in Redis.

    Parameters
    ----------
    path : str or path-like, default "customer_call_logs.csv"
        CSV file to read with ``pd.read_csv``.
    redis_client : redis.Redis, optional
        Client used for caching; defaults to the notebook's ``r`` client.

    Returns
    -------
    pandas.DataFrame
        The frame read from ``path``. It is also stored in Redis under the key
        ``"customer_call_logs"`` as a JSON string.
    """
    if redis_client is None:
        redis_client = r
    df = pd.read_csv(path)
    # DataFrame.to_msgpack was removed in pandas 1.0; JSON is also the format
    # transform_data decodes with pd.read_json.
    redis_client.set("customer_call_logs", df.to_json())
    return df

In [None]:
def transform_data(df=None, redis_client=None):
    """Clean and reshape customer call logs for loading into Postgres.

    Parameters
    ----------
    df : pandas.DataFrame, optional
        Raw call logs with columns ``customer_id``, ``call_date``
        (``"%Y-%m-%d %H:%M:%S"`` strings), ``duration_seconds``, ``cost`` and
        ``destination``. When omitted, the JSON cached under
        ``"customer_call_logs"`` is read back from Redis.
    redis_client : redis.Redis, optional
        Client used when ``df`` is omitted; defaults to the notebook's ``r``.

    Returns
    -------
    pandas.DataFrame
        Distinct rows with columns ``customer_id``, ``call_date`` (datetime),
        ``call_duration_min``, ``call_cost_usd`` (rounded to 2 decimals) and
        ``call_destination`` — the column names of the ``customer_call_logs``
        table. The input frame is not modified.

    Raises
    ------
    KeyError
        If ``df`` is omitted and nothing is cached in Redis yet.
    """
    if df is None:
        if redis_client is None:
            redis_client = r
        cached = redis_client.get("customer_call_logs")
        if cached is None:
            raise KeyError("'customer_call_logs' is not cached in Redis; run extract_data() first")
        if isinstance(cached, bytes):
            cached = cached.decode("utf-8")
        # Wrap in StringIO: passing literal JSON text to read_json is deprecated.
        df = pd.read_json(io.StringIO(cached))

    # Clean and structure: drop duplicate rows, derive minutes, keep relevant columns.
    deduped = df.drop_duplicates()
    selected = deduped.assign(
        duration_minutes=deduped["duration_seconds"] / 60
    )[["customer_id", "call_date", "duration_minutes", "cost", "destination"]]

    # Format, then rename to the column names used by load_data's table.
    transformed_data = (
        selected.assign(
            call_date=pd.to_datetime(selected["call_date"], format="%Y-%m-%d %H:%M:%S"),
            cost=selected["cost"].round(2),
        )
        .rename(
            columns={
                "duration_minutes": "call_duration_min",
                "cost": "call_cost_usd",
                "destination": "call_destination",
            }
        )
        .reset_index(drop=True)
    )
    return transformed_data


In [None]:
def load_data(transformed_data=None):
    """Create the ``customer_call_logs`` table if needed and insert rows into it.

    Parameters
    ----------
    transformed_data : pandas.DataFrame, optional
        Output of ``transform_data`` with columns ``customer_id``,
        ``call_cost_usd``, ``call_destination``, ``call_date`` and
        ``call_duration_min``. When omitted or empty, only the table is created.

    All statements run in one transaction, which is committed on success and
    rolled back if any statement fails. The connection is always closed.
    """
    columns = ["customer_id", "call_cost_usd", "call_destination", "call_date", "call_duration_min"]
    create_sql = (
        "CREATE TABLE IF NOT EXISTS customer_call_logs ("
        "customer_id INT, "
        "call_cost_usd FLOAT, "
        "call_destination VARCHAR, "
        "call_date TIMESTAMP, "
        "call_duration_min FLOAT"
        ")"
    )
    # Values are sent as query parameters (%s placeholders), never formatted into
    # the SQL text, so quotes or SQL fragments in the data cannot alter the statement.
    insert_sql = (
        "INSERT INTO customer_call_logs "
        "(customer_id, call_cost_usd, call_destination, call_date, call_duration_min) "
        "VALUES (%s, %s, %s, %s, %s)"
    )

    conn = psycopg2.connect(host=pg_host, database=pg_database, user=pg_user, password=pg_password)
    try:
        # `with conn` commits on success / rolls back on error, but does NOT close it.
        with conn, conn.cursor() as cur:
            cur.execute(create_sql)
            if transformed_data is not None and not transformed_data.empty:
                # NOTE: psycopg2 cannot adapt numpy scalars; itertuples yields plain
                # Python values for numpy-backed columns, in INSERT column order.
                rows = list(transformed_data[columns].itertuples(index=False, name=None))
                cur.executemany(insert_sql, rows)
    finally:
        conn.close()

In [None]:
def data_pipeline():
    """Run the ETL steps in order: extract, then transform, then load.

    The frame returned by ``transform_data`` is handed straight to ``load_data``.
    """
    extract_data()
    load_data(transform_data())

In [None]:
if __name__ == "__main__":
    # Inside a Jupyter kernel __name__ is "__main__", so running this cell runs the pipeline.
    data_pipeline()

In [None]:

def main():
    """Extract rows from Neo4j, transform them, and load them into Postgres.

    NOTE(review): this cell appears to come from a different (Neo4j) pipeline.
    ``GraphDatabase``, ``neo4j_uri``, ``neo4j_user``, ``neo4j_password``,
    ``neo4j_query`` and ``insert_data`` are not defined anywhere in this
    notebook; define them (e.g. ``from neo4j import GraphDatabase``) before
    running this cell, or remove the cell.
    """
    # Connect to Neo4j
    driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))
    try:
        # Extract data from Neo4j into a DataFrame, one row per record.
        with driver.session() as session:
            result = session.run(neo4j_query)
            df = pd.DataFrame([record.values() for record in result], columns=result.keys())
    finally:
        # Release the driver even if the query fails; it is not needed afterwards.
        driver.close()

    # Transform data
    df = transform_data(df)

    # Connect to PostgreSQL with this notebook's pg_* settings
    # (the original referenced undefined p_host/p_database/p_user/p_password).
    conn = psycopg2.connect(host=pg_host, database=pg_database, user=pg_user, password=pg_password)
    try:
        # Load data into PostgreSQL
        insert_data(df, conn)
    finally:
        conn.close()


# The original guard had no body (a SyntaxError for the whole cell).
if __name__ == "__main__":
    main()

**Best Practices**:

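**Caching data in Redis for faster retrieval**: The parsed CSV data is stored in Redis, so later steps (such as the transform step, or a re-run of it) read it from memory instead of re-reading and re-parsing the CSV file. The saving grows with every additional read, which reduces the overall execution time of the pipeline.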

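**Using parameterized queries to prevent SQL injection attacks**: Values are passed to the database driver separately from the SQL text (with psycopg2, `%s` placeholders plus a tuple of parameters) instead of being formatted into the query string. Data containing quotes or SQL fragments therefore cannot change the statement, which prevents SQL injection attacks that could compromise the security of the Postgres database.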

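**Using context managers to ensure resources are properly managed**: `with` blocks (or `try`/`finally`) guarantee that resources such as the Postgres connection and cursor are released after use, even when an error occurs. This prevents leaked connections, which can exhaust the database's connection limit, as well as memory leaks and other issues that affect the pipeline's performance. Note that psycopg2's `with conn:` only commits or rolls back the transaction; the connection itself must still be closed explicitly.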

**Recommendations for deployment and running the pipeline with a cloud-based provider:**

**Use a cloud-based Redis service**: A managed, cloud-hosted Redis (such as the Redis Cloud instance used here) scales easily and removes the management overhead of maintaining an on-premises Redis instance.

**Use a managed Postgres database service**: This ensures that the database is properly managed, backed up, and monitored, reducing the risk of data loss and other issues that could affect the performance of the pipeline.

**Containerize the pipeline using Docker**: Containerizing the pipeline using Docker allows for easy deployment and scaling of the pipeline, making it more resilient to changes in demand and ensuring that it can be easily deployed to multiple environments.