In [1]:
# I use sqlalchemy and ORM to insert the data
# I will use sqlite as a RDBMS 


In [2]:
import pandas as pd 
import json
import operator

In [3]:
from data_loader import load_data_to_dictionary

In [4]:
data_and_meta = load_data_to_dictionary()

In [5]:
data, meta = data_and_meta['data'], data_and_meta['meta']
assert all(type(x) == list for x in data)

meta = meta['view']
column_names = list(map(operator.itemgetter('name'), meta['columns']))
# df = pd.DataFrame.from_records(data, columns=column_names)


In [13]:
# row = data[0]
# measurement = dict(zip(column_names, row))
# row = data[1]
# measurement2 = dict(zip(column_names, row))

In [7]:
from sqlalchemy import create_engine, MetaData
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String, Float
import sqlalchemy
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship

In [8]:
# ['sid', 'id', 'position', 'created_at', 'created_meta', 'updated_at',
#        'updated_meta', 'meta', 'MeasureId', 'MeasureName', 'MeasureType',
#        'StratificationLevel', 'StateFips', 'StateName', 'CountyFips',
#        'CountyName', 'ReportYear', 'Value', 'Unit', 'UnitName', 'DataOrigin',
#        'MonitorOnly']

In [9]:
engine = create_engine('sqlite:///local_db.sqlite', echo=True)


In [10]:
engine.execute("""drop table if exists measurements""")
engine.execute("""drop table if exists measures""")
engine.execute("""drop table if exists county""")

engine.execute("""
CREATE TABLE IF NOT EXISTS measurements (
sid INTEGER NOT NULL,
"MeasureId" INTEGER NOT NULL, 
"ReportYear" INTEGER NOT NULL, 
"Value" FLOAT NOT NULL, 
"CountyFips" INTEGER NOT NULL, 
"UnitId" INTEGER NOT NULL,
PRIMARY KEY (sid), 
FOREIGN KEY("MeasureId") REFERENCES measures ("MeasureId"),
FOREIGN KEY("CountyFips") REFERENCES counties ("CountyFips")
FOREIGN KEY("UnitId") REFERENCES units ("UnitId")
)
""")

engine.execute("""
CREATE TABLE IF NOT EXISTS measures (
"MeasureId" INTEGER NOT NULL, 
"MeasureName" VARCHAR NOT NULL, 
"MeasureType" CARCHAR NOT NULL, 
PRIMARY KEY (MeasureId)
)
""")


# Assuming each county maps to exactly one state.
# We could create a separate ralation to the state, but don't do it to save time.
# let's assume we rarely query anything by state. Data on the highest resolution (per county) is more interesting
engine.execute("""
CREATE TABLE IF NOT EXISTS counties (
"CountyFips" INTEGER NOT NULL, 
"CountyName" VARCHAR NOT NULL, 
"StateFips" INTEGER NOT NULL, 
"StateName" VARCHAR NOT NULL, 
PRIMARY KEY (CountyFips)
)
""")

2018-08-04 14:31:33,595 INFO sqlalchemy.engine.base.Engine SELECT CAST('test plain returns' AS VARCHAR(60)) AS anon_1
2018-08-04 14:31:33,597 INFO sqlalchemy.engine.base.Engine ()
2018-08-04 14:31:33,604 INFO sqlalchemy.engine.base.Engine SELECT CAST('test unicode returns' AS VARCHAR(60)) AS anon_1
2018-08-04 14:31:33,605 INFO sqlalchemy.engine.base.Engine ()
2018-08-04 14:31:33,608 INFO sqlalchemy.engine.base.Engine drop table if exists measurements
2018-08-04 14:31:33,609 INFO sqlalchemy.engine.base.Engine ()
2018-08-04 14:31:33,626 INFO sqlalchemy.engine.base.Engine COMMIT
2018-08-04 14:31:33,628 INFO sqlalchemy.engine.base.Engine drop table if exists measures
2018-08-04 14:31:33,629 INFO sqlalchemy.engine.base.Engine ()
2018-08-04 14:31:33,643 INFO sqlalchemy.engine.base.Engine COMMIT
2018-08-04 14:31:33,645 INFO sqlalchemy.engine.base.Engine drop table if exists county
2018-08-04 14:31:33,645 INFO sqlalchemy.engine.base.Engine ()
2018-08-04 14:31:33,647 INFO sqlalchemy.engine.base

<sqlalchemy.engine.result.ResultProxy at 0x7ff32c624e10>

In [14]:
def select_columns(dict_data, list_col_names):
    return tuple(dict_data[k] for k in list_col_names)


cols_measure = ("MeasureId", "MeasureName", "MeasureType")
cols_measurement = ("sid", "MeasureId", "ReportYear", "Value", "CountyFips")
cols_county = ("CountyFips", "CountyName", "StateFips", "StateName")


def insert_if_not_exists(measurement, table, columns, id_column):
    r = engine.execute("select * from {table} where {id_column}={value}".format(
         table=table,
         id_column=id_column,
         value=measurement[id_column]))
    # assuming input data is correct in terms of measureIds
    if r.first() is None:
        insert(measurement, table, columns)
        
def insert(measurement, table, columns):
    engine.execute("INSERT INTO '{table}' {columns} values {values}".format(
        table=table,
        columns=columns, 
        values=select_columns(measurement, columns)))


for row in data[:2]:
    measurement = dict(zip(column_names, row))
    insert_if_not_exists(measurement, 'measures', cols_measure, 'MeasureId') 
    insert_if_not_exists(measurement, 'counties', cols_county, 'CountyFips') 
    insert(measurement, 'measurements', cols_measurement)

