### Notebook to Transform and Load data in Database 
* Source File: sales_data.csv
* User data End Point: https://jsonplaceholder.typicode.com/users
* Whether Data End Point:  https://api.openweathermap.org/data/2.5/weather
* Final Table Name: aiq.tran_sales

#### Import Libraries

In [5]:
import pandas as pd
import requests
import json

#### Define parameters and varibales

In [6]:
# User end-point
user_url = 'https://jsonplaceholder.typicode.com/users'

#### Read source file into Dataframe

In [7]:
sales_df = pd.read_csv("/opt/spark/data/sales_data.csv")
sales_df[0:3]

Unnamed: 0,order_id,customer_id,product_id,quantity,price,order_date
0,2334,5,40,3,35.6,2022-06-21
1,6228,8,13,7,36.52,2023-03-08
2,7784,9,44,4,46.56,2023-04-22


#### Data Quality Check and Data Standardization

In [51]:
# Data type of each attribute
sales_df.dtypes

order_id         int64
customer_id      int64
product_id       int64
quantity         int64
price          float64
order_date      object
dtype: object

In [53]:
# Convert to appropriate data type
sales_df['order_date'] = pd.to_datetime(sales_df['order_date'])
sales_df.dtypes

order_id                int64
customer_id             int64
product_id              int64
quantity                int64
price                 float64
order_date     datetime64[ns]
dtype: object

In [54]:
# Checking the presence of missing values
sales_df.isnull().values.any()

False

###### No NULL values in the Dataset

### *** Extract User Data from JSONPlaceholder API endpoint ***

In [55]:
# Get users data from JSONPlaceholder API
response = requests.get(user_url)
data = json.loads(response.text)

# Convert json data to pandas dataframe
users_df = pd.json_normalize(data)
users_df[0:2]

Unnamed: 0,id,name,username,email,phone,website,address.street,address.suite,address.city,address.zipcode,address.geo.lat,address.geo.lng,company.name,company.catchPhrase,company.bs
0,1,Leanne Graham,Bret,Sincere@april.biz,1-770-736-8031 x56442,hildegard.org,Kulas Light,Apt. 556,Gwenborough,92998-3874,-37.3159,81.1496,Romaguera-Crona,Multi-layered client-server neural-net,harness real-time e-markets
1,2,Ervin Howell,Antonette,Shanna@melissa.tv,010-692-6593 x09125,anastasia.net,Victor Plains,Suite 879,Wisokyburgh,90566-7771,-43.9509,-34.4618,Deckow-Crist,Proactive didactic contingency,synergize scalable supply-chains


#### Get only required columns and give appropriate names

In [56]:
users_df1 = users_df[["id","name","username","email","address.city","address.geo.lat","address.geo.lng"]]
final_users_df = users_df1.rename(columns={"address.city":"city","address.geo.lat":"lat","address.geo.lng":"lng"})
final_users_df[0:3]

Unnamed: 0,id,name,username,email,city,lat,lng
0,1,Leanne Graham,Bret,Sincere@april.biz,Gwenborough,-37.3159,81.1496
1,2,Ervin Howell,Antonette,Shanna@melissa.tv,Wisokyburgh,-43.9509,-34.4618
2,3,Clementine Bauch,Samantha,Nathan@yesenia.net,McKenziehaven,-68.6102,-47.0653


#### Data Quality Check and Data Standardization

In [57]:
# Data type of each attribute
final_users_df.dtypes

id           int64
name        object
username    object
email       object
city        object
lat         object
lng         object
dtype: object

In [58]:
# Convert object datatypes to appropriate datatypes
final_users_df = final_users_df.convert_dtypes()
final_users_df.dtypes

id                   Int64
name        string[python]
username    string[python]
email       string[python]
city        string[python]
lat         string[python]
lng         string[python]
dtype: object

In [59]:
# Checking the presence of missing values
final_users_df.isnull().values.any()

False

###### No NULL values in the Dataset

#### Merge sales & user dataframes

In [60]:
print(sales_df.shape)
print(final_users_df.shape)

