In [126]:
import pandas as pd
import datetime as dt
import os
import io
import logging
import psycopg2
from sqlalchemy import create_engine
import localstack_client.session as boto3
from io import StringIO


logging.basicConfig(filename='app.log', filemode='a', format='%(asctime)s - %(message)s',datefmt='%d-%b-%y %H:%M:%S', level=logging.INFO)

In [127]:
chunk_size = 50000
data_list = ['customer_data', 'booking_data', 'destination_data']

In [128]:
#Downloading customer_data.csv
try:
    url='https://drive.google.com/file/d/14ptTEbFPuyjN520Zkaqyo1lmotoOgdU2/view?usp=sharing'
    url='https://drive.google.com/uc?id=' + url.split('/')[-2]

    for i, data in enumerate(pd.read_csv(url,chunksize = chunk_size)):
        df = data
        if df.shape[0] < chunk_size and i == 0:
            df.to_csv('customer_data/customer_data'+ '.csv', index = False)
        else:
            df.to_csv('customer_data/customer_data'+ '_' + str(i)+'.csv', index = False)

except Exception as e:
    logging.error("Exception occurred for customer_data", exc_info=True)
    logging.info("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")

try:
    #Downloading booking_data.csv
    url='https://drive.google.com/file/d/1ZZDKAjEUv0-EIn1Dh5MITcIMOz-lUJOr/view?usp=sharing'
    url='https://drive.google.com/uc?id=' + url.split('/')[-2]

    for i, data in enumerate(pd.read_csv(url,chunksize = chunk_size)):
        df = data
        if df.shape[0] < chunk_size and i == 0:
            df.to_csv('booking_data/booking_data'+ '.csv', index = False)
        else:
            df.to_csv('booking_data/booking_data'+ '_' + str(i)+'.csv', index = False)

except Exception as e:
    logging.error("Exception occurred for booking_data", exc_info=True)
    logging.info("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")

    
try:
    #Downloading destination_data.csv
    url='https://drive.google.com/file/d/15pM-pn9hXu8gUxY2PhglzI8BMPNj_hPr/view?usp=sharing'
    url='https://drive.google.com/uc?id=' + url.split('/')[-2]

    for i, data in enumerate(pd.read_csv(url,chunksize = chunk_size)):
        df = data
        if df.shape[0] < chunk_size and i == 0:
            df.to_csv('destination_data/destination_data'+'.csv', index = False)
        else:
            df.to_csv('destination_data/destination_data'+ '_' + str(i)+'.csv', index = False)

except Exception as e:
    logging.error("Exception occurred for destination_data", exc_info=True)
    logging.info("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")

In [129]:
ddl = ['''CREATE TABLE customer_data (
  customer_id varchar NOT NULL UNIQUE,
  first_name varchar NOT NULL,
  last_name varchar,
  email varchar,
  phone varchar
);''',
'''CREATE TABLE booking_data (
  booking_id varchar NOT NULL UNIQUE,
  customer_id varchar NOT NULL,
  booking_date timestamp,
  destination varchar,
  number_of_passengers int,
  cost_per_passenger float,
  total_booking_value float,
  FOREIGN KEY (customer_id) REFERENCES customer_data(customer_id)
);''', 
       '''CREATE TABLE destination_data (
  destination_id varchar NOT NULL UNIQUE,
  destination varchar,
  country varchar,
  popular_season varchar
);''']

In [130]:
try:
    conn = psycopg2.connect(
        host="localhost",
        port="5432",
        database="postgres",
        user="postgres",
        password="postgres")

    cursor = conn.cursor()
    
except Exception as e:
    logging.error("Exception occurred during postgres connection", exc_info=True)
    logging.info("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")


