### Importing Modules

In [3]:
import requests
import zipfile
#from zipfile import ZipFile
from urllib.request import urlretrieve


The ETL process
* download zip file
* extract the zipped csv file
* apply transformations to the csv file
* load the new csv data into a postgresql database
* generate insights for the shareholders

Downloading a ZIP file

In [4]:
url_path = 'https://assets.datacamp.com/production/repositories/5899/datasets/19d6cf619d6a771314f0eb489262a31f89c424c2/ppr-all.zip'

# Get the zip file; the timeout keeps the cell from hanging on a stalled connection
response = requests.get(url_path, timeout=60)

# Print the status code
print(response.status_code)

# Stop on a 4xx/5xx answer so that an error page is never saved as the archive
response.raise_for_status()

# Save the file locally, in binary mode because the body is written as raw bytes
local_path = "./ppr-all.zip"
with open(local_path, "wb") as f:
    f.write(response.content)


200


In [None]:
# Alternative way to fetch the archive: urlretrieve downloads url_path and
# stores it under a separate file name, '_ppr-all.zip'
urlretrieve(url=url_path, filename='_ppr-all.zip')

Exploring a ZIP file

In [9]:
# Import the required method
from zipfile import ZipFile

# Open the downloaded archive read-only; the with-block closes it afterwards
with ZipFile(local_path, mode="r") as zip_archive:
    # Names of every member stored in the archive
    file_names = zip_archive.namelist()
    print("List of files: ", file_names)
    # Unpack the first member into the current directory and keep the
    # path that extract() returns for it
    first_member = file_names[0]
    csv_file_path = zip_archive.extract(first_member, path='./')
    print("Extract Path: " , csv_file_path)

List of files:  ['ppr-all.csv']
Extract Path:  ppr-all.csv


Reading from a CSV file

In [3]:
import csv
from pprint import pprint

path = './ppr-all.csv'

# Read the extracted file as text, decoding it as windows-1252
with open(path, mode="r", encoding="windows-1252") as csv_file:
    # DictReader turns every data row into a dict keyed by the header line
    reader = csv.DictReader(csv_file)

    # Pull only the first data row, then show its type, its full
    # content and its 'Address' field
    row = next(reader)
    print(type(row))
    pprint(row)
    pprint(row['Address'])

<class 'dict'>
{'Address': '16 BURLEIGH COURT, BURLINGTON ROAD, DUBLIN 4',
 'County': 'Dublin',
 'Date of Sale (dd/mm/yyyy)': '03/01/2021',
 'Description of Property': 'Second-Hand Dwelling house /Apartment',
 'Postal Code': 'Dublin 4',
 'Price (€)': '€450,000.00'}
'16 BURLEIGH COURT, BURLINGTON ROAD, DUBLIN 4'


Writing to CSV

In [48]:
import csv

# Maps each column name of the source file to the name used in the new file
new_column_names = {
    "Date of Sale (dd/mm/yyyy)": "date_of_sale",
    "Address": "address",
    "Postal Code": "postal_code",
    "County": "county",
    "Price (€)": "price",
    "Description of Property": "description",
}

# newline="" is passed to both open() calls, as the csv module documentation
# asks: without it the writer can emit an extra blank line after every row on
# Windows, and the reader can mis-handle newlines embedded in quoted fields.
with open(path, mode="r", encoding="windows-1252", newline="") as reader_csv_file:
    reader = csv.DictReader(reader_csv_file)

    # The new file is called "PPR-2021-Dublin-new-headers.csv"
    # and is saved in the current working directory
    with open(
        "./PPR-2021-Dublin-new-headers.csv",
        mode="w",
        encoding="windows-1252",
        newline="",
    ) as writer_csv_file:
        # The writer is keyed by the ORIGINAL column names (the dict's keys),
        # so the rows coming from the reader can be written unchanged
        writer = csv.DictWriter(writer_csv_file, fieldnames=list(new_column_names))

        # Write the header as first line: writing the mapping itself as a row
        # puts each NEW name in the position of the old column it replaces
        writer.writerow(new_column_names)

        # Copy all remaining rows of the source file
        writer.writerows(reader)



In [50]:
# Check the result: the column names are visible on the first data row, so
# read just that row instead of loading the whole file into a list
with open('./PPR-2021-Dublin-new-headers.csv', mode="r", encoding="windows-1252", newline="") as reader_csv_file:
    reader = csv.DictReader(reader_csv_file)
    pprint(next(reader).keys())


dict_keys(['date_of_sale', 'address', 'postal_code', 'county', 'price', 'description'])


Engines and sessions

In [None]:
# Import the function needed
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

# Create the engine
# The database password is read from the PPR_DB_PASSWORD environment variable
# so that no secret is stored in the notebook. A missing variable raises
# KeyError here, before any connection attempt.
import os
from urllib.parse import quote_plus

