# Process Reviews Data

In [1]:
## Let's import our requirements
import boto3
import pandas as pd
import psycopg2
import logging
import io
import gzip

import os
from dotenv import load_dotenv

# Load environment variables from a local .env file (python-dotenv) into the
# process environment so later cells can read them.
load_dotenv()

# Configure logging: INFO level, records formatted as "timestamp - level - message".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Allow pandas to display up to 500 columns before truncating wide frames.
pd.set_option('display.max_columns', 500)

## 1. Initialize Boto3 client

In [2]:
# boto3 resolves credentials on its own (the log output of this cell shows them
# being found in environment variables), so no keys are passed explicitly.
# NOTE(review): the boto3-based helpers defined in this notebook create their
# own handle instead of using this module-level `s3`.
s3 = boto3.resource('s3')

2024-08-08 23:10:01,334 - INFO - Found credentials in environment variables.


In [3]:
def list_files_in_folder(bucket_name, folder_prefix):
    """Return the keys of all file objects stored under a prefix in an S3 bucket.

    Args:
        bucket_name: Name of the S3 bucket to inspect.
        folder_prefix: Key prefix to filter on, e.g. 'raw/reviews/'.

    Returns:
        list: Object keys under the prefix, in iteration order, excluding
        keys that end with '/' (treated as folders rather than files).
    """
    matching_objects = (
        boto3.resource('s3')
        .Bucket(bucket_name)
        .objects.filter(Prefix=folder_prefix)
    )
    # A trailing slash marks a folder entry, not a file.
    return [entry.key for entry in matching_objects if not entry.key.endswith('/')]


# Bucket and key prefix that hold the raw review exports.
# NOTE: `bucket_name` is also read as a module-level variable by
# read_review_data when it builds the S3 URL.
bucket_name = 'airbnb-capstone-project'
folder_prefix = 'raw/reviews/'  # Specify the folder prefix
# List the raw review files; the bare `items` expression displays the keys.
items = list_files_in_folder(bucket_name, folder_prefix)
items

['raw/reviews/albany-reviews.csv.gz',
 'raw/reviews/chicago-reviews.csv.gz',
 'raw/reviews/los-angeles-reviews.csv.gz',
 'raw/reviews/new-york-city-reviews.csv.gz',
 'raw/reviews/san-francisco-reviews.csv.gz',
 'raw/reviews/seattle-reviews.csv.gz',
 'raw/reviews/washington-dc-reviews.csv.gz']

#### 2.1 Function to read data

In [None]:
def read_review_data(file_path, bucket=None):
    """Load one gzipped reviews CSV from S3 into a DataFrame.

    Args:
        file_path: Object key inside the bucket, e.g.
            'raw/reviews/albany-reviews.csv.gz'.
        bucket: Name of the S3 bucket holding the file. When omitted, the
            notebook-level ``bucket_name`` variable is used, so existing
            single-argument calls keep working.

    Returns:
        pandas.DataFrame: The file contents, with the 'date' column parsed
        as datetimes.
    """
    if bucket is None:
        # Backward-compatible fallback to the global defined in the notebook.
        bucket = bucket_name

    # Columns pandas should parse as datetimes while reading.
    date_columns = ['date']

    return pd.read_csv(
        f"s3://{bucket}/{file_path}",
        compression='gzip',
        parse_dates=date_columns,
    )

#### 2.2 Perform data quality checks

In [None]:
def clean_review_data(df_original, market_name):
    """Produce a cleaned copy of one market's raw reviews.

    Steps, in order: tag every row with the market, rename 'date' to
    'review_date', drop rows whose 'comments' is missing, drop the
    'reviewer_name' column, remove exact duplicate rows, and reduce
    'review_date' to plain dates (no time component).

    Args:
        df_original: Raw reviews frame; it is left unmodified.
        market_name: Value stored in the new 'market' column.

    Returns:
        pandas.DataFrame: The cleaned frame.
    """
    logging.info('Adding market name column')
    # assign() builds a new frame, so the caller's data is never touched.
    reviews = df_original.assign(market=market_name)

    logging.info('Converting column name')
    reviews = reviews.rename(columns={'date': 'review_date'})

    logging.info('Removing rows with empty review comments')
    reviews = reviews.loc[reviews['comments'].notna()]

    logging.info('Dropping reviewer_name column')
    reviews = reviews.drop('reviewer_name', axis=1)

    logging.info('Checking for duplicates')
    # One mask serves both the count and the filtering.
    is_duplicate = reviews.duplicated()
    logging.info(f'Found {is_duplicate.sum()} duplicate rows')
    reviews = reviews.loc[~is_duplicate]

    logging.info('Ensuring review_date is a datetime type and converting to date only')
    reviews = reviews.assign(
        review_date=pd.to_datetime(reviews['review_date']).dt.date
    )

    logging.info('Data cleaning completed for market: %s', market_name)
    return reviews

#### 2.3 Function to save to Parquet and upload to S3

In [None]:
def upload_cleaned_data_to_s3(df, bucket_name, market_name):
    """Serialize a cleaned reviews frame to Parquet and store it in S3.

    The Parquet bytes are built in memory (pyarrow engine, index omitted)
    and written under the key
    'processed/reviews/<market_name>-reviews_processed.parquet'.

    Args:
        df: Frame to serialize.
        bucket_name: Destination S3 bucket.
        market_name: Market identifier used to build the object key.
    """
    print('Upload starting...')
    client = boto3.client('s3')

    # Destination key for this market's processed file.
    destination_key = f"processed/reviews/{market_name}-reviews_processed.parquet"

    # Build the Parquet payload in an in-memory buffer; getvalue() returns the
    # whole content regardless of the current stream position.
    with io.BytesIO() as buffer:
        df.to_parquet(buffer, index=False, engine='pyarrow')
        payload = buffer.getvalue()

    client.put_object(Bucket=bucket_name, Key=destination_key, Body=payload)
    print(f"File uploaded to S3 bucket '{bucket_name}' with key '{destination_key}'")

#### 2.4 Main script to process files

In [None]:
def process_files(bucket_name, folder_prefix):
    """Read, clean and re-upload every review file found under a prefix.

    Args:
        bucket_name: Bucket that is listed for input files and that receives
            the processed output.
        folder_prefix: Key prefix under which the raw files are listed.
    """
    for key in list_files_in_folder(bucket_name, folder_prefix):
        # Market name is the part of the file name before its last '-',
        # e.g. 'raw/reviews/new-york-city-reviews.csv.gz' -> 'new-york-city'.
        file_name = key.rpartition('/')[2]
        market_name = file_name.rpartition('-')[0]

        logging.info(f'Downloading file: {key}')
        # NOTE: read_review_data receives only the key, so it resolves the
        # bucket itself rather than from this function's bucket_name argument.
        raw_reviews = read_review_data(key)

        cleaned_reviews = clean_review_data(raw_reviews, market_name)

        upload_cleaned_data_to_s3(cleaned_reviews, bucket_name, market_name)

## 3. Run the processing pipeline

In [None]:
# Pipeline inputs: source bucket and the prefix of the raw review files.
bucket_name = 'airbnb-capstone-project'
folder_prefix = 'raw/reviews/'

# Display the keys currently stored under the raw reviews prefix.
list_files_in_folder(bucket_name, folder_prefix)

In [None]:
# Bucket and raw-file prefix passed to the pipeline run below.
bucket_name = 'airbnb-capstone-project'
input_folder_path = 'raw/reviews/'


# Read, clean and upload every file found under the prefix.
process_files(bucket_name, input_folder_path)