In [4]:
class staging_tables():
    create_inventory_stg = """
    create table if not exists staging_inventory(
        bibnum bigint,
        title varchar(max),
        isbn varchar(max),
        publicationyear varchar(500),
        publisher varchar(500),
        itemtype varchar(500),
        itemcollection varchar(500),
        floatingitem varchar(500),
        itemlocation varchar(500),
        reportdate text,
        itemcount int,
        author varchar(500),
        subjects varchar(max))
    """

    create_loans_stg = """
    create table if not exists staging_loans(
        id text,
        checkoutyear int,
        bibnumber bigint,
        itembarcode bigint,
        itemtype text,
        collection text,
        callnumber text,
        itemtitle varchar(max),
        subjects varchar(max),
        checkoutdatetime text )
    """

    create_code_staging = """
    create table if not exists staging_codes(
        code varchar(10), 
        description text,
        code_type text,
        format_group text,
        format_subgroup varchar(50),
        category_group varchar(20),
        category_subgroup text,
        age_group varchar(10))
    """

    copy_stg_inventory = """
    copy staging_inventory
    from 's3://seattle-test/{}.csv' 
    iam_role 'arn:aws:iam::351134467134:role/redshift_access_s3'
    csv delimiter '|' ignoreheader 1
    """

    copy_stg_loans = """
    copy staging_loans
    from 's3://seattle-test/{}.csv' 
    iam_role 'arn:aws:iam::351134467134:role/redshift_access_s3'
    csv delimiter '|' ignoreheader 1
    """

    copy_stg_code = """
    copy staging_codes
    from 's3://seattle-test/{}.csv'
    iam_role 'arn:aws:iam::351134467134:role/redshift_access_s3'
    csv delimiter '|' ignoreheader 1
    """
    ##must be created from inventory: df of only 1 column, subjects, then remove duplicate, explode the string column. upload.
    create_staging_exploded_subjects = """
    create table if not exists staging_exploded_subjects(
        subjects_string varchar(max),
        subject varchar(500))
    """

    copy_stg_exploded_subjects= """
    copy staging_exploded_subjects
    from 's3://seattle-test/{}.csv'
    iam_role 'arn:aws:iam::351134467134:role/redshift_access_s3'
    csv delimiter '|' ignoreheader 1
    """

In [90]:
class dim_tables():
    create_time_dim = """
    create table if not exists dim_time(
        time_key timestamp PRIMARY KEY,
        quarter_of_year int not null,
        day_of_week int not null,
        day int not null,
        month int not null,
        year int not null)
    """
    insert_time_dim = """
    insert into dim_time(
    select
        time_key,
        extract(quarter from time_key) as quarter,
        extract(dayofweek from time_key) as day_of_week,
        extract(day from time_key) as day,
        extract(month from time_key) as month,
        extract(year from time_key) as year
    from 
        (select 
            distinct to_date(substring(checkoutdatetime,1,10), 'YYYY-MM-DD') as time_key
        from staging_loans)
    )
    """


    create_dim_collection = """
    create table if not exists dim_collections(
        code varchar(10),
        item_type varchar(50))
    """

    insert_dim_collection = """
    insert into dim_collections(
    select 
        code, 
        category_group
    from staging_codes 
    where category_group!='')
    """

    create_dim_subject = """
    create table if not exists dim_subject(
        subject varchar(500),
        subject_id int identity(1,1))
    """

    insert_dim_subject = """
    insert into dim_subject
    (select 
        distinct subject 
    from staging_exploded_subjects)
    """

    create_dim_books = """
    create table if not exists dim_books(
        bibnum bigint PRIMARY KEY,
        title varchar(max),
        publisher varchar(500),
        author varchar(500))
    """

    insert_dim_books = """
    insert into dim_books
    with grouped_table as (
        select 
            *, 
            row_number() over ( partition by bibnum order by itemcount) as num
        from staging_inventory)    
    select 
        bibnum,
        title,
        publisher,
        author
    from grouped_table
    where num=1
    """

