## ASSIGNMNET 2 - MOHAN BAKSHI

In this assignment, We'll be integrating data from three distinct sources: Flat File Source, combining employee data from employee.csv and employee.json files; API Source, and Database Source, using the Snowflake database. 

In [1]:
# Importing the libraries

import snowflake.connector
import pandas as pd
from snowflake.connector.pandas_tools import write_pandas
from xml.etree import ElementTree as ET
import requests
from IPython.display import display


In [2]:
# Loading Employees.csv
employee_csv = pd.read_csv('employees.csv')

In [3]:
employee_csv.head()

Unnamed: 0,id,name,department,salary
0,1,Alice Johnson,Finance,50091
1,2,Alice Johnson,IT,57010
2,3,Alice Johnson,Finance,78148


In [4]:
# Loading Employees.json

employee_json = pd.read_json('employees.json')

In [5]:
employee_json.head()

Unnamed: 0,id,name,department,salary
0,1,C Ronald,Finance,22091
1,2,trump D,IT,87010
2,3,Biden J,Finance,99148


In [6]:
# Load XML data
employee_parsing = ET.parse('employees.xml')
get_employee_parsing_root = employee_parsing.getroot()
employee_xml_data = []
for employee in get_employee_parsing_root.findall('employee'):
    employee_dict = {}
    for child in employee:
        employee_dict[child.tag] = child.text
    employee_xml_data.append(employee_dict)

employee_xml = pd.DataFrame(employee_xml_data)

In [7]:
employee_xml.head()

Unnamed: 0,id,name,department,salary
0,1,Alice Kent,Finance,60000
1,2,Victor Johnson,IT,66010
2,3,J Johnson,Finance,11148


In [8]:
# Merging CSV, JSON, and XML data
final_employees_data = pd.concat([employee_csv, employee_json, employee_xml])

# Resetting the index to ensure sequential IDs
final_employees_data.reset_index(drop=True, inplace=True)

# Reassigning sequential IDs starting from 1
final_employees_data['id'] = range(1, len(final_employees_data) + 1)

# Saving the combined data to a new CSV file
final_employees_data.to_csv('final_employees_data.csv', index=False)

In [9]:
# Checking the final_employees_data first 10 records.
final_employees_data.head(10)


Unnamed: 0,id,name,department,salary
0,1,Alice Johnson,Finance,50091
1,2,Alice Johnson,IT,57010
2,3,Alice Johnson,Finance,78148
3,4,C Ronald,Finance,22091
4,5,trump D,IT,87010
5,6,Biden J,Finance,99148
6,7,Alice Kent,Finance,60000
7,8,Victor Johnson,IT,66010
8,9,J Johnson,Finance,11148


In [10]:
# Fetching data from the API
api_url = 'https://jsonplaceholder.typicode.com/posts'
response = requests.get(api_url)

# Convering JSON response to DataFrame
api_dataframe = pd.DataFrame(response.json())


# Checking the shape and first 5 records
display(api_dataframe.shape)
display(api_dataframe.head())


(100, 4)

Unnamed: 0,userId,id,title,body
0,1,1,sunt aut facere repellat provident occaecati e...,quia et suscipit\nsuscipit recusandae consequu...
1,1,2,qui est esse,est rerum tempore vitae\nsequi sint nihil repr...
2,1,3,ea molestias quasi exercitationem repellat qui...,et iusto sed quo iure\nvoluptatem occaecati om...
3,1,4,eum et est occaecati,ullam et saepe reiciendis voluptatem adipisci\...
4,1,5,nesciunt quas odio,repudiandae veniam quaerat sunt sed\nalias aut...


In [11]:
# Classifying column into categorical and quantitaive columns

def check_columns(df: pd.DataFrame):
    columns = df.columns
    quantitative_columns = list(set(df._get_numeric_data().columns))
    categorical_columns = list(set(columns) - set(quantitative_columns))
    return quantitative_columns, categorical_columns

quan_cols, cat_cols = check_columns(api_dataframe)
print(f'Quantitative columns are: {quan_cols}')
print(f'Categorical columns are: {cat_cols}')

Quantitative columns are: ['id', 'userId']
Categorical columns are: ['body', 'title']


In [12]:
# Information about the dataset
api_dataframe.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 4 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   userId  100 non-null    int64 
 1   id      100 non-null    int64 
 2   title   100 non-null    object
 3   body    100 non-null    object
dtypes: int64(2), object(2)
memory usage: 3.3+ KB


- The DataFrame has a RangeIndex from 0 to 99.
- It contains four columns: 'userId' (int64), 'id' (int64), 'title' (object), and 'body' (object).
- All 100 entries have non-null values in each column.
- The 'userId' and 'id' columns are of integer type (int64), while the 'title' and 'body' columns are of object type.


In [13]:
# Saving API data to CSV file
api_dataframe.to_csv('api_data.csv', index=False)

In [14]:

# Establish connection to Snowflake
con = snowflake.connector.connect(
    user = 'BAKSHI01',
    password = 'Boa4hancock',
    account = 'vtwzcvl-uo17880',
    warehouse='COMPUTE_WH',
    database = 'MOHAN_AI2024',
    schema = 'ASSIGNMENT2',
    role = 'ACCOUNTADMIN'
)

