# Using Marshmallow with SQL Alchemy Integration

In [1]:
from os import environ
import urllib
#import json

from sqlalchemy import create_engine, inspect
from sqlalchemy import Table, Column, ForeignKey, Integer, String, Float, MetaData
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, backref
from sqlalchemy.orm import sessionmaker, selectinload
from sqlalchemy.sql import union, select, and_, or_, not_, text
from sqlalchemy.sql import bindparam
from sqlalchemy.sql.functions import coalesce

from marshmallow import pprint, fields, EXCLUDE
from marshmallow_sqlalchemy import ModelSchema

# Setup

In [2]:
driver = environ.get('SQL_DRIVER', '{ODBC Driver 17 for SQL Server}')
host = environ.get('SQL_HOST', 'sql-fabulous')
db = environ.get('SQL_DB', 'ScratchDB')
user = environ.get('SQL_USER', 'sa')
pw = environ.get('SQL_PASSWORD', 'HelloWorld1')

con_str = f'DRIVER={driver};SERVER={host};DATABASE={db};UID={user};PWD={pw}'

params = urllib.parse.quote_plus(con_str)  

# 'echo' emits generated sql
engine = create_engine(f"mssql+pyodbc:///?odbc_connect={params}", echo=True)

# Define ORM Schema

In [3]:
Base = declarative_base()

class Survey(Base):
    __tablename__ = 'DirectionalSurvey'

    ID = Column(Integer, autoincrement=False, primary_key=True)
    API = Column(String(32), nullable=True)
    WKID = Column(String(32), nullable=True)
    FIPS = Column(String(4), nullable=True)
    STATUS_CODE = Column(String(1), nullable=False)

    def __repr__(self):
        return f"Survey(ID={self.ID}, API={self.API}, WKID={self.WKID}, Points={self.stations})"

class SurveyReport(Base):
    __tablename__ = 'SurveyReport'

    ID = Column(Integer, autoincrement=False, primary_key=True)
    DirectionalSurveyId = Column(Integer, ForeignKey('DirectionalSurvey.ID'), nullable=False)
    Azimuth = Column(Float, nullable=True)
    MD = Column(Float, nullable=True)
    Inclination = Column(Float, nullable=True)
    STATUS_CODE = Column(String(1), nullable=False)
    #survey = relationship(Survey, backref=backref('stations'))
    survey = relationship(Survey, backref=backref('stations', uselist=True))

    def __repr__(self):
        return f"Report(ID={self.ID}, FK={self.DirectionalSurveyId}, STATUS={self.STATUS_CODE})"

# Set mapped tables to local vars so that full SQL metadata is available.
# surveys = Survey.__table__
# points = SurveyReport.__table__

# Define Marshmallow Schema

In [4]:
class SurveySchema(ModelSchema):
    class Meta:
        unknown = EXCLUDE
        ordered = True
        model = Survey
        
    # Overrides    
    ID = fields.Int(required=True, data_key="SurveyId")
    stations = fields.Nested("SurveyReportSchema", many=True, exclude=("survey",))

class SurveyReportSchema(ModelSchema):
    class Meta:
        unknown = EXCLUDE
        ordered = True
        model = SurveyReport

    ID = fields.Int(required=True, data_key="SurveyReportId")
    survey = fields.Nested(SurveySchema, many=False, exclude=("stations",))
    
survey_serdes = SurveySchema()  
report_serdes = SurveyReportSchema()

# Bind Engine to Schema and Create Session, Connection¶

In [5]:
Base.metadata.bind = engine
DBSession = sessionmaker(bind=engine)
session = DBSession()
conn = engine.connect()

2019-08-24 21:43:30,404 INFO sqlalchemy.engine.base.Engine SELECT CAST(SERVERPROPERTY('ProductVersion') AS VARCHAR)
2019-08-24 21:43:30,408 INFO sqlalchemy.engine.base.Engine ()
2019-08-24 21:43:30,412 INFO sqlalchemy.engine.base.Engine SELECT schema_name()
2019-08-24 21:43:30,413 INFO sqlalchemy.engine.base.Engine ()
2019-08-24 21:43:30,416 INFO sqlalchemy.engine.base.Engine SELECT CAST('test plain returns' AS VARCHAR(60)) AS anon_1
2019-08-24 21:43:30,417 INFO sqlalchemy.engine.base.Engine ()
2019-08-24 21:43:30,419 INFO sqlalchemy.engine.base.Engine SELECT CAST('test unicode returns' AS NVARCHAR(60)) AS anon_1
2019-08-24 21:43:30,420 INFO sqlalchemy.engine.base.Engine ()