In [20]:
class bridge_link():
    create_bridge_subjects = """
    create table if not exists bridge_subject(
        subject_group_id varchar(32),
        subject_id int,
        subjects_string varchar(max)
    )
    """
    insert_bridge_subjects="""
    insert into bridge_subject
    with subjects_cte as(
        select 
            subjects_string,
            md5(subjects_string) as hash_value
    from
        (select 
            distinct subjects_string
        from staging_exploded_subjects)
    )
    select 
        cte.hash_value as subject_group_id,
        ds.subject_id,
        cte.subjects_string
    from
        ((staging_exploded_subjects ses left outer join subjects_cte cte
        on ses.subjects_string=cte.subjects_string) 
        left outer join dim_subject ds 
        on ses.subject = ds.subject)
    """

In [76]:
class fact():
    #['id', 'time_key', 'itemtype', 'collection', 'bibnumber', 'subject_group']
    create_fact_loans = """
    create table if not exists fact_loans(
        id varchar(50) PRIMARY KEY,
        time_key timestamp,
        itemtype varchar(10),
        collection varchar(10),
        bibnumber bigint,
        subject_group_id varchar(32)
    )
    """
    insert_fact_loans_old = """
    insert into fact_loans (
    with subjects_cte as(
        select
            distinct 
                subject_group_id,
                subjects_string
        from bridge_subject)
    select 
        sl.id, 
        to_date(substring(sl.checkoutdatetime,1,10), 'YYYY-MM-DD') as time_key, 
        sl.itemtype, 
        sl.collection, 
        sl.bibnumber, 
        cte.subject_group_id as subject_group
    from staging_loans sl
    left outer join subjects_cte cte
    on (sl.subjects = cte.subjects_string)
    )
    """

    insert_fact_loans = """        
    insert into fact_loans (
    with unique_books_db as (
        select
            distinct 
                bibnum,
                subjects 
        from staging_inventory
    )
    select 
        sl.id, 
        to_date(substring(sl.checkoutdatetime,1,10), 'YYYY-MM-DD') as time_key, 
        sl.itemtype, 
        sl.collection, 
        sl.bibnumber, 
        md5(udb.subjects)
    from staging_loans sl left outer join 
    unique_books_db udb on sl.bibnumber = udb.bibnum
    )
    """

In [1]:
def tidy_split_new(df, column, new_column, sep='|', keep=False):
    """
    Split the values of a column and expand so the new DataFrame has one split
    value per row. Filters rows where the column is missing.
    Modified from https://stackoverflow.com/questions/12680754/split-explode-pandas-dataframe-string-entry-to-separate-rows
    Params
    ------
    df         : pandas.DataFrame
               dataframe with the column to split and expand
    column     : str
               the column to split and expand
    new_column : str
                name of the new column containing the split values
    sep        : str
               the string used to split the column's values
    keep       : bool
               whether to retain the presplit value as it's own row

    Returns
    -------
    pandas.DataFrame
        Returns a dataframe with the same columns as `df`.
    """
    indexes = list()
    new_values = list()
    df = df.dropna(subset=[column])
    for i, presplit in enumerate(df[column].astype(str)):
        values = presplit.split(sep)
        if keep and len(values) > 1:
            indexes.append(i)
            new_values.append(presplit)
        for value in values:
            indexes.append(i)
            new_values.append(value)
    new_df = df.iloc[indexes, :].copy()
    new_df[new_column] = new_values
    return new_df


In [2]:
import json
import pandas as pd
from sodapy import Socrata
from IPython.core.display import display, HTML
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
display(HTML("<style>.container { width:100% !important; }</style>"))
pd.set_option('display.max_colwidth', None)

with open('credentials.json') as f:
    data = json.load(f)
    socrates_id = data['socrates_id']
    socrates_key = data['socrates_api_key']
    app_token = data['app_token']
    
inventory_db = "6vkj-f5xf"
loans_db = "5src-czff"
codes_db = "pbt3-ytbc"
client = Socrata("data.seattle.gov", app_token, username=socrates_id,password=socrates_key, timeout=100)

# code_response = client.get(codes_db, limit=50000, select="code, description,code_type,format_group,format_subgroup,category_group,category_subgroup,age_group")
code_response = client.get(codes_db)
codes_df = pd.DataFrame.from_records(code_response)
codes_df=codes_df[["code","description","code_type","format_group","format_subgroup","category_group","category_subgroup","age_group"]]
codes_df.to_csv('codes_db.csv', index=False, sep="|")
print("codes downloaded")

count_inventory_response = client.get(inventory_db, select="count(*)", where = 'reportdate > "2020-04-29T00:00:00.000"')
target_count = int(count_inventory_response[0]['count'])

