In [213]:
# talk about techs used - duckdb for a local dummy database
# columnar
#
import duckdb
import timeit
import pandas as pd

In [265]:
### DDL to instantiate database ###

# create initial database connection (and instantiate database if it doesn't exist already) 
con = duckdb.connect('dim-model.db')

# create two separate schemas
con.sql("""CREATE SCHEMA IF NOT EXISTS snow;
           CREATE SCHEMA IF NOT EXISTS star;""")

In [215]:
# create a table using the initial data - auto create columns based on information
sql = """
CREATE OR REPLACE TABLE raw_companies AS
SELECT *
FROM read_csv_auto('SorensonworkfirmFIVEdata.csv')
"""
con.sql(sql)

In [55]:
# verify initial data types
sql = """
DESCRIBE raw_companies;
"""

initial_cols_dict = {col[0]:col[1] for col in con.sql(sql).fetchall()}

In [56]:
initial_cols_dict

{'YEAR': 'BIGINT',
 'FIRMNAME': 'VARCHAR',
 'FIVEFIRMID': 'BIGINT',
 'FIRMID': 'BIGINT',
 'CUSIPHeaderName': 'VARCHAR',
 'CUSIPHISTORYID': 'VARCHAR',
 'CUSIPHistoryName': 'VARCHAR',
 'FOUND': 'BIGINT',
 'FATE': 'VARCHAR',
 'ESTATE': 'BIGINT',
 'SALES': 'DOUBLE',
 'EMPLOY': 'DOUBLE',
 'ZIP': 'VARCHAR',
 'LAT': 'DOUBLE',
 'LON': 'DOUBLE',
 'RANDD': 'BIGINT',
 'PRODUCTS': 'BIGINT',
 'PTYPES': 'BIGINT',
 'SYSTEM': 'BIGINT',
 'GRAPHIC': 'BIGINT',
 'CEO': 'VARCHAR',
 'OWN': 'BIGINT',
 'CPU': 'BIGINT',
 'OS': 'BIGINT',
 'APPS': 'BIGINT',
 'COMM': 'BIGINT',
 'MONITOR': 'BIGINT',
 'DISK': 'BIGINT',
 'MEMORY': 'BIGINT',
 'BOARD': 'BIGINT',
 'CUSIPHEADERID': 'VARCHAR'}

## A snowflake data model

Steps of Kimball dimensional modeling:

1. Select the business process
2. Declare the grain
3. Identify the dimensions
4. Identify the facts

![](snow.png)

There's a step before this:

0. Know thy data

In [82]:
# check lat/long/zip data columns
con.sql('SELECT DISTINCT lat, lon, zip FROM raw_companies')

┌────────────────────┬─────────────────────┬───────┐
│        LAT         │         LON         │  ZIP  │
│       double       │       double        │ int64 │
├────────────────────┼─────────────────────┼───────┤
│ 0.7058759927749634 │   -1.39546799659729 │ 15213 │
│  0.742671012878418 │ -1.2435790300369263 │  1821 │
│ 0.7440609931945801 │  -1.244994044303894 │  1851 │
│  0.574072003364563 │ -2.0455870628356934 │ 92121 │
│ 0.7424650192260742 │  -1.247694969177246 │  1460 │
│ 0.7433350086212158 │ -1.2468310594558716 │  1886 │
│ 0.5903890132904053 │ -2.0649309158325195 │ 90501 │
│  0.597449004650116 │ -2.0777499675750732 │ 93010 │
│ 0.5310789942741394 │ -1.4706029891967773 │ 32301 │
│ 0.7433980107307434 │ -1.2454240322113037 │  1824 │
│          ·         │          ·          │    ·  │
│          ·         │          ·          │    ·  │
│          ·         │          ·          │    ·  │
│ 0.5964959859848022 │ -2.0702381134033203 │ 91367 │
│ 0.7462400197982788 │ -1.2473299503326416 │  

In [197]:
### INITIAL DATA CLEANSING ###

# change column zip to type varchar
sql = """
ALTER TABLE raw_companies ALTER COLUMN zip SET DATA TYPE VARCHAR;
"""

