### Import Necessary libraries

In [263]:
import requests
import json
from datetime import datetime
import pandas as pd
import string
import random
import hashlib

### Function to get api response

In [264]:
def get_rest_api_data(url):
    response = requests.get(url)
    responsejson = json.loads(response.text)
    return response.text, responsejson

### store api response in variables

In [286]:
user_url = "https://619ca0ea68ebaa001753c9b0.mockapi.io/evaluation/dataengineer/jr/v1/users"

userResult,userResultJson = get_rest_api_data(user_url)

message_url = "https://619ca0ea68ebaa001753c9b0.mockapi.io/evaluation/dataengineer/jr/v1/messages"

messageResult,messageResultJson = get_rest_api_data(message_url)

### id_info

In [266]:
user_profile = pd.json_normalize(userResultJson,  meta= [ ['profile', 'gender'], ['profile', 'isSmoking'], ['profile', 'profession'],['profile', 'income'] ])
id_info = user_profile[['id', 'createdAt', 'updatedAt',  'city','country', 'zipCode', 'email', 'profile.gender', 'birthDate', 'profile.isSmoking', 'profile.income']]

In [267]:
# change data type
user_convert_dict = {'id': str, 
                'createdAt': str, 
                'updatedAt': str, 
                'city': str, 
                'country': str, 
                'zipCode': str, 
                'email': str,       
                'profile.gender': str, 
                'birthDate': str, 
                'profile.isSmoking': bool, 
                'profile.income': float
                }
id_info = id_info.astype(user_convert_dict)

# cahnge datetime columns to date time formate
id_info[['createdAt','updatedAt','birthDate']]= id_info[['createdAt','updatedAt', 'birthDate']].apply(pd.to_datetime)

In [268]:
#get age
now = pd.to_datetime('now',utc=True)
id_info['Age'] =  id_info['birthDate'].apply(lambda x : int((now-x).days/365))

#get email domain name
id_info['domain.name'] = id_info['email'].str.extract('@(\w.+)', expand=True)

#drop birthdate, and email columns
id_info = id_info.drop(columns={'birthDate', 'email'})

In [269]:
#function to pseudonymize user_id
salt = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(10))
key = {}

def hasing_id(myid):
    if myid not in key:
        sha3 = hashlib.sha3_512()
        data = salt + myid
        sha3.update(data.encode('utf-8'))
        hexdigest = sha3.hexdigest()
        key[myid] = hexdigest
        return hexdigest
    else:
        return key[myid]


In [272]:
#pseudonymize user_id
id_info['id'] = id_info['id'].map(hasing_id)

#rename column
id_info = id_info.rename(columns={'id':'user_id'})

#add audit columns
id_info['ingestion_date'] = pd.to_datetime('now',utc=True)
id_info['source'] = 'mockapi/user'

In [273]:
id_info

Unnamed: 0,id,createdAt,updatedAt,city,country,zipCode,profile.gender,profile.isSmoking,profile.income,Age,domain.name,ingestion_date,source
0,cf584bfed32cb77dd11aa9d4c9269a42ecd9286546cd4b...,2021-11-23 16:10:33.614000+00:00,2021-11-24 13:34:15.404000+00:00,Pembroke Pines,United States,05734,male,True,3709.61,1,hotmail.com,2022-10-01 19:03:06.760449+00:00,mockapi/user
1,84325e4f5c9debbfe067d7d5ddb0967ddd2f19e3f30447...,2021-11-23 02:40:10.964000+00:00,2021-11-24 16:04:42.393000+00:00,Meganemouth,Namibia,59236,male,True,1504.25,0,gmail.com,2022-10-01 19:03:06.760449+00:00,mockapi/user
2,fe8dfb4c73daa39d03a210c9417991c7f9fc13e1217002...,2021-11-23 06:26:53.843000+00:00,2021-11-24 06:37:01.117000+00:00,Port Cary,Bulgaria,83202-2695,female,False,3256.41,1,gmail.com,2022-10-01 19:03:06.760449+00:00,mockapi/user
3,79840f98b9e3c3c5411fc3be1e96eb0a19cd92cf60a90b...,2021-11-23 03:27:56.458000+00:00,2021-11-24 17:00:21.524000+00:00,South Christophe,United States,70183,male,True,758.89,1,gmail.com,2022-10-01 19:03:06.760449+00:00,mockapi/user
4,8106d4fa954531dab7cc70645ea9feb98f549be0bb53e6...,2021-11-23 14:57:27.793000+00:00,2021-11-24 05:23:38.587000+00:00,East Darrionhaven,Iceland,98347,female,False,2658.19,1,gmail.com,2022-10-01 19:03:06.760449+00:00,mockapi/user
5,cbc11469076e1806e2b8b4dfeb19ef8b9068b73812f64b...,2021-11-22 19:14:23.721000+00:00,2021-11-24 02:56:07.833000+00:00,,United States,,,True,,1,yahoo.com,2022-10-01 19:03:06.760449+00:00,mockapi/user


