In [None]:
# Import the required library
import requests

# Get the zip file
response = requests.get(path)

# Print the status code
print(response.status_code)

# Save the file locally (more about open() in the next lesson)
local_path = f"tmp/data/source/downloaded_at=2021-02-01/PPR-ALL.zip"
with open(local_path, "wb") as f:
    f.write(response.content)

In [None]:
# Import the required method
from zipfile import ZipFile

with ZipFile(path, mode="r") as f:
# Get the list of files and print it
    file_names = f.namelist()
    print(file_names)

In [None]:
# Import the required method
from zipfile import ZipFile

with ZipFile(path, "r") as f:
    # Get the list of files
    file_names = f.namelist()
    print(file_names)
    # Extract the CSV file
    csv_file_path = f.extract(file_names[0])
    print(csv_file_path)

In [None]:
import csv
from pprint import pprint

# Open the csv file in read mode
with open(path, mode="r", encoding="windows-1252") as csv_file:
    # Open csv_file so that each row is a dictionary
    reader = csv.DictReader(csv_file)
    
    # Print the first row
    row = next(reader)
    print(type(row))
    pprint(row)

In [None]:
fieldnames = {
    "Date of Sale (dd/mm/yyyy)": "date_of_sale",
    "Address": "address",
    "Postal Code": "postal_code",
    "County": "county",
    "Price (€)": "price",
    "Description of Property": "description",
}

In [None]:
import csv

with open(path, mode="r", encoding="windows-1252") as reader_csv_file:
    reader = csv.DictReader(reader_csv_file)
    # The new file is called "PPR-2021-Dublin-new-headers.csv"
    # and will be saved inside the "tmp" folder    
    with open("/tmp/PPR-2021-Dublin-new-headers.csv",
                    mode="w",
                    encoding="windows-1252",
                ) as writer_csv_file:
        writer = csv.DictWriter(writer_csv_file, fieldnames=new_column_names)
        # Write header as first line
        writer.writeheader()
        for row in reader:
            # Write all rows in file
            writer.writerow(row)

# SQLAlchemy Database

# Engines and sessions

As you know by now, engines and sessions are key components enabling SQLAlchemy to interact with a database.
So let's create an engine and bind it to a session.

Remember:
* postgresql is the dialect
* psycopg2 is the connector
* you're working on a local server, localhost

In [None]:
# Import the function needed
from sqlalchemy import create_engine

# Create the engine
engine = create_engine("postgresql+psycopg2://dcstudent:S3cretPassw0rd@localhost:5432/campdata-prod")

In [None]:
# Import the function needed
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

# Create the engine
engine = create_engine("postgresql+psycopg2://usr:pwd@localhost:5432/mydatabase")

# Create the session
session = Session(engine)

In [None]:
# Import the objects needed
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer

# Initialize the base and set inheritance
Base = declarative_base()

class PprRawAll(Base):
    # Set the table name
    __tablename__ = "ppr_raw_all"
    # Create a primary key integer column id
    id = Column(Integer, primary_key=True)

In [None]:
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer, String

Base = declarative_base()

class PprRawAll(Base):
    __tablename__ = "ppr_raw_all"
    
    id = Column(Integer, primary_key=True)
    date_of_sale = Column(String(55))
    address = Column(String(255))
    postal_code = Column(String(55))
    county = Column(String(55))
    price = Column(String(55))
    description = Column(String(55))

## . Insert & Delete Operation

In [None]:
# Import the function required
from sqlalchemy.dialects.postgresql import insert
  
values = [{"date_of_sale": "2021-01-01",
           "address": "14 bow street",
           "postal_code": "dublin 7",
           "county": "dublin",
           "price": 350000,
           "description":"second-hand"}]

# Insert values in PprCleanAll
stmt = insert(PprCleanAll).values(values)

# Execute and commit
session.execute(stmt)
session.commit()

# Insert and delete

Let's now focus on our load phase. It relies heavily on two fundamental operations:
* inserting rows in a table
* deleting rows from a table

In [None]:
# Import the function required
from sqlalchemy import delete

# Delete rows lacking a description value
stmt = delete(PprCleanAll).filter(PprCleanAll.description=="")

# Execute and commit
session.execute(stmt)
session.commit()

### Insert

In [None]:
from sqlalchemy import cast
from sqlalchemy.dialects.postgresql import insert

