### Testing the Pipeline

This notebook is just a set of calls to the methods and modules defined previously in order to test them. 

In [1]:
from data_fetcher import fetch_data
from data_cleaner import clean_transform_data
from mongodb_uploader import upload_to_mongodb
from pipeline import data_pipeline

from data_ingestion.dynamodb_setup import create_dynamodb_table
from data_ingestion.dynamodb_insert import insert_data_into_dynamodb
from data_processing.redshift_setup import create_redshift_cluster
from data_processing.redshift_insert import insert_data_into_redshift

In [2]:
api_url = "https://data.sfgov.org/resource/g8m3-pdis.json"

query = "limit=100"


In [3]:
from pymongo import MongoClient
database_name='mydatabase'
collection_name = 'mycollection'
mongodb_uri = 'mongodb+srv://DE-Mente:Klqw5LyXZFebsU2j@cluster0.em28so6.mongodb.net'

In [4]:
%%time
# Testing the data_fetcher module:

data = fetch_data(api_url, query)

print(type(data))

https://data.sfgov.org/resource/g8m3-pdis.json?$limit=100
<class 'list'>
CPU times: user 56.4 ms, sys: 5.99 ms, total: 62.4 ms
Wall time: 311 ms


In [5]:
%%time
# Testing the data_cleaner module:
    
df = clean_transform_data(data)
print(df.shape)
df.head()

(100, 22)
CPU times: user 60.2 ms, sys: 1.06 ms, total: 61.3 ms
Wall time: 68.9 ms


In [6]:
df.columns

Index(['ttxid', 'certificate_number', 'ownership_name', 'dba_name',
       'full_business_address', 'city', 'state', 'business_zip',
       'dba_start_date', 'dba_end_date', 'location_start_date',
       'location_end_date', 'parking_tax', 'transient_occupancy_tax',
       'supervisor_district', 'neighborhoods_analysis_boundaries', 'location',
       'uniqueid', 'mailing_address_1', 'mail_city', 'mail_zipcode',
       'mail_state'],
      dtype='object')

In [7]:
df.head()

Unnamed: 0,ttxid,certificate_number,ownership_name,dba_name,full_business_address,city,state,business_zip,dba_start_date,dba_end_date,...,parking_tax,transient_occupancy_tax,supervisor_district,neighborhoods_analysis_boundaries,location,uniqueid,mailing_address_1,mail_city,mail_zipcode,mail_state
0,0022280-08-001,22280,Yee Shubert Y,Hang On Investments,751 Clay St,San Francisco,CA,94108,1968-10-01,2018-06-29 00:00:00,...,False,False,3,Chinatown,"{'type': 'Point', 'coordinates': [-122.40577, ...",0022280-08-001-0022280--08-01-2009,-,-,-,-
1,0025394-02-001,25394,Sedgwick Llp,Sedgwick Llp,333 Bush St 30th Fl,San Francisco,CA,94104,1968-10-01,2018-03-30 00:00:00,...,False,False,3,Financial District/South Beach,"{'type': 'Point', 'coordinates': [-122.40321, ...",0025394-02-001-0025394--09-06-2011,-,-,-,-
2,0035158-08-001,35158,Digenova A,65 Fresno St Apts,65 Fresno St,San Francisco,CA,94133,1973-01-16,2017-06-30 00:00:00,...,False,False,3,North Beach,"{'type': 'Point', 'coordinates': [-122.40664, ...",0035158-08-001-0035158--02-01-2006,-,-,-,-
3,0049323-05-001,49323,Iron Mountain Inform Mgt Inc,Iron Mountain Info Mgmt Inc,50 Crisp Rd,San Francisco,CA,94124,1998-01-15,2017-11-13 00:00:00,...,False,False,10,Bayview Hunters Point,"{'type': 'Point', 'coordinates': [-122.37979, ...",0049323-05-001-0049323--01-01-2004,-,-,-,-
4,1155567-07-171,1073842,Macrae Inc,"Mlegal Consulting, Inc.",Embarcadero Center 1130,San Francisco,CA,94111,2017-06-01,-,...,False,False,3,Financial District/South Beach,"{'type': 'Point', 'coordinates': [-122.39798, ...",1155567-07-171-1073842-5400-5499-06-01-2017,437 Kipling St Ste 200,Palo Alto,94301,CA