### df_subs

In [274]:
# get necessary data for subscription table
df_subs = pd.json_normalize(userResultJson, record_path =['subscription'], meta= ['id'])

In [275]:
# change data type
subs_convert_dict = {'createdAt': str, 
                'startDate': str, 
                'endDate': str, 
                'status': str, 
                'amount': float, 
                'id': str, 
                }

df_subs = df_subs.astype(subs_convert_dict)

#convert date time columns to date time format
df_subs[['createdAt','startDate','endDate']]= df_subs[['createdAt','startDate','endDate']].apply(pd.to_datetime, utc = True)

In [278]:
#pseudonymize user_id
df_subs['id'] = df_subs['id'].map(hasing_id)

#rename column
df_subs = df_subs.rename(columns= {'id':'user_id'})

#add audit columns
df_subs['ingestion_date'] = pd.to_datetime('now',utc=True)
df_subs['source'] = 'mockapi/user'

In [279]:
df_subs

Unnamed: 0,createdAt,startDate,endDate,status,amount,user_id,ingestion_date,source
0,2021-11-24 16:58:46.581000+00:00,2021-11-24 05:12:49.301000+00:00,2022-09-15 06:05:59.630000+00:00,Active,43.18,cf584bfed32cb77dd11aa9d4c9269a42ecd9286546cd4b...,2022-10-01 19:09:36.389465+00:00,mockapi/user
1,2021-11-24 14:36:18.895000+00:00,2021-11-24 12:57:48.724000+00:00,2022-07-13 09:14:04.001000+00:00,Active,23.78,84325e4f5c9debbfe067d7d5ddb0967ddd2f19e3f30447...,2022-10-01 19:09:36.389465+00:00,mockapi/user
2,2021-11-22 23:41:32.927000+00:00,2021-11-23 14:42:04.416000+00:00,2022-07-26 17:06:45.413000+00:00,Rejected,64.75,fe8dfb4c73daa39d03a210c9417991c7f9fc13e1217002...,2022-10-01 19:09:36.389465+00:00,mockapi/user
3,2021-11-23 18:57:20.540000+00:00,2021-11-24 18:04:41.908000+00:00,2022-03-03 09:47:26.916000+00:00,Active,88.6,fe8dfb4c73daa39d03a210c9417991c7f9fc13e1217002...,2022-10-01 19:09:36.389465+00:00,mockapi/user
4,2021-11-23 05:23:29.452000+00:00,2021-11-23 09:24:30.685000+00:00,2022-03-25 10:14:15.548000+00:00,Rejected,15.98,79840f98b9e3c3c5411fc3be1e96eb0a19cd92cf60a90b...,2022-10-01 19:09:36.389465+00:00,mockapi/user
5,2021-11-24 02:07:08.482000+00:00,2021-11-24 12:47:33.246000+00:00,2021-12-10 20:22:36.132000+00:00,Inactive,3.51,79840f98b9e3c3c5411fc3be1e96eb0a19cd92cf60a90b...,2022-10-01 19:09:36.389465+00:00,mockapi/user
6,2021-11-24 14:40:24.257000+00:00,2021-11-24 11:22:33.265000+00:00,2022-11-23 12:41:28.319000+00:00,Active,89.71,8106d4fa954531dab7cc70645ea9feb98f549be0bb53e6...,2022-10-01 19:09:36.389465+00:00,mockapi/user


### df_message

In [281]:
df_message = pd.json_normalize(messageResultJson)[['id', 'createdAt','receiverId' , 'senderId']]

In [282]:
# change data type
message_convert_dict = {'id': str, 
                'createdAt': str, 
                'receiverId': str, 
                'senderId': str, 
                }

df_message = df_message.astype(message_convert_dict)

df_message[['createdAt']]= df_message[['createdAt']].apply(pd.to_datetime, utc = True)

In [283]:
#pseudonymize user_id
df_message['receiverId'] = df_message['receiverId'].map(hasing_id)
df_message['senderId'] = df_message['senderId'].map(hasing_id)


#add audit columns
df_message['ingestion_date'] = pd.to_datetime('now',utc=True)
df_message['source'] = 'mockapi/messages'

In [284]:
#rename columns
df_message = df_message.rename(columns = {'id':'message_id', 'receiverId': 'receiver_id', 'senderId':'sender_id'})

In [285]:
df_message