(1000, 6)
(10, 7)


In [61]:
# sales_df left join with final_users_df
sales_user_df = sales_df.merge(final_users_df, left_on='customer_id', right_on='id', how='left')

# Drop 'id' column
sales_user_df.drop('id', axis=1, inplace=True)
sales_user_df[0:3]

Unnamed: 0,order_id,customer_id,product_id,quantity,price,order_date,name,username,email,city,lat,lng
0,2334,5,40,3,35.6,2022-06-21,Chelsey Dietrich,Kamren,Lucio_Hettinger@annie.ca,Roscoeview,-31.8129,62.5342
1,6228,8,13,7,36.52,2023-03-08,Nicholas Runolfsdottir V,Maxime_Nienow,Sherwood@rosamond.me,Aliyaview,-14.399,-120.7677
2,7784,9,44,4,46.56,2023-04-22,Glenna Reichert,Delphine,Chaim_McDermott@dana.io,Bartholomebury,24.6463,-168.8889


In [62]:
sales_user_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 12 columns):
 #   Column       Non-Null Count  Dtype         
---  ------       --------------  -----         
 0   order_id     1000 non-null   int64         
 1   customer_id  1000 non-null   int64         
 2   product_id   1000 non-null   int64         
 3   quantity     1000 non-null   int64         
 4   price        1000 non-null   float64       
 5   order_date   1000 non-null   datetime64[ns]
 6   name         1000 non-null   string        
 7   username     1000 non-null   string        
 8   email        1000 non-null   string        
 9   city         1000 non-null   string        
 10  lat          1000 non-null   string        
 11  lng          1000 non-null   string        
dtypes: datetime64[ns](1), float64(1), int64(4), string(6)
memory usage: 93.9 KB


### *** Extract OpenWeatherMap API Data ***

#### Let's add fictional store address

In [64]:
# Get unique customer_id
df = sales_user_df["customer_id"].unique()
df

array([ 5,  8,  9,  3, 10,  7,  4,  1,  2,  6])

In [65]:
# Add Fictional Store Address for these customer_ids
storeAdd_dic = { 1: "Abu Dhabi", 2: "Dubai", 3: "Riyadh", 4: "London", 5: "Paris", 6: "Mumbai", 7: "Hyderabad", 8: "Goa", 9: "Washigton", 10: "Beijing" }

storeAddres_df = pd.DataFrame(list(storeAdd_dic.items()), columns=['cust_id', 'store_address'])
storeAddres_df

Unnamed: 0,cust_id,store_address
0,1,Abu Dhabi
1,2,Dubai
2,3,Riyadh
3,4,London
4,5,Paris
5,6,Mumbai
6,7,Hyderabad
7,8,Goa
8,9,Washigton
9,10,Beijing


In [89]:
# Add these Store Address to sales_user_df
salesUser_df = sales_user_df.merge(storeAddres_df, left_on='customer_id', right_on='cust_id', how='left')

# Drop 'cust_id' column
salesUser_df.drop('cust_id', axis=1, inplace=True)
salesUser_df[0:3]

Unnamed: 0,order_id,customer_id,product_id,quantity,price,order_date,name,username,email,city,lat,lng,store_address
0,2334,5,40,3,35.6,2022-06-21,Chelsey Dietrich,Kamren,Lucio_Hettinger@annie.ca,Roscoeview,-31.8129,62.5342,Paris
1,6228,8,13,7,36.52,2023-03-08,Nicholas Runolfsdottir V,Maxime_Nienow,Sherwood@rosamond.me,Aliyaview,-14.399,-120.7677,Goa
2,7784,9,44,4,46.56,2023-04-22,Glenna Reichert,Delphine,Chaim_McDermott@dana.io,Bartholomebury,24.6463,-168.8889,Washigton


#### Get the list of store Address to extract Weather

In [71]:
storeAddress_list = salesUser_df[["store_address"]].values.tolist()
storeAddresses = [item for List in storeAddress_list for item in List]

# Remove duplicates
storeAddresses = list(set(storeAddresses))
storeAddresses

