# Working with Databases 

## Inserting and Extracting Relational Data

### Inserting Data

In [None]:
-- create a new table
CREATE TABLE users (
    id INT,
    name VARCHAR(256), 
    street VARCHAR(256),
    city VARCHAR(256),
    zip VARCHAR(9)
);

In [1]:
import psycopg2

# set connection string 
conn_string = "dbname='de-with-python' host='localhost'"

# creat a connection 
conn = psycopg2.connect(conn_string)

# create a cursor 
cur = conn.cursor()

In [14]:
# define a query 
query = """
insert into users (id,name,street,city,zip)
    values(%s,%s,%s,%s,%s)
"""

# supply data
data = (1,'Big Bird','Sesame Street','Fakeville','12345')

# preview what will be sent to the database 
cur.execute(query, data)

# commit to db 
conn.commit()

In [16]:
from faker import Faker 

# instantiate a faker, list to hold data, and id object
faker = Faker()
data = []
i = 2

# fill in and udpate values 
for r in range(1000):
    data.append((i, faker.name(), faker.street_address(), faker.city(), faker.zipcode()))
    i += 1

# convert to tuple for db upsertion 
db_tuple = tuple(data)

# execute the query for the items in the tuple
cur.executemany(query, db_tuple)

# commit the transaction 
conn.commit()

### Extracting Data

In [4]:
# define query 
query = "select * from users"

# submit query 
cur.execute(query)

In [5]:
# grab a single record 
data = cur.fetchone()
print(data[0])

1


In [8]:
# print number of records and current row number
print(f"Number of rows: {cur.rowcount}")
print(f"Current Row: {cur.rownumber}")

Number of rows: 1001
Current Row: 1


In [9]:
# open a csv file
f = open("../data/from_postgres.csv", "w")

# copy the results from the database into a csv file
cur.copy_to(f, "users", sep=",")
f.close()

In [37]:
import pandas as pd 

df = pd.read_sql("select * from users", conn)

  df = pd.read_sql("select * from users", conn)


In [11]:
df

Unnamed: 0,id,name,street,city,zip
0,1,Big Bird,Sesame Street,Fakeville,12345
1,2,Misty Smith,66282 Christopher Flats Apt. 341,Masseyland,63835
2,3,Jason Miller,49331 Bruce Views,East Joshua,69414
3,4,Regina Maynard,103 Angel Roads Suite 570,Port Jasonland,21975
4,5,Alicia Conway,44755 Ryan Valley Apt. 606,Allenchester,39890
...,...,...,...,...,...
996,997,Valerie Scott,6184 Davis Manor Suite 930,South Joyberg,97295
997,998,Philip Lowe,4115 Justin Spur,Stevenhaven,25156
998,999,Duane Dillon,18729 Martinez Trail Suite 249,Shawnabury,87659
999,1000,Megan Freeman,8354 Harris Plaza,East Daisy,45106


## Inserting and Extracting NoSQL Database Data

In [23]:
from elasticsearch import Elasticsearch
from faker import Faker 
import os

faker = Faker()

# create an elastic search connection 
es = Elasticsearch({"https://localhost:9200"}, 
                   ssl_assert_fingerprint="e2c544e24108b1ee5ef4150d7e3af9aa6eb0cfc4222bbe749add1c1f8715a593",
                   http_auth=("elastic", "u8Xsv+o2ZSDT*H+UQFC3"))