Unnamed: 0,message_id,createdAt,receiver_id,sender_id,ingestion_date,source
0,1,2021-11-25 12:18:57.208000+00:00,5e16c822cfa75a2fffbc93e99e26a99f674603b84f3057...,86240190aa91492c19f782ddce3a0ff9a96e1b98235462...,2022-10-01 19:12:47.653118+00:00,mockapi/messages
1,2,2021-11-25 15:26:33.436000+00:00,7c7bf52cf89f8d5d6fd2db34f5e19440a28e87fdf90396...,5e16c822cfa75a2fffbc93e99e26a99f674603b84f3057...,2022-10-01 19:12:47.653118+00:00,mockapi/messages
2,3,2021-11-25 21:55:29.995000+00:00,86240190aa91492c19f782ddce3a0ff9a96e1b98235462...,7c7bf52cf89f8d5d6fd2db34f5e19440a28e87fdf90396...,2022-10-01 19:12:47.653118+00:00,mockapi/messages
3,4,2021-11-26 03:09:45.900000+00:00,86240190aa91492c19f782ddce3a0ff9a96e1b98235462...,e038e6fba3db8e702c32170145afbea8a186ee4bd02070...,2022-10-01 19:12:47.653118+00:00,mockapi/messages
4,5,2021-11-26 09:15:42.912000+00:00,86240190aa91492c19f782ddce3a0ff9a96e1b98235462...,fb597b113bfb199dff93c9bee8a0a19cb314a4ef8cffe1...,2022-10-01 19:12:47.653118+00:00,mockapi/messages
5,6,2021-11-27 06:42:02.172000+00:00,5e16c822cfa75a2fffbc93e99e26a99f674603b84f3057...,a31a2f02d7ac02245951f34cc45e310aa442392402bc6c...,2022-10-01 19:12:47.653118+00:00,mockapi/messages
6,7,2021-11-27 06:38:39.424000+00:00,fb597b113bfb199dff93c9bee8a0a19cb314a4ef8cffe1...,86240190aa91492c19f782ddce3a0ff9a96e1b98235462...,2022-10-01 19:12:47.653118+00:00,mockapi/messages
7,8,2021-11-27 07:16:37.817000+00:00,a31a2f02d7ac02245951f34cc45e310aa442392402bc6c...,5e16c822cfa75a2fffbc93e99e26a99f674603b84f3057...,2022-10-01 19:12:47.653118+00:00,mockapi/messages
8,9,2021-11-27 12:38:30.049000+00:00,86240190aa91492c19f782ddce3a0ff9a96e1b98235462...,7c7bf52cf89f8d5d6fd2db34f5e19440a28e87fdf90396...,2022-10-01 19:12:47.653118+00:00,mockapi/messages
9,10,2021-11-27 21:11:57.106000+00:00,7c7bf52cf89f8d5d6fd2db34f5e19440a28e87fdf90396...,86240190aa91492c19f782ddce3a0ff9a96e1b98235462...,2022-10-01 19:12:47.653118+00:00,mockapi/messages


In [12]:
pip install snowflake-connector-python

Note: you may need to restart the kernel to use updated packages.




### Snowflake

In [16]:
import snowflake.connector

ModuleNotFoundError: No module named 'snowflake.connector'; 'snowflake' is not a package

In [15]:
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

ModuleNotFoundError: No module named 'snowflake.connector'; 'snowflake' is not a package

In [None]:
credential_info = open('snowCred.json')
credentials = json.load(credential_info)

In [None]:
#connect to db

ctx = snowflake.connector.connect(
    user = credentials['user'],
    password = credentials['password'],
    account = '',
    warehouse = ''
    )
cs = ctx.cursor()

In [None]:
cs.execute('USE ROLE SYSADMIN')
cs.execute('CREATE DATABASE SparkNetwork')
cs.execute('USE DATABASE SparkNetwork')
cs.execute('USE SCHEMA PUBLIC')

In [None]:
cs.execute('CREATE TABLE "myTable"("Username" STRING, "Identifier" INTRGER, "First_name" STRING, "Last_name" STRING) ')

In [None]:
write_pandas(ctx,df,table_name="myTable")

In [10]:
file = open('blobCreds.txt')
lines = file.readlines()
storage_account_key = lines[0]
storage_account_name = lines[1]
connection_string = lines[2]
container_name = "bronze"

In [12]:
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
blob_client = blob_service_client.get_blob_client(container=container_name, blob='raw1.json')

In [18]:
blob_client.upload_blob(outResult)

ServiceRequestError: <urllib3.connection.HTTPSConnection object at 0x00000158F64B5788>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed

In [None]:

def uploadToBlobStorage(file_path,file_name):
   blob_service_client = BlobServiceClient.from_connection_string(connection_string)
   blob_client = blob_service_client.get_blob_client(container=container_name, blob=file_name)
   with open(file_path,”rb”) as data:
      blob_client.upload_blob(data)
      print(f”Uploaded {file_name}.”)
# calling a function to perform upload
uploadToBlobStorage('PATH_OF_FILE_TO_UPLOAD','FILE_NAME')