# Select

In [6]:
my_surveys = session.query(Survey) \
                .options(selectinload(Survey.stations))\
                .filter(Survey.STATUS_CODE.in_(['N', 'C']))\
                .filter(Survey.ID < 2)\
                .filter(Survey.stations.any())\
                .all()

try:
    for s in my_surveys:
        print(f"S: {type(s)}, {s}")
        s_json = survey_serdes.dump(s)
        pprint(s_json, indent=2)
        
        for sr in s.stations:
            print(f"SR: {type(sr)}, {sr}")
            sr_json = report_serdes.dump(sr)
            pprint(sr_json, indent=2)
        
except Exception as e:
    print(f"{e}")

2019-08-24 21:43:30,512 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2019-08-24 21:43:30,514 INFO sqlalchemy.engine.base.Engine SELECT [DirectionalSurvey].[ID] AS [DirectionalSurvey_ID], [DirectionalSurvey].[API] AS [DirectionalSurvey_API], [DirectionalSurvey].[WKID] AS [DirectionalSurvey_WKID], [DirectionalSurvey].[FIPS] AS [DirectionalSurvey_FIPS], [DirectionalSurvey].[STATUS_CODE] AS [DirectionalSurvey_STATUS_CODE] 
FROM [DirectionalSurvey] 
WHERE [DirectionalSurvey].[STATUS_CODE] IN (?, ?) AND [DirectionalSurvey].[ID] < ? AND (EXISTS (SELECT 1 
FROM [SurveyReport] 
WHERE [DirectionalSurvey].[ID] = [SurveyReport].[DirectionalSurveyId]))
2019-08-24 21:43:30,515 INFO sqlalchemy.engine.base.Engine ('N', 'C', 2)
2019-08-24 21:43:30,520 INFO sqlalchemy.engine.base.Engine SELECT [SurveyReport].[DirectionalSurveyId] AS [SurveyReport_DirectionalSurveyId], [SurveyReport].[ID] AS [SurveyReport_ID], [SurveyReport].[Azimuth] AS [SurveyReport_Azimuth], [SurveyReport].[MD] AS [SurveyReport

In [7]:
my_surveys = session.query(Survey)\
                .join(SurveyReport)\
                .filter(Survey.STATUS_CODE.in_(['N', 'C']))\
                .filter(Survey.WKID.like('WKID2%'))\
                .filter(SurveyReport.Azimuth >= 7)\
                .all()

try:
    for s in my_surveys:
        print(f"S: {type(s)}, {s}")
        s_json = survey_serdes.dump(s)
        pprint(s_json, indent=2)
        
        for sr in s.stations:
            print(f"SR: {type(sr)}, {sr}")
            sr_json = report_serdes.dump(sr)
            pprint(sr_json, indent=2)
        
except Exception as e:
    print(f"{e}")

2019-08-24 21:43:30,546 INFO sqlalchemy.engine.base.Engine SELECT [DirectionalSurvey].[ID] AS [DirectionalSurvey_ID], [DirectionalSurvey].[API] AS [DirectionalSurvey_API], [DirectionalSurvey].[WKID] AS [DirectionalSurvey_WKID], [DirectionalSurvey].[FIPS] AS [DirectionalSurvey_FIPS], [DirectionalSurvey].[STATUS_CODE] AS [DirectionalSurvey_STATUS_CODE] 
FROM [DirectionalSurvey] JOIN [SurveyReport] ON [DirectionalSurvey].[ID] = [SurveyReport].[DirectionalSurveyId] 
WHERE [DirectionalSurvey].[STATUS_CODE] IN (?, ?) AND [DirectionalSurvey].[WKID] LIKE ? AND [SurveyReport].[Azimuth] >= ?
2019-08-24 21:43:30,548 INFO sqlalchemy.engine.base.Engine ('N', 'C', 'WKID2%', 7)
2019-08-24 21:43:30,554 INFO sqlalchemy.engine.base.Engine SELECT [SurveyReport].[ID] AS [SurveyReport_ID], [SurveyReport].[DirectionalSurveyId] AS [SurveyReport_DirectionalSurveyId], [SurveyReport].[Azimuth] AS [SurveyReport_Azimuth], [SurveyReport].[MD] AS [SurveyReport_MD], [SurveyReport].[Inclination] AS [SurveyReport_Incl