counter=0
keep_records=[]
limit=50000
while counter<target_count:
    inventory_results = client.get(inventory_db, limit=limit, offset=counter, where='reportdate > "2020-04-29T00:00:00.000"')
    temp = pd.DataFrame.from_records(inventory_results)
    temp = temp[["bibnum","title","isbn", "publicationyear", "publisher", "itemtype", "itemcollection", "floatingitem", "itemlocation", "reportdate", "itemcount", "author", "subjects"]]    
    keep_records.append(temp)
    counter+=limit    
inventory_df = pd.concat(keep_records)
inventory_df.to_csv('may_inventory.csv', index=False, sep="|")
print("inventory downloaded")

subjects = inventory_df['subjects'].unique()
subjects_df = pd.DataFrame({'subjects':subjects})
subjects_df = tidy_split_new(subjects_df, 'subjects', 'subject', sep=',', keep=False)
subjects_df.to_csv("subjects_exploded.csv", sep='|', index=False)
print("subjects downloaded")

# loans_results = client.get(loans_db,select="id, checkoutyear,bibnumber, itembarcode, itemtype,collection, callnumber, itemtitle, subjects, checkoutdatetime", where = 'checkoutdatetime > "2020-04-30T23:59:00.00"')
loans_results = client.get(loans_db, where = 'checkoutdatetime > "2020-04-30T23:59:00.00"')
loans_df = pd.DataFrame.from_records(loans_results)
loans_df = loans_df[["id", "checkoutyear","bibnumber", "itembarcode", "itemtype","collection", "callnumber", "itemtitle", "subjects", "checkoutdatetime"]]
loans_df.to_csv("may_loans.csv", sep='|', index=False)
print("loans downloaded")

codes downloaded
inventory downloaded
subjects downloaded
loans downloaded


In [7]:
import psycopg2
conn=psycopg2.connect(custom_string) #something like dbname = dev host=cluster-x2.cyg5lpc6yu7l.us-west-2.redshift.amazonaws.com, port= 5439, user=awsuser password= XXXX
conn.autocommit = True
cur = conn.cursor()


In [9]:
for string in [staging_tables.create_inventory_stg, staging_tables.create_loans_stg, staging_tables.create_code_staging , staging_tables.create_staging_exploded_subjects]:
    cur.execute(string)
print("all_staging_tables created")

all_staging_tables created


In [10]:
for string, file in zip([staging_tables.copy_stg_inventory, staging_tables.copy_stg_loans, staging_tables.copy_stg_code , staging_tables.copy_stg_exploded_subjects], \
                       ['may_inventory','may_loans','codes_db','subjects_exploded']):
    print(file,' is being processed')
    cur.execute(string.format(file))

may_inventory  is being processed
may_loans  is being processed
codes_db  is being processed
subjects_exploded  is being processed


In [88]:
for string in [dim_tables.create_time_dim, dim_tables.create_dim_collection, dim_tables.create_dim_subject , dim_tables.create_dim_books]:
    cur.execute(string)
print("all dim_tables created")

all dim_tables created


In [91]:
for ind, string in enumerate([dim_tables.insert_time_dim, dim_tables.insert_dim_collection, dim_tables.insert_dim_subject , dim_tables.insert_dim_books]):
    print(ind, ' being executed')
    cur.execute(string)
print("all dim_tables inserted")

0  being executed
1  being executed
2  being executed
3  being executed
all dim_tables inserted


In [19]:
#create bridge table
cur.execute(bridge_link.create_bridge_subjects)

In [24]:
cur.execute(bridge_link.insert_bridge_subjects)

In [78]:
cur.execute(fact.create_fact_loans)

In [79]:
cur.execute(fact.insert_fact_loans)

In [93]:
import json
import pandas as pd
from sodapy import Socrata
from IPython.core.display import display, HTML
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
display(HTML("<style>.container { width:100% !important; }</style>"))
pd.set_option('display.max_colwidth', None)

with open('credentials.json') as f:
    data = json.load(f)
    socrates_id = data['socrates_id']
    socrates_key = data['socrates_api_key']
    app_token = data['app_token']
    
inventory_db = "6vkj-f5xf"
loans_db = "5src-czff"
codes_db = "pbt3-ytbc"
client = Socrata("data.seattle.gov", app_token, username=socrates_id,password=socrates_key, timeout=100)

