In [3]:
pip install psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.10-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (2.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.9/2.9 MB[0m [31m695.7 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.10
Note: you may need to restart the kernel to use updated packages.


In [5]:
# imports
import os
from datetime import datetime, timedelta
import itertools
import json
import requests

import psycopg2
from psycopg2.extras import execute_batch
import pandas as pd

In [36]:
# Parsing the configration file
def parse_configs(config_file_path):
    """
    Parses the configuration file and retrieves API credentials.

    Args:
        config_file_path (str): Path to the configuration file (JSON format).

    Returns:
        tuple: A tuple containing the API key, API secret, and API endpoint.
    """

    with open(config_file_path) as config_file:
        configs = json.load(config_file)

    # retrieve API credientials
    NYT_BOOKS_API_KEY = configs["API"]["NYT_BOOKS_API_KEY"]
    NYT_BOOKS_API_SECRET = configs["API"]["NYT_BOOKS_API_SECRET"]
    NYT_BOOKS_API_ENDPOINT = configs["API"]["NYT_API_ENDPOINT"]

    return NYT_BOOKS_API_KEY, NYT_BOOKS_API_SECRET, NYT_BOOKS_API_ENDPOINT


def generate_incremental_dates(start_date, end_date, offset):
    """
    Generates a list of dates starting from `start_date`, incremented by `offset`, 
    until the `end_date` is reached.

    Args:
        start_date (str): The starting date in the format "YYYY-MM-DD".
        end_date (str): The ending date in the format "YYYY-MM-DD".
        offset (int): The number of days to increment for each date in the range.

    Returns:
        list: A list of `datetime` objects representing the dates from `start_date`
              to `end_date`, incremented by `offset` days.
    """

    # format dates
    start_date = datetime.strptime(start_date, "%Y-%m-%d")
    end_date = datetime.strptime(end_date, "%Y-%m-%d")

    # init date variable
    date = start_date

    # Init dates list
    dates = []

    # loop to get all required dates in specified range
    while date <= end_date:
        dates.append(date)

        date += timedelta(days=offset)

    return dates


# Define a function to fetch data from API
def fetch_data_from_api(NYT_BOOKS_API_KEY, DATE):
    """
    Fetches book data from the New York Times Books API for a specific date and 
    saves the response as a JSON file in the `raw_data` folder.

    Args:
        NYT_BOOKS_API_KEY (str): The API key for accessing the New York Times Books API.
        DATE (str): The date for which to fetch book data, in the format "YYYY-MM-DD".

    Returns:
        None: This function does not return any value. It saves the data to a file.
    """

    # connect to API and pull data
    URL = f"https://api.nytimes.com/svc/books/v3/lists/overview.json?published_date={DATE}&api-key={NYT_BOOKS_API_KEY}"
    response = requests.get(URL)

    if response.status_code == 200:
        # Save the data to a JSON file in the raw_data folder
        os.makedirs("./raw_data", exist_ok=True)

        with open(f"./raw_data/{DATE}.json", "w") as raw_file:
            json.dump(response.json(), raw_file)
    else:
        raise Exception(f"Failed to fetch data from API. Status code: {response.status_code}")


In [10]:
# get API credientials
NYT_BOOKS_API_KEY, NYT_BOOKS_API_SECRET, NYT_BOOKS_API_ENDPOINT = parse_configs('../config.json')

print(NYT_BOOKS_API_KEY, NYT_BOOKS_API_SECRET, NYT_BOOKS_API_ENDPOINT)

uXunYAlGg7Bp446vfkb2dU2iMCTGvZXo 2f6wNv3eXA3GYWM9 https://api.nytimes.com/svc/books/v3/lists/overview.json


In [20]:
START_DATE = '2020-12-27'
END_DATE = '2023-12-31'
OFFSET = 7

dates = generate_incremental_dates(START_DATE, END_DATE, OFFSET)

In [15]:
# for date in dates:
#     fetch_data_from_api(NYT_BOOKS_API_KEY, date)

In [10]:
# Function to load the transformed data into the DWH
def init_db_connection(host, database, user, password, port):
    # Connect to PostgreSQL (replace with your own connection details)
    conn = psycopg2.connect(
        host=host,
        dbname=database,
        user=user,
        password=password,
        port=port
    )
    cursor = conn.cursor()
    
    return conn, cursor


In [11]:
conn, cursor = init_db_connection("postgres-dev", "mydb", "admin", "admin", "5432")

In [58]:
cursor.execute("""
WITH RECURSIVE date_series AS (
    SELECT '2010-01-01'::DATE AS Date
    UNION ALL
    SELECT (Date + INTERVAL '1 day')::DATE as Date
    FROM date_series
    WHERE Date < '2030-12-31'
)
INSERT INTO dwh.DimDate
    (
        DateKey,
        FullDate,
        DayOfMonth,
        DayName,
        DayOfWeek,
        DayOfYear,
        WeekOfYear,
        Month,
        MonthName,
        Quarter,
        Year,
        MonthYear
    )
SELECT 
    EXTRACT(YEAR FROM Date) * 10000 + EXTRACT(MONTH FROM Date) * 100 + EXTRACT(DAY FROM Date) AS DateKey,
    Date AS FullDate,
    TO_CHAR(Date, 'DD') AS DayOfMonth,
    TO_CHAR(Date, 'FMDay') AS DayName,
    EXTRACT(DOW FROM Date) + 1 AS DayOfWeek,  -- Day of the week starting from Sunday = 1
    EXTRACT(DOY FROM Date) AS DayOfYear,
    TO_CHAR(Date, 'IW') AS WeekOfYear,
    TO_CHAR(Date, 'MM') AS Month,
    TO_CHAR(Date, 'FMMonth') AS MonthName,
    CASE 
        WHEN EXTRACT(MONTH FROM Date) BETWEEN 1 AND 3 THEN '1'
        WHEN EXTRACT(MONTH FROM Date) BETWEEN 4 AND 6 THEN '2'
        WHEN EXTRACT(MONTH FROM Date) BETWEEN 7 AND 9 THEN '3'
        WHEN EXTRACT(MONTH FROM Date) BETWEEN 10 AND 12 THEN '4'
    END AS Quarter,
    TO_CHAR(Date, 'YYYY') AS Year,
    TO_CHAR(Date, 'YYYYMM') AS MonthYear
FROM date_series
;

           """)

In [52]:
cursor.execute("""CREATE TABLE dwh.DimDate (
  DateKey integer,
  FullDate date PRIMARY KEY,
  DayOfMonth varchar,
  DayName varchar,
  DayOfWeek varchar,
  DayOfYear varchar,
  WeekOfYear varchar,
  Month varchar,
  MonthName varchar,
  Quarter varchar,
  Year varchar,
  MonthYear varchar
);
           """)

In [59]:
conn.commit()

In [57]:
conn.rollback()

In [50]:
cursor.execute(
            "INSERT INTO test (num, data) VALUES (%s, %s)",
            (100, "abc'def"))

<psycopg.Cursor [COMMAND_OK] [INTRANS] (host=postgres-dev user=admin database=mydb) at 0xffff901deeb0>

In [62]:
# cursor.execute("SELECT * FROM dwh.DimDate")
# cursor.fetchone()

In [63]:
# for record in cursor:
#     print(record)

In [15]:
def transform_data(data):
    # init lists to store each table data
    lists_data = []
    books_data = []
    buy_links_data = []
    fact_best_sellers_data = []
    date_data = []

    # DimLists
    for list_entry in data['results']['lists']:
        list_dict = {
            'id': list_entry['list_id'],
            'list_name': list_entry['list_name'],
            'list_name_encoded': list_entry['list_name_encoded'],
            'display_name': list_entry['display_name'],
            'updated': datetime.strptime(data['results']['bestsellers_date'], "%Y-%m-%d").date(),
            'list_image': list_entry.get('list_image', ''),
            'list_image_width': list_entry.get('list_image_width', None),
            'list_image_height': list_entry.get('list_image_height', None),
        }
        lists_data.append(list_dict)

        # DimBooks
        for book in list_entry['books']:
            book_id = book['primary_isbn13']
            book_dict = {
                'id': book_id,
                'title': book['title'],
                'publisher': book['publisher'],
                'author': book['author'],
                'contributor': book['contributor'],
                'contributor_note': book['contributor_note'],
                'description': book['description'],
                'created_date': datetime.strptime(book['created_date'], "%Y-%m-%d %H:%M:%S").date(),
                'updated_date': datetime.strptime(book['updated_date'], "%Y-%m-%d %H:%M:%S").date(),
                'age_group': book['age_group'],
                'amazon_product_url': book['amazon_product_url'],
                'primary_isbn13': book['primary_isbn13'],
                'primary_isbn10': book['primary_isbn10'],
                'book_image_width': book['book_image_width'],
                'book_image_height': book['book_image_height'],
                'first_chapter_link': book['first_chapter_link'],
                'book_uri': book['book_uri'],
                'sunday_review_link': book['sunday_review_link']
            }
            books_data.append(book_dict)

            # DimBooksBuyLinks
            for buy_link in book['buy_links']:
                buy_link_dict = {
                    'book_id': book_id,
                    'website_name': buy_link['name'],
                    'website_url': buy_link['url']
                }
                buy_links_data.append(buy_link_dict)

            # Fct_best_sellers_publish
            fct_best_seller_dict = {
                'bestsellers_date': datetime.strptime(data['results']['bestsellers_date'], "%Y-%m-%d").date(),
                'published_date': datetime.strptime(data['results']['published_date'], "%Y-%m-%d").date(),
                'previous_published_date': datetime.strptime(data['results']['previous_published_date'], "%Y-%m-%d").date(),
                'next_published_date': datetime.strptime(data['results']['next_published_date'], "%Y-%m-%d").date(),
                'list_id': list_entry['list_id'],
                'book_id': book_id,
                'rank': book['rank'],
                'weeks_on_list': book['weeks_on_list'],
                'price': book['price']
            }
            fact_best_sellers_data.append(fct_best_seller_dict)


    # Convert to DataFrames
    df_lists = pd.DataFrame(lists_data)
    df_books = pd.DataFrame(books_data)
    df_buy_links = pd.DataFrame(buy_links_data)
    df_best_sellers = pd.DataFrame(fact_best_sellers_data)

    return df_lists, df_books, df_buy_links, df_best_sellers


# Function to load the transformed data into the DWH
def load_data(cursor, df_lists, df_books, df_buy_links, df_best_sellers):
    # Insert data into DimLists
    execute_batch(cursor, """
        INSERT INTO stage.lists (id, list_name, list_name_encoded, display_name, updated, list_image, list_image_width, list_image_height)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """, df_lists.values.tolist())

    # Insert data into DimBooks
    execute_batch(cursor, """
        INSERT INTO stage.books (id, title, publisher, author, contributor, contributor_note, description, created_date, updated_date, age_group, amazon_product_url, primary_isbn13, primary_isbn10, book_image_width, book_image_height, first_chapter_link, book_uri, sunday_review_link)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, df_books.values.tolist())

    # Insert data into DimBooksBuyLinks
    execute_batch(cursor, """
        INSERT INTO stage.books_buy_links (book_id, website_name, website_url)
        VALUES (%s, %s, %s)
    """, df_buy_links.values.tolist())

    # Insert data into Fct_best_sellers_publish
    execute_batch(cursor, """
        INSERT INTO stage.best_sellers_puplish (bestsellers_date, published_date, previous_published_date, next_published_date, list_id, book_id, rank, weeks_on_list, price)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, df_best_sellers.values.tolist())

    # Commit and close
    conn.commit()
    cursor.close()
    conn.close()




with open('../raw_data/2021-01-03.json', 'r') as json_file:
    data = json.load(json_file)
    
# print(data)

df_lists, df_books, df_buy_links, df_best_sellers = transform_data(data)
load_data(cursor, df_lists, df_books, df_buy_links, df_best_sellers)

UndefinedTable: relation "stage.lists" does not exist
LINE 2:         INSERT INTO stage.lists (id, list_name, list_name_en...
                            ^


In [16]:
conn.rollback()