In [1]:
import pandas as pd
import sys
from sqlalchemy import create_engine
import os.path
import cx_Oracle
import sqlalchemy as sa
from dotenv import load_dotenv

load_dotenv()
# print(os.environ)

connstring = "oracle://{}:{}@{}:{}/{}".format(os.getenv('ORACLE_USER'), os.getenv('ORACLE_PWD'), os.getenv('ORACLE_IP'), os.getenv('ORACLE_PORT'), os.getenv('ORACLE_DB'))
oracle_db = sa.create_engine(connstring)
cx_Oracle.init_oracle_client(lib_dir=os.getenv('LD_LIBRARY_PATH'))
engine = oracle_db.connect()

In [10]:
df = pd.read_csv("datasets/dataset_weekly-20220614.csv", delimiter=",", nrows=1000) # 
df = df[["country", "continent", "country_code", "population", "indicator", "weekly_count", "year_week"]]
df

Unnamed: 0,country,continent,country_code,population,indicator,weekly_count,year_week
0,Afghanistan,Asia,AFG,38928341,cases,0.0,2020-01
1,Afghanistan,Asia,AFG,38928341,cases,0.0,2020-02
2,Afghanistan,Asia,AFG,38928341,cases,0.0,2020-03
3,Afghanistan,Asia,AFG,38928341,cases,0.0,2020-04
4,Afghanistan,Asia,AFG,38928341,cases,0.0,2020-05
...,...,...,...,...,...,...,...
995,America (total),America,,738727930,cases,4.0,2020-06
996,America (total),America,,738727930,cases,5.0,2020-07
997,America (total),America,,738727930,cases,22.0,2020-08
998,America (total),America,,738727930,cases,89.0,2020-09


Drop duplicates / null

In [11]:
df = df[~df["country"].str.contains("(total)")]
df = df[["country", "continent", "population", "indicator", "weekly_count", "year_week"]].drop_duplicates().dropna()
df

  df = df[~df["country"].str.contains("(total)")]


Unnamed: 0,country,continent,population,indicator,weekly_count,year_week
0,Afghanistan,Asia,38928341,cases,0.0,2020-01
1,Afghanistan,Asia,38928341,cases,0.0,2020-02
2,Afghanistan,Asia,38928341,cases,0.0,2020-03
3,Afghanistan,Asia,38928341,cases,0.0,2020-04
4,Afghanistan,Asia,38928341,cases,0.0,2020-05
...,...,...,...,...,...,...
985,Algeria,Africa,43851043,deaths,0.0,2022-18
986,Algeria,Africa,43851043,deaths,0.0,2022-19
987,Algeria,Africa,43851043,deaths,0.0,2022-20
988,Algeria,Africa,43851043,deaths,0.0,2022-21


Do some manual value mapping

In [12]:
df["year"] = df["year_week"].apply(lambda x: x.split("-")[0])
def pick_month(x):
    return x 

def replace_month(x):
    month = int(x.split("-")[1])-1
    year = x.split("-")[0]
    if month < 5:
        month='01'
        # month='JAN'
    elif month < 9:
        month='02'
        # month='FEB'
    elif month < 14:
        month='03'
        # month='MAR'
    elif month < 18:
        month='04'
        # month='APR'
    elif month < 23:
        month='05'
        # month='MAY'
    elif month < 27:
        month='06'
        # month='JUN'
    elif month < 32:
        month='07'
        # month='JUL'
    elif month < 37:
        month='08'
        # month='AUG'
    elif month < 41:
        month='09'
        # month='SEP'
    elif month < 45:
        month='10'
        # month='OCT'
    elif month < 49:
        month='11'
        # month='NOV'
    elif month < 53:
        month='12'
        # month='DEC'
    else:
        print(x)
        sys.exit(1)
    return year + "-" + month

df["month"] = df["year_week"].apply(lambda x: replace_month(pick_month(x)))
df["year_week"] = df.apply(lambda x: x["month"] + "-" + x["year_week"].split("-")[1], axis=1)
df = df.rename(columns={'year_week': 'week'})

df

