# Objectives
* Connect to a database using Python
* Analyse data using Pandas and PySpark
* Create and update database tables
* Use data.table and lazy queries
* Run background jobs in Jupyter
* Run queries on Google Colab from a local database
* Use H2O.ai to create an ML model
* Save results to a Parquet file
* Use dbt and Airflow to orchestrate everything
* Convert the notebook to a script with classes
* Create a front end using Django
* Test the following tools:
  * DVC for data versioning
  * MLflow for model versioning
  * MLflow vs Airflow
  * Model monitoring

# Packages

In [1]:
from sqlalchemy import create_engine, inspect

from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import text

import pandas as pd

from datetime import datetime, date

import sys

In [2]:
# Make the PySpark package bundled with the local Spark installation importable
sys.path.insert(0, "C:\\Users\\User\\AppData\\Local\\spark\\spark-3.1.1-bin-hadoop3.2\\python")

from pyspark.sql import SparkSession
from pyspark import __version__ as py_ver
from py4j import __version__ as py4_ver
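A Spark binary distribution also ships Py4J as a zip under `python/lib`. Here `py4j` imports successfully, which suggests it is available from elsewhere (for example a separate pip install); otherwise that zip needs to be on the path too. Below is a minimal sketch, assuming the standard layout `<spark_home>/python/lib/py4j-*-src.zip`. The helper name `add_spark_to_path` is hypothetical; the third-party `findspark` package automates the same step.

```python
import glob
import os
import sys


def add_spark_to_path(spark_home):
    """Hypothetical helper: prepend Spark's bundled PySpark and Py4J sources to sys.path.

    Assumes the layout of a Spark binary distribution:
    <spark_home>/python and <spark_home>/python/lib/py4j-*-src.zip
    """
    python_dir = os.path.join(spark_home, "python")
    if not os.path.isdir(python_dir):
        raise FileNotFoundError(f"No 'python' folder under {spark_home!r}")
    py4j_zips = sorted(glob.glob(os.path.join(python_dir, "lib", "py4j-*-src.zip")))
    paths = [python_dir] + py4j_zips
    # Insert in reverse so python_dir ends up first on sys.path
    for path in reversed(paths):
        if path not in sys.path:
            sys.path.insert(0, path)
    return paths


# Usage with the installation used in this notebook:
# add_spark_to_path(r"C:\Users\User\AppData\Local\spark\spark-3.1.1-bin-hadoop3.2")
```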

# Querying a database

First, ensure that the relevant database driver is installed. For PostgreSQL, this is `psycopg2`. We will use `SQLAlchemy` because it keeps our code tidy and Pythonic, and because its query constructs pass values as bound parameters, which protects against SQL injection. The alternative is to use the driver with raw SQL commands directly, but that makes it harder to keep the code tidy.

References: 
* https://www.compose.com/articles/using-postgresql-through-sqlalchemy/
* https://www.learndatasci.com/tutorials/using-databases-python-postgres-sqlalchemy-and-alembic/
* https://docs.sqlalchemy.org/en/14/orm/extensions/automap.html
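The SQL-injection point can be illustrated with SQLAlchemy's `text()` and a bound parameter. This is a minimal sketch that uses an in-memory SQLite database (an assumption, so it runs without the Postgres server) and a hypothetical two-row `album` table. With Postgres, only the connection string would change. The value is sent to the driver separately from the SQL string, so a malicious title is treated as data, not as SQL. `find_album_ids` is a hypothetical helper.

```python
from sqlalchemy import create_engine, text

# In-memory SQLite stands in for the Postgres database (assumption for the demo)
demo_engine = create_engine("sqlite://")

with demo_engine.begin() as conn:  # begin() commits when the block succeeds
    conn.execute(text('CREATE TABLE album ("AlbumId" INTEGER PRIMARY KEY, "Title" TEXT)'))
    conn.execute(
        text('INSERT INTO album ("AlbumId", "Title") VALUES (:id, :title)'),
        [{"id": 1, "title": "Out Of Exile"}, {"id": 2, "title": "Audioslave"}],
    )


def find_album_ids(title):
    """Hypothetical helper: look up albums by title with a bound parameter, not string formatting."""
    query = text('SELECT "AlbumId" FROM album WHERE "Title" = :title')
    with demo_engine.connect() as conn:
        return [row[0] for row in conn.execute(query, {"title": title})]


print(find_album_ids("Out Of Exile"))   # [1]
print(find_album_ids("x' OR '1'='1"))   # [] - the input is not executed as SQL
```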

In [3]:
db_string = "postgresql://postgres:john@localhost:5432/chinook"

The connection string has the form `dialect://username:password@host:port/database`:
* username is postgres
* password is john
* IP address is localhost (127.0.0.1)
* port is 5432
* the database name is chinook.
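Instead of typing the password into the string, the URL can be assembled with `sqlalchemy.engine.URL.create` (SQLAlchemy 1.4+). It takes the password exactly as typed and handles any escaping itself. This is a minimal sketch; reading the password from a `PGPASSWORD` environment variable is an assumption, with the notebook's value as a fallback.

```python
import os

from sqlalchemy.engine import URL

db_url = URL.create(
    drivername="postgresql+psycopg2",               # dialect+driver
    username="postgres",
    password=os.environ.get("PGPASSWORD", "john"),  # assumption: password kept in an env var
    host="localhost",
    port=5432,
    database="chinook",
)

print(db_url.render_as_string(hide_password=True))  # postgresql+psycopg2://postgres:***@localhost:5432/chinook
# engine = create_engine(db_url)  # needs psycopg2 and a running server
```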

In [4]:
engine = create_engine(db_string)

In [5]:
insp = inspect(engine)
insp.get_table_names()

['artist',
 'album',
 'employee',
 'customer',
 'invoice',
 'invoiceline',
 'track',
 'playlist',
 'playlisttrack',
 'genre',
 'mediatype']

## Read tables using raw SQL

In [6]:
res = engine.execute('SELECT * FROM album LIMIT 10')
res

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x14f5264c280>

In [7]:
for r in res:
    print(r)

(1, 'For Those About To Rock We Salute You', 1)
(2, 'Balls to the Wall', 2)
(3, 'Restless and Wild', 2)
(4, 'Let There Be Rock', 1)
(5, 'Big Ones', 3)
(6, 'Jagged Little Pill', 4)
(7, 'Facelift', 5)
(8, 'Warner 25 Anos', 6)
(9, 'Plays Metallica By Four Cellos', 7)
(10, 'Audioslave', 8)
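`Engine.execute()` is deprecated in SQLAlchemy 1.4 and was removed in 2.0. Below is a sketch of the same raw-SQL query in a style that works in both versions: open a connection explicitly and wrap the SQL in `text()`. The in-memory SQLite table and its two rows are stand-ins so the example runs without Postgres.

```python
from sqlalchemy import create_engine, text

demo_engine = create_engine("sqlite://")  # stand-in for the Postgres engine

with demo_engine.begin() as conn:
    conn.execute(text('CREATE TABLE album ("AlbumId" INTEGER PRIMARY KEY, "Title" TEXT, "ArtistId" INTEGER)'))
    conn.execute(
        text("INSERT INTO album VALUES (:id, :title, :artist)"),
        [
            {"id": 1, "title": "For Those About To Rock We Salute You", "artist": 1},
            {"id": 2, "title": "Balls to the Wall", "artist": 2},
        ],
    )

# The connection closes when the block exits, so fetch the rows inside it
with demo_engine.connect() as conn:
    rows = conn.execute(text("SELECT * FROM album LIMIT 10")).fetchall()

for r in rows:
    print(r)
```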


## Recommended approach: using the ORM
ORM stands for Object Relational Mapping. It maps database tables to Python classes so that we can work with the tables in an object-oriented way.
We first need to create a session, which lets us roll back changes if something goes wrong, or commit them once we are ready.
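This is a minimal sketch of the commit/rollback behaviour described above. It assumes an in-memory SQLite database and a hypothetical mapped class `DemoAlbum` that mirrors the `album` columns. The `Demo*` names avoid clashing with the notebook's own `Session`, `Base` and `Album`.

```python
from sqlalchemy import Column, Integer, String, create_engine, func, select
from sqlalchemy.orm import declarative_base, sessionmaker

DemoBase = declarative_base()


class DemoAlbum(DemoBase):
    """Hypothetical mapped class mirroring the album table's columns."""
    __tablename__ = "album"
    AlbumId = Column(Integer, primary_key=True)
    Title = Column(String, nullable=False)
    ArtistId = Column(Integer, nullable=False)


demo_engine = create_engine("sqlite://")  # stand-in for the Postgres engine
DemoBase.metadata.create_all(demo_engine)
DemoSession = sessionmaker(bind=demo_engine)


def album_count(session):
    return session.scalar(select(func.count()).select_from(DemoAlbum))


# 1. Changes stay inside the transaction until commit; rollback() discards them
with DemoSession() as session:
    session.add(DemoAlbum(AlbumId=1, Title="Draft", ArtistId=1))
    session.flush()                   # sends the INSERT within the open transaction
    count_before_rollback = album_count(session)
    session.rollback()                # undo everything since the transaction began
    count_after_rollback = album_count(session)

# 2. session.begin() commits on success and rolls back if an exception is raised
with DemoSession() as session:
    with session.begin():
        session.add(DemoAlbum(AlbumId=2, Title="Kept", ArtistId=1))

with DemoSession() as session:
    count_committed = album_count(session)

print(count_before_rollback, count_after_rollback, count_committed)  # 1 0 1
```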

In [8]:
# Global variable
Session = sessionmaker(bind=engine)
Session

sessionmaker(class_='Session', bind=Engine(postgresql://postgres:***@localhost:5432/chinook), autoflush=True, autocommit=False, expire_on_commit=True)

In [9]:
session = Session()
session

<sqlalchemy.orm.session.Session at 0x14f7539f610>

Next, we create classes from the existing database tables, one Python class per table. Automap does this by reflecting the database schema. Note that it only maps tables that have a primary key.

In [10]:
Base = automap_base()
Base.prepare(autoload_with=engine)   # Reflect all tables from the database and generate a mapped class for each
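`autoload_with=` is the SQLAlchemy 1.4+ form of the older `engine=..., reflect=True` arguments, which are deprecated in 1.4 and removed in 2.0. This sketch runs automap against an in-memory SQLite stand-in with two hypothetical tables. It shows that only the table with a primary key becomes a class.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session

demo_engine = create_engine("sqlite://")  # stand-in for the Postgres database
with demo_engine.begin() as conn:
    conn.execute(text('CREATE TABLE album ("AlbumId" INTEGER PRIMARY KEY, "Title" TEXT)'))
    conn.execute(text("CREATE TABLE audit_log (message TEXT)"))  # hypothetical table, no primary key
    conn.execute(text("INSERT INTO album VALUES (1, 'Audioslave')"))

DemoBase = automap_base()
DemoBase.prepare(autoload_with=demo_engine)  # reflect tables and generate classes

print(sorted(DemoBase.classes.keys()))  # ['album'] - audit_log has no primary key

DemoAlbum = DemoBase.classes.album
with Session(demo_engine) as session:
    first = session.query(DemoAlbum).first()
    print(first.AlbumId, first.Title)  # 1 Audioslave
```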


In [12]:
Album = Base.classes.album

In [13]:
q = session.query(Album)

In [14]:
for r in q.limit(10):
    print(r.AlbumId, r.Title, r.ArtistId)

1 For Those About To Rock We Salute You 1
2 Balls to the Wall 2
3 Restless and Wild 2
4 Let There Be Rock 1
5 Big Ones 3
6 Jagged Little Pill 4
7 Facelift 5
8 Warner 25 Anos 6
9 Plays Metallica By Four Cellos 7
10 Audioslave 8


In [15]:
# Column names of the album table, as reported by the inspector
[col["name"] for col in insp.get_columns(table_name="album")]

['AlbumId', 'Title', 'ArtistId']

### Query operations
Some of the things you can do with this [query object](https://docs.sqlalchemy.org/en/14/orm/query.html):

In [16]:
for r in q.filter(Album.Title=="Out Of Exile"):
    print(r.AlbumId, r.Title, r.ArtistId)

11 Out Of Exile 8


In [17]:
# filter_by() takes keyword arguments for simple equality tests; filter() is more versatile, e.g. it accepts like()
for r in q.filter_by(Title="Out Of Exile"):
    print(r.AlbumId, r.Title, r.ArtistId)

11 Out Of Exile 8
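Because `filter()` takes SQL expressions, conditions can be combined. This sketch shows `and_()`, `or_()` and `in_()` on a small set of hypothetical rows in an in-memory SQLite stand-in for the `album` table.

```python
from sqlalchemy import Column, Integer, String, and_, create_engine, or_
from sqlalchemy.orm import Session, declarative_base

DemoBase = declarative_base()


class DemoAlbum(DemoBase):
    __tablename__ = "album"
    AlbumId = Column(Integer, primary_key=True)
    Title = Column(String)
    ArtistId = Column(Integer)


demo_engine = create_engine("sqlite://")
DemoBase.metadata.create_all(demo_engine)

with Session(demo_engine) as session:
    session.add_all([
        DemoAlbum(AlbumId=10, Title="Audioslave", ArtistId=8),
        DemoAlbum(AlbumId=11, Title="Out Of Exile", ArtistId=8),
        DemoAlbum(AlbumId=68, Title="Outbreak", ArtistId=79),
    ])
    session.commit()

    q = session.query(DemoAlbum).order_by(DemoAlbum.AlbumId)
    # filter_by(): keyword equality only
    by_artist = [a.AlbumId for a in q.filter_by(ArtistId=8)]
    # and_() is equivalent to passing several conditions to filter()
    and_match = [a.AlbumId for a in q.filter(and_(DemoAlbum.ArtistId == 8, DemoAlbum.Title.like("O%")))]
    # or_() and in_() build OR and IN clauses
    or_match = [a.AlbumId for a in q.filter(or_(DemoAlbum.ArtistId == 79, DemoAlbum.Title == "Audioslave"))]
    in_match = [a.AlbumId for a in q.filter(DemoAlbum.AlbumId.in_([10, 68]))]

print(by_artist, and_match, or_match, in_match)  # [10, 11] [11] [10, 68] [10, 68]
```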


In [18]:
for r in q.from_statement(statement=text("SELECT * from album LIMIT 10")):
    print(r.AlbumId, r.Title, r.ArtistId)

1 For Those About To Rock We Salute You 1
2 Balls to the Wall 2
3 Restless and Wild 2
4 Let There Be Rock 1
5 Big Ones 3
6 Jagged Little Pill 4
7 Facelift 5
8 Warner 25 Anos 6
9 Plays Metallica By Four Cellos 7
10 Audioslave 8


In [19]:
for r in q.limit(10):
    print(r.AlbumId, r.Title, r.ArtistId)

1 For Those About To Rock We Salute You 1
2 Balls to the Wall 2
3 Restless and Wild 2
4 Let There Be Rock 1
5 Big Ones 3
6 Jagged Little Pill 4
7 Facelift 5
8 Warner 25 Anos 6
9 Plays Metallica By Four Cellos 7
10 Audioslave 8


In [20]:
for r in q.slice(5,10):
    print(r.AlbumId, r.Title, r.ArtistId)

6 Jagged Little Pill 4
7 Facelift 5
8 Warner 25 Anos 6
9 Plays Metallica By Four Cellos 7
10 Audioslave 8
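`slice(start, stop)` behaves like Python's `range()` bounds and is rendered as `OFFSET start LIMIT stop - start`. That is why `slice(5, 10)` above returns the 6th to 10th rows. Without `order_by()`, the database does not guarantee row order, so paging queries should normally sort explicitly. This sketch compares the equivalent forms on an in-memory SQLite stand-in with 20 hypothetical rows.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

DemoBase = declarative_base()


class DemoAlbum(DemoBase):
    __tablename__ = "album"
    AlbumId = Column(Integer, primary_key=True)
    Title = Column(String)


demo_engine = create_engine("sqlite://")
DemoBase.metadata.create_all(demo_engine)

with Session(demo_engine) as session:
    session.add_all([DemoAlbum(AlbumId=i, Title=f"Album {i}") for i in range(1, 21)])
    session.commit()

    ordered = session.query(DemoAlbum).order_by(DemoAlbum.AlbumId)  # explicit ordering
    via_slice = [a.AlbumId for a in ordered.slice(5, 10)]
    via_offset_limit = [a.AlbumId for a in ordered.offset(5).limit(5)]
    # Python slice syntax on a Query is also translated into OFFSET/LIMIT
    via_indexing = [a.AlbumId for a in ordered[5:10]]

print(via_slice)  # [6, 7, 8, 9, 10]
```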


In [21]:
for r in q.filter(Album.Title.like("O%")):
    print(r.AlbumId, r.Title, r.ArtistId)

11 Out Of Exile 8
68 Outbreak 79
81 One By One 84
166 Olodum 112
176 Original Soundtracks 1 116
184 Os Cães Ladram Mas A Caravana Não Pára 121
187 Out Of Time 122
200 O Samba Poconé 130


In [22]:
for r in q.order_by(Album.ArtistId.desc()).limit(10):
    print(r.AlbumId, r.Title, r.ArtistId)

347 Koyaanisqatsi (Soundtrack from the Motion Picture) 275
346 Mozart: Chamber Music 274
345 Monteverdi: L'Orfeo 273
344 Schubert: The Late String Quartets & String Quintet (3 CD's) 272
342 Locatelli: Concertos for Violin, Strings and Continuo, Vol. 3 271
341 Great Recordings of the Century - Shubert: Schwanengesang, 4 Lieder 270
340 Liszt - 12 Études D'Execution Transcendante 269
339 Great Recordings of the Century: Paganini's 24 Caprices 268
338 Nielsen: The Six Symphonies 267
337 Szymanowski: Piano Works, Vol. 1 266


### Retrieving SQL statements
This is how to convert an SQLAlchemy query object into the full SQL statement:

In [23]:
# Compile the query's SELECT for the engine's dialect and render it as text
str(q.statement.compile(bind=engine))

'SELECT album."AlbumId", album."Title", album."ArtistId" \nFROM album'
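Filter values are compiled as placeholders, so the printed SQL does not show them. For debugging only, the `literal_binds` compiler option renders simple values inline. The resulting string should not be executed, because bound parameters are what protect against injection. This sketch uses a hypothetical mapped class and SQLAlchemy's default dialect, so quoting and placeholders may differ slightly from Postgres.

```python
from sqlalchemy import Column, Integer, String, select
from sqlalchemy.orm import declarative_base

DemoBase = declarative_base()


class DemoAlbum(DemoBase):
    __tablename__ = "album"
    AlbumId = Column(Integer, primary_key=True)
    Title = Column(String)
    ArtistId = Column(Integer)


stmt = select(DemoAlbum).where(DemoAlbum.Title.like("O%"))

# Default compilation keeps a placeholder for the value
placeholder_sql = str(stmt.compile())
# literal_binds renders the value inline (debugging only)
literal_sql = str(stmt.compile(compile_kwargs={"literal_binds": True}))

print(placeholder_sql)
print(literal_sql)
```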

The `statement` attribute returns a `Select` object that holds the SELECT statement. This is useful because the object can be passed to other tools, such as `pd.read_sql`.

In [24]:
q.statement

<sqlalchemy.sql.selectable.Select object at 0x0000014F7551E6A0>

# Reading the database using Pandas

In [25]:
pd.read_sql(sql="album", con=engine)   # a table name is routed to pd.read_sql_table

|     | AlbumId | Title | ArtistId |
|-----|---------|-------|----------|
| 0 | 1 | For Those About To Rock We Salute You | 1 |
| 1 | 2 | Balls to the Wall | 2 |
| 2 | 3 | Restless and Wild | 2 |
| 3 | 4 | Let There Be Rock | 1 |
| 4 | 5 | Big Ones | 3 |
| ... | ... | ... | ... |
| 342 | 343 | Respighi:Pines of Rome | 226 |
| 343 | 344 | Schubert: The Late String Quartets & String Qu... | 272 |
| 344 | 345 | Monteverdi: L'Orfeo | 273 |
| 345 | 346 | Mozart: Chamber Music | 274 |


In [26]:
pd.read_sql(sql=q.statement, con=engine)

|     | AlbumId | Title | ArtistId |
|-----|---------|-------|----------|
| 0 | 1 | For Those About To Rock We Salute You | 1 |
| 1 | 2 | Balls to the Wall | 2 |
| 2 | 3 | Restless and Wild | 2 |
| 3 | 4 | Let There Be Rock | 1 |
| 4 | 5 | Big Ones | 3 |
| ... | ... | ... | ... |
| 342 | 343 | Respighi:Pines of Rome | 226 |
| 343 | 344 | Schubert: The Late String Quartets & String Qu... | 272 |
| 344 | 345 | Monteverdi: L'Orfeo | 273 |
| 345 | 346 | Mozart: Chamber Music | 274 |


In [27]:
pd.read_sql(sql=q.filter(Album.Title.like("O%")).statement, con=engine)

|     | AlbumId | Title | ArtistId |
|-----|---------|-------|----------|
| 0 | 11 | Out Of Exile | 8 |
| 1 | 68 | Outbreak | 79 |
| 2 | 81 | One By One | 84 |
| 3 | 166 | Olodum | 112 |
| 4 | 176 | Original Soundtracks 1 | 116 |
| 5 | 184 | Os Cães Ladram Mas A Caravana Não Pára | 121 |
| 6 | 187 | Out Of Time | 122 |
| 7 | 200 | O Samba Poconé | 130 |
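`pd.read_sql` also accepts raw SQL with bound parameters through `params`, plus options such as `index_col`. This sketch assumes an in-memory SQLite stand-in for the database and a pandas version that supports the installed SQLAlchemy. The three rows are copied from the output above.

```python
import pandas as pd
from sqlalchemy import create_engine, text

demo_engine = create_engine("sqlite://")  # stand-in for the Postgres engine
pd.DataFrame(
    {"AlbumId": [11, 68, 81], "Title": ["Out Of Exile", "Outbreak", "One By One"], "ArtistId": [8, 79, 84]}
).to_sql("album", demo_engine, index=False)

# :max_artist is sent as a bound parameter rather than formatted into the SQL
query = text('SELECT * FROM album WHERE "ArtistId" < :max_artist ORDER BY "AlbumId"')
with demo_engine.connect() as conn:
    df = pd.read_sql(query, conn, params={"max_artist": 80}, index_col="AlbumId")

print(df)
```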


# Working with PySpark
References: 
* https://phoenixnap.com/kb/install-spark-on-windows-10
* https://spark.apache.org/docs/latest/api/python/index.html
* https://www.tutorialspoint.com/pyspark/pyspark_environment_setup.htm

We will use an existing Spark installation located at `C:\Users\User\AppData\Local\spark\spark-3.1.1-bin-hadoop3.2`.

In [28]:
py_ver

'3.1.1'

In [29]:
py4_ver

'0.10.9'

In [30]:
# Python executable path
sys.executable

'C:\\Users\\User\\AppData\\Local\\Programs\\Python\\Python38\\python.exe'

## Initiating a session

In [31]:
sc = SparkSession.builder.getOrCreate()
sc

## Importing data
### From Pandas

In [33]:
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})

sp_df = sc.createDataFrame(pandas_df)
sp_df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [34]:
sp_df.printSchema()
sp_df.show()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



### From CSV

In [35]:
sp_csv = sc.read.csv("../inputs/task2_data1.csv")
sp_csv

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string]

In [36]:
sp_csv.show()

+--------------------+--------------------+-----------------+--------+
|                 _c0|                 _c1|              _c2|     _c3|
+--------------------+--------------------+-----------------+--------+
|        Company.Name|             Address|             City|Postcode|
|        Carsten Helm|         Ulmenstr. 8|           Wismar|   23966|
|Zirpel & Pautzsch...|    Paditzer Str. 33|        Altenburg|    4600|
|     Eberhard Zessin|   Steingartenweg 12|       Heidelberg|   69118|
|        Gerold Fuchs|          Mühlweg 12|        Dietingen|   78661|
|     Rudi Biedritzky|  Zaisentalstr. 70/1|       Reutlingen|   72760|
|      Wolfgang Jäger|       Wiesenstr. 11|           Rodgau|   63110|
|       Mario Tsiknas|          Am Delf 31|  Bad Zwischenahn|   26160|
|Matthias Essers G...|Leopold-Hoesch-St...|    Geilenkirchen|   52511|
|       Andre Hanisch|   Im Kressgraben 18|   Untereisesheim|   74257|
|         Paul Strigl|Thomas-Schwarz-St...|           Dachau|   85221|
|     

The first row (`Company.Name`, `Address`, `City`, `Postcode`) is the file's header, which `sc.read.csv` reads as data by default. Passing `header=True` uses it for the column names instead. Passing `inferSchema=True` asks Spark to infer column types rather than reading every column as a string.

### From SQL

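To read from Postgres, Spark needs the PostgreSQL JDBC driver on its classpath. Update `conf/spark-defaults.conf` to include the setting `spark.driver.extraClassPath` = `E:\\Softwares\\postgresql-42.2.22.jar`. This setting cannot be changed through `SparkConf` once the driver JVM has started. As a result, the commented-out code chunk below, which tries to apply it to a running session, fails.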

Reference:
* https://spark.apache.org/docs/latest/configuration.html
* https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
* https://stackoverflow.com/questions/30983982/how-to-use-jdbc-source-to-write-and-read-data-in-pyspark

In [37]:
# Does not work: extraClassPath cannot be changed after the driver JVM has started
# sc.sparkContext.stop()
# confs = [("spark.driver.extraClassPath", "E:\\Softwares\\postgresql-42.2.22.jar")]
# conf = sc.sparkContext._conf.setAll(confs)
# sc.sparkContext.stop()
# sc = SparkSession.builder.config(conf=conf).getOrCreate()
# sc.sparkContext._conf.getAll()

In [38]:
sc.sparkContext.getConf().getAll()

[('spark.executor.id', 'driver'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.sql.warehouse.dir',
  'C:/Users/User/AppData/Local/spark/spark-3.1.1-bin-hadoop3.2/tmp/hive'),
 ('spark.app.id', 'local-1643519339664'),
 ('spark.local.dir',
  'C:/Users/User/AppData/Local/spark/spark-3.1.1-bin-hadoop3.2/tmp/local'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.port', '53645'),
 ('spark.driver.extraClassPath', 'E:\\Softwares\\postgresql-42.2.22.jar'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', 'DESKTOP-BTP6V55.mshome.net'),
 ('spark.app.startTime', '1643519337873'),
 ('spark.ui.showConsoleProgress', 'true')]

In [39]:
sp_pg = sc.read.jdbc(
    url="jdbc:postgresql://localhost:5432/chinook", 
    table="album", 
    properties={"user":"postgres", "password":"john"})

sp_pg

DataFrame[AlbumId: int, Title: string, ArtistId: int]
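The Spark reader repeats the credentials already held in `db_string`. A small helper can derive the JDBC URL and connection properties from the SQLAlchemy URL so they are defined in one place. `to_jdbc_options` is hypothetical. It assumes a Postgres URL and the `org.postgresql.Driver` class name from the PostgreSQL JDBC driver. It only builds strings, so it does not need Spark to run.

```python
from sqlalchemy.engine.url import make_url


def to_jdbc_options(sqlalchemy_url):
    """Hypothetical helper: translate a SQLAlchemy Postgres URL into Spark JDBC arguments."""
    url = make_url(sqlalchemy_url)
    if url.get_backend_name() != "postgresql":
        raise ValueError(f"Only PostgreSQL URLs are handled, got {url.drivername!r}")
    port = url.port or 5432  # Postgres default port
    jdbc_url = f"jdbc:postgresql://{url.host}:{port}/{url.database}"
    properties = {
        "user": url.username,
        "password": url.password,
        "driver": "org.postgresql.Driver",  # driver class in the PostgreSQL JDBC jar
    }
    return jdbc_url, properties


jdbc_url, properties = to_jdbc_options("postgresql://postgres:john@localhost:5432/chinook")
# sp_pg = sc.read.jdbc(url=jdbc_url, table="album", properties=properties)
```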

In [40]:
sp_pg.show()

+-------+--------------------+--------+
|AlbumId|               Title|ArtistId|
+-------+--------------------+--------+
|      1|For Those About T...|       1|
|      2|   Balls to the Wall|       2|
|      3|   Restless and Wild|       2|
|      4|   Let There Be Rock|       1|
|      5|            Big Ones|       3|
|      6|  Jagged Little Pill|       4|
|      7|            Facelift|       5|
|      8|      Warner 25 Anos|       6|
|      9|Plays Metallica B...|       7|
|     10|          Audioslave|       8|
|     11|        Out Of Exile|       8|
|     12| BackBeat Soundtrack|       9|
|     13|The Best Of Billy...|      10|
|     14|Alcohol Fueled Br...|      11|
|     15|Alcohol Fueled Br...|      11|
|     16|       Black Sabbath|      12|
|     17|Black Sabbath Vol...|      12|
|     18|          Body Count|      13|
|     19|    Chemical Wedding|      14|
|     20|The Best Of Buddy...|      15|
+-------+--------------------+--------+
only showing top 20 rows



## Exporting data
### To Pandas

In [41]:
sp_pg.toPandas()

|     | AlbumId | Title | ArtistId |
|-----|---------|-------|----------|
| 0 | 1 | For Those About To Rock We Salute You | 1 |
| 1 | 2 | Balls to the Wall | 2 |
| 2 | 3 | Restless and Wild | 2 |
| 3 | 4 | Let There Be Rock | 1 |
| 4 | 5 | Big Ones | 3 |
| ... | ... | ... | ... |
| 342 | 343 | Respighi:Pines of Rome | 226 |
| 343 | 344 | Schubert: The Late String Quartets & String Qu... | 272 |
| 344 | 345 | Monteverdi: L'Orfeo | 273 |
| 345 | 346 | Mozart: Chamber Music | 274 |
