<a href="https://colab.research.google.com/github/techosystem/Data-Engr-Pipeline/blob/main/Data_Engr_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This notebook builds a data pipeline using ETL (extract, transform, load).
In task 1, I extract the data using a REST API and web scraping.
In task 2, I transform the data, and in task 3, I load it into the data warehouse.

Importing the libraries

In [None]:
import requests
import pandas as pd

from sqlalchemy import create_engine
import psycopg2
import io

import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

**Task 1 : Generating data with REST API**

This involves the two most frequently used HTTP methods, GET and POST.
POST is used to obtain a token for authentication, while
GET is used to query the server and retrieve data.


In [None]:
# To make an API request, we must first obtain an access token from the server.

def get_jwt():
    headers = {
        'Host': 'rest.arbeitsagentur.de',
        'Connection': 'keep-alive',
        'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8',
    }

    data = {
      'client_id': 'c003a37f-024f-462a-b36d-b001be4cd24a',
      'client_secret': '32a39620-32b3-4307-9aa1-511e3d7f48a8',
      'grant_type': 'client_credentials'
    }

    # requests.post sends the HTTP request for the access token
    response = requests.post('https://rest.arbeitsagentur.de/oauth/gettoken_cc', headers=headers, data=data, verify=False)

    return response.json()

# define the query parameters
def search(jwt, what, where, page):
    params = (
        ('page', page),
        ('size', '100'),
        ('umkreis', '50'),
        ('was', what),
        ('wo', where),
    )

    headers = {
        'Host': 'rest.arbeitsagentur.de',
        'OAuthAccessToken': jwt,
        'Connection': 'keep-alive',
    }

    # requests.get is used to retrieve the data
    response = requests.get('https://rest.arbeitsagentur.de/jobboerse/jobsuche-service/pc/v4/app/jobs',
                            headers=headers, params=params, verify=False)
    return response.json()

# Generate data engineer position starting from page 1
jwt = get_jwt()
result = search(jwt["access_token"], "Data Engineer", "Deutschland", 1)
result.keys() # displays the keys of the dictionary as a list

dict_keys(['stellenangebote', 'maxErgebnisse', 'page', 'size', 'woOutput', 'facetten'])



Convert the dictionary data to pandas dataframe




In [None]:
print(len(result['stellenangebote'])) # prints length of data engineer job listings
print(len(result["facetten"]))        # prints the length of facetten
print((result['maxErgebnisse']))      # prints the total number of results
data_engr = pd.DataFrame(result['stellenangebote']) # convert to dataframe
facets = pd.DataFrame(result['facetten'])  

Fetching all the results

> The following function iterates through the pages and collects all the job listings.



In [None]:
def fetch_all_results():
  results_array = [] # empty list that holds the data
  page = 1
  while True:
    result = search(jwt["access_token"], "Data Engineer", "Deutschland", page)
    batch = result.get('stellenangebote', [])
    if not batch:
      # stop on an empty or malformed page instead of looping forever
      print("no more results on page", page)
      break
    results_array.extend(batch)
    # stop once every listing reported by the API has been collected
    if len(results_array) >= result.get('maxErgebnisse', 0):
      break
    page += 1

  return results_array

all_results = fetch_all_results()
all_results = pd.DataFrame(all_results)
print(len(all_results))
all_results.head()

1441
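
The pagination logic can be checked without calling the real API. The sketch below is only an illustration: `fetch_all_pages` and `fake_search` are hypothetical names that are not part of this notebook, and the stub returns made-up listings in the same shape as the API response (`stellenangebote` and `maxErgebnisse`).

```python
def fetch_all_pages(fetch_page, max_pages=1000):
    """Collect listings page by page until the reported total is reached."""
    items = []
    for page in range(1, max_pages + 1):
        result = fetch_page(page)
        batch = result.get("stellenangebote", [])
        if not batch:
            break  # an empty page means there is nothing left to fetch
        items.extend(batch)
        if len(items) >= result.get("maxErgebnisse", 0):
            break  # every listing reported by the server has been collected
    return items


# Hypothetical stub standing in for search(): 250 fake listings, 100 per page.
def fake_search(page, total=250, size=100):
    start = (page - 1) * size
    stop = min(start + size, total)
    return {
        "stellenangebote": [{"refnr": i} for i in range(start, stop)],
        "maxErgebnisse": total,
    }


listings = fetch_all_pages(fake_search)
print(len(listings))
```

The `max_pages` cap is an assumption added here as a safety net so the loop always ends, even if the server reports a total it never delivers.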


**Writing the generated data to google drive**



In [None]:
data_engr.to_json("/content/drive/MyDrive/data/data_engr.json", orient='records')
facets.to_json("/content/drive/MyDrive/data/facets.json", orient='records')
all_results.to_json("/content/drive/MyDrive/data/all_results.json", orient='records')

***Explanation:***

 

*   I used the provided REST API to extract job listing data. First, a POST request was made to obtain an access token.

*   Then, a GET request was made to query the server and retrieve data for further computations.
*   Three datasets were generated: the first page of Data Engineer job listings (data_engr), the search facets (facets), and the job listings from all pages (all_results).
*   Further analysis uses the Data Engineer job listings (the required data).

**DATA TRANSFORMATION**

* To improve productivity and optimize business processes, data needs to be cleaned.
* Here, the data is normalized and unnecessary columns are removed.


In [None]:
# Read in the data from the drive
data = pd.read_json("/content/drive/MyDrive/data/data_engr.json") # read json data named data_engr.json
print(data.shape) # check the number of rows and columns contained in the data
data.head()   # displays the first 5 rows of the data; pass a number to show a different count

(100, 11)


Unnamed: 0,beruf,titel,refnr,arbeitsort,arbeitgeber,aktuelleVeroeffentlichungsdatum,modifikationsTimestamp,eintrittsdatum,logoHashId,hashId,externeUrl
0,Data Engineer,Data Engineer (m/w/d),15474-35852-S,"{'ort': 'Oldenburg (Oldb)', 'region': 'Nieders...",STELLENWERT GmbH & Co. KG,2022-12-06,2022-12-06T12:13:00.925,2022-12-13,gqNNdlh9xZZTvm-IciFtaDFVFYqirlO2Jgq00FQ9lQQ=,OXR4uanq79pGXsRnIPAEeQSsetVswH3x7xMPCdXjppc=,
1,Data Engineer,Data Engineer (m/w/d) Big Data,11858-4872373-STA-S,"{'plz': '80686', 'ort': 'München', 'region': '...",LexCom Informationssysteme GmbH,2022-12-06,2022-12-06T15:21:46.96,2022-12-06,,HdgZ1QLCptkq7lie2B3GYya92Z2qaivuc6ohBAm1YBE=,https://www.stellenanzeigen.de/job/4872373/?ut...
2,Data Engineer,Data Engineer (m/w/d),16045-k56465.1941-S,"{'plz': '22145', 'ort': 'Braak bei Hamburg', '...",SIMunich GmbH,2022-11-24,2022-11-24T17:11:26.859,2022-11-25,,dZHzPSksKFeBm3aLZoF7iiKJcG4YqFtx6U4LI7k-qS8=,
3,Data Engineer,Data Engineer (m/w/d),10000-1192260804-S,"{'plz': '24116', 'ort': 'Kiel', 'strasse': 'Al...",Bartels-Langness Handelsgesell schaft mbH & Co...,2022-12-06,2022-12-06T18:01:13.496,2022-12-06,1c-YxVMEQPJkV4FHtgNZ0UW2_CMdRDNQtnhrrupWD6M=,egvYH6IG5f1GB1ko-IUWgo6ubvdNa8npjD5Jz8-XcMk=,
4,Data Engineer,Data Science Support Engineer (w/d/m),13635-6b18a1e0_JB3407291-S,"{'plz': '94032', 'ort': 'Passau', 'region': 'B...",ONE LOGIC GmbH,2022-12-05,2022-12-05T17:31:27.134,2022-12-09,,8QghzxXt7yx8dtoUuqQmXLDyrCf8mWETk5i4G4ZpwiQ=,https://www.jobexport.de/job/3407291.html?exp=...


From the data displayed above:

The dataset contains 100 records and 11 columns.
arbeitsort column contains dictionary data and needs to be normalized.
Hence, we can start the data transformation by normalizing the arbeitsort column

In [None]:
# normalize the arbeitsort column and combine it with the parent dataframe
data = data.join(pd.json_normalize(data["arbeitsort"])) 

# rename the columns and drop the arbeitsort column
data = data.rename(columns = {'koordinaten.lat':'latitude', 'koordinaten.lon':'longitude' }).drop(["arbeitsort"], axis = 1)
print(data.shape)
data.head()

(100, 18)


Unnamed: 0,beruf,titel,refnr,arbeitgeber,aktuelleVeroeffentlichungsdatum,modifikationsTimestamp,eintrittsdatum,logoHashId,hashId,externeUrl,ort,region,land,latitude,longitude,plz,strasse,ortsteil
0,Data Engineer,Data Engineer (m/w/d),15474-35852-S,STELLENWERT GmbH & Co. KG,2022-12-06,2022-12-06T12:13:00.925,2022-12-13,gqNNdlh9xZZTvm-IciFtaDFVFYqirlO2Jgq00FQ9lQQ=,OXR4uanq79pGXsRnIPAEeQSsetVswH3x7xMPCdXjppc=,,Oldenburg (Oldb),Niedersachsen,Deutschland,53.140992,8.214311,,,
1,Data Engineer,Data Engineer (m/w/d) Big Data,11858-4872373-STA-S,LexCom Informationssysteme GmbH,2022-12-06,2022-12-06T15:21:46.96,2022-12-06,,HdgZ1QLCptkq7lie2B3GYya92Z2qaivuc6ohBAm1YBE=,https://www.stellenanzeigen.de/job/4872373/?ut...,München,Bayern,Deutschland,48.132609,11.510644,80686.0,,
2,Data Engineer,Data Engineer (m/w/d),16045-k56465.1941-S,SIMunich GmbH,2022-11-24,2022-11-24T17:11:26.859,2022-11-25,,dZHzPSksKFeBm3aLZoF7iiKJcG4YqFtx6U4LI7k-qS8=,,Braak bei Hamburg,Schleswig-Holstein,Deutschland,53.614081,10.255431,22145.0,,
3,Data Engineer,Data Engineer (m/w/d),10000-1192260804-S,Bartels-Langness Handelsgesell schaft mbH & Co...,2022-12-06,2022-12-06T18:01:13.496,2022-12-06,1c-YxVMEQPJkV4FHtgNZ0UW2_CMdRDNQtnhrrupWD6M=,egvYH6IG5f1GB1ko-IUWgo6ubvdNa8npjD5Jz8-XcMk=,,Kiel,Schleswig-Holstein,Deutschland,54.330489,10.112741,24116.0,Alte Weide 7-13,
4,Data Engineer,Data Science Support Engineer (w/d/m),13635-6b18a1e0_JB3407291-S,ONE LOGIC GmbH,2022-12-05,2022-12-05T17:31:27.134,2022-12-09,,8QghzxXt7yx8dtoUuqQmXLDyrCf8mWETk5i4G4ZpwiQ=,https://www.jobexport.de/job/3407291.html?exp=...,Passau,Bayern,Deutschland,48.567655,13.464913,94032.0,,


* The arbeitsort column was normalized with the pandas function json_normalize, and the result was joined to the parent dataframe.
* The koordinaten.lat and koordinaten.lon columns were renamed to latitude and longitude respectively.
* The arbeitsort column was dropped.
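
To see the normalization step in isolation, here is a small sketch on two hand-written records. The records are made up for illustration (the second one deliberately has no plz), and the column is converted with `.tolist()` before calling `pd.json_normalize`, which is a choice made here rather than what the cell above does.

```python
import pandas as pd

# Two illustrative records with a nested arbeitsort dictionary
records = [
    {"titel": "Data Engineer (m/w/d)",
     "arbeitsort": {"ort": "Kiel", "plz": "24116",
                    "koordinaten": {"lat": 54.330489, "lon": 10.112741}}},
    {"titel": "Data Engineer (m/w/d) Big Data",
     "arbeitsort": {"ort": "München",
                    "koordinaten": {"lat": 48.132609, "lon": 11.510644}}},
]
df = pd.DataFrame(records)

# Flatten the nested dictionaries; nested keys become dotted column names
flat = pd.json_normalize(df["arbeitsort"].tolist())
flat = flat.rename(columns={"koordinaten.lat": "latitude",
                            "koordinaten.lon": "longitude"})

# Replace the nested column with the flattened columns
df = df.drop(columns="arbeitsort").join(flat)
print(df.columns.tolist())
```

Keys that are missing from a record, such as plz in the second one, come out as NaN. The join matches rows by index, so this approach assumes the parent dataframe still has its default 0..n-1 index.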

In [None]:
# view information about the data including the data type
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 18 columns):
 #   Column                           Non-Null Count  Dtype  
---  ------                           --------------  -----  
 0   beruf                            100 non-null    object 
 1   titel                            100 non-null    object 
 2   refnr                            100 non-null    object 
 3   arbeitgeber                      100 non-null    object 
 4   aktuelleVeroeffentlichungsdatum  100 non-null    object 
 5   modifikationsTimestamp           100 non-null    object 
 6   eintrittsdatum                   100 non-null    object 
 7   logoHashId                       16 non-null     object 
 8   hashId                           100 non-null    object 
 9   externeUrl                       73 non-null     object 
 10  ort                              98 non-null     object 
 11  region                           98 non-null     object 
 12  land                   

**Removing unnecessary columns**

In [None]:
data.drop(['modifikationsTimestamp','logoHashId', 'hashId', 'externeUrl', 'strasse', 'ortsteil'], axis = 1, inplace=True)
data.shape

(100, 12)

**Loading to database**

* After transforming the data, I load it into a database named etl_pipeline.
* PostgreSQL is used for data storage.

In [None]:
# Installing the postgresql database on google colab

!apt install postgresql postgresql-contrib &>log
!service postgresql start
!sudo -u postgres psql -c "CREATE USER root WITH SUPERUSER"
%load_ext sql
%config SqlMagic.feedback=False 
%config SqlMagic.autopandas=True
%sql postgresql+psycopg2://@/postgres

ERROR:  role "root" already exists


'Connected: @postgres'

In [None]:
!sudo -u postgres createuser --interactive

Enter name of role to add: emmanuel
Shall the new role be a superuser? (y/n) y


In [None]:
!sudo -u postgres createdb etl_pipeline

In [None]:
!sudo -u root psql etl_pipeline

psql (10.22 (Ubuntu 10.22-0ubuntu0.18.04.1))
Type "help" for help.

etl_pipeline=# ALTER USER user_name WITH PASSWORD 'root';
ERROR:  role "user_name" does not exist
etl_pipeline=# ALTER USER root WITH PASSWORD 'root';
ALTER ROLE
etl_pipeline=# \q


In [None]:
engine = create_engine('postgresql://root:root@localhost:5432/etl_pipeline')

In [None]:
data.head(0).to_sql('data_engineer', engine, if_exists='replace',index=False) #drops old table and creates new empty table


In [None]:
# Write the dataframe to the database using PostgreSQL COPY
conn = engine.raw_connection()
cur = conn.cursor()
output = io.StringIO()
data.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)  # rewind the buffer so COPY reads from the start
cur.copy_from(output, 'data_engineer', null="") # empty fields are loaded as NULL
conn.commit()
cur.close()
conn.close()
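
The buffer passed to copy_from is plain tab-separated text. The sketch below uses a tiny made-up dataframe and no database, only to show what the buffer contains: a missing value is written as an empty field, which is why `null=""` is passed so that those fields arrive as NULL.

```python
import io
import pandas as pd

# Illustrative data: the second row has no postal code
df = pd.DataFrame({"ort": ["Kiel", "Passau"], "plz": ["24116", None]})

buffer = io.StringIO()
df.to_csv(buffer, sep="\t", header=False, index=False)
buffer.seek(0)  # rewind so the next reader starts at the first row

lines = buffer.read().splitlines()
print(lines)
```

One limitation of this approach, as far as I can tell, is that a tab or newline inside a text value would break the row layout, so it works best on columns that are known to be free of those characters.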

In [None]:
!psql -d etl_pipeline

psql (10.22 (Ubuntu 10.22-0ubuntu0.18.04.1))
Type "help" for help.

etl_pipeline=# \d+ data_engineer
                                                Table "public.data_engineer"
             Column              |       Type       | Collation | Nullable | Default | Storage  | Stats target | Description 
---------------------------------+------------------+-----------+----------+---------+----------+--------------+-------------
 beruf                           | text             |           |          |         | extended |              | 
 titel                           | text             |           |          |         | extended |              | 
 refnr                           | text             |           |          |         | extended |              | 
 arbeitgeber                     | text             |           |          |         | extended |              | 
 aktuelleVeroeffentlichungsdatum | text             |           |          |         | extended |              | 