Unnamed: 0,country,continent,population,indicator,weekly_count,week,year,month
0,Afghanistan,Asia,38928341,cases,0.0,2020-01-01,2020,2020-01
1,Afghanistan,Asia,38928341,cases,0.0,2020-01-02,2020,2020-01
2,Afghanistan,Asia,38928341,cases,0.0,2020-01-03,2020,2020-01
3,Afghanistan,Asia,38928341,cases,0.0,2020-01-04,2020,2020-01
4,Afghanistan,Asia,38928341,cases,0.0,2020-01-05,2020,2020-01
...,...,...,...,...,...,...,...,...
985,Algeria,Africa,43851043,deaths,0.0,2022-04-18,2022,2022-04
986,Algeria,Africa,43851043,deaths,0.0,2022-05-19,2022,2022-05
987,Algeria,Africa,43851043,deaths,0.0,2022-05-20,2022,2022-05
988,Algeria,Africa,43851043,deaths,0.0,2022-05-21,2022,2022-05


Flatten indicator to deaths and cases

In [13]:
def df_to_row(x):
    cases = x[x["indicator"] == "cases"]["weekly_count"].tolist()[0]
    deaths = x[x["indicator"] == "deaths"]["weekly_count"].tolist()[0]
    df = pd.DataFrame(columns = ["deaths", "cases"])
    df.loc[0] = [deaths, cases]
    return df
    
df = df.groupby(["country", "continent", "population", "week", "year", "month"]).apply(lambda x: df_to_row(x)).reset_index()
df

Unnamed: 0,country,continent,population,week,year,month,level_6,deaths,cases
0,Afghanistan,Asia,38928341,2020-01-01,2020,2020-01,0,0.0,0.0
1,Afghanistan,Asia,38928341,2020-01-02,2020,2020-01,0,0.0,0.0
2,Afghanistan,Asia,38928341,2020-01-03,2020,2020-01,0,0.0,0.0
3,Afghanistan,Asia,38928341,2020-01-04,2020,2020-01,0,0.0,0.0
4,Afghanistan,Asia,38928341,2020-01-05,2020,2020-01,0,0.0,0.0
...,...,...,...,...,...,...,...,...,...
361,Algeria,Africa,43851043,2022-04-18,2022,2022-04,0,0.0,25.0
362,Algeria,Africa,43851043,2022-05-19,2022,2022-05,0,0.0,22.0
363,Algeria,Africa,43851043,2022-05-20,2022,2022-05,0,0.0,21.0
364,Algeria,Africa,43851043,2022-05-21,2022,2022-05,0,0.0,23.0


In [14]:
df.drop(labels=["level_6"], inplace=True, axis=1)
ft = df[["country", "deaths", "cases", "week"]].drop_duplicates()
ft.to_csv("generated/ft.csv", index=False)
dt1 = df[["country", "continent", "population"]].drop_duplicates()
dt1.to_csv("generated/dt_space.csv", index=False)
dt2 = df[["year", "month", "week"]].drop_duplicates()
dt2.to_csv("generated/dt_time.csv", index=False)

Write the dataframe to oracle (if needed)

Note that this writes strings as Oracle CLOB, which are a mess to join. An ugly workaround is

```
select * from dt_time where month = '2020-AUG';
select count(*) from (select* from ft, dt_time where ft.week = dt_time.week);
ALTER TABLE ft DROP CONSTRAINT fk_time;
alter table ft ADD CONSTRAINT fk_time foreign key (week) references dt_time(week);
UPDATE ft t  SET week = REPLACE(t.week, 'AGO', 'AUG');
UPDATE dt_time t  SET week = REPLACE(t.week, 'AGO', 'AUG');
select * from "MEMBER" where member_name like '%AUG%';
UPDATE "MEMBER" t  SET member_name = REPLACE(t.member_name, 'AGO', 'AUG');
```

In [15]:
g = dt1.groupby("country").count().reset_index()
g[g["continent"] > 1]

Unnamed: 0,country,continent,population


In [16]:
statements = [
    "drop table ft",
    "drop table dt_space",
    "drop table dt_time",
]

for statement in statements:
    try:
        print(statement)
        engine.execute(statement)
    except:
        pass

drop table ft
drop table dt_space
drop table dt_time


In [17]:
# df.to_sql('covid_raw_data', engine, if_exists='replace', index=False)
ft.to_sql('ft', engine,  index=False, chunksize=10000) # if_exists='replace', # , method='multi'
print("Done ft")
dt1.to_sql('dt_space', engine, index=False, chunksize=10000) # if_exists='replace', # , method='multi'
print("Done dt_space")
dt2.to_sql('dt_time', engine, index=False, chunksize=10000) # if_exists='replace', # , method='multi'
print("Done dt_time")