count_inventory_response = client.get(inventory_db, select="count(*)", where = 'reportdate > "2018-04-29T00:00:00.000" and reportdate < "2018-05-31T23:59:00.000"')
target_count = int(count_inventory_response[0]['count'])
print('there are {} records'.format(target_count))
counter=0
keep_records=[]
limit=50000
while counter<target_count:
    inventory_results = client.get(inventory_db, limit=limit, offset=counter, where='reportdate > "2018-04-29T00:00:00.000" and reportdate < "2018-05-31T23:59:00.000"')
    temp = pd.DataFrame.from_records(inventory_results)
    temp = temp[["bibnum","title","isbn", "publicationyear", "publisher", "itemtype", "itemcollection", "floatingitem", "itemlocation", "reportdate", "itemcount", "author", "subjects"]]    
    keep_records.append(temp)
    counter+=limit    
inventory_df = pd.concat(keep_records)
inventory_df.to_csv('may2018_inventory.csv', index=False, sep="|")
print("inventory downloaded")

there are 1345588 records
inventory downloaded


In [98]:
may = pd.read_csv('may_inventory.csv', sep= '|')
old = pd.read_csv('may2018_inventory.csv', sep= '|')
may = may[['bibnum','subjects','title','author']]
may.columns=['may_bibnum','may_subjects','may_title','may_author']
old = old[['bibnum','subjects','title','author']]
new = may.merge(old, left_on='may_bibnum', right_on='bibnum', how='right')
new[new['may_bibnum'].isnull()]

Unnamed: 0,may_bibnum,may_subjects,may_title,may_author,bibnum,subjects,title,author
5713821,,,,,2714256,Popular music 1961 1970,"Bob Dylan in concert : Brandeis University, 1963.","Dylan, Bob, 1941-"
5713822,,,,,1885336,Large type books,Hot poppies.,"Nadelson, Reggie"
5713823,,,,,725171,Readers Juvenile literature,"Fish and fables / contributors, Dolores R. Amato ... [and others].",
5713824,,,,,2277927,"Popular music 2001 2010, Rhythm and blues music",Walkin' in the shadow of life / the Neville Brothers.,Neville Brothers
5713825,,,,,2461452,"New Age music, Music for meditation, Hindu chants, Songs Sanskrit",Om deeksha / mixed and compiled by Maneesh de Moor.,
...,...,...,...,...,...,...,...,...
5803907,,,,,1886161,"Women United States Psychology, Love United States, Man woman relationships United States",Women & love / Mira Kirshenbaum.,"Kirshenbaum, Mira"
5803908,,,,,2929953,"Celts Fiction, Women veterinarians Fiction, Love stories, Fantasy fiction",Storm warrior / Dani Harper.,"Harper, Dani"
5803909,,,,,2551676,"Diet United States, Diet France, Food crops United States, Food crops France, Food Psychological aspects, Food habits United States, Food habits France, Gastronomy",The taste of place : a cultural journey into terroir / Amy B. Trubek.,"Trubek, Amy B."
5803910,,,,,1712433,"Dietrich Marlene, Entertainers Germany Biography",Nehmt nur mein Leben-- : Reflexionen / Marlene Dietrich.,"Dietrich, Marlene"


In [101]:
new[(new['may_title']!=new['title']) & (new['may_bibnum'].notnull())]