2018-08-04 14:32:36,866 INFO sqlalchemy.engine.base.Engine select * from measures where MeasureId=83
2018-08-04 14:32:36,867 INFO sqlalchemy.engine.base.Engine ()
2018-08-04 14:32:36,869 INFO sqlalchemy.engine.base.Engine select * from counties where CountyFips=1051
2018-08-04 14:32:36,870 INFO sqlalchemy.engine.base.Engine ()
2018-08-04 14:32:36,873 INFO sqlalchemy.engine.base.Engine INSERT INTO 'measurements' ('sid', 'MeasureId', 'ReportYear', 'Value', 'CountyFips') values (2, '83', '1999', '5', '1051')
2018-08-04 14:32:36,874 INFO sqlalchemy.engine.base.Engine ()
2018-08-04 14:32:36,877 INFO sqlalchemy.engine.base.Engine COMMIT
2018-08-04 14:32:36,891 INFO sqlalchemy.engine.base.Engine select * from measures where MeasureId=83
2018-08-04 14:32:36,892 INFO sqlalchemy.engine.base.Engine ()
2018-08-04 14:32:36,898 INFO sqlalchemy.engine.base.Engine select * from counties where CountyFips=1073
2018-08-04 14:32:36,899 INFO sqlalchemy.engine.base.Engine ()
2018-08-04 14:32:36,901 INFO sql

In [12]:
measurement

{'CountyFips': '1051',
 'CountyName': 'Elmore',
 'DataOrigin': 'Monitor Only',
 'MeasureId': '83',
 'MeasureName': 'Number of days with maximum 8-hour average ozone concentration over the National Ambient Air Quality Standard',
 'MeasureType': 'Counts',
 'MonitorOnly': '1',
 'ReportYear': '1999',
 'StateFips': '1',
 'StateName': 'Alabama',
 'StratificationLevel': 'State x County',
 'Unit': 'No Units',
 'UnitName': 'No Units',
 'Value': '5',
 'created_at': 1439356237,
 'created_meta': '925122',
 'id': '0CEF0EA4-44D1-43F9-B7A3-BA8760697583',
 'meta': None,
 'position': 2,
 'sid': 2,
 'updated_at': 1439356237,
 'updated_meta': '925122'}

In [20]:
class Measurement(Base):
    __tablename__ = 'measurements'

    id = Column(Integer, primary_key=True)
    
    MeasureId = Column(Integer, ForeignKey('measures.MeasureId'))
    Measure = relationship("Measure")
#     state = relationship("Address", back_populates='user', cascade="all, delete, delete-orphan")
#     county = relationship("Address", back_populates='user', cascade="all, delete, delete-orphan")
    report_year = Column(Integer)
    value = Column(Float)
#     unit = relationship("Unit")
#     data_origin=relationship("DataOrigin", )
    
#     def __repr__(self):
#         return "<Measurement(name='%s', fullname='%s', password='%s')>" % (
#                                self.name, self.fullname, self.password)

class Measure(Base):
    __tablename__ = 'measures'
    MeasureId = Column(Integer, primary_key=True)
    MeasureName = Column(String, nullable=False)
    # Probably it might have sense to make another relationship. for measure type but I assume it is good enough.
    MeasureType = Column(String, nullable=False) 
    

In [22]:

Session = sessionmaker(bind=engine)

In [23]:
metadata.create_all(engine)

2018-08-04 13:20:54,291 INFO sqlalchemy.engine.base.Engine SELECT CAST('test plain returns' AS VARCHAR(60)) AS anon_1
2018-08-04 13:20:54,292 INFO sqlalchemy.engine.base.Engine ()
2018-08-04 13:20:54,293 INFO sqlalchemy.engine.base.Engine SELECT CAST('test unicode returns' AS VARCHAR(60)) AS anon_1
2018-08-04 13:20:54,294 INFO sqlalchemy.engine.base.Engine ()
2018-08-04 13:20:54,296 INFO sqlalchemy.engine.base.Engine PRAGMA table_info("measurements")
2018-08-04 13:20:54,296 INFO sqlalchemy.engine.base.Engine ()
2018-08-04 13:20:54,298 INFO sqlalchemy.engine.base.Engine PRAGMA table_info("measures")
2018-08-04 13:20:54,299 INFO sqlalchemy.engine.base.Engine ()


In [33]:
session = Session()

session.add(Measure(MeasureId=measurement_normalized_dict['MeasureId'],
                    MeasureName=measurement_normalized_dict['MeasureName'],
                   MeasureType=measurement_normalized_dict['MeasureType']))

session.commit()

2018-08-04 13:24:07,032 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2018-08-04 13:24:07,033 INFO sqlalchemy.engine.base.Engine INSERT INTO measures (id, "MeasureName", "MeasureType") VALUES (?, ?, ?)
2018-08-04 13:24:07,035 INFO sqlalchemy.engine.base.Engine ('83', 'Number of days with maximum 8-hour average ozone concentration over the National Ambient Air Quality Standard', 'Counts')
2018-08-04 13:24:07,037 INFO sqlalchemy.engine.base.Engine ROLLBACK


IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: measures.id [SQL: 'INSERT INTO measures (id, "MeasureName", "MeasureType") VALUES (?, ?, ?)'] [parameters: ('83', 'Number of days with maximum 8-hour average ozone concentration over the National Ambient Air Quality Standard', 'Counts')] (Background on this error at: http://sqlalche.me/e/gkpj)

In [27]:
sqlalchemy.__version__ 

'1.2.1'

In [None]:
engine.execute("")