Done ft
Done dt_space
Done dt_time


Post-processing the data

In [18]:
statements = [
    "create table foo (country varchar2(255), continent varchar2(255), population varchar2(255))",
    "insert into foo select country, continent, population from dt_space",
    "drop table dt_space",
    "rename foo to dt_space",
    "create table foo (week varchar2(255), year varchar2(255), month varchar2(255))",
    "insert into foo select week, year, month from dt_time",
    "drop table dt_time",
    "rename foo to dt_time",
    "create table foo (week varchar2(255), country varchar2(255), deaths int, cases int)",
    "insert into foo select week, country, deaths, cases from ft",
    "drop table ft",
    "rename foo to ft",
    "alter table ft add primary key(week, country)",
    "alter table dt_space add primary key(country)",
    "alter table dt_time add primary key(week)",
    "alter table ft ADD CONSTRAINT fk_time foreign key (week) references dt_time(week)",
    "alter table ft ADD CONSTRAINT fk_space foreign key (country) references dt_space(country)",
    "select * from dt_time where month = '2020-AUG'",
    "select count(*) from (select* from ft, dt_time where ft.week = dt_time.week)",
    "ALTER TABLE ft DROP CONSTRAINT fk_time",
    "alter table ft ADD CONSTRAINT fk_time foreign key (week) references dt_time(week)"]

for statement in statements:
    print(statement)
    engine.execute(statement)

create table foo (country varchar2(255), continent varchar2(255), population varchar2(255))
insert into foo select country, continent, population from dt_space
drop table dt_space
rename foo to dt_space
create table foo (week varchar2(255), year varchar2(255), month varchar2(255))
insert into foo select week, year, month from dt_time
drop table dt_time
rename foo to dt_time
create table foo (week varchar2(255), country varchar2(255), deaths int, cases int)
insert into foo select week, country, deaths, cases from ft
drop table ft
rename foo to ft
alter table ft add primary key(week, country)
alter table dt_space add primary key(country)
alter table dt_time add primary key(week)
alter table ft ADD CONSTRAINT fk_time foreign key (week) references dt_time(week)
alter table ft ADD CONSTRAINT fk_space foreign key (country) references dt_space(country)
select * from dt_time where month = '2020-AUG'
select count(*) from (select* from ft, dt_time where ft.week = dt_time.week)
ALTER TABLE ft DRO

Feed the metadata structure