# quote_plus keeps characters such as "@" or "/" in the password from
# breaking the URL
db_password = quote_plus(os.environ["PPR_DB_PASSWORD"])
engine = create_engine(
    f"postgresql+psycopg2://dcstudent:{db_password}@localhost:5432/campdata-prod"
)

# Create the session
session = Session(engine)

Transformations

In [None]:
# Import the submodule required
from datetime import datetime

def transform_case(input_string):
    """
    Return a lowercase copy of input_string
    """
    lowered = input_string.lower()
    return lowered
  
def update_date_of_sale(date_input):
    """
    Reformat a date string from DD/MM/YYYY to YYYY-MM-DD

    The input is parsed with strptime, which raises ValueError when
    date_input does not match the DD/MM/YYYY pattern.
    """
    parsed_date = datetime.strptime(date_input, "%d/%m/%Y")
    return parsed_date.strftime("%Y-%m-%d")
    

def update_price(price_input):
    """
    Convert a price string such as "€450,000.00" to an integer.

    The "€" and "," characters are removed, the remainder is parsed as a
    float, and int() then drops any fractional part.
    """
    without_symbols = price_input.replace("€", "").replace(",", "")
    return int(float(without_symbols))
  
def update_description(description_input):
    """
    Collapse a property description to a short label for future analysis.

    The text is lowercased with transform_case first. The result is:
    - "new" if the lowercased text contains "new"
    - otherwise "second-hand" if it contains "second-hand"
    - otherwise the lowercased text itself
    """
    lowered = transform_case(description_input)
    # The order matters: "new" is tested before "second-hand"
    for label in ("new", "second-hand"):
        if label in lowered:
            return label
    return lowered

Querying

In [None]:
# Query the session to get the row with id equal to 2
# NOTE(review): PprRawAll is not defined or imported in this cell; it has to
# be in scope already - confirm where it comes from
id_is_two = PprRawAll.id == 2
results = session.query(PprRawAll).filter(id_is_two)

# Take the first row of the query and print its address
first_match = results[0]
print("Address:", first_match.address)

Insert

In [None]:
# Import the function required
from sqlalchemy.dialects.postgresql import insert
  
# One row to add, written as a column-name -> value mapping
new_row = dict(
    date_of_sale="2021-01-01",
    address="14 bow street",
    postal_code="dublin 7",
    county="dublin",
    price=350000,
    description="second-hand",
)
values = [new_row]

# Build the INSERT statement for PprCleanAll
stmt = insert(PprCleanAll).values(values)

# Run the statement, then commit so the change is made in the database
session.execute(stmt)
session.commit()

delete

In [None]:
# Import the function required
from sqlalchemy import delete

# Delete the rows whose description is an empty string
has_no_description = PprCleanAll.description == ""
stmt = delete(PprCleanAll).where(has_no_description)
# Filtered rows can also be deleted at the end of a query, e.g.
# session.query(Movies).filter(Movies.title == "movie title").delete()

# Run the statement, then commit so the change is made in the database
session.execute(stmt)
session.commit()

Insert operations

In [None]:
from sqlalchemy import Date, Integer, cast
from sqlalchemy.dialects.postgresql import insert

# Select the transaction ids already present in the clean table
clean_transaction_ids = session.query(PprCleanAll.transaction_id)

# Select the columns and cast the appropriate types if needed.
# Date and Integer are the types handed to cast(); they are imported above
# because nothing else in this cell brings them into scope.
transactions_to_insert = session.query(
    cast(PprRawAll.date_of_sale, Date),
    PprRawAll.address,
    PprRawAll.postal_code,
    PprRawAll.county,
    cast(PprRawAll.price, Integer),
    PprRawAll.description,
# Keep only the raw rows whose transaction id is not yet in the clean table
).filter(~PprRawAll.transaction_id.in_(clean_transaction_ids))

# Print total number of transactions to insert
# it should be 3154 if the transactions need to be inserted
# 0, if all transactions have been inserted
print("Transactions to insert:", transactions_to_insert.count())

# Insert the rows from the previously selected transactions.
# The names are listed in the same order as the columns selected above.
columns = ["date_of_sale", "address", "postal_code",
          "county", "price","description"]
stm = insert(PprCleanAll).from_select(columns, transactions_to_insert)

# Execute and commit the statement to make changes in the database.
session.execute(stm)
session.commit()

Delete operation

In [None]:
# Import the delete module
from sqlalchemy import delete

# Collect every transaction id of the ppr_raw_all table
raw_transaction_ids = session.query(PprRawAll.transaction_id)

# Select the ppr_clean_all rows whose transaction id is not present in ppr_raw_all
missing_from_raw = ~PprCleanAll.transaction_id.in_(raw_transaction_ids)
transactions_to_delete = session.query(PprCleanAll).filter(missing_from_raw)

# Print how many transactions are about to be deleted
print("Transactions to delete:", transactions_to_delete.count())

# Delete the selected transactions
# (Please note: the param "synchronize_session=False" has been inserted
# to avoid inconsistent results if a session expires)
transactions_to_delete.delete(synchronize_session=False)

