In [24]:
import os
import findspark
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, when
from pyspark.sql import SparkSession
import boto3
import pandas as pd
import awswrangler as wr
from decimal import Decimal
import uuid

In [25]:
# Read the AWS / DynamoDB settings from environment variables.
# A variable that is not set comes back as None (no error is raised here).
aws_table_name, aws_access_id, aws_secret_id, region = (
    os.environ.get(env_var)
    for env_var in ("aws_table_name", "aws_access_id", "aws_secret_id", "region")
)

In [26]:
# Sanity-check the configuration WITHOUT writing credentials into the saved
# notebook output (the previous version printed the raw access/secret keys).
def _mask_secret(value, visible=0):
    """Return a display-safe version of a credential.

    Args:
        value: the credential string, or None if the env var was unset.
        visible: how many trailing characters to leave readable (default 0).

    Returns:
        "<unset>" for None/empty values. Otherwise the value with all but the last
        `visible` characters replaced by "*". A value no longer than `visible`
        is fully masked.
    """
    if not value:
        return "<unset>"
    if len(value) <= visible:
        return "*" * len(value)
    hidden = len(value) - visible
    return "*" * hidden + value[hidden:]

print(aws_table_name)
print(_mask_secret(aws_access_id, visible=4))  # last 4 chars help identify which key is in use
print(_mask_secret(aws_secret_id))              # never show any part of the secret key
print(region)

toronto_real_estate
<redacted>
<redacted>
us-east-1


In [27]:
# Start (or reuse, via getOrCreate) the Spark session for this project.
# spark.jars.packages asks Spark to fetch the AWS Java SDK bundle from Maven.
# NOTE(review): nothing visible in this notebook reads from S3 (the CSV is local),
# so this package may be unnecessary.
spark = (
    SparkSession.builder
    .appName("RealEstateProject")
    .config("spark.jars.packages", "com.amazonaws:aws-java-sdk-bundle:1.11.375")
    .getOrCreate()
)

In [28]:
# NOTE(review): findspark.init() is meant to run *before* pyspark is imported, so it
# can locate the Spark installation and put pyspark on sys.path. In this notebook
# pyspark was already imported in the first cell and the SparkSession already
# created, so this call appears to have no effect on the running session.
# Consider moving it above the pyspark imports, or removing it.
findspark.init()

In [29]:
# Load the raw Toronto listings CSV into a Spark DataFrame.
# The path can be overridden with the REAL_ESTATE_CSV environment variable, so the
# notebook runs on machines other than the author's. It defaults to the original
# local path.
file_location = os.environ.get(
    "REAL_ESTATE_CSV",
    "/home/vs/Documents/python/real_estate_project/toronto_real_estate.csv",
)
file_type = "csv"

# Spark reader options are passed as strings.
infer_schema = "true"          # let Spark guess column types; columns mixing numbers and
                               # "Not listed" (e.g. bedrooms) stay strings
first_row_is_header = "true"   # first line holds the column names
delimiter = ","

df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

In [30]:
df.show(n=20)  # preview the first 20 raw rows as loaded from the CSV

+----------+--------------------+----------+----------+-----------+
|     price|             address|  bedrooms| bathrooms|postal_code|
+----------+--------------------+----------+----------+-----------+
|$2,599,900|      599 Spadina Rd|         4|Not listed|    M5P 2X1|
|$1,999,999|       61 Cameron St|         6|Not listed|    M5T 2H1|
|$1,750,000| #106 -4750 Yonge St|Not listed|Not listed| Not listed|
|$1,688,000|      140 Bogert Ave|         3|Not listed|    M2N 1K8|
|$1,520,000|150 Harlandale Av...|         4|Not listed|    M2N 1P4|
|$1,500,000|   176 Beechgrove Dr|         5|Not listed|    M1E 3Z5|
|$1,500,000|       65 Dewlane Dr|         4|Not listed| Not listed|
|$1,450,000|    16 Salisbury Ave|         3|         1|    M4X 1C2|
|$1,420,000|     91 Westrose Ave|         2|         1|    M8X 2A4|
|$1,399,999|        32 Vonda Ave|         4|Not listed|    M2N 5E9|
|$1,355,990|      100 Bartley Dr|         6|Not listed|    M4A 1C5|
|$1,299,900|      40 Cobham Cres|         4|Not 