['Riyadh',
 'Abu Dhabi',
 'Goa',
 'Paris',
 'Washigton',
 'Hyderabad',
 'London',
 'Beijing',
 'Dubai',
 'Mumbai']

#### Get Weather Data

In [74]:
# Function to extract Weather Data
def get_weather_data(api_key, storeAddress):
   #--------------------------------------------------#
   # function: get_weather_data
   # Objective: To extract weather information
   # Inputs: Access Key -> api_key & city name 
   #---------------------------------------------------#
    
    URL='http://api.openweathermap.org/data/2.5/weather?appid={}&q={}'.format(api_key,storeAddress)
    #print(URL)

    weather_data = requests.get(URL).json()
    weather_dic = dict()
   
    if weather_data['cod'] != '404':
       main_info = weather_data['main']
       temperature = main_info['temp']
       temp_min = main_info['temp_min']
       temp_max = main_info['temp_max']
       pressure = main_info['pressure']
       humidity = main_info['humidity']
       description = weather_data['weather'][0]['description']

       weather_dic = {
       'store_address':[storeAddress],   
       'temperature':[temperature], 
       'temp_min':[temp_min], 
       'temp_max':[temp_max], 
       'pressure':[pressure], 
       'humidity':[humidity], 
       'description':[description]
       }

    return weather_dic

In [75]:
# Define parameters 
api_key = '0af07f370755b7d0154e883fbd374480'

full_weather_df = pd.DataFrame()

for storeAddress in storeAddresses:
    weather_dic = get_weather_data(api_key, storeAddress)
    df = pd.DataFrame(weather_dic)
    full_weather_df = pd.concat([full_weather_df, df], axis=0)

full_weather_df[0:3]

Unnamed: 0,store_address,temperature,temp_min,temp_max,pressure,humidity,description
0,Riyadh,308.23,308.23,308.23,1007,12,clear sky
0,Abu Dhabi,303.7,302.21,304.23,1007,63,clear sky
0,Goa,300.27,300.27,300.27,1006,88,overcast clouds


#### Data Quality Check and Data Standardization

In [84]:
# Data type of each attribute
full_weather_df.dtypes

store_address     object
temperature      float64
temp_min         float64
temp_max         float64
pressure           int64
humidity           int64
description       object
dtype: object

In [86]:
# Convert object datatypes to appropriate datatypes
full_weather_df = full_weather_df.convert_dtypes()
full_weather_df.dtypes

store_address    string[python]
temperature             Float64
temp_min                Float64
temp_max                Float64
pressure                  Int64
humidity                  Int64
description      string[python]
dtype: object

#### Join weather data to salesUser_df