In [15]:
# Defining the SQL query to clone the Accounts table
sql_query = """
CREATE or REPLACE TABLE ASSIGNMENT2.ACCOUNTS AS
SELECT *
FROM ETL_TASKS.ACCOUNTS
"""

# Executing the SQL query
cursor = con.cursor()
cursor.execute(sql_query)

<snowflake.connector.cursor.SnowflakeCursor at 0x15561b680>

In [18]:
cursor.execute('SELECT * FROM ASSIGNMENT2.ACCOUNTS')
accounts_df = cursor.fetch_pandas_all()
accounts_df


Unnamed: 0,ID,NAME,WEBSITE,LAT,LONG,PRIMARY_POC,SALES_REP_ID
0,1001,Walmart,www.walmart.com,40.238496,-75.103297,Tamara Tuma,321500
1,1011,Exxon Mobil,www.exxonmobil.com,41.169156,-73.849374,Sung Shields,321510
2,1021,Apple,www.apple.com,42.290495,-76.084009,Jodee Lupo,321520
3,1031,Berkshire Hathaway,www.berkshirehathaway.com,40.949021,-75.763898,Serafina Banda,321530
4,1041,McKesson,www.mckesson.com,42.217093,-75.284998,Angeles Crusoe,321540
...,...,...,...,...,...,...,...
346,4461,KKR,www.kkr.com,45.545353,-122.655247,Buffy Azure,321970
347,4471,Oneok,www.oneok.com,45.513513,-122.681500,Esta Engelhardt,321960
348,4481,Newmont Mining,www.newmont.com,45.494117,-122.669460,Khadijah Riemann,321970
349,4491,PPL,www.pplweb.com,45.491720,-122.671880,Deanne Hertlein,321960


In [29]:
# Loading data from CSV file into a Pandas DataFrame
api_df = pd.read_csv('api_data.csv')


In [30]:
# Converting column names to uppercase
api_df.columns = map(lambda x: str(x).upper(), api_df.columns)

# Replacing spaces in column names with underscores
api_df.columns = api_df.columns.str.replace(' ', '_')

In [31]:
# Display the first 5 rows of the DataFrame
api_df.head()

Unnamed: 0,USERID,ID,TITLE,BODY
0,1,1,sunt aut facere repellat provident occaecati e...,quia et suscipit\nsuscipit recusandae consequu...
1,1,2,qui est esse,est rerum tempore vitae\nsequi sint nihil repr...
2,1,3,ea molestias quasi exercitationem repellat qui...,et iusto sed quo iure\nvoluptatem occaecati om...
3,1,4,eum et est occaecati,ullam et saepe reiciendis voluptatem adipisci\...
4,1,5,nesciunt quas odio,repudiandae veniam quaerat sunt sed\nalias aut...


In [21]:
# SQL query to create or replace a table named API_DATA
create_api_table_query = '''
    CREATE OR REPLACE TABLE API_DATA (
        ID INT PRIMARY KEY,
        USERID INT,
        TITLE STRING,
        BODY STRING
    )
'''

# Executing the SQL query
cursor.execute(create_api_table_query)

<snowflake.connector.cursor.SnowflakeCursor at 0x15561b680>

In [32]:
# Loading final employee data from CSV file into a Pandas DataFrame
final_employee_df = pd.read_csv('final_employees_data.csv')

In [33]:
# Converting column names to uppercase
final_employee_df.columns = map(lambda x: str(x).upper(), final_employee_df.columns)

# Replacing spaces in column names with underscores
final_employee_df.columns = final_employee_df.columns.str.replace(' ', '_')

In [35]:
# Displaying the first 5 rows of the DataFrame for inspection
final_employee_df.head()

Unnamed: 0,ID,NAME,DEPARTMENT,SALARY
0,1,Alice Johnson,Finance,50091
1,2,Alice Johnson,IT,57010
2,3,Alice Johnson,Finance,78148
3,4,C Ronald,Finance,22091
4,5,trump D,IT,87010


In [36]:
# SQL query to create or replace a table named EMPLOYEE_DATA
create_table_query = '''
    CREATE OR REPLACE TABLE EMPLOYEE_DATA (
        ID INT PRIMARY KEY,
        NAME STRING,
        DEPARTMENT STRING,
        SALARY DECIMAL
    )
'''

# Executing the SQL query using the cursor
cursor.execute(create_table_query)

<snowflake.connector.cursor.SnowflakeCursor at 0x15561b680>

In [37]:
# Writing the api_df and final_employee_df DataFrame to a table named 'API_DATA' and 'EMPLOYEE_DATA'
write_pandas(conn= con, df=api_df, table_name='API_DATA')
write_pandas(conn= con, df=final_employee_df, table_name='EMPLOYEE_DATA')

(True,
 1,
 9,
 [('vkorjojppx/file0.txt', 'LOADED', 9, 9, 1, 0, None, None, None, None)])

In [38]:
cursor.close()
con.close()


In conclusion, I have effectively created a data mart in the ASSIGNMENT2 schema of my Snowflake database by integrating employee data from flat file sources, API data from https://jsonplaceholder.typicode.com/posts, and database data from the account table. 

This data mart, comprising three tables (ACCOUNTS, API_DATA, EMPLOYEE_DATA), provides a consolidated and structured view of diverse data sources, facilitating comprehensive analysis and informed decision-making processes.