In [132]:
def create_table(ddl, table_name, cursor,conn):
    try:
        cursor.execute('''SELECT EXISTS (
        SELECT 1
        FROM information_schema.tables
        WHERE table_name = '{}'
        ) AS table_existence;'''.format(table_name))

        if cursor.fetchone()[0]== True:
            pass
        else:
            cursor.execute(ddl)
            conn.commit()
            
    except Exception as e:
        err_str = "Exception occurred for ddl execution of table - " + table_name
        logging.error(err_str, exc_info=True)
        logging.info("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
    

for i,j in enumerate(data_list):
    create_table(ddl[i],j,cursor,conn)

In [133]:
def transform(df, files):
    df= df.fillna('')
    if files == 'booking_data':
        df.booking_date = pd.to_datetime(df.booking_date)
        df['total_booking_value'] = df.number_of_passengers * df.cost_per_passenger
        
    return df

In [134]:
#ETL - Loading to Postgres
try:
    engine = create_engine('postgresql://postgres:postgres@localhost:5432/postgres')
except Exception as e:
    err_str = "Exception occurred during create_engine"
    logging.error(err_str, exc_info=True)
    logging.info("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")

In [136]:
try:
    for i in data_list:
        all_files = os.listdir(i+"/")    
        csv_files = list(filter(lambda f: f.endswith('.csv'), all_files))
        for j in csv_files:
            df= pd.read_csv(i+'/'+j)
            df = transform(df, i)
            df.to_sql(i, engine, index= False, if_exists = 'append')
            
except Exception as e:
    err_str = "Exception occurred during loading"
    logging.error(err_str, exc_info=True)
    logging.info("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
        
        

In [137]:
#Batch transfer to S3
BUCKET = 'travel-bucket'
s3 = boto3.resource('s3')
try:
    for i in data_list:
        all_files = os.listdir(i+"/")    
        csv_files = list(filter(lambda f: f.endswith('.csv'), all_files))
        for j in csv_files:
            s3.Bucket(BUCKET).upload_file(Filename= i+'/'+ j, Key= i+'/'+ j)
            
            
except Exception as e:
    err_str = "Exception occurred during S3 transfer"
    logging.error(err_str, exc_info=True)
    logging.info("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")

In [147]:
'$ awslocal lambda create-function \
    --function-name trending \
    --runtime python3.9 \
    --timeout 10 \
    --zip-file fileb://lambda.zip \
    --handler lambda_function.lambda_handler \
    --role cool-stacklifter'



def lambda_handler(event, context):
    import localstack_client.session as boto3
    from io import StringIO
    
    s3 = boto3.resource('s3')
    BUCKET = 'travel-bucket'
    csv_files=[]
    s3_bucket = s3.Bucket(BUCKET)
    for obj in s3_bucket.objects.all():
        if obj.key.split('/')[0]== 'booking_data':
            csv_files.append(obj.key)
            
    
    s3_client = boto3.client('s3')
    for i, file_path in enumerate(csv_files):
        # Read the CSV file from S3
        response = s3_client.get_object(Bucket=BUCKET, Key= file_path)
        csv_content = response['Body'].read().decode('utf-8')

        # Create a Pandas DataFrame
        df_booking_data = pd.read_csv(io.StringIO(csv_content))
        
        if i==0:
            df = df_booking_data.copy()    
        else:
            df = pd.concat([df,df_booking_data])

           
    df['total_revenue_per_destination'] = df.number_of_passengers * df.cost_per_passenger
    dff = df.groupby(by='destination').agg({'booking_id':'count', 'total_revenue_per_destination': 'sum'})
    dff = dff.rename(columns={'booking_id':'total_bookings_per_destination'}).reset_index()
    
    csv_buffer = StringIO()
    dff.to_csv(csv_buffer)
    s3.Object(BUCKET, 'total_bookings_per_destination.csv').put(Body=csv_buffer.getvalue())

lambda_handler(1,2)

In [148]:
for obj in s3_bucket.objects.all():
        print(obj.key)

booking_data/booking_data.csv
customer_data/customer_data.csv
destination_data/destination_data.csv
total_bookings_per_destination.csv
x.csv
z.csv


In [83]:
ZIPNAME = "lambda.zip"


def aws_file():
    with open(ZIPNAME, 'rb') as file_data:
        bytes_content = file_data.read()
    return bytes_content

client = boto3.client("lambda")
def lambda_creator():
    lambda_client = boto3.client('lambda')
    response = lambda_client.create_function(
        Code={
            'ZipFile': aws_file()
        },
        Description='total_revenue_per_destination',
        FunctionName='total_revenue_per_destination',
        Handler='lambda_function.lambda_handler',
        Publish=True,
        Role='rds-temp-leads-stream',
        Runtime='python3.9',
    )
    return response
lambda_creator()

ClientError: An error occurred (ValidationException) when calling the CreateFunction operation: 1 validation error detected: Value 'rds-temp-leads-stream' at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::\d{12}:role/?[a-zA-Z_0-9+=,.@\-_/]+

In [73]:
client.list_functions()

{'ResponseMetadata': {'RequestId': 'ac659037-f102-45af-8cac-0ccfdfb5ae70',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'content-type': 'text/plain; charset=utf-8',
   'content-length': '17',
   'x-amzn-requestid': 'ac659037-f102-45af-8cac-0ccfdfb5ae70',
   'x-amz-request-id': 'ac659037-f102-45af-8cac-0ccfdfb5ae70',
   'connection': 'close',
   'date': 'Sat, 30 Dec 2023 08:23:56 GMT',
   'server': 'hypercorn-h11'},
  'RetryAttempts': 0},
 'Functions': []}