In [87]:
full_weather_df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 9 entries, 0 to 0
Data columns (total 7 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   store_address  9 non-null      string 
 1   temperature    9 non-null      Float64
 2   temp_min       9 non-null      Float64
 3   temp_max       9 non-null      Float64
 4   pressure       9 non-null      Int64  
 5   humidity       9 non-null      Int64  
 6   description    9 non-null      string 
dtypes: Float64(3), Int64(2), string(2)
memory usage: 621.0 bytes


In [100]:
finalTranSales_df = salesUser_df.merge(full_weather_df, left_on='store_address', right_on='store_address', how='left')
finalTranSales_df[0:5]

Unnamed: 0,order_id,customer_id,product_id,quantity,price,order_date,name,username,email,city,lat,lng,store_address,temperature,temp_min,temp_max,pressure,humidity,description
0,2334,5,40,3,35.6,2022-06-21,Chelsey Dietrich,Kamren,Lucio_Hettinger@annie.ca,Roscoeview,-31.8129,62.5342,Paris,288.93,287.49,290.47,1009.0,89.0,drizzle
1,6228,8,13,7,36.52,2023-03-08,Nicholas Runolfsdottir V,Maxime_Nienow,Sherwood@rosamond.me,Aliyaview,-14.399,-120.7677,Goa,300.27,300.27,300.27,1006.0,88.0,overcast clouds
2,7784,9,44,4,46.56,2023-04-22,Glenna Reichert,Delphine,Chaim_McDermott@dana.io,Bartholomebury,24.6463,-168.8889,Washigton,,,,,,
3,6588,5,26,1,15.87,2022-10-23,Chelsey Dietrich,Kamren,Lucio_Hettinger@annie.ca,Roscoeview,-31.8129,62.5342,Paris,288.93,287.49,290.47,1009.0,89.0,drizzle
4,5910,8,32,10,77.0,2022-10-05,Nicholas Runolfsdottir V,Maxime_Nienow,Sherwood@rosamond.me,Aliyaview,-14.399,-120.7677,Goa,300.27,300.27,300.27,1006.0,88.0,overcast clouds


In [101]:
finalTranSales_df.dtypes

order_id                  int64
customer_id               int64
product_id                int64
quantity                  int64
price                   float64
order_date       datetime64[ns]
name             string[python]
username         string[python]
email            string[python]
city             string[python]
lat              string[python]
lng              string[python]
store_address            object
temperature             Float64
temp_min                Float64
temp_max                Float64
pressure                  Int64
humidity                  Int64
description      string[python]
dtype: object

In [102]:
# Convert object datatypes to appropriate datatypes
finalTranSales_df = finalTranSales_df.convert_dtypes()
finalTranSales_df.dtypes

order_id                  Int64
customer_id               Int64
product_id                Int64
quantity                  Int64
price                   Float64
order_date       datetime64[ns]
name             string[python]
username         string[python]
email            string[python]
city             string[python]
lat              string[python]
lng              string[python]
store_address    string[python]
temperature             Float64
temp_min                Float64
temp_max                Float64
pressure                  Int64
humidity                  Int64
description      string[python]
dtype: object

#### Write tranformed data into PostgreSql table

In [103]:
# Import Spark Libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType, TimestampType

# Spark Session
spark = SparkSession.builder.appName("AIQ Assignment") \
            .config("spark.jars.packages","org.postgresql:postgresql:42.5.4") \
            .getOrCreate()

#Dont Show warning only error
spark.sparkContext.setLogLevel("ERROR")

In [104]:
finalTranSales_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 19 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   order_id       1000 non-null   Int64         
 1   customer_id    1000 non-null   Int64         
 2   product_id     1000 non-null   Int64         
 3   quantity       1000 non-null   Int64         
 4   price          1000 non-null   Float64       
 5   order_date     1000 non-null   datetime64[ns]
 6   name           1000 non-null   string        
 7   username       1000 non-null   string        
 8   email          1000 non-null   string        
 9   city           1000 non-null   string        
 10  lat            1000 non-null   string        
 11  lng            1000 non-null   string        
 12  store_address  1000 non-null   string        
 13  temperature    894 non-null    Float64       
 14  temp_min       894 non-null    Float64       
 15  temp_max       894 non

In [105]:
# Define schema
tr_sales_schema = StructType([
    StructField("order_id", IntegerType(), False),
    StructField("customer_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price", DoubleType(), True),
    StructField("order_date", TimestampType(), True),
    StructField("name", StringType(), True),
    StructField("username", StringType(), True),
    StructField("email", StringType(), True),
    StructField("city", StringType(), True),
    StructField("lat", StringType(), True),
    StructField("lng", StringType(), True),
    StructField("store_address", StringType(), True),
    StructField("temperature", StringType(), True),
    StructField("temp_min", StringType(), True),
    StructField("temp_max", StringType(), True),
    StructField("pressure", StringType(), True),
    StructField("humidity", StringType(), True),
    StructField("description", StringType(), True)
])

In [106]:
finalTranSales_df = spark.createDataFrame(finalTranSales_df, schema=tr_sales_schema)

In [107]:
finalTranSales_df.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://192.168.5.154:5432/postgres") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "aiq.tra_sales") \
    .option("user", "postgres") \
    .option("password", "postgres")\
    .mode("overwrite") \
    .save()

# Note: 192.168.5.154 is IP address of Host Machine

                                                                                