# ensure leading 0's present in zip
sql += """

UPDATE raw_companies
SET zip = RIGHT('0' || zip, 5);

""" 

# convert lat/long variables to degrees from radians
sql += f"""

UPDATE raw_companies
SET lat = DEGREES(lat),
    lon = DEGREES(lon);
    
"""

con.sql(sql)

In [84]:
# display the results of cleansing
con.sql('SELECT DISTINCT lat, lon, zip FROM raw_companies')

┌────────────────────┬─────────────────────┬─────────┐
│        LAT         │         LON         │   ZIP   │
│       double       │       double        │ varchar │
├────────────────────┼─────────────────────┼─────────┤
│ 40.443715245612395 │  -79.95442665060105 │ 15213   │
│  42.55191460463936 │  -71.25182991208851 │ 01821   │
│ 42.631554610361704 │  -71.33290425753655 │ 01851   │
│ 32.891902929409454 │ -117.20350532704757 │ 92121   │
│  42.54011203775357 │  -71.48765585356153 │ 01460   │
│  42.58995875831633 │  -71.43815747264645 │ 01886   │
│  33.82679873243329 │ -118.31182646328719 │ 90501   │
│  34.23130644074354 │ -119.04630402549535 │ 93010   │
│ 30.428584959960602 │  -84.25934462029834 │ 32301   │
│  42.59356851329268 │   -71.3575407498728 │ 01824   │
│          ·         │           ·         │   ·     │
│          ·         │           ·         │   ·     │
│          ·         │           ·         │   ·     │
│  42.36467179873263 │  -71.10417488261118 │ 02139   │
│ 41.08256

In [140]:
# set schema
# templated python - DRY
schema = 'snow'

In [256]:
# create compustat dimension
# MD5 hash as keys vs ordered int
# surrogate vs. natural keys
# primary keys (collisions)
sql = f"""


CREATE OR REPLACE TABLE {schema}.compustat (
    cusip_id VARCHAR,
    cusip_name VARCHAR,
    cusip_historical_id VARCHAR,
    cusip_historical_name VARCHAR,
    PRIMARY KEY (cusip_id)
);

"""

sql += f"""

INSERT INTO {schema}.compustat (cusip_id, cusip_name, cusip_historical_id, cusip_historical_name)
WITH cid AS (
    SELECT DISTINCT
        CUSIPHEADERID AS cusip_id,
        CUSIPHEADERNAME AS cusip_name,
        CUSIPHISTORYID AS cusip_historical_id,
        CUSIPHISTORYNAME AS cusip_historical_name
    FROM
        raw_companies
)
SELECT
    MD5(COALESCE(cusip_id || cusip_name || cusip_historical_id || cusip_historical_name, '-1')) AS cusip_id, --1 added for nulls
    cusip_name,
    cusip_historical_id,
    cusip_historical_name
FROM
    cid;

"""

con.sql(sql)

In [146]:
# create ceo dimension
# naming conventions
# joining on strings vs. others
sql = f"""

CREATE OR REPLACE TABLE {schema}.ceos (
    ceo_id VARCHAR,
    ceo_name VARCHAR,
    PRIMARY KEY (ceo_id)
);

"""

sql += f"""

INSERT INTO {schema}.ceos (ceo_id, ceo_name)

SELECT DISTINCT
    MD5(COALESCE(ceo, '-1')) AS ceo_id,
    ceo AS ceo_name
FROM
    raw_companies;
"""

con.sql(sql)

In [248]:
# create locations dimension
# type 2 dimensions - change slow
# insert only
# primary keys
sql = f"""

CREATE OR REPLACE TABLE {schema}.locations (
    location_id VARCHAR,
    zipcode VARCHAR,
    latitude DOUBLE,
    longitude DOUBLE,
    PRIMARY KEY (location_id)
);

"""

sql += f"""

INSERT INTO {schema}.locations (location_id, zipcode, latitude, longitude)

SELECT DISTINCT
    MD5(COALESCE(zip || CAST(lat AS VARCHAR) || CAST(lon AS VARCHAR), '-1')),
    zip,
    lat,
    lon
FROM
    raw_companies;
"""