In [9]:
# Clear all the database registers

from pymongo import MongoClient
client = MongoClient(mongodb_uri)

# Replace 'your_collection' with the name of your MongoDB collection
collection = client[database_name][collection_name]

# Clear all documents in the collection
collection.delete_many({})

# Close the MongoDB connection
client.close()

In [10]:
%%time
# testing the mongodb_uploader module:

upload_to_mongodb(df, mongodb_uri, collection_name, database_name)

Data uploaded to MongoDB collection '{collection_name}' in database '{database_name}' successfully.
CPU times: user 124 ms, sys: 26.1 ms, total: 150 ms
Wall time: 1.26 s


In [11]:
%%time
# Testing the pipeline module:
from pipeline import data_pipeline

api_url = "https://data.sfgov.org/resource/g8m3-pdis.json"

# Example query 
query = "limit=100"


CPU times: user 10 µs, sys: 4 µs, total: 14 µs
Wall time: 18.1 µs


In [12]:
%%time
data_pipeline(api_url, query, mongodb_uri, database_name, collection_name)

https://data.sfgov.org/resource/g8m3-pdis.json?$limit=100
Data uploaded to MongoDB collection '{collection_name}' in database '{database_name}' successfully.
CPU times: user 216 ms, sys: 27.7 ms, total: 244 ms
Wall time: 31.6 s


In [37]:
%%time
# Test the streamlit running process
import subprocess
import time
import os
import signal

 # Step 4: Show  a dashboard in Streamlit, wait for some time and then terminate the process. 
try:
    # Define the command to run the Streamlit app
    streamlit_command = "streamlit run streamlit_dashboard.py"

    # Start the Streamlit app using subprocess
    streamlit_process = subprocess.Popen(streamlit_command, shell=True, preexec_fn=os.setsid)

    # Do other tasks or operations in the pipeline

    # Optionally, wait for some time
    time.sleep(30)  # tuned for 30 seconds. Adjust the duration as needed

    # Terminate the Streamlit process
    streamlit_process.terminate()
    
except Exception as e:
    print(f"Error runing streamlit Dashboard: {e}")
        
finally:
    # Close process by killing it
    os.killpg(os.getpgid(streamlit_process.pid), signal.SIGTERM)
    

CPU times: user 6.7 ms, sys: 6.11 ms, total: 12.8 ms
Wall time: 30 s


In [1]:
# When you run a subprocess using subprocess.Popen in a Jupyter Notebook cell, 
# it might not handle subprocess termination correctly due to differences in how Jupyter manages processes.
# I've used os.setsid to start the subprocess in a new process group. This ensures that the termination signal 
# is sent to the entire process group, including the subprocess and its child processes.
# os.setsid(): This function is used to start a new session and become the leader of a new process group.
# signal.SIGTERM is a specific signal that stands for "terminate." It is commonly used to gracefully 
# terminate a process. When a process receives the SIGTERM signal, it has the opportunity to perform 
# cleanup operations before exiting.

import os
import signal
os.killpg(os.getpgid(streamlit_process.pid), signal.SIGTERM)

NameError: name 'streamlit_process' is not defined

In [5]:
%%time
data_pipeline(api_url, query, mongodb_uri, database_name, collection_name)

https://data.sfgov.org/resource/g8m3-pdis.json?$limit=100
Data uploaded to MongoDB collection '{collection_name}' in database '{database_name}' successfully.
CPU times: user 221 ms, sys: 53.4 ms, total: 275 ms
Wall time: 31.7 s