In [19]:
statements = [
    """DROP TABLE database CASCADE CONSTRAINTS""",
    """CREATE TABLE database (
       database_id varchar2(255) NOT NULL,
       database_name varchar2(255) NOT NULL,
       IPaddress varchar2(16) NOT NULL,
       port NUMBER NOT NULL,
       PRIMARY KEY (database_id),
       UNIQUE(database_name, IPaddress, port)
    )""",
    """DROP TABLE groupbyoperator CASCADE CONSTRAINTS""",
    """CREATE TABLE groupbyoperator (
       groupbyoperator_id varchar2(255) NOT NULL,
       groupbyoperator_name varchar2(255) NOT NULL UNIQUE,
       groupbyoperator_synonyms varchar2(1000),
       PRIMARY KEY (groupbyoperator_id)
    )""",
    """DROP TABLE hierarchy CASCADE CONSTRAINTS""",
    """CREATE TABLE hierarchy (
       hierarchy_id varchar2(255) NOT NULL,
       hierarchy_name varchar2(255) NOT NULL UNIQUE,
       hierarchy_synonyms varchar2(1000),
       PRIMARY KEY (hierarchy_id)
    )""",
    """DROP TABLE fact CASCADE CONSTRAINTS""",
    """CREATE TABLE fact (
       fact_id varchar2(255) NOT NULL,
       fact_name varchar2(255) NOT NULL UNIQUE,
       fact_synonyms varchar2(1000),
       database_id varchar2(255) NULL REFERENCES database (database_id) ON DELETE CASCADE,
       PRIMARY KEY (fact_id)
    )""",
    """DROP TABLE "TABLE" CASCADE CONSTRAINTS""",
    """CREATE TABLE "TABLE" (
       table_id varchar2(255) NOT NULL,
       table_name varchar2(255) NOT NULL UNIQUE,
       table_type varchar2(255) NOT NULL,
       fact_id varchar2(255) DEFAULT NULL REFERENCES fact (fact_id),
       hierarchy_id varchar2(255) DEFAULT NULL REFERENCES hierarchy (hierarchy_id) ON DELETE CASCADE,
       PRIMARY KEY (table_id)
    )""",
    """DROP TABLE relationship CASCADE CONSTRAINTS""",
    """CREATE TABLE relationship (
       relationship_id varchar2(255) NOT NULL,
       table1 varchar2(255) NOT NULL REFERENCES "TABLE" (table_id) ON DELETE CASCADE,
       table2 varchar2(255) NOT NULL REFERENCES "TABLE" (table_id) ON DELETE CASCADE,
       PRIMARY KEY (relationship_id)
    )""",
    """DROP TABLE "COLUMN" CASCADE CONSTRAINTS""",
    """CREATE TABLE "COLUMN" (
       column_id varchar2(255) NOT NULL,
       column_name varchar2(255) NOT NULL,
       column_type varchar2(255) NOT NULL,
       isKey number(1)  NOT NULL,
       relationship_id varchar2(255) DEFAULT NULL,
       table_id varchar2(255) NOT NULL REFERENCES "TABLE"(table_id) ON DELETE CASCADE,
       PRIMARY KEY (column_id) -- , UNIQUE (column_name, table_id)
    )""",
    """DROP TABLE "LEVEL" CASCADE CONSTRAINTS""",
    """CREATE TABLE "LEVEL" (
       level_id varchar2(255) NOT NULL,
       level_type varchar2(255) NOT NULL,
       level_description varchar2(200),
       level_name varchar2(255) NOT NULL UNIQUE,
       cardinality NUMBER DEFAULT NULL,
       hierarchy_id varchar2(255) NOT NULL REFERENCES "HIERARCHY" (hierarchy_id) ON DELETE CASCADE,
       level_synonyms varchar2(1000),
       column_id varchar2(255) NOT NULL REFERENCES "COLUMN"(column_id),
       "MIN" DOUBLE PRECISION DEFAULT NULL,
       "MAX" DOUBLE PRECISION DEFAULT NULL,
       "AVG" DOUBLE PRECISION DEFAULT NULL,
       isDescriptive NUMBER(1) DEFAULT 0,
       mindate DATE DEFAULT NULL,
       maxdate DATE DEFAULT NULL,
       PRIMARY KEY (level_id)
    )""",
    """DROP TABLE hierarchy_in_fact CASCADE CONSTRAINTS""",
    """CREATE TABLE hierarchy_in_fact (
       fact_id varchar2(255) NOT NULL REFERENCES fact (fact_id),
       hierarchy_id varchar2(255) NOT NULL REFERENCES hierarchy (hierarchy_id) ON DELETE CASCADE,
       PRIMARY KEY (fact_id, hierarchy_id)
    )""",
    """DROP TABLE language_predicate CASCADE CONSTRAINTS""",
    """CREATE TABLE language_predicate (
       language_predicate_id varchar2(255) NOT NULL,
       language_predicate_name varchar2(255) NOT NULL UNIQUE,
       language_predicate_synonyms varchar2(1000) DEFAULT NULL,
       language_predicate_type varchar2(255) DEFAULT NULL,
       PRIMARY KEY (language_predicate_id)
    )""",
    """DROP TABLE language_operator CASCADE CONSTRAINTS""",
    """CREATE TABLE language_operator (
       language_operator_id varchar2(255) NOT NULL,
       language_operator_name varchar2(255) NOT NULL UNIQUE,
       language_operator_synonyms varchar2(1000) DEFAULT NULL,
       language_operator_type varchar2(255) DEFAULT NULL,
       PRIMARY KEY (language_operator_id)
    )""",
    """DROP TABLE measure CASCADE CONSTRAINTS""",
    """CREATE TABLE measure (
       measure_id varchar2(255) NOT NULL,
       measure_name varchar2(255) NOT NULL,
       fact_id varchar2(255) NOT NULL REFERENCES fact (fact_id),
       measure_synonyms varchar2(1000),
       column_id varchar2(255) NOT NULL REFERENCES "COLUMN" (column_id) ON DELETE CASCADE,
       PRIMARY KEY (measure_id),
       UNIQUE(measure_name, fact_id)
    )""",
    """DROP TABLE member CASCADE CONSTRAINTS""",
    """CREATE TABLE member (
       member_id varchar2(255) NOT NULL,
       member_name varchar2(255) NOT NULL,
       level_id varchar2(255) NOT NULL REFERENCES "LEVEL" (level_id) ON DELETE CASCADE,
       member_synonyms varchar2(1000),
       PRIMARY KEY (member_id),
       UNIQUE(member_name, level_id)
    )""",
    """DROP TABLE groupbyoperator_of_measure CASCADE CONSTRAINTS""",
    """CREATE TABLE groupbyoperator_of_measure (
       groupbyoperator_id varchar2(255) NOT NULL REFERENCES groupbyoperator (groupbyoperator_id) ON DELETE CASCADE,
       measure_id varchar2(255) NOT NULL REFERENCES measure (measure_id) ON DELETE CASCADE,
       PRIMARY KEY (groupbyoperator_id, measure_id)
    )""",
    """DROP TABLE "SYNONYM" CASCADE CONSTRAINTS""",
    """CREATE TABLE "SYNONYM" (
       synonym_id varchar2(255) NOT NULL,
       table_name varchar2(255) NOT NULL,
       reference_id varchar2(255) NOT NULL,
       "TERM" varchar2(255) NOT NULL,
       PRIMARY KEY (synonym_id),
       UNIQUE(term, reference_id, table_name)
    )""",
    """DROP TABLE OLAPSESSION CASCADE CONSTRAINTS""",
    """CREATE TABLE OLAPSESSION (
       "TIMESTAMP" NUMBER,
       session_id varchar2(255),
       annotation_id varchar2(255),
       value_en varchar2(1000),
       value_ita varchar2(1000),
       limit long,
       fullquery_serialized blob,
       fullquery_tree varchar2(1000),
       olapoperator_serialized blob
    )""",
    # """CREATE MATERIALIZED VIEW ssb_members
    #    BUILD IMMEDIATE
    #    REFRESH COMPLETE
    #    ENABLE QUERY REWRITE
    #    AS select m.MEMBER_ID, m.MEMBER_NAME, l.LEVEL_ID, l.LEVEL_NAME, l.LEVEL_TYPE, t.TABLE_ID, t.TABLE_NAME, c.COLUMN_NAME
    #       from "LEVEL" l JOIN "COLUMN" c ON(l.COLUMN_ID = c.COLUMN_ID) JOIN "TABLE" t ON(c.TABLE_ID = t.TABLE_ID) LEFT JOIN "MEMBER" m on (l.LEVEL_ID = m.LEVEL_ID)"""
]
for statement in statements:
    try:
        engine.execute(statement)
    except Exception as e:
        print(statement)
        print(e)