In [31]:
# remove all non-numerical special characters
# Strip "$", "," and every other non-digit from the price text
# (e.g. "$2,599,900" -> "2599900"). The column is still a string after this step.
# NOTE: a decimal point would be stripped too; the visible prices are whole dollars.
df = df.withColumn("price", regexp_replace(col("price"), "[^0-9]", ""))

In [32]:
# Keep only ASCII letters, digits, spaces and hyphens in addresses
# (e.g. "#106 -4750 Yonge St" -> "106 -4750 Yonge St").
# NOTE: this also drops ".", apostrophes and accented letters.
df = df.withColumn("address", regexp_replace(col("address"), "[^a-zA-Z0-9 -]", ""))

In [33]:
# change the price column data type to integer
# Convert the cleaned price string to an integer column.
# A price that contained no digits at all is now "" after the regex clean-up.
# Map it to null explicitly: with ANSI SQL mode on (the default in Spark 4),
# CAST('' AS INT) raises an error instead of returning null.
# `when` without `otherwise` yields null for "" and for already-null prices.
# NOTE: IntegerType tops out at 2,147,483,647.
df = df.withColumn(
    "price",
    when(col("price") != "", col("price").cast(IntegerType())),
)

In [34]:
df.show(n=20)  # preview after cleaning price/address and casting price to int

+-------+--------------------+----------+----------+-----------+
|  price|             address|  bedrooms| bathrooms|postal_code|
+-------+--------------------+----------+----------+-----------+
|2599900|      599 Spadina Rd|         4|Not listed|    M5P 2X1|
|1999999|       61 Cameron St|         6|Not listed|    M5T 2H1|
|1750000|  106 -4750 Yonge St|Not listed|Not listed| Not listed|
|1688000|      140 Bogert Ave|         3|Not listed|    M2N 1K8|
|1520000|150 Harlandale Av...|         4|Not listed|    M2N 1P4|
|1500000|   176 Beechgrove Dr|         5|Not listed|    M1E 3Z5|
|1500000|       65 Dewlane Dr|         4|Not listed| Not listed|
|1450000|    16 Salisbury Ave|         3|         1|    M4X 1C2|
|1420000|     91 Westrose Ave|         2|         1|    M8X 2A4|
|1399999|        32 Vonda Ave|         4|Not listed|    M2N 5E9|
|1355990|      100 Bartley Dr|         6|Not listed|    M4A 1C5|
|1299900|      40 Cobham Cres|         4|Not listed|    M4A 1V6|
|1299000|Ph07-51 East Lib

In [35]:
# Get the current column names
old_columns = df.columns

# Create a list of new column names in title case
new_columns = [col_name.title() for col_name in old_columns]

# Rename the columns using withColumnRenamed()
df = df.select([col(old_col_name).alias(new_col_name) for old_col_name, new_col_name in zip(old_columns, new_columns)])

In [36]:
df.show(n=20)  # preview with the title-cased column names

+-------+--------------------+----------+----------+-----------+
|  Price|             Address|  Bedrooms| Bathrooms|Postal_Code|
+-------+--------------------+----------+----------+-----------+
|2599900|      599 Spadina Rd|         4|Not listed|    M5P 2X1|
|1999999|       61 Cameron St|         6|Not listed|    M5T 2H1|
|1750000|  106 -4750 Yonge St|Not listed|Not listed| Not listed|
|1688000|      140 Bogert Ave|         3|Not listed|    M2N 1K8|
|1520000|150 Harlandale Av...|         4|Not listed|    M2N 1P4|
|1500000|   176 Beechgrove Dr|         5|Not listed|    M1E 3Z5|
|1500000|       65 Dewlane Dr|         4|Not listed| Not listed|
|1450000|    16 Salisbury Ave|         3|         1|    M4X 1C2|
|1420000|     91 Westrose Ave|         2|         1|    M8X 2A4|
|1399999|        32 Vonda Ave|         4|Not listed|    M2N 5E9|
|1355990|      100 Bartley Dr|         6|Not listed|    M4A 1C5|
|1299900|      40 Cobham Cres|         4|Not listed|    M4A 1V6|
|1299000|Ph07-51 East Lib