con.sql(sql)

In [216]:
# create companies dimension
# normalization benefits

sql = f"""

CREATE OR REPLACE TABLE {schema}.companies (
    company_id VARCHAR,
    company_name VARCHAR,
    cusip_id VARCHAR,
    ceo_id VARCHAR,
    founding_year INTEGER,
    ownership_type VARCHAR,
    exit_type VARCHAR,
    exit_comment VARCHAR,
    location_id VARCHAR,
    PRIMARY KEY (company_id),
    
);

"""

con.sql(sql)

sql += f"""

INSERT INTO {schema}.companies (company_id, company_name, cusip_id, ceo_id, founding_year, ownership_type, exit_type, exit_comment, location_id)

SELECT DISTINCT
    MD5(COALESCE(   COALESCE(firmname, '-1') ||
                    COALESCE(CAST(CUSIPHEADERID AS VARCHAR),'-1')||
                    COALESCE(CAST(firmid AS VARCHAR),'-1')||
                    COALESCE(CAST(found AS VARCHAR),'-1') ||
                    COALESCE(CAST(own AS VARCHAR),'-1') ||
                    COALESCE(CAST(fate AS VARCHAR),'-1') ||
                    COALESCE(CAST(estate AS VARCHAR),'-1') ||
                    COALESCE(ceo,'-1') ||
                    COALESCE(zip,'-1')
                    , '-1')) as id,
    firmname,
    MD5(COALESCE(CUSIPHEADERID || CUSIPHEADERNAME || CUSIPHISTORYID || CUSIPHISTORYNAME, '-1')),
    MD5(COALESCE(ceo, '-1')),
    found,
    CASE own
        WHEN 0 THEN 'Private'
        WHEN 1 THEN 'Public'
        WHEN 2 THEN 'Subsidiary'
        ELSE NULL END,
    CASE estate
        WHEN 0 THEN 'Censored'
        WHEN 1 THEN 'Exited market'
        WHEN 2 THEN 'Acquired'
        WHEN 3 THEN 'Spun off'
        WHEN 4 THEN 'Changed Name'
        ELSE NULL END,
    fate,
     MD5(COALESCE(zip || CAST(lat AS VARCHAR) || CAST(lon AS VARCHAR), '-1'))
    
FROM
    raw_companies
    ;
"""

con.sql(sql)

In [164]:
# create product_offerings dimension
# what is a fact vs. a dimension
sql = f"""

CREATE OR REPLACE TABLE {schema}.product_offerings (
    product_offering_id VARCHAR,
    product_types_offered VARCHAR,
    PRIMARY KEY (product_offering_id)
);

"""

sql += f"""

INSERT INTO {schema}.product_offerings (product_offering_id, product_types_offered)

WITH po AS (
    SELECT
        CASE
            WHEN system = 1 AND graphic = 0 THEN 'Desktop systems'
            WHEN system = 0 AND graphic = 1 THEN 'Graphics systems'
            WHEN system = 1 AND graphic = 1 THEN 'Desktop + Graphics systems'
            ELSE NULL END as offering
    from raw_companies
)
SELECT DISTINCT
    MD5(COALESCE(offering, '-1')),
    offering    
FROM
    po;
"""

con.sql(sql)

In [238]:
# create integration dimension
# type 4 dimension - change fast
sql = f"""

CREATE OR REPLACE TABLE {schema}.integrations (
    integration_id VARCHAR,
    cpu_source VARCHAR,
    os_source VARCHAR,
    application_source VARCHAR,
    communications_hardware_source VARCHAR,
    disk_source VARCHAR,
    ram_source VARCHAR,
    motherboard_source VARCHAR,
    PRIMARY KEY (integration_id)
);

"""