DROP TABLE database CASCADE CONSTRAINTS
(cx_Oracle.DatabaseError) ORA-00942: table or view does not exist
[SQL: DROP TABLE database CASCADE CONSTRAINTS]
(Background on this error at: https://sqlalche.me/e/14/4xp6)
DROP TABLE groupbyoperator CASCADE CONSTRAINTS
(cx_Oracle.DatabaseError) ORA-00942: table or view does not exist
[SQL: DROP TABLE groupbyoperator CASCADE CONSTRAINTS]
(Background on this error at: https://sqlalche.me/e/14/4xp6)
DROP TABLE hierarchy CASCADE CONSTRAINTS
(cx_Oracle.DatabaseError) ORA-00942: table or view does not exist
[SQL: DROP TABLE hierarchy CASCADE CONSTRAINTS]
(Background on this error at: https://sqlalche.me/e/14/4xp6)
DROP TABLE fact CASCADE CONSTRAINTS
(cx_Oracle.DatabaseError) ORA-00942: table or view does not exist
[SQL: DROP TABLE fact CASCADE CONSTRAINTS]
(Background on this error at: https://sqlalche.me/e/14/4xp6)
DROP TABLE "TABLE" CASCADE CONSTRAINTS
(cx_Oracle.DatabaseError) ORA-00942: table or view does not exist
[SQL: DROP TABLE "TABLE" CASCAD

In [20]:
engine.commit()

AttributeError: 'Connection' object has no attribute 'commit'