In [37]:
# Convert PySpark DataFrame to Pandas DataFrame
pandas_df = df.toPandas()

In [38]:
# Inspect the column dtypes after the Spark -> pandas conversion.
pandas_df.dtypes

Price          float64
Address         object
Bedrooms        object
Bathrooms       object
Postal_Code     object
dtype: object

In [39]:
def _to_dynamo_number(value):
    """Convert a numeric price to a Decimal suitable for DynamoDB.

    boto3's DynamoDB serializer rejects Python floats and also rejects
    Decimal('NaN'). Because Price is float64 with missing values, a plain
    Decimal(str(x)) turned every missing price into Decimal('NaN') and broke
    the upload. Missing prices now become None (stored as a DynamoDB NULL).

    Returns:
        Decimal(str(value)) for real numbers; None for NaN/None.
    """
    if pd.isna(value):
        return None
    return Decimal(str(value))

# Drop rows without an address, then convert Price.
# .assign builds a new frame, avoiding chained assignment on the dropna() result.
pandas_df = (
    pandas_df
    .dropna(subset=["Address"])
    .assign(Price=lambda frame: frame["Price"].apply(_to_dynamo_number))
)

In [40]:
# Give every listing a random UUID4 string in the 'ID' column (intended as the DynamoDB key).
# NOTE: re-running this cell generates brand-new IDs. If the table is keyed on ID,
# pushing again will add new items instead of overwriting the earlier ones.
pandas_df['ID'] = [str(uuid.uuid4()) for _row in pandas_df.index]

In [43]:
# Let Jupyter render the frame as a rich (automatically truncated) table,
# instead of print()'s plain-text dump.
pandas_df

         Price                    Address    Bedrooms   Bathrooms Postal_Code  \
0    2599900.0             599 Spadina Rd           4  Not listed     M5P 2X1   
1    1999999.0              61 Cameron St           6  Not listed     M5T 2H1   
2    1750000.0         106 -4750 Yonge St  Not listed  Not listed  Not listed   
3    1688000.0             140 Bogert Ave           3  Not listed     M2N 1K8   
4    1520000.0      150 Harlandale Avenue           4  Not listed     M2N 1P4   
..         ...                        ...         ...         ...         ...   
455   735000.0           53 Templeton Crt  Not listed  Not listed     M1E 2C3   
456   729000.0           703 -87 Peter St  Not listed  Not listed     M5V 0P1   
457   709000.0           7 -38 Gibson Ave  Not listed  Not listed     M9N 0A5   
458   699000.0         45 English Ivy Way  Not listed  Not listed     M2H 3M3   
459   699000.0  1201 -2015 Sheppard Ave E  Not listed  Not listed     M2J 0B3   

                           

In [44]:
# Push data to DynamoDB
wr.dynamodb.put_df(
    df=pandas_df,
    table_name=aws_table_name,
    boto3_session=boto3.Session(
        aws_access_key_id=aws_access_id,
        aws_secret_access_key=aws_secret_id,
        region_name=region
    ),
    primary_key='ID'
)

TypeError: got an unexpected keyword argument 'primary_key'

In [47]:
# Push data to DynamoDB
wr.dynamodb.put_df(
    df=pandas_df,
    table_name=aws_table_name,
    boto3_session=boto3.Session(
        aws_access_key_id=aws_access_id,
        aws_secret_access_key=aws_secret_id,
        region_name=region
    )
)

In [0]:
# Reuse the settings loaded from environment variables at the top of the notebook,
# instead of hard-coding credential placeholders that must be edited by hand
# (and would leak real keys if filled in and committed).
aws_access_key_id = aws_access_id
aws_secret_access_key = aws_secret_id
aws_region = region
table_name = aws_table_name
dynamodb = boto3.resource('dynamodb', aws_access_key_id=aws_access_key_id,
                          aws_secret_access_key=aws_secret_access_key, region_name=aws_region)


In [0]:
# Handle to the DynamoDB table named by `table_name`; used by the batch writer below.
table = dynamodb.Table(table_name)

In [0]:
# One dict per row (column name -> value), the shape put_item expects for Item.
data = pandas_df.to_dict('records')

In [0]:
# Push the data to DynamoDB
with table.batch_writer() as batch:
    for item in data:
        batch.put_item(Item=item)