sql += f"""

INSERT INTO {schema}.integrations
(
    integration_id, 
    cpu_source, 
    os_source, 
    application_source, 
    communications_hardware_source, 
    disk_source, 
    ram_source, 
    motherboard_source
)

SELECT DISTINCT
    MD5(
        CASE WHEN cpu = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN os = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN apps = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN comm = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN disk = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN memory = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN board = 0 THEN 'Bought' ELSE 'Produced' END
    ), 
    CASE WHEN cpu = 0 THEN 'Bought' ELSE 'Produced' END,
    CASE WHEN os = 0 THEN 'Bought' ELSE 'Produced' END,
    CASE WHEN apps = 0 THEN 'Bought' ELSE 'Produced' END,
    CASE WHEN comm = 0 THEN 'Bought' ELSE 'Produced' END,
    CASE WHEN disk = 0 THEN 'Bought' ELSE 'Produced' END,
    CASE WHEN memory = 0 THEN 'Bought' ELSE 'Produced' END,
    CASE WHEN board = 0 THEN 'Bought' ELSE 'Produced' END
FROM
    raw_companies;
;

"""

con.sql(sql)

In [221]:
### create years table by dynamically assigning values ###

# integer values as key which helps with partitioning.
# date table's grain goes, other date information

sql = """
CREATE OR REPLACE TABLE years AS

WITH RECURSIVE years AS (
    SELECT 1970 as year
    UNION ALL
    SELECT year + 1
    FROM years
    WHERE year < 2000
)
SELECT 
    year,
    CASE 
        WHEN MOD(year - 1970, 12) = 0 THEN 'Dog'
        WHEN MOD(year - 1970, 12) = 1 THEN 'Pig'
        WHEN MOD(year - 1970, 12) = 2 THEN 'Rat'
        WHEN MOD(year - 1970, 12) = 3 THEN 'Ox'
        WHEN MOD(year - 1970, 12) = 4 THEN 'Tiger'
        WHEN MOD(year - 1970, 12) = 5 THEN 'Rabbit'
        WHEN MOD(year - 1970, 12) = 6 THEN 'Dragon'
        WHEN MOD(year - 1970, 12) = 7 THEN 'Snake'
        WHEN MOD(year - 1970, 12) = 8 THEN 'Horse'
        WHEN MOD(year - 1970, 12) = 9 THEN 'Sheep'
        WHEN MOD(year - 1970, 12) = 10 THEN 'Monkey'
        WHEN MOD(year - 1970, 12) = 11 THEN 'Rooster'
    END as zodiac_type
FROM years;
"""

con.sql(sql)

### making the fact table

In [229]:
# create workstation_sales table
# additive facts vs. non-additive

sql = f"""

CREATE OR REPLACE TABLE {schema}.workstation_sales (
    year_id INTEGER,
    company_id VARCHAR,
    integration_id VARCHAR,
    product_offering_id VARCHAR,
    sales DOUBLE,
    research_budget DOUBLE,
    employee_count DOUBLE,
    product_offering_count INTEGER,
    product_category_count INTEGER,
    
);

"""

con.sql(sql)

sql += f"""

INSERT INTO {schema}.workstation_sales 
   (year_id,
    company_id ,
    integration_id ,
    product_offering_id ,
    sales ,
    research_budget ,
    employee_count ,
    product_offering_count ,
    product_category_count
    )

SELECT
    year,
    MD5(COALESCE(   COALESCE(firmname, '-1') ||
                    COALESCE(CAST(CUSIPHEADERID AS VARCHAR),'-1')||
                    COALESCE(CAST(firmid AS VARCHAR),'-1')||
                    COALESCE(CAST(found AS VARCHAR),'-1') ||
                    COALESCE(CAST(own AS VARCHAR),'-1') ||
                    COALESCE(CAST(fate AS VARCHAR),'-1') ||
                    COALESCE(CAST(estate AS VARCHAR),'-1') ||
                    COALESCE(ceo,'-1') ||
                    COALESCE(zip,'-1')
                    , '-1')) as id,

    MD5(
        CASE WHEN cpu = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN os = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN apps = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN comm = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN disk = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN memory = 0 THEN 'Bought' ELSE 'Produced' END || 
        CASE WHEN board = 0 THEN 'Bought' ELSE 'Produced' END
    ),
    
    MD5(COALESCE(
        CASE
            WHEN system = 1 AND graphic = 0 THEN 'Desktop systems'
            WHEN system = 0 AND graphic = 1 THEN 'Graphics systems'
            WHEN system = 1 AND graphic = 1 THEN 'Desktop + Graphics systems'
            ELSE NULL END,
        '-1'
    )),
    
    sales,
    randd,
    employ,
    products,
    ptypes
    
    
FROM
    raw_companies
    ;
"""