# Commit the session to make the changes in the database
session.commit()

operators and queries

* in / not in:  in_, ~in_
* basic comparison: ==, !=, >, >= , <, <=
* identity: is_, is_not
* string comparison: like, notlike
* conjunctions and negations: and_, or_, not_

Use:
* and_(expression1, expression2, ..., expressionN)
* (expression1 & expression2 & ... & expressionN)

In [None]:
# Import the operator you need
from sqlalchemy import or_

# Query the clean table for every transaction recorded in the
# Dublin or Cork counties
in_dublin = PprCleanAll.county == "dublin"
in_cork = PprCleanAll.county == "cork"
result = session.query(PprCleanAll).filter(or_(in_dublin, in_cork)).all()

# Show the address of the first matching row
print("First row address:", result[0].address)

In [None]:
# Import the and function needed
from sqlalchemy import and_

# Retrieve all sales transactions for January 2021 (both bounds are inclusive)
from_first_day = PprCleanAll.date_of_sale >= "2021-01-01"
to_last_day = PprCleanAll.date_of_sale <= "2021-01-31"
result = (
    session.query(PprCleanAll)
    .filter(and_(from_first_day, to_last_day))
    .all()
)

# Show the address of the first matching row
print("First row address:", result[0].address)

aggregate functions and group by

In [None]:
# Import the submodule required
from sqlalchemy import func

# Get the maximum, minimum and average price for each product category
# NOTE(review): Products is not defined or imported in this cell - confirm
# where it comes from
price_metrics = (
    func.max(Products.price),
    func.min(Products.price),
    func.avg(Products.price),
)
result = (
    session.query(Products.category, *price_metrics)
    .group_by(Products.category)
    .all()
)

print("Result:", result)

Creating the insights view and writing pure sql queries with python

In [None]:
from sqlalchemy import text

from common.base import session

# Create the view with the appropriate metrics: one row per county with the
# number of sales and the total, maximum, minimum and average price
query = """
create or replace view insights AS
SELECT county,
       count(*) AS sales_count,
       sum(CAST(price AS int)) AS sales_total,
       max(CAST(price AS int)) AS sales_max,
       min(CAST(price AS int)) AS sales_min,
       avg(CAST(price AS int))::numeric(10,2) AS sales_avg
FROM ppr_clean_all
GROUP BY county
"""

# Execute and commit
# Raw SQL is wrapped in text() before it is handed to session.execute():
# passing a plain string is deprecated in SQLAlchemy 1.4 and rejected in 2.0
session.execute(text(query))
session.commit()

In [None]:
from sqlalchemy import text

# Count the rows of the insights view. The SQL string is wrapped in text()
# because passing a plain string to session.execute() is deprecated in
# SQLAlchemy 1.4 and rejected in 2.0
session.execute(text("SELECT COUNT(*) FROM insights")).all()

Working with excel files

adding data

In [None]:
import xlsxwriter

# Create the Excel file and add a worksheet to it
workbook = xlsxwriter.Workbook("Greetings.xlsx")
worksheet = workbook.add_worksheet()

# Write the greeting at row 0, column 0
greeting = "Hello Datacamp"
worksheet.write(0, 0, greeting)

# Close the file
workbook.close()

adding a table

In [None]:
import xlsxwriter
# Create the Excel file and add a worksheet to it
workbook = xlsxwriter.Workbook("Books.xlsx")
worksheet = workbook.add_worksheet()

# One inner list per table row: [id, name]
data = [
    [1, "Sapiens"],
    [2, "Greenlights"],
]

# Table settings: the rows above plus a header for the "id" and "name" columns
table_options = {
    "data": data,
    "columns": [{"header": "id"}, {"header": "name"}],
}
worksheet.add_table("B3:E6", table_options)

# Close the file
workbook.close()

example adding data

In [None]:
# Import the library needed
import xlsxwriter

# Create a new Excel file and initialize a worksheet in it
workbook = xlsxwriter.Workbook("insights.xlsx")
worksheet = workbook.add_worksheet()

# Write the data in the current sheet: every item goes into row 0,
# in the column matching its position in the list
data = ["Hello", "Datacamp"]
for column_index, value in enumerate(data):
    worksheet.write(0, column_index, value)

# Close the file
workbook.close()

example adding tables

In [None]:
import xlsxwriter

# Create the Excel file and add a worksheet to it
workbook = xlsxwriter.Workbook("Products.xlsx")
worksheet = workbook.add_worksheet()

# Header names for the table columns, in order
column_headers = ["id", "category", "name", "price"]

# Create a table with the available data in the current sheet
# NOTE(review): `data` is not assigned in this cell, so whatever an earlier
# cell left in `data` is used - confirm it holds the product rows
table_options = {
    "data": data,
    "columns": [{"header": header} for header in column_headers],
}
worksheet.add_table("B3:E8", table_options)

# Close the current file
workbook.close()