# Select the transaction ids
clean_transaction_ids = session.query(PprCleanAll.transaction_id)

# Select the columns and cast the appropriate types if needed
transactions_to_insert = session.query(
    cast(PprRawAll.date_of_sale, Date),
    PprRawAll.address,
    PprRawAll.postal_code,
    PprRawAll.county,
    cast(PprRawAll.price, Integer),
    PprRawAll.description,
  # Filter for the new rows
).filter(~PprRawAll.transaction_id.in_(clean_transaction_ids))

# Print total number of transactions to insert
# it should be 3154 if the transactions need to be inserted
# 0, if all transactions have been inserted
print("Transactions to insert:", transactions_to_insert.count())

# Insert the rows from the previously selected transactions
columns = ["date_of_sale", "address", "postal_code",
          "county", "price","description"]
stm = insert(PprCleanAll).from_select(columns, transactions_to_insert)

# Execute and commit the statement to make changes in the database.
session.execute(stm)
session.commit()

### Delete

In [None]:
# Import the delete module
from sqlalchemy import delete

# Get all the ppr_raw_all transaction ids
raw_transaction_ids = session.query(PprRawAll.transaction_id)

# Filter all the ppr_clean_all table transactions that are not present in the ppr_raw_all table
transactions_to_delete = session.query(PprCleanAll).filter(~PprCleanAll.transaction_id.in_(raw_transaction_ids))

# Print transactions to delete
print("Transactions to delete:", transactions_to_delete.count())

# Delete the selected transactions
# (Please note: the param "synchronize_session=False" has been inserted
# to avoid inconsistent results if a session expires)
transactions_to_delete.delete(synchronize_session=False)

# Commit the session to make the changes in the database
session.commit()

# Operators

## . or_() Operator

In [None]:
# Import the operator you need
from sqlalchemy import or_

# Query the clean table to retrieve the total number of
# transactions for the Dublin or Cork counties
result = session.query(PprCleanAll) \
                .filter(or_(PprCleanAll.county == "dublin", PprCleanAll.county == "cork")) \
                .all()

print("First row address:", result[0].address)

## . and_() Operator

In [None]:
# Import the and function needed
from sqlalchemy import and_

# Retrieve all sales transactions for January 2021
result = session.query(PprCleanAll).filter(and_).all()

print("First row address:", result[0].address)

# Aggregate Function

Aggregate functions perform calculation on rows

1. COUNT --> func.count()
2. SUM --> func.sum()
3. MAX --> func.max()
4. MIN --> func.min()
5. AVG --> func.avg()

In [None]:
# Import the submodule required
from sqlalchemy import func

# Get the maximum, minimum and average values for each product category
result = session.query(Products.category,
                       func.max(Products.price),
                       func.min(Products.price),
                       func.avg(Products.price)) \
                .group_by(Products.category).all()

print("Result:", result)

# Insights

### Creating the insights view with pure SQL

In [None]:
from common.base import session

# Create the view with the appropriate metrics
query = """
CREATE VIEW insights AS
SELECT county,
       COUNT(*) AS sales_count,
       SUM(CAST(price AS int)) AS sales_total,
       MAX(CAST(price AS int)) AS sales_max,
       MIN(CAST(price AS int)) AS sales_min,
       AVG(CAST(price AS int))::numeric(10,2) AS sales_avg
FROM ppr_clean_all
GROUP BY county
"""

# Execute and commit
session.execute(query)
session.commit()

## Excel File : xlsxwiter Library

### Create Workbook

In [None]:
# Import the library needed
import xlsxwriter

# Create a new Excel file
workbook = xlsxwriter.Workbook("insights.xlsx")

# Initialize a worksheet
worksheet = workbook.add_worksheet()

# Write the data in the current sheet
data = ["Hello", "Datacamp"]
worksheet.write(0, 0, data[0])
worksheet.write(0, 1, data[1])

# Close the file
workbook.close()

### Add a table into Excel file

In [None]:
import xlsxwriter

workbook = xlsxwriter.Workbook("Products.xlsx")
worksheet = workbook.add_worksheet()
    
# Create a table with the available data in the current sheet
worksheet.add_table(
    "B3:E8",
    {
        "data": data,
        "columns": [
          	# Use the appropriate names for the columns
            {"header": "id"},
            {"header": "category"},
            {"header": "name"},
            {"header": "price"},
        ],
    },
)

# Close the current file
workbook.close()