con.sql(sql)

In [258]:
con.sql('select count(*) from workstation_snowflake.workstation_sales a join workstation_snowflake.companies b on a.company_id = b.company_id')

con.sql('select count(*) from workstation_snowflake.workstation_sales a join workstation_snowflake.integrations b on a.integration_id = b.integration_id')

con.sql('select count(*) from workstation_snowflake.companies') # 388

con.sql("select 'locations', count(*) from workstation_snowflake.companies c join workstation_snowflake.locations l on c.location_id = l.location_id")

con.sql("select 'locations', count(*) from workstation_snowflake.companies c join workstation_snowflake.compustat s on c.cusip_id = s.cusip_id")

# con.sql("select 'locations', count(*) from workstation_snowflake.companies c join workstation_snowflake.ceos s on c.ceo_id = s.ceo_id")

# con.sql('select company_name, c.location_id, l.location_id from workstation_snowflake.companies c left join workstation_snowflake.locations l on c.location_id = l.location_id')

# con.sql('select count(*) from raw_companies')

┌─────────────┬──────────────┐
│ 'locations' │ count_star() │
│   varchar   │    int64     │
├─────────────┼──────────────┤
│ locations   │          388 │
└─────────────┴──────────────┘

## A star data model

Steps of Kimball dimensional modeling:

1. Select the business process
2. Declare the grain
3. Identify the dimensions
4. Identify the facts

![](star.png)

In [259]:
# update schema
schema = 'star'

In [268]:
# make new companies attribute which contains
# pros/cons of denormalizing to star
sql = f"""

CREATE OR REPLACE TABLE {schema}.companies (
    company_id VARCHAR,
    company_name VARCHAR,
    cusip_id VARCHAR,
    cusip_name VARCHAR,
    cusip_historical_id VARCHAR,
    cusip_historical_name VARCHAR,
    ceo_name VARCHAR,
    founding_year INTEGER,
    ownership_type VARCHAR,
    exit_type VARCHAR,
    exit_comment VARCHAR,
    zipcode VARCHAR,
    latitude DOUBLE,
    longitude DOUBLE,
    PRIMARY KEY (company_id),
    
);

"""

con.sql(sql)

sql += f"""

INSERT INTO {schema}.companies (
    company_id ,
    company_name ,
    cusip_id ,
    cusip_name ,
    cusip_historical_id ,
    cusip_historical_name ,
    ceo_name ,
    founding_year ,
    ownership_type ,
    exit_type ,
    exit_comment ,
    zipcode ,
    latitude ,
    longitude )

SELECT DISTINCT
    MD5(COALESCE(   COALESCE(firmname, '-1') ||
                    COALESCE(CAST(CUSIPHEADERID AS VARCHAR),'-1')||
                    COALESCE(CAST(firmid AS VARCHAR),'-1')||
                    COALESCE(CAST(found AS VARCHAR),'-1') ||
                    COALESCE(CAST(own AS VARCHAR),'-1') ||
                    COALESCE(CAST(fate AS VARCHAR),'-1') ||
                    COALESCE(CAST(estate AS VARCHAR),'-1') ||
                    COALESCE(ceo,'-1') ||
                    COALESCE(zip,'-1')
                    , '-1')) as id,
    firmname,
    CUSIPHEADERID,
    CUSIPHeaderName,
    CUSIPHISTORYID,
    CUSIPHistoryName,
    ceo,
    found,
    CASE own
        WHEN 0 THEN 'Private'
        WHEN 1 THEN 'Public'
        WHEN 2 THEN 'Subsidiary'
        ELSE NULL END,
    CASE estate
        WHEN 0 THEN 'Censored'
        WHEN 1 THEN 'Exited market'
        WHEN 2 THEN 'Acquired'
        WHEN 3 THEN 'Spun off'
        WHEN 4 THEN 'Changed Name'
        ELSE NULL END,
    fate,
    ZIP,
    lat,
    lon
    
FROM
    raw_companies
    ;
"""

