# Data Engineer with Python

In [185]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

In [186]:
from sqlalchemy import types, create_engine
from sqlalchemy.engine.url import URL
import psycopg2
from sqlalchemy.orm import sessionmaker
import pyodbc
import urllib
import json

In [3]:
def to_postgres(user,server,database):
    engine_postgres = create_engine(f'postgresql+psycopg2://postgres:{user}@{server}/{database}')
    return engine_postgres

In [73]:
user = 'postgres'
server = 'localhost'
database = 'postgres'
to_postgres(user,server,database)

Engine(postgresql+psycopg2://postgres:***@localhost/postgres)

In [4]:
def to_sql_server(server,database):     
    quoted = urllib.parse.quote_plus('DRIVER={SQL Server};SERVER='+server+';DATABASE='+database+';Trusted_Connection=yes')
    engine_sql = create_engine('mssql+pyodbc:///?odbc_connect={}'.format(quoted))
    return engine_sql

In [5]:
server = 'localhost' 
database = 'data'
to_sql_server(server,database)

Engine(mssql+pyodbc:///?odbc_connect=DRIVER%3D%7BSQL+Server%7D%3BSERVER%3DEDM-IYKRA-Hanif%3BDATABASE%3Ddata%3BTrusted_Connection%3Dyes)

## Unstructured Data

### Image

#### Greyscale

In [11]:
path = 'C:/Users/Hanif/Data Science/Data Engineer/Data-Engineer-with-Python/image/humans/human01-00.png'

In [12]:
import cv2
  
# read the image file
img = cv2.imread(path, 2)
  
ret, bw_img = cv2.threshold(img, 127, 255, cv2.THRESH_BINARY)
  
# converting to its binary form
bw = cv2.threshold(img, 127, 255, cv2.THRESH_BINARY)

In [41]:
import os
source_path = 'image'
source_horses = os.path.join(source_path,'horses')
source_humans = os.path.join(source_path,'humans')
source_horses.replace('\\','/')
source_humans.replace('\\','/')
print('horses',len(os.listdir(source_horses)))
print('humans',len(os.listdir(source_horses)))

horses 500
humans 500


In [66]:
horses = []
for i in (os.listdir(source_horses)):
    path = f'C:/Users/Hanif/Data Science/Data Engineer/Data-Engineer-with-Python/image/horses/{i}'
    img = cv2.imread(path, 2)
    ret, bw_img = cv2.threshold(img, 127, 255, cv2.THRESH_BINARY)
    bw = cv2.threshold(img, 127, 255, cv2.THRESH_BINARY)
    horses.append(bw)
    np.array(horses)

  np.array(horses)


In [134]:
df = pd.DataFrame({'name':os.listdir(source_horses),'image':horses})

In [140]:
image = []
for i in range(0,len(df['image'])):
    a = df['image'][i][1]/255
    a = a.tolist()
    image.append(a)

In [141]:
df['image_process'] = image

In [142]:
df = df[['name','image_process']]

In [144]:
df.to_sql('horses_image', to_postgres(user,server,database),if_exists='replace',index=False)

500

#### Varbinary

In [7]:
server = 'MSI' 
database = 'data'
engine = to_sql(server,database)

In [8]:
Session = sessionmaker(bind=engine)
session = Session()
session.execute('''IF OBJECT_ID(N'dbo.Images', N'U') IS NULL
CREATE TABLE dbo.Images
(name varchar(100),
img varbinary(max))''')
session.commit()
session.close()

In [9]:
session.execute('''TRUNCATE TABLE dbo.Images ''')
session.commit()
session.close()

In [26]:
session.execute('''insert into dbo.Images values(
'human01-00.png',(select * from Openrowset (BULK 'C:\\Users\Hanif\Data Science\Data Engineer\Data-Engineer-with-Python\image\horses\horse01-0.png', Single_Blob)as T)
)''')
session.commit()
session.close()

In [31]:
import os
source_path = r'C:\Users\Hanif\Data Science\Data Engineer\Data-Engineer-with-Python\image'
source_horses = os.path.join(source_path,'horses')
source_horses = source_horses.replace('\\','/')
print('horses',len(os.listdir(source_horses)))

horses 500


In [33]:
for i in (os.listdir(source_horses)):
    path = f'C:/Users/Hanif/Data Science/Data Engineer/Data-Engineer-with-Python/image/horses/{i}'
    session.execute(f'''insert into dbo.Images values(
    '{i}',(select * from Openrowset (BULK '{path}', Single_Blob)as T)
    )''')
    session.commit()
    session.close()

In [34]:
pd.read_sql('''select * from dbo.images''',con=engine)

Unnamed: 0,name,img
0,human01-00.png,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...
1,horse01-0.png,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...
2,horse01-1.png,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...
3,horse01-2.png,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...
4,horse01-3.png,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...
...,...,...
496,horse50-5.png,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...
497,horse50-6.png,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...
498,horse50-7.png,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...
499,horse50-8.png,b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\...


## Semi Structured 

### Parquet

In [5]:
df = pd.read_parquet('https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet', engine='pyarrow')

In [6]:
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,2,2023-01-01 00:32:10,2023-01-01 00:40:36,1.0,0.97,1.0,N,161,141,2,9.30,1.00,0.5,0.00,0.0,1.0,14.30,2.5,0.00
1,2,2023-01-01 00:55:08,2023-01-01 01:01:27,1.0,1.10,1.0,N,43,237,1,7.90,1.00,0.5,4.00,0.0,1.0,16.90,2.5,0.00
2,2,2023-01-01 00:25:04,2023-01-01 00:37:49,1.0,2.51,1.0,N,48,238,1,14.90,1.00,0.5,15.00,0.0,1.0,34.90,2.5,0.00
3,1,2023-01-01 00:03:48,2023-01-01 00:13:25,0.0,1.90,1.0,N,138,7,1,12.10,7.25,0.5,0.00,0.0,1.0,20.85,0.0,1.25
4,2,2023-01-01 00:10:29,2023-01-01 00:21:19,1.0,1.43,1.0,N,107,79,1,11.40,1.00,0.5,3.28,0.0,1.0,19.68,2.5,0.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3066761,2,2023-01-31 23:58:34,2023-02-01 00:12:33,,3.05,,,107,48,0,15.80,0.00,0.5,3.96,0.0,1.0,23.76,,
3066762,2,2023-01-31 23:31:09,2023-01-31 23:50:36,,5.80,,,112,75,0,22.43,0.00,0.5,2.64,0.0,1.0,29.07,,
3066763,2,2023-01-31 23:01:05,2023-01-31 23:25:36,,4.67,,,114,239,0,17.61,0.00,0.5,5.32,0.0,1.0,26.93,,
3066764,2,2023-01-31 23:40:00,2023-01-31 23:53:00,,3.15,,,230,79,0,18.15,0.00,0.5,4.43,0.0,1.0,26.58,,


### XML from Webscrapping

In [16]:
import datetime
import requests
from bs4 import BeautifulSoup
import time

In [7]:
def real_time_price(stock_code): 
    headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36'}
    url_1 = 'https://finance.yahoo.com/quote/'
    url_2 = '.JK?p='
    url_3 = '.JK&.tsrc=fin-srch'
    url = url_1 + stock_code + url_2 + stock_code + url_3
    resp = requests.get(url, headers=headers) 
    web_content = BeautifulSoup(resp.text,'lxml')
    web_content = web_content.find('div',{"class":"My(6px) Pos(r) smartphone_Mt(6px) W(100%)"})
    web_content = web_content.find('fin-streamer', {'data-field': 'regularMarketPrice'}).text.replace(',', '')

    if web_content ==[]:
        web_content = '99999'
    return web_content

In [8]:

def to_dataframe(stocks):
    price = []
    col = []
    timestamp = datetime.datetime.now()
    timestamp = timestamp.strftime("%Y-%m-%d %H:%M:%S")
    for stock_code in stocks:
        price.append(real_time_price(stock_code))
    col = [timestamp]
    col.extend(price)
    df = pd.DataFrame(col)
    df = df.T
    df = df.rename(columns={0:'timestamp',1:stocks[0],2:stocks[1],3:stocks[2]})
    return df

In [9]:

def to_dataframe(stocks):
    price = []
    col = []
    timestamp = datetime.datetime.now()
    timestamp = timestamp.strftime("%Y-%m-%d %H:%M:%S")
    for stock_code in stocks:
        price.append(real_time_price(stock_code))
    col = [timestamp]
    col.extend(price)
    df = pd.DataFrame(col)
    df = df.T
    df = df.rename(columns={0:'timestamp',1:stocks[0],2:stocks[1],3:stocks[2]})
    return df

In [10]:
stocks = ['BBRI','BBCA','BBNI']

In [17]:
df=to_dataframe(stocks)

In [20]:
df

Unnamed: 0,timestamp,BBRI,BBCA,BBNI
0,2023-05-30 14:58:33,5550.0,9275.0,9075.0


### JSON

In [36]:
server = 'MSI' 
database = 'data'
engine = to_sql(server,database)

In [37]:
Session = sessionmaker(bind=engine)
session = Session()
session.execute('''IF OBJECT_ID(N'dbo.CustomerOrder', N'U') IS NULL
CREATE TABLE dbo.CustomerOrder
(
CustomerID bigint null,
FirstName varchar(max) null,
LastName varchar(max) null,
OrderDetails NVARCHAR(max)
CHECK (IsJSON(OrderDetails)= 1)
)''')
session.commit()
session.close()

In [38]:
session.execute('''TRUNCATE TABLE dbo.CustomerOrder''')
session.commit()
session.close()

In [14]:
session.execute('''INSERT INTO dbo.CustomerOrder
SELECT CustomerID,FirstName,LastName,
OrderDetails = (select InvoiceId,CustomerId,InvoiceDate,BillingAddress,BillingCity,Total from invoices as b 
where a.CustomerId=b.CustomerId FOR JSON AUTO)
from customers as a''')
session.commit()
session.close()

In [15]:
pd.read_sql('''select * from dbo.CustomerOrder''',con=engine)

Unnamed: 0,CustomerID,FirstName,LastName,OrderDetails
0,1,Luís,Gonçalves,"[{""InvoiceId"":98,""CustomerId"":1,""InvoiceDate"":..."
1,2,Leonie,Köhler,"[{""InvoiceId"":1,""CustomerId"":2,""InvoiceDate"":""..."
2,3,François,Tremblay,"[{""InvoiceId"":99,""CustomerId"":3,""InvoiceDate"":..."
3,4,Bjørn,Hansen,"[{""InvoiceId"":2,""CustomerId"":4,""InvoiceDate"":""..."
4,5,František,Wichterlová,"[{""InvoiceId"":77,""CustomerId"":5,""InvoiceDate"":..."
5,6,Helena,Holý,"[{""InvoiceId"":46,""CustomerId"":6,""InvoiceDate"":..."
6,7,Astrid,Gruber,"[{""InvoiceId"":78,""CustomerId"":7,""InvoiceDate"":..."
7,8,Daan,Peeters,"[{""InvoiceId"":3,""CustomerId"":8,""InvoiceDate"":""..."
8,9,Kara,Nielsen,"[{""InvoiceId"":56,""CustomerId"":9,""InvoiceDate"":..."
9,10,Eduardo,Martins,"[{""InvoiceId"":25,""CustomerId"":10,""InvoiceDate""..."


### Database Backup

![image.png](attachment:image.png)

In [5]:
import sqlite3

In [6]:
def to_postgres(user,server,database):
    engine_postgres = create_engine(f'postgresql+psycopg2://postgres:{user}@{server}/{database}')
    return engine_postgres
user = 'postgres'
server = 'localhost'
database = 'postgres'
engine = to_postgres(user,server,database)

In [6]:
db = 'chinook.db'
def run_query(q):
    with sqlite3.connect(db) as conn:
        return pd.read_sql(q,conn)
def run_command(c):
    with sqlite3.connect(db) as conn:
        conn.isolation_level = None
        conn.execute(c)
def show_tables():
    q = '''
        SELECT
            name,
            type
        FROM sqlite_master
        WHERE type IN ("table","view");
        '''
    return run_query(q)

In [7]:
show_tables()

Unnamed: 0,name,type
0,albums,table
1,sqlite_sequence,table
2,artists,table
3,customers,table
4,employees,table
5,genres,table
6,invoices,table
7,invoice_items,table
8,media_types,table
9,playlists,table


In [9]:
table = show_tables()['name']
for i in table:
    q = 'SELECT * FROM'+ " " +i 
    a = run_query(q)
    a.to_sql(i, schema='public', con = engine, chunksize=200, method='multi', index=False, if_exists='replace')

In [54]:
tables = pd.read_sql('''SELECT * FROM information_schema.tables WHERE table_type = 'BASE TABLE' AND table_schema = 'public' ''',con=engine)

In [112]:
list_table = tables['table_name'].tolist()

In [177]:
for x,i in zip(list_table,list_table):
    globals()[x] = pd.read_sql(i,con=engine)

In [178]:
playlists = playlists.rename({'Name':'PlaylistName'},axis='columns')
tracks=tracks.rename({'Name':'ProductName'},axis='columns').drop('UnitPrice',axis=1)
artists = artists.rename({'Name':'ArtistName'},axis='columns')
media_types = media_types.rename({'Name':'MediaName'},axis='columns')
genres  = genres.rename({'Name':'GenreName'},axis='columns')
employees['EmployeeName'] = employees['FirstName']+' '+employees['LastName']
employees = employees[['EmployeeId','EmployeeName']]

In [213]:
order = invoice_items.merge(invoices,on='InvoiceId').drop(['InvoiceLineId','InvoiceId'],axis=1)
order = order.merge(tracks,on='TrackId').drop('TrackId',axis=1)
order = order.merge(albums,on='AlbumId').drop('AlbumId',axis=1)
order = order.merge(artists,on='ArtistId').drop('ArtistId',axis=1)
order = order.merge(media_types,on='MediaTypeId').drop('MediaTypeId',axis=1)
order = order.merge(genres,on='GenreId').drop('GenreId',axis=1)
order = order.merge(customers,on='CustomerId').drop('CustomerId',axis=1)
order = order.merge(employees,right_on='EmployeeId',left_on='SupportRepId').drop(['SupportRepId'],axis=1)

In [193]:
server = 'MSI' 
database = 'data'
to_sql_server(server,database)

Engine(mssql+pyodbc:///?odbc_connect=DRIVER%3D%7BSQL+Server%7D%3BSERVER%3DMSI%3BDATABASE%3Ddata%3BTrusted_Connection%3Dyes)

In [217]:
def to_sql_server(server,database):     
    quoted = urllib.parse.quote_plus('DRIVER={SQL Server};SERVER='+server+';DATABASE='+database+';Trusted_Connection=yes')
    engine_sql = create_engine('mssql+pyodbc:///?odbc_connect={}'.format(quoted))
    return engine_sql

In [218]:
order.to_sql('MasterOrder', con=to_sql_server(server,database),if_exists='replace',index=False)

-1