Unnamed: 0,may_bibnum,may_subjects,may_title,may_author,bibnum,subjects,title,author
3977,2996404.0,"Inheritance and succession Drama, Highlands Scotland Drama, Television series, Fiction television programs, Video recordings for the hearing impaired",Monarch of the Glen. Series 4 / an Ecosse Films production for BBC Scotland ; produced by Stephen Garwood ; written by Mark Holloway ... [and others] ; directed by Rick Stroud .. [and others]..,,2996404,"Inheritance and succession Drama, Highlands Scotland Drama, Television series, Fiction television programs, Video recordings for the hearing impaired",Monarch of the glen. Series 4 / an Ecosse Films production for BBC Scotland ; produced by Stephen Garwood ; written by Mark Holloway ... [and others] ; directed by Rick Stroud .. [and others]..,
3978,2996404.0,"Inheritance and succession Drama, Highlands Scotland Drama, Television series, Fiction television programs, Video recordings for the hearing impaired",Monarch of the Glen. Series 4 / an Ecosse Films production for BBC Scotland ; produced by Stephen Garwood ; written by Mark Holloway ... [and others] ; directed by Rick Stroud .. [and others]..,,2996404,"Inheritance and succession Drama, Highlands Scotland Drama, Television series, Fiction television programs, Video recordings for the hearing impaired",Monarch of the glen. Series 4 / an Ecosse Films production for BBC Scotland ; produced by Stephen Garwood ; written by Mark Holloway ... [and others] ; directed by Rick Stroud .. [and others]..,
6201,2597998.0,"Man woman relationships Drama, Sex Religious aspects Drama, Tantric Buddhism Drama, Coming of age films, Feature films, Fiction films, Motion pictures Spanish","No mires para abajo = Don't look down / Pensa & Rocca Cine con el apoyo de Instituto Nacional de Cine y Artes Audiovisuales y la participación de Fonds Sud Cinéma, CNC ; en coproducción con Charivari Films ; en asociación con Orgon Films ; producción ejecutiva, Daniel Pensa, Miguel Angel Rocca ; guión y dirección, Eliseo Subiela.",,2597998,"Man woman relationships Drama, Sex Religious aspects Drama, Tantric Buddhism Drama, Coming of age films, Feature films, Fiction films, Motion pictures Spanish","No mires para abajo : Don't look down / Pensa & Rocca Cine con el apoyo de Instituto Nacional de Cine y Artes Audiovisuales y la participación de Fonds Sud Cinéma, CNC ; en coproducción con Charivari Films ; en asociación con Orgon Films ; producción ejecutiva, Daniel Pensa, Miguel Angel Rocca ; guión y dirección, Eliseo Subiela.",
6202,2597998.0,"Man woman relationships Drama, Sex Religious aspects Drama, Tantric Buddhism Drama, Coming of age films, Feature films, Fiction films, Motion pictures Spanish","No mires para abajo = Don't look down / Pensa & Rocca Cine con el apoyo de Instituto Nacional de Cine y Artes Audiovisuales y la participación de Fonds Sud Cinéma, CNC ; en coproducción con Charivari Films ; en asociación con Orgon Films ; producción ejecutiva, Daniel Pensa, Miguel Angel Rocca ; guión y dirección, Eliseo Subiela.",,2597998,"Man woman relationships Drama, Sex Religious aspects Drama, Tantric Buddhism Drama, Coming of age films, Feature films, Fiction films, Motion pictures Spanish","No mires para abajo : Don't look down / Pensa & Rocca Cine con el apoyo de Instituto Nacional de Cine y Artes Audiovisuales y la participación de Fonds Sud Cinéma, CNC ; en coproducción con Charivari Films ; en asociación con Orgon Films ; producción ejecutiva, Daniel Pensa, Miguel Angel Rocca ; guión y dirección, Eliseo Subiela.",
6203,2597998.0,"Man woman relationships Drama, Sex Religious aspects Drama, Tantric Buddhism Drama, Coming of age films, Feature films, Fiction films, Motion pictures Spanish","No mires para abajo = Don't look down / Pensa & Rocca Cine con el apoyo de Instituto Nacional de Cine y Artes Audiovisuales y la participación de Fonds Sud Cinéma, CNC ; en coproducción con Charivari Films ; en asociación con Orgon Films ; producción ejecutiva, Daniel Pensa, Miguel Angel Rocca ; guión y dirección, Eliseo Subiela.",,2597998,"Man woman relationships Drama, Sex Religious aspects Drama, Tantric Buddhism Drama, Coming of age films, Feature films, Fiction films, Motion pictures Spanish","No mires para abajo : Don't look down / Pensa & Rocca Cine con el apoyo de Instituto Nacional de Cine y Artes Audiovisuales y la participación de Fonds Sud Cinéma, CNC ; en coproducción con Charivari Films ; en asociación con Orgon Films ; producción ejecutiva, Daniel Pensa, Miguel Angel Rocca ; guión y dirección, Eliseo Subiela.",
...,...,...,...,...,...,...,...,...
5713631,1944933.0,"Married people Employment United States Statistics Periodicals, Statistics","Employment characteristics of families / United States Department of Labor, Bureau of Labor Statistics.",,1944933,Married people Employment United States Statistics Periodicals,"News. Employment characteristics of families / United States Department of Labor, Bureau of Labor Statistics.",
5713644,440670.0,,,,440670,,,
5713665,440811.0,,,,440811,,,
5713738,442899.0,,,,442899,,,
