In [2]:
%connections redshift_database

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
Connections to be included:
redshift_database


In [10]:
import re
import pandas as pd
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from pyspark.sql import SparkSession



In [None]:
# Initialize Spark and Glue context
# getOrCreate() returns the already-running SparkContext if one exists, so this
# cell can be re-run without a "multiple contexts" error.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Load the "bronze" table of the "oscars-data" Glue Data Catalog database
# (presumably backed by S3 -- confirm against the catalog table definition).
datasource = glueContext.create_dynamic_frame.from_catalog(database="oscars-data", table_name="bronze")

# Convert Glue DynamicFrame to Spark DataFrame
spark_df = datasource.toDF()

# Convert Spark DataFrame to Pandas DataFrame for processing
# NOTE(review): toPandas() materialises the full table in driver memory; this
# assumes the dataset is small enough for that -- TODO confirm.
df = spark_df.toPandas()

# Define the clean_budget function
def clean_budget(budget):
    """Normalise a raw budget value into a compact currency string.

    Steps applied to string input, in order:
      1. Drop bracketed/parenthesised annotations such as ``[2]`` or ``(est.)``.
      2. Treat anything containing a hyphen or en dash (i.e. a range) as
         unknown and return ``'0'``.
      3. Rewrite a leading ``$`` and any ``USD$`` as ``US$``.
      4. Turn dot-grouped thousands (``1.000.000``) into comma-grouped ones.
      5. Expand ``<currency> <number> million`` into the full comma-grouped
         figure.
      6. Remove all spaces.

    Parameters
    ----------
    budget : str, int, float or None
        Raw budget cell value.

    Returns
    -------
    str
        ``'0'`` for missing values and ranges, ``str(budget)`` for numeric
        input, otherwise the cleaned string (e.g. ``'US$15,000,000'``).
    """
    # Missing cells carry no text to clean. Without this guard None would reach
    # re.sub() and raise TypeError, and NaN would become the literal 'nan'.
    # `budget != budget` is the dependency-free NaN test.
    if budget is None or (isinstance(budget, float) and budget != budget):
        return '0'
    if isinstance(budget, (int, float)):
        return str(budget)

    # Strip footnote markers and parenthetical remarks.
    budget = re.sub(r'\[.*?\]|\(.*?\)', '', budget).strip()

    # A dash means a range ("US$10-12 million"); there is no single figure.
    if '–' in budget or '-' in budget:
        return '0'

    if budget.startswith('$'):
        budget = budget.replace('$', 'US$', 1)
    budget = budget.replace('USD$', 'US$')

    # European-style grouping: 1.000.000 -> 1,000,000
    budget = re.sub(r'(\d)\.(\d{3})\.(\d{3})', r'\1,\2,\3', budget)

    # The amount pattern only accepts well-formed decimals so float() cannot
    # raise on input like "1.2.3". '₤' is included because the later USD
    # conversion step also recognises it; otherwise "₤5 million" would be
    # left unexpanded and read as 5.
    match = re.match(
        r'(US\$|£|₤|€)\s*(\d+(?:\.\d*)?|\.\d+)\s*million',
        budget,
        re.IGNORECASE,
    )
    if match:
        currency = match.group(1)
        amount = float(match.group(2))
        budget = f'{currency} {amount * 1_000_000:,.0f}'

    return budget.replace(' ', '')

# Apply the clean_budget function to the budget column
# (element-wise; the cleaned strings overwrite the raw values in df['budget'],
# so re-running this cell feeds already-cleaned strings back through the function).
df['budget'] = df['budget'].apply(clean_budget)

In [None]:
# Static exchange rates
# Fixed multipliers (1 unit of the keyed currency -> USD). They are example
# values, not live rates.
exchange_rates = {
    'GBP': 1.38,  # Example rate for GBP to USD
    'EUR': 1.18   # Example rate for EUR to USD
}

# Function to convert currencies to USD using static rates
def convert_to_usd(budget, rates=None):
    """Convert a cleaned budget string to a whole-dollar USD amount.

    Parameters
    ----------
    budget : str or any value accepted by ``str()``
        Expected form is a currency prefix (``US$``, ``£``, ``₤`` or ``€``)
        followed by a number that may contain comma thousands separators.
    rates : dict, optional
        Mapping with ``'GBP'`` and ``'EUR'`` multipliers. Defaults to the
        module-level ``exchange_rates``.

    Returns
    -------
    int
        The amount in USD, truncated towards zero. ``0`` is returned when the
        value is ``'0'``, has no recognised currency prefix, or its numeric
        part cannot be parsed.
    """
    if rates is None:
        rates = exchange_rates

    budget = str(budget)
    if budget == '0':
        return 0

    match = re.match(r'(US\$|£|₤|€)\s*([\d,\.]+)', budget)
    if not match:
        return 0

    currency = match.group(1)
    try:
        amount = float(match.group(2).replace(',', ''))
    except ValueError:
        # The character class also admits strings such as "1.2.3" or a lone
        # ","; treat those as unparseable instead of aborting the whole apply.
        return 0

    if currency in ('£', '₤'):
        amount *= rates['GBP']
    elif currency == '€':
        amount *= rates['EUR']
    # US$ amounts need no conversion.
    return int(amount)

# Derive the USD figure for every cleaned budget string
df['budget_in_usd'] = df['budget'].apply(convert_to_usd)

# Text columns are cast to str in one pass
df = df.astype({
    'film': str,
    'wiki_url': str,
    'budget': str,
    'oscar_winner': str,
})

# Integer columns: unparseable entries are coerced to NaN, then filled with 0
df['year'] = pd.to_numeric(df['year'], errors='coerce').fillna(0).astype(int)
df['budget_in_usd'] = pd.to_numeric(df['budget_in_usd'], errors='coerce').fillna(0).astype(int)

# Keep only the target columns, in the order of the desired schema
df = df[['film', 'year', 'wiki_url', 'oscar_winner', 'budget', 'budget_in_usd']]

In [12]:
# Convert Pandas DataFrame back to Spark DataFrame
# (rebinds spark_df, which previously held the raw catalog data)
spark_df = spark.createDataFrame(df)

# Convert Spark DataFrame to Glue DynamicFrame
transformed_dynamic_frame = DynamicFrame.fromDF(spark_df, glueContext, "transformed_dynamic_frame")

# Define connection options for Redshift
# "dbtable" is the destination table and "database" the Redshift database it lives in.
my_conn_options = {
    "dbtable": "dev_oscars_data",
    "database": "dev"
}

# Write the DynamicFrame to Redshift
# catalog_connection names the same Glue connection attached to the session by
# the `%connections redshift_database` magic in the first cell.
# redshift_tmp_dir is presumably the S3 staging area Glue uses for the load --
# confirm the bucket/prefix is correct for the target environment.
# NOTE(review): the except branch only prints the error and does not re-raise,
# so a failed write still lets the cell finish without an exception; anything
# running this notebook non-interactively will not see the failure.
# redshift_results is assigned but not read anywhere in the visible cells.
try:
    redshift_results = glueContext.write_dynamic_frame.from_jdbc_conf(
        frame = transformed_dynamic_frame,
        catalog_connection = "redshift_database",
        connection_options = my_conn_options,
        redshift_tmp_dir = "s3://yipitdata-bucket/redshift_temp/"
    )
    print("Data written to Redshift successfully")
except Exception as e:
    print(f"Error writing data to Redshift: {e}")

Data written to Redshift successfully
  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():