es.info()

  es = Elasticsearch({"https://localhost:9200"},


ObjectApiResponse({'name': 'srmarshall-mac.local', 'cluster_name': 'elasticsearch', 'cluster_uuid': 'sKoTSYCWTkuz0Fhd8DCQLw', 'version': {'number': '8.14.3', 'build_flavor': 'default', 'build_type': 'tar', 'build_hash': 'd55f984299e0e88dee72ebd8255f7ff130859ad0', 'build_date': '2024-07-07T22:04:49.882652950Z', 'build_snapshot': False, 'lucene_version': '9.10.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'})

In [21]:
# add data
doc = {
    "name": faker.name(), 
    "street": faker.street_address(),
    "city": faker.city(),
    "zip": faker.zipcode(),
}

res = es.index(index="users", body=doc)

print(res["result"])



created


In [None]:
from elasticsearch import helpers 

# prepare actions for elsaticsearch helper
actions = [
    {
        "_index": "users", 
        "_source": {
            "name": faker.name(),
            "street": faker.street_address(),
            "city": faker.city(),
            "zip":faker.zipcode()
        }
    }
    for x in range (998)
]

# insert using bulk 
res = helpers.bulk(es, actions)

In [33]:
# query index
doc = {
    "query": {"match_all": {}}
}

res = es.search(index="users", body=doc)

# preview response
print(res)

{'took': 2, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 999, 'relation': 'eq'}, 'max_score': 1.0, 'hits': [{'_index': 'users', '_id': 'pbutp5ABPMaFYuJ5P3rs', '_score': 1.0, '_source': {'name': 'Donna Smith', 'street': '0216 Myers Stravenue Apt. 961', 'city': 'North Scott', 'zip': '65089'}}, {'_index': 'users', '_id': 'pru2p5ABPMaFYuJ52nqS', '_score': 1.0, '_source': {'name': 'Erik Brooks', 'street': '18719 Kayla Mountains', 'city': 'New Geraldland', 'zip': '01093'}}, {'_index': 'users', '_id': 'p7u2p5ABPMaFYuJ52nqT', '_score': 1.0, '_source': {'name': 'Zachary Sanchez', 'street': '2048 Kent Bridge', 'city': 'Brandonhaven', 'zip': '67417'}}, {'_index': 'users', '_id': 'qLu2p5ABPMaFYuJ52nqT', '_score': 1.0, '_source': {'name': 'Glen Yates', 'street': '1324 Mclaughlin Stream Apt. 671', 'city': 'Donaldsonland', 'zip': '48112'}}, {'_index': 'users', '_id': 'qbu2p5ABPMaFYuJ52nqT', '_score': 1.0, '_source': {'name': 'Sha

In [34]:
print(res["hits"]["hits"])

[{'_index': 'users', '_id': 'pbutp5ABPMaFYuJ5P3rs', '_score': 1.0, '_source': {'name': 'Donna Smith', 'street': '0216 Myers Stravenue Apt. 961', 'city': 'North Scott', 'zip': '65089'}}, {'_index': 'users', '_id': 'pru2p5ABPMaFYuJ52nqS', '_score': 1.0, '_source': {'name': 'Erik Brooks', 'street': '18719 Kayla Mountains', 'city': 'New Geraldland', 'zip': '01093'}}, {'_index': 'users', '_id': 'p7u2p5ABPMaFYuJ52nqT', '_score': 1.0, '_source': {'name': 'Zachary Sanchez', 'street': '2048 Kent Bridge', 'city': 'Brandonhaven', 'zip': '67417'}}, {'_index': 'users', '_id': 'qLu2p5ABPMaFYuJ52nqT', '_score': 1.0, '_source': {'name': 'Glen Yates', 'street': '1324 Mclaughlin Stream Apt. 671', 'city': 'Donaldsonland', 'zip': '48112'}}, {'_index': 'users', '_id': 'qbu2p5ABPMaFYuJ52nqT', '_score': 1.0, '_source': {'name': 'Shannon Bennett', 'street': '083 Jennifer Garden Suite 313', 'city': 'Port Melindastad', 'zip': '66440'}}, {'_index': 'users', '_id': 'qru2p5ABPMaFYuJ52nqT', '_score': 1.0, '_source'

In [35]:
# preview items 
for doc in res["hits"]["hits"]:
    print(doc["_source"])

{'name': 'Donna Smith', 'street': '0216 Myers Stravenue Apt. 961', 'city': 'North Scott', 'zip': '65089'}
{'name': 'Erik Brooks', 'street': '18719 Kayla Mountains', 'city': 'New Geraldland', 'zip': '01093'}
{'name': 'Zachary Sanchez', 'street': '2048 Kent Bridge', 'city': 'Brandonhaven', 'zip': '67417'}
{'name': 'Glen Yates', 'street': '1324 Mclaughlin Stream Apt. 671', 'city': 'Donaldsonland', 'zip': '48112'}
{'name': 'Shannon Bennett', 'street': '083 Jennifer Garden Suite 313', 'city': 'Port Melindastad', 'zip': '66440'}
{'name': 'Christian Flores', 'street': '670 Bethany Centers Suite 114', 'city': 'Lake Angelastad', 'zip': '42955'}
{'name': 'Daniel Cobb', 'street': '5752 Alexandra Common', 'city': 'Brianburgh', 'zip': '12600'}
{'name': 'Daniel Webb', 'street': '70063 Johnson Dale', 'city': 'Sharonbury', 'zip': '24312'}
{'name': 'Damon Vega', 'street': '1518 Thomas Corners', 'city': 'Saraberg', 'zip': '46996'}
{'name': 'Jonathan Martin', 'street': '1900 West Knoll Suite 796', 'city'

In [43]:
# convert results to a df 
df = pd.json_normalize(res["hits"]["hits"])

df.head()

In [45]:
# search for a specific entry uisng the match attribue and passing the nested attribute you'd like 
doc = {
    "query": {
        "match": {
            "name": "Donna Smith"
        }
    }
}

res = es.search(index="users", body=doc)

print(res["hits"]["hits"][0]["_source"])

{'name': 'Donna Smith', 'street': '0216 Myers Stravenue Apt. 961', 'city': 'North Scott', 'zip': '65089'}


In [46]:
doc = {
    "query":{
        "match": {
            "city": "Saraberg"
        }
    }
}

res = es.search(index="users", body=doc)

for item in res["hits"]["hits"]:
    print(item["_source"])

{'name': 'Damon Vega', 'street': '1518 Thomas Corners', 'city': 'Saraberg', 'zip': '46996'}


In [54]:
# boolean queries 
doc = {
    "query": {
        "bool": {
            "must": { "match" : {"city": "Saraberg"}}, 
            "filter": {"term": {"zip": "53217"}}
        }
    }
}

res = es.search(index="users", body=doc)

if len(res["hits"]["hits"]) == 0:
    print("No results found!") ## SM: this is expected behavior for the fake data

No results found!


## Building Pipelines in Apache Airflow 

Build on existing Airflow knowledge to extract data from PostgreSQL and write it to an elasticsearch index

In [None]:
import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import pandas as pd
import psycopg2
from elasticsearch import Elasticsearch
import os


# define helper functions
def query_pg(db_name, host, query, output_filepath):
    conn_string = f"dbname={db_name} host={host}"
    conn = psycopg2.connect(conn_string)
    df = pd.read_sql(query, conn)
    df.to_csv(output_filepath)
    print("---------- Postgres Data Saved ----------")


def insert_es(host, ssl_assert_fingerprint, username, password, input_filepath):
    # establish an Elasticsearch connection
    es = Elasticsearch(
        {host},
        ssl_assert_fingerprint=ssl_assert_fingerprint,
        http_auth=(username, password),
    )
    df = pd.read_csv(input_filepath)
    for i, r in df.iterrows():
        doc = r.to_json()
        res = es.index(index="frompostgres", body=doc)
        print(res)


# set defaults
default_args = {
    "owner": "srmarshall",
    "start_date": dt.datetime(2024, 7, 12),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# create dag
with DAG(
    "pg_to_elasticsearch_dag",
    default_args=default_args,
    schedule_interval=timedelta(minutes=5),
) as dag:
    getData = PythonOperator(
        task_id="QueryPostgreSQL",
        python_callable=query_pg,
        op_kwargs={
            "db_name": "de-with-python",
            "host": "localhost",
            "query": "select name, city, from users",
            "output_filepath": "/Users/srmarshall/Desktop/code/personal/de-with-python/data/pg_csv.csv",
        },
    )

    insertData = PythonOperator(
        task_id="InsertElasticSearch",
        python_callable=insert_es,
        op_kwargs={
            "host": "https://localhost:9200",
            "ssl_assert_fingerprint": os.getenv("ES_FINGERPRINT"),
            "username": "elasticsearch",
            "password": os.getenv("ES_PASSWORD"),
            "input_filepath": "/Users/srmarshall/Desktop/code/personal/de-with-python/data/pg_csv.csv",
        },
    )

    getData >> insertData