con.sql(sql)

In [269]:
con.sql('select count(*) from workstation_snowflake.workstation_sales a join star.companies b on a.company_id = b.company_id')

┌──────────────┐
│ count_star() │
│    int64     │
├──────────────┤
│          682 │
└──────────────┘

### Some query testing

In [279]:
con.sql("""
select zipcode, sum(sales) 
from workstation_snowflake.workstation_sales a 
join workstation_snowflake.companies c on a.company_id=c.company_id
join workstation_snowflake.locations l on c.location_id=l.location_id
group by 1 order by 2 desc""")

┌─────────┬────────────────┐
│ zipcode │   sum(sales)   │
│ varchar │     double     │
├─────────┼────────────────┤
│ 10504   │ 598003998720.0 │
│ 45479   │ 154701496704.0 │
│ 94304   │ 141994998272.0 │
│ 1754    │ 132260221440.0 │
│ 95054   │ 121301996099.0 │
│ 14650   │  95705997312.0 │
│ 1821    │  81184972152.0 │
│ 95134   │  67506182944.0 │
│ 7660    │  48000000000.0 │
│ 95008   │  36615437056.0 │
│  ·      │             ·  │
│  ·      │             ·  │
│  ·      │             ·  │
│ 3060    │            1.0 │
│ 90260   │            1.0 │
│ 41048   │            1.0 │
│ 27709   │           NULL │
│ 92803   │           NULL │
│ 97124   │           NULL │
│ 94063   │           NULL │
│ 94577   │           NULL │
│ 77094   │           NULL │
│ 19380   │           NULL │
├─────────┴────────────────┤
│   156 rows (20 shown)    │
└──────────────────────────┘

In [280]:
con.sql("""
select zipcode, sum(sales) 
from workstation_snowflake.workstation_sales a 
join star.companies c on a.company_id=c.company_id
group by 1 order by 2 desc""")

┌─────────┬────────────────┐
│ zipcode │   sum(sales)   │
│ varchar │     double     │
├─────────┼────────────────┤
│ 10504   │ 598003998720.0 │
│ 45479   │ 154701496704.0 │
│ 94304   │ 141994998272.0 │
│ 1754    │ 132260221440.0 │
│ 95054   │ 121301996099.0 │
│ 14650   │  95705997312.0 │
│ 1821    │  81184972152.0 │
│ 95134   │  67506182944.0 │
│ 7660    │  48000000000.0 │
│ 95008   │  36615437056.0 │
│  ·      │             ·  │
│  ·      │             ·  │
│  ·      │             ·  │
│ 3060    │            1.0 │
│ 90260   │            1.0 │
│ 41048   │            1.0 │
│ 27709   │           NULL │
│ 92803   │           NULL │
│ 97124   │           NULL │
│ 94063   │           NULL │
│ 94577   │           NULL │
│ 77094   │           NULL │
│ 19380   │           NULL │
├─────────┴────────────────┤
│   156 rows (20 shown)    │
└──────────────────────────┘

In [272]:
%%timeit -n 10

con.sql("""
select zipcode, sum(sales) 
from workstation_snowflake.workstation_sales a 
join workstation_snowflake.companies c on a.company_id=c.company_id
join workstation_snowflake.locations l on c.location_id=l.location_id
group by 1 order by 1 desc""")

The slowest run took 46.68 times longer than the fastest. This could mean that an intermediate result is being cached.
372 µs ± 564 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [276]:
%%timeit -n 10

con.sql("""
select zipcode, sum(sales) 
from workstation_snowflake.workstation_sales a 
join star.companies c on a.company_id=c.company_id
group by 1 order by 1 desc""")

88.6 µs ± 29.1 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
